"""NMEA 0183 serial reader — parses GGA, RMC, VTG, GSV, GSA, GLL.

Runs in a background thread; calls broadcast_fn(msg) directly.
Thread safety is handled by the caller (Qt signal emit or similar).
"""

import logging
import threading

import serial
import serial.tools.list_ports

# USB vendor IDs of the USB-serial chips commonly found on GPS receivers
# (u-blox, CH340, FTDI, Prolific). NMEAReader.autodetect() prefers a port
# whose vendor ID is in this set.
KNOWN_VIDS = {
    0x1546,  # u-blox
    0x1A86,  # CH340
    0x0403,  # FTDI
    0x067B,  # Prolific
}

# Two-letter NMEA talker prefix -> constellation name used to label the
# satellites reported in GSV sentences. Unknown prefixes are passed through
# unchanged by the reader.
SYSTEM_MAP = {
    "GP": "GPS",
    "GL": "GLONASS",
    "GA": "Galileo",
    "GB": "BeiDou",
    "GN": "GNSS",
    "GQ": "QZSS",
}


class NMEAReader(threading.Thread):
|
|
def __init__(self, port: str, baud: int, broadcast_fn):
|
|
super().__init__(daemon=True, name="nmea-reader")
|
|
self.port = port
|
|
self.baud = baud
|
|
self._bcast = broadcast_fn
|
|
self._stop = threading.Event()
|
|
self._fix = {}
|
|
self._sats = {} # key="{sys}_{prn}" → dict
|
|
self._active = set() # PRN strings from GSA
|
|
|
|
# ── Auto-detect ───────────────────────────────────────────────────────────
|
|
@staticmethod
|
|
def autodetect() -> str | None:
|
|
"""Try to find a GPS serial port.
|
|
1. Match by USB vendor ID (u-blox, CH340, FTDI, Prolific) — fast and reliable.
|
|
2. If no VID match, return the first available COM port from the OS list
|
|
(works for USB-CDC devices that don't expose a VID, e.g. some clone adapters).
|
|
"""
|
|
ports = serial.tools.list_ports.comports()
|
|
# Priority: well-known GPS USB chip vendor IDs
|
|
for p in ports:
|
|
if (p.vid or 0) in KNOWN_VIDS:
|
|
return p.device
|
|
# Fallback: first port in the system list (avoid brute-force which can hang on BT ports)
|
|
if ports:
|
|
return ports[0].device
|
|
return None
|
|
|
|
@staticmethod
|
|
def list_ports():
|
|
return [{"port": p.device, "desc": p.description,
|
|
"vid": p.vid, "pid": p.pid}
|
|
for p in serial.tools.list_ports.comports()]
|
|
|
|
# ── Thread control ────────────────────────────────────────────────────────
|
|
def stop(self):
|
|
self._stop.set()
|
|
|
|
def _emit(self, msg: dict):
|
|
self._bcast(msg)
|
|
|
|
# ── Main loop ─────────────────────────────────────────────────────────────
|
|
def run(self):
|
|
try:
|
|
ser = serial.Serial(self.port, self.baud, timeout=0.3)
|
|
except Exception as e:
|
|
self._emit({"type": "error", "msg": str(e)})
|
|
return
|
|
|
|
self._emit({"type": "connected", "port": self.port, "baud": self.baud})
|
|
|
|
while not self._stop.is_set():
|
|
try:
|
|
raw = ser.readline()
|
|
if not raw:
|
|
continue
|
|
line = raw.decode("ascii", errors="replace").strip()
|
|
if not line.startswith("$"):
|
|
continue
|
|
self._emit({"type": "raw", "sentence": line})
|
|
self._parse(line)
|
|
except serial.SerialException as e:
|
|
self._emit({"type": "error", "msg": str(e)})
|
|
break
|
|
except Exception:
|
|
pass
|
|
|
|
ser.close()
|
|
self._emit({"type": "disconnected"})
|
|
|
|
# ── NMEA dispatch ─────────────────────────────────────────────────────────
|
|
def _parse(self, line: str):
|
|
body = line[1:line.index("*")] if "*" in line else line[1:]
|
|
parts = body.split(",")
|
|
if not parts:
|
|
return
|
|
talker = parts[0][:2]
|
|
sentence = parts[0][2:]
|
|
dispatch = {
|
|
"GGA": self._gga, "RMC": self._rmc, "VTG": self._vtg,
|
|
"GSV": self._gsv, "GSA": self._gsa, "GLL": self._gll,
|
|
}
|
|
fn = dispatch.get(sentence)
|
|
if fn:
|
|
try:
|
|
fn(parts, talker)
|
|
except Exception:
|
|
pass
|
|
|
|
# ── Helpers ───────────────────────────────────────────────────────────────
|
|
@staticmethod
|
|
def _lat(v, h):
|
|
if not v: return None
|
|
d = int(v[:2]); m = float(v[2:])
|
|
return -(d + m/60) if h == "S" else (d + m/60)
|
|
|
|
@staticmethod
|
|
def _lon(v, h):
|
|
if not v: return None
|
|
d = int(v[:3]); m = float(v[3:])
|
|
return -(d + m/60) if h == "W" else (d + m/60)
|
|
|
|
@staticmethod
|
|
def _f(v):
|
|
s = v.split("*")[0].strip() if v else ""
|
|
return float(s) if s else None
|
|
|
|
@staticmethod
|
|
def _i(v):
|
|
s = v.split("*")[0].strip() if v else ""
|
|
return int(s) if s else None
|
|
|
|
def _sat_list(self):
|
|
return list(self._sats.values())
|
|
|
|
# ── Sentence handlers ─────────────────────────────────────────────────────
|
|
def _gga(self, p, talker):
|
|
if len(p) < 10: return
|
|
lat = self._lat(p[2], p[3])
|
|
lon = self._lon(p[4], p[5])
|
|
fq = self._i(p[6]) or 0
|
|
sats = self._i(p[7]) or 0
|
|
hdop = self._f(p[8])
|
|
alt = self._f(p[9])
|
|
self._fix.update({"lat": lat, "lon": lon, "fix_quality": fq,
|
|
"satellites": sats, "hdop": hdop, "altitude": alt,
|
|
"utc": p[1]})
|
|
self._emit({"type": "position", **self._fix, "sats": self._sat_list()})
|
|
|
|
def _rmc(self, p, talker):
|
|
if len(p) < 9: return
|
|
if p[2] != "A": return # void
|
|
lat = self._lat(p[3], p[4])
|
|
lon = self._lon(p[5], p[6])
|
|
sog = self._f(p[7])
|
|
cog = self._f(p[8])
|
|
magvar = None
|
|
if len(p) > 11 and p[10]:
|
|
mv = self._f(p[10])
|
|
if mv is not None:
|
|
magvar = -mv if (len(p) > 11 and p[11].startswith("W")) else mv
|
|
self._fix.update({"sog": sog, "cog": cog, "magvar": magvar,
|
|
"date": p[9]})
|
|
self._emit({"type": "rmc", "lat": lat, "lon": lon,
|
|
"sog": sog, "cog": cog, "magvar": magvar, "date": p[9]})
|
|
|
|
def _vtg(self, p, talker):
|
|
if len(p) < 8: return
|
|
mode = p[9].split("*")[0] if len(p) > 9 else ""
|
|
if mode == "N": return
|
|
cog_t = self._f(p[1])
|
|
cog_m = self._f(p[3])
|
|
sog = self._f(p[5])
|
|
self._fix.update({"cog": cog_t, "cog_m": cog_m, "sog": sog})
|
|
|
|
def _gsv(self, p, talker):
|
|
sys = SYSTEM_MAP.get(talker, talker)
|
|
i = 4
|
|
while i + 2 < len(p):
|
|
prn = p[i].strip()
|
|
el = self._i(p[i+1]) if i+1 < len(p) else None
|
|
az = self._i(p[i+2]) if i+2 < len(p) else None
|
|
snr_raw = p[i+3] if i+3 < len(p) else ""
|
|
snr = self._i(snr_raw)
|
|
if prn:
|
|
key = f"{sys}_{prn}"
|
|
self._sats[key] = {
|
|
"key": key, "prn": prn, "system": sys,
|
|
"el": el, "az": az, "snr": snr,
|
|
"used": prn in self._active,
|
|
}
|
|
i += 4
|
|
self._emit({"type": "satellites", "sats": self._sat_list()})
|
|
|
|
def _gsa(self, p, talker):
|
|
if len(p) < 15: return
|
|
self._active = {p[i].strip() for i in range(3, 15) if p[i].strip()}
|
|
fix_mode = self._i(p[2]) or 1
|
|
pdop = self._f(p[15]) if len(p) > 15 else None
|
|
hdop = self._f(p[16]) if len(p) > 16 else None
|
|
vdop = self._f(p[17]) if len(p) > 17 else None
|
|
self._fix.update({"fix_mode": fix_mode, "pdop": pdop,
|
|
"hdop": hdop, "vdop": vdop})
|
|
self._emit({"type": "dop", "fix_mode": fix_mode,
|
|
"pdop": pdop, "hdop": hdop, "vdop": vdop})
|
|
# refresh used flag
|
|
for k, s in self._sats.items():
|
|
s["used"] = s["prn"] in self._active
|
|
|
|
def _gll(self, p, talker):
|
|
if len(p) < 6: return
|
|
status = p[6] if len(p) > 6 else p[5]
|
|
if "A" not in status: return
|
|
lat = self._lat(p[1], p[2])
|
|
lon = self._lon(p[3], p[4])
|
|
if lat and lon:
|
|
self._fix.update({"lat": lat, "lon": lon})
|