From de25dcee57295f32da17fe33f4ebce78ba4c5a51 Mon Sep 17 00:00:00 2001 From: alro1965 Date: Sun, 24 May 2026 01:36:24 -0400 Subject: [PATCH] feat(installer): J6412 USB installer + AR Electronics license activation server MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit installer/: - build_usb.py: dev tool — builds Flutter + AR-ECDIS, assembles USB pendrive image with serial.key, autorun.inf, and START_INSTALLER.bat - serial_generator.py: generates AR-XXXX-XXXX-XXXX serial numbers (48-bit entropy), logs to CSV for CRM integration - src/license.py: hardware fingerprint (Windows MachineGuid + primary MAC), serial validation, online activation POST, local cache with 30-day offline grace period - src/sysconfig.py: HKCU autostart registry entries, .lnk shortcuts (desktop + Start Menu via WScript.Shell), firewall rules (netsh), COM port detection - src/install.py: tkinter installer GUI — 9 sequential steps with per-step progress indicators, threaded execution, error dialogs, and silent mode license_server/ (FastAPI service — deploy to arelectronics.com VPS): - POST /api/v1/activate: first activation accepted; same-HW re-activation refreshes heartbeat; different-HW rejected with 409 - GET /api/v1/validate/{serial}: heartbeat endpoint to refresh offline cache - Admin endpoints (X-Admin-Key): issue, list, revoke licenses - SQLAlchemy models: License (serial registry) + Activation (per-machine rows) - SQLite default, PostgreSQL-ready via DATABASE_URL env var AR_electronics — AR-Autopilot Project --- installer/build_usb.py | 262 ++++++++++++++++++++ installer/serial_generator.py | 103 ++++++++ installer/src/install.py | 426 ++++++++++++++++++++++++++++++++ installer/src/license.py | 246 ++++++++++++++++++ installer/src/sysconfig.py | 201 +++++++++++++++ license_server/.env.example | 10 + license_server/__init__.py | 1 + license_server/database.py | 33 +++ license_server/main.py | 305 +++++++++++++++++++++++ license_server/models.py | 51 ++++ license_server/requirements.txt | 5 + license_server/schemas.py | 69 ++++++ 12 files changed, 1712 insertions(+) create mode 100644 installer/build_usb.py create mode 100644 installer/serial_generator.py create mode 100644 installer/src/install.py create mode 100644 installer/src/license.py create mode 100644 installer/src/sysconfig.py create mode 100644 license_server/.env.example create mode 100644 license_server/__init__.py create mode 100644 license_server/database.py create mode 100644 license_server/main.py create mode 100644 license_server/models.py create mode 100644 license_server/requirements.txt create mode 100644 license_server/schemas.py diff --git a/installer/build_usb.py b/installer/build_usb.py new file mode 100644 index 0000000..e8223b0 --- /dev/null +++ b/installer/build_usb.py @@ -0,0 +1,262 @@ +#!/usr/bin/env python3 +# ============================================================================= +# installer/build_usb.py — Build a USB pendrive installer image +# ============================================================================= +# +# Developer tool that: +# 1. Builds the Flutter Windows release (optional — skip with --no-flutter) +# 2. Copies AR-ECDIS and AR-Autopilot binaries into dist/packages/ +# 3. Generates a fresh serial number and writes serial.key +# 4. Creates autorun.inf and START_INSTALLER.bat +# +# Output: dist/ directory ready to be copied to a USB pendrive. +# +# Prerequisites (on the build machine): +# - Flutter SDK in PATH (for AR-Autopilot Display) +# - Python 3.11+ +# - AR-ECDIS webecdis cloned next to AR-Autopilot (or set --ecdis-dir) +# - PyInstaller (pip install pyinstaller) — for AR-ECDIS .exe packaging +# +# Usage: +# cd installer +# python build_usb.py --vessel "BUQUE NORTE" --csv ../serials_log.csv +# python build_usb.py --no-flutter --no-ecdis # quick test build +# ============================================================================= + +from __future__ import annotations + +import argparse +import shutil +import subprocess +import sys +from pathlib import Path + +REPO_ROOT = Path(__file__).resolve().parent.parent +DISPLAY_DIR = REPO_ROOT / "display" +INSTALLER_SRC = Path(__file__).resolve().parent / "src" +DIST_DIR = Path(__file__).resolve().parent / "dist" + +# Default location for the AR-ECDIS repo (sibling of AR-Autopilot) +DEFAULT_ECDIS = REPO_ROOT.parent / "AR ECDIS" / "webecdis" + + +def run(cmd: list[str], cwd: Path | None = None, check: bool = True): + print(f" $ {' '.join(str(c) for c in cmd)}") + subprocess.run(cmd, cwd=cwd, check=check) + + +# --------------------------------------------------------------------------- +# Step 1 — Flutter Windows build +# --------------------------------------------------------------------------- + +def build_flutter(flutter_cmd: str = "flutter") -> Path: + """Build AR-Autopilot Display for Windows and return the build output dir.""" + print("\n[1/5] Building Flutter (Windows release)…") + run([flutter_cmd, "build", "windows", "--release"], cwd=DISPLAY_DIR) + build_out = DISPLAY_DIR / "build" / "windows" / "x64" / "runner" / "Release" + if not build_out.exists(): + raise FileNotFoundError(f"Flutter build output not found: {build_out}") + print(f" Output: {build_out}") + return build_out + + +# --------------------------------------------------------------------------- +# Step 2 — AR-ECDIS PyInstaller build +# --------------------------------------------------------------------------- + +def build_ecdis(ecdis_dir: Path) -> Path: + """Package AR-ECDIS with PyInstaller and return the dist directory.""" + print("\n[2/5] Building AR-ECDIS (PyInstaller)…") + if not ecdis_dir.exists(): + print(f" AR-ECDIS dir not found ({ecdis_dir}) — skipping.") + return Path() + + main_py = ecdis_dir / "main.py" + if not main_py.exists(): + print(f" AR-ECDIS main.py not found — skipping.") + return Path() + + run( + [ + sys.executable, "-m", "PyInstaller", + "--onedir", + "--name", "AR-ECDIS", + "--windowed", + "--clean", + str(main_py), + ], + cwd=ecdis_dir, + check=False, # non-fatal — missing PyInstaller is warned, not fatal + ) + out = ecdis_dir / "dist" / "AR-ECDIS" + if out.exists(): + print(f" Output: {out}") + else: + print(" PyInstaller output not found — AR-ECDIS skipped.") + out = Path() + return out + + +# --------------------------------------------------------------------------- +# Step 3 — Assemble dist/ tree +# --------------------------------------------------------------------------- + +def assemble_dist( + flutter_build: Path, + ecdis_build: Path, + serial: str, +) -> None: + print("\n[3/5] Assembling USB installer tree…") + + # Clean previous dist + if DIST_DIR.exists(): + shutil.rmtree(DIST_DIR) + DIST_DIR.mkdir(parents=True) + + # Copy installer source files + pkg_installer = DIST_DIR + shutil.copytree(INSTALLER_SRC, pkg_installer / "src") + + # Place the serial key + (DIST_DIR / "serial.key").write_text(serial, encoding="utf-8") + + # Copy app packages + packages = DIST_DIR / "packages" + packages.mkdir() + + if flutter_build.exists(): + dest = packages / "AR-Autopilot" + shutil.copytree(flutter_build, dest) + print(f" AR-Autopilot → packages/AR-Autopilot/") + + if ecdis_build.exists(): + dest = packages / "AR-ECDIS" + shutil.copytree(ecdis_build, dest) + print(f" AR-ECDIS → packages/AR-ECDIS/") + + print(f" Serial key → serial.key ({serial})") + + +# --------------------------------------------------------------------------- +# Step 4 — Write autorun + launcher batch +# --------------------------------------------------------------------------- + +def write_autorun() -> None: + print("\n[4/5] Writing autorun.inf and START_INSTALLER.bat…") + + autorun = ( + "[autorun]\n" + "label=AR Electronics Installer\n" + "open=START_INSTALLER.bat\n" + "icon=src\\install.py,0\n" + ) + (DIST_DIR / "autorun.inf").write_text(autorun, encoding="utf-8") + + batch = ( + "@echo off\n" + "title AR Electronics — Instalador J6412\n" + 'echo Iniciando instalador AR Electronics...\n' + 'cd /d "%~dp0"\n' + "python src\\install.py\n" + "if errorlevel 1 (\n" + " echo.\n" + " echo ERROR: La instalacion fallo.\n" + " pause\n" + ")\n" + ) + (DIST_DIR / "START_INSTALLER.bat").write_text(batch, encoding="utf-8") + + # README for field technicians + readme = ( + "=== AR Electronics — Instalador J6412 ===\n\n" + "1. Conecte este pendrive al mini PC J6412.\n" + "2. Abra el explorador de archivos y ejecute START_INSTALLER.bat.\n" + " (Si Windows pregunta, elija 'Más información' → 'Ejecutar de todas formas'.)\n" + "3. El instalador solicitará permisos de administrador — acepte.\n" + "4. Pulse INSTALAR y espere a que finalice.\n" + "5. Reinicie el equipo.\n\n" + "El sistema requiere conexión a internet para la activación de la licencia.\n\n" + "Soporte: soporte@arelectronics.com\n" + ) + (DIST_DIR / "LEAME.txt").write_text(readme, encoding="utf-8") + + +# --------------------------------------------------------------------------- +# Step 5 — Summary +# --------------------------------------------------------------------------- + +def print_summary(serial: str) -> None: + print("\n[5/5] Build complete.") + print(f"\n Directorio de salida : {DIST_DIR}") + print(f" Número de serie : {serial}") + size_mb = sum(f.stat().st_size for f in DIST_DIR.rglob("*") if f.is_file()) / 1e6 + print(f" Tamaño total : {size_mb:.1f} MB") + print("\n Copie todo el contenido de dist/ al pendrive USB.") + print(" Asegúrese de que el pendrive tenga al menos 2 GB de espacio.\n") + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main(): + parser = argparse.ArgumentParser( + description="AR Electronics — USB installer builder" + ) + parser.add_argument("--vessel", default="", + help="Vessel name to tag the serial number with") + parser.add_argument("--csv", + help="Path to serials CSV log (created or appended)") + parser.add_argument("--ecdis-dir", type=Path, default=DEFAULT_ECDIS, + help=f"Path to AR-ECDIS webecdis directory (default: {DEFAULT_ECDIS})") + parser.add_argument("--flutter", default="flutter", + help="flutter command (default: 'flutter')") + parser.add_argument("--no-flutter", action="store_true", + help="Skip Flutter build (use existing build output)") + parser.add_argument("--no-ecdis", action="store_true", + help="Skip AR-ECDIS PyInstaller build") + parser.add_argument("--serial", + help="Use an existing serial number instead of generating one") + args = parser.parse_args() + + print("=" * 60) + print(" AR Electronics — USB Installer Builder") + print("=" * 60) + + # Resolve serial + if args.serial: + serial = args.serial + print(f"\n Using existing serial: {serial}") + else: + # Import here to avoid circular issues if running as a module + sys.path.insert(0, str(Path(__file__).resolve().parent)) + from serial_generator import generate_batch, write_csv # noqa: PLC0415 + + records = generate_batch(1, vessel=args.vessel) + serial = records[0]["serial"] + print(f"\n Generated serial: {serial}") + if args.csv: + write_csv(records, args.csv) + + # Flutter build + if args.no_flutter: + flutter_build = DISPLAY_DIR / "build" / "windows" / "x64" / "runner" / "Release" + print(f"\n[1/5] Skipping Flutter build — using {flutter_build}") + else: + flutter_build = build_flutter(args.flutter) + + # AR-ECDIS build + if args.no_ecdis: + ecdis_build = Path() + print("\n[2/5] Skipping AR-ECDIS build.") + else: + ecdis_build = build_ecdis(args.ecdis_dir) + + # Assemble + assemble_dist(flutter_build, ecdis_build, serial) + write_autorun() + print_summary(serial) + + +if __name__ == "__main__": + main() diff --git a/installer/serial_generator.py b/installer/serial_generator.py new file mode 100644 index 0000000..66495f4 --- /dev/null +++ b/installer/serial_generator.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python3 +# ============================================================================= +# installer/serial_generator.py — AR Electronics serial number generator +# ============================================================================= +# +# Developer tool. Generates a batch of unique serial numbers and optionally +# writes them to a CSV log for the AR Electronics CRM. +# +# Format: AR-XXXX-XXXX-XXXX (hex groups, 48 bits of entropy ≈ 281 trillion) +# +# Usage: +# python serial_generator.py 10 # generate 10 serials +# python serial_generator.py 10 --csv serials.csv +# python serial_generator.py 1 --vessel "MY YACHT NAME" --csv serials.csv +# ============================================================================= + +import argparse +import csv +import os +import secrets +from datetime import datetime, timezone + + +def generate_serial() -> str: + """Generate a single AR-XXXX-XXXX-XXXX serial number.""" + raw = secrets.token_hex(6).upper() # 6 bytes = 12 hex chars = 3 × 4 + return f"AR-{raw[0:4]}-{raw[4:8]}-{raw[8:12]}" + + +def generate_batch(count: int, vessel: str = "") -> list[dict]: + serials = [] + seen: set[str] = set() + + while len(serials) < count: + serial = generate_serial() + if serial in seen: + continue # collision (astronomically unlikely) + seen.add(serial) + serials.append( + { + "serial": serial, + "vessel": vessel, + "created_at": datetime.now(timezone.utc).isoformat(), + "status": "unactivated", + } + ) + + return serials + + +def write_key_file(serial: str, output_path: str) -> None: + """Write a single serial number to a serial.key file.""" + with open(output_path, "w", encoding="utf-8") as f: + f.write(serial) + print(f" → {output_path}") + + +def write_csv(records: list[dict], csv_path: str) -> None: + """Append records to a CSV log (creates file if missing).""" + file_exists = os.path.exists(csv_path) + with open(csv_path, "a", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=["serial", "vessel", "created_at", "status"]) + if not file_exists: + writer.writeheader() + writer.writerows(records) + print(f"\nAnexado a CSV: {csv_path}") + + +def main(): + parser = argparse.ArgumentParser( + description="AR Electronics — Generador de números de serie" + ) + parser.add_argument("count", type=int, nargs="?", default=1, + help="Cantidad de seriales a generar (default: 1)") + parser.add_argument("--vessel", default="", + help="Nombre del buque para asignar al lote") + parser.add_argument("--csv", + help="Ruta al archivo CSV de registro (se crea o se añade)") + parser.add_argument("--key-dir", + help="Directorio donde escribir archivos serial.key individuales") + args = parser.parse_args() + + records = generate_batch(args.count, vessel=args.vessel) + + print(f"\nSeriales generados ({args.count}):\n") + for rec in records: + vessel_info = f" [{rec['vessel']}]" if rec["vessel"] else "" + print(f" {rec['serial']}{vessel_info}") + + if args.csv: + write_csv(records, args.csv) + + if args.key_dir: + os.makedirs(args.key_dir, exist_ok=True) + for i, rec in enumerate(records): + filename = f"serial_{i+1:03d}.key" if len(records) > 1 else "serial.key" + write_key_file(rec["serial"], os.path.join(args.key_dir, filename)) + + print() + + +if __name__ == "__main__": + main() diff --git a/installer/src/install.py b/installer/src/install.py new file mode 100644 index 0000000..5f35f90 --- /dev/null +++ b/installer/src/install.py @@ -0,0 +1,426 @@ +# ============================================================================= +# installer/src/install.py — AR Electronics J6412 installer +# ============================================================================= +# +# Tkinter GUI installer that: +# 1. Validates the bundled serial number +# 2. Installs AR-ECDIS and AR-Autopilot Display to Program Files +# 3. Activates the license online +# 4. Configures Windows autostart, shortcuts, and firewall rules +# +# Usage: +# python install.py — interactive GUI mode +# python install.py --silent — headless mode (for testing / scripted deploy) +# +# This file lives in the root of the USB pendrive alongside serial.key and +# the packages/ directory. Run as Administrator for full functionality. +# ============================================================================= + +from __future__ import annotations + +import argparse +import shutil +import subprocess +import sys +import threading +import tkinter as tk +from pathlib import Path +from tkinter import messagebox, ttk + +# ── Resolve installer root (directory containing this script) ───────────────── +INSTALLER_DIR = Path(__file__).parent +PACKAGES_DIR = INSTALLER_DIR / "packages" + +APP_VERSION = "0.4.0" + +# ── Late imports from sibling modules ───────────────────────────────────────── +sys.path.insert(0, str(INSTALLER_DIR)) +from license import activate_online, read_serial, ActivationError # noqa: E402 +from sysconfig import ( # noqa: E402 + ECDIS_DIR, AUTOPILOT_DIR, + ensure_install_dirs, configure_autostart, + create_shortcuts, add_firewall_rules, list_com_ports, + is_admin, require_admin, +) + +# --------------------------------------------------------------------------- +# Installation steps +# --------------------------------------------------------------------------- + +STEPS = [ + ("Verificar serial", "_step_verify_serial"), + ("Crear directorios", "_step_create_dirs"), + ("Instalar AR-ECDIS", "_step_install_ecdis"), + ("Instalar AR-Autopilot", "_step_install_autopilot"), + ("Activar licencia", "_step_activate_license"), + ("Configurar inicio", "_step_configure_autostart"), + ("Accesos directos", "_step_create_shortcuts"), + ("Reglas de firewall", "_step_firewall"), + ("Finalizar", "_step_finalize"), +] + + +class Installer: + """Core installation logic — UI-independent.""" + + def __init__(self, log_fn=print): + self._log = log_fn + self.serial: str | None = None + + # ── Step implementations ───────────────────────────────────────────────── + + def _step_verify_serial(self): + self._log("Leyendo número de serie…") + self.serial = read_serial(INSTALLER_DIR) + self._log(f"Serial: {self.serial}") + + def _step_create_dirs(self): + self._log("Creando directorios en Program Files…") + ensure_install_dirs() + + def _step_install_ecdis(self): + src = PACKAGES_DIR / "AR-ECDIS" + if not src.exists(): + self._log("Paquete AR-ECDIS no encontrado — omitiendo.") + return + self._log(f"Copiando AR-ECDIS → {ECDIS_DIR}") + if ECDIS_DIR.exists(): + shutil.rmtree(ECDIS_DIR) + shutil.copytree(src, ECDIS_DIR) + self._log("AR-ECDIS instalado correctamente.") + + def _step_install_autopilot(self): + src = PACKAGES_DIR / "AR-Autopilot" + if not src.exists(): + self._log("Paquete AR-Autopilot no encontrado — omitiendo.") + return + self._log(f"Copiando AR-Autopilot → {AUTOPILOT_DIR}") + if AUTOPILOT_DIR.exists(): + shutil.rmtree(AUTOPILOT_DIR) + shutil.copytree(src, AUTOPILOT_DIR) + self._log("AR-Autopilot instalado correctamente.") + + def _step_activate_license(self): + if self.serial is None: + raise RuntimeError("Serial no disponible para activación.") + self._log(f"Activando licencia en servidor AR Electronics…") + result = activate_online(self.serial, app_version=APP_VERSION) + self._log( + f"Licencia activada — ID: {result.activation_id[:8]}…\n" + f" Slot: {result.vessel_slot} | {result.licensed_to}" + ) + + def _step_configure_autostart(self): + self._log("Configurando inicio automático con Windows…") + configure_autostart() + + def _step_create_shortcuts(self): + self._log("Creando accesos directos…") + create_shortcuts() + + def _step_firewall(self): + if not is_admin(): + self._log("Sin privilegios de administrador — omitiendo reglas de firewall.") + return + self._log("Añadiendo reglas de firewall…") + add_firewall_rules() + + def _step_finalize(self): + ports = list_com_ports() + if ports: + self._log(f"Puertos COM detectados: {', '.join(ports)}") + self._log( + "Conecte el concentrador y configure los puertos en\n" + "AR-Autopilot → Ajustes → Puertos COM." + ) + else: + self._log("No se detectaron puertos COM — conecte el concentrador USB.") + self._log("Instalación completada con éxito.") + + # ── Public runner ──────────────────────────────────────────────────────── + + def run_all(self, progress_cb=None): + """ + Execute all steps sequentially. + + :param progress_cb: optional callable(step_index, step_name, success, error) + """ + for idx, (name, method_name) in enumerate(STEPS): + method = getattr(self, method_name) + try: + method() + if progress_cb: + progress_cb(idx, name, True, None) + except ActivationError as exc: + if progress_cb: + progress_cb(idx, name, False, exc) + raise + except Exception as exc: + if progress_cb: + progress_cb(idx, name, False, exc) + raise + + +# --------------------------------------------------------------------------- +# Tkinter GUI +# --------------------------------------------------------------------------- + +BRAND_NAVY = "#0D1B2A" +BRAND_BLUE = "#2563EB" +BRAND_GLOW = "#60B8FF" +BRAND_TEXT = "#E2E8F0" +BRAND_MUTED = "#8899AA" +BRAND_GREEN = "#22C55E" +BRAND_RED = "#EF4444" + + +class InstallerWindow: + def __init__(self): + self.root = tk.Tk() + self.root.title("AR Electronics — Instalador J6412") + self.root.configure(bg=BRAND_NAVY) + self.root.resizable(False, False) + + # Centre on screen + w, h = 560, 520 + sw = self.root.winfo_screenwidth() + sh = self.root.winfo_screenheight() + self.root.geometry(f"{w}x{h}+{(sw-w)//2}+{(sh-h)//2}") + + self._build_ui() + self._installer = Installer(log_fn=self._append_log) + + # ── UI construction ────────────────────────────────────────────────────── + + def _build_ui(self): + # Header + hdr = tk.Frame(self.root, bg=BRAND_NAVY) + hdr.pack(fill="x", padx=0, pady=0) + + tk.Label( + hdr, + text="AR Electronics", + font=("Segoe UI", 18, "bold"), + fg=BRAND_GLOW, + bg=BRAND_NAVY, + ).pack(pady=(20, 0)) + + tk.Label( + hdr, + text="Instalador para J6412 Mini PC", + font=("Segoe UI", 11), + fg=BRAND_MUTED, + bg=BRAND_NAVY, + ).pack(pady=(0, 12)) + + sep = tk.Frame(self.root, height=1, bg=BRAND_BLUE) + sep.pack(fill="x", padx=20) + + # Steps frame + self._step_vars: list[tk.StringVar] = [] + steps_frame = tk.Frame(self.root, bg=BRAND_NAVY) + steps_frame.pack(fill="x", padx=30, pady=14) + + for _, (name, _) in enumerate(STEPS): + var = tk.StringVar(value=f" ○ {name}") + lbl = tk.Label( + steps_frame, + textvariable=var, + font=("Consolas", 10), + fg=BRAND_MUTED, + bg=BRAND_NAVY, + anchor="w", + ) + lbl.pack(fill="x", pady=1) + self._step_vars.append(var) + + self._step_labels = steps_frame.winfo_children() + + # Progress bar + pb_frame = tk.Frame(self.root, bg=BRAND_NAVY) + pb_frame.pack(fill="x", padx=30, pady=(0, 8)) + + style = ttk.Style() + style.theme_use("clam") + style.configure( + "AR.Horizontal.TProgressbar", + troughcolor=BRAND_NAVY, + bordercolor=BRAND_BLUE, + background=BRAND_GLOW, + lightcolor=BRAND_GLOW, + darkcolor=BRAND_BLUE, + ) + self._progress = ttk.Progressbar( + pb_frame, + style="AR.Horizontal.TProgressbar", + maximum=len(STEPS), + length=500, + ) + self._progress.pack(fill="x") + + # Log text + log_frame = tk.Frame(self.root, bg="#0A1520") + log_frame.pack(fill="both", expand=True, padx=20, pady=(0, 12)) + + self._log_text = tk.Text( + log_frame, + height=7, + font=("Consolas", 9), + bg="#0A1520", + fg=BRAND_MUTED, + relief="flat", + state="disabled", + wrap="word", + ) + self._log_text.pack(fill="both", expand=True, padx=8, pady=6) + + # Buttons + btn_frame = tk.Frame(self.root, bg=BRAND_NAVY) + btn_frame.pack(fill="x", padx=20, pady=(0, 20)) + + self._install_btn = tk.Button( + btn_frame, + text="INSTALAR", + font=("Segoe UI", 10, "bold"), + bg=BRAND_BLUE, + fg="white", + activebackground=BRAND_GLOW, + relief="flat", + padx=24, + pady=8, + cursor="hand2", + command=self._start_install, + ) + self._install_btn.pack(side="left") + + self._cancel_btn = tk.Button( + btn_frame, + text="Cancelar", + font=("Segoe UI", 10), + bg=BRAND_NAVY, + fg=BRAND_MUTED, + activeforeground=BRAND_TEXT, + relief="flat", + padx=16, + pady=8, + cursor="hand2", + command=self.root.destroy, + ) + self._cancel_btn.pack(side="left", padx=(10, 0)) + + self._status_lbl = tk.Label( + btn_frame, + text="", + font=("Segoe UI", 9), + fg=BRAND_MUTED, + bg=BRAND_NAVY, + ) + self._status_lbl.pack(side="right") + + # ── Install thread ─────────────────────────────────────────────────────── + + def _start_install(self): + self._install_btn.configure(state="disabled") + self._cancel_btn.configure(state="disabled") + threading.Thread(target=self._run_install, daemon=True).start() + + def _run_install(self): + try: + self._installer.run_all(progress_cb=self._on_step) + self.root.after(0, self._on_success) + except Exception as exc: + self.root.after(0, self._on_failure, str(exc)) + + def _on_step(self, idx: int, name: str, success: bool, error): + def update(): + if success: + self._step_vars[idx].set(f" ✓ {name}") + self._step_labels[idx].configure(fg=BRAND_GREEN) + else: + self._step_vars[idx].set(f" ✗ {name}") + self._step_labels[idx].configure(fg=BRAND_RED) + self._progress["value"] = idx + 1 + + self.root.after(0, update) + + def _on_success(self): + self._status_lbl.configure(text="Instalación completada", fg=BRAND_GREEN) + self._cancel_btn.configure(state="normal", text="Cerrar") + messagebox.showinfo( + "AR Electronics", + "Instalación completada con éxito.\n\n" + "AR-ECDIS y AR-Autopilot están listos.\n" + "Reinicie el equipo para activar el inicio automático.", + ) + + def _on_failure(self, msg: str): + self._status_lbl.configure(text="Error en la instalación", fg=BRAND_RED) + self._install_btn.configure(state="normal") + self._cancel_btn.configure(state="normal") + messagebox.showerror( + "Error de instalación", + f"La instalación no se completó:\n\n{msg}\n\n" + "Verifique la conexión a internet y vuelva a intentarlo.\n" + "Si el problema persiste, contacte a AR Electronics.", + ) + + # ── Log output ─────────────────────────────────────────────────────────── + + def _append_log(self, text: str): + def _do(): + self._log_text.configure(state="normal") + self._log_text.insert("end", text + "\n") + self._log_text.see("end") + self._log_text.configure(state="disabled") + + self.root.after(0, _do) + + # ── Main loop ──────────────────────────────────────────────────────────── + + def run(self): + self.root.mainloop() + + +# --------------------------------------------------------------------------- +# Silent / headless mode +# --------------------------------------------------------------------------- + +def run_silent(): + installer = Installer() + errors = [] + + def cb(idx, name, success, error): + icon = "✓" if success else "✗" + print(f" [{icon}] {name}") + if error: + errors.append(str(error)) + + try: + installer.run_all(progress_cb=cb) + print("\nInstalación completada con éxito.") + except Exception as exc: + print(f"\nERROR: {exc}") + sys.exit(1) + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="AR Electronics J6412 Installer") + parser.add_argument( + "--silent", action="store_true", help="Run without GUI (headless mode)" + ) + parser.add_argument( + "--no-admin-check", action="store_true", help="Skip UAC elevation request" + ) + args = parser.parse_args() + + if not args.no_admin_check: + require_admin() + + if args.silent: + run_silent() + else: + InstallerWindow().run() diff --git a/installer/src/license.py b/installer/src/license.py new file mode 100644 index 0000000..3a3cc91 --- /dev/null +++ b/installer/src/license.py @@ -0,0 +1,246 @@ +# ============================================================================= +# installer/src/license.py — Serial-number activation client +# ============================================================================= +# +# Reads the serial number bundled with this installer package (serial.key), +# collects a hardware fingerprint (Windows Machine GUID + primary MAC address), +# POSTs to the AR Electronics license server, and caches the activation token +# locally in %APPDATA%\AR Electronics\license.json. +# +# Offline grace period: 30 days without contacting the server. +# Duplicate activations on a different machine are rejected server-side. +# ============================================================================= + +from __future__ import annotations + +import hashlib +import json +import platform +import re +import socket +import subprocess +import uuid +import winreg +from datetime import datetime, timedelta, timezone +from pathlib import Path + +import requests + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +SERVER_BASE_URL = "https://license.arelectronics.com" +ACTIVATE_ROUTE = "/api/v1/activate" +VALIDATE_ROUTE = "/api/v1/validate" +OFFLINE_GRACE = 30 # days allowed without server contact +APP_DATA_DIR = Path.home() / "AppData" / "Roaming" / "AR Electronics" +LICENSE_CACHE = APP_DATA_DIR / "license.json" +SERIAL_FILE_NAME = "serial.key" # placed next to install.py by build_usb.py + +# --------------------------------------------------------------------------- +# Hardware fingerprint +# --------------------------------------------------------------------------- + +def _machine_guid() -> str: + """Read the Windows Machine GUID from the registry (stable across reboots).""" + try: + key = winreg.OpenKey( + winreg.HKEY_LOCAL_MACHINE, + r"SOFTWARE\Microsoft\Cryptography", + ) + value, _ = winreg.QueryValueEx(key, "MachineGuid") + winreg.CloseKey(key) + return value + except OSError: + return "" + + +def _primary_mac() -> str: + """Return the MAC address of the adapter used for the default route.""" + try: + # Connect to an external address (no data sent) to discover default adapter + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.connect(("8.8.8.8", 80)) + ip = s.getsockname()[0] + s.close() + + # Find the MAC for this IP using ipconfig output + result = subprocess.run( + ["ipconfig", "/all"], capture_output=True, text=True, timeout=5 + ) + blocks = result.stdout.split("\n\n") + for block in blocks: + if ip in block: + m = re.search( + r"Physical Address[.\s]+:\s*([0-9A-F-]{17})", block, re.IGNORECASE + ) + if m: + return m.group(1).replace("-", ":").upper() + except Exception: + pass + + # Fallback — uuid.getnode() uses whichever adapter Python finds first + raw = uuid.getnode() + return ":".join(f"{(raw >> (i * 8)) & 0xFF:02X}" for i in range(5, -1, -1)) + + +def hardware_fingerprint() -> str: + """Stable, anonymised hardware fingerprint for this machine.""" + raw = f"{_machine_guid()}|{_primary_mac()}|{platform.node()}" + return hashlib.sha256(raw.encode()).hexdigest()[:32] + + +# --------------------------------------------------------------------------- +# Serial number helpers +# --------------------------------------------------------------------------- + +def read_serial(installer_dir: Path) -> str: + """ + Read the serial number from ``serial.key`` next to the installer. + + Raises FileNotFoundError if the file is missing, ValueError if malformed. + """ + serial_path = installer_dir / SERIAL_FILE_NAME + if not serial_path.exists(): + raise FileNotFoundError( + f"Serial key file not found: {serial_path}\n" + "This installer package may be incomplete." + ) + serial = serial_path.read_text(encoding="utf-8").strip() + if not _valid_serial_format(serial): + raise ValueError(f"Malformed serial number: {serial!r}") + return serial + + +def _valid_serial_format(serial: str) -> bool: + """Validate format: AR-XXXX-XXXX-XXXX (hex groups).""" + pattern = r"^AR-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{4}$" + return bool(re.match(pattern, serial, re.IGNORECASE)) + + +# --------------------------------------------------------------------------- +# Activation +# --------------------------------------------------------------------------- + +class ActivationError(Exception): + """Raised when activation is refused or fails.""" + + +class ActivationResult: + def __init__(self, data: dict): + self.activation_id: str = data["activation_id"] + self.vessel_slot: int = data.get("vessel_slot", 1) + self.licensed_to: str = data.get("licensed_to", "") + self.activated_at: str = data.get("activated_at", "") + self.expires_at: str | None = data.get("expires_at") + + +def activate_online(serial: str, app_version: str = "0.4.0") -> ActivationResult: + """ + POST activation request to the AR Electronics license server. + + On success caches the response in LICENSE_CACHE. + Raises ActivationError on any refusal or connection problem. + """ + hw_id = hardware_fingerprint() + payload = { + "serial": serial, + "hardware_id": hw_id, + "app_version": app_version, + "platform": platform.system(), + "hostname": platform.node(), + } + + try: + resp = requests.post( + SERVER_BASE_URL + ACTIVATE_ROUTE, + json=payload, + timeout=15, + ) + except requests.ConnectionError: + raise ActivationError("No se pudo conectar con el servidor de licencias.\n" + "Verifique la conexión a internet e intente de nuevo.") + except requests.Timeout: + raise ActivationError("El servidor de licencias no respondió (timeout 15 s).") + + if resp.status_code == 200: + result = ActivationResult(resp.json()) + _cache_activation(serial, hw_id, resp.json()) + return result + + # Server returned an error + try: + msg = resp.json().get("detail", resp.text) + except Exception: + msg = resp.text + raise ActivationError(f"Activación rechazada ({resp.status_code}): {msg}") + + +# --------------------------------------------------------------------------- +# Local cache +# --------------------------------------------------------------------------- + +def _cache_activation(serial: str, hardware_id: str, server_data: dict) -> None: + APP_DATA_DIR.mkdir(parents=True, exist_ok=True) + cache = { + "serial": serial, + "hardware_id": hardware_id, + "cached_at": datetime.now(timezone.utc).isoformat(), + "server_data": server_data, + } + LICENSE_CACHE.write_text(json.dumps(cache, indent=2), encoding="utf-8") + + +def load_cached_license() -> dict | None: + """Return cached license dict or None if missing / expired.""" + if not LICENSE_CACHE.exists(): + return None + try: + cache = json.loads(LICENSE_CACHE.read_text(encoding="utf-8")) + except (json.JSONDecodeError, OSError): + return None + + # Validate hardware fingerprint matches this machine + if cache.get("hardware_id") != hardware_fingerprint(): + return None + + # Check offline grace period + cached_at = datetime.fromisoformat(cache["cached_at"]) + if datetime.now(timezone.utc) - cached_at > timedelta(days=OFFLINE_GRACE): + return None # Cache expired — must re-validate online + + return cache + + +def is_activated() -> bool: + """Quick check: is this machine currently activated (cache or online)?""" + cache = load_cached_license() + if cache is not None: + return True + # Try a fast online validate + hw_id = hardware_fingerprint() + serial = _serial_from_cache() + if serial is None: + return False + try: + resp = requests.get( + f"{SERVER_BASE_URL}{VALIDATE_ROUTE}/{serial}", + params={"hardware_id": hw_id}, + timeout=8, + ) + if resp.status_code == 200 and resp.json().get("active"): + _cache_activation(serial, hw_id, resp.json()) + return True + except Exception: + pass + return False + + +def _serial_from_cache() -> str | None: + if not LICENSE_CACHE.exists(): + return None + try: + return json.loads(LICENSE_CACHE.read_text(encoding="utf-8")).get("serial") + except Exception: + return None diff --git a/installer/src/sysconfig.py b/installer/src/sysconfig.py new file mode 100644 index 0000000..b70578b --- /dev/null +++ b/installer/src/sysconfig.py @@ -0,0 +1,201 @@ +# ============================================================================= +# installer/src/sysconfig.py — Windows system configuration helpers +# ============================================================================= +# +# All functions target Windows 10/11 (tested on J6412 with Windows 10 IoT). +# Requires the installer to be run as Administrator for firewall and registry +# operations; shortcuts and HKCU run-key work without elevation. +# ============================================================================= + +from __future__ import annotations + +import ctypes +import os +import subprocess +import sys +import winreg +from pathlib import Path + +# --------------------------------------------------------------------------- +# Install paths +# --------------------------------------------------------------------------- + +INSTALL_ROOT = Path(os.environ.get("ProgramFiles", "C:\\Program Files")) / "AR Electronics" +ECDIS_DIR = INSTALL_ROOT / "AR-ECDIS" +AUTOPILOT_DIR = INSTALL_ROOT / "AR-Autopilot" +APPDATA_DIR = Path.home() / "AppData" / "Roaming" / "AR Electronics" +START_MENU = Path(os.environ.get("APPDATA", "")) / "Microsoft" / "Windows" / "Start Menu" / "Programs" / "AR Electronics" + + +def ensure_install_dirs() -> None: + """Create install directory tree (requires admin).""" + for d in (INSTALL_ROOT, ECDIS_DIR, AUTOPILOT_DIR, APPDATA_DIR, START_MENU): + d.mkdir(parents=True, exist_ok=True) + + +# --------------------------------------------------------------------------- +# Auto-start (HKCU — no admin required) +# --------------------------------------------------------------------------- + +_AUTORUN_KEY = r"SOFTWARE\Microsoft\Windows\CurrentVersion\Run" +_AR_ECDIS_VALUE = "AR-ECDIS" +_AR_AUTOPILOT_VALUE = "AR-Autopilot" + + +def register_autostart(name: str, exe_path: Path, args: str = "") -> None: + """ + Add an entry to HKCU Run so the app starts with Windows. + + Does NOT require administrator rights (current-user key). + """ + cmd = f'"{exe_path}" {args}'.strip() + key = winreg.OpenKey( + winreg.HKEY_CURRENT_USER, + _AUTORUN_KEY, + access=winreg.KEY_SET_VALUE, + ) + winreg.SetValueEx(key, name, 0, winreg.REG_SZ, cmd) + winreg.CloseKey(key) + + +def unregister_autostart(name: str) -> None: + """Remove an auto-start entry (best-effort, ignores missing keys).""" + try: + key = winreg.OpenKey( + winreg.HKEY_CURRENT_USER, + _AUTORUN_KEY, + access=winreg.KEY_SET_VALUE, + ) + winreg.DeleteValue(key, name) + winreg.CloseKey(key) + except FileNotFoundError: + pass + + +def configure_autostart() -> None: + """Register both AR Electronics apps in the Windows autostart.""" + ecdis_exe = ECDIS_DIR / "AR-ECDIS.exe" + autopilot_exe = AUTOPILOT_DIR / "ar_autopilot_display.exe" + + if ecdis_exe.exists(): + register_autostart(_AR_ECDIS_VALUE, ecdis_exe) + + if autopilot_exe.exists(): + register_autostart(_AR_AUTOPILOT_VALUE, autopilot_exe) + + +# --------------------------------------------------------------------------- +# Desktop and Start Menu shortcuts +# --------------------------------------------------------------------------- + +def _make_shortcut(target: Path, shortcut_path: Path, icon: Path | None = None) -> None: + """ + Create a .lnk shortcut using PowerShell WScript.Shell. + + No admin required; works on standard user accounts. + """ + icon_str = f'$s.IconLocation = "{icon}"' if icon else "" + script = ( + f'$s=(New-Object -COM WScript.Shell).CreateShortcut("{shortcut_path}");' + f'$s.TargetPath="{target}";' + f'{icon_str};' + f'$s.Save()' + ) + subprocess.run(["powershell", "-Command", script], check=True, capture_output=True) + + +def create_shortcuts() -> None: + """Create Start Menu and Desktop shortcuts for both apps.""" + START_MENU.mkdir(parents=True, exist_ok=True) + desktop = Path.home() / "Desktop" + + apps = [ + ("AR-ECDIS", ECDIS_DIR / "AR-ECDIS.exe"), + ("AR-Autopilot", AUTOPILOT_DIR / "ar_autopilot_display.exe"), + ] + for name, exe in apps: + if not exe.exists(): + continue + lnk = f"{name}.lnk" + _make_shortcut(exe, START_MENU / lnk, icon=exe) + _make_shortcut(exe, desktop / lnk, icon=exe) + + +# --------------------------------------------------------------------------- +# Windows Firewall rules (requires admin) +# --------------------------------------------------------------------------- + +def add_firewall_rules() -> None: + """ + Allow inbound TCP on ports used by AR Electronics services. + + Requires the installer to be running as Administrator. + """ + rules = [ + ("AR-ECDIS Web", "TCP", "8080"), # AR-ECDIS FastAPI backend + ("AR-ECDIS WebSocket","TCP", "8080"), + ] + for name, proto, port in rules: + subprocess.run( + [ + "netsh", "advfirewall", "firewall", "add", "rule", + f"name={name}", + "dir=in", + "action=allow", + f"protocol={proto}", + f"localport={port}", + ], + check=False, # non-fatal if rule already exists + capture_output=True, + ) + + +# --------------------------------------------------------------------------- +# COM port detection (informational — no forced assignment) +# --------------------------------------------------------------------------- + +def list_com_ports() -> list[str]: + """ + Return list of detected COM ports on the system. + + On Windows this queries the registry rather than requiring PySerial. + """ + ports: list[str] = [] + try: + key = winreg.OpenKey( + winreg.HKEY_LOCAL_MACHINE, + r"HARDWARE\DEVICEMAP\SERIALCOMM", + ) + i = 0 + while True: + try: + _, port_name, _ = winreg.EnumValue(key, i) + ports.append(port_name) + i += 1 + except OSError: + break + winreg.CloseKey(key) + except OSError: + pass + return sorted(ports) + + +# --------------------------------------------------------------------------- +# Administrator check +# --------------------------------------------------------------------------- + +def is_admin() -> bool: + """Return True if the current process has administrator privileges.""" + try: + return ctypes.windll.shell32.IsUserAnAdmin() != 0 + except AttributeError: + return False + + +def require_admin() -> None: + """Re-launch this process with UAC elevation if not already admin.""" + if not is_admin(): + ctypes.windll.shell32.ShellExecuteW( + None, "runas", sys.executable, " ".join(sys.argv), None, 1 + ) + sys.exit(0) diff --git a/license_server/.env.example b/license_server/.env.example new file mode 100644 index 0000000..976827f --- /dev/null +++ b/license_server/.env.example @@ -0,0 +1,10 @@ +# AR Electronics License Server — environment variables +# Copy this file to .env and fill in values before starting the server. + +# Database — SQLite (default) or PostgreSQL +DATABASE_URL=sqlite:///./ar_licenses.db +# DATABASE_URL=postgresql://user:password@localhost:5432/ar_licenses + +# Admin API key — change this to a strong random value in production! +# Generate one: python -c "import secrets; print(secrets.token_urlsafe(32))" +ADMIN_API_KEY=change-me-in-production diff --git a/license_server/__init__.py b/license_server/__init__.py new file mode 100644 index 0000000..020f3fa --- /dev/null +++ b/license_server/__init__.py @@ -0,0 +1 @@ +# AR Electronics License Server package diff --git a/license_server/database.py b/license_server/database.py new file mode 100644 index 0000000..881ed18 --- /dev/null +++ b/license_server/database.py @@ -0,0 +1,33 @@ +# ============================================================================= +# license_server/database.py — SQLAlchemy engine + session factory +# ============================================================================= + +import os +from sqlalchemy import create_engine +from sqlalchemy.orm import DeclarativeBase, sessionmaker + +# Default: SQLite in same directory. Set DATABASE_URL env var for PostgreSQL. +DATABASE_URL = os.getenv( + "DATABASE_URL", + "sqlite:///./ar_licenses.db", +) + +# connect_args only needed for SQLite (allows multi-threaded use by FastAPI) +_connect_args = {"check_same_thread": False} if DATABASE_URL.startswith("sqlite") else {} + +engine = create_engine(DATABASE_URL, connect_args=_connect_args) + +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + + +class Base(DeclarativeBase): + pass + + +def get_db(): + """FastAPI dependency that yields a database session.""" + db = SessionLocal() + try: + yield db + finally: + db.close() diff --git a/license_server/main.py b/license_server/main.py new file mode 100644 index 0000000..4a03a37 --- /dev/null +++ b/license_server/main.py @@ -0,0 +1,305 @@ +# ============================================================================= +# license_server/main.py — AR Electronics License Server +# ============================================================================= +# +# FastAPI REST service that manages serial number activations for all +# AR Electronics products deployed on J6412 mini PCs. +# +# Endpoints (public): +# POST /api/v1/activate — activate a serial on a machine +# GET /api/v1/validate/{serial} — check activation status +# +# Endpoints (admin — require X-Admin-Key header): +# GET /api/v1/admin/licenses — list all issued licenses +# GET /api/v1/admin/activations — list all activations +# POST /api/v1/admin/issue — issue a new serial number +# DELETE /api/v1/admin/revoke/{serial} — revoke a license +# +# Run: +# uvicorn license_server.main:app --host 0.0.0.0 --port 8888 --reload +# +# Environment variables: +# DATABASE_URL — SQLAlchemy URL (default: sqlite:///./ar_licenses.db) +# ADMIN_API_KEY — Secret key for /admin/* endpoints +# ============================================================================= + +from __future__ import annotations + +import os +import uuid +from datetime import datetime, timezone + +from fastapi import Depends, FastAPI, Header, HTTPException, Query +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel +from sqlalchemy.orm import Session + +from .database import Base, engine, get_db +from .models import Activation, License +from .schemas import ( + ActivationRequest, + ActivationAdminRecord, + ActivationResponse, + LicenseAdminRecord, + ValidateResponse, +) + +# ── Create tables on startup ───────────────────────────────────────────────── +Base.metadata.create_all(bind=engine) + +# ── App ────────────────────────────────────────────────────────────────────── +app = FastAPI( + title="AR Electronics License Server", + description="Serial-number activation and validation for J6412 deployments.", + version="1.0.0", +) + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_methods=["GET", "POST", "DELETE"], + allow_headers=["*"], +) + +ADMIN_KEY = os.getenv("ADMIN_API_KEY", "change-me-in-production") + + +# --------------------------------------------------------------------------- +# Auth helpers +# --------------------------------------------------------------------------- + +def require_admin(x_admin_key: str = Header(...)): + if x_admin_key != ADMIN_KEY: + raise HTTPException(status_code=403, detail="Invalid admin key.") + + +# --------------------------------------------------------------------------- +# Admin request schemas (defined here — too small for schemas.py) +# --------------------------------------------------------------------------- + +class IssueRequest(BaseModel): + serial: str + vessel: str = "" + notes: str = "" + + +# --------------------------------------------------------------------------- +# Public — Activation +# --------------------------------------------------------------------------- + +@app.post("/api/v1/activate", response_model=ActivationResponse, tags=["Public"]) +def activate(request: ActivationRequest, db: Session = Depends(get_db)): + """ + Activate a serial number on a specific hardware machine. + + - First activation: accepted, creates a new Activation row. + - Same hardware re-activating the same serial: accepted, updates last_seen. + - Different hardware trying to activate an already-activated serial: rejected. + """ + serial = request.serial.upper() + + # Verify the serial exists and is not revoked + lic = db.query(License).filter(License.serial == serial).first() + if lic is None: + raise HTTPException(status_code=404, detail="Serial number not found.") + if not lic.is_active: + raise HTTPException(status_code=403, detail="This license has been revoked.") + + hw_id = request.hardware_id + + # Check for an existing activation + existing = ( + db.query(Activation) + .filter(Activation.serial == serial, Activation.revoked == False) # noqa: E712 + .first() + ) + + if existing is not None: + if existing.hardware_id == hw_id: + # Same machine re-activating — refresh heartbeat + existing.last_seen_at = datetime.now(timezone.utc) + existing.app_version = request.app_version or existing.app_version + db.commit() + db.refresh(existing) + return ActivationResponse( + activation_id = existing.activation_id, + vessel_slot = existing.vessel_slot, + licensed_to = existing.licensed_to, + activated_at = existing.activated_at, + ) + else: + raise HTTPException( + status_code=409, + detail=( + "Este número de serie ya está activado en otro equipo. " + "Contacte a AR Electronics para transferir la licencia." + ), + ) + + # New activation + activation = Activation( + activation_id = str(uuid.uuid4()), + serial = serial, + hardware_id = hw_id, + app_version = request.app_version, + platform = request.platform, + hostname = request.hostname, + vessel_slot = 1, + licensed_to = lic.vessel, + ) + db.add(activation) + db.commit() + db.refresh(activation) + + return ActivationResponse( + activation_id = activation.activation_id, + vessel_slot = activation.vessel_slot, + licensed_to = activation.licensed_to, + activated_at = activation.activated_at, + ) + + +# --------------------------------------------------------------------------- +# Public — Validation +# --------------------------------------------------------------------------- + +@app.get("/api/v1/validate/{serial}", response_model=ValidateResponse, tags=["Public"]) +def validate( + serial: str, + hardware_id: str = Query(..., min_length=16), + db: Session = Depends(get_db), +): + """ + Check whether a (serial, hardware_id) pair is currently active. + + Called by the installed app on each boot to refresh its offline cache. + """ + serial = serial.upper() + + activation = ( + db.query(Activation) + .filter( + Activation.serial == serial, + Activation.hardware_id == hardware_id, + Activation.revoked == False, # noqa: E712 + ) + .first() + ) + + if activation is None: + raise HTTPException(status_code=404, detail="No active activation found.") + + activation.last_seen_at = datetime.now(timezone.utc) + db.commit() + + return ValidateResponse( + serial = activation.serial, + active = True, + hardware_id = activation.hardware_id, + activation_id = activation.activation_id, + vessel_slot = activation.vessel_slot, + licensed_to = activation.licensed_to, + activated_at = activation.activated_at, + last_seen_at = activation.last_seen_at, + ) + + +# --------------------------------------------------------------------------- +# Admin — Issue new serial +# --------------------------------------------------------------------------- + +@app.post("/api/v1/admin/issue", + tags=["Admin"], + dependencies=[Depends(require_admin)]) +def issue_license(request: IssueRequest, db: Session = Depends(get_db)): + """Register a pre-generated serial number in the database.""" + serial = request.serial.upper() + if db.query(License).filter(License.serial == serial).first(): + raise HTTPException(status_code=409, detail="Serial already in database.") + lic = License(serial=serial, vessel=request.vessel, notes=request.notes, is_active=True) + db.add(lic) + db.commit() + return {"status": "issued", "serial": lic.serial} + + +# --------------------------------------------------------------------------- +# Admin — List licenses +# --------------------------------------------------------------------------- + +@app.get("/api/v1/admin/licenses", + response_model=list[LicenseAdminRecord], + tags=["Admin"], + dependencies=[Depends(require_admin)]) +def list_licenses(db: Session = Depends(get_db)): + licenses = db.query(License).order_by(License.issued_at.desc()).all() + result = [] + for lic in licenses: + count = db.query(Activation).filter( + Activation.serial == lic.serial, Activation.revoked == False # noqa: E712 + ).count() + result.append( + LicenseAdminRecord( + serial = lic.serial, + vessel = lic.vessel, + issued_at = lic.issued_at, + is_active = lic.is_active, + activations = count, + ) + ) + return result + + +# --------------------------------------------------------------------------- +# Admin — List activations +# --------------------------------------------------------------------------- + +@app.get("/api/v1/admin/activations", + response_model=list[ActivationAdminRecord], + tags=["Admin"], + dependencies=[Depends(require_admin)]) +def list_activations(db: Session = Depends(get_db)): + rows = db.query(Activation).order_by(Activation.activated_at.desc()).all() + return [ + ActivationAdminRecord( + activation_id = r.activation_id, + serial = r.serial, + hardware_id = r.hardware_id, + app_version = r.app_version, + platform = r.platform, + hostname = r.hostname, + activated_at = r.activated_at, + last_seen_at = r.last_seen_at, + vessel_slot = r.vessel_slot, + revoked = r.revoked, + licensed_to = r.licensed_to, + ) + for r in rows + ] + + +# --------------------------------------------------------------------------- +# Admin — Revoke +# --------------------------------------------------------------------------- + +@app.delete("/api/v1/admin/revoke/{serial}", + tags=["Admin"], + dependencies=[Depends(require_admin)]) +def revoke_license(serial: str, db: Session = Depends(get_db)): + """Revoke a license — all activations are invalidated immediately.""" + serial = serial.upper() + lic = db.query(License).filter(License.serial == serial).first() + if lic is None: + raise HTTPException(status_code=404, detail="Serial not found.") + lic.is_active = False + db.query(Activation).filter(Activation.serial == serial).update({"revoked": True}) + db.commit() + return {"status": "revoked", "serial": serial} + + +# --------------------------------------------------------------------------- +# Health check +# --------------------------------------------------------------------------- + +@app.get("/health", tags=["System"]) +def health(): + return {"status": "ok", "service": "AR Electronics License Server"} diff --git a/license_server/models.py b/license_server/models.py new file mode 100644 index 0000000..7d01a8c --- /dev/null +++ b/license_server/models.py @@ -0,0 +1,51 @@ +# ============================================================================= +# license_server/models.py — SQLAlchemy ORM models +# ============================================================================= + +import uuid +from datetime import datetime, timezone + +from sqlalchemy import Boolean, DateTime, Integer, String +from sqlalchemy.orm import Mapped, mapped_column + +from .database import Base + + +def _now() -> datetime: + return datetime.now(timezone.utc) + + +def _uuid() -> str: + return str(uuid.uuid4()) + + +class License(Base): + """One row per serial number issued by AR Electronics.""" + + __tablename__ = "licenses" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + serial: Mapped[str] = mapped_column(String(20), unique=True, nullable=False, index=True) + vessel: Mapped[str] = mapped_column(String(120), default="") + issued_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=_now) + is_active: Mapped[bool] = mapped_column(Boolean, default=True) + notes: Mapped[str] = mapped_column(String(500), default="") + + +class Activation(Base): + """One row per successful hardware activation of a serial number.""" + + __tablename__ = "activations" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + activation_id: Mapped[str] = mapped_column(String(36), unique=True, default=_uuid, index=True) + serial: Mapped[str] = mapped_column(String(20), nullable=False, index=True) + hardware_id: Mapped[str] = mapped_column(String(64), nullable=False) + app_version: Mapped[str] = mapped_column(String(20), default="") + platform: Mapped[str] = mapped_column(String(20), default="") + hostname: Mapped[str] = mapped_column(String(120), default="") + activated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=_now) + last_seen_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=_now) + vessel_slot: Mapped[int] = mapped_column(Integer, default=1) + revoked: Mapped[bool] = mapped_column(Boolean, default=False) + licensed_to: Mapped[str] = mapped_column(String(120), default="") diff --git a/license_server/requirements.txt b/license_server/requirements.txt new file mode 100644 index 0000000..1cfae29 --- /dev/null +++ b/license_server/requirements.txt @@ -0,0 +1,5 @@ +fastapi>=0.111.0 +uvicorn[standard]>=0.29.0 +sqlalchemy>=2.0.0 +pydantic>=2.7.0 +python-dotenv>=1.0.0 diff --git a/license_server/schemas.py b/license_server/schemas.py new file mode 100644 index 0000000..2788170 --- /dev/null +++ b/license_server/schemas.py @@ -0,0 +1,69 @@ +# ============================================================================= +# license_server/schemas.py — Pydantic v2 request/response schemas +# ============================================================================= + +from datetime import datetime +from typing import Optional + +from pydantic import BaseModel, Field + + +# --------------------------------------------------------------------------- +# Activation +# --------------------------------------------------------------------------- + +class ActivationRequest(BaseModel): + serial: str = Field(..., pattern=r"^AR-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}$") + hardware_id: str = Field(..., min_length=16, max_length=64) + app_version: str = Field(default="", max_length=20) + platform: str = Field(default="", max_length=20) + hostname: str = Field(default="", max_length=120) + + +class ActivationResponse(BaseModel): + activation_id: str + vessel_slot: int + licensed_to: str + activated_at: datetime + expires_at: Optional[datetime] = None + + +# --------------------------------------------------------------------------- +# Validation +# --------------------------------------------------------------------------- + +class ValidateResponse(BaseModel): + serial: str + active: bool + hardware_id: str + activation_id: str + vessel_slot: int + licensed_to: str + activated_at: datetime + last_seen_at: datetime + + +# --------------------------------------------------------------------------- +# Admin list +# --------------------------------------------------------------------------- + +class LicenseAdminRecord(BaseModel): + serial: str + vessel: str + issued_at: datetime + is_active: bool + activations: int + + +class ActivationAdminRecord(BaseModel): + activation_id: str + serial: str + hardware_id: str + app_version: str + platform: str + hostname: str + activated_at: datetime + last_seen_at: datetime + vessel_slot: int + revoked: bool + licensed_to: str