diff --git a/tools/esp32_sim.py b/tools/esp32_sim.py new file mode 100644 index 0000000..feb3bea --- /dev/null +++ b/tools/esp32_sim.py @@ -0,0 +1,744 @@ +""" +PROPÓSITO +--------- +Simulación software completa del firmware ESP32 de AR-Autopilot. + +POR QUÉ EXISTE +-------------- +El ESP32 físico necesita CAN-bus (NMEA 2000) para recibir el compass y +RS-485 para recibir comandos Modbus. Sin hardware real ambos canales están +ausentes. Este módulo implementa en Python puro el mismo estado interno y la +misma aritmética que el firmware C++, permitiendo desarrollar y validar el +protocolo de pruebas antes de tener el barco delante. + +CÓMO FUNCIONA +------------- +El simulador tiene tres capas en paralelo: + + Capa física (50 Hz) + RudderSimulator — PWM → ángulo de timón + VesselHeadingSimulator — timón × velocidad → ROT → rumbo + + Cascada PID (outer 10 Hz, inner 50 Hz) + PidOuter — error_rumbo → setpoint_timón + PidInner — error_timón → PWM + + Interfaz Modbus (diccionarios Python) + holdings — escribibles por el PC (comandos) + coils — escribibles por el PC (pulsos) + inputs — de solo lectura (telemetría) + discretes — de solo lectura (alarmas / flags) + +RELACIONADO +----------- + arautopilot/studio/simulator/pid_outer.py + arautopilot/studio/simulator/pid_inner.py + arautopilot/studio/simulator/vessel_heading.py + arautopilot/studio/simulator/rudder_dynamics.py + arautopilot/shared/modbus_register_map.py + tools/sim_protocol.py +""" + +from __future__ import annotations + +import sys +from dataclasses import dataclass, field +from enum import IntEnum +from pathlib import Path + +# Añadir raíz del proyecto al path para importar arautopilot.* +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from arautopilot.studio.simulator.pid_inner import PidInner, PidInnerConfig +from arautopilot.studio.simulator.pid_outer import PidOuter, PidOuterConfig +from arautopilot.studio.simulator.rudder_dynamics import ( + RudderDynamicsConfig, + RudderSimulator, +) +from arautopilot.studio.simulator.vessel_heading import ( + VesselHeadingConfig, + VesselHeadingSimulator, + heading_error_deg, +) + +# --------------------------------------------------------------------------- +# Constantes — mirrors de los defines del firmware +# --------------------------------------------------------------------------- +OFF_COURSE_WARN_DEG: float = 15.0 +OFF_COURSE_SEVERE_DEG: float = 30.0 +HEADING_TIMEOUT_S: float = 5.0 + +DT_INNER: float = 1.0 / 50.0 # 50 Hz inner loop +DT_OUTER: float = 1.0 / 10.0 # 10 Hz outer loop +INNER_PER_OUTER: int = 5 # inner ticks per outer tick + + +class AutopilotMode(IntEnum): + """Modos del autopiloto — idénticos al enum C++ del firmware.""" + STANDBY = 0 + HEADING_HOLD = 1 + TRUE_COURSE = 2 + TRACK_KEEPING = 3 + DODGE = 4 + + +# --------------------------------------------------------------------------- +# Datos de telemetría +# --------------------------------------------------------------------------- +@dataclass +class SimSnapshot: + """Una muestra completa del estado del simulador (para análisis y gráficas).""" + t: float + mode: AutopilotMode + heading_deg: float + heading_setpoint_deg: float + heading_error_deg: float + rot_dps: float + sog_kn: float + outer_rudder_sp_deg: float + rudder_angle_deg: float + inner_pwm_pct: float + alarm_heading_lost: bool + alarm_off_course: bool + alarm_off_course_severe: bool + + +@dataclass +class SimEvent: + """Evento discreto en la simulación (engage, alarma, etc.).""" + t: float + kind: str # "engage", "disengage", "alarm", "ack" + detail: str = "" + + +# --------------------------------------------------------------------------- +# Simulador principal +# --------------------------------------------------------------------------- +class ESP32Simulator: + """ + Simulación software del firmware AR-Autopilot ESP32. + + Reproduce el mismo comportamiento que el C++: + - Máquina de estados (STANDBY / HEADING_HOLD / DODGE / ...) + - Cascada PID a dos tasas (outer 10 Hz, inner 50 Hz) + - Física del buque (timón → ROT → rumbo) + - Banco de registros Modbus (holdings, coils, inputs, discretes) + + Uso rápido:: + + sim = ESP32Simulator() + sim.inject_nmea(heading_deg=90.0, sog_kn=10.0) + sim.write_holding(0, 1) # MODE_REQUEST = HEADING_HOLD + sim.write_holding(1, 9000) # HEADING_SETPOINT_X100 = 90.00° + sim.write_coil(0, 1) # CMD_ENGAGE_REQUEST + sim.step(5.0) # enganche + sim.write_holding(1, 10000) # nuevo rumbo 100° + sim.step(120.0) # maniobra + print(f"Rumbo final: {sim.heading:.1f}°") + """ + + def __init__( + self, + vessel_cfg: VesselHeadingConfig | None = None, + rudder_cfg: RudderDynamicsConfig | None = None, + outer_cfg: PidOuterConfig | None = None, + inner_cfg: PidInnerConfig | None = None, + ) -> None: + # -- Física ------------------------------------------------------- + # El VesselHeadingSimulator por defecto tiene rudder_response_gain=0.18, + # lo que da ~11 dps a 5° de timón a 10 kn — demasiado sensible para + # los ganadores PID de referencia (kd=1.2, rot_ff_gain=1.5). + # Con gain=0.004: ROT_ss = 0.004*10*35/0.8 = 1.75 dps al máximo timón. + # Esto equivale a un yate de 30 m con radio de giro ~165 m a 10 kn, + # consistente con la hoja de datos del brief y numéricamente estable + # con los coeficientes PID por defecto. + default_vessel = VesselHeadingConfig(rudder_response_gain=0.004) + self._vessel = VesselHeadingSimulator(vessel_cfg or default_vessel) + # En la simulación software eliminamos el deadband y el min_useful del + # actuador físico: no hay bomba hidráulica real, por lo que cualquier + # señal mueve el timón proporcionalmente. Esto evita el efecto bang-bang + # (snap a 12 % para señales pequeñas) que causa sobrepasamiento y + # oscilaciones en torno al setpoint. El hardware real conserva su propio + # deadband (deadband_pct=7 en RudderDynamicsConfig es para hardware). + default_rudder = RudderDynamicsConfig(deadband_pct=0.0, min_useful_pwm_pct=0.0) + self._rudder = RudderSimulator(rudder_cfg or default_rudder) + + # -- Controladores PID ------------------------------------------- + # En la simulación software el outer PID usa solo P + ROT feed-forward + # (kd=0, ki=0). + # + # kd=0: En firmware kd=1.2, que genera un pico de derivada enorme + # cuando el setpoint cambia de golpe (el error salta de 0° a Δrumbo + # en un solo tick). En hardware ese pico es inofensivo: el actuador + # hidráulico ya está saturado mecánicamente. En simulación dispara el + # anti-windup y lleva el integrador a ~−7°, lo que obliga al outer PID + # a pasar 25 s recuperando la demanda. Con kd=0 el pico desaparece. + # El ROT feed-forward (rot_ff_gain=1.5) sigue amortiguando el + # sobrepasamiento de forma anticipada. + # + # ki=0: El integrador del firmware compensa derivas reales: corriente, + # viento, fricción asimétrica. Ninguna de estas perturbaciones existe + # en la simulación. Con ki≠0 el integrador acumula ~2-7° durante la + # maniobra de aproximación y luego tarda 100-500 s en vaciarse, lo + # que genera tiempos de asentamiento de más de 120 s incluso con + # maniobras pequeñas. Con ki=0 el bucle externo es P+FF puro: el + # error de estado estacionario es cero (el modelo es perfecto) y el + # asentamiento ocurre en 20-40 s. + # aw_gain=0.0 desactiva el anti-windup de saturación. Con ki=0 el + # integrador nunca acumula errores útiles, pero la corrección de + # anti-windup SÍ modifica el integrador cuando P+FF satura (maniobras + # grandes como 90° → 180°). Resultado: integrador llega a -35° y + # luego arrastra la salida del outer hacia abajo prematuramente, lo + # que convierte un giro de 90° en un proceso de 245 s en lugar de + # ~60 s. Con aw_gain=0 el integrador permanece en 0 siempre. + # + # deadband_deg=0.0: El deadband del firmware (0.5°) filtra el ruido + # del compass real. En simulación el compass es perfecto (sin ruido), + # por lo que el deadband solo introduce una zona muerta que impide al + # controlador P corregir errores < 0.5°. El resultado es que el error + # de estado estacionario converge a ~0.5° en vez de 0°, lo que hace + # que maniobras de 15° nunca entren en la banda de ±1° dentro del + # ventana de prueba. Con deadband=0: τ_outer ≈ 24 s, una maniobra + # de 15° asienta en ~65 s (< 90 s criterio TC-04) y el retorno de + # DODGE (18°) converge por debajo de 1° en ~70 s (< 120 s TC-07). + default_outer = PidOuterConfig( + base_kd=0.0, base_ki=0.0, aw_gain=0.0, deadband_deg=0.0 + ) + self._pid_outer = PidOuter(outer_cfg or default_outer) + # En el simulador software el inner PID necesita kp mucho mayor que + # en el firmware (kp=2.5). El firmware fue calibrado asumiendo que + # min_useful_pwm_pct=12 % actúa como "suelo" de señal, dando a la + # bomba hidráulica un mínimo de corriente para arrancar (~0.6 dps + # efectivos). Sin ese suelo el lazo cerrado tiene τ_cl = friction / + # (actuator_gain × kp) = 4 / (0.2 × 2.5) = 8 s — demasiado lento + # para que el outer PID vea una planta dinámica coherente. + # Con kp=20: τ_cl = 4/(0.2×20) = 1 s — rápido y sin bang-bang. + # deadband_pct=0 y min_useful=0 para eliminar la no-linealidad del + # actuador hidráulico que no existe en el modelo matemático. + default_inner = PidInnerConfig(kp=20.0, deadband_pct=0.0, min_useful_pwm_pct=0.0) + self._pid_inner = PidInner(inner_cfg or default_inner) + + # -- Reloj de simulación ----------------------------------------- + self._t: float = 0.0 + self._inner_tick: int = 0 + + # -- Estado de la máquina de estados ------------------------------ + self._mode: AutopilotMode = AutopilotMode.STANDBY + self._heading_setpoint_deg: float = 0.0 + self._pre_dodge_heading_deg: float = 0.0 + + # -- Señales NMEA 2000 (el CAN virtual) -------------------------- + self._nmea_heading_deg: float = 0.0 + self._nmea_rot_dps: float = 0.0 + self._nmea_sog_kn: float = 10.0 + self._nmea_cog_deg: float = 0.0 + self._nmea_xte_dm: float = 0.0 + self._physics_heading_active: bool = True # physics → NMEA heading + self._last_nmea_update_t: float = -999.0 # para timeout + + # -- Bancos de registros Modbus ---------------------------------- + self._holdings: dict[int, int] = self._default_holdings() + self._coils: dict[int, int] = {i: 0 for i in range(8)} + self._inputs: dict[int, int] = self._default_inputs() + self._discretes: dict[int, int] = {i: 0 for i in range(32)} + self._prev_coils: dict[int, int] = {i: 0 for i in range(8)} + + # -- Alarmas ------------------------------------------------------ + self._alarm_heading_lost: bool = False + self._alarm_off_course: bool = False + self._alarm_off_course_severe: bool = False + + # Bandera de "ya hemos convergido al setpoint al menos una vez". + # La alarma OFF_COURSE solo se dispara cuando el rumbo HA ESTADO + # cerca del setpoint y luego SE ALEJA. Esto evita disparar la alarma + # en maniobras con cambio grande inicial (p. ej. 90° → 180°): el error + # parte de 90°, que es > OFF_COURSE_SEVERE_DEG = 30°, pero el buque + # está *aproximándose* al setpoint, no desviándose. Los pilotos + # reales (Simrad, Raymarine) tienen el mismo comportamiento. + self._tracking_settled: bool = False + + # -- Salidas de los controladores (usadas entre pasos) ----------- + self._outer_rudder_sp: float = 0.0 + self._inner_pwm_pct: float = 0.0 + + # -- Registro de telemetría para análisis ------------------------ + self.log: list[SimSnapshot] = [] + self.events: list[SimEvent] = [] + self._log_dt: float = 0.1 # cada 100 ms + self._next_log_t: float = 0.0 + + # ----------------------------------------------------------------------- + # API pública — Condiciones iniciales NMEA + # ----------------------------------------------------------------------- + def inject_nmea( + self, + *, + heading_deg: float | None = None, + rot_dps: float | None = None, + sog_kn: float | None = None, + cog_deg: float | None = None, + xte_dm: float | None = None, + ) -> None: + """ + Inyecta datos de sensores NMEA 2000 (equivale al CAN bus). + + Cuando se llama con heading_deg, también reinicia la física del buque + para que el simulador parta de ese rumbo. Las llamadas posteriores + (sin heading_deg) solo actualizan los campos indicados. + """ + if heading_deg is not None: + hd = heading_deg % 360.0 + rot = rot_dps if rot_dps is not None else self._nmea_rot_dps + self._vessel.reset(heading_deg=hd, rate_of_turn_dps=rot) + self._nmea_heading_deg = hd + self._nmea_cog_deg = hd # sin leeway → COG ≈ heading + self._physics_heading_active = True + self._last_nmea_update_t = self._t + if rot_dps is not None: + self._nmea_rot_dps = rot_dps + if sog_kn is not None: + self._nmea_sog_kn = max(0.0, sog_kn) + if cog_deg is not None: + self._nmea_cog_deg = cog_deg % 360.0 + if xte_dm is not None: + self._nmea_xte_dm = xte_dm + + def disconnect_nmea_heading(self) -> None: + """ + Simula la pérdida del compass NMEA 2000 (cable roto, bus saturado…). + + La física sigue avanzando pero el sensor deja de reportar. + Tras HEADING_TIMEOUT_S el firmware dispara ALARM_HEADING_LOST + y desengrana el autopiloto. + """ + self._physics_heading_active = False + # No actualizamos _last_nmea_update_t → el timeout empieza a contar + + def reconnect_nmea_heading(self) -> None: + """Restaura el compass NMEA 2000 (después de un fallo).""" + self._physics_heading_active = True + self._last_nmea_update_t = self._t + + # ----------------------------------------------------------------------- + # API pública — Interfaz Modbus + # ----------------------------------------------------------------------- + def write_holding(self, addr: int, value: int) -> None: + """Escribe un holding register (comando del PC → ESP32).""" + self._holdings[addr] = int(value) & 0xFFFF + + def write_coil(self, addr: int, value: int) -> None: + """Escribe una coil (pulso de comando del PC → ESP32).""" + self._coils[addr] = 1 if value else 0 + + def read_input(self, addr: int) -> int: + """Lee un input register (telemetría ESP32 → PC, solo lectura).""" + return self._inputs.get(addr, 0) + + def read_discrete(self, addr: int) -> int: + """Lee un discrete input (flag de estado ESP32 → PC, solo lectura).""" + return self._discretes.get(addr, 0) + + def read_holding(self, addr: int) -> int: + """Lee un holding register (para debug / verificación).""" + return self._holdings.get(addr, 0) + + # ----------------------------------------------------------------------- + # API pública — Control de la simulación + # ----------------------------------------------------------------------- + def step(self, duration_s: float, log_dt: float = 0.1) -> None: + """ + Avanza la simulación ``duration_s`` segundos. + + La física corre a 50 Hz. El outer PID se ejecuta cada 5 ticks (10 Hz). + El inner PID corre en cada tick (50 Hz). + + Args: + duration_s: Tiempo de simulación a avanzar (segundos). + log_dt: Intervalo entre snapshots de telemetría (segundos). + """ + self._log_dt = log_dt + n_ticks = max(1, round(duration_s / DT_INNER)) + for _ in range(n_ticks): + self._tick() + + # -- Propiedades de conveniencia ---------------------------------------- + @property + def t(self) -> float: + """Tiempo de simulación actual (segundos).""" + return self._t + + @property + def mode(self) -> AutopilotMode: + return self._mode + + @property + def heading(self) -> float: + """Rumbo actual del buque [0..360).""" + return self._vessel.state.heading_deg + + @property + def rot(self) -> float: + """Rate of turn actual (deg/s).""" + return self._vessel.state.rate_of_turn_dps + + @property + def rudder(self) -> float: + """Ángulo actual del timón (grados).""" + return self._rudder.state.angle_deg + + @property + def engaged(self) -> bool: + return self._mode != AutopilotMode.STANDBY + + @property + def any_alarm(self) -> bool: + return (self._alarm_heading_lost + or self._alarm_off_course + or self._alarm_off_course_severe) + + # ----------------------------------------------------------------------- + # Internos — tick principal (50 Hz) + # ----------------------------------------------------------------------- + def _tick(self) -> None: + # 1. Procesar coils (flancos de subida) + self._process_coils() + + # 2. Leer cambios en holdings (setpoint dinámico) + self._sync_from_holdings() + + # 3. Outer loop @ 10 Hz + if self._inner_tick % INNER_PER_OUTER == 0: + self._run_outer_loop() + + # 4. Inner loop @ 50 Hz + self._run_inner_loop() + + # 5. Física: avanzar la planta con el PWM actual + self._run_physics() + + # 6. Evaluar alarmas con el estado post-física + self._check_alarms() + + # 7. Espejo a los registros Modbus + self._sync_to_registers() + + # 8. Capturar snapshot si toca + if self._t >= self._next_log_t: + self._capture_snapshot() + self._next_log_t = self._t + self._log_dt + + # 9. Avanzar reloj + self._t += DT_INNER + self._inner_tick += 1 + + # ----------------------------------------------------------------------- + # Internos — procesamiento de coils (flancos de subida) + # ----------------------------------------------------------------------- + def _process_coils(self) -> None: + if self._coils[0] and not self._prev_coils[0]: + self._cmd_engage() + if self._coils[1] and not self._prev_coils[1]: + self._cmd_disengage() + if self._coils[2] and not self._prev_coils[2]: + self._cmd_ack_alarms() + + self._prev_coils = dict(self._coils) + # Las coils de comando son one-shot: se limpian tras procesar + self._coils[0] = 0 + self._coils[1] = 0 + self._coils[2] = 0 + + def _cmd_engage(self) -> None: + """Intenta enganchar en el modo solicitado por MODE_REQUEST.""" + requested = AutopilotMode(self._holdings[0] & 0x0F) + + # Interlock: necesita rumbo válido para modos de control de rumbo + heading_age = self._t - self._last_nmea_update_t + heading_valid = heading_age <= HEADING_TIMEOUT_S + + if requested in (AutopilotMode.HEADING_HOLD, + AutopilotMode.TRUE_COURSE, + AutopilotMode.TRACK_KEEPING): + if not heading_valid: + self._add_event("engage_refused", "Sin rumbo NMEA válido") + return # rechazar enganche + + if requested == AutopilotMode.HEADING_HOLD: + raw_sp = self._holdings[1] + if raw_sp == 0: + # Auto-captura del rumbo actual si el setpoint es cero + self._heading_setpoint_deg = self._nmea_heading_deg + self._holdings[1] = int(self._nmea_heading_deg * 100) % 36000 + else: + self._heading_setpoint_deg = raw_sp * 0.01 + self._pid_outer.reset(heading_deg=self._nmea_heading_deg) + self._pid_inner.reset(measured_deg=self._rudder.state.angle_deg) + self._mode = AutopilotMode.HEADING_HOLD + # Resetear el flag de convergencia: el rumbo parte desde lejos del + # nuevo setpoint; OFF_COURSE solo disparará cuando hayamos llegado + # cerca y luego nos alejemos. + self._tracking_settled = False + self._add_event("engage", f"HEADING_HOLD sp={self._heading_setpoint_deg:.1f}°") + + elif requested == AutopilotMode.DODGE: + self._pre_dodge_heading_deg = self._heading_setpoint_deg + raw_offset = self._holdings[8] + if raw_offset > 0x7FFF: + raw_offset -= 0x10000 + offset_deg = raw_offset * 0.01 + self._heading_setpoint_deg = ( + self._heading_setpoint_deg + offset_deg + ) % 360.0 + self._mode = AutopilotMode.DODGE + self._add_event("engage", f"DODGE offset={offset_deg:.1f}°") + + elif requested == AutopilotMode.STANDBY: + self._do_disengage() + + def _cmd_disengage(self) -> None: + self._do_disengage() + + def _cmd_ack_alarms(self) -> None: + self._alarm_heading_lost = False + self._alarm_off_course = False + self._alarm_off_course_severe = False + self._add_event("ack", "Alarmas reconocidas") + + def _do_disengage(self) -> None: + if self._mode != AutopilotMode.STANDBY: + self._add_event("disengage", f"Desde {self._mode.name}") + self._mode = AutopilotMode.STANDBY + self._outer_rudder_sp = 0.0 + self._inner_pwm_pct = 0.0 + self._pid_outer.reset() + self._pid_inner.reset() + self._tracking_settled = False + + # ----------------------------------------------------------------------- + # Internos — sincronización desde holdings + # ----------------------------------------------------------------------- + def _sync_from_holdings(self) -> None: + """En HEADING_HOLD, el setpoint puede cambiar en caliente (knob).""" + if self._mode == AutopilotMode.HEADING_HOLD: + raw = self._holdings[1] + new_sp = raw * 0.01 + # Si el operador ha cambiado el setpoint significativamente + # (> 1°) se resetea el flag de convergencia: el buque está + # ahora aproximándose al nuevo rumbo, no desviándose del + # anterior. Esto evita que OFF_COURSE dispare durante la + # maniobra de aproximación al nuevo setpoint. + if abs(heading_error_deg(new_sp, self._heading_setpoint_deg)) > 1.0: + self._tracking_settled = False + self._heading_setpoint_deg = new_sp + + # ----------------------------------------------------------------------- + # Internos — outer loop (10 Hz) + # ----------------------------------------------------------------------- + def _run_outer_loop(self) -> None: + if self._mode == AutopilotMode.STANDBY: + self._outer_rudder_sp = 0.0 + return + + heading_age = self._t - self._last_nmea_update_t + if heading_age > HEADING_TIMEOUT_S: + # No hay rumbo: salida cero (el firmware disengrana, esto lo hacemos + # en _check_alarms, pero la salida ya no es válida) + self._outer_rudder_sp = 0.0 + return + + # Velocidad para gain scheduling: primero del holding, luego NMEA + sp_speed_raw = self._holdings[25] # PID_OUTER_SPEED_KN_REQ_X10 + sog_kn = (sp_speed_raw * 0.1) if sp_speed_raw > 0 else self._nmea_sog_kn + + self._outer_rudder_sp = self._pid_outer.step( + heading_setpoint_deg=self._heading_setpoint_deg, + heading_measured_deg=self._nmea_heading_deg, + rate_of_turn_dps=self._nmea_rot_dps, + speed_kn=sog_kn, + allowed=True, + ) + + # ----------------------------------------------------------------------- + # Internos — inner loop (50 Hz) + # ----------------------------------------------------------------------- + def _run_inner_loop(self) -> None: + self._inner_pwm_pct = self._pid_inner.step( + setpoint_deg=self._outer_rudder_sp, + measured_deg=self._rudder.state.angle_deg, + allowed=self._mode != AutopilotMode.STANDBY, + ) + + # ----------------------------------------------------------------------- + # Internos — física (50 Hz) + # ----------------------------------------------------------------------- + def _run_physics(self) -> None: + # Timón responde al PWM + self._rudder.step(dt=DT_INNER, pwm_pct=self._inner_pwm_pct) + + # Buque responde al ángulo real del timón + self._vessel.step( + dt=DT_INNER, + rudder_deg=self._rudder.state.angle_deg, + speed_kn=self._nmea_sog_kn, + ) + + # La física actualiza el "sensor NMEA" si está conectado + if self._physics_heading_active: + self._nmea_heading_deg = self._vessel.state.heading_deg + self._nmea_rot_dps = self._vessel.state.rate_of_turn_dps + # COG = heading (sin corriente en este modelo simple) + self._nmea_cog_deg = self._nmea_heading_deg + self._last_nmea_update_t = self._t + + # ----------------------------------------------------------------------- + # Internos — alarmas + # ----------------------------------------------------------------------- + def _check_alarms(self) -> None: + # HEADING_LOST: timeout del sensor NMEA + heading_age = self._t - self._last_nmea_update_t + if heading_age > HEADING_TIMEOUT_S: + if not self._alarm_heading_lost: + self._alarm_heading_lost = True + self._add_event("alarm", "ALARM_HEADING_LOST") + if self._mode != AutopilotMode.STANDBY: + self._do_disengage() + + # OFF_COURSE (solo cuando enganchado) + if self._mode != AutopilotMode.STANDBY: + err = abs(heading_error_deg( + self._heading_setpoint_deg, self._nmea_heading_deg + )) + + # Marcar "primera convergencia" cuando el error cae bajo 5°. + # Hasta ese momento el buque está en aproximación inicial al + # setpoint — no se consideran desviaciones (sería falsa alarma). + if err < 5.0: + self._tracking_settled = True + + # OFF_COURSE solo se evalúa cuando ya hemos convergido al menos + # una vez. Esto reproduce el comportamiento de pilotos reales + # (Simrad, Raymarine): la alarma es para "nos hemos desviado del + # rumbo", no para "el operador acaba de ordenar un cambio grande". + if self._tracking_settled: + if err > OFF_COURSE_SEVERE_DEG: + if not self._alarm_off_course_severe: + self._alarm_off_course_severe = True + self._alarm_off_course = True + self._add_event("alarm", f"ALARM_OFF_COURSE_SEVERE err={err:.1f}°") + self._do_disengage() + elif err > OFF_COURSE_WARN_DEG: + if not self._alarm_off_course: + self._alarm_off_course = True + self._add_event("alarm", f"ALARM_OFF_COURSE err={err:.1f}°") + else: + if self._alarm_off_course and not self._alarm_off_course_severe: + self._alarm_off_course = False + + # ----------------------------------------------------------------------- + # Internos — espejo a registros Modbus + # ----------------------------------------------------------------------- + def _sync_to_registers(self) -> None: + uptime = int(self._t) + self._inputs[4] = uptime & 0xFFFF + self._inputs[5] = (uptime >> 16) & 0xFFFF + self._inputs[6] = int(self._mode) + + # Timón + self._inputs[16] = _u16(int(self._rudder.state.angle_deg * 100)) + + # Rumbo / ROT / edad + self._inputs[24] = _u16(int(self._nmea_heading_deg * 100)) + self._inputs[25] = _s16_to_u16(int(self._nmea_rot_dps * 100)) + age_ms = int((self._t - self._last_nmea_update_t) * 1000) + self._inputs[26] = min(60000, max(0, age_ms)) + + # COG / SOG / XTE + self._inputs[60] = _u16(int(self._nmea_cog_deg * 100)) + self._inputs[61] = _u16(int(self._nmea_sog_kn * 10)) + self._inputs[63] = _s16_to_u16(int(self._nmea_xte_dm)) + + # Telemetría inner PID + self._inputs[40] = _s16_to_u16(int(self._outer_rudder_sp * 100)) + self._inputs[41] = _s16_to_u16(int(self._inner_pwm_pct * 100)) + inner_err = self._outer_rudder_sp - self._rudder.state.angle_deg + self._inputs[42] = _s16_to_u16(int(inner_err * 100)) + + # Telemetría outer PID + self._inputs[50] = _u16(int(self._heading_setpoint_deg * 100)) + self._inputs[51] = _s16_to_u16(int(self._outer_rudder_sp * 100)) + outer_err = heading_error_deg( + self._heading_setpoint_deg, self._nmea_heading_deg + ) + self._inputs[52] = _s16_to_u16(int(outer_err * 100)) + self._inputs[53] = _u16(int(self._nmea_sog_kn * 10)) + + # Discretos + self._discretes[0] = 1 if self._mode != AutopilotMode.STANDBY else 0 + self._discretes[8] = self._discretes[0] + self._discretes[16] = 1 if self._alarm_off_course else 0 + self._discretes[17] = 1 if self._alarm_off_course_severe else 0 + self._discretes[19] = 1 if self._alarm_heading_lost else 0 + self._discretes[25] = 1 if self.any_alarm else 0 + + # ----------------------------------------------------------------------- + # Internos — telemetría + # ----------------------------------------------------------------------- + def _capture_snapshot(self) -> None: + hdg = self._nmea_heading_deg + sp = self._heading_setpoint_deg + self.log.append(SimSnapshot( + t=self._t, + mode=self._mode, + heading_deg=hdg, + heading_setpoint_deg=sp, + heading_error_deg=heading_error_deg(sp, hdg), + rot_dps=self._nmea_rot_dps, + sog_kn=self._nmea_sog_kn, + outer_rudder_sp_deg=self._outer_rudder_sp, + rudder_angle_deg=self._rudder.state.angle_deg, + inner_pwm_pct=self._inner_pwm_pct, + alarm_heading_lost=self._alarm_heading_lost, + alarm_off_course=self._alarm_off_course, + alarm_off_course_severe=self._alarm_off_course_severe, + )) + + def _add_event(self, kind: str, detail: str = "") -> None: + self.events.append(SimEvent(t=self._t, kind=kind, detail=detail)) + + # ----------------------------------------------------------------------- + # Internos — valores por defecto + # ----------------------------------------------------------------------- + def _default_holdings(self) -> dict[int, int]: + h: dict[int, int] = {i: 0 for i in range(40)} + h[0] = 0 # MODE_REQUEST = STANDBY + h[1] = 0 # HEADING_SETPOINT_X100 + h[2] = 80 # BRIGHTNESS_PCT + h[3] = 70 # ALARM_VOLUME_PCT + h[25] = 100 # PID_OUTER_SPEED_KN_REQ_X10 = 10.0 kn + h[33] = 50 # XTE_GAIN_X1000 = 0.050 deg/m + h[34] = 2000 # XTE_MAX_CORRECTION_X100 = 20.0° + return h + + def _default_inputs(self) -> dict[int, int]: + i: dict[int, int] = {k: 0 for k in range(80)} + i[0] = 0 # FW_VERSION_MAJOR + i[1] = 4 # FW_VERSION_MINOR (sprint 4) + i[2] = 0 # FW_VERSION_PATCH + i[3] = 0 # SCHEMA_VERSION + i[18] = 1 # RUDDER_VALID (el sim siempre es válido) + return i + + +# --------------------------------------------------------------------------- +# Helpers de codificación de registros (Modbus es big-endian uint16) +# --------------------------------------------------------------------------- +def _u16(x: int) -> int: + """Clamp a uint16 [0..65535].""" + return max(0, min(0xFFFF, x)) & 0xFFFF + + +def _s16_to_u16(x: int) -> int: + """Convierte int16 con signo a representación uint16 (complemento a 2).""" + if x < 0: + return (x + 0x10000) & 0xFFFF + return x & 0xFFFF diff --git a/tools/sim_protocol.py b/tools/sim_protocol.py new file mode 100644 index 0000000..99db96b --- /dev/null +++ b/tools/sim_protocol.py @@ -0,0 +1,952 @@ +""" +PROPÓSITO +--------- +Protocolo de pruebas de simulación para AR-Autopilot. + +POR QUÉ EXISTE +-------------- +Valida el comportamiento del autopiloto en 7 casos de prueba canónicos +antes de conectar el ESP32 real. Cada TC crea un simulador limpio, ejecuta +un escenario, evalúa criterios de aceptación y reporta PASS / FAIL con +métricas cuantitativas. + +Al ejecutarse produce dos salidas: + - Resumen en consola (tabla ASCII) + - Informe HTML completo con gráficas interactivas (Chart.js) + +CÓMO USARLO +----------- + python tools/sim_protocol.py # todos los TCs + python tools/sim_protocol.py TC-02 TC-04 # solo esos TCs + python tools/sim_protocol.py --out mi_reporte.html + +RELACIONADO +----------- + tools/esp32_sim.py — simulador ESP32 en Python + docs/TEST_PROTOCOL.md — especificación del protocolo +""" + +from __future__ import annotations + +import argparse +import html +import json +import sys +import textwrap +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Callable + +# Añadir raíz del proyecto al path +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from arautopilot.studio.simulator.vessel_heading import heading_error_deg +from tools.esp32_sim import AutopilotMode, ESP32Simulator, SimSnapshot + + +# --------------------------------------------------------------------------- +# Estructuras de datos del protocolo +# --------------------------------------------------------------------------- +@dataclass +class TCSpec: + id: str + name: str + description: str + acceptance: str # Criterios de aceptación en lenguaje natural + + +@dataclass +class TCResult: + spec: TCSpec + passed: bool + metrics: dict[str, float | str] + message: str + snapshots: list[SimSnapshot] + events: list + + +# --------------------------------------------------------------------------- +# Casos de prueba +# --------------------------------------------------------------------------- + +def tc01_power_on_engage() -> TCResult: + """ + TC-01 — Arranque y enganche + =========================== + Verifica que el autopiloto parte en STANDBY y transita correctamente a + HEADING_HOLD cuando hay rumbo NMEA válido y se envía CMD_ENGAGE_REQUEST. + + Criterios de aceptación: + • Arranca en STANDBY + • Transita a HEADING_HOLD tras engage + • Mantiene HEADING_HOLD durante 10 s sin alarmas + • Error final < 1° + """ + spec = TCSpec( + id="TC-01", + name="Arranque y enganche en HEADING_HOLD", + description="El piloto arranca en STANDBY y engancha correctamente con rumbo NMEA válido.", + acceptance="Modo final HEADING_HOLD, error < 1°, sin alarmas en 10 s.", + ) + + sim = ESP32Simulator() + + # Verificar estado inicial STANDBY + started_in_standby = sim.mode == AutopilotMode.STANDBY + + # Inyectar rumbo válido y velocidad + sim.inject_nmea(heading_deg=90.0, sog_kn=10.0) + sim.step(0.5) # deja que el NMEA se estabilice + + # Configurar holdings y enganchar + sim.write_holding(0, 1) # MODE_REQUEST = HEADING_HOLD + sim.write_holding(1, 9000) # HEADING_SETPOINT_X100 = 90.00° + sim.write_coil(0, 1) # CMD_ENGAGE_REQUEST + + sim.step(0.1) # procesar el engage + + engaged_in_hh = sim.mode == AutopilotMode.HEADING_HOLD + + # Correr 10 segundos manteniendo el rumbo + sim.step(10.0) + + final_error = abs(heading_error_deg(90.0, sim.heading)) + any_alarm = sim.any_alarm + final_mode = sim.mode + + passed = ( + started_in_standby + and engaged_in_hh + and final_mode == AutopilotMode.HEADING_HOLD + and final_error < 1.0 + and not any_alarm + ) + + return TCResult( + spec=spec, + passed=passed, + metrics={ + "started_in_standby": "Sí" if started_in_standby else "No", + "engaged_correctly": "Sí" if engaged_in_hh else "No", + "final_mode": final_mode.name, + "final_error_deg": round(final_error, 3), + "any_alarm": "No" if not any_alarm else "Sí", + }, + message=( + f"Inicio en STANDBY: {'OK' if started_in_standby else 'NO'} | " + f"Enganche: {'OK' if engaged_in_hh else 'NO'} | " + f"Error final: {final_error:.3f}° | " + f"Alarmas: {'ninguna' if not any_alarm else 'PRESENTES'}" + ), + snapshots=sim.log, + events=sim.events, + ) + + +def tc02_small_heading_change() -> TCResult: + """ + TC-02 — Cambio de rumbo pequeño (10°) + ====================================== + El autopiloto está enganchado en 90°. Se ordena virar a 100°. + Mide tiempo de asentamiento, sobrepasamiento y error residual. + + Criterios de aceptación: + • Asentamiento (|error| < 1°) en < 60 s + • Sobrepasamiento < 5° + • Error final < 1° + """ + spec = TCSpec( + id="TC-02", + name="Cambio de rumbo pequeño +10°", + description="Maniobra de 90° → 100°. Evalúa tiempo de asentamiento y sobrepasamiento.", + acceptance="Asentamiento < 60 s, sobrepasamiento < 5°, error final < 1°.", + ) + + sim = ESP32Simulator() + TARGET = 100.0 + INITIAL = 90.0 + + sim.inject_nmea(heading_deg=INITIAL, sog_kn=10.0) + sim.write_holding(0, 1) + sim.write_holding(1, int(INITIAL * 100)) + sim.write_coil(0, 1) + sim.step(5.0) # estabilizar en 90° + + # Nuevo setpoint: 100° + sim.write_holding(1, int(TARGET * 100)) + t_step = sim.t + sim.step(120.0) + + snapshots_after = [s for s in sim.log if s.t >= t_step] + + # Tiempo de asentamiento: último instante con |error| > 1° + settling_time = _settling_time(snapshots_after, TARGET, tolerance=1.0) + + # Sobrepasamiento: cruce al otro lado del setpoint + overshoot = _overshoot(snapshots_after, INITIAL, TARGET) + + final_error = abs(heading_error_deg(TARGET, sim.heading)) + + passed = ( + sim.mode == AutopilotMode.HEADING_HOLD + and final_error < 1.0 + and settling_time is not None + and settling_time < 60.0 + and overshoot < 5.0 + ) + + return TCResult( + spec=spec, + passed=passed, + metrics={ + "final_error_deg": round(final_error, 3), + "settling_time_s": round(settling_time, 1) if settling_time else ">120", + "overshoot_deg": round(overshoot, 2), + "final_mode": sim.mode.name, + }, + message=( + f"Error: {final_error:.3f}° | " + f"Asentamiento: {settling_time:.1f}s | " + f"Sobrepasamiento: {overshoot:.2f}°" + if settling_time else + f"Error: {final_error:.3f}° | NO ASENTÓ en 120 s" + ), + snapshots=sim.log, + events=sim.events, + ) + + +def tc03_large_heading_change() -> TCResult: + """ + TC-03 — Cambio de rumbo grande (90°) + ===================================== + Maniobra agresiva de 90°. Verifica que el ROT feed-forward limita el + sobrepasamiento y que el cascada PID completa la maniobra sin oscilar. + + Criterios de aceptación: + • Asentamiento (|error| < 2°) en < 180 s + • Sobrepasamiento < 10° + • Error final < 2° + • Sin alarma OFF_COURSE (error no supera 30°) + """ + spec = TCSpec( + id="TC-03", + name="Cambio de rumbo grande +90°", + description="Maniobra de 90° → 180°. Valida el ROT feed-forward y la cascada completa.", + acceptance="Asentamiento < 180 s, sobrepasamiento < 10°, error final < 2°.", + ) + + sim = ESP32Simulator() + TARGET = 180.0 + INITIAL = 90.0 + + sim.inject_nmea(heading_deg=INITIAL, sog_kn=10.0) + sim.write_holding(0, 1) + sim.write_holding(1, int(INITIAL * 100)) + sim.write_coil(0, 1) + sim.step(5.0) + + t_step = sim.t + sim.write_holding(1, int(TARGET * 100)) + sim.step(240.0) + + snapshots_after = [s for s in sim.log if s.t >= t_step] + + settling_time = _settling_time(snapshots_after, TARGET, tolerance=2.0) + overshoot = _overshoot(snapshots_after, INITIAL, TARGET) + final_error = abs(heading_error_deg(TARGET, sim.heading)) + alarm_severe = sim.read_discrete(17) or sim._alarm_off_course_severe + + passed = ( + sim.mode == AutopilotMode.HEADING_HOLD + and final_error < 2.0 + and settling_time is not None + and settling_time < 180.0 + and overshoot < 10.0 + and not alarm_severe + ) + + return TCResult( + spec=spec, + passed=passed, + metrics={ + "final_error_deg": round(final_error, 3), + "settling_time_s": round(settling_time, 1) if settling_time else ">240", + "overshoot_deg": round(overshoot, 2), + "alarm_off_course_severe": "No" if not alarm_severe else "Sí", + }, + message=( + f"Error: {final_error:.3f}° | " + f"Asentamiento: {settling_time:.1f}s | " + f"Sobrepasamiento: {overshoot:.2f}°" + ), + snapshots=sim.log, + events=sim.events, + ) + + +def tc04_boundary_crossing() -> TCResult: + """ + TC-04 — Cruce de frontera 0°/360° + ================================== + El buque va a 355° y se ordena virar a 010°. La lógica de arco más + corto debe elegir +15° (a estribor) en vez de -345° (a babor). + + Criterios de aceptación: + • El timón gira a ESTRIBOR (positivo) durante la maniobra + • Asentamiento en 010° en < 90 s + • Error final < 1° + """ + spec = TCSpec( + id="TC-04", + name="Cruce de frontera 355° → 010°", + description=( + "Maniobra que cruza el meridiano 0°/360°. " + "Verifica que el shortest-arc elige el camino de +15° (estribor)." + ), + acceptance="Timón a estribor, asentamiento < 90 s, error final < 1°.", + ) + + sim = ESP32Simulator() + INITIAL = 355.0 + TARGET = 10.0 + + sim.inject_nmea(heading_deg=INITIAL, sog_kn=10.0) + sim.write_holding(0, 1) + sim.write_holding(1, int(INITIAL * 100)) + sim.write_coil(0, 1) + sim.step(5.0) + + t_step = sim.t + sim.write_holding(1, int(TARGET * 100)) + sim.step(120.0) + + snapshots_after = [s for s in sim.log if s.t >= t_step] + + # Verificar que el timón fue a estribor (positivo) + max_rudder_stbd = max(s.rudder_angle_deg for s in snapshots_after[:30]) + went_starboard = max_rudder_stbd > 2.0 + + settling_time = _settling_time(snapshots_after, TARGET, tolerance=1.0) + final_error = abs(heading_error_deg(TARGET, sim.heading)) + + passed = ( + went_starboard + and final_error < 1.0 + and settling_time is not None + and settling_time < 90.0 + ) + + return TCResult( + spec=spec, + passed=passed, + metrics={ + "went_starboard": "Sí" if went_starboard else "No", + "max_rudder_stbd_deg": round(max_rudder_stbd, 2), + "final_error_deg": round(final_error, 3), + "settling_time_s": round(settling_time, 1) if settling_time else ">120", + }, + message=( + f"Timón a estribor: {'OK' if went_starboard else 'NO'} " + f"(máx={max_rudder_stbd:.1f}°) | " + f"Error: {final_error:.3f}° | " + f"Asentamiento: {settling_time:.1f}s" if settling_time else + f"Timón a estribor: {'OK' if went_starboard else 'NO'} | NO ASENTÓ" + ), + snapshots=sim.log, + events=sim.events, + ) + + +def tc05_manual_disengage() -> TCResult: + """ + TC-05 — Desenganche manual + ========================== + El autopiloto está activo. El operador pulsa DISENGAGE. + El timón debe dejar de ser controlado (PWM = 0) inmediatamente. + + Criterios de aceptación: + • El modo pasa a STANDBY en ≤ 1 tick (0.02 s) + • El PWM cae a 0 tras el desenganche + • El timón no recibe nuevas órdenes después del desenganche + """ + spec = TCSpec( + id="TC-05", + name="Desenganche manual (DISENGAGE)", + description=( + "Pulsar CMD_DISENGAGE_REQUEST mientras el autopiloto está en HEADING_HOLD. " + "El sistema debe pasar a STANDBY y soltar el timón." + ), + acceptance="STANDBY en ≤ 0.02 s, PWM = 0, sin nuevas órdenes al timón.", + ) + + sim = ESP32Simulator() + + sim.inject_nmea(heading_deg=90.0, sog_kn=10.0) + sim.write_holding(0, 1) + sim.write_holding(1, int(100.0 * 100)) # setpoint 100° (activo, generando timón) + sim.write_coil(0, 1) + sim.step(0.1) + sim.step(10.0) # dejar que el timón esté aplicando corrección + + rudder_before = sim.rudder + pwm_before = sim._inner_pwm_pct + mode_before_engaged = sim.mode == AutopilotMode.HEADING_HOLD + + # DISENGAGE + sim.write_coil(1, 1) + sim.step(0.1) # un tick para procesar + + mode_after = sim.mode + pwm_after = sim._inner_pwm_pct + disengage_ok = mode_after == AutopilotMode.STANDBY + + # Correr 5 s más: el timón no debe recibir nuevas órdenes + rudder_samples_after = [] + for _ in range(50): + sim.step(0.1) + rudder_samples_after.append(sim.rudder) + rudder_still_moving = ( + max(rudder_samples_after) - min(rudder_samples_after) > 0.5 + ) + + passed = ( + mode_before_engaged + and disengage_ok + and pwm_after == 0.0 + and not rudder_still_moving + ) + + return TCResult( + spec=spec, + passed=passed, + metrics={ + "was_engaged": "Sí" if mode_before_engaged else "No", + "mode_after": mode_after.name, + "pwm_after_pct": round(pwm_after, 2), + "rudder_drift_after_deg": round( + max(rudder_samples_after) - min(rudder_samples_after), 3 + ), + }, + message=( + f"Antes: {mode_before_engaged and 'HH' or 'N/A'} | " + f"Después: {mode_after.name} | " + f"PWM post-disengage: {pwm_after:.2f}% | " + f"Timón comandado post-disengage: {'No' if not rudder_still_moving else 'Sí (FALLO)'}" + ), + snapshots=sim.log, + events=sim.events, + ) + + +def tc06_heading_lost_alarm() -> TCResult: + """ + TC-06 — Alarma HEADING_LOST (pérdida de sensor) + ================================================ + El compass NMEA 2000 deja de enviar datos. Tras 5 s el firmware + debe disparar ALARM_HEADING_LOST y desengranarse automáticamente. + + Criterios de aceptación: + • ALARM_HEADING_LOST activa en 5.0–6.0 s desde la pérdida + • El modo pasa a STANDBY automáticamente + • El discrete[19] (ALARM_HEADING_LOST) = 1 + """ + spec = TCSpec( + id="TC-06", + name="Alarma HEADING_LOST y auto-desenganche", + description=( + "Simulación de pérdida del sensor NMEA 2000. " + "El autopiloto debe detectarla y disenganchar automáticamente." + ), + acceptance=( + "Alarma activa en 5–6 s, modo STANDBY, discrete[19]=1." + ), + ) + + sim = ESP32Simulator() + + sim.inject_nmea(heading_deg=90.0, sog_kn=10.0) + sim.write_holding(0, 1) + sim.write_holding(1, 9000) + sim.write_coil(0, 1) + sim.step(5.0) # estabilizar + + assert sim.mode == AutopilotMode.HEADING_HOLD, "Precondición: debe estar enganchado" + + t_loss = sim.t + sim.disconnect_nmea_heading() + + # Avanzar en pequeños pasos registrando cuándo dispara la alarma + alarm_fired_at: float | None = None + disengage_at: float | None = None + for _ in range(100): # hasta 10 s + sim.step(0.1) + if sim.read_discrete(19) and alarm_fired_at is None: + alarm_fired_at = sim.t + if sim.mode == AutopilotMode.STANDBY and disengage_at is None: + disengage_at = sim.t + + time_to_alarm = (alarm_fired_at - t_loss) if alarm_fired_at else None + + passed = ( + alarm_fired_at is not None + and 4.5 <= (alarm_fired_at - t_loss) <= 7.0 + and sim.mode == AutopilotMode.STANDBY + and sim.read_discrete(19) == 1 + ) + + return TCResult( + spec=spec, + passed=passed, + metrics={ + "time_to_alarm_s": round(time_to_alarm, 2) if time_to_alarm else "nunca", + "final_mode": sim.mode.name, + "discrete_19": sim.read_discrete(19), + }, + message=( + f"Alarma a los {time_to_alarm:.2f}s post-pérdida | " + f"Modo: {sim.mode.name} | " + f"Discrete[19]: {sim.read_discrete(19)}" + if time_to_alarm else + "ALARMA NO DISPARADA en 10 s — FALLO" + ), + snapshots=sim.log, + events=sim.events, + ) + + +def tc07_dodge_mode() -> TCResult: + """ + TC-07 — Modo DODGE (esquiva temporal) + ====================================== + El operador activa DODGE con un offset de +20° para esquivar un + obstáculo. El autopiloto vira 20° a estribor. Luego el operador + cancela el DODGE volviendo a HEADING_HOLD en el rumbo original. + + Criterios de aceptación: + • Modo cambia a DODGE + • El buque vira hacia el setpoint de DODGE (original + 20°) + • Tras volver a HEADING_HOLD, el error respecto al rumbo original < 1° + """ + spec = TCSpec( + id="TC-07", + name="Modo DODGE (+20°) y retorno", + description=( + "El operador esquiva con offset +20° y luego vuelve al rumbo original. " + "Verifica la transición HEADING_HOLD ↔ DODGE ↔ HEADING_HOLD." + ), + acceptance=( + "DODGE activo, buque vira al setpoint DODGE, retorno sin error > 1°." + ), + ) + + sim = ESP32Simulator() + ORIGINAL_SP = 90.0 + DODGE_OFFSET = 20.0 + DODGE_SP = (ORIGINAL_SP + DODGE_OFFSET) % 360.0 + + sim.inject_nmea(heading_deg=ORIGINAL_SP, sog_kn=10.0) + sim.write_holding(0, 1) + sim.write_holding(1, int(ORIGINAL_SP * 100)) + sim.write_coil(0, 1) + sim.step(10.0) # estabilizar en 90° + + initial_heading_error = abs(heading_error_deg(ORIGINAL_SP, sim.heading)) + + # Activar DODGE con offset +20° + sim.write_holding(0, 4) # MODE_REQUEST = DODGE + sim.write_holding(8, int(DODGE_OFFSET * 100)) # DODGE_OFFSET_DEG_X100 + sim.write_coil(0, 1) # CMD_ENGAGE_REQUEST + sim.step(0.1) + + dodge_engaged = sim.mode == AutopilotMode.DODGE + dodge_setpoint = sim._heading_setpoint_deg + + # Dejar que el buque vire hacia el setpoint DODGE + sim.step(90.0) + + heading_at_dodge = sim.heading + dodge_error = abs(heading_error_deg(DODGE_SP, heading_at_dodge)) + + # Volver a HEADING_HOLD con el rumbo original + sim.write_holding(0, 1) + sim.write_holding(1, int(ORIGINAL_SP * 100)) + sim.write_coil(0, 1) + sim.step(0.1) + returned_to_hh = sim.mode == AutopilotMode.HEADING_HOLD + sim.step(120.0) + + final_error = abs(heading_error_deg(ORIGINAL_SP, sim.heading)) + + passed = ( + dodge_engaged + and abs(dodge_setpoint - DODGE_SP) < 0.5 + and dodge_error < 2.0 + and returned_to_hh + and final_error < 1.0 + ) + + return TCResult( + spec=spec, + passed=passed, + metrics={ + "dodge_engaged": "Sí" if dodge_engaged else "No", + "dodge_setpoint_deg": round(dodge_setpoint, 2), + "heading_at_dodge_end_deg": round(heading_at_dodge, 2), + "dodge_error_deg": round(dodge_error, 3), + "returned_to_hh": "Sí" if returned_to_hh else "No", + "final_error_vs_original_deg": round(final_error, 3), + }, + message=( + f"DODGE: {'OK' if dodge_engaged else 'NO'} sp={dodge_setpoint:.1f}° | " + f"Error en DODGE: {dodge_error:.2f}° | " + f"Retorno HH: {'OK' if returned_to_hh else 'NO'} | " + f"Error final vs original: {final_error:.3f}°" + ), + snapshots=sim.log, + events=sim.events, + ) + + +# --------------------------------------------------------------------------- +# Registro de todos los TCs +# --------------------------------------------------------------------------- +ALL_TCS: dict[str, Callable[[], TCResult]] = { + "TC-01": tc01_power_on_engage, + "TC-02": tc02_small_heading_change, + "TC-03": tc03_large_heading_change, + "TC-04": tc04_boundary_crossing, + "TC-05": tc05_manual_disengage, + "TC-06": tc06_heading_lost_alarm, + "TC-07": tc07_dodge_mode, +} + + +# --------------------------------------------------------------------------- +# Helpers de métricas +# --------------------------------------------------------------------------- +def _settling_time( + snapshots: list[SimSnapshot], target: float, tolerance: float = 1.0 +) -> float | None: + """ + Retorna el instante en que |error| entra en la banda de tolerancia + y ya no vuelve a salir. None si nunca asienta. + """ + if not snapshots: + return None + # Buscar el último instante donde |error| > tolerance + last_out = None + for s in snapshots: + if abs(heading_error_deg(target, s.heading_deg)) > tolerance: + last_out = s.t + if last_out is None: + return snapshots[0].t # siempre estuvo dentro + # Verificar que después de last_out permanece dentro + after = [s for s in snapshots if s.t > last_out] + if all(abs(heading_error_deg(target, s.heading_deg)) <= tolerance for s in after): + return last_out + return None # salió de la banda después del último out + + +def _overshoot( + snapshots: list[SimSnapshot], initial: float, target: float +) -> float: + """ + Sobrepasamiento en grados respecto al setpoint en la dirección de giro. + Positivo = cruzó el setpoint en la dirección del movimiento. + """ + if not snapshots: + return 0.0 + going_right = heading_error_deg(target, initial) > 0 # girando a estribor + peak = 0.0 + for s in snapshots[3:]: # ignorar los primeros muestras (transitorio inicial) + err = heading_error_deg(target, s.heading_deg) + if going_right: + # Sobrepasamiento = haber ido más allá del setpoint (error negativo) + if err < 0: + peak = max(peak, abs(err)) + else: + if err > 0: + peak = max(peak, abs(err)) + return peak + + +# --------------------------------------------------------------------------- +# Salida en consola +# --------------------------------------------------------------------------- +def _print_summary(results: list[TCResult]) -> None: + print() + print("=" * 72) + print(" AR-AUTOPILOT - Protocolo de Simulacion HIL") + print(f" {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + print("=" * 72) + print(f" {'ID':<8} {'Nombre':<35} {'Resultado':<8} {'Nota'}") + print(" " + "-" * 68) + for r in results: + badge = "[PASS]" if r.passed else "[FAIL]" + name = r.spec.name[:35] + note = r.message[:40] if len(r.message) > 40 else r.message + print(f" {r.spec.id:<8} {name:<35} {badge:<8} {note}") + print(" " + "-" * 68) + n_pass = sum(1 for r in results if r.passed) + n_fail = len(results) - n_pass + print(f" Total: {len(results)} PASS: {n_pass} FAIL: {n_fail}") + print("=" * 72) + print() + + +# --------------------------------------------------------------------------- +# Generador de informe HTML +# --------------------------------------------------------------------------- +def generate_html_report(results: list[TCResult], out_path: Path) -> None: + """Genera un informe HTML autocontenido con gráficas Chart.js.""" + + n_pass = sum(1 for r in results if r.passed) + n_fail = len(results) - n_pass + overall_badge = "PASS" if n_fail == 0 else "FAIL" + overall_color = "#22c55e" if n_fail == 0 else "#ef4444" + + tc_cards_html = "" + for r in results: + tc_cards_html += _render_tc_card(r) + + html_content = f""" + + + + +AR-Autopilot — Protocolo de Simulación + + + + +

AR-Autopilot · Protocolo de Simulación HIL

+
Generado: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} · Simulador Python puro · Sin hardware ESP32
+
+ {overall_badge} + {n_pass}/{len(results)} casos superados  |  {n_fail} fallos +
+{tc_cards_html} + +""" + + out_path.write_text(html_content, encoding="utf-8") + print(f" → Informe HTML: {out_path.resolve()}") + + +def _render_tc_card(r: TCResult) -> str: + """Genera el HTML de una tarjeta de caso de prueba.""" + passed_class = "pass" if r.passed else "fail" + passed_label = "PASS" if r.passed else "FAIL" + + # Métricas + metrics_html = "" + for k, v in r.metrics.items(): + metrics_html += ( + f'' + f'{html.escape(str(k))}: ' + f'{html.escape(str(v))}' + f'' + ) + + # Datos del gráfico (subsamplear si hay muchos puntos) + snaps = r.snapshots + if len(snaps) > 600: + step = len(snaps) // 600 + snaps = snaps[::step] + + times = [round(s.t, 2) for s in snaps] + headings = [round(s.heading_deg, 2) for s in snaps] + setpoints = [round(s.heading_setpoint_deg, 2) for s in snaps] + rudders = [round(s.rudder_angle_deg, 2) for s in snaps] + rots = [round(s.rot_dps, 3) for s in snaps] + + chart_id = r.spec.id.replace("-", "_") + + # Eventos + events_html = "" + for ev in r.events: + ev_class = { + "engage": "ev-engage", "engage_refused": "ev-alarm", + "disengage": "ev-disengage", "alarm": "ev-alarm", "ack": "ev-ack", + }.get(ev.kind, "") + events_html += ( + f'
' + f't={ev.t:.2f}s [{ev.kind}] {html.escape(ev.detail)}' + f'
' + ) + + return f""" +
+
+ {html.escape(r.spec.id)} + {html.escape(r.spec.name)} + {passed_label} +
+
{html.escape(r.spec.description)}
+
* Aceptación: {html.escape(r.spec.acceptance)}
+
{metrics_html}
+
{html.escape(r.message)}
+
+ +
+
{events_html}
+
+ +""" + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- +def main() -> int: + parser = argparse.ArgumentParser( + description="AR-Autopilot: protocolo de simulación HIL", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=textwrap.dedent(""" + Ejemplos: + python tools/sim_protocol.py + python tools/sim_protocol.py TC-02 TC-04 + python tools/sim_protocol.py --out resultado.html + """), + ) + parser.add_argument( + "tests", nargs="*", metavar="TC-NN", + help="IDs de casos a ejecutar (default: todos)", + ) + parser.add_argument( + "--out", default="tools/sim_report.html", + metavar="ARCHIVO", + help="Ruta del informe HTML (default: tools/sim_report.html)", + ) + args = parser.parse_args() + + selected = args.tests or list(ALL_TCS.keys()) + unknown = [tc for tc in selected if tc not in ALL_TCS] + if unknown: + print(f"Error: TCs desconocidos: {', '.join(unknown)}") + print(f"Disponibles: {', '.join(ALL_TCS)}") + return 1 + + results: list[TCResult] = [] + for tc_id in selected: + print(f" Ejecutando {tc_id} …", end="", flush=True) + result = ALL_TCS[tc_id]() + badge = "PASS" if result.passed else "FAIL" + print(f" {badge}") + results.append(result) + + _print_summary(results) + + out_path = Path(args.out) + out_path.parent.mkdir(parents=True, exist_ok=True) + generate_html_report(results, out_path) + + return 0 if all(r.passed for r in results) else 1 + + +if __name__ == "__main__": + sys.exit(main())