293b0c45ef
frontend/js/map.js: - Reemplaza merge exacto por merge de proximidad (~50m) en loadChartFeatures para capturar pares LIGHTS/BOYLAT con coordenadas no exactamente iguales - Guard null-canvas en encStyle con fallback visible + console.warn - Mejora JS de debug: log layer/aidType cuando usa fallback backend/services/chart_manager.py: - Expande extraccion de light_desc a category buoy+beacon+landmark (antes solo BOYLAT/BOYCAR; BCNLAT/BCNWTW/LNDMRK perdian LITCHR silenciosamente)
937 lines
38 KiB
Python
937 lines
38 KiB
Python
"""
|
|
S-57 ENC chart manager — parse, store and serve chart features as GeoJSON.
|
|
"""
|
|
import json
|
|
import logging
|
|
import math
|
|
import shutil
|
|
import zipfile
|
|
from pathlib import Path
|
|
|
|
import geopandas as gpd
|
|
import pandas as pd
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
CHARTS_DIR = Path(__file__).parent.parent.parent / "charts"
|
|
CHARTS_DIR.mkdir(exist_ok=True)
|
|
|
|
# S-57 layers we care about, mapped to a friendly category
|
|
S57_LAYERS = {
|
|
"LIGHTS": "light",
|
|
"BOYLAT": "buoy",
|
|
"BOYCAR": "buoy",
|
|
"BOYISD": "buoy",
|
|
"BOYSPP": "buoy",
|
|
"BOYSAW": "buoy",
|
|
"BCNLAT": "beacon",
|
|
"BCNCAR": "beacon",
|
|
"BCNISD": "beacon",
|
|
"BCNWTW": "beacon",
|
|
"LNDMRK": "landmark",
|
|
"RTPBCN": "beacon",
|
|
"LAKARE": "area", # Lake / lagoon / ciénaga area
|
|
# BUISGL is rendered as polygon footprint via _parse_land → landLayer.
|
|
# Removed from S57_LAYERS to avoid a duplicate centroid-point beacon dot.
|
|
}
|
|
|
|
# Human-readable light character codes
|
|
LITCHR = {
|
|
1:"F", 2:"Fl", 3:"LFl", 4:"Q", 5:"VQ", 6:"UQ", 7:"Iso",
|
|
8:"Oc", 9:"IQ", 10:"IVQ", 11:"IUQ", 12:"Mo", 13:"FFl",
|
|
14:"FlLFl", 15:"OcFl", 16:"FLFl", 25:"Al.Oc", 26:"Al.LFl",
|
|
27:"Al.Fl", 28:"Al.Grp",
|
|
}
|
|
|
|
COLOUR = {
|
|
1:"W", 2:"K", 3:"R", 4:"G", 5:"B", 6:"Y",
|
|
7:"Gy", 8:"Br", 9:"Amb", 10:"Vi", 11:"Or", 12:"Mg",
|
|
}
|
|
|
|
BOYSHP = {
|
|
1:"Conical(N)",2:"Can(C)",3:"Spherical",4:"Pillar",
|
|
5:"Spar",6:"Barrel",7:"Super-buoy",8:"Ice buoy",
|
|
}
|
|
|
|
# Cardinal category (CATCAM) per IHO S-57 Object Catalogue
|
|
CATCAM = {1:"N", 2:"E", 3:"S", 4:"W"}
|
|
|
|
# IALA region defaults by chart producer prefix (cell ID first 2 chars)
|
|
REGION_A_PREFIXES = {
|
|
"GB","FR","DE","NL","BE","DK","NO","SE","FI","IS","IE","ES","PT","IT",
|
|
"GR","TR","RU","UA","HR","CY","MT","EE","LV","LT","PL","RO","BG","SI",
|
|
"MA","DZ","TN","LY","EG","ZA","NG","SN","CI","KE","TZ","MZ",
|
|
"IN","PK","BD","LK","MM","TH","ID","MY","SG","VN","CN","HK","TW",
|
|
"AE","SA","OM","IR","IQ","KW","QA","BH","YE",
|
|
"AU","NZ","PG",
|
|
}
|
|
REGION_B_PREFIXES = {
|
|
"US","CA","MX","BS","CU","JM","HT","DO","PR","VI",
|
|
"BZ","GT","SV","HN","NI","CR","PA",
|
|
"CO","VE","EC","PE","CL","AR","UY","BR","GY","SR","PY","BO",
|
|
"JP","KR","KP","PH",
|
|
}
|
|
|
|
def auto_region(cell_id: str) -> str:
|
|
prefix = (cell_id or "").upper()[:2]
|
|
if prefix in REGION_A_PREFIXES: return "A"
|
|
if prefix in REGION_B_PREFIXES: return "B"
|
|
return "B"
|
|
|
|
def _meta_path(cell_dir: Path) -> Path:
|
|
return cell_dir / "meta.json"
|
|
|
|
def get_meta(cell_id: str) -> dict:
|
|
cell_dir = CHARTS_DIR / cell_id.upper()
|
|
p = _meta_path(cell_dir)
|
|
if p.exists():
|
|
try: return json.loads(p.read_text())
|
|
except Exception: pass
|
|
return {}
|
|
|
|
def set_meta(cell_id: str, **patch) -> dict:
|
|
cell_dir = CHARTS_DIR / cell_id.upper()
|
|
if not cell_dir.exists():
|
|
raise FileNotFoundError(cell_id)
|
|
meta = get_meta(cell_id)
|
|
meta.update(patch)
|
|
_meta_path(cell_dir).write_text(json.dumps(meta))
|
|
return meta
|
|
|
|
def get_region(cell_id: str) -> str:
|
|
return get_meta(cell_id).get("region") or auto_region(cell_id)
|
|
|
|
# ── Aid classification ────────────────────────────────────────────────────
|
|
# Canonical aid types — derived from S-57 layer + attributes.
|
|
# Frontend dispatches symbol drawing on this; renders are region-aware
|
|
# only for LATERAL_* (which depend on IALA A vs B for color).
|
|
def classify(layer: str, props: dict) -> str:
|
|
catlam = props.get("catlam")
|
|
catcam = props.get("catcam")
|
|
if layer in ("BOYLAT", "BCNLAT"):
|
|
if catlam == 1: return "LATERAL_PORT"
|
|
if catlam == 2: return "LATERAL_STBD"
|
|
if catlam == 3: return "LATERAL_PREF_STBD" # preferred channel to STBD (acts port)
|
|
if catlam == 4: return "LATERAL_PREF_PORT" # preferred channel to PORT (acts stbd)
|
|
# Fallback: infer from primary colour (IALA B — Americas: green=port, red=stbd)
|
|
colour = props.get("colour_code")
|
|
if colour == 4: return "LATERAL_PORT" # green
|
|
if colour == 3: return "LATERAL_STBD" # red
|
|
return "LATERAL_UNKNOWN"
|
|
if layer in ("BOYCAR", "BCNCAR"):
|
|
if catcam == "N": return "CARDINAL_N"
|
|
if catcam == "E": return "CARDINAL_E"
|
|
if catcam == "S": return "CARDINAL_S"
|
|
if catcam == "W": return "CARDINAL_W"
|
|
return "CARDINAL_UNKNOWN"
|
|
if layer in ("BOYISD", "BCNISD"): return "ISOLATED_DANGER"
|
|
if layer == "BOYSAW": return "SAFE_WATER"
|
|
if layer == "BOYSPP": return "SPECIAL"
|
|
if layer == "LIGHTS": return "LIGHT_POINT"
|
|
if layer == "LNDMRK": return "LANDMARK"
|
|
if layer == "RTPBCN": return "RACON"
|
|
if layer.startswith("BCN"): return "BEACON_GENERIC"
|
|
if layer.startswith("BOY"): return "BUOY_GENERIC"
|
|
return "UNKNOWN"
|
|
|
|
# Topmark shape (TOPSHP)
|
|
TOPSHP = {
|
|
1:"cone_up",2:"cone_down",3:"sphere",4:"X",5:"cross",
|
|
6:"can",7:"T",8:"triangle_up",9:"triangle_down",
|
|
10:"circle",11:"rectangle",12:"board",13:"diamond",
|
|
14:"diamond_2",15:"square",16:"rectangle_2",17:"pennant",
|
|
18:"pennant_2",19:"pennant_3",20:"pennant_4",
|
|
21:"2sphere",22:"2cone_up",23:"2cone_down",
|
|
24:"cone_up_sphere",25:"2cone_base_up",26:"2cone_base_down",
|
|
27:"2cone_point",28:"besom_up",29:"besom_down",
|
|
30:"flag",31:"sphere_2",32:"2cone_base_apart",
|
|
33:"board_2",
|
|
}
|
|
|
|
|
|
def _safe_list(val) -> list:
|
|
"""Return a plain Python list from a pandas/numpy array-like value.
|
|
|
|
GDAL/pyogrio surfaces multi-valued S-57 attributes (like COLOUR) as
|
|
comma-separated STRINGS (e.g. "4" or "1,4"), not as Python lists.
|
|
Without the string branch we silently drop colour info."""
|
|
if val is None:
|
|
return []
|
|
try:
|
|
if hasattr(val, 'tolist'):
|
|
val = val.tolist()
|
|
if isinstance(val, str):
|
|
return [int(x) for x in val.split(",") if x.strip()]
|
|
if isinstance(val, (list, tuple)):
|
|
return [int(v) for v in val if v is not None]
|
|
return [int(val)]
|
|
except Exception:
|
|
return []
|
|
|
|
|
|
def _safe(val):
|
|
"""Return a plain Python scalar from any pandas/numpy value."""
|
|
if val is None:
|
|
return None
|
|
# numpy array or list with multiple elements — take first
|
|
if hasattr(val, '__len__') and not isinstance(val, str):
|
|
if len(val) == 0:
|
|
return None
|
|
val = val[0]
|
|
# numpy scalar
|
|
if hasattr(val, 'item'):
|
|
try:
|
|
val = val.item()
|
|
except Exception:
|
|
val = val.tolist()
|
|
# NaN check after extraction
|
|
try:
|
|
if isinstance(val, float) and pd.isna(val):
|
|
return None
|
|
except Exception:
|
|
pass
|
|
return val
|
|
|
|
|
|
def _light_desc(row) -> str:
|
|
parts = []
|
|
chr_code = _safe(row.get("LITCHR"))
|
|
grp = _safe(row.get("SIGGRP"))
|
|
col_list = _safe(row.get("COLOUR"))
|
|
period = _safe(row.get("SIGPER"))
|
|
rng = _safe(row.get("VALNMR"))
|
|
|
|
lc = LITCHR.get(int(chr_code), str(chr_code)) if chr_code else ""
|
|
if grp:
|
|
lc = f"{lc}({grp})"
|
|
colours = ""
|
|
if col_list:
|
|
try:
|
|
nums = [int(x) for x in str(col_list).split(",") if x.strip()]
|
|
colours = ".".join(COLOUR.get(n, "?") for n in nums)
|
|
except Exception:
|
|
pass
|
|
if lc:
|
|
parts.append(f"{lc} {colours}".strip())
|
|
if period:
|
|
parts.append(f"{period}s")
|
|
if rng:
|
|
parts.append(f"{rng}M")
|
|
return " ".join(parts)
|
|
|
|
|
|
def _parse_cell(enc_path: Path) -> list[dict]:
|
|
features = []
|
|
|
|
# ── Build land union for LDLINE clipping ─────────────────────────────────
|
|
# Leading lines must stop at the riverbank / land boundary, not extend
|
|
# indefinitely. We union all land polygons in the cell and use them to
|
|
# clip each LDLINE so it terminates where it hits the shore.
|
|
from shapely.ops import unary_union
|
|
from shapely.geometry import LineString as SLineString, Point as SPoint
|
|
_land_clip = None
|
|
for _ll in ("LNDARE", "BUAARE"):
|
|
try:
|
|
_gdf_l = gpd.read_file(str(enc_path), layer=_ll, engine="pyogrio")
|
|
if _gdf_l.empty:
|
|
continue
|
|
_gdf_l = _gdf_l.to_crs(epsg=4326)
|
|
_polys = [g for g in _gdf_l.geometry
|
|
if g is not None and not g.is_empty
|
|
and g.geom_type in ("Polygon", "MultiPolygon")]
|
|
if _polys:
|
|
_u = unary_union(_polys).buffer(0)
|
|
_land_clip = _u if _land_clip is None else _land_clip.union(_u)
|
|
except Exception:
|
|
pass
|
|
|
|
for layer_name, category in S57_LAYERS.items():
|
|
try:
|
|
gdf = gpd.read_file(str(enc_path), layer=layer_name, engine="pyogrio")
|
|
except Exception:
|
|
continue
|
|
if gdf.empty:
|
|
continue
|
|
gdf = gdf.to_crs(epsg=4326)
|
|
for _, row in gdf.iterrows():
|
|
geom = row.geometry
|
|
if geom is None or geom.is_empty:
|
|
continue
|
|
# Use centroid for all geometry types
|
|
pt = geom.centroid if geom.geom_type != "Point" else geom
|
|
colours = _safe_list(row.get("COLOUR")) # e.g. [3,4] = Red+Green
|
|
props = {
|
|
"layer": layer_name,
|
|
"category": category,
|
|
"name": _safe(row.get("OBJNAM")),
|
|
"info": _safe(row.get("INFORM")),
|
|
"height_m": _safe(row.get("HEIGHT")),
|
|
"range_nm": _safe(row.get("VALNMR")),
|
|
"status": _safe(row.get("STATUS")),
|
|
"colours": colours, # list of S-57 colour codes
|
|
"colour_code": colours[0] if colours else None, # primary colour
|
|
}
|
|
# Extract light description for ALL navigational aids and lights.
|
|
# Previously only BOY* layers were included, so BCNLAT/BCNWTW/LNDMRK
|
|
# silently lost their LITCHR/SIGPER/VALNMR — fixed here.
|
|
if category in ("light", "buoy", "beacon", "landmark"):
|
|
props["light_desc"] = _light_desc(row)
|
|
if layer_name.startswith("BOY") or layer_name.startswith("BCN"):
|
|
shape = _safe(row.get("BOYSHP"))
|
|
props["boyshp"] = int(shape) if shape else None # raw code for JS
|
|
props["shape"] = BOYSHP.get(int(shape), None) if shape else None
|
|
topshp = _safe(row.get("TOPSHP"))
|
|
props["topshp"] = int(topshp) if topshp else None
|
|
if layer_name in ("BOYCAR", "BCNCAR"):
|
|
catcam = _safe(row.get("CATCAM"))
|
|
props["catcam"] = CATCAM.get(int(catcam), None) if catcam else None
|
|
if layer_name in ("BOYLAT", "BCNLAT"):
|
|
catlam = _safe(row.get("CATLAM")) # 1=port, 2=stbd, 3=pref-stbd, 4=pref-port
|
|
props["catlam"] = int(catlam) if catlam else None
|
|
# ORIENT — bearing of the leading line in True degrees
|
|
orient_raw = _safe(row.get("ORIENT"))
|
|
if orient_raw:
|
|
try:
|
|
props["orient"] = float(orient_raw)
|
|
except ValueError:
|
|
pass
|
|
props["aid_type"] = classify(layer_name, props)
|
|
features.append({
|
|
"type": "Feature",
|
|
"geometry": {"type": "Point", "coordinates": [pt.x, pt.y]},
|
|
"properties": {k: v for k, v in props.items() if v is not None},
|
|
})
|
|
|
|
# ── Leading-line bearing projection for lights with ORIENT ────────
|
|
# Project a line from the mark in the ORIENT direction, then clip it
|
|
# against the land union so it terminates at the riverbank/shoreline.
|
|
# A long initial line (5 NM) is used so the clip always finds the shore.
|
|
if "orient" in props:
|
|
bearing_deg = props["orient"]
|
|
bearing_rad = math.radians(bearing_deg)
|
|
lat_rad = math.radians(pt.y)
|
|
cos_lat = math.cos(lat_rad)
|
|
# Initial generous length (5 NM) — will be clipped to land boundary
|
|
PROBE_NM = 5.0
|
|
d_lat = (PROBE_NM / 60.0) * math.cos(bearing_rad)
|
|
d_lon = (PROBE_NM / 60.0) * math.sin(bearing_rad) / max(cos_lat, 1e-6)
|
|
end = [pt.x + d_lon, pt.y + d_lat]
|
|
|
|
# Densify the probe line (10 intermediate points) so the clip is accurate
|
|
n_pts = 12
|
|
ldcoords = [
|
|
[pt.x + d_lon * i / n_pts, pt.y + d_lat * i / n_pts]
|
|
for i in range(n_pts + 1)
|
|
]
|
|
|
|
# Clip against land — the leading line must not cross land
|
|
if _land_clip is not None:
|
|
try:
|
|
probe = SLineString(ldcoords)
|
|
in_water = probe.difference(_land_clip)
|
|
if in_water.is_empty:
|
|
# Mark is on land — use minimum stub
|
|
ldcoords = [[pt.x, pt.y], end]
|
|
else:
|
|
# Keep only the segment that touches the mark
|
|
start_pt = SPoint(pt.x, pt.y)
|
|
geoms = (list(in_water.geoms)
|
|
if in_water.geom_type.startswith("Multi")
|
|
else [in_water])
|
|
# Sort by distance of each segment's first point to mark
|
|
geoms.sort(key=lambda g: start_pt.distance(SPoint(g.coords[0])))
|
|
# Take the first (nearest to mark) segment
|
|
seg = geoms[0]
|
|
ldcoords = [list(c[:2]) for c in seg.coords]
|
|
except Exception:
|
|
pass # fall through to unclipped coords
|
|
|
|
features.append({
|
|
"type": "Feature",
|
|
"geometry": {
|
|
"type": "LineString",
|
|
"coordinates": ldcoords,
|
|
},
|
|
"properties": {
|
|
"layer": "LDLINE",
|
|
"category": "ldline",
|
|
"aid_type": "LEADING_LINE",
|
|
"name": props.get("name", ""),
|
|
"orient": bearing_deg,
|
|
"colours": colours,
|
|
},
|
|
})
|
|
|
|
# ── Proximity merge: copy light attrs from LIGHTS → co-located buoys/beacons ──
|
|
# In S-57 a LIGHTS object and a BOYLAT object may share the same coordinates.
|
|
# The LIGHTS carries LITCHR/VALNMR/HEIGHT/COLOUR; the BOYLAT carries BOYSHP/CATLAM.
|
|
# We find the nearest LIGHTS within 50 m and backfill missing attrs on the host.
|
|
MERGE_DEG = 0.00045 # ≈ 50 m at equator (1° lat ≈ 111 km)
|
|
light_feats = [f for f in features
|
|
if f["properties"].get("layer") == "LIGHTS"
|
|
and f["geometry"]["type"] == "Point"]
|
|
struct_feats = [f for f in features
|
|
if (f["properties"].get("layer","").startswith("BOY")
|
|
or f["properties"].get("layer","").startswith("BCN"))
|
|
and f["geometry"]["type"] == "Point"]
|
|
for lf in light_feats:
|
|
lc = lf["geometry"]["coordinates"]
|
|
lp = lf["properties"]
|
|
best_host, best_dist = None, MERGE_DEG
|
|
for bf in struct_feats:
|
|
bc = bf["geometry"]["coordinates"]
|
|
d = ((lc[0] - bc[0])**2 + (lc[1] - bc[1])**2) ** 0.5
|
|
if d < best_dist:
|
|
best_dist, best_host = d, bf
|
|
if best_host:
|
|
hp = best_host["properties"]
|
|
if lp.get("light_desc") and not hp.get("light_desc"):
|
|
hp["light_desc"] = lp["light_desc"]
|
|
if lp.get("range_nm") and not hp.get("range_nm"):
|
|
hp["range_nm"] = lp["range_nm"]
|
|
if lp.get("height_m") and not hp.get("height_m"):
|
|
hp["height_m"] = lp["height_m"]
|
|
if lp.get("colours") and not hp.get("colours"):
|
|
hp["colours"] = lp["colours"]
|
|
hp["colour_code"] = lp.get("colour_code")
|
|
# Also sync orient for leading lines if the buoy didn't have it
|
|
if lp.get("orient") is not None and hp.get("orient") is None:
|
|
hp["orient"] = lp["orient"]
|
|
|
|
return features
|
|
|
|
|
|
def _read_depth_unit(enc_path: Path) -> str:
|
|
"""Read DSPM_DUNI from the S-57 dataset metadata. 1=METERS, 2=FEET, 3=FATHOMS."""
|
|
try:
|
|
from osgeo import gdal
|
|
ds = gdal.OpenEx(str(enc_path))
|
|
if ds:
|
|
md = ds.GetMetadata() or {}
|
|
duni = md.get("DSPM_DUNI") or md.get("S57_DUNI")
|
|
if duni:
|
|
return {"1": "METERS", "2": "FEET", "3": "FATHOMS"}.get(str(duni), "METERS")
|
|
except Exception as e:
|
|
log.debug("Could not read DUNI from %s: %s", enc_path, e)
|
|
return "METERS" # NOAA / IHO default
|
|
|
|
|
|
def _bbox_of_coords(coords) -> list[float]:
|
|
xs, ys = [], []
|
|
def walk(c):
|
|
if isinstance(c[0], (int, float)):
|
|
xs.append(c[0]); ys.append(c[1])
|
|
else:
|
|
for sub in c: walk(sub)
|
|
walk(coords)
|
|
return [min(xs), min(ys), max(xs), max(ys)]
|
|
|
|
|
|
def _parse_depths(enc_path: Path) -> list[dict]:
|
|
"""Extract bathymetry from S-57 cell.
|
|
|
|
SOUNDG: MultiPoint Z (sounding points; depth in geometry's Z value)
|
|
DEPCNT: LineString (contour at VALDCO meters)
|
|
DEPARE: Polygon (depth area between DRVAL1 and DRVAL2 meters)
|
|
|
|
Each feature gets a `bbox` property [minx, miny, maxx, maxy] so the
|
|
/charts/depths endpoint can do fast viewport filtering without touching
|
|
the geometries. The depth unit (METERS/FEET/FATHOMS) read from the
|
|
cell's DSPM record is attached to every feature so the frontend legend
|
|
can display it without a separate request.
|
|
"""
|
|
out = []
|
|
unit = _read_depth_unit(enc_path) # METERS / FEET / FATHOMS
|
|
|
|
# Build a UNIFIED land mask from every S-57 layer that represents
|
|
# something a depth feature shouldn't cover. NOAA puts urban islands /
|
|
# reclaimed land in BUAARE, dykes in DAMCON, dry docks in DRYDOC, etc.
|
|
# — using only LNDARE leaves visible bleed.
|
|
from shapely.ops import unary_union
|
|
from shapely.prepared import prep
|
|
LAND_LAYERS = ("LNDARE", "BUAARE", "DAMCON", "DRYDOC", "PONTON", "LNDRGN")
|
|
land_polys = []
|
|
for ll in LAND_LAYERS:
|
|
try:
|
|
gdf = gpd.read_file(str(enc_path), layer=ll, engine="pyogrio")
|
|
if gdf.empty: continue
|
|
gdf = gdf.to_crs(epsg=4326)
|
|
land_polys.extend(g for g in gdf.geometry
|
|
if g is not None and not g.is_empty
|
|
and g.geom_type in ("Polygon", "MultiPolygon"))
|
|
except Exception:
|
|
pass
|
|
land_union = None
|
|
land_prepared = None
|
|
if land_polys:
|
|
try:
|
|
land_union = unary_union(land_polys).buffer(0)
|
|
land_prepared = prep(land_union)
|
|
# Also emit the union as a LANDMASK feature so the frontend can
|
|
# paint land in the chart's own colour when OSM is hidden.
|
|
geoms = list(land_union.geoms) if land_union.geom_type.startswith("Multi") else [land_union]
|
|
for g in geoms:
|
|
if g.geom_type != "Polygon" or g.is_empty: continue
|
|
rings = [[list(c[:2]) for c in g.exterior.coords]]
|
|
rings.extend([list(c[:2]) for c in r.coords] for r in g.interiors)
|
|
out.append({
|
|
"type": "Feature",
|
|
"geometry": {"type": "Polygon", "coordinates": rings},
|
|
"properties": {"layer": "LANDMASK", "bbox": _bbox_of_coords(rings)},
|
|
})
|
|
except Exception as e:
|
|
log.warning("Land union failed for %s: %s", enc_path, e)
|
|
|
|
# ── SOUNDG (drop sounding points that fall on land) ──
|
|
try:
|
|
gdf = gpd.read_file(str(enc_path), layer="SOUNDG", engine="pyogrio")
|
|
if not gdf.empty:
|
|
gdf = gdf.to_crs(epsg=4326)
|
|
for _, row in gdf.iterrows():
|
|
geom = row.geometry
|
|
if geom is None or geom.is_empty: continue
|
|
geoms = list(geom.geoms) if geom.geom_type.startswith("Multi") else [geom]
|
|
for g in geoms:
|
|
if not getattr(g, "has_z", False): continue
|
|
if land_prepared is not None and land_prepared.contains(g):
|
|
continue # sounding lies on land — skip
|
|
out.append({
|
|
"type": "Feature",
|
|
"geometry": {"type": "Point", "coordinates": [g.x, g.y]},
|
|
"properties": {"layer": "SOUNDG", "depth": round(g.z, 1),
|
|
"unit": unit,
|
|
"bbox": [g.x, g.y, g.x, g.y]},
|
|
})
|
|
except Exception:
|
|
pass
|
|
|
|
# ── DEPCNT (clip contour lines so they don't cross land) ──
|
|
try:
|
|
gdf = gpd.read_file(str(enc_path), layer="DEPCNT", engine="pyogrio")
|
|
if not gdf.empty:
|
|
gdf = gdf.to_crs(epsg=4326)
|
|
for _, row in gdf.iterrows():
|
|
geom = row.geometry
|
|
if geom is None or geom.is_empty: continue
|
|
depth = _safe(row.get("VALDCO"))
|
|
if land_union is not None and geom.intersects(land_union):
|
|
try: geom = geom.difference(land_union)
|
|
except Exception: pass
|
|
if geom.is_empty: continue
|
|
geoms = list(geom.geoms) if geom.geom_type.startswith("Multi") else [geom]
|
|
for g in geoms:
|
|
if g.geom_type != "LineString" or g.is_empty: continue
|
|
coords = [list(c[:2]) for c in g.coords]
|
|
if len(coords) < 2: continue
|
|
out.append({
|
|
"type": "Feature",
|
|
"geometry": {"type": "LineString", "coordinates": coords},
|
|
"properties": {"layer": "DEPCNT",
|
|
"depth": round(float(depth), 1) if depth is not None else None,
|
|
"unit": unit,
|
|
"bbox": _bbox_of_coords(coords)},
|
|
})
|
|
except Exception:
|
|
pass
|
|
|
|
def _emit_polygon_feature(g, props):
|
|
"""Write one Polygon (single-part) feature, with bbox + props."""
|
|
if g.geom_type != "Polygon" or g.is_empty: return
|
|
rings = [[list(c[:2]) for c in g.exterior.coords]]
|
|
rings.extend([list(c[:2]) for c in r.coords] for r in g.interiors)
|
|
out.append({
|
|
"type": "Feature",
|
|
"geometry": {"type": "Polygon", "coordinates": rings},
|
|
"properties": {**props, "bbox": _bbox_of_coords(rings)},
|
|
})
|
|
|
|
# ── DEPARE (clipped to remove land overlap) ──
|
|
try:
|
|
gdf = gpd.read_file(str(enc_path), layer="DEPARE", engine="pyogrio")
|
|
if not gdf.empty:
|
|
gdf = gdf.to_crs(epsg=4326)
|
|
for _, row in gdf.iterrows():
|
|
geom = row.geometry
|
|
if geom is None or geom.is_empty: continue
|
|
dmin = _safe(row.get("DRVAL1"))
|
|
dmax = _safe(row.get("DRVAL2"))
|
|
if land_union is not None and geom.intersects(land_union):
|
|
try:
|
|
geom = geom.difference(land_union)
|
|
except Exception:
|
|
pass
|
|
if geom.is_empty: continue
|
|
props = {
|
|
"layer": "DEPARE",
|
|
"depth_min": round(float(dmin), 1) if dmin is not None else None,
|
|
"depth_max": round(float(dmax), 1) if dmax is not None else None,
|
|
"unit": unit,
|
|
}
|
|
geoms = list(geom.geoms) if geom.geom_type.startswith("Multi") else [geom]
|
|
for g in geoms:
|
|
_emit_polygon_feature(g, props)
|
|
except Exception:
|
|
pass
|
|
|
|
return out
|
|
|
|
|
|
def _emit_geometry(geom, props: dict) -> list[dict]:
|
|
"""Flatten a shapely geometry into GeoJSON Feature dicts with bbox.
|
|
Handles Point / LineString / Polygon and their Multi* variants."""
|
|
out = []
|
|
geoms = list(geom.geoms) if geom.geom_type.startswith("Multi") else [geom]
|
|
for g in geoms:
|
|
if g.is_empty:
|
|
continue
|
|
if g.geom_type == "Point":
|
|
coords = [g.x, g.y]
|
|
gtype = "Point"
|
|
bbox = [g.x, g.y, g.x, g.y]
|
|
elif g.geom_type == "LineString":
|
|
coords = [list(c[:2]) for c in g.coords]
|
|
gtype = "LineString"
|
|
bbox = _bbox_of_coords(coords)
|
|
elif g.geom_type == "Polygon":
|
|
rings = [[list(c[:2]) for c in g.exterior.coords]]
|
|
rings += [[list(c[:2]) for c in r.coords] for r in g.interiors]
|
|
coords = rings
|
|
gtype = "Polygon"
|
|
bbox = _bbox_of_coords(coords)
|
|
else:
|
|
continue
|
|
out.append({
|
|
"type": "Feature",
|
|
"geometry": {"type": gtype, "coordinates": coords},
|
|
"properties": {**{k: v for k, v in props.items() if v is not None},
|
|
"bbox": bbox},
|
|
})
|
|
return out
|
|
|
|
|
|
# ── Terrain / land features ───────────────────────────────────────────────────
|
|
_LAND_POLY_LAYERS = {"LNDARE": "land_area", "BUAARE": "built_up_area",
|
|
"BUISGL": "building"} # building footprints as polygons
|
|
_LAND_LINE_LAYERS = {"COALNE": "coastline"}
|
|
_LAND_POINT_LAYERS = {"BRIDGE": "bridge", "HRBFAC": "harbour",
|
|
"SILTNK": "silo_tank"}
|
|
|
|
def _parse_land(enc_path: Path) -> list[dict]:
|
|
out = []
|
|
for layer_name, category in _LAND_POLY_LAYERS.items():
|
|
try:
|
|
gdf = gpd.read_file(str(enc_path), layer=layer_name, engine="pyogrio")
|
|
except Exception:
|
|
continue
|
|
if gdf.empty: continue
|
|
gdf = gdf.to_crs(epsg=4326)
|
|
for _, row in gdf.iterrows():
|
|
geom = row.geometry
|
|
if geom is None or geom.is_empty: continue
|
|
props = {"layer": layer_name, "category": category,
|
|
"name": _safe(row.get("OBJNAM"))}
|
|
out.extend(_emit_geometry(geom, props))
|
|
for layer_name, category in _LAND_LINE_LAYERS.items():
|
|
try:
|
|
gdf = gpd.read_file(str(enc_path), layer=layer_name, engine="pyogrio")
|
|
except Exception:
|
|
continue
|
|
if gdf.empty: continue
|
|
gdf = gdf.to_crs(epsg=4326)
|
|
for _, row in gdf.iterrows():
|
|
geom = row.geometry
|
|
if geom is None or geom.is_empty: continue
|
|
props = {"layer": layer_name, "category": category,
|
|
"name": _safe(row.get("OBJNAM"))}
|
|
out.extend(_emit_geometry(geom, props))
|
|
for layer_name, category in _LAND_POINT_LAYERS.items():
|
|
try:
|
|
gdf = gpd.read_file(str(enc_path), layer=layer_name, engine="pyogrio")
|
|
except Exception:
|
|
continue
|
|
if gdf.empty: continue
|
|
gdf = gdf.to_crs(epsg=4326)
|
|
for _, row in gdf.iterrows():
|
|
geom = row.geometry
|
|
if geom is None or geom.is_empty: continue
|
|
pt = geom.centroid if geom.geom_type != "Point" else geom
|
|
props = {"layer": layer_name, "category": category,
|
|
"name": _safe(row.get("OBJNAM")),
|
|
"height_m": _safe(row.get("HEIGHT"))}
|
|
if layer_name == "BRIDGE":
|
|
props["horclr"] = _safe(row.get("HORCLR"))
|
|
props["verclr"] = _safe(row.get("VERCLR"))
|
|
out.append({"type": "Feature",
|
|
"geometry": {"type": "Point", "coordinates": [pt.x, pt.y]},
|
|
"properties": {k: v for k, v in props.items() if v is not None}})
|
|
return out
|
|
|
|
|
|
# ── Navigational hazards ──────────────────────────────────────────────────────
|
|
_HAZARD_LAYERS = {"WRECKS": "wreck", "OBSTRN": "obstruction", "UWTROC": "rock"}
|
|
|
|
def _parse_hazards(enc_path: Path) -> list[dict]:
|
|
out = []
|
|
for layer_name, category in _HAZARD_LAYERS.items():
|
|
try:
|
|
gdf = gpd.read_file(str(enc_path), layer=layer_name, engine="pyogrio")
|
|
except Exception:
|
|
continue
|
|
if gdf.empty: continue
|
|
gdf = gdf.to_crs(epsg=4326)
|
|
for _, row in gdf.iterrows():
|
|
geom = row.geometry
|
|
if geom is None or geom.is_empty: continue
|
|
pt = geom.centroid if geom.geom_type != "Point" else geom
|
|
props = {"layer": layer_name, "category": category,
|
|
"name": _safe(row.get("OBJNAM")),
|
|
"depth": _safe(row.get("VALSOU"))}
|
|
if layer_name == "WRECKS": props["catwrk"] = _safe(row.get("CATWRK"))
|
|
if layer_name == "UWTROC": props["catuwr"] = _safe(row.get("CATUWR"))
|
|
if layer_name == "OBSTRN": props["catobs"] = _safe(row.get("CATOBS"))
|
|
out.append({"type": "Feature",
|
|
"geometry": {"type": "Point", "coordinates": [pt.x, pt.y]},
|
|
"properties": {k: v for k, v in props.items() if v is not None}})
|
|
return out
|
|
|
|
|
|
# ── Navigation zones / areas ──────────────────────────────────────────────────
|
|
_ZONE_LAYERS = {
|
|
"RESARE": "restricted", "CTNARE": "caution", "ACHARE": "anchorage",
|
|
"TSSLPT": "traffic_lane","PRCARE": "precautionary","FAIRWY": "fairway",
|
|
"DMPGRD": "dumpground", "PIPARE": "pipeline_area",
|
|
}
|
|
|
|
def _parse_zones(enc_path: Path) -> list[dict]:
|
|
out = []
|
|
for layer_name, category in _ZONE_LAYERS.items():
|
|
try:
|
|
gdf = gpd.read_file(str(enc_path), layer=layer_name, engine="pyogrio")
|
|
except Exception:
|
|
continue
|
|
if gdf.empty: continue
|
|
gdf = gdf.to_crs(epsg=4326)
|
|
for _, row in gdf.iterrows():
|
|
geom = row.geometry
|
|
if geom is None or geom.is_empty: continue
|
|
props = {"layer": layer_name, "category": category,
|
|
"name": _safe(row.get("OBJNAM")),
|
|
"info": _safe(row.get("INFORM"))}
|
|
if layer_name == "RESARE":
|
|
props["catrea"] = _safe(row.get("CATREA"))
|
|
props["restrn"] = _safe_list(row.get("RESTRN"))
|
|
if layer_name == "ACHARE":
|
|
props["catach"] = _safe(row.get("CATACH"))
|
|
out.extend(_emit_geometry(geom, props))
|
|
return out
|
|
|
|
|
|
def _ensure_meta(cell_id: str):
|
|
"""Write meta.json with auto-detected region if not already set."""
|
|
meta = get_meta(cell_id)
|
|
if "region" not in meta:
|
|
set_meta(cell_id, region=auto_region(cell_id))
|
|
|
|
|
|
def install_from_zip(zip_path: Path) -> list[str]:
|
|
installed = []
|
|
with zipfile.ZipFile(zip_path) as z:
|
|
enc_files = [n for n in z.namelist() if n.upper().endswith(".000")]
|
|
if not enc_files:
|
|
raise ValueError("No .000 ENC files found in ZIP")
|
|
for enc_name in enc_files:
|
|
cell_id = Path(enc_name).stem.upper()
|
|
cell_dir = CHARTS_DIR / cell_id
|
|
cell_dir.mkdir(exist_ok=True)
|
|
enc_dest = cell_dir / f"{cell_id}.000"
|
|
with z.open(enc_name) as src, open(enc_dest, "wb") as dst:
|
|
shutil.copyfileobj(src, dst)
|
|
_build_cache(cell_id, enc_dest)
|
|
_ensure_meta(cell_id)
|
|
installed.append(cell_id)
|
|
return installed
|
|
|
|
|
|
def _read_cell_id(enc_path: Path) -> str:
|
|
"""Lee el nombre de celda real del header DSID del S-57.
|
|
Si el archivo tiene nombre temporal (TMP*), usa DSID_DSNM o DSID_DSNM como fallback."""
|
|
stem = enc_path.stem.upper()
|
|
# Si el nombre ya parece un cell ID válido (no empieza con TMP), usarlo directamente
|
|
if not stem.startswith("TMP"):
|
|
return stem
|
|
# Intentar leer DSNM del header S-57 via GDAL
|
|
try:
|
|
from osgeo import gdal
|
|
ds = gdal.OpenEx(str(enc_path))
|
|
if ds:
|
|
md = ds.GetMetadata() or {}
|
|
dsnm = (md.get("DSID_DSNM") or "").strip().upper()
|
|
if dsnm and not dsnm.startswith("TMP"):
|
|
return dsnm
|
|
except Exception as e:
|
|
log.debug("Could not read DSNM from %s: %s", enc_path, e)
|
|
# Fallback: usar el stem aunque sea TMP (se mostrará en lista)
|
|
return stem
|
|
|
|
|
|
def install_from_enc(enc_path: Path, orig_name: str | None = None) -> str:
|
|
# Preferir el nombre original del archivo (sin extensión) como cell_id.
|
|
# Si no viene o empieza con TMP, intentar leerlo del header S-57.
|
|
if orig_name and not orig_name.upper().startswith("TMP"):
|
|
cell_id = orig_name.upper()
|
|
else:
|
|
cell_id = _read_cell_id(enc_path)
|
|
cell_dir = CHARTS_DIR / cell_id
|
|
cell_dir.mkdir(exist_ok=True)
|
|
enc_dest = cell_dir / f"{cell_id}.000"
|
|
shutil.copy2(enc_path, enc_dest)
|
|
_build_cache(cell_id, enc_dest)
|
|
_ensure_meta(cell_id)
|
|
return cell_id
|
|
|
|
|
|
def _build_cache(cell_id: str, enc_path: Path):
|
|
log.info("Parsing ENC %s …", cell_id)
|
|
cell_dir = CHARTS_DIR / cell_id
|
|
|
|
features = _parse_cell(enc_path)
|
|
with open(cell_dir / "features.geojson", "w") as f:
|
|
json.dump({"type": "FeatureCollection", "features": features}, f)
|
|
log.info("ENC %s → %d aid features", cell_id, len(features))
|
|
|
|
depths = _parse_depths(enc_path)
|
|
with open(cell_dir / "depths.geojson", "w") as f:
|
|
json.dump({"type": "FeatureCollection", "features": depths}, f)
|
|
log.info("ENC %s → %d depth features", cell_id, len(depths))
|
|
|
|
land = _parse_land(enc_path)
|
|
with open(cell_dir / "land.geojson", "w") as f:
|
|
json.dump({"type": "FeatureCollection", "features": land}, f)
|
|
log.info("ENC %s → %d land features", cell_id, len(land))
|
|
|
|
hazards = _parse_hazards(enc_path)
|
|
with open(cell_dir / "hazards.geojson", "w") as f:
|
|
json.dump({"type": "FeatureCollection", "features": hazards}, f)
|
|
log.info("ENC %s → %d hazard features", cell_id, len(hazards))
|
|
|
|
zones = _parse_zones(enc_path)
|
|
with open(cell_dir / "zones.geojson", "w") as f:
|
|
json.dump({"type": "FeatureCollection", "features": zones}, f)
|
|
log.info("ENC %s → %d zone features", cell_id, len(zones))
|
|
|
|
# Cache count and bbox in meta.json so list_cells() doesn't need to read features.geojson
|
|
bbox = None
|
|
if features:
|
|
try:
|
|
lons = [f["geometry"]["coordinates"][0] for f in features]
|
|
lats = [f["geometry"]["coordinates"][1] for f in features]
|
|
bbox = [min(lons), min(lats), max(lons), max(lats)]
|
|
except Exception:
|
|
pass
|
|
meta = get_meta(cell_id)
|
|
meta["feature_count"] = len(features)
|
|
meta["bbox"] = bbox
|
|
_meta_path(cell_dir).write_text(json.dumps(meta))
|
|
|
|
|
|
def list_cells() -> list[dict]:
|
|
cells = []
|
|
for cell_dir in sorted(CHARTS_DIR.iterdir()):
|
|
if not cell_dir.is_dir():
|
|
continue
|
|
# Skip leftover temp directories from failed installs
|
|
if cell_dir.name.startswith("TMP"):
|
|
continue
|
|
enc = list(cell_dir.glob("*.000"))
|
|
cache = cell_dir / "features.geojson"
|
|
meta = get_meta(cell_dir.name)
|
|
# Use cached count/bbox from meta.json (fast); fallback reads features.geojson
|
|
count = meta.get("feature_count")
|
|
bbox = meta.get("bbox")
|
|
if count is None and cache.exists():
|
|
try:
|
|
fc = json.loads(cache.read_text())
|
|
feats = fc.get("features") or []
|
|
count = len(feats)
|
|
if feats:
|
|
lons = [f["geometry"]["coordinates"][0] for f in feats]
|
|
lats = [f["geometry"]["coordinates"][1] for f in feats]
|
|
bbox = [min(lons), min(lats), max(lons), max(lats)]
|
|
except Exception:
|
|
count = 0
|
|
cells.append({
|
|
"id": cell_dir.name,
|
|
"enc_file": enc[0].name if enc else None,
|
|
"features": count or 0,
|
|
"cached": cache.exists(),
|
|
"bbox": bbox,
|
|
"region": get_region(cell_dir.name),
|
|
})
|
|
return cells
|
|
|
|
|
|
def delete_cell(cell_id: str):
|
|
cell_dir = CHARTS_DIR / cell_id.upper()
|
|
if cell_dir.exists():
|
|
shutil.rmtree(cell_dir)
|
|
|
|
|
|
def get_all_features() -> dict:
|
|
all_features = []
|
|
for cell_dir in CHARTS_DIR.iterdir():
|
|
cache = cell_dir / "features.geojson"
|
|
if not cache.exists():
|
|
continue
|
|
try:
|
|
fc = json.loads(cache.read_text())
|
|
except Exception:
|
|
continue
|
|
cell_id = cell_dir.name
|
|
region = get_region(cell_id)
|
|
for f in (fc.get("features") or []):
|
|
p = f.setdefault("properties", {})
|
|
p["cell"] = cell_id
|
|
p["cell_region"] = region
|
|
# Backfill aid_type for old caches that pre-date the classifier.
|
|
if "aid_type" not in p:
|
|
p["aid_type"] = classify(p.get("layer", ""), p)
|
|
all_features.extend(fc["features"])
|
|
return {"type": "FeatureCollection", "features": all_features}
|
|
|
|
|
|
def _aggregate_cache(filename: str, bbox=None) -> dict:
|
|
"""Generic aggregator: read <filename> from every installed cell."""
|
|
all_features = []
|
|
if bbox is not None:
|
|
w, s, e, n = bbox
|
|
for cell_dir in CHARTS_DIR.iterdir():
|
|
cache = cell_dir / filename
|
|
if not cache.exists():
|
|
continue
|
|
try:
|
|
fc = json.loads(cache.read_text())
|
|
except Exception:
|
|
continue
|
|
for f in (fc.get("features") or []):
|
|
p = f.setdefault("properties", {})
|
|
p["cell"] = cell_dir.name
|
|
if bbox is not None:
|
|
fb = p.get("bbox")
|
|
if fb and (fb[2] < w or fb[0] > e or fb[3] < s or fb[1] > n):
|
|
continue
|
|
all_features.append(f)
|
|
return {"type": "FeatureCollection", "features": all_features}
|
|
|
|
|
|
def get_all_depths(bbox: tuple[float, float, float, float] | None = None) -> dict:
|
|
return _aggregate_cache("depths.geojson", bbox)
|
|
|
|
def get_all_land(bbox: tuple[float, float, float, float] | None = None) -> dict:
|
|
return _aggregate_cache("land.geojson", bbox)
|
|
|
|
def get_all_hazards() -> dict:
|
|
return _aggregate_cache("hazards.geojson")
|
|
|
|
def get_all_zones(bbox: tuple[float, float, float, float] | None = None) -> dict:
|
|
return _aggregate_cache("zones.geojson", bbox)
|