Files
alro65 5238bd31f0 feat(studio): AR Electronics branding + Telemetría + Instalar J6412 tabs
ar_style.py — global QSS dark theme + QPalette matching the Flutter brand
  palette (navy #0D1B2A, electric blue #2563EB, glow #60B8FF). Single call:
  apply_ar_style(app).

app.py — applies AR style and window icon on startup.

main_window.py — complete rewrite of the layout:
  - Sidebar: AR logo (PNG), user/role display, capabilities list, version stamp
  - 5 tabs: Overview ·  Flash ESP32 · 📋 Proyecto · 📡 Telemetría · 💾 Instalar J6412
  - Overview tab: rich-text guide with icons for each tab's purpose

telemetry_widget.py — live $PARP STATUS chart tab:
  - QSerialPort RX-only connection to AR-Concentrador (port selector + Refresh)
  - Python $PARP XOR-checksum parser (mirrors Dart ParpCodec)
  - _RollingChart: pure QPainter scrolling time-series, 60 s window, no
    external charting library
  - Heading + Setpoint on one chart; Rudder on a second chart
  - Live value strip shows Rumbo / Setpoint / Timón + mode label

installer_widget.py — J6412 USB image builder tab:
  - Vessel name + serial number (auto-generate or paste)
  - Optional CSV log path for CRM
  - App checkboxes (AR-ECDIS / AR-Autopilot / skip Flutter build)
  - Worker thread runs installer/build_usb.py with streamed log output
  - "Abrir dist/" button when build succeeds
  - RBAC gated: Engineer or Super Admin only

pyproject.toml — adds [installer] and [license-server] optional dep groups

AR_electronics — AR-Autopilot Project
2026-05-24 11:22:38 -04:00

429 lines
15 KiB
Python

"""Telemetry widget — live $PARP STATUS viewer.
Connects to the AR-Concentrador over USB serial and displays heading,
setpoint, and rudder angle as rolling time-series charts. No external
charting library required — all drawing is done with QPainter.
Usage (embedded in StudioMainWindow):
from arautopilot.studio.telemetry_widget import TelemetryWidget
tabs.addTab(TelemetryWidget(session), "Telemetría")
The widget works without a connected board: it stays in "waiting" state
and shows a message until a port is selected and connected.
"""
from __future__ import annotations

import collections
import math
import re
import time
from typing import Deque

from PySide6.QtCore import QByteArray, QIODevice, QTimer, Qt
from PySide6.QtGui import QColor, QFont, QPainter, QPen
from PySide6.QtSerialPort import QSerialPort, QSerialPortInfo
from PySide6.QtWidgets import (
    QComboBox,
    QGroupBox,
    QHBoxLayout,
    QLabel,
    QPushButton,
    QVBoxLayout,
    QWidget,
)

from arautopilot.studio.ar_style import ACCENT, ACCENT_MID, ERROR, NAVY, OK, TEXT_MUTED, WARN
from arautopilot.studio.session import Session
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Serial line speed (baud) applied to the QSerialPort before it is opened.
BAUD_RATE = 115200
# Width of the chart's time axis: only samples newer than
# ``time.monotonic() - WINDOW_SEC`` are drawn.
WINDOW_SEC = 60 # seconds of history shown on chart
# The redraw timer interval is derived from this as ``1000 // CHART_FPS`` ms.
CHART_FPS = 5 # redraws per second (low is fine — data is 2 Hz)
# Capacity (``maxlen``) of each per-channel sample deque; older samples are
# dropped automatically once the deque is full.
MAX_SAMPLES = WINDOW_SEC * 10 # 10 samples/s max
# Colour assigned to each telemetry channel.
# NOTE(review): not referenced by the code visible in this file (the charts
# receive their colours explicitly) — confirm whether it is used elsewhere.
CHANNEL_COLORS = {
    "heading": ACCENT_MID,
    "setpoint": OK,
    "rudder": WARN,
}
# ---------------------------------------------------------------------------
# $PARP STATUS parser (Python re-implementation of parp_codec.dart)
# ---------------------------------------------------------------------------
def _xor_checksum(body: str) -> int:
crc = 0
for ch in body:
crc ^= ord(ch)
return crc
def _parse_parp_status(line: str) -> dict | None:
"""
Parse ``$PARP,STATUS,...*CRC`` → dict or None.
Returns::
{
"mode": str, # "STANDBY" | "HEADING_HOLD" | "TRACK"
"setpoint": float,
"heading": float,
"rudder": float,
"ts": float, # time.monotonic()
}
"""
s = line.strip()
star = s.rfind("*")
if star < 0:
return None
body = s[1:star] if s.startswith("$") else s[:star]
crc_hex = s[star + 1:]
try:
if _xor_checksum(body) != int(crc_hex, 16):
return None
except ValueError:
return None
parts = body.split(",")
if len(parts) < 7 or parts[0] != "PARP" or parts[1] != "STATUS":
return None
try:
return {
"mode": parts[2],
"setpoint": float(parts[3]),
"heading": float(parts[4]),
"rudder": float(parts[5]),
"ts": time.monotonic(),
}
except ValueError:
return None
# ---------------------------------------------------------------------------
# Rolling chart widget
# ---------------------------------------------------------------------------
class _RollingChart(QWidget):
    """Minimal scrolling time-series chart drawn with QPainter.

    Each channel is a ``deque`` of ``(timestamp_monotonic, value)`` pairs
    owned by the caller; the chart only reads them while painting.
    The Y range is fixed at construction; the X range is always the last
    ``WINDOW_SEC`` seconds relative to ``time.monotonic()``.
    """

    def __init__(
        self,
        title: str,
        channels: dict[str, tuple[Deque, str]],  # name → (deque, colour_hex)
        y_min: float,
        y_max: float,
        y_unit: str = "",
        parent: QWidget | None = None,
    ) -> None:
        """Store the chart configuration.

        Args:
            title: Text centred in the top margin.
            channels: Mapping of channel name to ``(samples, colour)``.
            y_min: Value drawn at the bottom edge of the plot area.
            y_max: Value drawn at the top edge of the plot area.
            y_unit: Suffix appended to the Y-axis tick labels.
            parent: Optional parent widget.
        """
        super().__init__(parent)
        self.setMinimumHeight(160)
        self._title = title
        self._channels = channels
        self._y_min = y_min
        self._y_max = y_max
        self._y_unit = y_unit

    def paintEvent(self, _event) -> None:  # noqa: N802
        """Repaint the chart, always releasing the painter afterwards."""
        painter = QPainter(self)
        try:
            self._paint(painter)
        finally:
            # Must run on every path (early return, exception): a painter
            # left active keeps the widget locked as a paint device.
            painter.end()

    def _paint(self, p: QPainter) -> None:
        """Draw background, grid, axes, title and all series onto *p*."""
        w, h = self.width(), self.height()
        p.setRenderHint(QPainter.RenderHint.Antialiasing)

        # Background
        p.fillRect(0, 0, w, h, QColor(NAVY))

        # Margins (left, right, top, bottom) around the plot area
        ml, mr, mt, mb = 48, 12, 28, 28
        cw = w - ml - mr
        ch = h - mt - mb
        if cw <= 0 or ch <= 0:
            return  # widget too small to hold a plot area

        now = time.monotonic()
        t0 = now - WINDOW_SEC
        # Fall back to 1.0 so a degenerate range never divides by zero.
        y_range = self._y_max - self._y_min or 1.0

        # Grid lines with their Y-axis labels
        grid_pen = QPen(QColor("#1E3A5F"), 1, Qt.PenStyle.DotLine)
        label_colour = QColor(TEXT_MUTED)
        p.setFont(QFont("Segoe UI", 8))
        for fraction in (0.0, 0.25, 0.5, 0.75, 1.0):
            gy = mt + int((1 - fraction) * ch)
            p.setPen(grid_pen)
            p.drawLine(ml, gy, ml + cw, gy)
            val = self._y_min + fraction * y_range
            label = f"{val:.0f}{self._y_unit}"
            p.setPen(label_colour)
            p.drawText(0, gy - 6, ml - 4, 14, Qt.AlignmentFlag.AlignRight, label)

        # Axes
        p.setPen(QPen(QColor(ACCENT_MID), 1))
        p.drawLine(ml, mt, ml, mt + ch)
        p.drawLine(ml, mt + ch, ml + cw, mt + ch)

        # Title
        p.setPen(QColor(ACCENT_MID))
        p.setFont(QFont("Segoe UI", 9, QFont.Weight.Bold))
        p.drawText(ml, 0, cw, mt, Qt.AlignmentFlag.AlignCenter, self._title)

        def _tx(ts: float) -> int:
            return ml + int((ts - t0) / WINDOW_SEC * cw)

        def _ty(v: float) -> int:
            frac = (v - self._y_min) / y_range
            return mt + ch - int(frac * ch)

        # Series
        for samples, colour in self._channels.values():
            # Keep only samples inside the time window; skip NaN/inf values,
            # which cannot be converted to an integer pixel coordinate.
            pixels = [
                (_tx(ts), _ty(v))
                for ts, v in samples
                if ts >= t0 and math.isfinite(v)
            ]
            if len(pixels) < 2:
                continue
            pen = QPen(QColor(colour), 2)
            pen.setCapStyle(Qt.PenCapStyle.RoundCap)
            pen.setJoinStyle(Qt.PenJoinStyle.RoundJoin)
            p.setPen(pen)
            for (x0, y0), (x1, y1) in zip(pixels, pixels[1:]):
                p.drawLine(x0, y0, x1, y1)
# ---------------------------------------------------------------------------
# Value display row
# ---------------------------------------------------------------------------
class _ValueRow(QWidget):
    """One cell of the live readout strip: caption, large value, unit."""

    def __init__(self, label: str, unit: str, colour: str) -> None:
        """Build the three labels and lay them out left to right.

        Args:
            label: Caption shown before the value.
            unit: Unit text shown after the value.
            colour: CSS colour used for the caption and the value.
        """
        super().__init__()

        caption = QLabel(label)
        caption.setStyleSheet(f"color:{colour}; font-size:11px; min-width:80px;")

        # Placeholder text until the first sample arrives.
        self._value = QLabel("---")
        value_css = (
            f"color:{colour}; font-size:20px; font-weight:bold; "
            "font-family:Consolas; min-width:80px;"
        )
        self._value.setStyleSheet(value_css)

        unit_caption = QLabel(unit)
        unit_caption.setStyleSheet(f"color:{TEXT_MUTED}; font-size:11px;")

        row = QHBoxLayout(self)
        row.setContentsMargins(0, 0, 0, 0)
        for widget in (caption, self._value, unit_caption):
            row.addWidget(widget)
        row.addStretch(1)

    def update_value(self, v: float) -> None:
        """Show *v* with one decimal, right-aligned in six characters."""
        self._value.setText(format(v, "6.1f"))
# ---------------------------------------------------------------------------
# Main widget
# ---------------------------------------------------------------------------
class TelemetryWidget(QWidget):
    """Live telemetry from the AR-Concentrador — embedded in Studio.

    Reads lines from a serial port opened read-only, feeds every valid
    ``$PARP,STATUS`` sentence into three rolling sample buffers (heading,
    setpoint, rudder) and repaints two charts on a fixed-rate timer.
    """

    # Upper bound for bytes kept while waiting for a newline.  Without it a
    # stream that never contains "\n" (e.g. wrong baud rate) would grow the
    # receive buffer without limit.
    _MAX_PENDING_BYTES = 4096

    def __init__(self, session: Session, parent: QWidget | None = None) -> None:
        """Create the widget in the disconnected state.

        Args:
            session: Studio session object; stored but not otherwise used by
                the code in this class.
            parent: Optional parent widget.
        """
        super().__init__(parent)
        self._session = session
        self._port: QSerialPort | None = None
        self._buf = bytearray()

        # Data stores: (monotonic timestamp, value) pairs, bounded in length.
        self._hdg_data: Deque[tuple[float, float]] = collections.deque(maxlen=MAX_SAMPLES)
        self._spt_data: Deque[tuple[float, float]] = collections.deque(maxlen=MAX_SAMPLES)
        self._rud_data: Deque[tuple[float, float]] = collections.deque(maxlen=MAX_SAMPLES)
        self._mode_str = "—"

        self._build_ui()

        self._redraw_timer = QTimer(self)
        self._redraw_timer.setInterval(1000 // CHART_FPS)
        self._redraw_timer.timeout.connect(self._refresh_charts)
        self._redraw_timer.start()

    # ── UI ───────────────────────────────────────────────────────────────────
    def _build_ui(self) -> None:
        """Create the connection bar, the live readout strip and the charts."""
        root = QVBoxLayout(self)
        root.setContentsMargins(12, 12, 12, 12)
        root.setSpacing(10)

        # ── Connection bar ───────────────────────────────────────────────────
        conn = QGroupBox("Conexión al Concentrador")
        ch = QHBoxLayout(conn)
        ch.addWidget(QLabel("Puerto RX:"))
        self._port_combo = QComboBox()
        self._port_combo.setMinimumWidth(200)
        ch.addWidget(self._port_combo)
        refresh_btn = QPushButton("Actualizar")
        refresh_btn.clicked.connect(self._refresh_ports)
        ch.addWidget(refresh_btn)
        self._connect_btn = QPushButton("Conectar")
        self._connect_btn.clicked.connect(self._on_connect)
        ch.addWidget(self._connect_btn)
        self._conn_lbl = QLabel("● Desconectado")
        self._conn_lbl.setStyleSheet(f"color:{TEXT_MUTED}; font-weight:bold;")
        ch.addWidget(self._conn_lbl)
        ch.addStretch(1)
        root.addWidget(conn)

        # ── Live readout strip ───────────────────────────────────────────────
        strip = QGroupBox("Datos en vivo")
        sh = QHBoxLayout(strip)
        self._hdg_row = _ValueRow("RUMBO", "°", ACCENT_MID)
        self._spt_row = _ValueRow("SETPOINT", "°", OK)
        self._rud_row = _ValueRow("TIMÓN", "°", WARN)
        self._mode_lbl = QLabel("Modo: —")
        self._mode_lbl.setStyleSheet(f"color:{TEXT_MUTED}; font-size:12px;")
        sh.addWidget(self._hdg_row)
        sh.addWidget(self._spt_row)
        sh.addWidget(self._rud_row)
        sh.addStretch(1)
        sh.addWidget(self._mode_lbl)
        root.addWidget(strip)

        # ── Charts ───────────────────────────────────────────────────────────
        self._hdg_chart = _RollingChart(
            "RUMBO / SETPOINT (°)",
            {
                "Rumbo": (self._hdg_data, ACCENT_MID),
                "Setpoint": (self._spt_data, OK),
            },
            y_min=0, y_max=360, y_unit="°",
        )
        self._rud_chart = _RollingChart(
            "TIMÓN (°)",
            {
                "Timón": (self._rud_data, WARN),
            },
            y_min=-40, y_max=40, y_unit="°",
        )
        root.addWidget(self._hdg_chart, 2)
        root.addWidget(self._rud_chart, 1)

        self._refresh_ports()

    def _set_status(self, text: str, colour: str) -> None:
        """Show *text* in the connection label using *colour*."""
        self._conn_lbl.setText(text)
        self._conn_lbl.setStyleSheet(f"color:{colour}; font-weight:bold;")

    # ── Ports ────────────────────────────────────────────────────────────────
    def _refresh_ports(self) -> None:
        """Repopulate the port selector, keeping the current choice if possible."""
        previous = self._selected_port()
        self._port_combo.clear()
        ports = QSerialPortInfo.availablePorts()
        if not ports:
            # Placeholder entry carries no item data, so it is never selectable
            # as a port name.
            self._port_combo.addItem("(sin puertos)")
        for info in ports:
            label = info.portName()
            if info.description():
                label += f" — {info.description()}"
            self._port_combo.addItem(label, info.portName())
        if previous is not None:
            index = self._port_combo.findData(previous)
            if index >= 0:
                self._port_combo.setCurrentIndex(index)

    def _selected_port(self) -> str | None:
        """Return the selected port name, or ``None`` if no real port is chosen."""
        v = self._port_combo.currentData()
        return str(v) if v else None

    # ── Connect / Disconnect ─────────────────────────────────────────────────
    def _on_connect(self) -> None:
        """Toggle the connection: disconnect if open, otherwise try to open."""
        if self._port is not None and self._port.isOpen():
            self._disconnect()
            return
        port_name = self._selected_port()
        if not port_name:
            return

        port = QSerialPort(self)
        port.setPortName(port_name)
        port.setBaudRate(BAUD_RATE)
        port.setDataBits(QSerialPort.DataBits.Data8)
        port.setParity(QSerialPort.Parity.NoParity)
        port.setStopBits(QSerialPort.StopBits.OneStop)
        port.setFlowControl(QSerialPort.FlowControl.NoFlowControl)

        if not port.open(QIODevice.OpenModeFlag.ReadOnly):
            self._set_status(f"✗ Error: {port.errorString()}", ERROR)
            port.deleteLater()  # do not accumulate dead ports as children
            return

        # Signals are connected only after a successful open: a failed open
        # reports its error through errorOccurred, and handling that here
        # would tear the port down before its error text could be shown.
        port.readyRead.connect(self._on_data)
        port.errorOccurred.connect(self._on_serial_error)
        self._port = port
        self._buf.clear()
        self._set_status("● Conectado", OK)
        self._connect_btn.setText("Desconectar")

    def _disconnect(self) -> None:
        """Close and release the port (if any) and reset the connection UI."""
        # Detach first so an error signal raised while closing cannot
        # re-enter this method with the same port.
        port, self._port = self._port, None
        if port is not None:
            if port.isOpen():
                port.close()
            port.deleteLater()
        # A partial line from the old connection must not be glued to the
        # first bytes of the next one.
        self._buf.clear()
        self._set_status("● Desconectado", TEXT_MUTED)
        self._connect_btn.setText("Conectar")

    # ── Serial data ──────────────────────────────────────────────────────────
    def _on_data(self) -> None:
        """Consume pending serial bytes and ingest every complete valid line."""
        port = self._port
        if port is None:
            return
        raw: QByteArray = port.readAll()
        self._buf.extend(bytes(raw))

        # Everything before the last "\n" is complete; the tail stays buffered.
        *lines, remainder = self._buf.split(b"\n")
        if len(remainder) > self._MAX_PENDING_BYTES:
            remainder = bytearray()  # no newline in sight: drop the garbage
        self._buf = remainder

        for line_bytes in lines:
            # errors="ignore" drops non-ASCII bytes instead of raising.
            line = line_bytes.decode("ascii", errors="ignore").strip()
            status = _parse_parp_status(line)
            if status is not None:
                self._ingest(status)

    def _ingest(self, s: dict) -> None:
        """Append one parsed STATUS sample to the buffers and update the strip."""
        ts = s["ts"]
        self._hdg_data.append((ts, s["heading"]))
        self._spt_data.append((ts, s["setpoint"]))
        self._rud_data.append((ts, s["rudder"]))
        self._mode_str = s["mode"]
        # Update live readout
        self._hdg_row.update_value(s["heading"])
        self._spt_row.update_value(s["setpoint"])
        self._rud_row.update_value(s["rudder"])
        self._mode_lbl.setText(f"Modo: {s['mode'].replace('_', ' ')}")

    def _on_serial_error(self, error) -> None:
        """Drop the connection on any real serial error."""
        # NoError is also delivered through this signal; ignore it, and
        # ignore anything that arrives after the port was already released.
        if error == QSerialPort.SerialPortError.NoError or self._port is None:
            return
        self._disconnect()

    # ── Chart refresh ────────────────────────────────────────────────────────
    def _refresh_charts(self) -> None:
        """Schedule a repaint of both charts (called by the redraw timer)."""
        self._hdg_chart.update()
        self._rud_chart.update()

    # ── Cleanup ──────────────────────────────────────────────────────────────
    def closeEvent(self, event) -> None:  # noqa: N802
        """Release the serial port when the widget is closed.

        NOTE(review): when embedded as a tab page this handler may never be
        invoked on application exit — confirm the host window closes it.
        """
        self._disconnect()
        super().closeEvent(event)