Files

310 lines
12 KiB
Python

"""
GPS NMEA 0183 reader — auto-detects the serial port and parses RMC / GGA / GLL
position sentences from any recognised GNSS talker ($GP, $GN, $GL, ...).
Broadcasts position updates via the shared broadcast_callback.
"""
import asyncio
import logging
import os
import serial
import serial.tools.list_ports
from typing import Callable, Optional
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Baud rates tried, in this order, when probing a port.
BAUD_RATES = [9600, 4800, 115200, 38400] # 9600 first — most GPS default
# Passed as the serial read timeout while probing.
PROBE_TIMEOUT = 0.8 # seconds per readline() — short enough to fail fast
# Empty reads count toward this limit, so a silent port costs up to
# PROBE_LINES * PROBE_TIMEOUT seconds per baud rate.
PROBE_LINES = 12 # max lines to read before giving up on a baud rate
# All GNSS talker prefixes: GPS, GNSS-combined, GLONASS, Galileo, BeiDou, NavIC, QZSS, SBAS
# Kept as bytes because the probe searches the raw serial buffer for them.
NMEA_PREFIXES = (b'$GP', b'$GN', b'$GL', b'$GA', b'$GB', b'$GI', b'$GQ', b'$GJ')
# Sentence tags that carry position/fix data
# (compared against the last three characters of the sentence identifier).
POSITION_TAGS = {'RMC', 'GGA', 'GLL'}
# Hardware ID patterns that identify USB-attached serial adapters
# (matched as substrings of the upper-cased hwid; matching ports are probed first).
USB_HWID = ('USB VID:PID', 'FTDI', 'CH340', 'CP210', 'PL2303', 'ARDUINO', '2341:', '1A86:', '10C4:')
# Skip Bluetooth virtual COM ports — they hang on open
# (matched as substrings of the lower-cased port description).
# NOTE(review): 'serie est' presumably targets a localized Bluetooth port name — confirm.
SKIP_DESC = ('bluetooth', 'serie est')
# ── Auto-detect ───────────────────────────────────────────────────────────────
def find_gps_port(forced_port: Optional[str] = None,
                  forced_baud: Optional[int] = None) -> Optional[tuple[str, int]]:
    """
    Find a serial port that is sending NMEA data.

    If forced_port is set, only that port is probed (no scan). When
    forced_baud is also set, only that baud rate is tried; otherwise every
    rate in BAUD_RATES is tried in order.

    Without forced_port, every port reported by pyserial is probed at each
    rate in BAUD_RATES. Ports whose hardware ID matches USB_HWID are probed
    first; ports whose description matches SKIP_DESC are never opened.

    Returns (port, baud) for the first successful probe, or None.
    """
    if forced_port:
        # Only the configured port is probed. BAUD_RATES is ordered by
        # likelihood (9600 first), not by speed.
        bauds = [forced_baud] if forced_baud else BAUD_RATES
        for baud in bauds:
            result = _probe_port(forced_port, baud, PROBE_TIMEOUT)
            if result:
                return result
        log.warning("GPS: forced port %s responded on none of %s", forced_port, bauds)
        return None

    usb_ports = []
    other_ports = []
    for p in serial.tools.list_ports.comports():
        # Both fields are guarded against None: the original guarded only
        # hwid, so a port without a description crashed the whole scan.
        desc = (p.description or '').lower()
        hwid = (p.hwid or '').upper()
        if any(s in desc for s in SKIP_DESC):
            log.debug("GPS scan: skipping %s (%s) — Bluetooth", p.device, p.description)
            continue
        if any(s in hwid for s in USB_HWID):
            usb_ports.append(p.device)
        else:
            other_ports.append(p.device)

    # USB adapters first, then everything else.
    candidates = usb_ports + other_ports
    log.info("GPS scan — USB: %s other: %s", usb_ports, other_ports)
    for port in candidates:
        for baud in BAUD_RATES:
            result = _probe_port(port, baud, PROBE_TIMEOUT)
            if result:
                return result
    return None
def _probe_port(port: str, baud: int, timeout: float) -> Optional[tuple[str, int]]:
    """
    Open `port` at `baud` and look for an NMEA talker prefix in what arrives.

    Performs at most PROBE_LINES readline() calls (empty reads count too),
    accumulating the received bytes and checking the accumulated buffer for
    any entry of NMEA_PREFIXES after each non-empty read.

    Returns (port, baud) on a match, otherwise None. Serial errors are logged
    at warning level, any other exception at debug level; none propagate.
    """
    try:
        with serial.Serial(port, baud, timeout=timeout) as link:
            received = b""
            attempts = 0
            while attempts < PROBE_LINES:
                attempts += 1
                data = link.readline()
                if data:
                    received += data
                    if any(marker in received for marker in NMEA_PREFIXES):
                        log.info("GPS detected on %s @ %d baud", port, baud)
                        return port, baud
    except serial.SerialException as exc:
        log.warning("GPS probe %s@%d: %s", port, baud, exc)
    except Exception as exc:
        log.debug("GPS probe %s@%d failed: %s", port, baud, exc)
    return None
# ── NMEA parsers ──────────────────────────────────────────────────────────────
def _nmea_checksum_ok(sentence: str) -> bool:
    """
    Validate the XOR checksum of an NMEA sentence.

    The checksum is the XOR of every character between the leading '$' and
    the last '*', compared with the hexadecimal value that follows the '*'.

    Returns True when the checksum matches or when the sentence has no '*'
    (no checksum field, accepted as-is). Returns False on a mismatch or when
    the checksum field is not valid hexadecimal.
    """
    if '*' not in sentence:
        return True  # no checksum field, accept
    data, cs = sentence.rsplit('*', 1)
    try:
        expected = int(cs.strip(), 16)
    except ValueError:
        # Line noise can corrupt the checksum field itself. Treat that as a
        # bad sentence instead of letting the error escape to the caller.
        return False
    calc = 0
    for ch in data.lstrip('$'):
        calc ^= ord(ch)
    return calc == expected
def _parse_lat(val: str, hemi: str) -> Optional[float]:
    """
    Convert an NMEA latitude field (ddmm.mmmm) to signed decimal degrees.

    The first two characters are whole degrees, the remainder is minutes.
    The result is negated when `hemi` is 'S'. Returns None for an empty
    field; raises ValueError when the field is not numeric.
    """
    if val:
        degrees, minutes = float(val[:2]), float(val[2:])
        decimal = degrees + minutes / 60.0
        return decimal if hemi != 'S' else -decimal
    return None
def _parse_lon(val: str, hemi: str) -> Optional[float]:
    """
    Convert an NMEA longitude field (dddmm.mmmm) to signed decimal degrees.

    The first three characters are whole degrees, the remainder is minutes.
    The result is negated when `hemi` is 'W'. Returns None for an empty
    field; raises ValueError when the field is not numeric.
    """
    if val:
        degrees, minutes = float(val[:3]), float(val[3:])
        decimal = degrees + minutes / 60.0
        return decimal if hemi != 'W' else -decimal
    return None
def parse_gprmc(fields: list[str]) -> Optional[dict]:
    """
    Parse the comma-split fields of an RMC sentence.

    Layout: $GPRMC,hhmmss.ss,A,llll.ll,a,yyyyy.yy,a,x.x,x.x,ddmmyy,...

    Returns a dict with 'lat', 'lon', 'sog', 'cog' and 'source', or None
    when the status field is not 'A' (A=active, V=void), a coordinate is
    empty, or the fields are missing or non-numeric. Empty speed and course
    fields default to 0.0.
    """
    try:
        status = fields[2]
        if status != 'A':
            return None
        latitude = _parse_lat(fields[3], fields[4])
        longitude = _parse_lon(fields[5], fields[6])
        speed_raw, course_raw = fields[7], fields[8]
        speed = float(speed_raw) if speed_raw else 0.0  # knots
        course = float(course_raw) if course_raw else 0.0
    except (IndexError, ValueError):
        return None
    if latitude is None or longitude is None:
        return None
    return {'lat': latitude, 'lon': longitude, 'sog': speed, 'cog': course, 'source': 'GPRMC'}
def parse_gpgga(fields: list[str]) -> Optional[dict]:
    """
    Parse the comma-split fields of a GGA sentence.

    Layout: $GPGGA,hhmmss.ss,llll.ll,a,yyyyy.yy,a,q,nn,x.x,alt,...

    Returns a dict with 'lat', 'lon', 'alt', 'sats', 'hdop' and 'source', or
    None when the fix-quality field is 0 or empty, a coordinate is empty, or
    the fields are missing or non-numeric. Empty altitude, satellite-count
    and HDOP fields default to 0.0, 0 and 99.0 respectively.
    """
    try:
        quality = int(fields[6]) if fields[6] else 0
        if not quality:
            return None
        latitude = _parse_lat(fields[2], fields[3])
        longitude = _parse_lon(fields[4], fields[5])
        altitude = float(fields[9]) if fields[9] else 0.0
        satellites = int(fields[7]) if fields[7] else 0
        dilution = float(fields[8]) if fields[8] else 99.0
    except (IndexError, ValueError):
        return None
    if latitude is None or longitude is None:
        return None
    return {
        'lat': latitude,
        'lon': longitude,
        'alt': altitude,
        'sats': satellites,
        'hdop': dilution,
        'source': 'GPGGA',
    }
def parse_gpgll(fields: list[str]) -> Optional[dict]:
    """
    Parse the comma-split fields of a GLL sentence.

    Layout: $GPGLL / $GNGLL,llll.ll,a,yyyyy.yy,a,hhmmss,A/V,...

    The status is read from field 6 when present, else from field 5, else it
    is taken to be 'V'. Only 'A' or an empty status is accepted.
    Returns a dict with 'lat', 'lon', 'sog', 'cog' and 'source' (speed and
    course are always 0.0), or None when the status is rejected, a
    coordinate is empty, or the fields are missing or non-numeric.
    """
    try:
        latitude = _parse_lat(fields[1], fields[2])
        longitude = _parse_lon(fields[3], fields[4])
    except (IndexError, ValueError):
        return None

    # NOTE(review): per the layout above, field 5 is the time, not the
    # status, so the field-5 fallback looks suspicious — confirm intent.
    if len(fields) > 6:
        status = fields[6]
    elif len(fields) > 5:
        status = fields[5]
    else:
        status = 'V'

    if status.strip().upper() not in ('A', ''):
        return None
    if latitude is None or longitude is None:
        return None
    return {'lat': latitude, 'lon': longitude, 'sog': 0.0, 'cog': 0.0, 'source': 'GLL'}
def parse_sentence(raw: str) -> Optional[dict]:
    """
    Parse one raw NMEA line into a position dict.

    The line is stripped, must start with '$' and must pass the checksum
    check. The sentence type is the last three characters of the first
    field (RMC, GGA, GLL, etc.), so the talker prefix does not matter.

    Returns the dict produced by the matching parser, or None when the line
    is rejected, the type is unsupported, or the parser itself returns None.
    """
    text = raw.strip()
    if not text.startswith('$') or not _nmea_checksum_ok(text):
        return None

    body = text.split('*')[0].lstrip('$')
    parts = body.split(',')

    # Sentence type -> parser.
    handlers = {
        'RMC': parse_gprmc,
        'GGA': parse_gpgga,
        'GLL': parse_gpgll,
    }
    handler = handlers.get(parts[0][-3:])
    return handler(parts) if handler is not None else None
# ── Async reader ──────────────────────────────────────────────────────────────
class GPSReader:
    """
    Asynchronous NMEA reader.

    Locates a GPS serial port (or uses the configured one), reads it line by
    line in the default executor, and awaits ``broadcast_cb`` with a dict for
    every valid fix, plus one 'NO FIX' message per gap in valid fixes.

    Public state:
        last_fix -- merged dict of the most recent fix fields, or None.
        status   -- 'SEARCHING', 'NOT FOUND', 'ONLINE', 'ERROR' or
                    'DISCONNECTED'.
    """

    def __init__(self, broadcast_cb: Callable, forced_port: Optional[str] = None,
                 forced_baud: Optional[int] = None):
        """
        Args:
            broadcast_cb: awaitable callable invoked with each message dict.
            forced_port: probe only this port instead of scanning.
            forced_baud: with forced_port, probe only this baud rate.
        """
        self.broadcast = broadcast_cb
        self._port: Optional[str] = None
        self._baud: int = 9600
        self._forced_port = forced_port
        self._forced_baud = forced_baud
        self._running = False
        self._ser: Optional[serial.Serial] = None
        # Strong reference to the background task; see start().
        self._task: Optional[asyncio.Task] = None
        self.last_fix: Optional[dict] = None
        self.status: str = 'SEARCHING'

    def reconnect(self):
        """Force the read loop to drop the current serial and re-probe."""
        ser = self._ser
        if ser is None:
            return
        try:
            # Closing the port makes the pending readline() fail, which
            # sends the read loop down its error path and back to probing.
            ser.close()
        except Exception as e:
            # Best effort: the port may already be closed or gone.
            log.debug("GPS: error closing serial during reconnect: %s", e)

    async def start(self):
        """Start the background reader task (no-op if it is already running)."""
        if self._task is not None and not self._task.done():
            return
        self._running = True
        # Keep a reference: the event loop holds tasks only weakly, so an
        # unreferenced task can be garbage-collected while still running.
        self._task = asyncio.create_task(self._run())

    async def _run(self):
        """Probe for a port, read from it until it fails, then repeat."""
        loop = asyncio.get_running_loop()
        while self._running:
            if self._forced_port:
                log.info("GPS: using configured port %s", self._forced_port)
            else:
                log.info("GPS: scanning ports...")
            self.status = 'SEARCHING'
            # Probing blocks on serial I/O, so it runs in the executor.
            result = await loop.run_in_executor(
                None, find_gps_port, self._forced_port, self._forced_baud)
            if not result:
                msg = f"GPS: port {self._forced_port} not responding" if self._forced_port else "GPS: no NMEA device found"
                log.warning("%s, retrying in 10s", msg)
                self.status = 'NOT FOUND'
                await asyncio.sleep(10)
                continue
            self._port, self._baud = result
            self.status = 'ONLINE'
            log.info("GPS: reading from %s @ %d", self._port, self._baud)
            await self._read_loop()

    async def _read_loop(self):
        """
        Read NMEA lines from the detected port until an error occurs or the
        reader is stopped, broadcasting a message for every valid fix.

        Always closes the port and waits 3 seconds before returning, so the
        caller can re-probe without spinning.
        """
        import time
        loop = asyncio.get_running_loop()
        try:
            ser = await loop.run_in_executor(
                None, lambda: serial.Serial(self._port, self._baud, timeout=2))
            self._ser = ser
        except Exception as e:
            log.error("GPS: could not open %s — %s", self._port, e)
            self.status = 'ERROR'
            # Same delay as the normal exit path, so a port that probes fine
            # but refuses to open does not cause a tight retry loop.
            await asyncio.sleep(3)
            return

        # Same talker set as the port probe, as str for matching decoded lines.
        talkers = tuple(p.decode('ascii') for p in NMEA_PREFIXES)
        # NOTE(review): time.monotonic() has an arbitrary epoch, so starting
        # at 0.0 normally triggers the NO FIX broadcast on the first read.
        # Confirm that this is intended.
        last_fix_ts = 0.0     # monotonic time of the last valid fix sentence
        nofix_sent = False    # have we already broadcast NO FIX this gap?
        NO_FIX_TIMEOUT = 8.0  # seconds with no valid sentence → NO FIX
        try:
            while self._running:
                # readline() returns b'' on timeout, so the NO FIX check
                # below still runs while the device is silent.
                raw = await loop.run_in_executor(None, ser.readline)
                now = time.monotonic()
                # If no valid fix for NO_FIX_TIMEOUT seconds, broadcast once
                if now - last_fix_ts > NO_FIX_TIMEOUT and not nofix_sent:
                    await self.broadcast({'type': 'gps', 'status': 'NO FIX',
                                          'lat': None, 'lon': None})
                    nofix_sent = True
                line_s = raw.decode('ascii', errors='ignore').strip()
                # Only process GNSS talker sentences
                if not line_s.startswith(talkers):
                    continue
                # Only position/fix sentences — skip GSV, GSA, VTG, ZDA silently
                tag = line_s.split(',')[0].lstrip('$')[-3:]
                if tag not in POSITION_TAGS:
                    continue
                fix = parse_sentence(line_s)
                if not fix:
                    # void RMC/GGA: silently ignored — NO FIX only after timeout above
                    continue
                last_fix_ts = now
                nofix_sent = False
                # Merge new fix into last_fix so GGA-only fields (hdop, sats, alt)
                # persist across RMC/GLL sentences that don't carry them.
                merged = dict(self.last_fix or {})
                merged.update({k: v for k, v in fix.items() if v is not None})
                self.last_fix = merged
                await self.broadcast({
                    'type': 'gps',
                    'status': 'FIX',
                    'lat': merged['lat'],
                    'lon': merged['lon'],
                    'sog': merged.get('sog', 0),
                    'cog': merged.get('cog', 0),
                    'sats': merged.get('sats'),
                    'hdop': merged.get('hdop'),
                    'alt': merged.get('alt'),
                    'source': fix['source'],
                })
        except Exception as e:
            log.error("GPS: read error — %s", e)
            self.status = 'DISCONNECTED'
        finally:
            try:
                ser.close()
            except Exception as e:
                log.debug("GPS: error closing serial: %s", e)
            self._ser = None
        await asyncio.sleep(3)  # reconnect delay