""" NMEA 0183 Simulator — Secuencia realista de maniobras de buque ============================================================== Secuencia: 1. INITIAL — Rumbo 045°M estable, 60 segundos 2. TURN_STBD — Giro por estribor hacia 225° (180° del original) 3. COMP_STBD — Timonel compensa: ROT baja a 0, buque estabiliza en 225° 4. STEADY_STBD — Estable en 225°M, 60 segundos 5. TURN_PORT — Giro por babor de regreso a 045° 6. COMP_PORT — Timonel compensa: ROT sube desde negativo a 0 7. NORMAL — Navegación normal con movimiento suave de olas Ejecutar standalone para ver output: python -m simulator.nmea_simulator """ import sys import math import random import threading import socket import time from enum import Enum, auto from dataclasses import dataclass, field import config import logging import os # Log to file — no console output when running under pythonw _log_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'compass_sim.log') logging.basicConfig( filename=_log_path, filemode='w', level=logging.INFO, format='%(asctime)s %(message)s', datefmt='%H:%M:%S', ) def _log(msg: str): logging.info(msg) try: print(msg) # only visible when running from a terminal except Exception: pass # ── NMEA helpers ────────────────────────────────────────────────────────────── def _cs(s: str) -> str: v = 0 for c in s: v ^= ord(c) return f'{v:02X}' def _sentence(body: str) -> str: return f'${body}*{_cs(body)}\r\n' # ── Simulation phases ───────────────────────────────────────────────────────── class Phase(Enum): INITIAL = auto() # Estable en rumbo inicial TURN_STBD = auto() # Girando por estribor COMP_STBD = auto() # Timonel compensa al llegar a 225° STEADY_STBD = auto() # Estable en 225° TURN_PORT = auto() # Girando por babor COMP_PORT = auto() # Timonel compensa al llegar a 045° NORMAL = auto() # Navegación normal con olas PHASE_NAMES = { Phase.INITIAL: 'INITIAL — Estable en 045°M', Phase.TURN_STBD: 'TURN_STBD — Girando ESTRIBOR → 225°', Phase.COMP_STBD: 'COMP_STBD — Timonel compensa (ROT → 0)', Phase.STEADY_STBD: 'STEADY_STBD — Estable en 225°M', Phase.TURN_PORT: 'TURN_PORT — Girando BABOR → 045°', Phase.COMP_PORT: 'COMP_PORT — Timonel compensa (ROT → 0)', Phase.NORMAL: 'NORMAL — Navegación suave', } # ── Configuration ───────────────────────────────────────────────────────────── INITIAL_HDG = 45.0 # Rumbo inicial °M TURN_DEG = 25.0 # Grados de la maniobra (por estribor y luego por babor) MAX_ROT = 30.0 # ROT máximo °/min (30 = 0.5°/s — maniobra moderada) ROT_RAMP_SECS = 8.0 # Segundos para alcanzar MAX_ROT desde 0 COMP_THRESHOLD = 8.0 # Grados antes del rumbo objetivo donde el timonel compensa STEADY_SECS = 15.0 # Tiempo estable en cada rumbo (segundos) SOG = 8.5 # Velocidad (knots) VARIATION = -7.5 # Variación magnética (W = negativo) LAT = 27.1975 # Stuart, FL LON = -80.1951 # ── Vessel state ────────────────────────────────────────────────────────────── @dataclass class Vessel: heading: float = INITIAL_HDG rot: float = 0.0 # °/min (+ = estribor, - = babor) rot_target: float = 0.0 # ROT objetivo pitch: float = 0.0 roll: float = 0.0 sog: float = SOG phase: Phase = Phase.INITIAL phase_t: float = 0.0 # tiempo en la fase actual (s) norm_t: float = 0.0 # contador para simulación normal sim_t: float = 0.0 # tiempo global continuo (s) _target_hdg: float = 0.0 class VesselSim: TICK = 0.25 # segundos por step de simulación def __init__(self): self.v = Vessel() self.v._target_hdg = (INITIAL_HDG + TURN_DEG) % 360 self._phase_start_print() # ── Main tick ───────────────────────────────────────────────────────────── def step(self) -> None: v = self.v v.phase_t += self.TICK v.sim_t += self.TICK if v.phase == Phase.INITIAL: self._phase_initial() elif v.phase == Phase.TURN_STBD: self._phase_turn(+1) elif v.phase == Phase.COMP_STBD: self._phase_compensate(+1, Phase.STEADY_STBD) elif v.phase == Phase.STEADY_STBD: self._phase_steady(Phase.TURN_PORT) elif v.phase == Phase.TURN_PORT: self._phase_turn(-1) elif v.phase == Phase.COMP_PORT: self._phase_compensate(-1, Phase.NORMAL) elif v.phase == Phase.NORMAL: self._phase_normal() t = v.sim_t # Pitch: oleaje grueso — swell 8s (ω=0.785) + chop 3s (ω=2.09), máx ±5° pitch_wave = (3.5 * math.sin(t * 0.785) + 1.1 * math.sin(t * 2.09 + 1.1) + 0.3 * math.sin(t * 3.14 + 0.5)) v.pitch = max(-5.0, min(5.0, pitch_wave + random.uniform(-0.10, 0.10))) # Roll: período más corto que pitch (6s, ω=1.047), cabeceo cruzado, escora de maniobra roll_wave = (3.8 * math.sin(t * 1.047 + 0.75) + 0.9 * math.sin(t * 2.51 + 2.3)) roll_turn = -(v.rot / MAX_ROT) * 2.5 v.roll = max(-5.0, min(5.0, roll_wave + roll_turn + random.uniform(-0.10, 0.10))) # Update heading from ROT v.heading = (v.heading + v.rot / 60.0 * self.TICK) % 360 # ── Phases ──────────────────────────────────────────────────────────────── def _phase_initial(self): v = self.v v.rot = 0.0 if v.phase_t >= STEADY_SECS: self._set_phase(Phase.TURN_STBD) v._target_hdg = (INITIAL_HDG + TURN_DEG) % 360 def _phase_turn(self, direction: int): """direction: +1 = estribor, -1 = babor""" v = self.v target = v._target_hdg # Angular distance remaining to target if direction == +1: remaining = (target - v.heading) % 360 else: remaining = (v.heading - target) % 360 # Ramp ROT up over ROT_RAMP_SECS ramp_progress = min(v.phase_t / ROT_RAMP_SECS, 1.0) v.rot_target = direction * MAX_ROT * ramp_progress # Within COMP_THRESHOLD: start reducing ROT (helmsman anticipates) if remaining < COMP_THRESHOLD: comp_factor = remaining / COMP_THRESHOLD v.rot_target *= comp_factor # Smooth ROT toward target v.rot += (v.rot_target - v.rot) * 0.25 # Reached target heading? if remaining < 2.0: v.heading = target if direction == +1: self._set_phase(Phase.COMP_STBD) else: self._set_phase(Phase.COMP_PORT) def _phase_compensate(self, prev_direction: int, next_phase: Phase): """Timonel centra el timón: ROT decae a 0.""" v = self.v v.rot *= 0.80 # decaimiento exponencial suave if abs(v.rot) < 0.4: v.rot = 0.0 self._set_phase(next_phase) if next_phase == Phase.TURN_PORT: v._target_hdg = INITIAL_HDG # girar de vuelta a 045° def _phase_steady(self, next_phase: Phase): v = self.v v.rot *= 0.90 # cualquier ROT residual se disipa if v.phase_t >= STEADY_SECS: self._set_phase(next_phase) if next_phase == Phase.TURN_PORT: v._target_hdg = INITIAL_HDG def _phase_normal(self): """Navegación en mar grueso — giñadas amplias y correcciones rápidas.""" v = self.v v.norm_t += self.TICK t = v.norm_t # Giñada compuesta: deriva principal 12° + componente irregular 5° hdg_drift = 12.0 * math.sin(t * 0.09) + 5.0 * math.sin(t * 0.23 + 1.7) target_hdg = (INITIAL_HDG + hdg_drift) % 360 err = ((target_hdg - v.heading) + 180) % 360 - 180 v.rot_target = max(-25.0, min(25.0, err * 2.5)) v.rot += (v.rot_target - v.rot) * 0.20 def _add_sea_noise(self): pass # replaced by sinusoidal model in step() # ── Derived data ────────────────────────────────────────────────────────── def accel(self): """Aceleración simulada del BNO085 (m/s²).""" v = self.v # Centrípeta en eje Y proporcional al ROT centripetal_y = (v.rot / 60.0) * math.radians(v.sog * 0.5144) * 0.5 ax = math.sin(math.radians(v.pitch)) * 9.81 + random.uniform(-0.03, 0.03) ay = math.sin(math.radians(v.roll)) * 9.81 + centripetal_y + random.uniform(-0.03, 0.03) az = math.cos(math.radians(v.pitch)) * math.cos(math.radians(v.roll)) * 9.81 return ax, ay, az def gyro(self): """Velocidad angular del giroscopio (°/s).""" v = self.v gx = v.roll * 0.02 + random.uniform(-0.02, 0.02) gy = v.pitch * 0.02 + random.uniform(-0.02, 0.02) gz = v.rot / 60.0 + random.uniform(-0.01, 0.01) return gx, gy, gz # ── NMEA sentences ──────────────────────────────────────────────────────── def sentences(self) -> list[str]: self.step() v = self.v ax, ay, az = self.accel() gx, gy, gz = self.gyro() var_dir = 'W' if VARIATION < 0 else 'E' return [ _sentence(f'HCHDG,{v.heading:.2f},,,{abs(VARIATION):.1f},{var_dir}'), _sentence(f'IIROT,{v.rot:.2f},A'), _sentence( f'IIXDR,' f'A,{v.pitch:.2f},D,PITCH,' f'A,{v.roll:.2f},D,ROLL,' f'A,{ax:.3f},M,ACCELX,' f'A,{ay:.3f},M,ACCELY,' f'A,{az:.3f},M,ACCELZ,' f'A,{gx:.3f},D,GYROX,' f'A,{gy:.3f},D,GYROY,' f'A,{gz:.3f},D,GYROZ' ), ] # ── Phase helpers ───────────────────────────────────────────────────────── def _set_phase(self, phase: Phase): self.v.phase = phase self.v.phase_t = 0.0 self._phase_start_print() def _phase_start_print(self): v = self.v name = PHASE_NAMES.get(v.phase, str(v.phase)) _log(f'[SIM] -- {name}') _log(f'[SIM] HDG={v.heading:.1f} ROT={v.rot:.1f}/min ' f'PITCH={v.pitch:.1f} ROLL={v.roll:.1f}') # ── TCP server ──────────────────────────────────────────────────────────────── def _tcp_server(sim: VesselSim): srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) srv.bind(('127.0.0.1', 10110)) srv.listen(5) _log('[SIM] NMEA TCP server en 127.0.0.1:10110 (1 Hz)') _log('[SIM] Conecta con: python main.py --sim') while True: conn, addr = srv.accept() _log(f'[SIM] Cliente conectado: {addr}') try: while True: t0 = time.time() for s in sim.sentences(): conn.sendall(s.encode('ascii')) elapsed = time.time() - t0 time.sleep(max(0, 1.0 - elapsed)) except Exception: _log(f'[SIM] Cliente desconectado: {addr}') conn.close() # ── Entry points ────────────────────────────────────────────────────────────── def start_simulator(): """Called by main.py --sim. Patches SerialReader and starts TCP server.""" sim = VesselSim() t = threading.Thread(target=_tcp_server, args=(sim,), daemon=True) t.start() time.sleep(0.35) config.SERIAL_PORTS = [{'port': 'tcp://127.0.0.1:10110', 'baud': 4800, 'name': 'Simulator'}] _patch_reader() def _patch_reader(): """Monkey-patch SerialReader to handle tcp:// URI for the simulator.""" import socket as _socket from core.serial_reader import SerialReader original_run = SerialReader.run def patched_run(self): if not self.port.startswith('tcp://'): return original_run(self) self._running = True host, port = self.port[6:].split(':') try: s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) s.connect((host, int(port))) self.connected.emit(True, self.port) buf = '' while self._running: data = s.recv(4096) if not data: break buf += data.decode('ascii', errors='ignore') while '\n' in buf: line, buf = buf.split('\n', 1) line = line.strip() if line.startswith('$') or line.startswith('!'): self.sentence.emit(line) except Exception as e: self.error.emit(str(e)) self.connected.emit(False, self.port) SerialReader.run = patched_run # ── Standalone ──────────────────────────────────────────────────────────────── if __name__ == '__main__': """Run standalone to see console output without the UI.""" print('=' * 58) print(' MARINE COMPASS SIMULATOR - Secuencia de Maniobras') print('=' * 58) print('') sim = VesselSim() try: t = threading.Thread(target=_tcp_server, args=(sim,), daemon=True) t.start() # Keep alive — TCP server runs in daemon thread while True: time.sleep(1) except KeyboardInterrupt: print('\n[SIM] Detenido.')