"""Drivers de protocolo (Sprint 4: Simulator). Sprint 5 traerá los drivers reales: - modbus_rtu.py (pymodbus, serial COM) - nmea2000.py (python-can) - j1939.py - card_discovery.py Por ahora, `SimulatorDriver` produce valores sintéticos creíbles para los tags registrados. Sirve para: - Probar el alarm engine + historian sin hardware - Probar el cliente desktop y mobile contra datos vivos - Tests de integración """ from __future__ import annotations import asyncio import logging import math import random from collections.abc import Callable from contextlib import suppress from vmssailor.core.enums import Quality, UnitSI from vmssailor.runtime.server.tag_store import TagStore logger = logging.getLogger(__name__) class SimulatorDriver: """Genera valores sintéticos para todos los tags numéricos del store. Estrategia por unidad: - rpm: oscila entre 1200 y 1800 ± ruido - bar: estable cerca del medio del rango con jitter - C (temperatura): se calienta hacia 80 °C con jitter - %: 30-70 % oscilación - V: 27-28 V (24V system) o 13-14 V (12V system) - bool: random toggle ocasional Tags pueden tener `range_normal_min/max`; respeta esos rangos. """ def __init__(self, tag_store: TagStore, tick_period_s: float = 0.5) -> None: self._store = tag_store self._tick_s = tick_period_s self._t0 = 0.0 self._stop = False self._task: asyncio.Task | None = None self._on_tick: Callable[[], None] | None = None async def start(self) -> None: self._task = asyncio.create_task(self._loop()) async def stop(self) -> None: self._stop = True if self._task is not None: self._task.cancel() with suppress(asyncio.CancelledError): await self._task async def _loop(self) -> None: try: while not self._stop: await self._tick() await asyncio.sleep(self._tick_s) except asyncio.CancelledError: pass async def _tick(self) -> None: self._t0 += self._tick_s for tag_id, tag in self._store.all_tags().items(): value = self._synthesize(tag_id, tag.unit_si, tag) if value is None: continue await self._store.update(tag_id, value, quality=Quality.GOOD) if self._on_tick is not None: self._on_tick() def _synthesize(self, tag_id: str, unit: UnitSI, tag) -> float | bool | None: t = self._t0 if unit == UnitSI.RPM: base = 1500 return base + 250 * math.sin(t / 30 + hash(tag_id) % 100) + random.uniform(-15, 15) if unit == UnitSI.BAR: mid = ( (tag.range_normal_min + tag.range_normal_max) / 2 if tag.range_normal_min is not None and tag.range_normal_max is not None else 4.5 ) return mid + 0.2 * math.sin(t / 5 + hash(tag_id) % 50) + random.uniform(-0.05, 0.05) if unit == UnitSI.DEGREE_CELSIUS: target = 82.0 return target + 8 * math.sin(t / 60 + hash(tag_id) % 30) + random.uniform(-0.5, 0.5) if unit == UnitSI.PERCENT: return 50 + 25 * math.sin(t / 20 + hash(tag_id) % 20) + random.uniform(-2, 2) if unit == UnitSI.VOLT: mid = ( (tag.range_normal_min + tag.range_normal_max) / 2 if tag.range_normal_min is not None and tag.range_normal_max is not None else 27.5 ) return mid + random.uniform(-0.2, 0.2) if unit == UnitSI.HERTZ: return 50.0 + random.uniform(-0.1, 0.1) if unit == UnitSI.AMPERE: return 50 + 30 * math.sin(t / 25 + hash(tag_id) % 40) if unit == UnitSI.HOUR: return 1200 + t / 3600 # accumulating engine hours if unit == UnitSI.BOOL: # 99% del tiempo False, eventualmente True return random.random() < 0.02 return None