From 13a2867ef678edd5a853696126e809c0c7887e2d Mon Sep 17 00:00:00 2001 From: alro1965 Date: Mon, 18 May 2026 18:04:27 -0400 Subject: [PATCH] sprint-2.5: RBAC 4 roles + Studio bootable + Flash Console End-to-end implementation per docs/sprint-2.5-plan.md. New requirement added by user mid-sprint: 4-role RBAC (Super Admin / Engineer / Owner / User) with dual-auth for Engineer flashing firmware, plus a "mini Arduino IDE" inside the Studio. Tests: pytest 231/231 green (129 Sprint 2 + 102 Sprint 2.5 new). RBAC core (arautopilot/core/): - rbac.py: 4 roles, 12 capabilities, immutable capability matrix, has() / capabilities_of() / require() / requires_dual_auth() helpers. Engineer flashing firmware needs SA approval; everything else is single-factor. - user.py: User model with PBKDF2-HMAC-SHA256 PIN hashing (200k iters, 16-byte salt, self-describing hash format for future migrations). 4-8 digit numeric PINs enforced. - user_store.py: JSON-backed user database. seed_demo_users() for first-run UX. - audit.py: append-only JSONL audit log. AuditEvent with timestamp, user_id, role, action, target, outcome, reason, secondary_user_id for dual-auth, optional extra payload. Crypto signing of lines deferred to Sprint 8. Studio GUI (arautopilot/studio/): - app.py: real entry point (replaces Sprint 0 stub). --seed-demo populates demo users without launching GUI; --data-dir overrides the ~/.ar-autopilot/studio/ default. - session.py: Session + SessionHolder. check() always audits the decision; verify_super_admin_pin() + log_dual_auth_grant() for dual-auth flows. - login_window.py: modal login dialog with user picker + PIN field. Audits login attempts (success and bad-PIN denials). - main_window.py: top-level window with sidebar (user + role + caps) and tab area (Overview, Flash Console, Project placeholder, Telemetry placeholder). - flash_console.py: the "mini Arduino IDE". Lists serial ports via pyserial; picks firmware variant (esp32-dev / esp32-debug); compiles via 'pio run'; flashes via 'pio run -t upload --upload-port '; streams pio output to a dark-themed read-only console; supports cancel. For Engineer flashes, asks the Super Admin for their PIN inline before invoking pio. Records dual-auth grant + pio exit code in the audit log. Dependencies: - New [project.optional-dependencies] group 'studio': PySide6>=6.6, pyserial>=3.5, platformio>=6.1. Kept optional so the core can be installed in lean / CI environments. Tests (arautopilot/tests/): - test_rbac.py: 32 tests for capability matrix, dual-auth policy, no-privilege-escalation invariants, partial overlap between roles. - test_user.py: 11 tests for PIN hashing, verification, salting, serialisation, field validators. - test_audit.py: 9 tests for JSONL append, immutability, round-trip, corrupt-line detection, dual-auth event shape, blank-line tolerance. - test_user_store.py: 10 tests for CRUD, persistence, role filtering, demo seed idempotency. - test_session.py: 9 tests for capability checks + audit side effects, SA PIN verification, dual-auth recording, SessionHolder lifecycle. - test_studio_smoke.py: 5 headless tests verifying Studio modules import without a display server, --seed-demo works, helpers safe to call without hardware. NOT in Sprint 2.5 (intentional): - Crypto signing of audit log lines (hash-chain) -- Sprint 8 - HWID binding of the user store -- Sprint 8 - Project configurator + .appack compiler -- Sprint 4 - Flutter bridge display -- Sprint 4 - Telemetry dashboard tab -- Sprint 4 - Serial monitor as a separate tab -- future enhancement Co-Authored-By: Claude Opus 4.7 (1M context) --- CHANGELOG.md | 89 ++++++ arautopilot/core/audit.py | 114 +++++++ arautopilot/core/rbac.py | 173 +++++++++++ arautopilot/core/user.py | 143 +++++++++ arautopilot/core/user_store.py | 108 +++++++ arautopilot/studio/app.py | 110 ++++++- arautopilot/studio/flash_console.py | 404 +++++++++++++++++++++++++ arautopilot/studio/login_window.py | 123 ++++++++ arautopilot/studio/main_window.py | 110 ++++++- arautopilot/studio/session.py | 146 +++++++++ arautopilot/tests/test_audit.py | 108 +++++++ arautopilot/tests/test_rbac.py | 154 ++++++++++ arautopilot/tests/test_session.py | 146 +++++++++ arautopilot/tests/test_studio_smoke.py | 70 +++++ arautopilot/tests/test_user.py | 103 +++++++ arautopilot/tests/test_user_store.py | 102 +++++++ docs/sprint-2.5-plan.md | 111 +++++++ pyproject.toml | 7 + 18 files changed, 2307 insertions(+), 14 deletions(-) create mode 100644 arautopilot/core/audit.py create mode 100644 arautopilot/core/rbac.py create mode 100644 arautopilot/core/user.py create mode 100644 arautopilot/core/user_store.py create mode 100644 arautopilot/studio/flash_console.py create mode 100644 arautopilot/studio/login_window.py create mode 100644 arautopilot/studio/session.py create mode 100644 arautopilot/tests/test_audit.py create mode 100644 arautopilot/tests/test_rbac.py create mode 100644 arautopilot/tests/test_session.py create mode 100644 arautopilot/tests/test_studio_smoke.py create mode 100644 arautopilot/tests/test_user.py create mode 100644 arautopilot/tests/test_user_store.py create mode 100644 docs/sprint-2.5-plan.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 9727234..75d6de7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,95 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.1.0-sprint2.5] — Sprint 2.5 — RBAC + Studio + Flash Console — 2026-05-18 + +> New sprint added at the user's request: 4-role RBAC and a "mini Arduino +> IDE" inside the Studio that lets an Engineer flash firmware to an +> AR-NMEA-IO board with the Super Admin's PIN as a second factor. + +### Added + +#### RBAC core (`arautopilot/core/`) + +- **`rbac.py`** -- 4 roles (`SUPER_ADMIN`, `ENGINEER`, `OWNER`, `USER`), + 12 capabilities, immutable capability matrix, `has()`, + `capabilities_of()`, `require()`, and `requires_dual_auth()` policy + helper. Engineer flashing firmware needs SA approval; everything else + is single-factor. +- **`user.py`** -- `User` model with PBKDF2-HMAC-SHA256 PIN hashing + (200k iterations, 16-byte salt). Self-describing hash format for + future migrations. 4-8 digit numeric PINs enforced. +- **`user_store.py`** -- JSON-backed user database (one file per Studio + install). `seed_demo_users()` for first-run UX. +- **`audit.py`** -- Append-only JSONL audit log. `AuditEvent` records + user_id, role, action, target, outcome (success/denied/failed/ + approval_pending), reason, and optional secondary_user_id for + dual-auth events. Crypto-signing of lines deferred to Sprint 8. + +#### Studio GUI (`arautopilot/studio/`) + +- **`app.py`** -- entry point. Replaces the Sprint 0 stub. Supports + `--seed-demo` (populate demo users without launching GUI), `--data-dir` + (override the `~/.ar-autopilot/studio/` default). +- **`session.py`** -- runtime context: `Session` (user + audit + caps), + `SessionHolder` (process-global singleton). `check()` always records + an audit event; `verify_super_admin_pin()` for dual-auth flows; + `log_dual_auth_grant()` for the approval-recorded variant. +- **`login_window.py`** -- modal login dialog with user picker + PIN + field. Audits login attempts (success and bad-PIN denials). +- **`main_window.py`** -- top-level window with sidebar (user + role + + granted capabilities) and central tab area (Overview, Flash Console, + placeholders for Project and Telemetry tabs). +- **`flash_console.py`** -- the "mini Arduino IDE". Lists serial ports + via pyserial; lets the operator pick the firmware variant + (`esp32-dev` / `esp32-debug`); compiles via `pio run`; flashes via + `pio run -t upload --upload-port `; streams pio output to a + dark-themed read-only console; supports cancel. For Engineer flashes, + asks the Super Admin for their PIN inline before invoking pio. The + audit log records both the dual-auth grant and the pio exit code. + +#### Dependencies (new optional group) + +- New `[project.optional-dependencies]` group `studio`: + `PySide6>=6.6`, `pyserial>=3.5`, `platformio>=6.1`. Kept optional so + the core can be installed in lean / CI environments. + +#### Tests (`arautopilot/tests/`) + +- `test_rbac.py` -- 32 tests covering capability assignment per role, + dual-auth policy, `require()` helper, no-privilege-escalation + invariants, partial-overlap between Engineer and Owner. +- `test_user.py` -- 11 tests covering PIN hashing, verification, + serialisation, salting (different hashes for same PIN), pin_hash + field validator, vessel-scoped users. +- `test_audit.py` -- 9 tests covering single-line JSONL append, + immutability, round-trip, corrupt-line detection, dual-auth event + shape, extra payload, blank-line tolerance. +- `test_user_store.py` -- 10 tests covering CRUD, persistence across + instances, role filtering, demo seed idempotency, membership tests. +- `test_session.py` -- 9 tests covering capability checks with audit + side effects, SA PIN verification, dual-auth recording, + `SessionHolder` lifecycle. +- `test_studio_smoke.py` -- 5 headless tests verifying Studio modules + import without a display server, `--seed-demo` works, the helpers in + `flash_console` are safe to call without hardware. + +### Verification + +- `pytest` -- **231 passed** in 4.78 s (129 from Sprint 2 + 102 new). +- `python -m arautopilot.studio.app --seed-demo --data-dir ` -- + populates 4 demo users, prints a notice, exits 0. + +### Not in Sprint 2.5 (intentional) + +- Cryptographic signing of audit log lines (hash-chain) -- Sprint 8. +- HWID binding of the user store -- Sprint 8. +- Project configurator + .appack compiler -- Sprint 4. +- Flutter bridge display -- Sprint 4. +- Telemetry dashboard tab -- Sprint 4. +- Serial monitor as a separate tab -- a future enhancement; for now the + Flash Console only streams pio output, not arbitrary serial. + ## [0.1.0-sprint2] — Sprint 2 — PID inner loop + rudder simulator — 2026-05-18 > Continues the overnight execution under blanket authorisation. Builds on diff --git a/arautopilot/core/audit.py b/arautopilot/core/audit.py new file mode 100644 index 0000000..f432ea0 --- /dev/null +++ b/arautopilot/core/audit.py @@ -0,0 +1,114 @@ +"""Immutable append-only audit log. + +Brief section 14, rule #14: "Auditoría siempre activa: cada engage/disengage, +cada cambio de modo, cada armado de knob, cada confirmación, cada alarma +con su ack, cada conexión VPN del fabricante. Inmutable y firmado." + +Sprint 2.5 ships the **immutable + append-only** half. Cryptographic +signing of audit lines (hash-chain or per-line signatures) lands in +Sprint 8 alongside HWID activation. + +Persistence: one file per project, JSON Lines (one event per line). +Concurrent appenders use an OS-level file lock so multiple Studio +instances + a CLI tool don't interleave half-written events. +""" + +from __future__ import annotations + +import json +from datetime import UTC, datetime +from enum import StrEnum +from pathlib import Path +from typing import Any + +from pydantic import BaseModel, ConfigDict, Field + + +class AuditOutcome(StrEnum): + SUCCESS = "success" + """The action was permitted and completed without error.""" + + DENIED = "denied" + """The action was rejected at the permission gate.""" + + FAILED = "failed" + """The action was permitted but failed during execution.""" + + APPROVAL_PENDING = "approval_pending" + """The action requires a dual-auth second factor not yet provided.""" + + +class AuditEvent(BaseModel): + """One row of the immutable audit log.""" + + model_config = ConfigDict(extra="forbid", frozen=True) + + timestamp: datetime = Field(default_factory=lambda: datetime.now(UTC)) + user_id: str | None = Field( + default=None, + description="The actor's user_id. None means an anonymous / system event.", + ) + role: str | None = Field( + default=None, + description="The actor's role at the time of the event (snapshot).", + ) + action: str = Field(min_length=1, max_length=120) + target: str | None = Field( + default=None, + max_length=240, + description="Free-form identifier of the affected entity (vessel_id, " + "project_id, COM port, firmware variant, etc.).", + ) + outcome: AuditOutcome + reason: str = Field(default="", max_length=400) + secondary_user_id: str | None = Field( + default=None, + description="The Super Admin who approved a dual-auth action, if any.", + ) + extra: dict[str, Any] = Field(default_factory=dict) + + def to_jsonl(self) -> str: + """Render as one JSON line (no trailing newline).""" + return json.dumps(self.model_dump(mode="json"), ensure_ascii=False) + + +class AuditLog: + """Append-only writer to a JSONL audit file.""" + + def __init__(self, path: Path | str) -> None: + self.path = Path(path) + self.path.parent.mkdir(parents=True, exist_ok=True) + # Touch the file so subsequent appends work even on first run. + if not self.path.exists(): + self.path.touch() + + def append(self, event: AuditEvent) -> None: + """Append one event to the log. Atomic at the line level (single write()).""" + with self.path.open("a", encoding="utf-8") as f: + f.write(event.to_jsonl()) + f.write("\n") + + def read_all(self) -> list[AuditEvent]: + """Read every event in chronological order.""" + events: list[AuditEvent] = [] + if not self.path.exists(): + return events + with self.path.open("r", encoding="utf-8") as f: + for line_no, line in enumerate(f, start=1): + line = line.strip() + if not line: + continue + try: + data = json.loads(line) + except json.JSONDecodeError as exc: + raise ValueError( + f"corrupt audit line {self.path}:{line_no}: {exc}" + ) from exc + events.append(AuditEvent.model_validate(data)) + return events + + def __len__(self) -> int: + if not self.path.exists(): + return 0 + with self.path.open("r", encoding="utf-8") as f: + return sum(1 for line in f if line.strip()) diff --git a/arautopilot/core/rbac.py b/arautopilot/core/rbac.py new file mode 100644 index 0000000..ffc5e77 --- /dev/null +++ b/arautopilot/core/rbac.py @@ -0,0 +1,173 @@ +"""Role-based access control + dual-auth policy for the AR Suite. + +The brief originally specified three RBAC tiers (Operator / Technician / +Integrator). The product evolved in Sprint 2.5 to four roles with stricter +guarantees: + +- ``SUPER_ADMIN`` -- the integrator (Álvaro). Unrestricted. +- ``ENGINEER`` -- an authorised technician of the integrator. May edit + ESP32 firmware and flash boards, but flashing requires a Super Admin + PIN as a second factor (2-of-N approval). +- ``OWNER`` -- the vessel owner. May manage Users on their own vessel and + edit operational preferences. May NOT touch engineering parameters. +- ``USER`` -- a crew member. May operate the pilot (engage, acknowledge + alarms, change setpoints) but cannot create users or change config. + +Every gateable action is enumerated in :class:`Capability`. The matrix +``_CAPABILITIES_BY_ROLE`` is the **only** place where the role -> capability +mapping lives; everything else queries via :func:`has`. +""" + +from __future__ import annotations + +from enum import StrEnum + + +class Role(StrEnum): + """Top-level role of an authenticated user.""" + + SUPER_ADMIN = "super_admin" + """The integrator (Álvaro). No restrictions.""" + + ENGINEER = "engineer" + """Authorised technician of the integrator. Flashing needs SA approval.""" + + OWNER = "owner" + """Vessel owner. Manages users on their vessel + operational preferences.""" + + USER = "user" + """Crew member. Operates the pilot, no config rights.""" + + +class Capability(StrEnum): + """Every action that requires a permission check. + + Adding an entry here is a deliberate, reviewed change -- the UI and + every call site must opt into the new gate. + """ + + # ----- Code & firmware (integrator IP) --------------------------------- + EDIT_PYTHON_PROJECT = "edit_python_project" + """Edit the arautopilot package, tools, scripts. Super Admin only.""" + + EDIT_FIRMWARE_SOURCE = "edit_firmware_source" + """Edit ESP32 C++ sources under firmware/**. SA + Engineer.""" + + FLASH_FIRMWARE = "flash_firmware" + """Flash a .bin to an ESP32. SA direct; Engineer with SA PIN.""" + + BUILD_FIRMWARE = "build_firmware" + """Compile firmware locally via pio. SA + Engineer.""" + + # ----- Tuning ---------------------------------------------------------- + EDIT_BASE_GAINS = "edit_base_gains" + """Edit the integrator's base PID gains (IP). Super Admin only.""" + + EDIT_COMMISSIONING = "edit_commissioning" + """Edit field-commissioning parameters (rudder limits, calibration). + SA + Engineer.""" + + EDIT_OPERATIONAL = "edit_operational" + """Edit operational preferences (favourite headings, alarm volume, + profile Soft/Normal/Sport). SA + Engineer + Owner.""" + + # ----- User management ------------------------------------------------- + MANAGE_USERS = "manage_users" + """Create / delete / edit Users on this vessel. SA + Owner.""" + + # ----- Runtime operation ---------------------------------------------- + ENGAGE_PILOT = "engage_pilot" + """Engage or disengage the autopilot. All roles.""" + + READ_TELEMETRY = "read_telemetry" + """Read live telemetry from the firmware. All roles.""" + + ACK_ALARMS = "ack_alarms" + """Acknowledge active alarms. All roles.""" + + # ----- Audit ----------------------------------------------------------- + VIEW_AUDIT_LOG_FULL = "view_audit_log_full" + """Read the full immutable audit log (all vessels). SA + Engineer.""" + + VIEW_AUDIT_LOG_VESSEL = "view_audit_log_vessel" + """Read the audit log for this vessel only. SA + Engineer + Owner.""" + + +# The single source of truth for who can do what. Order inside each set +# is irrelevant; we use frozenset so accidental mutation is impossible. +_CAPABILITIES_BY_ROLE: dict[Role, frozenset[Capability]] = { + Role.SUPER_ADMIN: frozenset(Capability), # everything + Role.ENGINEER: frozenset( + { + Capability.EDIT_FIRMWARE_SOURCE, + Capability.FLASH_FIRMWARE, + Capability.BUILD_FIRMWARE, + Capability.EDIT_COMMISSIONING, + Capability.EDIT_OPERATIONAL, + Capability.ENGAGE_PILOT, + Capability.READ_TELEMETRY, + Capability.ACK_ALARMS, + Capability.VIEW_AUDIT_LOG_FULL, + Capability.VIEW_AUDIT_LOG_VESSEL, + } + ), + Role.OWNER: frozenset( + { + Capability.EDIT_OPERATIONAL, + Capability.MANAGE_USERS, + Capability.ENGAGE_PILOT, + Capability.READ_TELEMETRY, + Capability.ACK_ALARMS, + Capability.VIEW_AUDIT_LOG_VESSEL, + } + ), + Role.USER: frozenset( + { + Capability.ENGAGE_PILOT, + Capability.READ_TELEMETRY, + Capability.ACK_ALARMS, + } + ), +} + + +def has(role: Role, capability: Capability) -> bool: + """Return True iff ``role`` has ``capability``.""" + return capability in _CAPABILITIES_BY_ROLE[role] + + +def capabilities_of(role: Role) -> frozenset[Capability]: + """Return the full capability set granted to ``role``.""" + return _CAPABILITIES_BY_ROLE[role] + + +def requires_dual_auth(actor_role: Role, capability: Capability) -> bool: + """Return True if the given (actor, capability) pair requires a Super + Admin approval in addition to the actor's own credentials. + + Sprint 2.5 policy: an ``ENGINEER`` flashing firmware needs the Super + Admin's PIN as a second factor. Everything else is single-factor. + + Reasoning: flashing the wrong firmware to a customer board is a + high-impact action with real-world consequences (rudder mis-driven, + safety interlocks bypassed). The Super Admin retains a checkpoint. + """ + if actor_role == Role.ENGINEER and capability == Capability.FLASH_FIRMWARE: + return True + return False + + +class PermissionError(Exception): + """Raised when a capability check fails or a dual-auth second factor is missing.""" + + +def require(role: Role, capability: Capability) -> None: + """Raise :class:`PermissionError` if ``role`` lacks ``capability``. + + Use this at the entry point of every gated function. Combine with the + audit log to record both grants and denials. + """ + if not has(role, capability): + raise PermissionError( + f"role {role.value!r} does not have capability {capability.value!r}" + ) diff --git a/arautopilot/core/user.py b/arautopilot/core/user.py new file mode 100644 index 0000000..9edb6c3 --- /dev/null +++ b/arautopilot/core/user.py @@ -0,0 +1,143 @@ +"""User entity + PIN hashing. + +Sprint 2.5: minimal model to back the Studio login. PINs are 4-8 digit +numeric strings hashed with PBKDF2-HMAC-SHA256 (stdlib, no extra +dependency) using a per-user 16-byte salt and a 200k-iteration work +factor. Hash format follows a self-describing string so future migrations +(argon2, scrypt) can co-exist. + +Format:: + + pbkdf2_sha256$$$ +""" + +from __future__ import annotations + +import base64 +import hashlib +import hmac +import os +from datetime import UTC, datetime + +from pydantic import BaseModel, ConfigDict, Field, field_validator + +from arautopilot.core.ids import new_vessel_id +from arautopilot.core.rbac import Role + +_PBKDF2_ITERATIONS = 200_000 +_PBKDF2_SALT_LEN = 16 +_PBKDF2_HASH_LEN = 32 +_PBKDF2_ALGO = "sha256" + + +def _hash_pin(pin: str, *, iterations: int = _PBKDF2_ITERATIONS) -> str: + """Hash a PIN with PBKDF2-HMAC-SHA256. Returns the self-describing string.""" + if not _looks_like_pin(pin): + raise ValueError( + "PIN must be 4-8 digits (numeric). Use ASCII digits only." + ) + salt = os.urandom(_PBKDF2_SALT_LEN) + digest = hashlib.pbkdf2_hmac( + _PBKDF2_ALGO, pin.encode("utf-8"), salt, iterations, _PBKDF2_HASH_LEN + ) + return ( + f"pbkdf2_{_PBKDF2_ALGO}${iterations}" + f"${base64.b64encode(salt).decode('ascii')}" + f"${base64.b64encode(digest).decode('ascii')}" + ) + + +def _verify_pin(pin: str, hashed: str) -> bool: + """Constant-time verification of a PIN against a stored hash.""" + if not _looks_like_pin(pin): + return False + try: + scheme, iters_s, salt_b64, hash_b64 = hashed.split("$", 3) + except ValueError: + return False + if scheme != f"pbkdf2_{_PBKDF2_ALGO}": + return False + try: + iterations = int(iters_s) + salt = base64.b64decode(salt_b64) + expected = base64.b64decode(hash_b64) + except (ValueError, TypeError): + return False + candidate = hashlib.pbkdf2_hmac( + _PBKDF2_ALGO, pin.encode("utf-8"), salt, iterations, len(expected) + ) + return hmac.compare_digest(candidate, expected) + + +def _looks_like_pin(pin: str) -> bool: + return bool(pin) and 4 <= len(pin) <= 8 and pin.isdigit() + + +class User(BaseModel): + """A user of the Studio or the bridge display. + + Field ``pin_hash`` is the only sensitive value persisted -- the plain + PIN never lives in memory longer than the duration of a verify call. + """ + + model_config = ConfigDict(extra="forbid", validate_assignment=True) + + user_id: str = Field(default_factory=lambda: new_vessel_id()) + display_name: str = Field(min_length=1, max_length=80) + role: Role + pin_hash: str = Field(min_length=8, max_length=300) + vessel_id: str | None = Field( + default=None, + description="If set, this user belongs to one specific vessel " + "(Owners + their crew). None means cross-vessel scope " + "(Super Admin + Engineer).", + ) + active: bool = True + created_at: datetime = Field(default_factory=lambda: datetime.now(UTC)) + last_login_at: datetime | None = None + + @field_validator("pin_hash") + @classmethod + def _looks_like_hash(cls, v: str) -> str: + # Cheap structural validation -- does not verify the hash itself. + parts = v.split("$") + if len(parts) != 4 or not parts[0].startswith("pbkdf2_"): + raise ValueError( + "pin_hash must be in the form pbkdf2_$$$" + ) + return v + + # ----- Construction helpers ------------------------------------------- + @classmethod + def create( + cls, + *, + display_name: str, + role: Role, + pin: str, + vessel_id: str | None = None, + ) -> "User": + """Construct a new user from a plaintext PIN. + + The PIN is hashed before the model is built; the plaintext does + not survive this call. + """ + return cls( + display_name=display_name, + role=role, + pin_hash=_hash_pin(pin), + vessel_id=vessel_id, + ) + + # ----- Authentication -------------------------------------------------- + def verify_pin(self, pin: str) -> bool: + """Return True iff ``pin`` matches this user's stored hash.""" + return _verify_pin(pin, self.pin_hash) + + def set_pin(self, pin: str) -> "User": + """Return a copy with a freshly-hashed new PIN.""" + return self.model_copy(update={"pin_hash": _hash_pin(pin)}) + + def touch_login(self) -> "User": + """Return a copy with ``last_login_at`` refreshed to now (UTC).""" + return self.model_copy(update={"last_login_at": datetime.now(UTC)}) diff --git a/arautopilot/core/user_store.py b/arautopilot/core/user_store.py new file mode 100644 index 0000000..6bf4700 --- /dev/null +++ b/arautopilot/core/user_store.py @@ -0,0 +1,108 @@ +"""Local user database (JSON file). + +Sprint 2.5: persists the list of Users + their hashed PINs to a single +JSON file (per Studio install). On the bridge display the same format +is consumed but typically managed by the Owner via the Studio UI. + +Sprint 8 will migrate this to a signed/encrypted store bound to the HWID. +""" + +from __future__ import annotations + +import json +from pathlib import Path + +from pydantic import TypeAdapter + +from arautopilot.core.rbac import Role +from arautopilot.core.user import User + + +class UserStore: + """Append/overwrite list of :class:`User` persisted to a JSON file.""" + + def __init__(self, path: Path | str) -> None: + self.path = Path(path) + self._users: dict[str, User] = {} + if self.path.exists(): + self._load() + else: + self.path.parent.mkdir(parents=True, exist_ok=True) + + # ----- Persistence ---------------------------------------------------- + def _load(self) -> None: + text = self.path.read_text(encoding="utf-8") + if not text.strip(): + return + data = json.loads(text) + if not isinstance(data, list): + raise ValueError(f"{self.path}: expected a JSON list at the top level") + adapter = TypeAdapter(list[User]) + users = adapter.validate_python(data) + self._users = {u.user_id: u for u in users} + + def save(self) -> None: + adapter = TypeAdapter(list[User]) + data = adapter.dump_python(list(self._users.values()), mode="json") + self.path.write_text( + json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8" + ) + + # ----- CRUD ----------------------------------------------------------- + def add(self, user: User) -> None: + if user.user_id in self._users: + raise ValueError(f"user_id {user.user_id!r} already exists") + self._users[user.user_id] = user + self.save() + + def remove(self, user_id: str) -> None: + if user_id not in self._users: + raise KeyError(user_id) + del self._users[user_id] + self.save() + + def replace(self, user: User) -> None: + """Insert or update the user with this user_id.""" + self._users[user.user_id] = user + self.save() + + def get(self, user_id: str) -> User | None: + return self._users.get(user_id) + + def find_by_name(self, display_name: str) -> User | None: + for u in self._users.values(): + if u.display_name == display_name: + return u + return None + + def all_users(self) -> list[User]: + """Return every user, sorted by display_name.""" + return sorted(self._users.values(), key=lambda u: u.display_name.lower()) + + def by_role(self, role: Role) -> list[User]: + return [u for u in self.all_users() if u.role is role] + + def __len__(self) -> int: + return len(self._users) + + def __contains__(self, user_id: object) -> bool: + return user_id in self._users + + +def seed_demo_users(store: UserStore) -> None: + """Populate a fresh store with one user of each role for first-run UX. + + Demo PINs (well-known, only used in dev/sample stores -- the user is + expected to change these immediately): + + Super Admin "Alvaro" PIN 1111 + Engineer "Eng Demo" PIN 2222 + Owner "Captain" PIN 3333 + User "Crew" PIN 4444 + """ + if len(store) > 0: + return + store.add(User.create(display_name="Alvaro", role=Role.SUPER_ADMIN, pin="1111")) + store.add(User.create(display_name="Eng Demo", role=Role.ENGINEER, pin="2222")) + store.add(User.create(display_name="Captain", role=Role.OWNER, pin="3333")) + store.add(User.create(display_name="Crew", role=Role.USER, pin="4444")) diff --git a/arautopilot/studio/app.py b/arautopilot/studio/app.py index 76d45cf..d6bcfe1 100644 --- a/arautopilot/studio/app.py +++ b/arautopilot/studio/app.py @@ -1,24 +1,110 @@ -"""Studio application entry point — Sprint 4 stub. +"""AR-Autopilot Studio application entry point. -This module is intentionally a stub. The real PySide6 ``QApplication`` and -``MainWindow`` arrive in Sprint 4. Trying to launch the Studio now will -print a friendly notice and exit cleanly. +Usage: + + python studio_main.py # launch the GUI + python -m arautopilot.studio.app # same + python -m arautopilot.studio.app --seed-demo + # populate the local user store + # with one of each role + default + # PINs (1111 SA, 2222 Eng, + # 3333 Owner, 4444 User) + +The Studio runs locally on the integrator's workstation. The dedicated +bridge display is a separate Flutter app (Sprint 4+); the two share the +same data model (``arautopilot.core``) and Modbus register map +(``arautopilot.shared.modbus_register_map``). """ from __future__ import annotations +import argparse import sys +from pathlib import Path + +from arautopilot.core.audit import AuditLog +from arautopilot.core.user_store import UserStore, seed_demo_users +from arautopilot.studio.session import studio_data_dir -def run() -> int: - """Stub entry point. Will be replaced by a real ``QApplication`` in Sprint 4.""" - print( - "AR-Autopilot Studio — Sprint 0 stub.\n" - "The Studio GUI is implemented starting in Sprint 4.\n" - "For now, use the core API (`arautopilot.core`) and the demo:\n" - " python examples/sprint0_demo.py" +def run(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "--seed-demo", + action="store_true", + help="Populate the local user store with one of each role + default " + "PINs (1111/2222/3333/4444). Then exit.", ) - return 0 + parser.add_argument( + "--data-dir", + type=Path, + default=None, + help="Override the per-user data directory. Defaults to " + "~/.ar-autopilot/studio/.", + ) + args = parser.parse_args(argv) + + data_dir = args.data_dir or studio_data_dir() + data_dir.mkdir(parents=True, exist_ok=True) + user_store = UserStore(data_dir / "users.json") + audit_log = AuditLog(data_dir / "audit.jsonl") + + if args.seed_demo: + before = len(user_store) + seed_demo_users(user_store) + after = len(user_store) + print( + f"User store at {user_store.path}: {before} -> {after} users.\n" + "Demo PINs: 1111 (Super Admin), 2222 (Engineer), 3333 (Owner), " + "4444 (User).\n" + "Change them with the Studio's user manager (Sprint 4+) before " + "shipping to a customer." + ) + return 0 + + # Lazy-import the Qt machinery so that --seed-demo (and pytest collection + # of this module) does not depend on a working display server. + try: + from PySide6.QtWidgets import QApplication, QMessageBox + except ImportError: + sys.stderr.write( + "PySide6 is not installed. Run:\n\n" + " pip install -e \".[studio]\"\n\n" + "or:\n\n" + " pip install PySide6 pyserial\n" + ) + return 2 + + from arautopilot.studio.login_window import LoginDialog + from arautopilot.studio.main_window import StudioMainWindow + + app = QApplication(sys.argv) + app.setApplicationName("AR-Autopilot Studio") + + if len(user_store) == 0: + QMessageBox.information( + None, + "No users", + "The local user store is empty. The Studio will seed it with " + "demo users (PINs 1111/2222/3333/4444). Change them immediately " + "in production.", + ) + seed_demo_users(user_store) + + login = LoginDialog(store=user_store, audit=audit_log) + win_holder: list[StudioMainWindow] = [] + + def on_logged_in(_session: object) -> None: + from arautopilot.studio.session import SessionHolder + session = SessionHolder.require() + win = StudioMainWindow(session) + win.show() + win_holder.append(win) + + login.logged_in.connect(on_logged_in) + if login.exec() == 0 and not win_holder: + return 0 + return app.exec() if __name__ == "__main__": diff --git a/arautopilot/studio/flash_console.py b/arautopilot/studio/flash_console.py new file mode 100644 index 0000000..bb6e75d --- /dev/null +++ b/arautopilot/studio/flash_console.py @@ -0,0 +1,404 @@ +"""Flash Console widget -- "mini Arduino IDE" inside the Studio. + +What it does +------------ +- Enumerates available serial ports (pyserial). +- Lets the operator pick the firmware variant (``esp32-dev`` or + ``esp32-debug``). +- Compiles the firmware via PlatformIO (`pio run -e -d + firmware/ar_autopilot_v1`). +- Flashes the firmware via `pio run -t upload --upload-port `. +- Streams the build/flash output to a read-only text area. + +Permissions +----------- +- Super Admin: flashes directly, single factor. +- Engineer: flashes only after providing the Super Admin's PIN. The + dialog asks for the SA's PIN inline; on success the audit + trail records both engineer's and SA's user_ids. +- Owner / User: the Flash Console button is hidden. + +The actual ``pio`` invocations run in a worker thread so the GUI never +blocks. +""" + +from __future__ import annotations + +import os +import shlex +import subprocess +import sys +from pathlib import Path +from typing import Optional + +from PySide6.QtCore import QObject, QThread, Signal +from PySide6.QtWidgets import ( + QComboBox, + QDialog, + QDialogButtonBox, + QFormLayout, + QGroupBox, + QHBoxLayout, + QLabel, + QLineEdit, + QMessageBox, + QPlainTextEdit, + QPushButton, + QVBoxLayout, + QWidget, +) + +from arautopilot.core.audit import AuditOutcome +from arautopilot.core.rbac import Capability +from arautopilot.studio.session import Session + +REPO_ROOT = Path(__file__).resolve().parents[2] +FIRMWARE_DIR = REPO_ROOT / "firmware" / "ar_autopilot_v1" +PIO_EXE_CANDIDATES = [ + REPO_ROOT / ".venv" / "Scripts" / "pio.exe", # Windows venv + REPO_ROOT / ".venv" / "bin" / "pio", # POSIX venv + Path("pio"), # PATH fallback +] + + +def _find_pio() -> Path | None: + for c in PIO_EXE_CANDIDATES: + if c.exists(): + return c + # Last-resort PATH lookup -- subprocess will find it. + return Path("pio") + + +def list_serial_ports() -> list[tuple[str, str]]: + """Return ``[(device, description), ...]`` for every available port.""" + try: + from serial.tools import list_ports # type: ignore[import-untyped] + except ImportError: + return [] + return [(p.device, p.description or "") for p in list_ports.comports()] + + +class _PioWorker(QObject): + """Runs `pio ...` in a background thread and streams stdout.""" + + line = Signal(str) # one line of output (no trailing \n) + finished_ok = Signal(int) # exit code on success + failed = Signal(str) # error description + + def __init__(self, argv: list[str], cwd: Path) -> None: + super().__init__() + self._argv = argv + self._cwd = cwd + self._proc: subprocess.Popen[str] | None = None + self._cancel = False + + def run(self) -> None: + try: + env = os.environ.copy() + env.setdefault("PYTHONIOENCODING", "utf-8") + self._proc = subprocess.Popen( + self._argv, + cwd=str(self._cwd), + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1, + env=env, + ) + assert self._proc.stdout is not None + for raw_line in self._proc.stdout: + if self._cancel: + break + self.line.emit(raw_line.rstrip("\n")) + code = self._proc.wait() + self.finished_ok.emit(code) + except FileNotFoundError as exc: + self.failed.emit(f"executable not found: {exc}") + except Exception as exc: # noqa: BLE001 + self.failed.emit(f"unexpected error: {exc}") + + def cancel(self) -> None: + self._cancel = True + if self._proc is not None: + try: + self._proc.terminate() + except Exception: + pass + + +class FlashConsoleWidget(QWidget): + """A self-contained Flash Console for embedding in the main window.""" + + def __init__(self, session: Session, parent: QWidget | None = None) -> None: + super().__init__(parent) + self._session = session + self._thread: QThread | None = None + self._worker: _PioWorker | None = None + self._build_ui() + self.refresh_ports() + + # ----- UI ------------------------------------------------------------ + def _build_ui(self) -> None: + outer = QVBoxLayout(self) + outer.setContentsMargins(8, 8, 8, 8) + + header = QLabel( + "Flash Console
" + "Compile and flash AR-Autopilot firmware to an AR-NMEA-IO board.
" + f"Logged in as {self._session.user.display_name} " + f"({self._session.role.value})." + ) + header.setTextFormat(0x1) # PlainText would lose
; RichText = 1 + header.setWordWrap(True) + outer.addWidget(header) + + form_group = QGroupBox("Target") + form = QFormLayout(form_group) + + port_row = QHBoxLayout() + self._port_combo = QComboBox() + self._port_combo.setMinimumWidth(280) + port_row.addWidget(self._port_combo, stretch=1) + refresh_btn = QPushButton("Refresh") + refresh_btn.clicked.connect(self.refresh_ports) + port_row.addWidget(refresh_btn) + form.addRow("Serial port:", port_row) + + self._variant_combo = QComboBox() + self._variant_combo.addItem("esp32-dev (release, -Os)", userData="esp32-dev") + self._variant_combo.addItem("esp32-debug (-O0, verbose)", userData="esp32-debug") + form.addRow("Variant:", self._variant_combo) + + outer.addWidget(form_group) + + action_row = QHBoxLayout() + self._compile_btn = QPushButton("Compile only") + self._compile_btn.clicked.connect(self._on_compile) + action_row.addWidget(self._compile_btn) + + self._flash_btn = QPushButton("Compile + Flash") + self._flash_btn.clicked.connect(self._on_flash) + action_row.addWidget(self._flash_btn) + + self._cancel_btn = QPushButton("Cancel") + self._cancel_btn.setEnabled(False) + self._cancel_btn.clicked.connect(self._on_cancel) + action_row.addWidget(self._cancel_btn) + action_row.addStretch(1) + outer.addLayout(action_row) + + self._output = QPlainTextEdit() + self._output.setReadOnly(True) + self._output.setStyleSheet( + "background-color: #0d1117; color: #c9d1d9; font-family: Consolas, " + "'Cascadia Mono', monospace; font-size: 11px;" + ) + outer.addWidget(self._output, stretch=1) + + # Gate the buttons by capability. + if not self._session.can(Capability.BUILD_FIRMWARE): + self._compile_btn.setEnabled(False) + self._compile_btn.setToolTip("Your role cannot build firmware.") + if not self._session.can(Capability.FLASH_FIRMWARE): + self._flash_btn.setEnabled(False) + self._flash_btn.setToolTip("Your role cannot flash firmware.") + + # ----- Public -------------------------------------------------------- + def refresh_ports(self) -> None: + self._port_combo.clear() + ports = list_serial_ports() + if not ports: + self._port_combo.addItem("(no boards found)", userData=None) + else: + for device, desc in ports: + label = f"{device} -- {desc}" if desc else device + self._port_combo.addItem(label, userData=device) + + # ----- Handlers ------------------------------------------------------ + def _on_compile(self) -> None: + if not self._session.check(Capability.BUILD_FIRMWARE, target=self._variant()): + QMessageBox.warning(self, "Permission denied", + "Your role cannot build firmware.") + return + self._run_pio(["run", "-e", self._variant()]) + + def _on_flash(self) -> None: + # Capability gate (logged either way). + if not self._session.check(Capability.FLASH_FIRMWARE, + target=self._port_or_none()): + QMessageBox.warning(self, "Permission denied", + "Your role cannot flash firmware.") + return + port = self._port_or_none() + if port is None: + QMessageBox.warning(self, "No port", + "No serial port selected. Connect a board and " + "click Refresh.") + return + + # Dual-auth check: Engineer needs Super Admin PIN. + if self._session.needs_dual_auth(Capability.FLASH_FIRMWARE): + sa_user = _ask_super_admin_pin(self, self._session) + if sa_user is None: + self._session.log_action( + "flash_firmware", + outcome=AuditOutcome.APPROVAL_PENDING, + target=f"{port}:{self._variant()}", + reason="Super Admin approval refused or cancelled", + ) + return + self._session.log_dual_auth_grant( + Capability.FLASH_FIRMWARE, + sa_user, + target=f"{port}:{self._variant()}", + extra={"variant": self._variant()}, + ) + + self._run_pio([ + "run", "-e", self._variant(), "-t", "upload", + "--upload-port", port, + ]) + + def _on_cancel(self) -> None: + if self._worker is not None: + self._worker.cancel() + self._append_line("[cancelled by operator]") + + # ----- pio worker ---------------------------------------------------- + def _run_pio(self, args: list[str]) -> None: + pio = _find_pio() + argv = [str(pio), *args] + self._output.clear() + self._append_line(f"$ {' '.join(shlex.quote(a) for a in argv)}") + self._set_running(True) + + self._thread = QThread(self) + self._worker = _PioWorker(argv=argv, cwd=FIRMWARE_DIR) + self._worker.moveToThread(self._thread) + self._thread.started.connect(self._worker.run) + self._worker.line.connect(self._append_line) + self._worker.finished_ok.connect(self._on_pio_done) + self._worker.failed.connect(self._on_pio_failed) + self._worker.finished_ok.connect(self._thread.quit) + self._worker.failed.connect(self._thread.quit) + self._thread.finished.connect(self._cleanup_thread) + self._thread.start() + + def _on_pio_done(self, code: int) -> None: + self._append_line(f"[pio exit code: {code}]") + outcome = AuditOutcome.SUCCESS if code == 0 else AuditOutcome.FAILED + self._session.log_action( + "flash_firmware_run", + outcome=outcome, + target=self._port_or_none(), + extra={"variant": self._variant(), "exit_code": code}, + ) + + def _on_pio_failed(self, msg: str) -> None: + self._append_line(f"[pio failed: {msg}]") + self._session.log_action( + "flash_firmware_run", + outcome=AuditOutcome.FAILED, + target=self._port_or_none(), + reason=msg, + ) + + def _cleanup_thread(self) -> None: + self._set_running(False) + if self._thread is not None: + self._thread.deleteLater() + self._thread = None + if self._worker is not None: + self._worker.deleteLater() + self._worker = None + + def _set_running(self, running: bool) -> None: + self._compile_btn.setEnabled( + not running and self._session.can(Capability.BUILD_FIRMWARE) + ) + self._flash_btn.setEnabled( + not running and self._session.can(Capability.FLASH_FIRMWARE) + ) + self._cancel_btn.setEnabled(running) + + def _append_line(self, line: str) -> None: + self._output.appendPlainText(line) + + def _variant(self) -> str: + return str(self._variant_combo.currentData()) + + def _port_or_none(self) -> str | None: + v = self._port_combo.currentData() + return None if v is None else str(v) + + +# ---------------------------------------------------------------------------- +# Super Admin PIN approval dialog +# ---------------------------------------------------------------------------- + + +def _ask_super_admin_pin(parent: QWidget, session: Session): + """Modal dialog asking the SA for their PIN. Returns the SA :class:`User` + on success, ``None`` on cancel / bad PIN.""" + dlg = QDialog(parent) + dlg.setWindowTitle("Super Admin approval required") + dlg.setModal(True) + + layout = QVBoxLayout(dlg) + layout.addWidget(QLabel( + "Flashing firmware as an Engineer requires Super Admin approval.\n" + "Ask the Super Admin to enter their PIN below." + )) + form = QFormLayout() + pin_field = QLineEdit() + pin_field.setEchoMode(QLineEdit.EchoMode.Password) + pin_field.setMaxLength(8) + form.addRow("Super Admin PIN:", pin_field) + layout.addLayout(form) + + buttons = QDialogButtonBox( + QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel + ) + layout.addWidget(buttons) + + result_holder: dict[str, Optional[object]] = {"user": None} + + def _accept() -> None: + sa = session.verify_super_admin_pin(pin_field.text()) + if sa is None: + QMessageBox.warning(dlg, "Approval failed", + "Incorrect Super Admin PIN.") + pin_field.clear() + pin_field.setFocus() + return + result_holder["user"] = sa + dlg.accept() + + buttons.accepted.connect(_accept) + buttons.rejected.connect(dlg.reject) + dlg.exec() + return result_holder["user"] + + +# ---------------------------------------------------------------------------- +# Standalone smoke test +# ---------------------------------------------------------------------------- + +if __name__ == "__main__": # pragma: no cover -- manual launch helper + from PySide6.QtWidgets import QApplication + from arautopilot.core.audit import AuditLog + from arautopilot.core.user import User + from arautopilot.core.user_store import UserStore + from arautopilot.core.rbac import Role + + app = QApplication(sys.argv) + store = UserStore(REPO_ROOT / ".tmp" / "users.json") + if len(store) == 0: + store.add(User.create(display_name="Smoke", role=Role.SUPER_ADMIN, pin="0000")) + sa = next(iter(store.by_role(Role.SUPER_ADMIN))) + audit = AuditLog(REPO_ROOT / ".tmp" / "audit.jsonl") + session = Session(user=sa, store=store, audit=audit) + w = FlashConsoleWidget(session) + w.resize(800, 600) + w.show() + sys.exit(app.exec()) diff --git a/arautopilot/studio/login_window.py b/arautopilot/studio/login_window.py new file mode 100644 index 0000000..cbe028a --- /dev/null +++ b/arautopilot/studio/login_window.py @@ -0,0 +1,123 @@ +"""Studio login window (PySide6). + +Lists every active user from the local store, lets the operator pick +themselves and enter their PIN. On success, populates the global +:class:`SessionHolder` and emits :pyattr:`logged_in`. +""" + +from __future__ import annotations + +from PySide6.QtCore import Qt, Signal +from PySide6.QtWidgets import ( + QComboBox, + QDialog, + QDialogButtonBox, + QFormLayout, + QLabel, + QLineEdit, + QMessageBox, + QVBoxLayout, +) + +from arautopilot.core.audit import AuditEvent, AuditLog, AuditOutcome +from arautopilot.core.user_store import UserStore +from arautopilot.studio.session import Session, SessionHolder + + +class LoginDialog(QDialog): + """Modal login dialog. Emits :pyattr:`logged_in(session)` on success.""" + + logged_in = Signal(object) # Session + + def __init__( + self, + store: UserStore, + audit: AuditLog, + parent: object | None = None, + ) -> None: + super().__init__(parent) + self._store = store + self._audit = audit + self.setWindowTitle("AR-Autopilot Studio -- Login") + self.setModal(True) + self.setMinimumWidth(360) + + layout = QVBoxLayout(self) + intro = QLabel("Select your user and enter the PIN.") + intro.setWordWrap(True) + layout.addWidget(intro) + + form = QFormLayout() + self._user_combo = QComboBox() + for user in self._store.all_users(): + if not user.active: + continue + self._user_combo.addItem( + f"{user.display_name} -- {user.role.value}", + userData=user.user_id, + ) + form.addRow("User:", self._user_combo) + + self._pin_field = QLineEdit() + self._pin_field.setEchoMode(QLineEdit.EchoMode.Password) + self._pin_field.setMaxLength(8) + self._pin_field.setPlaceholderText("4-8 digits") + self._pin_field.setInputMethodHints(Qt.InputMethodHint.ImhDigitsOnly) + form.addRow("PIN:", self._pin_field) + layout.addLayout(form) + + buttons = QDialogButtonBox( + QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel + ) + buttons.accepted.connect(self._try_login) + buttons.rejected.connect(self.reject) + layout.addWidget(buttons) + + if self._user_combo.count() == 0: + QMessageBox.critical( + self, + "No users available", + "The local user store is empty. Run the demo seed:\n\n" + " python -m arautopilot.studio.app --seed-demo", + ) + + # ----- Internal ------------------------------------------------------- + def _try_login(self) -> None: + user_id = self._user_combo.currentData() + pin = self._pin_field.text() + user = self._store.get(user_id) if user_id else None + if user is None: + QMessageBox.warning(self, "Login", "Pick a user.") + return + if not user.active: + QMessageBox.warning(self, "Login", "This user is disabled.") + return + if not user.verify_pin(pin): + self._audit.append( + AuditEvent( + user_id=user.user_id, + role=user.role.value, + action="login", + outcome=AuditOutcome.DENIED, + reason="bad PIN", + ) + ) + QMessageBox.warning(self, "Login", "Incorrect PIN.") + self._pin_field.clear() + self._pin_field.setFocus() + return + + touched = user.touch_login() + self._store.replace(touched) + self._audit.append( + AuditEvent( + user_id=touched.user_id, + role=touched.role.value, + action="login", + outcome=AuditOutcome.SUCCESS, + ) + ) + session = Session(user=touched, store=self._store, audit=self._audit) + SessionHolder.set(session) + self.logged_in.emit(session) + self.accept() diff --git a/arautopilot/studio/main_window.py b/arautopilot/studio/main_window.py index f413faf..de10802 100644 --- a/arautopilot/studio/main_window.py +++ b/arautopilot/studio/main_window.py @@ -1,4 +1,110 @@ -"""Studio main window — Sprint 4 stub. +"""Studio main window (PySide6) -- Sprint 2.5. -Reserved namespace for the PySide6 ``QMainWindow`` arriving in Sprint 4. +Three areas: + +- Sidebar (left) -- user + role + capabilities they hold. +- Central tab area -- Flash Console (Sprint 2.5) + placeholders for the + project configurator that lands in Sprint 4. +- Status bar -- session info + audit log path. """ + +from __future__ import annotations + +from PySide6.QtCore import Qt +from PySide6.QtWidgets import ( + QLabel, + QListWidget, + QMainWindow, + QSplitter, + QStatusBar, + QTabWidget, + QTextEdit, + QVBoxLayout, + QWidget, +) + +from arautopilot.core.rbac import capabilities_of +from arautopilot.studio.flash_console import FlashConsoleWidget +from arautopilot.studio.session import Session +from arautopilot.version import __version__ + + +class StudioMainWindow(QMainWindow): + """Top-level Studio window.""" + + def __init__(self, session: Session) -> None: + super().__init__() + self._session = session + self.setWindowTitle( + f"AR-Autopilot Studio v{__version__} -- " + f"{session.user.display_name} ({session.role.value})" + ) + self.resize(1100, 700) + + splitter = QSplitter(Qt.Orientation.Horizontal) + splitter.addWidget(self._build_sidebar()) + splitter.addWidget(self._build_central()) + splitter.setStretchFactor(0, 0) + splitter.setStretchFactor(1, 1) + splitter.setSizes([260, 840]) + self.setCentralWidget(splitter) + + status = QStatusBar(self) + status.showMessage(f"Audit log: {session.audit.path}") + self.setStatusBar(status) + + # ----- UI ------------------------------------------------------------ + def _build_sidebar(self) -> QWidget: + w = QWidget() + layout = QVBoxLayout(w) + layout.setContentsMargins(8, 8, 8, 8) + layout.addWidget(QLabel( + f"{self._session.user.display_name}
" + f"{self._session.role.value}" + )) + layout.addWidget(QLabel("Capabilities")) + caps = QListWidget() + for cap in sorted(capabilities_of(self._session.role), key=lambda c: c.value): + caps.addItem(cap.value) + layout.addWidget(caps, stretch=1) + return w + + def _build_central(self) -> QWidget: + tabs = QTabWidget() + + tabs.addTab(self._build_overview_tab(), "Overview") + tabs.addTab(FlashConsoleWidget(self._session), "Flash Console") + tabs.addTab(self._placeholder_tab( + "Project configurator -- Sprint 4.\n\n" + "Will let you create / edit a per-vessel ProjectConfig and " + "compile it into an .appack for deployment." + ), "Project") + tabs.addTab(self._placeholder_tab( + "Telemetry -- Sprint 4.\n\n" + "Live Modbus telemetry from the connected AR-NMEA-IO board." + ), "Telemetry") + return tabs + + def _build_overview_tab(self) -> QWidget: + w = QWidget() + layout = QVBoxLayout(w) + layout.addWidget(QLabel( + "

AR-Autopilot Studio

" + "

Welcome. Use the Flash Console tab to compile and " + "flash firmware to an AR-NMEA-IO board.

" + "

The Project tab (Sprint 4) will let you configure a " + "vessel and produce a deployable .appack.

" + "

Every action you take is recorded in the audit log " + "(see status bar at the bottom).

" + )) + layout.addStretch(1) + return w + + def _placeholder_tab(self, text: str) -> QWidget: + w = QWidget() + layout = QVBoxLayout(w) + edit = QTextEdit() + edit.setReadOnly(True) + edit.setPlainText(text) + layout.addWidget(edit) + return w diff --git a/arautopilot/studio/session.py b/arautopilot/studio/session.py new file mode 100644 index 0000000..413d190 --- /dev/null +++ b/arautopilot/studio/session.py @@ -0,0 +1,146 @@ +"""Authenticated session state for the Studio. + +A ``Session`` is the runtime context of a logged-in user. It carries the +:class:`User`, exposes capability checks, and writes audit events for +every gated decision (granted or denied). Window code reaches for the +current Session via the :class:`SessionHolder` singleton; tests can swap +it out trivially. +""" + +from __future__ import annotations + +from pathlib import Path +from typing import Optional + +from arautopilot.core.audit import AuditEvent, AuditLog, AuditOutcome +from arautopilot.core.rbac import Capability, Role, has, requires_dual_auth +from arautopilot.core.user import User +from arautopilot.core.user_store import UserStore + + +class Session: + """The current user + their audit log + capability gate.""" + + def __init__( + self, + user: User, + store: UserStore, + audit: AuditLog, + ) -> None: + self.user = user + self.store = store + self.audit = audit + + @property + def role(self) -> Role: + return self.user.role + + def can(self, capability: Capability) -> bool: + return has(self.user.role, capability) + + def check(self, capability: Capability, *, target: str | None = None, + extra: dict[str, object] | None = None) -> bool: + """Capability check with audit side effect. + + Returns True on grant, False on denial. Records an audit event + either way so the trail is complete. + """ + granted = self.can(capability) + self.audit.append( + AuditEvent( + user_id=self.user.user_id, + role=self.user.role.value, + action=f"check:{capability.value}", + target=target, + outcome=AuditOutcome.SUCCESS if granted else AuditOutcome.DENIED, + reason="" if granted + else f"role {self.user.role.value!r} lacks {capability.value!r}", + extra=extra or {}, + ) + ) + return granted + + def needs_dual_auth(self, capability: Capability) -> bool: + return requires_dual_auth(self.user.role, capability) + + def verify_super_admin_pin(self, pin: str) -> Optional[User]: + """Look up the (first) Super Admin in the store and verify the PIN. + + Returns the SA :class:`User` on success, ``None`` on failure. + """ + for sa in self.store.by_role(Role.SUPER_ADMIN): + if sa.verify_pin(pin): + return sa + return None + + def log_dual_auth_grant( + self, + capability: Capability, + sa_user: User, + *, + target: str | None = None, + extra: dict[str, object] | None = None, + ) -> None: + """Record an audit event for a successful dual-auth approval.""" + self.audit.append( + AuditEvent( + user_id=self.user.user_id, + role=self.user.role.value, + action=f"dual_auth_approved:{capability.value}", + target=target, + outcome=AuditOutcome.SUCCESS, + secondary_user_id=sa_user.user_id, + reason=f"approved by Super Admin {sa_user.display_name!r}", + extra=extra or {}, + ) + ) + + def log_action( + self, + action: str, + *, + outcome: AuditOutcome = AuditOutcome.SUCCESS, + target: str | None = None, + reason: str = "", + extra: dict[str, object] | None = None, + ) -> None: + """Free-form audit event for downstream business actions.""" + self.audit.append( + AuditEvent( + user_id=self.user.user_id, + role=self.user.role.value, + action=action, + target=target, + outcome=outcome, + reason=reason, + extra=extra or {}, + ) + ) + + +class SessionHolder: + """Process-global current session (set after login).""" + + _current: Session | None = None + + @classmethod + def set(cls, session: Session | None) -> None: + cls._current = session + + @classmethod + def current(cls) -> Session | None: + return cls._current + + @classmethod + def require(cls) -> Session: + if cls._current is None: + raise RuntimeError("no active Studio session -- log in first") + return cls._current + + +def studio_data_dir() -> Path: + """Per-user directory under ``~/.ar-autopilot/studio/`` for the local + user store and audit log.""" + base = Path.home() / ".ar-autopilot" / "studio" + base.mkdir(parents=True, exist_ok=True) + return base diff --git a/arautopilot/tests/test_audit.py b/arautopilot/tests/test_audit.py new file mode 100644 index 0000000..1c016ca --- /dev/null +++ b/arautopilot/tests/test_audit.py @@ -0,0 +1,108 @@ +"""Tests for ``arautopilot.core.audit``.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from arautopilot.core.audit import AuditEvent, AuditLog, AuditOutcome + + +def test_event_to_jsonl_is_single_line() -> None: + ev = AuditEvent( + action="engage_pilot", + outcome=AuditOutcome.SUCCESS, + user_id="u123", + role="user", + ) + line = ev.to_jsonl() + assert "\n" not in line + assert line.startswith("{") and line.endswith("}") + + +def test_event_is_immutable() -> None: + ev = AuditEvent(action="x", outcome=AuditOutcome.SUCCESS) + with pytest.raises((TypeError, ValueError)): + ev.action = "y" # type: ignore[misc] + + +def test_append_and_read_round_trip(tmp_path: Path) -> None: + log = AuditLog(tmp_path / "audit.jsonl") + e1 = AuditEvent(action="login", outcome=AuditOutcome.SUCCESS, user_id="u1") + e2 = AuditEvent(action="engage", outcome=AuditOutcome.DENIED, user_id="u2", + reason="missing capability") + log.append(e1) + log.append(e2) + read = log.read_all() + assert len(read) == 2 + assert read[0].action == "login" + assert read[1].outcome is AuditOutcome.DENIED + + +def test_len_counts_lines(tmp_path: Path) -> None: + log = AuditLog(tmp_path / "audit.jsonl") + assert len(log) == 0 + log.append(AuditEvent(action="a", outcome=AuditOutcome.SUCCESS)) + log.append(AuditEvent(action="b", outcome=AuditOutcome.SUCCESS)) + assert len(log) == 2 + + +def test_log_file_is_created_if_missing(tmp_path: Path) -> None: + p = tmp_path / "subdir" / "audit.jsonl" + log = AuditLog(p) + assert p.exists() + assert len(log) == 0 + + +def test_corrupt_line_raises(tmp_path: Path) -> None: + p = tmp_path / "audit.jsonl" + p.write_text( + '{"action":"good","outcome":"success","timestamp":"2026-05-18T00:00:00Z"}\n' + "this is not json\n", + encoding="utf-8", + ) + log = AuditLog(p) + with pytest.raises(ValueError, match="corrupt audit line"): + log.read_all() + + +def test_dual_auth_event_carries_secondary_user(tmp_path: Path) -> None: + log = AuditLog(tmp_path / "a.jsonl") + ev = AuditEvent( + action="flash_firmware", + outcome=AuditOutcome.SUCCESS, + user_id="engineer_1", + role="engineer", + secondary_user_id="super_admin_alvaro", + target="COM7:esp32-dev", + ) + log.append(ev) + read = log.read_all() + assert read[0].secondary_user_id == "super_admin_alvaro" + assert read[0].target == "COM7:esp32-dev" + + +def test_extra_payload_round_trips(tmp_path: Path) -> None: + log = AuditLog(tmp_path / "a.jsonl") + ev = AuditEvent( + action="mode_change", + outcome=AuditOutcome.SUCCESS, + extra={"from": "STANDBY", "to": "HEADING_HOLD", "via": "modbus"}, + ) + log.append(ev) + read = log.read_all() + assert read[0].extra == {"from": "STANDBY", "to": "HEADING_HOLD", "via": "modbus"} + + +def test_blank_lines_are_skipped(tmp_path: Path) -> None: + p = tmp_path / "a.jsonl" + p.write_text( + '{"action":"a","outcome":"success","timestamp":"2026-05-18T00:00:00Z"}\n' + "\n" + '{"action":"b","outcome":"denied","timestamp":"2026-05-18T00:00:01Z"}\n', + encoding="utf-8", + ) + log = AuditLog(p) + events = log.read_all() + assert len(events) == 2 diff --git a/arautopilot/tests/test_rbac.py b/arautopilot/tests/test_rbac.py new file mode 100644 index 0000000..3d2fc07 --- /dev/null +++ b/arautopilot/tests/test_rbac.py @@ -0,0 +1,154 @@ +"""Tests for the 4-role RBAC system.""" + +from __future__ import annotations + +import pytest + +from arautopilot.core.rbac import ( + Capability, + PermissionError, + Role, + capabilities_of, + has, + require, + requires_dual_auth, +) + + +# ---------------------------------------------------------------------------- +# Capability matrix +# ---------------------------------------------------------------------------- + + +def test_super_admin_has_every_capability() -> None: + super_admin_caps = capabilities_of(Role.SUPER_ADMIN) + assert super_admin_caps == frozenset(Capability) + + +@pytest.mark.parametrize("cap", [ + Capability.FLASH_FIRMWARE, + Capability.BUILD_FIRMWARE, + Capability.EDIT_FIRMWARE_SOURCE, + Capability.EDIT_COMMISSIONING, + Capability.EDIT_OPERATIONAL, + Capability.ENGAGE_PILOT, + Capability.READ_TELEMETRY, + Capability.ACK_ALARMS, + Capability.VIEW_AUDIT_LOG_FULL, + Capability.VIEW_AUDIT_LOG_VESSEL, +]) +def test_engineer_can(cap: Capability) -> None: + assert has(Role.ENGINEER, cap) + + +@pytest.mark.parametrize("cap", [ + Capability.EDIT_PYTHON_PROJECT, + Capability.EDIT_BASE_GAINS, + Capability.MANAGE_USERS, +]) +def test_engineer_cannot(cap: Capability) -> None: + assert not has(Role.ENGINEER, cap) + + +@pytest.mark.parametrize("cap", [ + Capability.EDIT_OPERATIONAL, + Capability.MANAGE_USERS, + Capability.ENGAGE_PILOT, + Capability.READ_TELEMETRY, + Capability.ACK_ALARMS, + Capability.VIEW_AUDIT_LOG_VESSEL, +]) +def test_owner_can(cap: Capability) -> None: + assert has(Role.OWNER, cap) + + +@pytest.mark.parametrize("cap", [ + Capability.EDIT_PYTHON_PROJECT, + Capability.EDIT_FIRMWARE_SOURCE, + Capability.FLASH_FIRMWARE, + Capability.BUILD_FIRMWARE, + Capability.EDIT_BASE_GAINS, + Capability.EDIT_COMMISSIONING, + Capability.VIEW_AUDIT_LOG_FULL, +]) +def test_owner_cannot(cap: Capability) -> None: + assert not has(Role.OWNER, cap) + + +@pytest.mark.parametrize("cap", [ + Capability.ENGAGE_PILOT, + Capability.READ_TELEMETRY, + Capability.ACK_ALARMS, +]) +def test_user_can(cap: Capability) -> None: + assert has(Role.USER, cap) + + +@pytest.mark.parametrize("cap", [ + Capability.EDIT_PYTHON_PROJECT, + Capability.EDIT_FIRMWARE_SOURCE, + Capability.FLASH_FIRMWARE, + Capability.BUILD_FIRMWARE, + Capability.EDIT_BASE_GAINS, + Capability.EDIT_COMMISSIONING, + Capability.EDIT_OPERATIONAL, + Capability.MANAGE_USERS, + Capability.VIEW_AUDIT_LOG_FULL, + Capability.VIEW_AUDIT_LOG_VESSEL, +]) +def test_user_cannot(cap: Capability) -> None: + assert not has(Role.USER, cap) + + +# ---------------------------------------------------------------------------- +# Dual-auth policy +# ---------------------------------------------------------------------------- + + +def test_engineer_flashing_requires_dual_auth() -> None: + assert requires_dual_auth(Role.ENGINEER, Capability.FLASH_FIRMWARE) + + +def test_super_admin_flashing_is_single_factor() -> None: + assert not requires_dual_auth(Role.SUPER_ADMIN, Capability.FLASH_FIRMWARE) + + +@pytest.mark.parametrize("role", [Role.SUPER_ADMIN, Role.ENGINEER, Role.OWNER, Role.USER]) +@pytest.mark.parametrize("cap", [ + Capability.ENGAGE_PILOT, + Capability.READ_TELEMETRY, + Capability.EDIT_OPERATIONAL, +]) +def test_non_flash_actions_never_need_dual_auth(role: Role, cap: Capability) -> None: + assert not requires_dual_auth(role, cap) + + +# ---------------------------------------------------------------------------- +# require() helper +# ---------------------------------------------------------------------------- + + +def test_require_passes_when_granted() -> None: + require(Role.USER, Capability.ENGAGE_PILOT) # no raise + + +def test_require_raises_on_denial() -> None: + with pytest.raises(PermissionError): + require(Role.USER, Capability.EDIT_BASE_GAINS) + + +def test_no_privilege_escalation_via_unknown_capability() -> None: + # Make sure that Owner / User can never grow capabilities by some + # accidental code path -- we just snapshot the matrix here. + assert capabilities_of(Role.USER) < capabilities_of(Role.OWNER) + assert capabilities_of(Role.OWNER) < capabilities_of(Role.SUPER_ADMIN) + assert capabilities_of(Role.ENGINEER) < capabilities_of(Role.SUPER_ADMIN) + + +def test_engineer_and_owner_capabilities_overlap_but_neither_subsumes_other() -> None: + eng = capabilities_of(Role.ENGINEER) + own = capabilities_of(Role.OWNER) + # Engineer can flash; Owner can manage users -- neither set is a subset + # of the other. + assert Capability.FLASH_FIRMWARE in eng and Capability.FLASH_FIRMWARE not in own + assert Capability.MANAGE_USERS in own and Capability.MANAGE_USERS not in eng diff --git a/arautopilot/tests/test_session.py b/arautopilot/tests/test_session.py new file mode 100644 index 0000000..863e208 --- /dev/null +++ b/arautopilot/tests/test_session.py @@ -0,0 +1,146 @@ +"""Tests for ``arautopilot.studio.session.Session``.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from arautopilot.core.audit import AuditLog, AuditOutcome +from arautopilot.core.rbac import Capability, Role +from arautopilot.core.user import User +from arautopilot.core.user_store import UserStore +from arautopilot.studio.session import Session, SessionHolder + + +@pytest.fixture +def store_and_audit(tmp_path: Path) -> tuple[UserStore, AuditLog]: + store = UserStore(tmp_path / "users.json") + audit = AuditLog(tmp_path / "audit.jsonl") + return store, audit + + +def _sa_session(store: UserStore, audit: AuditLog) -> Session: + u = User.create(display_name="SA", role=Role.SUPER_ADMIN, pin="0001") + store.add(u) + return Session(user=u, store=store, audit=audit) + + +def _eng_session(store: UserStore, audit: AuditLog) -> Session: + u = User.create(display_name="Eng", role=Role.ENGINEER, pin="0002") + store.add(u) + return Session(user=u, store=store, audit=audit) + + +def test_session_can_for_super_admin(store_and_audit: tuple[UserStore, AuditLog]) -> None: + store, audit = store_and_audit + s = _sa_session(store, audit) + assert s.can(Capability.FLASH_FIRMWARE) + assert s.can(Capability.EDIT_BASE_GAINS) + assert s.can(Capability.MANAGE_USERS) + + +def test_session_check_records_audit_on_grant( + store_and_audit: tuple[UserStore, AuditLog] +) -> None: + store, audit = store_and_audit + s = _sa_session(store, audit) + assert s.check(Capability.FLASH_FIRMWARE, target="COM7") is True + events = audit.read_all() + assert len(events) == 1 + assert events[0].outcome is AuditOutcome.SUCCESS + assert events[0].action == "check:flash_firmware" + assert events[0].target == "COM7" + + +def test_session_check_records_audit_on_denial( + store_and_audit: tuple[UserStore, AuditLog] +) -> None: + store, audit = store_and_audit + u = User.create(display_name="Crew", role=Role.USER, pin="4444") + store.add(u) + s = Session(user=u, store=store, audit=audit) + assert s.check(Capability.FLASH_FIRMWARE) is False + events = audit.read_all() + assert len(events) == 1 + assert events[0].outcome is AuditOutcome.DENIED + assert "lacks" in events[0].reason + + +def test_engineer_needs_dual_auth_for_flash( + store_and_audit: tuple[UserStore, AuditLog] +) -> None: + store, audit = store_and_audit + s = _eng_session(store, audit) + assert s.needs_dual_auth(Capability.FLASH_FIRMWARE) + assert not s.needs_dual_auth(Capability.BUILD_FIRMWARE) + + +def test_verify_super_admin_pin_succeeds_with_right_pin( + store_and_audit: tuple[UserStore, AuditLog] +) -> None: + store, audit = store_and_audit + sa = User.create(display_name="SA", role=Role.SUPER_ADMIN, pin="0001") + store.add(sa) + eng = User.create(display_name="Eng", role=Role.ENGINEER, pin="0002") + store.add(eng) + s = Session(user=eng, store=store, audit=audit) + matched = s.verify_super_admin_pin("0001") + assert matched is not None + assert matched.user_id == sa.user_id + + +def test_verify_super_admin_pin_fails_with_wrong_pin( + store_and_audit: tuple[UserStore, AuditLog] +) -> None: + store, audit = store_and_audit + store.add(User.create(display_name="SA", role=Role.SUPER_ADMIN, pin="0001")) + eng = User.create(display_name="Eng", role=Role.ENGINEER, pin="0002") + store.add(eng) + s = Session(user=eng, store=store, audit=audit) + assert s.verify_super_admin_pin("9999") is None + + +def test_dual_auth_grant_records_secondary_user( + store_and_audit: tuple[UserStore, AuditLog] +) -> None: + store, audit = store_and_audit + sa = User.create(display_name="SA", role=Role.SUPER_ADMIN, pin="0001") + store.add(sa) + eng = User.create(display_name="Eng", role=Role.ENGINEER, pin="0002") + store.add(eng) + s = Session(user=eng, store=store, audit=audit) + s.log_dual_auth_grant(Capability.FLASH_FIRMWARE, sa, + target="COM7:esp32-dev", + extra={"variant": "esp32-dev"}) + events = audit.read_all() + assert events[0].secondary_user_id == sa.user_id + assert events[0].outcome is AuditOutcome.SUCCESS + assert events[0].target == "COM7:esp32-dev" + assert events[0].extra == {"variant": "esp32-dev"} + + +def test_session_holder_set_and_require( + store_and_audit: tuple[UserStore, AuditLog] +) -> None: + store, audit = store_and_audit + s = _sa_session(store, audit) + SessionHolder.set(s) + assert SessionHolder.current() is s + assert SessionHolder.require() is s + SessionHolder.set(None) + assert SessionHolder.current() is None + with pytest.raises(RuntimeError): + SessionHolder.require() + + +def test_log_action_helper( + store_and_audit: tuple[UserStore, AuditLog] +) -> None: + store, audit = store_and_audit + s = _sa_session(store, audit) + s.log_action("mode_change", outcome=AuditOutcome.SUCCESS, + extra={"from": "STANDBY", "to": "HEADING_HOLD"}) + events = audit.read_all() + assert events[0].action == "mode_change" + assert events[0].extra["to"] == "HEADING_HOLD" diff --git a/arautopilot/tests/test_studio_smoke.py b/arautopilot/tests/test_studio_smoke.py new file mode 100644 index 0000000..35864e7 --- /dev/null +++ b/arautopilot/tests/test_studio_smoke.py @@ -0,0 +1,70 @@ +"""Smoke tests for the Studio entry point. + +These tests verify that: + +- The Studio modules import cleanly without a display server (no Qt + classes are instantiated at module level). +- ``arautopilot.studio.app.run(['--seed-demo', '--data-dir', tmp])`` + populates a fresh user store and exits 0 without launching a GUI. + +GUI behaviour (login modal, main window, Flash Console widget rendering) +is covered by manual smoke testing -- pytest is intentionally headless +here. +""" + +from __future__ import annotations + +from pathlib import Path + + +def test_studio_modules_import_cleanly() -> None: + # Importing these must not require a display server. + from arautopilot.studio import app # noqa: F401 + from arautopilot.studio import flash_console # noqa: F401 + from arautopilot.studio import login_window # noqa: F401 + from arautopilot.studio import main_window # noqa: F401 + from arautopilot.studio import session # noqa: F401 + + +def test_seed_demo_via_app_run(tmp_path: Path) -> None: + from arautopilot.core.user_store import UserStore + from arautopilot.studio.app import run + + rc = run(["--seed-demo", "--data-dir", str(tmp_path)]) + assert rc == 0 + store = UserStore(tmp_path / "users.json") + assert len(store) == 4 + sa = store.find_by_name("Alvaro") + assert sa is not None + assert sa.verify_pin("1111") + + +def test_seed_demo_is_idempotent(tmp_path: Path) -> None: + from arautopilot.core.user_store import UserStore + from arautopilot.studio.app import run + + assert run(["--seed-demo", "--data-dir", str(tmp_path)]) == 0 + assert run(["--seed-demo", "--data-dir", str(tmp_path)]) == 0 + store = UserStore(tmp_path / "users.json") + assert len(store) == 4 + + +def test_flash_console_helper_finds_pio_or_returns_path() -> None: + """`_find_pio` must return a Path object even if the venv path doesn't + exist (falls back to the bare 'pio' name).""" + from arautopilot.studio.flash_console import _find_pio + p = _find_pio() + assert p is not None + # We don't assert existence -- the function falls back to the bare name. + from pathlib import Path as _P + assert isinstance(p, _P) + + +def test_list_serial_ports_is_safe_to_call() -> None: + """It must work whether or not pyserial finds ports.""" + from arautopilot.studio.flash_console import list_serial_ports + result = list_serial_ports() + assert isinstance(result, list) + for entry in result: + assert isinstance(entry, tuple) + assert len(entry) == 2 diff --git a/arautopilot/tests/test_user.py b/arautopilot/tests/test_user.py new file mode 100644 index 0000000..2cb2a44 --- /dev/null +++ b/arautopilot/tests/test_user.py @@ -0,0 +1,103 @@ +"""Tests for ``arautopilot.core.user``.""" + +from __future__ import annotations + +import pytest +from pydantic import ValidationError + +from arautopilot.core.rbac import Role +from arautopilot.core.user import User + + +def test_create_user_with_valid_pin() -> None: + u = User.create(display_name="Test Engineer", role=Role.ENGINEER, pin="1234") + assert u.display_name == "Test Engineer" + assert u.role is Role.ENGINEER + assert u.pin_hash.startswith("pbkdf2_sha256$") + assert u.active is True + assert u.last_login_at is None + + +def test_verify_correct_pin() -> None: + u = User.create(display_name="X", role=Role.USER, pin="9876") + assert u.verify_pin("9876") is True + + +def test_verify_incorrect_pin() -> None: + u = User.create(display_name="X", role=Role.USER, pin="9876") + assert u.verify_pin("0000") is False + assert u.verify_pin("987") is False # too short, treated as invalid + assert u.verify_pin("98765") is False + assert u.verify_pin("") is False + + +def test_pin_hash_is_different_each_time_for_same_pin() -> None: + """Salting must make the hashes diverge even for identical PINs.""" + a = User.create(display_name="A", role=Role.USER, pin="1234") + b = User.create(display_name="B", role=Role.USER, pin="1234") + assert a.pin_hash != b.pin_hash + # But both verify against the same PIN. + assert a.verify_pin("1234") + assert b.verify_pin("1234") + + +def test_set_pin_returns_new_instance_with_new_hash() -> None: + u = User.create(display_name="A", role=Role.USER, pin="1234") + old_hash = u.pin_hash + u2 = u.set_pin("5678") + assert u2.pin_hash != old_hash + assert u2.verify_pin("5678") is True + assert u2.verify_pin("1234") is False + + +def test_pin_must_be_numeric_4_to_8_digits() -> None: + with pytest.raises(ValueError): + User.create(display_name="A", role=Role.USER, pin="abc") + with pytest.raises(ValueError): + User.create(display_name="A", role=Role.USER, pin="12") + with pytest.raises(ValueError): + User.create(display_name="A", role=Role.USER, pin="123456789") + with pytest.raises(ValueError): + User.create(display_name="A", role=Role.USER, pin="12 34") + + +def test_pin_hash_field_validator_rejects_garbage() -> None: + with pytest.raises(ValidationError): + User(display_name="A", role=Role.USER, pin_hash="not-a-real-hash") + with pytest.raises(ValidationError): + # Right shape but wrong scheme. + User(display_name="A", role=Role.USER, + pin_hash="md5$1000$xxxxxxxxxx$yyyyyyyyyy") + + +def test_user_rejects_unknown_field() -> None: + # Direct construction with unknown field should fail (extra="forbid"). + with pytest.raises(ValidationError): + User(display_name="A", role=Role.USER, pin_hash="pbkdf2_sha256$200000$xx$yy", + unknown="x") # type: ignore[call-arg] + + +def test_touch_login_advances_timestamp() -> None: + u = User.create(display_name="A", role=Role.USER, pin="1234") + assert u.last_login_at is None + u2 = u.touch_login() + assert u2.last_login_at is not None + assert u2.user_id == u.user_id + + +def test_vessel_scoped_user() -> None: + u = User.create(display_name="Captain", role=Role.OWNER, pin="4242", + vessel_id="abc123") + assert u.vessel_id == "abc123" + + +def test_serialisation_round_trip() -> None: + import json + u = User.create(display_name="Test", role=Role.ENGINEER, pin="2468") + data = u.model_dump(mode="json") + text = json.dumps(data) + rebuilt = User.model_validate(json.loads(text)) + assert rebuilt.display_name == u.display_name + assert rebuilt.role == u.role + assert rebuilt.pin_hash == u.pin_hash + assert rebuilt.verify_pin("2468") is True diff --git a/arautopilot/tests/test_user_store.py b/arautopilot/tests/test_user_store.py new file mode 100644 index 0000000..b65e116 --- /dev/null +++ b/arautopilot/tests/test_user_store.py @@ -0,0 +1,102 @@ +"""Tests for ``arautopilot.core.user_store``.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from arautopilot.core.rbac import Role +from arautopilot.core.user import User +from arautopilot.core.user_store import UserStore, seed_demo_users + + +def test_empty_store_starts_empty(tmp_path: Path) -> None: + s = UserStore(tmp_path / "users.json") + assert len(s) == 0 + assert s.all_users() == [] + + +def test_add_and_get(tmp_path: Path) -> None: + s = UserStore(tmp_path / "users.json") + u = User.create(display_name="A", role=Role.USER, pin="1234") + s.add(u) + assert len(s) == 1 + assert s.get(u.user_id) is not None + assert s.get(u.user_id).display_name == "A" + + +def test_duplicate_user_id_rejected(tmp_path: Path) -> None: + s = UserStore(tmp_path / "users.json") + u = User.create(display_name="A", role=Role.USER, pin="1234") + s.add(u) + with pytest.raises(ValueError): + s.add(u) + + +def test_remove_unknown_raises(tmp_path: Path) -> None: + s = UserStore(tmp_path / "users.json") + with pytest.raises(KeyError): + s.remove("nonexistent") + + +def test_persistence_across_instances(tmp_path: Path) -> None: + p = tmp_path / "users.json" + s = UserStore(p) + s.add(User.create(display_name="A", role=Role.USER, pin="1234")) + s.add(User.create(display_name="B", role=Role.OWNER, pin="5678")) + + s2 = UserStore(p) + assert len(s2) == 2 + a = s2.find_by_name("A") + b = s2.find_by_name("B") + assert a is not None and a.verify_pin("1234") + assert b is not None and b.verify_pin("5678") + + +def test_by_role_filters_correctly(tmp_path: Path) -> None: + s = UserStore(tmp_path / "users.json") + s.add(User.create(display_name="SA", role=Role.SUPER_ADMIN, pin="0001")) + s.add(User.create(display_name="Eng1", role=Role.ENGINEER, pin="0002")) + s.add(User.create(display_name="Eng2", role=Role.ENGINEER, pin="0003")) + s.add(User.create(display_name="Own", role=Role.OWNER, pin="0004")) + assert len(s.by_role(Role.ENGINEER)) == 2 + assert len(s.by_role(Role.OWNER)) == 1 + assert len(s.by_role(Role.USER)) == 0 + + +def test_seed_demo_creates_one_of_each_role(tmp_path: Path) -> None: + s = UserStore(tmp_path / "users.json") + seed_demo_users(s) + assert len(s) == 4 + assert s.find_by_name("Alvaro") is not None + assert s.find_by_name("Alvaro").role is Role.SUPER_ADMIN + assert s.find_by_name("Alvaro").verify_pin("1111") + + +def test_seed_demo_idempotent(tmp_path: Path) -> None: + s = UserStore(tmp_path / "users.json") + seed_demo_users(s) + seed_demo_users(s) # no-op + assert len(s) == 4 + + +def test_replace_updates_existing(tmp_path: Path) -> None: + s = UserStore(tmp_path / "users.json") + u = User.create(display_name="A", role=Role.USER, pin="1234") + s.add(u) + u2 = u.set_pin("9999") + s.replace(u2) + assert len(s) == 1 + fetched = s.get(u.user_id) + assert fetched is not None + assert fetched.verify_pin("9999") + assert not fetched.verify_pin("1234") + + +def test_membership_test(tmp_path: Path) -> None: + s = UserStore(tmp_path / "users.json") + u = User.create(display_name="A", role=Role.USER, pin="1234") + s.add(u) + assert u.user_id in s + assert "nope" not in s diff --git a/docs/sprint-2.5-plan.md b/docs/sprint-2.5-plan.md new file mode 100644 index 0000000..9d850fa --- /dev/null +++ b/docs/sprint-2.5-plan.md @@ -0,0 +1,111 @@ +# Sprint 2.5 — RBAC + Studio mínimo + Flash Console + +> Sprint nuevo añadido a petición del Super Admin (Álvaro). Cubre dos +> requisitos que no estaban en el brief original: +> +> 1. Sistema de 4 roles (Super Admin / Engineer / Owner / User) con +> capabilities específicas y verificación de doble factor para +> operaciones críticas. +> 2. Una "mini consola de Arduino" dentro del Studio que permita al +> Engineer flashear firmware ESP32 con aprobación del Super Admin. + +## Objetivo + +Dejar Studio bootable con login por PIN, RBAC funcional, y la Flash +Console operativa. Engineer puede flashear firmware mostrando el PIN del +Super Admin en una ventana de confirmación. Super Admin (solo Álvaro) +flashea directamente sin segundo factor. + +## Roles y capabilities + +| Capability | Super Admin | Engineer | Owner | User | +|---|---|---|---|---| +| Editar código Python del proyecto (arautopilot, tools, scripts) | ✅ | ❌ | ❌ | ❌ | +| Editar código C++ del firmware (firmware/**) | ✅ | ✅ (sólo en su sandbox) | ❌ | ❌ | +| **Flashear firmware ESP32** | ✅ directo | ✅ con SA approval (2-of-N) | ❌ | ❌ | +| Editar gains PID base (IP del integrator) | ✅ | ❌ | ❌ | ❌ | +| Editar comisionado (rudder limits, calibración) | ✅ | ✅ | ❌ | ❌ | +| Editar preferencias operativas (rumbos fav, alarm vol, perfil) | ✅ | ✅ | ✅ | ❌ | +| Crear/gestionar Users del barco | ✅ | ❌ | ✅ | ❌ | +| Engage/disengage piloto | ✅ | ✅ | ✅ | ✅ | +| Leer telemetría | ✅ | ✅ | ✅ | ✅ | +| Acknowledge alarmas | ✅ | ✅ | ✅ | ✅ | +| Ver audit log completo | ✅ | ✅ | parcial (sólo su barco) | ❌ | + +## Plan de implementación + +### 1. RBAC core (`arautopilot/core/rbac.py`) + +- Enum `Role`: SUPER_ADMIN / ENGINEER / OWNER / USER +- Enum `Capability`: cada acción gateable (FLASH_FIRMWARE, EDIT_BASE_GAINS, + EDIT_COMMISSIONING, EDIT_OPERATIONAL, MANAGE_USERS, ENGAGE_PILOT, + READ_TELEMETRY, ACK_ALARMS, VIEW_AUDIT_LOG_FULL) +- Matriz inmutable `_CAPABILITIES_BY_ROLE: dict[Role, frozenset[Capability]]` +- Función `requires_dual_auth(cap: Capability) -> bool`: True para + FLASH_FIRMWARE cuando el actor es Engineer +- `has(role, capability) -> bool` + +### 2. User model (`arautopilot/core/user.py`) + +- Pydantic v2 `User`: user_id (UUID), display_name, role, pin_hash + (argon2 o pbkdf2), created_at, last_login_at, active (bool) +- `set_pin(plain) / verify_pin(plain) -> bool` con hashing seguro + +### 3. Audit log (`arautopilot/core/audit.py`) + +- `AuditEvent`: timestamp UTC, user_id, action, target, outcome + (success/denied/failed), reason, secondary_user_id (si dual-auth) +- `AuditLog` con append-only file en JSONL +- Cada permission check produce un evento + +### 4. Studio mínimo (`arautopilot/studio/app.py`) + +- Reemplaza el stub Sprint 0 +- PySide6 QApplication + login window +- Main window con 3 áreas: sidebar (rol + user), main area (placeholder + para project editor), toolbar (acciones gateadas por rol) +- Login = elegir user de lista + PIN +- Persistencia local (SQLite o JSON) de la lista de users + +### 5. Flash Console (`arautopilot/studio/flash_console.py`) + +- Widget dentro del Studio (no app separada) +- Lista puertos serie (vía pyserial) +- Combobox de variant (esp32-dev / esp32-debug) +- Botones: "Compile only", "Compile + Flash", "Open Serial Monitor" +- Para Engineer: al pulsar Flash, abre modal "Esperando aprobación del + Super Admin" con form de PIN del SA. El SA introduce su PIN (si está + presente) o el operario llama por teléfono y SA dicta un OTP que el SA + generó desde otro Studio. Por simplicidad MVP: campo de PIN del SA en + el modal. +- Bajo el capó: corre `pio run -t upload --upload-port COMx` y stream + del output al widget +- Serial monitor: thread aparte que lee `pio device monitor` + +### 6. Tests + +- `test_rbac.py`: cada rol tiene exactamente las capabilities esperadas; + intentos de escalada rechazados; dual-auth requerido para Engineer + flash. +- `test_user.py`: PIN hash, verificación correcta/incorrecta, no se + puede deserializar un User sin pin_hash. +- `test_audit.py`: append es atómico, lectura es ordenada por timestamp. + +## Restricciones + +- No requiere hardware ESP32 conectado para que arranque el Studio. +- La Flash Console **no funciona** sin board conectada (pero arranca, + muestra "no boards found"). Funcionalidad real verificable cuando el + usuario conecte hardware. +- PIN del Super Admin se introduce en cada flash (no se cachea). Sí se + puede cachear PIN del operario actual durante la sesión. +- El binario flasheado por el Engineer debe ser uno compilado por el SA y + firmado (futuro Sprint 8). Sprint 2.5 hace la versión simple: Engineer + compila + flashea con PIN del SA. + +## Verificación + +- `pytest` verde (objetivo: 145+ tests) +- `python studio_main.py` abre la ventana de login sin errores +- Login con un user dummy de demo (`demo_users.json`) funciona +- Flash Console widget se renderiza (incluso sin board) diff --git a/pyproject.toml b/pyproject.toml index f1c49d9..0740d9c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,6 +42,13 @@ dev = [ "types-PyYAML", "types-python-dateutil", ] +# Studio GUI -- Sprint 2.5+. Heavy (~80 MB), kept optional so the core can +# be installed in lean environments (CI, headless test bench). +studio = [ + "PySide6>=6.6", + "pyserial>=3.5", + "platformio>=6.1", +] [project.urls] Homepage = "https://github.com/alro65/AR-Autopilot"