""" Serial port scanner — detects connected NMEA/AIS equipment. """ import asyncio import logging from typing import Optional import serial import serial.tools.list_ports from fastapi import APIRouter, Request log = logging.getLogger(__name__) router = APIRouter(prefix="/equipment", tags=["equipment"]) BAUD_RATES = [4800, 9600, 38400, 115200] # Sentence prefix → device type + description SENTENCE_DB = { # GPS / GNSS "$GPRMC": ("GPS", "GPS Receiver — RMC position fix"), "$GNRMC": ("GPS", "GNSS Receiver — RMC position fix"), "$GLRMC": ("GPS", "GLONASS Receiver — RMC position fix"), "$GPGGA": ("GPS", "GPS Receiver — GGA fix data"), "$GNGGA": ("GPS", "GNSS Receiver — GGA fix data"), "$GPGLL": ("GPS", "GPS Receiver — GLL geographic position"), "$GPGSV": ("GPS", "GPS Receiver — GSV satellites in view"), "$GPVTG": ("GPS", "GPS Receiver — VTG track / speed"), "$GPZDA": ("GPS", "GPS Receiver — ZDA date & time"), # AIS "!AIVDM": ("AIS", "AIS Receiver — VDM other vessel message"), "!AIVDO": ("AIS", "AIS Transponder — VDO own vessel message"), "!BSVDM": ("AIS", "AIS Receiver (base station) — VDM"), "!ABVDM": ("AIS", "AIS Receiver — VDM (AB talker)"), # AIS transponder status "$AIALR": ("AIS", "AIS Transponder — ALR alarm relay"), "$AITXT": ("AIS", "AIS Transponder — TXT text message"), "$AIBBM": ("AIS", "AIS Transponder — BBM broadcast binary"), # ATON / IEC 62320 "$AIDGN": ("ATON", "AIS ATON Transponder — diagnostic"), # Heading sensors "$HEHDT": ("GYRO", "Gyrocompass — HDT true heading"), "$HEHDT": ("GYRO", "Gyrocompass — HDT true heading"), "$TIROT": ("GYRO", "Rate of Turn indicator"), "$HEROT": ("GYRO", "Gyrocompass — ROT rate of turn"), # Radar "$RATTM": ("RADAR", "Radar — TTM tracked target"), "$RARSD": ("RADAR", "Radar — RSD radar system data"), "$RAROT": ("RADAR", "Radar — ROT rate of turn"), # Depth sounder "$SDDBT": ("SONAR", "Depth Sounder — DBT depth below transducer"), "$SDDPT": ("SONAR", "Depth Sounder — DPT depth of water"), "$SDMTW": ("SONAR", "Depth Sounder — MTW water temperature"), # Wind "$WIMWV": ("WIND", "Wind sensor — MWV wind speed/angle"), "$WIMWD": ("WIND", "Wind sensor — MWD wind direction"), # Autopilot "$APXTE": ("AUTOPILOT", "Autopilot — XTE cross-track error"), "$APHSC": ("AUTOPILOT", "Autopilot — HSC heading to steer"), # Generic NMEA "$PGRME": ("GPS", "Garmin GPS proprietary sentence"), "$PGRMZ": ("GPS", "Garmin GPS altitude"), } def _identify(sentences: list[str]) -> tuple[str, str, list[str]]: """Return (device_type, description, sentences) from a list of raw lines.""" for line in sentences: line = line.strip() for prefix, (dtype, desc) in SENTENCE_DB.items(): if line.startswith(prefix): return dtype, desc, sentences # Unknown but has data if any(s.strip() for s in sentences): return "UNKNOWN", "Unknown NMEA device", sentences return "NO DATA", "No NMEA sentences received", sentences def _probe_port(port: str, baud: int, n_lines: int = 15, timeout: float = 1.5) -> Optional[dict]: """ Try to open port at given baud and read NMEA lines. Returns a result dict or None if port is busy / no data. """ sentences = [] try: with serial.Serial(port, baud, timeout=timeout) as s: buf = b"" for _ in range(n_lines): chunk = s.readline() if not chunk: continue buf += chunk try: line = chunk.decode("ascii", errors="ignore").strip() if line: sentences.append(line) except Exception: pass # Check if we got any NMEA-like data has_nmea = any( line.startswith(p) for line in sentences for p in ("$", "!") ) if not has_nmea: return None except serial.SerialException as e: msg = str(e).lower() if "access is denied" in msg or "permission" in msg: return {"port": port, "baud": baud, "busy": True, "device_type": "IN USE", "description": "Port in use by system", "sentences": []} return None except Exception: return None dtype, desc, _ = _identify(sentences) return { "port": port, "baud": baud, "busy": False, "device_type": dtype, "description": desc, "sentences": sentences[:8], # return first 8 for display } SKIP_DESC = ("bluetooth", "serie est") @router.get("/scan") async def scan_ports(request: Request): """ Scan all serial ports and return detected equipment. Runs in a thread pool to avoid blocking the event loop. """ def _do_scan(): results = [] all_ports = serial.tools.list_ports.comports() for port_info in all_ports: desc = port_info.description.lower() if any(s in desc for s in SKIP_DESC): continue # skip Bluetooth virtual COM ports port = port_info.device hwid = port_info.hwid or "" found = None for baud in BAUD_RATES: result = _probe_port(port, baud) if result: result["hwid"] = hwid result["port_desc"] = port_info.description found = result if result.get("busy"): break # no point trying other baud rates if result["device_type"] != "NO DATA": break # identified — stop trying other rates if found is None: found = { "port": port, "baud": None, "busy": False, "device_type": "NO DATA", "description": "No NMEA data on any baud rate", "sentences": [], "hwid": hwid, "port_desc": port_info.description, } results.append(found) return results results = await asyncio.get_event_loop().run_in_executor(None, _do_scan) # Enrich "IN USE" entries with the actual internal source. # The scanner can't probe a port held by our own GPS reader, so we cross- # reference and label it with what we know instead of leaving it as "IN USE". gps = getattr(request.app.state, "gps", None) gps_port = (getattr(gps, "_port", None) or getattr(gps, "_forced_port", None)) if gps else None if gps_port: for r in results: if r.get("device_type") != "IN USE" or r["port"] != gps_port: continue fix = getattr(gps, "last_fix", None) or {} bits = [] if fix.get("sats") is not None: bits.append(f"{fix['sats']} sats") if fix.get("hdop") is not None: bits.append(f"HDOP {fix['hdop']:.2f}") extra = f" ({', '.join(bits)})" if bits else "" r["device_type"] = "GPS" r["description"] = f"GPS Receiver — held by AidsMonitoring{extra}" r["baud"] = getattr(gps, "_baud", None) return results