"""Alarm engine — evalúa AlarmConfig contra valores del tag_store. Para cada update de tag, recorre sus AlarmConfig y produce instancias `Alarm` en estados ACTIVE/ACK/CLEARED. Sprint 4: lógica básica. Sprint 8: escalación + permissives integration. """ from __future__ import annotations import asyncio import logging from collections.abc import Callable from contextlib import suppress from datetime import UTC, datetime from vmssailor.core.alarm import Alarm from vmssailor.core.enums import AlarmState from vmssailor.core.tag import AlarmConfig, Tag from vmssailor.runtime.server.tag_store import TagStore, TagValue from vmssailor.shared.ids import make_alarm_instance_id logger = logging.getLogger(__name__) def _evaluate(value: float, alarm: AlarmConfig) -> bool: """¿La condición de alarma está activa según el valor?""" op = alarm.operator t = alarm.threshold if op == ">": return value > t if op == "<": return value < t if op == ">=": return value >= t if op == "<=": return value <= t if op == "==": return value == t if op == "!=": return value != t return False def _evaluate_with_hysteresis( value: float, alarm: AlarmConfig, currently_active: bool ) -> bool: """Aplica histéresis para no salir/entrar de alarma con jitter.""" base = _evaluate(value, alarm) if alarm.hysteresis == 0: return base # Si actualmente activo: requiere cruzar threshold ± hysteresis para salir if currently_active: h = alarm.hysteresis op = alarm.operator if op in (">", ">="): # Para salir, value debe ser < threshold - h return value > (alarm.threshold - h) if op in ("<", "<="): return value < (alarm.threshold + h) return base class AlarmEngine: """Consume del tag_store y emite eventos de alarma.""" def __init__( self, tag_store: TagStore, on_alarm_event: Callable[[Alarm], None] | None = None, ) -> None: self._tag_store = tag_store self._on_event = on_alarm_event or (lambda _a: None) self._active_alarms: dict[str, Alarm] = {} # tracking del primer momento que la condicion se vuelve verdadera # para soportar `delay_seconds` self._pending: dict[str, datetime] = {} self._stop = False self._task: asyncio.Task | None = None # ----- Lifecycle --------------------------------------------------- async def start(self) -> None: self._sub_q = self._tag_store.subscribe(maxsize=4096) self._task = asyncio.create_task(self._loop()) async def stop(self) -> None: self._stop = True if self._task is not None: self._task.cancel() with suppress(asyncio.CancelledError): await self._task self._tag_store.unsubscribe(self._sub_q) async def _loop(self) -> None: try: while not self._stop: tv = await self._sub_q.get() await self._evaluate_tag(tv) except asyncio.CancelledError: pass # ----- Evaluation -------------------------------------------------- async def _evaluate_tag(self, tv: TagValue) -> None: if not isinstance(tv.value, (int, float)) or isinstance(tv.value, bool): return tag = self._tag_store.get_tag(tv.tag_id) if tag is None or not tag.alarms: return for a in tag.alarms: await self._evaluate_alarm(tag, a, float(tv.value), tv.timestamp) async def _evaluate_alarm( self, tag: Tag, config: AlarmConfig, value: float, ts: datetime, ) -> None: key = f"{tag.id}|{config.id}" currently_active = key in self._active_alarms condition = _evaluate_with_hysteresis(value, config, currently_active) if condition and not currently_active: # Check delay: si delay_seconds > 0 requiere persistencia previa if config.delay_seconds > 0: first_seen = self._pending.get(key) if first_seen is None: self._pending[key] = ts return if (ts - first_seen).total_seconds() < config.delay_seconds: return # Disparar self._pending.pop(key, None) alarm = Alarm( id=make_alarm_instance_id(tag.id, config.id, ts.timestamp()), tag_id=tag.id, alarm_config_id=config.id, priority=config.priority, state=AlarmState.ACTIVE, timestamp_active=ts, message=config.message or f"{tag.id} {config.operator} {config.threshold}", value_at_trigger=value, ) self._active_alarms[key] = alarm self._on_event(alarm) logger.info("ALARM ACTIVE %s = %s (%s)", tag.id, value, config.priority.value) return if not condition and currently_active: # Clear self._pending.pop(key, None) alarm = self._active_alarms.pop(key) cleared = alarm.model_copy( update={"state": AlarmState.CLEARED, "timestamp_cleared": ts} ) self._on_event(cleared) logger.info("ALARM CLEARED %s", tag.id) return if not condition: self._pending.pop(key, None) def ack(self, alarm_id: str, user: str) -> Alarm | None: """Reconoce una alarma activa por su id.""" for key, a in list(self._active_alarms.items()): if a.id == alarm_id: ts = datetime.now(UTC) acked = a.model_copy( update={ "state": AlarmState.ACK, "timestamp_ack": ts, "acknowledged_by": user, } ) self._active_alarms[key] = acked self._on_event(acked) return acked return None def active_alarms(self) -> list[Alarm]: return list(self._active_alarms.values())