sprint-0: foundations -- data model, seed library, tests, demo

Initial commit. Delivers what the brief calls 'Sprint 0 - Foundations'
(see docs/AR_Autopilot_brief.md section 12):

- Complete repository structure (arautopilot package + firmware, display,
  installer, tools placeholders + docs).
- Core data model (Pydantic v2): modes, alarms, actuator config, PID
  config + gain scheduling, vessel config, knob state machine, project
  config with YAML/JSON serialisation.
- Seed library: 2 actuator profiles (hydraulic & electric DC reversible)
  and 2 default tunings (yacht motor planeo 30 m and 40 m). Conservative
  literature values, NOT the integrator's production tuning IP.
- Firmware skeleton: only src/hal/pinout.h with the 21 I/O contract for
  the AR-NMEA-IO v1.0 board. No drivers, no main loop.
- Studio stubs (real PySide6 app starts in Sprint 4).
- pytest suite (80 tests, all green): modes, alarms, actuator, PID
  (incl. gain interpolation and the +/-50% adaptive bound from brief
  section 6), vessel, knob state, project config, library loader,
  end-to-end roundtrip.
- examples/sprint0_demo.py - the acceptance demo from the brief.

Acceptance criteria met:
- pytest green (80/80)
- demo creates, saves (YAML + JSON), reloads, and verifies a full
  ProjectConfig using the seed library
- repository ready for tag `sprint-0-approved`

See CHANGELOG.md for the detailed scope.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-17 23:57:18 -04:00
commit 700756c16f
54 changed files with 3855 additions and 0 deletions
+88
View File
@@ -0,0 +1,88 @@
"""Core data model for AR-Autopilot.
This module exports the typed, validated Pydantic v2 models used across
the Studio, the firmware build pipeline, and the test bench.
Public surface (Sprint 0):
- :mod:`~arautopilot.core.ids` — typed identifier wrappers
- :mod:`~arautopilot.core.modes` — autopilot operating modes
- :mod:`~arautopilot.core.alarms` — alarm types and severities
- :mod:`~arautopilot.core.actuator_config` — rudder actuator configuration
- :mod:`~arautopilot.core.pid_config` — cascaded PID + gain schedule
- :mod:`~arautopilot.core.vessel_config` — vessel + composed configs
- :mod:`~arautopilot.core.knob_state` — bridge knob arming state machine
- :mod:`~arautopilot.core.project_config` — root project config (root entity)
"""
from arautopilot.core.actuator_config import (
ActuatorConfig,
ActuatorType,
)
from arautopilot.core.alarms import (
Alarm,
AlarmSeverity,
AlarmType,
)
from arautopilot.core.ids import (
ProjectId,
VesselId,
new_project_id,
new_vessel_id,
)
from arautopilot.core.knob_state import (
KnobFunction,
KnobMode,
KnobState,
)
from arautopilot.core.modes import (
AutopilotMode,
is_available_in_phase,
)
from arautopilot.core.pid_config import (
AccessLevel,
GainSchedulePoint,
PidConfig,
PidGains,
interpolate_gains,
)
from arautopilot.core.project_config import (
ProjectConfig,
)
from arautopilot.core.vessel_config import (
VesselConfig,
VesselType,
)
__all__ = [
# ids
"ProjectId",
"VesselId",
"new_project_id",
"new_vessel_id",
# modes
"AutopilotMode",
"is_available_in_phase",
# alarms
"AlarmType",
"AlarmSeverity",
"Alarm",
# actuator
"ActuatorType",
"ActuatorConfig",
# pid
"AccessLevel",
"PidGains",
"GainSchedulePoint",
"PidConfig",
"interpolate_gains",
# vessel
"VesselType",
"VesselConfig",
# knob
"KnobMode",
"KnobFunction",
"KnobState",
# project
"ProjectConfig",
]
+140
View File
@@ -0,0 +1,140 @@
"""Rudder actuator configuration.
Supports the actuator families enumerated in section 3 of the brief.
Sterndrive, IPS and Zeus interfaces are reserved for Phase 3 and are
intentionally **not** enabled in Phase 1 configurations.
"""
from __future__ import annotations
from enum import Enum
from pydantic import BaseModel, ConfigDict, Field, field_validator
class ActuatorType(str, Enum):
"""Actuator family driving the rudder.
Phase-1 supported families have a regular value. Reserved (Phase 3)
families also live here so that configurations and the UI can model
them, but the firmware refuses to operate them in Phase 1.
"""
HYDRAULIC_REVERSIBLE = "hydraulic_reversible"
"""Reversible hydraulic pump (Hynautic, Hypro, Octopus, Vetus, L&S). Phase 1."""
ELECTRIC_DC_REVERSIBLE = "electric_dc_reversible"
"""Reversible DC motor with mechanical end-stops (Lewmar, Simpson Lawrence). Phase 1."""
SERVOMOTOR_FEEDBACK = "servomotor_feedback"
"""Servomotor with built-in position feedback. Phase 1."""
STERNDRIVE_ANALOG = "sterndrive_analog"
"""Analog directional sterndrive. Phase 1."""
VOLVO_IPS = "volvo_ips"
"""Volvo IPS via EVC. Phase 3 — reserved."""
MERCURY_ZEUS = "mercury_zeus"
"""Mercury Zeus via SmartCraft. Phase 3 — reserved."""
_PHASE_1_ACTUATORS: frozenset[ActuatorType] = frozenset(
{
ActuatorType.HYDRAULIC_REVERSIBLE,
ActuatorType.ELECTRIC_DC_REVERSIBLE,
ActuatorType.SERVOMOTOR_FEEDBACK,
ActuatorType.STERNDRIVE_ANALOG,
}
)
def is_phase_1(actuator_type: ActuatorType) -> bool:
"""Return ``True`` iff the firmware is allowed to drive this actuator in Phase 1."""
return actuator_type in _PHASE_1_ACTUATORS
class ActuatorConfig(BaseModel):
"""Configuration of the rudder actuator and its calibration.
All angles are in degrees. ``deadband_pct`` and ``min_useful_pwm_pct``
are percentages of full command. Asymmetry is the ratio of starboard
speed to port speed: ``1.0`` means symmetric, ``1.10`` means the pump
pushes 10 % faster to starboard than to port.
"""
model_config = ConfigDict(extra="forbid", validate_assignment=True)
type: ActuatorType
"""Actuator family."""
name: str = Field(
default="",
max_length=120,
description="Free-form label, e.g. 'Hynautic K-21 + Capilano cylinder'.",
)
# -- Calibration / non-linearity compensation ------------------------------
deadband_pct: float = Field(
default=5.0,
ge=0.0,
le=30.0,
description="Initial percentage of command that produces no motion (static friction).",
)
min_useful_pwm_pct: float = Field(
default=8.0,
ge=0.0,
le=50.0,
description="Minimum PWM where the actuator actually moves; commands below this snap up to it.",
)
asymmetry_stbd_over_port: float = Field(
default=1.0,
ge=0.50,
le=2.00,
description="Ratio of starboard speed to port speed; 1.0 = symmetric.",
)
# -- Mechanical / electrical limits ---------------------------------------
max_rudder_angle_deg: float = Field(
default=35.0,
gt=0.0,
le=45.0,
description="Mechanical limit on either side from amidships; typically 35 deg.",
)
max_rate_dps: float = Field(
default=4.5,
gt=0.0,
le=15.0,
description="Maximum slew rate in degrees per second (typical 3-6 dps).",
)
max_current_a: float = Field(
default=15.0,
gt=0.0,
le=200.0,
description="Overcurrent trip threshold (A) for the actuator power line.",
)
# -- Safety policy --------------------------------------------------------
feedback_required: bool = Field(
default=True,
description=(
"Closed loop only. Open-loop operation requires explicit integrator "
"override (brief section 3, Phase 1). Default True."
),
)
@field_validator("min_useful_pwm_pct")
@classmethod
def _min_useful_must_cover_deadband(cls, v: float, info: object) -> float:
# info.data is provided by pydantic-core for cross-field validation
data = getattr(info, "data", {}) or {}
deadband = data.get("deadband_pct")
if deadband is not None and v < deadband:
raise ValueError(
f"min_useful_pwm_pct ({v}) must be >= deadband_pct ({deadband})"
)
return v
def is_phase_1_supported(self) -> bool:
"""Return ``True`` iff this actuator can be driven in Phase 1."""
return is_phase_1(self.type)
+116
View File
@@ -0,0 +1,116 @@
"""Alarm types, severities, and the ``Alarm`` runtime record.
The catalogue mirrors section 7 of the brief. Each ``AlarmType`` has a
canonical default ``AlarmSeverity`` and a hard-coded flag stating whether
that alarm, when fired, should also trigger automatic disengagement of
the autopilot (return to STANDBY).
"""
from __future__ import annotations
from datetime import datetime, timezone
from enum import Enum
from pydantic import BaseModel, ConfigDict, Field
class AlarmSeverity(str, Enum):
"""IEC-style four-level severity scheme."""
EMERGENCY = "emergency"
"""Immediate danger; loudest annunciation, blocking modal on display."""
HIGH = "high"
"""Operator must act within a few seconds."""
LOW = "low"
"""Warning; degraded operation but no immediate danger."""
INFO = "info"
"""Informational only; ack with one tap."""
class AlarmType(str, Enum):
"""The fixed catalogue of alarms emitted by the firmware.
Adding an entry here is a deliberate, reviewed change — the firmware
state machine and the display strings must be updated in lockstep.
"""
OFF_COURSE = "off_course"
OFF_COURSE_SEVERE = "off_course_severe"
RUDDER_NOT_RESPONDING = "rudder_not_responding"
HEADING_SENSOR_LOST = "heading_sensor_lost"
ACTUATOR_OVERCURRENT = "actuator_overcurrent"
VOLTAGE_LOW = "voltage_low"
LIMIT_SWITCH_REACHED = "limit_switch_reached"
WATCHDOG_TRIPPED = "watchdog_tripped"
VMS_CRITICAL = "vms_critical"
# Canonical metadata per alarm type. Source of truth referenced by both
# the firmware (via code-generated header) and the Studio (via the GUI).
_CATALOGUE: dict[AlarmType, tuple[AlarmSeverity, bool, str]] = {
AlarmType.OFF_COURSE: (AlarmSeverity.LOW, False, "Heading deviates from setpoint beyond threshold"),
AlarmType.OFF_COURSE_SEVERE: (AlarmSeverity.EMERGENCY, True, "Heading deviation >30 deg for >5 s — auto-disengage"),
AlarmType.RUDDER_NOT_RESPONDING:(AlarmSeverity.EMERGENCY, True, "Rudder command sent but no feedback motion — auto-disengage"),
AlarmType.HEADING_SENSOR_LOST: (AlarmSeverity.EMERGENCY, True, "NMEA 2000 PGN 127250 not received for >5 s — auto-disengage"),
AlarmType.ACTUATOR_OVERCURRENT: (AlarmSeverity.HIGH, True, "Actuator current exceeded configured limit"),
AlarmType.VOLTAGE_LOW: (AlarmSeverity.HIGH, True, "Supply voltage below safe operating threshold"),
AlarmType.LIMIT_SWITCH_REACHED: (AlarmSeverity.LOW, False, "Rudder reached mechanical end-stop"),
AlarmType.WATCHDOG_TRIPPED: (AlarmSeverity.EMERGENCY, True, "Firmware watchdog fired — controller reset to STANDBY"),
AlarmType.VMS_CRITICAL: (AlarmSeverity.EMERGENCY, True, "VMS reported blackout or critical electrical overload"),
}
def default_severity(alarm_type: AlarmType) -> AlarmSeverity:
"""Return the canonical severity for ``alarm_type``."""
return _CATALOGUE[alarm_type][0]
def triggers_auto_disengage(alarm_type: AlarmType) -> bool:
"""Return ``True`` iff this alarm forces the pilot into STANDBY."""
return _CATALOGUE[alarm_type][1]
def default_message(alarm_type: AlarmType) -> str:
"""Return the canonical default human-readable description."""
return _CATALOGUE[alarm_type][2]
class Alarm(BaseModel):
"""A single alarm event raised by the firmware at runtime.
Persisted to the immutable audit log (brief section 14, rule #14).
"""
model_config = ConfigDict(frozen=True, extra="forbid")
type: AlarmType
severity: AlarmSeverity
message: str = Field(min_length=1, max_length=240)
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
source: str = Field(
default="firmware",
description="Subsystem that raised the alarm (e.g. 'firmware', 'display', 'vms').",
max_length=64,
)
acknowledged: bool = False
auto_disengage_triggered: bool = False
@classmethod
def from_type(
cls,
alarm_type: AlarmType,
*,
message: str | None = None,
source: str = "firmware",
) -> "Alarm":
"""Convenience constructor using catalogue defaults."""
return cls(
type=alarm_type,
severity=default_severity(alarm_type),
message=message or default_message(alarm_type),
source=source,
auto_disengage_triggered=triggers_auto_disengage(alarm_type),
)
+27
View File
@@ -0,0 +1,27 @@
"""Typed identifier wrappers.
Pydantic v2 ``Annotated[str, ...]`` aliases that prevent accidentally
passing a ``VesselId`` where a ``ProjectId`` is expected (mypy-enforced).
At runtime they are plain strings — UUID v4 hex by default.
"""
from __future__ import annotations
import uuid
from typing import NewType
ProjectId = NewType("ProjectId", str)
"""Unique identifier of a customer project (one vessel = one project)."""
VesselId = NewType("VesselId", str)
"""Unique identifier of a vessel within a project."""
def new_project_id() -> ProjectId:
"""Generate a fresh, random ``ProjectId`` (UUID v4 hex)."""
return ProjectId(uuid.uuid4().hex)
def new_vessel_id() -> VesselId:
"""Generate a fresh, random ``VesselId`` (UUID v4 hex)."""
return VesselId(uuid.uuid4().hex)
+162
View File
@@ -0,0 +1,162 @@
"""Bridge rotary knob — armed-by-software state machine.
Section 5 of the brief: the knob never controls anything by default.
The operator must press it, pick a function, and then turn. Idle activity
auto-disarms after a configurable timeout (default 30 s).
The state machine modelled here is the **logical** one consumed by the
Display and by the controller; the actual encoder quadrature decoding
happens in the ESP32 firmware (Sprint 7).
"""
from __future__ import annotations
from datetime import datetime, timezone
from enum import Enum
from pydantic import BaseModel, ConfigDict, Field, model_validator
class KnobMode(str, Enum):
"""High-level state of the bridge knob."""
LIBRE = "libre"
"""Idle: rotation does nothing. Default."""
ARMADO = "armado"
"""Function selected; rotation now produces a pending value."""
CONFIRMANDO = "confirmando"
"""Pending value being shown for operator confirmation."""
class KnobFunction(str, Enum):
"""The set of values the knob may target once armed."""
NONE = "none"
"""No function selected (LIBRE only)."""
RUMBO = "rumbo"
"""Heading setpoint."""
GANANCIA_P = "ganancia_p"
"""Proportional gain (Technician+ only)."""
VELOCIDAD_GIRO = "velocidad_giro"
"""Desired turn rate."""
BRILLO = "brillo"
"""Display brightness."""
VOLUMEN = "volumen"
"""Alarm volume."""
DEFAULT_ARMED_TIMEOUT_S: float = 30.0
"""Default auto-disarm timeout in seconds (brief section 5)."""
class KnobState(BaseModel):
"""Immutable snapshot of the knob state at a point in time."""
model_config = ConfigDict(frozen=True, extra="forbid")
mode: KnobMode = KnobMode.LIBRE
function: KnobFunction = KnobFunction.NONE
current_value: float | None = Field(
default=None,
description="Last confirmed value of the selected function.",
)
pending_value: float | None = Field(
default=None,
description="Value being adjusted but not yet confirmed.",
)
armed_at: datetime | None = Field(
default=None,
description="UTC timestamp when the knob entered ARMADO mode.",
)
timeout_remaining_s: float = Field(
default=0.0,
ge=0.0,
description="Seconds remaining before auto-disarm.",
)
@model_validator(mode="after")
def _consistency(self) -> "KnobState":
if self.mode == KnobMode.LIBRE:
if self.function != KnobFunction.NONE:
raise ValueError("LIBRE mode requires function == NONE")
if self.pending_value is not None:
raise ValueError("LIBRE mode forbids pending_value")
if self.armed_at is not None:
raise ValueError("LIBRE mode forbids armed_at")
if self.timeout_remaining_s != 0.0:
raise ValueError("LIBRE mode requires timeout_remaining_s == 0")
else:
if self.function == KnobFunction.NONE:
raise ValueError(f"{self.mode.value} mode requires a non-NONE function")
if self.armed_at is None:
raise ValueError(f"{self.mode.value} mode requires armed_at to be set")
if self.mode == KnobMode.CONFIRMANDO and self.pending_value is None:
raise ValueError("CONFIRMANDO mode requires pending_value to be set")
return self
# --- Pure transition helpers (return new immutable states) --------------
@classmethod
def idle(cls) -> "KnobState":
"""Construct the canonical idle (LIBRE) state."""
return cls()
def arm(
self,
function: KnobFunction,
*,
current_value: float,
timeout_s: float = DEFAULT_ARMED_TIMEOUT_S,
now: datetime | None = None,
) -> "KnobState":
"""Transition LIBRE → ARMADO for the given function."""
if self.mode != KnobMode.LIBRE:
raise ValueError(f"Cannot arm from mode {self.mode.value}")
if function == KnobFunction.NONE:
raise ValueError("Cannot arm with KnobFunction.NONE")
return KnobState(
mode=KnobMode.ARMADO,
function=function,
current_value=current_value,
pending_value=None,
armed_at=now or datetime.now(timezone.utc),
timeout_remaining_s=timeout_s,
)
def propose(self, value: float, *, timeout_s: float = DEFAULT_ARMED_TIMEOUT_S) -> "KnobState":
"""Operator turned the knob: stage a pending value (ARMADO → CONFIRMANDO)."""
if self.mode not in (KnobMode.ARMADO, KnobMode.CONFIRMANDO):
raise ValueError(f"Cannot propose from mode {self.mode.value}")
return KnobState(
mode=KnobMode.CONFIRMANDO,
function=self.function,
current_value=self.current_value,
pending_value=value,
armed_at=self.armed_at,
timeout_remaining_s=timeout_s,
)
def confirm(self, *, timeout_s: float = DEFAULT_ARMED_TIMEOUT_S) -> "KnobState":
"""Operator pressed to confirm; commits ``pending_value`` and stays armed."""
if self.mode != KnobMode.CONFIRMANDO:
raise ValueError(f"Cannot confirm from mode {self.mode.value}")
return KnobState(
mode=KnobMode.ARMADO,
function=self.function,
current_value=self.pending_value,
pending_value=None,
armed_at=self.armed_at,
timeout_remaining_s=timeout_s,
)
def disarm(self) -> "KnobState":
"""Force the knob back to LIBRE (timeout, alarm, mode change, long-press)."""
return KnobState.idle()
+76
View File
@@ -0,0 +1,76 @@
"""Autopilot operating modes.
The brief (section 3) defines five Phase 1 modes plus three Phase 2 sail
modes that appear greyed-out in the UI but are reserved here for forward
compatibility.
"""
from __future__ import annotations
from enum import Enum
class AutopilotMode(str, Enum):
"""The complete set of autopilot operating modes, across both phases.
Use :func:`is_available_in_phase` to query phase gating instead of
hard-coding lists in the UI.
"""
# --- Phase 1 (Sprint 1+) ---
STANDBY = "standby"
"""Pilot disengaged, helm is manual."""
HEADING_HOLD = "heading_hold"
"""Holds a fixed magnetic/true compass heading."""
TRUE_COURSE = "true_course"
"""Holds COG, compensating drift due to current and wind."""
TRACK_KEEPING = "track_keeping"
"""Follows an ECDIS waypoint route with smooth XTE correction."""
DODGE = "dodge"
"""Temporary deviation without losing the route; auto-returns when released."""
# --- Phase 2 (Sprint 10+, greyed in Phase 1 UI) ---
APPARENT_WIND = "apparent_wind"
"""Holds constant apparent wind angle (vane mode). Sailboats only."""
TRUE_WIND = "true_wind"
"""Holds constant true wind angle. Sailboats only."""
AUTO_TACK = "auto_tack"
"""Automatically tacks at a target relative wind angle. Sailboats only."""
_PHASE_1: frozenset[AutopilotMode] = frozenset(
{
AutopilotMode.STANDBY,
AutopilotMode.HEADING_HOLD,
AutopilotMode.TRUE_COURSE,
AutopilotMode.TRACK_KEEPING,
AutopilotMode.DODGE,
}
)
_PHASE_2: frozenset[AutopilotMode] = frozenset(
{
AutopilotMode.APPARENT_WIND,
AutopilotMode.TRUE_WIND,
AutopilotMode.AUTO_TACK,
}
)
def is_available_in_phase(mode: AutopilotMode, phase: int) -> bool:
"""Return ``True`` if ``mode`` is available in the given product phase.
Phase 1 is the launch product. Phase 2 adds the sailboat wind modes.
Asking for any other phase number returns ``False``.
"""
if phase == 1:
return mode in _PHASE_1
if phase == 2:
return mode in _PHASE_1 or mode in _PHASE_2
return False
+197
View File
@@ -0,0 +1,197 @@
"""Cascaded PID configuration + gain scheduling by vessel speed.
Mirrors section 6 of the brief:
- Inner loop (50 Hz) drives rudder position to setpoint.
- Outer loop (10 Hz) drives heading error to a rudder setpoint, with
Rate-of-Turn feed-forward and gain scheduling by SOG.
- Adaptive tuning (Sprint 9+) may shift the active gains by no more than
±50 % of the base gains — this hard limit is enforced here.
"""
from __future__ import annotations
from enum import Enum
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
class AccessLevel(str, Enum):
"""Three-level RBAC for PID tuning (brief section 6)."""
OPERATOR = "operator"
"""Captain: chooses a pre-configured profile only (Soft/Normal/Sport)."""
TECHNICIAN = "technician"
"""Authorised installer (PIN): may shift gains within ±30 % of base."""
INTEGRATOR = "integrator"
"""Alvaro (PIN): full access, may edit base gains and adaptation limits."""
class PidGains(BaseModel):
"""A single set of PID gains."""
model_config = ConfigDict(extra="forbid", validate_assignment=True)
kp: float = Field(ge=0.0, description="Proportional gain.")
ki: float = Field(default=0.0, ge=0.0, description="Integral gain.")
kd: float = Field(default=0.0, ge=0.0, description="Derivative gain.")
class GainSchedulePoint(BaseModel):
"""One point of the SOG-indexed gain schedule for the outer loop."""
model_config = ConfigDict(extra="forbid", validate_assignment=True)
speed_knots: float = Field(ge=0.0, le=80.0, description="SOG at which these gains apply.")
gains: PidGains
class PidConfig(BaseModel):
"""Complete PID configuration for one vessel.
The base gains (``inner_loop_base`` and ``outer_loop_base``) are the
integrator's IP and are NEVER exposed to the operator. They are the
starting point for adaptive tuning and the anchor against which the
adaptive bound (``adaptive_max_deviation_pct``) is enforced.
"""
model_config = ConfigDict(extra="forbid", validate_assignment=True)
schema_version: str = Field(default="0.1.0", pattern=r"^\d+\.\d+\.\d+$")
# --- Inner loop (rudder position controller, 50 Hz) ---------------------
inner_loop_base: PidGains
inner_loop_freq_hz: float = Field(default=50.0, gt=0.0, le=200.0)
# --- Outer loop (heading controller, 10 Hz) -----------------------------
outer_loop_base: PidGains
outer_loop_freq_hz: float = Field(default=10.0, gt=0.0, le=50.0)
# --- Gain scheduling by vessel SOG (outer loop) -------------------------
gain_schedule: list[GainSchedulePoint] = Field(
default_factory=list,
description=(
"Outer-loop gains by SOG. Linear interpolation between points; "
"outside the range, the closest endpoint is held. Empty list "
"means scheduling is disabled and the base gains are used."
),
)
# --- Feed-forward & saturation ------------------------------------------
rot_feedforward_gain: float = Field(
default=0.0,
ge=0.0,
le=10.0,
description="Rate-of-Turn feed-forward gain (brief section 6 — anticipates inertia).",
)
setpoint_deadband_deg: float = Field(
default=0.5,
ge=0.0,
le=5.0,
description="Heading deadband around the setpoint to suppress noise oscillation.",
)
setpoint_rate_limit_dps: float = Field(
default=4.0,
gt=0.0,
le=20.0,
description="Maximum rate of change of the heading setpoint (typical 3-6 dps).",
)
anti_windup_limit: float = Field(
default=10.0,
gt=0.0,
description="Integrator absolute clamp; saturates when the actuator saturates.",
)
# --- Adaptive tuning safety bound (Sprint 9+) ---------------------------
adaptive_enabled: bool = Field(
default=False,
description="Enabled at commissioning by the integrator only.",
)
adaptive_max_deviation_pct: float = Field(
default=50.0,
gt=0.0,
le=50.0,
description=(
"Hard cap on how far adaptive tuning may shift the active gains "
"away from the base gains. Brief section 6: 'never out of ±50 %'."
),
)
# --- Validators ---------------------------------------------------------
@field_validator("gain_schedule")
@classmethod
def _schedule_must_be_ascending(
cls, v: list[GainSchedulePoint]
) -> list[GainSchedulePoint]:
speeds = [p.speed_knots for p in v]
if speeds != sorted(speeds):
raise ValueError("gain_schedule points must be sorted by ascending speed_knots")
if len(set(speeds)) != len(speeds):
raise ValueError("gain_schedule points must have unique speed_knots values")
return v
@model_validator(mode="after")
def _check_loop_frequencies(self) -> "PidConfig":
if self.inner_loop_freq_hz <= self.outer_loop_freq_hz:
raise ValueError(
"inner_loop_freq_hz must be strictly greater than outer_loop_freq_hz "
"(cascaded control requires the inner loop to be faster)"
)
return self
def is_within_adaptive_bound(self, candidate: PidGains, *, of: str = "outer") -> bool:
"""Return ``True`` if ``candidate`` is within the adaptive bound of the base.
``of`` is either ``"inner"`` or ``"outer"``.
"""
base = self.inner_loop_base if of == "inner" else self.outer_loop_base
limit = self.adaptive_max_deviation_pct / 100.0
return all(
_within_pct(getattr(candidate, axis), getattr(base, axis), limit)
for axis in ("kp", "ki", "kd")
)
def _within_pct(candidate: float, base: float, frac: float) -> bool:
"""``True`` iff ``candidate`` lies within ``±frac`` of ``base``.
If ``base`` is exactly zero, we require the candidate to be zero too
(any non-zero gain breaks the ratio test).
"""
if base == 0.0:
return candidate == 0.0
lo = base * (1.0 - frac)
hi = base * (1.0 + frac)
return lo <= candidate <= hi
def interpolate_gains(schedule: list[GainSchedulePoint], speed_knots: float) -> PidGains:
"""Linear interpolation of outer-loop gains over the speed schedule.
Outside the range of the schedule, the nearest endpoint is held
(no extrapolation). Raises ``ValueError`` if the schedule is empty.
"""
if not schedule:
raise ValueError("Cannot interpolate over an empty gain_schedule")
# Endpoint hold
if speed_knots <= schedule[0].speed_knots:
return schedule[0].gains
if speed_knots >= schedule[-1].speed_knots:
return schedule[-1].gains
# Find bracketing points
for i in range(len(schedule) - 1):
a, b = schedule[i], schedule[i + 1]
if a.speed_knots <= speed_knots <= b.speed_knots:
span = b.speed_knots - a.speed_knots
t = 0.0 if span == 0.0 else (speed_knots - a.speed_knots) / span
return PidGains(
kp=a.gains.kp + t * (b.gains.kp - a.gains.kp),
ki=a.gains.ki + t * (b.gains.ki - a.gains.ki),
kd=a.gains.kd + t * (b.gains.kd - a.gains.kd),
)
# Unreachable given the endpoint guards above
raise RuntimeError("Gain schedule interpolation failed unexpectedly")
+103
View File
@@ -0,0 +1,103 @@
"""Root project entity — what a Studio ``.appack`` package contains.
A ``ProjectConfig`` is the unit of work in the Studio: one customer
project = one configured vessel. It serialises to YAML or JSON for
transport into the firmware build pipeline.
"""
from __future__ import annotations
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import yaml
from pydantic import BaseModel, ConfigDict, Field
from arautopilot.core.ids import ProjectId, new_project_id
from arautopilot.core.vessel_config import VesselConfig
from arautopilot.version import __version__
class ProjectConfig(BaseModel):
"""Root configuration object persisted to disk and shipped as ``.appack``."""
model_config = ConfigDict(extra="forbid", validate_assignment=True)
schema_version: str = Field(default="0.1.0", pattern=r"^\d+\.\d+\.\d+$")
"""Project file schema version (independent of package ``__version__``)."""
package_version: str = Field(default=__version__)
"""Version of ``arautopilot`` that produced this file (informational)."""
project_id: ProjectId = Field(default_factory=new_project_id)
client_name: str = Field(min_length=1, max_length=120)
project_name: str = Field(min_length=1, max_length=120)
notes: str = Field(default="", max_length=2000)
vessel: VesselConfig
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
modified_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
# --- Serialisation -------------------------------------------------------
def to_dict(self) -> dict[str, Any]:
"""Return a JSON-ready dict (datetimes as ISO 8601 strings)."""
return self.model_dump(mode="json")
def to_json(self, *, indent: int | None = 2) -> str:
"""Serialise to JSON text."""
return json.dumps(self.to_dict(), indent=indent, ensure_ascii=False)
def to_yaml(self) -> str:
"""Serialise to YAML text (block style, sorted keys disabled to preserve schema order)."""
return yaml.safe_dump(
self.to_dict(),
sort_keys=False,
default_flow_style=False,
allow_unicode=True,
)
def save_yaml(self, path: Path | str) -> Path:
"""Write the project to ``path`` as YAML and return the resolved path."""
p = Path(path)
p.parent.mkdir(parents=True, exist_ok=True)
p.write_text(self.to_yaml(), encoding="utf-8")
return p
def save_json(self, path: Path | str) -> Path:
"""Write the project to ``path`` as JSON and return the resolved path."""
p = Path(path)
p.parent.mkdir(parents=True, exist_ok=True)
p.write_text(self.to_json(), encoding="utf-8")
return p
# --- Deserialisation -----------------------------------------------------
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "ProjectConfig":
return cls.model_validate(data)
@classmethod
def from_json(cls, text: str) -> "ProjectConfig":
return cls.model_validate(json.loads(text))
@classmethod
def from_yaml(cls, text: str) -> "ProjectConfig":
return cls.model_validate(yaml.safe_load(text))
@classmethod
def load(cls, path: Path | str) -> "ProjectConfig":
"""Load from disk; format inferred from the file extension."""
p = Path(path)
text = p.read_text(encoding="utf-8")
suffix = p.suffix.lower()
if suffix in (".yaml", ".yml"):
return cls.from_yaml(text)
if suffix == ".json":
return cls.from_json(text)
raise ValueError(f"Unsupported project file extension: {suffix!r}")
def touch(self) -> "ProjectConfig":
"""Return a copy with ``modified_at`` refreshed to now (UTC)."""
return self.model_copy(update={"modified_at": datetime.now(timezone.utc)})
+62
View File
@@ -0,0 +1,62 @@
"""Per-vessel configuration: identification + actuator + PID.
Composes the lower-level configs into one object that lives at the heart
of every ``ProjectConfig``.
"""
from __future__ import annotations
from enum import Enum
from pydantic import BaseModel, ConfigDict, Field
from arautopilot.core.actuator_config import ActuatorConfig
from arautopilot.core.ids import VesselId, new_vessel_id
from arautopilot.core.pid_config import PidConfig
class VesselType(str, Enum):
"""Vessel classes targeted by Phase 1 of the product (brief section 3)."""
YACHT_MOTOR_PLANEO = "yacht_motor_planeo"
"""Planing motor yacht, 30-40 m."""
YACHT_MOTOR_DESPLAZAMIENTO = "yacht_motor_desplazamiento"
"""Displacement motor yacht, 30-40 m."""
SAILBOAT_MOTOR = "sailboat_motor"
"""Sailing yacht under motor (no sail trim). Phase 1 only."""
FISHING_BOAT = "fishing_boat"
"""Fishing vessel, 30 m class."""
SMALL_FERRY = "small_ferry"
"""Small ferry, 30 m class."""
PATROL_BOAT = "patrol_boat"
"""Coastal patrol boat, 30 m class."""
class VesselConfig(BaseModel):
"""Identification, geometry, and control configuration of one vessel."""
model_config = ConfigDict(extra="forbid", validate_assignment=True)
vessel_id: VesselId = Field(default_factory=new_vessel_id)
name: str = Field(min_length=1, max_length=120)
type: VesselType
length_m: float = Field(gt=0.0, le=200.0, description="Length overall, metres.")
displacement_t: float = Field(
default=0.0,
ge=0.0,
le=10_000.0,
description="Loaded displacement, tonnes. 0 means unknown.",
)
max_speed_kn: float = Field(
gt=0.0,
le=80.0,
description="Maximum design speed over ground, knots.",
)
actuator: ActuatorConfig
pid: PidConfig