import bcrypt as _bcrypt if not hasattr(_bcrypt, "__about__"): import types; _bcrypt.__about__ = types.SimpleNamespace(__version__=_bcrypt.__version__ or "4.0.0") from datetime import datetime from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request from fastapi.staticfiles import StaticFiles from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.gzip import GZipMiddleware from contextlib import asynccontextmanager import asyncio import json import os from dotenv import load_dotenv load_dotenv(dotenv_path=os.path.join(os.path.dirname(__file__), '..', '.env')) from database import engine, SessionLocal, ensure_column from models.aid import Aid from models.vessel import Vessel, VesselTrack, AtonTrack, RecordingEvent from models.lamp import Lamp from models.contact import Contact, AlertReport from models.org import Port, Company, BuoyOwnership import models.aid import models.vessel import models.user import models.lamp import models.contact import models.org from routers import aids from routers.auth import seed_users, get_current_user, require_admin from models.user import User from fastapi import Depends, Query as _Query from database import get_db from sqlalchemy.orm import Session from routers import auth as auth_router from routers import charts as charts_router from routers import equipment as equipment_router from routers import lamps as lamps_router from routers import contacts as contacts_router from routers import tracks as tracks_router from routers import org as org_router from services.ais_simulator import run_simulator, MIAMI_AIDS from services.alert_engine import evaluate_vessel, evaluate_aid_movement, aid_alert_state from services.gps_reader import GPSReader from services.aton_decoder import decode_type21, decode_type8_aton, process_aton_message, aton_state from services import settings_store from services.slave_relay import SlaveRelay import services.ais_catcher as _ais_catcher from services.ais_udp_reader import run_udp_listener import services.ais_udp_reader as _ais_udp # Crear tablas models.aid.Base.metadata.create_all(bind=engine) models.vessel.Base.metadata.create_all(bind=engine) models.user.Base.metadata.create_all(bind=engine) models.lamp.Base.metadata.create_all(bind=engine) models.contact.Base.metadata.create_all(bind=engine) models.org.Base.metadata.create_all(bind=engine) # Additive migrations for columns added after first install ensure_column("aids", "lamp_id", "TEXT") ensure_column("aids", "displacement_warn_m", "REAL") ensure_column("aids", "displacement_alarm_m", "REAL") ensure_column("aids", "signal_loss_min", "INTEGER") ensure_column("aids", "din3_function", "TEXT") ensure_column("aids", "din4_function", "TEXT") # Link estable a feature S-57 (cell + feature_id estable cuando viene de carta) ensure_column("aids", "source_chart", "TEXT") ensure_column("aids", "cell_id", "TEXT") ensure_column("aids", "chart_feature_id", "TEXT") ensure_column("lamps", "warn_pct", "REAL DEFAULT 20.0") ensure_column("lamps", "alarm_pct", "REAL DEFAULT 10.0") ensure_column("users", "prefs_json", "TEXT") ensure_column("users", "company_id", "TEXT") # Each entry: {"ws": WebSocket, "company_id": str|None} # company_id=None means superadmin/admin — sees ALL traffic. connected_clients: list[dict] = [] # Ownership cache: company_id → set of MMSI strings the company owns. # Rebuilt at startup and refreshed via POST /org/refresh (or when ownership changes). _ownership_cache: dict[str, set] = {} # company_id → {mmsi, ...} _aid_ownership_cache: dict[str, set] = {} # company_id → {aid_id, ...} # Track throttle: last persisted point per MMSI _vessel_track_last: dict[str, dict] = {} # mmsi → {ts, lat, lon} _aton_track_last: dict[str, dict] = {} # mmsi → {ts, lat, lon} # Signal-loss monitoring: last time each AIS AtoN was heard from _aton_last_seen: dict[str, datetime] = {} # mmsi → datetime UTC _signal_loss_state: dict[str, bool] = {} # mmsi → True if alert already sent # Digital input alert state: "{mmsi}_{din3|din4}" → True if currently triggered _digital_alert_state: dict[str, bool] = {} # Source of truth for runtime config — mutated via POST /settings. config = settings_store.SETTINGS # Background AIS reader task handle (lets us stop/restart on source change) _ais_task: asyncio.Task | None = None # Slave relay — active when server_role == "SLAVE" _slave_relay: SlaveRelay | None = None # Last-known battery alert state per ATON to avoid re-emitting every msg. # Values: None | 'WARN' | 'ALARM' _battery_alert_state: dict[str, str | None] = {} # Estado en memoria (posiciones actuales) vessels_state: dict = {} aids_state: dict = {} def _build_ownership_cache(db): """Rebuild the in-memory ownership maps (MMSI and aid_id) from the DB.""" _ownership_cache.clear() _aid_ownership_cache.clear() rows = db.query(BuoyOwnership).all() for row in rows: if row.mmsi: _ownership_cache.setdefault(row.company_id, set()).add(row.mmsi) if row.aid_id: _aid_ownership_cache.setdefault(row.company_id, set()).add(row.aid_id) def seed_ports(): """Ensure canonical Colombian ports exist in the DB.""" PORTS = [ {"id": "port-barranquilla", "name": "Barranquilla", "center_lat": 11.0041, "center_lon": -74.8070, "default_zoom": 12.0, "chart_name": None}, {"id": "port-cartagena", "name": "Cartagena", "center_lat": 10.3997, "center_lon": -75.5144, "default_zoom": 12.0, "chart_name": "BAHÍA_DE_CARTAGENA"}, {"id": "port-santamarta", "name": "Santa Marta", "center_lat": 11.2408, "center_lon": -74.2110, "default_zoom": 12.0, "chart_name": None}, {"id": "port-buenaventura", "name": "Buenaventura", "center_lat": 3.8800, "center_lon": -77.0311, "default_zoom": 12.0, "chart_name": None}, {"id": "port-tumaco", "name": "Tumaco", "center_lat": 1.8189, "center_lon": -78.7619, "default_zoom": 12.0, "chart_name": None}, ] db = SessionLocal() try: for p in PORTS: if not db.query(Port).filter(Port.id == p["id"]).first(): db.add(Port(**p)) db.commit() finally: db.close() def seed_contacts(): """Pre-load the operator company (INSM) as an OWNER contact so the REPORT flow has someone to notify out of the box.""" db = SessionLocal() try: from models.contact import Contact existing = db.query(Contact).filter(Contact.id == "insm-default").first() if not existing: db.add(Contact( id="insm-default", role="OWNER", name="INSM — Ingeniería Naval y Señalización Marítima", company_name="INSM", email="tecnicoinsm@gmail.com", preferred_channel="EMAIL", notes="Operator of this AidsMonitoring deployment.", )) db.commit() finally: db.close() def seed_aids(): db = SessionLocal() for a in MIAMI_AIDS: if not db.query(Aid).filter(Aid.id == a["id"]).first(): aid = Aid( id=a["id"], nombre=a["nombre"], categoria=a["categoria"], tipo=a["tipo"], tipo_ais=a["tipo_ais"], mmsi=a.get("mmsi"), lat_nominal=a["lat_nominal"], lon_nominal=a["lon_nominal"], radio_borneo_m=a.get("radio_borneo_m", 10.0), caracteristica_luz=a.get("caracteristica_luz"), alcance_nm=a.get("alcance_nm"), fuente_posicion="MANUAL", ) db.add(aid) db.commit() db.close() def _client_may_see(client: dict, owned_mmsi: str | None = None, owned_aid_id: str | None = None) -> bool: """Return True if this client is allowed to receive this message. - owned_mmsi → vessel/AtoN traffic filtered by MMSI ownership - owned_aid_id → aid-position traffic filtered by aid_id ownership - neither → system/alert message: always delivered """ cid = client.get("company_id") if cid is None: return True # admin/superadmin: sees all if owned_mmsi is not None: return owned_mmsi in _ownership_cache.get(cid, set()) if owned_aid_id is not None: return owned_aid_id in _aid_ownership_cache.get(cid, set()) return True # non-filtered message async def broadcast(message: dict, owned_mmsi: str | None = None, owned_aid_id: str | None = None): """ Send *message* to connected WebSocket clients. owned_mmsi → filter vessel/AtoN traffic by MMSI (company users see only their own) owned_aid_id → filter aid-position updates by aid_id Admins (company_id=None) always receive everything. When server_role == "SLAVE", also forwards the event upstream to the master. """ data = json.dumps(message) dead = [] for client in connected_clients: if not _client_may_see(client, owned_mmsi, owned_aid_id): continue try: await client["ws"].send_text(data) except Exception: dead.append(client) for c in dead: if c in connected_clients: connected_clients.remove(c) # Forward upstream when acting as a slave if _slave_relay is not None: _slave_relay.send(message) async def _persist_recording(db, alert: dict): """Save or close a RecordingEvent row when auto-recording triggers.""" from models.vessel import RecordingEvent import uuid as _uuid if alert["tipo"] == "GRABACION_INICIADA": rec = RecordingEvent( id=str(_uuid.uuid4()), mmsi=alert["mmsi"], aid_id=alert["aid_id"], inicio=datetime.utcnow(), trigger=alert.get("trigger", "PROXIMIDAD"), ) db.add(rec) db.commit() elif alert["tipo"] == "GRABACION_FINALIZADA": from sqlalchemy import and_ rec = db.query(RecordingEvent).filter( and_( RecordingEvent.mmsi == alert["mmsi"], RecordingEvent.aid_id == alert["aid_id"], RecordingEvent.cerrado == False, ) ).order_by(RecordingEvent.inicio.desc()).first() if rec: rec.fin = datetime.utcnow() rec.distancia_min_m = alert.get("distancia_min_m") rec.cerrado = True db.commit() async def process_message(msg: dict): db = SessionLocal() try: if msg["type"] == "vessel": mmsi = msg["mmsi"] vessels_state[mmsi] = msg aids_list = list(aids_state.values()) alerts = evaluate_vessel(msg, aids_list, config) await broadcast(msg) # vessels = public traffic, no filter for alert in alerts: # Filter proximity alerts to the company that owns the aid involved await broadcast({"type": "alert", **alert}, owned_aid_id=alert.get("aid_id")) if alert["tipo"] in ("GRABACION_INICIADA", "GRABACION_FINALIZADA"): await _persist_recording(db, alert) # ── Auto-persist VesselTrack (DVR) ─────────────────────────────── lat, lon = msg.get("lat"), msg.get("lon") if lat is not None and lon is not None: now = datetime.utcnow() prev = _vessel_track_last.get(mmsi) elapsed = (now - prev["ts"]).total_seconds() if prev else 9999 if elapsed >= 10: db.add(VesselTrack( mmsi=mmsi, timestamp=now, lat=lat, lon=lon, sog=msg.get("sog"), cog=msg.get("cog"), heading=msg.get("heading"), )) db.commit() _vessel_track_last[mmsi] = {"ts": now, "lat": lat, "lon": lon} elif msg["type"] == "aton": # Real AIS Type 21 / Type 8 from hardware receiver entry = process_aton_message(msg) if entry: mmsi = entry["mmsi"] aton_state[mmsi] = entry _aton_last_seen[mmsi] = datetime.utcnow() # track for signal-loss if _signal_loss_state.pop(mmsi, False): # Signal restored — clear any active loss alert on clients await broadcast({"type": "alert", "tipo": "SENAL_RESTAURADA", "mmsi": mmsi, "timestamp": datetime.utcnow().isoformat()}, owned_mmsi=mmsi) await broadcast({"type": "aton", **entry}, owned_mmsi=mmsi) # Auto-upsert Aid row on first sight (Type 21 carries name + position). # When an existing Aid with matching MMSI is found, update its # lat_actual + run drift evaluation using that aid's per-buoy # thresholds so the alert engine fires correctly. if msg.get("msg_type") == 21 and entry.get("lat") is not None: aid = db.query(Aid).filter(Aid.mmsi == mmsi).first() if not aid: from models.aid import Aid as _Aid import uuid as _uuid aid = _Aid( id=str(_uuid.uuid4()), nombre=entry.get("name") or f"AIS AtoN {mmsi}", categoria="FLOTANTE", tipo="BOYA_LATERAL", tipo_ais="ATON_21", mmsi=mmsi, lat_nominal=entry["lat"], lon_nominal=entry["lon"], fuente_posicion="AIS", source_chart="AIS", ) db.add(aid); db.commit() await broadcast({ "type": "alert", "id": str(_uuid.uuid4()), "tipo": "ALERTA_AMARILLA", "subtipo": "AYUDA_NUEVA_SIN_CONFIGURAR", "mmsi": mmsi, "aid_id": aid.id, "aid_nombre": aid.nombre, "mensaje": "Nueva ayuda detectada — falta asignar lámpara", "timestamp": datetime.utcnow().isoformat(), }) else: # Existing aid configured by operator (or auto-created). # Update its actual position, displacement, and broadcast # an aid_position event so the map can render the AIS # ghost marker. Drift alerts use this aid's per-buoy # thresholds (or fall back to global config). from services.alert_engine import haversine aid.lat_actual = entry["lat"] aid.lon_actual = entry["lon"] aid.desplazamiento_m = haversine( entry["lat"], entry["lon"], aid.lat_nominal, aid.lon_nominal, ) aid.ultima_senal = datetime.utcnow() # Drift evaluation — fires WARN/ALARM only on state changes drift_alerts = evaluate_aid_movement( aid.id, entry["lat"], entry["lon"], aid.lat_nominal, aid.lon_nominal, config, warn_m=aid.displacement_warn_m, alarm_m=aid.displacement_alarm_m, ) # en_posicion = within swing_radius of nominal aid.en_posicion = (aid.desplazamiento_m <= (aid.radio_borneo_m or 10.0)) db.commit() await broadcast({ "type": "aid_position", "id": aid.id, "lat_actual": entry["lat"], "lon_actual": entry["lon"], "desplazamiento_m": round(aid.desplazamiento_m, 1), "en_posicion": aid.en_posicion, "en_movimiento": False, }, owned_aid_id=aid.id) for alert in drift_alerts: await broadcast({"type": "alert", **alert}, owned_aid_id=alert.get("aid_id")) # Battery threshold check — per-aid lamp values if assigned, # otherwise the global defaults from settings_store. v = entry.get("voltage_v") if v is not None: aid = db.query(Aid).filter(Aid.mmsi == mmsi).first() lamp = None if aid and aid.lamp_id: lamp = db.query(Lamp).filter(Lamp.id == aid.lamp_id).first() if lamp: rng = lamp.voltage_max - lamp.voltage_min warn_pct = (lamp.warn_pct or 20.0) / 100.0 alarm_pct = (lamp.alarm_pct or 10.0) / 100.0 warn_v = lamp.voltage_min + rng * warn_pct alarm_v = lamp.voltage_min + rng * alarm_pct threshold_source = f"lamp:{lamp.manufacturer} {lamp.model}" else: warn_v = config["battery_warn_v"] alarm_v = config["battery_alarm_v"] threshold_source = "global" prev = _battery_alert_state.get(mmsi) new_state = None if v <= alarm_v: new_state = "ALARM" elif v <= warn_v: new_state = "WARN" if new_state and new_state != prev: import uuid as _uuid await broadcast({ "type": "alert", "id": str(_uuid.uuid4()), "tipo": "ALERTA_ROJA" if new_state == "ALARM" else "ALERTA_AMARILLA", "subtipo": "BATERIA_BAJA", "mmsi": mmsi, "aid_nombre": entry.get("name"), "voltage_v": v, "umbral": alarm_v if new_state == "ALARM" else warn_v, "fuente_umbral": threshold_source, "timestamp": datetime.utcnow().isoformat(), }, owned_mmsi=mmsi) # only the company that owns this buoy _battery_alert_state[mmsi] = new_state # ── Digital input alerts (water ingress / listing) ──────────── if msg.get("msg_type") == 8: aid = aid or db.query(Aid).filter(Aid.mmsi == mmsi).first() if aid: import uuid as _uuid2 _DIN_ALERT_MAP = { "WATER_INGRESS_WARN": ("ALERTA_AMARILLA", "INGRESO_AGUA", "⚠ Water ingress detected"), "WATER_INGRESS_CRITICAL": ("ALERTA_ROJA", "HUNDIMIENTO", "🔴 Critical water ingress — sinking risk"), "LISTING": ("ALERTA_ROJA", "ESCORA_CRITICA", "🔴 Critical list detected"), } for din_field, fn_attr in [("din3", "din3_function"), ("din4", "din4_function")]: fn = getattr(aid, fn_attr, None) if not fn or fn not in _DIN_ALERT_MAP: continue triggered = entry.get(din_field, False) # Also check IEC standard water level bit for CRITICAL if fn == "WATER_INGRESS_CRITICAL": triggered = triggered or entry.get("water_level_high", False) state_key = f"{mmsi}_{din_field}" prev_state = _digital_alert_state.get(state_key) if triggered and not prev_state: tipo, subtipo, mensaje = _DIN_ALERT_MAP[fn] await broadcast({ "type": "alert", "id": str(_uuid2.uuid4()), "tipo": tipo, "subtipo": subtipo, "mmsi": mmsi, "aid_id": aid.id, "aid_nombre": aid.nombre, "mensaje": mensaje, "timestamp": datetime.utcnow().isoformat(), }, owned_mmsi=mmsi) elif not triggered and prev_state: # Condition cleared await broadcast({ "type": "alert", "tipo": "CONDICION_RESUELTA", "subtipo": _DIN_ALERT_MAP[fn][1], "mmsi": mmsi, "aid_id": aid.id, "timestamp": datetime.utcnow().isoformat(), }, owned_mmsi=mmsi) _digital_alert_state[state_key] = triggered # ── Auto-persist AtonTrack (DVR) ───────────────────────────── at_lat = entry.get("lat") at_lon = entry.get("lon") if at_lat is not None and at_lon is not None: now = datetime.utcnow() prev = _aton_track_last.get(mmsi) elapsed = (now - prev["ts"]).total_seconds() if prev else 9999 if elapsed >= 30: # AtoN updates less frequent → 30s threshold db.add(AtonTrack( mmsi=mmsi, timestamp=now, lat=at_lat, lon=at_lon, voltage_v=entry.get("voltage_v"), off_position=entry.get("off_position"), )) db.commit() _aton_track_last[mmsi] = {"ts": now, "lat": at_lat, "lon": at_lon} elif msg["type"] == "aid_position": aid_id = msg["id"] if aid_id in aids_state: aids_state[aid_id].update(msg) aid = db.query(Aid).filter(Aid.id == aid_id).first() if aid: alert_list = evaluate_aid_movement( aid_id, msg["lat_actual"], msg["lon_actual"], aid.lat_nominal, aid.lon_nominal, config, warn_m=aid.displacement_warn_m, # per-aid override (None → global) alarm_m=aid.displacement_alarm_m, ) aid.lat_actual = msg["lat_actual"] aid.lon_actual = msg["lon_actual"] aid.desplazamiento_m = msg["desplazamiento_m"] aid.en_posicion = msg["en_posicion"] db.commit() await broadcast(msg) # aid positions = public, no filter for alert in alert_list: # Only the company that owns this aid receives movement/position alerts await broadcast({"type": "alert", **alert}, owned_aid_id=alert.get("aid_id")) finally: db.close() async def _start_ais_source(): """Start the AIS reader task matching the current config['ais_source'].""" src = config.get("ais_source", "SIMULATOR").upper() if src == "SIMULATOR": return asyncio.create_task(run_simulator(process_message)) if src == "SDR": # Auto-launch AIS-catcher if hardware is present and not already running if _ais_catcher.exe_exists() and not _ais_catcher.is_running(): result = _ais_catcher.launch() if result["ok"]: print(f"[ais] AIS-catcher launched (pid {result.get('pid')})") else: print(f"[ais] AIS-catcher launch failed: {result.get('error')}") return asyncio.create_task(run_udp_listener(process_message, port=10110)) return None async def _stop_ais_source(): global _ais_task if _ais_task and not _ais_task.done(): _ais_task.cancel() try: await _ais_task except asyncio.CancelledError: pass _ais_task = None @asynccontextmanager async def lifespan(app: FastAPI): settings_store.init() seed_aids() seed_users() seed_contacts() seed_ports() db = SessionLocal() _build_ownership_cache(db) db.close() db = SessionLocal() for aid in db.query(Aid).all(): aids_state[aid.id] = { "id": aid.id, "nombre": aid.nombre, "categoria": aid.categoria, "tipo": aid.tipo, "tipo_ais": aid.tipo_ais, "mmsi": aid.mmsi, "lat_nominal": aid.lat_nominal, "lon_nominal": aid.lon_nominal, "radio_borneo_m": aid.radio_borneo_m, "lat_actual": aid.lat_actual, "lon_actual": aid.lon_actual, "desplazamiento_m": aid.desplazamiento_m or 0, "en_posicion": aid.en_posicion, "en_movimiento": aid.en_movimiento, "caracteristica_luz": aid.caracteristica_luz, "alcance_nm": aid.alcance_nm, "lamp_id": aid.lamp_id, "puerto_responsable": aid.puerto_responsable, "empresa_responsable": aid.empresa_responsable, "ultima_senal": str(aid.ultima_senal) if aid.ultima_senal else None, } db.close() global _ais_task _ais_task = await _start_ais_source() # ── Pre-warm chart-cell coverage cache in background ─────────────────── # Reading 691 cell files is a ~2.5 s one-time cost. Doing it at startup # means the FIRST user request lands on a hot cache. Runs off the main # event loop so the server is responsive immediately. async def _warm_chart_cache(): try: from services import chart_manager as _cm import os t0 = asyncio.get_event_loop().time() count = 0 for cell_dir in _cm.CHARTS_DIR.iterdir(): if not cell_dir.is_dir(): continue for fname in ("depths.geojson", "land.geojson", "zones.geojson", "hazards.geojson", "features.geojson"): cache = cell_dir / fname if not cache.exists(): continue try: mtime = cache.stat().st_mtime except OSError: continue cov_key = f"{cache}|{mtime}" if cov_key in _cm._cell_coverage_cache: continue try: import json as _json fc = _json.loads(cache.read_text()) except Exception: continue cov = _cm._coverage_bbox(fc.get("features") or []) if cov is not None: _cm._cell_coverage_cache[cov_key] = cov count += 1 # Yield to event loop every 50 files so we don't block if count % 50 == 0: await asyncio.sleep(0) dt = asyncio.get_event_loop().time() - t0 print(f"[charts] Pre-warm: {count} files indexed in {dt:.1f}s " f"({len(_cm._cell_coverage_cache)} cached bboxes)") except Exception as e: print(f"[charts] Pre-warm failed (non-fatal): {e}") asyncio.create_task(_warm_chart_cache()) # ── Slave relay (start when role == SLAVE) ───────────────────────────── global _slave_relay role = config.get("server_role", "STANDALONE").upper() master_url = config.get("master_url", "").strip() slave_name = config.get("slave_name", "").strip() or config.get("station_name", "Slave") if role == "SLAVE" and master_url: _slave_relay = SlaveRelay(master_url=master_url, slave_name=slave_name) await _slave_relay.start() print(f"[cluster] Rol: SLAVE → maestro {master_url} (nombre='{slave_name}')") elif role == "MASTER": print(f"[cluster] Rol: MASTER — esperando conexiones de esclavos en /ws/slave") # GPS — settings_store now holds the port (mirrored from .env on first run) gps_port = config.get("gps_port") or None gps_baud = config.get("gps_baud") or None gps = GPSReader(broadcast, forced_port=gps_port, forced_baud=gps_baud) await gps.start() app.state.gps = gps # ── Signal-loss monitor (runs every 60 s) ────────────────────────────── async def _signal_loss_monitor(): import uuid as _uuid while True: await asyncio.sleep(60) now = datetime.utcnow() db2 = SessionLocal() try: for mmsi, last_seen in list(_aton_last_seen.items()): aid = db2.query(Aid).filter(Aid.mmsi == mmsi).first() if not aid or not aid.signal_loss_min: continue elapsed_min = (now - last_seen).total_seconds() / 60 if elapsed_min >= aid.signal_loss_min: if not _signal_loss_state.get(mmsi): _signal_loss_state[mmsi] = True await broadcast({ "type": "alert", "id": str(_uuid.uuid4()), "tipo": "ALERTA_ROJA", "subtipo": "PERDIDA_SENAL", "mmsi": mmsi, "aid_id": aid.id, "aid_nombre": aid.nombre, "minutos_sin_senal": round(elapsed_min), "umbral_min": aid.signal_loss_min, "timestamp": now.isoformat(), }, owned_mmsi=mmsi) else: _signal_loss_state.pop(mmsi, None) finally: db2.close() asyncio.create_task(_signal_loss_monitor()) yield # ── Shutdown: stop slave relay if running ────────────────────────────── if _slave_relay is not None: await _slave_relay.stop() app = FastAPI(title="AidsMonitoring", lifespan=lifespan) app.add_middleware( CORSMiddleware, allow_origins=[ "http://localhost:8080", "http://127.0.0.1:8080", "http://localhost:5173", "http://127.0.0.1:5173", ], allow_methods=["GET", "POST", "PUT", "PATCH", "DELETE"], allow_headers=["Content-Type", "Authorization"], ) # Compress GeoJSON / large JSON responses. minimum_size=1024 skips tiny ones # (status pings, single-aid lookups) where the gzip overhead isn't worth it. # Chart payloads (depths 350 MB, land 124 MB cross-cell total) compress 5–10×. app.add_middleware(GZipMiddleware, minimum_size=1024) app.include_router(auth_router.router) app.include_router(aids.router) app.include_router(charts_router.router) app.include_router(equipment_router.router) app.include_router(lamps_router.router) app.include_router(contacts_router.router) app.include_router(tracks_router.router) app.include_router(org_router.router) @app.post("/org/refresh") async def refresh_ownership( _user: User = Depends(require_admin), db: Session = Depends(get_db), ): """Rebuild the in-memory MMSI ownership cache from DB. Call after assigning/removing buoys.""" _build_ownership_cache(db) return {"ok": True, "companies": len(_ownership_cache), "total_mmsis": sum(len(v) for v in _ownership_cache.values())} def _require_recording_permission(user: User): """ADMIN, SUPERADMIN, or CLIENT_ADMIN may start/stop recordings.""" from models.user import Role if user.role not in (Role.ADMIN, Role.SUPERADMIN, Role.CLIENT_ADMIN): from fastapi import HTTPException raise HTTPException(status_code=403, detail="Only ADMIN or CLIENT_ADMIN may control recordings") @app.post("/recordings/start/{mmsi}") async def manual_start_recording( mmsi: str, aid_id: str = "MANUAL", current_user: User = Depends(get_current_user), ): _require_recording_permission(current_user) from services.alert_engine import active_recordings from models.vessel import RecordingEvent import uuid as _uuid key = f"{mmsi}_{aid_id}" if key in active_recordings: return {"ok": False, "detail": "Already recording"} active_recordings[key] = { "mmsi": mmsi, "aid_id": aid_id, "aid_nombre": aid_id, "inicio": datetime.utcnow().isoformat(), "trigger": "MANUAL", "min_dist": 0, "pre_buffer": [], } db = SessionLocal() try: rec = RecordingEvent( id=str(_uuid.uuid4()), mmsi=mmsi, aid_id=aid_id, inicio=datetime.utcnow(), trigger="MANUAL", ) db.add(rec); db.commit() finally: db.close() vessel = vessels_state.get(mmsi, {}) await broadcast({"type": "alert", "tipo": "GRABACION_INICIADA", "mmsi": mmsi, "aid_id": aid_id, "aid_nombre": aid_id, "distancia_m": 0, "trigger": "MANUAL", "timestamp": datetime.utcnow().isoformat()}) return {"ok": True} @app.post("/recordings/stop/{mmsi}") async def manual_stop_recording( mmsi: str, aid_id: str = "MANUAL", current_user: User = Depends(get_current_user), ): from services.alert_engine import active_recordings from models.vessel import RecordingEvent from sqlalchemy import and_ _require_recording_permission(current_user) key = f"{mmsi}_{aid_id}" rec_mem = active_recordings.pop(key, None) min_dist = round(rec_mem["min_dist"], 1) if rec_mem else None db = SessionLocal() try: rec = db.query(RecordingEvent).filter( and_(RecordingEvent.mmsi == mmsi, RecordingEvent.aid_id == aid_id, RecordingEvent.cerrado == False) ).order_by(RecordingEvent.inicio.desc()).first() if rec: rec.fin = datetime.utcnow() rec.distancia_min_m = min_dist rec.cerrado = True db.commit() finally: db.close() await broadcast({"type": "alert", "tipo": "GRABACION_FINALIZADA", "mmsi": mmsi, "aid_id": aid_id, "distancia_min_m": min_dist, "timestamp": datetime.utcnow().isoformat()}) return {"ok": True} @app.get("/debug") async def debug_state(): from services.alert_engine import aid_alert_state, aid_position_history return { "alert_states": dict(aid_alert_state), "history_sizes": {k: len(v) for k, v in aid_position_history.items()}, "aids_in_state": list(aids_state.keys()), "vessels_tracked": len(vessels_state), } @app.get("/settings") async def get_settings(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): """Return system defaults merged with this user's saved preferences.""" import json as _json base = settings_store.get_all() try: user_row = db.query(User).filter(User.id == current_user.id).first() prefs = _json.loads(user_row.prefs_json or '{}') if user_row else {} except Exception: prefs = {} return {**base, **prefs} @app.post("/settings") async def update_settings(payload: dict, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): """Save user-specific preferences to the DB. System-level keys (GPS port, AIS source) are also applied to the runtime config so they take effect without a restart. Returns {ok, settings, applied: [...]}.""" import json as _json global _ais_task # Split: system keys go to settings_store; everything stays in user prefs SYSTEM_KEYS = {"ais_source","ais_serial_port","ais_baud","ais_net_addr", "gps_port","gps_baud","smtp_host","smtp_port","smtp_user", "smtp_password","smtp_from","smtp_from_name","smtp_use_tls", "server_role","master_url","slave_name"} system_patch = {k: v for k, v in (payload or {}).items() if k in SYSTEM_KEYS} prev = settings_store.get_all() if system_patch: settings_store.apply_patch(system_patch) new = settings_store.get_all() applied = [] # AIS source change if system_patch.get("ais_source") and new["ais_source"] != prev["ais_source"]: await _stop_ais_source() _ais_task = await _start_ais_source() applied.append(f"ais_source → {new['ais_source']}") # Cluster role / master URL change — restart slave relay if needed if "server_role" in system_patch or "master_url" in system_patch or "slave_name" in system_patch: global _slave_relay if _slave_relay is not None: await _slave_relay.stop() _slave_relay = None role = new.get("server_role", "STANDALONE").upper() master_url = new.get("master_url", "").strip() sname = new.get("slave_name", "").strip() or new.get("station_name", "Slave") if role == "SLAVE" and master_url: _slave_relay = SlaveRelay(master_url=master_url, slave_name=sname) await _slave_relay.start() applied.append(f"server_role → SLAVE, relay → {master_url}") else: applied.append(f"server_role → {role}") # GPS port change if "gps_port" in system_patch or "gps_baud" in system_patch: gps: GPSReader = getattr(app.state, "gps", None) if gps: gps._forced_port = new["gps_port"] or None gps._forced_baud = new["gps_baud"] or None gps._port = None gps.status = "SEARCHING" gps.reconnect() applied.append(f"gps_port → {new['gps_port'] or 'auto'} @ {new['gps_baud']}") # Save full payload as this user's preferences try: user_row = db.query(User).filter(User.id == current_user.id).first() if user_row: existing = _json.loads(user_row.prefs_json or '{}') existing.update(payload or {}) user_row.prefs_json = _json.dumps(existing) db.commit() except Exception: pass return {"ok": True, "settings": {**new, **(payload or {})}, "applied": applied} @app.get("/gps/status") async def gps_status(request: Request): gps: GPSReader = getattr(request.app.state, "gps", None) if not gps: return {"status": "DISABLED", "port": None, "last_fix": None} return { "status": gps.status, "port": gps._port or gps._forced_port, "forced_port": gps._forced_port, "baud": gps._baud, "last_fix": gps.last_fix, } @app.post("/gps/port") async def set_gps_port(request: Request, port: str = "", baud: int = 9600): """Hot-change GPS port without server restart.""" gps: GPSReader = getattr(request.app.state, "gps", None) if not gps: return {"ok": False, "detail": "GPS reader not running"} gps._forced_port = port.strip() or None gps._forced_baud = baud or None gps._port = None # force reconnect on next loop iteration gps.status = "SEARCHING" gps.reconnect() # close active serial so _read_loop exits and re-probes # Persist to .env for next restart env_path = os.path.join(os.path.dirname(__file__), '..', '.env') try: lines = open(env_path).readlines() def _set(lines, key, val): for i, l in enumerate(lines): if l.startswith(f"{key}="): lines[i] = f"{key}={val}\n"; return lines lines.append(f"{key}={val}\n"); return lines lines = _set(lines, "GPS_PORT", port.strip()) lines = _set(lines, "GPS_BAUD", str(baud)) open(env_path, 'w').writelines(lines) except Exception as e: pass # non-fatal — in-memory change already applied return {"ok": True, "port": gps._forced_port, "baud": baud} @app.get("/ais/status") async def ais_status(): """Return AIS-catcher process status and SDR hardware info.""" return _ais_catcher.status() @app.get("/ais/stats") async def ais_stats(): """Return live UDP listener stats: sentence counts, last receive time.""" import time as _time udp = _ais_udp.get_stats() catcher = _ais_catcher.status() last_ts = udp.get("last_sentence_ts") seconds_since = round(_time.time() - last_ts, 1) if last_ts else None return { **udp, "seconds_since_last": seconds_since, "signal": last_ts is not None and seconds_since < 60, "catcher_running": catcher["running"], "sdr_devices": catcher["sdr_devices"], "ais_source": config.get("ais_source", "SIMULATOR"), } @app.get("/ais/raw") async def ais_raw(): """Return last 30 raw NMEA sentences received.""" return {"sentences": _ais_udp.get_raw()} @app.post("/ais/launch") async def ais_launch(udp_port: int = 10110, web_port: int = 8100): """Launch AIS-catcher.exe (auto-switches ais_source to SDR).""" global _ais_task result = _ais_catcher.launch(udp_port=udp_port, web_port=web_port) # Always sync source to SDR after a successful launch (or already-running), # otherwise the catcher streams to UDP:10110 with no listener attached. if result["ok"] and config.get("ais_source", "SIMULATOR").upper() != "SDR": settings_store.apply_patch({"ais_source": "SDR"}) await _stop_ais_source() _ais_task = await _start_ais_source() return result @app.post("/ais/stop") async def ais_stop(): """Stop AIS-catcher.exe.""" return _ais_catcher.stop() @app.websocket("/ws/slave") async def slave_websocket_endpoint(ws: WebSocket): """ MASTER-mode endpoint. Slave servers connect here to stream their events. The master rebroadcasts each event to all locally connected clients (after tagging it with _slave origin), and merges vessel/aton state in memory. """ if config.get("server_role", "STANDALONE").upper() not in ("MASTER", "STANDALONE"): await ws.close(code=1008, reason="This server is not a MASTER") return await ws.accept() slave_name = "unknown" print(f"[cluster] Nuevo esclavo conectado desde {ws.client}") try: while True: raw = await ws.receive_text() try: msg = json.loads(raw) except Exception: continue msg_type = msg.get("type") # Hello handshake — log the slave name if msg_type == "slave_hello": slave_name = msg.get("slave_name", "unknown") print(f"[cluster] Esclavo identificado: '{slave_name}'") continue # Merge vessel state into master's in-memory snapshot if msg_type == "vessel": mmsi = msg.get("mmsi") if mmsi: vessels_state[mmsi] = msg elif msg_type == "aton": mmsi = msg.get("mmsi") if mmsi: from services.aton_decoder import aton_state as _aton_state _aton_state[mmsi] = msg elif msg_type == "aid_position": aid_id = msg.get("id") if aid_id and aid_id in aids_state: aids_state[aid_id].update(msg) # Rebroadcast to all locally connected browser clients # _slave tag identifies origin; admins see all; company clients # still filtered by ownership as usual. await broadcast(msg) except WebSocketDisconnect: print(f"[cluster] Esclavo '{slave_name}' desconectado.") except Exception as e: print(f"[cluster] Error con esclavo '{slave_name}': {e}") @app.get("/cluster/status") async def cluster_status(current_user=Depends(get_current_user)): """Return current cluster role and slave relay status.""" role = config.get("server_role", "STANDALONE").upper() info: dict = { "role": role, "slave_name": config.get("slave_name", ""), "master_url": config.get("master_url", ""), } if role == "SLAVE" and _slave_relay is not None: info["relay"] = _slave_relay.status() return info @app.websocket("/ws") async def websocket_endpoint( ws: WebSocket, token: str | None = _Query(default=None), ): await ws.accept() # ── Resolve company_id from optional JWT token ──────────────────────────── company_id = None if token: try: from jose import jwt as _jwt, JWTError as _JWTError from routers.auth import SECRET_KEY, ALGORITHM payload = _jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username = payload.get("sub") if username: _db = SessionLocal() try: _u = _db.query(User).filter(User.username == username, User.activo == True).first() if _u: company_id = getattr(_u, "company_id", None) finally: _db.close() except Exception: await ws.close(code=1008) return if token is None: await ws.close(code=1008) return client = {"ws": ws, "company_id": company_id} connected_clients.append(client) # ── Build filtered init snapshot ────────────────────────────────────────── # company users see ONLY their company's aids/vessels/atons # admins (company_id=None) see everything owned_mmsis = _ownership_cache.get(company_id) if company_id else None owned_aid_ids= _aid_ownership_cache.get(company_id) if company_id else None # aids = official positions → visible to ALL users (public nav info) # vessels= ship AIS traffic → visible to ALL users # atons = live AIS AtoN msgs → filtered by company MMSI ownership init_aids = list(aids_state.values()) init_vessels = list(vessels_state.values()) if company_id is None: # Admin / superadmin → all AtoNs init_atons = list(aton_state.values()) else: # Client role: get their owned MMSIs (empty set = no buoys assigned = sees nothing) client_mmsis = _ownership_cache.get(company_id, set()) init_atons = [a for a in aton_state.values() if a.get("mmsi") in client_mmsis] await ws.send_text(json.dumps({ "type": "init", "aids": init_aids, "vessels": init_vessels, "atons": init_atons, })) # Re-emit any active aid alerts so new clients see current state # Company users only receive alerts for aids they own from datetime import datetime as _dt for aid_id, state in aid_alert_state.items(): if owned_aid_ids is not None and aid_id not in owned_aid_ids: continue # this aid doesn't belong to the connecting client's company if state == 'RED': await ws.send_text(json.dumps({"type": "alert", "tipo": "ALERTA_ROJA", "subtipo": "AYUDA_EN_MOVIMIENTO", "aid_id": aid_id, "mensaje": "Movimiento continuo detectado", "timestamp": _dt.utcnow().isoformat()})) elif state == 'YELLOW': aid = aids_state.get(aid_id, {}) await ws.send_text(json.dumps({"type": "alert", "tipo": "ALERTA_AMARILLA", "subtipo": "AYUDA_FUERA_POSICION", "aid_id": aid_id, "desplazamiento_m": round(aid.get("desplazamiento_m", 0), 1), "timestamp": _dt.utcnow().isoformat()})) try: while True: await ws.receive_text() except WebSocketDisconnect: if client in connected_clients: connected_clients.remove(client) app.mount("/", StaticFiles(directory=os.path.join(os.path.dirname(__file__), '..', 'frontend'), html=True), name="frontend")