"""Embedded DuckDB historian.

Stores tag time series. The original design notes mention configurable
retention; no retention logic is implemented in this module yet.

Implemented for Sprint 4:
- Batch insertion (every 1 s) from the tag_store subscriber
- Simple query by tag_id + time range

Sprint 5+: automatic downsampling, compression.
"""

from __future__ import annotations

import asyncio
import logging
from contextlib import suppress
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any

import duckdb

from vmssailor.runtime.server.tag_store import TagStore, TagValue

logger = logging.getLogger(__name__)

_SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS samples (
    tag_id VARCHAR NOT NULL,
    timestamp TIMESTAMP NOT NULL,
    value DOUBLE,
    bool_value BOOLEAN,
    quality VARCHAR NOT NULL DEFAULT 'good'
);
CREATE INDEX IF NOT EXISTS idx_samples_tag_ts ON samples(tag_id, timestamp);
"""


class Historian:
    """DuckDB-backed historian. Just an async-friendly wrapper.

    Samples published by a tag store are buffered in memory and written to
    the ``samples`` table in batches by a background flush task.
    """

    def __init__(self, db_path: Path | str | None = None) -> None:
        """Open (or create) the database and make sure the schema exists.

        Args:
            db_path: Location of the DuckDB file. ``None`` selects an
                in-memory database (``":memory:"``).
        """
        if db_path is None:
            db_path = ":memory:"
        self._db_path = str(db_path)
        self._conn = duckdb.connect(self._db_path)
        self._conn.execute(_SCHEMA_SQL)

        # Rows waiting to be written; guarded by ``_buffer_lock``.
        self._buffer: list[tuple] = []
        self._buffer_lock = asyncio.Lock()

        # Populated by ``start()``. Initialised here so that ``stop()`` is
        # safe to call on an instance that was never started.
        self._tag_store: TagStore | None = None
        self._subscriber_q: Any = None
        self._reader_task: asyncio.Task | None = None
        self._flush_task: asyncio.Task | None = None
        self._stopping = False

    # ----- Lifecycle ---------------------------------------------------

    async def start(self, tag_store: TagStore) -> None:
        """Subscribe to the tag store and start the periodic flush.

        Args:
            tag_store: Store whose published values are recorded.
        """
        self._tag_store = tag_store
        self._subscriber_q = tag_store.subscribe(maxsize=8192)
        self._reader_task = asyncio.create_task(self._reader_loop())
        self._flush_task = asyncio.create_task(self._flush_loop())

    async def stop(self) -> None:
        """Stop the background tasks, flush pending rows and close the DB.

        A background task that already died with an exception is logged
        instead of aborting the shutdown, and the connection is closed even
        if an earlier shutdown step raises.
        """
        self._stopping = True
        try:
            for task in (self._reader_task, self._flush_task):
                if task is None:
                    continue
                task.cancel()
                with suppress(asyncio.CancelledError):
                    try:
                        await task
                    except Exception:
                        # The task failed earlier; keep shutting down so the
                        # final flush and the connection close still happen.
                        logger.exception("Historian background task failed")
            self._reader_task = None
            self._flush_task = None

            if self._subscriber_q is not None and self._tag_store is not None:
                self._tag_store.unsubscribe(self._subscriber_q)
                self._subscriber_q = None

            await self._flush(force=True)
        finally:
            self._conn.close()

    # ----- Loops -------------------------------------------------------

    async def _reader_loop(self) -> None:
        """Move values from the subscriber queue into the write buffer."""
        while not self._stopping:
            try:
                tv: TagValue = await self._subscriber_q.get()
            except asyncio.CancelledError:
                break
            try:
                await self._enqueue(tv)
            except Exception:
                # One bad sample must not stop recording of all later ones.
                logger.exception(
                    "Failed to buffer sample for tag %r",
                    getattr(tv, "tag_id", None),
                )

    async def _flush_loop(self) -> None:
        """Flush the write buffer once per second until stopped."""
        try:
            while not self._stopping:
                await asyncio.sleep(1.0)
                await self._flush()
        except asyncio.CancelledError:
            pass

    async def _enqueue(self, tv: TagValue) -> None:
        """Convert a tag value into a row and append it to the buffer.

        Booleans go to ``bool_value``, ints and floats go to ``value`` (as
        float); any other value type is stored with both columns NULL.
        """
        raw = tv.value
        value_num: float | None = None
        value_bool: bool | None = None
        # ``bool`` is a subclass of ``int``, so it has to be tested first.
        if isinstance(raw, bool):
            value_bool = raw
        elif isinstance(raw, (int, float)):
            value_num = float(raw)

        row = (tv.tag_id, tv.timestamp, value_num, value_bool, tv.quality.value)
        async with self._buffer_lock:
            self._buffer.append(row)

    async def _flush(self, force: bool = False) -> None:
        """Write all buffered rows to the database in one batch.

        Args:
            force: Currently unused; accepted so that ``stop()`` can mark its
                final flush explicitly.

        A failed insert is logged and the batch is dropped (best effort).
        """
        async with self._buffer_lock:
            if not self._buffer:
                return
            batch = self._buffer
            self._buffer = []
            try:
                # DuckDB's executemany is synchronous but fast.
                self._conn.executemany(
                    "INSERT INTO samples (tag_id, timestamp, value, bool_value, quality) "
                    "VALUES (?, ?, ?, ?, ?)",
                    batch,
                )
            except Exception:
                logger.exception("Failed to flush %d samples", len(batch))

    # ----- Queries -----------------------------------------------------

    def query(
        self,
        tag_id: str,
        *,
        since: datetime | None = None,
        until: datetime | None = None,
        limit: int | None = None,
    ) -> list[dict[str, Any]]:
        """Return the stored samples of one tag, ordered by timestamp.

        Args:
            tag_id: Tag to look up.
            since: Inclusive lower bound; defaults to one hour ago (UTC).
            until: Inclusive upper bound; defaults to now (UTC).
            limit: Maximum number of rows. ``None`` means no limit and ``0``
                returns no rows.

        Returns:
            One dict per sample with the keys ``tag_id``, ``timestamp``
            (ISO 8601 string), ``value`` (the numeric value, or the boolean
            value when the numeric one is NULL) and ``quality``.
        """
        # NOTE(review): the defaults are timezone-aware while the column is a
        # plain TIMESTAMP -- confirm how DuckDB converts between the two.
        if since is None:
            since = datetime.now(UTC) - timedelta(hours=1)
        if until is None:
            until = datetime.now(UTC)

        sql = (
            "SELECT tag_id, timestamp, value, bool_value, quality "
            "FROM samples WHERE tag_id = ? AND timestamp >= ? AND timestamp <= ? "
            "ORDER BY timestamp"
        )
        # Compare against None so that an explicit limit of 0 is honoured
        # instead of being treated as "no limit".
        if limit is not None:
            sql += f" LIMIT {int(limit)}"

        rows = self._conn.execute(sql, [tag_id, since, until]).fetchall()
        return [
            {
                "tag_id": row_tag,
                "timestamp": ts.isoformat(),
                "value": num if num is not None else flag,
                "quality": quality,
            }
            for row_tag, ts, num, flag, quality in rows
        ]

    def stats(self) -> dict[str, Any]:
        """Return the total sample count, distinct tag count and DB path."""
        total = self._conn.execute("SELECT COUNT(*) FROM samples").fetchone()[0]
        tags = self._conn.execute(
            "SELECT COUNT(DISTINCT tag_id) FROM samples"
        ).fetchone()[0]
        return {
            "total_samples": int(total),
            "tags": int(tags),
            "db_path": self._db_path,
        }