Files
alro65 36dda85259 sprint-4: Runtime server base — tag_store + historian + alarm engine + API FastAPI
Arquitectura asincronica completa para correr 24/7 a bordo del buque.

vmssailor/runtime/server/tag_store.py
- TagStore in-memory con pub/sub asyncio.Queue
- register_tag/register_many con valores iniciales
- TagValue dataclass: value + quality + timestamp + raw_value
- subscribe()/unsubscribe() para fan-out
- stats() con breakdown por Quality

vmssailor/runtime/server/historian.py
- Historian DuckDB embebido (in-memory o archivo)
- Reader loop suscrito al tag_store + buffer + flush periodico (1s)
- query(tag_id, since, until, limit) para series temporales
- Soporta valores numericos y boolean separadamente

vmssailor/runtime/server/alarm_engine.py
- Suscriptor al tag_store que evalua AlarmConfig por update
- Operators: >, >=, <, <=, ==, !=
- Hysteresis correcta: aplica al SALIR de alarma, no a entrar
- Delay configurable (persistencia minima antes de disparar)
- Estados: ACTIVE -> ACK (con user) -> CLEARED
- ack(alarm_id, user) reconoce sin clear

vmssailor/runtime/server/drivers.py
- SimulatorDriver: produce valores sinteticos creibles por UnitSI
- Tick configurable (default 0.5s)
- Respeta range_normal_min/max del tag para mantenerse en rango
- Permite probar UI/API sin hardware ni Modbus real

vmssailor/runtime/server/runtime_app.py
- RuntimeApp dataclass ensambla todos los servicios
- build_runtime(project) construye listo para correr
- start()/stop() async lifecycle ordenado

vmssailor/runtime/server/api.py
- FastAPI app con lifespan que arranca/detiene el runtime
- GET /health, /project
- GET /tags, /tags/{id}, /tags/{id}/history
- GET /alarms, POST /alarms/{id}/ack
- WebSocket /ws/realtime con snapshot inicial + push + heartbeat

runtime_server_main.py
- Entry point con argparse: --vmsproj, --host, --port, --db
- Sin --vmsproj usa proyecto demo Sprint 0 (genera simulator vivo)
- Lanza con uvicorn

Tests (tests/runtime/, 16 nuevos, total 142/142):
- test_tag_store: register, update, subscribe, unsubscribe, stats
- test_historian: roundtrip query, stats
- test_alarm_engine: fire when below, hysteresis clears, ack
- test_api: health, project, tags listing, history, alarms via httpx.ASGITransport

Para correr el servidor en vivo:
    uv run python runtime_server_main.py --verbose
Luego en otro shell:
    curl http://127.0.0.1:8765/health
    curl http://127.0.0.1:8765/tags | jq .

Dependencias agregadas:
- fastapi >=0.110
- uvicorn[standard] >=0.27
- websockets >=12.0
- duckdb >=0.10
- pymodbus >=3.5 (Sprint 5)
- python-can >=4.3 (Sprint 5)
- httpx >=0.27 (testing + cliente HTTP)

142/142 pytest verde, ruff clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-17 20:03:19 -04:00

115 lines
3.9 KiB
Python

"""Drivers de protocolo (Sprint 4: Simulator).
Sprint 5 traerá los drivers reales:
- modbus_rtu.py (pymodbus, serial COM)
- nmea2000.py (python-can)
- j1939.py
- card_discovery.py
Por ahora, `SimulatorDriver` produce valores sintéticos creíbles para los
tags registrados. Sirve para:
- Probar el alarm engine + historian sin hardware
- Probar el cliente desktop y mobile contra datos vivos
- Tests de integración
"""
from __future__ import annotations
import asyncio
import logging
import math
import random
from collections.abc import Callable
from contextlib import suppress
from vmssailor.core.enums import Quality, UnitSI
from vmssailor.runtime.server.tag_store import TagStore
logger = logging.getLogger(__name__)
class SimulatorDriver:
"""Genera valores sintéticos para todos los tags numéricos del store.
Estrategia por unidad:
- rpm: oscila entre 1200 y 1800 ± ruido
- bar: estable cerca del medio del rango con jitter
- C (temperatura): se calienta hacia 80 °C con jitter
- %: 30-70 % oscilación
- V: 27-28 V (24V system) o 13-14 V (12V system)
- bool: random toggle ocasional
Tags pueden tener `range_normal_min/max`; respeta esos rangos.
"""
def __init__(self, tag_store: TagStore, tick_period_s: float = 0.5) -> None:
self._store = tag_store
self._tick_s = tick_period_s
self._t0 = 0.0
self._stop = False
self._task: asyncio.Task | None = None
self._on_tick: Callable[[], None] | None = None
async def start(self) -> None:
self._task = asyncio.create_task(self._loop())
async def stop(self) -> None:
self._stop = True
if self._task is not None:
self._task.cancel()
with suppress(asyncio.CancelledError):
await self._task
async def _loop(self) -> None:
try:
while not self._stop:
await self._tick()
await asyncio.sleep(self._tick_s)
except asyncio.CancelledError:
pass
async def _tick(self) -> None:
self._t0 += self._tick_s
for tag_id, tag in self._store.all_tags().items():
value = self._synthesize(tag_id, tag.unit_si, tag)
if value is None:
continue
await self._store.update(tag_id, value, quality=Quality.GOOD)
if self._on_tick is not None:
self._on_tick()
def _synthesize(self, tag_id: str, unit: UnitSI, tag) -> float | bool | None:
t = self._t0
if unit == UnitSI.RPM:
base = 1500
return base + 250 * math.sin(t / 30 + hash(tag_id) % 100) + random.uniform(-15, 15)
if unit == UnitSI.BAR:
mid = (
(tag.range_normal_min + tag.range_normal_max) / 2
if tag.range_normal_min is not None and tag.range_normal_max is not None
else 4.5
)
return mid + 0.2 * math.sin(t / 5 + hash(tag_id) % 50) + random.uniform(-0.05, 0.05)
if unit == UnitSI.DEGREE_CELSIUS:
target = 82.0
return target + 8 * math.sin(t / 60 + hash(tag_id) % 30) + random.uniform(-0.5, 0.5)
if unit == UnitSI.PERCENT:
return 50 + 25 * math.sin(t / 20 + hash(tag_id) % 20) + random.uniform(-2, 2)
if unit == UnitSI.VOLT:
mid = (
(tag.range_normal_min + tag.range_normal_max) / 2
if tag.range_normal_min is not None and tag.range_normal_max is not None
else 27.5
)
return mid + random.uniform(-0.2, 0.2)
if unit == UnitSI.HERTZ:
return 50.0 + random.uniform(-0.1, 0.1)
if unit == UnitSI.AMPERE:
return 50 + 30 * math.sin(t / 25 + hash(tag_id) % 40)
if unit == UnitSI.HOUR:
return 1200 + t / 3600 # accumulating engine hours
if unit == UnitSI.BOOL:
# 99% del tiempo False, eventualmente True
return random.random() < 0.02
return None