304 lines
13 KiB
Python
304 lines
13 KiB
Python
"""Python ↔ JavaScript bridge for GPS Navigator (PyQt5 standalone).
|
|
Exposed to JS as window.py via QWebChannel.
|
|
"""
|
|
import json
|
|
import os
|
|
import threading
|
|
from pathlib import Path
|
|
|
|
from PyQt5.QtCore import QObject, pyqtSlot, pyqtSignal
|
|
|
|
|
|
class GPSBridge(QObject):
|
|
# Signal: emitted from NMEA reader thread → received in JS via .connect()
|
|
# Qt automatically queues cross-thread signal delivery — no asyncio needed.
|
|
gpsMessage = pyqtSignal(str)
|
|
|
|
def __init__(self, db_path: Path, parent=None):
|
|
super().__init__(parent)
|
|
self._db_path = db_path
|
|
self._reader = None
|
|
self._last_fix: dict = {}
|
|
self._track_interval = int(os.getenv("TRACK_INTERVAL_SEC", 5))
|
|
self._track_counter = 0
|
|
|
|
# ── Called by NMEAReader background thread ────────────────────────────────
|
|
def _on_nmea(self, msg: dict):
|
|
if msg.get("type") == "position" and msg.get("fix_quality", 0) > 0:
|
|
self._last_fix.update(msg)
|
|
self._track_counter += 1
|
|
if self._track_counter >= self._track_interval:
|
|
self._track_counter = 0
|
|
from backend.database import log_position
|
|
log_position(self._db_path, self._last_fix)
|
|
# Qt queues the signal delivery to the main thread — thread-safe.
|
|
self.gpsMessage.emit(json.dumps(msg))
|
|
|
|
# ── Lifecycle ─────────────────────────────────────────────────────────────
|
|
@pyqtSlot()
|
|
def autodetect_and_start(self):
|
|
"""Called by JS once QWebChannel is ready — ensures signal handler
|
|
is connected before any GPS messages are emitted."""
|
|
from backend.nmea_reader import NMEAReader
|
|
if self._reader and self._reader.is_alive():
|
|
return # already running
|
|
port = os.getenv("GPS_PORT", "") or NMEAReader.autodetect()
|
|
baud = int(os.getenv("GPS_BAUD", 9600))
|
|
if port:
|
|
self._reader = NMEAReader(port, baud, self._on_nmea)
|
|
self._reader.start()
|
|
else:
|
|
# Emit a status so JS knows autodetect found nothing
|
|
self.gpsMessage.emit(
|
|
json.dumps({"type": "no_port", "msg": "No GPS port detected"})
|
|
)
|
|
|
|
def shutdown(self):
|
|
if self._reader:
|
|
self._reader.stop()
|
|
self._reader = None
|
|
|
|
# ── GPS port management ───────────────────────────────────────────────────
|
|
@pyqtSlot(result=str)
|
|
def list_ports(self):
|
|
from backend.nmea_reader import NMEAReader
|
|
return json.dumps(NMEAReader.list_ports())
|
|
|
|
@pyqtSlot(str, int)
|
|
def connect_gps(self, port: str, baud: int):
|
|
from backend.nmea_reader import NMEAReader
|
|
if self._reader and self._reader.is_alive():
|
|
old = self._reader
|
|
old.stop()
|
|
# Join in background thread so we don't block the event loop
|
|
def _restart():
|
|
old.join(timeout=1.5)
|
|
r = NMEAReader(port, baud, self._on_nmea)
|
|
self._reader = r
|
|
r.start()
|
|
threading.Thread(target=_restart, daemon=True).start()
|
|
else:
|
|
self._reader = NMEAReader(port, baud, self._on_nmea)
|
|
self._reader.start()
|
|
|
|
@pyqtSlot()
|
|
def disconnect_gps(self):
|
|
if self._reader:
|
|
self._reader.stop()
|
|
self._reader = None
|
|
|
|
@pyqtSlot(result=str)
|
|
def get_status(self):
|
|
return json.dumps({
|
|
"connected": self._reader is not None and self._reader.is_alive(),
|
|
"port": self._reader.port if self._reader else None,
|
|
"fix": self._last_fix,
|
|
})
|
|
|
|
# ── Waypoints ─────────────────────────────────────────────────────────────
|
|
@pyqtSlot(result=str)
|
|
def get_waypoints(self):
|
|
from backend.database import get_waypoints
|
|
return json.dumps(get_waypoints(self._db_path))
|
|
|
|
@pyqtSlot(str, result=str)
|
|
def save_waypoint(self, data_json: str):
|
|
from backend.database import save_waypoint
|
|
return json.dumps(save_waypoint(self._db_path, json.loads(data_json)))
|
|
|
|
@pyqtSlot(str)
|
|
def delete_waypoint(self, wid: str):
|
|
from backend.database import delete_waypoint
|
|
delete_waypoint(self._db_path, wid)
|
|
|
|
# ── Routes ────────────────────────────────────────────────────────────────
|
|
@pyqtSlot(result=str)
|
|
def get_routes(self):
|
|
from backend.database import get_routes
|
|
return json.dumps(get_routes(self._db_path))
|
|
|
|
@pyqtSlot(str, result=str)
|
|
def save_route(self, data_json: str):
|
|
from backend.database import save_route
|
|
return json.dumps(save_route(self._db_path, json.loads(data_json)))
|
|
|
|
@pyqtSlot(str)
|
|
def delete_route(self, rid: str):
|
|
from backend.database import delete_route
|
|
delete_route(self._db_path, rid)
|
|
|
|
# ── Track log ─────────────────────────────────────────────────────────────
|
|
@pyqtSlot(int, result=str)
|
|
def get_track(self, limit: int):
|
|
from backend.database import get_track
|
|
return json.dumps(get_track(self._db_path, limit))
|
|
|
|
@pyqtSlot()
|
|
def clear_track(self):
|
|
from backend.database import clear_track
|
|
clear_track(self._db_path)
|
|
|
|
# ── Charts ────────────────────────────────────────────────────────────────
|
|
@pyqtSlot(result=str)
|
|
def get_chart_cells(self):
|
|
from backend.chart_manager import list_cells
|
|
return json.dumps(list_cells())
|
|
|
|
# Allowed data types for get_cell_data — prevents path traversal via data_type
|
|
_ALLOWED_DATA_TYPES = frozenset(
|
|
{"features", "land", "depths", "hazards", "zones"}
|
|
)
|
|
|
|
@pyqtSlot(str, str, result=str)
|
|
def get_cell_data(self, cell_id: str, data_type: str):
|
|
"""Lee el GeoJSON de una celda concreta (features/land/depths/hazards/zones).
|
|
- Para depths: filtra SOUNDG (puntos individuales, no útiles en zoom general).
|
|
- Si el resultado supera ~600KB: aplica decimación stride=3 en coordenadas
|
|
para mantener cada mensaje QWebChannel dentro de límites seguros.
|
|
Igual que ECDIS: un slot por celda/tipo — nunca agrega todas las celdas.
|
|
IMPORTANTE: siempre retorna JSON válido aunque falle — si el slot lanza una
|
|
excepción, PyQt5 puede no llamar al callback JS y la Promise queda colgada."""
|
|
import logging as _log
|
|
try:
|
|
from backend.chart_manager import CHARTS_DIR
|
|
|
|
# Guard against path traversal: cell_id and data_type must not
|
|
# contain path separators or parent-directory references.
|
|
if (not cell_id
|
|
or any(c in cell_id for c in ("/", "\\", "..", ":"))
|
|
or data_type not in self._ALLOWED_DATA_TYPES):
|
|
return json.dumps({'type': 'FeatureCollection', 'features': []})
|
|
|
|
path = CHARTS_DIR / cell_id / f'{data_type}.geojson'
|
|
|
|
# Ensure the resolved path stays inside CHARTS_DIR (defense in depth)
|
|
try:
|
|
path.resolve().relative_to(CHARTS_DIR.resolve())
|
|
except ValueError:
|
|
return json.dumps({'type': 'FeatureCollection', 'features': []})
|
|
|
|
if not path.exists():
|
|
return json.dumps({'type': 'FeatureCollection', 'features': []})
|
|
|
|
with open(path, 'r', encoding='utf-8') as f:
|
|
data = json.load(f)
|
|
|
|
feats = data.get('features', [])
|
|
|
|
# Filtrar SOUNDG (puntos de sondeo — demasiados para zoom general)
|
|
if data_type == 'depths':
|
|
feats = [ft for ft in feats
|
|
if ft.get('properties', {}).get('layer') != 'SOUNDG']
|
|
|
|
# Decimación progresiva para mantener cada mensaje QWebChannel < ~700 KB.
|
|
raw_json = json.dumps({'type': 'FeatureCollection', 'features': feats})
|
|
if len(raw_json) > 600_000:
|
|
feats = [self._decimate_feature(ft, stride=3) for ft in feats]
|
|
raw_json = json.dumps({'type': 'FeatureCollection', 'features': feats})
|
|
if len(raw_json) > 700_000:
|
|
feats = [self._decimate_feature(ft, stride=5) for ft in feats]
|
|
raw_json = json.dumps({'type': 'FeatureCollection', 'features': feats})
|
|
|
|
return raw_json
|
|
except Exception as e:
|
|
_log.getLogger(__name__).error(
|
|
"get_cell_data %s/%s: %s", cell_id, data_type, e)
|
|
return json.dumps({'type': 'FeatureCollection', 'features': []})
|
|
|
|
@staticmethod
|
|
def _decimate_feature(feat: dict, stride: int) -> dict:
|
|
"""Reduce densidad de vértices en polígonos/líneas para achicar el JSON."""
|
|
import copy
|
|
geom = copy.deepcopy(feat.get('geometry', {}))
|
|
gtype = geom.get('type', '')
|
|
|
|
def dec_ring(ring):
|
|
if len(ring) <= 6:
|
|
return ring
|
|
pts = ring[::stride]
|
|
if pts[-1] != ring[-1]:
|
|
pts.append(ring[-1])
|
|
return pts
|
|
|
|
if gtype == 'Polygon':
|
|
geom['coordinates'] = [dec_ring(r) for r in geom.get('coordinates', [])]
|
|
elif gtype == 'MultiPolygon':
|
|
geom['coordinates'] = [[dec_ring(r) for r in poly]
|
|
for poly in geom.get('coordinates', [])]
|
|
elif gtype == 'LineString':
|
|
cs = geom.get('coordinates', [])
|
|
geom['coordinates'] = cs[::stride] if len(cs) > 6 else cs
|
|
|
|
return {'type': feat.get('type', 'Feature'),
|
|
'geometry': geom,
|
|
'properties': feat.get('properties', {})}
|
|
|
|
@pyqtSlot(result=str)
|
|
def get_chart_zones(self):
|
|
from backend.chart_manager import get_all_zones
|
|
return json.dumps(get_all_zones())
|
|
|
|
@pyqtSlot(result=str)
|
|
def get_chart_land(self):
|
|
from backend.chart_manager import get_all_land
|
|
return json.dumps(get_all_land())
|
|
|
|
@pyqtSlot(str)
|
|
def delete_chart(self, cell_id: str):
|
|
from backend.chart_manager import delete_cell
|
|
delete_cell(cell_id)
|
|
|
|
@pyqtSlot(str, str)
|
|
def set_chart_region(self, cell_id: str, region: str):
|
|
from backend.chart_manager import set_meta
|
|
set_meta(cell_id, region=region.upper())
|
|
|
|
@pyqtSlot(str, result=str)
|
|
def scan_charts_path(self, path: str):
|
|
from backend.chart_manager import scan_and_install
|
|
try:
|
|
result = scan_and_install(path)
|
|
except (FileNotFoundError, NotADirectoryError) as e:
|
|
result = {"installed": [], "skipped": [],
|
|
"errors": [{"file": path, "error": str(e)}]}
|
|
except Exception as e:
|
|
result = {"installed": [], "skipped": [],
|
|
"errors": [{"file": path, "error": str(e)}]}
|
|
return json.dumps(result)
|
|
|
|
@pyqtSlot(result=str)
|
|
def open_chart_file_dialog(self):
|
|
"""Open Qt native file dialog — install selected .000/.zip chart files."""
|
|
import zipfile as _zf
|
|
from PyQt5.QtWidgets import QFileDialog
|
|
from backend.chart_manager import (
|
|
install_from_enc, install_from_zip, install_from_csv_zip
|
|
)
|
|
|
|
files, _ = QFileDialog.getOpenFileNames(
|
|
None, "Open ENC Chart", "",
|
|
"ENC Charts (*.000 *.zip);;All Files (*)"
|
|
)
|
|
if not files:
|
|
return json.dumps({"installed": [], "skipped": [], "errors": []})
|
|
|
|
installed, skipped, errors = [], [], []
|
|
for fpath in files:
|
|
fp = Path(fpath)
|
|
try:
|
|
if fp.suffix.lower() == ".zip":
|
|
with _zf.ZipFile(fp) as z:
|
|
names = z.namelist()
|
|
has_csv = any(n.lower().endswith(".csv") for n in names)
|
|
has_enc = any(n.upper().endswith(".000") for n in names)
|
|
ids = install_from_csv_zip(fp) if (has_csv and not has_enc) \
|
|
else install_from_zip(fp)
|
|
installed.extend(ids)
|
|
elif fp.suffix.upper() == ".000":
|
|
cid = install_from_enc(fp, fp.stem.upper())
|
|
installed.append(cid)
|
|
except Exception as exc:
|
|
errors.append({"file": fp.name, "error": str(exc)})
|
|
|
|
return json.dumps({"installed": installed, "skipped": skipped, "errors": errors})
|