"""Adaptive gain tuner -- Sprint 8. Watches the steady-state heading error and adjusts the outer-loop Kp/Ki within the bounds defined by ``PidConfig.adaptive_max_deviation_pct``. The strategy is a simple integral-error gradient scheme: - If |mean_error| > dead_band for a sustained window, nudge Kp up by step_pct. - If the response is oscillating (error sign changes frequently), nudge Kp down. - Ki is adjusted proportionally to maintain the ZN ratio Ki = 2*Kp / Tu_est. - Kd is not adapted (derivative magnifies noise; ZN auto-tune sets it once). All changes are bounded to ±adaptive_max_deviation_pct of the base gains (brief section 6: "never outside ±50 %"). Usage:: tuner = AdaptiveTuner(pid_config, base_gains) # On each outer-loop tick (10 Hz): new_gains = tuner.step(heading_error_deg, dt_s=0.1) if new_gains is not None: apply_outer_gains(new_gains) """ from __future__ import annotations import math from dataclasses import dataclass, field from arautopilot.core.pid_config import PidConfig, PidGains @dataclass class AdaptiveTuner: """Gradient-descent adaptive gain tuner for the outer PID loop. Parameters ---------- config: PidConfig that owns the base gains and adaptive bounds. base_gains: The current base gains (set by commissioning or default). dead_band_deg: Error below which no adaptation occurs. window_steps: Number of 10 Hz steps over which the error statistics are computed. step_pct: Fractional Kp adjustment per adaptation event (default 2 %). """ config: PidConfig base_gains: PidGains dead_band_deg: float = 1.0 window_steps: int = 100 # 10 seconds step_pct: float = 0.02 # 2 % per step _error_buffer: list[float] = field(default_factory=list) _current_kp: float = field(init=False) _current_ki: float = field(init=False) _current_kd: float = field(init=False) def __post_init__(self) -> None: self._current_kp = self.base_gains.kp self._current_ki = self.base_gains.ki self._current_kd = self.base_gains.kd # ------------------------------------------------------------------ @property def current_gains(self) -> PidGains: return PidGains(kp=self._current_kp, ki=self._current_ki, kd=self._current_kd) def step(self, error_deg: float, dt_s: float = 0.1) -> PidGains | None: """Feed one heading error sample; return updated gains if adapted. Returns ``None`` if no adaptation occurred this step. """ if not self.config.adaptive_enabled: return None if not math.isfinite(error_deg): return None self._error_buffer.append(error_deg) if len(self._error_buffer) > self.window_steps: self._error_buffer.pop(0) if len(self._error_buffer) < self.window_steps: return None # buffer not yet full mean_abs = sum(abs(e) for e in self._error_buffer) / len(self._error_buffer) # Count sign changes (oscillation indicator) sign_changes = sum( 1 for i in range(1, len(self._error_buffer)) if (self._error_buffer[i] >= 0) != (self._error_buffer[i - 1] >= 0) ) oscillating = sign_changes > self.window_steps * 0.3 # > 30 % sign flips # Decision if oscillating: # Reduce Kp to damp oscillation. return self._adjust_kp(-self.step_pct) elif mean_abs > self.dead_band_deg: # Increase Kp to reduce steady-state error. return self._adjust_kp(+self.step_pct) return None def _adjust_kp(self, delta_frac: float) -> PidGains | None: """Adjust Kp by ``delta_frac`` fraction (signed), clamped to bounds.""" new_kp = self._current_kp * (1.0 + delta_frac) # Clamp to ±adaptive_max_deviation_pct of base. limit = self.config.adaptive_max_deviation_pct / 100.0 lo = self.base_gains.kp * (1.0 - limit) hi = self.base_gains.kp * (1.0 + limit) new_kp = max(lo, min(hi, new_kp)) if new_kp == self._current_kp: return None # Adjust Ki proportionally (maintain integral-to-proportional ratio). if self.base_gains.kp > 0: ki_ratio = self.base_gains.ki / self.base_gains.kp else: ki_ratio = 0.0 new_ki = new_kp * ki_ratio # Clamp Ki as well. ki_lo = self.base_gains.ki * (1.0 - limit) ki_hi = self.base_gains.ki * (1.0 + limit) new_ki = max(ki_lo, min(ki_hi, new_ki)) self._current_kp = new_kp self._current_ki = new_ki # Clear buffer after each adaptation to avoid consecutive nudges. self._error_buffer.clear() return self.current_gains def reset(self) -> None: """Reset to base gains.""" self._current_kp = self.base_gains.kp self._current_ki = self.base_gains.ki self._current_kd = self.base_gains.kd self._error_buffer.clear()