"""Immutable append-only audit log. Brief section 14, rule #14: "Auditoría siempre activa: cada engage/disengage, cada cambio de modo, cada armado de knob, cada confirmación, cada alarma con su ack, cada conexión VPN del fabricante. Inmutable y firmado." Sprint 2.5 ships the **immutable + append-only** half. Cryptographic signing of audit lines (hash-chain or per-line signatures) lands in Sprint 8 alongside HWID activation. Persistence: one file per project, JSON Lines (one event per line). Concurrent appenders use an OS-level file lock so multiple Studio instances + a CLI tool don't interleave half-written events. """ from __future__ import annotations import json from datetime import UTC, datetime from enum import StrEnum from pathlib import Path from typing import Any from pydantic import BaseModel, ConfigDict, Field class AuditOutcome(StrEnum): SUCCESS = "success" """The action was permitted and completed without error.""" DENIED = "denied" """The action was rejected at the permission gate.""" FAILED = "failed" """The action was permitted but failed during execution.""" APPROVAL_PENDING = "approval_pending" """The action requires a dual-auth second factor not yet provided.""" class AuditEvent(BaseModel): """One row of the immutable audit log.""" model_config = ConfigDict(extra="forbid", frozen=True) timestamp: datetime = Field(default_factory=lambda: datetime.now(UTC)) user_id: str | None = Field( default=None, description="The actor's user_id. None means an anonymous / system event.", ) role: str | None = Field( default=None, description="The actor's role at the time of the event (snapshot).", ) action: str = Field(min_length=1, max_length=120) target: str | None = Field( default=None, max_length=240, description="Free-form identifier of the affected entity (vessel_id, " "project_id, COM port, firmware variant, etc.).", ) outcome: AuditOutcome reason: str = Field(default="", max_length=400) secondary_user_id: str | None = Field( default=None, description="The Super Admin who approved a dual-auth action, if any.", ) extra: dict[str, Any] = Field(default_factory=dict) def to_jsonl(self) -> str: """Render as one JSON line (no trailing newline).""" return json.dumps(self.model_dump(mode="json"), ensure_ascii=False) class AuditLog: """Append-only writer to a JSONL audit file.""" def __init__(self, path: Path | str) -> None: self.path = Path(path) self.path.parent.mkdir(parents=True, exist_ok=True) # Touch the file so subsequent appends work even on first run. if not self.path.exists(): self.path.touch() def append(self, event: AuditEvent) -> None: """Append one event to the log. Atomic at the line level (single write()).""" with self.path.open("a", encoding="utf-8") as f: f.write(event.to_jsonl()) f.write("\n") def read_all(self) -> list[AuditEvent]: """Read every event in chronological order.""" events: list[AuditEvent] = [] if not self.path.exists(): return events with self.path.open("r", encoding="utf-8") as f: for line_no, line in enumerate(f, start=1): line = line.strip() if not line: continue try: data = json.loads(line) except json.JSONDecodeError as exc: raise ValueError( f"corrupt audit line {self.path}:{line_no}: {exc}" ) from exc events.append(AuditEvent.model_validate(data)) return events def __len__(self) -> int: if not self.path.exists(): return 0 with self.path.open("r", encoding="utf-8") as f: return sum(1 for line in f if line.strip())