""" Asyncio UDP listener on port 10110 for NMEA AIS data from AIS-catcher. Reassembles multi-part sentences (!AIVDM) and routes to process_message(). """ from __future__ import annotations import asyncio import time from collections import defaultdict, deque from typing import Callable, Awaitable from .ais_decoder import parse_nmea_sentence, SHIP_TYPE_NAMES from .aton_decoder import decode_type21, decode_type8_aton # ── Module-level stats (read by /ais/stats endpoint) ───────────────────────── _stats: dict = { "sentences_received": 0, # raw UDP lines seen "sentences_decoded": 0, # lines that decoded to a known type "vessels_seen": 0, # distinct MMSIs (vessels only) "last_sentence_ts": None, "listening": False, "udp_port": None, } # Last 30 raw sentences for /ais/raw endpoint _raw_buffer: deque = deque(maxlen=30) # Buffer for multi-part NMEA messages, keyed by (channel, seq_id) _multipart: dict[tuple, list] = defaultdict(list) # Static data accumulated by MMSI (from Type 5 / Type 24) _static_cache: dict[str, dict] = {} _mmsi_seen: set[str] = set() def get_stats() -> dict: return dict(_stats) def get_raw() -> list[str]: return list(_raw_buffer) async def _handle_sentence(sentence: str, process_msg: Callable) -> None: """Parse one raw NMEA sentence and call process_msg with the result.""" sentence = sentence.strip() if not (sentence.startswith("!AIVDM") or sentence.startswith("!AIVDO")): return _stats["sentences_received"] += 1 _stats["last_sentence_ts"] = time.time() _raw_buffer.append(sentence) # Strip checksum for field parsing raw = sentence if '*' in raw: raw = raw[:raw.rindex('*')] parts = raw.split(',') if len(parts) < 6: return try: total_parts = int(parts[1]) part_num = int(parts[2]) seq_id = parts[3] channel = parts[4] payload = parts[5] except (ValueError, IndexError): return if total_parts == 1: full_payload = payload else: key = (channel, seq_id) _multipart[key].append((part_num, payload)) if len(_multipart[key]) < total_parts: return _multipart[key].sort(key=lambda x: x[0]) full_payload = ''.join(p for _, p in _multipart[key]) del _multipart[key] decoded = parse_nmea_sentence(f"!AIVDM,1,1,,A,{full_payload},0") if decoded is None: return _stats["sentences_decoded"] += 1 msg_type_raw = decoded.get("type") if msg_type_raw == "vessel": mmsi = decoded["mmsi"] static = _static_cache.get(mmsi, {}) merged = {**static, **decoded} if "nombre" not in merged: merged["nombre"] = f"MMSI {mmsi}" if "tipo" not in merged: merged["tipo"] = 0 merged["tipo_nombre"] = SHIP_TYPE_NAMES.get(merged.get("tipo", 0), "Unknown") if "destino" not in merged: merged["destino"] = None if "bandera" not in merged: merged["bandera"] = None if mmsi not in _mmsi_seen: _mmsi_seen.add(mmsi) _stats["vessels_seen"] = len(_mmsi_seen) await process_msg(merged) elif msg_type_raw == "vessel_static": mmsi = decoded["mmsi"] if mmsi not in _static_cache: _static_cache[mmsi] = {} _static_cache[mmsi].update({k: v for k, v in decoded.items() if k not in ("type", "timestamp", "source") and v is not None}) elif msg_type_raw == "aton_raw": msg_type = decoded["msg_type"] payload = decoded["payload"] if msg_type == 21: result = decode_type21(payload) if result: await process_msg({"type": "aton", "msg_type": 21, **result}) elif msg_type == 8: result = decode_type8_aton(payload) if result: await process_msg({"type": "aton", "msg_type": 8, **result}) class _AISUDPProtocol(asyncio.DatagramProtocol): def __init__(self, process_msg: Callable): self._process = process_msg def datagram_received(self, data: bytes, addr): text = data.decode("ascii", errors="ignore") for line in text.splitlines(): line = line.strip() if line: asyncio.ensure_future(_handle_sentence(line, self._process)) def error_received(self, exc): print(f"[ais-udp] Error: {exc}") def connection_lost(self, exc): _stats["listening"] = False async def run_udp_listener( process_msg: Callable[[dict], Awaitable[None]], host: str = "0.0.0.0", port: int = 10110, ): """Start async UDP listener. Runs until cancelled.""" loop = asyncio.get_event_loop() _stats["udp_port"] = port print(f"[ais-udp] Listening on {host}:{port}") try: transport, protocol = await loop.create_datagram_endpoint( lambda: _AISUDPProtocol(process_msg), local_addr=(host, port), allow_broadcast=True, ) _stats["listening"] = True try: await asyncio.sleep(float("inf")) finally: transport.close() _stats["listening"] = False except asyncio.CancelledError: _stats["listening"] = False print("[ais-udp] Listener stopped") except OSError as e: _stats["listening"] = False print(f"[ais-udp] Could not bind to {host}:{port} — {e}")