Files
AidsMonitoring/backend/services/chart_manager.py
T

1209 lines
48 KiB
Python

"""
S-57 ENC chart manager — parse, store and serve chart features as GeoJSON.
"""
import json
import logging
import math
import shutil
import zipfile
from pathlib import Path
import geopandas as gpd
import pandas as pd
log = logging.getLogger(__name__)
CHARTS_DIR = Path(__file__).parent.parent.parent / "charts"
CHARTS_DIR.mkdir(exist_ok=True)
# S-57 layers we care about, mapped to a friendly category
S57_LAYERS = {
"LIGHTS": "light",
"BOYLAT": "buoy",
"BOYCAR": "buoy",
"BOYISD": "buoy",
"BOYSPP": "buoy",
"BOYSAW": "buoy",
"BCNLAT": "beacon",
"BCNCAR": "beacon",
"BCNISD": "beacon",
"BCNWTW": "beacon",
"LNDMRK": "landmark",
"RTPBCN": "beacon",
"LAKARE": "area", # Lake / lagoon / ciénaga area
# BUISGL is rendered as polygon footprint via _parse_land → landLayer.
# Removed from S57_LAYERS to avoid a duplicate centroid-point beacon dot.
}
# Human-readable light character codes
LITCHR = {
1:"F", 2:"Fl", 3:"LFl", 4:"Q", 5:"VQ", 6:"UQ", 7:"Iso",
8:"Oc", 9:"IQ", 10:"IVQ", 11:"IUQ", 12:"Mo", 13:"FFl",
14:"FlLFl", 15:"OcFl", 16:"FLFl", 25:"Al.Oc", 26:"Al.LFl",
27:"Al.Fl", 28:"Al.Grp",
}
COLOUR = {
1:"W", 2:"K", 3:"R", 4:"G", 5:"B", 6:"Y",
7:"Gy", 8:"Br", 9:"Amb", 10:"Vi", 11:"Or", 12:"Mg",
}
BOYSHP = {
1:"Conical(N)",2:"Can(C)",3:"Spherical",4:"Pillar",
5:"Spar",6:"Barrel",7:"Super-buoy",8:"Ice buoy",
}
# Cardinal category (CATCAM) per IHO S-57 Object Catalogue
CATCAM = {1:"N", 2:"E", 3:"S", 4:"W"}
# IALA region defaults by chart producer prefix (cell ID first 2 chars)
REGION_A_PREFIXES = {
"GB","FR","DE","NL","BE","DK","NO","SE","FI","IS","IE","ES","PT","IT",
"GR","TR","RU","UA","HR","CY","MT","EE","LV","LT","PL","RO","BG","SI",
"MA","DZ","TN","LY","EG","ZA","NG","SN","CI","KE","TZ","MZ",
"IN","PK","BD","LK","MM","TH","ID","MY","SG","VN","CN","HK","TW",
"AE","SA","OM","IR","IQ","KW","QA","BH","YE",
"AU","NZ","PG",
}
REGION_B_PREFIXES = {
"US","CA","MX","BS","CU","JM","HT","DO","PR","VI",
"BZ","GT","SV","HN","NI","CR","PA",
"CO","VE","EC","PE","CL","AR","UY","BR","GY","SR","PY","BO",
"JP","KR","KP","PH",
}
def auto_region(cell_id: str) -> str:
prefix = (cell_id or "").upper()[:2]
if prefix in REGION_A_PREFIXES: return "A"
if prefix in REGION_B_PREFIXES: return "B"
return "B"
# ── CSV → features (DIMAR / custom CSV charts) ────────────────────────────────
# Mapping from DIMAR CSV feat_type to canonical S-57 layer name.
_CSV_FEAT_TYPE_MAP = {
"BOYSPEC": "BOYSPP", # DIMAR uses BOYSPEC; IHO S-57 is BOYSPP
}
# Feature categories per layer
_CSV_CATEGORY_MAP = {
"BOYLAT": "buoy", "BOYCAR": "buoy", "BOYISD": "buoy",
"BOYSPP": "buoy", "BOYSAW": "buoy",
"BCNLAT": "beacon","BCNCAR": "beacon",
"LIGHTS": "light", "LNDMRK": "landmark",
}
def _csv_num(v):
if v is None or str(v).strip() == "":
return None
try:
return float(v)
except Exception:
return None
def _csv_int(v):
n = _csv_num(v)
return int(n) if n is not None else None
def _csv_fmt(v):
"""Format float: drop trailing .0 for whole numbers."""
try:
f = float(v)
return str(int(f)) if f == int(f) else str(f)
except Exception:
return str(v)
def _csv_light_desc(litchr, siggrp, colour, sigper, valnmr) -> str:
"""Build compact light description from CSV fields (e.g. 'Fl G 3s 3M')."""
parts = []
lc_int = _csv_int(litchr)
lc = LITCHR.get(lc_int, str(lc_int)) if lc_int is not None else ""
sg = _csv_int(siggrp)
if sg is not None:
lc = f"{lc}({sg})"
col_int = _csv_int(colour)
col_str = COLOUR.get(col_int, "") if col_int is not None else ""
if lc:
parts.append(f"{lc} {col_str}".strip())
sp = _csv_num(sigper)
if sp is not None:
parts.append(f"{_csv_fmt(sp)}s")
rng = _csv_num(valnmr)
if rng is not None:
parts.append(f"{_csv_fmt(rng)}M")
return " ".join(parts)
def _csv_infer_catcam(siggrp: str, name: str) -> str | None:
"""
Cardinal buoy quadrant from SIGGRP (reliable) or DIMAR naming convention.
Q(9)+LFl=W Q(6)+LFl=S Q(3)=E Q=N
DIMAR name suffixes: SS/VS→S SN/VN→N SE→E SO→W
"""
sg = _csv_int(siggrp)
if sg is not None:
if sg == 9: return "W"
if sg == 6: return "S"
if sg == 3: return "E"
return "N"
n = (name or "").upper()
if any(x in n for x in (" SS", "(SS)", "VS")): return "S"
if any(x in n for x in (" SE", "(SE)")): return "E"
if any(x in n for x in (" SO", "(SO)")): return "W"
return "N"
def _csv_infer_catlam(colour: str) -> int | None:
"""IALA B (Americas): green(4)=port=1, red(3)=stbd=2."""
c = _csv_int(colour)
if c == 4: return 1
if c == 3: return 2
return None
def _parse_csvs_to_features(csv_dir: Path) -> list[dict]:
"""
Read navigation-aid CSV files from csv_dir and return GeoJSON features.
Each CSV must have columns: OBJNAM, lon, lat, feat_type, LITCHR, SIGGRP,
COLOUR, SIGPER, VALNMR, HEIGHT, ORIENT, INFORM.
feat_type values: BOYLAT, BOYCAR, BOYISD, BOYSPEC/BOYSPP, BCNLAT, LIGHTS,
LNDMRK.
This function is the primary path for DIMAR/custom charts created with
QGISS57Converter; it preserves all light attributes (LITCHR, SIGPER, etc.)
that the GDAL S-57 driver may drop during the .000 round-trip.
"""
import csv as _csv_mod
features: list[dict] = []
for csv_file in sorted(csv_dir.glob("*.csv")):
with open(csv_file, newline="", encoding="utf-8") as fh:
reader = _csv_mod.DictReader(fh)
for row in reader:
feat_type = (row.get("feat_type") or csv_file.stem).strip()
layer = _CSV_FEAT_TYPE_MAP.get(feat_type, feat_type)
if not layer:
continue
lon = _csv_num(row.get("lon"))
lat = _csv_num(row.get("lat"))
if lon is None or lat is None:
continue
category = _CSV_CATEGORY_MAP.get(layer, "buoy")
litchr = row.get("LITCHR", "").strip()
siggrp = row.get("SIGGRP", "").strip()
colour = row.get("COLOUR", "").strip()
sigper = row.get("SIGPER", "").strip()
valnmr = row.get("VALNMR", "").strip()
height = row.get("HEIGHT", "").strip()
orient_r = row.get("ORIENT", "").strip()
name = row.get("OBJNAM", "").strip()
inform = row.get("INFORM", "").strip()
col_int = _csv_int(colour)
colours = [col_int] if col_int is not None else []
light_d = _csv_light_desc(litchr, siggrp, colour, sigper, valnmr)
props: dict = {
"layer": layer,
"category": category,
"name": name or None,
"info": inform or None,
"light_desc": light_d or None,
"range_nm": _csv_num(valnmr),
"height_m": _csv_num(height),
"colours": colours,
"colour_code": col_int,
}
if layer in ("BOYCAR", "BCNCAR"):
props["catcam"] = _csv_infer_catcam(siggrp, name)
if layer in ("BOYLAT", "BCNLAT"):
props["catlam"] = _csv_infer_catlam(colour)
orient_val = _csv_num(orient_r)
if orient_val is not None:
props["orient"] = orient_val
props["aid_type"] = classify(layer, props)
# Remove None values
props = {k: v for k, v in props.items() if v is not None}
features.append({
"type": "Feature",
"geometry": {"type": "Point", "coordinates": [lon, lat]},
"properties": props,
})
return features
def install_from_csv_dir(csv_dir: Path, cell_id: str) -> str:
"""
Create or update an AidsMonitoring chart cell from a directory of CSV files.
This is the preferred install pathway for custom (DIMAR) charts because it
preserves all S-57 attribute codes (LITCHR, SIGGRP, etc.) without loss.
csv_dir — directory containing *.csv files (BOYLAT.csv, BOYCAR.csv, etc.)
cell_id — chart cell identifier (e.g. 'BAHIA_DE_CARTAGENA')
"""
cell_id = cell_id.upper()
cell_dir = CHARTS_DIR / cell_id
cell_dir.mkdir(exist_ok=True)
features = _parse_csvs_to_features(csv_dir)
with open(cell_dir / "features.geojson", "w", encoding="utf-8") as f:
json.dump({"type": "FeatureCollection", "features": features}, f,
ensure_ascii=False)
log.info("CSV chart %s%d features", cell_id, len(features))
# Empty auxiliary caches so the frontend doesn't show stale data
for fname in ("depths.geojson", "land.geojson", "hazards.geojson", "zones.geojson"):
p = cell_dir / fname
if not p.exists():
with open(p, "w") as f:
json.dump({"type": "FeatureCollection", "features": []}, f)
# Update meta.json
bbox = None
if features:
lons = [ft["geometry"]["coordinates"][0] for ft in features]
lats = [ft["geometry"]["coordinates"][1] for ft in features]
bbox = [min(lons), min(lats), max(lons), max(lats)]
meta = get_meta(cell_id)
meta["feature_count"] = len(features)
meta["bbox"] = bbox
meta.setdefault("region", auto_region(cell_id))
_meta_path(cell_dir).write_text(json.dumps(meta))
return cell_id
def install_from_csv_zip(zip_path: Path) -> list[str]:
"""
Install one or more CSV chart cells from a ZIP archive.
Expected ZIP layout (any of these is accepted):
Option A — single cell (cell_id inferred from folder name or ZIP name):
BOYLAT.csv
BOYCAR.csv
...
Option B — cell_id in subfolder:
BAHIA_DE_CARTAGENA/BOYLAT.csv
BAHIA_DE_CARTAGENA/BOYCAR.csv
Option C — meta.json declares cell_id:
meta.json → {"cell_id": "BAHIA_DE_CARTAGENA"}
BOYLAT.csv
...
"""
import csv as _csv_mod
installed: list[str] = []
with zipfile.ZipFile(zip_path) as z:
namelist = z.namelist()
# Collect all CSV files grouped by subfolder
csv_files = [n for n in namelist if n.lower().endswith(".csv")]
if not csv_files:
raise ValueError("No CSV files found in ZIP")
# Check for meta.json
meta_cell_id: str | None = None
if "meta.json" in namelist:
try:
meta_cell_id = json.loads(z.read("meta.json")).get("cell_id")
except Exception:
pass
# Group by directory prefix
import collections
groups: dict[str, list[str]] = collections.defaultdict(list)
for name in csv_files:
prefix = str(Path(name).parent)
groups[prefix].append(name)
with tempfile.TemporaryDirectory() as tmp_root:
tmp_root_p = Path(tmp_root)
for prefix, members in groups.items():
# Determine cell_id
if meta_cell_id:
cid = meta_cell_id
elif prefix and prefix != ".":
cid = Path(prefix).name
else:
cid = Path(zip_path).stem # e.g. BAHIA_DE_CARTAGENA
# Extract CSVs to temp dir
tmp_csv = tmp_root_p / cid
tmp_csv.mkdir(exist_ok=True)
for member in members:
data = z.read(member)
(tmp_csv / Path(member).name).write_bytes(data)
# Install
result_id = install_from_csv_dir(tmp_csv, cid)
installed.append(result_id)
return installed
def _meta_path(cell_dir: Path) -> Path:
return cell_dir / "meta.json"
def get_meta(cell_id: str) -> dict:
cell_dir = CHARTS_DIR / cell_id.upper()
p = _meta_path(cell_dir)
if p.exists():
try: return json.loads(p.read_text())
except Exception: pass
return {}
def set_meta(cell_id: str, **patch) -> dict:
cell_dir = CHARTS_DIR / cell_id.upper()
if not cell_dir.exists():
raise FileNotFoundError(cell_id)
meta = get_meta(cell_id)
meta.update(patch)
_meta_path(cell_dir).write_text(json.dumps(meta))
return meta
def get_region(cell_id: str) -> str:
return get_meta(cell_id).get("region") or auto_region(cell_id)
# ── Aid classification ────────────────────────────────────────────────────
# Canonical aid types — derived from S-57 layer + attributes.
# Frontend dispatches symbol drawing on this; renders are region-aware
# only for LATERAL_* (which depend on IALA A vs B for color).
def classify(layer: str, props: dict) -> str:
catlam = props.get("catlam")
catcam = props.get("catcam")
if layer in ("BOYLAT", "BCNLAT"):
if catlam == 1: return "LATERAL_PORT"
if catlam == 2: return "LATERAL_STBD"
if catlam == 3: return "LATERAL_PREF_STBD" # preferred channel to STBD (acts port)
if catlam == 4: return "LATERAL_PREF_PORT" # preferred channel to PORT (acts stbd)
# Fallback: infer from primary colour (IALA B — Americas: green=port, red=stbd)
colour = props.get("colour_code")
if colour == 4: return "LATERAL_PORT" # green
if colour == 3: return "LATERAL_STBD" # red
return "LATERAL_UNKNOWN"
if layer in ("BOYCAR", "BCNCAR"):
if catcam == "N": return "CARDINAL_N"
if catcam == "E": return "CARDINAL_E"
if catcam == "S": return "CARDINAL_S"
if catcam == "W": return "CARDINAL_W"
return "CARDINAL_UNKNOWN"
if layer in ("BOYISD", "BCNISD"): return "ISOLATED_DANGER"
if layer == "BOYSAW": return "SAFE_WATER"
if layer == "BOYSPP": return "SPECIAL"
if layer == "LIGHTS": return "LIGHT_POINT"
if layer == "LNDMRK": return "LANDMARK"
if layer == "RTPBCN": return "RACON"
if layer.startswith("BCN"): return "BEACON_GENERIC"
if layer.startswith("BOY"): return "BUOY_GENERIC"
return "UNKNOWN"
# Topmark shape (TOPSHP)
TOPSHP = {
1:"cone_up",2:"cone_down",3:"sphere",4:"X",5:"cross",
6:"can",7:"T",8:"triangle_up",9:"triangle_down",
10:"circle",11:"rectangle",12:"board",13:"diamond",
14:"diamond_2",15:"square",16:"rectangle_2",17:"pennant",
18:"pennant_2",19:"pennant_3",20:"pennant_4",
21:"2sphere",22:"2cone_up",23:"2cone_down",
24:"cone_up_sphere",25:"2cone_base_up",26:"2cone_base_down",
27:"2cone_point",28:"besom_up",29:"besom_down",
30:"flag",31:"sphere_2",32:"2cone_base_apart",
33:"board_2",
}
def _safe_list(val) -> list:
"""Return a plain Python list from a pandas/numpy array-like value.
GDAL/pyogrio surfaces multi-valued S-57 attributes (like COLOUR) as
comma-separated STRINGS (e.g. "4" or "1,4"), not as Python lists.
Without the string branch we silently drop colour info."""
if val is None:
return []
try:
if hasattr(val, 'tolist'):
val = val.tolist()
if isinstance(val, str):
return [int(x) for x in val.split(",") if x.strip()]
if isinstance(val, (list, tuple)):
return [int(v) for v in val if v is not None]
return [int(val)]
except Exception:
return []
def _safe(val):
"""Return a plain Python scalar from any pandas/numpy value."""
if val is None:
return None
# numpy array or list with multiple elements — take first
if hasattr(val, '__len__') and not isinstance(val, str):
if len(val) == 0:
return None
val = val[0]
# numpy scalar
if hasattr(val, 'item'):
try:
val = val.item()
except Exception:
val = val.tolist()
# NaN check after extraction
try:
if isinstance(val, float) and pd.isna(val):
return None
except Exception:
pass
return val
def _light_desc(row) -> str:
parts = []
chr_code = _safe(row.get("LITCHR"))
grp = _safe(row.get("SIGGRP"))
col_list = _safe(row.get("COLOUR"))
period = _safe(row.get("SIGPER"))
rng = _safe(row.get("VALNMR"))
lc = LITCHR.get(int(chr_code), str(chr_code)) if chr_code else ""
if grp:
lc = f"{lc}({grp})"
colours = ""
if col_list:
try:
nums = [int(x) for x in str(col_list).split(",") if x.strip()]
colours = ".".join(COLOUR.get(n, "?") for n in nums)
except Exception:
pass
if lc:
parts.append(f"{lc} {colours}".strip())
if period:
parts.append(f"{period}s")
if rng:
parts.append(f"{rng}M")
return " ".join(parts)
def _parse_cell(enc_path: Path) -> list[dict]:
features = []
# ── Build land union for LDLINE clipping ─────────────────────────────────
# Leading lines must stop at the riverbank / land boundary, not extend
# indefinitely. We union all land polygons in the cell and use them to
# clip each LDLINE so it terminates where it hits the shore.
from shapely.ops import unary_union
from shapely.geometry import LineString as SLineString, Point as SPoint
_land_clip = None
for _ll in ("LNDARE", "BUAARE"):
try:
_gdf_l = gpd.read_file(str(enc_path), layer=_ll, engine="pyogrio")
if _gdf_l.empty:
continue
_gdf_l = _gdf_l.to_crs(epsg=4326)
_polys = [g for g in _gdf_l.geometry
if g is not None and not g.is_empty
and g.geom_type in ("Polygon", "MultiPolygon")]
if _polys:
_u = unary_union(_polys).buffer(0)
_land_clip = _u if _land_clip is None else _land_clip.union(_u)
except Exception:
pass
for layer_name, category in S57_LAYERS.items():
try:
gdf = gpd.read_file(str(enc_path), layer=layer_name, engine="pyogrio")
except Exception:
continue
if gdf.empty:
continue
gdf = gdf.to_crs(epsg=4326)
for _, row in gdf.iterrows():
geom = row.geometry
if geom is None or geom.is_empty:
continue
# Use centroid for all geometry types
pt = geom.centroid if geom.geom_type != "Point" else geom
colours = _safe_list(row.get("COLOUR")) # e.g. [3,4] = Red+Green
props = {
"layer": layer_name,
"category": category,
"name": _safe(row.get("OBJNAM")),
"info": _safe(row.get("INFORM")),
"height_m": _safe(row.get("HEIGHT")),
"range_nm": _safe(row.get("VALNMR")),
"status": _safe(row.get("STATUS")),
"colours": colours, # list of S-57 colour codes
"colour_code": colours[0] if colours else None, # primary colour
}
# Extract light description for ALL navigational aids and lights.
# Previously only BOY* layers were included, so BCNLAT/BCNWTW/LNDMRK
# silently lost their LITCHR/SIGPER/VALNMR — fixed here.
if category in ("light", "buoy", "beacon", "landmark"):
props["light_desc"] = _light_desc(row)
if layer_name.startswith("BOY") or layer_name.startswith("BCN"):
shape = _safe(row.get("BOYSHP"))
props["boyshp"] = int(shape) if shape else None # raw code for JS
props["shape"] = BOYSHP.get(int(shape), None) if shape else None
topshp = _safe(row.get("TOPSHP"))
props["topshp"] = int(topshp) if topshp else None
if layer_name in ("BOYCAR", "BCNCAR"):
catcam = _safe(row.get("CATCAM"))
props["catcam"] = CATCAM.get(int(catcam), None) if catcam else None
if layer_name in ("BOYLAT", "BCNLAT"):
catlam = _safe(row.get("CATLAM")) # 1=port, 2=stbd, 3=pref-stbd, 4=pref-port
props["catlam"] = int(catlam) if catlam else None
# ORIENT — bearing of the leading line in True degrees
orient_raw = _safe(row.get("ORIENT"))
if orient_raw:
try:
props["orient"] = float(orient_raw)
except ValueError:
pass
props["aid_type"] = classify(layer_name, props)
features.append({
"type": "Feature",
"geometry": {"type": "Point", "coordinates": [pt.x, pt.y]},
"properties": {k: v for k, v in props.items() if v is not None},
})
# ── Leading-line bearing projection for lights with ORIENT ────────
# Project a line from the mark in the ORIENT direction, then clip it
# against the land union so it terminates at the riverbank/shoreline.
# A long initial line (5 NM) is used so the clip always finds the shore.
if "orient" in props:
bearing_deg = props["orient"]
bearing_rad = math.radians(bearing_deg)
lat_rad = math.radians(pt.y)
cos_lat = math.cos(lat_rad)
# Initial generous length (5 NM) — will be clipped to land boundary
PROBE_NM = 5.0
d_lat = (PROBE_NM / 60.0) * math.cos(bearing_rad)
d_lon = (PROBE_NM / 60.0) * math.sin(bearing_rad) / max(cos_lat, 1e-6)
end = [pt.x + d_lon, pt.y + d_lat]
# Densify the probe line (10 intermediate points) so the clip is accurate
n_pts = 12
ldcoords = [
[pt.x + d_lon * i / n_pts, pt.y + d_lat * i / n_pts]
for i in range(n_pts + 1)
]
# Clip against land — the leading line must not cross land
if _land_clip is not None:
try:
probe = SLineString(ldcoords)
in_water = probe.difference(_land_clip)
if in_water.is_empty:
# Mark is on land — use minimum stub
ldcoords = [[pt.x, pt.y], end]
else:
# Keep only the segment that touches the mark
start_pt = SPoint(pt.x, pt.y)
geoms = (list(in_water.geoms)
if in_water.geom_type.startswith("Multi")
else [in_water])
# Sort by distance of each segment's first point to mark
geoms.sort(key=lambda g: start_pt.distance(SPoint(g.coords[0])))
# Take the first (nearest to mark) segment
seg = geoms[0]
ldcoords = [list(c[:2]) for c in seg.coords]
except Exception:
pass # fall through to unclipped coords
features.append({
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": ldcoords,
},
"properties": {
"layer": "LDLINE",
"category": "ldline",
"aid_type": "LEADING_LINE",
"name": props.get("name", ""),
"orient": bearing_deg,
"colours": colours,
},
})
# ── Proximity merge: copy light attrs from LIGHTS → co-located buoys/beacons ──
# In S-57 a LIGHTS object and a BOYLAT object may share the same coordinates.
# The LIGHTS carries LITCHR/VALNMR/HEIGHT/COLOUR; the BOYLAT carries BOYSHP/CATLAM.
# We find the nearest LIGHTS within 50 m and backfill missing attrs on the host.
MERGE_DEG = 0.00045 # ≈ 50 m at equator (1° lat ≈ 111 km)
light_feats = [f for f in features
if f["properties"].get("layer") == "LIGHTS"
and f["geometry"]["type"] == "Point"]
struct_feats = [f for f in features
if (f["properties"].get("layer","").startswith("BOY")
or f["properties"].get("layer","").startswith("BCN"))
and f["geometry"]["type"] == "Point"]
for lf in light_feats:
lc = lf["geometry"]["coordinates"]
lp = lf["properties"]
best_host, best_dist = None, MERGE_DEG
for bf in struct_feats:
bc = bf["geometry"]["coordinates"]
d = ((lc[0] - bc[0])**2 + (lc[1] - bc[1])**2) ** 0.5
if d < best_dist:
best_dist, best_host = d, bf
if best_host:
hp = best_host["properties"]
if lp.get("light_desc") and not hp.get("light_desc"):
hp["light_desc"] = lp["light_desc"]
if lp.get("range_nm") and not hp.get("range_nm"):
hp["range_nm"] = lp["range_nm"]
if lp.get("height_m") and not hp.get("height_m"):
hp["height_m"] = lp["height_m"]
if lp.get("colours") and not hp.get("colours"):
hp["colours"] = lp["colours"]
hp["colour_code"] = lp.get("colour_code")
# Also sync orient for leading lines if the buoy didn't have it
if lp.get("orient") is not None and hp.get("orient") is None:
hp["orient"] = lp["orient"]
return features
def _read_depth_unit(enc_path: Path) -> str:
"""Read DSPM_DUNI from the S-57 dataset metadata. 1=METERS, 2=FEET, 3=FATHOMS."""
try:
from osgeo import gdal
ds = gdal.OpenEx(str(enc_path))
if ds:
md = ds.GetMetadata() or {}
duni = md.get("DSPM_DUNI") or md.get("S57_DUNI")
if duni:
return {"1": "METERS", "2": "FEET", "3": "FATHOMS"}.get(str(duni), "METERS")
except Exception as e:
log.debug("Could not read DUNI from %s: %s", enc_path, e)
return "METERS" # NOAA / IHO default
def _bbox_of_coords(coords) -> list[float]:
xs, ys = [], []
def walk(c):
if isinstance(c[0], (int, float)):
xs.append(c[0]); ys.append(c[1])
else:
for sub in c: walk(sub)
walk(coords)
return [min(xs), min(ys), max(xs), max(ys)]
def _parse_depths(enc_path: Path) -> list[dict]:
"""Extract bathymetry from S-57 cell.
SOUNDG: MultiPoint Z (sounding points; depth in geometry's Z value)
DEPCNT: LineString (contour at VALDCO meters)
DEPARE: Polygon (depth area between DRVAL1 and DRVAL2 meters)
Each feature gets a `bbox` property [minx, miny, maxx, maxy] so the
/charts/depths endpoint can do fast viewport filtering without touching
the geometries. The depth unit (METERS/FEET/FATHOMS) read from the
cell's DSPM record is attached to every feature so the frontend legend
can display it without a separate request.
"""
out = []
unit = _read_depth_unit(enc_path) # METERS / FEET / FATHOMS
# Build a UNIFIED land mask from every S-57 layer that represents
# something a depth feature shouldn't cover. NOAA puts urban islands /
# reclaimed land in BUAARE, dykes in DAMCON, dry docks in DRYDOC, etc.
# — using only LNDARE leaves visible bleed.
from shapely.ops import unary_union
from shapely.prepared import prep
LAND_LAYERS = ("LNDARE", "BUAARE", "DAMCON", "DRYDOC", "PONTON", "LNDRGN")
land_polys = []
for ll in LAND_LAYERS:
try:
gdf = gpd.read_file(str(enc_path), layer=ll, engine="pyogrio")
if gdf.empty: continue
gdf = gdf.to_crs(epsg=4326)
land_polys.extend(g for g in gdf.geometry
if g is not None and not g.is_empty
and g.geom_type in ("Polygon", "MultiPolygon"))
except Exception:
pass
land_union = None
land_prepared = None
if land_polys:
try:
land_union = unary_union(land_polys).buffer(0)
land_prepared = prep(land_union)
# Also emit the union as a LANDMASK feature so the frontend can
# paint land in the chart's own colour when OSM is hidden.
geoms = list(land_union.geoms) if land_union.geom_type.startswith("Multi") else [land_union]
for g in geoms:
if g.geom_type != "Polygon" or g.is_empty: continue
rings = [[list(c[:2]) for c in g.exterior.coords]]
rings.extend([list(c[:2]) for c in r.coords] for r in g.interiors)
out.append({
"type": "Feature",
"geometry": {"type": "Polygon", "coordinates": rings},
"properties": {"layer": "LANDMASK", "bbox": _bbox_of_coords(rings)},
})
except Exception as e:
log.warning("Land union failed for %s: %s", enc_path, e)
# ── SOUNDG (drop sounding points that fall on land) ──
try:
gdf = gpd.read_file(str(enc_path), layer="SOUNDG", engine="pyogrio")
if not gdf.empty:
gdf = gdf.to_crs(epsg=4326)
for _, row in gdf.iterrows():
geom = row.geometry
if geom is None or geom.is_empty: continue
geoms = list(geom.geoms) if geom.geom_type.startswith("Multi") else [geom]
for g in geoms:
if not getattr(g, "has_z", False): continue
if land_prepared is not None and land_prepared.contains(g):
continue # sounding lies on land — skip
out.append({
"type": "Feature",
"geometry": {"type": "Point", "coordinates": [g.x, g.y]},
"properties": {"layer": "SOUNDG", "depth": round(g.z, 1),
"unit": unit,
"bbox": [g.x, g.y, g.x, g.y]},
})
except Exception:
pass
# ── DEPCNT (clip contour lines so they don't cross land) ──
try:
gdf = gpd.read_file(str(enc_path), layer="DEPCNT", engine="pyogrio")
if not gdf.empty:
gdf = gdf.to_crs(epsg=4326)
for _, row in gdf.iterrows():
geom = row.geometry
if geom is None or geom.is_empty: continue
depth = _safe(row.get("VALDCO"))
if land_union is not None and geom.intersects(land_union):
try: geom = geom.difference(land_union)
except Exception: pass
if geom.is_empty: continue
geoms = list(geom.geoms) if geom.geom_type.startswith("Multi") else [geom]
for g in geoms:
if g.geom_type != "LineString" or g.is_empty: continue
coords = [list(c[:2]) for c in g.coords]
if len(coords) < 2: continue
out.append({
"type": "Feature",
"geometry": {"type": "LineString", "coordinates": coords},
"properties": {"layer": "DEPCNT",
"depth": round(float(depth), 1) if depth is not None else None,
"unit": unit,
"bbox": _bbox_of_coords(coords)},
})
except Exception:
pass
def _emit_polygon_feature(g, props):
"""Write one Polygon (single-part) feature, with bbox + props."""
if g.geom_type != "Polygon" or g.is_empty: return
rings = [[list(c[:2]) for c in g.exterior.coords]]
rings.extend([list(c[:2]) for c in r.coords] for r in g.interiors)
out.append({
"type": "Feature",
"geometry": {"type": "Polygon", "coordinates": rings},
"properties": {**props, "bbox": _bbox_of_coords(rings)},
})
# ── DEPARE (clipped to remove land overlap) ──
try:
gdf = gpd.read_file(str(enc_path), layer="DEPARE", engine="pyogrio")
if not gdf.empty:
gdf = gdf.to_crs(epsg=4326)
for _, row in gdf.iterrows():
geom = row.geometry
if geom is None or geom.is_empty: continue
dmin = _safe(row.get("DRVAL1"))
dmax = _safe(row.get("DRVAL2"))
if land_union is not None and geom.intersects(land_union):
try:
geom = geom.difference(land_union)
except Exception:
pass
if geom.is_empty: continue
props = {
"layer": "DEPARE",
"depth_min": round(float(dmin), 1) if dmin is not None else None,
"depth_max": round(float(dmax), 1) if dmax is not None else None,
"unit": unit,
}
geoms = list(geom.geoms) if geom.geom_type.startswith("Multi") else [geom]
for g in geoms:
_emit_polygon_feature(g, props)
except Exception:
pass
return out
def _emit_geometry(geom, props: dict) -> list[dict]:
"""Flatten a shapely geometry into GeoJSON Feature dicts with bbox.
Handles Point / LineString / Polygon and their Multi* variants."""
out = []
geoms = list(geom.geoms) if geom.geom_type.startswith("Multi") else [geom]
for g in geoms:
if g.is_empty:
continue
if g.geom_type == "Point":
coords = [g.x, g.y]
gtype = "Point"
bbox = [g.x, g.y, g.x, g.y]
elif g.geom_type == "LineString":
coords = [list(c[:2]) for c in g.coords]
gtype = "LineString"
bbox = _bbox_of_coords(coords)
elif g.geom_type == "Polygon":
rings = [[list(c[:2]) for c in g.exterior.coords]]
rings += [[list(c[:2]) for c in r.coords] for r in g.interiors]
coords = rings
gtype = "Polygon"
bbox = _bbox_of_coords(coords)
else:
continue
out.append({
"type": "Feature",
"geometry": {"type": gtype, "coordinates": coords},
"properties": {**{k: v for k, v in props.items() if v is not None},
"bbox": bbox},
})
return out
# ── Terrain / land features ───────────────────────────────────────────────────
_LAND_POLY_LAYERS = {"LNDARE": "land_area", "BUAARE": "built_up_area",
"BUISGL": "building"} # building footprints as polygons
_LAND_LINE_LAYERS = {"COALNE": "coastline"}
_LAND_POINT_LAYERS = {"BRIDGE": "bridge", "HRBFAC": "harbour",
"SILTNK": "silo_tank"}
def _parse_land(enc_path: Path) -> list[dict]:
out = []
for layer_name, category in _LAND_POLY_LAYERS.items():
try:
gdf = gpd.read_file(str(enc_path), layer=layer_name, engine="pyogrio")
except Exception:
continue
if gdf.empty: continue
gdf = gdf.to_crs(epsg=4326)
for _, row in gdf.iterrows():
geom = row.geometry
if geom is None or geom.is_empty: continue
props = {"layer": layer_name, "category": category,
"name": _safe(row.get("OBJNAM"))}
out.extend(_emit_geometry(geom, props))
for layer_name, category in _LAND_LINE_LAYERS.items():
try:
gdf = gpd.read_file(str(enc_path), layer=layer_name, engine="pyogrio")
except Exception:
continue
if gdf.empty: continue
gdf = gdf.to_crs(epsg=4326)
for _, row in gdf.iterrows():
geom = row.geometry
if geom is None or geom.is_empty: continue
props = {"layer": layer_name, "category": category,
"name": _safe(row.get("OBJNAM"))}
out.extend(_emit_geometry(geom, props))
for layer_name, category in _LAND_POINT_LAYERS.items():
try:
gdf = gpd.read_file(str(enc_path), layer=layer_name, engine="pyogrio")
except Exception:
continue
if gdf.empty: continue
gdf = gdf.to_crs(epsg=4326)
for _, row in gdf.iterrows():
geom = row.geometry
if geom is None or geom.is_empty: continue
pt = geom.centroid if geom.geom_type != "Point" else geom
props = {"layer": layer_name, "category": category,
"name": _safe(row.get("OBJNAM")),
"height_m": _safe(row.get("HEIGHT"))}
if layer_name == "BRIDGE":
props["horclr"] = _safe(row.get("HORCLR"))
props["verclr"] = _safe(row.get("VERCLR"))
out.append({"type": "Feature",
"geometry": {"type": "Point", "coordinates": [pt.x, pt.y]},
"properties": {k: v for k, v in props.items() if v is not None}})
return out
# ── Navigational hazards ──────────────────────────────────────────────────────
_HAZARD_LAYERS = {"WRECKS": "wreck", "OBSTRN": "obstruction", "UWTROC": "rock"}
def _parse_hazards(enc_path: Path) -> list[dict]:
out = []
for layer_name, category in _HAZARD_LAYERS.items():
try:
gdf = gpd.read_file(str(enc_path), layer=layer_name, engine="pyogrio")
except Exception:
continue
if gdf.empty: continue
gdf = gdf.to_crs(epsg=4326)
for _, row in gdf.iterrows():
geom = row.geometry
if geom is None or geom.is_empty: continue
pt = geom.centroid if geom.geom_type != "Point" else geom
props = {"layer": layer_name, "category": category,
"name": _safe(row.get("OBJNAM")),
"depth": _safe(row.get("VALSOU"))}
if layer_name == "WRECKS": props["catwrk"] = _safe(row.get("CATWRK"))
if layer_name == "UWTROC": props["catuwr"] = _safe(row.get("CATUWR"))
if layer_name == "OBSTRN": props["catobs"] = _safe(row.get("CATOBS"))
out.append({"type": "Feature",
"geometry": {"type": "Point", "coordinates": [pt.x, pt.y]},
"properties": {k: v for k, v in props.items() if v is not None}})
return out
# ── Navigation zones / areas ──────────────────────────────────────────────────
_ZONE_LAYERS = {
"RESARE": "restricted", "CTNARE": "caution", "ACHARE": "anchorage",
"TSSLPT": "traffic_lane","PRCARE": "precautionary","FAIRWY": "fairway",
"DMPGRD": "dumpground", "PIPARE": "pipeline_area",
}
def _parse_zones(enc_path: Path) -> list[dict]:
out = []
for layer_name, category in _ZONE_LAYERS.items():
try:
gdf = gpd.read_file(str(enc_path), layer=layer_name, engine="pyogrio")
except Exception:
continue
if gdf.empty: continue
gdf = gdf.to_crs(epsg=4326)
for _, row in gdf.iterrows():
geom = row.geometry
if geom is None or geom.is_empty: continue
props = {"layer": layer_name, "category": category,
"name": _safe(row.get("OBJNAM")),
"info": _safe(row.get("INFORM"))}
if layer_name == "RESARE":
props["catrea"] = _safe(row.get("CATREA"))
props["restrn"] = _safe_list(row.get("RESTRN"))
if layer_name == "ACHARE":
props["catach"] = _safe(row.get("CATACH"))
out.extend(_emit_geometry(geom, props))
return out
def _ensure_meta(cell_id: str):
"""Write meta.json with auto-detected region if not already set."""
meta = get_meta(cell_id)
if "region" not in meta:
set_meta(cell_id, region=auto_region(cell_id))
def install_from_zip(zip_path: Path) -> list[str]:
installed = []
with zipfile.ZipFile(zip_path) as z:
enc_files = [n for n in z.namelist() if n.upper().endswith(".000")]
if not enc_files:
raise ValueError("No .000 ENC files found in ZIP")
for enc_name in enc_files:
cell_id = Path(enc_name).stem.upper()
cell_dir = CHARTS_DIR / cell_id
cell_dir.mkdir(exist_ok=True)
enc_dest = cell_dir / f"{cell_id}.000"
with z.open(enc_name) as src, open(enc_dest, "wb") as dst:
shutil.copyfileobj(src, dst)
_build_cache(cell_id, enc_dest)
_ensure_meta(cell_id)
installed.append(cell_id)
return installed
def _read_cell_id(enc_path: Path) -> str:
"""Lee el nombre de celda real del header DSID del S-57.
Si el archivo tiene nombre temporal (TMP*), usa DSID_DSNM o DSID_DSNM como fallback."""
stem = enc_path.stem.upper()
# Si el nombre ya parece un cell ID válido (no empieza con TMP), usarlo directamente
if not stem.startswith("TMP"):
return stem
# Intentar leer DSNM del header S-57 via GDAL
try:
from osgeo import gdal
ds = gdal.OpenEx(str(enc_path))
if ds:
md = ds.GetMetadata() or {}
dsnm = (md.get("DSID_DSNM") or "").strip().upper()
if dsnm and not dsnm.startswith("TMP"):
return dsnm
except Exception as e:
log.debug("Could not read DSNM from %s: %s", enc_path, e)
# Fallback: usar el stem aunque sea TMP (se mostrará en lista)
return stem
def install_from_enc(enc_path: Path, orig_name: str | None = None) -> str:
# Preferir el nombre original del archivo (sin extensión) como cell_id.
# Si no viene o empieza con TMP, intentar leerlo del header S-57.
if orig_name and not orig_name.upper().startswith("TMP"):
cell_id = orig_name.upper()
else:
cell_id = _read_cell_id(enc_path)
cell_dir = CHARTS_DIR / cell_id
cell_dir.mkdir(exist_ok=True)
enc_dest = cell_dir / f"{cell_id}.000"
shutil.copy2(enc_path, enc_dest)
_build_cache(cell_id, enc_dest)
_ensure_meta(cell_id)
return cell_id
def _build_cache(cell_id: str, enc_path: Path):
log.info("Parsing ENC %s …", cell_id)
cell_dir = CHARTS_DIR / cell_id
features = _parse_cell(enc_path)
with open(cell_dir / "features.geojson", "w") as f:
json.dump({"type": "FeatureCollection", "features": features}, f)
log.info("ENC %s%d aid features", cell_id, len(features))
depths = _parse_depths(enc_path)
with open(cell_dir / "depths.geojson", "w") as f:
json.dump({"type": "FeatureCollection", "features": depths}, f)
log.info("ENC %s%d depth features", cell_id, len(depths))
land = _parse_land(enc_path)
land_path = cell_dir / "land.geojson"
# Preserve existing land cache when the .000 has no LNDARE layer
# (e.g. custom charts built from CSV that only contain nav aids)
if land or not land_path.exists():
with open(land_path, "w") as f:
json.dump({"type": "FeatureCollection", "features": land}, f)
log.info("ENC %s%d land features%s", cell_id, len(land),
" (preserved existing)" if not land and land_path.exists() else "")
hazards = _parse_hazards(enc_path)
hazards_path = cell_dir / "hazards.geojson"
if hazards or not hazards_path.exists():
with open(hazards_path, "w") as f:
json.dump({"type": "FeatureCollection", "features": hazards}, f)
log.info("ENC %s%d hazard features", cell_id, len(hazards))
zones = _parse_zones(enc_path)
zones_path = cell_dir / "zones.geojson"
if zones or not zones_path.exists():
with open(zones_path, "w") as f:
json.dump({"type": "FeatureCollection", "features": zones}, f)
log.info("ENC %s%d zone features", cell_id, len(zones))
# Cache count and bbox in meta.json so list_cells() doesn't need to read features.geojson
bbox = None
if features:
try:
lons = [f["geometry"]["coordinates"][0] for f in features]
lats = [f["geometry"]["coordinates"][1] for f in features]
bbox = [min(lons), min(lats), max(lons), max(lats)]
except Exception:
pass
meta = get_meta(cell_id)
meta["feature_count"] = len(features)
meta["bbox"] = bbox
_meta_path(cell_dir).write_text(json.dumps(meta))
def list_cells() -> list[dict]:
cells = []
for cell_dir in sorted(CHARTS_DIR.iterdir()):
if not cell_dir.is_dir():
continue
# Skip leftover temp directories from failed installs
if cell_dir.name.startswith("TMP"):
continue
enc = list(cell_dir.glob("*.000"))
cache = cell_dir / "features.geojson"
meta = get_meta(cell_dir.name)
# Use cached count/bbox from meta.json (fast); fallback reads features.geojson
count = meta.get("feature_count")
bbox = meta.get("bbox")
if count is None and cache.exists():
try:
fc = json.loads(cache.read_text())
feats = fc.get("features") or []
count = len(feats)
if feats:
lons = [f["geometry"]["coordinates"][0] for f in feats]
lats = [f["geometry"]["coordinates"][1] for f in feats]
bbox = [min(lons), min(lats), max(lons), max(lats)]
except Exception:
count = 0
cells.append({
"id": cell_dir.name,
"enc_file": enc[0].name if enc else None,
"features": count or 0,
"cached": cache.exists(),
"bbox": bbox,
"region": get_region(cell_dir.name),
})
return cells
def delete_cell(cell_id: str):
cell_dir = CHARTS_DIR / cell_id.upper()
if cell_dir.exists():
shutil.rmtree(cell_dir)
def get_all_features() -> dict:
all_features = []
for cell_dir in CHARTS_DIR.iterdir():
cache = cell_dir / "features.geojson"
if not cache.exists():
continue
try:
fc = json.loads(cache.read_text())
except Exception:
continue
cell_id = cell_dir.name
region = get_region(cell_id)
for f in (fc.get("features") or []):
p = f.setdefault("properties", {})
p["cell"] = cell_id
p["cell_region"] = region
# Backfill aid_type for old caches that pre-date the classifier.
if "aid_type" not in p:
p["aid_type"] = classify(p.get("layer", ""), p)
all_features.extend(fc["features"])
return {"type": "FeatureCollection", "features": all_features}
def _aggregate_cache(filename: str, bbox=None) -> dict:
"""Generic aggregator: read <filename> from every installed cell."""
all_features = []
if bbox is not None:
w, s, e, n = bbox
for cell_dir in CHARTS_DIR.iterdir():
cache = cell_dir / filename
if not cache.exists():
continue
try:
fc = json.loads(cache.read_text())
except Exception:
continue
for f in (fc.get("features") or []):
p = f.setdefault("properties", {})
p["cell"] = cell_dir.name
if bbox is not None:
fb = p.get("bbox")
if fb and (fb[2] < w or fb[0] > e or fb[3] < s or fb[1] > n):
continue
all_features.append(f)
return {"type": "FeatureCollection", "features": all_features}
def get_all_depths(bbox: tuple[float, float, float, float] | None = None) -> dict:
return _aggregate_cache("depths.geojson", bbox)
def get_all_land(bbox: tuple[float, float, float, float] | None = None) -> dict:
return _aggregate_cache("land.geojson", bbox)
def get_all_hazards() -> dict:
return _aggregate_cache("hazards.geojson")
def get_all_zones(bbox: tuple[float, float, float, float] | None = None) -> dict:
return _aggregate_cache("zones.geojson", bbox)