Files
Compass/simulator/nmea_simulator.py
T
2026-07-03 12:23:41 -04:00

376 lines
15 KiB
Python

"""
NMEA 0183 Simulator — Secuencia realista de maniobras de buque
==============================================================
Secuencia:
1. INITIAL — Rumbo 045°M estable, 60 segundos
2. TURN_STBD — Giro por estribor hacia 225° (180° del original)
3. COMP_STBD — Timonel compensa: ROT baja a 0, buque estabiliza en 225°
4. STEADY_STBD — Estable en 225°M, 60 segundos
5. TURN_PORT — Giro por babor de regreso a 045°
6. COMP_PORT — Timonel compensa: ROT sube desde negativo a 0
7. NORMAL — Navegación normal con movimiento suave de olas
Ejecutar standalone para ver output:
python -m simulator.nmea_simulator
"""
import sys
import math
import random
import threading
import socket
import time
from enum import Enum, auto
from dataclasses import dataclass, field
import config
import logging
import os
# Log to file — no console output when running under pythonw
_log_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'compass_sim.log')
logging.basicConfig(
filename=_log_path,
filemode='w',
level=logging.INFO,
format='%(asctime)s %(message)s',
datefmt='%H:%M:%S',
)
def _log(msg: str):
logging.info(msg)
try:
print(msg) # only visible when running from a terminal
except Exception:
pass
# ── NMEA helpers ──────────────────────────────────────────────────────────────
def _cs(s: str) -> str:
v = 0
for c in s:
v ^= ord(c)
return f'{v:02X}'
def _sentence(body: str) -> str:
return f'${body}*{_cs(body)}\r\n'
# ── Simulation phases ─────────────────────────────────────────────────────────
class Phase(Enum):
INITIAL = auto() # Estable en rumbo inicial
TURN_STBD = auto() # Girando por estribor
COMP_STBD = auto() # Timonel compensa al llegar a 225°
STEADY_STBD = auto() # Estable en 225°
TURN_PORT = auto() # Girando por babor
COMP_PORT = auto() # Timonel compensa al llegar a 045°
NORMAL = auto() # Navegación normal con olas
PHASE_NAMES = {
Phase.INITIAL: 'INITIAL — Estable en 045°M',
Phase.TURN_STBD: 'TURN_STBD — Girando ESTRIBOR → 225°',
Phase.COMP_STBD: 'COMP_STBD — Timonel compensa (ROT → 0)',
Phase.STEADY_STBD: 'STEADY_STBD — Estable en 225°M',
Phase.TURN_PORT: 'TURN_PORT — Girando BABOR → 045°',
Phase.COMP_PORT: 'COMP_PORT — Timonel compensa (ROT → 0)',
Phase.NORMAL: 'NORMAL — Navegación suave',
}
# ── Configuration ─────────────────────────────────────────────────────────────
INITIAL_HDG = 45.0 # Rumbo inicial °M
TURN_DEG = 25.0 # Grados de la maniobra (por estribor y luego por babor)
MAX_ROT = 30.0 # ROT máximo °/min (30 = 0.5°/s — maniobra moderada)
ROT_RAMP_SECS = 8.0 # Segundos para alcanzar MAX_ROT desde 0
COMP_THRESHOLD = 8.0 # Grados antes del rumbo objetivo donde el timonel compensa
STEADY_SECS = 15.0 # Tiempo estable en cada rumbo (segundos)
SOG = 8.5 # Velocidad (knots)
VARIATION = -7.5 # Variación magnética (W = negativo)
LAT = 27.1975 # Stuart, FL
LON = -80.1951
# ── Vessel state ──────────────────────────────────────────────────────────────
@dataclass
class Vessel:
heading: float = INITIAL_HDG
rot: float = 0.0 # °/min (+ = estribor, - = babor)
rot_target: float = 0.0 # ROT objetivo
pitch: float = 0.0
roll: float = 0.0
sog: float = SOG
phase: Phase = Phase.INITIAL
phase_t: float = 0.0 # tiempo en la fase actual (s)
norm_t: float = 0.0 # contador para simulación normal
sim_t: float = 0.0 # tiempo global continuo (s)
_target_hdg: float = 0.0
class VesselSim:
TICK = 0.25 # segundos por step de simulación
def __init__(self):
self.v = Vessel()
self.v._target_hdg = (INITIAL_HDG + TURN_DEG) % 360
self._phase_start_print()
# ── Main tick ─────────────────────────────────────────────────────────────
def step(self) -> None:
v = self.v
v.phase_t += self.TICK
v.sim_t += self.TICK
if v.phase == Phase.INITIAL: self._phase_initial()
elif v.phase == Phase.TURN_STBD: self._phase_turn(+1)
elif v.phase == Phase.COMP_STBD: self._phase_compensate(+1, Phase.STEADY_STBD)
elif v.phase == Phase.STEADY_STBD: self._phase_steady(Phase.TURN_PORT)
elif v.phase == Phase.TURN_PORT: self._phase_turn(-1)
elif v.phase == Phase.COMP_PORT: self._phase_compensate(-1, Phase.NORMAL)
elif v.phase == Phase.NORMAL: self._phase_normal()
t = v.sim_t
# Pitch: oleaje grueso — swell 8s (ω=0.785) + chop 3s (ω=2.09), máx ±5°
pitch_wave = (3.5 * math.sin(t * 0.785)
+ 1.1 * math.sin(t * 2.09 + 1.1)
+ 0.3 * math.sin(t * 3.14 + 0.5))
v.pitch = max(-5.0, min(5.0, pitch_wave + random.uniform(-0.10, 0.10)))
# Roll: período más corto que pitch (6s, ω=1.047), cabeceo cruzado, escora de maniobra
roll_wave = (3.8 * math.sin(t * 1.047 + 0.75)
+ 0.9 * math.sin(t * 2.51 + 2.3))
roll_turn = -(v.rot / MAX_ROT) * 2.5
v.roll = max(-5.0, min(5.0, roll_wave + roll_turn + random.uniform(-0.10, 0.10)))
# Update heading from ROT
v.heading = (v.heading + v.rot / 60.0 * self.TICK) % 360
# ── Phases ────────────────────────────────────────────────────────────────
def _phase_initial(self):
v = self.v
v.rot = 0.0
if v.phase_t >= STEADY_SECS:
self._set_phase(Phase.TURN_STBD)
v._target_hdg = (INITIAL_HDG + TURN_DEG) % 360
def _phase_turn(self, direction: int):
"""direction: +1 = estribor, -1 = babor"""
v = self.v
target = v._target_hdg
# Angular distance remaining to target
if direction == +1:
remaining = (target - v.heading) % 360
else:
remaining = (v.heading - target) % 360
# Ramp ROT up over ROT_RAMP_SECS
ramp_progress = min(v.phase_t / ROT_RAMP_SECS, 1.0)
v.rot_target = direction * MAX_ROT * ramp_progress
# Within COMP_THRESHOLD: start reducing ROT (helmsman anticipates)
if remaining < COMP_THRESHOLD:
comp_factor = remaining / COMP_THRESHOLD
v.rot_target *= comp_factor
# Smooth ROT toward target
v.rot += (v.rot_target - v.rot) * 0.25
# Reached target heading?
if remaining < 2.0:
v.heading = target
if direction == +1:
self._set_phase(Phase.COMP_STBD)
else:
self._set_phase(Phase.COMP_PORT)
def _phase_compensate(self, prev_direction: int, next_phase: Phase):
"""Timonel centra el timón: ROT decae a 0."""
v = self.v
v.rot *= 0.80 # decaimiento exponencial suave
if abs(v.rot) < 0.4:
v.rot = 0.0
self._set_phase(next_phase)
if next_phase == Phase.TURN_PORT:
v._target_hdg = INITIAL_HDG # girar de vuelta a 045°
def _phase_steady(self, next_phase: Phase):
v = self.v
v.rot *= 0.90 # cualquier ROT residual se disipa
if v.phase_t >= STEADY_SECS:
self._set_phase(next_phase)
if next_phase == Phase.TURN_PORT:
v._target_hdg = INITIAL_HDG
def _phase_normal(self):
"""Navegación en mar grueso — giñadas amplias y correcciones rápidas."""
v = self.v
v.norm_t += self.TICK
t = v.norm_t
# Giñada compuesta: deriva principal 12° + componente irregular 5°
hdg_drift = 12.0 * math.sin(t * 0.09) + 5.0 * math.sin(t * 0.23 + 1.7)
target_hdg = (INITIAL_HDG + hdg_drift) % 360
err = ((target_hdg - v.heading) + 180) % 360 - 180
v.rot_target = max(-25.0, min(25.0, err * 2.5))
v.rot += (v.rot_target - v.rot) * 0.20
def _add_sea_noise(self):
pass # replaced by sinusoidal model in step()
# ── Derived data ──────────────────────────────────────────────────────────
def accel(self):
"""Aceleración simulada del BNO085 (m/s²)."""
v = self.v
# Centrípeta en eje Y proporcional al ROT
centripetal_y = (v.rot / 60.0) * math.radians(v.sog * 0.5144) * 0.5
ax = math.sin(math.radians(v.pitch)) * 9.81 + random.uniform(-0.03, 0.03)
ay = math.sin(math.radians(v.roll)) * 9.81 + centripetal_y + random.uniform(-0.03, 0.03)
az = math.cos(math.radians(v.pitch)) * math.cos(math.radians(v.roll)) * 9.81
return ax, ay, az
def gyro(self):
"""Velocidad angular del giroscopio (°/s)."""
v = self.v
gx = v.roll * 0.02 + random.uniform(-0.02, 0.02)
gy = v.pitch * 0.02 + random.uniform(-0.02, 0.02)
gz = v.rot / 60.0 + random.uniform(-0.01, 0.01)
return gx, gy, gz
# ── NMEA sentences ────────────────────────────────────────────────────────
def sentences(self) -> list[str]:
self.step()
v = self.v
ax, ay, az = self.accel()
gx, gy, gz = self.gyro()
var_dir = 'W' if VARIATION < 0 else 'E'
return [
_sentence(f'HCHDG,{v.heading:.2f},,,{abs(VARIATION):.1f},{var_dir}'),
_sentence(f'IIROT,{v.rot:.2f},A'),
_sentence(
f'IIXDR,'
f'A,{v.pitch:.2f},D,PITCH,'
f'A,{v.roll:.2f},D,ROLL,'
f'A,{ax:.3f},M,ACCELX,'
f'A,{ay:.3f},M,ACCELY,'
f'A,{az:.3f},M,ACCELZ,'
f'A,{gx:.3f},D,GYROX,'
f'A,{gy:.3f},D,GYROY,'
f'A,{gz:.3f},D,GYROZ'
),
]
# ── Phase helpers ─────────────────────────────────────────────────────────
def _set_phase(self, phase: Phase):
self.v.phase = phase
self.v.phase_t = 0.0
self._phase_start_print()
def _phase_start_print(self):
v = self.v
name = PHASE_NAMES.get(v.phase, str(v.phase))
_log(f'[SIM] -- {name}')
_log(f'[SIM] HDG={v.heading:.1f} ROT={v.rot:.1f}/min '
f'PITCH={v.pitch:.1f} ROLL={v.roll:.1f}')
# ── TCP server ────────────────────────────────────────────────────────────────
def _tcp_server(sim: VesselSim):
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
srv.bind(('127.0.0.1', 10110))
srv.listen(5)
_log('[SIM] NMEA TCP server en 127.0.0.1:10110 (1 Hz)')
_log('[SIM] Conecta con: python main.py --sim')
while True:
conn, addr = srv.accept()
_log(f'[SIM] Cliente conectado: {addr}')
try:
while True:
t0 = time.time()
for s in sim.sentences():
conn.sendall(s.encode('ascii'))
elapsed = time.time() - t0
time.sleep(max(0, 1.0 - elapsed))
except Exception:
_log(f'[SIM] Cliente desconectado: {addr}')
conn.close()
# ── Entry points ──────────────────────────────────────────────────────────────
def start_simulator():
"""Called by main.py --sim. Patches SerialReader and starts TCP server."""
sim = VesselSim()
t = threading.Thread(target=_tcp_server, args=(sim,), daemon=True)
t.start()
time.sleep(0.35)
config.SERIAL_PORTS = [{'port': 'tcp://127.0.0.1:10110', 'baud': 4800, 'name': 'Simulator'}]
_patch_reader()
def _patch_reader():
"""Monkey-patch SerialReader to handle tcp:// URI for the simulator."""
import socket as _socket
from core.serial_reader import SerialReader
original_run = SerialReader.run
def patched_run(self):
if not self.port.startswith('tcp://'):
return original_run(self)
self._running = True
host, port = self.port[6:].split(':')
try:
s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
s.connect((host, int(port)))
self.connected.emit(True, self.port)
buf = ''
while self._running:
data = s.recv(4096)
if not data:
break
buf += data.decode('ascii', errors='ignore')
while '\n' in buf:
line, buf = buf.split('\n', 1)
line = line.strip()
if line.startswith('$') or line.startswith('!'):
self.sentence.emit(line)
except Exception as e:
self.error.emit(str(e))
self.connected.emit(False, self.port)
SerialReader.run = patched_run
# ── Standalone ────────────────────────────────────────────────────────────────
if __name__ == '__main__':
"""Run standalone to see console output without the UI."""
print('=' * 58)
print(' MARINE COMPASS SIMULATOR - Secuencia de Maniobras')
print('=' * 58)
print('')
sim = VesselSim()
try:
t = threading.Thread(target=_tcp_server, args=(sim,), daemon=True)
t.start()
# Keep alive — TCP server runs in daemon thread
while True:
time.sleep(1)
except KeyboardInterrupt:
print('\n[SIM] Detenido.')