Files
AidsMonitoring/backend/services/aton_decoder.py
T

185 lines
6.8 KiB
Python

"""
AIS ATON decoder — Message Type 21 and Type 8 DAC=1,FI=22 (IEC 62320-2).
Maintains live ATON state keyed by MMSI.
"""
from datetime import datetime
from collections import defaultdict, deque
# Latest merged record for every aid-to-navigation seen so far.
# Keys are the MMSI values carried in decoded messages; values are plain dicts
# that process_aton_message() creates and updates in place.
aton_state: dict = dict()
# Recent reported positions per MMSI, used for movement detection.
# Each value is a bounded deque of (lat, lon, timestamp) tuples; once 30 fixes
# are stored the oldest one is discarded automatically.
aton_history: dict = defaultdict(lambda: deque([], maxlen=30))
# Human-readable names for the 5-bit "type of aid-to-navigation" field of
# AIS Message 21, following the code table in ITU-R M.1371.
# Code 0 is the default/unspecified value, 1-19 are fixed aids and 20-31 are
# floating aids.  The field is 5 bits wide, so every possible value (0-31)
# has an entry here.
ATON_TYPES = {
    0: "Default, type not specified",
    1: "Reference point",
    2: "RACON",
    3: "Fixed structure",
    # NOTE(review): listed as "spare" in older editions of the table; newer
    # editions may assign this code (emergency wreck marking) — confirm.
    4: "Spare",
    5: "Light, no sectors",
    6: "Light, with sectors",
    7: "Leading light front",
    8: "Leading light rear",
    9: "Beacon, cardinal N",
    10: "Beacon, cardinal E",
    11: "Beacon, cardinal S",
    12: "Beacon, cardinal W",
    13: "Beacon, port hand",
    14: "Beacon, starboard hand",
    15: "Beacon, preferred channel port",
    16: "Beacon, preferred channel starboard",
    17: "Beacon, isolated danger",
    18: "Beacon, safe water",
    19: "Beacon, special mark",
    20: "Buoy, cardinal N",
    21: "Buoy, cardinal E",
    22: "Buoy, cardinal S",
    23: "Buoy, cardinal W",
    24: "Buoy, port hand",
    25: "Buoy, starboard hand",
    26: "Buoy, preferred channel port",
    27: "Buoy, preferred channel starboard",
    28: "Buoy, isolated danger",
    29: "Buoy, safe water",
    30: "Buoy, special mark",
    31: "Light vessel / LANBY / rigs",
}
def _bits(payload: str, start: int, length: int) -> int:
"""Extract integer from AIS 6-bit ASCII payload."""
acc = 0
for i in range(start, start + length):
if i >= len(payload) * 6:
return 0
char_idx = i // 6
bit_idx = 5 - (i % 6)
char_val = ord(payload[char_idx]) - 48
if char_val > 39:
char_val -= 8
acc = (acc << 1) | ((char_val >> bit_idx) & 1)
return acc
def _signed(val: int, bits: int) -> int:
if val >= (1 << (bits - 1)):
val -= (1 << bits)
return val
def _text(payload: str, start: int, length: int) -> str:
    """Decode a 6-bit character string field from an AIS payload.

    ``length`` is the field width in bits; ``length // 6`` characters are
    read starting at bit ``start``.  6-bit codes below 32 map to ASCII
    64-95, the remaining codes map to themselves (ASCII 32-63).  Decoding
    stops at the first '@' (code 0), which also covers reads past the end of
    the payload because ``_bits`` reports those as 0.

    Returns the decoded text with surrounding '@' and whitespace removed.
    """
    decoded = []
    for offset in range(start, start + (length // 6) * 6, 6):
        code = _bits(payload, offset, 6)
        if code < 32:
            code += 64
        if code == 64:
            # '@' marks the end of the text.
            break
        decoded.append(chr(code))
    return ''.join(decoded).strip('@').strip()
def decode_type21(payload: str) -> dict | None:
    """Decode AIS Message Type 21 — Aid-to-Navigation Report.

    Args:
        payload: The 6-bit ASCII armoured message payload.

    Returns:
        A dict with keys ``mmsi`` (str), ``msg_type`` (21), ``aton_type``,
        ``aton_name``, ``lat``, ``lon`` (decimal degrees), ``accuracy``,
        ``off_position``, ``virtual`` and ``timestamp`` (naive UTC ISO
        string taken at decode time), or ``None`` when:

        * the payload is not a Type 21 message,
        * the position is exactly (0, 0) — which is also what a truncated
          payload decodes to, since missing fields read as 0,
        * the position is outside +/-90 deg latitude or +/-180 deg longitude
          (this includes the "not available" values of 91 deg / 181 deg),
        * any exception occurs while decoding (best-effort decoder).
    """
    try:
        if _bits(payload, 0, 6) != 21:
            return None
        mmsi = _bits(payload, 8, 30)
        aton_t = _bits(payload, 38, 5)
        name = _text(payload, 43, 120)
        accuracy = _bits(payload, 163, 1)
        # Coordinates are signed integers in 1/10000 minute.
        lon = _signed(_bits(payload, 164, 28), 28) / 600000.0
        lat = _signed(_bits(payload, 192, 27), 27) / 600000.0
        off_pos = bool(_bits(payload, 259, 1))
        virtual = bool(_bits(payload, 269, 1))
        # Optional name extension: up to 14 characters starting at bit 272,
        # limited to the bits actually present in the payload.
        ext_bits = min(14 * 6, len(payload) * 6 - 272)
        name_ext = _text(payload, 272, ext_bits) if ext_bits > 0 else ""
        if lat == 0.0 and lon == 0.0:
            return None
        # Reject positions that are not real coordinates; the "position not
        # available" defaults (lat 91, lon 181) fall in this range and must
        # not be stored or plotted as a genuine fix.
        if abs(lat) > 90.0 or abs(lon) > 180.0:
            return None
        return {
            "mmsi": str(mmsi),
            "msg_type": 21,
            "aton_type": aton_t,
            "aton_name": (name + name_ext).strip() or f"ATON {mmsi}",
            "lat": lat,
            "lon": lon,
            "accuracy": bool(accuracy),
            "off_position": off_pos,
            "virtual": virtual,
            "timestamp": datetime.utcnow().isoformat(),
        }
    except Exception:
        # Deliberate best-effort: a malformed payload yields no report.
        return None
def decode_type8_aton(payload: str) -> dict | None:
    """
    Decode an AIS Message Type 8 binary broadcast with DAC=1, FI=22 that
    carries ATON monitoring data.

    Fields read from the application data (bit offsets into the payload):
    off-position flag (56), four 12-bit analog channels (57, 69, 81, 93),
    2-bit RACON and light status (105, 107), 2-bit health (109) and a
    battery-low flag (111).

    Returns a dict with the decoded values and a naive UTC ISO timestamp
    taken at decode time, or None if the payload is not Type 8 / DAC 1 /
    FI 22 or if any exception occurs while decoding.
    """
    try:
        if _bits(payload, 0, 6) != 8:
            return None
        mmsi = _bits(payload, 8, 30)
        dac = _bits(payload, 40, 10)
        fi = _bits(payload, 50, 6)
        if (dac, fi) != (1, 22):
            return None

        off_pos = bool(_bits(payload, 56, 1))

        # Four consecutive 12-bit analog channels, each scaled by 0.05.
        # Channel 1 is typically the main battery voltage; channel 2 an
        # external sensor (temperature or secondary voltage); 3 and 4 are
        # additional sensors.
        # NOTE(review): 0.05 per count gives 0-204.75 for a 12-bit field;
        # a 0.1 V resolution would need a factor of 0.1 — confirm the scale
        # against the transmitting equipment's specification.
        analogs = [_bits(payload, 57 + 12 * ch, 12) * 0.05 for ch in range(4)]
        # A reading of zero is reported as "no value".
        readings = [round(a, 2) if a > 0 else None for a in analogs]

        # Status fields: any non-zero 2-bit value counts as True.
        racon = bool(_bits(payload, 105, 2))
        light_ok = bool(_bits(payload, 107, 2))
        health = _bits(payload, 109, 2)  # 0=ok,1=warn,2=alarm,3=no signal
        battery_low = bool(_bits(payload, 111, 1))

        result = {
            "mmsi": str(mmsi),
            "msg_type": 8,
            "dac": dac,
            "fi": fi,
            "off_position": off_pos,
        }
        result["voltage_v"], result["sensor1"], result["sensor2"], result["sensor3"] = readings
        result["racon_ok"] = racon
        result["light_ok"] = light_ok
        result["health"] = health
        result["battery_low"] = battery_low
        result["timestamp"] = datetime.utcnow().isoformat()
        return result
    except Exception:
        return None
def process_aton_message(raw: dict) -> dict | None:
    """
    Merge one decoded ATON message into the module-level ``aton_state``.

    ``raw`` is a dict as produced by ``decode_type21`` or
    ``decode_type8_aton``.  A state entry is created for the MMSI on first
    sight.  Type 21 messages refresh identity/position fields and append the
    position to ``aton_history``; Type 8 messages refresh the monitoring
    fields.  ``last_update`` is stamped on every call that has an MMSI.

    Returns the live state entry (the same dict object stored in
    ``aton_state``) so it can be broadcast, or None when ``raw`` carries no
    MMSI.
    """
    mmsi = raw.get("mmsi")
    if not mmsi:
        return None

    entry = aton_state.setdefault(mmsi, {"mmsi": mmsi, "source": "AIS_ATON"})
    kind = raw["msg_type"]

    if kind == 21:
        type_code = raw["aton_type"]
        position_fields = {
            "aton_type": type_code,
            "aton_type_name": ATON_TYPES.get(type_code, f"Type {raw['aton_type']}"),
            "name": raw["aton_name"],
            "lat": raw["lat"],
            "lon": raw["lon"],
            "accuracy": raw["accuracy"],
            "off_position": raw["off_position"],
            "virtual": raw["virtual"],
            "last_type21": raw["timestamp"],
        }
        entry.update(position_fields)
        aton_history[mmsi].append((raw["lat"], raw["lon"], raw["timestamp"]))
    elif kind == 8:
        monitoring_keys = (
            "voltage_v", "sensor1", "sensor2", "sensor3",
            "racon_ok", "light_ok", "health", "battery_low",
        )
        monitoring_fields = {key: raw.get(key) for key in monitoring_keys}
        # Keep the previously known off-position flag if this message has none.
        monitoring_fields["off_position"] = raw.get(
            "off_position", entry.get("off_position", False)
        )
        monitoring_fields["last_type8"] = raw["timestamp"]
        entry.update(monitoring_fields)

    entry["last_update"] = datetime.utcnow().isoformat()
    return entry