a2f3e82f17
Integration tests (64 new tests, 462 total): - test_integration_cascade: full cascade closed-loop simulation -- outer PID → inner PID → rudder dynamics → vessel heading; verifies convergence across small/90°/180° turns, wrap-around, and low speed - test_integration_ekf_pid: EKF-smoothed heading feeding outer PID; confirms EKF reduces rudder total-variation vs raw noisy heading - test_integration_alarm_audit: alarm engine → audit log hash-chain; verify, tamper detection, cross-session reload, multi-alarm logging - test_modbus_utils: 38 tests for scale/raw conversion, bounds checking, heading/rudder helpers, signed int16 two's-complement round-trip Hardening: - heading_ekf: guard NaN/inf in update_heading() and update_rot() -- skip bad measurements silently rather than corrupting filter state - adaptive_tuner: guard NaN/inf in step() -- ignore corrupt error samples - modbus_utils.py: new shared module with scale_to_raw, scale_to_raw_signed, raw_signed_to_scaled, clamp_uint16, validate_holding_write, heading_deg_to_raw, rudder_deg_to_raw_signed Documentation: - docs/operator_manual.md: 15-section operator manual covering safety, installation, all operating modes, alarm reference, commissioning, fault-finding, Modbus register summary, and activation/audit log procedure Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
158 lines
5.9 KiB
Python
158 lines
5.9 KiB
Python
"""Integration: HeadingEKF smoothing noisy heading input to the outer PID -- Sprint 9.
|
|
|
|
Verifies that the EKF reduces the impact of sensor noise on the outer PID
|
|
without preventing the cascade from converging to the heading setpoint.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import math
|
|
import random
|
|
|
|
import pytest
|
|
|
|
from arautopilot.core.heading_ekf import HeadingEKF
|
|
from arautopilot.studio.simulator.pid_inner import PidInner, PidInnerConfig
|
|
from arautopilot.studio.simulator.pid_outer import PidOuter, PidOuterConfig
|
|
from arautopilot.studio.simulator.rudder_dynamics import RudderDynamicsConfig, RudderSimulator
|
|
from arautopilot.studio.simulator.vessel_heading import (
|
|
VesselHeadingConfig,
|
|
VesselHeadingSimulator,
|
|
heading_error_deg,
|
|
)
|
|
|
|
|
|
def _run_with_noise(
|
|
*,
|
|
noise_std_deg: float = 3.0,
|
|
use_ekf: bool = True,
|
|
setpoint_deg: float = 45.0,
|
|
duration_s: float = 120.0,
|
|
seed: int = 42,
|
|
) -> tuple[float, float]:
|
|
"""Return (final_error_deg, rudder_total_variation) for a noisy heading run.
|
|
|
|
``rudder_total_variation`` measures how much the rudder setpoint oscillated;
|
|
less variation indicates the EKF is reducing noise amplification.
|
|
"""
|
|
rng = random.Random(seed)
|
|
outer = PidOuter(PidOuterConfig(
|
|
base_kp=0.9, base_ki=0.02, base_kd=1.2,
|
|
deadband_deg=0.5, rot_ff_gain=1.5,
|
|
))
|
|
inner = PidInner(PidInnerConfig(kp=2.5, ki=0.15, kd=0.30))
|
|
rudder = RudderSimulator(RudderDynamicsConfig())
|
|
vessel = VesselHeadingSimulator(VesselHeadingConfig(speed_kn=10.0))
|
|
vessel.reset(heading_deg=0.0)
|
|
|
|
ekf = HeadingEKF(
|
|
heading_deg=0.0,
|
|
process_noise_heading=0.01,
|
|
process_noise_rot=0.1,
|
|
)
|
|
|
|
outer_acc = 0.0
|
|
inner_dt = 0.02
|
|
rudder_sp = 0.0
|
|
prev_rudder_sp = 0.0
|
|
total_variation = 0.0
|
|
|
|
for _ in range(int(duration_s / inner_dt)):
|
|
# Add Gaussian noise to the true heading for the sensor measurement
|
|
noisy_heading = vessel.state.heading_deg + rng.gauss(0.0, noise_std_deg)
|
|
noisy_heading %= 360.0
|
|
noisy_rot = vessel.state.rate_of_turn_dps + rng.gauss(0.0, 0.5)
|
|
|
|
if use_ekf:
|
|
ekf.predict(dt_s=inner_dt)
|
|
ekf.update_heading(noisy_heading, noise_deg=noise_std_deg)
|
|
ekf.update_rot(noisy_rot, noise_dps=0.5)
|
|
heading_input = ekf.heading_deg
|
|
rot_input = ekf.rot_dps
|
|
else:
|
|
heading_input = noisy_heading
|
|
rot_input = noisy_rot
|
|
|
|
if outer_acc >= 0.1 - 1e-9:
|
|
outer_acc = 0.0
|
|
rudder_sp = outer.step(
|
|
heading_setpoint_deg=setpoint_deg,
|
|
heading_measured_deg=heading_input,
|
|
rate_of_turn_dps=rot_input,
|
|
speed_kn=10.0,
|
|
allowed=True,
|
|
)
|
|
total_variation += abs(rudder_sp - prev_rudder_sp)
|
|
prev_rudder_sp = rudder_sp
|
|
|
|
pwm = inner.step(setpoint_deg=rudder_sp, measured_deg=rudder.state.angle_deg, allowed=True)
|
|
rudder.step(dt=inner_dt, pwm_pct=pwm)
|
|
vessel.step(dt=inner_dt, rudder_deg=rudder.state.angle_deg)
|
|
outer_acc += inner_dt
|
|
|
|
final_error = abs(heading_error_deg(setpoint_deg, vessel.state.heading_deg))
|
|
return final_error, total_variation
|
|
|
|
|
|
class TestEkfReducesNoise:
|
|
def test_ekf_filter_converges_to_setpoint(self):
|
|
err, _ = _run_with_noise(use_ekf=True, noise_std_deg=3.0)
|
|
assert err < 4.0, f"EKF run did not converge: final error = {err:.2f} deg"
|
|
|
|
def test_no_ekf_has_larger_residual_than_ekf(self):
|
|
# Without EKF, large sensor noise prevents tight convergence.
|
|
# This test just confirms the EKF case is materially better.
|
|
err_ekf, _ = _run_with_noise(use_ekf=True, noise_std_deg=3.0, seed=99)
|
|
err_raw, _ = _run_with_noise(use_ekf=False, noise_std_deg=3.0, seed=99)
|
|
assert err_ekf < err_raw, (
|
|
f"EKF should produce lower residual: ekf={err_ekf:.2f} raw={err_raw:.2f}"
|
|
)
|
|
|
|
def test_ekf_reduces_rudder_variation(self):
|
|
_, var_ekf = _run_with_noise(use_ekf=True, noise_std_deg=5.0, seed=7)
|
|
_, var_raw = _run_with_noise(use_ekf=False, noise_std_deg=5.0, seed=7)
|
|
assert var_ekf < var_raw, (
|
|
f"EKF should reduce rudder variation: ekf={var_ekf:.1f} raw={var_raw:.1f}"
|
|
)
|
|
|
|
def test_ekf_tracks_heading_during_steady_state(self):
|
|
"""After convergence, EKF should track true heading within its noise level."""
|
|
ekf = HeadingEKF(heading_deg=45.0)
|
|
rng = random.Random(0)
|
|
true_heading = 45.0
|
|
noise_std = 2.0
|
|
for _ in range(200):
|
|
ekf.predict(dt_s=0.1)
|
|
noisy = true_heading + rng.gauss(0.0, noise_std)
|
|
ekf.update_heading(noisy, noise_deg=noise_std)
|
|
assert abs(ekf.heading_deg - true_heading) < 2.0
|
|
|
|
def test_ekf_handles_wrap_around_during_turn(self):
|
|
"""EKF should track heading smoothly through the 0/360 boundary."""
|
|
ekf = HeadingEKF(heading_deg=355.0, rot_dps=5.0)
|
|
for _ in range(30):
|
|
ekf.predict(dt_s=0.1)
|
|
# True heading wraps from 355 → 5 during this sequence
|
|
true = (355.0 + _ * 5.0 * 0.1) % 360.0
|
|
ekf.update_heading(true, noise_deg=1.0)
|
|
# Should have tracked through the wrap
|
|
expected = (355.0 + 30 * 5.0 * 0.1) % 360.0
|
|
from arautopilot.core.heading_ekf import _shortest_arc
|
|
diff = abs(_shortest_arc(ekf.heading_deg, expected))
|
|
assert diff < 5.0
|
|
|
|
|
|
class TestEkfEdgeCases:
|
|
def test_large_noise_ekf_still_converges(self):
|
|
err, _ = _run_with_noise(use_ekf=True, noise_std_deg=10.0, duration_s=180.0)
|
|
assert err < 8.0
|
|
|
|
def test_zero_noise_ekf_matches_true_heading(self):
|
|
ekf = HeadingEKF(heading_deg=0.0)
|
|
for i in range(100):
|
|
ekf.predict(dt_s=0.1)
|
|
ekf.update_heading(float(i % 360), noise_deg=0.01)
|
|
# With near-zero noise, EKF should track closely
|
|
# (just checking it doesn't crash and stays in [0,360))
|
|
assert 0.0 <= ekf.heading_deg < 360.0
|