"""Tests for ``arautopilot.core.alarm_engine``.""" from __future__ import annotations import pytest from arautopilot.core.alarm_engine import AlarmEngine, AlarmThresholds, TelemetrySnapshot from arautopilot.core.alarms import AlarmSeverity, AlarmType # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _engaged_snap(**kwargs) -> TelemetrySnapshot: """Return a baseline engaged snapshot with no alarms asserted.""" defaults = dict( pilot_engaged=True, heading_deg=90.0, heading_setpoint_deg=90.0, heading_age_s=0.1, ) defaults.update(kwargs) return TelemetrySnapshot(**defaults) # --------------------------------------------------------------------------- # Basic fire / clear cycle # --------------------------------------------------------------------------- class TestFireAndClear: def test_no_alarms_by_default(self): eng = AlarmEngine() snap = TelemetrySnapshot(pilot_engaged=False) new = eng.evaluate(snap) assert new == [] assert not eng.any_active def test_fw_bit_fires_alarm(self): eng = AlarmEngine() snap = TelemetrySnapshot(fw_alarm_off_course=True) new = eng.evaluate(snap) assert len(new) == 1 assert new[0].type is AlarmType.OFF_COURSE assert eng.any_active def test_fw_bit_clearing_removes_alarm(self): eng = AlarmEngine() eng.evaluate(TelemetrySnapshot(fw_alarm_off_course=True)) new = eng.evaluate(TelemetrySnapshot(fw_alarm_off_course=False)) assert new == [] assert not eng.any_active def test_alarm_not_re_fired_while_active(self): eng = AlarmEngine() snap = TelemetrySnapshot(fw_alarm_off_course=True) first = eng.evaluate(snap) second = eng.evaluate(snap) assert len(first) == 1 assert second == [] # already active, not re-fired def test_all_nine_fw_bits_map_to_distinct_alarm_types(self): all_bits = TelemetrySnapshot( fw_alarm_off_course=True, fw_alarm_off_course_severe=True, fw_alarm_rudder_not_resp=True, fw_alarm_heading_lost=True, fw_alarm_actuator_overcurr=True, fw_alarm_voltage_low=True, fw_alarm_limit_reached=True, fw_alarm_watchdog_tripped=True, fw_alarm_vms_critical=True, ) eng = AlarmEngine() new = eng.evaluate(all_bits) types = {a.type for a in new} assert len(types) == 9 assert AlarmType.OFF_COURSE in types assert AlarmType.VMS_CRITICAL in types # --------------------------------------------------------------------------- # Acknowledge # --------------------------------------------------------------------------- class TestAcknowledge: def test_acknowledge_marks_alarm(self): eng = AlarmEngine() eng.evaluate(TelemetrySnapshot(fw_alarm_off_course=True)) eng.acknowledge(AlarmType.OFF_COURSE) assert eng.active_alarms[0].acknowledged is True def test_acknowledge_all(self): eng = AlarmEngine() eng.evaluate(TelemetrySnapshot( fw_alarm_off_course=True, fw_alarm_voltage_low=True, )) eng.acknowledge_all() for a in eng.active_alarms: assert a.acknowledged is True def test_unacknowledged_alarm_after_clear_refire(self): """After a cleared alarm re-asserts it must fire (and be unacknowledged).""" eng = AlarmEngine() eng.evaluate(TelemetrySnapshot(fw_alarm_off_course=True)) eng.evaluate(TelemetrySnapshot(fw_alarm_off_course=False)) new = eng.evaluate(TelemetrySnapshot(fw_alarm_off_course=True)) assert len(new) == 1 assert new[0].acknowledged is False # --------------------------------------------------------------------------- # Highest severity # --------------------------------------------------------------------------- class TestHighestSeverity: def test_none_when_no_alarms(self): assert AlarmEngine().highest_severity is None def test_emergency_dominates(self): eng = AlarmEngine() eng.evaluate(TelemetrySnapshot( fw_alarm_off_course=True, # LOW fw_alarm_watchdog_tripped=True, # EMERGENCY )) assert eng.highest_severity is AlarmSeverity.EMERGENCY def test_high_when_no_emergency(self): eng = AlarmEngine() eng.evaluate(TelemetrySnapshot( fw_alarm_off_course=True, # LOW fw_alarm_voltage_low=True, # HIGH )) assert eng.highest_severity is AlarmSeverity.HIGH def test_low_when_only_low(self): eng = AlarmEngine() eng.evaluate(TelemetrySnapshot(fw_alarm_off_course=True)) assert eng.highest_severity is AlarmSeverity.LOW # --------------------------------------------------------------------------- # Auto-disengage callback # --------------------------------------------------------------------------- class TestAutoDisengage: def test_emergency_triggers_disengage_callback(self): called = [] eng = AlarmEngine(on_disengage=lambda: called.append(1)) eng.evaluate(TelemetrySnapshot(fw_alarm_watchdog_tripped=True)) assert called == [1] def test_low_alarm_does_not_trigger_disengage(self): called = [] eng = AlarmEngine(on_disengage=lambda: called.append(1)) eng.evaluate(TelemetrySnapshot(fw_alarm_off_course=True)) assert called == [] def test_callback_called_once_per_event(self): called = [] eng = AlarmEngine(on_disengage=lambda: called.append(1)) snap = TelemetrySnapshot(fw_alarm_watchdog_tripped=True) eng.evaluate(snap) eng.evaluate(snap) # still active — not re-fired assert len(called) == 1 # --------------------------------------------------------------------------- # PC-side heading staleness # --------------------------------------------------------------------------- class TestHeadingStaleness: def test_stale_heading_fires_alarm_when_engaged(self): eng = AlarmEngine() snap = TelemetrySnapshot( pilot_engaged=True, heading_age_s=6.0, # > default 5.0 s threshold ) new = eng.evaluate(snap) types = {a.type for a in new} assert AlarmType.HEADING_SENSOR_LOST in types def test_fresh_heading_does_not_fire(self): eng = AlarmEngine() snap = TelemetrySnapshot(pilot_engaged=True, heading_age_s=0.5) new = eng.evaluate(snap) assert AlarmType.HEADING_SENSOR_LOST not in {a.type for a in new} def test_stale_heading_does_not_fire_when_disengaged(self): eng = AlarmEngine() snap = TelemetrySnapshot(pilot_engaged=False, heading_age_s=60.0) new = eng.evaluate(snap) assert new == [] # --------------------------------------------------------------------------- # PC-side off-course (no firmware bits) # --------------------------------------------------------------------------- class TestOffCoursePC: def test_small_error_no_alarm(self): eng = AlarmEngine() snap = _engaged_snap(heading_deg=91.0, heading_setpoint_deg=90.0) new = eng.evaluate(snap) assert AlarmType.OFF_COURSE not in {a.type for a in new} def test_moderate_error_fires_off_course(self): eng = AlarmEngine() snap = _engaged_snap(heading_deg=105.0, heading_setpoint_deg=90.0) # 15 deg > 10 new = eng.evaluate(snap) assert AlarmType.OFF_COURSE in {a.type for a in new} def test_severe_error_not_immediate(self): """Large error fires OFF_COURSE immediately but not SEVERE until timer expires.""" thr = AlarmThresholds(severe_off_course_deg=30.0, severe_off_course_time_s=5.0) eng = AlarmEngine(thresholds=thr) snap = _engaged_snap(heading_deg=125.0, heading_setpoint_deg=90.0) # 35 deg new = eng.evaluate(snap, dt_s=0.1) types = {a.type for a in new} # Severe timer hasn't elapsed yet assert AlarmType.OFF_COURSE_SEVERE not in types def test_severe_error_fires_after_timer(self): thr = AlarmThresholds(severe_off_course_deg=30.0, severe_off_course_time_s=1.0) eng = AlarmEngine(thresholds=thr) snap = _engaged_snap(heading_deg=125.0, heading_setpoint_deg=90.0) result_types: set[AlarmType] = set() for _ in range(15): # 15 × 0.1 s = 1.5 s > 1.0 s threshold new = eng.evaluate(snap, dt_s=0.1) result_types |= {a.type for a in new} assert AlarmType.OFF_COURSE_SEVERE in result_types def test_fw_bit_suppresses_pc_off_course(self): """When firmware asserts the alarm bit, PC-side must not duplicate it.""" eng = AlarmEngine() snap = _engaged_snap( heading_deg=125.0, heading_setpoint_deg=90.0, fw_alarm_off_course=True, # firmware owns it ) eng.evaluate(snap) # Should have exactly one OFF_COURSE (from fw bit), not two active_types = [a.type for a in eng.active_alarms] assert active_types.count(AlarmType.OFF_COURSE) == 1 def test_wraparound_heading(self): """Shortest-arc logic: 5 deg east of 358 deg setpoint is +5, not -353.""" eng = AlarmEngine() snap = _engaged_snap(heading_deg=3.0, heading_setpoint_deg=358.0) # +5 deg arc new = eng.evaluate(snap) assert AlarmType.OFF_COURSE not in {a.type for a in new} def test_clear_all_resets_state(self): eng = AlarmEngine() eng.evaluate(TelemetrySnapshot(fw_alarm_off_course=True)) eng.clear_all() assert not eng.any_active assert eng.highest_severity is None