cfd94f905a
- Restrict CORS to localhost origins (was allow_origins=[*])
- Require valid JWT on WebSocket /ws (anonymous no longer gets admin view)
- Fix path traversal in delete_cell(): resolve() + parent check
- Validate cell_id format in /charts/download-noaa/{cell_id}
- Exclude charts/ and Cartas/ from git (keep US1GC09M world overview)
- Add NOAA ENC Portal external link in charts catalog tab
- Untrack __pycache__/, .db, .claude/ session files
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
201 lines
7.7 KiB
Python
201 lines
7.7 KiB
Python
"""
|
||
AIS ATON decoder — Message Type 21 and Type 8 DAC=1,FI=22 (IEC 62320-2).
|
||
Maintains live ATON state keyed by MMSI.
|
||
"""
|
||
from datetime import datetime
|
||
from collections import defaultdict, deque
|
||
|
||
# Live ATON state: mmsi -> dict
|
||
aton_state: dict = {}
|
||
|
||
# Position history for movement detection: mmsi -> deque of (lat,lon,ts)
|
||
aton_history: dict = defaultdict(lambda: deque(maxlen=30))
|
||
|
||
ATON_TYPES = {
|
||
1:"Default",2:"Reference point",3:"RACON",4:"Fixed structure",5:"Spare",
|
||
6:"Light, no sectors",7:"Light, with sectors",8:"Leading light front",
|
||
9:"Leading light rear",10:"Beacon, cardinal N",11:"Beacon, cardinal NE",
|
||
12:"Beacon, cardinal E",13:"Beacon, cardinal SE",14:"Beacon, cardinal S",
|
||
15:"Beacon, cardinal SW",16:"Beacon, cardinal W",17:"Beacon, cardinal NW",
|
||
18:"Beacon, port hand",19:"Beacon, starboard hand",20:"Beacon, preferred channel port",
|
||
21:"Beacon, preferred channel starboard",22:"Beacon, isolated danger",
|
||
23:"Beacon, safe water",24:"Beacon, special mark",25:"Beacon, light vessel",
|
||
26:"LANBY",27:"Buoy, cardinal N",28:"Buoy, cardinal NE",29:"Buoy, cardinal E",
|
||
30:"Buoy, cardinal SE",31:"Buoy, cardinal S", # continues...
|
||
}
|
||
|
||
def _bits(payload: str, start: int, length: int) -> int:
|
||
"""Extract integer from AIS 6-bit ASCII payload."""
|
||
acc = 0
|
||
for i in range(start, start + length):
|
||
if i >= len(payload) * 6:
|
||
return 0
|
||
char_idx = i // 6
|
||
bit_idx = 5 - (i % 6)
|
||
char_val = ord(payload[char_idx]) - 48
|
||
if char_val > 39:
|
||
char_val -= 8
|
||
acc = (acc << 1) | ((char_val >> bit_idx) & 1)
|
||
return acc
|
||
|
||
def _signed(val: int, bits: int) -> int:
|
||
if val >= (1 << (bits - 1)):
|
||
val -= (1 << bits)
|
||
return val
|
||
|
||
def _text(payload: str, start: int, length: int) -> str:
|
||
chars = []
|
||
for i in range(length // 6):
|
||
v = _bits(payload, start + i * 6, 6)
|
||
if v < 32:
|
||
v += 64
|
||
if v == 64:
|
||
break
|
||
chars.append(chr(v))
|
||
return ''.join(chars).strip('@').strip()
|
||
|
||
def decode_type21(payload: str) -> dict | None:
|
||
"""Decode AIS Message Type 21 — Aid-to-Navigation Report."""
|
||
try:
|
||
msg_type = _bits(payload, 0, 6)
|
||
if msg_type != 21:
|
||
return None
|
||
mmsi = _bits(payload, 8, 30)
|
||
aton_t = _bits(payload, 38, 5)
|
||
name = _text(payload, 43, 120)
|
||
accuracy = _bits(payload, 163, 1)
|
||
lon = _signed(_bits(payload, 164, 28), 28) / 600000.0
|
||
lat = _signed(_bits(payload, 192, 27), 27) / 600000.0
|
||
off_pos = bool(_bits(payload, 259, 1))
|
||
virtual = bool(_bits(payload, 269, 1))
|
||
name_ext = _text(payload, 272, min(14*6, (len(payload)-272//6)*6)) if len(payload)*6 > 272 else ""
|
||
|
||
if lat == 0.0 and lon == 0.0:
|
||
return None
|
||
|
||
return {
|
||
"mmsi": str(mmsi),
|
||
"msg_type": 21,
|
||
"aton_type": aton_t,
|
||
"aton_name": (name + name_ext).strip() or f"ATON {mmsi}",
|
||
"lat": lat,
|
||
"lon": lon,
|
||
"accuracy": bool(accuracy),
|
||
"off_position": off_pos,
|
||
"virtual": virtual,
|
||
"timestamp": datetime.utcnow().isoformat(),
|
||
}
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def decode_type8_aton(payload: str) -> dict | None:
|
||
"""
|
||
Decode AIS Message Type 8, DAC=1, FI=22 — IEC 62320-2 ATON monitoring.
|
||
Carries: voltage, sensors, lamp/racon status, off-position.
|
||
"""
|
||
try:
|
||
msg_type = _bits(payload, 0, 6)
|
||
if msg_type != 8:
|
||
return None
|
||
mmsi = _bits(payload, 8, 30)
|
||
dac = _bits(payload, 40, 10)
|
||
fi = _bits(payload, 50, 6)
|
||
if dac != 1 or fi != 22:
|
||
return None
|
||
|
||
off_pos = bool(_bits(payload, 56, 1))
|
||
# Analog 1 — typically main battery voltage (0-409.4 V, res 0.1 V)
|
||
analog1 = _bits(payload, 57, 12) * 0.05 # 0-204.75 V
|
||
# Analog 2 — external sensor 1 (temperature or secondary voltage)
|
||
analog2 = _bits(payload, 69, 12) * 0.05
|
||
# Analog 3 & 4 — additional sensors
|
||
analog3 = _bits(payload, 81, 12) * 0.05
|
||
analog4 = _bits(payload, 93, 12) * 0.05
|
||
# Digital bits: racon, lamp, buoy code alarm, controller alarm, etc.
|
||
racon = bool(_bits(payload, 105, 2))
|
||
light_ok = bool(_bits(payload, 107, 2))
|
||
health = _bits(payload, 109, 2) # 0=ok,1=warn,2=alarm,3=no signal
|
||
battery_low = bool(_bits(payload, 111, 1))
|
||
|
||
# IEC 62320-2 extended digital inputs (bits 112–119)
|
||
# bit 112 = buoy code / hull integrity → water ingress sensor (IN3)
|
||
# bit 113 = controller alarm → listing/tilt sensor (IN4)
|
||
# bit 114 = fog signal status
|
||
# bit 115 = EPIRB armed
|
||
# bit 116 = water level alarm → bilge high (critical)
|
||
# bits 117-119 = spare / manufacturer-defined
|
||
din3 = bool(_bits(payload, 112, 1)) # hull/water ingress
|
||
din4 = bool(_bits(payload, 113, 1)) # listing/tilt
|
||
water_level = bool(_bits(payload, 116, 1)) # bilge high level
|
||
|
||
return {
|
||
"mmsi": str(mmsi),
|
||
"msg_type": 8,
|
||
"dac": dac,
|
||
"fi": fi,
|
||
"off_position":off_pos,
|
||
"voltage_v": round(analog1, 2) if analog1 > 0 else None,
|
||
"sensor1": round(analog2, 2) if analog2 > 0 else None,
|
||
"sensor2": round(analog3, 2) if analog3 > 0 else None,
|
||
"sensor3": round(analog4, 2) if analog4 > 0 else None,
|
||
"racon_ok": racon,
|
||
"light_ok": light_ok,
|
||
"health": health,
|
||
"battery_low": battery_low,
|
||
# Digital inputs — meaning depends on din3_function / din4_function
|
||
# configured per-aid in the Aid model
|
||
"din3": din3, # IN3 state (True = triggered)
|
||
"din4": din4, # IN4 state (True = triggered)
|
||
"water_level_high": water_level, # bilge high level (IEC standard bit)
|
||
"timestamp": datetime.utcnow().isoformat(),
|
||
}
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def process_aton_message(raw: dict) -> dict | None:
|
||
"""
|
||
Process a decoded ATON message and update aton_state.
|
||
Returns the updated state entry (to broadcast) or None.
|
||
"""
|
||
mmsi = raw.get("mmsi")
|
||
if not mmsi:
|
||
return None
|
||
|
||
if mmsi not in aton_state:
|
||
aton_state[mmsi] = {"mmsi": mmsi, "source": "AIS_ATON"}
|
||
|
||
entry = aton_state[mmsi]
|
||
|
||
if raw["msg_type"] == 21:
|
||
entry.update({
|
||
"aton_type": raw["aton_type"],
|
||
"aton_type_name": ATON_TYPES.get(raw["aton_type"], f"Type {raw['aton_type']}"),
|
||
"name": raw["aton_name"],
|
||
"lat": raw["lat"],
|
||
"lon": raw["lon"],
|
||
"accuracy": raw["accuracy"],
|
||
"off_position": raw["off_position"],
|
||
"virtual": raw["virtual"],
|
||
"last_type21": raw["timestamp"],
|
||
})
|
||
aton_history[mmsi].append((raw["lat"], raw["lon"], raw["timestamp"]))
|
||
|
||
elif raw["msg_type"] == 8:
|
||
entry.update({
|
||
"voltage_v": raw.get("voltage_v"),
|
||
"sensor1": raw.get("sensor1"),
|
||
"sensor2": raw.get("sensor2"),
|
||
"sensor3": raw.get("sensor3"),
|
||
"racon_ok": raw.get("racon_ok"),
|
||
"light_ok": raw.get("light_ok"),
|
||
"health": raw.get("health"),
|
||
"battery_low": raw.get("battery_low"),
|
||
"off_position": raw.get("off_position", entry.get("off_position", False)),
|
||
"last_type8": raw["timestamp"],
|
||
})
|
||
|
||
entry["last_update"] = datetime.utcnow().isoformat()
|
||
return entry
|