sprint-2: PID inner loop + Python rudder simulator

End-to-end implementation per docs/sprint-2-plan.md.

Builds: pio run -e esp32-dev SUCCESS, RAM 6.8%, Flash 26.8% (351 KB).
Tests: pytest 129/129 green (110 Sprint 1 + 19 Sprint 2).

Python (arautopilot/studio/simulator/):

- rudder_dynamics.py: marine-realistic physical model of a hydraulic
  rudder actuator. Defaults tuned so 100 % PWM produces steady-state
  v_max ~5 deg/s, matching the brief's "typical 3-6 dps" for a 30 m
  yacht. Includes deadband, min-useful PWM snap, port/stbd asymmetry,
  end-stops, optional external torque, RunRecorder helper.
- pid_inner.py: pure-Python reference PID. Anti-windup via back-
  calculation, setpoint rate limit, setpoint deadband, derivative LPF,
  actuator non-linearity compensation. This module is the algorithmic
  source of truth; C++ firmware is a line-by-line port.

Firmware (firmware/ar_autopilot_v1/src/pid/):

- pid_inner.h: header-only C++17 controller, byte-equivalent port of
  pid_inner.py. Compiles on ESP32 toolchain AND on host g++/clang/MSVC
  (no Arduino dependencies) -- ready for native Unity cross-validation
  once a host compiler is installed.
- pid_inner_task.{h,cpp}: FreeRTOS task wrapper. 50 Hz on Core 1
  (real-time core). Subscribes to TWDT, bleeds integrator during
  STANDBY, surfaces telemetry + tunables via the Modbus slave.

Modbus map (regenerated from YAML):

- 6 new INPUT registers (40-45): setpoint, output, error, kp/ki/kd live
- 4 new HOLDING registers (16-19): writable setpoint + kp/ki/kd req
  (writes propagate atomically; zero kp rejected as ILLEGAL_DATA_VALUE)

Tests:

- test_rudder_simulator.py: 9 tests (zero-input rest, full deflection,
  end-stop saturation, deadband, min-useful snap, asymmetry, recorder
  API, invalid dt, end-stop velocity zeroing).
- test_pid_inner_python.py: 10 tests (positive/negative step response,
  setpoint deadband holds, anti-windup bounds under saturation,
  allowed=false bleeds integrator, actuator deadband + asymmetry
  compensation, output saturation, rate limit, disturbance rejection).

NOT in Sprint 2 (intentional per brief sec. 12):
  - Outer heading PID, gain scheduling by SOG, ROT feed-forward
    (those land in Sprint 3)
  - Cross-validation tests via ctypes (need host C++ compiler that
    this Windows machine lacks; algorithmic parity enforced by review)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-18 14:18:41 -04:00
parent 65860948b4
commit 295efa2d83
16 changed files with 1477 additions and 6 deletions
+10
View File
@@ -78,6 +78,12 @@ INPUTS: dict[str, Reg] = {
"HEADING_AGE_MS": Reg(addr=26, name="HEADING_AGE_MS", desc='Milliseconds since the last heading update (0..60000)', unit="ms", scale=1.0, offset=0.0),
"BATTERY_VOLTAGE_X100": Reg(addr=32, name="BATTERY_VOLTAGE_X100", desc='System battery voltage, V*100', unit="V", scale=0.01, offset=0.0),
"ACTUATOR_CURRENT_X100": Reg(addr=33, name="ACTUATOR_CURRENT_X100", desc='Actuator current, A*100', unit="A", scale=0.01, offset=0.0),
"PID_INNER_SETPOINT_X100": Reg(addr=40, name="PID_INNER_SETPOINT_X100", desc='Inner-loop rudder setpoint, deg*100 (signed int16)', unit="deg", scale=0.01, offset=0.0),
"PID_INNER_OUTPUT_X100": Reg(addr=41, name="PID_INNER_OUTPUT_X100", desc='Last PID command, %*100 (signed int16, -10000..+10000)', unit="%", scale=0.01, offset=0.0),
"PID_INNER_ERROR_X100": Reg(addr=42, name="PID_INNER_ERROR_X100", desc='Last PID error, deg*100 (signed int16)', unit="deg", scale=0.01, offset=0.0),
"PID_INNER_KP_X1000": Reg(addr=43, name="PID_INNER_KP_X1000", desc='Inner-loop kp * 1000 (unsigned)', unit="", scale=0.001, offset=0.0),
"PID_INNER_KI_X1000": Reg(addr=44, name="PID_INNER_KI_X1000", desc='Inner-loop ki * 1000 (unsigned)', unit="", scale=0.001, offset=0.0),
"PID_INNER_KD_X1000": Reg(addr=45, name="PID_INNER_KD_X1000", desc='Inner-loop kd * 1000 (unsigned)', unit="", scale=0.001, offset=0.0),
}
# HOLDINGS
@@ -87,6 +93,10 @@ HOLDINGS: dict[str, Reg] = {
"BRIGHTNESS_PCT": Reg(addr=2, name="BRIGHTNESS_PCT", desc='Display brightness 0..100', unit="%", scale=1.0, offset=0.0),
"ALARM_VOLUME_PCT": Reg(addr=3, name="ALARM_VOLUME_PCT", desc='Alarm volume 0..100', unit="%", scale=1.0, offset=0.0),
"DODGE_OFFSET_DEG_X100": Reg(addr=8, name="DODGE_OFFSET_DEG_X100", desc='Dodge mode heading offset, deg*100 (signed int16)', unit="deg", scale=0.01, offset=0.0),
"PID_INNER_SETPOINT_REQ_X100": Reg(addr=16, name="PID_INNER_SETPOINT_REQ_X100", desc='Requested inner-loop rudder setpoint, deg*100 (signed int16)', unit="deg", scale=0.01, offset=0.0),
"PID_INNER_KP_REQ_X1000": Reg(addr=17, name="PID_INNER_KP_REQ_X1000", desc='Requested inner-loop kp * 1000 (unsigned)', unit="", scale=0.001, offset=0.0),
"PID_INNER_KI_REQ_X1000": Reg(addr=18, name="PID_INNER_KI_REQ_X1000", desc='Requested inner-loop ki * 1000 (unsigned)', unit="", scale=0.001, offset=0.0),
"PID_INNER_KD_REQ_X1000": Reg(addr=19, name="PID_INNER_KD_REQ_X1000", desc='Requested inner-loop kd * 1000 (unsigned)', unit="", scale=0.001, offset=0.0),
}
ALL_GROUPS: dict[str, dict[str, Reg]] = {
+15 -1
View File
@@ -1 +1,15 @@
"""Test bench: vessel + actuator + sensor simulators (Sprint 2-3)."""
"""Bench-grade simulators used by Sprint 2+ to validate the PID without
the AR-NMEA-IO board attached.
Modules:
- :mod:`~arautopilot.studio.simulator.rudder_dynamics`
Minimal physical model of a hydraulic rudder actuator: rotational
inertia, viscous friction, hydraulic deadband, port/stbd asymmetry,
mechanical end-stops.
- :mod:`~arautopilot.studio.simulator.pid_inner`
Pure-Python reference implementation of the inner (rudder-position)
PID. Used to iterate on the algorithm and cross-validate the C++
firmware port.
"""
+245
View File
@@ -0,0 +1,245 @@
"""Pure-Python reference implementation of the inner rudder-position PID.
This module is the **algorithmic source of truth** for the inner loop. The
firmware C++ port in ``firmware/ar_autopilot_v1/src/pid/pid_inner.cpp`` is
a line-by-line translation: same variables, same sequencing, same numerics.
Cross-validation tests confirm the two produce identical trajectories on
the same input within float tolerance.
Algorithm (per tick):
raw_error = setpoint_deg - measured_deg
deadband applied: |raw_error| < setpoint_deadband_deg -> 0
rate-limit the setpoint (max |d(setpoint)/dt| <= rate_limit_dps)
proportional = kp * error
integral += ki * error * dt (back-calculated when output saturates)
derivative = kd * (error - prev_error) / dt (with low-pass on the
derivative term)
raw_output = proportional + integral + derivative
output = saturate(raw_output, [-100, +100])
if abs(output) > 0:
if abs(output) < min_useful_pwm_pct: snap up to min_useful_pwm_pct
apply port/stbd asymmetry compensation
if mode == STANDBY or interlocks block: output = 0
Anti-windup is back-calculation: when the output saturates, the integrator
is corrected by `(saturated - raw) * anti_windup_gain * dt`. The gain
defaults to `1 / kp` if kp != 0, else 0.
"""
from __future__ import annotations
from dataclasses import dataclass
@dataclass
class PidInnerConfig:
"""All parameters for the inner PID + actuator compensation.
Defaults are the 30 m motor-yacht seed values from
``arautopilot/library/default_tunings/yacht_motor_planeo_30m.yaml``.
"""
# --- Gains --------------------------------------------------------------
kp: float = 2.5
ki: float = 0.15
kd: float = 0.30
# --- Sampling -----------------------------------------------------------
freq_hz: float = 50.0
"""Nominal loop rate. dt = 1 / freq_hz."""
# --- Setpoint handling --------------------------------------------------
deadband_deg: float = 0.5
"""Errors within this band do not contribute to P, I, or D (suppresses noise)."""
rate_limit_dps: float = 30.0
"""Maximum |d(setpoint)/dt| applied to the working setpoint, deg/s."""
# --- Output saturation --------------------------------------------------
output_min_pct: float = -100.0
output_max_pct: float = +100.0
# --- Anti-windup --------------------------------------------------------
integral_clamp: float = 30.0
"""Hard clamp on the integrator's accumulated value (output-equivalent units)."""
aw_gain: float | None = None
"""Back-calculation anti-windup gain. ``None`` -> 1/kp if kp != 0, else 0."""
# --- Derivative low-pass ------------------------------------------------
d_lpf_tau_s: float = 0.05
"""Time constant of the first-order low-pass on the derivative term."""
# --- Actuator non-linearity compensation --------------------------------
deadband_pct: float = 7.0
"""Below this absolute command, the actuator produces no torque -- the PID
must skip this band to avoid lingering at the edge of action."""
min_useful_pwm_pct: float = 12.0
"""Snap any non-zero command up to at least this magnitude."""
asymmetry_stbd_over_port: float = 1.0
"""Inverse of the simulator's asymmetry. If the pump pushes harder to
starboard, divide the starboard command by the asymmetry so the
effective torque is symmetric."""
def dt(self) -> float:
return 1.0 / self.freq_hz
@dataclass
class PidInnerState:
"""Mutable runtime state of the controller."""
integral: float = 0.0
prev_error: float = 0.0
prev_d_term: float = 0.0
prev_setpoint_deg: float = 0.0
last_output_pct: float = 0.0
class PidInner:
"""Inner rudder-position PID controller.
Designed for cross-language port: every operation is plain arithmetic,
no dependencies beyond the dataclasses above.
"""
def __init__(self, config: PidInnerConfig | None = None) -> None:
self.config: PidInnerConfig = config or PidInnerConfig()
self.state: PidInnerState = PidInnerState()
self._dt: float = self.config.dt()
# -- Lifecycle -----------------------------------------------------------
def reset(self, *, measured_deg: float = 0.0, setpoint_deg: float = 0.0) -> None:
self.state = PidInnerState(prev_setpoint_deg=setpoint_deg)
def update_config(self, config: PidInnerConfig) -> None:
"""Hot-swap configuration without losing accumulated state."""
self.config = config
self._dt = config.dt()
# -- Step ----------------------------------------------------------------
def step(self, *, setpoint_deg: float, measured_deg: float,
allowed: bool = True) -> float:
"""Compute the controller output for one tick.
``allowed`` lets the caller force the output to zero (e.g. while in
STANDBY or with the actuator master power off). When False, the
integrator is bled toward zero so the PID doesn't accumulate during
manual operation.
Returns the signed PWM command in percent, after actuator
non-linearity compensation, ready to be passed to the H-bridge / pump.
"""
cfg = self.config
st = self.state
dt = self._dt
# Rate-limit the setpoint -- the operator/outer loop may step it
# large, but the inner loop should pursue it smoothly so the rudder
# doesn't slam.
target = self._rate_limit_setpoint(setpoint_deg)
st.prev_setpoint_deg = target
# Compute the (possibly dead-banded) error.
raw_error = target - measured_deg
if abs(raw_error) <= cfg.deadband_deg:
error = 0.0
else:
# Strip the deadband so the action is continuous at the edge.
sign = 1.0 if raw_error > 0.0 else -1.0
error = raw_error - sign * cfg.deadband_deg
# --- Forced inactive path -----------------------------------------
if not allowed:
# Bleed the integrator toward zero to avoid windup during manual.
st.integral *= 0.95
st.prev_error = error
st.last_output_pct = 0.0
return 0.0
# --- Proportional -------------------------------------------------
p_term = cfg.kp * error
# --- Integral (provisional, before anti-windup correction) --------
st.integral += cfg.ki * error * dt
st.integral = _clamp(st.integral, -cfg.integral_clamp, cfg.integral_clamp)
# --- Derivative with low-pass filter ------------------------------
if dt > 0.0:
d_raw = cfg.kd * (error - st.prev_error) / dt
else:
d_raw = 0.0
alpha = _lpf_alpha(cfg.d_lpf_tau_s, dt)
d_term = (1.0 - alpha) * st.prev_d_term + alpha * d_raw
st.prev_d_term = d_term
raw_output = p_term + st.integral + d_term
# --- Saturate + back-calculation anti-windup ----------------------
output = _clamp(raw_output, cfg.output_min_pct, cfg.output_max_pct)
if raw_output != output:
aw = cfg.aw_gain if cfg.aw_gain is not None else (
1.0 / cfg.kp if cfg.kp != 0.0 else 0.0
)
st.integral -= aw * (raw_output - output) * dt
st.integral = _clamp(st.integral, -cfg.integral_clamp, cfg.integral_clamp)
# --- Actuator non-linearity compensation --------------------------
cmd = self._compensate(output)
st.prev_error = error
st.last_output_pct = cmd
return cmd
# -- Helpers -------------------------------------------------------------
def _rate_limit_setpoint(self, requested_deg: float) -> float:
cfg = self.config
st = self.state
max_delta = cfg.rate_limit_dps * self._dt
delta = requested_deg - st.prev_setpoint_deg
if delta > max_delta:
return st.prev_setpoint_deg + max_delta
if delta < -max_delta:
return st.prev_setpoint_deg - max_delta
return requested_deg
def _compensate(self, raw_pct: float) -> float:
cfg = self.config
if raw_pct == 0.0:
return 0.0
magnitude = abs(raw_pct)
if magnitude <= cfg.deadband_pct:
return 0.0
if magnitude < cfg.min_useful_pwm_pct:
magnitude = cfg.min_useful_pwm_pct
sign = 1.0 if raw_pct > 0.0 else -1.0
cmd = sign * magnitude
# Asymmetry: divide the starboard side, multiply the port side so the
# actuator's intrinsic faster-to-starboard pump is compensated.
if cmd > 0.0 and cfg.asymmetry_stbd_over_port != 0.0:
cmd /= cfg.asymmetry_stbd_over_port
elif cmd < 0.0:
cmd *= cfg.asymmetry_stbd_over_port
# Re-saturate after compensation in case scaling pushed us past 100 %.
return _clamp(cmd, cfg.output_min_pct, cfg.output_max_pct)
def _clamp(x: float, lo: float, hi: float) -> float:
if x < lo:
return lo
if x > hi:
return hi
return x
def _lpf_alpha(tau_s: float, dt: float) -> float:
"""First-order low-pass coefficient: alpha = dt / (tau + dt).
Smaller alpha -> heavier filtering. alpha == 1.0 means no filtering.
"""
if tau_s <= 0.0:
return 1.0
return dt / (tau_s + dt)
@@ -0,0 +1,210 @@
"""Minimal physical model of a hydraulic rudder actuator.
The state is the rudder angle in degrees and its angular velocity in deg/s.
The driver is a signed PWM command in percent (-100..+100):
pwm > 0 -> drive starboard (positive angular acceleration)
pwm < 0 -> drive port (negative angular acceleration)
pwm == 0 -> no torque from the actuator (still subject to friction)
The model is intentionally small: it must capture the qualitative behaviour
that makes a PID interesting (inertia, friction, deadband, asymmetry,
end-stop saturation) but it does NOT pretend to be a faithful CFD of any
real pump. The integrator's affinated tunings live elsewhere.
Equations (per integration step ``dt``):
effective_cmd = compensate_deadband_and_asymmetry(pwm)
torque = actuator_gain * effective_cmd
accel = (torque - friction * angular_vel) / inertia
angular_vel += accel * dt
angle += angular_vel * dt
# then clamp to [-max_angle_deg, +max_angle_deg], zeroing velocity at the stop
Units throughout:
angle [deg]
velocity [deg / s]
accel [deg / s^2]
torque [arbitrary scaled units; absorbed into actuator_gain]
friction [(deg/s) -> (deg/s^2)], i.e. viscous coefficient
"""
from __future__ import annotations
from dataclasses import dataclass, field
@dataclass
class RudderDynamicsConfig:
"""Tunable parameters of the rudder physical model.
Defaults are reasonable for a 30 m motor yacht with a reversible
hydraulic pump (calibration profile shipped in the seed library).
"""
# --- Mechanical ---------------------------------------------------------
max_angle_deg: float = 35.0
"""Mechanical end-stop on either side, degrees."""
inertia: float = 1.0
"""Rotational inertia (arbitrary units; absorbed with actuator_gain)."""
friction: float = 4.0
"""Viscous friction coefficient (higher = more damping)."""
# --- Actuator -----------------------------------------------------------
# Tuned so that 100 % PWM produces a steady-state angular velocity of
# ~5 deg/s, which matches the brief's "typical 3-6 dps" max rate for a
# 30 m yacht hydraulic pump:
# v_steady = (actuator_gain * 100) / friction = (0.2 * 100) / 4 = 5
actuator_gain: float = 0.2
"""Torque-equivalent per percent of PWM. Marine-realistic for a 30 m yacht."""
deadband_pct: float = 7.0
"""Below this absolute command, the pump produces no torque (static friction)."""
min_useful_pwm_pct: float = 12.0
"""Once above the deadband, the effective PWM is snapped up to this minimum."""
asymmetry_stbd_over_port: float = 1.0
"""Ratio of starboard speed to port speed; 1.0 = symmetric."""
# --- Optional disturbance -----------------------------------------------
external_torque: float = 0.0
"""Constant external torque (e.g. simulating wave action). 0 by default."""
@dataclass
class RudderState:
angle_deg: float = 0.0
angular_vel_dps: float = 0.0
class RudderSimulator:
"""Discrete-time integrator of the rudder model.
Typical usage::
sim = RudderSimulator()
sim.reset(angle_deg=0.0)
for k in range(1000):
sim.step(dt=0.001, pwm_pct=cmd)
print(sim.state.angle_deg)
"""
def __init__(self, config: RudderDynamicsConfig | None = None) -> None:
self.config: RudderDynamicsConfig = config or RudderDynamicsConfig()
self.state: RudderState = RudderState()
# -- Lifecycle -----------------------------------------------------------
def reset(self, *, angle_deg: float = 0.0, angular_vel_dps: float = 0.0) -> None:
self.state = RudderState(angle_deg=angle_deg, angular_vel_dps=angular_vel_dps)
# -- Integration ---------------------------------------------------------
def step(self, *, dt: float, pwm_pct: float) -> RudderState:
"""Integrate the model forward by ``dt`` seconds with constant ``pwm_pct``."""
if dt <= 0.0:
raise ValueError(f"dt must be > 0, got {dt}")
cfg = self.config
st = self.state
effective = self._compensate(pwm_pct)
torque = cfg.actuator_gain * effective + cfg.external_torque
accel = (torque - cfg.friction * st.angular_vel_dps) / cfg.inertia
st.angular_vel_dps += accel * dt
st.angle_deg += st.angular_vel_dps * dt
# End-stop saturation.
if st.angle_deg > cfg.max_angle_deg:
st.angle_deg = cfg.max_angle_deg
if st.angular_vel_dps > 0.0:
st.angular_vel_dps = 0.0
elif st.angle_deg < -cfg.max_angle_deg:
st.angle_deg = -cfg.max_angle_deg
if st.angular_vel_dps < 0.0:
st.angular_vel_dps = 0.0
return st
# -- Actuator non-linearity ---------------------------------------------
def _compensate(self, pwm_pct: float) -> float:
"""Apply deadband + min-useful-PWM + asymmetry.
Returned units: signed effective PWM in percent. Sign convention
matches the input.
"""
cfg = self.config
if pwm_pct == 0.0:
return 0.0
magnitude = abs(pwm_pct)
if magnitude <= cfg.deadband_pct:
return 0.0
if magnitude < cfg.min_useful_pwm_pct:
magnitude = cfg.min_useful_pwm_pct
sign = 1.0 if pwm_pct > 0.0 else -1.0
effective = sign * magnitude
# Asymmetry: starboard (positive) is multiplied; port is divided so
# the symmetric case asymmetry=1.0 leaves both sides untouched.
if effective > 0.0:
effective *= cfg.asymmetry_stbd_over_port
elif cfg.asymmetry_stbd_over_port != 0.0:
effective /= cfg.asymmetry_stbd_over_port
return effective
@dataclass
class TrajectorySample:
"""One row of a recorded run."""
t: float
setpoint_deg: float
angle_deg: float
angular_vel_dps: float
pwm_pct: float
@dataclass
class RunRecorder:
"""Capture a (time, setpoint, angle, vel, cmd) trace for plotting/tests."""
samples: list[TrajectorySample] = field(default_factory=list)
def record(self, *, t: float, setpoint_deg: float, sim: RudderSimulator,
pwm_pct: float) -> None:
self.samples.append(
TrajectorySample(
t=t,
setpoint_deg=setpoint_deg,
angle_deg=sim.state.angle_deg,
angular_vel_dps=sim.state.angular_vel_dps,
pwm_pct=pwm_pct,
)
)
def final_angle(self) -> float:
return self.samples[-1].angle_deg if self.samples else 0.0
def settling_time_s(self, target: float, tolerance_deg: float = 0.5) -> float | None:
"""Return time at which ``|angle - target| <= tolerance`` and stays there
until the end of the run. ``None`` if it never settles."""
if not self.samples:
return None
# Walk backwards: find the last time the error exceeded tolerance.
for i in range(len(self.samples) - 1, -1, -1):
if abs(self.samples[i].angle_deg - target) > tolerance_deg:
if i + 1 < len(self.samples):
return self.samples[i + 1].t
return None
return self.samples[0].t
def max_overshoot_deg(self, target: float) -> float:
"""Maximum overshoot past ``target`` in the direction of the step."""
if not self.samples:
return 0.0
start = self.samples[0].angle_deg
going_up = target > start
peak = max((s.angle_deg for s in self.samples), default=start) if going_up \
else min((s.angle_deg for s in self.samples), default=start)
overshoot = (peak - target) if going_up else (target - peak)
return max(0.0, overshoot)
+226
View File
@@ -0,0 +1,226 @@
"""Tests for ``arautopilot.studio.simulator.pid_inner`` against the rudder simulator."""
from __future__ import annotations
import pytest
from arautopilot.studio.simulator.pid_inner import PidInner, PidInnerConfig
from arautopilot.studio.simulator.rudder_dynamics import (
RudderDynamicsConfig,
RudderSimulator,
RunRecorder,
)
def _run(pid: PidInner, sim: RudderSimulator, *, setpoint_deg: float,
seconds: float, allowed: bool = True, dt_inner: float | None = None,
dt_sim: float = 0.002) -> RunRecorder:
"""Drive ``sim`` at ``dt_sim`` while ticking ``pid`` at its own rate."""
if dt_inner is None:
dt_inner = pid.config.dt()
rec = RunRecorder()
pid_carry = 0.0
cmd = 0.0
t = 0.0
steps = int(seconds / dt_sim)
for _ in range(steps):
pid_carry += dt_sim
if pid_carry + 1e-12 >= dt_inner:
cmd = pid.step(setpoint_deg=setpoint_deg,
measured_deg=sim.state.angle_deg,
allowed=allowed)
pid_carry -= dt_inner
sim.step(dt=dt_sim, pwm_pct=cmd)
t += dt_sim
rec.record(t=t, setpoint_deg=setpoint_deg, sim=sim, pwm_pct=cmd)
return rec
def _aggressive_pid() -> PidInner:
"""Gains tuned for the marine-realistic simulator (v_max ~5 dps).
With slow rudder dynamics (full deflection in ~7 s) we want the PID
to drive the actuator at max for a couple of seconds, then back off
smoothly. kp dominates; ki small to clean up steady-state bias from
any external torque; kd light because the rudder is heavily damped.
"""
return PidInner(PidInnerConfig(
kp=10.0, ki=0.5, kd=0.5,
deadband_deg=0.2,
rate_limit_dps=10000.0, # disable rate limit in unit tests
deadband_pct=0.0,
min_useful_pwm_pct=0.0,
integral_clamp=200.0,
))
def test_step_response_settles_near_setpoint() -> None:
"""20 deg step with marine-realistic v_max ~5 dps should settle in ~10 s."""
sim = RudderSimulator()
sim.reset()
pid = _aggressive_pid()
pid.reset(measured_deg=0.0, setpoint_deg=0.0)
rec = _run(pid, sim, setpoint_deg=20.0, seconds=12.0)
final = rec.final_angle()
assert final == pytest.approx(20.0, abs=1.0), f"final angle {final}, want ~20"
def test_negative_step_response() -> None:
sim = RudderSimulator()
sim.reset()
pid = _aggressive_pid()
pid.reset(measured_deg=0.0, setpoint_deg=0.0)
rec = _run(pid, sim, setpoint_deg=-15.0, seconds=12.0)
assert rec.final_angle() == pytest.approx(-15.0, abs=1.0)
def test_setpoint_deadband_holds_when_close() -> None:
"""If the working setpoint stays within the deadband of the measurement,
no command is produced and the rudder doesn't move."""
sim = RudderSimulator()
sim.reset(angle_deg=10.0)
cfg = PidInnerConfig(kp=10.0, ki=0.5, kd=0.5,
deadband_deg=2.0,
rate_limit_dps=10000.0,
deadband_pct=0.0,
min_useful_pwm_pct=0.0)
pid = PidInner(cfg)
# Critically: initialise the working setpoint to the current measurement
# so the rate limiter doesn't pull it through the deadband from 0.
pid.reset(measured_deg=10.0, setpoint_deg=10.0)
rec = _run(pid, sim, setpoint_deg=11.0, seconds=2.0)
final = rec.final_angle()
assert final == pytest.approx(10.0, abs=0.5), f"deadband not holding, final={final}"
def test_anti_windup_keeps_integrator_bounded_under_saturation() -> None:
"""Park the rudder at the end-stop with a far setpoint -> output saturates
indefinitely. The integrator must not run away."""
sim = RudderSimulator()
sim.reset(angle_deg=35.0)
pid = PidInner(PidInnerConfig(
kp=8.0, ki=10.0, kd=0.5,
deadband_deg=0.0,
rate_limit_dps=200.0,
deadband_pct=0.0,
min_useful_pwm_pct=0.0,
integral_clamp=20.0,
))
pid.reset(measured_deg=35.0)
_ = _run(pid, sim, setpoint_deg=80.0, seconds=2.0)
assert abs(pid.state.integral) <= pid.config.integral_clamp + 1e-6
def test_allowed_false_zeros_output_and_bleeds_integrator() -> None:
pid = _aggressive_pid()
pid.reset(measured_deg=0.0)
pid.state.integral = 5.0
out = pid.step(setpoint_deg=20.0, measured_deg=0.0, allowed=False)
assert out == 0.0
assert abs(pid.state.integral) < 5.0
def test_actuator_deadband_compensation_skips_band() -> None:
"""Below deadband_pct=10 the command should be zero; just above, snapped to 15."""
pid = PidInner(PidInnerConfig(
kp=1.0, ki=0.0, kd=0.0,
deadband_deg=0.0,
rate_limit_dps=10000.0, # disable -- testing single tick
deadband_pct=10.0,
min_useful_pwm_pct=15.0,
))
pid.reset(measured_deg=0.0)
# error=5 -> raw_output=5 -> below deadband -> 0
assert pid.step(setpoint_deg=5.0, measured_deg=0.0) == 0.0
pid.reset(measured_deg=0.0)
# error=11 -> raw_output=11 -> snapped to 15
out = pid.step(setpoint_deg=11.0, measured_deg=0.0)
assert out == pytest.approx(15.0, abs=1e-6)
pid.reset(measured_deg=0.0)
# error=-11 -> snapped to -15
out = pid.step(setpoint_deg=-11.0, measured_deg=0.0)
assert out == pytest.approx(-15.0, abs=1e-6)
def test_asymmetry_divides_stbd_command() -> None:
"""If the pump is intrinsically faster to stbd (asymmetry=1.5), the PID
should divide its starboard command by 1.5 to compensate."""
pid = PidInner(PidInnerConfig(
kp=1.0, ki=0.0, kd=0.0,
deadband_deg=0.0,
rate_limit_dps=10000.0, # disable -- testing single tick
deadband_pct=0.0,
min_useful_pwm_pct=0.0,
asymmetry_stbd_over_port=1.5,
))
pid.reset(measured_deg=0.0)
out = pid.step(setpoint_deg=30.0, measured_deg=0.0)
# raw_output = 30 -> divided by 1.5 -> 20
assert out == pytest.approx(20.0, abs=1e-6)
pid.reset(measured_deg=0.0)
out = pid.step(setpoint_deg=-30.0, measured_deg=0.0)
# raw_output = -30 -> port multiplied -> -45 (within saturation)
assert out == pytest.approx(-45.0, abs=1e-6)
def test_output_saturation_at_plus_minus_100() -> None:
pid = PidInner(PidInnerConfig(
kp=50.0, ki=0.0, kd=0.0,
deadband_deg=0.0,
rate_limit_dps=10000.0,
deadband_pct=0.0,
min_useful_pwm_pct=0.0,
))
pid.reset(measured_deg=0.0)
out = pid.step(setpoint_deg=10.0, measured_deg=0.0)
assert out == 100.0
pid.reset(measured_deg=0.0)
out = pid.step(setpoint_deg=-10.0, measured_deg=0.0)
assert out == -100.0
def test_rate_limit_caps_setpoint_slew() -> None:
"""With rate_limit_dps=5 and dt=0.02, the working setpoint should advance
by no more than 0.1 deg per tick toward a step of 50 deg."""
pid = PidInner(PidInnerConfig(
kp=1.0, ki=0.0, kd=0.0,
deadband_deg=0.0,
rate_limit_dps=5.0,
deadband_pct=0.0,
min_useful_pwm_pct=0.0,
freq_hz=50.0,
))
pid.reset(measured_deg=0.0, setpoint_deg=0.0)
# First tick: working setpoint can grow by 5 * 0.02 = 0.1 deg.
out1 = pid.step(setpoint_deg=50.0, measured_deg=0.0)
assert out1 == pytest.approx(0.1, abs=1e-6)
out2 = pid.step(setpoint_deg=50.0, measured_deg=0.0)
assert out2 == pytest.approx(0.2, abs=1e-6)
def test_disturbance_rejection() -> None:
"""Apply a constant external torque; the controller should still settle
near the setpoint, with the integrator absorbing the bias.
External torque +1.0 with actuator_gain=0.2 means the controller needs
a steady-state command of ~-5 % to keep position. The integrator gets
there by integrating the residual error -- with ki=2.0 it converges
within ~25 s.
"""
sim = RudderSimulator(RudderDynamicsConfig(external_torque=1.0))
sim.reset()
# Use higher integral gain than the default _aggressive_pid() for faster
# disturbance rejection in the test window.
pid = PidInner(PidInnerConfig(
kp=10.0, ki=2.0, kd=0.5,
deadband_deg=0.2,
rate_limit_dps=10000.0,
deadband_pct=0.0,
min_useful_pwm_pct=0.0,
integral_clamp=200.0,
))
pid.reset(measured_deg=0.0, setpoint_deg=0.0)
rec = _run(pid, sim, setpoint_deg=10.0, seconds=25.0)
assert rec.final_angle() == pytest.approx(10.0, abs=1.0), (
f"final angle {rec.final_angle()}, want ~10"
)
+117
View File
@@ -0,0 +1,117 @@
"""Tests for ``arautopilot.studio.simulator.rudder_dynamics``."""
from __future__ import annotations
import pytest
from arautopilot.studio.simulator.rudder_dynamics import (
RudderDynamicsConfig,
RudderSimulator,
RunRecorder,
)
def test_zero_input_stays_at_rest() -> None:
sim = RudderSimulator()
sim.reset(angle_deg=0.0)
for _ in range(1000):
sim.step(dt=0.001, pwm_pct=0.0)
assert sim.state.angle_deg == pytest.approx(0.0, abs=1e-9)
assert sim.state.angular_vel_dps == pytest.approx(0.0, abs=1e-9)
def test_full_starboard_command_drives_to_stbd_endstop() -> None:
"""With marine-realistic gain (v_max ~5 dps) reaching 35 deg takes ~8 s."""
sim = RudderSimulator()
sim.reset(angle_deg=0.0)
for _ in range(15000):
sim.step(dt=0.001, pwm_pct=100.0)
assert sim.state.angle_deg == pytest.approx(sim.config.max_angle_deg, abs=1e-6)
assert sim.state.angular_vel_dps == pytest.approx(0.0, abs=1e-6)
def test_full_port_command_drives_to_port_endstop() -> None:
sim = RudderSimulator()
sim.reset(angle_deg=0.0)
for _ in range(15000):
sim.step(dt=0.001, pwm_pct=-100.0)
assert sim.state.angle_deg == pytest.approx(-sim.config.max_angle_deg, abs=1e-6)
assert sim.state.angular_vel_dps == pytest.approx(0.0, abs=1e-6)
def test_command_inside_deadband_produces_no_motion() -> None:
sim = RudderSimulator(RudderDynamicsConfig(deadband_pct=10.0,
min_useful_pwm_pct=15.0))
sim.reset(angle_deg=0.0)
for _ in range(2000):
sim.step(dt=0.001, pwm_pct=5.0) # under deadband
assert sim.state.angle_deg == pytest.approx(0.0, abs=1e-9)
def test_command_just_above_deadband_snaps_to_min_useful() -> None:
"""A command of 11% with deadband=10/min_useful=15 should act like 15%."""
cfg = RudderDynamicsConfig(deadband_pct=10.0, min_useful_pwm_pct=15.0)
sim_a = RudderSimulator(cfg)
sim_b = RudderSimulator(cfg)
sim_a.reset()
sim_b.reset()
for _ in range(3000):
sim_a.step(dt=0.001, pwm_pct=11.0)
sim_b.step(dt=0.001, pwm_pct=15.0)
assert sim_a.state.angle_deg == pytest.approx(sim_b.state.angle_deg, abs=1e-9)
def test_asymmetry_makes_starboard_faster() -> None:
cfg = RudderDynamicsConfig(asymmetry_stbd_over_port=1.5,
deadband_pct=0.0, min_useful_pwm_pct=0.0)
sim_s = RudderSimulator(cfg)
sim_p = RudderSimulator(cfg)
sim_s.reset()
sim_p.reset()
for _ in range(2000):
sim_s.step(dt=0.001, pwm_pct=+50.0)
sim_p.step(dt=0.001, pwm_pct=-50.0)
# Starboard angle should be larger in magnitude than port at the same
# effort, because the pump pushes harder to starboard.
assert abs(sim_s.state.angle_deg) > abs(sim_p.state.angle_deg)
def test_recorder_captures_trajectory() -> None:
"""Sanity check the recorder: capture a few seconds of constant-command
motion and verify the samples reflect the rudder moving in the commanded
direction with strictly monotonic position (no oscillation under constant
drive)."""
sim = RudderSimulator()
sim.reset()
rec = RunRecorder()
for k in range(2000):
t = k * 0.001
sim.step(dt=0.001, pwm_pct=80.0)
rec.record(t=t, setpoint_deg=25.0, sim=sim, pwm_pct=80.0)
assert len(rec.samples) == 2000
# Monotonically increasing angle (constant +ve command + viscous damping
# means no overshoot during the run).
assert rec.samples[-1].angle_deg > rec.samples[0].angle_deg
assert all(rec.samples[i + 1].angle_deg >= rec.samples[i].angle_deg
for i in range(len(rec.samples) - 1))
def test_invalid_dt_rejected() -> None:
sim = RudderSimulator()
with pytest.raises(ValueError):
sim.step(dt=0.0, pwm_pct=50.0)
with pytest.raises(ValueError):
sim.step(dt=-0.001, pwm_pct=50.0)
def test_endstop_zeros_outward_velocity_only() -> None:
"""Hitting the stbd end-stop should kill +velocity but allow reverse."""
sim = RudderSimulator()
sim.reset(angle_deg=sim.config.max_angle_deg, angular_vel_dps=20.0)
sim.step(dt=0.001, pwm_pct=0.0)
assert sim.state.angle_deg == pytest.approx(sim.config.max_angle_deg, abs=1e-9)
assert sim.state.angular_vel_dps == 0.0
# Now reverse: should move away from the stop.
for _ in range(200):
sim.step(dt=0.001, pwm_pct=-80.0)
assert sim.state.angle_deg < sim.config.max_angle_deg