Files

197 lines
7.5 KiB
Python

"""
Serial port scanner — detects connected NMEA/AIS equipment.
"""
import asyncio
import logging
from typing import Optional
import serial
import serial.tools.list_ports
from fastapi import APIRouter, Request
# Module-level logger named after this module.
log = logging.getLogger(__name__)
# Every route declared on this router is served under the /equipment prefix.
router = APIRouter(prefix="/equipment", tags=["equipment"])
# Baud rates tried, in this order, when probing a serial port.
BAUD_RATES = [4800, 9600, 38400, 115200]
# Sentence prefix → (device type, human-readable description).
# Each key is the start-of-sentence delimiter ("$" or "!") followed by the
# five-character address field; keys must be unique.
SENTENCE_DB = {
    # GPS / GNSS
    "$GPRMC": ("GPS", "GPS Receiver — RMC position fix"),
    "$GNRMC": ("GPS", "GNSS Receiver — RMC position fix"),
    "$GLRMC": ("GPS", "GLONASS Receiver — RMC position fix"),
    "$GPGGA": ("GPS", "GPS Receiver — GGA fix data"),
    "$GNGGA": ("GPS", "GNSS Receiver — GGA fix data"),
    "$GPGLL": ("GPS", "GPS Receiver — GLL geographic position"),
    "$GPGSV": ("GPS", "GPS Receiver — GSV satellites in view"),
    "$GPVTG": ("GPS", "GPS Receiver — VTG track / speed"),
    "$GPZDA": ("GPS", "GPS Receiver — ZDA date & time"),
    # AIS
    "!AIVDM": ("AIS", "AIS Receiver — VDM other vessel message"),
    "!AIVDO": ("AIS", "AIS Transponder — VDO own vessel message"),
    "!BSVDM": ("AIS", "AIS Receiver (base station) — VDM"),
    "!ABVDM": ("AIS", "AIS Receiver — VDM (AB talker)"),
    # AIS transponder status
    "$AIALR": ("AIS", "AIS Transponder — ALR alarm relay"),
    "$AITXT": ("AIS", "AIS Transponder — TXT text message"),
    "$AIBBM": ("AIS", "AIS Transponder — BBM broadcast binary"),
    # ATON / IEC 62320
    "$AIDGN": ("ATON", "AIS ATON Transponder — diagnostic"),
    # Heading sensors
    "$HEHDT": ("GYRO", "Gyrocompass — HDT true heading"),
    "$TIROT": ("GYRO", "Rate of Turn indicator"),
    "$HEROT": ("GYRO", "Gyrocompass — ROT rate of turn"),
    # Radar
    "$RATTM": ("RADAR", "Radar — TTM tracked target"),
    "$RARSD": ("RADAR", "Radar — RSD radar system data"),
    "$RAROT": ("RADAR", "Radar — ROT rate of turn"),
    # Depth sounder
    "$SDDBT": ("SONAR", "Depth Sounder — DBT depth below transducer"),
    "$SDDPT": ("SONAR", "Depth Sounder — DPT depth of water"),
    "$SDMTW": ("SONAR", "Depth Sounder — MTW water temperature"),
    # Wind
    "$WIMWV": ("WIND", "Wind sensor — MWV wind speed/angle"),
    "$WIMWD": ("WIND", "Wind sensor — MWD wind direction"),
    # Autopilot
    "$APXTE": ("AUTOPILOT", "Autopilot — XTE cross-track error"),
    "$APHSC": ("AUTOPILOT", "Autopilot — HSC heading to steer"),
    # Garmin proprietary
    "$PGRME": ("GPS", "Garmin GPS proprietary sentence"),
    "$PGRMZ": ("GPS", "Garmin GPS altitude"),
}
def _identify(sentences: list[str]) -> tuple[str, str, list[str]]:
"""Return (device_type, description, sentences) from a list of raw lines."""
for line in sentences:
line = line.strip()
for prefix, (dtype, desc) in SENTENCE_DB.items():
if line.startswith(prefix):
return dtype, desc, sentences
# Unknown but has data
if any(s.strip() for s in sentences):
return "UNKNOWN", "Unknown NMEA device", sentences
return "NO DATA", "No NMEA sentences received", sentences
def _probe_port(port: str, baud: int, n_lines: int = 15, timeout: float = 1.5) -> Optional[dict]:
    """Open ``port`` at ``baud`` and try to read NMEA sentences from it.

    At most ``n_lines`` ``readline()`` calls are made, each with a read timeout
    of ``timeout`` seconds, so probing a silent port can take roughly
    ``n_lines * timeout`` seconds.

    Args:
        port: Device name handed to ``serial.Serial``.
        baud: Baud rate to open the port with.
        n_lines: Maximum number of ``readline()`` attempts.
        timeout: Per-read timeout in seconds.

    Returns:
        * A result dict with ``busy=False`` when at least one received line
          starts with ``$`` or ``!``; it carries the identified device type,
          its description and the first 8 received lines.
        * A result dict with ``busy=True`` and ``device_type="IN USE"`` when
          the port raises a ``SerialException`` whose message mentions
          "access is denied" or "permission".
        * ``None`` when nothing NMEA-like was read or the port failed for any
          other reason.
    """
    sentences: list[str] = []
    try:
        with serial.Serial(port, baud, timeout=timeout) as conn:
            for _ in range(n_lines):
                chunk = conn.readline()
                if not chunk:
                    continue  # nothing arrived for this read; keep listening
                # errors="ignore" drops undecodable bytes instead of raising,
                # so no exception handling is needed around the decode.
                line = chunk.decode("ascii", errors="ignore").strip()
                if line:
                    sentences.append(line)
    except serial.SerialException as exc:
        msg = str(exc).lower()
        if "access is denied" in msg or "permission" in msg:
            return {"port": port, "baud": baud, "busy": True,
                    "device_type": "IN USE", "description": "Port in use by system",
                    "sentences": []}
        log.debug("Cannot probe %s at %s baud: %s", port, baud, exc)
        return None
    except Exception:
        # Probing is best-effort: never let one bad port abort a scan, but
        # leave a trace so unexpected failures can be diagnosed.
        log.debug("Unexpected error probing %s at %s baud", port, baud, exc_info=True)
        return None

    # NMEA sentences start with "$"; encapsulated (AIS) sentences with "!".
    if not any(line.startswith(("$", "!")) for line in sentences):
        return None

    dtype, desc, _ = _identify(sentences)
    return {
        "port": port,
        "baud": baud,
        "busy": False,
        "device_type": dtype,
        "description": desc,
        "sentences": sentences[:8],  # return first 8 for display
    }
# Substrings that, when found in a port's lower-cased description, cause the
# scanner to skip that port (used to avoid Bluetooth virtual COM ports).
# NOTE(review): "serie est" presumably targets a localized, non-English port
# description — confirm which locale/driver produces it.
SKIP_DESC = ("bluetooth", "serie est")
def _scan_all_ports() -> list[dict]:
    """Probe every listed serial port (blocking) and return one result dict per port."""
    results = []
    for port_info in serial.tools.list_ports.comports():
        # Guard against a missing description the same way hwid is guarded.
        desc = (port_info.description or "").lower()
        if any(marker in desc for marker in SKIP_DESC):
            continue  # skip Bluetooth virtual COM ports
        port = port_info.device
        hwid = port_info.hwid or ""
        found = None
        for baud in BAUD_RATES:
            result = _probe_port(port, baud)
            if not result:
                continue
            result["hwid"] = hwid
            result["port_desc"] = port_info.description
            found = result
            if result.get("busy"):
                break  # no point trying other baud rates
            if result["device_type"] != "NO DATA":
                break  # identified — stop trying other rates
        if found is None:
            found = {
                "port": port,
                "baud": None,
                "busy": False,
                "device_type": "NO DATA",
                "description": "No NMEA data on any baud rate",
                "sentences": [],
                "hwid": hwid,
                "port_desc": port_info.description,
            }
        results.append(found)
    return results


def _label_gps_port(results: list[dict], gps) -> None:
    """Relabel, in place, the "IN USE" entry for the port held by the app's GPS reader."""
    if not gps:
        return
    gps_port = getattr(gps, "_port", None) or getattr(gps, "_forced_port", None)
    if not gps_port:
        return
    for entry in results:
        if entry.get("device_type") != "IN USE" or entry["port"] != gps_port:
            continue
        # NOTE(review): assumes last_fix is dict-like and that "hdop", when
        # present, is numeric (it is formatted with :.2f) — confirm against
        # the GPS reader.
        fix = getattr(gps, "last_fix", None) or {}
        bits = []
        if fix.get("sats") is not None:
            bits.append(f"{fix['sats']} sats")
        if fix.get("hdop") is not None:
            bits.append(f"HDOP {fix['hdop']:.2f}")
        extra = f" ({', '.join(bits)})" if bits else ""
        entry["device_type"] = "GPS"
        entry["description"] = f"GPS Receiver — held by AidsMonitoring{extra}"
        entry["baud"] = getattr(gps, "_baud", None)


@router.get("/scan")
async def scan_ports(request: Request):
    """
    Scan all serial ports and return detected equipment.

    The blocking serial probing runs in the event loop's default executor so
    the event loop is not blocked while ports are read.

    Returns a list with one dict per scanned port, with keys ``port``,
    ``baud``, ``busy``, ``device_type``, ``description``, ``sentences``,
    ``hwid`` and ``port_desc``.
    """
    # get_running_loop() is the supported way to obtain the loop from inside a
    # coroutine.
    loop = asyncio.get_running_loop()
    results = await loop.run_in_executor(None, _scan_all_ports)
    # Enrich "IN USE" entries with the actual internal source.
    # The scanner can't probe a port held by our own GPS reader, so we cross-
    # reference and label it with what we know instead of leaving it as "IN USE".
    _label_gps_port(results, getattr(request.app.state, "gps", None))
    return results