"""Vista de conexión: configurar host/port + connect/disconnect + tag table. Sprint 6: corre el cliente API/WS en un QThread separado con su propio event loop asyncio. """ from __future__ import annotations import asyncio import logging import threading from typing import Any from PySide6.QtCore import QObject, QThread, Signal, Slot from PySide6.QtWidgets import ( QAbstractItemView, QHBoxLayout, QHeaderView, QLabel, QLineEdit, QPushButton, QSpinBox, QTableWidget, QTableWidgetItem, QVBoxLayout, QWidget, ) from vmssailor.runtime.client.api_client import ( RuntimeApiClient, RuntimeWebSocketClient, ) from vmssailor.studio.theme import ( C_CYAN, C_FOG, mono_font, ui_font, ) logger = logging.getLogger(__name__) class _AsyncWorker(QObject): """Worker que corre un asyncio event loop en su propio QThread.""" eventReceived = Signal(dict) connectionStateChanged = Signal(str) projectLoaded = Signal(dict) error = Signal(str) def __init__(self, host: str, port: int) -> None: super().__init__() self._host = host self._port = port self._loop: asyncio.AbstractEventLoop | None = None self._api: RuntimeApiClient | None = None self._ws: RuntimeWebSocketClient | None = None self._thread_id: int | None = None @Slot() def run(self) -> None: self._loop = asyncio.new_event_loop() asyncio.set_event_loop(self._loop) self._thread_id = threading.get_ident() try: self._loop.run_until_complete(self._main()) finally: self._loop.close() async def _main(self) -> None: base_url = f"http://{self._host}:{self._port}" ws_url = f"ws://{self._host}:{self._port}/ws/realtime" self._api = RuntimeApiClient(base_url) self._ws = RuntimeWebSocketClient( ws_url, on_event=lambda ev: self.eventReceived.emit(ev), on_state_change=lambda s: self.connectionStateChanged.emit(s), ) # Try fetch project info try: project = await self._api.project() self.projectLoaded.emit(project) except Exception as exc: self.error.emit(f"GET /project falló: {exc}") # Start WS loop await self._ws.start() # Block until cancelled try: while True: await asyncio.sleep(60) except asyncio.CancelledError: pass finally: if self._ws is not None: await self._ws.stop() if self._api is not None: await self._api.close() def submit(self, coro_factory): """Programa una coroutine en el loop del worker.""" if self._loop is None or not self._loop.is_running(): return None return asyncio.run_coroutine_threadsafe(coro_factory(), self._loop) async def ack_alarm(self, alarm_id: str, user: str) -> dict[str, Any] | None: if self._api is None: return None try: return await self._api.ack_alarm(alarm_id, user=user) except Exception as exc: self.error.emit(f"ACK falló: {exc}") return None class ConnectionView(QWidget): """Panel de conexión + monitor de eventos crudos.""" connectionStateChanged = Signal(str) eventReceived = Signal(dict) projectLoaded = Signal(dict) def __init__(self, parent: QWidget | None = None) -> None: super().__init__(parent) self._worker: _AsyncWorker | None = None self._thread: QThread | None = None layout = QVBoxLayout(self) layout.setContentsMargins(20, 20, 20, 20) layout.setSpacing(12) title = QLabel("Conexión al Runtime server") title.setFont(ui_font(16)) title.setStyleSheet(f"color: {C_CYAN}; font-weight: 600;") layout.addWidget(title) intro = QLabel( "Lanza el servidor primero con:\n" " uv run python runtime_server_main.py --verbose\n" "Luego conecta acá." ) intro.setStyleSheet(f"color: {C_FOG};") intro.setFont(mono_font(10)) layout.addWidget(intro) # Connect controls row = QHBoxLayout() row.addWidget(QLabel("Host:")) self._host_input = QLineEdit("127.0.0.1") self._host_input.setFixedWidth(160) row.addWidget(self._host_input) row.addWidget(QLabel("Puerto:")) self._port_input = QSpinBox() self._port_input.setRange(1, 65535) self._port_input.setValue(8765) self._port_input.setFixedWidth(100) row.addWidget(self._port_input) self._connect_btn = QPushButton("Conectar") self._connect_btn.setObjectName("primary") self._disconnect_btn = QPushButton("Desconectar") self._disconnect_btn.setEnabled(False) row.addWidget(self._connect_btn) row.addWidget(self._disconnect_btn) row.addStretch(1) layout.addLayout(row) # Event log (tabla con últimos updates) self._counter = QLabel("0 eventos recibidos") self._counter.setFont(mono_font(10)) self._counter.setStyleSheet(f"color: {C_FOG};") layout.addWidget(self._counter) self._events_table = QTableWidget(0, 4) self._events_table.setHorizontalHeaderLabels(["Tipo", "Tag/Detalle", "Valor", "Timestamp"]) self._events_table.horizontalHeader().setSectionResizeMode( 1, QHeaderView.ResizeMode.Stretch ) self._events_table.verticalHeader().setVisible(False) self._events_table.setSelectionBehavior( QAbstractItemView.SelectionBehavior.SelectRows ) self._events_table.setEditTriggers(QAbstractItemView.EditTrigger.NoEditTriggers) layout.addWidget(self._events_table, 1) self._connect_btn.clicked.connect(self._on_connect) self._disconnect_btn.clicked.connect(self._on_disconnect) self._event_count = 0 # ----- Connection lifecycle ---------------------------------------- def _on_connect(self) -> None: host = self._host_input.text().strip() port = self._port_input.value() if self._thread is not None: return self._worker = _AsyncWorker(host, port) self._thread = QThread(self) self._worker.moveToThread(self._thread) self._worker.connectionStateChanged.connect(self.connectionStateChanged) self._worker.eventReceived.connect(self._on_event_received_internal) self._worker.projectLoaded.connect(self.projectLoaded) self._worker.error.connect(self._on_error) self._thread.started.connect(self._worker.run) self._thread.start() self._connect_btn.setEnabled(False) self._disconnect_btn.setEnabled(True) def _on_disconnect(self) -> None: if self._worker is not None and self._worker._loop is not None: self._worker._loop.call_soon_threadsafe(lambda: None) # Stop loop tasks fut = asyncio.run_coroutine_threadsafe( _stop_worker(self._worker), self._worker._loop ) from contextlib import suppress with suppress(Exception): fut.result(timeout=3) if self._thread is not None: self._thread.quit() self._thread.wait(3000) self._thread = None self._worker = None self.connectionStateChanged.emit("disconnected") self._connect_btn.setEnabled(True) self._disconnect_btn.setEnabled(False) # ----- Events ------------------------------------------------------ @Slot(dict) def _on_event_received_internal(self, event: dict) -> None: self._event_count += 1 # Add row to table (keep last 500) if self._events_table.rowCount() > 500: self._events_table.removeRow(self._events_table.rowCount() - 1) self._events_table.insertRow(0) ev_type = event.get("type", "?") self._events_table.setItem(0, 0, QTableWidgetItem(ev_type)) detail = event.get("tag_id", "") or event.get("alarm_id", "") or "" self._events_table.setItem(0, 1, QTableWidgetItem(str(detail))) value = event.get("value", "") self._events_table.setItem(0, 2, QTableWidgetItem(str(value))) ts = event.get("timestamp", "") self._events_table.setItem(0, 3, QTableWidgetItem(str(ts))) self._counter.setText(f"{self._event_count} eventos recibidos") self.eventReceived.emit(event) @Slot(str) def _on_error(self, msg: str) -> None: logger.warning("ConnectionView error: %s", msg) # ----- Public API -------------------------------------------------- def request_ack(self, alarm_id: str, user: str) -> None: if self._worker is None: return self._worker.submit(lambda: self._worker.ack_alarm(alarm_id, user)) async def _stop_worker(worker: _AsyncWorker) -> None: """Cancela tareas y cierra recursos del worker desde su loop.""" if worker._ws is not None: await worker._ws.stop() if worker._api is not None: await worker._api.close() loop = asyncio.get_running_loop() # Cancelar todas las tareas for task in asyncio.all_tasks(loop): if task is not asyncio.current_task(): task.cancel() loop.stop()