197 lines
7.5 KiB
Python
197 lines
7.5 KiB
Python
"""
|
|
Serial port scanner — detects connected NMEA/AIS equipment.
|
|
"""
|
|
import asyncio
|
|
import logging
|
|
from typing import Optional
|
|
|
|
import serial
|
|
import serial.tools.list_ports
|
|
from fastapi import APIRouter, Request
|
|
|
|
log = logging.getLogger(__name__)
|
|
router = APIRouter(prefix="/equipment", tags=["equipment"])
|
|
|
|
BAUD_RATES = [4800, 9600, 38400, 115200]
|
|
|
|
# Sentence prefix → device type + description
|
|
SENTENCE_DB = {
|
|
# GPS / GNSS
|
|
"$GPRMC": ("GPS", "GPS Receiver — RMC position fix"),
|
|
"$GNRMC": ("GPS", "GNSS Receiver — RMC position fix"),
|
|
"$GLRMC": ("GPS", "GLONASS Receiver — RMC position fix"),
|
|
"$GPGGA": ("GPS", "GPS Receiver — GGA fix data"),
|
|
"$GNGGA": ("GPS", "GNSS Receiver — GGA fix data"),
|
|
"$GPGLL": ("GPS", "GPS Receiver — GLL geographic position"),
|
|
"$GPGSV": ("GPS", "GPS Receiver — GSV satellites in view"),
|
|
"$GPVTG": ("GPS", "GPS Receiver — VTG track / speed"),
|
|
"$GPZDA": ("GPS", "GPS Receiver — ZDA date & time"),
|
|
# AIS
|
|
"!AIVDM": ("AIS", "AIS Receiver — VDM other vessel message"),
|
|
"!AIVDO": ("AIS", "AIS Transponder — VDO own vessel message"),
|
|
"!BSVDM": ("AIS", "AIS Receiver (base station) — VDM"),
|
|
"!ABVDM": ("AIS", "AIS Receiver — VDM (AB talker)"),
|
|
# AIS transponder status
|
|
"$AIALR": ("AIS", "AIS Transponder — ALR alarm relay"),
|
|
"$AITXT": ("AIS", "AIS Transponder — TXT text message"),
|
|
"$AIBBM": ("AIS", "AIS Transponder — BBM broadcast binary"),
|
|
# ATON / IEC 62320
|
|
"$AIDGN": ("ATON", "AIS ATON Transponder — diagnostic"),
|
|
# Heading sensors
|
|
"$HEHDT": ("GYRO", "Gyrocompass — HDT true heading"),
|
|
"$HEHDT": ("GYRO", "Gyrocompass — HDT true heading"),
|
|
"$TIROT": ("GYRO", "Rate of Turn indicator"),
|
|
"$HEROT": ("GYRO", "Gyrocompass — ROT rate of turn"),
|
|
# Radar
|
|
"$RATTM": ("RADAR", "Radar — TTM tracked target"),
|
|
"$RARSD": ("RADAR", "Radar — RSD radar system data"),
|
|
"$RAROT": ("RADAR", "Radar — ROT rate of turn"),
|
|
# Depth sounder
|
|
"$SDDBT": ("SONAR", "Depth Sounder — DBT depth below transducer"),
|
|
"$SDDPT": ("SONAR", "Depth Sounder — DPT depth of water"),
|
|
"$SDMTW": ("SONAR", "Depth Sounder — MTW water temperature"),
|
|
# Wind
|
|
"$WIMWV": ("WIND", "Wind sensor — MWV wind speed/angle"),
|
|
"$WIMWD": ("WIND", "Wind sensor — MWD wind direction"),
|
|
# Autopilot
|
|
"$APXTE": ("AUTOPILOT", "Autopilot — XTE cross-track error"),
|
|
"$APHSC": ("AUTOPILOT", "Autopilot — HSC heading to steer"),
|
|
# Generic NMEA
|
|
"$PGRME": ("GPS", "Garmin GPS proprietary sentence"),
|
|
"$PGRMZ": ("GPS", "Garmin GPS altitude"),
|
|
}
|
|
|
|
|
|
def _identify(sentences: list[str]) -> tuple[str, str, list[str]]:
|
|
"""Return (device_type, description, sentences) from a list of raw lines."""
|
|
for line in sentences:
|
|
line = line.strip()
|
|
for prefix, (dtype, desc) in SENTENCE_DB.items():
|
|
if line.startswith(prefix):
|
|
return dtype, desc, sentences
|
|
# Unknown but has data
|
|
if any(s.strip() for s in sentences):
|
|
return "UNKNOWN", "Unknown NMEA device", sentences
|
|
return "NO DATA", "No NMEA sentences received", sentences
|
|
|
|
|
|
def _probe_port(port: str, baud: int, n_lines: int = 15, timeout: float = 1.5) -> Optional[dict]:
|
|
"""
|
|
Try to open port at given baud and read NMEA lines.
|
|
Returns a result dict or None if port is busy / no data.
|
|
"""
|
|
sentences = []
|
|
try:
|
|
with serial.Serial(port, baud, timeout=timeout) as s:
|
|
buf = b""
|
|
for _ in range(n_lines):
|
|
chunk = s.readline()
|
|
if not chunk:
|
|
continue
|
|
buf += chunk
|
|
try:
|
|
line = chunk.decode("ascii", errors="ignore").strip()
|
|
if line:
|
|
sentences.append(line)
|
|
except Exception:
|
|
pass
|
|
# Check if we got any NMEA-like data
|
|
has_nmea = any(
|
|
line.startswith(p) for line in sentences
|
|
for p in ("$", "!")
|
|
)
|
|
if not has_nmea:
|
|
return None
|
|
except serial.SerialException as e:
|
|
msg = str(e).lower()
|
|
if "access is denied" in msg or "permission" in msg:
|
|
return {"port": port, "baud": baud, "busy": True,
|
|
"device_type": "IN USE", "description": "Port in use by system",
|
|
"sentences": []}
|
|
return None
|
|
except Exception:
|
|
return None
|
|
|
|
dtype, desc, _ = _identify(sentences)
|
|
return {
|
|
"port": port,
|
|
"baud": baud,
|
|
"busy": False,
|
|
"device_type": dtype,
|
|
"description": desc,
|
|
"sentences": sentences[:8], # return first 8 for display
|
|
}
|
|
|
|
|
|
SKIP_DESC = ("bluetooth", "serie est")
|
|
|
|
|
|
@router.get("/scan")
|
|
async def scan_ports(request: Request):
|
|
"""
|
|
Scan all serial ports and return detected equipment.
|
|
Runs in a thread pool to avoid blocking the event loop.
|
|
"""
|
|
def _do_scan():
|
|
results = []
|
|
all_ports = serial.tools.list_ports.comports()
|
|
|
|
for port_info in all_ports:
|
|
desc = port_info.description.lower()
|
|
if any(s in desc for s in SKIP_DESC):
|
|
continue # skip Bluetooth virtual COM ports
|
|
|
|
port = port_info.device
|
|
hwid = port_info.hwid or ""
|
|
found = None
|
|
|
|
for baud in BAUD_RATES:
|
|
result = _probe_port(port, baud)
|
|
if result:
|
|
result["hwid"] = hwid
|
|
result["port_desc"] = port_info.description
|
|
found = result
|
|
if result.get("busy"):
|
|
break # no point trying other baud rates
|
|
if result["device_type"] != "NO DATA":
|
|
break # identified — stop trying other rates
|
|
|
|
if found is None:
|
|
found = {
|
|
"port": port,
|
|
"baud": None,
|
|
"busy": False,
|
|
"device_type": "NO DATA",
|
|
"description": "No NMEA data on any baud rate",
|
|
"sentences": [],
|
|
"hwid": hwid,
|
|
"port_desc": port_info.description,
|
|
}
|
|
results.append(found)
|
|
|
|
return results
|
|
|
|
results = await asyncio.get_event_loop().run_in_executor(None, _do_scan)
|
|
|
|
# Enrich "IN USE" entries with the actual internal source.
|
|
# The scanner can't probe a port held by our own GPS reader, so we cross-
|
|
# reference and label it with what we know instead of leaving it as "IN USE".
|
|
gps = getattr(request.app.state, "gps", None)
|
|
gps_port = (getattr(gps, "_port", None) or getattr(gps, "_forced_port", None)) if gps else None
|
|
if gps_port:
|
|
for r in results:
|
|
if r.get("device_type") != "IN USE" or r["port"] != gps_port:
|
|
continue
|
|
fix = getattr(gps, "last_fix", None) or {}
|
|
bits = []
|
|
if fix.get("sats") is not None:
|
|
bits.append(f"{fix['sats']} sats")
|
|
if fix.get("hdop") is not None:
|
|
bits.append(f"HDOP {fix['hdop']:.2f}")
|
|
extra = f" ({', '.join(bits)})" if bits else ""
|
|
r["device_type"] = "GPS"
|
|
r["description"] = f"GPS Receiver — held by AidsMonitoring{extra}"
|
|
r["baud"] = getattr(gps, "_baud", None)
|
|
|
|
return results
|