alro65 36dda85259 sprint-4: Runtime server base — tag_store + historian + alarm engine + API FastAPI
Fully asynchronous architecture designed to run 24/7 aboard the vessel.

vmssailor/runtime/server/tag_store.py
- In-memory TagStore with pub/sub over asyncio.Queue
- register_tag/register_many with initial values
- TagValue dataclass: value + quality + timestamp + raw_value
- subscribe()/unsubscribe() for fan-out
- stats() with a breakdown by Quality
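
The fan-out described above can be sketched roughly as follows. This is a minimal stand-in, not the project's TagStore: `MiniTagStore`, the reduced `TagValue`, and the drop-on-full policy for slow subscribers are assumptions made for illustration.

```python
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any


@dataclass
class TagValue:
    """Reduced version of the real dataclass (no quality or raw_value)."""
    tag_id: str
    value: Any
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


class MiniTagStore:
    """Hypothetical stand-in for TagStore; only the fan-out part."""

    def __init__(self) -> None:
        self._values: dict[str, TagValue] = {}
        self._subscribers: list[asyncio.Queue] = []

    def subscribe(self, maxsize: int = 1024) -> asyncio.Queue:
        # Each subscriber gets its own queue, so consumers never compete.
        q: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self._subscribers.append(q)
        return q

    def unsubscribe(self, q: asyncio.Queue) -> None:
        if q in self._subscribers:
            self._subscribers.remove(q)

    def update(self, tag_id: str, value: Any) -> None:
        tv = TagValue(tag_id, value)
        self._values[tag_id] = tv
        for q in self._subscribers:
            try:
                q.put_nowait(tv)
            except asyncio.QueueFull:
                pass  # assumption: drop for slow consumers instead of blocking


async def demo() -> list[Any]:
    store = MiniTagStore()
    a, b = store.subscribe(), store.subscribe()
    store.update("engine.rpm", 1200)  # delivered to both queues
    store.unsubscribe(b)
    store.update("engine.rpm", 1250)  # delivered to `a` only
    return [a.qsize(), b.qsize(), (await a.get()).value]
```

Giving every subscriber a private queue is what lets the historian, the alarm engine, and WebSocket clients consume the same updates at different speeds.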

vmssailor/runtime/server/historian.py
- Embedded DuckDB Historian (in-memory or file-backed)
- Reader loop subscribed to the tag_store + buffer + periodic flush (1s)
- query(tag_id, since, until, limit) for time series
- Stores numeric and boolean values in separate columns
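
One detail behind the numeric/boolean split is worth spelling out: in Python `bool` is a subclass of `int`, so the boolean check has to run first or `True` would be stored as `1.0`. A minimal sketch, where `split_value` is a hypothetical helper name:

```python
from __future__ import annotations


def split_value(value: object) -> tuple[float | None, bool | None]:
    """Return (numeric, boolean); at most one of the two is not None."""
    if isinstance(value, bool):  # must come first: bool is a subclass of int
        return None, value
    if isinstance(value, (int, float)):
        return float(value), None
    return None, None  # e.g. strings: neither column is populated
```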

vmssailor/runtime/server/alarm_engine.py
- tag_store subscriber that evaluates each AlarmConfig on every update
- Operators: >, >=, <, <=, ==, !=
- Correct hysteresis: applied when LEAVING the alarm state, not when entering it
- Configurable delay (minimum persistence before firing)
- States: ACTIVE -> ACK (with user) -> CLEARED
- ack(alarm_id, user) acknowledges without clearing
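
The hysteresis and delay rules above can be illustrated for a single `>` alarm. This is a sketch under assumptions: `HighAlarm` and its field names are hypothetical and do not mirror AlarmConfig, and it lets an unacknowledged ACTIVE alarm clear directly, which the real engine may or may not allow.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class HighAlarm:
    """Hypothetical '>' alarm; field names are illustrative only."""
    threshold: float
    hysteresis: float = 0.0
    delay_s: float = 0.0
    state: str = "CLEARED"
    acked_by: str | None = None
    _pending_since: float | None = None

    def update(self, value: float, now: float) -> str:
        if self.state == "CLEARED":
            if value > self.threshold:  # entering: no hysteresis applied
                if self._pending_since is None:
                    self._pending_since = now
                if now - self._pending_since >= self.delay_s:
                    self.state = "ACTIVE"
                    self._pending_since = None
            else:
                self._pending_since = None  # condition broke; restart the delay
        elif value <= self.threshold - self.hysteresis:  # leaving: hysteresis applied
            self.state = "CLEARED"
            self.acked_by = None
        return self.state

    def ack(self, user: str) -> None:
        # Acknowledging records the user but does not clear the alarm.
        if self.state == "ACTIVE":
            self.state = "ACK"
            self.acked_by = user


alarm = HighAlarm(threshold=90.0, hysteresis=5.0, delay_s=2.0)
alarm.update(95.0, now=0.0)  # still CLEARED: delay not yet elapsed
alarm.update(95.0, now=2.0)  # ACTIVE: condition held for 2 s
alarm.update(88.0, now=3.0)  # still ACTIVE: 88 is inside the hysteresis band
alarm.ack("chief")           # ACK, acknowledged by "chief"
alarm.update(84.0, now=5.0)  # CLEARED: 84 <= 90 - 5
```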

vmssailor/runtime/server/drivers.py
- SimulatorDriver: produces plausible synthetic values per UnitSI
- Configurable tick (default 0.5s)
- Respects the tag's range_normal_min/max to stay in range
- Allows testing the UI/API without hardware or a real Modbus link
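
One simple way to produce values that look plausible and stay inside a tag's normal range is a clamped random walk. The sketch below is an assumption about the general approach, not the SimulatorDriver's actual algorithm; `next_value` and `step_frac` are hypothetical names.

```python
import random


def next_value(
    current: float,
    lo: float,
    hi: float,
    rng: random.Random,
    step_frac: float = 0.02,
) -> float:
    """Move `current` by a small random step, clamped to [lo, hi]."""
    span = hi - lo
    candidate = current + rng.uniform(-step_frac, step_frac) * span
    return min(hi, max(lo, candidate))


# One simulated step per tick for a tag whose normal range is 40..60.
rng = random.Random(0)
value = 50.0
for _ in range(5):
    value = next_value(value, 40.0, 60.0, rng)
```

Scaling the step by the range width keeps the motion proportionate whether the tag is a temperature in degrees or a pressure in pascals.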

vmssailor/runtime/server/runtime_app.py
- RuntimeApp dataclass assembles all services
- build_runtime(project) builds an instance that is ready to run
- start()/stop() ordered async lifecycle

vmssailor/runtime/server/api.py
- FastAPI app with a lifespan that starts/stops the runtime
- GET /health, /project
- GET /tags, /tags/{id}, /tags/{id}/history
- GET /alarms, POST /alarms/{id}/ack
- WebSocket /ws/realtime with initial snapshot + push + heartbeat
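
The snapshot + push + heartbeat sequence of the realtime channel can be sketched without any web framework as an async generator. The message shapes (`type`, `tags`, `tag`) and the name `realtime_messages` are assumptions for illustration; the real endpoint's payloads are not shown in this commit.

```python
import asyncio
from typing import Any, AsyncIterator


async def realtime_messages(
    snapshot: dict[str, Any],
    updates: asyncio.Queue,
    heartbeat_s: float = 5.0,
) -> AsyncIterator[dict[str, Any]]:
    """Yield one snapshot, then updates, with a heartbeat when idle."""
    yield {"type": "snapshot", "tags": snapshot}
    while True:
        try:
            update = await asyncio.wait_for(updates.get(), timeout=heartbeat_s)
        except asyncio.TimeoutError:
            # Nothing arrived within the window: tell the client we are alive.
            yield {"type": "heartbeat"}
        else:
            yield {"type": "update", "tag": update}


async def demo() -> list[str]:
    q: asyncio.Queue = asyncio.Queue()
    q.put_nowait({"tag_id": "engine.rpm", "value": 1200})
    stream = realtime_messages({"engine.rpm": 1180}, q, heartbeat_s=0.05)
    kinds: list[str] = []
    async for msg in stream:
        kinds.append(msg["type"])
        if len(kinds) == 3:
            break
    await stream.aclose()
    return kinds
```

In a FastAPI handler, each yielded dict would typically be passed to `websocket.send_json`, with `updates` being a queue obtained from the tag store's `subscribe()`.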

runtime_server_main.py
- Entry point with argparse: --vmsproj, --host, --port, --db
- Without --vmsproj it uses the Sprint 0 demo project (spawns a live simulator)
- Launches with uvicorn
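
A sketch of what such an argument parser might look like. The flags come from this commit message (including `--verbose` from the run command); the defaults `127.0.0.1` and `8765` are assumptions inferred from the curl examples, and `build_parser` is a hypothetical name.

```python
import argparse
from pathlib import Path


def build_parser() -> argparse.ArgumentParser:
    p = argparse.ArgumentParser(description="VMS runtime server")
    p.add_argument("--vmsproj", type=Path, default=None,
                   help="project file; omit to use the demo project")
    p.add_argument("--host", default="127.0.0.1")  # assumed default
    p.add_argument("--port", type=int, default=8765)  # assumed default
    p.add_argument("--db", type=Path, default=None,
                   help="DuckDB file; omit for an in-memory database")
    p.add_argument("--verbose", action="store_true")
    return p


args = build_parser().parse_args(["--port", "9000", "--db", "hist.duckdb"])
```

The parsed `host` and `port` would then be handed to something like `uvicorn.run(app, host=args.host, port=args.port)`.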

Tests (tests/runtime/, 16 new, total 142/142):
- test_tag_store: register, update, subscribe, unsubscribe, stats
- test_historian: roundtrip query, stats
- test_alarm_engine: fire when below, hysteresis clears, ack
- test_api: health, project, tags listing, history, alarms via httpx.ASGITransport

To run the server live:
    uv run python runtime_server_main.py --verbose
Then, in another shell:
    curl http://127.0.0.1:8765/health
    curl http://127.0.0.1:8765/tags | jq .

Dependencies added:
- fastapi >=0.110
- uvicorn[standard] >=0.27
- websockets >=12.0
- duckdb >=0.10
- pymodbus >=3.5 (Sprint 5)
- python-can >=4.3 (Sprint 5)
- httpx >=0.27 (testing + HTTP client)

142/142 pytest green, ruff clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-17 20:03:19 -04:00

166 lines
5.4 KiB
Python

"""Embedded DuckDB historian.

Stores tag time series with configurable retention.

For Sprint 4 we implement:
- Batch insertion (every 1s) from the tag_store subscriber
- Simple query by tag_id + time range

Sprint 5+: automatic downsampling, compression.
"""
from __future__ import annotations

import asyncio
import logging
from contextlib import suppress
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any

import duckdb

from vmssailor.runtime.server.tag_store import TagStore, TagValue

logger = logging.getLogger(__name__)

_SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS samples (
    tag_id VARCHAR NOT NULL,
    timestamp TIMESTAMP NOT NULL,
    value DOUBLE,
    bool_value BOOLEAN,
    quality VARCHAR NOT NULL DEFAULT 'good'
);
CREATE INDEX IF NOT EXISTS idx_samples_tag_ts ON samples(tag_id, timestamp);
"""


class Historian:
    """DuckDB-backed historian. Little more than an async-friendly wrapper."""

    def __init__(self, db_path: Path | str | None = None) -> None:
        if db_path is None:
            db_path = ":memory:"
        self._db_path = str(db_path)
        self._conn = duckdb.connect(self._db_path)
        self._conn.execute(_SCHEMA_SQL)
        self._buffer: list[tuple] = []
        self._buffer_lock = asyncio.Lock()
        # Set by start(); None means the historian was never started.
        self._tag_store: TagStore | None = None
        self._subscriber_q: asyncio.Queue | None = None
        self._reader_task: asyncio.Task | None = None
        self._flush_task: asyncio.Task | None = None
        self._stopping = False

    # ----- Lifecycle ---------------------------------------------------

    async def start(self, tag_store: TagStore) -> None:
        """Subscribe to the tag store and start the periodic flush."""
        self._tag_store = tag_store
        self._subscriber_q = tag_store.subscribe(maxsize=8192)
        self._reader_task = asyncio.create_task(self._reader_loop())
        self._flush_task = asyncio.create_task(self._flush_loop())

    async def stop(self) -> None:
        """Cancel both loops, unsubscribe, flush the buffer, and close the DB."""
        self._stopping = True
        for t in (self._reader_task, self._flush_task):
            if t is not None:
                t.cancel()
                with suppress(asyncio.CancelledError):
                    await t
        if self._tag_store is not None and self._subscriber_q is not None:
            self._tag_store.unsubscribe(self._subscriber_q)
        await self._flush(force=True)
        self._conn.close()

    # ----- Loops -------------------------------------------------------

    async def _reader_loop(self) -> None:
        """Move every update from the subscriber queue into the buffer."""
        while not self._stopping:
            try:
                tv: TagValue = await self._subscriber_q.get()
            except asyncio.CancelledError:
                break
            await self._enqueue(tv)

    async def _flush_loop(self) -> None:
        """Flush the buffer to DuckDB once per second."""
        try:
            while not self._stopping:
                await asyncio.sleep(1.0)
                await self._flush()
        except asyncio.CancelledError:
            pass

    async def _enqueue(self, tv: TagValue) -> None:
        async with self._buffer_lock:
            value_num: float | None
            value_bool: bool | None
            # bool is checked first because bool is a subclass of int.
            if isinstance(tv.value, bool):
                value_num = None
                value_bool = tv.value
            elif isinstance(tv.value, (int, float)):
                value_num = float(tv.value)
                value_bool = None
            else:
                # Any other type is stored with both value columns NULL.
                value_num = None
                value_bool = None
            self._buffer.append(
                (tv.tag_id, tv.timestamp, value_num, value_bool, tv.quality.value)
            )

    async def _flush(self, force: bool = False) -> None:
        """Write the buffered samples in one batch.

        `force` is currently unused; stop() passes it for the final flush.
        """
        async with self._buffer_lock:
            if not self._buffer:
                return
            batch = self._buffer
            self._buffer = []
            try:
                # DuckDB's executemany is synchronous but fast.
                self._conn.executemany(
                    "INSERT INTO samples (tag_id, timestamp, value, bool_value, quality) "
                    "VALUES (?, ?, ?, ?, ?)",
                    batch,
                )
            except Exception:
                # The batch is dropped on failure; it is not re-queued.
                logger.exception("Failed to flush %d samples", len(batch))

    # ----- Queries -----------------------------------------------------

    def query(
        self,
        tag_id: str,
        *,
        since: datetime | None = None,
        until: datetime | None = None,
        limit: int | None = None,
    ) -> list[dict[str, Any]]:
        """Historical query for one tag.

        Defaults to the last hour when `since` and `until` are omitted.
        """
        if since is None:
            since = datetime.now(UTC) - timedelta(hours=1)
        if until is None:
            until = datetime.now(UTC)
        sql = (
            "SELECT tag_id, timestamp, value, bool_value, quality "
            "FROM samples WHERE tag_id = ? AND timestamp >= ? AND timestamp <= ? "
            "ORDER BY timestamp"
        )
        if limit:
            sql += f" LIMIT {int(limit)}"
        rows = self._conn.execute(sql, [tag_id, since, until]).fetchall()
        result: list[dict[str, Any]] = []
        for r in rows:
            # Prefer the numeric column; fall back to the boolean one.
            value = r[2] if r[2] is not None else r[3]
            result.append(
                {
                    "tag_id": r[0],
                    "timestamp": r[1].isoformat(),
                    "value": value,
                    "quality": r[4],
                }
            )
        return result

    def stats(self) -> dict[str, Any]:
        """Return the total sample count, distinct tag count, and DB path."""
        total = self._conn.execute("SELECT COUNT(*) FROM samples").fetchone()[0]
        tags = self._conn.execute("SELECT COUNT(DISTINCT tag_id) FROM samples").fetchone()[0]
        return {"total_samples": int(total), "tags": int(tags), "db_path": self._db_path}