"""Python ↔ JavaScript bridge for GPS Navigator (PyQt5 standalone).

Exposed to JS as window.py via QWebChannel.
"""
import json
import logging
import os
import threading
from pathlib import Path

from PyQt5.QtCore import QObject, pyqtSlot, pyqtSignal

logger = logging.getLogger(__name__)


class GPSBridge(QObject):
    """QObject whose slots are callable from JavaScript through QWebChannel.

    Every slot that returns data returns it as a JSON string.  GPS messages
    coming from the NMEA reader are pushed to JS through ``gpsMessage``.
    ``backend`` modules are imported inside the methods that use them, as in
    the original code.
    """

    # Signal: emitted from NMEA reader thread → received in JS via .connect()
    # Qt automatically queues cross-thread signal delivery — no asyncio needed.
    gpsMessage = pyqtSignal(str)

    # Allowed data types for get_cell_data — prevents path traversal via data_type
    _ALLOWED_DATA_TYPES = frozenset(
        {"features", "land", "depths", "hazards", "zones"}
    )

    # Serialized empty GeoJSON collection returned by get_cell_data on any
    # rejected request or failure.
    _EMPTY_COLLECTION = json.dumps({'type': 'FeatureCollection', 'features': []})

    def __init__(self, db_path: Path, parent=None):
        """Create the bridge.

        Args:
            db_path: database location handed to every ``backend.database``
                call.
            parent: optional Qt parent object.
        """
        super().__init__(parent)
        self._db_path = db_path
        self._reader = None
        self._last_fix: dict = {}
        # The track is logged once every N *valid position messages* (see
        # _on_nmea).  NOTE(review): the variable name says seconds; that only
        # holds if the receiver delivers one fix per second — confirm.
        self._track_interval = int(os.getenv("TRACK_INTERVAL_SEC", 5))
        self._track_counter = 0

    # ── Called by NMEAReader background thread ────────────────────────────────

    def _on_nmea(self, msg: dict) -> None:
        """Handle one parsed NMEA message and forward it to JS.

        Position messages with a positive ``fix_quality`` are merged into the
        last known fix, and every ``_track_interval``-th such message is
        written to the track log.  Every message, valid fix or not, is
        emitted to JS as JSON.
        """
        # `or 0` also covers an explicit None fix_quality, which would
        # otherwise raise TypeError on the comparison.
        if msg.get("type") == "position" and (msg.get("fix_quality") or 0) > 0:
            self._last_fix.update(msg)
            self._track_counter += 1
            if self._track_counter >= self._track_interval:
                self._track_counter = 0
                from backend.database import log_position
                log_position(self._db_path, self._last_fix)
        # Qt queues the signal delivery to the main thread — thread-safe.
        self.gpsMessage.emit(json.dumps(msg))

    # ── Lifecycle ─────────────────────────────────────────────────────────────

    @pyqtSlot()
    def autodetect_and_start(self) -> None:
        """Called by JS once QWebChannel is ready — ensures signal handler
        is connected before any GPS messages are emitted.

        Uses the ``GPS_PORT`` environment variable when set, otherwise
        ``NMEAReader.autodetect()``.  When no port is found a ``no_port``
        message is emitted instead of starting a reader.
        """
        from backend.nmea_reader import NMEAReader
        if self._reader and self._reader.is_alive():
            return  # already running
        port = os.getenv("GPS_PORT", "") or NMEAReader.autodetect()
        baud = int(os.getenv("GPS_BAUD", 9600))
        if port:
            self._reader = NMEAReader(port, baud, self._on_nmea)
            self._reader.start()
        else:
            # Emit a status so JS knows autodetect found nothing
            self.gpsMessage.emit(
                json.dumps({"type": "no_port", "msg": "No GPS port detected"})
            )

    def shutdown(self) -> None:
        """Stop the reader, if any (same effect as ``disconnect_gps``)."""
        self.disconnect_gps()

    # ── GPS port management ───────────────────────────────────────────────────

    @pyqtSlot(result=str)
    def list_ports(self) -> str:
        """Return ``NMEAReader.list_ports()`` serialized as JSON."""
        from backend.nmea_reader import NMEAReader
        return json.dumps(NMEAReader.list_ports())

    @pyqtSlot(str, int)
    def connect_gps(self, port: str, baud: int) -> None:
        """Start reading from ``port`` at ``baud``, replacing a live reader.

        If a reader is running it is stopped first, and the new one is
        started from a helper thread once the old one has been joined (or
        after 1.5 s).
        """
        from backend.nmea_reader import NMEAReader
        if self._reader and self._reader.is_alive():
            old = self._reader
            old.stop()

            # Join in background thread so we don't block the event loop
            # NOTE(review): a disconnect_gps() issued while _restart is still
            # waiting is overridden when _restart assigns the new reader —
            # confirm whether that matters.
            def _restart():
                old.join(timeout=1.5)
                r = NMEAReader(port, baud, self._on_nmea)
                self._reader = r
                r.start()

            threading.Thread(target=_restart, daemon=True).start()
        else:
            self._reader = NMEAReader(port, baud, self._on_nmea)
            self._reader.start()

    @pyqtSlot()
    def disconnect_gps(self) -> None:
        """Stop the current reader, if any, and forget it."""
        if self._reader:
            self._reader.stop()
            self._reader = None

    @pyqtSlot(result=str)
    def get_status(self) -> str:
        """Return JSON with ``connected``, ``port`` and the last ``fix``."""
        return json.dumps({
            "connected": self._reader is not None and self._reader.is_alive(),
            "port": self._reader.port if self._reader else None,
            "fix": self._last_fix,
        })

    # ── Waypoints ─────────────────────────────────────────────────────────────

    @pyqtSlot(result=str)
    def get_waypoints(self) -> str:
        """Return the stored waypoints as JSON."""
        from backend.database import get_waypoints
        return json.dumps(get_waypoints(self._db_path))

    @pyqtSlot(str, result=str)
    def save_waypoint(self, data_json: str) -> str:
        """Decode ``data_json``, save it and return the backend result as JSON."""
        from backend.database import save_waypoint
        return json.dumps(save_waypoint(self._db_path, json.loads(data_json)))

    @pyqtSlot(str)
    def delete_waypoint(self, wid: str) -> None:
        """Delete the waypoint identified by ``wid``."""
        from backend.database import delete_waypoint
        delete_waypoint(self._db_path, wid)

    # ── Routes ────────────────────────────────────────────────────────────────

    @pyqtSlot(result=str)
    def get_routes(self) -> str:
        """Return the stored routes as JSON."""
        from backend.database import get_routes
        return json.dumps(get_routes(self._db_path))

    @pyqtSlot(str, result=str)
    def save_route(self, data_json: str) -> str:
        """Decode ``data_json``, save it and return the backend result as JSON."""
        from backend.database import save_route
        return json.dumps(save_route(self._db_path, json.loads(data_json)))

    @pyqtSlot(str)
    def delete_route(self, rid: str) -> None:
        """Delete the route identified by ``rid``."""
        from backend.database import delete_route
        delete_route(self._db_path, rid)

    # ── Track log ─────────────────────────────────────────────────────────────

    @pyqtSlot(int, result=str)
    def get_track(self, limit: int) -> str:
        """Return ``get_track(db_path, limit)`` from the backend as JSON."""
        from backend.database import get_track
        return json.dumps(get_track(self._db_path, limit))

    @pyqtSlot()
    def clear_track(self) -> None:
        """Clear the track log."""
        from backend.database import clear_track
        clear_track(self._db_path)

    # ── Charts ────────────────────────────────────────────────────────────────

    @pyqtSlot(result=str)
    def get_chart_cells(self) -> str:
        """Return ``chart_manager.list_cells()`` as JSON."""
        from backend.chart_manager import list_cells
        return json.dumps(list_cells())

    @pyqtSlot(str, str, result=str)
    def get_cell_data(self, cell_id: str, data_type: str) -> str:
        """Read the GeoJSON of one specific cell
        (features/land/depths/hazards/zones).

        - For depths: SOUNDG features are filtered out (individual sounding
          points, not useful at overview zoom).
        - If the result exceeds ~600KB: coordinates are decimated with
          stride=3 (and then stride=5 if still above ~700KB) to keep each
          QWebChannel message within safe limits.

        Same as ECDIS: one slot per cell/type — never aggregates all cells.

        IMPORTANT: always returns valid JSON even on failure — if the slot
        raises an exception, PyQt5 may not call the JS callback and the
        Promise stays pending.
        """
        try:
            from backend.chart_manager import CHARTS_DIR

            # Guard against path traversal: cell_id and data_type must not
            # contain path separators or parent-directory references.
            if (not cell_id
                    or any(c in cell_id for c in ("/", "\\", "..", ":"))
                    or data_type not in self._ALLOWED_DATA_TYPES):
                return self._EMPTY_COLLECTION

            path = CHARTS_DIR / cell_id / f'{data_type}.geojson'
            # Ensure the resolved path stays inside CHARTS_DIR (defense in depth)
            try:
                path.resolve().relative_to(CHARTS_DIR.resolve())
            except ValueError:
                return self._EMPTY_COLLECTION
            if not path.exists():
                return self._EMPTY_COLLECTION

            with open(path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            # `or []` / `or {}`: GeoJSON permits null members; one such
            # feature must not blank the whole cell through the except below.
            feats = data.get('features') or []

            # Filter out SOUNDG (sounding points — too many for overview zoom)
            if data_type == 'depths':
                feats = [
                    ft for ft in feats
                    if (ft.get('properties') or {}).get('layer') != 'SOUNDG'
                ]

            # Progressive decimation to keep each QWebChannel message < ~700 KB.
            # The second pass works on the already-decimated features.
            raw_json = json.dumps({'type': 'FeatureCollection', 'features': feats})
            for limit, stride in ((600_000, 3), (700_000, 5)):
                if len(raw_json) <= limit:
                    break
                feats = [self._decimate_feature(ft, stride=stride) for ft in feats]
                raw_json = json.dumps(
                    {'type': 'FeatureCollection', 'features': feats})
            return raw_json
        except Exception as e:
            logger.error("get_cell_data %s/%s: %s", cell_id, data_type, e)
            return self._EMPTY_COLLECTION

    @staticmethod
    def _decimate_feature(feat: dict, stride: int) -> dict:
        """Reduce vertex density of polygons/lines to shrink the JSON.

        Args:
            feat: a GeoJSON feature; it is not modified.
            stride: keep every ``stride``-th vertex of each ring/line.

        Returns:
            A new feature dict with ``type``, ``geometry`` and ``properties``.
            Polygon, MultiPolygon, LineString and MultiLineString coordinates
            are thinned; any other geometry (including a null geometry) is
            passed through as-is.  Rings/lines of 6 or fewer vertices are
            left untouched, the last vertex is always kept, and a ring that
            would fall below 4 positions (or a line below 2) keeps its
            original vertices.
        """
        def thin(points, min_points):
            if len(points) <= 6:
                return points
            kept = points[::stride]
            # Keep the closing vertex of a ring / the end point of a line.
            if kept[-1] != points[-1]:
                kept.append(points[-1])
            return kept if len(kept) >= min_points else points

        geometry = feat.get('geometry', {})
        if isinstance(geometry, dict):
            # Shallow copy is enough: thinning builds new lists by slicing
            # and never mutates the input coordinates.
            geometry = dict(geometry)
            gtype = geometry.get('type', '')
            coords = geometry.get('coordinates', [])
            if gtype == 'Polygon':
                geometry['coordinates'] = [thin(ring, 4) for ring in coords]
            elif gtype == 'MultiPolygon':
                geometry['coordinates'] = [
                    [thin(ring, 4) for ring in poly] for poly in coords
                ]
            elif gtype == 'LineString':
                geometry['coordinates'] = thin(coords, 2)
            elif gtype == 'MultiLineString':
                geometry['coordinates'] = [thin(line, 2) for line in coords]

        return {
            'type': feat.get('type', 'Feature'),
            'geometry': geometry,
            'properties': feat.get('properties', {}),
        }

    @pyqtSlot(result=str)
    def get_chart_zones(self) -> str:
        """Return ``chart_manager.get_all_zones()`` as JSON."""
        from backend.chart_manager import get_all_zones
        return json.dumps(get_all_zones())

    @pyqtSlot(result=str)
    def get_chart_land(self) -> str:
        """Return ``chart_manager.get_all_land()`` as JSON."""
        from backend.chart_manager import get_all_land
        return json.dumps(get_all_land())

    @pyqtSlot(str)
    def delete_chart(self, cell_id: str) -> None:
        """Delete the chart cell ``cell_id``."""
        from backend.chart_manager import delete_cell
        delete_cell(cell_id)

    @pyqtSlot(str, str)
    def set_chart_region(self, cell_id: str, region: str) -> None:
        """Store ``region`` (upper-cased) in the metadata of ``cell_id``."""
        from backend.chart_manager import set_meta
        set_meta(cell_id, region=region.upper())

    @pyqtSlot(str, result=str)
    def scan_charts_path(self, path: str) -> str:
        """Run ``scan_and_install(path)`` and return its result as JSON.

        Any exception is reported as a single entry in ``errors`` instead of
        propagating out of the slot.
        """
        from backend.chart_manager import scan_and_install
        try:
            result = scan_and_install(path)
        except Exception as e:
            result = {"installed": [], "skipped": [],
                      "errors": [{"file": path, "error": str(e)}]}
        return json.dumps(result)

    @pyqtSlot(result=str)
    def open_chart_file_dialog(self) -> str:
        """Open Qt native file dialog — install selected .000/.zip chart files.

        Returns:
            JSON with ``installed`` (ids returned by the installers),
            ``skipped`` (always empty here) and ``errors`` (one
            ``{"file", "error"}`` entry per file that failed or has an
            unsupported extension).
        """
        import zipfile as _zf
        from PyQt5.QtWidgets import QFileDialog
        from backend.chart_manager import (
            install_from_enc, install_from_zip, install_from_csv_zip
        )

        files, _ = QFileDialog.getOpenFileNames(
            None, "Open ENC Chart", "",
            "ENC Charts (*.000 *.zip);;All Files (*)"
        )
        if not files:
            return json.dumps({"installed": [], "skipped": [], "errors": []})

        installed, skipped, errors = [], [], []
        for fpath in files:
            fp = Path(fpath)
            suffix = fp.suffix.lower()
            try:
                if suffix == ".zip":
                    with _zf.ZipFile(fp) as z:
                        names = z.namelist()
                    has_csv = any(n.lower().endswith(".csv") for n in names)
                    has_enc = any(n.endswith(".000") for n in names)
                    # A zip holding CSV files and no ENC cell goes to the CSV
                    # installer; everything else to the ENC zip installer.
                    if has_csv and not has_enc:
                        ids = install_from_csv_zip(fp)
                    else:
                        ids = install_from_zip(fp)
                    installed.extend(ids)
                elif suffix == ".000":
                    installed.append(install_from_enc(fp, fp.stem.upper()))
                else:
                    # The "All Files" filter lets the user pick anything;
                    # report it rather than dropping it silently.
                    errors.append(
                        {"file": fp.name, "error": "Unsupported file type"})
            except Exception as exc:
                errors.append({"file": fp.name, "error": str(exc)})

        return json.dumps(
            {"installed": installed, "skipped": skipped, "errors": errors})