"""Pure-Python reference implementation of the inner rudder-position PID. This module is the **algorithmic source of truth** for the inner loop. The firmware C++ port in ``firmware/ar_autopilot_v1/src/pid/pid_inner.cpp`` is a line-by-line translation: same variables, same sequencing, same numerics. Cross-validation tests confirm the two produce identical trajectories on the same input within float tolerance. Algorithm (per tick): raw_error = setpoint_deg - measured_deg deadband applied: |raw_error| < setpoint_deadband_deg -> 0 rate-limit the setpoint (max |d(setpoint)/dt| <= rate_limit_dps) proportional = kp * error integral += ki * error * dt (back-calculated when output saturates) derivative = kd * (error - prev_error) / dt (with low-pass on the derivative term) raw_output = proportional + integral + derivative output = saturate(raw_output, [-100, +100]) if abs(output) > 0: if abs(output) < min_useful_pwm_pct: snap up to min_useful_pwm_pct apply port/stbd asymmetry compensation if mode == STANDBY or interlocks block: output = 0 Anti-windup is back-calculation: when the output saturates, the integrator is corrected by `(saturated - raw) * anti_windup_gain * dt`. The gain defaults to `1 / kp` if kp != 0, else 0. """ from __future__ import annotations from dataclasses import dataclass @dataclass class PidInnerConfig: """All parameters for the inner PID + actuator compensation. Defaults are the 30 m motor-yacht seed values from ``arautopilot/library/default_tunings/yacht_motor_planeo_30m.yaml``. """ # --- Gains -------------------------------------------------------------- kp: float = 2.5 ki: float = 0.15 kd: float = 0.30 # --- Sampling ----------------------------------------------------------- freq_hz: float = 50.0 """Nominal loop rate. dt = 1 / freq_hz.""" # --- Setpoint handling -------------------------------------------------- deadband_deg: float = 0.5 """Errors within this band do not contribute to P, I, or D (suppresses noise).""" rate_limit_dps: float = 30.0 """Maximum |d(setpoint)/dt| applied to the working setpoint, deg/s.""" # --- Output saturation -------------------------------------------------- output_min_pct: float = -100.0 output_max_pct: float = +100.0 # --- Anti-windup -------------------------------------------------------- integral_clamp: float = 30.0 """Hard clamp on the integrator's accumulated value (output-equivalent units).""" aw_gain: float | None = None """Back-calculation anti-windup gain. ``None`` -> 1/kp if kp != 0, else 0.""" # --- Derivative low-pass ------------------------------------------------ d_lpf_tau_s: float = 0.05 """Time constant of the first-order low-pass on the derivative term.""" # --- Actuator non-linearity compensation -------------------------------- deadband_pct: float = 7.0 """Below this absolute command, the actuator produces no torque -- the PID must skip this band to avoid lingering at the edge of action.""" min_useful_pwm_pct: float = 12.0 """Snap any non-zero command up to at least this magnitude.""" asymmetry_stbd_over_port: float = 1.0 """Inverse of the simulator's asymmetry. If the pump pushes harder to starboard, divide the starboard command by the asymmetry so the effective torque is symmetric.""" def dt(self) -> float: return 1.0 / self.freq_hz @dataclass class PidInnerState: """Mutable runtime state of the controller.""" integral: float = 0.0 prev_error: float = 0.0 prev_d_term: float = 0.0 prev_setpoint_deg: float = 0.0 last_output_pct: float = 0.0 class PidInner: """Inner rudder-position PID controller. Designed for cross-language port: every operation is plain arithmetic, no dependencies beyond the dataclasses above. """ def __init__(self, config: PidInnerConfig | None = None) -> None: self.config: PidInnerConfig = config or PidInnerConfig() self.state: PidInnerState = PidInnerState() self._dt: float = self.config.dt() # -- Lifecycle ----------------------------------------------------------- def reset(self, *, measured_deg: float = 0.0, setpoint_deg: float = 0.0) -> None: self.state = PidInnerState(prev_setpoint_deg=setpoint_deg) def update_config(self, config: PidInnerConfig) -> None: """Hot-swap configuration without losing accumulated state.""" self.config = config self._dt = config.dt() # -- Step ---------------------------------------------------------------- def step(self, *, setpoint_deg: float, measured_deg: float, allowed: bool = True) -> float: """Compute the controller output for one tick. ``allowed`` lets the caller force the output to zero (e.g. while in STANDBY or with the actuator master power off). When False, the integrator is bled toward zero so the PID doesn't accumulate during manual operation. Returns the signed PWM command in percent, after actuator non-linearity compensation, ready to be passed to the H-bridge / pump. """ cfg = self.config st = self.state dt = self._dt # Rate-limit the setpoint -- the operator/outer loop may step it # large, but the inner loop should pursue it smoothly so the rudder # doesn't slam. target = self._rate_limit_setpoint(setpoint_deg) st.prev_setpoint_deg = target # Compute the (possibly dead-banded) error. raw_error = target - measured_deg if abs(raw_error) <= cfg.deadband_deg: error = 0.0 else: # Strip the deadband so the action is continuous at the edge. sign = 1.0 if raw_error > 0.0 else -1.0 error = raw_error - sign * cfg.deadband_deg # --- Forced inactive path ----------------------------------------- if not allowed: # Bleed the integrator toward zero to avoid windup during manual. st.integral *= 0.95 st.prev_error = error st.last_output_pct = 0.0 return 0.0 # --- Proportional ------------------------------------------------- p_term = cfg.kp * error # --- Integral (provisional, before anti-windup correction) -------- st.integral += cfg.ki * error * dt st.integral = _clamp(st.integral, -cfg.integral_clamp, cfg.integral_clamp) # --- Derivative with low-pass filter ------------------------------ if dt > 0.0: d_raw = cfg.kd * (error - st.prev_error) / dt else: d_raw = 0.0 alpha = _lpf_alpha(cfg.d_lpf_tau_s, dt) d_term = (1.0 - alpha) * st.prev_d_term + alpha * d_raw st.prev_d_term = d_term raw_output = p_term + st.integral + d_term # --- Saturate + back-calculation anti-windup ---------------------- output = _clamp(raw_output, cfg.output_min_pct, cfg.output_max_pct) if raw_output != output: aw = cfg.aw_gain if cfg.aw_gain is not None else ( 1.0 / cfg.kp if cfg.kp != 0.0 else 0.0 ) st.integral -= aw * (raw_output - output) * dt st.integral = _clamp(st.integral, -cfg.integral_clamp, cfg.integral_clamp) # --- Actuator non-linearity compensation -------------------------- cmd = self._compensate(output) st.prev_error = error st.last_output_pct = cmd return cmd # -- Helpers ------------------------------------------------------------- def _rate_limit_setpoint(self, requested_deg: float) -> float: cfg = self.config st = self.state max_delta = cfg.rate_limit_dps * self._dt delta = requested_deg - st.prev_setpoint_deg if delta > max_delta: return st.prev_setpoint_deg + max_delta if delta < -max_delta: return st.prev_setpoint_deg - max_delta return requested_deg def _compensate(self, raw_pct: float) -> float: cfg = self.config if raw_pct == 0.0: return 0.0 magnitude = abs(raw_pct) if magnitude <= cfg.deadband_pct: return 0.0 if magnitude < cfg.min_useful_pwm_pct: magnitude = cfg.min_useful_pwm_pct sign = 1.0 if raw_pct > 0.0 else -1.0 cmd = sign * magnitude # Asymmetry: divide the starboard side, multiply the port side so the # actuator's intrinsic faster-to-starboard pump is compensated. if cmd > 0.0 and cfg.asymmetry_stbd_over_port != 0.0: cmd /= cfg.asymmetry_stbd_over_port elif cmd < 0.0: cmd *= cfg.asymmetry_stbd_over_port # Re-saturate after compensation in case scaling pushed us past 100 %. return _clamp(cmd, cfg.output_min_pct, cfg.output_max_pct) def _clamp(x: float, lo: float, hi: float) -> float: if x < lo: return lo if x > hi: return hi return x def _lpf_alpha(tau_s: float, dt: float) -> float: """First-order low-pass coefficient: alpha = dt / (tau + dt). Smaller alpha -> heavier filtering. alpha == 1.0 means no filtering. """ if tau_s <= 0.0: return 1.0 return dt / (tau_s + dt)