"""Flash Console widget -- "mini Arduino IDE" inside the Studio. What it does ------------ - Enumerates available serial ports (pyserial). - Lets the operator pick the firmware variant (``esp32-dev`` or ``esp32-debug``). - Compiles the firmware via PlatformIO (`pio run -e -d firmware/ar_autopilot_v1`). - Flashes the firmware via `pio run -t upload --upload-port `. - Streams the build/flash output to a read-only text area. Permissions ----------- - Super Admin: flashes directly, single factor. - Engineer: flashes only after providing the Super Admin's PIN. The dialog asks for the SA's PIN inline; on success the audit trail records both engineer's and SA's user_ids. - Owner / User: the Flash Console button is hidden. The actual ``pio`` invocations run in a worker thread so the GUI never blocks. """ from __future__ import annotations import os import shlex import subprocess import sys from pathlib import Path from typing import Optional from PySide6.QtCore import QObject, Qt, QThread, Signal from PySide6.QtWidgets import ( QComboBox, QDialog, QDialogButtonBox, QFormLayout, QGroupBox, QHBoxLayout, QLabel, QLineEdit, QMessageBox, QPlainTextEdit, QPushButton, QVBoxLayout, QWidget, ) from arautopilot.core.audit import AuditOutcome from arautopilot.core.rbac import Capability from arautopilot.studio.session import Session REPO_ROOT = Path(__file__).resolve().parents[2] FIRMWARE_DIR = REPO_ROOT / "firmware" / "ar_autopilot_v1" PIO_EXE_CANDIDATES = [ REPO_ROOT / ".venv" / "Scripts" / "pio.exe", # Windows venv REPO_ROOT / ".venv" / "bin" / "pio", # POSIX venv Path("pio"), # PATH fallback ] def _find_pio() -> Path | None: for c in PIO_EXE_CANDIDATES: if c.exists(): return c # Last-resort PATH lookup -- subprocess will find it. return Path("pio") def list_serial_ports() -> list[tuple[str, str]]: """Return ``[(device, description), ...]`` for every available port.""" try: from serial.tools import list_ports # type: ignore[import-untyped] except ImportError: return [] return [(p.device, p.description or "") for p in list_ports.comports()] class _PioWorker(QObject): """Runs `pio ...` in a background thread and streams stdout.""" line = Signal(str) # one line of output (no trailing \n) finished_ok = Signal(int) # exit code on success failed = Signal(str) # error description def __init__(self, argv: list[str], cwd: Path) -> None: super().__init__() self._argv = argv self._cwd = cwd self._proc: subprocess.Popen[str] | None = None self._cancel = False def run(self) -> None: try: env = os.environ.copy() env.setdefault("PYTHONIOENCODING", "utf-8") self._proc = subprocess.Popen( self._argv, cwd=str(self._cwd), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, env=env, ) assert self._proc.stdout is not None for raw_line in self._proc.stdout: if self._cancel: break self.line.emit(raw_line.rstrip("\n")) code = self._proc.wait() self.finished_ok.emit(code) except FileNotFoundError as exc: self.failed.emit(f"executable not found: {exc}") except Exception as exc: # noqa: BLE001 self.failed.emit(f"unexpected error: {exc}") def cancel(self) -> None: self._cancel = True if self._proc is not None: try: self._proc.terminate() except Exception: pass class FlashConsoleWidget(QWidget): """A self-contained Flash Console for embedding in the main window.""" def __init__(self, session: Session, parent: QWidget | None = None) -> None: super().__init__(parent) self._session = session self._thread: QThread | None = None self._worker: _PioWorker | None = None self._build_ui() self.refresh_ports() # ----- UI ------------------------------------------------------------ def _build_ui(self) -> None: outer = QVBoxLayout(self) outer.setContentsMargins(8, 8, 8, 8) header = QLabel( "Flash Console
" "Compile and flash AR-Autopilot firmware to an AR-NMEA-IO board.
" f"Logged in as {self._session.user.display_name} " f"({self._session.role.value})." ) header.setTextFormat(Qt.TextFormat.RichText) header.setWordWrap(True) outer.addWidget(header) form_group = QGroupBox("Target") form = QFormLayout(form_group) port_row = QHBoxLayout() self._port_combo = QComboBox() self._port_combo.setMinimumWidth(280) port_row.addWidget(self._port_combo, stretch=1) refresh_btn = QPushButton("Refresh") refresh_btn.clicked.connect(self.refresh_ports) port_row.addWidget(refresh_btn) form.addRow("Serial port:", port_row) self._variant_combo = QComboBox() self._variant_combo.addItem("esp32-dev (release, -Os)", userData="esp32-dev") self._variant_combo.addItem("esp32-debug (-O0, verbose)", userData="esp32-debug") form.addRow("Variant:", self._variant_combo) outer.addWidget(form_group) action_row = QHBoxLayout() self._compile_btn = QPushButton("Compile only") self._compile_btn.clicked.connect(self._on_compile) action_row.addWidget(self._compile_btn) self._flash_btn = QPushButton("Compile + Flash") self._flash_btn.clicked.connect(self._on_flash) action_row.addWidget(self._flash_btn) self._cancel_btn = QPushButton("Cancel") self._cancel_btn.setEnabled(False) self._cancel_btn.clicked.connect(self._on_cancel) action_row.addWidget(self._cancel_btn) action_row.addStretch(1) outer.addLayout(action_row) self._output = QPlainTextEdit() self._output.setReadOnly(True) self._output.setStyleSheet( "background-color: #0d1117; color: #c9d1d9; font-family: Consolas, " "'Cascadia Mono', monospace; font-size: 11px;" ) outer.addWidget(self._output, stretch=1) # Gate the buttons by capability. if not self._session.can(Capability.BUILD_FIRMWARE): self._compile_btn.setEnabled(False) self._compile_btn.setToolTip("Your role cannot build firmware.") if not self._session.can(Capability.FLASH_FIRMWARE): self._flash_btn.setEnabled(False) self._flash_btn.setToolTip("Your role cannot flash firmware.") # ----- Public -------------------------------------------------------- def refresh_ports(self) -> None: self._port_combo.clear() ports = list_serial_ports() if not ports: self._port_combo.addItem("(no boards found)", userData=None) else: for device, desc in ports: label = f"{device} -- {desc}" if desc else device self._port_combo.addItem(label, userData=device) # ----- Handlers ------------------------------------------------------ def _on_compile(self) -> None: if not self._session.check(Capability.BUILD_FIRMWARE, target=self._variant()): QMessageBox.warning(self, "Permission denied", "Your role cannot build firmware.") return self._run_pio(["run", "-e", self._variant()]) def _on_flash(self) -> None: # Capability gate (logged either way). if not self._session.check(Capability.FLASH_FIRMWARE, target=self._port_or_none()): QMessageBox.warning(self, "Permission denied", "Your role cannot flash firmware.") return port = self._port_or_none() if port is None: QMessageBox.warning(self, "No port", "No serial port selected. Connect a board and " "click Refresh.") return # Dual-auth check: Engineer needs Super Admin PIN. if self._session.needs_dual_auth(Capability.FLASH_FIRMWARE): sa_user = _ask_super_admin_pin(self, self._session) if sa_user is None: self._session.log_action( "flash_firmware", outcome=AuditOutcome.APPROVAL_PENDING, target=f"{port}:{self._variant()}", reason="Super Admin approval refused or cancelled", ) return self._session.log_dual_auth_grant( Capability.FLASH_FIRMWARE, sa_user, target=f"{port}:{self._variant()}", extra={"variant": self._variant()}, ) self._run_pio([ "run", "-e", self._variant(), "-t", "upload", "--upload-port", port, ]) def _on_cancel(self) -> None: if self._worker is not None: self._worker.cancel() self._append_line("[cancelled by operator]") # ----- pio worker ---------------------------------------------------- def _run_pio(self, args: list[str]) -> None: pio = _find_pio() argv = [str(pio), *args] self._output.clear() self._append_line(f"$ {' '.join(shlex.quote(a) for a in argv)}") self._set_running(True) self._thread = QThread(self) self._worker = _PioWorker(argv=argv, cwd=FIRMWARE_DIR) self._worker.moveToThread(self._thread) self._thread.started.connect(self._worker.run) self._worker.line.connect(self._append_line) self._worker.finished_ok.connect(self._on_pio_done) self._worker.failed.connect(self._on_pio_failed) self._worker.finished_ok.connect(self._thread.quit) self._worker.failed.connect(self._thread.quit) self._thread.finished.connect(self._cleanup_thread) self._thread.start() def _on_pio_done(self, code: int) -> None: self._append_line(f"[pio exit code: {code}]") outcome = AuditOutcome.SUCCESS if code == 0 else AuditOutcome.FAILED self._session.log_action( "flash_firmware_run", outcome=outcome, target=self._port_or_none(), extra={"variant": self._variant(), "exit_code": code}, ) def _on_pio_failed(self, msg: str) -> None: self._append_line(f"[pio failed: {msg}]") self._session.log_action( "flash_firmware_run", outcome=AuditOutcome.FAILED, target=self._port_or_none(), reason=msg, ) def _cleanup_thread(self) -> None: self._set_running(False) if self._thread is not None: self._thread.deleteLater() self._thread = None if self._worker is not None: self._worker.deleteLater() self._worker = None def _set_running(self, running: bool) -> None: self._compile_btn.setEnabled( not running and self._session.can(Capability.BUILD_FIRMWARE) ) self._flash_btn.setEnabled( not running and self._session.can(Capability.FLASH_FIRMWARE) ) self._cancel_btn.setEnabled(running) def _append_line(self, line: str) -> None: self._output.appendPlainText(line) def _variant(self) -> str: return str(self._variant_combo.currentData()) def _port_or_none(self) -> str | None: v = self._port_combo.currentData() return None if v is None else str(v) # ---------------------------------------------------------------------------- # Super Admin PIN approval dialog # ---------------------------------------------------------------------------- def _ask_super_admin_pin(parent: QWidget, session: Session): """Modal dialog asking the SA for their PIN. Returns the SA :class:`User` on success, ``None`` on cancel / bad PIN.""" dlg = QDialog(parent) dlg.setWindowTitle("Super Admin approval required") dlg.setModal(True) layout = QVBoxLayout(dlg) layout.addWidget(QLabel( "Flashing firmware as an Engineer requires Super Admin approval.\n" "Ask the Super Admin to enter their PIN below." )) form = QFormLayout() pin_field = QLineEdit() pin_field.setEchoMode(QLineEdit.EchoMode.Password) pin_field.setMaxLength(8) form.addRow("Super Admin PIN:", pin_field) layout.addLayout(form) buttons = QDialogButtonBox( QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel ) layout.addWidget(buttons) result_holder: dict[str, Optional[object]] = {"user": None} def _accept() -> None: sa = session.verify_super_admin_pin(pin_field.text()) if sa is None: QMessageBox.warning(dlg, "Approval failed", "Incorrect Super Admin PIN.") pin_field.clear() pin_field.setFocus() return result_holder["user"] = sa dlg.accept() buttons.accepted.connect(_accept) buttons.rejected.connect(dlg.reject) dlg.exec() return result_holder["user"] # ---------------------------------------------------------------------------- # Standalone smoke test # ---------------------------------------------------------------------------- if __name__ == "__main__": # pragma: no cover -- manual launch helper from PySide6.QtWidgets import QApplication from arautopilot.core.audit import AuditLog from arautopilot.core.user import User from arautopilot.core.user_store import UserStore from arautopilot.core.rbac import Role app = QApplication(sys.argv) store = UserStore(REPO_ROOT / ".tmp" / "users.json") if len(store) == 0: store.add(User.create(display_name="Smoke", role=Role.SUPER_ADMIN, pin="0000")) sa = next(iter(store.by_role(Role.SUPER_ADMIN))) audit = AuditLog(REPO_ROOT / ".tmp" / "audit.jsonl") session = Session(user=sa, store=store, audit=audit) w = FlashConsoleWidget(session) w.resize(800, 600) w.show() sys.exit(app.exec())