Files
alro65 e82dbc449c sprint-6: Alarm engine + safety monitor + NMEA 2000 publisher
Python side:
- alarm_engine.py: AlarmEngine evaluates 9 firmware alarm bits + PC-side
  heading staleness and off-course logic with severe-timer; on_disengage
  callback triggers on first EMERGENCY alarm; acknowledge/clear API
- test_alarm_engine.py: 25 tests covering fire/clear cycle, acknowledge,
  highest_severity, auto-disengage callback, heading staleness, off-course
  with wraparound and timer, fw-bit suppression of duplicate PC alarm

Firmware:
- safety_monitor.h: exposes AlarmBits struct + safety_alarm_bits() API
- safety_monitor.cpp: 50 Hz task evaluates off-course (with severe timer),
  rudder-not-responding (3 s timeout), heading lost, VMS/DI4, limit switches,
  battery voltage, actuator current; buzzer on any alarm; EMERGENCY → force_standby
- modbus_slave.cpp: wires 9 discrete alarm registers to safety_alarm_bits();
  battery voltage and actuator current ADC registers now live
- nmea2000_publisher.h/cpp: new task, PGN 127245 rudder angle at 10 Hz,
  PGN 127237 Heading/Track Control at 1 Hz
- main.cpp: start nmea2000_publisher; set watchdog-tripped flag on ESP_RST_TASK_WDT

Tests: 309 passed | Flash: 27.6%

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 00:16:24 -04:00

260 lines
9.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Tests for ``arautopilot.core.alarm_engine``."""
from __future__ import annotations
import pytest
from arautopilot.core.alarm_engine import AlarmEngine, AlarmThresholds, TelemetrySnapshot
from arautopilot.core.alarms import AlarmSeverity, AlarmType
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _engaged_snap(**kwargs) -> TelemetrySnapshot:
"""Return a baseline engaged snapshot with no alarms asserted."""
defaults = dict(
pilot_engaged=True,
heading_deg=90.0,
heading_setpoint_deg=90.0,
heading_age_s=0.1,
)
defaults.update(kwargs)
return TelemetrySnapshot(**defaults)
# ---------------------------------------------------------------------------
# Basic fire / clear cycle
# ---------------------------------------------------------------------------
class TestFireAndClear:
def test_no_alarms_by_default(self):
eng = AlarmEngine()
snap = TelemetrySnapshot(pilot_engaged=False)
new = eng.evaluate(snap)
assert new == []
assert not eng.any_active
def test_fw_bit_fires_alarm(self):
eng = AlarmEngine()
snap = TelemetrySnapshot(fw_alarm_off_course=True)
new = eng.evaluate(snap)
assert len(new) == 1
assert new[0].type is AlarmType.OFF_COURSE
assert eng.any_active
def test_fw_bit_clearing_removes_alarm(self):
eng = AlarmEngine()
eng.evaluate(TelemetrySnapshot(fw_alarm_off_course=True))
new = eng.evaluate(TelemetrySnapshot(fw_alarm_off_course=False))
assert new == []
assert not eng.any_active
def test_alarm_not_re_fired_while_active(self):
eng = AlarmEngine()
snap = TelemetrySnapshot(fw_alarm_off_course=True)
first = eng.evaluate(snap)
second = eng.evaluate(snap)
assert len(first) == 1
assert second == [] # already active, not re-fired
def test_all_nine_fw_bits_map_to_distinct_alarm_types(self):
all_bits = TelemetrySnapshot(
fw_alarm_off_course=True,
fw_alarm_off_course_severe=True,
fw_alarm_rudder_not_resp=True,
fw_alarm_heading_lost=True,
fw_alarm_actuator_overcurr=True,
fw_alarm_voltage_low=True,
fw_alarm_limit_reached=True,
fw_alarm_watchdog_tripped=True,
fw_alarm_vms_critical=True,
)
eng = AlarmEngine()
new = eng.evaluate(all_bits)
types = {a.type for a in new}
assert len(types) == 9
assert AlarmType.OFF_COURSE in types
assert AlarmType.VMS_CRITICAL in types
# ---------------------------------------------------------------------------
# Acknowledge
# ---------------------------------------------------------------------------
class TestAcknowledge:
def test_acknowledge_marks_alarm(self):
eng = AlarmEngine()
eng.evaluate(TelemetrySnapshot(fw_alarm_off_course=True))
eng.acknowledge(AlarmType.OFF_COURSE)
assert eng.active_alarms[0].acknowledged is True
def test_acknowledge_all(self):
eng = AlarmEngine()
eng.evaluate(TelemetrySnapshot(
fw_alarm_off_course=True,
fw_alarm_voltage_low=True,
))
eng.acknowledge_all()
for a in eng.active_alarms:
assert a.acknowledged is True
def test_unacknowledged_alarm_after_clear_refire(self):
"""After a cleared alarm re-asserts it must fire (and be unacknowledged)."""
eng = AlarmEngine()
eng.evaluate(TelemetrySnapshot(fw_alarm_off_course=True))
eng.evaluate(TelemetrySnapshot(fw_alarm_off_course=False))
new = eng.evaluate(TelemetrySnapshot(fw_alarm_off_course=True))
assert len(new) == 1
assert new[0].acknowledged is False
# ---------------------------------------------------------------------------
# Highest severity
# ---------------------------------------------------------------------------
class TestHighestSeverity:
def test_none_when_no_alarms(self):
assert AlarmEngine().highest_severity is None
def test_emergency_dominates(self):
eng = AlarmEngine()
eng.evaluate(TelemetrySnapshot(
fw_alarm_off_course=True, # LOW
fw_alarm_watchdog_tripped=True, # EMERGENCY
))
assert eng.highest_severity is AlarmSeverity.EMERGENCY
def test_high_when_no_emergency(self):
eng = AlarmEngine()
eng.evaluate(TelemetrySnapshot(
fw_alarm_off_course=True, # LOW
fw_alarm_voltage_low=True, # HIGH
))
assert eng.highest_severity is AlarmSeverity.HIGH
def test_low_when_only_low(self):
eng = AlarmEngine()
eng.evaluate(TelemetrySnapshot(fw_alarm_off_course=True))
assert eng.highest_severity is AlarmSeverity.LOW
# ---------------------------------------------------------------------------
# Auto-disengage callback
# ---------------------------------------------------------------------------
class TestAutoDisengage:
def test_emergency_triggers_disengage_callback(self):
called = []
eng = AlarmEngine(on_disengage=lambda: called.append(1))
eng.evaluate(TelemetrySnapshot(fw_alarm_watchdog_tripped=True))
assert called == [1]
def test_low_alarm_does_not_trigger_disengage(self):
called = []
eng = AlarmEngine(on_disengage=lambda: called.append(1))
eng.evaluate(TelemetrySnapshot(fw_alarm_off_course=True))
assert called == []
def test_callback_called_once_per_event(self):
called = []
eng = AlarmEngine(on_disengage=lambda: called.append(1))
snap = TelemetrySnapshot(fw_alarm_watchdog_tripped=True)
eng.evaluate(snap)
eng.evaluate(snap) # still active — not re-fired
assert len(called) == 1
# ---------------------------------------------------------------------------
# PC-side heading staleness
# ---------------------------------------------------------------------------
class TestHeadingStaleness:
def test_stale_heading_fires_alarm_when_engaged(self):
eng = AlarmEngine()
snap = TelemetrySnapshot(
pilot_engaged=True,
heading_age_s=6.0, # > default 5.0 s threshold
)
new = eng.evaluate(snap)
types = {a.type for a in new}
assert AlarmType.HEADING_SENSOR_LOST in types
def test_fresh_heading_does_not_fire(self):
eng = AlarmEngine()
snap = TelemetrySnapshot(pilot_engaged=True, heading_age_s=0.5)
new = eng.evaluate(snap)
assert AlarmType.HEADING_SENSOR_LOST not in {a.type for a in new}
def test_stale_heading_does_not_fire_when_disengaged(self):
eng = AlarmEngine()
snap = TelemetrySnapshot(pilot_engaged=False, heading_age_s=60.0)
new = eng.evaluate(snap)
assert new == []
# ---------------------------------------------------------------------------
# PC-side off-course (no firmware bits)
# ---------------------------------------------------------------------------
class TestOffCoursePC:
def test_small_error_no_alarm(self):
eng = AlarmEngine()
snap = _engaged_snap(heading_deg=91.0, heading_setpoint_deg=90.0)
new = eng.evaluate(snap)
assert AlarmType.OFF_COURSE not in {a.type for a in new}
def test_moderate_error_fires_off_course(self):
eng = AlarmEngine()
snap = _engaged_snap(heading_deg=105.0, heading_setpoint_deg=90.0) # 15 deg > 10
new = eng.evaluate(snap)
assert AlarmType.OFF_COURSE in {a.type for a in new}
def test_severe_error_not_immediate(self):
"""Large error fires OFF_COURSE immediately but not SEVERE until timer expires."""
thr = AlarmThresholds(severe_off_course_deg=30.0, severe_off_course_time_s=5.0)
eng = AlarmEngine(thresholds=thr)
snap = _engaged_snap(heading_deg=125.0, heading_setpoint_deg=90.0) # 35 deg
new = eng.evaluate(snap, dt_s=0.1)
types = {a.type for a in new}
# Severe timer hasn't elapsed yet
assert AlarmType.OFF_COURSE_SEVERE not in types
def test_severe_error_fires_after_timer(self):
thr = AlarmThresholds(severe_off_course_deg=30.0, severe_off_course_time_s=1.0)
eng = AlarmEngine(thresholds=thr)
snap = _engaged_snap(heading_deg=125.0, heading_setpoint_deg=90.0)
result_types: set[AlarmType] = set()
for _ in range(15): # 15 × 0.1 s = 1.5 s > 1.0 s threshold
new = eng.evaluate(snap, dt_s=0.1)
result_types |= {a.type for a in new}
assert AlarmType.OFF_COURSE_SEVERE in result_types
def test_fw_bit_suppresses_pc_off_course(self):
"""When firmware asserts the alarm bit, PC-side must not duplicate it."""
eng = AlarmEngine()
snap = _engaged_snap(
heading_deg=125.0,
heading_setpoint_deg=90.0,
fw_alarm_off_course=True, # firmware owns it
)
eng.evaluate(snap)
# Should have exactly one OFF_COURSE (from fw bit), not two
active_types = [a.type for a in eng.active_alarms]
assert active_types.count(AlarmType.OFF_COURSE) == 1
def test_wraparound_heading(self):
"""Shortest-arc logic: 5 deg east of 358 deg setpoint is +5, not -353."""
eng = AlarmEngine()
snap = _engaged_snap(heading_deg=3.0, heading_setpoint_deg=358.0) # +5 deg arc
new = eng.evaluate(snap)
assert AlarmType.OFF_COURSE not in {a.type for a in new}
def test_clear_all_resets_state(self):
eng = AlarmEngine()
eng.evaluate(TelemetrySnapshot(fw_alarm_off_course=True))
eng.clear_all()
assert not eng.any_active
assert eng.highest_severity is None