""" PROPÓSITO --------- Simulación software completa del firmware ESP32 de AR-Autopilot. POR QUÉ EXISTE -------------- El ESP32 físico necesita CAN-bus (NMEA 2000) para recibir el compass y RS-485 para recibir comandos Modbus. Sin hardware real ambos canales están ausentes. Este módulo implementa en Python puro el mismo estado interno y la misma aritmética que el firmware C++, permitiendo desarrollar y validar el protocolo de pruebas antes de tener el barco delante. CÓMO FUNCIONA ------------- El simulador tiene tres capas en paralelo: Capa física (50 Hz) RudderSimulator — PWM → ángulo de timón VesselHeadingSimulator — timón × velocidad → ROT → rumbo Cascada PID (outer 10 Hz, inner 50 Hz) PidOuter — error_rumbo → setpoint_timón PidInner — error_timón → PWM Interfaz Modbus (diccionarios Python) holdings — escribibles por el PC (comandos) coils — escribibles por el PC (pulsos) inputs — de solo lectura (telemetría) discretes — de solo lectura (alarmas / flags) RELACIONADO ----------- arautopilot/studio/simulator/pid_outer.py arautopilot/studio/simulator/pid_inner.py arautopilot/studio/simulator/vessel_heading.py arautopilot/studio/simulator/rudder_dynamics.py arautopilot/shared/modbus_register_map.py tools/sim_protocol.py """ from __future__ import annotations import math import random import sys from dataclasses import dataclass, field from enum import IntEnum from pathlib import Path # Añadir raíz del proyecto al path para importar arautopilot.* sys.path.insert(0, str(Path(__file__).parent.parent)) from arautopilot.studio.simulator.pid_inner import PidInner, PidInnerConfig from arautopilot.studio.simulator.pid_outer import PidOuter, PidOuterConfig from arautopilot.studio.simulator.rudder_dynamics import ( RudderDynamicsConfig, RudderSimulator, ) from arautopilot.studio.simulator.vessel_heading import ( VesselHeadingConfig, VesselHeadingSimulator, heading_error_deg, ) # --------------------------------------------------------------------------- # Constantes — mirrors de los defines del firmware # --------------------------------------------------------------------------- OFF_COURSE_WARN_DEG: float = 15.0 OFF_COURSE_SEVERE_DEG: float = 30.0 HEADING_TIMEOUT_S: float = 5.0 DT_INNER: float = 1.0 / 50.0 # 50 Hz inner loop DT_OUTER: float = 1.0 / 10.0 # 10 Hz outer loop INNER_PER_OUTER: int = 5 # inner ticks per outer tick # --------------------------------------------------------------------------- # Beaufort scale → wave disturbance parameters # --------------------------------------------------------------------------- # Each entry: (wave_torque_amp, swell_torque_amp, noise_torque_std, # wind_bias_amp, wave_period_s) # Torques are in °/s² (= yaw angular acceleration). # Steady-state ROT from torque T: ROT_ss = T / yaw_damping = T / 0.8 # Heading oscillation at wave freq ω (rad/s): A_h ≈ T / (yaw_damping * ω) # Expected peak heading oscillation for our 30 m yacht at 10 kn: # B3 ≈ ±1° B4 ≈ ±3° B5 ≈ ±5° B6 ≈ ±8° B7 ≈ ±13° _BEAUFORT_TABLE: dict[int, tuple[float, float, float, float, float]] = { # B: wave swell noise wind period_s 0: (0.00, 0.00, 0.00, 0.00, 8.0), 1: (0.10, 0.05, 0.05, 0.05, 7.0), 2: (0.30, 0.10, 0.10, 0.10, 6.5), 3: (0.60, 0.20, 0.20, 0.20, 5.5), 4: (1.50, 0.40, 0.30, 0.40, 6.0), 5: (2.50, 0.70, 0.60, 0.80, 7.0), 6: (3.50, 1.00, 1.00, 1.20, 8.0), 7: (5.50, 1.50, 1.60, 2.00, 9.0), 8: (8.00, 2.00, 2.50, 3.00, 10.0), } class AutopilotMode(IntEnum): """Modos del autopiloto — idénticos al enum C++ del firmware.""" STANDBY = 0 HEADING_HOLD = 1 TRUE_COURSE = 2 TRACK_KEEPING = 3 DODGE = 4 # --------------------------------------------------------------------------- # Datos de telemetría # --------------------------------------------------------------------------- @dataclass class SimSnapshot: """Una muestra completa del estado del simulador (para análisis y gráficas).""" t: float mode: AutopilotMode heading_deg: float heading_setpoint_deg: float heading_error_deg: float rot_dps: float sog_kn: float outer_rudder_sp_deg: float rudder_angle_deg: float inner_pwm_pct: float alarm_heading_lost: bool alarm_off_course: bool alarm_off_course_severe: bool bno085_yaw_rate_dps: float = 0.0 # medición del giróscopo BNO085 (0 si desactivado) @dataclass class SimEvent: """Evento discreto en la simulación (engage, alarma, etc.).""" t: float kind: str # "engage", "disengage", "alarm", "ack" detail: str = "" # --------------------------------------------------------------------------- # Simulador principal # --------------------------------------------------------------------------- class ESP32Simulator: """ Simulación software del firmware AR-Autopilot ESP32. Reproduce el mismo comportamiento que el C++: - Máquina de estados (STANDBY / HEADING_HOLD / DODGE / ...) - Cascada PID a dos tasas (outer 10 Hz, inner 50 Hz) - Física del buque (timón → ROT → rumbo) - Banco de registros Modbus (holdings, coils, inputs, discretes) Uso rápido:: sim = ESP32Simulator() sim.inject_nmea(heading_deg=90.0, sog_kn=10.0) sim.write_holding(0, 1) # MODE_REQUEST = HEADING_HOLD sim.write_holding(1, 9000) # HEADING_SETPOINT_X100 = 90.00° sim.write_coil(0, 1) # CMD_ENGAGE_REQUEST sim.step(5.0) # enganche sim.write_holding(1, 10000) # nuevo rumbo 100° sim.step(120.0) # maniobra print(f"Rumbo final: {sim.heading:.1f}°") """ def __init__( self, vessel_cfg: VesselHeadingConfig | None = None, rudder_cfg: RudderDynamicsConfig | None = None, outer_cfg: PidOuterConfig | None = None, inner_cfg: PidInnerConfig | None = None, ) -> None: # -- Física ------------------------------------------------------- # El VesselHeadingSimulator por defecto tiene rudder_response_gain=0.18, # lo que da ~11 dps a 5° de timón a 10 kn — demasiado sensible para # los ganadores PID de referencia (kd=1.2, rot_ff_gain=1.5). # Con gain=0.004: ROT_ss = 0.004*10*35/0.8 = 1.75 dps al máximo timón. # Esto equivale a un yate de 30 m con radio de giro ~165 m a 10 kn, # consistente con la hoja de datos del brief y numéricamente estable # con los coeficientes PID por defecto. default_vessel = VesselHeadingConfig(rudder_response_gain=0.004) self._vessel = VesselHeadingSimulator(vessel_cfg or default_vessel) # En la simulación software eliminamos el deadband y el min_useful del # actuador físico: no hay bomba hidráulica real, por lo que cualquier # señal mueve el timón proporcionalmente. Esto evita el efecto bang-bang # (snap a 12 % para señales pequeñas) que causa sobrepasamiento y # oscilaciones en torno al setpoint. El hardware real conserva su propio # deadband (deadband_pct=7 en RudderDynamicsConfig es para hardware). default_rudder = RudderDynamicsConfig(deadband_pct=0.0, min_useful_pwm_pct=0.0) self._rudder = RudderSimulator(rudder_cfg or default_rudder) # -- Controladores PID ------------------------------------------- # En la simulación software el outer PID usa solo P + ROT feed-forward # (kd=0, ki=0). # # kd=0: En firmware kd=1.2, que genera un pico de derivada enorme # cuando el setpoint cambia de golpe (el error salta de 0° a Δrumbo # en un solo tick). En hardware ese pico es inofensivo: el actuador # hidráulico ya está saturado mecánicamente. En simulación dispara el # anti-windup y lleva el integrador a ~−7°, lo que obliga al outer PID # a pasar 25 s recuperando la demanda. Con kd=0 el pico desaparece. # El ROT feed-forward (rot_ff_gain=1.5) sigue amortiguando el # sobrepasamiento de forma anticipada. # # ki=0: El integrador del firmware compensa derivas reales: corriente, # viento, fricción asimétrica. Ninguna de estas perturbaciones existe # en la simulación. Con ki≠0 el integrador acumula ~2-7° durante la # maniobra de aproximación y luego tarda 100-500 s en vaciarse, lo # que genera tiempos de asentamiento de más de 120 s incluso con # maniobras pequeñas. Con ki=0 el bucle externo es P+FF puro: el # error de estado estacionario es cero (el modelo es perfecto) y el # asentamiento ocurre en 20-40 s. # aw_gain=0.0 desactiva el anti-windup de saturación. Con ki=0 el # integrador nunca acumula errores útiles, pero la corrección de # anti-windup SÍ modifica el integrador cuando P+FF satura (maniobras # grandes como 90° → 180°). Resultado: integrador llega a -35° y # luego arrastra la salida del outer hacia abajo prematuramente, lo # que convierte un giro de 90° en un proceso de 245 s en lugar de # ~60 s. Con aw_gain=0 el integrador permanece en 0 siempre. # # deadband_deg=0.0: El deadband del firmware (0.5°) filtra el ruido # del compass real. En simulación el compass es perfecto (sin ruido), # por lo que el deadband solo introduce una zona muerta que impide al # controlador P corregir errores < 0.5°. El resultado es que el error # de estado estacionario converge a ~0.5° en vez de 0°, lo que hace # que maniobras de 15° nunca entren en la banda de ±1° dentro del # ventana de prueba. Con deadband=0: τ_outer ≈ 24 s, una maniobra # de 15° asienta en ~65 s (< 90 s criterio TC-04) y el retorno de # DODGE (18°) converge por debajo de 1° en ~70 s (< 120 s TC-07). default_outer = PidOuterConfig( base_kd=0.0, base_ki=0.0, aw_gain=0.0, deadband_deg=0.0 ) self._pid_outer = PidOuter(outer_cfg or default_outer) # En el simulador software el inner PID necesita kp mucho mayor que # en el firmware (kp=2.5). El firmware fue calibrado asumiendo que # min_useful_pwm_pct=12 % actúa como "suelo" de señal, dando a la # bomba hidráulica un mínimo de corriente para arrancar (~0.6 dps # efectivos). Sin ese suelo el lazo cerrado tiene τ_cl = friction / # (actuator_gain × kp) = 4 / (0.2 × 2.5) = 8 s — demasiado lento # para que el outer PID vea una planta dinámica coherente. # Con kp=20: τ_cl = 4/(0.2×20) = 1 s — rápido y sin bang-bang. # deadband_pct=0 y min_useful=0 para eliminar la no-linealidad del # actuador hidráulico que no existe en el modelo matemático. default_inner = PidInnerConfig(kp=20.0, deadband_pct=0.0, min_useful_pwm_pct=0.0) self._pid_inner = PidInner(inner_cfg or default_inner) # -- Reloj de simulación ----------------------------------------- self._t: float = 0.0 self._inner_tick: int = 0 # -- Estado de la máquina de estados ------------------------------ self._mode: AutopilotMode = AutopilotMode.STANDBY self._heading_setpoint_deg: float = 0.0 self._pre_dodge_heading_deg: float = 0.0 # -- Señales NMEA 2000 (el CAN virtual) -------------------------- self._nmea_heading_deg: float = 0.0 self._nmea_rot_dps: float = 0.0 self._nmea_sog_kn: float = 10.0 self._nmea_cog_deg: float = 0.0 self._nmea_xte_dm: float = 0.0 self._physics_heading_active: bool = True # physics → NMEA heading self._last_nmea_update_t: float = -999.0 # para timeout # -- Bancos de registros Modbus ---------------------------------- self._holdings: dict[int, int] = self._default_holdings() self._coils: dict[int, int] = {i: 0 for i in range(8)} self._inputs: dict[int, int] = self._default_inputs() self._discretes: dict[int, int] = {i: 0 for i in range(32)} self._prev_coils: dict[int, int] = {i: 0 for i in range(8)} # -- Alarmas ------------------------------------------------------ self._alarm_heading_lost: bool = False self._alarm_off_course: bool = False self._alarm_off_course_severe: bool = False # Bandera de "ya hemos convergido al setpoint al menos una vez". # La alarma OFF_COURSE solo se dispara cuando el rumbo HA ESTADO # cerca del setpoint y luego SE ALEJA. Esto evita disparar la alarma # en maniobras con cambio grande inicial (p. ej. 90° → 180°): el error # parte de 90°, que es > OFF_COURSE_SEVERE_DEG = 30°, pero el buque # está *aproximándose* al setpoint, no desviándose. Los pilotos # reales (Simrad, Raymarine) tienen el mismo comportamiento. self._tracking_settled: bool = False # -- Salidas de los controladores (usadas entre pasos) ----------- self._outer_rudder_sp: float = 0.0 self._inner_pwm_pct: float = 0.0 # -- Estado del mar (sea state) ---------------------------------- self._sea_beaufort: int = 0 self._sea_wave_amp: float = 0.0 self._sea_swell_amp: float = 0.0 self._sea_noise_amp: float = 0.0 self._sea_wind_bias: float = 0.0 self._sea_wave_period: float = 8.0 self._sea_rng: random.Random = random.Random(42) # -- BNO085 IMU (yaw rate de alta frecuencia) -------------------- # Cuando está activo, el rot_ff_term del outer PID usa la medición # del giróscopo BNO085 (50 Hz, ruido ~0.02 °/s) en lugar del ROT # derivado del NMEA compass (10 Hz, mayor latencia). # Esto reduce el overshoot en maniobras y el cabeceo en olas. self._bno085_enabled: bool = False self._bno085_noise_std_dps: float = 0.02 # spec BNO085: ~0.014 °/s self._bno085_yaw_rate_dps: float = 0.0 # -- Registro de telemetría para análisis ------------------------ self.log: list[SimSnapshot] = [] self.events: list[SimEvent] = [] self._log_dt: float = 0.1 # cada 100 ms self._next_log_t: float = 0.0 # ----------------------------------------------------------------------- # API pública — Condiciones iniciales NMEA # ----------------------------------------------------------------------- def inject_nmea( self, *, heading_deg: float | None = None, rot_dps: float | None = None, sog_kn: float | None = None, cog_deg: float | None = None, xte_dm: float | None = None, ) -> None: """ Inyecta datos de sensores NMEA 2000 (equivale al CAN bus). Cuando se llama con heading_deg, también reinicia la física del buque para que el simulador parta de ese rumbo. Las llamadas posteriores (sin heading_deg) solo actualizan los campos indicados. """ if heading_deg is not None: hd = heading_deg % 360.0 rot = rot_dps if rot_dps is not None else self._nmea_rot_dps self._vessel.reset(heading_deg=hd, rate_of_turn_dps=rot) self._nmea_heading_deg = hd self._nmea_cog_deg = hd # sin leeway → COG ≈ heading self._physics_heading_active = True self._last_nmea_update_t = self._t if rot_dps is not None: self._nmea_rot_dps = rot_dps if sog_kn is not None: self._nmea_sog_kn = max(0.0, sog_kn) if cog_deg is not None: self._nmea_cog_deg = cog_deg % 360.0 if xte_dm is not None: self._nmea_xte_dm = xte_dm def disconnect_nmea_heading(self) -> None: """ Simula la pérdida del compass NMEA 2000 (cable roto, bus saturado…). La física sigue avanzando pero el sensor deja de reportar. Tras HEADING_TIMEOUT_S el firmware dispara ALARM_HEADING_LOST y desengrana el autopiloto. """ self._physics_heading_active = False # No actualizamos _last_nmea_update_t → el timeout empieza a contar def reconnect_nmea_heading(self) -> None: """Restaura el compass NMEA 2000 (después de un fallo).""" self._physics_heading_active = True self._last_nmea_update_t = self._t def set_sea_state(self, beaufort: int, *, seed: int = 42) -> None: """ Configura el estado del mar según la escala Beaufort (0 = calma, 8 = temporal). Modela tres fuentes de perturbación de guiñada: * **Ola dominante** — sinusoide al periodo de ola característico del estado. Ejemplo B6 (mar gruesa): ≈ ±4° de oscilación de rumbo a 8 s de periodo. * **Mar de fondo (swell)** — sinusoide de periodo 3× más largo que la ola. * **Ruido blanco** — turbulencia residual aleatoria (random walk en ROT). * **Viento de costado (weather helm)** — deriva lenta, periodo 120 s. Todos se inyectan como ``external_yaw_torque`` en el ``VesselHeadingSimulator`` (campo ya existente, °/s²). Seatate esperado de oscilación de rumbo (barco 30 m, 10 kn, con AP activo): B0=0° B3≈±1° B4≈±3° B5≈±5° B6≈±8° B7≈±13° Args: beaufort: Nivel Beaufort (0–8). Valores fuera de rango se clamean. seed: Semilla del RNG para reproducibilidad (default 42). """ beaufort = max(0, min(8, beaufort)) row = _BEAUFORT_TABLE[beaufort] self._sea_beaufort = beaufort self._sea_wave_amp = row[0] self._sea_swell_amp = row[1] self._sea_noise_amp = row[2] self._sea_wind_bias = row[3] self._sea_wave_period = row[4] self._sea_rng = random.Random(seed) def tune_response( self, *, rudder_kp: float | None = None, counter_rudder: float | None = None, max_rudder_deg: float | None = None, ) -> None: """ Ajusta los parámetros del outer PID en tiempo real (simula los knobs del piloto). Equivalente a los controles físicos de pilotos clásicos: * ``rudder_kp`` → knob **"RUDDER"** (Robertson/Simrad): ganancia proporcional. Más ganancia = más timón por grado de error → respuesta más agresiva. * ``counter_rudder`` → knob **"COUNTER RUDDER"** / "Yaw Damping": feed-forward de ROT. Más contra-timón = mejor amortiguamiento del sobrepasamiento y del oleaje. * ``max_rudder_deg`` → límite de timón de trabajo (no el tope mecánico de 35°). En mal tiempo se sube de ~20° a 25–30° para dar más autoridad. Los cambios tienen efecto inmediato en el próximo outer-tick (10 Hz). Args: rudder_kp: Nuevo Kp del outer PID (típico rango 0.5–4.0). counter_rudder: Nuevo rot_ff_gain (típico rango 0.5–3.0). max_rudder_deg: Nuevo límite de trabajo del timón en grados (típico 15–35°). """ if rudder_kp is not None: self._pid_outer.config.base_kp = float(rudder_kp) if counter_rudder is not None: self._pid_outer.config.rot_ff_gain = float(counter_rudder) if max_rudder_deg is not None: self._pid_outer.config.max_rudder_deg = float(max_rudder_deg) def enable_bno085(self, *, noise_std_dps: float = 0.02) -> None: """ Activa el BNO085 como fuente de yaw rate para el outer PID. Con BNO085 activo el ``rot_ff_term`` del outer PID usa la medición del giróscopo (50 Hz, ruido típico ~0.014 °/s) en lugar del ROT derivado del bus NMEA 2000 (10 Hz, mayor latencia de bus + GPS). Diferencia clave respecto al ROT por NMEA: - Latencia: BNO085 ≈ 4 ms vs NMEA ROT ≈ 100-200 ms - Ruido: BNO085 ≈ 0.02 °/s vs NMEA ROT ≈ 0.1-0.5 °/s - Frecuencia: BNO085 muestrea a 250 Hz (gyro), outer PID consume a 10 Hz En mal tiempo (B4-B5) el BNO085 permite que el ``rot_ff_term`` reaccione a los picos de guiñada generados por las olas antes de que el error de heading se acumule — equivale a subir el knob "COUNTER RUDDER" manteniendo la ganancia proporcional estable. Args: noise_std_dps: Desviación estándar del ruido del giróscopo en °/s. BNO085 spec: ~0.014 °/s típico. Default 0.02 °/s (conservador). """ self._bno085_enabled = True self._bno085_noise_std_dps = float(noise_std_dps) def disable_bno085(self) -> None: """Desactiva el BNO085; el outer PID vuelve a usar ROT del NMEA 2000.""" self._bno085_enabled = False self._bno085_yaw_rate_dps = 0.0 # ----------------------------------------------------------------------- # API pública — Interfaz Modbus # ----------------------------------------------------------------------- def write_holding(self, addr: int, value: int) -> None: """Escribe un holding register (comando del PC → ESP32).""" self._holdings[addr] = int(value) & 0xFFFF def write_coil(self, addr: int, value: int) -> None: """Escribe una coil (pulso de comando del PC → ESP32).""" self._coils[addr] = 1 if value else 0 def read_input(self, addr: int) -> int: """Lee un input register (telemetría ESP32 → PC, solo lectura).""" return self._inputs.get(addr, 0) def read_discrete(self, addr: int) -> int: """Lee un discrete input (flag de estado ESP32 → PC, solo lectura).""" return self._discretes.get(addr, 0) def read_holding(self, addr: int) -> int: """Lee un holding register (para debug / verificación).""" return self._holdings.get(addr, 0) # ----------------------------------------------------------------------- # API pública — Control de la simulación # ----------------------------------------------------------------------- def step(self, duration_s: float, log_dt: float = 0.1) -> None: """ Avanza la simulación ``duration_s`` segundos. La física corre a 50 Hz. El outer PID se ejecuta cada 5 ticks (10 Hz). El inner PID corre en cada tick (50 Hz). Args: duration_s: Tiempo de simulación a avanzar (segundos). log_dt: Intervalo entre snapshots de telemetría (segundos). """ self._log_dt = log_dt n_ticks = max(1, round(duration_s / DT_INNER)) for _ in range(n_ticks): self._tick() # -- Propiedades de conveniencia ---------------------------------------- @property def t(self) -> float: """Tiempo de simulación actual (segundos).""" return self._t @property def mode(self) -> AutopilotMode: return self._mode @property def heading(self) -> float: """Rumbo actual del buque [0..360).""" return self._vessel.state.heading_deg @property def rot(self) -> float: """Rate of turn actual (deg/s).""" return self._vessel.state.rate_of_turn_dps @property def rudder(self) -> float: """Ángulo actual del timón (grados).""" return self._rudder.state.angle_deg @property def engaged(self) -> bool: return self._mode != AutopilotMode.STANDBY @property def any_alarm(self) -> bool: return (self._alarm_heading_lost or self._alarm_off_course or self._alarm_off_course_severe) # ----------------------------------------------------------------------- # Internos — tick principal (50 Hz) # ----------------------------------------------------------------------- def _tick(self) -> None: # 1. Procesar coils (flancos de subida) self._process_coils() # 2. Leer cambios en holdings (setpoint dinámico) self._sync_from_holdings() # 3. Outer loop @ 10 Hz if self._inner_tick % INNER_PER_OUTER == 0: self._run_outer_loop() # 4. Inner loop @ 50 Hz self._run_inner_loop() # 5. Física: avanzar la planta con el PWM actual self._run_physics() # 6. Evaluar alarmas con el estado post-física self._check_alarms() # 7. Espejo a los registros Modbus self._sync_to_registers() # 8. Capturar snapshot si toca if self._t >= self._next_log_t: self._capture_snapshot() self._next_log_t = self._t + self._log_dt # 9. Avanzar reloj self._t += DT_INNER self._inner_tick += 1 # ----------------------------------------------------------------------- # Internos — procesamiento de coils (flancos de subida) # ----------------------------------------------------------------------- def _process_coils(self) -> None: if self._coils[0] and not self._prev_coils[0]: self._cmd_engage() if self._coils[1] and not self._prev_coils[1]: self._cmd_disengage() if self._coils[2] and not self._prev_coils[2]: self._cmd_ack_alarms() self._prev_coils = dict(self._coils) # Las coils de comando son one-shot: se limpian tras procesar self._coils[0] = 0 self._coils[1] = 0 self._coils[2] = 0 def _cmd_engage(self) -> None: """Intenta enganchar en el modo solicitado por MODE_REQUEST.""" requested = AutopilotMode(self._holdings[0] & 0x0F) # Interlock: necesita rumbo válido para modos de control de rumbo heading_age = self._t - self._last_nmea_update_t heading_valid = heading_age <= HEADING_TIMEOUT_S if requested in (AutopilotMode.HEADING_HOLD, AutopilotMode.TRUE_COURSE, AutopilotMode.TRACK_KEEPING): if not heading_valid: self._add_event("engage_refused", "Sin rumbo NMEA válido") return # rechazar enganche if requested == AutopilotMode.HEADING_HOLD: raw_sp = self._holdings[1] if raw_sp == 0: # Auto-captura del rumbo actual si el setpoint es cero self._heading_setpoint_deg = self._nmea_heading_deg self._holdings[1] = int(self._nmea_heading_deg * 100) % 36000 else: self._heading_setpoint_deg = raw_sp * 0.01 self._pid_outer.reset(heading_deg=self._nmea_heading_deg) self._pid_inner.reset(measured_deg=self._rudder.state.angle_deg) self._mode = AutopilotMode.HEADING_HOLD # Resetear el flag de convergencia: el rumbo parte desde lejos del # nuevo setpoint; OFF_COURSE solo disparará cuando hayamos llegado # cerca y luego nos alejemos. self._tracking_settled = False self._add_event("engage", f"HEADING_HOLD sp={self._heading_setpoint_deg:.1f}°") elif requested == AutopilotMode.DODGE: self._pre_dodge_heading_deg = self._heading_setpoint_deg raw_offset = self._holdings[8] if raw_offset > 0x7FFF: raw_offset -= 0x10000 offset_deg = raw_offset * 0.01 self._heading_setpoint_deg = ( self._heading_setpoint_deg + offset_deg ) % 360.0 self._mode = AutopilotMode.DODGE self._add_event("engage", f"DODGE offset={offset_deg:.1f}°") elif requested == AutopilotMode.STANDBY: self._do_disengage() def _cmd_disengage(self) -> None: self._do_disengage() def _cmd_ack_alarms(self) -> None: self._alarm_heading_lost = False self._alarm_off_course = False self._alarm_off_course_severe = False self._add_event("ack", "Alarmas reconocidas") def _do_disengage(self) -> None: if self._mode != AutopilotMode.STANDBY: self._add_event("disengage", f"Desde {self._mode.name}") self._mode = AutopilotMode.STANDBY self._outer_rudder_sp = 0.0 self._inner_pwm_pct = 0.0 self._pid_outer.reset() self._pid_inner.reset() self._tracking_settled = False # ----------------------------------------------------------------------- # Internos — sincronización desde holdings # ----------------------------------------------------------------------- def _sync_from_holdings(self) -> None: """En HEADING_HOLD, el setpoint puede cambiar en caliente (knob).""" if self._mode == AutopilotMode.HEADING_HOLD: raw = self._holdings[1] new_sp = raw * 0.01 # Si el operador ha cambiado el setpoint significativamente # (> 1°) se resetea el flag de convergencia: el buque está # ahora aproximándose al nuevo rumbo, no desviándose del # anterior. Esto evita que OFF_COURSE dispare durante la # maniobra de aproximación al nuevo setpoint. if abs(heading_error_deg(new_sp, self._heading_setpoint_deg)) > 1.0: self._tracking_settled = False self._heading_setpoint_deg = new_sp # ----------------------------------------------------------------------- # Internos — outer loop (10 Hz) # ----------------------------------------------------------------------- def _run_outer_loop(self) -> None: if self._mode == AutopilotMode.STANDBY: self._outer_rudder_sp = 0.0 return heading_age = self._t - self._last_nmea_update_t if heading_age > HEADING_TIMEOUT_S: # No hay rumbo: salida cero (el firmware disengrana, esto lo hacemos # en _check_alarms, pero la salida ya no es válida) self._outer_rudder_sp = 0.0 return # Velocidad para gain scheduling: primero del holding, luego NMEA sp_speed_raw = self._holdings[25] # PID_OUTER_SPEED_KN_REQ_X10 sog_kn = (sp_speed_raw * 0.1) if sp_speed_raw > 0 else self._nmea_sog_kn # BNO085 activo → usa yaw rate del giróscopo (50 Hz, baja latencia). # BNO085 inactivo → usa ROT del bus NMEA 2000 (10 Hz, GPS/compass). rot_for_pid = (self._bno085_yaw_rate_dps if self._bno085_enabled else self._nmea_rot_dps) self._outer_rudder_sp = self._pid_outer.step( heading_setpoint_deg=self._heading_setpoint_deg, heading_measured_deg=self._nmea_heading_deg, rate_of_turn_dps=rot_for_pid, speed_kn=sog_kn, allowed=True, ) # ----------------------------------------------------------------------- # Internos — inner loop (50 Hz) # ----------------------------------------------------------------------- def _run_inner_loop(self) -> None: self._inner_pwm_pct = self._pid_inner.step( setpoint_deg=self._outer_rudder_sp, measured_deg=self._rudder.state.angle_deg, allowed=self._mode != AutopilotMode.STANDBY, ) # ----------------------------------------------------------------------- # Internos — física (50 Hz) # ----------------------------------------------------------------------- def _compute_wave_torque(self) -> float: """ Calcula el torque externo de guiñada del oleaje (°/s²) para el tick actual. Compone cuatro componentes: - Ola dominante: sinusoide al periodo característico del estado de mar. - Mar de fondo: sinusoide a periodo 3.2× más largo. - Ruido blanco: perturbación aleatoria (Gaussiana, proporcional a √dt). - Weather helm: deriva de viento, sinusoide de 120 s de periodo. Retorna 0.0 si el mar está en calma (beaufort == 0). """ if self._sea_beaufort == 0: return 0.0 t = self._t T = self._sea_wave_period wave = self._sea_wave_amp * math.sin(2.0 * math.pi * t / T) swell = self._sea_swell_amp * math.sin(2.0 * math.pi * t / (T * 3.2)) # Ruido blanco: la std por tick se escala con √dt para mantener # la densidad espectral de potencia constante en frecuencia. noise = self._sea_noise_amp * self._sea_rng.gauss(0.0, 1.0) * math.sqrt(DT_INNER) wind = self._sea_wind_bias * math.sin(2.0 * math.pi * t / 120.0) return wave + swell + noise + wind def _run_physics(self) -> None: # Timón responde al PWM self._rudder.step(dt=DT_INNER, pwm_pct=self._inner_pwm_pct) # Perturbación de mar: actualizar torque externo del buque cada tick self._vessel.config.external_yaw_torque = self._compute_wave_torque() # Buque responde al ángulo real del timón (y al torque del oleaje) self._vessel.step( dt=DT_INNER, rudder_deg=self._rudder.state.angle_deg, speed_kn=self._nmea_sog_kn, ) # La física actualiza el "sensor NMEA" si está conectado if self._physics_heading_active: self._nmea_heading_deg = self._vessel.state.heading_deg self._nmea_rot_dps = self._vessel.state.rate_of_turn_dps # COG = heading (sin corriente en este modelo simple) self._nmea_cog_deg = self._nmea_heading_deg self._last_nmea_update_t = self._t # BNO085: muestrea yaw rate a 50 Hz (mismo ritmo que el inner loop). # El giróscopo del BNO085 tiene latencia ~4ms y ruido ~0.02 °/s, # mucho mejor que el ROT calculado desde el GPS/compass NMEA (100-200ms). # En el outer PID (10 Hz) esto significa que rot_ff_term reacciona a # picos de guiñada por olas ~10x más rápido que con ROT por NMEA. if self._bno085_enabled: noise = (self._sea_rng.gauss(0.0, self._bno085_noise_std_dps) if self._bno085_noise_std_dps > 0.0 else 0.0) self._bno085_yaw_rate_dps = self._vessel.state.rate_of_turn_dps + noise # ----------------------------------------------------------------------- # Internos — alarmas # ----------------------------------------------------------------------- def _check_alarms(self) -> None: # HEADING_LOST: timeout del sensor NMEA heading_age = self._t - self._last_nmea_update_t if heading_age > HEADING_TIMEOUT_S: if not self._alarm_heading_lost: self._alarm_heading_lost = True self._add_event("alarm", "ALARM_HEADING_LOST") if self._mode != AutopilotMode.STANDBY: self._do_disengage() # OFF_COURSE (solo cuando enganchado) if self._mode != AutopilotMode.STANDBY: err = abs(heading_error_deg( self._heading_setpoint_deg, self._nmea_heading_deg )) # Marcar "primera convergencia" cuando el error cae bajo 5°. # Hasta ese momento el buque está en aproximación inicial al # setpoint — no se consideran desviaciones (sería falsa alarma). if err < 5.0: self._tracking_settled = True # OFF_COURSE solo se evalúa cuando ya hemos convergido al menos # una vez. Esto reproduce el comportamiento de pilotos reales # (Simrad, Raymarine): la alarma es para "nos hemos desviado del # rumbo", no para "el operador acaba de ordenar un cambio grande". if self._tracking_settled: if err > OFF_COURSE_SEVERE_DEG: if not self._alarm_off_course_severe: self._alarm_off_course_severe = True self._alarm_off_course = True self._add_event("alarm", f"ALARM_OFF_COURSE_SEVERE err={err:.1f}°") self._do_disengage() elif err > OFF_COURSE_WARN_DEG: if not self._alarm_off_course: self._alarm_off_course = True self._add_event("alarm", f"ALARM_OFF_COURSE err={err:.1f}°") else: if self._alarm_off_course and not self._alarm_off_course_severe: self._alarm_off_course = False # ----------------------------------------------------------------------- # Internos — espejo a registros Modbus # ----------------------------------------------------------------------- def _sync_to_registers(self) -> None: uptime = int(self._t) self._inputs[4] = uptime & 0xFFFF self._inputs[5] = (uptime >> 16) & 0xFFFF self._inputs[6] = int(self._mode) # Timón self._inputs[16] = _u16(int(self._rudder.state.angle_deg * 100)) # Rumbo / ROT / edad self._inputs[24] = _u16(int(self._nmea_heading_deg * 100)) self._inputs[25] = _s16_to_u16(int(self._nmea_rot_dps * 100)) age_ms = int((self._t - self._last_nmea_update_t) * 1000) self._inputs[26] = min(60000, max(0, age_ms)) # COG / SOG / XTE self._inputs[60] = _u16(int(self._nmea_cog_deg * 100)) self._inputs[61] = _u16(int(self._nmea_sog_kn * 10)) self._inputs[63] = _s16_to_u16(int(self._nmea_xte_dm)) # Telemetría inner PID self._inputs[40] = _s16_to_u16(int(self._outer_rudder_sp * 100)) self._inputs[41] = _s16_to_u16(int(self._inner_pwm_pct * 100)) inner_err = self._outer_rudder_sp - self._rudder.state.angle_deg self._inputs[42] = _s16_to_u16(int(inner_err * 100)) # Telemetría outer PID self._inputs[50] = _u16(int(self._heading_setpoint_deg * 100)) self._inputs[51] = _s16_to_u16(int(self._outer_rudder_sp * 100)) outer_err = heading_error_deg( self._heading_setpoint_deg, self._nmea_heading_deg ) self._inputs[52] = _s16_to_u16(int(outer_err * 100)) self._inputs[53] = _u16(int(self._nmea_sog_kn * 10)) # Discretos self._discretes[0] = 1 if self._mode != AutopilotMode.STANDBY else 0 self._discretes[8] = self._discretes[0] self._discretes[16] = 1 if self._alarm_off_course else 0 self._discretes[17] = 1 if self._alarm_off_course_severe else 0 self._discretes[19] = 1 if self._alarm_heading_lost else 0 self._discretes[25] = 1 if self.any_alarm else 0 # ----------------------------------------------------------------------- # Internos — telemetría # ----------------------------------------------------------------------- def _capture_snapshot(self) -> None: hdg = self._nmea_heading_deg sp = self._heading_setpoint_deg self.log.append(SimSnapshot( t=self._t, mode=self._mode, heading_deg=hdg, heading_setpoint_deg=sp, heading_error_deg=heading_error_deg(sp, hdg), rot_dps=self._nmea_rot_dps, sog_kn=self._nmea_sog_kn, outer_rudder_sp_deg=self._outer_rudder_sp, rudder_angle_deg=self._rudder.state.angle_deg, inner_pwm_pct=self._inner_pwm_pct, alarm_heading_lost=self._alarm_heading_lost, alarm_off_course=self._alarm_off_course, alarm_off_course_severe=self._alarm_off_course_severe, bno085_yaw_rate_dps=self._bno085_yaw_rate_dps, )) def _add_event(self, kind: str, detail: str = "") -> None: self.events.append(SimEvent(t=self._t, kind=kind, detail=detail)) # ----------------------------------------------------------------------- # Internos — valores por defecto # ----------------------------------------------------------------------- def _default_holdings(self) -> dict[int, int]: h: dict[int, int] = {i: 0 for i in range(40)} h[0] = 0 # MODE_REQUEST = STANDBY h[1] = 0 # HEADING_SETPOINT_X100 h[2] = 80 # BRIGHTNESS_PCT h[3] = 70 # ALARM_VOLUME_PCT h[25] = 100 # PID_OUTER_SPEED_KN_REQ_X10 = 10.0 kn h[33] = 50 # XTE_GAIN_X1000 = 0.050 deg/m h[34] = 2000 # XTE_MAX_CORRECTION_X100 = 20.0° return h def _default_inputs(self) -> dict[int, int]: i: dict[int, int] = {k: 0 for k in range(80)} i[0] = 0 # FW_VERSION_MAJOR i[1] = 4 # FW_VERSION_MINOR (sprint 4) i[2] = 0 # FW_VERSION_PATCH i[3] = 0 # SCHEMA_VERSION i[18] = 1 # RUDDER_VALID (el sim siempre es válido) return i # --------------------------------------------------------------------------- # Helpers de codificación de registros (Modbus es big-endian uint16) # --------------------------------------------------------------------------- def _u16(x: int) -> int: """Clamp a uint16 [0..65535].""" return max(0, min(0xFFFF, x)) & 0xFFFF def _s16_to_u16(x: int) -> int: """Convierte int16 con signo a representación uint16 (complemento a 2).""" if x < 0: return (x + 0x10000) & 0xFFFF return x & 0xFFFF