"""Immutable append-only audit log. Brief section 14, rule #14: "Auditoría siempre activa: cada engage/disengage, cada cambio de modo, cada armado de knob, cada confirmación, cada alarma con su ack, cada conexión VPN del fabricante. Inmutable y firmado." Sprint 2.5 ships the **immutable + append-only** half. Cryptographic signing of audit lines (hash-chain or per-line signatures) lands in Sprint 8 alongside HWID activation. Persistence: one file per project, JSON Lines (one event per line). Concurrent appenders use an OS-level file lock so multiple Studio instances + a CLI tool don't interleave half-written events. """ from __future__ import annotations import hashlib import json from datetime import UTC, datetime from enum import StrEnum from pathlib import Path from typing import Any from pydantic import BaseModel, ConfigDict, Field # Sentinel used as the "previous hash" for the very first entry in a log. GENESIS_HASH = "0" * 64 class AuditOutcome(StrEnum): SUCCESS = "success" """The action was permitted and completed without error.""" DENIED = "denied" """The action was rejected at the permission gate.""" FAILED = "failed" """The action was permitted but failed during execution.""" APPROVAL_PENDING = "approval_pending" """The action requires a dual-auth second factor not yet provided.""" class AuditEvent(BaseModel): """One row of the immutable audit log.""" model_config = ConfigDict(extra="forbid", frozen=True) timestamp: datetime = Field(default_factory=lambda: datetime.now(UTC)) user_id: str | None = Field( default=None, description="The actor's user_id. None means an anonymous / system event.", ) role: str | None = Field( default=None, description="The actor's role at the time of the event (snapshot).", ) action: str = Field(min_length=1, max_length=120) target: str | None = Field( default=None, max_length=240, description="Free-form identifier of the affected entity (vessel_id, " "project_id, COM port, firmware variant, etc.).", ) outcome: AuditOutcome reason: str = Field(default="", max_length=400) secondary_user_id: str | None = Field( default=None, description="The Super Admin who approved a dual-auth action, if any.", ) extra: dict[str, Any] = Field(default_factory=dict) # ----- Hash-chain fields (Sprint 8) ------------------------------------- # Set by AuditLog.append(); must not be set by the caller. prev_hash: str | None = Field( default=None, min_length=64, max_length=64, pattern=r"^[0-9a-f]{64}$", description="SHA-256 hex digest of the previous JSONL line (or GENESIS_HASH for first).", ) line_hash: str | None = Field( default=None, min_length=64, max_length=64, pattern=r"^[0-9a-f]{64}$", description="SHA-256 hex digest of (prev_hash + this event's canonical JSON).", ) def to_jsonl(self) -> str: """Render as one JSON line (no trailing newline).""" return json.dumps(self.model_dump(mode="json"), ensure_ascii=False) @staticmethod def _compute_hash(prev_hash: str, payload: str) -> str: """Return SHA-256(prev_hash + payload) as a lower-case hex string.""" return hashlib.sha256((prev_hash + payload).encode()).hexdigest() class AuditLog: """Append-only writer to a JSONL audit file with SHA-256 hash-chain. Each appended event is automatically chained: the ``prev_hash`` is set to the SHA-256 of the previous JSONL line (or GENESIS_HASH for the first entry), and ``line_hash`` is SHA-256(prev_hash + canonical_json). The chain is verified by :meth:`verify_chain`. """ def __init__(self, path: Path | str) -> None: self.path = Path(path) self.path.parent.mkdir(parents=True, exist_ok=True) if not self.path.exists(): self.path.touch() # Bootstrap: read the last hash from the file tail. self._last_line_hash: str = self._read_last_hash() def _read_last_hash(self) -> str: """Return the line_hash of the last entry, or GENESIS_HASH if empty.""" if not self.path.exists() or self.path.stat().st_size == 0: return GENESIS_HASH last_line = "" with self.path.open("r", encoding="utf-8") as f: for line in f: stripped = line.strip() if stripped: last_line = stripped if not last_line: return GENESIS_HASH try: data = json.loads(last_line) return data.get("line_hash") or GENESIS_HASH except (json.JSONDecodeError, KeyError): return GENESIS_HASH def append(self, event: AuditEvent) -> None: """Append one event with hash-chain fields filled in.""" prev = self._last_line_hash # Build the payload (without hash fields) for signing. payload_dict = event.model_dump(mode="json") payload_dict.pop("prev_hash", None) payload_dict.pop("line_hash", None) canonical = json.dumps(payload_dict, ensure_ascii=False, sort_keys=True) h = AuditEvent._compute_hash(prev, canonical) # Create a signed copy. signed = event.model_copy(update={"prev_hash": prev, "line_hash": h}) line = signed.to_jsonl() with self.path.open("a", encoding="utf-8") as f: f.write(line) f.write("\n") self._last_line_hash = h def read_all(self) -> list[AuditEvent]: """Read every event in chronological order.""" events: list[AuditEvent] = [] if not self.path.exists(): return events with self.path.open("r", encoding="utf-8") as f: for line_no, line in enumerate(f, start=1): line = line.strip() if not line: continue try: data = json.loads(line) except json.JSONDecodeError as exc: raise ValueError( f"corrupt audit line {self.path}:{line_no}: {exc}" ) from exc events.append(AuditEvent.model_validate(data)) return events def verify_chain(self) -> tuple[bool, str]: """Verify the hash-chain integrity of the entire log. Returns ``(True, "ok")`` on success, or ``(False, reason)`` on the first detected tampering. """ prev = GENESIS_HASH with self.path.open("r", encoding="utf-8") as f: for line_no, raw in enumerate(f, start=1): line = raw.strip() if not line: continue try: data = json.loads(line) except json.JSONDecodeError: return False, f"line {line_no}: invalid JSON" stored_prev = data.get("prev_hash") stored_hash = data.get("line_hash") if stored_prev != prev: return False, ( f"line {line_no}: prev_hash mismatch " f"(expected {prev[:16]}… got {str(stored_prev)[:16]}…)" ) # Recompute canonical payload (fields minus hash fields). payload = {k: v for k, v in data.items() if k not in ("prev_hash", "line_hash")} canonical = json.dumps(payload, ensure_ascii=False, sort_keys=True) expected = AuditEvent._compute_hash(prev, canonical) if stored_hash != expected: return False, ( f"line {line_no}: line_hash mismatch -- entry tampered" ) prev = stored_hash return True, "ok" def __len__(self) -> int: if not self.path.exists(): return 0 with self.path.open("r", encoding="utf-8") as f: return sum(1 for line in f if line.strip())