""" AIS ATON decoder — Message Type 21 and Type 8 DAC=1,FI=22 (IEC 62320-2). Maintains live ATON state keyed by MMSI. """ from datetime import datetime from collections import defaultdict, deque # Live ATON state: mmsi -> dict aton_state: dict = {} # Position history for movement detection: mmsi -> deque of (lat,lon,ts) aton_history: dict = defaultdict(lambda: deque(maxlen=30)) ATON_TYPES = { 1:"Default",2:"Reference point",3:"RACON",4:"Fixed structure",5:"Spare", 6:"Light, no sectors",7:"Light, with sectors",8:"Leading light front", 9:"Leading light rear",10:"Beacon, cardinal N",11:"Beacon, cardinal NE", 12:"Beacon, cardinal E",13:"Beacon, cardinal SE",14:"Beacon, cardinal S", 15:"Beacon, cardinal SW",16:"Beacon, cardinal W",17:"Beacon, cardinal NW", 18:"Beacon, port hand",19:"Beacon, starboard hand",20:"Beacon, preferred channel port", 21:"Beacon, preferred channel starboard",22:"Beacon, isolated danger", 23:"Beacon, safe water",24:"Beacon, special mark",25:"Beacon, light vessel", 26:"LANBY",27:"Buoy, cardinal N",28:"Buoy, cardinal NE",29:"Buoy, cardinal E", 30:"Buoy, cardinal SE",31:"Buoy, cardinal S", # continues... } def _bits(payload: str, start: int, length: int) -> int: """Extract integer from AIS 6-bit ASCII payload.""" acc = 0 for i in range(start, start + length): if i >= len(payload) * 6: return 0 char_idx = i // 6 bit_idx = 5 - (i % 6) char_val = ord(payload[char_idx]) - 48 if char_val > 39: char_val -= 8 acc = (acc << 1) | ((char_val >> bit_idx) & 1) return acc def _signed(val: int, bits: int) -> int: if val >= (1 << (bits - 1)): val -= (1 << bits) return val def _text(payload: str, start: int, length: int) -> str: chars = [] for i in range(length // 6): v = _bits(payload, start + i * 6, 6) if v < 32: v += 64 if v == 64: break chars.append(chr(v)) return ''.join(chars).strip('@').strip() def decode_type21(payload: str) -> dict | None: """Decode AIS Message Type 21 — Aid-to-Navigation Report.""" try: msg_type = _bits(payload, 0, 6) if msg_type != 21: return None mmsi = _bits(payload, 8, 30) aton_t = _bits(payload, 38, 5) name = _text(payload, 43, 120) accuracy = _bits(payload, 163, 1) lon = _signed(_bits(payload, 164, 28), 28) / 600000.0 lat = _signed(_bits(payload, 192, 27), 27) / 600000.0 off_pos = bool(_bits(payload, 259, 1)) virtual = bool(_bits(payload, 269, 1)) name_ext = _text(payload, 272, min(14*6, (len(payload)-272//6)*6)) if len(payload)*6 > 272 else "" if lat == 0.0 and lon == 0.0: return None return { "mmsi": str(mmsi), "msg_type": 21, "aton_type": aton_t, "aton_name": (name + name_ext).strip() or f"ATON {mmsi}", "lat": lat, "lon": lon, "accuracy": bool(accuracy), "off_position": off_pos, "virtual": virtual, "timestamp": datetime.utcnow().isoformat(), } except Exception: return None def decode_type8_aton(payload: str) -> dict | None: """ Decode AIS Message Type 8, DAC=1, FI=22 — IEC 62320-2 ATON monitoring. Carries: voltage, sensors, lamp/racon status, off-position. """ try: msg_type = _bits(payload, 0, 6) if msg_type != 8: return None mmsi = _bits(payload, 8, 30) dac = _bits(payload, 40, 10) fi = _bits(payload, 50, 6) if dac != 1 or fi != 22: return None off_pos = bool(_bits(payload, 56, 1)) # Analog 1 — typically main battery voltage (0-409.4 V, res 0.1 V) analog1 = _bits(payload, 57, 12) * 0.05 # 0-204.75 V # Analog 2 — external sensor 1 (temperature or secondary voltage) analog2 = _bits(payload, 69, 12) * 0.05 # Analog 3 & 4 — additional sensors analog3 = _bits(payload, 81, 12) * 0.05 analog4 = _bits(payload, 93, 12) * 0.05 # Digital bits: racon, lamp, buoy code alarm, controller alarm, etc. racon = bool(_bits(payload, 105, 2)) light_ok = bool(_bits(payload, 107, 2)) health = _bits(payload, 109, 2) # 0=ok,1=warn,2=alarm,3=no signal battery_low = bool(_bits(payload, 111, 1)) # IEC 62320-2 extended digital inputs (bits 112–119) # bit 112 = buoy code / hull integrity → water ingress sensor (IN3) # bit 113 = controller alarm → listing/tilt sensor (IN4) # bit 114 = fog signal status # bit 115 = EPIRB armed # bit 116 = water level alarm → bilge high (critical) # bits 117-119 = spare / manufacturer-defined din3 = bool(_bits(payload, 112, 1)) # hull/water ingress din4 = bool(_bits(payload, 113, 1)) # listing/tilt water_level = bool(_bits(payload, 116, 1)) # bilge high level return { "mmsi": str(mmsi), "msg_type": 8, "dac": dac, "fi": fi, "off_position":off_pos, "voltage_v": round(analog1, 2) if analog1 > 0 else None, "sensor1": round(analog2, 2) if analog2 > 0 else None, "sensor2": round(analog3, 2) if analog3 > 0 else None, "sensor3": round(analog4, 2) if analog4 > 0 else None, "racon_ok": racon, "light_ok": light_ok, "health": health, "battery_low": battery_low, # Digital inputs — meaning depends on din3_function / din4_function # configured per-aid in the Aid model "din3": din3, # IN3 state (True = triggered) "din4": din4, # IN4 state (True = triggered) "water_level_high": water_level, # bilge high level (IEC standard bit) "timestamp": datetime.utcnow().isoformat(), } except Exception: return None def process_aton_message(raw: dict) -> dict | None: """ Process a decoded ATON message and update aton_state. Returns the updated state entry (to broadcast) or None. """ mmsi = raw.get("mmsi") if not mmsi: return None if mmsi not in aton_state: aton_state[mmsi] = {"mmsi": mmsi, "source": "AIS_ATON"} entry = aton_state[mmsi] if raw["msg_type"] == 21: entry.update({ "aton_type": raw["aton_type"], "aton_type_name": ATON_TYPES.get(raw["aton_type"], f"Type {raw['aton_type']}"), "name": raw["aton_name"], "lat": raw["lat"], "lon": raw["lon"], "accuracy": raw["accuracy"], "off_position": raw["off_position"], "virtual": raw["virtual"], "last_type21": raw["timestamp"], }) aton_history[mmsi].append((raw["lat"], raw["lon"], raw["timestamp"])) elif raw["msg_type"] == 8: entry.update({ "voltage_v": raw.get("voltage_v"), "sensor1": raw.get("sensor1"), "sensor2": raw.get("sensor2"), "sensor3": raw.get("sensor3"), "racon_ok": raw.get("racon_ok"), "light_ok": raw.get("light_ok"), "health": raw.get("health"), "battery_low": raw.get("battery_low"), "off_position": raw.get("off_position", entry.get("off_position", False)), "last_type8": raw["timestamp"], }) entry["last_update"] = datetime.utcnow().isoformat() return entry