df56f52091
vmssailor/runtime/server/nmea2000.py
- PgnFrame dataclass + parsers/encoders para PGNs clave:
- 127257 Attitude (yaw/pitch/roll en radianes)
- 127488 Engine Parameters Rapid (RPM, boost, trim)
- 127489 Engine Dynamic (oil, coolant, alternator, hours)
- 127505 Fluid Level (tanks)
- 127508 Battery Status
- 129025 Position Rapid
- N2KPublisher: tag updates -> PGNs publicados via transport callable
- Sprint 5: callable de testing/observabilidad
- Sprint 12+: integración real con python-can / Actisense / YDWG-02
- N2KSubscriber: inyecta PGNs entrantes y los traduce a tag updates
- PGN_ATTITUDE -> VESSEL.ROLL_DEG / VESSEL.PITCH_DEG (PGN 127257 = AR-ECDIS source)
- PGN_ENGINE_RAPID -> ME_INST_{N}.RPM
vmssailor/runtime/server/logbook.py
- LogBook con cadena SHA-256 inmutable (prev_hash -> hash chain)
- LogEntryKind: ENGINE_START, ENGINE_STOP, ALARM_ACTIVE/ACK/CLEARED,
AUTHORITY_TRANSFER, OVERRIDE, SNAPSHOT, MANUAL
- verify_chain() detecta tampering (bit flip o reescritura)
- query() con filtros por kind/since/until/limit
- EngineLogWriter: auto-detecta start/stop via cruces de umbral RPM
con persistencia configurable (default 5s)
- SnapshotLogWriter: snapshots periódicos (default 15 min), condicionados
a que haya un motor corriendo
API actualizada:
- GET /logbook con filtros
- POST /logbook (entrada manual)
- GET /logbook/verify (integridad de cadena)
RuntimeApp integra todos los servicios. El alarm engine ahora
auto-anota al log book via callback con strong-ref task tracking.
Tests (tests/runtime/, 10 nuevos, total 152/152):
- test_logbook: append, chain, tamper detection, engine start/stop,
snapshot writer con/sin motor corriendo
- test_nmea2000: PGN encode/decode roundtrip, publisher emits engine
rapid frames, subscriber handles attitude
152/152 pytest verde, ruff clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
375 lines
12 KiB
Python
375 lines
12 KiB
Python
"""Log Book naval básico (Sprint 5).
|
|
|
|
Auto-detección de:
|
|
- Arranque/parada de motores (cuando RPM cruza umbral)
|
|
- Eventos de alarma con timestamp + ack
|
|
- Snapshots periódicos cada 15 min con motor corriendo
|
|
|
|
Cada entrada se firma con SHA-256 + previous_hash formando una cadena
|
|
inmutable (no requiere autoridad externa).
|
|
|
|
Sprint 13+: cumplimiento MARPOL/SOLAS, PDF oficial, Oil Record Book separado.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import sqlite3
|
|
from contextlib import suppress
|
|
from dataclasses import dataclass
|
|
from datetime import UTC, datetime
|
|
from enum import StrEnum
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from vmssailor.runtime.server.tag_store import TagStore, TagValue
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class LogEntryKind(StrEnum):
    """Closed set of log book entry categories.

    Being a ``StrEnum``, each member compares equal to its string value; that
    string value is what ``LogBook.append`` writes to the ``kind`` column.
    """

    # Engine lifecycle (written by EngineLogWriter in this module).
    ENGINE_START = "engine_start"
    ENGINE_STOP = "engine_stop"
    # Alarm lifecycle: raised, acknowledged, cleared.
    ALARM_ACTIVE = "alarm_active"
    ALARM_ACK = "alarm_ack"
    ALARM_CLEARED = "alarm_cleared"
    # NOTE(review): no writer for the next two kinds is visible in this file;
    # presumably they are appended by other runtime services — confirm.
    AUTHORITY_TRANSFER = "authority_transfer"
    OVERRIDE = "override"
    # Periodic tag snapshot (written by SnapshotLogWriter in this module).
    SNAPSHOT = "snapshot"
    # Free-form entry, e.g. one entered by an operator.
    MANUAL = "manual"
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class LogEntry:
|
|
"""Una entrada inmutable del log book."""
|
|
|
|
id: int
|
|
kind: LogEntryKind
|
|
timestamp: datetime
|
|
summary: str
|
|
payload: dict[str, Any]
|
|
user: str = "system"
|
|
prev_hash: str = ""
|
|
hash: str = ""
|
|
|
|
def compute_hash(self) -> str:
|
|
"""Hash de la entrada. `id` no entra (se asigna post-insert)."""
|
|
material = {
|
|
"kind": self.kind.value,
|
|
"timestamp": self.timestamp.isoformat(),
|
|
"summary": self.summary,
|
|
"payload": self.payload,
|
|
"user": self.user,
|
|
"prev_hash": self.prev_hash,
|
|
}
|
|
blob = json.dumps(material, sort_keys=True, ensure_ascii=False).encode("utf-8")
|
|
return hashlib.sha256(blob).hexdigest()
|
|
|
|
|
|
_SCHEMA_SQL = """
|
|
CREATE TABLE IF NOT EXISTS logbook (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
kind TEXT NOT NULL,
|
|
timestamp TEXT NOT NULL,
|
|
summary TEXT NOT NULL,
|
|
payload TEXT NOT NULL,
|
|
user TEXT NOT NULL DEFAULT 'system',
|
|
prev_hash TEXT NOT NULL DEFAULT '',
|
|
hash TEXT NOT NULL
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_logbook_ts ON logbook(timestamp);
|
|
CREATE INDEX IF NOT EXISTS idx_logbook_kind ON logbook(kind);
|
|
"""
|
|
|
|
|
|
class LogBook:
    """SQLite-backed log book whose entries form a SHA-256 hash chain.

    Each entry stores the hash of its predecessor (``prev_hash``) and its own
    ``hash``; ``verify_chain`` recomputes both to detect edited rows. This is
    basic tamper evidence only.
    """

    # Column list shared by every SELECT that is decoded by _row_to_entry.
    _COLUMNS = "id, kind, timestamp, summary, payload, user, prev_hash, hash"

    def __init__(self, db_path: Path | str | None = None) -> None:
        """Open (or create) the database and load the tail of the chain.

        Args:
            db_path: Path of the SQLite file. A falsy value selects an
                in-memory database.
        """
        self._db_path = str(db_path) if db_path else ":memory:"
        self._conn = sqlite3.connect(self._db_path, check_same_thread=False)
        self._conn.executescript(_SCHEMA_SQL)
        self._conn.commit()
        self._last_hash = self._load_last_hash()
        # Serialises append() so that prev_hash always refers to the entry
        # inserted immediately before.
        self._lock = asyncio.Lock()

    def _load_last_hash(self) -> str:
        """Return the hash of the newest stored entry, or "" for an empty book."""
        row = self._conn.execute(
            "SELECT hash FROM logbook ORDER BY id DESC LIMIT 1"
        ).fetchone()
        return row[0] if row else ""

    @staticmethod
    def _row_to_entry(row: tuple[Any, ...]) -> LogEntry:
        """Decode one row selected with ``_COLUMNS`` into a LogEntry.

        Raises:
            ValueError: unknown kind, malformed timestamp or invalid JSON.
            TypeError: a column holds a value of an unexpected type.
        """
        return LogEntry(
            id=row[0],
            kind=LogEntryKind(row[1]),
            timestamp=datetime.fromisoformat(row[2]),
            summary=row[3],
            payload=json.loads(row[4]),
            user=row[5],
            prev_hash=row[6],
            hash=row[7],
        )

    async def append(
        self,
        kind: LogEntryKind,
        summary: str,
        payload: dict[str, Any] | None = None,
        *,
        user: str = "system",
        timestamp: datetime | None = None,
    ) -> LogEntry:
        """Append a new entry to the end of the chain and return it.

        Args:
            kind: Entry category.
            summary: Human-readable one-line description.
            payload: JSON-serialisable details; defaults to ``{}``.
            user: Author of the entry.
            timestamp: Event time; defaults to the current UTC time.

        Returns:
            The stored entry, with ``id``, ``prev_hash`` and ``hash`` filled in.

        Raises:
            TypeError: if ``payload`` is not JSON-serialisable.
            sqlite3.Error: if the row cannot be written (the transaction is
                rolled back and the chain tail is left untouched).
        """
        async with self._lock:
            # Hash exactly what will be read back from the database. Without
            # this round trip a payload with e.g. integer keys is hashed with
            # one key order here and another (string order) when verify_chain
            # reloads the JSON, which would flag an untouched row as broken.
            canonical_payload = json.loads(
                json.dumps(payload or {}, ensure_ascii=False)
            )
            entry = LogEntry(
                id=-1,
                kind=kind,
                timestamp=timestamp or datetime.now(UTC),
                summary=summary,
                payload=canonical_payload,
                user=user,
                prev_hash=self._last_hash,
            )
            entry.hash = entry.compute_hash()
            try:
                cur = self._conn.execute(
                    "INSERT INTO logbook (kind, timestamp, summary, payload, user, prev_hash, hash) "
                    "VALUES (?, ?, ?, ?, ?, ?, ?)",
                    (
                        entry.kind.value,
                        entry.timestamp.isoformat(),
                        entry.summary,
                        json.dumps(entry.payload, ensure_ascii=False),
                        entry.user,
                        entry.prev_hash,
                        entry.hash,
                    ),
                )
                self._conn.commit()
            except sqlite3.Error:
                # Do not leave a half-finished transaction on the shared
                # connection; the next append starts from a clean state.
                self._conn.rollback()
                raise
            entry.id = int(cur.lastrowid)
            # Only advance the chain tail once the row is durably committed.
            self._last_hash = entry.hash
            return entry

    def query(
        self,
        *,
        kind: LogEntryKind | None = None,
        since: datetime | None = None,
        until: datetime | None = None,
        limit: int = 500,
    ) -> list[LogEntry]:
        """Return stored entries, newest first, optionally filtered.

        Args:
            kind: Only entries of this kind.
            since: Only entries with timestamp >= this value.
            until: Only entries with timestamp <= this value.
            limit: Maximum number of entries returned.

        NOTE(review): ``since``/``until`` are compared in SQL as ISO-8601
        strings, so they presumably need the same UTC offset as the stored
        timestamps to compare correctly — confirm against callers.
        """
        sql = f"SELECT {self._COLUMNS} FROM logbook WHERE 1=1"
        params: list[Any] = []
        if kind is not None:
            sql += " AND kind = ?"
            params.append(kind.value)
        if since is not None:
            sql += " AND timestamp >= ?"
            params.append(since.isoformat())
        if until is not None:
            sql += " AND timestamp <= ?"
            params.append(until.isoformat())
        sql += " ORDER BY id DESC LIMIT ?"
        params.append(limit)
        rows = self._conn.execute(sql, params).fetchall()
        return [self._row_to_entry(r) for r in rows]

    def verify_chain(self) -> tuple[bool, list[int]]:
        """Check the integrity of the whole hash chain.

        Returns:
            ``(ok, broken_ids)``. ``broken_ids`` lists, in ascending order,
            the ids of rows whose ``prev_hash`` does not match the preceding
            row, whose recomputed hash differs from the stored one, or whose
            columns can no longer be decoded. When ``ok`` is True it is ``[]``.
        """
        rows = self._conn.execute(
            f"SELECT {self._COLUMNS} FROM logbook ORDER BY id"
        ).fetchall()
        expected_prev = ""
        broken: set[int] = set()
        for r in rows:
            row_id, stored_hash = r[0], r[7]
            try:
                entry = self._row_to_entry(r)
            except (ValueError, TypeError):
                # A row edited into something undecodable (unknown kind, bad
                # timestamp, invalid JSON) is tampering too: report it rather
                # than aborting the whole verification with an exception.
                broken.add(row_id)
            else:
                if entry.prev_hash != expected_prev or entry.compute_hash() != entry.hash:
                    broken.add(row_id)
            # The next row is always checked against this row's stored hash.
            expected_prev = stored_hash
        return (not broken, sorted(broken))

    def stats(self) -> dict[str, Any]:
        """Return ``{"total_entries": int, "by_kind": {kind: count}}``."""
        rows = self._conn.execute(
            "SELECT kind, COUNT(*) FROM logbook GROUP BY kind"
        ).fetchall()
        by_kind = {kind: int(count) for kind, count in rows}
        # Every row belongs to exactly one group, so the per-kind counts add
        # up to the table total without a second query.
        return {"total_entries": sum(by_kind.values()), "by_kind": by_kind}

    def close(self) -> None:
        """Close the underlying SQLite connection."""
        self._conn.close()
|
|
|
|
|
|
# ============================================================================
|
|
# Auto writers (subscribed to the tag_store and the alarm engine)
|
|
# ============================================================================
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class EngineLogConfig:
|
|
"""Configuración del auto-detector de arranque/parada de motores."""
|
|
|
|
rpm_running_threshold: float = 600.0
|
|
"""Umbral RPM por encima del cual consideramos "motor corriendo"."""
|
|
|
|
rpm_stopped_threshold: float = 50.0
|
|
"""Umbral RPM por debajo del cual consideramos "motor parado"."""
|
|
|
|
sustain_seconds: float = 5.0
|
|
"""Persistencia mínima para confirmar transición (anti-flicker)."""
|
|
|
|
|
|
class EngineLogWriter:
|
|
"""Detecta arranques y paradas de motores monitoreando RPM."""
|
|
|
|
def __init__(
|
|
self,
|
|
tag_store: TagStore,
|
|
logbook: LogBook,
|
|
config: EngineLogConfig | None = None,
|
|
) -> None:
|
|
self._store = tag_store
|
|
self._logbook = logbook
|
|
self._config = config or EngineLogConfig()
|
|
# state: tag_id -> ("running"|"stopped"|"unknown", since_ts)
|
|
self._state: dict[str, tuple[str, datetime]] = {}
|
|
self._stop = False
|
|
self._task: asyncio.Task | None = None
|
|
self._sub_q: asyncio.Queue[TagValue] | None = None
|
|
|
|
async def start(self) -> None:
|
|
self._sub_q = self._store.subscribe(maxsize=2048)
|
|
self._task = asyncio.create_task(self._loop())
|
|
|
|
async def stop(self) -> None:
|
|
self._stop = True
|
|
if self._task:
|
|
self._task.cancel()
|
|
with suppress(asyncio.CancelledError):
|
|
await self._task
|
|
if self._sub_q:
|
|
self._store.unsubscribe(self._sub_q)
|
|
|
|
async def _loop(self) -> None:
|
|
try:
|
|
while not self._stop:
|
|
tv = await self._sub_q.get()
|
|
if not tv.tag_id.endswith(".RPM"):
|
|
continue
|
|
if not isinstance(tv.value, (int, float)):
|
|
continue
|
|
await self._evaluate(tv)
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
async def _evaluate(self, tv: TagValue) -> None:
|
|
rpm = float(tv.value)
|
|
running = rpm > self._config.rpm_running_threshold
|
|
stopped = rpm < self._config.rpm_stopped_threshold
|
|
if not running and not stopped:
|
|
return
|
|
new_state = "running" if running else "stopped"
|
|
prev = self._state.get(tv.tag_id)
|
|
if prev is None:
|
|
self._state[tv.tag_id] = (new_state, tv.timestamp)
|
|
return
|
|
prev_state, prev_ts = prev
|
|
if prev_state == new_state:
|
|
return
|
|
# Transition — check sustain
|
|
if (tv.timestamp - prev_ts).total_seconds() < self._config.sustain_seconds:
|
|
return
|
|
self._state[tv.tag_id] = (new_state, tv.timestamp)
|
|
engine_name = tv.tag_id.removesuffix(".RPM")
|
|
if new_state == "running":
|
|
await self._logbook.append(
|
|
LogEntryKind.ENGINE_START,
|
|
f"Motor {engine_name} arrancó (RPM={rpm:.0f})",
|
|
payload={"engine": engine_name, "rpm": rpm},
|
|
timestamp=tv.timestamp,
|
|
)
|
|
else:
|
|
await self._logbook.append(
|
|
LogEntryKind.ENGINE_STOP,
|
|
f"Motor {engine_name} parado (RPM={rpm:.0f})",
|
|
payload={"engine": engine_name, "rpm": rpm},
|
|
timestamp=tv.timestamp,
|
|
)
|
|
|
|
|
|
class SnapshotLogWriter:
    """Periodically record a snapshot of all tag values in the log book.

    By default a snapshot is taken every 15 minutes and only while at least
    one engine is running.
    """

    def __init__(
        self,
        tag_store: TagStore,
        logbook: LogBook,
        *,
        period_s: float = 900.0,
        require_running_engine: bool = True,
        rpm_running_threshold: float = 600.0,
    ) -> None:
        """Store collaborators; the periodic task starts with ``start()``.

        Args:
            tag_store: Source of the current tag values.
            logbook: Destination for SNAPSHOT entries.
            period_s: Seconds between snapshots.
            require_running_engine: When True, skip the snapshot unless some
                ``*.RPM`` tag is above ``rpm_running_threshold``.
            rpm_running_threshold: RPM strictly above which an engine counts
                as running. The default equals
                ``EngineLogConfig.rpm_running_threshold``.
        """
        self._store = tag_store
        self._logbook = logbook
        self._period_s = period_s
        self._require_running = require_running_engine
        self._rpm_running_threshold = rpm_running_threshold
        self._stop = False
        self._task: asyncio.Task | None = None

    async def start(self) -> None:
        """Launch the periodic snapshot task."""
        self._task = asyncio.create_task(self._loop())

    async def stop(self) -> None:
        """Cancel the periodic task; calling it again is a no-op."""
        self._stop = True
        if self._task is not None:
            self._task.cancel()
            with suppress(asyncio.CancelledError):
                await self._task
            self._task = None

    async def _loop(self) -> None:
        """Sleep ``period_s`` seconds, take a snapshot, repeat until stopped."""
        try:
            while not self._stop:
                await asyncio.sleep(self._period_s)
                try:
                    await self._take_snapshot()
                except Exception:
                    # A single failed snapshot (e.g. a tag value that is not
                    # JSON-serialisable, or a database error) must not end
                    # periodic snapshots for good. CancelledError is a
                    # BaseException and is not caught here.
                    logger.exception("SnapshotLogWriter failed to take a snapshot")
        except asyncio.CancelledError:
            pass

    async def _take_snapshot(self) -> None:
        """Append one SNAPSHOT entry holding value and quality of every tag."""
        if self._require_running and not self._any_engine_running():
            return
        snapshot: dict[str, Any] = {
            tag_id: {"value": tv.value, "quality": tv.quality.value}
            for tag_id, tv in self._store.all_values().items()
        }
        await self._logbook.append(
            LogEntryKind.SNAPSHOT,
            f"Snapshot periódico ({len(snapshot)} tags)",
            payload={"tags": snapshot},
        )

    def _any_engine_running(self) -> bool:
        """Return True if any numeric ``*.RPM`` tag exceeds the running threshold."""
        return any(
            tag_id.endswith(".RPM")
            and isinstance(tv.value, (int, float))
            and tv.value > self._rpm_running_threshold
            for tag_id, tv in self._store.all_values().items()
        )
|