a2f3e82f17
Integration tests (64 new tests, 462 total): - test_integration_cascade: full cascade closed-loop simulation -- outer PID → inner PID → rudder dynamics → vessel heading; verifies convergence across small/90°/180° turns, wrap-around, and low speed - test_integration_ekf_pid: EKF-smoothed heading feeding outer PID; confirms EKF reduces rudder total-variation vs raw noisy heading - test_integration_alarm_audit: alarm engine → audit log hash-chain; verify, tamper detection, cross-session reload, multi-alarm logging - test_modbus_utils: 38 tests for scale/raw conversion, bounds checking, heading/rudder helpers, signed int16 two's-complement round-trip Hardening: - heading_ekf: guard NaN/inf in update_heading() and update_rot() -- skip bad measurements silently rather than corrupting filter state - adaptive_tuner: guard NaN/inf in step() -- ignore corrupt error samples - modbus_utils.py: new shared module with scale_to_raw, scale_to_raw_signed, raw_signed_to_scaled, clamp_uint16, validate_holding_write, heading_deg_to_raw, rudder_deg_to_raw_signed Documentation: - docs/operator_manual.md: 15-section operator manual covering safety, installation, all operating modes, alarm reference, commissioning, fault-finding, Modbus register summary, and activation/audit log procedure Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
171 lines
5.7 KiB
Python
171 lines
5.7 KiB
Python
"""Integration: alarm engine → audit log hash-chain -- Sprint 9.
|
|
|
|
Tests the complete flow: alarm fires → AlarmEngine produces Alarm records →
|
|
AuditEvent is written to the log → hash-chain verifies OK.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from arautopilot.core.alarm_engine import AlarmEngine, TelemetrySnapshot
|
|
from arautopilot.core.alarms import AlarmType, AlarmSeverity
|
|
from arautopilot.core.audit import AuditEvent, AuditLog, AuditOutcome
|
|
|
|
|
|
def _alarm_to_event(alarm) -> AuditEvent:
|
|
return AuditEvent(
|
|
action=f"alarm.{alarm.type}",
|
|
outcome=AuditOutcome.FAILED,
|
|
reason=alarm.message,
|
|
extra={"severity": str(alarm.severity), "auto_disengage": alarm.auto_disengage_triggered},
|
|
)
|
|
|
|
|
|
def _ack_event(alarm_type: AlarmType, user_id: str = "operator") -> AuditEvent:
|
|
return AuditEvent(
|
|
user_id=user_id,
|
|
action=f"alarm.acknowledge.{alarm_type}",
|
|
outcome=AuditOutcome.SUCCESS,
|
|
)
|
|
|
|
|
|
class TestAlarmFiredAndLogged:
|
|
def test_single_alarm_logged_and_chain_valid(self, tmp_path: Path):
|
|
log = AuditLog(tmp_path / "audit.jsonl")
|
|
engine = AlarmEngine()
|
|
|
|
# Fire an off-course alarm
|
|
alarms = engine.evaluate(TelemetrySnapshot(fw_alarm_off_course=True))
|
|
assert len(alarms) == 1
|
|
log.append(_alarm_to_event(alarms[0]))
|
|
|
|
ok, reason = log.verify_chain()
|
|
assert ok, reason
|
|
assert len(log) == 1
|
|
|
|
def test_alarm_acknowledge_logged_and_chain_valid(self, tmp_path: Path):
|
|
log = AuditLog(tmp_path / "audit.jsonl")
|
|
engine = AlarmEngine()
|
|
|
|
alarms = engine.evaluate(TelemetrySnapshot(fw_alarm_off_course=True))
|
|
log.append(_alarm_to_event(alarms[0]))
|
|
|
|
engine.acknowledge(AlarmType.OFF_COURSE)
|
|
log.append(_ack_event(AlarmType.OFF_COURSE))
|
|
|
|
ok, reason = log.verify_chain()
|
|
assert ok, reason
|
|
assert len(log) == 2
|
|
|
|
def test_multiple_alarms_all_logged(self, tmp_path: Path):
|
|
log = AuditLog(tmp_path / "audit.jsonl")
|
|
engine = AlarmEngine()
|
|
|
|
snap = TelemetrySnapshot(
|
|
fw_alarm_off_course=True,
|
|
fw_alarm_voltage_low=True,
|
|
)
|
|
alarms = engine.evaluate(snap)
|
|
assert len(alarms) == 2
|
|
|
|
for a in alarms:
|
|
log.append(_alarm_to_event(a))
|
|
|
|
ok, reason = log.verify_chain()
|
|
assert ok, reason
|
|
assert len(log) == 2
|
|
|
|
def test_disengage_event_logged_in_chain(self, tmp_path: Path):
|
|
log = AuditLog(tmp_path / "audit.jsonl")
|
|
disengages = []
|
|
engine = AlarmEngine(on_disengage=lambda: disengages.append(True))
|
|
|
|
# EMERGENCY alarm triggers auto-disengage
|
|
alarms = engine.evaluate(TelemetrySnapshot(fw_alarm_heading_lost=True))
|
|
assert len(disengages) >= 1
|
|
assert alarms[0].auto_disengage_triggered
|
|
|
|
log.append(_alarm_to_event(alarms[0]))
|
|
log.append(AuditEvent(
|
|
action="pilot.disengage",
|
|
outcome=AuditOutcome.SUCCESS,
|
|
reason="auto-disengage from alarm",
|
|
extra={"trigger": str(alarms[0].type)},
|
|
))
|
|
|
|
ok, reason = log.verify_chain()
|
|
assert ok, reason
|
|
|
|
def test_alarm_clear_and_refire_both_logged(self, tmp_path: Path):
|
|
log = AuditLog(tmp_path / "audit.jsonl")
|
|
engine = AlarmEngine()
|
|
|
|
alarms = engine.evaluate(TelemetrySnapshot(fw_alarm_off_course=True))
|
|
log.append(_alarm_to_event(alarms[0]))
|
|
|
|
# Clear
|
|
engine.evaluate(TelemetrySnapshot(fw_alarm_off_course=False))
|
|
log.append(AuditEvent(action="alarm.cleared.off_course", outcome=AuditOutcome.SUCCESS))
|
|
|
|
# Refire
|
|
alarms2 = engine.evaluate(TelemetrySnapshot(fw_alarm_off_course=True))
|
|
assert len(alarms2) == 1
|
|
log.append(_alarm_to_event(alarms2[0]))
|
|
|
|
ok, reason = log.verify_chain()
|
|
assert ok, reason
|
|
assert len(log) == 3
|
|
|
|
|
|
class TestAuditPersistenceAcrossReload:
|
|
def test_reloaded_log_continues_chain(self, tmp_path: Path):
|
|
p = tmp_path / "audit.jsonl"
|
|
log1 = AuditLog(p)
|
|
engine = AlarmEngine()
|
|
alarms = engine.evaluate(TelemetrySnapshot(fw_alarm_off_course=True))
|
|
log1.append(_alarm_to_event(alarms[0]))
|
|
|
|
# Simulate restarting the Studio
|
|
log2 = AuditLog(p)
|
|
log2.append(AuditEvent(action="studio.startup", outcome=AuditOutcome.SUCCESS))
|
|
|
|
ok, reason = log2.verify_chain()
|
|
assert ok, reason
|
|
assert len(log2) == 2
|
|
|
|
def test_tampered_alarm_entry_detected(self, tmp_path: Path):
|
|
import json
|
|
p = tmp_path / "audit.jsonl"
|
|
log = AuditLog(p)
|
|
engine = AlarmEngine()
|
|
alarms = engine.evaluate(TelemetrySnapshot(fw_alarm_off_course=True))
|
|
log.append(_alarm_to_event(alarms[0]))
|
|
|
|
# Tamper: change the action field
|
|
lines = p.read_text(encoding="utf-8").splitlines()
|
|
data = json.loads(lines[0])
|
|
data["action"] = "alarm.no_problem_here"
|
|
lines[0] = json.dumps(data)
|
|
p.write_text("\n".join(lines) + "\n", encoding="utf-8")
|
|
|
|
log2 = AuditLog(p)
|
|
ok, reason = log2.verify_chain()
|
|
assert not ok
|
|
assert "mismatch" in reason.lower() or "tamper" in reason.lower()
|
|
|
|
|
|
class TestAlarmSeverityInAudit:
|
|
def test_emergency_severity_recorded(self, tmp_path: Path):
|
|
log = AuditLog(tmp_path / "audit.jsonl")
|
|
engine = AlarmEngine()
|
|
alarms = engine.evaluate(TelemetrySnapshot(fw_alarm_heading_lost=True))
|
|
assert alarms[0].severity == AlarmSeverity.EMERGENCY
|
|
event = _alarm_to_event(alarms[0])
|
|
assert event.extra["severity"] == "emergency"
|
|
log.append(event)
|
|
events = log.read_all()
|
|
assert events[0].extra["severity"] == "emergency"
|