13a2867ef6
End-to-end implementation per docs/sprint-2.5-plan.md.

New requirement added by user mid-sprint: 4-role RBAC (Super Admin / Engineer / Owner / User) with dual-auth for Engineer flashing firmware, plus a "mini Arduino IDE" inside the Studio.

Tests: pytest 231/231 green (129 Sprint 2 + 102 Sprint 2.5 new).

RBAC core (arautopilot/core/):
- rbac.py: 4 roles, 12 capabilities, immutable capability matrix, has() / capabilities_of() / require() / requires_dual_auth() helpers. Engineer flashing firmware needs SA approval; everything else is single-factor.
- user.py: User model with PBKDF2-HMAC-SHA256 PIN hashing (200k iters, 16-byte salt, self-describing hash format for future migrations). 4-8 digit numeric PINs enforced.
- user_store.py: JSON-backed user database. seed_demo_users() for first-run UX.
- audit.py: append-only JSONL audit log. AuditEvent with timestamp, user_id, role, action, target, outcome, reason, secondary_user_id for dual-auth, optional extra payload. Crypto signing of lines deferred to Sprint 8.

Studio GUI (arautopilot/studio/):
- app.py: real entry point (replaces Sprint 0 stub). --seed-demo populates demo users without launching GUI; --data-dir overrides the ~/.ar-autopilot/studio/ default.
- session.py: Session + SessionHolder. check() always audits the decision; verify_super_admin_pin() + log_dual_auth_grant() for dual-auth flows.
- login_window.py: modal login dialog with user picker + PIN field. Audits login attempts (success and bad-PIN denials).
- main_window.py: top-level window with sidebar (user + role + caps) and tab area (Overview, Flash Console, Project placeholder, Telemetry placeholder).
- flash_console.py: the "mini Arduino IDE". Lists serial ports via pyserial; picks firmware variant (esp32-dev / esp32-debug); compiles via 'pio run'; flashes via 'pio run -t upload --upload-port <port>'; streams pio output to a dark-themed read-only console; supports cancel. For Engineer flashes, asks the Super Admin for their PIN inline before invoking pio. Records dual-auth grant + pio exit code in the audit log.

Dependencies:
- New [project.optional-dependencies] group 'studio': PySide6>=6.6, pyserial>=3.5, platformio>=6.1. Kept optional so the core can be installed in lean / CI environments.

Tests (arautopilot/tests/):
- test_rbac.py: 32 tests for capability matrix, dual-auth policy, no-privilege-escalation invariants, partial overlap between roles.
- test_user.py: 11 tests for PIN hashing, verification, salting, serialisation, field validators.
- test_audit.py: 9 tests for JSONL append, immutability, round-trip, corrupt-line detection, dual-auth event shape, blank-line tolerance.
- test_user_store.py: 10 tests for CRUD, persistence, role filtering, demo seed idempotency.
- test_session.py: 9 tests for capability checks + audit side effects, SA PIN verification, dual-auth recording, SessionHolder lifecycle.
- test_studio_smoke.py: 5 headless tests verifying Studio modules import without a display server, --seed-demo works, helpers safe to call without hardware.

NOT in Sprint 2.5 (intentional):
- Crypto signing of audit log lines (hash-chain) -- Sprint 8
- HWID binding of the user store -- Sprint 8
- Project configurator + .appack compiler -- Sprint 4
- Flutter bridge display -- Sprint 4
- Telemetry dashboard tab -- Sprint 4
- Serial monitor as a separate tab -- future enhancement

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
109 lines
3.2 KiB
Python
109 lines
3.2 KiB
Python
"""Tests for ``arautopilot.core.audit``."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from arautopilot.core.audit import AuditEvent, AuditLog, AuditOutcome
|
|
|
|
|
|
def test_event_to_jsonl_is_single_line() -> None:
    """``to_jsonl()`` yields one brace-delimited line with no embedded newline."""
    event = AuditEvent(
        action="engage_pilot",
        outcome=AuditOutcome.SUCCESS,
        user_id="u123",
        role="user",
    )

    serialised = event.to_jsonl()

    # A JSONL record must never span lines, or it would be read back as several.
    assert "\n" not in serialised
    assert serialised.startswith("{")
    assert serialised.endswith("}")
|
|
|
|
|
|
def test_event_is_immutable() -> None:
    """Assigning to a field of a constructed event raises TypeError or ValueError."""
    event = AuditEvent(action="x", outcome=AuditOutcome.SUCCESS)

    with pytest.raises((TypeError, ValueError)):
        # Deliberate write to an attribute that is expected to be read-only.
        event.action = "y"  # type: ignore[misc]
|
|
|
|
|
|
def test_append_and_read_round_trip(tmp_path: Path) -> None:
    """Two appended events come back from ``read_all()`` in append order."""
    audit_log = AuditLog(tmp_path / "audit.jsonl")
    login_event = AuditEvent(
        action="login",
        outcome=AuditOutcome.SUCCESS,
        user_id="u1",
    )
    denied_event = AuditEvent(
        action="engage",
        outcome=AuditOutcome.DENIED,
        user_id="u2",
        reason="missing capability",
    )

    for event in (login_event, denied_event):
        audit_log.append(event)

    stored = audit_log.read_all()
    assert len(stored) == 2
    assert stored[0].action == "login"
    # Identity check: the outcome must be restored as the enum member itself.
    assert stored[1].outcome is AuditOutcome.DENIED
|
|
|
|
|
|
def test_len_counts_lines(tmp_path: Path) -> None:
    """``len(log)`` is 0 for a fresh log and 2 after two appends."""
    audit_log = AuditLog(tmp_path / "audit.jsonl")
    assert len(audit_log) == 0

    for action in ("a", "b"):
        audit_log.append(AuditEvent(action=action, outcome=AuditOutcome.SUCCESS))

    assert len(audit_log) == 2
|
|
|
|
|
|
def test_log_file_is_created_if_missing(tmp_path: Path) -> None:
    """Constructing an ``AuditLog`` creates the file, even under a new subdirectory."""
    # "subdir" does not exist yet, so the constructor has to create it as well.
    log_path = tmp_path.joinpath("subdir", "audit.jsonl")

    audit_log = AuditLog(log_path)

    assert log_path.exists()
    assert len(audit_log) == 0
|
|
|
|
|
|
def test_corrupt_line_raises(tmp_path: Path) -> None:
    """``read_all()`` raises ValueError matching "corrupt audit line" on a non-JSON line."""
    valid_line = (
        '{"action":"good","outcome":"success","timestamp":"2026-05-18T00:00:00Z"}\n'
    )
    corrupt_line = "this is not json\n"

    log_path = tmp_path / "audit.jsonl"
    log_path.write_text(valid_line + corrupt_line, encoding="utf-8")
    audit_log = AuditLog(log_path)

    # The valid first line must not mask the corrupt second one.
    with pytest.raises(ValueError, match="corrupt audit line"):
        audit_log.read_all()
|
|
|
|
|
|
def test_dual_auth_event_carries_secondary_user(tmp_path: Path) -> None:
    """``secondary_user_id`` and ``target`` survive an append/read cycle."""
    audit_log = AuditLog(tmp_path / "a.jsonl")
    audit_log.append(
        AuditEvent(
            action="flash_firmware",
            outcome=AuditOutcome.SUCCESS,
            user_id="engineer_1",
            role="engineer",
            secondary_user_id="super_admin_alvaro",
            target="COM7:esp32-dev",
        )
    )

    stored = audit_log.read_all()[0]

    assert stored.secondary_user_id == "super_admin_alvaro"
    assert stored.target == "COM7:esp32-dev"
|
|
|
|
|
|
def test_extra_payload_round_trips(tmp_path: Path) -> None:
    """The ``extra`` dict is read back equal to what was appended."""
    expected_extra = {"from": "STANDBY", "to": "HEADING_HOLD", "via": "modbus"}
    audit_log = AuditLog(tmp_path / "a.jsonl")

    # Hand the event its own copy so the expected value stays independent of it.
    event = AuditEvent(
        action="mode_change",
        outcome=AuditOutcome.SUCCESS,
        extra=dict(expected_extra),
    )
    audit_log.append(event)

    stored = audit_log.read_all()
    assert stored[0].extra == expected_extra
|
|
|
|
|
|
def test_blank_lines_are_skipped(tmp_path: Path) -> None:
    """A blank line between two records is ignored by ``read_all()``.

    Checking only the event count would still pass if the reader returned the
    wrong two records or the wrong order, so the actions are asserted as well.
    """
    log_path = tmp_path / "a.jsonl"
    log_path.write_text(
        '{"action":"a","outcome":"success","timestamp":"2026-05-18T00:00:00Z"}\n'
        "\n"
        '{"action":"b","outcome":"denied","timestamp":"2026-05-18T00:00:01Z"}\n',
        encoding="utf-8",
    )
    audit_log = AuditLog(log_path)

    events = audit_log.read_all()

    assert len(events) == 2
    # Both real records must survive, in file order, on either side of the gap.
    assert [event.action for event in events] == ["a", "b"]
|