"""Commissioning wizard state machine -- Sprint 7. A pure Python state machine (no UI dependencies) that guides an operator through the four commissioning steps: 1. RUDDER_LIMITS -- drive rudder full port/stbd, record ADC limits. 2. SENSOR_CAL -- compute linear calibration from ADC limits. 3. AUTO_TUNE -- relay-feedback ZN auto-tune at the configured speed. 4. DONE -- results ready to be written to NVS and ProjectConfig. The wizard produces a :class:`CommissioningResult` containing calibration values and recommended PID gains. The Studio main window can drive it by calling :meth:`CommissioningWizard.step` on each 10 Hz control tick. Design intent ------------- All hardware interaction is behind a ``transport`` callable (inject the real Modbus client or a test stub). This keeps the wizard fully unit-testable without hardware. Usage:: def send(cmd: str, value: float) -> float: # cmd is one of: 'move_port', 'move_stbd', 'stop', # 'read_adc', 'read_error', 'set_rudder' ... wiz = CommissioningWizard(transport=send) while not wiz.done: wiz.step() result = wiz.result """ from __future__ import annotations import time from dataclasses import dataclass, field from enum import auto, Enum from arautopilot.core.autotuner import RelayAutoTuner, TunerResult from arautopilot.core.pid_config import PidGains class WizardPhase(Enum): RUDDER_LIMITS = auto() SENSOR_CAL = auto() AUTO_TUNE = auto() DONE = auto() ABORTED = auto() @dataclass class CommissioningResult: """Output produced by a completed commissioning run.""" # Rudder sensor calibration adc_port_limit: int = 0 # raw ADC at full-port stop adc_stbd_limit: int = 4095 # raw ADC at full-stbd stop rudder_port_deg: float = -35.0 rudder_stbd_deg: float = 35.0 # PID recommendations (from ZN auto-tune) tuner_result: TunerResult | None = None recommended_outer_gains: PidGains | None = None # Metadata completed: bool = False aborted_reason: str = "" # --------------------------------------------------------------------------- # Transport protocol # --------------------------------------------------------------------------- # The transport callable is called with a command string and an optional # float argument; it must return a float (the "response" value, 0.0 if N/A). # Commands: # "move_port" (value ignored) → 0.0 # "move_stbd" (value ignored) → 0.0 # "stop" (value ignored) → 0.0 # "read_adc" (value ignored) → current raw ADC (0..4095) # "read_error" (value ignored) → current heading error (deg) # "set_rudder" (value = deg) → 0.0 (apply relay output to inner loop) TransportFn = "Callable[[str, float], float]" class CommissioningWizard: """Drives the commissioning sequence at 10 Hz (call ``step()`` each tick). Parameters ---------- transport: Callable conforming to the protocol above. relay_amplitude: Relay output magnitude for auto-tune, degrees of rudder. min_tune_cycles: Minimum oscillation cycles before auto-tune declares convergence. limit_settle_s: Seconds to wait at each limit stop before sampling ADC. dt_s: Control step in seconds (default 0.1 for 10 Hz). """ def __init__( self, transport: "Callable[[str, float], float]", relay_amplitude: float = 5.0, min_tune_cycles: int = 3, limit_settle_s: float = 2.0, dt_s: float = 0.1, ) -> None: from typing import Callable # noqa: F401 self._tx = transport self._relay_amp = relay_amplitude self._min_cycles = min_tune_cycles self._settle_s = limit_settle_s self._dt = dt_s self._phase: WizardPhase = WizardPhase.RUDDER_LIMITS self._sub: str = "drive_port" # sub-state within RUDDER_LIMITS self._timer: float = 0.0 # time accumulator for settle waits self._result = CommissioningResult() self._tuner: RelayAutoTuner | None = None # ------------------------------------------------------------------ @property def phase(self) -> WizardPhase: return self._phase @property def done(self) -> bool: return self._phase in (WizardPhase.DONE, WizardPhase.ABORTED) @property def result(self) -> CommissioningResult: return self._result def abort(self, reason: str = "operator abort") -> None: self._tx("stop", 0.0) self._phase = WizardPhase.ABORTED self._result.aborted_reason = reason # ------------------------------------------------------------------ # Main tick # ------------------------------------------------------------------ def step(self) -> str: """Advance the wizard by one 10 Hz tick. Returns a human-readable status string (for display / logging). """ if self.done: return f"wizard {self._phase.name}" if self._phase is WizardPhase.RUDDER_LIMITS: return self._tick_limits() if self._phase is WizardPhase.SENSOR_CAL: return self._tick_cal() if self._phase is WizardPhase.AUTO_TUNE: return self._tick_tune() return "unknown phase" # ------------------------------------------------------------------ # Phase: RUDDER_LIMITS # ------------------------------------------------------------------ def _tick_limits(self) -> str: if self._sub == "drive_port": self._tx("move_port", 0.0) self._timer += self._dt if self._timer >= self._settle_s: self._result.adc_port_limit = int(self._tx("read_adc", 0.0)) self._sub = "settle_port" self._timer = 0.0 return f"driving to port limit... ({self._timer:.1f}s)" if self._sub == "settle_port": self._tx("stop", 0.0) self._timer += self._dt if self._timer >= 0.5: self._sub = "drive_stbd" self._timer = 0.0 return "settled at port limit" if self._sub == "drive_stbd": self._tx("move_stbd", 0.0) self._timer += self._dt if self._timer >= self._settle_s: self._result.adc_stbd_limit = int(self._tx("read_adc", 0.0)) self._sub = "settle_stbd" self._timer = 0.0 return f"driving to stbd limit... ({self._timer:.1f}s)" if self._sub == "settle_stbd": self._tx("stop", 0.0) self._timer += self._dt if self._timer >= 0.5: self._phase = WizardPhase.SENSOR_CAL self._timer = 0.0 return "settled at stbd limit" return "limits: unknown sub-state" # ------------------------------------------------------------------ # Phase: SENSOR_CAL # ------------------------------------------------------------------ def _tick_cal(self) -> str: # The calibration is computed from the ADC limits gathered above. # This is instantaneous (no hardware interaction needed). lo = self._result.adc_port_limit hi = self._result.adc_stbd_limit if lo == hi: self.abort("sensor cal: port and stbd ADC values are identical") return "sensor cal FAILED: identical ADC values" # Ensure lo < hi (swap if wiring is inverted). if lo > hi: lo, hi = hi, lo self._result.rudder_port_deg, self._result.rudder_stbd_deg = ( self._result.rudder_stbd_deg, self._result.rudder_port_deg, ) self._result.adc_port_limit = lo self._result.adc_stbd_limit = hi self._phase = WizardPhase.AUTO_TUNE self._tuner = RelayAutoTuner( relay_amplitude=self._relay_amp, min_cycles=self._min_cycles, dt_s=self._dt, ) return "sensor calibration done" # ------------------------------------------------------------------ # Phase: AUTO_TUNE # ------------------------------------------------------------------ def _tick_tune(self) -> str: assert self._tuner is not None error = self._tx("read_error", 0.0) rudder_cmd = self._tuner.step(error) self._tx("set_rudder", rudder_cmd) if self._tuner.is_done: res = self._tuner.result() self._result.tuner_result = res if res.converged: self._result.recommended_outer_gains = res.to_pid_gains() self._tx("stop", 0.0) self._result.completed = res.converged self._phase = WizardPhase.DONE return f"auto-tune done: Ku={res.ku:.3f} Tu={res.tu_s:.2f}s" steps = self._tuner._step_count # noqa: SLF001 return f"auto-tuning... step={steps} cycles={len(self._tuner._half_period_steps)//2}"