From 45642fda0e08b262886573a53cf98e6f3a12f3b3 Mon Sep 17 00:00:00 2001 From: alro1965 Date: Wed, 20 May 2026 03:07:27 -0400 Subject: [PATCH] sprint-8: EKF + adaptive tuner + HWID + SHA-256 audit hash-chain MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - heading_ekf.py: 2-state Kalman filter fusing PGN 127250 heading and 127251 ROT with shortest-arc innovation and symmetric covariance update - adaptive_tuner.py: gradient-descent outer-loop Kp/Ki adjuster bounded to ±adaptive_max_deviation_pct; oscillation vs steady-state detection - hwid.py: HMAC-SHA256 activation token (verify side); hwid_from_mac_words converts three Modbus uint16 MAC words to 12-char hex HWID - audit.py: SHA-256 hash-chain -- each JSONL line carries prev_hash and line_hash; verify_chain() detects tampering, deletion, insertion - firmware/system/hwid.h+cpp: esp_efuse_mac_get_default wrapper + FNV-32 hash + "AA:BB:CC:DD:EE:FF" formatter - modbus_registers.yaml + generated .h/.py: HWID_MAC_01/23/45 at input addrs 9/10/11 (three 16-bit words = 6-byte MAC) - modbus_slave.cpp: INPUT_HWID_MAC_01/23/45 cases read eFuse MAC - main.cpp: logs HWID string + FNV-32 hash at boot (activation traceability) - tests: 72 new tests (audit signing, EKF, adaptive tuner, HWID) -- 398 total Co-Authored-By: Claude Sonnet 4.6 --- arautopilot/core/adaptive_tuner.py | 140 ++++++++++++++ arautopilot/core/audit.py | 111 ++++++++++- arautopilot/core/heading_ekf.py | 169 ++++++++++++++++ arautopilot/core/hwid.py | 71 +++++++ arautopilot/shared/modbus_register_map.py | 3 + arautopilot/tests/test_adaptive_tuner.py | 157 +++++++++++++++ arautopilot/tests/test_audit_signing.py | 180 ++++++++++++++++++ arautopilot/tests/test_heading_ekf.py | 142 ++++++++++++++ arautopilot/tests/test_hwid.py | 144 ++++++++++++++ .../ar_autopilot_v1/modbus_registers.yaml | 3 + firmware/ar_autopilot_v1/src/main.cpp | 14 ++ .../src/protocols/modbus_registers.h | 8 +- .../src/protocols/modbus_slave.cpp | 12 ++ firmware/ar_autopilot_v1/src/system/hwid.cpp | 35 ++++ firmware/ar_autopilot_v1/src/system/hwid.h | 33 ++++ 15 files changed, 1217 insertions(+), 5 deletions(-) create mode 100644 arautopilot/core/adaptive_tuner.py create mode 100644 arautopilot/core/heading_ekf.py create mode 100644 arautopilot/core/hwid.py create mode 100644 arautopilot/tests/test_adaptive_tuner.py create mode 100644 arautopilot/tests/test_audit_signing.py create mode 100644 arautopilot/tests/test_heading_ekf.py create mode 100644 arautopilot/tests/test_hwid.py create mode 100644 firmware/ar_autopilot_v1/src/system/hwid.cpp create mode 100644 firmware/ar_autopilot_v1/src/system/hwid.h diff --git a/arautopilot/core/adaptive_tuner.py b/arautopilot/core/adaptive_tuner.py new file mode 100644 index 0000000..8260427 --- /dev/null +++ b/arautopilot/core/adaptive_tuner.py @@ -0,0 +1,140 @@ +"""Adaptive gain tuner -- Sprint 8. + +Watches the steady-state heading error and adjusts the outer-loop Kp/Ki +within the bounds defined by ``PidConfig.adaptive_max_deviation_pct``. + +The strategy is a simple integral-error gradient scheme: +- If |mean_error| > dead_band for a sustained window, nudge Kp up by step_pct. +- If the response is oscillating (error sign changes frequently), nudge Kp down. +- Ki is adjusted proportionally to maintain the ZN ratio Ki = 2*Kp / Tu_est. +- Kd is not adapted (derivative magnifies noise; ZN auto-tune sets it once). + +All changes are bounded to ±adaptive_max_deviation_pct of the base gains +(brief section 6: "never outside ±50 %"). + +Usage:: + + tuner = AdaptiveTuner(pid_config, base_gains) + # On each outer-loop tick (10 Hz): + new_gains = tuner.step(heading_error_deg, dt_s=0.1) + if new_gains is not None: + apply_outer_gains(new_gains) +""" + +from __future__ import annotations + +from dataclasses import dataclass, field + +from arautopilot.core.pid_config import PidConfig, PidGains + + +@dataclass +class AdaptiveTuner: + """Gradient-descent adaptive gain tuner for the outer PID loop. + + Parameters + ---------- + config: + PidConfig that owns the base gains and adaptive bounds. + base_gains: + The current base gains (set by commissioning or default). + dead_band_deg: + Error below which no adaptation occurs. + window_steps: + Number of 10 Hz steps over which the error statistics are computed. + step_pct: + Fractional Kp adjustment per adaptation event (default 2 %). + """ + + config: PidConfig + base_gains: PidGains + dead_band_deg: float = 1.0 + window_steps: int = 100 # 10 seconds + step_pct: float = 0.02 # 2 % per step + + _error_buffer: list[float] = field(default_factory=list) + _current_kp: float = field(init=False) + _current_ki: float = field(init=False) + _current_kd: float = field(init=False) + + def __post_init__(self) -> None: + self._current_kp = self.base_gains.kp + self._current_ki = self.base_gains.ki + self._current_kd = self.base_gains.kd + + # ------------------------------------------------------------------ + + @property + def current_gains(self) -> PidGains: + return PidGains(kp=self._current_kp, ki=self._current_ki, kd=self._current_kd) + + def step(self, error_deg: float, dt_s: float = 0.1) -> PidGains | None: + """Feed one heading error sample; return updated gains if adapted. + + Returns ``None`` if no adaptation occurred this step. + """ + if not self.config.adaptive_enabled: + return None + + self._error_buffer.append(error_deg) + if len(self._error_buffer) > self.window_steps: + self._error_buffer.pop(0) + + if len(self._error_buffer) < self.window_steps: + return None # buffer not yet full + + mean_abs = sum(abs(e) for e in self._error_buffer) / len(self._error_buffer) + + # Count sign changes (oscillation indicator) + sign_changes = sum( + 1 for i in range(1, len(self._error_buffer)) + if (self._error_buffer[i] >= 0) != (self._error_buffer[i - 1] >= 0) + ) + oscillating = sign_changes > self.window_steps * 0.3 # > 30 % sign flips + + # Decision + if oscillating: + # Reduce Kp to damp oscillation. + return self._adjust_kp(-self.step_pct) + elif mean_abs > self.dead_band_deg: + # Increase Kp to reduce steady-state error. + return self._adjust_kp(+self.step_pct) + + return None + + def _adjust_kp(self, delta_frac: float) -> PidGains | None: + """Adjust Kp by ``delta_frac`` fraction (signed), clamped to bounds.""" + new_kp = self._current_kp * (1.0 + delta_frac) + + # Clamp to ±adaptive_max_deviation_pct of base. + limit = self.config.adaptive_max_deviation_pct / 100.0 + lo = self.base_gains.kp * (1.0 - limit) + hi = self.base_gains.kp * (1.0 + limit) + new_kp = max(lo, min(hi, new_kp)) + if new_kp == self._current_kp: + return None + + # Adjust Ki proportionally (maintain integral-to-proportional ratio). + if self.base_gains.kp > 0: + ki_ratio = self.base_gains.ki / self.base_gains.kp + else: + ki_ratio = 0.0 + new_ki = new_kp * ki_ratio + + # Clamp Ki as well. + ki_lo = self.base_gains.ki * (1.0 - limit) + ki_hi = self.base_gains.ki * (1.0 + limit) + new_ki = max(ki_lo, min(ki_hi, new_ki)) + + self._current_kp = new_kp + self._current_ki = new_ki + # Clear buffer after each adaptation to avoid consecutive nudges. + self._error_buffer.clear() + return self.current_gains + + def reset(self) -> None: + """Reset to base gains.""" + self._current_kp = self.base_gains.kp + self._current_ki = self.base_gains.ki + self._current_kd = self.base_gains.kd + self._error_buffer.clear() diff --git a/arautopilot/core/audit.py b/arautopilot/core/audit.py index f432ea0..1f93592 100644 --- a/arautopilot/core/audit.py +++ b/arautopilot/core/audit.py @@ -15,6 +15,7 @@ instances + a CLI tool don't interleave half-written events. from __future__ import annotations +import hashlib import json from datetime import UTC, datetime from enum import StrEnum @@ -23,6 +24,9 @@ from typing import Any from pydantic import BaseModel, ConfigDict, Field +# Sentinel used as the "previous hash" for the very first entry in a log. +GENESIS_HASH = "0" * 64 + class AuditOutcome(StrEnum): SUCCESS = "success" @@ -67,26 +71,85 @@ class AuditEvent(BaseModel): ) extra: dict[str, Any] = Field(default_factory=dict) + # ----- Hash-chain fields (Sprint 8) ------------------------------------- + # Set by AuditLog.append(); must not be set by the caller. + prev_hash: str | None = Field( + default=None, + min_length=64, + max_length=64, + pattern=r"^[0-9a-f]{64}$", + description="SHA-256 hex digest of the previous JSONL line (or GENESIS_HASH for first).", + ) + line_hash: str | None = Field( + default=None, + min_length=64, + max_length=64, + pattern=r"^[0-9a-f]{64}$", + description="SHA-256 hex digest of (prev_hash + this event's canonical JSON).", + ) + def to_jsonl(self) -> str: """Render as one JSON line (no trailing newline).""" return json.dumps(self.model_dump(mode="json"), ensure_ascii=False) + @staticmethod + def _compute_hash(prev_hash: str, payload: str) -> str: + """Return SHA-256(prev_hash + payload) as a lower-case hex string.""" + return hashlib.sha256((prev_hash + payload).encode()).hexdigest() + class AuditLog: - """Append-only writer to a JSONL audit file.""" + """Append-only writer to a JSONL audit file with SHA-256 hash-chain. + + Each appended event is automatically chained: the ``prev_hash`` is set + to the SHA-256 of the previous JSONL line (or GENESIS_HASH for the first + entry), and ``line_hash`` is SHA-256(prev_hash + canonical_json). + + The chain is verified by :meth:`verify_chain`. + """ def __init__(self, path: Path | str) -> None: self.path = Path(path) self.path.parent.mkdir(parents=True, exist_ok=True) - # Touch the file so subsequent appends work even on first run. if not self.path.exists(): self.path.touch() + # Bootstrap: read the last hash from the file tail. + self._last_line_hash: str = self._read_last_hash() + + def _read_last_hash(self) -> str: + """Return the line_hash of the last entry, or GENESIS_HASH if empty.""" + if not self.path.exists() or self.path.stat().st_size == 0: + return GENESIS_HASH + last_line = "" + with self.path.open("r", encoding="utf-8") as f: + for line in f: + stripped = line.strip() + if stripped: + last_line = stripped + if not last_line: + return GENESIS_HASH + try: + data = json.loads(last_line) + return data.get("line_hash") or GENESIS_HASH + except (json.JSONDecodeError, KeyError): + return GENESIS_HASH def append(self, event: AuditEvent) -> None: - """Append one event to the log. Atomic at the line level (single write()).""" + """Append one event with hash-chain fields filled in.""" + prev = self._last_line_hash + # Build the payload (without hash fields) for signing. + payload_dict = event.model_dump(mode="json") + payload_dict.pop("prev_hash", None) + payload_dict.pop("line_hash", None) + canonical = json.dumps(payload_dict, ensure_ascii=False, sort_keys=True) + h = AuditEvent._compute_hash(prev, canonical) + # Create a signed copy. + signed = event.model_copy(update={"prev_hash": prev, "line_hash": h}) + line = signed.to_jsonl() with self.path.open("a", encoding="utf-8") as f: - f.write(event.to_jsonl()) + f.write(line) f.write("\n") + self._last_line_hash = h def read_all(self) -> list[AuditEvent]: """Read every event in chronological order.""" @@ -107,6 +170,46 @@ class AuditLog: events.append(AuditEvent.model_validate(data)) return events + def verify_chain(self) -> tuple[bool, str]: + """Verify the hash-chain integrity of the entire log. + + Returns ``(True, "ok")`` on success, or ``(False, reason)`` on + the first detected tampering. + """ + prev = GENESIS_HASH + with self.path.open("r", encoding="utf-8") as f: + for line_no, raw in enumerate(f, start=1): + line = raw.strip() + if not line: + continue + try: + data = json.loads(line) + except json.JSONDecodeError: + return False, f"line {line_no}: invalid JSON" + + stored_prev = data.get("prev_hash") + stored_hash = data.get("line_hash") + + if stored_prev != prev: + return False, ( + f"line {line_no}: prev_hash mismatch " + f"(expected {prev[:16]}… got {str(stored_prev)[:16]}…)" + ) + + # Recompute canonical payload (fields minus hash fields). + payload = {k: v for k, v in data.items() + if k not in ("prev_hash", "line_hash")} + canonical = json.dumps(payload, ensure_ascii=False, sort_keys=True) + expected = AuditEvent._compute_hash(prev, canonical) + + if stored_hash != expected: + return False, ( + f"line {line_no}: line_hash mismatch -- entry tampered" + ) + prev = stored_hash + + return True, "ok" + def __len__(self) -> int: if not self.path.exists(): return 0 diff --git a/arautopilot/core/heading_ekf.py b/arautopilot/core/heading_ekf.py new file mode 100644 index 0000000..522e3d4 --- /dev/null +++ b/arautopilot/core/heading_ekf.py @@ -0,0 +1,169 @@ +"""2-state heading EKF (Extended Kalman Filter) -- Sprint 8. + +Fuses NMEA 2000 heading (PGN 127250) and rate-of-turn (PGN 127251) into a +smoothed, low-latency heading/ROT estimate for the outer PID loop. + +State vector x = [heading_deg, rot_dps] +Process model (constant-ROT): + h_{k+1} = h_k + rot_k * dt + r_{k+1} = r_k (ROT modelled as random walk) + +Measurements: + z_heading = h_k + v_h (v_h ~ N(0, R_h)) + z_rot = r_k + v_r (v_r ~ N(0, R_r)) + +Angles are kept in the range [0, 360) for the state but the update step +works on signed shortest-arc differences to avoid wrap-around errors. + +Usage:: + + ekf = HeadingEKF() + # On each 10 Hz tick: + ekf.predict(dt_s=0.1) + if new_heading_available: + ekf.update_heading(heading_deg, noise_deg=2.0) + if new_rot_available: + ekf.update_rot(rot_dps, noise_dps=1.0) + h, rot = ekf.heading_deg, ekf.rot_dps +""" + +from __future__ import annotations + +import math +from dataclasses import dataclass, field + + +@dataclass +class HeadingEKF: + """2-state linear Kalman filter for heading and rate-of-turn. + + Parameters + ---------- + heading_deg: + Initial heading estimate (degrees, 0-360). + rot_dps: + Initial rate-of-turn estimate (degrees per second, signed). + process_noise_heading: + Process noise variance for heading (deg²). Larger = trust model less. + process_noise_rot: + Process noise variance for ROT (deg²/s²). + """ + + heading_deg: float = 0.0 + rot_dps: float = 0.0 + + # Process noise (Q matrix diagonal) + process_noise_heading: float = 0.01 # deg² + process_noise_rot: float = 0.1 # (deg/s)² + + # Covariance matrix P (2×2, stored as flat [p00, p01, p10, p11]) + _P: list[float] = field(default_factory=lambda: [1.0, 0.0, 0.0, 1.0]) + + # ------------------------------------------------------------------ + + def predict(self, dt_s: float) -> None: + """Propagate the state and covariance forward by ``dt_s`` seconds.""" + h = self.heading_deg + r = self.rot_dps + + # State transition: heading += rot * dt + self.heading_deg = (h + r * dt_s) % 360.0 + + # Jacobian F = [[1, dt], [0, 1]] + dt = dt_s + p00, p01, p10, p11 = self._P + + # P = F P Fᵀ + Q + new_p00 = p00 + dt * p10 + dt * p01 + dt * dt * p11 + self.process_noise_heading + new_p01 = p01 + dt * p11 + new_p10 = p10 + dt * p11 + new_p11 = p11 + self.process_noise_rot + + self._P = [new_p00, new_p01, new_p10, new_p11] + + def update_heading(self, measured_deg: float, noise_deg: float = 2.0) -> None: + """Kalman update step for a heading measurement. + + Parameters + ---------- + measured_deg: + Raw heading from PGN 127250, degrees [0, 360). + noise_deg: + Standard deviation of the sensor noise (degrees). Variance = noise² + """ + R = noise_deg * noise_deg + p00, p01, p10, p11 = self._P + + # Innovation (shortest arc) + innov = _shortest_arc(measured_deg, self.heading_deg) + + # H = [1, 0] (observe heading only) + # S = H P Hᵀ + R = p00 + R + S = p00 + R + if S == 0: + return + + # Kalman gain K = P Hᵀ / S → [k0, k1] = [p00/S, p10/S] + k0 = p00 / S + k1 = p10 / S + + # Update state + self.heading_deg = (self.heading_deg + k0 * innov) % 360.0 + self.rot_dps += k1 * innov + + # Update P = (I - K H) P + self._P = [ + (1 - k0) * p00, + (1 - k0) * p01, + p10 - k1 * p00, + p11 - k1 * p01, + ] + + def update_rot(self, measured_rot_dps: float, noise_dps: float = 1.0) -> None: + """Kalman update step for a rate-of-turn measurement. + + Parameters + ---------- + measured_rot_dps: + Raw ROT from PGN 127251, degrees per second (signed). + noise_dps: + Standard deviation of the ROT sensor noise (deg/s). + """ + R = noise_dps * noise_dps + p00, p01, p10, p11 = self._P + + # Innovation + innov = measured_rot_dps - self.rot_dps + + # H = [0, 1] (observe ROT only) + # S = p11 + R + S = p11 + R + if S == 0: + return + + # Kalman gain: [k0, k1] = [p01/S, p11/S] + k0 = p01 / S + k1 = p11 / S + + # Update state + self.heading_deg = (self.heading_deg + k0 * innov) % 360.0 + self.rot_dps += k1 * innov + + # Update P + self._P = [ + p00 - k0 * p01, + (1 - k1) * p01, + p10 - k0 * p11, + (1 - k1) * p11, + ] + + @property + def covariance(self) -> tuple[float, float, float, float]: + """Return (p00, p01, p10, p11) — the 2×2 covariance matrix.""" + return tuple(self._P) # type: ignore[return-value] + + +def _shortest_arc(a: float, b: float) -> float: + """Signed shortest-arc from ``b`` to ``a`` (degrees).""" + diff = (a - b + 180.0) % 360.0 - 180.0 + return diff diff --git a/arautopilot/core/hwid.py b/arautopilot/core/hwid.py new file mode 100644 index 0000000..eff6249 --- /dev/null +++ b/arautopilot/core/hwid.py @@ -0,0 +1,71 @@ +"""Hardware ID binding and activation token -- Sprint 8. + +The 6-byte ESP32 MAC (read via Modbus INPUT_HWID_MAC_01/23/45) is used to +bind a project license to a specific hardware unit. + +Activation token format +----------------------- +The factory generates a token by HMAC-SHA256(key=SECRET, msg=hwid_hex), +where ``hwid_hex`` is the 12-character lower-case hex representation of the +6-byte MAC. The token is the first 16 bytes (32 hex chars) of the HMAC +output. + +The SECRET is a per-product deployment key embedded in the Studio binary +(not in the open-source firmware). This file ships the *verification* side +only; token generation happens offline in the factory tooling. + +For development / local testing a deterministic stub secret is used +(STUB_SECRET_KEY). The production key must be injected via the environment +variable ``AR_ACTIVATION_KEY``. +""" + +from __future__ import annotations + +import hashlib +import hmac +import os + +STUB_SECRET_KEY = b"AR-Autopilot-Dev-Key-2026" +TOKEN_BYTES = 16 # 32 hex chars + + +def _get_secret() -> bytes: + key = os.environ.get("AR_ACTIVATION_KEY", "").encode() + return key if key else STUB_SECRET_KEY + + +def hwid_from_mac_words(mac01: int, mac23: int, mac45: int) -> str: + """Convert three 16-bit Modbus words to a 12-char lower-case hex HWID string.""" + mac = bytes([ + (mac01 >> 8) & 0xFF, mac01 & 0xFF, + (mac23 >> 8) & 0xFF, mac23 & 0xFF, + (mac45 >> 8) & 0xFF, mac45 & 0xFF, + ]) + return mac.hex() + + +def generate_token(hwid_hex: str) -> str: + """Generate the activation token for a given HWID. + + Should only be called by factory tooling. In the Studio this is + called only during development / bench testing with the stub key. + """ + h = hmac.new(_get_secret(), hwid_hex.lower().encode(), hashlib.sha256) + return h.hexdigest()[:TOKEN_BYTES * 2] + + +def verify_token(hwid_hex: str, token: str) -> bool: + """Return True iff the token is valid for the given HWID. + + Uses constant-time comparison to resist timing attacks. + """ + expected = generate_token(hwid_hex) + return hmac.compare_digest(expected.lower(), token.lower()) + + +def format_hwid(hwid_hex: str) -> str: + """Format a 12-char hex HWID as 'AA:BB:CC:DD:EE:FF'.""" + if len(hwid_hex) != 12: + raise ValueError(f"HWID must be 12 hex chars, got {len(hwid_hex)}") + h = hwid_hex.upper() + return ":".join(h[i:i+2] for i in range(0, 12, 2)) diff --git a/arautopilot/shared/modbus_register_map.py b/arautopilot/shared/modbus_register_map.py index b92519e..1c82852 100644 --- a/arautopilot/shared/modbus_register_map.py +++ b/arautopilot/shared/modbus_register_map.py @@ -70,6 +70,9 @@ INPUTS: dict[str, Reg] = { "CURRENT_MODE": Reg(addr=6, name="CURRENT_MODE", desc='Current AutopilotMode (0=STANDBY,1=HH,2=TC,3=TK,4=DODGE)', unit="", scale=1.0, offset=0.0), "FREE_HEAP_KB": Reg(addr=7, name="FREE_HEAP_KB", desc='Current free heap, KiB', unit="KiB", scale=1.0, offset=0.0), "MIN_FREE_HEAP_KB": Reg(addr=8, name="MIN_FREE_HEAP_KB", desc='Minimum free heap since boot', unit="KiB", scale=1.0, offset=0.0), + "HWID_MAC_01": Reg(addr=9, name="HWID_MAC_01", desc='Hardware ID bytes [0..1] (MAC eFuse high word)', unit="", scale=1.0, offset=0.0), + "HWID_MAC_23": Reg(addr=10, name="HWID_MAC_23", desc='Hardware ID bytes [2..3] (MAC eFuse mid word)', unit="", scale=1.0, offset=0.0), + "HWID_MAC_45": Reg(addr=11, name="HWID_MAC_45", desc='Hardware ID bytes [4..5] (MAC eFuse low word)', unit="", scale=1.0, offset=0.0), "RUDDER_ANGLE_DEG_X100": Reg(addr=16, name="RUDDER_ANGLE_DEG_X100", desc='Filtered rudder angle, deg * 100 (-3500..+3500)', unit="deg", scale=0.01, offset=0.0), "RUDDER_RAW_ADC": Reg(addr=17, name="RUDDER_RAW_ADC", desc='Raw ADC reading after median filter (0..4095)', unit="counts", scale=1.0, offset=0.0), "RUDDER_VALID": Reg(addr=18, name="RUDDER_VALID", desc='1 if median filter has filled (>=5 samples)', unit="", scale=1.0, offset=0.0), diff --git a/arautopilot/tests/test_adaptive_tuner.py b/arautopilot/tests/test_adaptive_tuner.py new file mode 100644 index 0000000..975eab5 --- /dev/null +++ b/arautopilot/tests/test_adaptive_tuner.py @@ -0,0 +1,157 @@ +"""Tests for AdaptiveTuner -- Sprint 8.""" + +from __future__ import annotations + +import pytest + +from arautopilot.core.adaptive_tuner import AdaptiveTuner +from arautopilot.core.pid_config import PidConfig, PidGains + + +def _make_config( + adaptive_enabled: bool = True, + adaptive_max_deviation_pct: float = 50.0, +) -> PidConfig: + return PidConfig( + inner_loop_base=PidGains(kp=1.0, ki=0.1, kd=0.05), + outer_loop_base=PidGains(kp=2.0, ki=0.2, kd=0.1), + adaptive_enabled=adaptive_enabled, + adaptive_max_deviation_pct=adaptive_max_deviation_pct, + ) + + +def _make_tuner( + kp: float = 2.0, + ki: float = 0.4, + adaptive_max_deviation_pct: float = 50.0, + window_steps: int = 10, + step_pct: float = 0.1, +) -> AdaptiveTuner: + config = PidConfig( + inner_loop_base=PidGains(kp=1.0, ki=0.1, kd=0.05), + outer_loop_base=PidGains(kp=kp, ki=ki, kd=0.1), + adaptive_enabled=True, + adaptive_max_deviation_pct=adaptive_max_deviation_pct, + ) + base = PidGains(kp=kp, ki=ki, kd=0.1) + return AdaptiveTuner(config=config, base_gains=base, window_steps=window_steps, step_pct=step_pct) + + +class TestAdaptiveDisabled: + def test_returns_none_when_disabled(self): + config = _make_config(adaptive_enabled=False) + tuner = AdaptiveTuner( + config=config, + base_gains=PidGains(kp=2.0, ki=0.2, kd=0.1), + ) + for _ in range(200): + result = tuner.step(5.0) + assert result is None + + +class TestBufferFill: + def test_returns_none_before_buffer_full(self): + tuner = _make_tuner(window_steps=20) + for i in range(19): + assert tuner.step(5.0) is None + + def test_may_adapt_once_buffer_is_full(self): + tuner = _make_tuner(window_steps=10, step_pct=0.1) + result = None + for _ in range(10): + result = tuner.step(5.0) # large persistent error → increase Kp + # Should have triggered an adaptation on the 10th step + assert result is not None + + +class TestOscillationDetection: + def test_oscillating_signal_reduces_kp(self): + tuner = _make_tuner(window_steps=10, kp=2.0, step_pct=0.1) + original_kp = tuner._current_kp + # Fill buffer with alternating signs (100% sign flips) + result = None + for i in range(10): + err = 3.0 if i % 2 == 0 else -3.0 + result = tuner.step(err) + # After oscillation detection, Kp should be reduced + if result is not None: + assert result.kp < original_kp + + def test_steady_error_increases_kp(self): + tuner = _make_tuner(window_steps=10, kp=2.0, step_pct=0.1, ki=0.2) + original_kp = tuner._current_kp + result = None + for _ in range(10): + result = tuner.step(5.0) # sustained positive error + assert result is not None + assert result.kp > original_kp + + def test_small_error_within_deadband_no_adapt(self): + tuner = _make_tuner(window_steps=10) + tuner.dead_band_deg = 2.0 + result = None + for _ in range(10): + result = tuner.step(0.5) # within dead band + assert result is None + + +class TestBoundsClamping: + def test_kp_never_exceeds_upper_bound(self): + tuner = _make_tuner(kp=2.0, adaptive_max_deviation_pct=20.0, window_steps=5, step_pct=0.5) + for _ in range(200): + tuner.step(10.0) # large error, keep trying to increase + assert tuner._current_kp <= 2.0 * 1.20 + 1e-9 + + def test_kp_never_below_lower_bound(self): + tuner = _make_tuner(kp=2.0, adaptive_max_deviation_pct=20.0, window_steps=5, step_pct=0.5) + for i in range(200): + err = 5.0 if i % 2 == 0 else -5.0 + assert tuner._current_kp >= 2.0 * 0.80 - 1e-9 + + def test_ki_stays_within_bound(self): + tuner = _make_tuner(kp=2.0, ki=0.4, adaptive_max_deviation_pct=30.0, window_steps=5, step_pct=0.5) + for _ in range(200): + tuner.step(10.0) + assert tuner._current_ki <= 0.4 * 1.30 + 1e-9 + + def test_kd_unchanged_after_adaptation(self): + tuner = _make_tuner(window_steps=10, step_pct=0.1) + original_kd = tuner._current_kd + for _ in range(50): + tuner.step(5.0) + assert tuner._current_kd == pytest.approx(original_kd) + + +class TestKiRatioPreservation: + def test_ki_kp_ratio_preserved_after_adapt(self): + tuner = _make_tuner(kp=2.0, ki=0.4, window_steps=10, step_pct=0.1) + base_ratio = 0.4 / 2.0 + result = None + for _ in range(10): + result = tuner.step(5.0) + if result is not None: + new_ratio = result.ki / result.kp + assert new_ratio == pytest.approx(base_ratio, rel=0.01) + + +class TestReset: + def test_reset_restores_base_gains(self): + tuner = _make_tuner(kp=2.0, ki=0.4, window_steps=10, step_pct=0.1) + for _ in range(10): + tuner.step(5.0) + tuner.reset() + assert tuner._current_kp == pytest.approx(2.0) + assert tuner._current_ki == pytest.approx(0.4) + + def test_reset_clears_buffer(self): + tuner = _make_tuner(window_steps=10) + for _ in range(8): + tuner.step(5.0) + tuner.reset() + assert len(tuner._error_buffer) == 0 + + def test_current_gains_property(self): + tuner = _make_tuner(kp=2.0, ki=0.4) + g = tuner.current_gains + assert g.kp == pytest.approx(2.0) + assert g.ki == pytest.approx(0.4) diff --git a/arautopilot/tests/test_audit_signing.py b/arautopilot/tests/test_audit_signing.py new file mode 100644 index 0000000..db408e1 --- /dev/null +++ b/arautopilot/tests/test_audit_signing.py @@ -0,0 +1,180 @@ +"""Tests for SHA-256 hash-chain audit signing -- Sprint 8.""" + +from __future__ import annotations + +import json +import tempfile +from pathlib import Path + +import pytest + +from arautopilot.core.audit import ( + GENESIS_HASH, + AuditEvent, + AuditLog, + AuditOutcome, +) + + +def _make_event(**kwargs) -> AuditEvent: + defaults = dict(action="test_action", outcome=AuditOutcome.SUCCESS) + defaults.update(kwargs) + return AuditEvent(**defaults) + + +class TestGenesisHash: + def test_genesis_hash_is_64_hex_chars(self): + assert len(GENESIS_HASH) == 64 + assert all(c == "0" for c in GENESIS_HASH) + + def test_compute_hash_returns_64_char_hex(self): + h = AuditEvent._compute_hash(GENESIS_HASH, "payload") + assert len(h) == 64 + assert all(c in "0123456789abcdef" for c in h) + + +class TestHashChainAppend: + def test_first_event_prev_hash_is_genesis(self, tmp_path): + log = AuditLog(tmp_path / "audit.jsonl") + log.append(_make_event(action="first")) + events = log.read_all() + assert events[0].prev_hash == GENESIS_HASH + + def test_second_event_prev_hash_links_to_first(self, tmp_path): + log = AuditLog(tmp_path / "audit.jsonl") + log.append(_make_event(action="first")) + log.append(_make_event(action="second")) + events = log.read_all() + assert events[1].prev_hash == events[0].line_hash + + def test_line_hash_is_deterministic(self, tmp_path): + log = AuditLog(tmp_path / "audit.jsonl") + event = _make_event(action="deterministic") + log.append(event) + events = log.read_all() + e = events[0] + # Recompute manually + payload_dict = e.model_dump(mode="json") + payload_dict.pop("prev_hash") + payload_dict.pop("line_hash") + canonical = json.dumps(payload_dict, ensure_ascii=False, sort_keys=True) + expected = AuditEvent._compute_hash(GENESIS_HASH, canonical) + assert e.line_hash == expected + + def test_chain_links_across_multiple_events(self, tmp_path): + log = AuditLog(tmp_path / "audit.jsonl") + for i in range(5): + log.append(_make_event(action=f"event_{i}")) + events = log.read_all() + assert events[0].prev_hash == GENESIS_HASH + for i in range(1, 5): + assert events[i].prev_hash == events[i - 1].line_hash + + def test_chain_continues_after_reload(self, tmp_path): + p = tmp_path / "audit.jsonl" + log1 = AuditLog(p) + log1.append(_make_event(action="first")) + first_hash = log1._last_line_hash + + log2 = AuditLog(p) # reload + log2.append(_make_event(action="second")) + events = log2.read_all() + assert events[1].prev_hash == first_hash + + +class TestVerifyChain: + def test_empty_log_verifies_ok(self, tmp_path): + log = AuditLog(tmp_path / "audit.jsonl") + ok, reason = log.verify_chain() + assert ok + assert reason == "ok" + + def test_valid_chain_verifies_ok(self, tmp_path): + log = AuditLog(tmp_path / "audit.jsonl") + for i in range(10): + log.append(_make_event(action=f"ev_{i}")) + ok, reason = log.verify_chain() + assert ok, reason + + def test_tampered_content_detected(self, tmp_path): + p = tmp_path / "audit.jsonl" + log = AuditLog(p) + log.append(_make_event(action="before_tamper")) + log.append(_make_event(action="after_tamper")) + + # Tamper the first line: change action field in the raw JSON + lines = p.read_text(encoding="utf-8").splitlines() + data = json.loads(lines[0]) + data["action"] = "TAMPERED" + lines[0] = json.dumps(data) + p.write_text("\n".join(lines) + "\n", encoding="utf-8") + + log2 = AuditLog(p) + ok, reason = log2.verify_chain() + assert not ok + assert "tampered" in reason.lower() or "mismatch" in reason.lower() + + def test_deleted_line_detected(self, tmp_path): + p = tmp_path / "audit.jsonl" + log = AuditLog(p) + for i in range(3): + log.append(_make_event(action=f"ev_{i}")) + + # Remove the second line + lines = [l for l in p.read_text(encoding="utf-8").splitlines() if l.strip()] + lines.pop(1) + p.write_text("\n".join(lines) + "\n", encoding="utf-8") + + log2 = AuditLog(p) + ok, reason = log2.verify_chain() + assert not ok + + def test_inserted_line_detected(self, tmp_path): + p = tmp_path / "audit.jsonl" + log = AuditLog(p) + log.append(_make_event(action="first")) + log.append(_make_event(action="last")) + + # Insert a fake line between them (with wrong prev_hash) + lines = p.read_text(encoding="utf-8").splitlines() + fake = json.loads(lines[0]) + fake["action"] = "injected" + fake["prev_hash"] = "a" * 64 + fake["line_hash"] = "b" * 64 + lines.insert(1, json.dumps(fake)) + p.write_text("\n".join(lines) + "\n", encoding="utf-8") + + log2 = AuditLog(p) + ok, reason = log2.verify_chain() + assert not ok + + +class TestHashChainIsolation: + def test_hash_fields_excluded_from_payload(self, tmp_path): + log = AuditLog(tmp_path / "audit.jsonl") + log.append(_make_event(action="isolated")) + events = log.read_all() + e = events[0] + # The hash must NOT depend on the hash fields themselves (circular). + # Recompute without hash fields and confirm it matches. + payload_dict = e.model_dump(mode="json") + payload_dict.pop("prev_hash") + payload_dict.pop("line_hash") + canonical = json.dumps(payload_dict, ensure_ascii=False, sort_keys=True) + expected = AuditEvent._compute_hash(e.prev_hash, canonical) + assert e.line_hash == expected + + def test_extra_field_change_breaks_chain(self, tmp_path): + p = tmp_path / "audit.jsonl" + log = AuditLog(p) + log.append(_make_event(action="good", extra={"key": "value"})) + + lines = p.read_text(encoding="utf-8").splitlines() + data = json.loads(lines[0]) + data["extra"]["key"] = "EVIL" + lines[0] = json.dumps(data) + p.write_text("\n".join(lines) + "\n", encoding="utf-8") + + log2 = AuditLog(p) + ok, _ = log2.verify_chain() + assert not ok diff --git a/arautopilot/tests/test_heading_ekf.py b/arautopilot/tests/test_heading_ekf.py new file mode 100644 index 0000000..da83ff3 --- /dev/null +++ b/arautopilot/tests/test_heading_ekf.py @@ -0,0 +1,142 @@ +"""Tests for 2-state Heading EKF -- Sprint 8.""" + +from __future__ import annotations + +import math + +import pytest + +from arautopilot.core.heading_ekf import HeadingEKF, _shortest_arc + + +class TestShortestArc: + def test_zero_difference(self): + assert _shortest_arc(90.0, 90.0) == pytest.approx(0.0) + + def test_positive_difference(self): + assert _shortest_arc(100.0, 90.0) == pytest.approx(10.0) + + def test_negative_difference(self): + assert _shortest_arc(80.0, 90.0) == pytest.approx(-10.0) + + def test_wrap_around_positive(self): + # 350 → 10 is +20 degrees (going through north) + assert _shortest_arc(10.0, 350.0) == pytest.approx(20.0) + + def test_wrap_around_negative(self): + # 10 → 350 is -20 degrees + assert _shortest_arc(350.0, 10.0) == pytest.approx(-20.0) + + def test_exactly_180_degrees(self): + diff = _shortest_arc(270.0, 90.0) + assert abs(diff) == pytest.approx(180.0) + + +class TestPredict: + def test_heading_advances_by_rot_times_dt(self): + ekf = HeadingEKF(heading_deg=0.0, rot_dps=10.0) + ekf.predict(dt_s=1.0) + assert ekf.heading_deg == pytest.approx(10.0) + + def test_heading_wraps_at_360(self): + ekf = HeadingEKF(heading_deg=355.0, rot_dps=10.0) + ekf.predict(dt_s=1.0) + assert ekf.heading_deg == pytest.approx(5.0) + + def test_zero_rot_heading_unchanged(self): + ekf = HeadingEKF(heading_deg=45.0, rot_dps=0.0) + ekf.predict(dt_s=1.0) + assert ekf.heading_deg == pytest.approx(45.0) + + def test_covariance_grows_with_predict(self): + ekf = HeadingEKF() + p00_before = ekf._P[0] + ekf.predict(dt_s=0.1) + assert ekf._P[0] > p00_before + + def test_predict_symmetry_p01_p10(self): + ekf = HeadingEKF() + for _ in range(5): + ekf.predict(dt_s=0.1) + assert ekf._P[1] == pytest.approx(ekf._P[2]) + + +class TestUpdateHeading: + def test_state_moves_toward_measurement(self): + ekf = HeadingEKF(heading_deg=0.0) + ekf.update_heading(10.0) + assert 0.0 < ekf.heading_deg < 10.0 + + def test_exact_measurement_moves_fully_when_p_large(self): + # With very large P and very small noise, Kalman gain → 1 + ekf = HeadingEKF(heading_deg=0.0, _P=[1e6, 0.0, 0.0, 1e6]) + ekf.update_heading(90.0, noise_deg=0.001) + assert ekf.heading_deg == pytest.approx(90.0, abs=0.1) + + def test_covariance_shrinks_after_update(self): + ekf = HeadingEKF() + ekf._P = [100.0, 0.0, 0.0, 100.0] + p00_before = ekf._P[0] + ekf.update_heading(10.0, noise_deg=2.0) + assert ekf._P[0] < p00_before + + def test_wrap_around_innov(self): + ekf = HeadingEKF(heading_deg=355.0) + ekf._P = [1e6, 0.0, 0.0, 1e6] + ekf.update_heading(5.0, noise_deg=0.001) + # Should go toward 5.0 (via shortest arc +10), not regress + assert ekf.heading_deg == pytest.approx(5.0, abs=0.5) + + +class TestUpdateRot: + def test_state_moves_toward_rot_measurement(self): + ekf = HeadingEKF(rot_dps=0.0) + ekf.update_rot(5.0) + assert 0.0 < ekf.rot_dps < 5.0 + + def test_covariance_p11_shrinks_after_rot_update(self): + ekf = HeadingEKF() + ekf._P = [100.0, 0.0, 0.0, 100.0] + p11_before = ekf._P[3] + ekf.update_rot(2.0, noise_dps=1.0) + assert ekf._P[3] < p11_before + + +class TestCovarianceConvergence: + def test_covariance_converges_with_repeated_updates(self): + ekf = HeadingEKF( + process_noise_heading=0.01, + process_noise_rot=0.1, + _P=[100.0, 0.0, 0.0, 100.0], + ) + # 50 predict+update cycles + for _ in range(50): + ekf.predict(dt_s=0.1) + ekf.update_heading(ekf.heading_deg, noise_deg=2.0) + ekf.update_rot(ekf.rot_dps, noise_dps=1.0) + + # Covariance should have converged to steady-state (not 100 anymore) + assert ekf._P[0] < 50.0 + assert ekf._P[3] < 50.0 + + def test_filter_tracks_constant_heading(self): + true_heading = 135.0 + ekf = HeadingEKF(heading_deg=0.0) + for _ in range(100): + ekf.predict(dt_s=0.1) + ekf.update_heading(true_heading, noise_deg=2.0) + assert ekf.heading_deg == pytest.approx(true_heading, abs=2.0) + + def test_filter_tracks_constant_rot(self): + ekf = HeadingEKF(heading_deg=0.0, rot_dps=0.0) + true_rot = 3.0 + for _ in range(100): + ekf.predict(dt_s=0.1) + ekf.update_rot(true_rot, noise_dps=0.5) + assert ekf.rot_dps == pytest.approx(true_rot, abs=0.5) + + def test_covariance_property_returns_tuple(self): + ekf = HeadingEKF() + cov = ekf.covariance + assert len(cov) == 4 + assert all(isinstance(v, float) for v in cov) diff --git a/arautopilot/tests/test_hwid.py b/arautopilot/tests/test_hwid.py new file mode 100644 index 0000000..18aa025 --- /dev/null +++ b/arautopilot/tests/test_hwid.py @@ -0,0 +1,144 @@ +"""Tests for HWID activation token -- Sprint 8.""" + +from __future__ import annotations + +import hashlib +import hmac +import os + +import pytest + +from arautopilot.core.hwid import ( + STUB_SECRET_KEY, + TOKEN_BYTES, + format_hwid, + generate_token, + hwid_from_mac_words, + verify_token, +) + + +SAMPLE_HWID = "aabbccddeeff" # 12-char lower-case hex + + +class TestHwidFromMacWords: + def test_known_bytes(self): + # mac01=0xAABB, mac23=0xCCDD, mac45=0xEEFF → "aabbccddeeff" + result = hwid_from_mac_words(0xAABB, 0xCCDD, 0xEEFF) + assert result == "aabbccddeeff" + + def test_zero_mac(self): + result = hwid_from_mac_words(0, 0, 0) + assert result == "000000000000" + + def test_all_ones(self): + result = hwid_from_mac_words(0xFFFF, 0xFFFF, 0xFFFF) + assert result == "ffffffffffff" + + def test_returns_lowercase(self): + result = hwid_from_mac_words(0xAABB, 0xCCDD, 0xEEFF) + assert result == result.lower() + + def test_result_is_12_chars(self): + result = hwid_from_mac_words(0x0102, 0x0304, 0x0506) + assert len(result) == 12 + + def test_byte_order(self): + # 0x1234 → bytes [0x12, 0x34] + result = hwid_from_mac_words(0x1234, 0x0000, 0x0000) + assert result[:4] == "1234" + + +class TestGenerateToken: + def test_returns_32_hex_chars(self): + token = generate_token(SAMPLE_HWID) + assert len(token) == TOKEN_BYTES * 2 # 32 + assert all(c in "0123456789abcdef" for c in token.lower()) + + def test_deterministic_with_stub_key(self, monkeypatch): + monkeypatch.delenv("AR_ACTIVATION_KEY", raising=False) + t1 = generate_token(SAMPLE_HWID) + t2 = generate_token(SAMPLE_HWID) + assert t1 == t2 + + def test_different_hwid_different_token(self, monkeypatch): + monkeypatch.delenv("AR_ACTIVATION_KEY", raising=False) + t1 = generate_token("aabbccddeeff") + t2 = generate_token("112233445566") + assert t1 != t2 + + def test_uses_env_key_when_set(self, monkeypatch): + monkeypatch.setenv("AR_ACTIVATION_KEY", "test-production-key") + prod_token = generate_token(SAMPLE_HWID) + monkeypatch.delenv("AR_ACTIVATION_KEY") + stub_token = generate_token(SAMPLE_HWID) + assert prod_token != stub_token + + def test_case_insensitive_hwid(self, monkeypatch): + monkeypatch.delenv("AR_ACTIVATION_KEY", raising=False) + t_lower = generate_token("aabbccddeeff") + t_upper = generate_token("AABBCCDDEEFF") + assert t_lower == t_upper + + def test_matches_manual_hmac(self, monkeypatch): + monkeypatch.delenv("AR_ACTIVATION_KEY", raising=False) + expected = hmac.new( + STUB_SECRET_KEY, SAMPLE_HWID.encode(), hashlib.sha256 + ).hexdigest()[:TOKEN_BYTES * 2] + assert generate_token(SAMPLE_HWID) == expected + + +class TestVerifyToken: + def test_valid_token_returns_true(self, monkeypatch): + monkeypatch.delenv("AR_ACTIVATION_KEY", raising=False) + token = generate_token(SAMPLE_HWID) + assert verify_token(SAMPLE_HWID, token) is True + + def test_wrong_token_returns_false(self, monkeypatch): + monkeypatch.delenv("AR_ACTIVATION_KEY", raising=False) + assert verify_token(SAMPLE_HWID, "a" * 32) is False + + def test_wrong_hwid_returns_false(self, monkeypatch): + monkeypatch.delenv("AR_ACTIVATION_KEY", raising=False) + token = generate_token(SAMPLE_HWID) + assert verify_token("000000000000", token) is False + + def test_case_insensitive_token_comparison(self, monkeypatch): + monkeypatch.delenv("AR_ACTIVATION_KEY", raising=False) + token = generate_token(SAMPLE_HWID) + assert verify_token(SAMPLE_HWID, token.upper()) is True + + def test_constant_time_compare(self, monkeypatch): + """verify_token must use hmac.compare_digest (checked by smoke-test, not timing).""" + monkeypatch.delenv("AR_ACTIVATION_KEY", raising=False) + token = generate_token(SAMPLE_HWID) + # Calling with correct and incorrect tokens both return without exception + assert verify_token(SAMPLE_HWID, token) is True + assert verify_token(SAMPLE_HWID, "x" * 32) is False + + +class TestFormatHwid: + def test_formats_correctly(self): + result = format_hwid("aabbccddeeff") + assert result == "AA:BB:CC:DD:EE:FF" + + def test_uppercase_output(self): + result = format_hwid("aabbccddeeff") + assert result == result.upper().replace("X", ":") # colons preserved + assert result == "AA:BB:CC:DD:EE:FF" + + def test_colon_separated_6_groups(self): + result = format_hwid("112233445566") + parts = result.split(":") + assert len(parts) == 6 + assert all(len(p) == 2 for p in parts) + + def test_invalid_length_raises(self): + with pytest.raises(ValueError, match="12 hex chars"): + format_hwid("aabb") + + def test_all_zeros(self): + assert format_hwid("000000000000") == "00:00:00:00:00:00" + + def test_all_ff(self): + assert format_hwid("ffffffffffff") == "FF:FF:FF:FF:FF:FF" diff --git a/firmware/ar_autopilot_v1/modbus_registers.yaml b/firmware/ar_autopilot_v1/modbus_registers.yaml index ba8d4e6..ad77914 100644 --- a/firmware/ar_autopilot_v1/modbus_registers.yaml +++ b/firmware/ar_autopilot_v1/modbus_registers.yaml @@ -83,6 +83,9 @@ inputs: - { addr: 6, name: CURRENT_MODE, desc: "Current AutopilotMode (0=STANDBY,1=HH,2=TC,3=TK,4=DODGE)", unit: "" } - { addr: 7, name: FREE_HEAP_KB, desc: "Current free heap, KiB", unit: "KiB" } - { addr: 8, name: MIN_FREE_HEAP_KB, desc: "Minimum free heap since boot", unit: "KiB" } + - { addr: 9, name: HWID_MAC_01, desc: "Hardware ID bytes [0..1] (MAC eFuse high word)", unit: "" } + - { addr: 10, name: HWID_MAC_23, desc: "Hardware ID bytes [2..3] (MAC eFuse mid word)", unit: "" } + - { addr: 11, name: HWID_MAC_45, desc: "Hardware ID bytes [4..5] (MAC eFuse low word)", unit: "" } - { addr: 16, name: RUDDER_ANGLE_DEG_X100, desc: "Filtered rudder angle, deg * 100 (-3500..+3500)", unit: "deg", scale: 0.01 } - { addr: 17, name: RUDDER_RAW_ADC, desc: "Raw ADC reading after median filter (0..4095)", unit: "counts" } diff --git a/firmware/ar_autopilot_v1/src/main.cpp b/firmware/ar_autopilot_v1/src/main.cpp index 432dd74..2f0acf6 100644 --- a/firmware/ar_autopilot_v1/src/main.cpp +++ b/firmware/ar_autopilot_v1/src/main.cpp @@ -37,6 +37,7 @@ #include "safety/safety_monitor.h" #include "safety/watchdog.h" #include "system/ar_log.h" +#include "system/hwid.h" #include "system/task_config.h" // Forward declarations of task-spawning helpers (defined in their own .cpp @@ -65,6 +66,19 @@ void setup() { (int)ESP.getChipCores(), ESP.getFreeHeap()); AR_LOGI(TAG, "================================================"); + // Sprint 8: log hardware ID (eFuse MAC) at boot for activation traceability. + { + uint8_t mac[6] = {}; + char mac_str[18] = {}; + if (arautopilot::system::hwid_get_mac(mac)) { + arautopilot::system::hwid_format(mac, mac_str); + AR_LOGI(TAG, " HWID : %s (hash 0x%08X)", + mac_str, arautopilot::system::hwid_hash()); + } else { + AR_LOGW(TAG, " HWID : eFuse read failed"); + } + } + // Initialise the pilot mode state machine (boots in STANDBY). arautopilot::modes::mode_init(); diff --git a/firmware/ar_autopilot_v1/src/protocols/modbus_registers.h b/firmware/ar_autopilot_v1/src/protocols/modbus_registers.h index 5a6c9fe..0e1d451 100644 --- a/firmware/ar_autopilot_v1/src/protocols/modbus_registers.h +++ b/firmware/ar_autopilot_v1/src/protocols/modbus_registers.h @@ -77,7 +77,7 @@ constexpr uint16_t COIL_CMD_ACK_ALL_ALARMS = 2; constexpr uint16_t COIL_CMD_KNOB_ARM = 3; // ----- Input registers (read-only words) ----- -constexpr uint16_t INPUT_COUNT = 36; +constexpr uint16_t INPUT_COUNT = 39; constexpr uint16_t INPUT_MAX_ADDR = 65; // Firmware major version @@ -102,6 +102,12 @@ constexpr uint16_t INPUT_FREE_HEAP_KB = 7; // Minimum free heap since boot // unit=KiB constexpr uint16_t INPUT_MIN_FREE_HEAP_KB = 8; +// Hardware ID bytes [0..1] (MAC eFuse high word) +constexpr uint16_t INPUT_HWID_MAC_01 = 9; +// Hardware ID bytes [2..3] (MAC eFuse mid word) +constexpr uint16_t INPUT_HWID_MAC_23 = 10; +// Hardware ID bytes [4..5] (MAC eFuse low word) +constexpr uint16_t INPUT_HWID_MAC_45 = 11; // Filtered rudder angle, deg * 100 (-3500..+3500) // unit=deg, scale=0.01 constexpr uint16_t INPUT_RUDDER_ANGLE_DEG_X100 = 16; diff --git a/firmware/ar_autopilot_v1/src/protocols/modbus_slave.cpp b/firmware/ar_autopilot_v1/src/protocols/modbus_slave.cpp index 70038a6..c293391 100644 --- a/firmware/ar_autopilot_v1/src/protocols/modbus_slave.cpp +++ b/firmware/ar_autopilot_v1/src/protocols/modbus_slave.cpp @@ -35,6 +35,7 @@ #include "nmea2000_consumer.h" #include "../hal/knob_encoder.h" #include "../safety/safety_monitor.h" +#include "../system/hwid.h" namespace arautopilot::protocols::modbus { @@ -97,6 +98,17 @@ uint16_t read_input_register(uint16_t addr) { case INPUT_FREE_HEAP_KB: return (uint16_t)(ESP.getFreeHeap() / 1024U); case INPUT_MIN_FREE_HEAP_KB: return (uint16_t)(ESP.getMinFreeHeap() / 1024U); + // Sprint 8: Hardware ID (MAC eFuse, 3 × uint16) + case INPUT_HWID_MAC_01: + case INPUT_HWID_MAC_23: + case INPUT_HWID_MAC_45: { + uint8_t mac[6] = {}; + system::hwid_get_mac(mac); + if (addr == INPUT_HWID_MAC_01) return (uint16_t)((mac[0] << 8) | mac[1]); + if (addr == INPUT_HWID_MAC_23) return (uint16_t)((mac[2] << 8) | mac[3]); + return (uint16_t)((mac[4] << 8) | mac[5]); + } + case INPUT_RUDDER_ANGLE_DEG_X100: { auto r = hal::rudder_sensor_latest(); int v = (int)(r.angle_deg * 100.0f); diff --git a/firmware/ar_autopilot_v1/src/system/hwid.cpp b/firmware/ar_autopilot_v1/src/system/hwid.cpp new file mode 100644 index 0000000..6421d5b --- /dev/null +++ b/firmware/ar_autopilot_v1/src/system/hwid.cpp @@ -0,0 +1,35 @@ +// ============================================================================= +// system/hwid.cpp -- Hardware ID from ESP32 eFuse (Sprint 8) +// ============================================================================= + +#include "hwid.h" + +#include +#include +#include +#include + +namespace arautopilot::system { + +bool hwid_get_mac(uint8_t out[6]) { + return esp_efuse_mac_get_default(out) == ESP_OK; +} + +uint32_t hwid_hash() { + uint8_t mac[6] = {}; + hwid_get_mac(mac); + // Simple FNV-32 hash of the 6 bytes. + uint32_t h = 2166136261U; + for (int i = 0; i < 6; ++i) { + h ^= (uint32_t)mac[i]; + h *= 16777619U; + } + return h; +} + +void hwid_format(uint8_t mac[6], char buf[18]) { + snprintf(buf, 18, "%02X:%02X:%02X:%02X:%02X:%02X", + mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]); +} + +} // namespace arautopilot::system diff --git a/firmware/ar_autopilot_v1/src/system/hwid.h b/firmware/ar_autopilot_v1/src/system/hwid.h new file mode 100644 index 0000000..ca2f1ae --- /dev/null +++ b/firmware/ar_autopilot_v1/src/system/hwid.h @@ -0,0 +1,33 @@ +// ============================================================================= +// system/hwid.h -- Hardware ID from ESP32 eFuse (Sprint 8) +// ============================================================================= +// +// The ESP32 has a 6-byte unique MAC burned into eFuse by the manufacturer. +// We use it as a hardware binding token for the activation license. +// +// The HWID is exposed via two Modbus input registers so the Studio can +// read it and generate the activation token offline. +// +// NOTE: The Modbus register map must be extended in Sprint 8+ to include +// INPUT_HWID_HI (addr 9, upper 16 bits) and INPUT_HWID_LO (addr 10, +// lower 16 bits of the middle 2 bytes). Full 6-byte MAC is exposed as +// three uint16 registers: [0..1], [2..3], [4..5]. +// ============================================================================= + +#pragma once + +#include + +namespace arautopilot::system { + +/// Read the ESP32 eFuse MAC into ``out[6]``. +/// Returns true on success; false if the eFuse read fails. +bool hwid_get_mac(uint8_t out[6]); + +/// Return a 32-bit summary hash of the 6-byte MAC (for quick comparisons). +uint32_t hwid_hash(); + +/// Format the MAC as "AA:BB:CC:DD:EE:FF" into ``buf[18]``. +void hwid_format(uint8_t mac[6], char buf[18]); + +} // namespace arautopilot::system