""" GPS NMEA 0183 reader — auto-detects serial port and parses $GPRMC / $GPGGA. Broadcasts position updates via the shared broadcast_callback. """ import asyncio import logging import os import serial import serial.tools.list_ports from typing import Callable, Optional log = logging.getLogger(__name__) BAUD_RATES = [9600, 4800, 115200, 38400] # 9600 first — most GPS default PROBE_TIMEOUT = 0.8 # seconds per readline() — short enough to fail fast PROBE_LINES = 12 # max lines to read before giving up on a baud rate # All GNSS talker prefixes: GPS, GNSS-combined, GLONASS, Galileo, BeiDou, NavIC, QZSS, SBAS NMEA_PREFIXES = (b'$GP', b'$GN', b'$GL', b'$GA', b'$GB', b'$GI', b'$GQ', b'$GJ') # Sentence tags that carry position/fix data POSITION_TAGS = {'RMC', 'GGA', 'GLL'} # Hardware ID patterns that identify USB-attached serial adapters USB_HWID = ('USB VID:PID', 'FTDI', 'CH340', 'CP210', 'PL2303', 'ARDUINO', '2341:', '1A86:', '10C4:') # Skip Bluetooth virtual COM ports — they hang on open SKIP_DESC = ('bluetooth', 'serie est') # ── Auto-detect ─────────────────────────────────────────────────────────────── def find_gps_port(forced_port: str = None, forced_baud: int = None) -> Optional[tuple[str, int]]: """ Find a serial port sending NMEA data. If forced_port is set, only probe that port (skips scan). Returns (port, baud) or None. """ if forced_port: # Try all baud rates on the configured port, fastest first bauds = [forced_baud] if forced_baud else BAUD_RATES for baud in bauds: result = _probe_port(forced_port, baud, PROBE_TIMEOUT) if result: return result log.warning("GPS: forced port %s responded on none of %s", forced_port, bauds) return None all_ports = serial.tools.list_ports.comports() usb_ports = [] other_ports = [] for p in all_ports: desc = p.description.lower() hwid = (p.hwid or '').upper() if any(s in desc for s in SKIP_DESC): log.debug("GPS scan: skipping %s (%s) — Bluetooth", p.device, p.description) continue if any(s in hwid for s in USB_HWID): usb_ports.append(p.device) else: other_ports.append(p.device) candidates = usb_ports + other_ports log.info("GPS scan — USB: %s other: %s", usb_ports, other_ports) for port in candidates: for baud in BAUD_RATES: result = _probe_port(port, baud, PROBE_TIMEOUT) if result: return result return None def _probe_port(port: str, baud: int, timeout: float) -> Optional[tuple[str, int]]: try: with serial.Serial(port, baud, timeout=timeout) as s: buf = b"" for _ in range(PROBE_LINES): chunk = s.readline() if not chunk: continue buf += chunk for prefix in NMEA_PREFIXES: if prefix in buf: log.info("GPS detected on %s @ %d baud", port, baud) return port, baud except serial.SerialException as e: log.warning("GPS probe %s@%d: %s", port, baud, e) except Exception as e: log.debug("GPS probe %s@%d failed: %s", port, baud, e) return None # ── NMEA parsers ────────────────────────────────────────────────────────────── def _nmea_checksum_ok(sentence: str) -> bool: if '*' not in sentence: return True # no checksum field, accept data, cs = sentence.rsplit('*', 1) data = data.lstrip('$') calc = 0 for ch in data: calc ^= ord(ch) return calc == int(cs.strip(), 16) def _parse_lat(val: str, hemi: str) -> Optional[float]: if not val: return None deg = float(val[:2]) mins = float(val[2:]) lat = deg + mins / 60.0 return -lat if hemi == 'S' else lat def _parse_lon(val: str, hemi: str) -> Optional[float]: if not val: return None deg = float(val[:3]) mins = float(val[3:]) lon = deg + mins / 60.0 return -lon if hemi == 'W' else lon def parse_gprmc(fields: list[str]) -> Optional[dict]: """$GPRMC,hhmmss.ss,A,llll.ll,a,yyyyy.yy,a,x.x,x.x,ddmmyy,...""" try: if fields[2] != 'A': # A=active, V=void return None lat = _parse_lat(fields[3], fields[4]) lon = _parse_lon(fields[5], fields[6]) sog = float(fields[7]) if fields[7] else 0.0 # knots cog = float(fields[8]) if fields[8] else 0.0 if lat is None or lon is None: return None return {'lat': lat, 'lon': lon, 'sog': sog, 'cog': cog, 'source': 'GPRMC'} except (IndexError, ValueError): return None def parse_gpgga(fields: list[str]) -> Optional[dict]: """$GPGGA,hhmmss.ss,llll.ll,a,yyyyy.yy,a,q,nn,x.x,alt,...""" try: fix = int(fields[6]) if fields[6] else 0 if fix == 0: return None lat = _parse_lat(fields[2], fields[3]) lon = _parse_lon(fields[4], fields[5]) alt = float(fields[9]) if fields[9] else 0.0 sats = int(fields[7]) if fields[7] else 0 hdop = float(fields[8]) if fields[8] else 99.0 if lat is None or lon is None: return None return {'lat': lat, 'lon': lon, 'alt': alt, 'sats': sats, 'hdop': hdop, 'source': 'GPGGA'} except (IndexError, ValueError): return None def parse_gpgll(fields: list[str]) -> Optional[dict]: """$GPGLL / $GNGLL,llll.ll,a,yyyyy.yy,a,hhmmss,A/V,...""" try: lat = _parse_lat(fields[1], fields[2]) lon = _parse_lon(fields[3], fields[4]) status = fields[6] if len(fields) > 6 else fields[5] if len(fields) > 5 else 'V' if status.strip().upper() not in ('A', ''): return None if lat is None or lon is None: return None return {'lat': lat, 'lon': lon, 'sog': 0.0, 'cog': 0.0, 'source': 'GLL'} except (IndexError, ValueError): return None def parse_sentence(raw: str) -> Optional[dict]: raw = raw.strip() if not raw.startswith('$'): return None if not _nmea_checksum_ok(raw): return None sentence = raw.split('*')[0].lstrip('$') fields = sentence.split(',') tag = fields[0][-3:] # RMC, GGA, GLL, etc. if tag == 'RMC': return parse_gprmc(fields) if tag == 'GGA': return parse_gpgga(fields) if tag == 'GLL': return parse_gpgll(fields) return None # ── Async reader ────────────────────────────────────────────────────────────── class GPSReader: def __init__(self, broadcast_cb: Callable, forced_port: str = None, forced_baud: int = None): self.broadcast = broadcast_cb self._port: Optional[str] = None self._baud: int = 9600 self._forced_port = forced_port self._forced_baud = forced_baud self._running = False self._ser: Optional[serial.Serial] = None self.last_fix: Optional[dict] = None self.status: str = 'SEARCHING' def reconnect(self): """Force the read loop to drop the current serial and re-probe.""" if self._ser is not None: try: self._ser.close() except Exception: pass async def start(self): self._running = True asyncio.create_task(self._run()) async def _run(self): while self._running: if self._forced_port: log.info("GPS: using configured port %s", self._forced_port) else: log.info("GPS: scanning ports...") self.status = 'SEARCHING' result = await asyncio.get_event_loop().run_in_executor( None, lambda: find_gps_port(self._forced_port, self._forced_baud)) if not result: msg = f"GPS: port {self._forced_port} not responding" if self._forced_port else "GPS: no NMEA device found" log.warning("%s, retrying in 10s", msg) self.status = 'NOT FOUND' await asyncio.sleep(10) continue self._port, self._baud = result self.status = 'ONLINE' log.info("GPS: reading from %s @ %d", self._port, self._baud) await self._read_loop() async def _read_loop(self): import time loop = asyncio.get_event_loop() try: ser = await loop.run_in_executor( None, lambda: serial.Serial(self._port, self._baud, timeout=2)) self._ser = ser except Exception as e: log.error("GPS: could not open %s — %s", self._port, e) self.status = 'ERROR' return _last_fix_ts = 0.0 # epoch of last valid fix sentence _nofix_sent = False # have we already broadcast NO FIX this gap? NO_FIX_TIMEOUT = 8.0 # seconds with no valid sentence → NO FIX try: while self._running: raw = await loop.run_in_executor(None, ser.readline) now = time.monotonic() # If no valid fix for NO_FIX_TIMEOUT seconds, broadcast once if now - _last_fix_ts > NO_FIX_TIMEOUT and not _nofix_sent: await self.broadcast({'type': 'gps', 'status': 'NO FIX', 'lat': None, 'lon': None}) _nofix_sent = True try: line = raw.decode('ascii', errors='ignore') except Exception: continue line_s = line.strip() # Only process GNSS talker sentences if not any(line_s.startswith(p) for p in ('$GP','$GN','$GL','$GA','$GB','$GI','$GQ')): continue # Only position/fix sentences — skip GSV, GSA, VTG, ZDA silently try: tag = line_s.split(',')[0].lstrip('$')[-3:] except Exception: continue if tag not in POSITION_TAGS: continue fix = parse_sentence(line_s) if fix: _last_fix_ts = now _nofix_sent = False # Merge new fix into last_fix so GGA-only fields (hdop, sats, alt) # persist across RMC/GLL sentences that don't carry them. merged = dict(self.last_fix or {}) merged.update({k: v for k, v in fix.items() if v is not None}) self.last_fix = merged await self.broadcast({ 'type': 'gps', 'status': 'FIX', 'lat': merged['lat'], 'lon': merged['lon'], 'sog': merged.get('sog', 0), 'cog': merged.get('cog', 0), 'sats': merged.get('sats'), 'hdop': merged.get('hdop'), 'alt': merged.get('alt'), 'source': fix['source'], }) # void RMC/GGA: silently ignored — NO FIX only after timeout above except Exception as e: log.error("GPS: read error — %s", e) self.status = 'DISCONNECTED' finally: try: ser.close() except Exception: pass self._ser = None await asyncio.sleep(3) # reconnect delay