5f9b445572
Python: - autotuner.py: RelayAutoTuner (Astrom-Hagglund relay method) → TunerResult with Ku/Tu; TunerResult.to_pid_gains() applies ZN formulas - commissioning_wizard.py: 4-phase state machine (RUDDER_LIMITS → SENSOR_CAL → AUTO_TUNE → DONE); transport-injected for testability; abort on invalid cal - test_autotuner.py: 17 tests covering relay convergence, ZN formulas, wizard full-run, abort, ADC swap, identical-ADC guard Firmware: - nvs_config.h/cpp: NVS-backed CalibrationData store (adc limits, rudder angles, outer Kp/Ki/Kd, commissioned flag); float stored as uint32 via memcpy - knob_encoder.h/cpp: quadrature rotary encoder on GPIO 16/17 with ISR Gray-code decode; knob_arm coil arms for 5 s window; updates heading setpoint ±1 deg/detent - modbus_slave.cpp: COIL_CMD_KNOB_ARM now calls knob_encoder_set_armed() - main.cpp: nvs_config_init/load at boot; apply commissioned calibration to rudder sensor and outer loop gains; start knob encoder task Tests: 326 passed | Flash: 28.5% Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
230 lines
8.1 KiB
Python
230 lines
8.1 KiB
Python
"""Tests for the relay auto-tuner and commissioning wizard -- Sprint 7."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import math
|
|
import pytest
|
|
|
|
from arautopilot.core.autotuner import RelayAutoTuner, TunerResult
|
|
from arautopilot.core.pid_config import PidGains
|
|
from arautopilot.studio.wizards.commissioning_wizard import (
|
|
CommissioningWizard,
|
|
WizardPhase,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Simple first-order vessel model: heading error driven by relay rudder output.
|
|
#
|
|
# dh/dt = Kv * rudder (Kv = vessel heading rate gain, deg/(s·deg_rud))
|
|
#
|
|
# At 10 Hz: err_{k+1} = err_k - Kv * relay_k * dt
|
|
# This produces sinusoidal heading oscillation (limit cycle) under relay control.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class FirstOrderVessel:
|
|
"""Minimal heading process for relay-tuner tests."""
|
|
|
|
def __init__(self, kv: float = 0.3, dt: float = 0.1) -> None:
|
|
self.heading = 0.0
|
|
self.setpoint = 0.0
|
|
self.kv = kv
|
|
self.dt = dt
|
|
|
|
def step(self, rudder_cmd: float) -> float:
|
|
self.heading += self.kv * rudder_cmd * self.dt
|
|
return self.setpoint - self.heading # error
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# RelayAutoTuner tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestRelayAutoTuner:
|
|
def test_invalid_amplitude_raises(self):
|
|
with pytest.raises(ValueError, match="relay_amplitude"):
|
|
RelayAutoTuner(relay_amplitude=0.0)
|
|
|
|
def test_invalid_min_cycles_raises(self):
|
|
with pytest.raises(ValueError, match="min_cycles"):
|
|
RelayAutoTuner(min_cycles=0)
|
|
|
|
def test_not_done_before_experiment(self):
|
|
tuner = RelayAutoTuner()
|
|
assert not tuner.is_done
|
|
|
|
def test_result_not_converged_before_start(self):
|
|
tuner = RelayAutoTuner()
|
|
r = tuner.result()
|
|
assert not r.converged
|
|
|
|
def test_converges_on_first_order_plant(self):
|
|
vessel = FirstOrderVessel(kv=0.4, dt=0.1)
|
|
tuner = RelayAutoTuner(relay_amplitude=5.0, min_cycles=3, dt_s=0.1)
|
|
for _ in range(2000):
|
|
error = vessel.step(tuner.step(vessel.setpoint - vessel.heading))
|
|
if tuner.is_done:
|
|
break
|
|
assert tuner.is_done
|
|
r = tuner.result()
|
|
assert r.converged
|
|
assert r.cycles_detected >= 3
|
|
assert r.ku > 0
|
|
assert r.tu_s > 0
|
|
|
|
def test_ku_reasonable_range(self):
|
|
"""Ku must be positive and finite; integrating plant gives high Ku (normal)."""
|
|
vessel = FirstOrderVessel(kv=0.4, dt=0.1)
|
|
tuner = RelayAutoTuner(relay_amplitude=5.0, min_cycles=3, dt_s=0.1)
|
|
for _ in range(2000):
|
|
err = vessel.step(tuner.step(vessel.setpoint - vessel.heading))
|
|
if tuner.is_done:
|
|
break
|
|
r = tuner.result()
|
|
assert r.ku > 0
|
|
import math as _math
|
|
assert _math.isfinite(r.ku)
|
|
|
|
def test_to_pid_gains_zn_formulas(self):
|
|
"""ZN formulas: Kp=0.6Ku, Ki=1.2Ku/Tu, Kd=0.075Ku*Tu."""
|
|
result = TunerResult(converged=True, ku=2.0, tu_s=4.0)
|
|
gains = result.to_pid_gains()
|
|
assert abs(gains.kp - 0.6 * 2.0) < 1e-3
|
|
assert abs(gains.ki - 1.2 * 2.0 / 4.0) < 1e-3
|
|
assert abs(gains.kd - 0.075 * 2.0 * 4.0) < 1e-3
|
|
|
|
def test_to_pid_gains_raises_if_not_converged(self):
|
|
r = TunerResult(converged=False)
|
|
with pytest.raises(RuntimeError):
|
|
r.to_pid_gains()
|
|
|
|
def test_output_toggles_at_zero_crossing(self):
|
|
"""After one half-cycle the relay output should flip sign."""
|
|
tuner = RelayAutoTuner(relay_amplitude=5.0, min_cycles=1, dt_s=0.1)
|
|
# Drive error positive for several steps.
|
|
outputs = []
|
|
for i in range(5):
|
|
outputs.append(tuner.step(1.0))
|
|
# Force sign flip.
|
|
for i in range(3):
|
|
outputs.append(tuner.step(-1.0))
|
|
# After the sign change the relay should have flipped.
|
|
assert outputs[-1] < 0
|
|
|
|
def test_max_steps_safety_cutoff(self):
|
|
tuner = RelayAutoTuner(relay_amplitude=5.0, min_cycles=100, max_steps=50)
|
|
for _ in range(60):
|
|
tuner.step(1.0)
|
|
assert tuner.is_done
|
|
|
|
def test_returns_zero_after_done(self):
|
|
tuner = RelayAutoTuner(relay_amplitude=5.0, min_cycles=100, max_steps=5)
|
|
for _ in range(10):
|
|
out = tuner.step(1.0)
|
|
assert out == 0.0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CommissioningWizard tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class StubTransport:
|
|
"""Simple stub that records commands and returns configurable ADC/error values."""
|
|
|
|
def __init__(self, adc_port: int = 100, adc_stbd: int = 3900) -> None:
|
|
self.calls: list[tuple[str, float]] = []
|
|
self._adc_port = adc_port
|
|
self._adc_stbd = adc_stbd
|
|
self._at_port = True # track which limit we're simulating
|
|
self.heading = 0.0
|
|
self.setpoint = 0.0
|
|
|
|
def __call__(self, cmd: str, value: float) -> float:
|
|
self.calls.append((cmd, value))
|
|
if cmd == "move_port":
|
|
self._at_port = True
|
|
return 0.0
|
|
if cmd == "move_stbd":
|
|
self._at_port = False
|
|
return 0.0
|
|
if cmd == "stop":
|
|
return 0.0
|
|
if cmd == "read_adc":
|
|
return float(self._adc_port if self._at_port else self._adc_stbd)
|
|
if cmd == "read_error":
|
|
return self.setpoint - self.heading
|
|
if cmd == "set_rudder":
|
|
# Simple first-order response: heading moves 0.3 deg per deg-rudder per 0.1s
|
|
self.heading += 0.3 * value * 0.1
|
|
return 0.0
|
|
return 0.0
|
|
|
|
|
|
class TestCommissioningWizard:
|
|
def test_starts_in_rudder_limits_phase(self):
|
|
tx = StubTransport()
|
|
wiz = CommissioningWizard(transport=tx)
|
|
assert wiz.phase is WizardPhase.RUDDER_LIMITS
|
|
assert not wiz.done
|
|
|
|
def test_abort_stops_wizard(self):
|
|
tx = StubTransport()
|
|
wiz = CommissioningWizard(transport=tx)
|
|
wiz.abort("test abort")
|
|
assert wiz.phase is WizardPhase.ABORTED
|
|
assert wiz.done
|
|
assert wiz.result.aborted_reason == "test abort"
|
|
|
|
def test_full_run_completes(self):
|
|
tx = StubTransport()
|
|
wiz = CommissioningWizard(
|
|
transport=tx,
|
|
relay_amplitude=5.0,
|
|
min_tune_cycles=3,
|
|
limit_settle_s=0.5, # short settle for test speed
|
|
dt_s=0.1,
|
|
)
|
|
# Run until done (up to 15000 steps = 25 min at 10 Hz)
|
|
for _ in range(15000):
|
|
wiz.step()
|
|
if wiz.done:
|
|
break
|
|
assert wiz.phase is WizardPhase.DONE
|
|
assert wiz.result.completed
|
|
assert wiz.result.recommended_outer_gains is not None
|
|
gains = wiz.result.recommended_outer_gains
|
|
assert isinstance(gains, PidGains)
|
|
assert gains.kp > 0
|
|
|
|
def test_adc_limits_recorded(self):
|
|
tx = StubTransport(adc_port=200, adc_stbd=3800)
|
|
wiz = CommissioningWizard(transport=tx, limit_settle_s=0.3, dt_s=0.1)
|
|
for _ in range(15000):
|
|
wiz.step()
|
|
if wiz.done:
|
|
break
|
|
r = wiz.result
|
|
# After possible swap, lo < hi
|
|
assert r.adc_port_limit < r.adc_stbd_limit
|
|
|
|
def test_identical_adc_limits_aborts(self):
|
|
tx = StubTransport(adc_port=2048, adc_stbd=2048)
|
|
wiz = CommissioningWizard(transport=tx, limit_settle_s=0.3, dt_s=0.1)
|
|
for _ in range(200):
|
|
wiz.step()
|
|
if wiz.done:
|
|
break
|
|
assert wiz.phase is WizardPhase.ABORTED
|
|
assert "identical" in wiz.result.aborted_reason.lower()
|
|
|
|
def test_stop_called_on_completion(self):
|
|
tx = StubTransport()
|
|
wiz = CommissioningWizard(transport=tx, limit_settle_s=0.3, dt_s=0.1)
|
|
for _ in range(15000):
|
|
wiz.step()
|
|
if wiz.done:
|
|
break
|
|
cmds = [c[0] for c in tx.calls]
|
|
# 'stop' must have been called at least once (after limits and at tune end)
|
|
assert "stop" in cmds
|