cfd94f905a
- Restrict CORS to localhost origins (was allow_origins=[*])
- Require valid JWT on WebSocket /ws (anonymous no longer gets admin view)
- Fix path traversal in delete_cell(): resolve() + parent check
- Validate cell_id format in /charts/download-noaa/{cell_id}
- Exclude charts/ and Cartas/ from git (keep US1GC09M world overview)
- Add NOAA ENC Portal external link in charts catalog tab
- Untrack __pycache__/, .db, .claude/ session files
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1136 lines
49 KiB
Python
1136 lines
49 KiB
Python
import bcrypt as _bcrypt
|
||
if not hasattr(_bcrypt, "__about__"):
|
||
import types; _bcrypt.__about__ = types.SimpleNamespace(__version__=_bcrypt.__version__ or "4.0.0")
|
||
|
||
from datetime import datetime
|
||
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
|
||
from fastapi.staticfiles import StaticFiles
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
from fastapi.middleware.gzip import GZipMiddleware
|
||
from contextlib import asynccontextmanager
|
||
import asyncio
|
||
import json
|
||
import os
|
||
from dotenv import load_dotenv
|
||
|
||
load_dotenv(dotenv_path=os.path.join(os.path.dirname(__file__), '..', '.env'))
|
||
|
||
from database import engine, SessionLocal, ensure_column
|
||
from models.aid import Aid
|
||
from models.vessel import Vessel, VesselTrack, AtonTrack, RecordingEvent
|
||
from models.lamp import Lamp
|
||
from models.contact import Contact, AlertReport
|
||
from models.org import Port, Company, BuoyOwnership
|
||
import models.aid
|
||
import models.vessel
|
||
import models.user
|
||
import models.lamp
|
||
import models.contact
|
||
import models.org
|
||
from routers import aids
|
||
from routers.auth import seed_users, get_current_user, require_admin
|
||
from models.user import User
|
||
from fastapi import Depends, Query as _Query
|
||
from database import get_db
|
||
from sqlalchemy.orm import Session
|
||
from routers import auth as auth_router
|
||
from routers import charts as charts_router
|
||
from routers import equipment as equipment_router
|
||
from routers import lamps as lamps_router
|
||
from routers import contacts as contacts_router
|
||
from routers import tracks as tracks_router
|
||
from routers import org as org_router
|
||
from services.ais_simulator import run_simulator, MIAMI_AIDS
|
||
from services.alert_engine import evaluate_vessel, evaluate_aid_movement, aid_alert_state
|
||
from services.gps_reader import GPSReader
|
||
from services.aton_decoder import decode_type21, decode_type8_aton, process_aton_message, aton_state
|
||
from services import settings_store
|
||
from services.slave_relay import SlaveRelay
|
||
import services.ais_catcher as _ais_catcher
|
||
from services.ais_udp_reader import run_udp_listener
|
||
import services.ais_udp_reader as _ais_udp
|
||
|
||
# Create the tables declared on each model module's Base.
for _model_module in (models.aid, models.vessel, models.user,
                      models.lamp, models.contact, models.org):
    _model_module.Base.metadata.create_all(bind=engine)

# Additive migrations for columns added after first install.
# Each row is (table, column, column DDL); rows are applied in order.
for _table, _column, _ddl in (
    ("aids", "lamp_id", "TEXT"),
    ("aids", "displacement_warn_m", "REAL"),
    ("aids", "displacement_alarm_m", "REAL"),
    ("aids", "signal_loss_min", "INTEGER"),
    ("aids", "din3_function", "TEXT"),
    ("aids", "din4_function", "TEXT"),
    # Stable link to an S-57 feature (cell + stable feature_id when the aid
    # comes from a chart).
    ("aids", "source_chart", "TEXT"),
    ("aids", "cell_id", "TEXT"),
    ("aids", "chart_feature_id", "TEXT"),
    ("lamps", "warn_pct", "REAL DEFAULT 20.0"),
    ("lamps", "alarm_pct", "REAL DEFAULT 10.0"),
    ("users", "prefs_json", "TEXT"),
    ("users", "company_id", "TEXT"),
):
    ensure_column(_table, _column, _ddl)
|
||
|
||
# Connected WebSocket clients.
# Each entry: {"ws": WebSocket, "company_id": str|None}
# company_id=None means superadmin/admin — sees ALL traffic.
connected_clients: list[dict] = []

# Ownership cache: company_id → set of MMSI strings the company owns.
# Rebuilt at startup and refreshed via POST /org/refresh (or when ownership changes).
_ownership_cache: dict[str, set] = {}  # company_id → {mmsi, ...}
_aid_ownership_cache: dict[str, set] = {}  # company_id → {aid_id, ...}

# Track throttle: last persisted point per MMSI
_vessel_track_last: dict[str, dict] = {}  # mmsi → {ts, lat, lon}
_aton_track_last: dict[str, dict] = {}  # mmsi → {ts, lat, lon}

# Signal-loss monitoring: last time each AIS AtoN was heard from
_aton_last_seen: dict[str, datetime] = {}  # mmsi → datetime UTC
_signal_loss_state: dict[str, bool] = {}  # mmsi → True if alert already sent

# Digital input alert state: "{mmsi}_{din3|din4}" → True if currently triggered
_digital_alert_state: dict[str, bool] = {}

# Source of truth for runtime config — mutated via POST /settings.
config = settings_store.SETTINGS

# Background AIS reader task handle (lets us stop/restart on source change)
_ais_task: asyncio.Task | None = None

# Slave relay — active when server_role == "SLAVE"
_slave_relay: SlaveRelay | None = None

# Last-known battery alert state per ATON to avoid re-emitting every msg.
# Values: None | 'WARN' | 'ALARM'
_battery_alert_state: dict[str, str | None] = {}

# In-memory state (current positions), keyed by MMSI for vessels and by
# aid id for aids.
vessels_state: dict = {}
aids_state: dict = {}
|
||
|
||
def _build_ownership_cache(db):
    """Repopulate both in-memory ownership maps from the BuoyOwnership table.

    Both caches are emptied first, then every ownership row contributes its
    MMSI (if set) to ``_ownership_cache`` and its aid id (if set) to
    ``_aid_ownership_cache``, grouped by the row's company id.

    Args:
        db: an open database session used to query BuoyOwnership.
    """
    _ownership_cache.clear()
    _aid_ownership_cache.clear()

    for ownership in db.query(BuoyOwnership).all():
        # One pass per (cache, key) pair; falsy keys are skipped.
        for cache, owned_key in (
            (_ownership_cache, ownership.mmsi),
            (_aid_ownership_cache, ownership.aid_id),
        ):
            if owned_key:
                cache.setdefault(ownership.company_id, set()).add(owned_key)
|
||
|
||
|
||
def seed_ports():
    """Insert the canonical Colombian ports that are not yet in the DB.

    Existing rows (matched by id) are left untouched; only missing ports
    are added. The session is always closed, even on error.
    """
    # (id, name, center_lat, center_lon, chart_name) — every port uses the
    # same default zoom of 12.0.
    canonical_ports = (
        ("port-barranquilla", "Barranquilla", 11.0041, -74.8070, None),
        ("port-cartagena", "Cartagena", 10.3997, -75.5144, "BAHÍA_DE_CARTAGENA"),
        ("port-santamarta", "Santa Marta", 11.2408, -74.2110, None),
        ("port-buenaventura", "Buenaventura", 3.8800, -77.0311, None),
        ("port-tumaco", "Tumaco", 1.8189, -78.7619, None),
    )

    session = SessionLocal()
    try:
        for port_id, name, lat, lon, chart in canonical_ports:
            already_there = session.query(Port).filter(Port.id == port_id).first()
            if already_there:
                continue
            session.add(Port(
                id=port_id,
                name=name,
                center_lat=lat,
                center_lon=lon,
                default_zoom=12.0,
                chart_name=chart,
            ))
        session.commit()
    finally:
        session.close()
|
||
|
||
|
||
def seed_contacts():
    """Make sure the default operator contact ("insm-default") exists.

    The operator company (INSM) is pre-loaded as an OWNER contact so the
    REPORT flow has someone to notify out of the box. Nothing is written
    when the contact is already present.
    """
    session = SessionLocal()
    try:
        from models.contact import Contact

        found = session.query(Contact).filter(Contact.id == "insm-default").first()
        if found:
            return

        operator = Contact(
            id="insm-default",
            role="OWNER",
            name="INSM — Ingeniería Naval y Señalización Marítima",
            company_name="INSM",
            email="tecnicoinsm@gmail.com",
            preferred_channel="EMAIL",
            notes="Operator of this AidsMonitoring deployment.",
        )
        session.add(operator)
        session.commit()
    finally:
        session.close()
|
||
|
||
|
||
def seed_aids():
    """Insert the simulator's demo aids (MIAMI_AIDS) that are missing from the DB.

    Aids are matched by id; rows that already exist are not modified. New
    rows are created with ``fuente_posicion="MANUAL"`` and, when the source
    entry does not specify one, a swing radius of 10.0 m.

    The session is closed in a ``finally`` block so a failing query or
    commit cannot leak the connection (same pattern as seed_ports and
    seed_contacts).
    """
    db = SessionLocal()
    try:
        for a in MIAMI_AIDS:
            if db.query(Aid).filter(Aid.id == a["id"]).first():
                continue
            db.add(Aid(
                id=a["id"],
                nombre=a["nombre"],
                categoria=a["categoria"],
                tipo=a["tipo"],
                tipo_ais=a["tipo_ais"],
                mmsi=a.get("mmsi"),
                lat_nominal=a["lat_nominal"],
                lon_nominal=a["lon_nominal"],
                radio_borneo_m=a.get("radio_borneo_m", 10.0),
                caracteristica_luz=a.get("caracteristica_luz"),
                alcance_nm=a.get("alcance_nm"),
                fuente_posicion="MANUAL",
            ))
        db.commit()
    finally:
        db.close()
|
||
|
||
def _client_may_see(client: dict,
                    owned_mmsi: str | None = None,
                    owned_aid_id: str | None = None) -> bool:
    """Decide whether *client* is allowed to receive a message.

    Rules, in order:
      1. A client whose ``company_id`` is None (admin/superadmin) sees all.
      2. With ``owned_mmsi`` given, the client's company must own that MMSI.
      3. Otherwise, with ``owned_aid_id`` given, the company must own that aid.
      4. With neither given the message is unfiltered (system/alert) and is
         always delivered.

    Args:
        client: connection record holding at least ``company_id``.
        owned_mmsi: MMSI the message is about, for vessel/AtoN traffic.
        owned_aid_id: aid id the message is about, for aid-position traffic.

    Returns:
        True when the message may be sent to this client.
    """
    company = client.get("company_id")
    if company is None:
        return True  # admin/superadmin: sees all

    if owned_mmsi is None and owned_aid_id is None:
        return True  # non-filtered message

    # MMSI takes precedence when both filters are supplied.
    if owned_mmsi is not None:
        return owned_mmsi in _ownership_cache.get(company, ())
    return owned_aid_id in _aid_ownership_cache.get(company, ())
|
||
|
||
|
||
async def broadcast(message: dict,
                    owned_mmsi: str | None = None,
                    owned_aid_id: str | None = None):
    """Send *message* (JSON-encoded) to the connected WebSocket clients.

    Args:
        message: JSON-serialisable payload.
        owned_mmsi: when given, company users only receive the message if
            their company owns this MMSI (vessel/AtoN traffic).
        owned_aid_id: when given, company users only receive the message if
            their company owns this aid (aid-position traffic).

    Admins (company_id=None) always receive everything. Clients whose send
    raises are dropped from ``connected_clients``.

    When a slave relay is active (server_role == "SLAVE"), the event is also
    forwarded upstream to the master.
    """
    data = json.dumps(message)
    dead = []
    # Iterate over a snapshot: each send awaits, and other coroutines may add
    # or remove clients in the meantime. Mutating a list while iterating it
    # would make the loop silently skip entries.
    for client in list(connected_clients):
        if not _client_may_see(client, owned_mmsi, owned_aid_id):
            continue
        try:
            await client["ws"].send_text(data)
        except Exception:
            # Best effort: any send failure marks the connection as dead.
            dead.append(client)

    for c in dead:
        # The client may already have been removed by its own handler.
        if c in connected_clients:
            connected_clients.remove(c)

    # Forward upstream when acting as a slave
    if _slave_relay is not None:
        _slave_relay.send(message)
|
||
|
||
async def _persist_recording(db, alert: dict):
    """Mirror an auto-recording alert into the RecordingEvent table.

    * ``GRABACION_INICIADA``  → insert a new open recording row.
    * ``GRABACION_FINALIZADA`` → close the most recent still-open row for
      the same (mmsi, aid_id) pair, if there is one.

    Any other ``tipo`` is ignored.

    Args:
        db: open database session; committed by this function.
        alert: alert dict with at least ``tipo``, ``mmsi`` and ``aid_id``.
    """
    from models.vessel import RecordingEvent
    import uuid as _uuid

    kind = alert["tipo"]

    if kind == "GRABACION_INICIADA":
        started = RecordingEvent(
            id=str(_uuid.uuid4()),
            mmsi=alert["mmsi"],
            aid_id=alert["aid_id"],
            inicio=datetime.utcnow(),
            trigger=alert.get("trigger", "PROXIMIDAD"),
        )
        db.add(started)
        db.commit()
        return

    if kind != "GRABACION_FINALIZADA":
        return

    from sqlalchemy import and_

    still_open = and_(
        RecordingEvent.mmsi == alert["mmsi"],
        RecordingEvent.aid_id == alert["aid_id"],
        RecordingEvent.cerrado == False,  # noqa: E712 — SQL expression, not a Python bool test
    )
    latest = (
        db.query(RecordingEvent)
        .filter(still_open)
        .order_by(RecordingEvent.inicio.desc())
        .first()
    )
    if not latest:
        return

    latest.fin = datetime.utcnow()
    latest.distancia_min_m = alert.get("distancia_min_m")
    latest.cerrado = True
    db.commit()
|
||
|
||
async def process_message(msg: dict):
    """Handle one message from the AIS source: update state, persist, broadcast.

    Dispatches on ``msg["type"]``:

    * ``"vessel"``       – stores the latest vessel state, evaluates proximity
      alerts against the known aids, broadcasts both, persists recording
      events and a throttled VesselTrack point (at most one per 10 s per MMSI).
    * ``"aton"``         – decodes an AIS AtoN report, clears a pending
      signal-loss alert, auto-creates or updates the matching Aid (Type 21),
      checks battery thresholds, evaluates digital-input alerts (Type 8) and
      persists a throttled AtonTrack point (at most one per 30 s per MMSI).
    * ``"aid_position"`` – updates the aid's position in memory and in the DB,
      then broadcasts the position and any movement alerts.

    Other message types are ignored. The DB session opened here is always
    closed.

    Args:
        msg: decoded message dict; must contain ``type``.
    """
    import uuid as _uuid

    db = SessionLocal()
    try:
        if msg["type"] == "vessel":
            mmsi = msg["mmsi"]
            vessels_state[mmsi] = msg
            aids_list = list(aids_state.values())
            alerts = evaluate_vessel(msg, aids_list, config)
            await broadcast(msg)  # vessels = public traffic, no filter
            for alert in alerts:
                # Filter proximity alerts to the company that owns the aid involved
                await broadcast({"type": "alert", **alert},
                                owned_aid_id=alert.get("aid_id"))
                if alert["tipo"] in ("GRABACION_INICIADA", "GRABACION_FINALIZADA"):
                    await _persist_recording(db, alert)

            # ── Auto-persist VesselTrack (DVR) ───────────────────────────────
            lat, lon = msg.get("lat"), msg.get("lon")
            if lat is not None and lon is not None:
                now = datetime.utcnow()
                prev = _vessel_track_last.get(mmsi)
                # 9999 forces a write for the first point of each MMSI.
                elapsed = (now - prev["ts"]).total_seconds() if prev else 9999
                if elapsed >= 10:
                    db.add(VesselTrack(
                        mmsi=mmsi, timestamp=now,
                        lat=lat, lon=lon,
                        sog=msg.get("sog"), cog=msg.get("cog"),
                        heading=msg.get("heading"),
                    ))
                    db.commit()
                    _vessel_track_last[mmsi] = {"ts": now, "lat": lat, "lon": lon}

        elif msg["type"] == "aton":
            # Real AIS Type 21 / Type 8 from hardware receiver
            entry = process_aton_message(msg)
            if entry:
                mmsi = entry["mmsi"]
                # Bound up front: the Type 8 digital-input check below reads
                # `aid`, which is otherwise only assigned in the Type 21 and
                # battery branches (a Type 8 report without voltage would hit
                # an unbound local).
                aid = None
                aton_state[mmsi] = entry
                _aton_last_seen[mmsi] = datetime.utcnow()  # track for signal-loss
                if _signal_loss_state.pop(mmsi, False):
                    # Signal restored — clear any active loss alert on clients
                    await broadcast({"type": "alert",
                                     "tipo": "SENAL_RESTAURADA",
                                     "mmsi": mmsi,
                                     "timestamp": datetime.utcnow().isoformat()},
                                    owned_mmsi=mmsi)
                await broadcast({"type": "aton", **entry}, owned_mmsi=mmsi)

                # Auto-upsert Aid row on first sight (Type 21 carries name + position).
                # When an existing Aid with matching MMSI is found, update its
                # lat_actual + run drift evaluation using that aid's per-buoy
                # thresholds so the alert engine fires correctly.
                if msg.get("msg_type") == 21 and entry.get("lat") is not None:
                    aid = db.query(Aid).filter(Aid.mmsi == mmsi).first()
                    if not aid:
                        aid = Aid(
                            id=str(_uuid.uuid4()),
                            nombre=entry.get("name") or f"AIS AtoN {mmsi}",
                            categoria="FLOTANTE",
                            tipo="BOYA_LATERAL",
                            tipo_ais="ATON_21",
                            mmsi=mmsi,
                            lat_nominal=entry["lat"],
                            lon_nominal=entry["lon"],
                            fuente_posicion="AIS",
                            source_chart="AIS",
                        )
                        db.add(aid)
                        db.commit()
                        # Not ownership-filtered: sent to every connected client.
                        await broadcast({
                            "type": "alert",
                            "id": str(_uuid.uuid4()),
                            "tipo": "ALERTA_AMARILLA",
                            "subtipo": "AYUDA_NUEVA_SIN_CONFIGURAR",
                            "mmsi": mmsi,
                            "aid_id": aid.id,
                            "aid_nombre": aid.nombre,
                            "mensaje": "Nueva ayuda detectada — falta asignar lámpara",
                            "timestamp": datetime.utcnow().isoformat(),
                        })
                    else:
                        # Existing aid configured by operator (or auto-created).
                        # Update its actual position, displacement, and broadcast
                        # an aid_position event so the map can render the AIS
                        # ghost marker. Drift alerts use this aid's per-buoy
                        # thresholds (or fall back to global config).
                        from services.alert_engine import haversine
                        aid.lat_actual = entry["lat"]
                        aid.lon_actual = entry["lon"]
                        aid.desplazamiento_m = haversine(
                            entry["lat"], entry["lon"],
                            aid.lat_nominal, aid.lon_nominal,
                        )
                        aid.ultima_senal = datetime.utcnow()
                        # Drift evaluation — fires WARN/ALARM only on state changes
                        drift_alerts = evaluate_aid_movement(
                            aid.id, entry["lat"], entry["lon"],
                            aid.lat_nominal, aid.lon_nominal, config,
                            warn_m=aid.displacement_warn_m,
                            alarm_m=aid.displacement_alarm_m,
                        )
                        # en_posicion = within swing_radius of nominal
                        aid.en_posicion = (aid.desplazamiento_m
                                           <= (aid.radio_borneo_m or 10.0))
                        db.commit()
                        await broadcast({
                            "type": "aid_position",
                            "id": aid.id,
                            "lat_actual": entry["lat"],
                            "lon_actual": entry["lon"],
                            "desplazamiento_m": round(aid.desplazamiento_m, 1),
                            "en_posicion": aid.en_posicion,
                            "en_movimiento": False,
                        }, owned_aid_id=aid.id)
                        for alert in drift_alerts:
                            await broadcast({"type": "alert", **alert},
                                            owned_aid_id=alert.get("aid_id"))

                # Battery threshold check — per-aid lamp values if assigned,
                # otherwise the global defaults from settings_store.
                v = entry.get("voltage_v")
                if v is not None:
                    aid = db.query(Aid).filter(Aid.mmsi == mmsi).first()
                    lamp = None
                    if aid and aid.lamp_id:
                        lamp = db.query(Lamp).filter(Lamp.id == aid.lamp_id).first()
                    if lamp:
                        # Thresholds are a percentage of the lamp's voltage range
                        # above its minimum voltage.
                        rng = lamp.voltage_max - lamp.voltage_min
                        warn_pct = (lamp.warn_pct or 20.0) / 100.0
                        alarm_pct = (lamp.alarm_pct or 10.0) / 100.0
                        warn_v = lamp.voltage_min + rng * warn_pct
                        alarm_v = lamp.voltage_min + rng * alarm_pct
                        threshold_source = f"lamp:{lamp.manufacturer} {lamp.model}"
                    else:
                        warn_v = config["battery_warn_v"]
                        alarm_v = config["battery_alarm_v"]
                        threshold_source = "global"
                    prev = _battery_alert_state.get(mmsi)
                    new_state = None
                    if v <= alarm_v:
                        new_state = "ALARM"
                    elif v <= warn_v:
                        new_state = "WARN"
                    # Only emit when the state changes to WARN/ALARM.
                    if new_state and new_state != prev:
                        await broadcast({
                            "type": "alert",
                            "id": str(_uuid.uuid4()),
                            "tipo": "ALERTA_ROJA" if new_state == "ALARM" else "ALERTA_AMARILLA",
                            "subtipo": "BATERIA_BAJA",
                            "mmsi": mmsi,
                            "aid_nombre": entry.get("name"),
                            "voltage_v": v,
                            "umbral": alarm_v if new_state == "ALARM" else warn_v,
                            "fuente_umbral": threshold_source,
                            "timestamp": datetime.utcnow().isoformat(),
                        }, owned_mmsi=mmsi)  # only the company that owns this buoy
                    _battery_alert_state[mmsi] = new_state

                # ── Digital input alerts (water ingress / listing) ────────────
                if msg.get("msg_type") == 8:
                    aid = aid or db.query(Aid).filter(Aid.mmsi == mmsi).first()
                    if aid:
                        # configured input function → (tipo, subtipo, mensaje)
                        _DIN_ALERT_MAP = {
                            "WATER_INGRESS_WARN": ("ALERTA_AMARILLA", "INGRESO_AGUA", "⚠ Water ingress detected"),
                            "WATER_INGRESS_CRITICAL": ("ALERTA_ROJA", "HUNDIMIENTO", "🔴 Critical water ingress — sinking risk"),
                            "LISTING": ("ALERTA_ROJA", "ESCORA_CRITICA", "🔴 Critical list detected"),
                        }
                        for din_field, fn_attr in [("din3", "din3_function"), ("din4", "din4_function")]:
                            fn = getattr(aid, fn_attr, None)
                            if not fn or fn not in _DIN_ALERT_MAP:
                                continue
                            triggered = entry.get(din_field, False)
                            # Also check IEC standard water level bit for CRITICAL
                            if fn == "WATER_INGRESS_CRITICAL":
                                triggered = triggered or entry.get("water_level_high", False)
                            state_key = f"{mmsi}_{din_field}"
                            prev_state = _digital_alert_state.get(state_key)
                            if triggered and not prev_state:
                                tipo, subtipo, mensaje = _DIN_ALERT_MAP[fn]
                                await broadcast({
                                    "type": "alert",
                                    "id": str(_uuid.uuid4()),
                                    "tipo": tipo,
                                    "subtipo": subtipo,
                                    "mmsi": mmsi,
                                    "aid_id": aid.id,
                                    "aid_nombre": aid.nombre,
                                    "mensaje": mensaje,
                                    "timestamp": datetime.utcnow().isoformat(),
                                }, owned_mmsi=mmsi)
                            elif not triggered and prev_state:
                                # Condition cleared
                                await broadcast({
                                    "type": "alert",
                                    "tipo": "CONDICION_RESUELTA",
                                    "subtipo": _DIN_ALERT_MAP[fn][1],
                                    "mmsi": mmsi,
                                    "aid_id": aid.id,
                                    "timestamp": datetime.utcnow().isoformat(),
                                }, owned_mmsi=mmsi)
                            _digital_alert_state[state_key] = triggered

                # ── Auto-persist AtonTrack (DVR) ─────────────────────────────
                at_lat = entry.get("lat")
                at_lon = entry.get("lon")
                if at_lat is not None and at_lon is not None:
                    now = datetime.utcnow()
                    prev = _aton_track_last.get(mmsi)
                    elapsed = (now - prev["ts"]).total_seconds() if prev else 9999
                    if elapsed >= 30:  # AtoN updates less frequent → 30s threshold
                        db.add(AtonTrack(
                            mmsi=mmsi, timestamp=now,
                            lat=at_lat, lon=at_lon,
                            voltage_v=entry.get("voltage_v"),
                            off_position=entry.get("off_position"),
                        ))
                        db.commit()
                        _aton_track_last[mmsi] = {"ts": now, "lat": at_lat, "lon": at_lon}

        elif msg["type"] == "aid_position":
            aid_id = msg["id"]
            if aid_id in aids_state:
                aids_state[aid_id].update(msg)
                aid = db.query(Aid).filter(Aid.id == aid_id).first()
                if aid:
                    alert_list = evaluate_aid_movement(
                        aid_id, msg["lat_actual"], msg["lon_actual"],
                        aid.lat_nominal, aid.lon_nominal, config,
                        warn_m=aid.displacement_warn_m,  # per-aid override (None → global)
                        alarm_m=aid.displacement_alarm_m,
                    )
                    aid.lat_actual = msg["lat_actual"]
                    aid.lon_actual = msg["lon_actual"]
                    aid.desplazamiento_m = msg["desplazamiento_m"]
                    aid.en_posicion = msg["en_posicion"]
                    db.commit()
                    await broadcast(msg)  # aid positions = public, no filter
                    for alert in alert_list:
                        # Only the company that owns this aid receives movement/position alerts
                        await broadcast({"type": "alert", **alert},
                                        owned_aid_id=alert.get("aid_id"))
    finally:
        db.close()
|
||
|
||
async def _start_ais_source():
    """Spawn the AIS reader task selected by ``config['ais_source']``.

    * ``SIMULATOR`` (the default) → task running the built-in simulator.
    * ``SDR`` → launches AIS-catcher when its executable exists and it is not
      already running, then starts the UDP listener on port 10110.
    * anything else → no task.

    Returns:
        The created asyncio task, or None for an unrecognised source.
    """
    source = config.get("ais_source", "SIMULATOR").upper()

    if source == "SIMULATOR":
        return asyncio.create_task(run_simulator(process_message))

    if source != "SDR":
        return None

    # Auto-launch AIS-catcher if hardware is present and not already running
    needs_launch = _ais_catcher.exe_exists() and not _ais_catcher.is_running()
    if needs_launch:
        outcome = _ais_catcher.launch()
        if outcome["ok"]:
            print(f"[ais] AIS-catcher launched (pid {outcome.get('pid')})")
        else:
            print(f"[ais] AIS-catcher launch failed: {outcome.get('error')}")

    listener = run_udp_listener(process_message, port=10110)
    return asyncio.create_task(listener)
|
||
|
||
|
||
async def _stop_ais_source():
    """Cancel the running AIS reader task (if any) and clear its handle.

    A task that is still running is cancelled and awaited so the
    cancellation completes before returning; the resulting CancelledError
    is swallowed. The module-level handle is reset to None in every case.
    """
    global _ais_task

    running = _ais_task
    if running and not running.done():
        running.cancel()
        try:
            await running
        except asyncio.CancelledError:
            pass

    _ais_task = None
|
||
|
||
|
||
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application startup/shutdown hook.

    Startup, in order:
      1. Initialise the settings store and seed aids, users, contacts, ports.
      2. Build the ownership caches and load every Aid into ``aids_state``.
      3. Start the AIS source task.
      4. Schedule the chart-coverage cache pre-warm as a background task.
      5. Start the slave relay when configured as SLAVE with a master URL.
      6. Start the GPS reader and expose it as ``app.state.gps``.
      7. Schedule the signal-loss monitor (checks every 60 s).

    Shutdown: cancels the background tasks created here and stops the slave
    relay if one is running.
    """
    global _ais_task, _slave_relay

    settings_store.init()
    seed_aids()
    seed_users()
    seed_contacts()
    seed_ports()

    # try/finally so a failing query cannot leak the session.
    db = SessionLocal()
    try:
        _build_ownership_cache(db)
        for aid in db.query(Aid).all():
            aids_state[aid.id] = {
                "id": aid.id, "nombre": aid.nombre, "categoria": aid.categoria,
                "tipo": aid.tipo, "tipo_ais": aid.tipo_ais, "mmsi": aid.mmsi,
                "lat_nominal": aid.lat_nominal, "lon_nominal": aid.lon_nominal,
                "radio_borneo_m": aid.radio_borneo_m,
                "lat_actual": aid.lat_actual, "lon_actual": aid.lon_actual,
                "desplazamiento_m": aid.desplazamiento_m or 0,
                "en_posicion": aid.en_posicion, "en_movimiento": aid.en_movimiento,
                "caracteristica_luz": aid.caracteristica_luz, "alcance_nm": aid.alcance_nm,
                "lamp_id": aid.lamp_id,
                "puerto_responsable": aid.puerto_responsable,
                "empresa_responsable": aid.empresa_responsable,
                "ultima_senal": str(aid.ultima_senal) if aid.ultima_senal else None,
            }
    finally:
        db.close()

    _ais_task = await _start_ais_source()

    # The event loop only keeps weak references to tasks; hold strong ones so
    # the fire-and-forget tasks below cannot be garbage-collected mid-run.
    background_tasks: list[asyncio.Task] = []

    # ── Pre-warm chart-cell coverage cache in background ───────────────────
    # Reading 691 cell files is a ~2.5 s one-time cost. Doing it at startup
    # means the FIRST user request lands on a hot cache. It runs as a
    # background task on the event loop and yields control every 50 files,
    # so the server stays responsive while it works.
    async def _warm_chart_cache():
        try:
            from services import chart_manager as _cm
            loop = asyncio.get_running_loop()
            t0 = loop.time()
            count = 0
            for cell_dir in _cm.CHARTS_DIR.iterdir():
                if not cell_dir.is_dir():
                    continue
                for fname in ("depths.geojson", "land.geojson", "zones.geojson",
                              "hazards.geojson", "features.geojson"):
                    cache = cell_dir / fname
                    if not cache.exists():
                        continue
                    try:
                        mtime = cache.stat().st_mtime
                    except OSError:
                        continue
                    # Key includes the mtime, so a rewritten file is re-indexed.
                    cov_key = f"{cache}|{mtime}"
                    if cov_key in _cm._cell_coverage_cache:
                        continue
                    try:
                        fc = json.loads(cache.read_text())
                    except Exception:
                        # Unreadable/invalid file: skip it, pre-warm is best effort.
                        continue
                    cov = _cm._coverage_bbox(fc.get("features") or [])
                    if cov is not None:
                        _cm._cell_coverage_cache[cov_key] = cov
                    count += 1
                    # Yield to event loop every 50 files so we don't block
                    if count % 50 == 0:
                        await asyncio.sleep(0)
            dt = loop.time() - t0
            print(f"[charts] Pre-warm: {count} files indexed in {dt:.1f}s "
                  f"({len(_cm._cell_coverage_cache)} cached bboxes)")
        except Exception as e:
            print(f"[charts] Pre-warm failed (non-fatal): {e}")

    background_tasks.append(asyncio.create_task(_warm_chart_cache()))

    # ── Slave relay (start when role == SLAVE) ─────────────────────────────
    role = config.get("server_role", "STANDALONE").upper()
    master_url = config.get("master_url", "").strip()
    slave_name = config.get("slave_name", "").strip() or config.get("station_name", "Slave")
    if role == "SLAVE" and master_url:
        _slave_relay = SlaveRelay(master_url=master_url, slave_name=slave_name)
        await _slave_relay.start()
        print(f"[cluster] Rol: SLAVE → maestro {master_url} (nombre='{slave_name}')")
    elif role == "MASTER":
        print(f"[cluster] Rol: MASTER — esperando conexiones de esclavos en /ws/slave")

    # GPS — settings_store now holds the port (mirrored from .env on first run)
    gps_port = config.get("gps_port") or None
    gps_baud = config.get("gps_baud") or None
    gps = GPSReader(broadcast, forced_port=gps_port, forced_baud=gps_baud)
    await gps.start()
    app.state.gps = gps

    # ── Signal-loss monitor (runs every 60 s) ──────────────────────────────
    async def _signal_loss_monitor():
        import uuid as _uuid
        while True:
            await asyncio.sleep(60)
            now = datetime.utcnow()
            db2 = SessionLocal()
            try:
                # Copy the items: broadcast() awaits, and the map may be
                # updated by process_message in the meantime.
                for mmsi, last_seen in list(_aton_last_seen.items()):
                    aid = db2.query(Aid).filter(Aid.mmsi == mmsi).first()
                    # Monitoring is opt-in: skip aids without a threshold.
                    if not aid or not aid.signal_loss_min:
                        continue
                    elapsed_min = (now - last_seen).total_seconds() / 60
                    if elapsed_min >= aid.signal_loss_min:
                        # Alert once per outage, not on every tick.
                        if not _signal_loss_state.get(mmsi):
                            _signal_loss_state[mmsi] = True
                            await broadcast({
                                "type": "alert",
                                "id": str(_uuid.uuid4()),
                                "tipo": "ALERTA_ROJA",
                                "subtipo": "PERDIDA_SENAL",
                                "mmsi": mmsi,
                                "aid_id": aid.id,
                                "aid_nombre": aid.nombre,
                                "minutos_sin_senal": round(elapsed_min),
                                "umbral_min": aid.signal_loss_min,
                                "timestamp": now.isoformat(),
                            }, owned_mmsi=mmsi)
                    else:
                        _signal_loss_state.pop(mmsi, None)
            finally:
                db2.close()

    background_tasks.append(asyncio.create_task(_signal_loss_monitor()))

    try:
        yield
    finally:
        # ── Shutdown ───────────────────────────────────────────────────────
        # Cancelling an already-finished task is a no-op.
        for task in background_tasks:
            task.cancel()
        # Stop slave relay if running
        if _slave_relay is not None:
            await _slave_relay.stop()
|
||
|
||
app = FastAPI(title="AidsMonitoring", lifespan=lifespan)

# CORS is limited to the local front-end origins: localhost / 127.0.0.1 on
# ports 8080 and 5173.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        f"http://{_host}:{_port}"
        for _port in (8080, 5173)
        for _host in ("localhost", "127.0.0.1")
    ],
    allow_methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
    allow_headers=["Content-Type", "Authorization"],
)

# Compress GeoJSON / large JSON responses. minimum_size=1024 skips tiny ones
# (status pings, single-aid lookups) where the gzip overhead isn't worth it.
# Chart payloads (depths 350 MB, land 124 MB cross-cell total) compress 5–10×.
app.add_middleware(GZipMiddleware, minimum_size=1024)

# Register every feature router, in the same order as before.
for _router_module in (
    auth_router,
    aids,
    charts_router,
    equipment_router,
    lamps_router,
    contacts_router,
    tracks_router,
    org_router,
):
    app.include_router(_router_module.router)
|
||
|
||
@app.post("/org/refresh")
async def refresh_ownership(
    _user: User = Depends(require_admin),
    db: Session = Depends(get_db),
):
    """Reload the in-memory ownership caches from the DB (admin only).

    Call after assigning or removing buoys so WebSocket filtering picks up
    the change.

    Returns:
        ``{"ok": True, "companies": <count>, "total_mmsis": <count>}`` where
        both counts are taken from the MMSI ownership cache.
    """
    _build_ownership_cache(db)

    company_count = len(_ownership_cache)
    mmsi_count = sum(len(owned) for owned in _ownership_cache.values())
    return {"ok": True, "companies": company_count, "total_mmsis": mmsi_count}
|
||
|
||
|
||
def _require_recording_permission(user: User):
    """Reject users who are not allowed to start/stop recordings.

    Only the ADMIN, SUPERADMIN and CLIENT_ADMIN roles pass.

    Raises:
        HTTPException: 403 when the user's role is not one of the above.
    """
    from models.user import Role

    permitted_roles = (Role.ADMIN, Role.SUPERADMIN, Role.CLIENT_ADMIN)
    if user.role in permitted_roles:
        return

    from fastapi import HTTPException

    raise HTTPException(
        status_code=403,
        detail="Only ADMIN or CLIENT_ADMIN may control recordings",
    )
|
||
|
||
@app.post("/recordings/start/{mmsi}")
async def manual_start_recording(
    mmsi: str, aid_id: str = "MANUAL",
    current_user: User = Depends(get_current_user),
):
    """Manually start a recording for *mmsi* (optionally tied to an aid).

    Registers the recording in the alert engine's ``active_recordings`` map,
    persists an open RecordingEvent row and broadcasts a
    ``GRABACION_INICIADA`` alert to all clients.

    Args:
        mmsi: vessel MMSI to record.
        aid_id: aid the recording relates to; defaults to "MANUAL".
        current_user: authenticated caller; must hold a recording role.

    Returns:
        ``{"ok": True}`` on success, or ``{"ok": False, "detail": "Already
        recording"}`` when a recording for this (mmsi, aid_id) is active.

    Raises:
        HTTPException: 403 when the caller may not control recordings.
    """
    _require_recording_permission(current_user)
    from services.alert_engine import active_recordings
    from models.vessel import RecordingEvent
    import uuid as _uuid

    key = f"{mmsi}_{aid_id}"
    if key in active_recordings:
        return {"ok": False, "detail": "Already recording"}

    # One timestamp for the in-memory entry, the DB row and the broadcast so
    # all three agree on when the recording started.
    started_at = datetime.utcnow()
    active_recordings[key] = {
        "mmsi": mmsi, "aid_id": aid_id, "aid_nombre": aid_id,
        "inicio": started_at.isoformat(),
        "trigger": "MANUAL", "min_dist": 0, "pre_buffer": [],
    }

    db = SessionLocal()
    try:
        rec = RecordingEvent(
            id=str(_uuid.uuid4()), mmsi=mmsi, aid_id=aid_id,
            inicio=started_at, trigger="MANUAL",
        )
        db.add(rec)
        db.commit()
    except Exception:
        # Keep memory and DB consistent: without a persisted row the
        # recording must not stay registered as active.
        active_recordings.pop(key, None)
        raise
    finally:
        db.close()

    await broadcast({"type": "alert", "tipo": "GRABACION_INICIADA",
                     "mmsi": mmsi, "aid_id": aid_id, "aid_nombre": aid_id,
                     "distancia_m": 0, "trigger": "MANUAL",
                     "timestamp": started_at.isoformat()})
    return {"ok": True}
|
||
|
||
@app.post("/recordings/stop/{mmsi}")
async def manual_stop_recording(
    mmsi: str, aid_id: str = "MANUAL",
    current_user: User = Depends(get_current_user),
):
    """Manually stop the recording for *mmsi* / *aid_id*.

    Removes the entry from the alert engine's ``active_recordings`` map,
    closes the most recent open RecordingEvent row for the pair (if any) and
    broadcasts a ``GRABACION_FINALIZADA`` alert.

    Args:
        mmsi: vessel MMSI whose recording is stopped.
        aid_id: aid the recording relates to; defaults to "MANUAL".
        current_user: authenticated caller; must hold a recording role.

    Returns:
        ``{"ok": True}`` — also when no recording was active.

    Raises:
        HTTPException: 403 when the caller may not control recordings.
    """
    from services.alert_engine import active_recordings
    from models.vessel import RecordingEvent
    from sqlalchemy import and_

    _require_recording_permission(current_user)

    in_memory = active_recordings.pop(f"{mmsi}_{aid_id}", None)
    # Minimum distance is only known when the recording was tracked in memory.
    if in_memory:
        min_dist = round(in_memory["min_dist"], 1)
    else:
        min_dist = None

    session = SessionLocal()
    try:
        open_for_pair = and_(
            RecordingEvent.mmsi == mmsi,
            RecordingEvent.aid_id == aid_id,
            RecordingEvent.cerrado == False,  # noqa: E712 — SQL expression
        )
        latest = (
            session.query(RecordingEvent)
            .filter(open_for_pair)
            .order_by(RecordingEvent.inicio.desc())
            .first()
        )
        if latest:
            latest.fin = datetime.utcnow()
            latest.distancia_min_m = min_dist
            latest.cerrado = True
            session.commit()
    finally:
        session.close()

    finished_alert = {
        "type": "alert", "tipo": "GRABACION_FINALIZADA",
        "mmsi": mmsi, "aid_id": aid_id,
        "distancia_min_m": min_dist,
        "timestamp": datetime.utcnow().isoformat(),
    }
    await broadcast(finished_alert)
    return {"ok": True}
|
||
|
||
@app.get("/debug")
async def debug_state():
    """Return a snapshot of in-memory monitoring state for troubleshooting.

    Returns:
        dict with the alert engine's per-aid alert states, the number of
        stored history points per key, the ids of the aids held in memory
        and the number of vessels currently tracked.
    """
    # NOTE(review): unlike the other routes in this module, this one declares
    # no auth dependency — confirm that exposing it anonymously is intended.
    from services.alert_engine import aid_alert_state, aid_position_history

    history_sizes = {}
    for key, points in aid_position_history.items():
        history_sizes[key] = len(points)

    return {
        "alert_states": dict(aid_alert_state),
        "history_sizes": history_sizes,
        "aids_in_state": list(aids_state.keys()),
        "vessels_tracked": len(vessels_state),
    }
|
||
|
||
@app.get("/settings")
|
||
async def get_settings(current_user: User = Depends(get_current_user),
|
||
db: Session = Depends(get_db)):
|
||
"""Return system defaults merged with this user's saved preferences."""
|
||
import json as _json
|
||
base = settings_store.get_all()
|
||
try:
|
||
user_row = db.query(User).filter(User.id == current_user.id).first()
|
||
prefs = _json.loads(user_row.prefs_json or '{}') if user_row else {}
|
||
except Exception:
|
||
prefs = {}
|
||
return {**base, **prefs}
|
||
|
||
|
||
@app.post("/settings")
async def update_settings(payload: dict,
                          current_user: User = Depends(get_current_user),
                          db: Session = Depends(get_db)):
    """Apply a settings patch and store it as the caller's preferences.

    Keys listed in ``SYSTEM_KEYS`` are written to ``settings_store`` and the
    affected runtime components (AIS source, slave relay, GPS reader) are
    restarted or re-pointed so the change takes effect without a restart.
    The complete payload, system keys included, is then merged into the
    user's ``prefs_json``.

    Persisting the preferences is best-effort: on failure the transaction is
    rolled back and the error is logged, but the response still reports
    ``ok`` because the runtime changes have already been applied.

    Returns:
        ``{"ok": True, "settings": <store values overlaid with the payload>,
        "applied": [<one description per runtime change>]}``.
    """
    import json as _json
    global _ais_task, _slave_relay

    # NOTE(review): this endpoint is guarded by get_current_user only (not
    # require_admin), and the whole payload — including smtp_password — is
    # copied into prefs_json and echoed in the response. Confirm intended.

    # Split: system keys go to settings_store; everything stays in user prefs
    SYSTEM_KEYS = {"ais_source", "ais_serial_port", "ais_baud", "ais_net_addr",
                   "gps_port", "gps_baud", "smtp_host", "smtp_port", "smtp_user",
                   "smtp_password", "smtp_from", "smtp_from_name", "smtp_use_tls",
                   "server_role", "master_url", "slave_name"}

    patch = payload or {}
    system_patch = {k: v for k, v in patch.items() if k in SYSTEM_KEYS}
    prev = settings_store.get_all()
    if system_patch:
        settings_store.apply_patch(system_patch)
    new = settings_store.get_all()
    applied = []

    # AIS source change: restart the source task only when the value differs.
    if system_patch.get("ais_source") and new["ais_source"] != prev["ais_source"]:
        await _stop_ais_source()
        _ais_task = await _start_ais_source()
        applied.append(f"ais_source → {new['ais_source']}")

    # Cluster role / master URL change — restart slave relay if needed
    if "server_role" in system_patch or "master_url" in system_patch or "slave_name" in system_patch:
        if _slave_relay is not None:
            await _slave_relay.stop()
            _slave_relay = None
        role = new.get("server_role", "STANDALONE").upper()
        master_url = new.get("master_url", "").strip()
        sname = new.get("slave_name", "").strip() or new.get("station_name", "Slave")
        if role == "SLAVE" and master_url:
            _slave_relay = SlaveRelay(master_url=master_url, slave_name=sname)
            await _slave_relay.start()
            applied.append(f"server_role → SLAVE, relay → {master_url}")
        else:
            applied.append(f"server_role → {role}")

    # GPS port change: re-point the running reader and force a reconnect.
    if "gps_port" in system_patch or "gps_baud" in system_patch:
        gps: GPSReader = getattr(app.state, "gps", None)
        if gps:
            gps._forced_port = new["gps_port"] or None
            gps._forced_baud = new["gps_baud"] or None
            gps._port = None
            gps.status = "SEARCHING"
            gps.reconnect()
            applied.append(f"gps_port → {new['gps_port'] or 'auto'} @ {new['gps_baud']}")

    # Save full payload as this user's preferences (best-effort).
    try:
        user_row = db.query(User).filter(User.id == current_user.id).first()
        if user_row:
            existing = _json.loads(user_row.prefs_json or '{}')
            existing.update(patch)
            user_row.prefs_json = _json.dumps(existing)
            db.commit()
    except Exception as exc:
        # Leave the session usable and make the failure visible instead of
        # dropping it silently; the runtime changes above stay applied.
        db.rollback()
        print(f"[settings] Could not save user preferences: {exc}")

    return {"ok": True, "settings": {**new, **patch}, "applied": applied}
|
||
|
||
|
||
@app.get("/gps/status")
async def gps_status(request: Request):
    """Report the state of the GPS reader stored on ``app.state``.

    When no reader is attached a fixed ``DISABLED`` payload is returned.
    Otherwise the response carries the reader's status, the active port
    (falling back to the forced port), the forced port, the baud rate and
    the last fix.
    """
    reader: GPSReader = getattr(request.app.state, "gps", None)
    if reader:
        return {
            "status": reader.status,
            "port": reader._port or reader._forced_port,
            "forced_port": reader._forced_port,
            "baud": reader._baud,
            "last_fix": reader.last_fix,
        }
    return {"status": "DISABLED", "port": None, "last_fix": None}
|
||
|
||
@app.post("/gps/port")
async def set_gps_port(request: Request, port: str = "", baud: int = 9600):
    """Hot-change the GPS serial port and baud rate without a server restart.

    The running reader is re-pointed immediately. The values are then written
    to the project's ``.env`` file (``GPS_PORT`` / ``GPS_BAUD``) on a
    best-effort basis so they persist for the next restart.

    Args:
        request: Incoming request; the reader is looked up on ``app.state``.
        port: Serial port name. Blank means no forced port.
        baud: Baud rate. ``0`` means no forced baud rate.

    Returns:
        ``{"ok": True, "port": ..., "baud": ...}`` on success, or
        ``{"ok": False, "detail": ...}`` when no reader is running or the
        port name contains a line break.
    """
    gps: GPSReader = getattr(request.app.state, "gps", None)
    if not gps:
        return {"ok": False, "detail": "GPS reader not running"}

    port_name = port.strip()
    # The value is written verbatim into .env; an embedded line break would
    # let the caller inject arbitrary extra KEY=VALUE entries.
    if "\n" in port_name or "\r" in port_name:
        return {"ok": False, "detail": "Invalid GPS port name"}

    gps._forced_port = port_name or None
    gps._forced_baud = baud or None
    gps._port = None          # force reconnect on next loop iteration
    gps.status = "SEARCHING"
    gps.reconnect()           # close active serial so _read_loop exits and re-probes

    def _upsert(lines, key, val):
        """Replace the first ``key=`` line in *lines*, or append a new one."""
        entry = f"{key}={val}\n"
        prefix = f"{key}="
        for idx, line in enumerate(lines):
            if line.startswith(prefix):
                lines[idx] = entry
                return
        # Make sure the new entry starts on its own line even when the file
        # did not end with a newline.
        if lines and not lines[-1].endswith("\n"):
            lines[-1] += "\n"
        lines.append(entry)

    # Persist to .env for next restart
    env_path = os.path.join(os.path.dirname(__file__), '..', '.env')
    try:
        with open(env_path) as fh:
            env_lines = fh.readlines()
        _upsert(env_lines, "GPS_PORT", port_name)
        _upsert(env_lines, "GPS_BAUD", str(baud))
        with open(env_path, 'w') as fh:
            fh.writelines(env_lines)
    except (OSError, UnicodeError) as exc:
        # Non-fatal — the in-memory change has already been applied.
        print(f"[gps] Could not persist GPS settings to .env: {exc}")
    return {"ok": True, "port": gps._forced_port, "baud": baud}
|
||
|
||
@app.get("/ais/status")
async def ais_status():
    """Return the status reported by the AIS-catcher manager.

    The payload is whatever ``_ais_catcher.status()`` produces; per the
    original note it covers the AIS-catcher process and the SDR hardware.
    """
    catcher_status = _ais_catcher.status()
    return catcher_status
|
||
|
||
|
||
@app.get("/ais/stats")
async def ais_stats():
    """Return live UDP listener stats enriched with signal and catcher info.

    The listener's own stats are passed through and extended with:

    * ``seconds_since_last`` – seconds since the last sentence, rounded to
      one decimal, or ``None`` when no timestamp is available;
    * ``signal`` – ``True`` only when a sentence arrived less than 60 s ago;
    * ``catcher_running`` / ``sdr_devices`` – taken from the catcher status;
    * ``ais_source`` – the configured source (``SIMULATOR`` when unset).
    """
    import time as _time

    udp = _ais_udp.get_stats()
    catcher = _ais_catcher.status()
    last_ts = udp.get("last_sentence_ts")
    seconds_since = round(_time.time() - last_ts, 1) if last_ts else None
    return {
        **udp,
        "seconds_since_last": seconds_since,
        # Derive the flag from seconds_since itself so a falsy-but-not-None
        # timestamp (e.g. 0) cannot lead to comparing None with an int.
        "signal": seconds_since is not None and seconds_since < 60,
        "catcher_running": catcher["running"],
        "sdr_devices": catcher["sdr_devices"],
        "ais_source": config.get("ais_source", "SIMULATOR"),
    }
|
||
|
||
|
||
@app.get("/ais/raw")
async def ais_raw():
    """Return the raw NMEA sentences buffered by the UDP listener.

    The buffer size is decided by ``_ais_udp.get_raw()``; the original note
    says it holds the last 30 sentences — confirm against that helper.
    """
    recent_sentences = _ais_udp.get_raw()
    return {"sentences": recent_sentences}
|
||
|
||
|
||
@app.post("/ais/launch")
async def ais_launch(udp_port: int = 10110, web_port: int = 8100):
    """Launch AIS-catcher and make sure the AIS source is set to SDR.

    The launch result is returned unchanged. When the launch reports ``ok``
    and the configured source is not already ``SDR``, the source is switched
    to ``SDR`` and the AIS source task is restarted.
    """
    global _ais_task

    outcome = _ais_catcher.launch(udp_port=udp_port, web_port=web_port)
    if not outcome["ok"]:
        return outcome

    current_source = config.get("ais_source", "SIMULATOR").upper()
    if current_source == "SDR":
        return outcome

    # Without this switch the catcher would stream to its UDP port with no
    # listener attached on our side.
    settings_store.apply_patch({"ais_source": "SDR"})
    await _stop_ais_source()
    _ais_task = await _start_ais_source()
    return outcome
|
||
|
||
|
||
@app.post("/ais/stop")
async def ais_stop():
    """Stop AIS-catcher and return the result reported by the manager."""
    stop_result = _ais_catcher.stop()
    return stop_result
|
||
|
||
|
||
@app.websocket("/ws/slave")
async def slave_websocket_endpoint(ws: WebSocket):
    """Accept a slave server connection and relay its events.

    The connection is refused with close code 1008 unless ``server_role`` is
    ``MASTER`` or ``STANDALONE``. Each JSON object received is merged into the
    in-memory vessel / AtoN / aid state where applicable and then handed to
    ``broadcast`` unchanged. ``slave_hello`` messages only record the slave's
    name for logging and are not rebroadcast.

    Frames that are not valid JSON, or whose top-level value is not a JSON
    object, are ignored instead of terminating the connection.

    NOTE(review): this handler neither authenticates the peer nor adds an
    origin tag to the messages — confirm that slaves tag their own events and
    whether this endpoint should require credentials.
    """
    role = config.get("server_role", "STANDALONE").upper()
    if role not in ("MASTER", "STANDALONE"):
        await ws.close(code=1008, reason="This server is not a MASTER")
        return

    await ws.accept()
    slave_name = "unknown"
    print(f"[cluster] Nuevo esclavo conectado desde {ws.client}")

    try:
        # Resolved once per connection rather than on every AtoN message.
        from services.aton_decoder import aton_state as _aton_state

        while True:
            raw = await ws.receive_text()
            try:
                msg = json.loads(raw)
            except (ValueError, RecursionError):
                continue
            if not isinstance(msg, dict):
                # Valid JSON but not an object (list, number, ...): skip it
                # so a single odd frame does not drop the slave link.
                continue

            msg_type = msg.get("type")

            # Hello handshake — log the slave name
            if msg_type == "slave_hello":
                slave_name = msg.get("slave_name", "unknown")
                print(f"[cluster] Esclavo identificado: '{slave_name}'")
                continue

            # Merge the event into the master's in-memory snapshot
            if msg_type == "vessel":
                mmsi = msg.get("mmsi")
                if mmsi:
                    vessels_state[mmsi] = msg

            elif msg_type == "aton":
                mmsi = msg.get("mmsi")
                if mmsi:
                    _aton_state[mmsi] = msg

            elif msg_type == "aid_position":
                # Only aids already known to the master are updated.
                aid_id = msg.get("id")
                if aid_id and aid_id in aids_state:
                    aids_state[aid_id].update(msg)

            # Rebroadcast to all locally connected browser clients
            await broadcast(msg)

    except WebSocketDisconnect:
        print(f"[cluster] Esclavo '{slave_name}' desconectado.")
    except Exception as e:
        print(f"[cluster] Error con esclavo '{slave_name}': {e}")
|
||
|
||
|
||
@app.get("/cluster/status")
async def cluster_status(current_user=Depends(get_current_user)):
    """Return the configured cluster role and, for slaves, the relay status.

    The ``relay`` entry is only present when the role is ``SLAVE`` and a
    slave relay object currently exists.
    """
    current_role = config.get("server_role", "STANDALONE").upper()
    report: dict = dict(
        role=current_role,
        slave_name=config.get("slave_name", ""),
        master_url=config.get("master_url", ""),
    )
    relay_available = current_role == "SLAVE" and _slave_relay is not None
    if relay_available:
        report["relay"] = _slave_relay.status()
    return report
|
||
|
||
|
||
def _resolve_ws_company(token):
    """Validate a WebSocket JWT and return ``(authenticated, company_id)``.

    Authentication succeeds only when the token decodes, carries a ``sub``
    claim and that username belongs to an active user. Every other outcome,
    including any error raised while decoding or querying, fails closed.
    """
    if not token:
        return False, None
    try:
        from jose import jwt as _jwt
        from routers.auth import SECRET_KEY, ALGORITHM

        claims = _jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = claims.get("sub")
        if not username:
            return False, None
        _db = SessionLocal()
        try:
            user = _db.query(User).filter(User.username == username,
                                          User.activo == True).first()  # noqa: E712
            if user is None:
                return False, None
            # Read the attribute while the session is still open.
            return True, getattr(user, "company_id", None)
        finally:
            _db.close()
    except Exception:
        return False, None


@app.websocket("/ws")
async def websocket_endpoint(
    ws: WebSocket,
    token: str | None = _Query(default=None),
):
    """Serve the live feed to an authenticated browser client.

    The socket is accepted first and then closed with code 1008 unless the
    ``token`` query parameter is a valid JWT for an active user. A missing
    or empty token, a token without ``sub``, and an unknown or inactive user
    are all rejected, so nobody falls through to the unfiltered view.

    After authentication the client is registered in ``connected_clients``
    and receives an ``init`` snapshot followed by the currently active aid
    alerts. Aids and vessels are sent to everyone; AtoNs and alerts are
    limited to the caller's company unless ``company_id`` is ``None``
    (admin view). The client is always unregistered when the handler exits.
    """
    await ws.accept()

    authenticated, company_id = _resolve_ws_company(token)
    if not authenticated:
        await ws.close(code=1008)
        return

    client = {"ws": ws, "company_id": company_id}
    connected_clients.append(client)

    try:
        # ── Build filtered init snapshot ──────────────────────────────────
        # aids    = official positions  → visible to ALL users
        # vessels = ship AIS traffic    → visible to ALL users
        # atons   = live AIS AtoN msgs  → filtered by company MMSI ownership
        init_aids = list(aids_state.values())
        init_vessels = list(vessels_state.values())

        if company_id is None:
            # Admin / superadmin → all AtoNs and all alerts
            owned_aid_ids = None
            init_atons = list(aton_state.values())
        else:
            # Company user: a company with nothing assigned sees nothing,
            # for AtoNs and alerts alike.
            owned_aid_ids = _aid_ownership_cache.get(company_id, set())
            client_mmsis = _ownership_cache.get(company_id, set())
            init_atons = [a for a in aton_state.values()
                          if a.get("mmsi") in client_mmsis]

        await ws.send_text(json.dumps({
            "type": "init",
            "aids": init_aids,
            "vessels": init_vessels,
            "atons": init_atons,
        }))

        # Re-emit any active aid alerts so new clients see current state.
        # Iterate over a snapshot: the loop awaits between items, and the
        # shared dict may be modified by other tasks in the meantime.
        from datetime import datetime as _dt
        for aid_id, state in list(aid_alert_state.items()):
            if owned_aid_ids is not None and aid_id not in owned_aid_ids:
                continue  # aid doesn't belong to the connecting client's company
            if state == 'RED':
                await ws.send_text(json.dumps({
                    "type": "alert", "tipo": "ALERTA_ROJA",
                    "subtipo": "AYUDA_EN_MOVIMIENTO", "aid_id": aid_id,
                    "mensaje": "Movimiento continuo detectado",
                    "timestamp": _dt.utcnow().isoformat()}))
            elif state == 'YELLOW':
                aid = aids_state.get(aid_id, {})
                await ws.send_text(json.dumps({
                    "type": "alert", "tipo": "ALERTA_AMARILLA",
                    "subtipo": "AYUDA_FUERA_POSICION", "aid_id": aid_id,
                    "desplazamiento_m": round(aid.get("desplazamiento_m", 0), 1),
                    "timestamp": _dt.utcnow().isoformat()}))

        # Incoming frames are read and discarded; the loop exists to notice
        # when the client disconnects.
        while True:
            await ws.receive_text()
    except WebSocketDisconnect:
        pass
    finally:
        # Unregister on every exit path, including a failed init send.
        if client in connected_clients:
            connected_clients.remove(client)
|
||
|
||
# Serve the bundled frontend from the site root. The mount sits after the
# routes defined above — presumably so this catch-all "/" mount does not
# shadow them; keep it last unless confirmed otherwise.
app.mount(
    "/",
    StaticFiles(
        directory=os.path.join(os.path.dirname(__file__), '..', 'frontend'),
        html=True,
    ),
    name="frontend",
)
|