"""Tests for the relay auto-tuner and commissioning wizard -- Sprint 7.""" from __future__ import annotations import math import pytest from arautopilot.core.autotuner import RelayAutoTuner, TunerResult from arautopilot.core.pid_config import PidGains from arautopilot.studio.wizards.commissioning_wizard import ( CommissioningWizard, WizardPhase, ) # --------------------------------------------------------------------------- # Simple first-order vessel model: heading error driven by relay rudder output. # # dh/dt = Kv * rudder (Kv = vessel heading rate gain, deg/(s·deg_rud)) # # At 10 Hz: err_{k+1} = err_k - Kv * relay_k * dt # This produces sinusoidal heading oscillation (limit cycle) under relay control. # --------------------------------------------------------------------------- class FirstOrderVessel: """Minimal heading process for relay-tuner tests.""" def __init__(self, kv: float = 0.3, dt: float = 0.1) -> None: self.heading = 0.0 self.setpoint = 0.0 self.kv = kv self.dt = dt def step(self, rudder_cmd: float) -> float: self.heading += self.kv * rudder_cmd * self.dt return self.setpoint - self.heading # error # --------------------------------------------------------------------------- # RelayAutoTuner tests # --------------------------------------------------------------------------- class TestRelayAutoTuner: def test_invalid_amplitude_raises(self): with pytest.raises(ValueError, match="relay_amplitude"): RelayAutoTuner(relay_amplitude=0.0) def test_invalid_min_cycles_raises(self): with pytest.raises(ValueError, match="min_cycles"): RelayAutoTuner(min_cycles=0) def test_not_done_before_experiment(self): tuner = RelayAutoTuner() assert not tuner.is_done def test_result_not_converged_before_start(self): tuner = RelayAutoTuner() r = tuner.result() assert not r.converged def test_converges_on_first_order_plant(self): vessel = FirstOrderVessel(kv=0.4, dt=0.1) tuner = RelayAutoTuner(relay_amplitude=5.0, min_cycles=3, dt_s=0.1) for _ in range(2000): error = vessel.step(tuner.step(vessel.setpoint - vessel.heading)) if tuner.is_done: break assert tuner.is_done r = tuner.result() assert r.converged assert r.cycles_detected >= 3 assert r.ku > 0 assert r.tu_s > 0 def test_ku_reasonable_range(self): """Ku must be positive and finite; integrating plant gives high Ku (normal).""" vessel = FirstOrderVessel(kv=0.4, dt=0.1) tuner = RelayAutoTuner(relay_amplitude=5.0, min_cycles=3, dt_s=0.1) for _ in range(2000): err = vessel.step(tuner.step(vessel.setpoint - vessel.heading)) if tuner.is_done: break r = tuner.result() assert r.ku > 0 import math as _math assert _math.isfinite(r.ku) def test_to_pid_gains_zn_formulas(self): """ZN formulas: Kp=0.6Ku, Ki=1.2Ku/Tu, Kd=0.075Ku*Tu.""" result = TunerResult(converged=True, ku=2.0, tu_s=4.0) gains = result.to_pid_gains() assert abs(gains.kp - 0.6 * 2.0) < 1e-3 assert abs(gains.ki - 1.2 * 2.0 / 4.0) < 1e-3 assert abs(gains.kd - 0.075 * 2.0 * 4.0) < 1e-3 def test_to_pid_gains_raises_if_not_converged(self): r = TunerResult(converged=False) with pytest.raises(RuntimeError): r.to_pid_gains() def test_output_toggles_at_zero_crossing(self): """After one half-cycle the relay output should flip sign.""" tuner = RelayAutoTuner(relay_amplitude=5.0, min_cycles=1, dt_s=0.1) # Drive error positive for several steps. outputs = [] for i in range(5): outputs.append(tuner.step(1.0)) # Force sign flip. for i in range(3): outputs.append(tuner.step(-1.0)) # After the sign change the relay should have flipped. assert outputs[-1] < 0 def test_max_steps_safety_cutoff(self): tuner = RelayAutoTuner(relay_amplitude=5.0, min_cycles=100, max_steps=50) for _ in range(60): tuner.step(1.0) assert tuner.is_done def test_returns_zero_after_done(self): tuner = RelayAutoTuner(relay_amplitude=5.0, min_cycles=100, max_steps=5) for _ in range(10): out = tuner.step(1.0) assert out == 0.0 # --------------------------------------------------------------------------- # CommissioningWizard tests # --------------------------------------------------------------------------- class StubTransport: """Simple stub that records commands and returns configurable ADC/error values.""" def __init__(self, adc_port: int = 100, adc_stbd: int = 3900) -> None: self.calls: list[tuple[str, float]] = [] self._adc_port = adc_port self._adc_stbd = adc_stbd self._at_port = True # track which limit we're simulating self.heading = 0.0 self.setpoint = 0.0 def __call__(self, cmd: str, value: float) -> float: self.calls.append((cmd, value)) if cmd == "move_port": self._at_port = True return 0.0 if cmd == "move_stbd": self._at_port = False return 0.0 if cmd == "stop": return 0.0 if cmd == "read_adc": return float(self._adc_port if self._at_port else self._adc_stbd) if cmd == "read_error": return self.setpoint - self.heading if cmd == "set_rudder": # Simple first-order response: heading moves 0.3 deg per deg-rudder per 0.1s self.heading += 0.3 * value * 0.1 return 0.0 return 0.0 class TestCommissioningWizard: def test_starts_in_rudder_limits_phase(self): tx = StubTransport() wiz = CommissioningWizard(transport=tx) assert wiz.phase is WizardPhase.RUDDER_LIMITS assert not wiz.done def test_abort_stops_wizard(self): tx = StubTransport() wiz = CommissioningWizard(transport=tx) wiz.abort("test abort") assert wiz.phase is WizardPhase.ABORTED assert wiz.done assert wiz.result.aborted_reason == "test abort" def test_full_run_completes(self): tx = StubTransport() wiz = CommissioningWizard( transport=tx, relay_amplitude=5.0, min_tune_cycles=3, limit_settle_s=0.5, # short settle for test speed dt_s=0.1, ) # Run until done (up to 15000 steps = 25 min at 10 Hz) for _ in range(15000): wiz.step() if wiz.done: break assert wiz.phase is WizardPhase.DONE assert wiz.result.completed assert wiz.result.recommended_outer_gains is not None gains = wiz.result.recommended_outer_gains assert isinstance(gains, PidGains) assert gains.kp > 0 def test_adc_limits_recorded(self): tx = StubTransport(adc_port=200, adc_stbd=3800) wiz = CommissioningWizard(transport=tx, limit_settle_s=0.3, dt_s=0.1) for _ in range(15000): wiz.step() if wiz.done: break r = wiz.result # After possible swap, lo < hi assert r.adc_port_limit < r.adc_stbd_limit def test_identical_adc_limits_aborts(self): tx = StubTransport(adc_port=2048, adc_stbd=2048) wiz = CommissioningWizard(transport=tx, limit_settle_s=0.3, dt_s=0.1) for _ in range(200): wiz.step() if wiz.done: break assert wiz.phase is WizardPhase.ABORTED assert "identical" in wiz.result.aborted_reason.lower() def test_stop_called_on_completion(self): tx = StubTransport() wiz = CommissioningWizard(transport=tx, limit_settle_s=0.3, dt_s=0.1) for _ in range(15000): wiz.step() if wiz.done: break cmds = [c[0] for c in tx.calls] # 'stop' must have been called at least once (after limits and at tune end) assert "stop" in cmds