"""Integration: HeadingEKF smoothing noisy heading input to the outer PID -- Sprint 9. Verifies that the EKF reduces the impact of sensor noise on the outer PID without preventing the cascade from converging to the heading setpoint. """ from __future__ import annotations import math import random import pytest from arautopilot.core.heading_ekf import HeadingEKF from arautopilot.studio.simulator.pid_inner import PidInner, PidInnerConfig from arautopilot.studio.simulator.pid_outer import PidOuter, PidOuterConfig from arautopilot.studio.simulator.rudder_dynamics import RudderDynamicsConfig, RudderSimulator from arautopilot.studio.simulator.vessel_heading import ( VesselHeadingConfig, VesselHeadingSimulator, heading_error_deg, ) def _run_with_noise( *, noise_std_deg: float = 3.0, use_ekf: bool = True, setpoint_deg: float = 45.0, duration_s: float = 120.0, seed: int = 42, ) -> tuple[float, float]: """Return (final_error_deg, rudder_total_variation) for a noisy heading run. ``rudder_total_variation`` measures how much the rudder setpoint oscillated; less variation indicates the EKF is reducing noise amplification. """ rng = random.Random(seed) outer = PidOuter(PidOuterConfig( base_kp=0.9, base_ki=0.02, base_kd=1.2, deadband_deg=0.5, rot_ff_gain=1.5, )) inner = PidInner(PidInnerConfig(kp=2.5, ki=0.15, kd=0.30)) rudder = RudderSimulator(RudderDynamicsConfig()) vessel = VesselHeadingSimulator(VesselHeadingConfig(speed_kn=10.0)) vessel.reset(heading_deg=0.0) ekf = HeadingEKF( heading_deg=0.0, process_noise_heading=0.01, process_noise_rot=0.1, ) outer_acc = 0.0 inner_dt = 0.02 rudder_sp = 0.0 prev_rudder_sp = 0.0 total_variation = 0.0 for _ in range(int(duration_s / inner_dt)): # Add Gaussian noise to the true heading for the sensor measurement noisy_heading = vessel.state.heading_deg + rng.gauss(0.0, noise_std_deg) noisy_heading %= 360.0 noisy_rot = vessel.state.rate_of_turn_dps + rng.gauss(0.0, 0.5) if use_ekf: ekf.predict(dt_s=inner_dt) ekf.update_heading(noisy_heading, noise_deg=noise_std_deg) ekf.update_rot(noisy_rot, noise_dps=0.5) heading_input = ekf.heading_deg rot_input = ekf.rot_dps else: heading_input = noisy_heading rot_input = noisy_rot if outer_acc >= 0.1 - 1e-9: outer_acc = 0.0 rudder_sp = outer.step( heading_setpoint_deg=setpoint_deg, heading_measured_deg=heading_input, rate_of_turn_dps=rot_input, speed_kn=10.0, allowed=True, ) total_variation += abs(rudder_sp - prev_rudder_sp) prev_rudder_sp = rudder_sp pwm = inner.step(setpoint_deg=rudder_sp, measured_deg=rudder.state.angle_deg, allowed=True) rudder.step(dt=inner_dt, pwm_pct=pwm) vessel.step(dt=inner_dt, rudder_deg=rudder.state.angle_deg) outer_acc += inner_dt final_error = abs(heading_error_deg(setpoint_deg, vessel.state.heading_deg)) return final_error, total_variation class TestEkfReducesNoise: def test_ekf_filter_converges_to_setpoint(self): err, _ = _run_with_noise(use_ekf=True, noise_std_deg=3.0) assert err < 4.0, f"EKF run did not converge: final error = {err:.2f} deg" def test_no_ekf_has_larger_residual_than_ekf(self): # Without EKF, large sensor noise prevents tight convergence. # This test just confirms the EKF case is materially better. err_ekf, _ = _run_with_noise(use_ekf=True, noise_std_deg=3.0, seed=99) err_raw, _ = _run_with_noise(use_ekf=False, noise_std_deg=3.0, seed=99) assert err_ekf < err_raw, ( f"EKF should produce lower residual: ekf={err_ekf:.2f} raw={err_raw:.2f}" ) def test_ekf_reduces_rudder_variation(self): _, var_ekf = _run_with_noise(use_ekf=True, noise_std_deg=5.0, seed=7) _, var_raw = _run_with_noise(use_ekf=False, noise_std_deg=5.0, seed=7) assert var_ekf < var_raw, ( f"EKF should reduce rudder variation: ekf={var_ekf:.1f} raw={var_raw:.1f}" ) def test_ekf_tracks_heading_during_steady_state(self): """After convergence, EKF should track true heading within its noise level.""" ekf = HeadingEKF(heading_deg=45.0) rng = random.Random(0) true_heading = 45.0 noise_std = 2.0 for _ in range(200): ekf.predict(dt_s=0.1) noisy = true_heading + rng.gauss(0.0, noise_std) ekf.update_heading(noisy, noise_deg=noise_std) assert abs(ekf.heading_deg - true_heading) < 2.0 def test_ekf_handles_wrap_around_during_turn(self): """EKF should track heading smoothly through the 0/360 boundary.""" ekf = HeadingEKF(heading_deg=355.0, rot_dps=5.0) for _ in range(30): ekf.predict(dt_s=0.1) # True heading wraps from 355 → 5 during this sequence true = (355.0 + _ * 5.0 * 0.1) % 360.0 ekf.update_heading(true, noise_deg=1.0) # Should have tracked through the wrap expected = (355.0 + 30 * 5.0 * 0.1) % 360.0 from arautopilot.core.heading_ekf import _shortest_arc diff = abs(_shortest_arc(ekf.heading_deg, expected)) assert diff < 5.0 class TestEkfEdgeCases: def test_large_noise_ekf_still_converges(self): err, _ = _run_with_noise(use_ekf=True, noise_std_deg=10.0, duration_s=180.0) assert err < 8.0 def test_zero_noise_ekf_matches_true_heading(self): ekf = HeadingEKF(heading_deg=0.0) for i in range(100): ekf.predict(dt_s=0.1) ekf.update_heading(float(i % 360), noise_deg=0.01) # With near-zero noise, EKF should track closely # (just checking it doesn't crash and stays in [0,360)) assert 0.0 <= ekf.heading_deg < 360.0