Files
alro65 b82ed400bc feat: BNO085 IMU integration — SPICE + simulator yaw rate feed-forward
SPICE (6_bno085_imu.cir):
  - BNO085 power supply with 10uF + 100nF decoupling on VDD
  - Power-on reset RC circuit (R=10K, C=1uF, tau=10ms → deasserts at ~12ms)
  - I2C Fast Mode 400kHz bus: 4.7K pull-ups, 50pF bus capacitance model
  - Full I2C transaction: START + address 0x4A + R/W + BNO085 ACK + STOP
  - INT pin (open-drain, 10K pull-up, 100Hz interrupt simulation)
  - .meas directives: reset timing, SCL rise time, VDD stability

Simulator (esp32_sim.py):
  - SimSnapshot.bno085_yaw_rate_dps field added
  - _bno085_enabled / _bno085_noise_std_dps / _bno085_yaw_rate_dps state
  - enable_bno085(noise_std_dps=0.02) public method
  - disable_bno085() public method
  - _run_physics: samples gyro at 50Hz with Gaussian noise model
  - _run_outer_loop: uses BNO085 yaw rate for rot_ff_term when enabled
    (replaces NMEA-derived ROT — lower latency ~4ms vs ~100-200ms)

Usage:
  sim.enable_bno085()          # activate gyro feed-forward
  sim.enable_bno085(noise_std_dps=0.014)  # with BNO085 spec noise

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-22 22:46:16 -04:00

926 lines
40 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
PROPÓSITO
---------
Simulación software completa del firmware ESP32 de AR-Autopilot.
POR QUÉ EXISTE
--------------
El ESP32 físico necesita CAN-bus (NMEA 2000) para recibir el compass y
RS-485 para recibir comandos Modbus. Sin hardware real ambos canales están
ausentes. Este módulo implementa en Python puro el mismo estado interno y la
misma aritmética que el firmware C++, permitiendo desarrollar y validar el
protocolo de pruebas antes de tener el barco delante.
CÓMO FUNCIONA
-------------
El simulador tiene tres capas en paralelo:
Capa física (50 Hz)
RudderSimulator — PWM → ángulo de timón
VesselHeadingSimulator — timón × velocidad → ROT → rumbo
Cascada PID (outer 10 Hz, inner 50 Hz)
PidOuter — error_rumbo → setpoint_timón
PidInner — error_timón → PWM
Interfaz Modbus (diccionarios Python)
holdings — escribibles por el PC (comandos)
coils — escribibles por el PC (pulsos)
inputs — de solo lectura (telemetría)
discretes — de solo lectura (alarmas / flags)
RELACIONADO
-----------
arautopilot/studio/simulator/pid_outer.py
arautopilot/studio/simulator/pid_inner.py
arautopilot/studio/simulator/vessel_heading.py
arautopilot/studio/simulator/rudder_dynamics.py
arautopilot/shared/modbus_register_map.py
tools/sim_protocol.py
"""
from __future__ import annotations
import math
import random
import sys
from dataclasses import dataclass, field
from enum import IntEnum
from pathlib import Path
# Añadir raíz del proyecto al path para importar arautopilot.*
sys.path.insert(0, str(Path(__file__).parent.parent))
from arautopilot.studio.simulator.pid_inner import PidInner, PidInnerConfig
from arautopilot.studio.simulator.pid_outer import PidOuter, PidOuterConfig
from arautopilot.studio.simulator.rudder_dynamics import (
RudderDynamicsConfig,
RudderSimulator,
)
from arautopilot.studio.simulator.vessel_heading import (
VesselHeadingConfig,
VesselHeadingSimulator,
heading_error_deg,
)
# ---------------------------------------------------------------------------
# Constantes — mirrors de los defines del firmware
# ---------------------------------------------------------------------------
OFF_COURSE_WARN_DEG: float = 15.0
OFF_COURSE_SEVERE_DEG: float = 30.0
HEADING_TIMEOUT_S: float = 5.0
DT_INNER: float = 1.0 / 50.0 # 50 Hz inner loop
DT_OUTER: float = 1.0 / 10.0 # 10 Hz outer loop
INNER_PER_OUTER: int = 5 # inner ticks per outer tick
# ---------------------------------------------------------------------------
# Beaufort scale → wave disturbance parameters
# ---------------------------------------------------------------------------
# Each entry: (wave_torque_amp, swell_torque_amp, noise_torque_std,
# wind_bias_amp, wave_period_s)
# Torques are in °/s² (= yaw angular acceleration).
# Steady-state ROT from torque T: ROT_ss = T / yaw_damping = T / 0.8
# Heading oscillation at wave freq ω (rad/s): A_h ≈ T / (yaw_damping * ω)
# Expected peak heading oscillation for our 30 m yacht at 10 kn:
# B3 ≈ ±1° B4 ≈ ±3° B5 ≈ ±5° B6 ≈ ±8° B7 ≈ ±13°
_BEAUFORT_TABLE: dict[int, tuple[float, float, float, float, float]] = {
# B: wave swell noise wind period_s
0: (0.00, 0.00, 0.00, 0.00, 8.0),
1: (0.10, 0.05, 0.05, 0.05, 7.0),
2: (0.30, 0.10, 0.10, 0.10, 6.5),
3: (0.60, 0.20, 0.20, 0.20, 5.5),
4: (1.50, 0.40, 0.30, 0.40, 6.0),
5: (2.50, 0.70, 0.60, 0.80, 7.0),
6: (3.50, 1.00, 1.00, 1.20, 8.0),
7: (5.50, 1.50, 1.60, 2.00, 9.0),
8: (8.00, 2.00, 2.50, 3.00, 10.0),
}
class AutopilotMode(IntEnum):
"""Modos del autopiloto — idénticos al enum C++ del firmware."""
STANDBY = 0
HEADING_HOLD = 1
TRUE_COURSE = 2
TRACK_KEEPING = 3
DODGE = 4
# ---------------------------------------------------------------------------
# Datos de telemetría
# ---------------------------------------------------------------------------
@dataclass
class SimSnapshot:
"""Una muestra completa del estado del simulador (para análisis y gráficas)."""
t: float
mode: AutopilotMode
heading_deg: float
heading_setpoint_deg: float
heading_error_deg: float
rot_dps: float
sog_kn: float
outer_rudder_sp_deg: float
rudder_angle_deg: float
inner_pwm_pct: float
alarm_heading_lost: bool
alarm_off_course: bool
alarm_off_course_severe: bool
bno085_yaw_rate_dps: float = 0.0 # medición del giróscopo BNO085 (0 si desactivado)
@dataclass
class SimEvent:
"""Evento discreto en la simulación (engage, alarma, etc.)."""
t: float
kind: str # "engage", "disengage", "alarm", "ack"
detail: str = ""
# ---------------------------------------------------------------------------
# Simulador principal
# ---------------------------------------------------------------------------
class ESP32Simulator:
"""
Simulación software del firmware AR-Autopilot ESP32.
Reproduce el mismo comportamiento que el C++:
- Máquina de estados (STANDBY / HEADING_HOLD / DODGE / ...)
- Cascada PID a dos tasas (outer 10 Hz, inner 50 Hz)
- Física del buque (timón → ROT → rumbo)
- Banco de registros Modbus (holdings, coils, inputs, discretes)
Uso rápido::
sim = ESP32Simulator()
sim.inject_nmea(heading_deg=90.0, sog_kn=10.0)
sim.write_holding(0, 1) # MODE_REQUEST = HEADING_HOLD
sim.write_holding(1, 9000) # HEADING_SETPOINT_X100 = 90.00°
sim.write_coil(0, 1) # CMD_ENGAGE_REQUEST
sim.step(5.0) # enganche
sim.write_holding(1, 10000) # nuevo rumbo 100°
sim.step(120.0) # maniobra
print(f"Rumbo final: {sim.heading:.1f}°")
"""
def __init__(
self,
vessel_cfg: VesselHeadingConfig | None = None,
rudder_cfg: RudderDynamicsConfig | None = None,
outer_cfg: PidOuterConfig | None = None,
inner_cfg: PidInnerConfig | None = None,
) -> None:
# -- Física -------------------------------------------------------
# El VesselHeadingSimulator por defecto tiene rudder_response_gain=0.18,
# lo que da ~11 dps a 5° de timón a 10 kn — demasiado sensible para
# los ganadores PID de referencia (kd=1.2, rot_ff_gain=1.5).
# Con gain=0.004: ROT_ss = 0.004*10*35/0.8 = 1.75 dps al máximo timón.
# Esto equivale a un yate de 30 m con radio de giro ~165 m a 10 kn,
# consistente con la hoja de datos del brief y numéricamente estable
# con los coeficientes PID por defecto.
default_vessel = VesselHeadingConfig(rudder_response_gain=0.004)
self._vessel = VesselHeadingSimulator(vessel_cfg or default_vessel)
# En la simulación software eliminamos el deadband y el min_useful del
# actuador físico: no hay bomba hidráulica real, por lo que cualquier
# señal mueve el timón proporcionalmente. Esto evita el efecto bang-bang
# (snap a 12 % para señales pequeñas) que causa sobrepasamiento y
# oscilaciones en torno al setpoint. El hardware real conserva su propio
# deadband (deadband_pct=7 en RudderDynamicsConfig es para hardware).
default_rudder = RudderDynamicsConfig(deadband_pct=0.0, min_useful_pwm_pct=0.0)
self._rudder = RudderSimulator(rudder_cfg or default_rudder)
# -- Controladores PID -------------------------------------------
# En la simulación software el outer PID usa solo P + ROT feed-forward
# (kd=0, ki=0).
#
# kd=0: En firmware kd=1.2, que genera un pico de derivada enorme
# cuando el setpoint cambia de golpe (el error salta de 0° a Δrumbo
# en un solo tick). En hardware ese pico es inofensivo: el actuador
# hidráulico ya está saturado mecánicamente. En simulación dispara el
# anti-windup y lleva el integrador a ~7°, lo que obliga al outer PID
# a pasar 25 s recuperando la demanda. Con kd=0 el pico desaparece.
# El ROT feed-forward (rot_ff_gain=1.5) sigue amortiguando el
# sobrepasamiento de forma anticipada.
#
# ki=0: El integrador del firmware compensa derivas reales: corriente,
# viento, fricción asimétrica. Ninguna de estas perturbaciones existe
# en la simulación. Con ki≠0 el integrador acumula ~2-7° durante la
# maniobra de aproximación y luego tarda 100-500 s en vaciarse, lo
# que genera tiempos de asentamiento de más de 120 s incluso con
# maniobras pequeñas. Con ki=0 el bucle externo es P+FF puro: el
# error de estado estacionario es cero (el modelo es perfecto) y el
# asentamiento ocurre en 20-40 s.
# aw_gain=0.0 desactiva el anti-windup de saturación. Con ki=0 el
# integrador nunca acumula errores útiles, pero la corrección de
# anti-windup SÍ modifica el integrador cuando P+FF satura (maniobras
# grandes como 90° → 180°). Resultado: integrador llega a -35° y
# luego arrastra la salida del outer hacia abajo prematuramente, lo
# que convierte un giro de 90° en un proceso de 245 s en lugar de
# ~60 s. Con aw_gain=0 el integrador permanece en 0 siempre.
#
# deadband_deg=0.0: El deadband del firmware (0.5°) filtra el ruido
# del compass real. En simulación el compass es perfecto (sin ruido),
# por lo que el deadband solo introduce una zona muerta que impide al
# controlador P corregir errores < 0.5°. El resultado es que el error
# de estado estacionario converge a ~0.5° en vez de 0°, lo que hace
# que maniobras de 15° nunca entren en la banda de ±1° dentro del
# ventana de prueba. Con deadband=0: τ_outer ≈ 24 s, una maniobra
# de 15° asienta en ~65 s (< 90 s criterio TC-04) y el retorno de
# DODGE (18°) converge por debajo de 1° en ~70 s (< 120 s TC-07).
default_outer = PidOuterConfig(
base_kd=0.0, base_ki=0.0, aw_gain=0.0, deadband_deg=0.0
)
self._pid_outer = PidOuter(outer_cfg or default_outer)
# En el simulador software el inner PID necesita kp mucho mayor que
# en el firmware (kp=2.5). El firmware fue calibrado asumiendo que
# min_useful_pwm_pct=12 % actúa como "suelo" de señal, dando a la
# bomba hidráulica un mínimo de corriente para arrancar (~0.6 dps
# efectivos). Sin ese suelo el lazo cerrado tiene τ_cl = friction /
# (actuator_gain × kp) = 4 / (0.2 × 2.5) = 8 s — demasiado lento
# para que el outer PID vea una planta dinámica coherente.
# Con kp=20: τ_cl = 4/(0.2×20) = 1 s — rápido y sin bang-bang.
# deadband_pct=0 y min_useful=0 para eliminar la no-linealidad del
# actuador hidráulico que no existe en el modelo matemático.
default_inner = PidInnerConfig(kp=20.0, deadband_pct=0.0, min_useful_pwm_pct=0.0)
self._pid_inner = PidInner(inner_cfg or default_inner)
# -- Reloj de simulación -----------------------------------------
self._t: float = 0.0
self._inner_tick: int = 0
# -- Estado de la máquina de estados ------------------------------
self._mode: AutopilotMode = AutopilotMode.STANDBY
self._heading_setpoint_deg: float = 0.0
self._pre_dodge_heading_deg: float = 0.0
# -- Señales NMEA 2000 (el CAN virtual) --------------------------
self._nmea_heading_deg: float = 0.0
self._nmea_rot_dps: float = 0.0
self._nmea_sog_kn: float = 10.0
self._nmea_cog_deg: float = 0.0
self._nmea_xte_dm: float = 0.0
self._physics_heading_active: bool = True # physics → NMEA heading
self._last_nmea_update_t: float = -999.0 # para timeout
# -- Bancos de registros Modbus ----------------------------------
self._holdings: dict[int, int] = self._default_holdings()
self._coils: dict[int, int] = {i: 0 for i in range(8)}
self._inputs: dict[int, int] = self._default_inputs()
self._discretes: dict[int, int] = {i: 0 for i in range(32)}
self._prev_coils: dict[int, int] = {i: 0 for i in range(8)}
# -- Alarmas ------------------------------------------------------
self._alarm_heading_lost: bool = False
self._alarm_off_course: bool = False
self._alarm_off_course_severe: bool = False
# Bandera de "ya hemos convergido al setpoint al menos una vez".
# La alarma OFF_COURSE solo se dispara cuando el rumbo HA ESTADO
# cerca del setpoint y luego SE ALEJA. Esto evita disparar la alarma
# en maniobras con cambio grande inicial (p. ej. 90° → 180°): el error
# parte de 90°, que es > OFF_COURSE_SEVERE_DEG = 30°, pero el buque
# está *aproximándose* al setpoint, no desviándose. Los pilotos
# reales (Simrad, Raymarine) tienen el mismo comportamiento.
self._tracking_settled: bool = False
# -- Salidas de los controladores (usadas entre pasos) -----------
self._outer_rudder_sp: float = 0.0
self._inner_pwm_pct: float = 0.0
# -- Estado del mar (sea state) ----------------------------------
self._sea_beaufort: int = 0
self._sea_wave_amp: float = 0.0
self._sea_swell_amp: float = 0.0
self._sea_noise_amp: float = 0.0
self._sea_wind_bias: float = 0.0
self._sea_wave_period: float = 8.0
self._sea_rng: random.Random = random.Random(42)
# -- BNO085 IMU (yaw rate de alta frecuencia) --------------------
# Cuando está activo, el rot_ff_term del outer PID usa la medición
# del giróscopo BNO085 (50 Hz, ruido ~0.02 °/s) en lugar del ROT
# derivado del NMEA compass (10 Hz, mayor latencia).
# Esto reduce el overshoot en maniobras y el cabeceo en olas.
self._bno085_enabled: bool = False
self._bno085_noise_std_dps: float = 0.02 # spec BNO085: ~0.014 °/s
self._bno085_yaw_rate_dps: float = 0.0
# -- Registro de telemetría para análisis ------------------------
self.log: list[SimSnapshot] = []
self.events: list[SimEvent] = []
self._log_dt: float = 0.1 # cada 100 ms
self._next_log_t: float = 0.0
# -----------------------------------------------------------------------
# API pública — Condiciones iniciales NMEA
# -----------------------------------------------------------------------
def inject_nmea(
self,
*,
heading_deg: float | None = None,
rot_dps: float | None = None,
sog_kn: float | None = None,
cog_deg: float | None = None,
xte_dm: float | None = None,
) -> None:
"""
Inyecta datos de sensores NMEA 2000 (equivale al CAN bus).
Cuando se llama con heading_deg, también reinicia la física del buque
para que el simulador parta de ese rumbo. Las llamadas posteriores
(sin heading_deg) solo actualizan los campos indicados.
"""
if heading_deg is not None:
hd = heading_deg % 360.0
rot = rot_dps if rot_dps is not None else self._nmea_rot_dps
self._vessel.reset(heading_deg=hd, rate_of_turn_dps=rot)
self._nmea_heading_deg = hd
self._nmea_cog_deg = hd # sin leeway → COG ≈ heading
self._physics_heading_active = True
self._last_nmea_update_t = self._t
if rot_dps is not None:
self._nmea_rot_dps = rot_dps
if sog_kn is not None:
self._nmea_sog_kn = max(0.0, sog_kn)
if cog_deg is not None:
self._nmea_cog_deg = cog_deg % 360.0
if xte_dm is not None:
self._nmea_xte_dm = xte_dm
def disconnect_nmea_heading(self) -> None:
"""
Simula la pérdida del compass NMEA 2000 (cable roto, bus saturado…).
La física sigue avanzando pero el sensor deja de reportar.
Tras HEADING_TIMEOUT_S el firmware dispara ALARM_HEADING_LOST
y desengrana el autopiloto.
"""
self._physics_heading_active = False
# No actualizamos _last_nmea_update_t → el timeout empieza a contar
def reconnect_nmea_heading(self) -> None:
"""Restaura el compass NMEA 2000 (después de un fallo)."""
self._physics_heading_active = True
self._last_nmea_update_t = self._t
def set_sea_state(self, beaufort: int, *, seed: int = 42) -> None:
"""
Configura el estado del mar según la escala Beaufort (0 = calma, 8 = temporal).
Modela tres fuentes de perturbación de guiñada:
* **Ola dominante** — sinusoide al periodo de ola característico del estado.
Ejemplo B6 (mar gruesa): ≈ ±4° de oscilación de rumbo a 8 s de periodo.
* **Mar de fondo (swell)** — sinusoide de periodo 3× más largo que la ola.
* **Ruido blanco** — turbulencia residual aleatoria (random walk en ROT).
* **Viento de costado (weather helm)** — deriva lenta, periodo 120 s.
Todos se inyectan como ``external_yaw_torque`` en el
``VesselHeadingSimulator`` (campo ya existente, °/s²).
Seatate esperado de oscilación de rumbo (barco 30 m, 10 kn, con AP activo):
B0=0° B3≈±1° B4≈±3° B5≈±5° B6≈±8° B7≈±13°
Args:
beaufort: Nivel Beaufort (08). Valores fuera de rango se clamean.
seed: Semilla del RNG para reproducibilidad (default 42).
"""
beaufort = max(0, min(8, beaufort))
row = _BEAUFORT_TABLE[beaufort]
self._sea_beaufort = beaufort
self._sea_wave_amp = row[0]
self._sea_swell_amp = row[1]
self._sea_noise_amp = row[2]
self._sea_wind_bias = row[3]
self._sea_wave_period = row[4]
self._sea_rng = random.Random(seed)
def tune_response(
self,
*,
rudder_kp: float | None = None,
counter_rudder: float | None = None,
max_rudder_deg: float | None = None,
) -> None:
"""
Ajusta los parámetros del outer PID en tiempo real (simula los knobs del piloto).
Equivalente a los controles físicos de pilotos clásicos:
* ``rudder_kp`` → knob **"RUDDER"** (Robertson/Simrad): ganancia proporcional.
Más ganancia = más timón por grado de error → respuesta más agresiva.
* ``counter_rudder`` → knob **"COUNTER RUDDER"** / "Yaw Damping": feed-forward de ROT.
Más contra-timón = mejor amortiguamiento del sobrepasamiento y del oleaje.
* ``max_rudder_deg`` → límite de timón de trabajo (no el tope mecánico de 35°).
En mal tiempo se sube de ~20° a 2530° para dar más autoridad.
Los cambios tienen efecto inmediato en el próximo outer-tick (10 Hz).
Args:
rudder_kp: Nuevo Kp del outer PID (típico rango 0.54.0).
counter_rudder: Nuevo rot_ff_gain (típico rango 0.53.0).
max_rudder_deg: Nuevo límite de trabajo del timón en grados (típico 1535°).
"""
if rudder_kp is not None:
self._pid_outer.config.base_kp = float(rudder_kp)
if counter_rudder is not None:
self._pid_outer.config.rot_ff_gain = float(counter_rudder)
if max_rudder_deg is not None:
self._pid_outer.config.max_rudder_deg = float(max_rudder_deg)
def enable_bno085(self, *, noise_std_dps: float = 0.02) -> None:
"""
Activa el BNO085 como fuente de yaw rate para el outer PID.
Con BNO085 activo el ``rot_ff_term`` del outer PID usa la medición
del giróscopo (50 Hz, ruido típico ~0.014 °/s) en lugar del ROT
derivado del bus NMEA 2000 (10 Hz, mayor latencia de bus + GPS).
Diferencia clave respecto al ROT por NMEA:
- Latencia: BNO085 ≈ 4 ms vs NMEA ROT ≈ 100-200 ms
- Ruido: BNO085 ≈ 0.02 °/s vs NMEA ROT ≈ 0.1-0.5 °/s
- Frecuencia: BNO085 muestrea a 250 Hz (gyro), outer PID consume a 10 Hz
En mal tiempo (B4-B5) el BNO085 permite que el ``rot_ff_term``
reaccione a los picos de guiñada generados por las olas antes de
que el error de heading se acumule — equivale a subir el knob
"COUNTER RUDDER" manteniendo la ganancia proporcional estable.
Args:
noise_std_dps: Desviación estándar del ruido del giróscopo en °/s.
BNO085 spec: ~0.014 °/s típico. Default 0.02 °/s (conservador).
"""
self._bno085_enabled = True
self._bno085_noise_std_dps = float(noise_std_dps)
def disable_bno085(self) -> None:
"""Desactiva el BNO085; el outer PID vuelve a usar ROT del NMEA 2000."""
self._bno085_enabled = False
self._bno085_yaw_rate_dps = 0.0
# -----------------------------------------------------------------------
# API pública — Interfaz Modbus
# -----------------------------------------------------------------------
def write_holding(self, addr: int, value: int) -> None:
"""Escribe un holding register (comando del PC → ESP32)."""
self._holdings[addr] = int(value) & 0xFFFF
def write_coil(self, addr: int, value: int) -> None:
"""Escribe una coil (pulso de comando del PC → ESP32)."""
self._coils[addr] = 1 if value else 0
def read_input(self, addr: int) -> int:
"""Lee un input register (telemetría ESP32 → PC, solo lectura)."""
return self._inputs.get(addr, 0)
def read_discrete(self, addr: int) -> int:
"""Lee un discrete input (flag de estado ESP32 → PC, solo lectura)."""
return self._discretes.get(addr, 0)
def read_holding(self, addr: int) -> int:
"""Lee un holding register (para debug / verificación)."""
return self._holdings.get(addr, 0)
# -----------------------------------------------------------------------
# API pública — Control de la simulación
# -----------------------------------------------------------------------
def step(self, duration_s: float, log_dt: float = 0.1) -> None:
"""
Avanza la simulación ``duration_s`` segundos.
La física corre a 50 Hz. El outer PID se ejecuta cada 5 ticks (10 Hz).
El inner PID corre en cada tick (50 Hz).
Args:
duration_s: Tiempo de simulación a avanzar (segundos).
log_dt: Intervalo entre snapshots de telemetría (segundos).
"""
self._log_dt = log_dt
n_ticks = max(1, round(duration_s / DT_INNER))
for _ in range(n_ticks):
self._tick()
# -- Propiedades de conveniencia ----------------------------------------
@property
def t(self) -> float:
"""Tiempo de simulación actual (segundos)."""
return self._t
@property
def mode(self) -> AutopilotMode:
return self._mode
@property
def heading(self) -> float:
"""Rumbo actual del buque [0..360)."""
return self._vessel.state.heading_deg
@property
def rot(self) -> float:
"""Rate of turn actual (deg/s)."""
return self._vessel.state.rate_of_turn_dps
@property
def rudder(self) -> float:
"""Ángulo actual del timón (grados)."""
return self._rudder.state.angle_deg
@property
def engaged(self) -> bool:
return self._mode != AutopilotMode.STANDBY
@property
def any_alarm(self) -> bool:
return (self._alarm_heading_lost
or self._alarm_off_course
or self._alarm_off_course_severe)
# -----------------------------------------------------------------------
# Internos — tick principal (50 Hz)
# -----------------------------------------------------------------------
def _tick(self) -> None:
# 1. Procesar coils (flancos de subida)
self._process_coils()
# 2. Leer cambios en holdings (setpoint dinámico)
self._sync_from_holdings()
# 3. Outer loop @ 10 Hz
if self._inner_tick % INNER_PER_OUTER == 0:
self._run_outer_loop()
# 4. Inner loop @ 50 Hz
self._run_inner_loop()
# 5. Física: avanzar la planta con el PWM actual
self._run_physics()
# 6. Evaluar alarmas con el estado post-física
self._check_alarms()
# 7. Espejo a los registros Modbus
self._sync_to_registers()
# 8. Capturar snapshot si toca
if self._t >= self._next_log_t:
self._capture_snapshot()
self._next_log_t = self._t + self._log_dt
# 9. Avanzar reloj
self._t += DT_INNER
self._inner_tick += 1
# -----------------------------------------------------------------------
# Internos — procesamiento de coils (flancos de subida)
# -----------------------------------------------------------------------
def _process_coils(self) -> None:
if self._coils[0] and not self._prev_coils[0]:
self._cmd_engage()
if self._coils[1] and not self._prev_coils[1]:
self._cmd_disengage()
if self._coils[2] and not self._prev_coils[2]:
self._cmd_ack_alarms()
self._prev_coils = dict(self._coils)
# Las coils de comando son one-shot: se limpian tras procesar
self._coils[0] = 0
self._coils[1] = 0
self._coils[2] = 0
def _cmd_engage(self) -> None:
"""Intenta enganchar en el modo solicitado por MODE_REQUEST."""
requested = AutopilotMode(self._holdings[0] & 0x0F)
# Interlock: necesita rumbo válido para modos de control de rumbo
heading_age = self._t - self._last_nmea_update_t
heading_valid = heading_age <= HEADING_TIMEOUT_S
if requested in (AutopilotMode.HEADING_HOLD,
AutopilotMode.TRUE_COURSE,
AutopilotMode.TRACK_KEEPING):
if not heading_valid:
self._add_event("engage_refused", "Sin rumbo NMEA válido")
return # rechazar enganche
if requested == AutopilotMode.HEADING_HOLD:
raw_sp = self._holdings[1]
if raw_sp == 0:
# Auto-captura del rumbo actual si el setpoint es cero
self._heading_setpoint_deg = self._nmea_heading_deg
self._holdings[1] = int(self._nmea_heading_deg * 100) % 36000
else:
self._heading_setpoint_deg = raw_sp * 0.01
self._pid_outer.reset(heading_deg=self._nmea_heading_deg)
self._pid_inner.reset(measured_deg=self._rudder.state.angle_deg)
self._mode = AutopilotMode.HEADING_HOLD
# Resetear el flag de convergencia: el rumbo parte desde lejos del
# nuevo setpoint; OFF_COURSE solo disparará cuando hayamos llegado
# cerca y luego nos alejemos.
self._tracking_settled = False
self._add_event("engage", f"HEADING_HOLD sp={self._heading_setpoint_deg:.1f}°")
elif requested == AutopilotMode.DODGE:
self._pre_dodge_heading_deg = self._heading_setpoint_deg
raw_offset = self._holdings[8]
if raw_offset > 0x7FFF:
raw_offset -= 0x10000
offset_deg = raw_offset * 0.01
self._heading_setpoint_deg = (
self._heading_setpoint_deg + offset_deg
) % 360.0
self._mode = AutopilotMode.DODGE
self._add_event("engage", f"DODGE offset={offset_deg:.1f}°")
elif requested == AutopilotMode.STANDBY:
self._do_disengage()
def _cmd_disengage(self) -> None:
self._do_disengage()
def _cmd_ack_alarms(self) -> None:
self._alarm_heading_lost = False
self._alarm_off_course = False
self._alarm_off_course_severe = False
self._add_event("ack", "Alarmas reconocidas")
def _do_disengage(self) -> None:
if self._mode != AutopilotMode.STANDBY:
self._add_event("disengage", f"Desde {self._mode.name}")
self._mode = AutopilotMode.STANDBY
self._outer_rudder_sp = 0.0
self._inner_pwm_pct = 0.0
self._pid_outer.reset()
self._pid_inner.reset()
self._tracking_settled = False
# -----------------------------------------------------------------------
# Internos — sincronización desde holdings
# -----------------------------------------------------------------------
def _sync_from_holdings(self) -> None:
"""En HEADING_HOLD, el setpoint puede cambiar en caliente (knob)."""
if self._mode == AutopilotMode.HEADING_HOLD:
raw = self._holdings[1]
new_sp = raw * 0.01
# Si el operador ha cambiado el setpoint significativamente
# (> 1°) se resetea el flag de convergencia: el buque está
# ahora aproximándose al nuevo rumbo, no desviándose del
# anterior. Esto evita que OFF_COURSE dispare durante la
# maniobra de aproximación al nuevo setpoint.
if abs(heading_error_deg(new_sp, self._heading_setpoint_deg)) > 1.0:
self._tracking_settled = False
self._heading_setpoint_deg = new_sp
# -----------------------------------------------------------------------
# Internos — outer loop (10 Hz)
# -----------------------------------------------------------------------
def _run_outer_loop(self) -> None:
if self._mode == AutopilotMode.STANDBY:
self._outer_rudder_sp = 0.0
return
heading_age = self._t - self._last_nmea_update_t
if heading_age > HEADING_TIMEOUT_S:
# No hay rumbo: salida cero (el firmware disengrana, esto lo hacemos
# en _check_alarms, pero la salida ya no es válida)
self._outer_rudder_sp = 0.0
return
# Velocidad para gain scheduling: primero del holding, luego NMEA
sp_speed_raw = self._holdings[25] # PID_OUTER_SPEED_KN_REQ_X10
sog_kn = (sp_speed_raw * 0.1) if sp_speed_raw > 0 else self._nmea_sog_kn
# BNO085 activo → usa yaw rate del giróscopo (50 Hz, baja latencia).
# BNO085 inactivo → usa ROT del bus NMEA 2000 (10 Hz, GPS/compass).
rot_for_pid = (self._bno085_yaw_rate_dps if self._bno085_enabled
else self._nmea_rot_dps)
self._outer_rudder_sp = self._pid_outer.step(
heading_setpoint_deg=self._heading_setpoint_deg,
heading_measured_deg=self._nmea_heading_deg,
rate_of_turn_dps=rot_for_pid,
speed_kn=sog_kn,
allowed=True,
)
# -----------------------------------------------------------------------
# Internos — inner loop (50 Hz)
# -----------------------------------------------------------------------
def _run_inner_loop(self) -> None:
self._inner_pwm_pct = self._pid_inner.step(
setpoint_deg=self._outer_rudder_sp,
measured_deg=self._rudder.state.angle_deg,
allowed=self._mode != AutopilotMode.STANDBY,
)
# -----------------------------------------------------------------------
# Internos — física (50 Hz)
# -----------------------------------------------------------------------
def _compute_wave_torque(self) -> float:
"""
Calcula el torque externo de guiñada del oleaje (°/s²) para el tick actual.
Compone cuatro componentes:
- Ola dominante: sinusoide al periodo característico del estado de mar.
- Mar de fondo: sinusoide a periodo 3.2× más largo.
- Ruido blanco: perturbación aleatoria (Gaussiana, proporcional a √dt).
- Weather helm: deriva de viento, sinusoide de 120 s de periodo.
Retorna 0.0 si el mar está en calma (beaufort == 0).
"""
if self._sea_beaufort == 0:
return 0.0
t = self._t
T = self._sea_wave_period
wave = self._sea_wave_amp * math.sin(2.0 * math.pi * t / T)
swell = self._sea_swell_amp * math.sin(2.0 * math.pi * t / (T * 3.2))
# Ruido blanco: la std por tick se escala con √dt para mantener
# la densidad espectral de potencia constante en frecuencia.
noise = self._sea_noise_amp * self._sea_rng.gauss(0.0, 1.0) * math.sqrt(DT_INNER)
wind = self._sea_wind_bias * math.sin(2.0 * math.pi * t / 120.0)
return wave + swell + noise + wind
def _run_physics(self) -> None:
# Timón responde al PWM
self._rudder.step(dt=DT_INNER, pwm_pct=self._inner_pwm_pct)
# Perturbación de mar: actualizar torque externo del buque cada tick
self._vessel.config.external_yaw_torque = self._compute_wave_torque()
# Buque responde al ángulo real del timón (y al torque del oleaje)
self._vessel.step(
dt=DT_INNER,
rudder_deg=self._rudder.state.angle_deg,
speed_kn=self._nmea_sog_kn,
)
# La física actualiza el "sensor NMEA" si está conectado
if self._physics_heading_active:
self._nmea_heading_deg = self._vessel.state.heading_deg
self._nmea_rot_dps = self._vessel.state.rate_of_turn_dps
# COG = heading (sin corriente en este modelo simple)
self._nmea_cog_deg = self._nmea_heading_deg
self._last_nmea_update_t = self._t
# BNO085: muestrea yaw rate a 50 Hz (mismo ritmo que el inner loop).
# El giróscopo del BNO085 tiene latencia ~4ms y ruido ~0.02 °/s,
# mucho mejor que el ROT calculado desde el GPS/compass NMEA (100-200ms).
# En el outer PID (10 Hz) esto significa que rot_ff_term reacciona a
# picos de guiñada por olas ~10x más rápido que con ROT por NMEA.
if self._bno085_enabled:
noise = (self._sea_rng.gauss(0.0, self._bno085_noise_std_dps)
if self._bno085_noise_std_dps > 0.0 else 0.0)
self._bno085_yaw_rate_dps = self._vessel.state.rate_of_turn_dps + noise
# -----------------------------------------------------------------------
# Internos — alarmas
# -----------------------------------------------------------------------
def _check_alarms(self) -> None:
# HEADING_LOST: timeout del sensor NMEA
heading_age = self._t - self._last_nmea_update_t
if heading_age > HEADING_TIMEOUT_S:
if not self._alarm_heading_lost:
self._alarm_heading_lost = True
self._add_event("alarm", "ALARM_HEADING_LOST")
if self._mode != AutopilotMode.STANDBY:
self._do_disengage()
# OFF_COURSE (solo cuando enganchado)
if self._mode != AutopilotMode.STANDBY:
err = abs(heading_error_deg(
self._heading_setpoint_deg, self._nmea_heading_deg
))
# Marcar "primera convergencia" cuando el error cae bajo 5°.
# Hasta ese momento el buque está en aproximación inicial al
# setpoint — no se consideran desviaciones (sería falsa alarma).
if err < 5.0:
self._tracking_settled = True
# OFF_COURSE solo se evalúa cuando ya hemos convergido al menos
# una vez. Esto reproduce el comportamiento de pilotos reales
# (Simrad, Raymarine): la alarma es para "nos hemos desviado del
# rumbo", no para "el operador acaba de ordenar un cambio grande".
if self._tracking_settled:
if err > OFF_COURSE_SEVERE_DEG:
if not self._alarm_off_course_severe:
self._alarm_off_course_severe = True
self._alarm_off_course = True
self._add_event("alarm", f"ALARM_OFF_COURSE_SEVERE err={err:.1f}°")
self._do_disengage()
elif err > OFF_COURSE_WARN_DEG:
if not self._alarm_off_course:
self._alarm_off_course = True
self._add_event("alarm", f"ALARM_OFF_COURSE err={err:.1f}°")
else:
if self._alarm_off_course and not self._alarm_off_course_severe:
self._alarm_off_course = False
# -----------------------------------------------------------------------
# Internos — espejo a registros Modbus
# -----------------------------------------------------------------------
def _sync_to_registers(self) -> None:
uptime = int(self._t)
self._inputs[4] = uptime & 0xFFFF
self._inputs[5] = (uptime >> 16) & 0xFFFF
self._inputs[6] = int(self._mode)
# Timón
self._inputs[16] = _u16(int(self._rudder.state.angle_deg * 100))
# Rumbo / ROT / edad
self._inputs[24] = _u16(int(self._nmea_heading_deg * 100))
self._inputs[25] = _s16_to_u16(int(self._nmea_rot_dps * 100))
age_ms = int((self._t - self._last_nmea_update_t) * 1000)
self._inputs[26] = min(60000, max(0, age_ms))
# COG / SOG / XTE
self._inputs[60] = _u16(int(self._nmea_cog_deg * 100))
self._inputs[61] = _u16(int(self._nmea_sog_kn * 10))
self._inputs[63] = _s16_to_u16(int(self._nmea_xte_dm))
# Telemetría inner PID
self._inputs[40] = _s16_to_u16(int(self._outer_rudder_sp * 100))
self._inputs[41] = _s16_to_u16(int(self._inner_pwm_pct * 100))
inner_err = self._outer_rudder_sp - self._rudder.state.angle_deg
self._inputs[42] = _s16_to_u16(int(inner_err * 100))
# Telemetría outer PID
self._inputs[50] = _u16(int(self._heading_setpoint_deg * 100))
self._inputs[51] = _s16_to_u16(int(self._outer_rudder_sp * 100))
outer_err = heading_error_deg(
self._heading_setpoint_deg, self._nmea_heading_deg
)
self._inputs[52] = _s16_to_u16(int(outer_err * 100))
self._inputs[53] = _u16(int(self._nmea_sog_kn * 10))
# Discretos
self._discretes[0] = 1 if self._mode != AutopilotMode.STANDBY else 0
self._discretes[8] = self._discretes[0]
self._discretes[16] = 1 if self._alarm_off_course else 0
self._discretes[17] = 1 if self._alarm_off_course_severe else 0
self._discretes[19] = 1 if self._alarm_heading_lost else 0
self._discretes[25] = 1 if self.any_alarm else 0
# -----------------------------------------------------------------------
# Internos — telemetría
# -----------------------------------------------------------------------
def _capture_snapshot(self) -> None:
hdg = self._nmea_heading_deg
sp = self._heading_setpoint_deg
self.log.append(SimSnapshot(
t=self._t,
mode=self._mode,
heading_deg=hdg,
heading_setpoint_deg=sp,
heading_error_deg=heading_error_deg(sp, hdg),
rot_dps=self._nmea_rot_dps,
sog_kn=self._nmea_sog_kn,
outer_rudder_sp_deg=self._outer_rudder_sp,
rudder_angle_deg=self._rudder.state.angle_deg,
inner_pwm_pct=self._inner_pwm_pct,
alarm_heading_lost=self._alarm_heading_lost,
alarm_off_course=self._alarm_off_course,
alarm_off_course_severe=self._alarm_off_course_severe,
bno085_yaw_rate_dps=self._bno085_yaw_rate_dps,
))
def _add_event(self, kind: str, detail: str = "") -> None:
self.events.append(SimEvent(t=self._t, kind=kind, detail=detail))
# -----------------------------------------------------------------------
# Internos — valores por defecto
# -----------------------------------------------------------------------
def _default_holdings(self) -> dict[int, int]:
h: dict[int, int] = {i: 0 for i in range(40)}
h[0] = 0 # MODE_REQUEST = STANDBY
h[1] = 0 # HEADING_SETPOINT_X100
h[2] = 80 # BRIGHTNESS_PCT
h[3] = 70 # ALARM_VOLUME_PCT
h[25] = 100 # PID_OUTER_SPEED_KN_REQ_X10 = 10.0 kn
h[33] = 50 # XTE_GAIN_X1000 = 0.050 deg/m
h[34] = 2000 # XTE_MAX_CORRECTION_X100 = 20.0°
return h
def _default_inputs(self) -> dict[int, int]:
i: dict[int, int] = {k: 0 for k in range(80)}
i[0] = 0 # FW_VERSION_MAJOR
i[1] = 4 # FW_VERSION_MINOR (sprint 4)
i[2] = 0 # FW_VERSION_PATCH
i[3] = 0 # SCHEMA_VERSION
i[18] = 1 # RUDDER_VALID (el sim siempre es válido)
return i
# ---------------------------------------------------------------------------
# Helpers de codificación de registros (Modbus es big-endian uint16)
# ---------------------------------------------------------------------------
def _u16(x: int) -> int:
"""Clamp a uint16 [0..65535]."""
return max(0, min(0xFFFF, x)) & 0xFFFF
def _s16_to_u16(x: int) -> int:
"""Convierte int16 con signo a representación uint16 (complemento a 2)."""
if x < 0:
return (x + 0x10000) & 0xFFFF
return x & 0xFFFF