Files
alro65 cfd94f905a security: CORS hardening, path traversal fix, WebSocket auth + cleanup
- Restrict CORS to localhost origins (was allow_origins=[*])
- Require valid JWT on WebSocket /ws (anonymous no longer gets admin view)
- Fix path traversal in delete_cell(): resolve() + parent check
- Validate cell_id format in /charts/download-noaa/{cell_id}
- Exclude charts/ and Cartas/ from git (keep US1GC09M world overview)
- Add NOAA ENC Portal external link in charts catalog tab
- Untrack __pycache__/, .db, .claude/ session files

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-07-03 12:45:43 -04:00

201 lines
7.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
AIS ATON decoder — Message Type 21 and Type 8 DAC=1,FI=22 (IEC 62320-2).
Maintains live ATON state keyed by MMSI.
"""
from datetime import datetime
from collections import defaultdict, deque
# Live ATON state: mmsi -> dict
aton_state: dict = {}
# Position history for movement detection: mmsi -> deque of (lat,lon,ts)
aton_history: dict = defaultdict(lambda: deque(maxlen=30))
ATON_TYPES = {
1:"Default",2:"Reference point",3:"RACON",4:"Fixed structure",5:"Spare",
6:"Light, no sectors",7:"Light, with sectors",8:"Leading light front",
9:"Leading light rear",10:"Beacon, cardinal N",11:"Beacon, cardinal NE",
12:"Beacon, cardinal E",13:"Beacon, cardinal SE",14:"Beacon, cardinal S",
15:"Beacon, cardinal SW",16:"Beacon, cardinal W",17:"Beacon, cardinal NW",
18:"Beacon, port hand",19:"Beacon, starboard hand",20:"Beacon, preferred channel port",
21:"Beacon, preferred channel starboard",22:"Beacon, isolated danger",
23:"Beacon, safe water",24:"Beacon, special mark",25:"Beacon, light vessel",
26:"LANBY",27:"Buoy, cardinal N",28:"Buoy, cardinal NE",29:"Buoy, cardinal E",
30:"Buoy, cardinal SE",31:"Buoy, cardinal S", # continues...
}
def _bits(payload: str, start: int, length: int) -> int:
"""Extract integer from AIS 6-bit ASCII payload."""
acc = 0
for i in range(start, start + length):
if i >= len(payload) * 6:
return 0
char_idx = i // 6
bit_idx = 5 - (i % 6)
char_val = ord(payload[char_idx]) - 48
if char_val > 39:
char_val -= 8
acc = (acc << 1) | ((char_val >> bit_idx) & 1)
return acc
def _signed(val: int, bits: int) -> int:
if val >= (1 << (bits - 1)):
val -= (1 << bits)
return val
def _text(payload: str, start: int, length: int) -> str:
chars = []
for i in range(length // 6):
v = _bits(payload, start + i * 6, 6)
if v < 32:
v += 64
if v == 64:
break
chars.append(chr(v))
return ''.join(chars).strip('@').strip()
def decode_type21(payload: str) -> dict | None:
"""Decode AIS Message Type 21 — Aid-to-Navigation Report."""
try:
msg_type = _bits(payload, 0, 6)
if msg_type != 21:
return None
mmsi = _bits(payload, 8, 30)
aton_t = _bits(payload, 38, 5)
name = _text(payload, 43, 120)
accuracy = _bits(payload, 163, 1)
lon = _signed(_bits(payload, 164, 28), 28) / 600000.0
lat = _signed(_bits(payload, 192, 27), 27) / 600000.0
off_pos = bool(_bits(payload, 259, 1))
virtual = bool(_bits(payload, 269, 1))
name_ext = _text(payload, 272, min(14*6, (len(payload)-272//6)*6)) if len(payload)*6 > 272 else ""
if lat == 0.0 and lon == 0.0:
return None
return {
"mmsi": str(mmsi),
"msg_type": 21,
"aton_type": aton_t,
"aton_name": (name + name_ext).strip() or f"ATON {mmsi}",
"lat": lat,
"lon": lon,
"accuracy": bool(accuracy),
"off_position": off_pos,
"virtual": virtual,
"timestamp": datetime.utcnow().isoformat(),
}
except Exception:
return None
def decode_type8_aton(payload: str) -> dict | None:
"""
Decode AIS Message Type 8, DAC=1, FI=22 — IEC 62320-2 ATON monitoring.
Carries: voltage, sensors, lamp/racon status, off-position.
"""
try:
msg_type = _bits(payload, 0, 6)
if msg_type != 8:
return None
mmsi = _bits(payload, 8, 30)
dac = _bits(payload, 40, 10)
fi = _bits(payload, 50, 6)
if dac != 1 or fi != 22:
return None
off_pos = bool(_bits(payload, 56, 1))
# Analog 1 — typically main battery voltage (0-409.4 V, res 0.1 V)
analog1 = _bits(payload, 57, 12) * 0.05 # 0-204.75 V
# Analog 2 — external sensor 1 (temperature or secondary voltage)
analog2 = _bits(payload, 69, 12) * 0.05
# Analog 3 & 4 — additional sensors
analog3 = _bits(payload, 81, 12) * 0.05
analog4 = _bits(payload, 93, 12) * 0.05
# Digital bits: racon, lamp, buoy code alarm, controller alarm, etc.
racon = bool(_bits(payload, 105, 2))
light_ok = bool(_bits(payload, 107, 2))
health = _bits(payload, 109, 2) # 0=ok,1=warn,2=alarm,3=no signal
battery_low = bool(_bits(payload, 111, 1))
# IEC 62320-2 extended digital inputs (bits 112119)
# bit 112 = buoy code / hull integrity → water ingress sensor (IN3)
# bit 113 = controller alarm → listing/tilt sensor (IN4)
# bit 114 = fog signal status
# bit 115 = EPIRB armed
# bit 116 = water level alarm → bilge high (critical)
# bits 117-119 = spare / manufacturer-defined
din3 = bool(_bits(payload, 112, 1)) # hull/water ingress
din4 = bool(_bits(payload, 113, 1)) # listing/tilt
water_level = bool(_bits(payload, 116, 1)) # bilge high level
return {
"mmsi": str(mmsi),
"msg_type": 8,
"dac": dac,
"fi": fi,
"off_position":off_pos,
"voltage_v": round(analog1, 2) if analog1 > 0 else None,
"sensor1": round(analog2, 2) if analog2 > 0 else None,
"sensor2": round(analog3, 2) if analog3 > 0 else None,
"sensor3": round(analog4, 2) if analog4 > 0 else None,
"racon_ok": racon,
"light_ok": light_ok,
"health": health,
"battery_low": battery_low,
# Digital inputs — meaning depends on din3_function / din4_function
# configured per-aid in the Aid model
"din3": din3, # IN3 state (True = triggered)
"din4": din4, # IN4 state (True = triggered)
"water_level_high": water_level, # bilge high level (IEC standard bit)
"timestamp": datetime.utcnow().isoformat(),
}
except Exception:
return None
def process_aton_message(raw: dict) -> dict | None:
"""
Process a decoded ATON message and update aton_state.
Returns the updated state entry (to broadcast) or None.
"""
mmsi = raw.get("mmsi")
if not mmsi:
return None
if mmsi not in aton_state:
aton_state[mmsi] = {"mmsi": mmsi, "source": "AIS_ATON"}
entry = aton_state[mmsi]
if raw["msg_type"] == 21:
entry.update({
"aton_type": raw["aton_type"],
"aton_type_name": ATON_TYPES.get(raw["aton_type"], f"Type {raw['aton_type']}"),
"name": raw["aton_name"],
"lat": raw["lat"],
"lon": raw["lon"],
"accuracy": raw["accuracy"],
"off_position": raw["off_position"],
"virtual": raw["virtual"],
"last_type21": raw["timestamp"],
})
aton_history[mmsi].append((raw["lat"], raw["lon"], raw["timestamp"]))
elif raw["msg_type"] == 8:
entry.update({
"voltage_v": raw.get("voltage_v"),
"sensor1": raw.get("sensor1"),
"sensor2": raw.get("sensor2"),
"sensor3": raw.get("sensor3"),
"racon_ok": raw.get("racon_ok"),
"light_ok": raw.get("light_ok"),
"health": raw.get("health"),
"battery_low": raw.get("battery_low"),
"off_position": raw.get("off_position", entry.get("off_position", False)),
"last_type8": raw["timestamp"],
})
entry["last_update"] = datetime.utcnow().isoformat()
return entry