"""Runtime alarm engine -- Sprint 6. Evaluates live telemetry against the alarm catalogue (brief section 7) and maintains a set of currently active alarms. Designed to run on the PC side (Python display software); the firmware publishes alarm bits via Modbus discrete inputs and this engine turns them into typed Alarm records that the UI displays and the audit log records. Usage:: engine = AlarmEngine() # On each 10 Hz display tick: alarms = engine.evaluate(snapshot) for a in alarms: # newly fired alarms display.show_alarm(a) audit_log.append(a) engine.acknowledge(AlarmType.OFF_COURSE) engine.acknowledge_all() """ from __future__ import annotations from dataclasses import dataclass, field from typing import Callable from arautopilot.core.alarms import Alarm, AlarmType, AlarmSeverity, triggers_auto_disengage @dataclass class AlarmThresholds: """Tunable thresholds; can be loaded from the project config.""" # Off-course off_course_deg: float = 10.0 # LOW warning threshold severe_off_course_deg: float = 30.0 # EMERGENCY threshold severe_off_course_time_s: float = 5.0 # must persist this long # Rudder not responding rudder_response_timeout_s: float = 3.0 # setpoint sent but no motion rudder_deadband_deg: float = 0.5 # Sensor staleness (must match firmware STALE_THRESHOLD_MS) heading_stale_s: float = 5.0 cog_stale_s: float = 5.0 # Voltage / current (used by display if it has ADC readback) voltage_low_v: float = 10.5 current_high_a: float = 30.0 @dataclass class TelemetrySnapshot: """Aggregated telemetry read from Modbus or internal state.""" # Mode pilot_engaged: bool = False # Heading heading_deg: float | None = None heading_setpoint_deg: float | None = None heading_age_s: float = 0.0 # Rudder rudder_angle_deg: float | None = None rudder_setpoint_deg: float | None = None rudder_valid: bool = False # COG (True Course / Track Keeping modes) cog_deg: float | None = None cog_age_s: float = 0.0 # Electrical battery_v: float | None = None actuator_a: float | None = None # Digital inputs limit_port: bool = False limit_stbd: bool = False vms_critical: bool = False # Firmware discrete alarms (read directly from Modbus) fw_alarm_off_course: bool = False fw_alarm_off_course_severe: bool = False fw_alarm_rudder_not_resp: bool = False fw_alarm_heading_lost: bool = False fw_alarm_actuator_overcurr: bool = False fw_alarm_voltage_low: bool = False fw_alarm_limit_reached: bool = False fw_alarm_watchdog_tripped: bool = False fw_alarm_vms_critical: bool = False class AlarmEngine: """Evaluates telemetry and manages the active alarm set. The primary job is to bridge between raw Modbus bits (from the firmware) and the typed :class:`Alarm` records that the UI and audit log consume. The engine also performs PC-side logic for alarms the firmware doesn't compute (e.g. heading-sensor age on the Python side). """ def __init__( self, thresholds: AlarmThresholds | None = None, on_disengage: Callable[[], None] | None = None, ) -> None: self.thresholds = thresholds or AlarmThresholds() self._on_disengage = on_disengage self._active: dict[AlarmType, Alarm] = {} self._acknowledged: set[AlarmType] = set() self._severe_off_course_timer_s: float = 0.0 self._dt_s: float = 0.1 # 10 Hz default # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ def evaluate( self, snap: TelemetrySnapshot, dt_s: float | None = None, ) -> list[Alarm]: """Evaluate conditions against ``snap``. Returns the list of **newly fired** alarms since the last call. Already-active alarms are not re-fired. """ if dt_s is not None: self._dt_s = dt_s new_alarms: list[Alarm] = [] # Map firmware discrete bits → alarm types fw_map: list[tuple[bool, AlarmType]] = [ (snap.fw_alarm_off_course, AlarmType.OFF_COURSE), (snap.fw_alarm_off_course_severe, AlarmType.OFF_COURSE_SEVERE), (snap.fw_alarm_rudder_not_resp, AlarmType.RUDDER_NOT_RESPONDING), (snap.fw_alarm_heading_lost, AlarmType.HEADING_SENSOR_LOST), (snap.fw_alarm_actuator_overcurr, AlarmType.ACTUATOR_OVERCURRENT), (snap.fw_alarm_voltage_low, AlarmType.VOLTAGE_LOW), (snap.fw_alarm_limit_reached, AlarmType.LIMIT_SWITCH_REACHED), (snap.fw_alarm_watchdog_tripped, AlarmType.WATCHDOG_TRIPPED), (snap.fw_alarm_vms_critical, AlarmType.VMS_CRITICAL), ] for condition, alarm_type in fw_map: if condition: fired = self._maybe_fire(alarm_type) if fired: new_alarms.append(fired) else: self._clear(alarm_type) # PC-side: heading sensor staleness if snap.pilot_engaged and snap.heading_age_s > self.thresholds.heading_stale_s: fired = self._maybe_fire(AlarmType.HEADING_SENSOR_LOST) if fired: new_alarms.append(fired) # PC-side: off-course (if firmware bits not present, compute from heading) if (snap.pilot_engaged and snap.heading_deg is not None and snap.heading_setpoint_deg is not None and not snap.fw_alarm_off_course and not snap.fw_alarm_off_course_severe): err = _shortest_arc(snap.heading_setpoint_deg, snap.heading_deg) thr = self.thresholds if abs(err) >= thr.severe_off_course_deg: self._severe_off_course_timer_s += self._dt_s if self._severe_off_course_timer_s >= thr.severe_off_course_time_s: fired = self._maybe_fire(AlarmType.OFF_COURSE_SEVERE) if fired: new_alarms.append(fired) else: fired = self._maybe_fire(AlarmType.OFF_COURSE) if fired: new_alarms.append(fired) elif abs(err) >= thr.off_course_deg: self._severe_off_course_timer_s = 0.0 fired = self._maybe_fire(AlarmType.OFF_COURSE) if fired: new_alarms.append(fired) else: self._severe_off_course_timer_s = 0.0 self._clear(AlarmType.OFF_COURSE) self._clear(AlarmType.OFF_COURSE_SEVERE) # Trigger auto-disengage for any new EMERGENCY alarms. for a in new_alarms: if a.auto_disengage_triggered and self._on_disengage: self._on_disengage() break return new_alarms def acknowledge(self, alarm_type: AlarmType) -> None: """Acknowledge a specific alarm (marks it as seen).""" self._acknowledged.add(alarm_type) if alarm_type in self._active: # Replace the alarm record with an acknowledged copy. old = self._active[alarm_type] self._active[alarm_type] = old.model_copy(update={"acknowledged": True}) def acknowledge_all(self) -> None: for at in list(self._active): self.acknowledge(at) def clear(self, alarm_type: AlarmType) -> None: self._clear(alarm_type) self._acknowledged.discard(alarm_type) def clear_all(self) -> None: self._active.clear() self._acknowledged.clear() self._severe_off_course_timer_s = 0.0 @property def active_alarms(self) -> list[Alarm]: return list(self._active.values()) @property def any_active(self) -> bool: return bool(self._active) @property def highest_severity(self) -> AlarmSeverity | None: if not self._active: return None order = [AlarmSeverity.EMERGENCY, AlarmSeverity.HIGH, AlarmSeverity.LOW, AlarmSeverity.INFO] for sev in order: if any(a.severity == sev for a in self._active.values()): return sev return None # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _maybe_fire(self, alarm_type: AlarmType) -> Alarm | None: if alarm_type in self._active: return None # already active alarm = Alarm.from_type(alarm_type) self._active[alarm_type] = alarm self._acknowledged.discard(alarm_type) return alarm def _clear(self, alarm_type: AlarmType) -> None: self._active.pop(alarm_type, None) def _shortest_arc(setpoint: float, measured: float) -> float: """Signed shortest-arc error in degrees (setpoint - measured).""" err = (setpoint - measured + 180.0) % 360.0 - 180.0 return err