"""NMEA 2000 navigation data models.

Holds the live data received from PGN 127250/127251 (heading/ROT -- Sprint 3)
and the new PGN 129026/129284 (COG+SOG / XTE+waypoint -- Sprint 5).

All timestamps are float seconds (time.monotonic()).
NaN means the value has never been received.
"""

from __future__ import annotations

import math
import time
from dataclasses import dataclass, field


class _FreshnessMixin:
    """Age and staleness helpers shared by the timestamped records below.

    This is deliberately a plain class rather than a dataclass: the two
    annotations only tell type checkers which attributes the helpers read.
    They do not become dataclass fields, so the field order (and therefore
    the positional constructor) of every subclass is left untouched.
    """

    timestamp: float
    max_age_s: float

    def _is_fresh(self) -> bool:
        """Return True while the last update is younger than ``max_age_s``."""
        return (time.monotonic() - self.timestamp) < self.max_age_s

    @property
    def age_ms(self) -> int:
        """Milliseconds elapsed since ``timestamp``, truncated to an int.

        For a record that was never updated ``timestamp`` is still 0.0, so
        this is simply the current ``time.monotonic()`` reading in ms.
        """
        return int((time.monotonic() - self.timestamp) * 1000)


@dataclass
class HeadingData(_FreshnessMixin):
    """PGN 127250 + 127251 -- magnetic heading and rate of turn.

    Note that ``rot_dps`` defaults to 0.0, not NaN, so only ``heading_deg``
    tells "never received" apart from a real reading.
    """

    # Magnetic heading; NaN until the first update().
    heading_deg: float = math.nan
    # Rate of turn (presumably degrees per second, per the `_dps` suffix).
    rot_dps: float = 0.0
    # time.monotonic() reading taken by the last update(); 0.0 before that.
    timestamp: float = 0.0
    # Readings at least this many seconds old are reported as invalid.
    max_age_s: float = 5.0

    def update(self, heading_deg: float, rot_dps: float) -> None:
        """Store a new heading / rate-of-turn pair and stamp it with now."""
        self.heading_deg = heading_deg
        self.rot_dps = rot_dps
        self.timestamp = time.monotonic()

    @property
    def is_valid(self) -> bool:
        """True when the heading is a finite number and not yet stale."""
        # isfinite() rejects NaN (never received) and also +/-inf, which
        # must never be handed to the control loop as a usable heading.
        return math.isfinite(self.heading_deg) and self._is_fresh()


@dataclass
class CogSogData(_FreshnessMixin):
    """PGN 129026 -- Course Over Ground (true) and Speed Over Ground.

    COG is in degrees true (0..360). SOG is in knots.
    """

    # Course over ground in degrees true; NaN until the first update().
    cog_deg: float = math.nan
    # Speed over ground in knots; NaN until the first update().
    sog_kn: float = math.nan
    # time.monotonic() reading taken by the last update(); 0.0 before that.
    timestamp: float = 0.0
    # Readings at least this many seconds old are reported as invalid.
    max_age_s: float = 5.0

    def update(self, cog_deg: float, sog_kn: float) -> None:
        """Store a new COG/SOG pair and stamp it with now.

        ``cog_deg`` is wrapped with ``% 360.0``, so e.g. 370 is stored as 10
        and -90 as 270. ``sog_kn`` is stored unchanged.
        """
        self.cog_deg = cog_deg % 360.0
        self.sog_kn = sog_kn
        self.timestamp = time.monotonic()

    @property
    def is_valid(self) -> bool:
        """True when both COG and SOG are finite numbers and not yet stale."""
        return (
            math.isfinite(self.cog_deg)
            and math.isfinite(self.sog_kn)
            and self._is_fresh()
        )


@dataclass
class XteData(_FreshnessMixin):
    """PGN 129284 -- Cross Track Error and active waypoint.

    xte_m: distance off track in metres.
        Positive → vessel is to starboard of the track line (steer port).
        Negative → vessel is to port of the track line (steer starboard).
    waypoint_name: free text from the chart plotter, empty if not available.
    dtw_m: distance to waypoint in metres (NaN if unknown).
    """

    # Cross track error in metres; NaN until the first update().
    xte_m: float = math.nan
    # Distance to waypoint in metres; NaN if unknown.
    dtw_m: float = math.nan
    # Waypoint name from the chart plotter; empty if not available.
    waypoint_name: str = ""
    # time.monotonic() reading taken by the last update(); 0.0 before that.
    timestamp: float = 0.0
    # Readings at least this many seconds old are reported as invalid.
    max_age_s: float = 5.0

    def update(
        self,
        xte_m: float,
        dtw_m: float = math.nan,
        waypoint_name: str = "",
    ) -> None:
        """Store a new cross-track reading and stamp it with now.

        ``dtw_m`` and ``waypoint_name`` are overwritten on every call, so
        omitting them resets them to NaN / empty rather than keeping the
        previous values.
        """
        self.xte_m = xte_m
        self.dtw_m = dtw_m
        self.waypoint_name = waypoint_name
        self.timestamp = time.monotonic()

    @property
    def is_valid(self) -> bool:
        """True when the XTE is a finite number and not yet stale.

        Only ``xte_m`` is checked; ``dtw_m`` may legitimately be NaN.
        """
        return math.isfinite(self.xte_m) and self._is_fresh()


@dataclass
class NmeaNavData:
    """Aggregated live navigation data used by the autopilot control loop."""

    # default_factory gives every instance its own mutable sub-records.
    heading: HeadingData = field(default_factory=HeadingData)
    cog_sog: CogSogData = field(default_factory=CogSogData)
    xte: XteData = field(default_factory=XteData)