"""Telemetry widget — live $PARP STATUS viewer.

Connects to the AR-Concentrador over USB serial and displays heading,
setpoint, and rudder angle as rolling time-series charts.  No external
charting library required — all drawing is done with QPainter.

Usage (embedded in StudioMainWindow)::

    from arautopilot.studio.telemetry_widget import TelemetryWidget
    tabs.addTab(TelemetryWidget(session), "Telemetría")

The widget works without a connected board: it stays in "waiting" state and
shows a message until a port is selected and connected.
"""

from __future__ import annotations

import collections
import math
import re
import time
from typing import Deque

from PySide6.QtCore import QByteArray, QIODevice, QTimer, Qt
from PySide6.QtGui import QColor, QFont, QPainter, QPen
from PySide6.QtSerialPort import QSerialPort, QSerialPortInfo
from PySide6.QtWidgets import (
    QComboBox,
    QGroupBox,
    QHBoxLayout,
    QLabel,
    QPushButton,
    QVBoxLayout,
    QWidget,
)

from arautopilot.studio.ar_style import ACCENT, ACCENT_MID, ERROR, NAVY, OK, TEXT_MUTED, WARN
from arautopilot.studio.session import Session

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

BAUD_RATE = 115200
WINDOW_SEC = 60                 # seconds of history shown on chart
CHART_FPS = 5                   # redraws per second (low is fine — data is 2 Hz)
MAX_SAMPLES = WINDOW_SEC * 10   # 10 samples/s max
MAX_BUFFER_BYTES = 4096         # cap on an unterminated line kept in the RX buffer

CHANNEL_COLORS = {
    "heading": ACCENT_MID,
    "setpoint": OK,
    "rudder": WARN,
}


# ---------------------------------------------------------------------------
# $PARP STATUS parser  (Python re-implementation of parp_codec.dart)
# ---------------------------------------------------------------------------

def _xor_checksum(body: str) -> int:
    """Return the XOR of the code points of every character in *body*."""
    crc = 0
    for ch in body:
        crc ^= ord(ch)
    return crc


def _parse_parp_status(line: str) -> dict | None:
    """
    Parse ``$PARP,STATUS,...*CRC`` → dict or None.

    The checksum is the XOR of every character between the optional leading
    ``$`` and the last ``*``, compared against the hexadecimal text after the
    ``*``.  ``None`` is returned when the ``*`` is missing, the checksum does
    not match, the sentence is not ``PARP,STATUS`` with at least 7 fields, or
    a numeric field is not a finite number.

    Returns::

        {
          "mode":     str,    # "STANDBY" | "HEADING_HOLD" | "TRACK"
          "setpoint": float,
          "heading":  float,
          "rudder":   float,
          "ts":       float,  # time.monotonic()
        }
    """
    s = line.strip()
    star = s.rfind("*")
    if star < 0:
        return None

    body = s[1:star] if s.startswith("$") else s[:star]
    crc_hex = s[star + 1:]
    try:
        if _xor_checksum(body) != int(crc_hex, 16):
            return None
    except ValueError:
        return None

    parts = body.split(",")
    if len(parts) < 7 or parts[0] != "PARP" or parts[1] != "STATUS":
        return None

    try:
        setpoint = float(parts[3])
        heading = float(parts[4])
        rudder = float(parts[5])
    except ValueError:
        return None

    # float() accepts "nan"/"inf"; such values cannot be converted to pixel
    # coordinates by the charts, so they are rejected here.
    if not (math.isfinite(setpoint) and math.isfinite(heading) and math.isfinite(rudder)):
        return None

    return {
        "mode": parts[2],
        "setpoint": setpoint,
        "heading": heading,
        "rudder": rudder,
        "ts": time.monotonic(),
    }


# ---------------------------------------------------------------------------
# Rolling chart widget
# ---------------------------------------------------------------------------

class _RollingChart(QWidget):
    """
    A minimal scrolling time-series chart drawn with QPainter.

    Each channel is a ``deque`` of ``(timestamp_monotonic, value)`` pairs.
    Y-range is passed in; X range is always the last *WINDOW_SEC* seconds.
    """

    def __init__(
        self,
        title: str,
        channels: dict[str, tuple[Deque, str]],  # name → (deque, colour_hex)
        y_min: float,
        y_max: float,
        y_unit: str = "",
        parent: QWidget | None = None,
    ) -> None:
        super().__init__(parent)
        self.setMinimumHeight(160)
        self._title = title
        self._channels = channels
        self._y_min = y_min
        self._y_max = y_max
        self._y_unit = y_unit

    def paintEvent(self, _event) -> None:  # noqa: N802
        """Redraw the chart; the painter is ended on every exit path."""
        painter = QPainter(self)
        try:
            self._paint(painter, self.width(), self.height())
        finally:
            painter.end()

    def _paint(self, p: QPainter, w: int, h: int) -> None:
        """Draw background, grid, axes, title and series onto *p* (size w × h)."""
        p.setRenderHint(QPainter.RenderHint.Antialiasing)

        # Background
        p.fillRect(0, 0, w, h, QColor(NAVY))

        # Margins (left, right, top, bottom) around the plot area
        ml, mr, mt, mb = 48, 12, 28, 28
        cw = w - ml - mr
        ch = h - mt - mb
        if cw <= 0 or ch <= 0:
            return  # widget too small to hold a plot area

        t0 = time.monotonic() - WINDOW_SEC
        # Fall back to 1.0 so a degenerate range never divides by zero.
        y_range = (self._y_max - self._y_min) or 1.0

        # Grid lines with their Y labels
        grid_pen = QPen(QColor("#1E3A5F"), 1, Qt.PenStyle.DotLine)
        label_colour = QColor(TEXT_MUTED)
        p.setFont(QFont("Segoe UI", 8))
        for fraction in (0.0, 0.25, 0.5, 0.75, 1.0):
            gy = mt + int((1 - fraction) * ch)
            p.setPen(grid_pen)
            p.drawLine(ml, gy, ml + cw, gy)
            val = self._y_min + fraction * y_range
            p.setPen(label_colour)
            p.drawText(
                0, gy - 6, ml - 4, 14,
                Qt.AlignmentFlag.AlignRight,
                f"{val:.0f}{self._y_unit}",
            )

        # Axes
        p.setPen(QPen(QColor(ACCENT_MID), 1))
        p.drawLine(ml, mt, ml, mt + ch)
        p.drawLine(ml, mt + ch, ml + cw, mt + ch)

        # Title
        p.setPen(QColor(ACCENT_MID))
        p.setFont(QFont("Segoe UI", 9, QFont.Weight.Bold))
        p.drawText(ml, 0, cw, mt, Qt.AlignmentFlag.AlignCenter, self._title)

        def to_x(ts: float) -> int:
            return ml + int((ts - t0) / WINDOW_SEC * cw)

        def to_y(v: float) -> int:
            return mt + ch - int((v - self._y_min) / y_range * ch)

        # Series: consecutive in-window samples joined by straight segments
        for samples, colour in self._channels.values():
            pts = [(to_x(ts), to_y(v)) for ts, v in samples if ts >= t0]
            if len(pts) < 2:
                continue
            pen = QPen(QColor(colour), 2)
            pen.setCapStyle(Qt.PenCapStyle.RoundCap)
            pen.setJoinStyle(Qt.PenJoinStyle.RoundJoin)
            p.setPen(pen)
            for (x0, y0), (x1, y1) in zip(pts, pts[1:]):
                p.drawLine(x0, y0, x1, y1)


# ---------------------------------------------------------------------------
# Value display row
# ---------------------------------------------------------------------------

class _ValueRow(QWidget):
    """Compact  name │ value  display for the live readout strip."""

    def __init__(self, label: str, unit: str, colour: str) -> None:
        super().__init__()
        h = QHBoxLayout(self)
        h.setContentsMargins(0, 0, 0, 0)

        lbl = QLabel(label)
        lbl.setStyleSheet(f"color:{colour}; font-size:11px; min-width:80px;")

        self._value = QLabel("---")
        self._value.setStyleSheet(
            f"color:{colour}; font-size:20px; font-weight:bold; "
            "font-family:Consolas; min-width:80px;"
        )

        unit_lbl = QLabel(unit)
        unit_lbl.setStyleSheet(f"color:{TEXT_MUTED}; font-size:11px;")

        h.addWidget(lbl)
        h.addWidget(self._value)
        h.addWidget(unit_lbl)
        h.addStretch(1)

    def update_value(self, v: float) -> None:
        """Show *v* with one decimal, right-aligned in six characters."""
        self._value.setText(f"{v:6.1f}")


# ---------------------------------------------------------------------------
# Main widget
# ---------------------------------------------------------------------------

class TelemetryWidget(QWidget):
    """Live telemetry from the AR-Concentrador — embedded in Studio."""

    def __init__(self, session: Session, parent: QWidget | None = None) -> None:
        super().__init__(parent)
        self._session = session
        self._port: QSerialPort | None = None   # open port, or None when idle
        self._buf = bytearray()                 # bytes received after the last "\n"

        # Data stores: bounded (timestamp, value) histories shared with the charts
        self._hdg_data: Deque[tuple[float, float]] = collections.deque(maxlen=MAX_SAMPLES)
        self._spt_data: Deque[tuple[float, float]] = collections.deque(maxlen=MAX_SAMPLES)
        self._rud_data: Deque[tuple[float, float]] = collections.deque(maxlen=MAX_SAMPLES)
        self._mode_str = "—"

        self._build_ui()

        self._redraw_timer = QTimer(self)
        self._redraw_timer.setInterval(1000 // CHART_FPS)
        self._redraw_timer.timeout.connect(self._refresh_charts)
        self._redraw_timer.start()

    # ── UI ───────────────────────────────────────────────────────────────────

    def _build_ui(self) -> None:
        """Create the connection bar, the live readout strip and both charts."""
        root = QVBoxLayout(self)
        root.setContentsMargins(12, 12, 12, 12)
        root.setSpacing(10)

        # ── Connection bar ───────────────────────────────────────────────────
        conn = QGroupBox("Conexión al Concentrador")
        conn_row = QHBoxLayout(conn)
        conn_row.addWidget(QLabel("Puerto RX:"))
        self._port_combo = QComboBox()
        self._port_combo.setMinimumWidth(200)
        conn_row.addWidget(self._port_combo)

        refresh_btn = QPushButton("Actualizar")
        refresh_btn.clicked.connect(self._refresh_ports)
        conn_row.addWidget(refresh_btn)

        self._connect_btn = QPushButton("Conectar")
        self._connect_btn.clicked.connect(self._on_connect)
        conn_row.addWidget(self._connect_btn)

        self._conn_lbl = QLabel("● Desconectado")
        self._conn_lbl.setStyleSheet(f"color:{TEXT_MUTED}; font-weight:bold;")
        conn_row.addWidget(self._conn_lbl)
        conn_row.addStretch(1)
        root.addWidget(conn)

        # ── Live readout strip ───────────────────────────────────────────────
        strip = QGroupBox("Datos en vivo")
        sh = QHBoxLayout(strip)
        self._hdg_row = _ValueRow("RUMBO", "°", ACCENT_MID)
        self._spt_row = _ValueRow("SETPOINT", "°", OK)
        self._rud_row = _ValueRow("TIMÓN", "°", WARN)
        self._mode_lbl = QLabel("Modo: —")
        self._mode_lbl.setStyleSheet(f"color:{TEXT_MUTED}; font-size:12px;")
        sh.addWidget(self._hdg_row)
        sh.addWidget(self._spt_row)
        sh.addWidget(self._rud_row)
        sh.addStretch(1)
        sh.addWidget(self._mode_lbl)
        root.addWidget(strip)

        # ── Charts ───────────────────────────────────────────────────────────
        self._hdg_chart = _RollingChart(
            "RUMBO / SETPOINT (°)",
            {
                "Rumbo": (self._hdg_data, ACCENT_MID),
                "Setpoint": (self._spt_data, OK),
            },
            y_min=0,
            y_max=360,
            y_unit="°",
        )
        self._rud_chart = _RollingChart(
            "TIMÓN (°)",
            {
                "Timón": (self._rud_data, WARN),
            },
            y_min=-40,
            y_max=40,
            y_unit="°",
        )
        root.addWidget(self._hdg_chart, 2)
        root.addWidget(self._rud_chart, 1)

        self._refresh_ports()

    def _set_conn_status(self, text: str, colour: str) -> None:
        """Show *text* in the connection label, bold and in *colour*."""
        self._conn_lbl.setText(text)
        self._conn_lbl.setStyleSheet(f"color:{colour}; font-weight:bold;")

    # ── Ports ────────────────────────────────────────────────────────────────

    def _refresh_ports(self) -> None:
        """Repopulate the combo box from the serial ports currently available."""
        self._port_combo.clear()
        ports = QSerialPortInfo.availablePorts()
        if not ports:
            # Placeholder entry carries no item data, so it is never "selected".
            self._port_combo.addItem("(sin puertos)")
        for info in ports:
            label = info.portName()
            if info.description():
                label += f"  —  {info.description()}"
            self._port_combo.addItem(label, info.portName())

    def _selected_port(self) -> str | None:
        """Return the port name stored on the current combo item, or None."""
        v = self._port_combo.currentData()
        return str(v) if v else None

    # ── Connect / Disconnect ─────────────────────────────────────────────────

    def _on_connect(self) -> None:
        """Toggle the connection: close an open port, otherwise open the selected one."""
        if self._port is not None and self._port.isOpen():
            self._disconnect()
            return

        port_name = self._selected_port()
        if not port_name:
            return

        port = QSerialPort(self)
        port.setPortName(port_name)
        port.setBaudRate(BAUD_RATE)
        port.setDataBits(QSerialPort.DataBits.Data8)
        port.setParity(QSerialPort.Parity.NoParity)
        port.setStopBits(QSerialPort.StopBits.OneStop)
        port.setFlowControl(QSerialPort.FlowControl.NoFlowControl)

        if not port.open(QIODevice.OpenModeFlag.ReadOnly):
            self._set_conn_status(f"✗ Error: {port.errorString()}", ERROR)
            port.deleteLater()  # parented to self; release it instead of leaking
            return

        # Signals are wired only after a successful open, so an open failure is
        # reported above and never reaches _on_serial_error.
        port.readyRead.connect(self._on_data)
        port.errorOccurred.connect(self._on_serial_error)
        self._buf.clear()
        self._port = port
        self._set_conn_status("● Conectado", OK)
        self._connect_btn.setText("Desconectar")

    def _disconnect(self) -> None:
        """Close and release the current port (if any) and reset the UI to idle."""
        # Detach first: a signal emitted while closing then finds no active
        # port and cannot re-enter this teardown.
        port, self._port = self._port, None
        if port is not None:
            if port.isOpen():
                port.close()
            port.deleteLater()
        self._buf.clear()  # a partial line must not leak into the next session
        self._set_conn_status("● Desconectado", TEXT_MUTED)
        self._connect_btn.setText("Conectar")

    # ── Serial data ──────────────────────────────────────────────────────────

    def _on_data(self) -> None:
        """Read pending bytes, split them into lines and ingest valid STATUS frames."""
        port = self._port
        if port is None:
            return
        raw: QByteArray = port.readAll()
        self._buf.extend(bytes(raw))

        # Everything before the last "\n" is complete; the tail is kept for later.
        *lines, remainder = self._buf.split(b"\n")
        if len(remainder) > MAX_BUFFER_BYTES:
            # No terminator within a sane length (e.g. wrong baud rate): drop it
            # rather than let the buffer grow without bound.
            remainder = bytearray()
        self._buf = remainder

        for line_bytes in lines:
            # errors="ignore" drops non-ASCII bytes instead of raising.
            status = _parse_parp_status(line_bytes.decode("ascii", errors="ignore"))
            if status is not None:
                self._ingest(status)

    def _ingest(self, s: dict) -> None:
        """Append one parsed STATUS sample to the histories and update the readout."""
        ts = s["ts"]
        self._hdg_data.append((ts, s["heading"]))
        self._spt_data.append((ts, s["setpoint"]))
        self._rud_data.append((ts, s["rudder"]))
        self._mode_str = s["mode"]

        # Update live readout
        self._hdg_row.update_value(s["heading"])
        self._spt_row.update_value(s["setpoint"])
        self._rud_row.update_value(s["rudder"])
        self._mode_lbl.setText(f"Modo: {s['mode'].replace('_', ' ')}")

    def _on_serial_error(self, error) -> None:
        """Tear the connection down on a serial error and show the reason."""
        if error == QSerialPort.SerialPortError.NoError:
            return
        port = self._port
        if port is None:
            return  # already disconnected; nothing to tear down
        reason = port.errorString()  # read before the port is released
        self._disconnect()
        self._set_conn_status(f"✗ Error: {reason}", ERROR)

    # ── Chart refresh ────────────────────────────────────────────────────────

    def _refresh_charts(self) -> None:
        """Schedule a repaint of both charts (driven by the redraw timer)."""
        self._hdg_chart.update()
        self._rud_chart.update()

    # ── Cleanup ──────────────────────────────────────────────────────────────

    def closeEvent(self, event) -> None:  # noqa: N802
        """Release the serial port before the widget closes."""
        self._disconnect()
        super().closeEvent(event)