Files
alro65 a2f3e82f17 sprint-9: integration tests + hardening + operator manual
Integration tests (64 new tests, 462 total):
- test_integration_cascade: full cascade closed-loop simulation --
  outer PID → inner PID → rudder dynamics → vessel heading; verifies
  convergence across small/90°/180° turns, wrap-around, and low speed
- test_integration_ekf_pid: EKF-smoothed heading feeding outer PID;
  confirms EKF reduces rudder total-variation vs raw noisy heading
- test_integration_alarm_audit: alarm engine → audit log hash-chain;
  verify, tamper detection, cross-session reload, multi-alarm logging
- test_modbus_utils: 38 tests for scale/raw conversion, bounds checking,
  heading/rudder helpers, signed int16 two's-complement round-trip

Hardening:
- heading_ekf: guard NaN/inf in update_heading() and update_rot() -- skip
  bad measurements silently rather than corrupting filter state
- adaptive_tuner: guard NaN/inf in step() -- ignore corrupt error samples
- modbus_utils.py: new shared module with scale_to_raw, scale_to_raw_signed,
  raw_signed_to_scaled, clamp_uint16, validate_holding_write,
  heading_deg_to_raw, rudder_deg_to_raw_signed

Documentation:
- docs/operator_manual.md: 15-section operator manual covering safety,
  installation, all operating modes, alarm reference, commissioning,
  fault-finding, Modbus register summary, and activation/audit log procedure

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 03:35:23 -04:00

174 lines
5.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""2-state heading EKF (Extended Kalman Filter) -- Sprint 8.
Fuses NMEA 2000 heading (PGN 127250) and rate-of-turn (PGN 127251) into a
smoothed, low-latency heading/ROT estimate for the outer PID loop.
State vector x = [heading_deg, rot_dps]
Process model (constant-ROT):
h_{k+1} = h_k + rot_k * dt
r_{k+1} = r_k (ROT modelled as random walk)
Measurements:
z_heading = h_k + v_h (v_h ~ N(0, R_h))
z_rot = r_k + v_r (v_r ~ N(0, R_r))
Angles are kept in the range [0, 360) for the state but the update step
works on signed shortest-arc differences to avoid wrap-around errors.
Usage::
ekf = HeadingEKF()
# On each 10 Hz tick:
ekf.predict(dt_s=0.1)
if new_heading_available:
ekf.update_heading(heading_deg, noise_deg=2.0)
if new_rot_available:
ekf.update_rot(rot_dps, noise_dps=1.0)
h, rot = ekf.heading_deg, ekf.rot_dps
"""
from __future__ import annotations
import math
from dataclasses import dataclass, field
@dataclass
class HeadingEKF:
"""2-state linear Kalman filter for heading and rate-of-turn.
Parameters
----------
heading_deg:
Initial heading estimate (degrees, 0-360).
rot_dps:
Initial rate-of-turn estimate (degrees per second, signed).
process_noise_heading:
Process noise variance for heading (deg²). Larger = trust model less.
process_noise_rot:
Process noise variance for ROT (deg²/s²).
"""
heading_deg: float = 0.0
rot_dps: float = 0.0
# Process noise (Q matrix diagonal)
process_noise_heading: float = 0.01 # deg²
process_noise_rot: float = 0.1 # (deg/s)²
# Covariance matrix P (2×2, stored as flat [p00, p01, p10, p11])
_P: list[float] = field(default_factory=lambda: [1.0, 0.0, 0.0, 1.0])
# ------------------------------------------------------------------
def predict(self, dt_s: float) -> None:
"""Propagate the state and covariance forward by ``dt_s`` seconds."""
h = self.heading_deg
r = self.rot_dps
# State transition: heading += rot * dt
self.heading_deg = (h + r * dt_s) % 360.0
# Jacobian F = [[1, dt], [0, 1]]
dt = dt_s
p00, p01, p10, p11 = self._P
# P = F P Fᵀ + Q
new_p00 = p00 + dt * p10 + dt * p01 + dt * dt * p11 + self.process_noise_heading
new_p01 = p01 + dt * p11
new_p10 = p10 + dt * p11
new_p11 = p11 + self.process_noise_rot
self._P = [new_p00, new_p01, new_p10, new_p11]
def update_heading(self, measured_deg: float, noise_deg: float = 2.0) -> None:
if not math.isfinite(measured_deg) or not math.isfinite(noise_deg):
return
"""Kalman update step for a heading measurement.
Parameters
----------
measured_deg:
Raw heading from PGN 127250, degrees [0, 360).
noise_deg:
Standard deviation of the sensor noise (degrees). Variance = noise²
"""
R = noise_deg * noise_deg
p00, p01, p10, p11 = self._P
# Innovation (shortest arc)
innov = _shortest_arc(measured_deg, self.heading_deg)
# H = [1, 0] (observe heading only)
# S = H P Hᵀ + R = p00 + R
S = p00 + R
if S == 0:
return
# Kalman gain K = P Hᵀ / S → [k0, k1] = [p00/S, p10/S]
k0 = p00 / S
k1 = p10 / S
# Update state
self.heading_deg = (self.heading_deg + k0 * innov) % 360.0
self.rot_dps += k1 * innov
# Update P = (I - K H) P
self._P = [
(1 - k0) * p00,
(1 - k0) * p01,
p10 - k1 * p00,
p11 - k1 * p01,
]
def update_rot(self, measured_rot_dps: float, noise_dps: float = 1.0) -> None:
if not math.isfinite(measured_rot_dps) or not math.isfinite(noise_dps):
return
"""Kalman update step for a rate-of-turn measurement.
Parameters
----------
measured_rot_dps:
Raw ROT from PGN 127251, degrees per second (signed).
noise_dps:
Standard deviation of the ROT sensor noise (deg/s).
"""
R = noise_dps * noise_dps
p00, p01, p10, p11 = self._P
# Innovation
innov = measured_rot_dps - self.rot_dps
# H = [0, 1] (observe ROT only)
# S = p11 + R
S = p11 + R
if S == 0:
return
# Kalman gain: [k0, k1] = [p01/S, p11/S]
k0 = p01 / S
k1 = p11 / S
# Update state
self.heading_deg = (self.heading_deg + k0 * innov) % 360.0
self.rot_dps += k1 * innov
# Update P
self._P = [
p00 - k0 * p01,
(1 - k1) * p01,
p10 - k0 * p11,
(1 - k1) * p11,
]
@property
def covariance(self) -> tuple[float, float, float, float]:
"""Return (p00, p01, p10, p11) — the 2×2 covariance matrix."""
return tuple(self._P) # type: ignore[return-value]
def _shortest_arc(a: float, b: float) -> float:
"""Signed shortest-arc from ``b`` to ``a`` (degrees)."""
diff = (a - b + 180.0) % 360.0 - 180.0
return diff