376 lines
15 KiB
Python
376 lines
15 KiB
Python
"""
|
|
NMEA 0183 Simulator — Secuencia realista de maniobras de buque
|
|
==============================================================
|
|
|
|
Secuencia:
|
|
1. INITIAL — Rumbo 045°M estable, 60 segundos
|
|
2. TURN_STBD — Giro por estribor hacia 225° (180° del original)
|
|
3. COMP_STBD — Timonel compensa: ROT baja a 0, buque estabiliza en 225°
|
|
4. STEADY_STBD — Estable en 225°M, 60 segundos
|
|
5. TURN_PORT — Giro por babor de regreso a 045°
|
|
6. COMP_PORT — Timonel compensa: ROT sube desde negativo a 0
|
|
7. NORMAL — Navegación normal con movimiento suave de olas
|
|
|
|
Ejecutar standalone para ver output:
|
|
python -m simulator.nmea_simulator
|
|
"""
|
|
|
|
import sys
|
|
import math
|
|
import random
|
|
import threading
|
|
import socket
|
|
import time
|
|
from enum import Enum, auto
|
|
from dataclasses import dataclass, field
|
|
|
|
import config
|
|
|
|
import logging
|
|
import os
|
|
|
|
# Log to file — no console output when running under pythonw
|
|
_log_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'compass_sim.log')
|
|
logging.basicConfig(
|
|
filename=_log_path,
|
|
filemode='w',
|
|
level=logging.INFO,
|
|
format='%(asctime)s %(message)s',
|
|
datefmt='%H:%M:%S',
|
|
)
|
|
|
|
def _log(msg: str):
|
|
logging.info(msg)
|
|
try:
|
|
print(msg) # only visible when running from a terminal
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
# ── NMEA helpers ──────────────────────────────────────────────────────────────
|
|
|
|
def _cs(s: str) -> str:
|
|
v = 0
|
|
for c in s:
|
|
v ^= ord(c)
|
|
return f'{v:02X}'
|
|
|
|
def _sentence(body: str) -> str:
|
|
return f'${body}*{_cs(body)}\r\n'
|
|
|
|
|
|
# ── Simulation phases ─────────────────────────────────────────────────────────
|
|
|
|
class Phase(Enum):
|
|
INITIAL = auto() # Estable en rumbo inicial
|
|
TURN_STBD = auto() # Girando por estribor
|
|
COMP_STBD = auto() # Timonel compensa al llegar a 225°
|
|
STEADY_STBD = auto() # Estable en 225°
|
|
TURN_PORT = auto() # Girando por babor
|
|
COMP_PORT = auto() # Timonel compensa al llegar a 045°
|
|
NORMAL = auto() # Navegación normal con olas
|
|
|
|
PHASE_NAMES = {
|
|
Phase.INITIAL: 'INITIAL — Estable en 045°M',
|
|
Phase.TURN_STBD: 'TURN_STBD — Girando ESTRIBOR → 225°',
|
|
Phase.COMP_STBD: 'COMP_STBD — Timonel compensa (ROT → 0)',
|
|
Phase.STEADY_STBD: 'STEADY_STBD — Estable en 225°M',
|
|
Phase.TURN_PORT: 'TURN_PORT — Girando BABOR → 045°',
|
|
Phase.COMP_PORT: 'COMP_PORT — Timonel compensa (ROT → 0)',
|
|
Phase.NORMAL: 'NORMAL — Navegación suave',
|
|
}
|
|
|
|
|
|
# ── Configuration ─────────────────────────────────────────────────────────────
|
|
|
|
INITIAL_HDG = 45.0 # Rumbo inicial °M
|
|
TURN_DEG = 25.0 # Grados de la maniobra (por estribor y luego por babor)
|
|
MAX_ROT = 30.0 # ROT máximo °/min (30 = 0.5°/s — maniobra moderada)
|
|
ROT_RAMP_SECS = 8.0 # Segundos para alcanzar MAX_ROT desde 0
|
|
COMP_THRESHOLD = 8.0 # Grados antes del rumbo objetivo donde el timonel compensa
|
|
STEADY_SECS = 15.0 # Tiempo estable en cada rumbo (segundos)
|
|
SOG = 8.5 # Velocidad (knots)
|
|
VARIATION = -7.5 # Variación magnética (W = negativo)
|
|
LAT = 27.1975 # Stuart, FL
|
|
LON = -80.1951
|
|
|
|
|
|
# ── Vessel state ──────────────────────────────────────────────────────────────
|
|
|
|
@dataclass
|
|
class Vessel:
|
|
heading: float = INITIAL_HDG
|
|
rot: float = 0.0 # °/min (+ = estribor, - = babor)
|
|
rot_target: float = 0.0 # ROT objetivo
|
|
pitch: float = 0.0
|
|
roll: float = 0.0
|
|
sog: float = SOG
|
|
phase: Phase = Phase.INITIAL
|
|
phase_t: float = 0.0 # tiempo en la fase actual (s)
|
|
norm_t: float = 0.0 # contador para simulación normal
|
|
sim_t: float = 0.0 # tiempo global continuo (s)
|
|
_target_hdg: float = 0.0
|
|
|
|
|
|
class VesselSim:
|
|
|
|
TICK = 0.25 # segundos por step de simulación
|
|
|
|
def __init__(self):
|
|
self.v = Vessel()
|
|
self.v._target_hdg = (INITIAL_HDG + TURN_DEG) % 360
|
|
self._phase_start_print()
|
|
|
|
# ── Main tick ─────────────────────────────────────────────────────────────
|
|
|
|
def step(self) -> None:
|
|
v = self.v
|
|
v.phase_t += self.TICK
|
|
v.sim_t += self.TICK
|
|
|
|
if v.phase == Phase.INITIAL: self._phase_initial()
|
|
elif v.phase == Phase.TURN_STBD: self._phase_turn(+1)
|
|
elif v.phase == Phase.COMP_STBD: self._phase_compensate(+1, Phase.STEADY_STBD)
|
|
elif v.phase == Phase.STEADY_STBD: self._phase_steady(Phase.TURN_PORT)
|
|
elif v.phase == Phase.TURN_PORT: self._phase_turn(-1)
|
|
elif v.phase == Phase.COMP_PORT: self._phase_compensate(-1, Phase.NORMAL)
|
|
elif v.phase == Phase.NORMAL: self._phase_normal()
|
|
|
|
t = v.sim_t
|
|
# Pitch: oleaje grueso — swell 8s (ω=0.785) + chop 3s (ω=2.09), máx ±5°
|
|
pitch_wave = (3.5 * math.sin(t * 0.785)
|
|
+ 1.1 * math.sin(t * 2.09 + 1.1)
|
|
+ 0.3 * math.sin(t * 3.14 + 0.5))
|
|
v.pitch = max(-5.0, min(5.0, pitch_wave + random.uniform(-0.10, 0.10)))
|
|
|
|
# Roll: período más corto que pitch (6s, ω=1.047), cabeceo cruzado, escora de maniobra
|
|
roll_wave = (3.8 * math.sin(t * 1.047 + 0.75)
|
|
+ 0.9 * math.sin(t * 2.51 + 2.3))
|
|
roll_turn = -(v.rot / MAX_ROT) * 2.5
|
|
v.roll = max(-5.0, min(5.0, roll_wave + roll_turn + random.uniform(-0.10, 0.10)))
|
|
|
|
# Update heading from ROT
|
|
v.heading = (v.heading + v.rot / 60.0 * self.TICK) % 360
|
|
|
|
# ── Phases ────────────────────────────────────────────────────────────────
|
|
|
|
def _phase_initial(self):
|
|
v = self.v
|
|
v.rot = 0.0
|
|
if v.phase_t >= STEADY_SECS:
|
|
self._set_phase(Phase.TURN_STBD)
|
|
v._target_hdg = (INITIAL_HDG + TURN_DEG) % 360
|
|
|
|
def _phase_turn(self, direction: int):
|
|
"""direction: +1 = estribor, -1 = babor"""
|
|
v = self.v
|
|
target = v._target_hdg
|
|
|
|
# Angular distance remaining to target
|
|
if direction == +1:
|
|
remaining = (target - v.heading) % 360
|
|
else:
|
|
remaining = (v.heading - target) % 360
|
|
|
|
# Ramp ROT up over ROT_RAMP_SECS
|
|
ramp_progress = min(v.phase_t / ROT_RAMP_SECS, 1.0)
|
|
v.rot_target = direction * MAX_ROT * ramp_progress
|
|
|
|
# Within COMP_THRESHOLD: start reducing ROT (helmsman anticipates)
|
|
if remaining < COMP_THRESHOLD:
|
|
comp_factor = remaining / COMP_THRESHOLD
|
|
v.rot_target *= comp_factor
|
|
|
|
# Smooth ROT toward target
|
|
v.rot += (v.rot_target - v.rot) * 0.25
|
|
|
|
# Reached target heading?
|
|
if remaining < 2.0:
|
|
v.heading = target
|
|
if direction == +1:
|
|
self._set_phase(Phase.COMP_STBD)
|
|
else:
|
|
self._set_phase(Phase.COMP_PORT)
|
|
|
|
def _phase_compensate(self, prev_direction: int, next_phase: Phase):
|
|
"""Timonel centra el timón: ROT decae a 0."""
|
|
v = self.v
|
|
v.rot *= 0.80 # decaimiento exponencial suave
|
|
if abs(v.rot) < 0.4:
|
|
v.rot = 0.0
|
|
self._set_phase(next_phase)
|
|
if next_phase == Phase.TURN_PORT:
|
|
v._target_hdg = INITIAL_HDG # girar de vuelta a 045°
|
|
|
|
def _phase_steady(self, next_phase: Phase):
|
|
v = self.v
|
|
v.rot *= 0.90 # cualquier ROT residual se disipa
|
|
if v.phase_t >= STEADY_SECS:
|
|
self._set_phase(next_phase)
|
|
if next_phase == Phase.TURN_PORT:
|
|
v._target_hdg = INITIAL_HDG
|
|
|
|
def _phase_normal(self):
|
|
"""Navegación en mar grueso — giñadas amplias y correcciones rápidas."""
|
|
v = self.v
|
|
v.norm_t += self.TICK
|
|
t = v.norm_t
|
|
# Giñada compuesta: deriva principal 12° + componente irregular 5°
|
|
hdg_drift = 12.0 * math.sin(t * 0.09) + 5.0 * math.sin(t * 0.23 + 1.7)
|
|
target_hdg = (INITIAL_HDG + hdg_drift) % 360
|
|
err = ((target_hdg - v.heading) + 180) % 360 - 180
|
|
v.rot_target = max(-25.0, min(25.0, err * 2.5))
|
|
v.rot += (v.rot_target - v.rot) * 0.20
|
|
|
|
def _add_sea_noise(self):
|
|
pass # replaced by sinusoidal model in step()
|
|
|
|
# ── Derived data ──────────────────────────────────────────────────────────
|
|
|
|
def accel(self):
|
|
"""Aceleración simulada del BNO085 (m/s²)."""
|
|
v = self.v
|
|
# Centrípeta en eje Y proporcional al ROT
|
|
centripetal_y = (v.rot / 60.0) * math.radians(v.sog * 0.5144) * 0.5
|
|
ax = math.sin(math.radians(v.pitch)) * 9.81 + random.uniform(-0.03, 0.03)
|
|
ay = math.sin(math.radians(v.roll)) * 9.81 + centripetal_y + random.uniform(-0.03, 0.03)
|
|
az = math.cos(math.radians(v.pitch)) * math.cos(math.radians(v.roll)) * 9.81
|
|
return ax, ay, az
|
|
|
|
def gyro(self):
|
|
"""Velocidad angular del giroscopio (°/s)."""
|
|
v = self.v
|
|
gx = v.roll * 0.02 + random.uniform(-0.02, 0.02)
|
|
gy = v.pitch * 0.02 + random.uniform(-0.02, 0.02)
|
|
gz = v.rot / 60.0 + random.uniform(-0.01, 0.01)
|
|
return gx, gy, gz
|
|
|
|
# ── NMEA sentences ────────────────────────────────────────────────────────
|
|
|
|
def sentences(self) -> list[str]:
|
|
self.step()
|
|
v = self.v
|
|
ax, ay, az = self.accel()
|
|
gx, gy, gz = self.gyro()
|
|
var_dir = 'W' if VARIATION < 0 else 'E'
|
|
|
|
return [
|
|
_sentence(f'HCHDG,{v.heading:.2f},,,{abs(VARIATION):.1f},{var_dir}'),
|
|
_sentence(f'IIROT,{v.rot:.2f},A'),
|
|
_sentence(
|
|
f'IIXDR,'
|
|
f'A,{v.pitch:.2f},D,PITCH,'
|
|
f'A,{v.roll:.2f},D,ROLL,'
|
|
f'A,{ax:.3f},M,ACCELX,'
|
|
f'A,{ay:.3f},M,ACCELY,'
|
|
f'A,{az:.3f},M,ACCELZ,'
|
|
f'A,{gx:.3f},D,GYROX,'
|
|
f'A,{gy:.3f},D,GYROY,'
|
|
f'A,{gz:.3f},D,GYROZ'
|
|
),
|
|
]
|
|
|
|
# ── Phase helpers ─────────────────────────────────────────────────────────
|
|
|
|
def _set_phase(self, phase: Phase):
|
|
self.v.phase = phase
|
|
self.v.phase_t = 0.0
|
|
self._phase_start_print()
|
|
|
|
def _phase_start_print(self):
|
|
v = self.v
|
|
name = PHASE_NAMES.get(v.phase, str(v.phase))
|
|
_log(f'[SIM] -- {name}')
|
|
_log(f'[SIM] HDG={v.heading:.1f} ROT={v.rot:.1f}/min '
|
|
f'PITCH={v.pitch:.1f} ROLL={v.roll:.1f}')
|
|
|
|
|
|
# ── TCP server ────────────────────────────────────────────────────────────────
|
|
|
|
def _tcp_server(sim: VesselSim):
|
|
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
srv.bind(('127.0.0.1', 10110))
|
|
srv.listen(5)
|
|
_log('[SIM] NMEA TCP server en 127.0.0.1:10110 (1 Hz)')
|
|
_log('[SIM] Conecta con: python main.py --sim')
|
|
|
|
while True:
|
|
conn, addr = srv.accept()
|
|
_log(f'[SIM] Cliente conectado: {addr}')
|
|
try:
|
|
while True:
|
|
t0 = time.time()
|
|
for s in sim.sentences():
|
|
conn.sendall(s.encode('ascii'))
|
|
elapsed = time.time() - t0
|
|
time.sleep(max(0, 1.0 - elapsed))
|
|
except Exception:
|
|
_log(f'[SIM] Cliente desconectado: {addr}')
|
|
conn.close()
|
|
|
|
|
|
# ── Entry points ──────────────────────────────────────────────────────────────
|
|
|
|
def start_simulator():
|
|
"""Called by main.py --sim. Patches SerialReader and starts TCP server."""
|
|
sim = VesselSim()
|
|
t = threading.Thread(target=_tcp_server, args=(sim,), daemon=True)
|
|
t.start()
|
|
time.sleep(0.35)
|
|
config.SERIAL_PORTS = [{'port': 'tcp://127.0.0.1:10110', 'baud': 4800, 'name': 'Simulator'}]
|
|
_patch_reader()
|
|
|
|
|
|
def _patch_reader():
|
|
"""Monkey-patch SerialReader to handle tcp:// URI for the simulator."""
|
|
import socket as _socket
|
|
from core.serial_reader import SerialReader
|
|
|
|
original_run = SerialReader.run
|
|
|
|
def patched_run(self):
|
|
if not self.port.startswith('tcp://'):
|
|
return original_run(self)
|
|
self._running = True
|
|
host, port = self.port[6:].split(':')
|
|
try:
|
|
s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
|
|
s.connect((host, int(port)))
|
|
self.connected.emit(True, self.port)
|
|
buf = ''
|
|
while self._running:
|
|
data = s.recv(4096)
|
|
if not data:
|
|
break
|
|
buf += data.decode('ascii', errors='ignore')
|
|
while '\n' in buf:
|
|
line, buf = buf.split('\n', 1)
|
|
line = line.strip()
|
|
if line.startswith('$') or line.startswith('!'):
|
|
self.sentence.emit(line)
|
|
except Exception as e:
|
|
self.error.emit(str(e))
|
|
self.connected.emit(False, self.port)
|
|
|
|
SerialReader.run = patched_run
|
|
|
|
|
|
# ── Standalone ────────────────────────────────────────────────────────────────
|
|
|
|
if __name__ == '__main__':
|
|
"""Run standalone to see console output without the UI."""
|
|
print('=' * 58)
|
|
print(' MARINE COMPASS SIMULATOR - Secuencia de Maniobras')
|
|
print('=' * 58)
|
|
print('')
|
|
sim = VesselSim()
|
|
try:
|
|
t = threading.Thread(target=_tcp_server, args=(sim,), daemon=True)
|
|
t.start()
|
|
# Keep alive — TCP server runs in daemon thread
|
|
while True:
|
|
time.sleep(1)
|
|
except KeyboardInterrupt:
|
|
print('\n[SIM] Detenido.')
|