"""Cascaded PID configuration + gain scheduling by vessel speed. Mirrors section 6 of the brief: - Inner loop (50 Hz) drives rudder position to setpoint. - Outer loop (10 Hz) drives heading error to a rudder setpoint, with Rate-of-Turn feed-forward and gain scheduling by SOG. - Adaptive tuning (Sprint 9+) may shift the active gains by no more than ±50 % of the base gains — this hard limit is enforced here. """ from __future__ import annotations from enum import StrEnum from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator class AccessLevel(StrEnum): """Three-level RBAC for PID tuning (brief section 6).""" OPERATOR = "operator" """Captain: chooses a pre-configured profile only (Soft/Normal/Sport).""" TECHNICIAN = "technician" """Authorised installer (PIN): may shift gains within ±30 % of base.""" INTEGRATOR = "integrator" """Alvaro (PIN): full access, may edit base gains and adaptation limits.""" class PidGains(BaseModel): """A single set of PID gains.""" model_config = ConfigDict(extra="forbid", validate_assignment=True) kp: float = Field(ge=0.0, description="Proportional gain.") ki: float = Field(default=0.0, ge=0.0, description="Integral gain.") kd: float = Field(default=0.0, ge=0.0, description="Derivative gain.") class GainSchedulePoint(BaseModel): """One point of the SOG-indexed gain schedule for the outer loop.""" model_config = ConfigDict(extra="forbid", validate_assignment=True) speed_knots: float = Field(ge=0.0, le=80.0, description="SOG at which these gains apply.") gains: PidGains class PidConfig(BaseModel): """Complete PID configuration for one vessel. The base gains (``inner_loop_base`` and ``outer_loop_base``) are the integrator's IP and are NEVER exposed to the operator. They are the starting point for adaptive tuning and the anchor against which the adaptive bound (``adaptive_max_deviation_pct``) is enforced. """ model_config = ConfigDict(extra="forbid", validate_assignment=True) schema_version: str = Field(default="0.1.0", pattern=r"^\d+\.\d+\.\d+$") # --- Inner loop (rudder position controller, 50 Hz) --------------------- inner_loop_base: PidGains inner_loop_freq_hz: float = Field(default=50.0, gt=0.0, le=200.0) # --- Outer loop (heading controller, 10 Hz) ----------------------------- outer_loop_base: PidGains outer_loop_freq_hz: float = Field(default=10.0, gt=0.0, le=50.0) # --- Gain scheduling by vessel SOG (outer loop) ------------------------- gain_schedule: list[GainSchedulePoint] = Field( default_factory=list, description=( "Outer-loop gains by SOG. Linear interpolation between points; " "outside the range, the closest endpoint is held. Empty list " "means scheduling is disabled and the base gains are used." ), ) # --- Feed-forward & saturation ------------------------------------------ rot_feedforward_gain: float = Field( default=0.0, ge=0.0, le=10.0, description="Rate-of-Turn feed-forward gain (brief section 6 — anticipates inertia).", ) setpoint_deadband_deg: float = Field( default=0.5, ge=0.0, le=5.0, description="Heading deadband around the setpoint to suppress noise oscillation.", ) setpoint_rate_limit_dps: float = Field( default=4.0, gt=0.0, le=20.0, description="Maximum rate of change of the heading setpoint (typical 3-6 dps).", ) anti_windup_limit: float = Field( default=10.0, gt=0.0, description="Integrator absolute clamp; saturates when the actuator saturates.", ) # --- Adaptive tuning safety bound (Sprint 9+) --------------------------- adaptive_enabled: bool = Field( default=False, description="Enabled at commissioning by the integrator only.", ) adaptive_max_deviation_pct: float = Field( default=50.0, gt=0.0, le=50.0, description=( "Hard cap on how far adaptive tuning may shift the active gains " "away from the base gains. Brief section 6: 'never out of ±50 %'." ), ) # --- Validators --------------------------------------------------------- @field_validator("gain_schedule") @classmethod def _schedule_must_be_ascending( cls, v: list[GainSchedulePoint] ) -> list[GainSchedulePoint]: speeds = [p.speed_knots for p in v] if speeds != sorted(speeds): raise ValueError("gain_schedule points must be sorted by ascending speed_knots") if len(set(speeds)) != len(speeds): raise ValueError("gain_schedule points must have unique speed_knots values") return v @model_validator(mode="after") def _check_loop_frequencies(self) -> PidConfig: if self.inner_loop_freq_hz <= self.outer_loop_freq_hz: raise ValueError( "inner_loop_freq_hz must be strictly greater than outer_loop_freq_hz " "(cascaded control requires the inner loop to be faster)" ) return self def is_within_adaptive_bound(self, candidate: PidGains, *, of: str = "outer") -> bool: """Return ``True`` if ``candidate`` is within the adaptive bound of the base. ``of`` is either ``"inner"`` or ``"outer"``. """ base = self.inner_loop_base if of == "inner" else self.outer_loop_base limit = self.adaptive_max_deviation_pct / 100.0 return all( _within_pct(getattr(candidate, axis), getattr(base, axis), limit) for axis in ("kp", "ki", "kd") ) def _within_pct(candidate: float, base: float, frac: float) -> bool: """``True`` iff ``candidate`` lies within ``±frac`` of ``base``. If ``base`` is exactly zero, we require the candidate to be zero too (any non-zero gain breaks the ratio test). """ if base == 0.0: return candidate == 0.0 lo = base * (1.0 - frac) hi = base * (1.0 + frac) return lo <= candidate <= hi def interpolate_gains(schedule: list[GainSchedulePoint], speed_knots: float) -> PidGains: """Linear interpolation of outer-loop gains over the speed schedule. Outside the range of the schedule, the nearest endpoint is held (no extrapolation). Raises ``ValueError`` if the schedule is empty. """ if not schedule: raise ValueError("Cannot interpolate over an empty gain_schedule") # Endpoint hold if speed_knots <= schedule[0].speed_knots: return schedule[0].gains if speed_knots >= schedule[-1].speed_knots: return schedule[-1].gains # Find bracketing points for i in range(len(schedule) - 1): a, b = schedule[i], schedule[i + 1] if a.speed_knots <= speed_knots <= b.speed_knots: span = b.speed_knots - a.speed_knots t = 0.0 if span == 0.0 else (speed_knots - a.speed_knots) / span return PidGains( kp=a.gains.kp + t * (b.gains.kp - a.gains.kp), ki=a.gains.ki + t * (b.gains.ki - a.gains.ki), kd=a.gains.kd + t * (b.gains.kd - a.gains.kd), ) # Unreachable given the endpoint guards above raise RuntimeError("Gain schedule interpolation failed unexpectedly")