"""Tests for ``arautopilot.studio.session.Session``.""" from __future__ import annotations from pathlib import Path import pytest from arautopilot.core.audit import AuditLog, AuditOutcome from arautopilot.core.rbac import Capability, Role from arautopilot.core.user import User from arautopilot.core.user_store import UserStore from arautopilot.studio.session import Session, SessionHolder @pytest.fixture def store_and_audit(tmp_path: Path) -> tuple[UserStore, AuditLog]: store = UserStore(tmp_path / "users.json") audit = AuditLog(tmp_path / "audit.jsonl") return store, audit def _sa_session(store: UserStore, audit: AuditLog) -> Session: u = User.create(display_name="SA", role=Role.SUPER_ADMIN, pin="0001") store.add(u) return Session(user=u, store=store, audit=audit) def _eng_session(store: UserStore, audit: AuditLog) -> Session: u = User.create(display_name="Eng", role=Role.ENGINEER, pin="0002") store.add(u) return Session(user=u, store=store, audit=audit) def test_session_can_for_super_admin(store_and_audit: tuple[UserStore, AuditLog]) -> None: store, audit = store_and_audit s = _sa_session(store, audit) assert s.can(Capability.FLASH_FIRMWARE) assert s.can(Capability.EDIT_BASE_GAINS) assert s.can(Capability.MANAGE_USERS) def test_session_check_records_audit_on_grant( store_and_audit: tuple[UserStore, AuditLog] ) -> None: store, audit = store_and_audit s = _sa_session(store, audit) assert s.check(Capability.FLASH_FIRMWARE, target="COM7") is True events = audit.read_all() assert len(events) == 1 assert events[0].outcome is AuditOutcome.SUCCESS assert events[0].action == "check:flash_firmware" assert events[0].target == "COM7" def test_session_check_records_audit_on_denial( store_and_audit: tuple[UserStore, AuditLog] ) -> None: store, audit = store_and_audit u = User.create(display_name="Crew", role=Role.USER, pin="4444") store.add(u) s = Session(user=u, store=store, audit=audit) assert s.check(Capability.FLASH_FIRMWARE) is False events = audit.read_all() assert len(events) == 1 assert events[0].outcome is AuditOutcome.DENIED assert "lacks" in events[0].reason def test_engineer_needs_dual_auth_for_flash( store_and_audit: tuple[UserStore, AuditLog] ) -> None: store, audit = store_and_audit s = _eng_session(store, audit) assert s.needs_dual_auth(Capability.FLASH_FIRMWARE) assert not s.needs_dual_auth(Capability.BUILD_FIRMWARE) def test_verify_super_admin_pin_succeeds_with_right_pin( store_and_audit: tuple[UserStore, AuditLog] ) -> None: store, audit = store_and_audit sa = User.create(display_name="SA", role=Role.SUPER_ADMIN, pin="0001") store.add(sa) eng = User.create(display_name="Eng", role=Role.ENGINEER, pin="0002") store.add(eng) s = Session(user=eng, store=store, audit=audit) matched = s.verify_super_admin_pin("0001") assert matched is not None assert matched.user_id == sa.user_id def test_verify_super_admin_pin_fails_with_wrong_pin( store_and_audit: tuple[UserStore, AuditLog] ) -> None: store, audit = store_and_audit store.add(User.create(display_name="SA", role=Role.SUPER_ADMIN, pin="0001")) eng = User.create(display_name="Eng", role=Role.ENGINEER, pin="0002") store.add(eng) s = Session(user=eng, store=store, audit=audit) assert s.verify_super_admin_pin("9999") is None def test_dual_auth_grant_records_secondary_user( store_and_audit: tuple[UserStore, AuditLog] ) -> None: store, audit = store_and_audit sa = User.create(display_name="SA", role=Role.SUPER_ADMIN, pin="0001") store.add(sa) eng = User.create(display_name="Eng", role=Role.ENGINEER, pin="0002") store.add(eng) s = Session(user=eng, store=store, audit=audit) s.log_dual_auth_grant(Capability.FLASH_FIRMWARE, sa, target="COM7:esp32-dev", extra={"variant": "esp32-dev"}) events = audit.read_all() assert events[0].secondary_user_id == sa.user_id assert events[0].outcome is AuditOutcome.SUCCESS assert events[0].target == "COM7:esp32-dev" assert events[0].extra == {"variant": "esp32-dev"} def test_session_holder_set_and_require( store_and_audit: tuple[UserStore, AuditLog] ) -> None: store, audit = store_and_audit s = _sa_session(store, audit) SessionHolder.set(s) assert SessionHolder.current() is s assert SessionHolder.require() is s SessionHolder.set(None) assert SessionHolder.current() is None with pytest.raises(RuntimeError): SessionHolder.require() def test_log_action_helper( store_and_audit: tuple[UserStore, AuditLog] ) -> None: store, audit = store_and_audit s = _sa_session(store, audit) s.log_action("mode_change", outcome=AuditOutcome.SUCCESS, extra={"from": "STANDBY", "to": "HEADING_HOLD"}) events = audit.read_all() assert events[0].action == "mode_change" assert events[0].extra["to"] == "HEADING_HOLD"