310 lines
12 KiB
Python
310 lines
12 KiB
Python
"""
|
|
GPS NMEA 0183 reader — auto-detects the serial port and parses RMC / GGA / GLL position sentences from any GNSS talker.
|
|
Broadcasts position updates via the shared broadcast_callback.
|
|
"""
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import serial
|
|
import serial.tools.list_ports
|
|
from typing import Callable, Optional
|
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Baud rates tried while probing, in this order.
# 9600 comes first because it is the most common GPS default.
BAUD_RATES = [
    9600,
    4800,
    115200,
    38400,
]

# Timeout in seconds for each readline() during a probe — short enough to fail fast.
PROBE_TIMEOUT = 0.8

# Maximum number of readline() attempts before giving up on a baud rate.
PROBE_LINES = 12

# All GNSS talker prefixes: GPS, GNSS-combined, GLONASS, Galileo, BeiDou,
# NavIC, QZSS, SBAS.
NMEA_PREFIXES = (
    b'$GP',
    b'$GN',
    b'$GL',
    b'$GA',
    b'$GB',
    b'$GI',
    b'$GQ',
    b'$GJ',
)

# Sentence tags (last three characters of the sentence id) that carry
# position/fix data.
POSITION_TAGS = {'RMC', 'GGA', 'GLL'}

# Substrings of a port's hardware ID that identify USB-attached serial adapters.
# Matched against the upper-cased hwid.
USB_HWID = (
    'USB VID:PID',
    'FTDI',
    'CH340',
    'CP210',
    'PL2303',
    'ARDUINO',
    '2341:',
    '1A86:',
    '10C4:',
)

# Substrings of a port's description that mark Bluetooth virtual COM ports.
# These are skipped because they hang on open. Matched against the lower-cased
# description.
SKIP_DESC = (
    'bluetooth',
    'serie est',
)
|
|
|
|
|
|
# ── Auto-detect ───────────────────────────────────────────────────────────────
|
|
|
|
def find_gps_port(forced_port: Optional[str] = None,
                  forced_baud: Optional[int] = None) -> Optional[tuple[str, int]]:
    """
    Find a serial port that is sending NMEA data.

    Args:
        forced_port: If set, only this port is probed and the port scan is
            skipped entirely.
        forced_baud: If set together with ``forced_port``, only this baud rate
            is tried; otherwise every rate in BAUD_RATES is tried in order.
            Ignored when ``forced_port`` is not set.

    Returns:
        ``(port, baud)`` for the first probe that sees NMEA traffic, or None
        when nothing responds.
    """
    if forced_port:
        # Try the configured baud rate, or every known rate in BAUD_RATES order.
        bauds = [forced_baud] if forced_baud else BAUD_RATES
        for baud in bauds:
            result = _probe_port(forced_port, baud, PROBE_TIMEOUT)
            if result:
                return result
        log.warning("GPS: forced port %s responded on none of %s", forced_port, bauds)
        return None

    usb_ports = []
    other_ports = []
    for p in serial.tools.list_ports.comports():
        # Guard both fields the same way: a port with no description must not
        # abort the whole scan with an AttributeError.
        desc = (p.description or '').lower()
        hwid = (p.hwid or '').upper()
        if any(s in desc for s in SKIP_DESC):
            log.debug("GPS scan: skipping %s (%s) — Bluetooth", p.device, p.description)
            continue
        if any(s in hwid for s in USB_HWID):
            usb_ports.append(p.device)
        else:
            other_ports.append(p.device)

    # USB adapters are probed before any other serial port.
    candidates = usb_ports + other_ports
    log.info("GPS scan — USB: %s other: %s", usb_ports, other_ports)

    for port in candidates:
        for baud in BAUD_RATES:
            result = _probe_port(port, baud, PROBE_TIMEOUT)
            if result:
                return result
    return None
|
|
|
|
|
|
def _probe_port(port: str, baud: int, timeout: float) -> Optional[tuple[str, int]]:
    """
    Open ``port`` at ``baud`` and look for any known NMEA talker prefix.

    Up to PROBE_LINES readline() calls are made, each bounded by ``timeout``
    seconds. Everything received is accumulated, and the probe succeeds as soon
    as any entry of NMEA_PREFIXES appears in the accumulated bytes.

    Returns:
        ``(port, baud)`` on success, None when nothing matched or the port
        could not be used (the failure is logged, not raised).
    """
    try:
        with serial.Serial(port, baud, timeout=timeout) as conn:
            received = b""
            attempts = 0
            while attempts < PROBE_LINES:
                attempts += 1
                line = conn.readline()
                if line:
                    received += line
                    if any(marker in received for marker in NMEA_PREFIXES):
                        log.info("GPS detected on %s @ %d baud", port, baud)
                        return port, baud
    except serial.SerialException as exc:
        log.warning("GPS probe %s@%d: %s", port, baud, exc)
    except Exception as exc:
        log.debug("GPS probe %s@%d failed: %s", port, baud, exc)
    return None
|
|
|
|
|
|
# ── NMEA parsers ──────────────────────────────────────────────────────────────
|
|
|
|
def _nmea_checksum_ok(sentence: str) -> bool:
|
|
if '*' not in sentence:
|
|
return True # no checksum field, accept
|
|
data, cs = sentence.rsplit('*', 1)
|
|
data = data.lstrip('$')
|
|
calc = 0
|
|
for ch in data:
|
|
calc ^= ord(ch)
|
|
return calc == int(cs.strip(), 16)
|
|
|
|
|
|
def _parse_lat(val: str, hemi: str) -> Optional[float]:
|
|
if not val:
|
|
return None
|
|
deg = float(val[:2])
|
|
mins = float(val[2:])
|
|
lat = deg + mins / 60.0
|
|
return -lat if hemi == 'S' else lat
|
|
|
|
|
|
def _parse_lon(val: str, hemi: str) -> Optional[float]:
|
|
if not val:
|
|
return None
|
|
deg = float(val[:3])
|
|
mins = float(val[3:])
|
|
lon = deg + mins / 60.0
|
|
return -lon if hemi == 'W' else lon
|
|
|
|
|
|
def parse_gprmc(fields: list[str]) -> Optional[dict]:
    """
    Decode an RMC sentence that has already been split on commas.

    Layout: $GPRMC,hhmmss.ss,A,llll.ll,a,yyyyy.yy,a,x.x,x.x,ddmmyy,...

    Returns:
        A dict with ``lat``, ``lon``, ``sog`` (knots), ``cog`` and
        ``source='GPRMC'``, or None when the status field is not ``'A'``
        (A=active, V=void), a coordinate is empty, or the fields are missing
        or malformed.
    """
    try:
        if fields[2] != 'A':
            return None
        latitude = _parse_lat(fields[3], fields[4])
        longitude = _parse_lon(fields[5], fields[6])
        # Empty speed/course fields default to 0.0.
        speed_knots = float(fields[7] or 0.0)
        course = float(fields[8] or 0.0)
    except (IndexError, ValueError):
        return None

    if latitude is None or longitude is None:
        return None
    return {
        'lat': latitude,
        'lon': longitude,
        'sog': speed_knots,
        'cog': course,
        'source': 'GPRMC',
    }
|
|
|
|
|
|
def parse_gpgga(fields: list[str]) -> Optional[dict]:
    """
    Decode a GGA sentence that has already been split on commas.

    Layout: $GPGGA,hhmmss.ss,llll.ll,a,yyyyy.yy,a,q,nn,x.x,alt,...

    Returns:
        A dict with ``lat``, ``lon``, ``alt``, ``sats``, ``hdop`` and
        ``source='GPGGA'``, or None when the fix-quality field is 0 or empty,
        a coordinate is empty, or the fields are missing or malformed.
    """
    try:
        quality = int(fields[6] or 0)
        if quality == 0:
            return None
        latitude = _parse_lat(fields[2], fields[3])
        longitude = _parse_lon(fields[4], fields[5])
        # Empty optional fields fall back to 0.0 / 0 / 99.0 respectively.
        altitude = float(fields[9] or 0.0)
        satellites = int(fields[7] or 0)
        dilution = float(fields[8] or 99.0)
    except (IndexError, ValueError):
        return None

    if latitude is None or longitude is None:
        return None
    return {
        'lat': latitude,
        'lon': longitude,
        'alt': altitude,
        'sats': satellites,
        'hdop': dilution,
        'source': 'GPGGA',
    }
|
|
|
|
|
|
def parse_gpgll(fields: list[str]) -> Optional[dict]:
    """
    Decode a GLL sentence that has already been split on commas.

    Layout: $GPGLL / $GNGLL,llll.ll,a,yyyyy.yy,a,hhmmss,A/V,...

    The status is taken from field 6 when present, else from field 5, else it
    is treated as ``'V'``. Only a status of ``'A'`` or an empty status is
    accepted.

    Returns:
        A dict with ``lat``, ``lon``, ``sog=0.0``, ``cog=0.0`` and
        ``source='GLL'``, or None when the status is rejected, a coordinate is
        empty, or the fields are missing or malformed.
    """
    try:
        latitude = _parse_lat(fields[1], fields[2])
        longitude = _parse_lon(fields[3], fields[4])
        count = len(fields)
        if count > 6:
            status = fields[6]
        elif count > 5:
            status = fields[5]
        else:
            status = 'V'
    except (IndexError, ValueError):
        return None

    if status.strip().upper() not in ('A', ''):
        return None
    if latitude is None or longitude is None:
        return None
    return {
        'lat': latitude,
        'lon': longitude,
        'sog': 0.0,
        'cog': 0.0,
        'source': 'GLL',
    }
|
|
|
|
|
|
def parse_sentence(raw: str) -> Optional[dict]:
    """
    Parse one raw NMEA line into a position dict.

    The line must start with ``$`` and pass the checksum check. The sentence
    type is the last three characters of the first comma-separated field
    (RMC, GGA, GLL, ...), so any talker prefix is accepted.

    Returns:
        The dict produced by the matching RMC / GGA / GLL parser, or None for
        any other sentence type, an invalid line, or a sentence without a
        usable fix.
    """
    raw = raw.strip()
    if not raw.startswith('$') or not _nmea_checksum_ok(raw):
        return None

    # Drop the checksum suffix and the leading '$', then split into fields.
    payload, _, _ = raw.partition('*')
    fields = payload.lstrip('$').split(',')

    handlers = {
        'RMC': parse_gprmc,
        'GGA': parse_gpgga,
        'GLL': parse_gpgll,
    }
    handler = handlers.get(fields[0][-3:])
    if handler is None:
        return None
    return handler(fields)
|
|
|
|
|
|
# ── Async reader ──────────────────────────────────────────────────────────────
|
|
|
|
class GPSReader:
    """
    Background NMEA reader.

    Locates a GPS serial port (or uses a configured one), reads position
    sentences from it and pushes updates through an async broadcast callback.
    Broadcast messages are dicts with ``type='gps'`` and a ``status`` of
    ``'FIX'`` or ``'NO FIX'``.

    Attributes:
        broadcast: Awaitable callback invoked with each update dict.
        last_fix: Merged fields of the most recent fixes, or None before the
            first fix.
        status: Connection state: 'SEARCHING', 'NOT FOUND', 'ONLINE',
            'ERROR' or 'DISCONNECTED'.
    """

    def __init__(self, broadcast_cb: Callable, forced_port: Optional[str] = None,
                 forced_baud: Optional[int] = None):
        """
        Args:
            broadcast_cb: Async callable that receives each update dict.
            forced_port: Probe only this port instead of scanning.
            forced_baud: Baud rate to use with ``forced_port``.
        """
        self.broadcast = broadcast_cb
        self._port: Optional[str] = None
        self._baud: int = 9600
        self._forced_port = forced_port
        self._forced_baud = forced_baud
        self._running = False
        self._ser: Optional[serial.Serial] = None
        # Strong reference to the background task; the event loop itself only
        # keeps weak references to tasks.
        self._task: Optional[asyncio.Task] = None
        self.last_fix: Optional[dict] = None
        self.status: str = 'SEARCHING'

    def reconnect(self):
        """Force the read loop to drop the current serial and re-probe."""
        if self._ser is not None:
            try:
                # Closing the port makes the pending readline() fail, which
                # sends the read loop through its reconnect path.
                self._ser.close()
            except Exception as e:
                # Best effort: a failed close must not break the caller.
                log.debug("GPS: error closing serial during reconnect — %s", e)

    async def start(self):
        """
        Start the background scan/read task.

        Calling this again while the task is still running is a no-op, so two
        readers never compete for the same serial port.
        """
        if self._task is not None and not self._task.done():
            return
        self._running = True
        # Keep the task referenced so it cannot be garbage-collected mid-run.
        self._task = asyncio.create_task(self._run())

    async def _run(self):
        """Probe for a GPS port, read from it, and repeat for as long as running."""
        loop = asyncio.get_running_loop()
        while self._running:
            if self._forced_port:
                log.info("GPS: using configured port %s", self._forced_port)
            else:
                log.info("GPS: scanning ports...")
            self.status = 'SEARCHING'
            # Probing blocks on serial reads, so it runs in the default executor.
            result = await loop.run_in_executor(
                None, find_gps_port, self._forced_port, self._forced_baud)
            if not result:
                msg = f"GPS: port {self._forced_port} not responding" if self._forced_port else "GPS: no NMEA device found"
                log.warning("%s, retrying in 10s", msg)
                self.status = 'NOT FOUND'
                await asyncio.sleep(10)
                continue

            self._port, self._baud = result
            self.status = 'ONLINE'
            log.info("GPS: reading from %s @ %d", self._port, self._baud)
            await self._read_loop()

    async def _read_loop(self):
        """
        Read sentences from the detected port until an error or shutdown.

        Broadcasts a 'FIX' message for every valid position sentence and a
        single 'NO FIX' message whenever no valid fix has been seen for
        NO_FIX_TIMEOUT seconds (including before the first fix). The port is
        always closed on exit, followed by a 3 second reconnect delay.
        """
        import time
        loop = asyncio.get_running_loop()
        try:
            ser = await loop.run_in_executor(
                None, lambda: serial.Serial(self._port, self._baud, timeout=2))
            self._ser = ser
        except Exception as e:
            log.error("GPS: could not open %s — %s", self._port, e)
            self.status = 'ERROR'
            return

        # Accept exactly the talkers the port probe accepts, as str prefixes.
        talkers = tuple(p.decode('ascii') for p in NMEA_PREFIXES)

        # Monotonic time of the last valid fix. None means "no fix yet":
        # time.monotonic() has an undefined reference point, so it must not be
        # compared against a literal 0.0.
        last_fix_ts: Optional[float] = None
        nofix_sent = False      # have we already broadcast NO FIX this gap?
        NO_FIX_TIMEOUT = 8.0    # seconds with no valid sentence → NO FIX

        try:
            while self._running:
                raw = await loop.run_in_executor(None, ser.readline)
                now = time.monotonic()

                # No valid fix yet, or none for NO_FIX_TIMEOUT seconds:
                # broadcast once per gap.
                stale = last_fix_ts is None or now - last_fix_ts > NO_FIX_TIMEOUT
                if stale and not nofix_sent:
                    await self.broadcast({'type': 'gps', 'status': 'NO FIX',
                                          'lat': None, 'lon': None})
                    nofix_sent = True

                # errors='ignore' drops undecodable bytes instead of raising.
                line_s = raw.decode('ascii', errors='ignore').strip()
                # Only process GNSS talker sentences
                if not line_s.startswith(talkers):
                    continue
                # Only position/fix sentences — skip GSV, GSA, VTG, ZDA silently
                tag = line_s.split(',')[0].lstrip('$')[-3:]
                if tag not in POSITION_TAGS:
                    continue

                fix = parse_sentence(line_s)
                if not fix:
                    # void RMC/GGA: silently ignored — NO FIX only after timeout above
                    continue

                last_fix_ts = now
                nofix_sent = False
                # Merge new fix into last_fix so GGA-only fields (hdop, sats, alt)
                # persist across RMC/GLL sentences that don't carry them.
                merged = dict(self.last_fix or {})
                merged.update({k: v for k, v in fix.items() if v is not None})
                self.last_fix = merged
                await self.broadcast({
                    'type': 'gps',
                    'status': 'FIX',
                    'lat': merged['lat'],
                    'lon': merged['lon'],
                    'sog': merged.get('sog', 0),
                    'cog': merged.get('cog', 0),
                    'sats': merged.get('sats'),
                    'hdop': merged.get('hdop'),
                    'alt': merged.get('alt'),
                    'source': fix['source'],
                })
        except Exception as e:
            log.error("GPS: read error — %s", e)
            self.status = 'DISCONNECTED'
        finally:
            try:
                ser.close()
            except Exception as e:
                log.debug("GPS: error closing serial — %s", e)
            self._ser = None
            await asyncio.sleep(3)  # reconnect delay
|