Files
alro65 36dda85259 sprint-4: Runtime server base — tag_store + historian + alarm engine + API FastAPI
Arquitectura asincronica completa para correr 24/7 a bordo del buque.

vmssailor/runtime/server/tag_store.py
- TagStore in-memory con pub/sub asyncio.Queue
- register_tag/register_many con valores iniciales
- TagValue dataclass: value + quality + timestamp + raw_value
- subscribe()/unsubscribe() para fan-out
- stats() con breakdown por Quality

vmssailor/runtime/server/historian.py
- Historian DuckDB embebido (in-memory o archivo)
- Reader loop suscrito al tag_store + buffer + flush periodico (1s)
- query(tag_id, since, until, limit) para series temporales
- Soporta valores numericos y boolean separadamente

vmssailor/runtime/server/alarm_engine.py
- Suscriptor al tag_store que evalua AlarmConfig por update
- Operators: >, >=, <, <=, ==, !=
- Hysteresis correcta: aplica al SALIR de alarma, no a entrar
- Delay configurable (persistencia minima antes de disparar)
- Estados: ACTIVE -> ACK (con user) -> CLEARED
- ack(alarm_id, user) reconoce sin clear

vmssailor/runtime/server/drivers.py
- SimulatorDriver: produce valores sinteticos creibles por UnitSI
- Tick configurable (default 0.5s)
- Respeta range_normal_min/max del tag para mantenerse en rango
- Permite probar UI/API sin hardware ni Modbus real

vmssailor/runtime/server/runtime_app.py
- RuntimeApp dataclass ensambla todos los servicios
- build_runtime(project) construye listo para correr
- start()/stop() async lifecycle ordenado

vmssailor/runtime/server/api.py
- FastAPI app con lifespan que arranca/detiene el runtime
- GET /health, /project
- GET /tags, /tags/{id}, /tags/{id}/history
- GET /alarms, POST /alarms/{id}/ack
- WebSocket /ws/realtime con snapshot inicial + push + heartbeat

runtime_server_main.py
- Entry point con argparse: --vmsproj, --host, --port, --db
- Sin --vmsproj usa proyecto demo Sprint 0 (genera simulator vivo)
- Lanza con uvicorn

Tests (tests/runtime/, 16 nuevos, total 142/142):
- test_tag_store: register, update, subscribe, unsubscribe, stats
- test_historian: roundtrip query, stats
- test_alarm_engine: fire when below, hysteresis clears, ack
- test_api: health, project, tags listing, history, alarms via httpx.ASGITransport

Para correr el servidor en vivo:
    uv run python runtime_server_main.py --verbose
Luego en otro shell:
    curl http://127.0.0.1:8765/health
    curl http://127.0.0.1:8765/tags | jq .

Dependencias agregadas:
- fastapi >=0.110
- uvicorn[standard] >=0.27
- websockets >=12.0
- duckdb >=0.10
- pymodbus >=3.5 (Sprint 5)
- python-can >=4.3 (Sprint 5)
- httpx >=0.27 (testing + cliente HTTP)

142/142 pytest verde, ruff clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-17 20:03:19 -04:00

93 lines
2.8 KiB
Python

"""Tests del Alarm Engine (Sprint 4)."""
from __future__ import annotations
import asyncio
import pytest
from vmssailor.core.alarm import Alarm
from vmssailor.core.enums import AlarmPriority, AlarmState, Protocol, UnitSI
from vmssailor.core.tag import AlarmConfig, Tag
from vmssailor.runtime.server.alarm_engine import AlarmEngine
from vmssailor.runtime.server.tag_store import TagStore
def _tag_with_alarm(low_threshold: float = 1.5, delay_s: float = 0.0) -> Tag:
return Tag(
id="X.PRESS",
unit_si=UnitSI.BAR,
protocol=Protocol.MODBUS_RTU,
address=1,
alarms=[
AlarmConfig(
id="X.PRESS.LOW",
threshold=low_threshold,
operator="<",
priority=AlarmPriority.EMERGENCY,
hysteresis=0.2,
delay_seconds=delay_s,
)
],
)
@pytest.mark.asyncio
async def test_alarm_fires_when_below_threshold():
store = TagStore()
store.register_tag(_tag_with_alarm(low_threshold=1.5))
events: list[Alarm] = []
engine = AlarmEngine(store, on_alarm_event=events.append)
await engine.start()
try:
await store.update("X.PRESS", 1.0) # debajo del threshold
await asyncio.sleep(0.05)
assert len(events) == 1
assert events[0].state == AlarmState.ACTIVE
assert events[0].priority == AlarmPriority.EMERGENCY
finally:
await engine.stop()
@pytest.mark.asyncio
async def test_alarm_clears_with_hysteresis():
store = TagStore()
store.register_tag(_tag_with_alarm(low_threshold=1.5))
events: list[Alarm] = []
engine = AlarmEngine(store, on_alarm_event=events.append)
await engine.start()
try:
await store.update("X.PRESS", 1.0)
await asyncio.sleep(0.05)
# Subiendo a 1.6 NO debería limpiar (dentro de la hysteresis +0.2 = 1.7)
await store.update("X.PRESS", 1.6)
await asyncio.sleep(0.05)
# Subiendo a 2.0 SÍ limpia
await store.update("X.PRESS", 2.0)
await asyncio.sleep(0.05)
states = [e.state for e in events]
assert AlarmState.ACTIVE in states
assert AlarmState.CLEARED in states
finally:
await engine.stop()
@pytest.mark.asyncio
async def test_alarm_ack():
store = TagStore()
store.register_tag(_tag_with_alarm(low_threshold=1.5))
events: list[Alarm] = []
engine = AlarmEngine(store, on_alarm_event=events.append)
await engine.start()
try:
await store.update("X.PRESS", 1.0)
await asyncio.sleep(0.05)
active = engine.active_alarms()
assert len(active) == 1
ack = engine.ack(active[0].id, user="operator1")
assert ack is not None
assert ack.state == AlarmState.ACK
assert ack.acknowledged_by == "operator1"
finally:
await engine.stop()