b82ed400bc
SPICE (6_bno085_imu.cir):
- BNO085 power supply with 10uF + 100nF decoupling on VDD
- Power-on reset RC circuit (R=10K, C=1uF, tau=10ms → deasserts at ~12ms)
- I2C Fast Mode 400kHz bus: 4.7K pull-ups, 50pF bus capacitance model
- Full I2C transaction: START + address 0x4A + R/W + BNO085 ACK + STOP
- INT pin (open-drain, 10K pull-up, 100Hz interrupt simulation)
- .meas directives: reset timing, SCL rise time, VDD stability
Simulator (esp32_sim.py):
- SimSnapshot.bno085_yaw_rate_dps field added
- _bno085_enabled / _bno085_noise_std_dps / _bno085_yaw_rate_dps state
- enable_bno085(noise_std_dps=0.02) public method
- disable_bno085() public method
- _run_physics: samples gyro at 50Hz with Gaussian noise model
- _run_outer_loop: uses BNO085 yaw rate for rot_ff_term when enabled
(replaces NMEA-derived ROT — lower latency ~4ms vs ~100-200ms)
Usage:
sim.enable_bno085() # activate gyro feed-forward
sim.enable_bno085(noise_std_dps=0.014) # with BNO085 spec noise
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
926 lines
40 KiB
Python
926 lines
40 KiB
Python
"""
|
||
PROPÓSITO
|
||
---------
|
||
Simulación software completa del firmware ESP32 de AR-Autopilot.
|
||
|
||
POR QUÉ EXISTE
|
||
--------------
|
||
El ESP32 físico necesita CAN-bus (NMEA 2000) para recibir el compass y
|
||
RS-485 para recibir comandos Modbus. Sin hardware real ambos canales están
|
||
ausentes. Este módulo implementa en Python puro el mismo estado interno y la
|
||
misma aritmética que el firmware C++, permitiendo desarrollar y validar el
|
||
protocolo de pruebas antes de tener el barco delante.
|
||
|
||
CÓMO FUNCIONA
|
||
-------------
|
||
El simulador tiene tres capas en paralelo:
|
||
|
||
Capa física (50 Hz)
|
||
RudderSimulator — PWM → ángulo de timón
|
||
VesselHeadingSimulator — timón × velocidad → ROT → rumbo
|
||
|
||
Cascada PID (outer 10 Hz, inner 50 Hz)
|
||
PidOuter — error_rumbo → setpoint_timón
|
||
PidInner — error_timón → PWM
|
||
|
||
Interfaz Modbus (diccionarios Python)
|
||
holdings — escribibles por el PC (comandos)
|
||
coils — escribibles por el PC (pulsos)
|
||
inputs — de solo lectura (telemetría)
|
||
discretes — de solo lectura (alarmas / flags)
|
||
|
||
RELACIONADO
|
||
-----------
|
||
arautopilot/studio/simulator/pid_outer.py
|
||
arautopilot/studio/simulator/pid_inner.py
|
||
arautopilot/studio/simulator/vessel_heading.py
|
||
arautopilot/studio/simulator/rudder_dynamics.py
|
||
arautopilot/shared/modbus_register_map.py
|
||
tools/sim_protocol.py
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import math
|
||
import random
|
||
import sys
|
||
from dataclasses import dataclass, field
|
||
from enum import IntEnum
|
||
from pathlib import Path
|
||
|
||
# Añadir raíz del proyecto al path para importar arautopilot.*
|
||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||
|
||
from arautopilot.studio.simulator.pid_inner import PidInner, PidInnerConfig
|
||
from arautopilot.studio.simulator.pid_outer import PidOuter, PidOuterConfig
|
||
from arautopilot.studio.simulator.rudder_dynamics import (
|
||
RudderDynamicsConfig,
|
||
RudderSimulator,
|
||
)
|
||
from arautopilot.studio.simulator.vessel_heading import (
|
||
VesselHeadingConfig,
|
||
VesselHeadingSimulator,
|
||
heading_error_deg,
|
||
)
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Constantes — mirrors de los defines del firmware
|
||
# ---------------------------------------------------------------------------
|
||
OFF_COURSE_WARN_DEG: float = 15.0
|
||
OFF_COURSE_SEVERE_DEG: float = 30.0
|
||
HEADING_TIMEOUT_S: float = 5.0
|
||
|
||
DT_INNER: float = 1.0 / 50.0 # 50 Hz inner loop
|
||
DT_OUTER: float = 1.0 / 10.0 # 10 Hz outer loop
|
||
INNER_PER_OUTER: int = 5 # inner ticks per outer tick
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Beaufort scale → wave disturbance parameters
|
||
# ---------------------------------------------------------------------------
|
||
# Each entry: (wave_torque_amp, swell_torque_amp, noise_torque_std,
|
||
# wind_bias_amp, wave_period_s)
|
||
# Torques are in °/s² (= yaw angular acceleration).
|
||
# Steady-state ROT from torque T: ROT_ss = T / yaw_damping = T / 0.8
|
||
# Heading oscillation at wave freq ω (rad/s): A_h ≈ T / (yaw_damping * ω)
|
||
# Expected peak heading oscillation for our 30 m yacht at 10 kn:
|
||
# B3 ≈ ±1° B4 ≈ ±3° B5 ≈ ±5° B6 ≈ ±8° B7 ≈ ±13°
|
||
_BEAUFORT_TABLE: dict[int, tuple[float, float, float, float, float]] = {
|
||
# B: wave swell noise wind period_s
|
||
0: (0.00, 0.00, 0.00, 0.00, 8.0),
|
||
1: (0.10, 0.05, 0.05, 0.05, 7.0),
|
||
2: (0.30, 0.10, 0.10, 0.10, 6.5),
|
||
3: (0.60, 0.20, 0.20, 0.20, 5.5),
|
||
4: (1.50, 0.40, 0.30, 0.40, 6.0),
|
||
5: (2.50, 0.70, 0.60, 0.80, 7.0),
|
||
6: (3.50, 1.00, 1.00, 1.20, 8.0),
|
||
7: (5.50, 1.50, 1.60, 2.00, 9.0),
|
||
8: (8.00, 2.00, 2.50, 3.00, 10.0),
|
||
}
|
||
|
||
|
||
class AutopilotMode(IntEnum):
|
||
"""Modos del autopiloto — idénticos al enum C++ del firmware."""
|
||
STANDBY = 0
|
||
HEADING_HOLD = 1
|
||
TRUE_COURSE = 2
|
||
TRACK_KEEPING = 3
|
||
DODGE = 4
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Datos de telemetría
|
||
# ---------------------------------------------------------------------------
|
||
@dataclass
|
||
class SimSnapshot:
|
||
"""Una muestra completa del estado del simulador (para análisis y gráficas)."""
|
||
t: float
|
||
mode: AutopilotMode
|
||
heading_deg: float
|
||
heading_setpoint_deg: float
|
||
heading_error_deg: float
|
||
rot_dps: float
|
||
sog_kn: float
|
||
outer_rudder_sp_deg: float
|
||
rudder_angle_deg: float
|
||
inner_pwm_pct: float
|
||
alarm_heading_lost: bool
|
||
alarm_off_course: bool
|
||
alarm_off_course_severe: bool
|
||
bno085_yaw_rate_dps: float = 0.0 # medición del giróscopo BNO085 (0 si desactivado)
|
||
|
||
|
||
@dataclass
|
||
class SimEvent:
|
||
"""Evento discreto en la simulación (engage, alarma, etc.)."""
|
||
t: float
|
||
kind: str # "engage", "disengage", "alarm", "ack"
|
||
detail: str = ""
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Simulador principal
|
||
# ---------------------------------------------------------------------------
|
||
class ESP32Simulator:
|
||
"""
|
||
Simulación software del firmware AR-Autopilot ESP32.
|
||
|
||
Reproduce el mismo comportamiento que el C++:
|
||
- Máquina de estados (STANDBY / HEADING_HOLD / DODGE / ...)
|
||
- Cascada PID a dos tasas (outer 10 Hz, inner 50 Hz)
|
||
- Física del buque (timón → ROT → rumbo)
|
||
- Banco de registros Modbus (holdings, coils, inputs, discretes)
|
||
|
||
Uso rápido::
|
||
|
||
sim = ESP32Simulator()
|
||
sim.inject_nmea(heading_deg=90.0, sog_kn=10.0)
|
||
sim.write_holding(0, 1) # MODE_REQUEST = HEADING_HOLD
|
||
sim.write_holding(1, 9000) # HEADING_SETPOINT_X100 = 90.00°
|
||
sim.write_coil(0, 1) # CMD_ENGAGE_REQUEST
|
||
sim.step(5.0) # enganche
|
||
sim.write_holding(1, 10000) # nuevo rumbo 100°
|
||
sim.step(120.0) # maniobra
|
||
print(f"Rumbo final: {sim.heading:.1f}°")
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
vessel_cfg: VesselHeadingConfig | None = None,
|
||
rudder_cfg: RudderDynamicsConfig | None = None,
|
||
outer_cfg: PidOuterConfig | None = None,
|
||
inner_cfg: PidInnerConfig | None = None,
|
||
) -> None:
|
||
# -- Física -------------------------------------------------------
|
||
# El VesselHeadingSimulator por defecto tiene rudder_response_gain=0.18,
|
||
# lo que da ~11 dps a 5° de timón a 10 kn — demasiado sensible para
|
||
# los ganadores PID de referencia (kd=1.2, rot_ff_gain=1.5).
|
||
# Con gain=0.004: ROT_ss = 0.004*10*35/0.8 = 1.75 dps al máximo timón.
|
||
# Esto equivale a un yate de 30 m con radio de giro ~165 m a 10 kn,
|
||
# consistente con la hoja de datos del brief y numéricamente estable
|
||
# con los coeficientes PID por defecto.
|
||
default_vessel = VesselHeadingConfig(rudder_response_gain=0.004)
|
||
self._vessel = VesselHeadingSimulator(vessel_cfg or default_vessel)
|
||
# En la simulación software eliminamos el deadband y el min_useful del
|
||
# actuador físico: no hay bomba hidráulica real, por lo que cualquier
|
||
# señal mueve el timón proporcionalmente. Esto evita el efecto bang-bang
|
||
# (snap a 12 % para señales pequeñas) que causa sobrepasamiento y
|
||
# oscilaciones en torno al setpoint. El hardware real conserva su propio
|
||
# deadband (deadband_pct=7 en RudderDynamicsConfig es para hardware).
|
||
default_rudder = RudderDynamicsConfig(deadband_pct=0.0, min_useful_pwm_pct=0.0)
|
||
self._rudder = RudderSimulator(rudder_cfg or default_rudder)
|
||
|
||
# -- Controladores PID -------------------------------------------
|
||
# En la simulación software el outer PID usa solo P + ROT feed-forward
|
||
# (kd=0, ki=0).
|
||
#
|
||
# kd=0: En firmware kd=1.2, que genera un pico de derivada enorme
|
||
# cuando el setpoint cambia de golpe (el error salta de 0° a Δrumbo
|
||
# en un solo tick). En hardware ese pico es inofensivo: el actuador
|
||
# hidráulico ya está saturado mecánicamente. En simulación dispara el
|
||
# anti-windup y lleva el integrador a ~−7°, lo que obliga al outer PID
|
||
# a pasar 25 s recuperando la demanda. Con kd=0 el pico desaparece.
|
||
# El ROT feed-forward (rot_ff_gain=1.5) sigue amortiguando el
|
||
# sobrepasamiento de forma anticipada.
|
||
#
|
||
# ki=0: El integrador del firmware compensa derivas reales: corriente,
|
||
# viento, fricción asimétrica. Ninguna de estas perturbaciones existe
|
||
# en la simulación. Con ki≠0 el integrador acumula ~2-7° durante la
|
||
# maniobra de aproximación y luego tarda 100-500 s en vaciarse, lo
|
||
# que genera tiempos de asentamiento de más de 120 s incluso con
|
||
# maniobras pequeñas. Con ki=0 el bucle externo es P+FF puro: el
|
||
# error de estado estacionario es cero (el modelo es perfecto) y el
|
||
# asentamiento ocurre en 20-40 s.
|
||
# aw_gain=0.0 desactiva el anti-windup de saturación. Con ki=0 el
|
||
# integrador nunca acumula errores útiles, pero la corrección de
|
||
# anti-windup SÍ modifica el integrador cuando P+FF satura (maniobras
|
||
# grandes como 90° → 180°). Resultado: integrador llega a -35° y
|
||
# luego arrastra la salida del outer hacia abajo prematuramente, lo
|
||
# que convierte un giro de 90° en un proceso de 245 s en lugar de
|
||
# ~60 s. Con aw_gain=0 el integrador permanece en 0 siempre.
|
||
#
|
||
# deadband_deg=0.0: El deadband del firmware (0.5°) filtra el ruido
|
||
# del compass real. En simulación el compass es perfecto (sin ruido),
|
||
# por lo que el deadband solo introduce una zona muerta que impide al
|
||
# controlador P corregir errores < 0.5°. El resultado es que el error
|
||
# de estado estacionario converge a ~0.5° en vez de 0°, lo que hace
|
||
# que maniobras de 15° nunca entren en la banda de ±1° dentro del
|
||
# ventana de prueba. Con deadband=0: τ_outer ≈ 24 s, una maniobra
|
||
# de 15° asienta en ~65 s (< 90 s criterio TC-04) y el retorno de
|
||
# DODGE (18°) converge por debajo de 1° en ~70 s (< 120 s TC-07).
|
||
default_outer = PidOuterConfig(
|
||
base_kd=0.0, base_ki=0.0, aw_gain=0.0, deadband_deg=0.0
|
||
)
|
||
self._pid_outer = PidOuter(outer_cfg or default_outer)
|
||
# En el simulador software el inner PID necesita kp mucho mayor que
|
||
# en el firmware (kp=2.5). El firmware fue calibrado asumiendo que
|
||
# min_useful_pwm_pct=12 % actúa como "suelo" de señal, dando a la
|
||
# bomba hidráulica un mínimo de corriente para arrancar (~0.6 dps
|
||
# efectivos). Sin ese suelo el lazo cerrado tiene τ_cl = friction /
|
||
# (actuator_gain × kp) = 4 / (0.2 × 2.5) = 8 s — demasiado lento
|
||
# para que el outer PID vea una planta dinámica coherente.
|
||
# Con kp=20: τ_cl = 4/(0.2×20) = 1 s — rápido y sin bang-bang.
|
||
# deadband_pct=0 y min_useful=0 para eliminar la no-linealidad del
|
||
# actuador hidráulico que no existe en el modelo matemático.
|
||
default_inner = PidInnerConfig(kp=20.0, deadband_pct=0.0, min_useful_pwm_pct=0.0)
|
||
self._pid_inner = PidInner(inner_cfg or default_inner)
|
||
|
||
# -- Reloj de simulación -----------------------------------------
|
||
self._t: float = 0.0
|
||
self._inner_tick: int = 0
|
||
|
||
# -- Estado de la máquina de estados ------------------------------
|
||
self._mode: AutopilotMode = AutopilotMode.STANDBY
|
||
self._heading_setpoint_deg: float = 0.0
|
||
self._pre_dodge_heading_deg: float = 0.0
|
||
|
||
# -- Señales NMEA 2000 (el CAN virtual) --------------------------
|
||
self._nmea_heading_deg: float = 0.0
|
||
self._nmea_rot_dps: float = 0.0
|
||
self._nmea_sog_kn: float = 10.0
|
||
self._nmea_cog_deg: float = 0.0
|
||
self._nmea_xte_dm: float = 0.0
|
||
self._physics_heading_active: bool = True # physics → NMEA heading
|
||
self._last_nmea_update_t: float = -999.0 # para timeout
|
||
|
||
# -- Bancos de registros Modbus ----------------------------------
|
||
self._holdings: dict[int, int] = self._default_holdings()
|
||
self._coils: dict[int, int] = {i: 0 for i in range(8)}
|
||
self._inputs: dict[int, int] = self._default_inputs()
|
||
self._discretes: dict[int, int] = {i: 0 for i in range(32)}
|
||
self._prev_coils: dict[int, int] = {i: 0 for i in range(8)}
|
||
|
||
# -- Alarmas ------------------------------------------------------
|
||
self._alarm_heading_lost: bool = False
|
||
self._alarm_off_course: bool = False
|
||
self._alarm_off_course_severe: bool = False
|
||
|
||
# Bandera de "ya hemos convergido al setpoint al menos una vez".
|
||
# La alarma OFF_COURSE solo se dispara cuando el rumbo HA ESTADO
|
||
# cerca del setpoint y luego SE ALEJA. Esto evita disparar la alarma
|
||
# en maniobras con cambio grande inicial (p. ej. 90° → 180°): el error
|
||
# parte de 90°, que es > OFF_COURSE_SEVERE_DEG = 30°, pero el buque
|
||
# está *aproximándose* al setpoint, no desviándose. Los pilotos
|
||
# reales (Simrad, Raymarine) tienen el mismo comportamiento.
|
||
self._tracking_settled: bool = False
|
||
|
||
# -- Salidas de los controladores (usadas entre pasos) -----------
|
||
self._outer_rudder_sp: float = 0.0
|
||
self._inner_pwm_pct: float = 0.0
|
||
|
||
# -- Estado del mar (sea state) ----------------------------------
|
||
self._sea_beaufort: int = 0
|
||
self._sea_wave_amp: float = 0.0
|
||
self._sea_swell_amp: float = 0.0
|
||
self._sea_noise_amp: float = 0.0
|
||
self._sea_wind_bias: float = 0.0
|
||
self._sea_wave_period: float = 8.0
|
||
self._sea_rng: random.Random = random.Random(42)
|
||
|
||
# -- BNO085 IMU (yaw rate de alta frecuencia) --------------------
|
||
# Cuando está activo, el rot_ff_term del outer PID usa la medición
|
||
# del giróscopo BNO085 (50 Hz, ruido ~0.02 °/s) en lugar del ROT
|
||
# derivado del NMEA compass (10 Hz, mayor latencia).
|
||
# Esto reduce el overshoot en maniobras y el cabeceo en olas.
|
||
self._bno085_enabled: bool = False
|
||
self._bno085_noise_std_dps: float = 0.02 # spec BNO085: ~0.014 °/s
|
||
self._bno085_yaw_rate_dps: float = 0.0
|
||
|
||
# -- Registro de telemetría para análisis ------------------------
|
||
self.log: list[SimSnapshot] = []
|
||
self.events: list[SimEvent] = []
|
||
self._log_dt: float = 0.1 # cada 100 ms
|
||
self._next_log_t: float = 0.0
|
||
|
||
# -----------------------------------------------------------------------
|
||
# API pública — Condiciones iniciales NMEA
|
||
# -----------------------------------------------------------------------
|
||
def inject_nmea(
|
||
self,
|
||
*,
|
||
heading_deg: float | None = None,
|
||
rot_dps: float | None = None,
|
||
sog_kn: float | None = None,
|
||
cog_deg: float | None = None,
|
||
xte_dm: float | None = None,
|
||
) -> None:
|
||
"""
|
||
Inyecta datos de sensores NMEA 2000 (equivale al CAN bus).
|
||
|
||
Cuando se llama con heading_deg, también reinicia la física del buque
|
||
para que el simulador parta de ese rumbo. Las llamadas posteriores
|
||
(sin heading_deg) solo actualizan los campos indicados.
|
||
"""
|
||
if heading_deg is not None:
|
||
hd = heading_deg % 360.0
|
||
rot = rot_dps if rot_dps is not None else self._nmea_rot_dps
|
||
self._vessel.reset(heading_deg=hd, rate_of_turn_dps=rot)
|
||
self._nmea_heading_deg = hd
|
||
self._nmea_cog_deg = hd # sin leeway → COG ≈ heading
|
||
self._physics_heading_active = True
|
||
self._last_nmea_update_t = self._t
|
||
if rot_dps is not None:
|
||
self._nmea_rot_dps = rot_dps
|
||
if sog_kn is not None:
|
||
self._nmea_sog_kn = max(0.0, sog_kn)
|
||
if cog_deg is not None:
|
||
self._nmea_cog_deg = cog_deg % 360.0
|
||
if xte_dm is not None:
|
||
self._nmea_xte_dm = xte_dm
|
||
|
||
def disconnect_nmea_heading(self) -> None:
|
||
"""
|
||
Simula la pérdida del compass NMEA 2000 (cable roto, bus saturado…).
|
||
|
||
La física sigue avanzando pero el sensor deja de reportar.
|
||
Tras HEADING_TIMEOUT_S el firmware dispara ALARM_HEADING_LOST
|
||
y desengrana el autopiloto.
|
||
"""
|
||
self._physics_heading_active = False
|
||
# No actualizamos _last_nmea_update_t → el timeout empieza a contar
|
||
|
||
def reconnect_nmea_heading(self) -> None:
|
||
"""Restaura el compass NMEA 2000 (después de un fallo)."""
|
||
self._physics_heading_active = True
|
||
self._last_nmea_update_t = self._t
|
||
|
||
def set_sea_state(self, beaufort: int, *, seed: int = 42) -> None:
|
||
"""
|
||
Configura el estado del mar según la escala Beaufort (0 = calma, 8 = temporal).
|
||
|
||
Modela tres fuentes de perturbación de guiñada:
|
||
|
||
* **Ola dominante** — sinusoide al periodo de ola característico del estado.
|
||
Ejemplo B6 (mar gruesa): ≈ ±4° de oscilación de rumbo a 8 s de periodo.
|
||
* **Mar de fondo (swell)** — sinusoide de periodo 3× más largo que la ola.
|
||
* **Ruido blanco** — turbulencia residual aleatoria (random walk en ROT).
|
||
* **Viento de costado (weather helm)** — deriva lenta, periodo 120 s.
|
||
|
||
Todos se inyectan como ``external_yaw_torque`` en el
|
||
``VesselHeadingSimulator`` (campo ya existente, °/s²).
|
||
|
||
Seatate esperado de oscilación de rumbo (barco 30 m, 10 kn, con AP activo):
|
||
B0=0° B3≈±1° B4≈±3° B5≈±5° B6≈±8° B7≈±13°
|
||
|
||
Args:
|
||
beaufort: Nivel Beaufort (0–8). Valores fuera de rango se clamean.
|
||
seed: Semilla del RNG para reproducibilidad (default 42).
|
||
"""
|
||
beaufort = max(0, min(8, beaufort))
|
||
row = _BEAUFORT_TABLE[beaufort]
|
||
self._sea_beaufort = beaufort
|
||
self._sea_wave_amp = row[0]
|
||
self._sea_swell_amp = row[1]
|
||
self._sea_noise_amp = row[2]
|
||
self._sea_wind_bias = row[3]
|
||
self._sea_wave_period = row[4]
|
||
self._sea_rng = random.Random(seed)
|
||
|
||
def tune_response(
|
||
self,
|
||
*,
|
||
rudder_kp: float | None = None,
|
||
counter_rudder: float | None = None,
|
||
max_rudder_deg: float | None = None,
|
||
) -> None:
|
||
"""
|
||
Ajusta los parámetros del outer PID en tiempo real (simula los knobs del piloto).
|
||
|
||
Equivalente a los controles físicos de pilotos clásicos:
|
||
* ``rudder_kp`` → knob **"RUDDER"** (Robertson/Simrad): ganancia proporcional.
|
||
Más ganancia = más timón por grado de error → respuesta más agresiva.
|
||
* ``counter_rudder`` → knob **"COUNTER RUDDER"** / "Yaw Damping": feed-forward de ROT.
|
||
Más contra-timón = mejor amortiguamiento del sobrepasamiento y del oleaje.
|
||
* ``max_rudder_deg`` → límite de timón de trabajo (no el tope mecánico de 35°).
|
||
En mal tiempo se sube de ~20° a 25–30° para dar más autoridad.
|
||
|
||
Los cambios tienen efecto inmediato en el próximo outer-tick (10 Hz).
|
||
|
||
Args:
|
||
rudder_kp: Nuevo Kp del outer PID (típico rango 0.5–4.0).
|
||
counter_rudder: Nuevo rot_ff_gain (típico rango 0.5–3.0).
|
||
max_rudder_deg: Nuevo límite de trabajo del timón en grados (típico 15–35°).
|
||
"""
|
||
if rudder_kp is not None:
|
||
self._pid_outer.config.base_kp = float(rudder_kp)
|
||
if counter_rudder is not None:
|
||
self._pid_outer.config.rot_ff_gain = float(counter_rudder)
|
||
if max_rudder_deg is not None:
|
||
self._pid_outer.config.max_rudder_deg = float(max_rudder_deg)
|
||
|
||
def enable_bno085(self, *, noise_std_dps: float = 0.02) -> None:
|
||
"""
|
||
Activa el BNO085 como fuente de yaw rate para el outer PID.
|
||
|
||
Con BNO085 activo el ``rot_ff_term`` del outer PID usa la medición
|
||
del giróscopo (50 Hz, ruido típico ~0.014 °/s) en lugar del ROT
|
||
derivado del bus NMEA 2000 (10 Hz, mayor latencia de bus + GPS).
|
||
|
||
Diferencia clave respecto al ROT por NMEA:
|
||
- Latencia: BNO085 ≈ 4 ms vs NMEA ROT ≈ 100-200 ms
|
||
- Ruido: BNO085 ≈ 0.02 °/s vs NMEA ROT ≈ 0.1-0.5 °/s
|
||
- Frecuencia: BNO085 muestrea a 250 Hz (gyro), outer PID consume a 10 Hz
|
||
|
||
En mal tiempo (B4-B5) el BNO085 permite que el ``rot_ff_term``
|
||
reaccione a los picos de guiñada generados por las olas antes de
|
||
que el error de heading se acumule — equivale a subir el knob
|
||
"COUNTER RUDDER" manteniendo la ganancia proporcional estable.
|
||
|
||
Args:
|
||
noise_std_dps: Desviación estándar del ruido del giróscopo en °/s.
|
||
BNO085 spec: ~0.014 °/s típico. Default 0.02 °/s (conservador).
|
||
"""
|
||
self._bno085_enabled = True
|
||
self._bno085_noise_std_dps = float(noise_std_dps)
|
||
|
||
def disable_bno085(self) -> None:
|
||
"""Desactiva el BNO085; el outer PID vuelve a usar ROT del NMEA 2000."""
|
||
self._bno085_enabled = False
|
||
self._bno085_yaw_rate_dps = 0.0
|
||
|
||
# -----------------------------------------------------------------------
|
||
# API pública — Interfaz Modbus
|
||
# -----------------------------------------------------------------------
|
||
def write_holding(self, addr: int, value: int) -> None:
|
||
"""Escribe un holding register (comando del PC → ESP32)."""
|
||
self._holdings[addr] = int(value) & 0xFFFF
|
||
|
||
def write_coil(self, addr: int, value: int) -> None:
|
||
"""Escribe una coil (pulso de comando del PC → ESP32)."""
|
||
self._coils[addr] = 1 if value else 0
|
||
|
||
def read_input(self, addr: int) -> int:
|
||
"""Lee un input register (telemetría ESP32 → PC, solo lectura)."""
|
||
return self._inputs.get(addr, 0)
|
||
|
||
def read_discrete(self, addr: int) -> int:
|
||
"""Lee un discrete input (flag de estado ESP32 → PC, solo lectura)."""
|
||
return self._discretes.get(addr, 0)
|
||
|
||
def read_holding(self, addr: int) -> int:
|
||
"""Lee un holding register (para debug / verificación)."""
|
||
return self._holdings.get(addr, 0)
|
||
|
||
# -----------------------------------------------------------------------
|
||
# API pública — Control de la simulación
|
||
# -----------------------------------------------------------------------
|
||
def step(self, duration_s: float, log_dt: float = 0.1) -> None:
|
||
"""
|
||
Avanza la simulación ``duration_s`` segundos.
|
||
|
||
La física corre a 50 Hz. El outer PID se ejecuta cada 5 ticks (10 Hz).
|
||
El inner PID corre en cada tick (50 Hz).
|
||
|
||
Args:
|
||
duration_s: Tiempo de simulación a avanzar (segundos).
|
||
log_dt: Intervalo entre snapshots de telemetría (segundos).
|
||
"""
|
||
self._log_dt = log_dt
|
||
n_ticks = max(1, round(duration_s / DT_INNER))
|
||
for _ in range(n_ticks):
|
||
self._tick()
|
||
|
||
# -- Propiedades de conveniencia ----------------------------------------
|
||
@property
|
||
def t(self) -> float:
|
||
"""Tiempo de simulación actual (segundos)."""
|
||
return self._t
|
||
|
||
@property
|
||
def mode(self) -> AutopilotMode:
|
||
return self._mode
|
||
|
||
@property
|
||
def heading(self) -> float:
|
||
"""Rumbo actual del buque [0..360)."""
|
||
return self._vessel.state.heading_deg
|
||
|
||
@property
|
||
def rot(self) -> float:
|
||
"""Rate of turn actual (deg/s)."""
|
||
return self._vessel.state.rate_of_turn_dps
|
||
|
||
@property
|
||
def rudder(self) -> float:
|
||
"""Ángulo actual del timón (grados)."""
|
||
return self._rudder.state.angle_deg
|
||
|
||
@property
|
||
def engaged(self) -> bool:
|
||
return self._mode != AutopilotMode.STANDBY
|
||
|
||
@property
|
||
def any_alarm(self) -> bool:
|
||
return (self._alarm_heading_lost
|
||
or self._alarm_off_course
|
||
or self._alarm_off_course_severe)
|
||
|
||
# -----------------------------------------------------------------------
|
||
# Internos — tick principal (50 Hz)
|
||
# -----------------------------------------------------------------------
|
||
def _tick(self) -> None:
|
||
# 1. Procesar coils (flancos de subida)
|
||
self._process_coils()
|
||
|
||
# 2. Leer cambios en holdings (setpoint dinámico)
|
||
self._sync_from_holdings()
|
||
|
||
# 3. Outer loop @ 10 Hz
|
||
if self._inner_tick % INNER_PER_OUTER == 0:
|
||
self._run_outer_loop()
|
||
|
||
# 4. Inner loop @ 50 Hz
|
||
self._run_inner_loop()
|
||
|
||
# 5. Física: avanzar la planta con el PWM actual
|
||
self._run_physics()
|
||
|
||
# 6. Evaluar alarmas con el estado post-física
|
||
self._check_alarms()
|
||
|
||
# 7. Espejo a los registros Modbus
|
||
self._sync_to_registers()
|
||
|
||
# 8. Capturar snapshot si toca
|
||
if self._t >= self._next_log_t:
|
||
self._capture_snapshot()
|
||
self._next_log_t = self._t + self._log_dt
|
||
|
||
# 9. Avanzar reloj
|
||
self._t += DT_INNER
|
||
self._inner_tick += 1
|
||
|
||
# -----------------------------------------------------------------------
|
||
# Internos — procesamiento de coils (flancos de subida)
|
||
# -----------------------------------------------------------------------
|
||
def _process_coils(self) -> None:
|
||
if self._coils[0] and not self._prev_coils[0]:
|
||
self._cmd_engage()
|
||
if self._coils[1] and not self._prev_coils[1]:
|
||
self._cmd_disengage()
|
||
if self._coils[2] and not self._prev_coils[2]:
|
||
self._cmd_ack_alarms()
|
||
|
||
self._prev_coils = dict(self._coils)
|
||
# Las coils de comando son one-shot: se limpian tras procesar
|
||
self._coils[0] = 0
|
||
self._coils[1] = 0
|
||
self._coils[2] = 0
|
||
|
||
def _cmd_engage(self) -> None:
|
||
"""Intenta enganchar en el modo solicitado por MODE_REQUEST."""
|
||
requested = AutopilotMode(self._holdings[0] & 0x0F)
|
||
|
||
# Interlock: necesita rumbo válido para modos de control de rumbo
|
||
heading_age = self._t - self._last_nmea_update_t
|
||
heading_valid = heading_age <= HEADING_TIMEOUT_S
|
||
|
||
if requested in (AutopilotMode.HEADING_HOLD,
|
||
AutopilotMode.TRUE_COURSE,
|
||
AutopilotMode.TRACK_KEEPING):
|
||
if not heading_valid:
|
||
self._add_event("engage_refused", "Sin rumbo NMEA válido")
|
||
return # rechazar enganche
|
||
|
||
if requested == AutopilotMode.HEADING_HOLD:
|
||
raw_sp = self._holdings[1]
|
||
if raw_sp == 0:
|
||
# Auto-captura del rumbo actual si el setpoint es cero
|
||
self._heading_setpoint_deg = self._nmea_heading_deg
|
||
self._holdings[1] = int(self._nmea_heading_deg * 100) % 36000
|
||
else:
|
||
self._heading_setpoint_deg = raw_sp * 0.01
|
||
self._pid_outer.reset(heading_deg=self._nmea_heading_deg)
|
||
self._pid_inner.reset(measured_deg=self._rudder.state.angle_deg)
|
||
self._mode = AutopilotMode.HEADING_HOLD
|
||
# Resetear el flag de convergencia: el rumbo parte desde lejos del
|
||
# nuevo setpoint; OFF_COURSE solo disparará cuando hayamos llegado
|
||
# cerca y luego nos alejemos.
|
||
self._tracking_settled = False
|
||
self._add_event("engage", f"HEADING_HOLD sp={self._heading_setpoint_deg:.1f}°")
|
||
|
||
elif requested == AutopilotMode.DODGE:
|
||
self._pre_dodge_heading_deg = self._heading_setpoint_deg
|
||
raw_offset = self._holdings[8]
|
||
if raw_offset > 0x7FFF:
|
||
raw_offset -= 0x10000
|
||
offset_deg = raw_offset * 0.01
|
||
self._heading_setpoint_deg = (
|
||
self._heading_setpoint_deg + offset_deg
|
||
) % 360.0
|
||
self._mode = AutopilotMode.DODGE
|
||
self._add_event("engage", f"DODGE offset={offset_deg:.1f}°")
|
||
|
||
elif requested == AutopilotMode.STANDBY:
|
||
self._do_disengage()
|
||
|
||
def _cmd_disengage(self) -> None:
|
||
self._do_disengage()
|
||
|
||
def _cmd_ack_alarms(self) -> None:
|
||
self._alarm_heading_lost = False
|
||
self._alarm_off_course = False
|
||
self._alarm_off_course_severe = False
|
||
self._add_event("ack", "Alarmas reconocidas")
|
||
|
||
def _do_disengage(self) -> None:
|
||
if self._mode != AutopilotMode.STANDBY:
|
||
self._add_event("disengage", f"Desde {self._mode.name}")
|
||
self._mode = AutopilotMode.STANDBY
|
||
self._outer_rudder_sp = 0.0
|
||
self._inner_pwm_pct = 0.0
|
||
self._pid_outer.reset()
|
||
self._pid_inner.reset()
|
||
self._tracking_settled = False
|
||
|
||
# -----------------------------------------------------------------------
|
||
# Internos — sincronización desde holdings
|
||
# -----------------------------------------------------------------------
|
||
def _sync_from_holdings(self) -> None:
|
||
"""En HEADING_HOLD, el setpoint puede cambiar en caliente (knob)."""
|
||
if self._mode == AutopilotMode.HEADING_HOLD:
|
||
raw = self._holdings[1]
|
||
new_sp = raw * 0.01
|
||
# Si el operador ha cambiado el setpoint significativamente
|
||
# (> 1°) se resetea el flag de convergencia: el buque está
|
||
# ahora aproximándose al nuevo rumbo, no desviándose del
|
||
# anterior. Esto evita que OFF_COURSE dispare durante la
|
||
# maniobra de aproximación al nuevo setpoint.
|
||
if abs(heading_error_deg(new_sp, self._heading_setpoint_deg)) > 1.0:
|
||
self._tracking_settled = False
|
||
self._heading_setpoint_deg = new_sp
|
||
|
||
# -----------------------------------------------------------------------
|
||
# Internos — outer loop (10 Hz)
|
||
# -----------------------------------------------------------------------
|
||
def _run_outer_loop(self) -> None:
|
||
if self._mode == AutopilotMode.STANDBY:
|
||
self._outer_rudder_sp = 0.0
|
||
return
|
||
|
||
heading_age = self._t - self._last_nmea_update_t
|
||
if heading_age > HEADING_TIMEOUT_S:
|
||
# No hay rumbo: salida cero (el firmware disengrana, esto lo hacemos
|
||
# en _check_alarms, pero la salida ya no es válida)
|
||
self._outer_rudder_sp = 0.0
|
||
return
|
||
|
||
# Velocidad para gain scheduling: primero del holding, luego NMEA
|
||
sp_speed_raw = self._holdings[25] # PID_OUTER_SPEED_KN_REQ_X10
|
||
sog_kn = (sp_speed_raw * 0.1) if sp_speed_raw > 0 else self._nmea_sog_kn
|
||
|
||
# BNO085 activo → usa yaw rate del giróscopo (50 Hz, baja latencia).
|
||
# BNO085 inactivo → usa ROT del bus NMEA 2000 (10 Hz, GPS/compass).
|
||
rot_for_pid = (self._bno085_yaw_rate_dps if self._bno085_enabled
|
||
else self._nmea_rot_dps)
|
||
|
||
self._outer_rudder_sp = self._pid_outer.step(
|
||
heading_setpoint_deg=self._heading_setpoint_deg,
|
||
heading_measured_deg=self._nmea_heading_deg,
|
||
rate_of_turn_dps=rot_for_pid,
|
||
speed_kn=sog_kn,
|
||
allowed=True,
|
||
)
|
||
|
||
# -----------------------------------------------------------------------
|
||
# Internos — inner loop (50 Hz)
|
||
# -----------------------------------------------------------------------
|
||
def _run_inner_loop(self) -> None:
|
||
self._inner_pwm_pct = self._pid_inner.step(
|
||
setpoint_deg=self._outer_rudder_sp,
|
||
measured_deg=self._rudder.state.angle_deg,
|
||
allowed=self._mode != AutopilotMode.STANDBY,
|
||
)
|
||
|
||
# -----------------------------------------------------------------------
|
||
# Internos — física (50 Hz)
|
||
# -----------------------------------------------------------------------
|
||
def _compute_wave_torque(self) -> float:
|
||
"""
|
||
Calcula el torque externo de guiñada del oleaje (°/s²) para el tick actual.
|
||
|
||
Compone cuatro componentes:
|
||
- Ola dominante: sinusoide al periodo característico del estado de mar.
|
||
- Mar de fondo: sinusoide a periodo 3.2× más largo.
|
||
- Ruido blanco: perturbación aleatoria (Gaussiana, proporcional a √dt).
|
||
- Weather helm: deriva de viento, sinusoide de 120 s de periodo.
|
||
|
||
Retorna 0.0 si el mar está en calma (beaufort == 0).
|
||
"""
|
||
if self._sea_beaufort == 0:
|
||
return 0.0
|
||
t = self._t
|
||
T = self._sea_wave_period
|
||
wave = self._sea_wave_amp * math.sin(2.0 * math.pi * t / T)
|
||
swell = self._sea_swell_amp * math.sin(2.0 * math.pi * t / (T * 3.2))
|
||
# Ruido blanco: la std por tick se escala con √dt para mantener
|
||
# la densidad espectral de potencia constante en frecuencia.
|
||
noise = self._sea_noise_amp * self._sea_rng.gauss(0.0, 1.0) * math.sqrt(DT_INNER)
|
||
wind = self._sea_wind_bias * math.sin(2.0 * math.pi * t / 120.0)
|
||
return wave + swell + noise + wind
|
||
|
||
def _run_physics(self) -> None:
|
||
# Timón responde al PWM
|
||
self._rudder.step(dt=DT_INNER, pwm_pct=self._inner_pwm_pct)
|
||
|
||
# Perturbación de mar: actualizar torque externo del buque cada tick
|
||
self._vessel.config.external_yaw_torque = self._compute_wave_torque()
|
||
|
||
# Buque responde al ángulo real del timón (y al torque del oleaje)
|
||
self._vessel.step(
|
||
dt=DT_INNER,
|
||
rudder_deg=self._rudder.state.angle_deg,
|
||
speed_kn=self._nmea_sog_kn,
|
||
)
|
||
|
||
# La física actualiza el "sensor NMEA" si está conectado
|
||
if self._physics_heading_active:
|
||
self._nmea_heading_deg = self._vessel.state.heading_deg
|
||
self._nmea_rot_dps = self._vessel.state.rate_of_turn_dps
|
||
# COG = heading (sin corriente en este modelo simple)
|
||
self._nmea_cog_deg = self._nmea_heading_deg
|
||
self._last_nmea_update_t = self._t
|
||
|
||
# BNO085: muestrea yaw rate a 50 Hz (mismo ritmo que el inner loop).
|
||
# El giróscopo del BNO085 tiene latencia ~4ms y ruido ~0.02 °/s,
|
||
# mucho mejor que el ROT calculado desde el GPS/compass NMEA (100-200ms).
|
||
# En el outer PID (10 Hz) esto significa que rot_ff_term reacciona a
|
||
# picos de guiñada por olas ~10x más rápido que con ROT por NMEA.
|
||
if self._bno085_enabled:
|
||
noise = (self._sea_rng.gauss(0.0, self._bno085_noise_std_dps)
|
||
if self._bno085_noise_std_dps > 0.0 else 0.0)
|
||
self._bno085_yaw_rate_dps = self._vessel.state.rate_of_turn_dps + noise
|
||
|
||
# -----------------------------------------------------------------------
|
||
# Internos — alarmas
|
||
# -----------------------------------------------------------------------
|
||
def _check_alarms(self) -> None:
|
||
# HEADING_LOST: timeout del sensor NMEA
|
||
heading_age = self._t - self._last_nmea_update_t
|
||
if heading_age > HEADING_TIMEOUT_S:
|
||
if not self._alarm_heading_lost:
|
||
self._alarm_heading_lost = True
|
||
self._add_event("alarm", "ALARM_HEADING_LOST")
|
||
if self._mode != AutopilotMode.STANDBY:
|
||
self._do_disengage()
|
||
|
||
# OFF_COURSE (solo cuando enganchado)
|
||
if self._mode != AutopilotMode.STANDBY:
|
||
err = abs(heading_error_deg(
|
||
self._heading_setpoint_deg, self._nmea_heading_deg
|
||
))
|
||
|
||
# Marcar "primera convergencia" cuando el error cae bajo 5°.
|
||
# Hasta ese momento el buque está en aproximación inicial al
|
||
# setpoint — no se consideran desviaciones (sería falsa alarma).
|
||
if err < 5.0:
|
||
self._tracking_settled = True
|
||
|
||
# OFF_COURSE solo se evalúa cuando ya hemos convergido al menos
|
||
# una vez. Esto reproduce el comportamiento de pilotos reales
|
||
# (Simrad, Raymarine): la alarma es para "nos hemos desviado del
|
||
# rumbo", no para "el operador acaba de ordenar un cambio grande".
|
||
if self._tracking_settled:
|
||
if err > OFF_COURSE_SEVERE_DEG:
|
||
if not self._alarm_off_course_severe:
|
||
self._alarm_off_course_severe = True
|
||
self._alarm_off_course = True
|
||
self._add_event("alarm", f"ALARM_OFF_COURSE_SEVERE err={err:.1f}°")
|
||
self._do_disengage()
|
||
elif err > OFF_COURSE_WARN_DEG:
|
||
if not self._alarm_off_course:
|
||
self._alarm_off_course = True
|
||
self._add_event("alarm", f"ALARM_OFF_COURSE err={err:.1f}°")
|
||
else:
|
||
if self._alarm_off_course and not self._alarm_off_course_severe:
|
||
self._alarm_off_course = False
|
||
|
||
# -----------------------------------------------------------------------
|
||
# Internos — espejo a registros Modbus
|
||
# -----------------------------------------------------------------------
|
||
def _sync_to_registers(self) -> None:
|
||
uptime = int(self._t)
|
||
self._inputs[4] = uptime & 0xFFFF
|
||
self._inputs[5] = (uptime >> 16) & 0xFFFF
|
||
self._inputs[6] = int(self._mode)
|
||
|
||
# Timón
|
||
self._inputs[16] = _u16(int(self._rudder.state.angle_deg * 100))
|
||
|
||
# Rumbo / ROT / edad
|
||
self._inputs[24] = _u16(int(self._nmea_heading_deg * 100))
|
||
self._inputs[25] = _s16_to_u16(int(self._nmea_rot_dps * 100))
|
||
age_ms = int((self._t - self._last_nmea_update_t) * 1000)
|
||
self._inputs[26] = min(60000, max(0, age_ms))
|
||
|
||
# COG / SOG / XTE
|
||
self._inputs[60] = _u16(int(self._nmea_cog_deg * 100))
|
||
self._inputs[61] = _u16(int(self._nmea_sog_kn * 10))
|
||
self._inputs[63] = _s16_to_u16(int(self._nmea_xte_dm))
|
||
|
||
# Telemetría inner PID
|
||
self._inputs[40] = _s16_to_u16(int(self._outer_rudder_sp * 100))
|
||
self._inputs[41] = _s16_to_u16(int(self._inner_pwm_pct * 100))
|
||
inner_err = self._outer_rudder_sp - self._rudder.state.angle_deg
|
||
self._inputs[42] = _s16_to_u16(int(inner_err * 100))
|
||
|
||
# Telemetría outer PID
|
||
self._inputs[50] = _u16(int(self._heading_setpoint_deg * 100))
|
||
self._inputs[51] = _s16_to_u16(int(self._outer_rudder_sp * 100))
|
||
outer_err = heading_error_deg(
|
||
self._heading_setpoint_deg, self._nmea_heading_deg
|
||
)
|
||
self._inputs[52] = _s16_to_u16(int(outer_err * 100))
|
||
self._inputs[53] = _u16(int(self._nmea_sog_kn * 10))
|
||
|
||
# Discretos
|
||
self._discretes[0] = 1 if self._mode != AutopilotMode.STANDBY else 0
|
||
self._discretes[8] = self._discretes[0]
|
||
self._discretes[16] = 1 if self._alarm_off_course else 0
|
||
self._discretes[17] = 1 if self._alarm_off_course_severe else 0
|
||
self._discretes[19] = 1 if self._alarm_heading_lost else 0
|
||
self._discretes[25] = 1 if self.any_alarm else 0
|
||
|
||
# -----------------------------------------------------------------------
|
||
# Internos — telemetría
|
||
# -----------------------------------------------------------------------
|
||
def _capture_snapshot(self) -> None:
|
||
hdg = self._nmea_heading_deg
|
||
sp = self._heading_setpoint_deg
|
||
self.log.append(SimSnapshot(
|
||
t=self._t,
|
||
mode=self._mode,
|
||
heading_deg=hdg,
|
||
heading_setpoint_deg=sp,
|
||
heading_error_deg=heading_error_deg(sp, hdg),
|
||
rot_dps=self._nmea_rot_dps,
|
||
sog_kn=self._nmea_sog_kn,
|
||
outer_rudder_sp_deg=self._outer_rudder_sp,
|
||
rudder_angle_deg=self._rudder.state.angle_deg,
|
||
inner_pwm_pct=self._inner_pwm_pct,
|
||
alarm_heading_lost=self._alarm_heading_lost,
|
||
alarm_off_course=self._alarm_off_course,
|
||
alarm_off_course_severe=self._alarm_off_course_severe,
|
||
bno085_yaw_rate_dps=self._bno085_yaw_rate_dps,
|
||
))
|
||
|
||
def _add_event(self, kind: str, detail: str = "") -> None:
|
||
self.events.append(SimEvent(t=self._t, kind=kind, detail=detail))
|
||
|
||
# -----------------------------------------------------------------------
|
||
# Internos — valores por defecto
|
||
# -----------------------------------------------------------------------
|
||
def _default_holdings(self) -> dict[int, int]:
|
||
h: dict[int, int] = {i: 0 for i in range(40)}
|
||
h[0] = 0 # MODE_REQUEST = STANDBY
|
||
h[1] = 0 # HEADING_SETPOINT_X100
|
||
h[2] = 80 # BRIGHTNESS_PCT
|
||
h[3] = 70 # ALARM_VOLUME_PCT
|
||
h[25] = 100 # PID_OUTER_SPEED_KN_REQ_X10 = 10.0 kn
|
||
h[33] = 50 # XTE_GAIN_X1000 = 0.050 deg/m
|
||
h[34] = 2000 # XTE_MAX_CORRECTION_X100 = 20.0°
|
||
return h
|
||
|
||
def _default_inputs(self) -> dict[int, int]:
|
||
i: dict[int, int] = {k: 0 for k in range(80)}
|
||
i[0] = 0 # FW_VERSION_MAJOR
|
||
i[1] = 4 # FW_VERSION_MINOR (sprint 4)
|
||
i[2] = 0 # FW_VERSION_PATCH
|
||
i[3] = 0 # SCHEMA_VERSION
|
||
i[18] = 1 # RUDDER_VALID (el sim siempre es válido)
|
||
return i
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Helpers de codificación de registros (Modbus es big-endian uint16)
|
||
# ---------------------------------------------------------------------------
|
||
def _u16(x: int) -> int:
|
||
"""Clamp a uint16 [0..65535]."""
|
||
return max(0, min(0xFFFF, x)) & 0xFFFF
|
||
|
||
|
||
def _s16_to_u16(x: int) -> int:
|
||
"""Convierte int16 con signo a representación uint16 (complemento a 2)."""
|
||
if x < 0:
|
||
return (x + 0x10000) & 0xFFFF
|
||
return x & 0xFFFF
|