Files
alro65 cfd94f905a security: CORS hardening, path traversal fix, WebSocket auth + cleanup
- Restrict CORS to localhost origins (was allow_origins=[*])
- Require valid JWT on WebSocket /ws (anonymous no longer gets admin view)
- Fix path traversal in delete_cell(): resolve() + parent check
- Validate cell_id format in /charts/download-noaa/{cell_id}
- Exclude charts/ and Cartas/ from git (keep US1GC09M world overview)
- Add NOAA ENC Portal external link in charts catalog tab
- Untrack __pycache__/, .db, .claude/ session files

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-07-03 12:45:43 -04:00

178 lines
6.8 KiB
Python

"""
AidsMonitoring — Slave relay service
=====================================
When server_role == "SLAVE", this service maintains a persistent WebSocket
connection to the master server and forwards all AIS/ATON/aid_position/alert
events upstream.

The relay is fully non-blocking:
  - Events are enqueued from broadcast() via send() (never blocks)
  - A single asyncio task drains the queue and writes to the master WS
  - If the master is unreachable, a ring buffer keeps the last max_queue
    messages; older events are silently dropped
  - On every (re)connect the slave sends a hello so the master can log it

Usage (from main.py):
    relay = SlaveRelay(master_url="ws://10.0.0.1:8000/ws/slave",
                       slave_name="Puerto Barranquilla")
    await relay.start()
    ...
    relay.send({"type": "vessel", "mmsi": "012345678", ...})
    ...
    await relay.stop()
"""
from __future__ import annotations
import asyncio
import json
import logging
from collections import deque
# Module-level logger used by the relay; registered under "aids.slave_relay".
log = logging.getLogger("aids.slave_relay")
class SlaveRelay:
    """Non-blocking WebSocket relay: slave → master.

    Events handed to :meth:`send` are stored in a bounded ring buffer and
    forwarded to the master by a single background asyncio task. When the
    buffer is full the oldest pending event is discarded. A message stays in
    the buffer until the master connection has accepted it, so a failed or
    cancelled send never loses the message that was in flight.
    """

    # Seconds to sleep between queue polls while there is nothing to send.
    _IDLE_POLL = 0.02

    def __init__(
        self,
        master_url: str,
        slave_name: str,
        max_queue: int = 500,
        reconnect_delay: float = 5.0,
    ):
        """
        Parameters
        ----------
        master_url      WebSocket URL of master's slave endpoint.
                        e.g. "ws://192.168.1.10:8000/ws/slave"
        slave_name      Human-readable identifier sent in hello message.
                        e.g. "Puerto Barranquilla"
        max_queue       Ring-buffer capacity. Oldest events dropped when full.
        reconnect_delay Seconds to wait before reconnecting after a disconnect.
        """
        self.master_url = master_url
        self.slave_name = slave_name
        self._max_queue = max_queue
        self._reconnect_delay = reconnect_delay
        # deque(maxlen=...) gives the ring-buffer behaviour: append() on a
        # full deque evicts the item at the left (oldest) end.
        self._queue: deque = deque(maxlen=max_queue)
        self._task: asyncio.Task | None = None
        self._running = False
        self._connected = False
        self._connect_count = 0  # total successful connections (for logging)

    # ── Public API ──────────────────────────────────────────────────────────
    async def start(self) -> None:
        """Start the background relay task (no-op if it is already running)."""
        if self._task and not self._task.done():
            return  # already running
        self._running = True
        self._task = asyncio.create_task(self._run(), name="slave_relay")
        log.info(f"[slave] Relay iniciado → {self.master_url} (nombre='{self.slave_name}')")

    async def stop(self) -> None:
        """Cancel the relay task and wait for it to finish."""
        # Clear the flag first so the task's loops exit instead of reconnecting.
        self._running = False
        if self._task and not self._task.done():
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
        self._task = None
        self._connected = False
        log.info("[slave] Relay detenido.")

    def send(self, msg: dict) -> None:
        """
        Enqueue an event for forwarding. Non-blocking — safe to call from
        broadcast() in the main event loop. Drops oldest if queue is full.

        Events offered while the relay is not running are ignored.
        """
        if not self._running:
            return
        # Tag every message with slave origin so the master knows the source.
        # A fresh dict is built, so the caller's dict is never mutated.
        tagged = {**msg, "_slave": self.slave_name}
        self._queue.append(tagged)

    # ── Status ───────────────────────────────────────────────────────────────
    @property
    def connected(self) -> bool:
        """True while a WebSocket connection to the master is open."""
        return self._connected

    @property
    def queue_len(self) -> int:
        """Number of events currently waiting to be forwarded."""
        return len(self._queue)

    def status(self) -> dict:
        """Return a snapshot of the relay state as a plain dict."""
        return {
            "running": self._running,
            "connected": self._connected,
            "master_url": self.master_url,
            "slave_name": self.slave_name,
            "queue_len": len(self._queue),
            "connect_count": self._connect_count,
        }

    # ── Background task ──────────────────────────────────────────────────────
    def _discard_head(self, msg: dict) -> None:
        """Remove *msg* from the queue if it is still the head element.

        While a send is awaited, send() may append to a full buffer and evict
        the head; the identity check keeps us from popping a different,
        still-unsent message in that case.
        """
        if self._queue and self._queue[0] is msg:
            self._queue.popleft()

    async def _drain(self, ws) -> None:
        """Forward queued events over *ws* until it fails, closes, or we stop.

        Returns normally in every case; the caller then leaves the connection
        context and decides whether to reconnect.
        """
        # One watcher task per connection: lets us notice a connection that
        # closed while the queue was idle, without touching the socket.
        closed = asyncio.create_task(ws.wait_closed())
        try:
            while self._running:
                if closed.done():
                    log.warning(
                        f"[slave] Conexión cerrada por el maestro. "
                        f"Reintentando en {self._reconnect_delay} s …"
                    )
                    return
                if not self._queue:
                    # Nothing to send — yield to event loop
                    await asyncio.sleep(self._IDLE_POLL)
                    continue

                # Peek instead of pop: the message stays queued until the
                # send has succeeded.
                head = self._queue[0]
                try:
                    payload = json.dumps(head)
                except (TypeError, ValueError) as enc_err:
                    # An unserializable message can never be delivered;
                    # retrying it would block the queue forever, so drop it.
                    self._discard_head(head)
                    log.error(
                        f"[slave] Mensaje no serializable descartado: {enc_err}"
                    )
                    continue

                try:
                    await ws.send(payload)
                except Exception as send_err:
                    # Message is still at the head of the queue and will be
                    # retried on the next connection.
                    log.warning(f"[slave] Error al enviar: {send_err}")
                    return  # force reconnect
                self._discard_head(head)
        finally:
            closed.cancel()

    async def _run(self) -> None:
        """Connect to the master, announce this slave, and relay events.

        Loops until the relay is stopped, reconnecting after
        ``reconnect_delay`` seconds whenever the connection is lost.
        """
        # Third-party dependency, imported only once the relay task runs.
        import websockets

        while self._running:
            try:
                log.info(f"[slave] Conectando a maestro {self.master_url} …")
                async with websockets.connect(
                    self.master_url,
                    ping_interval=20,
                    ping_timeout=10,
                    close_timeout=5,
                    open_timeout=10,
                ) as ws:
                    self._connected = True
                    self._connect_count += 1
                    log.info(
                        f"[slave] Conectado al maestro "
                        f"(conexión #{self._connect_count}, "
                        f"{len(self._queue)} msgs en cola)"
                    )
                    # ── Announce this slave ───────────────────────────────
                    await ws.send(json.dumps({
                        "type": "slave_hello",
                        "slave_name": self.slave_name,
                        "version": "1.0",
                    }))
                    # ── Drain queue continuously ──────────────────────────
                    await self._drain(ws)
            except asyncio.CancelledError:
                break
            except Exception as conn_err:
                log.warning(
                    f"[slave] Desconectado ({conn_err}). "
                    f"Reintentando en {self._reconnect_delay} s …"
                )
            finally:
                self._connected = False

            if self._running:
                await asyncio.sleep(self._reconnect_delay)