From df56f520910c420b5876ea7331a1e879118c9c79 Mon Sep 17 00:00:00 2001 From: Aerom Date: Sun, 17 May 2026 20:30:41 -0400 Subject: [PATCH] =?UTF-8?q?sprint-5:=20NMEA=202000=20stub=20+=20Log=20Book?= =?UTF-8?q?=20naval=20b=C3=A1sico?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit vmssailor/runtime/server/nmea2000.py - PgnFrame dataclass + parsers/encoders para PGNs clave: - 127257 Attitude (yaw/pitch/roll en radianes) - 127488 Engine Parameters Rapid (RPM, boost, trim) - 127489 Engine Dynamic (oil, coolant, alternator, hours) - 127505 Fluid Level (tanks) - 127508 Battery Status - 129025 Position Rapid - N2KPublisher: tag updates -> PGNs publicados via transport callable - Sprint 5: callable de testing/observabilidad - Sprint 12+: integracion real con python-can / Actisense / YDWG-02 - N2KSubscriber: inyecta PGNs entrantes y los traduce a tag updates - PGN_ATTITUDE -> VESSEL.ROLL_DEG / VESSEL.PITCH_DEG (PGN 127257 = AR-ECDIS source) - PGN_ENGINE_RAPID -> ME_INST_{N}.RPM vmssailor/runtime/server/logbook.py - LogBook con cadena SHA-256 inmutable (prev_hash -> hash chain) - LogEntryKind: ENGINE_START, ENGINE_STOP, ALARM_ACTIVE/ACK/CLEARED, AUTHORITY_TRANSFER, OVERRIDE, SNAPSHOT, MANUAL - verify_chain() detecta tampering (bit flip o reescritura) - query() con filtros por kind/since/until/limit - EngineLogWriter: auto-detecta start/stop via cruces de umbral RPM con persistencia configurable (default 5s) - SnapshotLogWriter: snapshots periodicos (default 15 min) condicional a motor corriendo API actualizada: - GET /logbook con filtros - POST /logbook (entrada manual) - GET /logbook/verify (integridad de cadena) RuntimeApp integra todos los servicios. El alarm engine ahora auto-anota al log book via callback con strong-ref task tracking. Tests (tests/runtime/, 10 nuevos, total 152/152): - test_logbook: append, chain, tamper detection, engine start/stop, snapshot writer con/sin motor corriendo - test_nmea2000: PGN encode/decode roundtrip, publisher emits engine rapid frames, subscriber handles attitude 152/152 pytest verde, ruff clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- tests/runtime/test_logbook.py | 114 ++++++++ tests/runtime/test_nmea2000.py | 81 +++++ vmssailor/runtime/server/api.py | 63 ++++ vmssailor/runtime/server/logbook.py | 374 ++++++++++++++++++++++++ vmssailor/runtime/server/nmea2000.py | 306 +++++++++++++++++++ vmssailor/runtime/server/runtime_app.py | 73 ++++- 6 files changed, 1007 insertions(+), 4 deletions(-) create mode 100644 tests/runtime/test_logbook.py create mode 100644 tests/runtime/test_nmea2000.py create mode 100644 vmssailor/runtime/server/logbook.py create mode 100644 vmssailor/runtime/server/nmea2000.py diff --git a/tests/runtime/test_logbook.py b/tests/runtime/test_logbook.py new file mode 100644 index 0000000..4bf65e1 --- /dev/null +++ b/tests/runtime/test_logbook.py @@ -0,0 +1,114 @@ +"""Tests del Log Book naval (Sprint 5).""" + +from __future__ import annotations + +import asyncio + +import pytest + +from vmssailor.core.enums import Protocol, UnitSI +from vmssailor.core.tag import Tag +from vmssailor.runtime.server.logbook import ( + EngineLogConfig, + EngineLogWriter, + LogBook, + LogEntryKind, + SnapshotLogWriter, +) +from vmssailor.runtime.server.tag_store import TagStore + + +@pytest.mark.asyncio +async def test_logbook_append_and_query(): + lb = LogBook() + e1 = await lb.append(LogEntryKind.MANUAL, "Primera entrada", user="op1") + e2 = await lb.append(LogEntryKind.MANUAL, "Segunda entrada", user="op1") + assert e1.id < e2.id + assert e2.prev_hash == e1.hash + entries = lb.query(limit=10) + assert len(entries) == 2 + + +@pytest.mark.asyncio +async def test_logbook_chain_integrity(): + lb = LogBook() + for i in range(5): + await lb.append(LogEntryKind.SNAPSHOT, f"Entry {i}") + ok, broken = lb.verify_chain() + assert ok + assert broken == [] + + +@pytest.mark.asyncio +async def test_logbook_tamper_detection(): + """Si modificamos a mano una entrada, la cadena debe detectarlo.""" + lb = LogBook() + await lb.append(LogEntryKind.MANUAL, "A") + await lb.append(LogEntryKind.MANUAL, "B") + # Tamper: cambiar el summary de la entrada 1 directo en SQLite + lb._conn.execute("UPDATE logbook SET summary = 'TAMPERED' WHERE id = 1") + lb._conn.commit() + ok, broken = lb.verify_chain() + assert not ok + assert 1 in broken or 2 in broken + + +@pytest.mark.asyncio +async def test_engine_log_writer_detects_start_stop(): + store = TagStore() + store.register_tag(Tag(id="ME_PORT.RPM", unit_si=UnitSI.RPM, protocol=Protocol.MODBUS_RTU, address=1)) + lb = LogBook() + writer = EngineLogWriter(store, lb, config=EngineLogConfig(sustain_seconds=0.0)) + await writer.start() + try: + # Arranque: pasar de quieto a corriendo + await store.update("ME_PORT.RPM", 0.0) + await asyncio.sleep(0.05) + await store.update("ME_PORT.RPM", 1500.0) + await asyncio.sleep(0.1) + # Parada + await store.update("ME_PORT.RPM", 0.0) + await asyncio.sleep(0.1) + starts = lb.query(kind=LogEntryKind.ENGINE_START) + stops = lb.query(kind=LogEntryKind.ENGINE_STOP) + assert len(starts) == 1 + assert len(stops) == 1 + assert "ME_PORT" in starts[0].summary + finally: + await writer.stop() + + +@pytest.mark.asyncio +async def test_snapshot_writer_takes_snapshot(): + store = TagStore() + store.register_tag(Tag(id="ME_PORT.RPM", unit_si=UnitSI.RPM, protocol=Protocol.MODBUS_RTU, address=1)) + lb = LogBook() + writer = SnapshotLogWriter(store, lb, period_s=0.1, require_running_engine=True) + await writer.start() + try: + await store.update("ME_PORT.RPM", 1500.0) + await asyncio.sleep(0.25) # Permite al menos un tick de 0.1s + snapshots = lb.query(kind=LogEntryKind.SNAPSHOT) + assert len(snapshots) >= 1 + # Sin motor corriendo no debería tomar snapshot + snap = snapshots[0] + assert "tags" in snap.payload + assert "ME_PORT.RPM" in snap.payload["tags"] + finally: + await writer.stop() + + +@pytest.mark.asyncio +async def test_snapshot_writer_skips_when_no_engine(): + store = TagStore() + store.register_tag(Tag(id="ME_PORT.RPM", unit_si=UnitSI.RPM, protocol=Protocol.MODBUS_RTU, address=1)) + lb = LogBook() + writer = SnapshotLogWriter(store, lb, period_s=0.1, require_running_engine=True) + await writer.start() + try: + await store.update("ME_PORT.RPM", 0.0) + await asyncio.sleep(0.25) + snapshots = lb.query(kind=LogEntryKind.SNAPSHOT) + assert len(snapshots) == 0 + finally: + await writer.stop() diff --git a/tests/runtime/test_nmea2000.py b/tests/runtime/test_nmea2000.py new file mode 100644 index 0000000..8321adf --- /dev/null +++ b/tests/runtime/test_nmea2000.py @@ -0,0 +1,81 @@ +"""Tests del driver NMEA 2000 (Sprint 5 stub).""" + +from __future__ import annotations + +import asyncio + +import pytest + +from vmssailor.core.enums import Protocol, UnitSI +from vmssailor.core.tag import Tag +from vmssailor.runtime.server.nmea2000 import ( + PGN_ATTITUDE, + PGN_ENGINE_RAPID, + N2KPublisher, + N2KSubscriber, + PgnFrame, + encode_pgn_attitude, + encode_pgn_engine_rapid, + parse_pgn_attitude, + parse_pgn_engine_rapid, +) +from vmssailor.runtime.server.tag_store import TagStore + + +def test_attitude_pgn_roundtrip(): + data = encode_pgn_attitude(yaw_deg=15.5, pitch_deg=-2.3, roll_deg=8.1) + parsed = parse_pgn_attitude(data) + assert abs(parsed["roll_deg"] - 8.1) < 0.1 + assert abs(parsed["pitch_deg"] - (-2.3)) < 0.1 + assert abs(parsed["yaw_deg"] - 15.5) < 0.1 + + +def test_engine_rapid_pgn_roundtrip(): + data = encode_pgn_engine_rapid(instance=0, rpm=1520.5) + parsed = parse_pgn_engine_rapid(data) + assert parsed["instance"] == 0 + assert abs(parsed["rpm"] - 1520.5) < 1.0 + + +@pytest.mark.asyncio +async def test_n2k_publisher_emits_engine_rapid(): + store = TagStore() + store.register_tag(Tag(id="ME_PORT.RPM", unit_si=UnitSI.RPM, protocol=Protocol.MODBUS_RTU, address=1)) + frames: list[PgnFrame] = [] + pub = N2KPublisher(store, transport=frames.append) + await pub.start(period_s=0.05) + try: + await store.update("ME_PORT.RPM", 1500.0) + await asyncio.sleep(0.2) + engine_frames = [f for f in frames if f.pgn == PGN_ENGINE_RAPID] + assert len(engine_frames) >= 1 + finally: + await pub.stop() + + +@pytest.mark.asyncio +async def test_n2k_subscriber_handles_attitude(): + store = TagStore() + store.register_tag(Tag(id="VESSEL.ROLL_DEG", unit_si=UnitSI.DEGREE, protocol=Protocol.INTERNAL)) + store.register_tag(Tag(id="VESSEL.PITCH_DEG", unit_si=UnitSI.DEGREE, protocol=Protocol.INTERNAL)) + sub = N2KSubscriber(store) + await sub.start() + try: + data = encode_pgn_attitude(yaw_deg=0.0, pitch_deg=-2.0, roll_deg=5.0) + sub.inject_frame( + PgnFrame( + pgn=PGN_ATTITUDE, + source=0, + data=data, + timestamp=__import__("datetime").datetime.now(__import__("datetime").timezone.utc), + ) + ) + await asyncio.sleep(0.1) + roll = store.get("VESSEL.ROLL_DEG") + pitch = store.get("VESSEL.PITCH_DEG") + assert roll is not None and roll.value is not None + assert abs(float(roll.value) - 5.0) < 0.1 + assert pitch is not None and pitch.value is not None + assert abs(float(pitch.value) - (-2.0)) < 0.1 + finally: + await sub.stop() diff --git a/vmssailor/runtime/server/api.py b/vmssailor/runtime/server/api.py index bf181b2..cedc144 100644 --- a/vmssailor/runtime/server/api.py +++ b/vmssailor/runtime/server/api.py @@ -145,6 +145,69 @@ def create_app(runtime: RuntimeApp) -> FastAPI: raise HTTPException(status_code=404, detail=f"Alarma '{alarm_id}' no activa.") return acked.model_dump(mode="json") + # ----- Log Book ---------------------------------------------------- + + @app.get("/logbook") + def logbook_entries( + kind: str | None = None, + since: str | None = None, + until: str | None = None, + limit: int = 200, + ) -> list[dict[str, Any]]: + from vmssailor.runtime.server.logbook import LogEntryKind + + kind_enum = LogEntryKind(kind) if kind else None + since_dt = datetime.fromisoformat(since) if since else None + until_dt = datetime.fromisoformat(until) if until else None + entries = runtime.logbook.query( + kind=kind_enum, since=since_dt, until=until_dt, limit=limit + ) + return [ + { + "id": e.id, + "kind": e.kind.value, + "timestamp": e.timestamp.isoformat(), + "summary": e.summary, + "payload": e.payload, + "user": e.user, + "hash": e.hash[:16] + "…", + } + for e in entries + ] + + @app.post("/logbook") + def logbook_append_manual( + kind: str = "manual", + summary: str = "", + user: str = "operator", + ) -> dict[str, Any]: + import asyncio + + from vmssailor.runtime.server.logbook import LogEntryKind + + try: + kind_enum = LogEntryKind(kind) + except ValueError as err: + raise HTTPException(status_code=400, detail=f"Kind inválido: {kind}") from err + if not summary: + raise HTTPException(status_code=400, detail="summary requerido") + + async def _do_append(): + return await runtime.logbook.append(kind_enum, summary, user=user) + + entry = asyncio.run_coroutine_threadsafe(_do_append(), asyncio.get_event_loop()).result(timeout=2) + return { + "id": entry.id, + "kind": entry.kind.value, + "timestamp": entry.timestamp.isoformat(), + "hash": entry.hash, + } + + @app.get("/logbook/verify") + def logbook_verify() -> dict[str, Any]: + ok, broken = runtime.logbook.verify_chain() + return {"ok": ok, "broken_entry_ids": broken} + # ----- WebSocket --------------------------------------------------- @app.websocket("/ws/realtime") diff --git a/vmssailor/runtime/server/logbook.py b/vmssailor/runtime/server/logbook.py new file mode 100644 index 0000000..f22a64d --- /dev/null +++ b/vmssailor/runtime/server/logbook.py @@ -0,0 +1,374 @@ +"""Log Book naval básico (Sprint 5). + +Auto-detección de: +- Arranque/parada de motores (cuando RPM cruza umbral) +- Eventos de alarma con timestamp + ack +- Snapshots periódicos cada 15 min con motor corriendo + +Cada entrada se firma con SHA-256 + previous_hash formando una cadena +inmutable (no requiere autoridad externa). + +Sprint 13+: cumplimiento MARPOL/SOLAS, PDF oficial, Oil Record Book separado. +""" + +from __future__ import annotations + +import asyncio +import hashlib +import json +import logging +import sqlite3 +from contextlib import suppress +from dataclasses import dataclass +from datetime import UTC, datetime +from enum import StrEnum +from pathlib import Path +from typing import Any + +from vmssailor.runtime.server.tag_store import TagStore, TagValue + +logger = logging.getLogger(__name__) + + +class LogEntryKind(StrEnum): + ENGINE_START = "engine_start" + ENGINE_STOP = "engine_stop" + ALARM_ACTIVE = "alarm_active" + ALARM_ACK = "alarm_ack" + ALARM_CLEARED = "alarm_cleared" + AUTHORITY_TRANSFER = "authority_transfer" + OVERRIDE = "override" + SNAPSHOT = "snapshot" + MANUAL = "manual" + + +@dataclass(slots=True) +class LogEntry: + """Una entrada inmutable del log book.""" + + id: int + kind: LogEntryKind + timestamp: datetime + summary: str + payload: dict[str, Any] + user: str = "system" + prev_hash: str = "" + hash: str = "" + + def compute_hash(self) -> str: + """Hash de la entrada. `id` no entra (se asigna post-insert).""" + material = { + "kind": self.kind.value, + "timestamp": self.timestamp.isoformat(), + "summary": self.summary, + "payload": self.payload, + "user": self.user, + "prev_hash": self.prev_hash, + } + blob = json.dumps(material, sort_keys=True, ensure_ascii=False).encode("utf-8") + return hashlib.sha256(blob).hexdigest() + + +_SCHEMA_SQL = """ +CREATE TABLE IF NOT EXISTS logbook ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + kind TEXT NOT NULL, + timestamp TEXT NOT NULL, + summary TEXT NOT NULL, + payload TEXT NOT NULL, + user TEXT NOT NULL DEFAULT 'system', + prev_hash TEXT NOT NULL DEFAULT '', + hash TEXT NOT NULL +); +CREATE INDEX IF NOT EXISTS idx_logbook_ts ON logbook(timestamp); +CREATE INDEX IF NOT EXISTS idx_logbook_kind ON logbook(kind); +""" + + +class LogBook: + """Log book con cadena inmutable de hashes (anti-tamper básico).""" + + def __init__(self, db_path: Path | str | None = None) -> None: + self._db_path = str(db_path) if db_path else ":memory:" + self._conn = sqlite3.connect(self._db_path, check_same_thread=False) + self._conn.executescript(_SCHEMA_SQL) + self._conn.commit() + self._last_hash = self._load_last_hash() + self._lock = asyncio.Lock() + + def _load_last_hash(self) -> str: + row = self._conn.execute( + "SELECT hash FROM logbook ORDER BY id DESC LIMIT 1" + ).fetchone() + return row[0] if row else "" + + async def append( + self, + kind: LogEntryKind, + summary: str, + payload: dict[str, Any] | None = None, + *, + user: str = "system", + timestamp: datetime | None = None, + ) -> LogEntry: + async with self._lock: + ts = timestamp or datetime.now(UTC) + entry = LogEntry( + id=-1, + kind=kind, + timestamp=ts, + summary=summary, + payload=payload or {}, + user=user, + prev_hash=self._last_hash, + ) + entry.hash = entry.compute_hash() + cur = self._conn.execute( + "INSERT INTO logbook (kind, timestamp, summary, payload, user, prev_hash, hash) " + "VALUES (?, ?, ?, ?, ?, ?, ?)", + ( + entry.kind.value, + entry.timestamp.isoformat(), + entry.summary, + json.dumps(entry.payload, ensure_ascii=False), + entry.user, + entry.prev_hash, + entry.hash, + ), + ) + self._conn.commit() + entry.id = int(cur.lastrowid) + self._last_hash = entry.hash + return entry + + def query( + self, + *, + kind: LogEntryKind | None = None, + since: datetime | None = None, + until: datetime | None = None, + limit: int = 500, + ) -> list[LogEntry]: + sql = "SELECT id, kind, timestamp, summary, payload, user, prev_hash, hash FROM logbook WHERE 1=1" + params: list[Any] = [] + if kind: + sql += " AND kind = ?" + params.append(kind.value) + if since: + sql += " AND timestamp >= ?" + params.append(since.isoformat()) + if until: + sql += " AND timestamp <= ?" + params.append(until.isoformat()) + sql += " ORDER BY id DESC LIMIT ?" + params.append(limit) + rows = self._conn.execute(sql, params).fetchall() + return [ + LogEntry( + id=r[0], + kind=LogEntryKind(r[1]), + timestamp=datetime.fromisoformat(r[2]), + summary=r[3], + payload=json.loads(r[4]), + user=r[5], + prev_hash=r[6], + hash=r[7], + ) + for r in rows + ] + + def verify_chain(self) -> tuple[bool, list[int]]: + """Verifica integridad de la cadena de hashes. + + Devuelve `(ok, broken_ids)`. Si `ok` es True, broken_ids es []. + """ + rows = self._conn.execute( + "SELECT id, kind, timestamp, summary, payload, user, prev_hash, hash FROM logbook ORDER BY id" + ).fetchall() + prev = "" + broken: list[int] = [] + for r in rows: + entry = LogEntry( + id=r[0], + kind=LogEntryKind(r[1]), + timestamp=datetime.fromisoformat(r[2]), + summary=r[3], + payload=json.loads(r[4]), + user=r[5], + prev_hash=r[6], + hash=r[7], + ) + if entry.prev_hash != prev: + broken.append(entry.id) + if entry.compute_hash() != entry.hash: + broken.append(entry.id) + prev = entry.hash + return (not broken, sorted(set(broken))) + + def stats(self) -> dict[str, Any]: + from collections import Counter + + rows = self._conn.execute("SELECT kind, COUNT(*) FROM logbook GROUP BY kind").fetchall() + counts: Counter[str] = Counter() + for k, n in rows: + counts[k] = int(n) + total = self._conn.execute("SELECT COUNT(*) FROM logbook").fetchone()[0] + return {"total_entries": int(total), "by_kind": dict(counts)} + + def close(self) -> None: + self._conn.close() + + +# ============================================================================ +# Auto writers (suscriptos al tag_store y al alarm engine) +# ============================================================================ + + +@dataclass(slots=True) +class EngineLogConfig: + """Configuración del auto-detector de arranque/parada de motores.""" + + rpm_running_threshold: float = 600.0 + """Umbral RPM por encima del cual consideramos "motor corriendo".""" + + rpm_stopped_threshold: float = 50.0 + """Umbral RPM por debajo del cual consideramos "motor parado".""" + + sustain_seconds: float = 5.0 + """Persistencia mínima para confirmar transición (anti-flicker).""" + + +class EngineLogWriter: + """Detecta arranques y paradas de motores monitoreando RPM.""" + + def __init__( + self, + tag_store: TagStore, + logbook: LogBook, + config: EngineLogConfig | None = None, + ) -> None: + self._store = tag_store + self._logbook = logbook + self._config = config or EngineLogConfig() + # state: tag_id -> ("running"|"stopped"|"unknown", since_ts) + self._state: dict[str, tuple[str, datetime]] = {} + self._stop = False + self._task: asyncio.Task | None = None + self._sub_q: asyncio.Queue[TagValue] | None = None + + async def start(self) -> None: + self._sub_q = self._store.subscribe(maxsize=2048) + self._task = asyncio.create_task(self._loop()) + + async def stop(self) -> None: + self._stop = True + if self._task: + self._task.cancel() + with suppress(asyncio.CancelledError): + await self._task + if self._sub_q: + self._store.unsubscribe(self._sub_q) + + async def _loop(self) -> None: + try: + while not self._stop: + tv = await self._sub_q.get() + if not tv.tag_id.endswith(".RPM"): + continue + if not isinstance(tv.value, (int, float)): + continue + await self._evaluate(tv) + except asyncio.CancelledError: + pass + + async def _evaluate(self, tv: TagValue) -> None: + rpm = float(tv.value) + running = rpm > self._config.rpm_running_threshold + stopped = rpm < self._config.rpm_stopped_threshold + if not running and not stopped: + return + new_state = "running" if running else "stopped" + prev = self._state.get(tv.tag_id) + if prev is None: + self._state[tv.tag_id] = (new_state, tv.timestamp) + return + prev_state, prev_ts = prev + if prev_state == new_state: + return + # Transition — check sustain + if (tv.timestamp - prev_ts).total_seconds() < self._config.sustain_seconds: + return + self._state[tv.tag_id] = (new_state, tv.timestamp) + engine_name = tv.tag_id.removesuffix(".RPM") + if new_state == "running": + await self._logbook.append( + LogEntryKind.ENGINE_START, + f"Motor {engine_name} arrancó (RPM={rpm:.0f})", + payload={"engine": engine_name, "rpm": rpm}, + timestamp=tv.timestamp, + ) + else: + await self._logbook.append( + LogEntryKind.ENGINE_STOP, + f"Motor {engine_name} parado (RPM={rpm:.0f})", + payload={"engine": engine_name, "rpm": rpm}, + timestamp=tv.timestamp, + ) + + +class SnapshotLogWriter: + """Toma snapshots periódicos de tags clave (default cada 15 min con motor corriendo).""" + + def __init__( + self, + tag_store: TagStore, + logbook: LogBook, + *, + period_s: float = 900.0, + require_running_engine: bool = True, + ) -> None: + self._store = tag_store + self._logbook = logbook + self._period_s = period_s + self._require_running = require_running_engine + self._stop = False + self._task: asyncio.Task | None = None + + async def start(self) -> None: + self._task = asyncio.create_task(self._loop()) + + async def stop(self) -> None: + self._stop = True + if self._task: + self._task.cancel() + with suppress(asyncio.CancelledError): + await self._task + + async def _loop(self) -> None: + try: + while not self._stop: + await asyncio.sleep(self._period_s) + await self._take_snapshot() + except asyncio.CancelledError: + pass + + async def _take_snapshot(self) -> None: + if self._require_running and not self._any_engine_running(): + return + snapshot: dict[str, Any] = {} + for tag_id, tv in self._store.all_values().items(): + snapshot[tag_id] = { + "value": tv.value, + "quality": tv.quality.value, + } + await self._logbook.append( + LogEntryKind.SNAPSHOT, + f"Snapshot periódico ({len(snapshot)} tags)", + payload={"tags": snapshot}, + ) + + def _any_engine_running(self) -> bool: + for tag_id, tv in self._store.all_values().items(): + if tag_id.endswith(".RPM") and isinstance(tv.value, (int, float)) and tv.value > 600: + return True + return False diff --git a/vmssailor/runtime/server/nmea2000.py b/vmssailor/runtime/server/nmea2000.py new file mode 100644 index 0000000..4dbe7d7 --- /dev/null +++ b/vmssailor/runtime/server/nmea2000.py @@ -0,0 +1,306 @@ +"""Driver NMEA 2000 (Sprint 5). + +Sprint 5 entrega: +- Parser de PGNs relevantes para VMS-Sailor: 127257 (Attitude), 127488/489 + (Engine Parameters), 127505 (Fluid Level), 127508 (Battery Status), 129025 + (Position Rapid) +- N2KPublisher: convierte tag updates a PGNs y los hace disponibles para envío +- N2KSimulator: produce PGN frames simulados (sin hardware CAN) + +El driver REAL contra hardware CAN (USB-CAN, Actisense NGT-1, Yacht Devices +YDNU-02) llega en Sprint 12+ junto con el firmware ESP32. +""" + +from __future__ import annotations + +import asyncio +import logging +import math +import struct +from collections.abc import Callable +from contextlib import suppress +from dataclasses import dataclass +from datetime import UTC, datetime + +from vmssailor.core.enums import Quality +from vmssailor.runtime.server.tag_store import TagStore + +logger = logging.getLogger(__name__) + + +# ============================================================================ +# PGN definitions +# ============================================================================ + + +@dataclass(slots=True) +class PgnFrame: + """Marco NMEA 2000 simplificado.""" + + pgn: int + source: int + data: bytes + timestamp: datetime + + def __str__(self) -> str: + return f"PGN{self.pgn:>6d} src={self.source:>3d} len={len(self.data)} ts={self.timestamp.isoformat()}" + + +# PGN 127257 — Attitude (yaw, pitch, roll) +PGN_ATTITUDE = 127257 + +# PGN 127488 — Engine Parameters Rapid (RPM, boost, trim) +PGN_ENGINE_RAPID = 127488 + +# PGN 127489 — Engine Parameters Dynamic (oil press, oil temp, coolant temp, etc) +PGN_ENGINE_DYNAMIC = 127489 + +# PGN 127505 — Fluid Level +PGN_FLUID_LEVEL = 127505 + +# PGN 127508 — Battery Status +PGN_BATTERY = 127508 + +# PGN 129025 — Position Rapid +PGN_POSITION = 129025 + + +def parse_pgn_attitude(data: bytes) -> dict[str, float]: + """Parse PGN 127257 Attitude. 8 bytes: + byte 0: SID + bytes 1-2: yaw (int16, 0.0001 rad) + bytes 3-4: pitch (int16, 0.0001 rad) + bytes 5-6: roll (int16, 0.0001 rad) + """ + if len(data) < 7: + raise ValueError("PGN 127257 requires at least 7 bytes") + yaw_raw, pitch_raw, roll_raw = struct.unpack(" bytes: + """Codifica PGN 127257 a bytes (para publicar al backbone).""" + yaw = int(math.radians(yaw_deg) / 0.0001) + pitch = int(math.radians(pitch_deg) / 0.0001) + roll = int(math.radians(roll_deg) / 0.0001) + return struct.pack(" dict[str, float]: + """Parse PGN 127488 Engine Parameters Rapid. + byte 0: engine instance + bytes 1-2: speed (uint16, 0.25 rpm) + bytes 3-4: boost pressure (uint16, hPa) + byte 5: trim/tilt (int8, %) + """ + if len(data) < 6: + raise ValueError("PGN 127488 requires at least 6 bytes") + instance = data[0] + rpm_raw = struct.unpack(" bytes: + rpm_raw = int(rpm / 0.25) + boost_raw = int(boost_pa / 100.0) + return struct.pack(" dict[str, float]: + """Parse PGN 127505 Fluid Level. + bits 0-3: instance + bits 4-7: fluid type (0=fuel, 1=water, 2=grey, 3=waste, 4=live well, 5=oil, 6=black) + bytes 2-3: level (int16, 0.004 %) + bytes 4-7: capacity (uint32, 0.1 L) + """ + if len(data) < 8: + raise ValueError("PGN 127505 requires 8 bytes") + instance_type = data[0] + instance = instance_type & 0x0F + fluid_type = (instance_type >> 4) & 0x0F + level_raw = struct.unpack(" dict[str, float]: + """Parse PGN 127508 Battery Status. + byte 0: instance + bytes 1-2: voltage (uint16, 0.01 V) + bytes 3-4: current (int16, 0.1 A) + bytes 5-6: temperature (uint16, 0.01 K - 273.15 for C) + """ + if len(data) < 7: + raise ValueError("PGN 127508 requires at least 7 bytes") + instance = data[0] + v_raw = struct.unpack(" None: + self._store = tag_store + self._transport = transport or (lambda _f: None) + self._addr = device_address + self._stop = False + self._task: asyncio.Task | None = None + + async def start(self, period_s: float = 0.5) -> None: + self._period_s = period_s + self._task = asyncio.create_task(self._loop()) + + async def stop(self) -> None: + self._stop = True + if self._task: + self._task.cancel() + with suppress(asyncio.CancelledError): + await self._task + + async def _loop(self) -> None: + try: + while not self._stop: + self._publish_engines() + self._publish_attitude_stub() + await asyncio.sleep(self._period_s) + except asyncio.CancelledError: + pass + + def _publish_engines(self) -> None: + for i, (tag_id, _tag) in enumerate(self._store.all_tags().items()): + if not tag_id.endswith(".RPM"): + continue + tv = self._store.get(tag_id) + if tv is None or not isinstance(tv.value, (int, float)): + continue + data = encode_pgn_engine_rapid(instance=i, rpm=float(tv.value)) + self._transport( + PgnFrame( + pgn=PGN_ENGINE_RAPID, + source=self._addr, + data=data, + timestamp=datetime.now(UTC), + ) + ) + + def _publish_attitude_stub(self) -> None: + # Sprint 5: si hay tags de actitud sintéticos, los publicamos. + roll = self._store.get("VESSEL.ROLL_DEG") + pitch = self._store.get("VESSEL.PITCH_DEG") + if roll is None or pitch is None: + return + if not isinstance(roll.value, (int, float)) or not isinstance(pitch.value, (int, float)): + return + data = encode_pgn_attitude(yaw_deg=0.0, pitch_deg=float(pitch.value), roll_deg=float(roll.value)) + self._transport( + PgnFrame( + pgn=PGN_ATTITUDE, + source=self._addr, + data=data, + timestamp=datetime.now(UTC), + ) + ) + + +# ============================================================================ +# Subscriber (consume PGNs from backbone) +# ============================================================================ + + +class N2KSubscriber: + """Suscribe a PGNs entrantes y los traduce a tag updates. + + Sprint 5: `inject_frame(frame)` para testing. + Sprint 12: bucket de USB-CAN/Actisense alimenta los frames. + """ + + def __init__(self, tag_store: TagStore) -> None: + self._store = tag_store + self._frames: asyncio.Queue[PgnFrame] = asyncio.Queue(maxsize=4096) + self._stop = False + self._task: asyncio.Task | None = None + + async def start(self) -> None: + self._task = asyncio.create_task(self._loop()) + + async def stop(self) -> None: + self._stop = True + if self._task: + self._task.cancel() + with suppress(asyncio.CancelledError): + await self._task + + def inject_frame(self, frame: PgnFrame) -> None: + """Inyecta un frame para testing o para integración con USB-CAN.""" + with suppress(asyncio.QueueFull): + self._frames.put_nowait(frame) + + async def _loop(self) -> None: + try: + while not self._stop: + frame = await self._frames.get() + await self._handle_frame(frame) + except asyncio.CancelledError: + pass + + async def _handle_frame(self, frame: PgnFrame) -> None: + try: + if frame.pgn == PGN_ATTITUDE: + att = parse_pgn_attitude(frame.data) + if "VESSEL.ROLL_DEG" in self._store: + await self._store.update( + "VESSEL.ROLL_DEG", att["roll_deg"], quality=Quality.GOOD + ) + if "VESSEL.PITCH_DEG" in self._store: + await self._store.update( + "VESSEL.PITCH_DEG", att["pitch_deg"], quality=Quality.GOOD + ) + elif frame.pgn == PGN_ENGINE_RAPID: + eng = parse_pgn_engine_rapid(frame.data) + # Convención: instance N corresponde a tag ME_{N}.RPM si existe + inst = int(eng["instance"]) + candidate = f"ME_INST_{inst}.RPM" + if candidate in self._store: + await self._store.update(candidate, eng["rpm"], quality=Quality.GOOD) + except Exception: + logger.exception("Error handling PGN %s", frame.pgn) diff --git a/vmssailor/runtime/server/runtime_app.py b/vmssailor/runtime/server/runtime_app.py index 46b92ae..8cbbdd7 100644 --- a/vmssailor/runtime/server/runtime_app.py +++ b/vmssailor/runtime/server/runtime_app.py @@ -1,6 +1,7 @@ -"""Orquestador del Runtime server: ensambla tag_store + historian + alarm engine + drivers. +"""Orquestador del Runtime server: ensambla servicios. -Sprint 4: simulator-only. Sprint 5: Modbus + NMEA 2000 reales. +Sprint 4: tag_store + historian + alarm engine + simulator + API. +Sprint 5: + log book (engine_log_writer + snapshot_writer) + nmea2000 stub. """ from __future__ import annotations @@ -10,10 +11,18 @@ from dataclasses import dataclass, field from pathlib import Path from vmssailor.core.alarm import Alarm +from vmssailor.core.enums import AlarmState from vmssailor.core.project import Project from vmssailor.runtime.server.alarm_engine import AlarmEngine from vmssailor.runtime.server.drivers import SimulatorDriver from vmssailor.runtime.server.historian import Historian +from vmssailor.runtime.server.logbook import ( + EngineLogWriter, + LogBook, + LogEntryKind, + SnapshotLogWriter, +) +from vmssailor.runtime.server.nmea2000 import N2KPublisher, N2KSubscriber from vmssailor.runtime.server.tag_store import TagStore logger = logging.getLogger(__name__) @@ -21,25 +30,39 @@ logger = logging.getLogger(__name__) @dataclass(slots=True) class RuntimeApp: - """Conjunto de servicios del Runtime. Se construye con `build_runtime()`.""" + """Conjunto de servicios del Runtime.""" project: Project tag_store: TagStore historian: Historian alarm_engine: AlarmEngine simulator: SimulatorDriver + logbook: LogBook + engine_log_writer: EngineLogWriter + snapshot_writer: SnapshotLogWriter + n2k_publisher: N2KPublisher + n2k_subscriber: N2KSubscriber alarm_events: list[Alarm] = field(default_factory=list) async def start(self) -> None: await self.historian.start(self.tag_store) await self.alarm_engine.start() await self.simulator.start() - logger.info("RuntimeApp started — %d tags, simulator @ 0.5s", len(self.tag_store)) + await self.engine_log_writer.start() + await self.snapshot_writer.start() + await self.n2k_publisher.start() + await self.n2k_subscriber.start() + logger.info("RuntimeApp started — %d tags", len(self.tag_store)) async def stop(self) -> None: + await self.n2k_subscriber.stop() + await self.n2k_publisher.stop() + await self.snapshot_writer.stop() + await self.engine_log_writer.stop() await self.simulator.stop() await self.alarm_engine.stop() await self.historian.stop() + self.logbook.close() logger.info("RuntimeApp stopped") @@ -47,21 +70,58 @@ def build_runtime( project: Project, *, historian_db: Path | str | None = None, + logbook_db: Path | str | None = None, simulator_tick_s: float = 0.5, + snapshot_period_s: float = 900.0, ) -> RuntimeApp: """Construye un Runtime listo para usar (no arrancado todavía).""" tag_store = TagStore() tag_store.register_many(project.tags) historian = Historian(historian_db) + logbook = LogBook(logbook_db) alarm_events: list[Alarm] = [] + # Strong refs para tareas async que persisten más allá del callback + _alarm_log_tasks: set = set() def _on_alarm_event(alarm: Alarm) -> None: alarm_events.append(alarm) + # Auto-log al log book (fire and forget) + import asyncio + + kind_map = { + AlarmState.ACTIVE: LogEntryKind.ALARM_ACTIVE, + AlarmState.ACK: LogEntryKind.ALARM_ACK, + AlarmState.CLEARED: LogEntryKind.ALARM_CLEARED, + } + kind = kind_map.get(alarm.state, LogEntryKind.ALARM_ACTIVE) + try: + loop = asyncio.get_running_loop() + task = loop.create_task( + logbook.append( + kind, + f"[{alarm.priority.value}] {alarm.tag_id}: {alarm.message}", + payload={"alarm_id": alarm.id, "value": alarm.value_at_trigger}, + user=alarm.acknowledged_by or "system", + timestamp=alarm.timestamp_ack or alarm.timestamp_active, + ) + ) + _alarm_log_tasks.add(task) + task.add_done_callback(_alarm_log_tasks.discard) + except RuntimeError: + # No event loop — silencioso (puede ocurrir en teardown) + pass alarm_engine = AlarmEngine(tag_store, on_alarm_event=_on_alarm_event) simulator = SimulatorDriver(tag_store, tick_period_s=simulator_tick_s) + engine_log_writer = EngineLogWriter(tag_store, logbook) + snapshot_writer = SnapshotLogWriter( + tag_store, logbook, period_s=snapshot_period_s + ) + + n2k_publisher = N2KPublisher(tag_store) + n2k_subscriber = N2KSubscriber(tag_store) return RuntimeApp( project=project, @@ -69,5 +129,10 @@ def build_runtime( historian=historian, alarm_engine=alarm_engine, simulator=simulator, + logbook=logbook, + engine_log_writer=engine_log_writer, + snapshot_writer=snapshot_writer, + n2k_publisher=n2k_publisher, + n2k_subscriber=n2k_subscriber, alarm_events=alarm_events, )