Files
AR-GPS/backend/chart_manager.py
T

1379 lines
55 KiB
Python

"""
S-57 ENC chart manager — parse, store and serve chart features as GeoJSON.
"""
import json
import logging
import math
import shutil
import tempfile
import zipfile
from pathlib import Path
log = logging.getLogger(__name__)
# Optional heavy deps — only needed for parsing .000 ENC files.
# GPS Navigator may not have them installed; CSV-based charts still work.
try:
import geopandas as gpd
import pandas as pd
_HAS_GPD = True
except ImportError:
gpd = None
pd = None
_HAS_GPD = False
log.warning("geopandas/pandas not installed — .000 ENC parsing disabled. "
"Install with: pip install geopandas pyogrio")
CHARTS_DIR = Path(__file__).parent.parent / "charts"
CHARTS_DIR.mkdir(exist_ok=True)
# S-57 layers we care about, mapped to a friendly category
S57_LAYERS = {
"LIGHTS": "light",
"BOYLAT": "buoy",
"BOYCAR": "buoy",
"BOYISD": "buoy",
"BOYSPP": "buoy",
"BOYSAW": "buoy",
"BCNLAT": "beacon",
"BCNCAR": "beacon",
"BCNISD": "beacon",
"BCNWTW": "beacon",
"LNDMRK": "landmark",
"RTPBCN": "beacon",
"LAKARE": "area", # Lake / lagoon / ciénaga area
# BUISGL is rendered as polygon footprint via _parse_land → landLayer.
# Removed from S57_LAYERS to avoid a duplicate centroid-point beacon dot.
}
# Human-readable light character codes
LITCHR = {
1:"F", 2:"Fl", 3:"LFl", 4:"Q", 5:"VQ", 6:"UQ", 7:"Iso",
8:"Oc", 9:"IQ", 10:"IVQ", 11:"IUQ", 12:"Mo", 13:"FFl",
14:"FlLFl", 15:"OcFl", 16:"FLFl", 25:"Al.Oc", 26:"Al.LFl",
27:"Al.Fl", 28:"Al.Grp",
}
COLOUR = {
1:"W", 2:"K", 3:"R", 4:"G", 5:"B", 6:"Y",
7:"Gy", 8:"Br", 9:"Amb", 10:"Vi", 11:"Or", 12:"Mg",
}
BOYSHP = {
1:"Conical(N)",2:"Can(C)",3:"Spherical",4:"Pillar",
5:"Spar",6:"Barrel",7:"Super-buoy",8:"Ice buoy",
}
# Cardinal category (CATCAM) per IHO S-57 Object Catalogue
CATCAM = {1:"N", 2:"E", 3:"S", 4:"W"}
# IALA region defaults by chart producer prefix (cell ID first 2 chars)
REGION_A_PREFIXES = {
"GB","FR","DE","NL","BE","DK","NO","SE","FI","IS","IE","ES","PT","IT",
"GR","TR","RU","UA","HR","CY","MT","EE","LV","LT","PL","RO","BG","SI",
"MA","DZ","TN","LY","EG","ZA","NG","SN","CI","KE","TZ","MZ",
"IN","PK","BD","LK","MM","TH","ID","MY","SG","VN","CN","HK","TW",
"AE","SA","OM","IR","IQ","KW","QA","BH","YE",
"AU","NZ","PG",
}
REGION_B_PREFIXES = {
"US","CA","MX","BS","CU","JM","HT","DO","PR","VI",
"BZ","GT","SV","HN","NI","CR","PA",
"CO","VE","EC","PE","CL","AR","UY","BR","GY","SR","PY","BO",
"JP","KR","KP","PH",
}
def auto_region(cell_id: str) -> str:
prefix = (cell_id or "").upper()[:2]
if prefix in REGION_A_PREFIXES: return "A"
if prefix in REGION_B_PREFIXES: return "B"
return "B"
# ── CSV → features (DIMAR / custom CSV charts) ────────────────────────────────
# Mapping from DIMAR CSV feat_type to canonical S-57 layer name.
_CSV_FEAT_TYPE_MAP = {
"BOYSPEC": "BOYSPP", # DIMAR uses BOYSPEC; IHO S-57 is BOYSPP
}
# Feature categories per layer
_CSV_CATEGORY_MAP = {
"BOYLAT": "buoy", "BOYCAR": "buoy", "BOYISD": "buoy",
"BOYSPP": "buoy", "BOYSAW": "buoy",
"BCNLAT": "beacon","BCNCAR": "beacon",
"LIGHTS": "light", "LNDMRK": "landmark",
}
def _csv_num(v):
if v is None or str(v).strip() == "":
return None
try:
return float(v)
except Exception:
return None
def _csv_int(v):
n = _csv_num(v)
return int(n) if n is not None else None
def _csv_fmt(v):
"""Format float: drop trailing .0 for whole numbers."""
try:
f = float(v)
return str(int(f)) if f == int(f) else str(f)
except Exception:
return str(v)
def _csv_light_desc(litchr, siggrp, colour, sigper, valnmr) -> str:
"""Build compact light description from CSV fields (e.g. 'Fl G 3s 3M')."""
parts = []
lc_int = _csv_int(litchr)
lc = LITCHR.get(lc_int, str(lc_int)) if lc_int is not None else ""
sg = _csv_int(siggrp)
if sg is not None:
lc = f"{lc}({sg})"
col_int = _csv_int(colour)
col_str = COLOUR.get(col_int, "") if col_int is not None else ""
if lc:
parts.append(f"{lc} {col_str}".strip())
sp = _csv_num(sigper)
if sp is not None:
parts.append(f"{_csv_fmt(sp)}s")
rng = _csv_num(valnmr)
if rng is not None:
parts.append(f"{_csv_fmt(rng)}M")
return " ".join(parts)
def _csv_infer_catcam(siggrp: str, name: str) -> str | None:
"""
Cardinal buoy quadrant from SIGGRP (reliable) or DIMAR naming convention.
Q(9)+LFl=W Q(6)+LFl=S Q(3)=E Q=N
DIMAR name suffixes: SS/VS→S SN/VN→N SE→E SO→W
"""
sg = _csv_int(siggrp)
if sg is not None:
if sg == 9: return "W"
if sg == 6: return "S"
if sg == 3: return "E"
return "N"
n = (name or "").upper()
if any(x in n for x in (" SS", "(SS)", "VS")): return "S"
if any(x in n for x in (" SE", "(SE)")): return "E"
if any(x in n for x in (" SO", "(SO)")): return "W"
return "N"
def _csv_infer_catlam(colour: str) -> int | None:
"""IALA B (Americas): green(4)=port=1, red(3)=stbd=2."""
c = _csv_int(colour)
if c == 4: return 1
if c == 3: return 2
return None
def _parse_csvs_to_features(csv_dir: Path) -> list[dict]:
"""
Read navigation-aid CSV files from csv_dir and return GeoJSON features.
Each CSV must have columns: OBJNAM, lon, lat, feat_type, LITCHR, SIGGRP,
COLOUR, SIGPER, VALNMR, HEIGHT, ORIENT, INFORM.
feat_type values: BOYLAT, BOYCAR, BOYISD, BOYSPEC/BOYSPP, BCNLAT, LIGHTS,
LNDMRK.
This function is the primary path for DIMAR/custom charts created with
QGISS57Converter; it preserves all light attributes (LITCHR, SIGPER, etc.)
that the GDAL S-57 driver may drop during the .000 round-trip.
"""
import csv as _csv_mod
features: list[dict] = []
for csv_file in sorted(csv_dir.glob("*.csv")):
with open(csv_file, newline="", encoding="utf-8") as fh:
reader = _csv_mod.DictReader(fh)
for row in reader:
feat_type = (row.get("feat_type") or csv_file.stem).strip()
layer = _CSV_FEAT_TYPE_MAP.get(feat_type, feat_type)
if not layer:
continue
lon = _csv_num(row.get("lon"))
lat = _csv_num(row.get("lat"))
if lon is None or lat is None:
continue
category = _CSV_CATEGORY_MAP.get(layer, "buoy")
litchr = row.get("LITCHR", "").strip()
siggrp = row.get("SIGGRP", "").strip()
colour = row.get("COLOUR", "").strip()
sigper = row.get("SIGPER", "").strip()
valnmr = row.get("VALNMR", "").strip()
height = row.get("HEIGHT", "").strip()
orient_r = row.get("ORIENT", "").strip()
name = row.get("OBJNAM", "").strip()
inform = row.get("INFORM", "").strip()
col_int = _csv_int(colour)
colours = [col_int] if col_int is not None else []
light_d = _csv_light_desc(litchr, siggrp, colour, sigper, valnmr)
props: dict = {
"layer": layer,
"category": category,
"name": name or None,
"info": inform or None,
"light_desc": light_d or None,
"range_nm": _csv_num(valnmr),
"height_m": _csv_num(height),
"colours": colours,
"colour_code": col_int,
}
if layer in ("BOYCAR", "BCNCAR"):
props["catcam"] = _csv_infer_catcam(siggrp, name)
if layer in ("BOYLAT", "BCNLAT"):
props["catlam"] = _csv_infer_catlam(colour)
orient_val = _csv_num(orient_r)
if orient_val is not None:
props["orient"] = orient_val
props["aid_type"] = classify(layer, props)
# Remove None values
props = {k: v for k, v in props.items() if v is not None}
features.append({
"type": "Feature",
"geometry": {"type": "Point", "coordinates": [lon, lat]},
"properties": props,
})
return features
def install_from_csv_dir(csv_dir: Path, cell_id: str) -> str:
"""
Create or update an AidsMonitoring chart cell from a directory of CSV files.
This is the preferred install pathway for custom (DIMAR) charts because it
preserves all S-57 attribute codes (LITCHR, SIGGRP, etc.) without loss.
csv_dir — directory containing *.csv files (BOYLAT.csv, BOYCAR.csv, etc.)
cell_id — chart cell identifier (e.g. 'BAHIA_DE_CARTAGENA')
"""
cell_id = cell_id.upper()
cell_dir = CHARTS_DIR / cell_id
cell_dir.mkdir(exist_ok=True)
features = _parse_csvs_to_features(csv_dir)
with open(cell_dir / "features.geojson", "w", encoding="utf-8") as f:
json.dump({"type": "FeatureCollection", "features": features}, f,
ensure_ascii=False)
log.info("CSV chart %s%d features", cell_id, len(features))
# Empty auxiliary caches so the frontend doesn't show stale data
for fname in ("depths.geojson", "land.geojson", "hazards.geojson", "zones.geojson"):
p = cell_dir / fname
if not p.exists():
with open(p, "w") as f:
json.dump({"type": "FeatureCollection", "features": []}, f)
# Update meta.json
bbox = None
if features:
lons = [ft["geometry"]["coordinates"][0] for ft in features]
lats = [ft["geometry"]["coordinates"][1] for ft in features]
bbox = [min(lons), min(lats), max(lons), max(lats)]
meta = get_meta(cell_id)
meta["feature_count"] = len(features)
meta["bbox"] = bbox
meta.setdefault("region", auto_region(cell_id))
_meta_path(cell_dir).write_text(json.dumps(meta))
return cell_id
def install_from_csv_zip(zip_path: Path) -> list[str]:
"""
Install one or more CSV chart cells from a ZIP archive.
Expected ZIP layout (any of these is accepted):
Option A — single cell (cell_id inferred from folder name or ZIP name):
BOYLAT.csv
BOYCAR.csv
...
Option B — cell_id in subfolder:
BAHIA_DE_CARTAGENA/BOYLAT.csv
BAHIA_DE_CARTAGENA/BOYCAR.csv
Option C — meta.json declares cell_id:
meta.json → {"cell_id": "BAHIA_DE_CARTAGENA"}
BOYLAT.csv
...
"""
import csv as _csv_mod
installed: list[str] = []
with zipfile.ZipFile(zip_path) as z:
namelist = z.namelist()
# Collect all CSV files grouped by subfolder
csv_files = [n for n in namelist if n.lower().endswith(".csv")]
if not csv_files:
raise ValueError("No CSV files found in ZIP")
# Check for meta.json
meta_cell_id: str | None = None
if "meta.json" in namelist:
try:
meta_cell_id = json.loads(z.read("meta.json")).get("cell_id")
except Exception:
pass
# Group by directory prefix
import collections
groups: dict[str, list[str]] = collections.defaultdict(list)
for name in csv_files:
prefix = str(Path(name).parent)
groups[prefix].append(name)
with tempfile.TemporaryDirectory() as tmp_root:
tmp_root_p = Path(tmp_root)
for prefix, members in groups.items():
# Determine cell_id
if meta_cell_id:
cid = meta_cell_id
elif prefix and prefix != ".":
cid = Path(prefix).name
else:
cid = Path(zip_path).stem # e.g. BAHIA_DE_CARTAGENA
# Extract CSVs to temp dir
tmp_csv = tmp_root_p / cid
tmp_csv.mkdir(exist_ok=True)
for member in members:
data = z.read(member)
(tmp_csv / Path(member).name).write_bytes(data)
# Install
result_id = install_from_csv_dir(tmp_csv, cid)
installed.append(result_id)
return installed
def _meta_path(cell_dir: Path) -> Path:
return cell_dir / "meta.json"
def get_meta(cell_id: str) -> dict:
cell_dir = CHARTS_DIR / cell_id.upper()
p = _meta_path(cell_dir)
if p.exists():
try: return json.loads(p.read_text())
except Exception: pass
return {}
def set_meta(cell_id: str, **patch) -> dict:
cell_dir = CHARTS_DIR / cell_id.upper()
if not cell_dir.exists():
raise FileNotFoundError(cell_id)
meta = get_meta(cell_id)
meta.update(patch)
_meta_path(cell_dir).write_text(json.dumps(meta))
return meta
def get_region(cell_id: str) -> str:
return get_meta(cell_id).get("region") or auto_region(cell_id)
# ── Aid classification ────────────────────────────────────────────────────
# Canonical aid types — derived from S-57 layer + attributes.
# Frontend dispatches symbol drawing on this; renders are region-aware
# only for LATERAL_* (which depend on IALA A vs B for color).
def classify(layer: str, props: dict) -> str:
catlam = props.get("catlam")
catcam = props.get("catcam")
if layer in ("BOYLAT", "BCNLAT"):
if catlam == 1: return "LATERAL_PORT"
if catlam == 2: return "LATERAL_STBD"
if catlam == 3: return "LATERAL_PREF_STBD" # preferred channel to STBD (acts port)
if catlam == 4: return "LATERAL_PREF_PORT" # preferred channel to PORT (acts stbd)
# Fallback: infer from primary colour (IALA B — Americas: green=port, red=stbd)
colour = props.get("colour_code")
if colour == 4: return "LATERAL_PORT" # green
if colour == 3: return "LATERAL_STBD" # red
return "LATERAL_UNKNOWN"
if layer in ("BOYCAR", "BCNCAR"):
if catcam == "N": return "CARDINAL_N"
if catcam == "E": return "CARDINAL_E"
if catcam == "S": return "CARDINAL_S"
if catcam == "W": return "CARDINAL_W"
return "CARDINAL_UNKNOWN"
if layer in ("BOYISD", "BCNISD"): return "ISOLATED_DANGER"
if layer == "BOYSAW": return "SAFE_WATER"
if layer == "BOYSPP": return "SPECIAL"
if layer == "LIGHTS": return "LIGHT_POINT"
if layer == "LNDMRK": return "LANDMARK"
if layer == "RTPBCN": return "RACON"
if layer.startswith("BCN"): return "BEACON_GENERIC"
if layer.startswith("BOY"): return "BUOY_GENERIC"
return "UNKNOWN"
# Topmark shape (TOPSHP)
TOPSHP = {
1:"cone_up",2:"cone_down",3:"sphere",4:"X",5:"cross",
6:"can",7:"T",8:"triangle_up",9:"triangle_down",
10:"circle",11:"rectangle",12:"board",13:"diamond",
14:"diamond_2",15:"square",16:"rectangle_2",17:"pennant",
18:"pennant_2",19:"pennant_3",20:"pennant_4",
21:"2sphere",22:"2cone_up",23:"2cone_down",
24:"cone_up_sphere",25:"2cone_base_up",26:"2cone_base_down",
27:"2cone_point",28:"besom_up",29:"besom_down",
30:"flag",31:"sphere_2",32:"2cone_base_apart",
33:"board_2",
}
def _safe_list(val) -> list:
"""Return a plain Python list from a pandas/numpy array-like value.
GDAL/pyogrio surfaces multi-valued S-57 attributes (like COLOUR) as
comma-separated STRINGS (e.g. "4" or "1,4"), not as Python lists.
Without the string branch we silently drop colour info."""
if val is None:
return []
try:
if hasattr(val, 'tolist'):
val = val.tolist()
if isinstance(val, str):
return [int(x) for x in val.split(",") if x.strip()]
if isinstance(val, (list, tuple)):
return [int(v) for v in val if v is not None]
return [int(val)]
except Exception:
return []
def _safe(val):
"""Return a plain Python scalar from any pandas/numpy value."""
if val is None:
return None
# numpy array or list with multiple elements — take first
if hasattr(val, '__len__') and not isinstance(val, str):
if len(val) == 0:
return None
val = val[0]
# numpy scalar
if hasattr(val, 'item'):
try:
val = val.item()
except Exception:
val = val.tolist()
# NaN check after extraction
try:
if isinstance(val, float) and _HAS_GPD and pd.isna(val):
return None
except Exception:
pass
return val
def _light_desc(row) -> str:
parts = []
chr_code = _safe(row.get("LITCHR"))
grp = _safe(row.get("SIGGRP"))
col_list = _safe(row.get("COLOUR"))
period = _safe(row.get("SIGPER"))
rng = _safe(row.get("VALNMR"))
lc = LITCHR.get(int(chr_code), str(chr_code)) if chr_code else ""
if grp:
lc = f"{lc}({grp})"
colours = ""
if col_list:
try:
nums = [int(x) for x in str(col_list).split(",") if x.strip()]
colours = ".".join(COLOUR.get(n, "?") for n in nums)
except Exception:
pass
if lc:
parts.append(f"{lc} {colours}".strip())
if period:
parts.append(f"{period}s")
if rng:
parts.append(f"{rng}M")
return " ".join(parts)
def _parse_cell(enc_path: Path) -> list[dict]:
if not _HAS_GPD:
raise RuntimeError("geopandas not installed — cannot parse .000 ENC files. "
"Run: pip install geopandas pyogrio")
features = []
# ── Build land union for LDLINE clipping ─────────────────────────────────
# Leading lines must stop at the riverbank / land boundary, not extend
# indefinitely. We union all land polygons in the cell and use them to
# clip each LDLINE so it terminates where it hits the shore.
from shapely.ops import unary_union
from shapely.geometry import LineString as SLineString, Point as SPoint
_land_clip = None
for _ll in ("LNDARE", "BUAARE"):
try:
_gdf_l = gpd.read_file(str(enc_path), layer=_ll, engine="pyogrio")
if _gdf_l.empty:
continue
_gdf_l = _gdf_l.to_crs(epsg=4326)
_polys = [g for g in _gdf_l.geometry
if g is not None and not g.is_empty
and g.geom_type in ("Polygon", "MultiPolygon")]
if _polys:
_u = unary_union(_polys).buffer(0)
_land_clip = _u if _land_clip is None else _land_clip.union(_u)
except Exception:
pass
for layer_name, category in S57_LAYERS.items():
try:
gdf = gpd.read_file(str(enc_path), layer=layer_name, engine="pyogrio")
except Exception:
continue
if gdf.empty:
continue
gdf = gdf.to_crs(epsg=4326)
for _, row in gdf.iterrows():
geom = row.geometry
if geom is None or geom.is_empty:
continue
# Use centroid for all geometry types
pt = geom.centroid if geom.geom_type != "Point" else geom
colours = _safe_list(row.get("COLOUR")) # e.g. [3,4] = Red+Green
props = {
"layer": layer_name,
"category": category,
"name": _safe(row.get("OBJNAM")),
"info": _safe(row.get("INFORM")),
"height_m": _safe(row.get("HEIGHT")),
"range_nm": _safe(row.get("VALNMR")),
"status": _safe(row.get("STATUS")),
"colours": colours, # list of S-57 colour codes
"colour_code": colours[0] if colours else None, # primary colour
}
# Extract light description for ALL navigational aids and lights.
# Previously only BOY* layers were included, so BCNLAT/BCNWTW/LNDMRK
# silently lost their LITCHR/SIGPER/VALNMR — fixed here.
if category in ("light", "buoy", "beacon", "landmark"):
props["light_desc"] = _light_desc(row)
if layer_name.startswith("BOY") or layer_name.startswith("BCN"):
shape = _safe(row.get("BOYSHP"))
props["boyshp"] = int(shape) if shape else None # raw code for JS
props["shape"] = BOYSHP.get(int(shape), None) if shape else None
topshp = _safe(row.get("TOPSHP"))
props["topshp"] = int(topshp) if topshp else None
if layer_name in ("BOYCAR", "BCNCAR"):
catcam = _safe(row.get("CATCAM"))
props["catcam"] = CATCAM.get(int(catcam), None) if catcam else None
if layer_name in ("BOYLAT", "BCNLAT"):
catlam = _safe(row.get("CATLAM")) # 1=port, 2=stbd, 3=pref-stbd, 4=pref-port
props["catlam"] = int(catlam) if catlam else None
# ORIENT — bearing of the leading line in True degrees
orient_raw = _safe(row.get("ORIENT"))
if orient_raw:
try:
props["orient"] = float(orient_raw)
except ValueError:
pass
props["aid_type"] = classify(layer_name, props)
features.append({
"type": "Feature",
"geometry": {"type": "Point", "coordinates": [pt.x, pt.y]},
"properties": {k: v for k, v in props.items() if v is not None},
})
# ── Leading-line bearing projection for lights with ORIENT ────────
# Project a line from the mark in the ORIENT direction, then clip it
# against the land union so it terminates at the riverbank/shoreline.
# A long initial line (5 NM) is used so the clip always finds the shore.
if "orient" in props:
bearing_deg = props["orient"]
bearing_rad = math.radians(bearing_deg)
lat_rad = math.radians(pt.y)
cos_lat = math.cos(lat_rad)
# Initial generous length (5 NM) — will be clipped to land boundary
PROBE_NM = 5.0
d_lat = (PROBE_NM / 60.0) * math.cos(bearing_rad)
d_lon = (PROBE_NM / 60.0) * math.sin(bearing_rad) / max(cos_lat, 1e-6)
end = [pt.x + d_lon, pt.y + d_lat]
# Densify the probe line (10 intermediate points) so the clip is accurate
n_pts = 12
ldcoords = [
[pt.x + d_lon * i / n_pts, pt.y + d_lat * i / n_pts]
for i in range(n_pts + 1)
]
# Clip against land — the leading line must not cross land
if _land_clip is not None:
try:
probe = SLineString(ldcoords)
in_water = probe.difference(_land_clip)
if in_water.is_empty:
# Mark is on land — use minimum stub
ldcoords = [[pt.x, pt.y], end]
else:
# Keep only the segment that touches the mark
start_pt = SPoint(pt.x, pt.y)
geoms = (list(in_water.geoms)
if in_water.geom_type.startswith("Multi")
else [in_water])
# Sort by distance of each segment's first point to mark
geoms.sort(key=lambda g: start_pt.distance(SPoint(g.coords[0])))
# Take the first (nearest to mark) segment
seg = geoms[0]
ldcoords = [list(c[:2]) for c in seg.coords]
except Exception:
pass # fall through to unclipped coords
features.append({
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": ldcoords,
},
"properties": {
"layer": "LDLINE",
"category": "ldline",
"aid_type": "LEADING_LINE",
"name": props.get("name", ""),
"orient": bearing_deg,
"colours": colours,
},
})
# ── Proximity merge: copy light attrs from LIGHTS → co-located buoys/beacons ──
# In S-57 a LIGHTS object and a BOYLAT object may share the same coordinates.
# The LIGHTS carries LITCHR/VALNMR/HEIGHT/COLOUR; the BOYLAT carries BOYSHP/CATLAM.
# We find the nearest LIGHTS within 50 m and backfill missing attrs on the host.
MERGE_DEG = 0.00045 # ≈ 50 m at equator (1° lat ≈ 111 km)
light_feats = [f for f in features
if f["properties"].get("layer") == "LIGHTS"
and f["geometry"]["type"] == "Point"]
struct_feats = [f for f in features
if (f["properties"].get("layer","").startswith("BOY")
or f["properties"].get("layer","").startswith("BCN"))
and f["geometry"]["type"] == "Point"]
for lf in light_feats:
lc = lf["geometry"]["coordinates"]
lp = lf["properties"]
best_host, best_dist = None, MERGE_DEG
for bf in struct_feats:
bc = bf["geometry"]["coordinates"]
d = ((lc[0] - bc[0])**2 + (lc[1] - bc[1])**2) ** 0.5
if d < best_dist:
best_dist, best_host = d, bf
if best_host:
hp = best_host["properties"]
if lp.get("light_desc") and not hp.get("light_desc"):
hp["light_desc"] = lp["light_desc"]
if lp.get("range_nm") and not hp.get("range_nm"):
hp["range_nm"] = lp["range_nm"]
if lp.get("height_m") and not hp.get("height_m"):
hp["height_m"] = lp["height_m"]
if lp.get("colours") and not hp.get("colours"):
hp["colours"] = lp["colours"]
hp["colour_code"] = lp.get("colour_code")
# Also sync orient for leading lines if the buoy didn't have it
if lp.get("orient") is not None and hp.get("orient") is None:
hp["orient"] = lp["orient"]
return features
def _read_depth_unit(enc_path: Path) -> str:
"""Read DSPM_DUNI from the S-57 dataset metadata. 1=METERS, 2=FEET, 3=FATHOMS."""
try:
from osgeo import gdal
ds = gdal.OpenEx(str(enc_path))
if ds:
md = ds.GetMetadata() or {}
duni = md.get("DSPM_DUNI") or md.get("S57_DUNI")
if duni:
return {"1": "METERS", "2": "FEET", "3": "FATHOMS"}.get(str(duni), "METERS")
except Exception as e:
log.debug("Could not read DUNI from %s: %s", enc_path, e)
return "METERS" # NOAA / IHO default
def _bbox_of_coords(coords) -> list[float]:
xs, ys = [], []
def walk(c):
if isinstance(c[0], (int, float)):
xs.append(c[0]); ys.append(c[1])
else:
for sub in c: walk(sub)
walk(coords)
return [min(xs), min(ys), max(xs), max(ys)]
def _parse_depths(enc_path: Path) -> list[dict]:
"""Extract bathymetry from S-57 cell.
SOUNDG: MultiPoint Z (sounding points; depth in geometry's Z value)
DEPCNT: LineString (contour at VALDCO meters)
DEPARE: Polygon (depth area between DRVAL1 and DRVAL2 meters)
Each feature gets a `bbox` property [minx, miny, maxx, maxy] so the
/charts/depths endpoint can do fast viewport filtering without touching
the geometries. The depth unit (METERS/FEET/FATHOMS) read from the
cell's DSPM record is attached to every feature so the frontend legend
can display it without a separate request.
"""
if not _HAS_GPD:
return []
out = []
unit = _read_depth_unit(enc_path) # METERS / FEET / FATHOMS
# Build a UNIFIED land mask from every S-57 layer that represents
# something a depth feature shouldn't cover. NOAA puts urban islands /
# reclaimed land in BUAARE, dykes in DAMCON, dry docks in DRYDOC, etc.
# — using only LNDARE leaves visible bleed.
from shapely.ops import unary_union
from shapely.prepared import prep
LAND_LAYERS = ("LNDARE", "BUAARE", "DAMCON", "DRYDOC", "PONTON", "LNDRGN")
land_polys = []
for ll in LAND_LAYERS:
try:
gdf = gpd.read_file(str(enc_path), layer=ll, engine="pyogrio")
if gdf.empty: continue
gdf = gdf.to_crs(epsg=4326)
land_polys.extend(g for g in gdf.geometry
if g is not None and not g.is_empty
and g.geom_type in ("Polygon", "MultiPolygon"))
except Exception:
pass
land_union = None
land_prepared = None
if land_polys:
try:
land_union = unary_union(land_polys).buffer(0)
land_prepared = prep(land_union)
# Also emit the union as a LANDMASK feature so the frontend can
# paint land in the chart's own colour when OSM is hidden.
geoms = list(land_union.geoms) if land_union.geom_type.startswith("Multi") else [land_union]
for g in geoms:
if g.geom_type != "Polygon" or g.is_empty: continue
rings = [[list(c[:2]) for c in g.exterior.coords]]
rings.extend([list(c[:2]) for c in r.coords] for r in g.interiors)
out.append({
"type": "Feature",
"geometry": {"type": "Polygon", "coordinates": rings},
"properties": {"layer": "LANDMASK", "bbox": _bbox_of_coords(rings)},
})
except Exception as e:
log.warning("Land union failed for %s: %s", enc_path, e)
# ── SOUNDG (drop sounding points that fall on land) ──
try:
gdf = gpd.read_file(str(enc_path), layer="SOUNDG", engine="pyogrio")
if not gdf.empty:
gdf = gdf.to_crs(epsg=4326)
for _, row in gdf.iterrows():
geom = row.geometry
if geom is None or geom.is_empty: continue
geoms = list(geom.geoms) if geom.geom_type.startswith("Multi") else [geom]
for g in geoms:
if not getattr(g, "has_z", False): continue
if land_prepared is not None and land_prepared.contains(g):
continue # sounding lies on land — skip
out.append({
"type": "Feature",
"geometry": {"type": "Point", "coordinates": [g.x, g.y]},
"properties": {"layer": "SOUNDG", "depth": round(g.z, 1),
"unit": unit,
"bbox": [g.x, g.y, g.x, g.y]},
})
except Exception:
pass
# ── DEPCNT (clip contour lines so they don't cross land) ──
try:
gdf = gpd.read_file(str(enc_path), layer="DEPCNT", engine="pyogrio")
if not gdf.empty:
gdf = gdf.to_crs(epsg=4326)
for _, row in gdf.iterrows():
geom = row.geometry
if geom is None or geom.is_empty: continue
depth = _safe(row.get("VALDCO"))
if land_union is not None and geom.intersects(land_union):
try: geom = geom.difference(land_union)
except Exception: pass
if geom.is_empty: continue
geoms = list(geom.geoms) if geom.geom_type.startswith("Multi") else [geom]
for g in geoms:
if g.geom_type != "LineString" or g.is_empty: continue
coords = [list(c[:2]) for c in g.coords]
if len(coords) < 2: continue
out.append({
"type": "Feature",
"geometry": {"type": "LineString", "coordinates": coords},
"properties": {"layer": "DEPCNT",
"depth": round(float(depth), 1) if depth is not None else None,
"unit": unit,
"bbox": _bbox_of_coords(coords)},
})
except Exception:
pass
def _emit_polygon_feature(g, props):
"""Write one Polygon (single-part) feature, with bbox + props."""
if g.geom_type != "Polygon" or g.is_empty: return
rings = [[list(c[:2]) for c in g.exterior.coords]]
rings.extend([list(c[:2]) for c in r.coords] for r in g.interiors)
out.append({
"type": "Feature",
"geometry": {"type": "Polygon", "coordinates": rings},
"properties": {**props, "bbox": _bbox_of_coords(rings)},
})
# ── DEPARE (clipped to remove land overlap) ──
try:
gdf = gpd.read_file(str(enc_path), layer="DEPARE", engine="pyogrio")
if not gdf.empty:
gdf = gdf.to_crs(epsg=4326)
for _, row in gdf.iterrows():
geom = row.geometry
if geom is None or geom.is_empty: continue
dmin = _safe(row.get("DRVAL1"))
dmax = _safe(row.get("DRVAL2"))
if land_union is not None and geom.intersects(land_union):
try:
geom = geom.difference(land_union)
except Exception:
pass
if geom.is_empty: continue
props = {
"layer": "DEPARE",
"depth_min": round(float(dmin), 1) if dmin is not None else None,
"depth_max": round(float(dmax), 1) if dmax is not None else None,
"unit": unit,
}
geoms = list(geom.geoms) if geom.geom_type.startswith("Multi") else [geom]
for g in geoms:
_emit_polygon_feature(g, props)
except Exception:
pass
return out
def _emit_geometry(geom, props: dict) -> list[dict]:
"""Flatten a shapely geometry into GeoJSON Feature dicts with bbox.
Handles Point / LineString / Polygon and their Multi* variants."""
out = []
geoms = list(geom.geoms) if geom.geom_type.startswith("Multi") else [geom]
for g in geoms:
if g.is_empty:
continue
if g.geom_type == "Point":
coords = [g.x, g.y]
gtype = "Point"
bbox = [g.x, g.y, g.x, g.y]
elif g.geom_type == "LineString":
coords = [list(c[:2]) for c in g.coords]
gtype = "LineString"
bbox = _bbox_of_coords(coords)
elif g.geom_type == "Polygon":
rings = [[list(c[:2]) for c in g.exterior.coords]]
rings += [[list(c[:2]) for c in r.coords] for r in g.interiors]
coords = rings
gtype = "Polygon"
bbox = _bbox_of_coords(coords)
else:
continue
out.append({
"type": "Feature",
"geometry": {"type": gtype, "coordinates": coords},
"properties": {**{k: v for k, v in props.items() if v is not None},
"bbox": bbox},
})
return out
# ── Terrain / land features ───────────────────────────────────────────────────
_LAND_POLY_LAYERS = {"LNDARE": "land_area", "BUAARE": "built_up_area",
"BUISGL": "building"} # building footprints as polygons
_LAND_LINE_LAYERS = {"COALNE": "coastline"}
_LAND_POINT_LAYERS = {"BRIDGE": "bridge", "HRBFAC": "harbour",
"SILTNK": "silo_tank"}
def _split_coalne(geom, props: dict) -> list[dict]:
"""Parte un COALNE LineString/MultiLineString en segmentos descartando
saltos artificiales (bordes de celda de carta que cruzan el océano).
Un vértice-a-vértice de una costa real rara vez supera ~0.5° de arco
(~55 km). Si el salto supera MAX_DEG se considera borde artificial y se
corta ahí, emitiendo el segmento acumulado hasta ese punto.
"""
MAX_DEG = 0.5 # ~55 km — ajustar si aparecen cortes en costas rectas largas
def _emit_seg(coords):
if len(coords) < 2:
return []
bbox = _bbox_of_coords(coords)
return [{"type": "Feature",
"geometry": {"type": "LineString", "coordinates": coords},
"properties": {**{k: v for k, v in props.items() if v is not None},
"bbox": bbox}}]
geoms = list(geom.geoms) if geom.geom_type.startswith("Multi") else [geom]
out = []
for g in geoms:
if g.geom_type != "LineString":
continue
pts = [list(c[:2]) for c in g.coords]
seg = [pts[0]]
for i in range(1, len(pts)):
dx = abs(pts[i][0] - pts[i-1][0])
dy = abs(pts[i][1] - pts[i-1][1])
if dx > MAX_DEG or dy > MAX_DEG:
# salto artificial → emitir segmento acumulado y empezar uno nuevo
out.extend(_emit_seg(seg))
seg = [pts[i]]
else:
seg.append(pts[i])
out.extend(_emit_seg(seg))
return out
def _parse_land(enc_path: Path) -> list[dict]:
if not _HAS_GPD:
return []
out = []
for layer_name, category in _LAND_POLY_LAYERS.items():
try:
gdf = gpd.read_file(str(enc_path), layer=layer_name, engine="pyogrio")
except Exception:
continue
if gdf.empty: continue
gdf = gdf.to_crs(epsg=4326)
for _, row in gdf.iterrows():
geom = row.geometry
if geom is None or geom.is_empty: continue
props = {"layer": layer_name, "category": category,
"name": _safe(row.get("OBJNAM"))}
out.extend(_emit_geometry(geom, props))
for layer_name, category in _LAND_LINE_LAYERS.items():
try:
gdf = gpd.read_file(str(enc_path), layer=layer_name, engine="pyogrio")
except Exception:
continue
if gdf.empty: continue
gdf = gdf.to_crs(epsg=4326)
for _, row in gdf.iterrows():
geom = row.geometry
if geom is None or geom.is_empty: continue
props = {"layer": layer_name, "category": category,
"name": _safe(row.get("OBJNAM"))}
# COALNE Polygon → ignorar: LANDMASK ya cubre el área de tierra.
# COALNE LineString → partir en segmentos eliminando saltos largos (bordes
# artificiales de celda que crean líneas diagonales cruzando el océano).
if geom.geom_type in ("Polygon", "MultiPolygon"):
continue # artefacto: COALNE área → ya cubierto por LANDMASK
out.extend(_split_coalne(geom, props))
for layer_name, category in _LAND_POINT_LAYERS.items():
try:
gdf = gpd.read_file(str(enc_path), layer=layer_name, engine="pyogrio")
except Exception:
continue
if gdf.empty: continue
gdf = gdf.to_crs(epsg=4326)
for _, row in gdf.iterrows():
geom = row.geometry
if geom is None or geom.is_empty: continue
pt = geom.centroid if geom.geom_type != "Point" else geom
props = {"layer": layer_name, "category": category,
"name": _safe(row.get("OBJNAM")),
"height_m": _safe(row.get("HEIGHT"))}
if layer_name == "BRIDGE":
props["horclr"] = _safe(row.get("HORCLR"))
props["verclr"] = _safe(row.get("VERCLR"))
out.append({"type": "Feature",
"geometry": {"type": "Point", "coordinates": [pt.x, pt.y]},
"properties": {k: v for k, v in props.items() if v is not None}})
return out
# ── Navigational hazards ──────────────────────────────────────────────────────
_HAZARD_LAYERS = {"WRECKS": "wreck", "OBSTRN": "obstruction", "UWTROC": "rock"}
def _parse_hazards(enc_path: Path) -> list[dict]:
if not _HAS_GPD:
return []
out = []
for layer_name, category in _HAZARD_LAYERS.items():
try:
gdf = gpd.read_file(str(enc_path), layer=layer_name, engine="pyogrio")
except Exception:
continue
if gdf.empty: continue
gdf = gdf.to_crs(epsg=4326)
for _, row in gdf.iterrows():
geom = row.geometry
if geom is None or geom.is_empty: continue
pt = geom.centroid if geom.geom_type != "Point" else geom
props = {"layer": layer_name, "category": category,
"name": _safe(row.get("OBJNAM")),
"depth": _safe(row.get("VALSOU"))}
if layer_name == "WRECKS": props["catwrk"] = _safe(row.get("CATWRK"))
if layer_name == "UWTROC": props["catuwr"] = _safe(row.get("CATUWR"))
if layer_name == "OBSTRN": props["catobs"] = _safe(row.get("CATOBS"))
out.append({"type": "Feature",
"geometry": {"type": "Point", "coordinates": [pt.x, pt.y]},
"properties": {k: v for k, v in props.items() if v is not None}})
return out
# ── Navigation zones / areas ──────────────────────────────────────────────────
_ZONE_LAYERS = {
"RESARE": "restricted", "CTNARE": "caution", "ACHARE": "anchorage",
"TSSLPT": "traffic_lane","PRCARE": "precautionary","FAIRWY": "fairway",
"DMPGRD": "dumpground", "PIPARE": "pipeline_area",
}
def _parse_zones(enc_path: Path) -> list[dict]:
if not _HAS_GPD:
return []
out = []
for layer_name, category in _ZONE_LAYERS.items():
try:
gdf = gpd.read_file(str(enc_path), layer=layer_name, engine="pyogrio")
except Exception:
continue
if gdf.empty: continue
gdf = gdf.to_crs(epsg=4326)
for _, row in gdf.iterrows():
geom = row.geometry
if geom is None or geom.is_empty: continue
props = {"layer": layer_name, "category": category,
"name": _safe(row.get("OBJNAM")),
"info": _safe(row.get("INFORM"))}
if layer_name == "RESARE":
props["catrea"] = _safe(row.get("CATREA"))
props["restrn"] = _safe_list(row.get("RESTRN"))
if layer_name == "ACHARE":
props["catach"] = _safe(row.get("CATACH"))
out.extend(_emit_geometry(geom, props))
return out
def _ensure_meta(cell_id: str):
"""Write meta.json with auto-detected region if not already set."""
meta = get_meta(cell_id)
if "region" not in meta:
set_meta(cell_id, region=auto_region(cell_id))
def install_from_zip(zip_path: Path) -> list[str]:
if not _HAS_GPD:
raise RuntimeError("geopandas not installed — cannot parse .000 ENC files. "
"Run: pip install geopandas pyogrio")
installed = []
with zipfile.ZipFile(zip_path) as z:
enc_files = [n for n in z.namelist() if n.upper().endswith(".000")]
if not enc_files:
raise ValueError("No .000 ENC files found in ZIP")
for enc_name in enc_files:
cell_id = Path(enc_name).stem.upper()
cell_dir = CHARTS_DIR / cell_id
cell_dir.mkdir(exist_ok=True)
enc_dest = cell_dir / f"{cell_id}.000"
with z.open(enc_name) as src, open(enc_dest, "wb") as dst:
shutil.copyfileobj(src, dst)
_build_cache(cell_id, enc_dest)
_ensure_meta(cell_id)
installed.append(cell_id)
return installed
def _read_cell_id(enc_path: Path) -> str:
"""Lee el nombre de celda real del header DSID del S-57.
Si el archivo tiene nombre temporal (TMP*), usa DSID_DSNM o DSID_DSNM como fallback."""
stem = enc_path.stem.upper()
# Si el nombre ya parece un cell ID válido (no empieza con TMP), usarlo directamente
if not stem.startswith("TMP"):
return stem
# Intentar leer DSNM del header S-57 via GDAL
try:
from osgeo import gdal
ds = gdal.OpenEx(str(enc_path))
if ds:
md = ds.GetMetadata() or {}
dsnm = (md.get("DSID_DSNM") or "").strip().upper()
if dsnm and not dsnm.startswith("TMP"):
return dsnm
except Exception as e:
log.debug("Could not read DSNM from %s: %s", enc_path, e)
# Fallback: usar el stem aunque sea TMP (se mostrará en lista)
return stem
def install_from_enc(enc_path: Path, orig_name: str | None = None) -> str:
if not _HAS_GPD:
raise RuntimeError("geopandas not installed — cannot parse .000 ENC files. "
"Run: pip install geopandas pyogrio")
# Preferir el nombre original del archivo (sin extensión) como cell_id.
# Si no viene o empieza con TMP, intentar leerlo del header S-57.
if orig_name and not orig_name.upper().startswith("TMP"):
cell_id = orig_name.upper()
else:
cell_id = _read_cell_id(enc_path)
cell_dir = CHARTS_DIR / cell_id
cell_dir.mkdir(exist_ok=True)
enc_dest = cell_dir / f"{cell_id}.000"
shutil.copy2(enc_path, enc_dest)
_build_cache(cell_id, enc_dest)
_ensure_meta(cell_id)
return cell_id
def _build_cache(cell_id: str, enc_path: Path):
log.info("Parsing ENC %s …", cell_id)
cell_dir = CHARTS_DIR / cell_id
features = _parse_cell(enc_path)
with open(cell_dir / "features.geojson", "w") as f:
json.dump({"type": "FeatureCollection", "features": features}, f)
log.info("ENC %s%d aid features", cell_id, len(features))
depths = _parse_depths(enc_path)
with open(cell_dir / "depths.geojson", "w") as f:
json.dump({"type": "FeatureCollection", "features": depths}, f)
log.info("ENC %s%d depth features", cell_id, len(depths))
land = _parse_land(enc_path)
land_path = cell_dir / "land.geojson"
# Preserve existing land cache when the .000 has no LNDARE layer
# (e.g. custom charts built from CSV that only contain nav aids)
if land or not land_path.exists():
with open(land_path, "w") as f:
json.dump({"type": "FeatureCollection", "features": land}, f)
log.info("ENC %s%d land features%s", cell_id, len(land),
" (preserved existing)" if not land and land_path.exists() else "")
hazards = _parse_hazards(enc_path)
hazards_path = cell_dir / "hazards.geojson"
if hazards or not hazards_path.exists():
with open(hazards_path, "w") as f:
json.dump({"type": "FeatureCollection", "features": hazards}, f)
log.info("ENC %s%d hazard features", cell_id, len(hazards))
zones = _parse_zones(enc_path)
zones_path = cell_dir / "zones.geojson"
if zones or not zones_path.exists():
with open(zones_path, "w") as f:
json.dump({"type": "FeatureCollection", "features": zones}, f)
log.info("ENC %s%d zone features", cell_id, len(zones))
# Cache count and bbox in meta.json so list_cells() doesn't need to read features.geojson
# Solo usa features de tipo Point para el bbox — los LDLINE (LineString) tienen
# coordinates[0] = [lon, lat] (lista), no un float, y rompen min(lons).
bbox = None
try:
lons, lats = [], []
for f in features:
geom = f.get("geometry", {})
if geom.get("type") == "Point":
coords = geom.get("coordinates", [])
if len(coords) >= 2:
lons.append(float(coords[0]))
lats.append(float(coords[1]))
if lons:
bbox = [min(lons), min(lats), max(lons), max(lats)]
except Exception:
pass
meta = get_meta(cell_id)
meta["feature_count"] = len(features)
meta["bbox"] = bbox
_meta_path(cell_dir).write_text(json.dumps(meta))
def list_cells() -> list[dict]:
cells = []
for cell_dir in sorted(CHARTS_DIR.iterdir()):
if not cell_dir.is_dir():
continue
# Skip leftover temp directories from failed installs
if cell_dir.name.startswith("TMP"):
continue
enc = list(cell_dir.glob("*.000"))
cache = cell_dir / "features.geojson"
meta = get_meta(cell_dir.name)
count = meta.get("feature_count")
bbox = meta.get("bbox")
# Recompute bbox si es None (celdas instaladas con código anterior que tenía
# el bug LDLINE: coordinates[0] de un LineString es [lon,lat] no float →
# min(lons) fallaba → bbox quedaba None → celdas excluidas del viewport).
# Solo usa features Point para el bbox y persiste en meta.json para la próxima vez.
if bbox is None and cache.exists():
try:
fc = json.loads(cache.read_text())
feats = fc.get("features") or []
if count is None:
count = len(feats)
lons, lats = [], []
for f in feats:
geom = f.get("geometry", {})
if geom.get("type") == "Point":
coords = geom.get("coordinates", [])
if len(coords) >= 2:
lons.append(float(coords[0]))
lats.append(float(coords[1]))
if lons:
bbox = [min(lons), min(lats), max(lons), max(lats)]
# Persistir para que las siguientes llamadas sean rápidas
meta["bbox"] = bbox
if count is not None:
meta["feature_count"] = count
try:
_meta_path(cell_dir).write_text(json.dumps(meta))
except Exception:
pass
except Exception:
if count is None:
count = 0
if count is None:
count = 0
cells.append({
"id": cell_dir.name,
"enc_file": enc[0].name if enc else None,
"features": count or 0,
"cached": cache.exists(),
"bbox": bbox,
"region": get_region(cell_dir.name),
})
return cells
def delete_cell(cell_id: str):
cell_dir = CHARTS_DIR / cell_id.upper()
if cell_dir.exists():
shutil.rmtree(cell_dir)
def scan_and_install(directory: str | Path) -> dict:
"""
Walk *directory* (local path — e.g. an SD card drive letter like E:\\)
and install every .000 ENC cell and every .zip archive found.
Returns:
{
"installed": ["CELL1", "CELL2", ...], # successfully installed
"skipped": ["already_there.000"], # already installed (skipped)
"errors": [{"file": "...", "error": "..."}] # parse failures
}
"""
base = Path(directory)
if not base.exists():
raise FileNotFoundError(f"Path not found: {base}")
if not base.is_dir():
raise NotADirectoryError(f"Not a directory: {base}")
installed: list[str] = []
skipped: list[str] = []
errors: list[dict] = []
# Collect candidates — full recursive walk of the directory tree.
# Typical SD card layouts: ENC_ROOT/*.000, ENC_ROOT/US5NY01M/*.000,
# NOAA_ENC/US5NY01M/US5NY01M.000, etc.
candidates: list[Path] = []
for entry in base.rglob("*"):
if entry.is_file():
candidates.append(entry)
# Process .zip first (may contain multiple cells), then .000
zips = [c for c in candidates if c.suffix.lower() == ".zip"]
encs = [c for c in candidates if c.suffix.upper() == ".000"]
existing_ids = {cell_dir.name.upper()
for cell_dir in CHARTS_DIR.iterdir() if cell_dir.is_dir()}
for zp in zips:
try:
import zipfile as _zf
with _zf.ZipFile(zp) as z:
names = z.namelist()
has_csv = any(n.lower().endswith(".csv") for n in names)
has_enc = any(n.upper().endswith(".000") for n in names)
if has_csv and not has_enc:
ids = install_from_csv_zip(zp)
else:
ids = install_from_zip(zp)
installed.extend(ids)
except Exception as exc:
errors.append({"file": zp.name, "error": str(exc)})
for ep in encs:
cid = ep.stem.upper()
if cid in existing_ids:
skipped.append(ep.name)
continue
try:
cid = install_from_enc(ep, ep.stem.upper())
installed.append(cid)
except Exception as exc:
errors.append({"file": ep.name, "error": str(exc)})
return {"installed": installed, "skipped": skipped, "errors": errors}
def get_all_features() -> dict:
all_features = []
for cell_dir in CHARTS_DIR.iterdir():
cache = cell_dir / "features.geojson"
if not cache.exists():
continue
try:
fc = json.loads(cache.read_text())
except Exception:
continue
cell_id = cell_dir.name
region = get_region(cell_id)
for f in (fc.get("features") or []):
p = f.setdefault("properties", {})
p["cell"] = cell_id
p["cell_region"] = region
# Backfill aid_type for old caches that pre-date the classifier.
if "aid_type" not in p:
p["aid_type"] = classify(p.get("layer", ""), p)
all_features.extend(fc["features"])
return {"type": "FeatureCollection", "features": all_features}
def _aggregate_cache(filename: str, bbox=None) -> dict:
"""Generic aggregator: read <filename> from every installed cell."""
all_features = []
if bbox is not None:
w, s, e, n = bbox
for cell_dir in CHARTS_DIR.iterdir():
cache = cell_dir / filename
if not cache.exists():
continue
try:
fc = json.loads(cache.read_text())
except Exception:
continue
for f in (fc.get("features") or []):
p = f.setdefault("properties", {})
p["cell"] = cell_dir.name
if bbox is not None:
fb = p.get("bbox")
if fb and (fb[2] < w or fb[0] > e or fb[3] < s or fb[1] > n):
continue
all_features.append(f)
return {"type": "FeatureCollection", "features": all_features}
def get_all_depths(bbox: tuple[float, float, float, float] | None = None) -> dict:
return _aggregate_cache("depths.geojson", bbox)
def get_all_land(bbox: tuple[float, float, float, float] | None = None) -> dict:
return _aggregate_cache("land.geojson", bbox)
def get_all_hazards() -> dict:
return _aggregate_cache("hazards.geojson")
def get_all_zones(bbox: tuple[float, float, float, float] | None = None) -> dict:
return _aggregate_cache("zones.geojson", bbox)