Files
AR-Autopilot/arautopilot/tests/test_autotuner.py
T
alro65 5f9b445572 sprint-7: Commissioning wizard + relay ZN auto-tuner + NVS config + knob encoder
Python:
- autotuner.py: RelayAutoTuner (Astrom-Hagglund relay method) → TunerResult
  with Ku/Tu; TunerResult.to_pid_gains() applies ZN formulas
- commissioning_wizard.py: 4-phase state machine (RUDDER_LIMITS → SENSOR_CAL
  → AUTO_TUNE → DONE); transport-injected for testability; abort on invalid cal
- test_autotuner.py: 17 tests covering relay convergence, ZN formulas,
  wizard full-run, abort, ADC swap, identical-ADC guard

Firmware:
- nvs_config.h/cpp: NVS-backed CalibrationData store (adc limits, rudder angles,
  outer Kp/Ki/Kd, commissioned flag); float stored as uint32 via memcpy
- knob_encoder.h/cpp: quadrature rotary encoder on GPIO 16/17 with ISR Gray-code
  decode; knob_arm coil arms for 5 s window; updates heading setpoint ±1 deg/detent
- modbus_slave.cpp: COIL_CMD_KNOB_ARM now calls knob_encoder_set_armed()
- main.cpp: nvs_config_init/load at boot; apply commissioned calibration to
  rudder sensor and outer loop gains; start knob encoder task

Tests: 326 passed | Flash: 28.5%

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 02:55:26 -04:00

230 lines
8.1 KiB
Python

"""Tests for the relay auto-tuner and commissioning wizard -- Sprint 7."""
from __future__ import annotations
import math
import pytest
from arautopilot.core.autotuner import RelayAutoTuner, TunerResult
from arautopilot.core.pid_config import PidGains
from arautopilot.studio.wizards.commissioning_wizard import (
CommissioningWizard,
WizardPhase,
)
# ---------------------------------------------------------------------------
# Simple first-order vessel model: heading error driven by relay rudder output.
#
# dh/dt = Kv * rudder (Kv = vessel heading rate gain, deg/(s·deg_rud))
#
# At 10 Hz: err_{k+1} = err_k - Kv * relay_k * dt
# This produces sinusoidal heading oscillation (limit cycle) under relay control.
# ---------------------------------------------------------------------------
class FirstOrderVessel:
"""Minimal heading process for relay-tuner tests."""
def __init__(self, kv: float = 0.3, dt: float = 0.1) -> None:
self.heading = 0.0
self.setpoint = 0.0
self.kv = kv
self.dt = dt
def step(self, rudder_cmd: float) -> float:
self.heading += self.kv * rudder_cmd * self.dt
return self.setpoint - self.heading # error
# ---------------------------------------------------------------------------
# RelayAutoTuner tests
# ---------------------------------------------------------------------------
class TestRelayAutoTuner:
def test_invalid_amplitude_raises(self):
with pytest.raises(ValueError, match="relay_amplitude"):
RelayAutoTuner(relay_amplitude=0.0)
def test_invalid_min_cycles_raises(self):
with pytest.raises(ValueError, match="min_cycles"):
RelayAutoTuner(min_cycles=0)
def test_not_done_before_experiment(self):
tuner = RelayAutoTuner()
assert not tuner.is_done
def test_result_not_converged_before_start(self):
tuner = RelayAutoTuner()
r = tuner.result()
assert not r.converged
def test_converges_on_first_order_plant(self):
vessel = FirstOrderVessel(kv=0.4, dt=0.1)
tuner = RelayAutoTuner(relay_amplitude=5.0, min_cycles=3, dt_s=0.1)
for _ in range(2000):
error = vessel.step(tuner.step(vessel.setpoint - vessel.heading))
if tuner.is_done:
break
assert tuner.is_done
r = tuner.result()
assert r.converged
assert r.cycles_detected >= 3
assert r.ku > 0
assert r.tu_s > 0
def test_ku_reasonable_range(self):
"""Ku must be positive and finite; integrating plant gives high Ku (normal)."""
vessel = FirstOrderVessel(kv=0.4, dt=0.1)
tuner = RelayAutoTuner(relay_amplitude=5.0, min_cycles=3, dt_s=0.1)
for _ in range(2000):
err = vessel.step(tuner.step(vessel.setpoint - vessel.heading))
if tuner.is_done:
break
r = tuner.result()
assert r.ku > 0
import math as _math
assert _math.isfinite(r.ku)
def test_to_pid_gains_zn_formulas(self):
"""ZN formulas: Kp=0.6Ku, Ki=1.2Ku/Tu, Kd=0.075Ku*Tu."""
result = TunerResult(converged=True, ku=2.0, tu_s=4.0)
gains = result.to_pid_gains()
assert abs(gains.kp - 0.6 * 2.0) < 1e-3
assert abs(gains.ki - 1.2 * 2.0 / 4.0) < 1e-3
assert abs(gains.kd - 0.075 * 2.0 * 4.0) < 1e-3
def test_to_pid_gains_raises_if_not_converged(self):
r = TunerResult(converged=False)
with pytest.raises(RuntimeError):
r.to_pid_gains()
def test_output_toggles_at_zero_crossing(self):
"""After one half-cycle the relay output should flip sign."""
tuner = RelayAutoTuner(relay_amplitude=5.0, min_cycles=1, dt_s=0.1)
# Drive error positive for several steps.
outputs = []
for i in range(5):
outputs.append(tuner.step(1.0))
# Force sign flip.
for i in range(3):
outputs.append(tuner.step(-1.0))
# After the sign change the relay should have flipped.
assert outputs[-1] < 0
def test_max_steps_safety_cutoff(self):
tuner = RelayAutoTuner(relay_amplitude=5.0, min_cycles=100, max_steps=50)
for _ in range(60):
tuner.step(1.0)
assert tuner.is_done
def test_returns_zero_after_done(self):
tuner = RelayAutoTuner(relay_amplitude=5.0, min_cycles=100, max_steps=5)
for _ in range(10):
out = tuner.step(1.0)
assert out == 0.0
# ---------------------------------------------------------------------------
# CommissioningWizard tests
# ---------------------------------------------------------------------------
class StubTransport:
"""Simple stub that records commands and returns configurable ADC/error values."""
def __init__(self, adc_port: int = 100, adc_stbd: int = 3900) -> None:
self.calls: list[tuple[str, float]] = []
self._adc_port = adc_port
self._adc_stbd = adc_stbd
self._at_port = True # track which limit we're simulating
self.heading = 0.0
self.setpoint = 0.0
def __call__(self, cmd: str, value: float) -> float:
self.calls.append((cmd, value))
if cmd == "move_port":
self._at_port = True
return 0.0
if cmd == "move_stbd":
self._at_port = False
return 0.0
if cmd == "stop":
return 0.0
if cmd == "read_adc":
return float(self._adc_port if self._at_port else self._adc_stbd)
if cmd == "read_error":
return self.setpoint - self.heading
if cmd == "set_rudder":
# Simple first-order response: heading moves 0.3 deg per deg-rudder per 0.1s
self.heading += 0.3 * value * 0.1
return 0.0
return 0.0
class TestCommissioningWizard:
def test_starts_in_rudder_limits_phase(self):
tx = StubTransport()
wiz = CommissioningWizard(transport=tx)
assert wiz.phase is WizardPhase.RUDDER_LIMITS
assert not wiz.done
def test_abort_stops_wizard(self):
tx = StubTransport()
wiz = CommissioningWizard(transport=tx)
wiz.abort("test abort")
assert wiz.phase is WizardPhase.ABORTED
assert wiz.done
assert wiz.result.aborted_reason == "test abort"
def test_full_run_completes(self):
tx = StubTransport()
wiz = CommissioningWizard(
transport=tx,
relay_amplitude=5.0,
min_tune_cycles=3,
limit_settle_s=0.5, # short settle for test speed
dt_s=0.1,
)
# Run until done (up to 15000 steps = 25 min at 10 Hz)
for _ in range(15000):
wiz.step()
if wiz.done:
break
assert wiz.phase is WizardPhase.DONE
assert wiz.result.completed
assert wiz.result.recommended_outer_gains is not None
gains = wiz.result.recommended_outer_gains
assert isinstance(gains, PidGains)
assert gains.kp > 0
def test_adc_limits_recorded(self):
tx = StubTransport(adc_port=200, adc_stbd=3800)
wiz = CommissioningWizard(transport=tx, limit_settle_s=0.3, dt_s=0.1)
for _ in range(15000):
wiz.step()
if wiz.done:
break
r = wiz.result
# After possible swap, lo < hi
assert r.adc_port_limit < r.adc_stbd_limit
def test_identical_adc_limits_aborts(self):
tx = StubTransport(adc_port=2048, adc_stbd=2048)
wiz = CommissioningWizard(transport=tx, limit_settle_s=0.3, dt_s=0.1)
for _ in range(200):
wiz.step()
if wiz.done:
break
assert wiz.phase is WizardPhase.ABORTED
assert "identical" in wiz.result.aborted_reason.lower()
def test_stop_called_on_completion(self):
tx = StubTransport()
wiz = CommissioningWizard(transport=tx, limit_settle_s=0.3, dt_s=0.1)
for _ in range(15000):
wiz.step()
if wiz.done:
break
cmds = [c[0] for c in tx.calls]
# 'stop' must have been called at least once (after limits and at tune end)
assert "stop" in cmds