5238bd31f0
ar_style.py — global QSS dark theme + QPalette matching the Flutter brand palette (navy #0D1B2A, electric blue #2563EB, glow #60B8FF). Single call: apply_ar_style(app). app.py — applies AR style and window icon on startup. main_window.py — complete rewrite of the layout: - Sidebar: AR logo (PNG), user/role display, capabilities list, version stamp - 5 tabs: Overview · ⚡ Flash ESP32 · 📋 Proyecto · 📡 Telemetría · 💾 Instalar J6412 - Overview tab: rich-text guide with icons for each tab's purpose telemetry_widget.py — live $PARP STATUS chart tab: - QSerialPort RX-only connection to AR-Concentrador (port selector + Refresh) - Python $PARP XOR-checksum parser (mirrors Dart ParpCodec) - _RollingChart: pure QPainter scrolling time-series, 60 s window, no external charting library - Heading + Setpoint on one chart; Rudder on a second chart - Live value strip shows Rumbo / Setpoint / Timón + mode label installer_widget.py — J6412 USB image builder tab: - Vessel name + serial number (auto-generate or paste) - Optional CSV log path for CRM - App checkboxes (AR-ECDIS / AR-Autopilot / skip Flutter build) - Worker thread runs installer/build_usb.py with streamed log output - "Abrir dist/" button when build succeeds - RBAC gated: Engineer or Super Admin only pyproject.toml — adds [installer] and [license-server] optional dep groups AR_electronics — AR-Autopilot Project
429 lines
15 KiB
Python
429 lines
15 KiB
Python
"""Telemetry widget — live $PARP STATUS viewer.

Connects to the AR-Concentrador over USB serial and displays heading,
setpoint, and rudder angle as rolling time-series charts.  No external
charting library required — all drawing is done with QPainter.

Usage (embedded in StudioMainWindow)::

    from arautopilot.studio.telemetry_widget import TelemetryWidget
    tabs.addTab(TelemetryWidget(session), "Telemetría")

The widget works without a connected board: it stays in "waiting" state
and shows a message until a port is selected and connected.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import collections
|
|
import re
|
|
import time
|
|
from typing import Deque
|
|
|
|
from PySide6.QtCore import QByteArray, QIODevice, QTimer, Qt
|
|
from PySide6.QtGui import QColor, QFont, QPainter, QPen
|
|
from PySide6.QtSerialPort import QSerialPort, QSerialPortInfo
|
|
from PySide6.QtWidgets import (
|
|
QComboBox,
|
|
QGroupBox,
|
|
QHBoxLayout,
|
|
QLabel,
|
|
QPushButton,
|
|
QVBoxLayout,
|
|
QWidget,
|
|
)
|
|
|
|
from arautopilot.studio.ar_style import ACCENT, ACCENT_MID, ERROR, NAVY, OK, TEXT_MUTED, WARN
|
|
from arautopilot.studio.session import Session
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Constants
|
|
# ---------------------------------------------------------------------------
|
|
|
|
BAUD_RATE = 115200  # baud rate applied to the serial port before opening it
WINDOW_SEC = 60  # seconds of history shown on chart
CHART_FPS = 5  # redraws per second (low is fine — data is 2 Hz)
MAX_SAMPLES = WINDOW_SEC * 10  # deque capacity: one full window at 10 samples/s max

# Colour assigned to each telemetry channel (values come from ar_style).
CHANNEL_COLORS = {
    "heading": ACCENT_MID,
    "setpoint": OK,
    "rudder": WARN,
}
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# $PARP STATUS parser (Python re-implementation of parp_codec.dart)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _xor_checksum(body: str) -> int:
    """Return the XOR of the code points of every character in *body*.

    The result is what the sentence parser compares against the
    hexadecimal field that follows ``*``.  An empty string yields ``0``.
    """
    crc = 0
    for ch in body:
        crc ^= ord(ch)
    return crc
|
|
|
|
|
|
def _parse_parp_status(line: str) -> dict | None:
    """
    Parse ``$PARP,STATUS,...*CRC`` → dict or None.

    The text between an optional leading ``$`` and the last ``*`` is the
    sentence body; what follows ``*`` is read as a hexadecimal checksum and
    must equal ``_xor_checksum(body)``.  The body must split into at least
    seven comma-separated fields starting with ``PARP`` and ``STATUS``; only
    fields 2–5 are read here.

    Returns ``None`` when the ``*`` is missing, the checksum is not valid
    hex or does not match, the header fields differ, or any of the three
    numeric fields is not a finite number.  Otherwise returns::

        {
            "mode": str,        # "STANDBY" | "HEADING_HOLD" | "TRACK"
            "setpoint": float,
            "heading": float,
            "rudder": float,
            "ts": float,        # time.monotonic()
        }
    """
    import math  # local import keeps this block self-contained

    s = line.strip()
    star = s.rfind("*")
    if star < 0:
        return None
    body = s[1:star] if s.startswith("$") else s[:star]
    crc_hex = s[star + 1:]

    try:
        if _xor_checksum(body) != int(crc_hex, 16):
            return None
    except ValueError:
        return None

    parts = body.split(",")
    if len(parts) < 7 or parts[0] != "PARP" or parts[1] != "STATUS":
        return None

    try:
        setpoint = float(parts[3])
        heading = float(parts[4])
        rudder = float(parts[5])
    except ValueError:
        return None

    # float() accepts "nan" and "inf"; such values cannot be converted to
    # pixel coordinates with int() later on, so treat them as malformed.
    if not all(map(math.isfinite, (setpoint, heading, rudder))):
        return None

    return {
        "mode": parts[2],
        "setpoint": setpoint,
        "heading": heading,
        "rudder": rudder,
        "ts": time.monotonic(),
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Rolling chart widget
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class _RollingChart(QWidget):
    """
    A minimal scrolling time-series chart drawn with QPainter.

    Each channel is a ``deque`` of ``(timestamp_monotonic, value)`` pairs.
    Y-range is passed in; X range is always the last ``WINDOW_SEC`` seconds.

    The chart keeps references to the deques it is given and re-reads them
    on every paint, so the owner only has to append samples and call
    ``update()``.
    """

    def __init__(
        self,
        title: str,
        channels: dict[str, tuple[Deque, str]],  # name → (deque, colour_hex)
        y_min: float,
        y_max: float,
        y_unit: str = "",
        parent: QWidget | None = None,
    ) -> None:
        """Store the chart configuration.

        Args:
            title: Text centred in the top margin.
            channels: Mapping of channel name to ``(samples, colour)``.
            y_min: Value drawn at the bottom edge of the plot area.
            y_max: Value drawn at the top edge of the plot area.
            y_unit: Suffix appended to the Y-axis tick labels.
            parent: Optional Qt parent widget.
        """
        super().__init__(parent)
        self.setMinimumHeight(160)
        self._title = title
        self._channels = channels
        self._y_min = y_min
        self._y_max = y_max
        self._y_unit = y_unit

    def paintEvent(self, _event) -> None:  # noqa: N802
        """Repaint the chart; the painter is always ended, even on early exit."""
        painter = QPainter(self)
        try:
            self._paint(painter)
        finally:
            painter.end()

    def _paint(self, p: QPainter) -> None:
        """Draw background, grid, axes, title and every series with *p*."""
        w, h = self.width(), self.height()
        p.setRenderHint(QPainter.RenderHint.Antialiasing)

        # Background
        p.fillRect(0, 0, w, h, QColor(NAVY))

        # Margins (left, right, top, bottom) and resulting plot-area size
        ml, mr, mt, mb = 48, 12, 28, 28
        cw = w - ml - mr
        ch = h - mt - mb
        if cw <= 0 or ch <= 0:
            return  # widget too small to hold a plot area

        now = time.monotonic()
        t0 = now - WINDOW_SEC

        # A zero-height range would divide by zero in _ty; fall back to 1.0.
        y_range = (self._y_max - self._y_min) or 1.0

        # Grid lines with their Y-axis labels
        grid_pen = QPen(QColor("#1E3A5F"), 1, Qt.PenStyle.DotLine)
        label_colour = QColor(TEXT_MUTED)
        p.setFont(QFont("Segoe UI", 8))
        for fraction in (0.0, 0.25, 0.5, 0.75, 1.0):
            gy = mt + int((1 - fraction) * ch)
            p.setPen(grid_pen)
            p.drawLine(ml, gy, ml + cw, gy)
            val = self._y_min + fraction * y_range
            label = f"{val:.0f}{self._y_unit}"
            p.setPen(label_colour)
            p.drawText(0, gy - 6, ml - 4, 14, Qt.AlignmentFlag.AlignRight, label)

        # Axes
        p.setPen(QPen(QColor(ACCENT_MID), 1))
        p.drawLine(ml, mt, ml, mt + ch)
        p.drawLine(ml, mt + ch, ml + cw, mt + ch)

        # Title
        p.setPen(QColor(ACCENT_MID))
        p.setFont(QFont("Segoe UI", 9, QFont.Weight.Bold))
        p.drawText(ml, 0, cw, mt, Qt.AlignmentFlag.AlignCenter, self._title)

        def _tx(ts: float) -> int:
            return ml + int((ts - t0) / WINDOW_SEC * cw)

        def _ty(v: float) -> int:
            frac = (v - self._y_min) / y_range
            return mt + ch - int(frac * ch)

        # Series.  Clip to the plot area (one pixel of slack for the pen
        # width) so samples outside [y_min, y_max] cannot overwrite the
        # title or the axis labels.
        # NOTE(review): consecutive samples are joined by straight segments,
        # so a heading that wraps between 360 and 0 is drawn as a full-height
        # line — confirm whether that is acceptable.
        p.setClipRect(ml, mt - 1, cw + 1, ch + 2)
        for samples, colour in self._channels.values():
            pts = [(_tx(ts), _ty(v)) for ts, v in samples if ts >= t0]
            if len(pts) < 2:
                continue
            pen = QPen(QColor(colour), 2)
            pen.setCapStyle(Qt.PenCapStyle.RoundCap)
            pen.setJoinStyle(Qt.PenJoinStyle.RoundJoin)
            p.setPen(pen)
            for (x0, y0), (x1, y1) in zip(pts, pts[1:]):
                p.drawLine(x0, y0, x1, y1)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Value display row
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class _ValueRow(QWidget):
    """Compact name │ value display for the live readout strip."""

    def __init__(self, label: str, unit: str, colour: str) -> None:
        """Build the row: caption, large value (initially ``---``), unit.

        Args:
            label: Caption shown to the left of the value.
            unit: Unit text shown to the right of the value.
            colour: CSS colour used for both the caption and the value.
        """
        super().__init__()
        layout = QHBoxLayout(self)
        layout.setContentsMargins(0, 0, 0, 0)

        caption = QLabel(label)
        caption.setStyleSheet(f"color:{colour}; font-size:11px; min-width:80px;")

        self._value = QLabel("---")
        self._value.setStyleSheet(
            f"color:{colour}; font-size:20px; font-weight:bold; "
            "font-family:Consolas; min-width:80px;"
        )

        unit_lbl = QLabel(unit)
        unit_lbl.setStyleSheet(f"color:{TEXT_MUTED}; font-size:11px;")

        layout.addWidget(caption)
        layout.addWidget(self._value)
        layout.addWidget(unit_lbl)
        layout.addStretch(1)  # keep the three labels packed to the left

    def update_value(self, v: float) -> None:
        """Show *v* with one decimal place, right-aligned in six characters."""
        self._value.setText(f"{v:6.1f}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main widget
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TelemetryWidget(QWidget):
    """Live telemetry from the AR-Concentrador — embedded in Studio.

    The widget opens the selected serial port read-only, splits the incoming
    bytes into lines, feeds each line to ``_parse_parp_status`` and appends
    every accepted sample to three bounded deques that the two rolling
    charts read on each redraw.
    """

    # Upper bound on bytes kept while waiting for a newline.  Without it a
    # stream that never contains "\n" would grow the buffer indefinitely.
    _MAX_PENDING_BYTES = 4096

    def __init__(self, session: Session, parent: QWidget | None = None) -> None:
        """Create the UI, the sample stores and the periodic redraw timer.

        Args:
            session: Studio session object; stored but not otherwise used
                by this class.
            parent: Optional Qt parent widget.
        """
        super().__init__(parent)
        self._session = session
        self._port: QSerialPort | None = None
        self._buf = bytearray()

        # Data stores: (timestamp, value) pairs, oldest dropped automatically
        self._hdg_data: Deque[tuple[float, float]] = collections.deque(maxlen=MAX_SAMPLES)
        self._spt_data: Deque[tuple[float, float]] = collections.deque(maxlen=MAX_SAMPLES)
        self._rud_data: Deque[tuple[float, float]] = collections.deque(maxlen=MAX_SAMPLES)
        self._mode_str = "—"

        self._build_ui()

        # The charts scroll with wall time, so they are redrawn on a timer
        # rather than only when a sample arrives.
        self._redraw_timer = QTimer(self)
        self._redraw_timer.setInterval(1000 // CHART_FPS)
        self._redraw_timer.timeout.connect(self._refresh_charts)
        self._redraw_timer.start()

    # ── UI ───────────────────────────────────────────────────────────────────

    def _build_ui(self) -> None:
        """Assemble the connection bar, the live readout strip and the charts."""
        root = QVBoxLayout(self)
        root.setContentsMargins(12, 12, 12, 12)
        root.setSpacing(10)

        # ── Connection bar ───────────────────────────────────────────────────
        conn = QGroupBox("Conexión al Concentrador")
        conn_row = QHBoxLayout(conn)

        conn_row.addWidget(QLabel("Puerto RX:"))
        self._port_combo = QComboBox()
        self._port_combo.setMinimumWidth(200)
        conn_row.addWidget(self._port_combo)

        refresh_btn = QPushButton("Actualizar")
        refresh_btn.clicked.connect(self._refresh_ports)
        conn_row.addWidget(refresh_btn)

        self._connect_btn = QPushButton("Conectar")
        self._connect_btn.clicked.connect(self._on_connect)
        conn_row.addWidget(self._connect_btn)

        self._conn_lbl = QLabel("● Desconectado")
        self._conn_lbl.setStyleSheet(f"color:{TEXT_MUTED}; font-weight:bold;")
        conn_row.addWidget(self._conn_lbl)
        conn_row.addStretch(1)

        root.addWidget(conn)

        # ── Live readout strip ───────────────────────────────────────────────
        strip = QGroupBox("Datos en vivo")
        strip_row = QHBoxLayout(strip)

        self._hdg_row = _ValueRow("RUMBO", "°", ACCENT_MID)
        self._spt_row = _ValueRow("SETPOINT", "°", OK)
        self._rud_row = _ValueRow("TIMÓN", "°", WARN)
        self._mode_lbl = QLabel("Modo: —")
        self._mode_lbl.setStyleSheet(f"color:{TEXT_MUTED}; font-size:12px;")

        strip_row.addWidget(self._hdg_row)
        strip_row.addWidget(self._spt_row)
        strip_row.addWidget(self._rud_row)
        strip_row.addStretch(1)
        strip_row.addWidget(self._mode_lbl)
        root.addWidget(strip)

        # ── Charts ───────────────────────────────────────────────────────────
        self._hdg_chart = _RollingChart(
            "RUMBO / SETPOINT (°)",
            {
                "Rumbo": (self._hdg_data, ACCENT_MID),
                "Setpoint": (self._spt_data, OK),
            },
            y_min=0, y_max=360, y_unit="°",
        )
        self._rud_chart = _RollingChart(
            "TIMÓN (°)",
            {
                "Timón": (self._rud_data, WARN),
            },
            y_min=-40, y_max=40, y_unit="°",
        )
        # Stretch factors 2:1 give the heading chart twice the rudder's height.
        root.addWidget(self._hdg_chart, 2)
        root.addWidget(self._rud_chart, 1)

        self._refresh_ports()

    def _set_conn_state(self, text: str, colour: str) -> None:
        """Show *text* in the connection label using *colour*."""
        self._conn_lbl.setText(text)
        self._conn_lbl.setStyleSheet(f"color:{colour}; font-weight:bold;")

    # ── Ports ────────────────────────────────────────────────────────────────

    def _refresh_ports(self) -> None:
        """Repopulate the port selector from the ports currently available."""
        self._port_combo.clear()
        ports = QSerialPortInfo.availablePorts()
        if not ports:
            # Placeholder carries no item data, so _selected_port() yields None.
            self._port_combo.addItem("(sin puertos)")
        for info in ports:
            label = info.portName()
            if info.description():
                label += f" — {info.description()}"
            self._port_combo.addItem(label, info.portName())

    def _selected_port(self) -> str | None:
        """Return the port name stored on the current combo item, if any."""
        v = self._port_combo.currentData()
        return str(v) if v else None

    # ── Connect / Disconnect ─────────────────────────────────────────────────

    def _on_connect(self) -> None:
        """Toggle the connection: disconnect if a port is held, else open one."""
        if self._port is not None:
            self._disconnect()
            return

        port_name = self._selected_port()
        if not port_name:
            return

        port = QSerialPort(self)
        port.setPortName(port_name)
        port.setBaudRate(BAUD_RATE)
        port.setDataBits(QSerialPort.DataBits.Data8)
        port.setParity(QSerialPort.Parity.NoParity)
        port.setStopBits(QSerialPort.StopBits.OneStop)
        port.setFlowControl(QSerialPort.FlowControl.NoFlowControl)

        if not port.open(QIODevice.OpenModeFlag.ReadOnly):
            self._set_conn_state(f"✗ Error: {port.errorString()}", ERROR)
            # The port is parented to this widget; schedule it for deletion
            # so failed attempts do not accumulate as child objects.
            port.deleteLater()
            return

        # Signals are connected only after a successful open, so a failed
        # open is reported above instead of being routed through
        # _on_serial_error (which would reset the label and the reference).
        port.readyRead.connect(self._on_data)
        port.errorOccurred.connect(self._on_serial_error)
        self._port = port
        self._buf.clear()

        self._set_conn_state("● Conectado", OK)
        self._connect_btn.setText("Desconectar")

    def _disconnect(self) -> None:
        """Release the current port (if any) and reset the connection UI."""
        # Drop the reference before touching the port: an error signalled
        # during teardown then finds nothing to disconnect instead of
        # re-entering this method on the same port.
        port, self._port = self._port, None
        if port is not None:
            port.blockSignals(True)
            if port.isOpen():
                port.close()
            port.deleteLater()

        # A partial line from this connection must not be glued onto the
        # first bytes of the next one.
        self._buf.clear()

        self._set_conn_state("● Desconectado", TEXT_MUTED)
        self._connect_btn.setText("Conectar")

    # ── Serial data ──────────────────────────────────────────────────────────

    def _on_data(self) -> None:
        """Read pending bytes, split complete lines and ingest valid samples."""
        if self._port is None:
            return
        raw: QByteArray = self._port.readAll()
        self._buf.extend(bytes(raw))

        # Everything before the last "\n" is complete; the remainder is kept
        # for the next call.
        *lines, pending = self._buf.split(b"\n")
        if len(pending) > self._MAX_PENDING_BYTES:
            pending = bytearray()  # runaway data without a line terminator
        self._buf = pending

        for line_bytes in lines:
            # errors="ignore" drops non-ASCII bytes instead of raising.
            line = line_bytes.decode("ascii", errors="ignore").strip()
            status = _parse_parp_status(line)
            if status is not None:
                self._ingest(status)

    def _ingest(self, s: dict) -> None:
        """Append one parsed status sample and refresh the live readout.

        Args:
            s: Dict with the keys ``ts``, ``heading``, ``setpoint``,
                ``rudder`` and ``mode``.
        """
        ts = s["ts"]
        self._hdg_data.append((ts, s["heading"]))
        self._spt_data.append((ts, s["setpoint"]))
        self._rud_data.append((ts, s["rudder"]))
        self._mode_str = s["mode"]

        # Update live readout
        self._hdg_row.update_value(s["heading"])
        self._spt_row.update_value(s["setpoint"])
        self._rud_row.update_value(s["rudder"])
        self._mode_lbl.setText(f"Modo: {s['mode'].replace('_', ' ')}")

    def _on_serial_error(self, error) -> None:
        """Drop the connection on any serial error other than ``NoError``."""
        if error != QSerialPort.SerialPortError.NoError:
            self._disconnect()

    # ── Chart refresh ────────────────────────────────────────────────────────

    def _refresh_charts(self) -> None:
        """Schedule a repaint of both charts (called by the redraw timer)."""
        self._hdg_chart.update()
        self._rud_chart.update()

    # ── Cleanup ──────────────────────────────────────────────────────────────

    def closeEvent(self, event) -> None:  # noqa: N802
        """Release the serial port before the widget closes."""
        self._disconnect()
        super().closeEvent(event)
|