sprint-5: True Course + Track Keeping + XTE + PGN 129026/129284

- Python: NmeaNavData (COG/SOG/XTE data models with staleness tracking)
- Python: TrueCoursePilot with TRUE_COURSE and TRACK_KEEPING modes
- Python: 26 new tests (test_nmea_data, test_true_course)
- Modbus: COG/SOG/XTE input registers + TC setpoint/XTE-gain holdings
- Firmware: nmea2000_consumer handles PGN 129026 + 129284
- Firmware: pid_outer_task wired for TC + TK modes with live SOG scheduling
- YAML regenerated; 284 tests pass, firmware compiles clean

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-19 20:12:57 -04:00
parent 0be60c5161
commit 0f00ad10da
12 changed files with 1051 additions and 101 deletions
+117
View File
@@ -0,0 +1,117 @@
"""NMEA 2000 navigation data models.
Holds the live data received from PGN 127250/127251 (heading/ROT -- Sprint 3)
and the new PGN 129026/129284 (COG+SOG / XTE+waypoint -- Sprint 5).
All timestamps are float seconds (time.monotonic()).
NaN means the value has never been received.
"""
from __future__ import annotations
import math
import time
from dataclasses import dataclass, field
@dataclass
class HeadingData:
"""PGN 127250 + 127251 -- magnetic heading and rate of turn."""
heading_deg: float = math.nan
rot_dps: float = 0.0
timestamp: float = field(default_factory=lambda: 0.0)
max_age_s: float = 5.0
def update(self, heading_deg: float, rot_dps: float) -> None:
self.heading_deg = heading_deg
self.rot_dps = rot_dps
self.timestamp = time.monotonic()
@property
def is_valid(self) -> bool:
if math.isnan(self.heading_deg):
return False
return (time.monotonic() - self.timestamp) < self.max_age_s
@property
def age_ms(self) -> int:
return int((time.monotonic() - self.timestamp) * 1000)
@dataclass
class CogSogData:
"""PGN 129026 -- Course Over Ground (true) and Speed Over Ground.
COG is in degrees true (0..360).
SOG is in knots.
"""
cog_deg: float = math.nan
sog_kn: float = math.nan
timestamp: float = field(default_factory=lambda: 0.0)
max_age_s: float = 5.0
def update(self, cog_deg: float, sog_kn: float) -> None:
self.cog_deg = cog_deg % 360.0
self.sog_kn = sog_kn
self.timestamp = time.monotonic()
@property
def is_valid(self) -> bool:
if math.isnan(self.cog_deg) or math.isnan(self.sog_kn):
return False
return (time.monotonic() - self.timestamp) < self.max_age_s
@property
def age_ms(self) -> int:
return int((time.monotonic() - self.timestamp) * 1000)
@dataclass
class XteData:
"""PGN 129284 -- Cross Track Error and active waypoint.
xte_m: distance off track in metres.
Positive → vessel is to starboard of the track line (steer port).
Negative → vessel is to port of the track line (steer starboard).
waypoint_name: free text from the chart plotter, empty if not available.
dtw_m: distance to waypoint in metres (NaN if unknown).
"""
xte_m: float = math.nan
dtw_m: float = math.nan
waypoint_name: str = ""
timestamp: float = field(default_factory=lambda: 0.0)
max_age_s: float = 5.0
def update(
self,
xte_m: float,
dtw_m: float = math.nan,
waypoint_name: str = "",
) -> None:
self.xte_m = xte_m
self.dtw_m = dtw_m
self.waypoint_name = waypoint_name
self.timestamp = time.monotonic()
@property
def is_valid(self) -> bool:
if math.isnan(self.xte_m):
return False
return (time.monotonic() - self.timestamp) < self.max_age_s
@property
def age_ms(self) -> int:
return int((time.monotonic() - self.timestamp) * 1000)
@dataclass
class NmeaNavData:
"""Aggregated live navigation data used by the autopilot control loop."""
heading: HeadingData = field(default_factory=HeadingData)
cog_sog: CogSogData = field(default_factory=CogSogData)
xte: XteData = field(default_factory=XteData)
@@ -91,6 +91,12 @@ INPUTS: dict[str, Reg] = {
"PID_OUTER_KP_X1000": Reg(addr=54, name="PID_OUTER_KP_X1000", desc='Outer-loop active kp * 1000', unit="", scale=0.001, offset=0.0),
"PID_OUTER_KI_X1000": Reg(addr=55, name="PID_OUTER_KI_X1000", desc='Outer-loop active ki * 1000', unit="", scale=0.001, offset=0.0),
"PID_OUTER_KD_X1000": Reg(addr=56, name="PID_OUTER_KD_X1000", desc='Outer-loop active kd * 1000', unit="", scale=0.001, offset=0.0),
"COG_DEG_X100": Reg(addr=60, name="COG_DEG_X100", desc='Course Over Ground (true) from PGN 129026, deg*100 (0..35999)', unit="deg", scale=0.01, offset=0.0),
"SOG_KN_X10": Reg(addr=61, name="SOG_KN_X10", desc='Speed Over Ground from PGN 129026, knots*10 (unsigned)', unit="kn", scale=0.1, offset=0.0),
"COG_AGE_MS": Reg(addr=62, name="COG_AGE_MS", desc='Milliseconds since last COG/SOG update (0..60000)', unit="ms", scale=1.0, offset=0.0),
"XTE_DM_SIGNED": Reg(addr=63, name="XTE_DM_SIGNED", desc='Cross Track Error from PGN 129284, decimetres (signed int16, +stbd/-port)', unit="dm", scale=0.1, offset=0.0),
"XTE_AGE_MS": Reg(addr=64, name="XTE_AGE_MS", desc='Milliseconds since last XTE update (0..60000)', unit="ms", scale=1.0, offset=0.0),
"DTW_M": Reg(addr=65, name="DTW_M", desc='Distance to next waypoint, metres (unsigned int16, 0..65535)', unit="m", scale=1.0, offset=0.0),
}
# HOLDINGS
@@ -109,6 +115,9 @@ HOLDINGS: dict[str, Reg] = {
"PID_OUTER_KP_REQ_X1000": Reg(addr=26, name="PID_OUTER_KP_REQ_X1000", desc='Requested outer-loop base kp * 1000', unit="", scale=0.001, offset=0.0),
"PID_OUTER_KI_REQ_X1000": Reg(addr=27, name="PID_OUTER_KI_REQ_X1000", desc='Requested outer-loop base ki * 1000', unit="", scale=0.001, offset=0.0),
"PID_OUTER_KD_REQ_X1000": Reg(addr=28, name="PID_OUTER_KD_REQ_X1000", desc='Requested outer-loop base kd * 1000', unit="", scale=0.001, offset=0.0),
"TRUE_COURSE_SP_X100": Reg(addr=32, name="TRUE_COURSE_SP_X100", desc='Desired COG setpoint for TRUE_COURSE / TRACK_KEEPING, deg*100 (0..35999)', unit="deg", scale=0.01, offset=0.0),
"XTE_GAIN_X1000": Reg(addr=33, name="XTE_GAIN_X1000", desc='XTE correction gain * 1000 (deg of heading correction per metre of XTE)', unit="", scale=0.001, offset=0.0),
"XTE_MAX_CORRECTION_X100": Reg(addr=34, name="XTE_MAX_CORRECTION_X100", desc='Maximum XTE heading correction, deg*100 (default 2000 = 20 deg)', unit="deg", scale=0.01, offset=0.0),
}
ALL_GROUPS: dict[str, dict[str, Reg]] = {
+170
View File
@@ -0,0 +1,170 @@
"""True Course and Track Keeping pilot -- Sprint 5.
TRUE_COURSE mode
----------------
Holds a fixed Course Over Ground (COG) instead of compass heading.
This compensates for current/wind drift: if the vessel is pushed sideways,
COG drifts and the outer PID corrects heading to restore the intended track.
Input: cog_setpoint_deg, measured_cog_deg, sog_kn, rot_dps
Output: rudder_setpoint_deg (passed to inner PID)
TRACK_KEEPING mode
------------------
Follows an ECDIS/chart-plotter route using XTE feedback.
An XTE proportional term adds a heading correction to the COG setpoint:
xte_correction_deg = -xte_gain * xte_m (clamped ± max_correction)
effective_setpoint = cog_setpoint_deg + xte_correction_deg
Sign convention (NMEA 0183 / IEC 61162-1):
XTE > 0 → vessel is to starboard of track → steer port → negative correction
XTE < 0 → vessel is to port of track → steer stbd → positive correction
The correction is clamped so the autopilot cannot be tricked into dangerous
large heading swings by a stale or erroneous XTE feed.
"""
from __future__ import annotations
import math
from dataclasses import dataclass, field
from arautopilot.studio.simulator.pid_outer import (
PidOuter,
PidOuterConfig,
PidOuterState,
)
from arautopilot.studio.simulator.vessel_heading import heading_error_deg
@dataclass
class TrueCourseConfig:
"""Configuration for both TRUE_COURSE and TRACK_KEEPING modes."""
# Inner PID outer loop config (reused — same dynamics, different input)
pid: PidOuterConfig = field(default_factory=PidOuterConfig)
# XTE correction parameters (TRACK_KEEPING only)
xte_gain: float = 0.5 # deg of heading correction per metre of XTE
xte_max_correction_deg: float = 20.0 # hard clamp on XTE term
xte_max_age_s: float = 5.0 # treat XTE as stale beyond this
@dataclass
class TrueCourseState:
xte_correction_deg: float = 0.0
effective_setpoint_deg: float = 0.0
class TrueCoursePilot:
"""Outer-loop pilot for TRUE_COURSE and TRACK_KEEPING modes.
Wraps PidOuter, substituting COG for heading and optionally adding
an XTE proportional correction to the setpoint.
"""
def __init__(self, config: TrueCourseConfig | None = None) -> None:
self.config = config or TrueCourseConfig()
self._pid = PidOuter(self.config.pid)
self.tc_state = TrueCourseState()
def reset(self) -> None:
self._pid.reset()
self.tc_state = TrueCourseState()
def update_config(self, config: TrueCourseConfig) -> None:
self.config = config
self._pid.update_config(config.pid)
# ------------------------------------------------------------------
# TRUE_COURSE step
# ------------------------------------------------------------------
def step_true_course(
self,
*,
cog_setpoint_deg: float,
measured_cog_deg: float,
sog_kn: float,
rot_dps: float = 0.0,
allowed: bool = True,
) -> float:
"""Pure COG-hold step (no XTE). Returns rudder setpoint (deg)."""
self.tc_state.effective_setpoint_deg = cog_setpoint_deg
self.tc_state.xte_correction_deg = 0.0
return self._pid.step(
heading_setpoint_deg=cog_setpoint_deg,
heading_measured_deg=measured_cog_deg,
rate_of_turn_dps=rot_dps,
speed_kn=sog_kn,
allowed=allowed,
)
# ------------------------------------------------------------------
# TRACK_KEEPING step
# ------------------------------------------------------------------
def step_track_keeping(
self,
*,
cog_setpoint_deg: float,
measured_cog_deg: float,
sog_kn: float,
rot_dps: float = 0.0,
xte_m: float = math.nan,
xte_age_s: float = 0.0,
allowed: bool = True,
) -> float:
"""COG-hold with XTE correction. Returns rudder setpoint (deg).
If XTE is NaN or stale the correction is zeroed and the pilot
falls back to pure COG hold — safer than going to STANDBY.
"""
cfg = self.config
xte_valid = (
not math.isnan(xte_m)
and xte_age_s < cfg.xte_max_age_s
)
if xte_valid:
raw_correction = -cfg.xte_gain * xte_m
correction = _clamp(
raw_correction,
-cfg.xte_max_correction_deg,
cfg.xte_max_correction_deg,
)
else:
correction = 0.0
self.tc_state.xte_correction_deg = correction
effective_sp = (cog_setpoint_deg + correction) % 360.0
self.tc_state.effective_setpoint_deg = effective_sp
return self._pid.step(
heading_setpoint_deg=effective_sp,
heading_measured_deg=measured_cog_deg,
rate_of_turn_dps=rot_dps,
speed_kn=sog_kn,
allowed=allowed,
)
# ------------------------------------------------------------------
# Accessors
# ------------------------------------------------------------------
@property
def pid_state(self) -> PidOuterState:
return self._pid.state
@property
def last_rudder_setpoint_deg(self) -> float:
return self._pid.state.last_output_deg
def _clamp(x: float, lo: float, hi: float) -> float:
if x < lo:
return lo
if x > hi:
return hi
return x
+100
View File
@@ -0,0 +1,100 @@
"""Tests for arautopilot.core.nmea_data -- Sprint 5."""
from __future__ import annotations
import math
import time
from unittest.mock import patch
import pytest
from arautopilot.core.nmea_data import CogSogData, HeadingData, NmeaNavData, XteData
class TestCogSogData:
def test_initial_state_invalid(self):
d = CogSogData()
assert not d.is_valid
assert math.isnan(d.cog_deg)
assert math.isnan(d.sog_kn)
def test_update_marks_valid(self):
d = CogSogData()
d.update(270.0, 8.5)
assert d.is_valid
assert d.cog_deg == pytest.approx(270.0)
assert d.sog_kn == pytest.approx(8.5)
def test_cog_wraps_360(self):
d = CogSogData()
d.update(370.0, 5.0)
assert d.cog_deg == pytest.approx(10.0)
def test_cog_zero_stays_zero(self):
d = CogSogData()
d.update(0.0, 5.0)
assert d.cog_deg == pytest.approx(0.0)
def test_stale_after_max_age(self):
d = CogSogData(max_age_s=1.0)
d.update(90.0, 6.0)
assert d.is_valid
with patch("arautopilot.core.nmea_data.time") as mock_time:
mock_time.monotonic.return_value = d.timestamp + 2.0
assert not d.is_valid
def test_age_ms(self):
d = CogSogData()
d.update(180.0, 10.0)
assert d.age_ms >= 0
class TestXteData:
def test_initial_invalid(self):
d = XteData()
assert not d.is_valid
assert math.isnan(d.xte_m)
def test_update_positive_xte(self):
"""Positive XTE = vessel to starboard of track."""
d = XteData()
d.update(xte_m=5.0, dtw_m=200.0, waypoint_name="WP01")
assert d.is_valid
assert d.xte_m == pytest.approx(5.0)
assert d.dtw_m == pytest.approx(200.0)
assert d.waypoint_name == "WP01"
def test_update_negative_xte(self):
d = XteData()
d.update(xte_m=-3.5)
assert d.xte_m == pytest.approx(-3.5)
def test_stale_after_max_age(self):
d = XteData(max_age_s=2.0)
d.update(1.0)
with patch("arautopilot.core.nmea_data.time") as mock_time:
mock_time.monotonic.return_value = d.timestamp + 3.0
assert not d.is_valid
def test_update_without_optional_fields(self):
d = XteData()
d.update(xte_m=0.0)
assert d.is_valid
assert math.isnan(d.dtw_m)
assert d.waypoint_name == ""
class TestNmeaNavData:
def test_aggregate_has_all_fields(self):
nav = NmeaNavData()
assert isinstance(nav.heading, HeadingData)
assert isinstance(nav.cog_sog, CogSogData)
assert isinstance(nav.xte, XteData)
def test_independent_updates(self):
nav = NmeaNavData()
nav.cog_sog.update(45.0, 7.0)
nav.xte.update(2.5)
assert nav.cog_sog.is_valid
assert nav.xte.is_valid
assert not nav.heading.is_valid
+180
View File
@@ -0,0 +1,180 @@
"""Tests for TrueCoursePilot -- Sprint 5."""
from __future__ import annotations
import math
import pytest
from arautopilot.studio.simulator.true_course import TrueCoursePilot, TrueCourseConfig
from arautopilot.studio.simulator.pid_outer import PidOuterConfig
def _pilot(xte_gain: float = 0.5, max_corr: float = 20.0) -> TrueCoursePilot:
cfg = TrueCourseConfig(
pid=PidOuterConfig(base_kp=1.0, base_ki=0.0, base_kd=0.0,
rot_ff_gain=0.0, rate_limit_dps=360.0),
xte_gain=xte_gain,
xte_max_correction_deg=max_corr,
)
return TrueCoursePilot(cfg)
# ---------------------------------------------------------------------------
# TRUE_COURSE mode
# ---------------------------------------------------------------------------
class TestTrueCoursePilot:
def test_zero_error_zero_output(self):
p = _pilot()
out = p.step_true_course(
cog_setpoint_deg=90.0,
measured_cog_deg=90.0,
sog_kn=8.0,
)
assert out == pytest.approx(0.0, abs=0.6)
def test_positive_error_steer_starboard(self):
"""COG is west of setpoint → steer to starboard (positive rudder)."""
p = _pilot()
out = p.step_true_course(
cog_setpoint_deg=90.0,
measured_cog_deg=80.0, # 10° behind
sog_kn=8.0,
)
assert out > 0.0
def test_negative_error_steer_port(self):
p = _pilot()
out = p.step_true_course(
cog_setpoint_deg=90.0,
measured_cog_deg=100.0,
sog_kn=8.0,
)
assert out < 0.0
def test_360_boundary_crossing(self):
"""Setpoint=5°, measured=355° → shortest arc = +10° → steer stbd."""
p = _pilot()
out = p.step_true_course(
cog_setpoint_deg=5.0,
measured_cog_deg=355.0,
sog_kn=8.0,
)
assert out > 0.0
def test_not_allowed_returns_zero(self):
p = _pilot()
out = p.step_true_course(
cog_setpoint_deg=90.0,
measured_cog_deg=80.0,
sog_kn=8.0,
allowed=False,
)
assert out == pytest.approx(0.0)
def test_xte_correction_zero_in_true_course_mode(self):
p = _pilot()
p.step_true_course(cog_setpoint_deg=90.0, measured_cog_deg=90.0, sog_kn=8.0)
assert p.tc_state.xte_correction_deg == pytest.approx(0.0)
# ---------------------------------------------------------------------------
# TRACK_KEEPING mode
# ---------------------------------------------------------------------------
class TestTrackKeeping:
def test_no_xte_correction_when_nan(self):
p = _pilot()
p.step_track_keeping(
cog_setpoint_deg=90.0,
measured_cog_deg=90.0,
sog_kn=8.0,
xte_m=math.nan,
)
assert p.tc_state.xte_correction_deg == pytest.approx(0.0)
assert p.tc_state.effective_setpoint_deg == pytest.approx(90.0)
def test_positive_xte_steers_port(self):
"""XTE>0 = vessel to starboard → correction negative → setpoint shifted port."""
p = _pilot(xte_gain=1.0)
p.step_track_keeping(
cog_setpoint_deg=90.0,
measured_cog_deg=90.0,
sog_kn=8.0,
xte_m=5.0, # 5 m to starboard
)
assert p.tc_state.xte_correction_deg == pytest.approx(-5.0)
assert p.tc_state.effective_setpoint_deg == pytest.approx(85.0)
def test_negative_xte_steers_starboard(self):
p = _pilot(xte_gain=1.0)
p.step_track_keeping(
cog_setpoint_deg=90.0,
measured_cog_deg=90.0,
sog_kn=8.0,
xte_m=-5.0,
)
assert p.tc_state.xte_correction_deg == pytest.approx(5.0)
assert p.tc_state.effective_setpoint_deg == pytest.approx(95.0)
def test_xte_correction_clamped(self):
p = _pilot(xte_gain=1.0, max_corr=10.0)
p.step_track_keeping(
cog_setpoint_deg=90.0,
measured_cog_deg=90.0,
sog_kn=8.0,
xte_m=50.0, # huge XTE → clamp to 10°
)
assert p.tc_state.xte_correction_deg == pytest.approx(-10.0)
def test_stale_xte_ignored(self):
"""If xte_age_s > max_age_s, correction must be zero."""
p = _pilot(xte_gain=1.0)
p.step_track_keeping(
cog_setpoint_deg=90.0,
measured_cog_deg=90.0,
sog_kn=8.0,
xte_m=10.0,
xte_age_s=10.0, # stale (default max_age_s=5)
)
assert p.tc_state.xte_correction_deg == pytest.approx(0.0)
def test_convergence_to_track(self):
"""Simulate 30 steps: XTE should shrink (very basic closed-loop sanity)."""
from arautopilot.studio.simulator.vessel_heading import heading_error_deg
p = TrueCoursePilot(TrueCourseConfig(
pid=PidOuterConfig(base_kp=2.0, base_ki=0.0, base_kd=0.0,
rot_ff_gain=0.0, rate_limit_dps=360.0,
max_rudder_deg=35.0),
xte_gain=0.5,
xte_max_correction_deg=20.0,
))
xte = 20.0 # 20 m to starboard
cog = 90.0 # vessel COG
dt = 0.1 # 10 Hz
sog = 5.0 # kn ≈ 2.57 m/s
for _ in range(200):
rudder = p.step_track_keeping(
cog_setpoint_deg=90.0,
measured_cog_deg=cog,
sog_kn=sog,
xte_m=xte,
)
cog_change = rudder * 0.5 * dt
cog = (cog + cog_change) % 360.0
# Cross-track rate: -SOG * cos(COG) for east-going track (COG=90 nominal)
xte -= sog * 0.5144 * math.cos(math.radians(cog)) * dt
assert abs(xte) < 18.0 # must have improved from initial 20 m
def test_reset_clears_state(self):
p = _pilot()
p.step_track_keeping(
cog_setpoint_deg=90.0, measured_cog_deg=80.0,
sog_kn=8.0, xte_m=5.0,
)
p.reset()
assert p.pid_state.integral == pytest.approx(0.0)