# AidsMonitoring/backend/services/ais_udp_reader.py
"""
Asyncio UDP listener (default port 10110) for NMEA AIS data from AIS-catcher.
Reassembles multi-part !AIVDM / !AIVDO sentences and routes each decoded
message to the caller-supplied ``process_msg`` coroutine.
"""
from __future__ import annotations
import asyncio
import time
from collections import defaultdict, deque
from typing import Callable, Awaitable
from .ais_decoder import parse_nmea_sentence, SHIP_TYPE_NAMES
from .aton_decoder import decode_type21, decode_type8_aton
# ── Module-level state ───────────────────────────────────────────────────────
# Running counters and status flags, exposed through get_stats() (the original
# note says the /ais/stats endpoint reads them).
_stats: dict = dict(
    sentences_received=0,   # AIVDM/AIVDO sentences accepted by _handle_sentence
    sentences_decoded=0,    # payloads for which parse_nmea_sentence returned a result
    vessels_seen=0,         # number of distinct vessel MMSIs observed
    last_sentence_ts=None,  # time.time() of the most recent accepted sentence
    listening=False,        # True while the UDP endpoint is open
    udp_port=None,          # port handed to run_udp_listener
)

# Ring buffer with the 30 most recent raw sentences (for the /ais/raw endpoint).
_raw_buffer: deque = deque([], 30)

# Fragments of multi-part NMEA messages awaiting reassembly,
# keyed by (channel, seq_id).
_multipart: dict[tuple, list] = defaultdict(list)

# Static vessel data accumulated per MMSI (per the original note: taken from
# Type 5 / Type 24 messages).
_static_cache: dict[str, dict] = {}

# MMSIs already counted in _stats["vessels_seen"].
_mmsi_seen: set[str] = set()
def get_stats() -> dict:
    """Return a shallow copy of the listener statistics.

    A copy is handed out so callers cannot mutate the module-level counters.
    """
    return {**_stats}
def get_raw() -> list[str]:
    """Return the buffered raw sentences, oldest first, as a new list."""
    return [*_raw_buffer]
# Seconds after which an incomplete multi-part group is treated as abandoned.
# Deliberately generous; lower it if stale fragments are still being mixed in.
_MULTIPART_TTL_S = 10.0


def _reassemble_fragment(
    buffer: dict,
    key: tuple,
    total_parts: int,
    part_num: int,
    payload: str,
    now: float,
) -> "str | None":
    """Store one fragment of a multi-part message and join the group when complete.

    Each buffered entry is ``(part_num, payload, total_parts, first_seen)``.
    A pending group is discarded (and restarted with this fragment) when it
    cannot belong to the same message as the incoming fragment:

    * it already holds this part number (an earlier fragment was lost and the
      sequence id has been reused),
    * it was started for a different ``total_parts``, or
    * its first fragment is older than ``_MULTIPART_TTL_S`` seconds.

    Args:
        buffer: Mapping of group key -> list of buffered entries (mutated).
        key: Group key, ``(channel, seq_id)``.
        total_parts: Number of fragments announced by the sentence.
        part_num: 1-based index of this fragment.
        payload: Armoured payload of this fragment.
        now: Monotonic timestamp used for expiry.

    Returns:
        The concatenated payload (fragments ordered by part number) once all
        parts are present, otherwise ``None``.
    """
    # A fragment index outside 1..total can never complete a group.
    if not 1 <= part_num <= total_parts:
        return None

    # .get() avoids creating an empty entry when buffer is a defaultdict.
    fragments = buffer.get(key)
    if fragments:
        _, _, group_total, group_started = fragments[0]
        if (
            group_total != total_parts
            or now - group_started > _MULTIPART_TTL_S
            or any(entry[0] == part_num for entry in fragments)
        ):
            fragments = None  # stale leftovers: start the group afresh

    if not fragments:
        fragments = []
        buffer[key] = fragments

    fragments.append((part_num, payload, total_parts, now))
    if len(fragments) < total_parts:
        return None

    del buffer[key]
    fragments.sort(key=lambda entry: entry[0])
    return "".join(entry[1] for entry in fragments)


async def _handle_sentence(sentence: str, process_msg: Callable) -> None:
    """Parse one raw NMEA sentence and forward the decoded message.

    Only ``!AIVDM`` / ``!AIVDO`` sentences are handled; anything else is
    ignored. Accepted sentences update ``_stats`` and ``_raw_buffer``.
    Multi-part sentences are buffered in ``_multipart`` until every fragment
    has arrived. The complete payload is decoded with ``parse_nmea_sentence``
    and dispatched on the decoded ``"type"``:

    * ``"vessel"``: merged with cached static data, defaults filled in, then
      passed to ``process_msg``.
    * ``"vessel_static"``: stored in ``_static_cache``; nothing is forwarded.
    * ``"aton_raw"``: message types 21 and 8 are decoded further and, if the
      decoder returns a result, passed to ``process_msg`` as ``"aton"``.

    Args:
        sentence: One line of text as received from the UDP socket.
        process_msg: Awaitable callback taking the resulting message dict.
    """
    sentence = sentence.strip()
    if not sentence.startswith(("!AIVDM", "!AIVDO")):
        return

    _stats["sentences_received"] += 1
    _stats["last_sentence_ts"] = time.time()
    _raw_buffer.append(sentence)

    # Drop the trailing "*hh" checksum before splitting into fields.
    raw = sentence.rpartition("*")[0] if "*" in sentence else sentence
    parts = raw.split(",")
    if len(parts) < 6:
        return
    try:
        total_parts = int(parts[1])
        part_num = int(parts[2])
    except ValueError:
        return
    seq_id, channel, payload = parts[3:6]

    if total_parts == 1:
        full_payload = payload
    else:
        full_payload = _reassemble_fragment(
            _multipart,
            (channel, seq_id),
            total_parts,
            part_num,
            payload,
            time.monotonic(),
        )
        if full_payload is None:
            return  # still waiting for the remaining fragments

    # NOTE(review): the reassembled sentence always reports 0 fill bits and
    # channel "A", regardless of what the original fragments carried. Confirm
    # that parse_nmea_sentence does not depend on either field.
    decoded = parse_nmea_sentence(f"!AIVDM,1,1,,A,{full_payload},0")
    if decoded is None:
        return

    _stats["sentences_decoded"] += 1
    msg_type_raw = decoded.get("type")

    if msg_type_raw == "vessel":
        mmsi = decoded["mmsi"]
        # Dynamic fields win over cached static fields on key collisions.
        merged = {**_static_cache.get(mmsi, {}), **decoded}
        merged.setdefault("nombre", f"MMSI {mmsi}")
        merged.setdefault("tipo", 0)
        merged["tipo_nombre"] = SHIP_TYPE_NAMES.get(merged["tipo"], "Unknown")
        merged.setdefault("destino", None)
        merged.setdefault("bandera", None)
        if mmsi not in _mmsi_seen:
            _mmsi_seen.add(mmsi)
            _stats["vessels_seen"] = len(_mmsi_seen)
        await process_msg(merged)

    elif msg_type_raw == "vessel_static":
        # Keep only real values so a later partial report cannot blank out
        # fields learned earlier.
        _static_cache.setdefault(decoded["mmsi"], {}).update(
            {
                k: v
                for k, v in decoded.items()
                if k not in ("type", "timestamp", "source") and v is not None
            }
        )

    elif msg_type_raw == "aton_raw":
        msg_type = decoded["msg_type"]
        aton_payload = decoded["payload"]
        if msg_type == 21:
            result = decode_type21(aton_payload)
            if result:
                await process_msg({"type": "aton", "msg_type": 21, **result})
        elif msg_type == 8:
            result = decode_type8_aton(aton_payload)
            if result:
                await process_msg({"type": "aton", "msg_type": 8, **result})
class _AISUDPProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that feeds each received NMEA line to _handle_sentence.

    One task is scheduled per non-empty line. The tasks are kept in
    ``self._tasks`` until they finish: the event loop only holds weak
    references to tasks, so an unreferenced task may be garbage-collected
    before it completes.
    """

    def __init__(self, process_msg: Callable):
        # Callback forwarded unchanged to _handle_sentence.
        self._process = process_msg
        # Strong references to in-flight sentence tasks.
        self._tasks: set = set()

    def datagram_received(self, data: bytes, addr):
        """Split a datagram into lines and schedule handling of each one."""
        # Non-ASCII bytes are dropped rather than raising.
        text = data.decode("ascii", errors="ignore")
        for line in text.splitlines():
            line = line.strip()
            if not line:
                continue
            task = asyncio.ensure_future(_handle_sentence(line, self._process))
            self._tasks.add(task)
            task.add_done_callback(self._on_task_done)

    def _on_task_done(self, task) -> None:
        """Release a finished task and report any exception it raised."""
        self._tasks.discard(task)
        if task.cancelled():
            return
        # Retrieving the exception also prevents asyncio's
        # "exception was never retrieved" warning.
        exc = task.exception()
        if exc is not None:
            print(f"[ais-udp] Error handling sentence: {exc!r}")

    def error_received(self, exc):
        """Log a socket-level error reported by the transport."""
        print(f"[ais-udp] Error: {exc}")

    def connection_lost(self, exc):
        """Mark the listener as stopped once the transport is closed."""
        _stats["listening"] = False
async def run_udp_listener(
    process_msg: Callable[[dict], Awaitable[None]],
    host: str = "0.0.0.0",
    port: int = 10110,
):
    """Start the async UDP listener and run until cancelled.

    Binds a datagram endpoint on ``host:port`` that feeds every received line
    through ``_AISUDPProtocol``. ``_stats["udp_port"]`` and
    ``_stats["listening"]`` are kept up to date.

    Cancellation is absorbed: the transport is closed, a message is printed
    and the coroutine returns normally instead of re-raising
    ``CancelledError``. A bind failure (``OSError``) is likewise only logged.

    Args:
        process_msg: Awaitable callback invoked with each decoded message dict.
        host: Local address to bind to.
        port: Local UDP port to bind to.
    """
    loop = asyncio.get_running_loop()
    _stats["udp_port"] = port
    try:
        transport, _protocol = await loop.create_datagram_endpoint(
            lambda: _AISUDPProtocol(process_msg),
            local_addr=(host, port),
            allow_broadcast=True,
        )
        _stats["listening"] = True
        # Announce only once the socket is actually bound.
        print(f"[ais-udp] Listening on {host}:{port}")
        try:
            # Park on a future that is never resolved; only cancellation
            # wakes this coroutine up again.
            await loop.create_future()
        finally:
            transport.close()
            _stats["listening"] = False
    except asyncio.CancelledError:
        _stats["listening"] = False
        print("[ais-udp] Listener stopped")
    except OSError as e:
        _stats["listening"] = False
        print(f"[ais-udp] Could not bind to {host}:{port}: {e}")