Files
AidsMonitoring/backend/services/ais_catcher.py
T

145 lines
4.1 KiB
Python

"""
AIS-catcher process manager.
Detects RTL-SDR hardware and manages AIS-catcher.exe.
"""
from __future__ import annotations
import json
import os
import subprocess
import sys
from pathlib import Path
# Directory that contains this module; anchor for locating the bundled tools.
_HERE = Path(__file__).parent

# AIS-catcher distribution folder: <two levels above this module's directory>/tools/ais-catcher.
TOOLS_DIR = _HERE.parent.parent.joinpath("tools", "ais-catcher")

# Full path of the AIS-catcher executable inside TOOLS_DIR.
EXE_PATH = TOOLS_DIR.joinpath("AIS-catcher.exe")

# Handle of the AIS-catcher process started by this module; None while nothing is tracked.
_process: subprocess.Popen | None = None
def exe_exists() -> bool:
    """Report whether anything exists on disk at ``EXE_PATH``."""
    present = EXE_PATH.exists()
    return present
def detect_sdr() -> list[dict]:
    """Return the RTL-SDR dongles that Windows PnP currently reports as working.

    PowerShell's ``Get-PnpDevice`` is queried for devices with status OK whose
    hardware ID carries the Realtek vendor ID together with one of the
    RTL2832U product IDs (``VID_0BDA&PID_2832`` / ``VID_0BDA&PID_2838``).
    Matching on the vendor ID alone would also pick up unrelated Realtek USB
    hardware (card readers, network adapters, ...) and report it as an SDR.

    Returns:
        A list with one dict per device, each holding the keys
        ``description``, ``hardware_id`` and ``type`` (always ``"rtl_sdr"``).
        The list is empty when nothing matches, when not running on Windows,
        or when the query fails; failures are printed, never raised.
    """
    found: list[dict] = []
    if sys.platform != "win32":
        # Get-PnpDevice only exists on Windows; do not spawn PowerShell elsewhere.
        return found
    try:
        ps = (
            "Get-PnpDevice -Status OK | "
            "Where-Object { ($_.HardwareID | Out-String) -match 'VID_0BDA&PID_283[28]' } | "
            "Select-Object "
            "@{N='desc';E={$_.FriendlyName}},"
            "@{N='hw';E={($_.HardwareID | Select-Object -First 1)}} | "
            "ConvertTo-Json -Compress"
        )
        out = subprocess.check_output(
            ["powershell", "-NoProfile", "-NonInteractive", "-Command", ps],
            text=True, timeout=6, stderr=subprocess.DEVNULL,
        ).strip()
        if not out:
            return found
        data = json.loads(out)
        # ConvertTo-Json emits a bare object instead of an array for a single match.
        if isinstance(data, dict):
            data = [data]
        if not isinstance(data, list):
            # Anything else (null, number, string) carries no device records.
            data = []
        for entry in data:
            if not isinstance(entry, dict):
                # Skip malformed items rather than aborting the whole scan.
                continue
            found.append({
                "description": entry.get("desc") or "RTL-SDR dongle",
                "hardware_id": entry.get("hw") or "",
                "type": "rtl_sdr",
            })
    except Exception as e:
        # Best-effort probe: report the problem and return whatever was collected.
        print(f"[ais-catcher] SDR detect error: {e}")
    return found
def _kill_existing():
    """Force-kill every AIS-catcher.exe process via ``taskkill``, ignoring failures."""
    taskkill_cmd = ["taskkill", "/F", "/IM", "AIS-catcher.exe"]
    try:
        subprocess.run(taskkill_cmd, capture_output=True, timeout=5)
    except Exception:
        # Best-effort cleanup: a failure to run taskkill is deliberately ignored.
        return
def is_running() -> bool:
    """Tell whether an AIS-catcher process is alive.

    The process started by this module is checked first.  Otherwise the
    output of ``tasklist`` is searched for AIS-catcher.exe, which also covers
    instances launched externally.  Returns False when that lookup fails.
    """
    managed = _process
    if managed is not None and managed.poll() is None:
        return True

    # Fall back to a system-wide lookup by image name.
    query = ["tasklist", "/FI", "IMAGENAME eq AIS-catcher.exe", "/NH"]
    try:
        listing = subprocess.check_output(
            query, text=True, timeout=3, stderr=subprocess.DEVNULL,
        )
    except Exception:
        return False
    return "AIS-catcher.exe" in listing
def launch(udp_port: int = 10110, web_port: int = 8100) -> dict:
    """Start AIS-catcher.exe unless an instance is already running.

    Args:
        udp_port: Port on 127.0.0.1 handed to AIS-catcher's ``-u`` option.
        web_port: Port handed to AIS-catcher's ``-N`` option.

    Returns:
        A dict that always carries the four keys ``ok`` (bool),
        ``already_running`` (bool), ``pid`` (int or None) and ``error``
        (str or None).  ``pid`` is only set for a live process started by
        this module; an externally launched instance yields
        ``already_running=True`` with ``pid=None``.
    """
    global _process

    def _result(ok: bool, *, already_running: bool = False,
                pid: int | None = None, error: str | None = None) -> dict:
        # Single place that builds the response so every path has the same shape.
        return {
            "ok": ok,
            "already_running": already_running,
            "pid": pid,
            "error": error,
        }

    if not exe_exists():
        return _result(False, error=f"AIS-catcher.exe not found at {EXE_PATH}")

    if is_running():
        # is_running() may be true because of an external instance while the
        # managed handle has already exited; never report that stale pid.
        managed_alive = _process is not None and _process.poll() is None
        return _result(
            True,
            already_running=True,
            pid=_process.pid if managed_alive else None,
        )

    if not detect_sdr():
        return _result(False, error="No RTL-SDR device detected")

    try:
        # Clear out any instance the running-check could not see before starting.
        _kill_existing()
        cmd = [
            str(EXE_PATH),
            "-v", "10",
            "-X",
            "-u", "127.0.0.1", str(udp_port),
            "-N", str(web_port),
            "PLUGIN_DIR", str(TOOLS_DIR / "plugins"),
        ]
        _process = subprocess.Popen(
            cmd,
            cwd=str(TOOLS_DIR),
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            # CREATE_NO_WINDOW only exists on Windows, hence the platform guard.
            creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0,
        )
    except Exception as e:
        return _result(False, error=str(e))
    return _result(True, pid=_process.pid)
def stop() -> dict:
    """Stop the managed AIS-catcher process and any other AIS-catcher.exe instance.

    The managed process is asked to terminate and given 4 seconds to exit;
    if it does not, it is killed and waited on again so the handle is reaped
    before the reference is dropped.  Afterwards every remaining
    AIS-catcher.exe is force-killed through ``_kill_existing()``.

    Returns:
        ``{"ok": True}`` in all cases.
    """
    global _process
    proc = _process
    if proc is not None and proc.poll() is None:
        proc.terminate()
        try:
            proc.wait(timeout=4)
        except subprocess.TimeoutExpired:
            proc.kill()
            try:
                # Reap the killed process instead of abandoning a live handle.
                proc.wait(timeout=4)
            except subprocess.TimeoutExpired:
                # Still not gone; the system-wide kill below is the last resort.
                pass
    _process = None
    _kill_existing()
    return {"ok": True}
def status() -> dict:
    """Collect a snapshot of the AIS-catcher setup.

    Returns a dict with ``exe_found`` (result of ``exe_exists()``),
    ``sdr_devices`` (result of ``detect_sdr()``), ``running`` (result of
    ``is_running()``) and ``pid`` (pid of the process started by this module
    while it is alive, otherwise None).
    """
    devices = detect_sdr()
    alive = is_running()

    managed_pid = None
    if _process and _process.poll() is None:
        managed_pid = _process.pid

    return dict(
        exe_found=exe_exists(),
        sdr_devices=devices,
        running=alive,
        pid=managed_pid,
    )