Files
AR-Autopilot/installer/src/license.py
T
alro65 de25dcee57 feat(installer): J6412 USB installer + AR Electronics license activation server
installer/:
  - build_usb.py: dev tool — builds Flutter + AR-ECDIS, assembles USB pendrive
    image with serial.key, autorun.inf, and START_INSTALLER.bat
  - serial_generator.py: generates AR-XXXX-XXXX-XXXX serial numbers (48-bit
    entropy), logs to CSV for CRM integration
  - src/license.py: hardware fingerprint (Windows MachineGuid + primary MAC),
    serial validation, online activation POST, local cache with 30-day offline
    grace period
  - src/sysconfig.py: HKCU autostart registry entries, .lnk shortcuts (desktop
    + Start Menu via WScript.Shell), firewall rules (netsh), COM port detection
  - src/install.py: tkinter installer GUI — 9 sequential steps with per-step
    progress indicators, threaded execution, error dialogs, and silent mode

license_server/ (FastAPI service — deploy to arelectronics.com VPS):
  - POST /api/v1/activate: first activation accepted; same-HW re-activation
    refreshes heartbeat; different-HW rejected with 409
  - GET  /api/v1/validate/{serial}: heartbeat endpoint to refresh offline cache
  - Admin endpoints (X-Admin-Key): issue, list, revoke licenses
  - SQLAlchemy models: License (serial registry) + Activation (per-machine rows)
  - SQLite default, PostgreSQL-ready via DATABASE_URL env var

AR_electronics — AR-Autopilot Project
2026-05-24 01:36:24 -04:00

247 lines
8.5 KiB
Python

# =============================================================================
# installer/src/license.py — Serial-number activation client
# =============================================================================
#
# Reads the serial number bundled with this installer package (serial.key),
# collects a hardware fingerprint (Windows Machine GUID + primary MAC address),
# POSTs to the AR Electronics license server, and caches the activation token
# locally in %APPDATA%\AR Electronics\license.json.
#
# Offline grace period: 30 days without contacting the server.
# Duplicate activations on a different machine are rejected server-side.
# =============================================================================
from __future__ import annotations
import hashlib
import json
import platform
import re
import socket
import subprocess
import uuid
import winreg
from datetime import datetime, timedelta, timezone
from pathlib import Path
import requests
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Server endpoints: each route is appended to the base URL when a request is built.
SERVER_BASE_URL = "https://license.arelectronics.com"
ACTIVATE_ROUTE = "/api/v1/activate"  # POST target for activation
VALIDATE_ROUTE = "/api/v1/validate"  # GET target; the serial is appended as a path segment
OFFLINE_GRACE = 30  # days allowed without server contact

# Cache location, derived from the user's home directory rather than an
# environment variable.
APP_DATA_DIR = Path.home().joinpath("AppData", "Roaming", "AR Electronics")
LICENSE_CACHE = APP_DATA_DIR.joinpath("license.json")
SERIAL_FILE_NAME = "serial.key"  # placed next to install.py by build_usb.py
# ---------------------------------------------------------------------------
# Hardware fingerprint
# ---------------------------------------------------------------------------
def _machine_guid() -> str:
    r"""Read the Windows Machine GUID from the registry.

    The value ``MachineGuid`` is read from
    ``HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Cryptography``.

    Returns:
        The registry value, or ``""`` when the key or value cannot be read
        (any ``OSError``), so callers can still build a fingerprint from
        their other components.
    """
    # NOTE(review): a 32-bit interpreter on 64-bit Windows may be redirected to
    # the WOW6432Node registry view, where this value is presumably absent --
    # confirm the bitness of the bundled Python before relying on the GUID.
    try:
        # The context manager closes the handle even when QueryValueEx raises;
        # the explicit CloseKey() used previously was skipped on that path.
        with winreg.OpenKey(
            winreg.HKEY_LOCAL_MACHINE,
            r"SOFTWARE\Microsoft\Cryptography",
        ) as key:
            value, _ = winreg.QueryValueEx(key, "MachineGuid")
    except OSError:
        return ""
    return value
def _mac_for_ip(ipconfig_output: str, ip: str) -> str:
    """Return the MAC listed in the ``ipconfig /all`` block that contains *ip*, or ``""``."""
    # Match the address as a whole token: a plain substring test would let
    # "10.0.0.1" match a block that only contains "10.0.0.10".
    ip_token = re.compile(rf"(?<![0-9.]){re.escape(ip)}(?![0-9])")
    for block in ipconfig_output.split("\n\n"):
        if not ip_token.search(block):
            continue
        m = re.search(
            r"Physical Address[.\s]+:\s*([0-9A-F-]{17})", block, re.IGNORECASE
        )
        if m:
            return m.group(1).replace("-", ":").upper()
    return ""


def _primary_mac() -> str:
    """Return the MAC address of the adapter used for the default route.

    The local IP is discovered by connecting a UDP socket to an external
    address, then looked up in the output of ``ipconfig /all``.  If any step
    fails, or no matching adapter block is found, the value of
    ``uuid.getnode()`` is used instead.

    Returns:
        The MAC as six upper-case hex pairs separated by ``:``.
    """
    # NOTE(review): the "Physical Address" label is matched in English only; on
    # a localized Windows the lookup presumably falls through to the
    # uuid.getnode() fallback -- confirm on the target locale.
    try:
        # Connect to an external address (no data sent) to discover default adapter.
        # The context manager closes the socket even if connect() fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("8.8.8.8", 80))
            ip = probe.getsockname()[0]
        # Find the MAC for this IP using ipconfig output
        result = subprocess.run(
            ["ipconfig", "/all"], capture_output=True, text=True, timeout=5
        )
        mac = _mac_for_ip(result.stdout, ip)
        if mac:
            return mac
    except (OSError, subprocess.SubprocessError, ValueError):
        # Best effort: no network, ipconfig missing or timed out, or output
        # that could not be decoded -- use the fallback below.
        pass
    # Fallback — uuid.getnode() uses whichever adapter Python finds first
    raw = uuid.getnode()
    return ":".join(f"{(raw >> (i * 8)) & 0xFF:02X}" for i in range(5, -1, -1))
def hardware_fingerprint() -> str:
    """Return a 32-character hex digest identifying this machine.

    The machine GUID, the primary MAC address and the host name are joined
    with ``|`` and hashed with SHA-256; only the first 32 hex characters of
    the digest are kept.
    """
    components = (_machine_guid(), _primary_mac(), platform.node())
    joined = "|".join(str(part) for part in components)
    digest = hashlib.sha256(joined.encode()).hexdigest()
    return digest[:32]
# ---------------------------------------------------------------------------
# Serial number helpers
# ---------------------------------------------------------------------------
def read_serial(installer_dir: Path) -> str:
    """
    Read the serial number from ``serial.key`` next to the installer.

    Args:
        installer_dir: Directory expected to contain the serial key file.

    Returns:
        The serial number with surrounding whitespace removed.

    Raises:
        FileNotFoundError: The key file is missing.
        ValueError: The file content is not a well-formed serial number.
    """
    serial_path = installer_dir / SERIAL_FILE_NAME
    try:
        # "utf-8-sig" drops a leading byte-order mark if one is present, so a
        # key file re-saved by an editor that adds a BOM still validates.
        text = serial_path.read_text(encoding="utf-8-sig")
    except FileNotFoundError:
        raise FileNotFoundError(
            f"Serial key file not found: {serial_path}\n"
            "This installer package may be incomplete."
        ) from None
    serial = text.strip()
    if not _valid_serial_format(serial):
        raise ValueError(f"Malformed serial number: {serial!r}")
    return serial
def _valid_serial_format(serial: str) -> bool:
    """Validate format: AR-XXXX-XXXX-XXXX (hex groups, case-insensitive).

    The whole string must match; in particular a trailing newline is
    rejected, which ``re.match`` with a ``$`` anchor would have accepted.
    """
    pattern = r"AR-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{4}"
    return re.fullmatch(pattern, serial, re.IGNORECASE) is not None
# ---------------------------------------------------------------------------
# Activation
# ---------------------------------------------------------------------------
class ActivationError(Exception):
    """Signals that a license activation attempt was refused or did not succeed."""
class ActivationResult:
    """Attribute-style view over the activation data dictionary.

    ``activation_id`` is required (a missing key raises ``KeyError``); every
    other field falls back to a default when absent.
    """

    def __init__(self, data: dict):
        # Mandatory field first, so a missing id fails before anything else is read.
        self.activation_id: str = data["activation_id"]
        optional = data.get
        self.vessel_slot: int = optional("vessel_slot", 1)
        self.licensed_to: str = optional("licensed_to", "")
        self.activated_at: str = optional("activated_at", "")
        self.expires_at: str | None = optional("expires_at")
def activate_online(serial: str, app_version: str = "0.4.0") -> ActivationResult:
    """
    POST an activation request to the AR Electronics license server.

    The payload carries the serial, this machine's hardware fingerprint, the
    application version, the platform name and the host name.  On a 200
    response the parsed body is cached via ``_cache_activation`` and returned
    wrapped in an ``ActivationResult``.

    Args:
        serial: Serial number to activate.
        app_version: Version string reported to the server.

    Returns:
        The activation details parsed from the server response.

    Raises:
        ActivationError: The server could not be reached, timed out, the
            request failed in any other way, the server answered with a
            non-200 status, or a 200 response carried an unusable body.
    """
    hw_id = hardware_fingerprint()
    payload = {
        "serial": serial,
        "hardware_id": hw_id,
        "app_version": app_version,
        "platform": platform.system(),
        "hostname": platform.node(),
    }
    try:
        resp = requests.post(
            SERVER_BASE_URL + ACTIVATE_ROUTE,
            json=payload,
            timeout=15,
        )
    except requests.ConnectionError as exc:
        raise ActivationError("No se pudo conectar con el servidor de licencias.\n"
                              "Verifique la conexión a internet e intente de nuevo.") from exc
    except requests.Timeout as exc:
        raise ActivationError("El servidor de licencias no respondió (timeout 15 s).") from exc
    except requests.RequestException as exc:
        # Any other transport-level failure must also surface as
        # ActivationError, as the contract above promises.
        raise ActivationError(
            f"Error de comunicación con el servidor de licencias: {exc}"
        ) from exc

    if resp.status_code == 200:
        try:
            # Parse once and reuse for both the result object and the cache.
            data = resp.json()
            result = ActivationResult(data)
        except (ValueError, KeyError, TypeError) as exc:
            # Non-JSON body, JSON that is not an object, or no activation_id.
            raise ActivationError(
                "Respuesta inválida del servidor de licencias."
            ) from exc
        _cache_activation(serial, hw_id, data)
        return result

    # Server returned an error: prefer its "detail" field, else the raw body.
    try:
        msg = resp.json().get("detail", resp.text)
    except (ValueError, AttributeError):
        msg = resp.text
    raise ActivationError(f"Activación rechazada ({resp.status_code}): {msg}")
# ---------------------------------------------------------------------------
# Local cache
# ---------------------------------------------------------------------------
def _cache_activation(serial: str, hardware_id: str, server_data: dict) -> None:
    """Persist an activation record to ``LICENSE_CACHE`` as JSON.

    The record stores the serial, the hardware id, the current UTC time
    (``cached_at``, ISO 8601) and the server payload.  The cache directory is
    created if needed.

    Args:
        serial: Serial number the record belongs to.
        hardware_id: Fingerprint of the machine the record was issued for.
        server_data: JSON-serialisable payload received from the server.

    Raises:
        OSError: The directory or file could not be written.
    """
    APP_DATA_DIR.mkdir(parents=True, exist_ok=True)
    cache = {
        "serial": serial,
        "hardware_id": hardware_id,
        "cached_at": datetime.now(timezone.utc).isoformat(),
        "server_data": server_data,
    }
    # Write to a sibling temp file and swap it in, so an interrupted write
    # cannot leave a truncated license.json where a valid one used to be.
    tmp_path = LICENSE_CACHE.with_name(LICENSE_CACHE.name + ".tmp")
    tmp_path.write_text(json.dumps(cache, indent=2), encoding="utf-8")
    tmp_path.replace(LICENSE_CACHE)
def load_cached_license() -> dict | None:
    """Return the cached license dict, or None if it cannot be trusted.

    None is returned when the cache file is missing or unreadable, is not a
    JSON object, was issued for a different hardware fingerprint, has a
    missing/malformed/timezone-naive ``cached_at`` timestamp, or is older
    than ``OFFLINE_GRACE`` days.
    """
    try:
        cache = json.loads(LICENSE_CACHE.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError covers a missing file; ValueError covers both invalid JSON
        # and undecodable bytes.
        return None
    if not isinstance(cache, dict):
        return None
    # Validate hardware fingerprint matches this machine
    if cache.get("hardware_id") != hardware_fingerprint():
        return None
    # Check offline grace period
    try:
        cached_at = datetime.fromisoformat(cache["cached_at"])
    except (KeyError, TypeError, ValueError):
        return None  # Damaged or hand-edited cache — treat as absent
    if cached_at.tzinfo is None:
        # An aware and a naive datetime cannot be subtracted; the cache writer
        # in this file always stores an aware UTC timestamp, so reject this.
        return None
    # NOTE(review): a timestamp in the future (e.g. after the system clock is
    # set back) yields a negative age and therefore passes this check.
    if datetime.now(timezone.utc) - cached_at > timedelta(days=OFFLINE_GRACE):
        return None  # Cache expired — must re-validate online
    return cache
def is_activated() -> bool:
    """Quick check: is this machine currently activated (cache or online)?

    A usable local cache answers immediately.  Otherwise the serial stored in
    the cache file is validated against the server; a 200 response whose JSON
    object has a truthy ``active`` field refreshes the cache and counts as
    activated.

    Returns:
        True when activated; False when there is no serial to validate, the
        server says otherwise, or the request / cache refresh fails.
    """
    if load_cached_license() is not None:
        return True
    # Try a fast online validate
    serial = _serial_from_cache()
    if serial is None:
        return False
    # Computed only once a serial is known: the fingerprint is not needed
    # when there is nothing to validate.
    hw_id = hardware_fingerprint()
    try:
        resp = requests.get(
            f"{SERVER_BASE_URL}{VALIDATE_ROUTE}/{serial}",
            params={"hardware_id": hw_id},
            timeout=8,
        )
        if resp.status_code != 200:
            return False
        data = resp.json()  # parsed once, reused for the cache refresh
        if isinstance(data, dict) and data.get("active"):
            _cache_activation(serial, hw_id, data)
            return True
    except (requests.RequestException, ValueError, OSError):
        # Network failure, non-JSON body, or cache write failure: report
        # "not activated" rather than raising from a quick check.
        pass
    return False
def _serial_from_cache() -> str | None:
    """Return the ``serial`` entry stored in the cache file, if obtainable.

    Gives None when the cache file does not exist, when reading or parsing it
    fails for any reason, or when the parsed object has no ``serial`` key.
    No hardware or expiry check is applied here.
    """
    if LICENSE_CACHE.exists():
        try:
            raw_text = LICENSE_CACHE.read_text(encoding="utf-8")
            parsed = json.loads(raw_text)
            return parsed.get("serial")
        except Exception:
            pass
    return None