""" S-57 ENC chart manager — parse, store and serve chart features as GeoJSON. """ import json import logging import math import shutil import zipfile from pathlib import Path import geopandas as gpd import pandas as pd log = logging.getLogger(__name__) CHARTS_DIR = Path(__file__).parent.parent.parent / "charts" CHARTS_DIR.mkdir(exist_ok=True) # S-57 layers we care about, mapped to a friendly category S57_LAYERS = { "LIGHTS": "light", "BOYLAT": "buoy", "BOYCAR": "buoy", "BOYISD": "buoy", "BOYSPP": "buoy", "BOYSAW": "buoy", "BCNLAT": "beacon", "BCNCAR": "beacon", "BCNISD": "beacon", "BCNWTW": "beacon", "LNDMRK": "landmark", "RTPBCN": "beacon", "LAKARE": "area", # Lake / lagoon / ciénaga area # BUISGL is rendered as polygon footprint via _parse_land → landLayer. # Removed from S57_LAYERS to avoid a duplicate centroid-point beacon dot. } # Human-readable light character codes LITCHR = { 1:"F", 2:"Fl", 3:"LFl", 4:"Q", 5:"VQ", 6:"UQ", 7:"Iso", 8:"Oc", 9:"IQ", 10:"IVQ", 11:"IUQ", 12:"Mo", 13:"FFl", 14:"FlLFl", 15:"OcFl", 16:"FLFl", 25:"Al.Oc", 26:"Al.LFl", 27:"Al.Fl", 28:"Al.Grp", } COLOUR = { 1:"W", 2:"K", 3:"R", 4:"G", 5:"B", 6:"Y", 7:"Gy", 8:"Br", 9:"Amb", 10:"Vi", 11:"Or", 12:"Mg", } BOYSHP = { 1:"Conical(N)",2:"Can(C)",3:"Spherical",4:"Pillar", 5:"Spar",6:"Barrel",7:"Super-buoy",8:"Ice buoy", } # Cardinal category (CATCAM) per IHO S-57 Object Catalogue CATCAM = {1:"N", 2:"E", 3:"S", 4:"W"} # IALA region defaults by chart producer prefix (cell ID first 2 chars) REGION_A_PREFIXES = { "GB","FR","DE","NL","BE","DK","NO","SE","FI","IS","IE","ES","PT","IT", "GR","TR","RU","UA","HR","CY","MT","EE","LV","LT","PL","RO","BG","SI", "MA","DZ","TN","LY","EG","ZA","NG","SN","CI","KE","TZ","MZ", "IN","PK","BD","LK","MM","TH","ID","MY","SG","VN","CN","HK","TW", "AE","SA","OM","IR","IQ","KW","QA","BH","YE", "AU","NZ","PG", } REGION_B_PREFIXES = { "US","CA","MX","BS","CU","JM","HT","DO","PR","VI", "BZ","GT","SV","HN","NI","CR","PA", "CO","VE","EC","PE","CL","AR","UY","BR","GY","SR","PY","BO", "JP","KR","KP","PH", } def auto_region(cell_id: str) -> str: prefix = (cell_id or "").upper()[:2] if prefix in REGION_A_PREFIXES: return "A" if prefix in REGION_B_PREFIXES: return "B" return "B" def _meta_path(cell_dir: Path) -> Path: return cell_dir / "meta.json" def get_meta(cell_id: str) -> dict: cell_dir = CHARTS_DIR / cell_id.upper() p = _meta_path(cell_dir) if p.exists(): try: return json.loads(p.read_text()) except Exception: pass return {} def set_meta(cell_id: str, **patch) -> dict: cell_dir = CHARTS_DIR / cell_id.upper() if not cell_dir.exists(): raise FileNotFoundError(cell_id) meta = get_meta(cell_id) meta.update(patch) _meta_path(cell_dir).write_text(json.dumps(meta)) return meta def get_region(cell_id: str) -> str: return get_meta(cell_id).get("region") or auto_region(cell_id) # ── Aid classification ──────────────────────────────────────────────────── # Canonical aid types — derived from S-57 layer + attributes. # Frontend dispatches symbol drawing on this; renders are region-aware # only for LATERAL_* (which depend on IALA A vs B for color). def classify(layer: str, props: dict) -> str: catlam = props.get("catlam") catcam = props.get("catcam") if layer in ("BOYLAT", "BCNLAT"): if catlam == 1: return "LATERAL_PORT" if catlam == 2: return "LATERAL_STBD" if catlam == 3: return "LATERAL_PREF_STBD" # preferred channel to STBD (acts port) if catlam == 4: return "LATERAL_PREF_PORT" # preferred channel to PORT (acts stbd) # Fallback: infer from primary colour (IALA B — Americas: green=port, red=stbd) colour = props.get("colour_code") if colour == 4: return "LATERAL_PORT" # green if colour == 3: return "LATERAL_STBD" # red return "LATERAL_UNKNOWN" if layer in ("BOYCAR", "BCNCAR"): if catcam == "N": return "CARDINAL_N" if catcam == "E": return "CARDINAL_E" if catcam == "S": return "CARDINAL_S" if catcam == "W": return "CARDINAL_W" return "CARDINAL_UNKNOWN" if layer in ("BOYISD", "BCNISD"): return "ISOLATED_DANGER" if layer == "BOYSAW": return "SAFE_WATER" if layer == "BOYSPP": return "SPECIAL" if layer == "LIGHTS": return "LIGHT_POINT" if layer == "LNDMRK": return "LANDMARK" if layer == "RTPBCN": return "RACON" if layer.startswith("BCN"): return "BEACON_GENERIC" if layer.startswith("BOY"): return "BUOY_GENERIC" return "UNKNOWN" # Topmark shape (TOPSHP) TOPSHP = { 1:"cone_up",2:"cone_down",3:"sphere",4:"X",5:"cross", 6:"can",7:"T",8:"triangle_up",9:"triangle_down", 10:"circle",11:"rectangle",12:"board",13:"diamond", 14:"diamond_2",15:"square",16:"rectangle_2",17:"pennant", 18:"pennant_2",19:"pennant_3",20:"pennant_4", 21:"2sphere",22:"2cone_up",23:"2cone_down", 24:"cone_up_sphere",25:"2cone_base_up",26:"2cone_base_down", 27:"2cone_point",28:"besom_up",29:"besom_down", 30:"flag",31:"sphere_2",32:"2cone_base_apart", 33:"board_2", } def _safe_list(val) -> list: """Return a plain Python list from a pandas/numpy array-like value. GDAL/pyogrio surfaces multi-valued S-57 attributes (like COLOUR) as comma-separated STRINGS (e.g. "4" or "1,4"), not as Python lists. Without the string branch we silently drop colour info.""" if val is None: return [] try: if hasattr(val, 'tolist'): val = val.tolist() if isinstance(val, str): return [int(x) for x in val.split(",") if x.strip()] if isinstance(val, (list, tuple)): return [int(v) for v in val if v is not None] return [int(val)] except Exception: return [] def _safe(val): """Return a plain Python scalar from any pandas/numpy value.""" if val is None: return None # numpy array or list with multiple elements — take first if hasattr(val, '__len__') and not isinstance(val, str): if len(val) == 0: return None val = val[0] # numpy scalar if hasattr(val, 'item'): try: val = val.item() except Exception: val = val.tolist() # NaN check after extraction try: if isinstance(val, float) and pd.isna(val): return None except Exception: pass return val def _light_desc(row) -> str: parts = [] chr_code = _safe(row.get("LITCHR")) grp = _safe(row.get("SIGGRP")) col_list = _safe(row.get("COLOUR")) period = _safe(row.get("SIGPER")) rng = _safe(row.get("VALNMR")) lc = LITCHR.get(int(chr_code), str(chr_code)) if chr_code else "" if grp: lc = f"{lc}({grp})" colours = "" if col_list: try: nums = [int(x) for x in str(col_list).split(",") if x.strip()] colours = ".".join(COLOUR.get(n, "?") for n in nums) except Exception: pass if lc: parts.append(f"{lc} {colours}".strip()) if period: parts.append(f"{period}s") if rng: parts.append(f"{rng}M") return " ".join(parts) def _parse_cell(enc_path: Path) -> list[dict]: features = [] # ── Build land union for LDLINE clipping ───────────────────────────────── # Leading lines must stop at the riverbank / land boundary, not extend # indefinitely. We union all land polygons in the cell and use them to # clip each LDLINE so it terminates where it hits the shore. from shapely.ops import unary_union from shapely.geometry import LineString as SLineString, Point as SPoint _land_clip = None for _ll in ("LNDARE", "BUAARE"): try: _gdf_l = gpd.read_file(str(enc_path), layer=_ll, engine="pyogrio") if _gdf_l.empty: continue _gdf_l = _gdf_l.to_crs(epsg=4326) _polys = [g for g in _gdf_l.geometry if g is not None and not g.is_empty and g.geom_type in ("Polygon", "MultiPolygon")] if _polys: _u = unary_union(_polys).buffer(0) _land_clip = _u if _land_clip is None else _land_clip.union(_u) except Exception: pass for layer_name, category in S57_LAYERS.items(): try: gdf = gpd.read_file(str(enc_path), layer=layer_name, engine="pyogrio") except Exception: continue if gdf.empty: continue gdf = gdf.to_crs(epsg=4326) for _, row in gdf.iterrows(): geom = row.geometry if geom is None or geom.is_empty: continue # Use centroid for all geometry types pt = geom.centroid if geom.geom_type != "Point" else geom colours = _safe_list(row.get("COLOUR")) # e.g. [3,4] = Red+Green props = { "layer": layer_name, "category": category, "name": _safe(row.get("OBJNAM")), "info": _safe(row.get("INFORM")), "height_m": _safe(row.get("HEIGHT")), "range_nm": _safe(row.get("VALNMR")), "status": _safe(row.get("STATUS")), "colours": colours, # list of S-57 colour codes "colour_code": colours[0] if colours else None, # primary colour } # Extract light description for ALL navigational aids and lights. # Previously only BOY* layers were included, so BCNLAT/BCNWTW/LNDMRK # silently lost their LITCHR/SIGPER/VALNMR — fixed here. if category in ("light", "buoy", "beacon", "landmark"): props["light_desc"] = _light_desc(row) if layer_name.startswith("BOY") or layer_name.startswith("BCN"): shape = _safe(row.get("BOYSHP")) props["boyshp"] = int(shape) if shape else None # raw code for JS props["shape"] = BOYSHP.get(int(shape), None) if shape else None topshp = _safe(row.get("TOPSHP")) props["topshp"] = int(topshp) if topshp else None if layer_name in ("BOYCAR", "BCNCAR"): catcam = _safe(row.get("CATCAM")) props["catcam"] = CATCAM.get(int(catcam), None) if catcam else None if layer_name in ("BOYLAT", "BCNLAT"): catlam = _safe(row.get("CATLAM")) # 1=port, 2=stbd, 3=pref-stbd, 4=pref-port props["catlam"] = int(catlam) if catlam else None # ORIENT — bearing of the leading line in True degrees orient_raw = _safe(row.get("ORIENT")) if orient_raw: try: props["orient"] = float(orient_raw) except ValueError: pass props["aid_type"] = classify(layer_name, props) features.append({ "type": "Feature", "geometry": {"type": "Point", "coordinates": [pt.x, pt.y]}, "properties": {k: v for k, v in props.items() if v is not None}, }) # ── Leading-line bearing projection for lights with ORIENT ──────── # Project a line from the mark in the ORIENT direction, then clip it # against the land union so it terminates at the riverbank/shoreline. # A long initial line (5 NM) is used so the clip always finds the shore. if "orient" in props: bearing_deg = props["orient"] bearing_rad = math.radians(bearing_deg) lat_rad = math.radians(pt.y) cos_lat = math.cos(lat_rad) # Initial generous length (5 NM) — will be clipped to land boundary PROBE_NM = 5.0 d_lat = (PROBE_NM / 60.0) * math.cos(bearing_rad) d_lon = (PROBE_NM / 60.0) * math.sin(bearing_rad) / max(cos_lat, 1e-6) end = [pt.x + d_lon, pt.y + d_lat] # Densify the probe line (10 intermediate points) so the clip is accurate n_pts = 12 ldcoords = [ [pt.x + d_lon * i / n_pts, pt.y + d_lat * i / n_pts] for i in range(n_pts + 1) ] # Clip against land — the leading line must not cross land if _land_clip is not None: try: probe = SLineString(ldcoords) in_water = probe.difference(_land_clip) if in_water.is_empty: # Mark is on land — use minimum stub ldcoords = [[pt.x, pt.y], end] else: # Keep only the segment that touches the mark start_pt = SPoint(pt.x, pt.y) geoms = (list(in_water.geoms) if in_water.geom_type.startswith("Multi") else [in_water]) # Sort by distance of each segment's first point to mark geoms.sort(key=lambda g: start_pt.distance(SPoint(g.coords[0]))) # Take the first (nearest to mark) segment seg = geoms[0] ldcoords = [list(c[:2]) for c in seg.coords] except Exception: pass # fall through to unclipped coords features.append({ "type": "Feature", "geometry": { "type": "LineString", "coordinates": ldcoords, }, "properties": { "layer": "LDLINE", "category": "ldline", "aid_type": "LEADING_LINE", "name": props.get("name", ""), "orient": bearing_deg, "colours": colours, }, }) # ── Proximity merge: copy light attrs from LIGHTS → co-located buoys/beacons ── # In S-57 a LIGHTS object and a BOYLAT object may share the same coordinates. # The LIGHTS carries LITCHR/VALNMR/HEIGHT/COLOUR; the BOYLAT carries BOYSHP/CATLAM. # We find the nearest LIGHTS within 50 m and backfill missing attrs on the host. MERGE_DEG = 0.00045 # ≈ 50 m at equator (1° lat ≈ 111 km) light_feats = [f for f in features if f["properties"].get("layer") == "LIGHTS" and f["geometry"]["type"] == "Point"] struct_feats = [f for f in features if (f["properties"].get("layer","").startswith("BOY") or f["properties"].get("layer","").startswith("BCN")) and f["geometry"]["type"] == "Point"] for lf in light_feats: lc = lf["geometry"]["coordinates"] lp = lf["properties"] best_host, best_dist = None, MERGE_DEG for bf in struct_feats: bc = bf["geometry"]["coordinates"] d = ((lc[0] - bc[0])**2 + (lc[1] - bc[1])**2) ** 0.5 if d < best_dist: best_dist, best_host = d, bf if best_host: hp = best_host["properties"] if lp.get("light_desc") and not hp.get("light_desc"): hp["light_desc"] = lp["light_desc"] if lp.get("range_nm") and not hp.get("range_nm"): hp["range_nm"] = lp["range_nm"] if lp.get("height_m") and not hp.get("height_m"): hp["height_m"] = lp["height_m"] if lp.get("colours") and not hp.get("colours"): hp["colours"] = lp["colours"] hp["colour_code"] = lp.get("colour_code") # Also sync orient for leading lines if the buoy didn't have it if lp.get("orient") is not None and hp.get("orient") is None: hp["orient"] = lp["orient"] return features def _read_depth_unit(enc_path: Path) -> str: """Read DSPM_DUNI from the S-57 dataset metadata. 1=METERS, 2=FEET, 3=FATHOMS.""" try: from osgeo import gdal ds = gdal.OpenEx(str(enc_path)) if ds: md = ds.GetMetadata() or {} duni = md.get("DSPM_DUNI") or md.get("S57_DUNI") if duni: return {"1": "METERS", "2": "FEET", "3": "FATHOMS"}.get(str(duni), "METERS") except Exception as e: log.debug("Could not read DUNI from %s: %s", enc_path, e) return "METERS" # NOAA / IHO default def _bbox_of_coords(coords) -> list[float]: xs, ys = [], [] def walk(c): if isinstance(c[0], (int, float)): xs.append(c[0]); ys.append(c[1]) else: for sub in c: walk(sub) walk(coords) return [min(xs), min(ys), max(xs), max(ys)] def _parse_depths(enc_path: Path) -> list[dict]: """Extract bathymetry from S-57 cell. SOUNDG: MultiPoint Z (sounding points; depth in geometry's Z value) DEPCNT: LineString (contour at VALDCO meters) DEPARE: Polygon (depth area between DRVAL1 and DRVAL2 meters) Each feature gets a `bbox` property [minx, miny, maxx, maxy] so the /charts/depths endpoint can do fast viewport filtering without touching the geometries. The depth unit (METERS/FEET/FATHOMS) read from the cell's DSPM record is attached to every feature so the frontend legend can display it without a separate request. """ out = [] unit = _read_depth_unit(enc_path) # METERS / FEET / FATHOMS # Build a UNIFIED land mask from every S-57 layer that represents # something a depth feature shouldn't cover. NOAA puts urban islands / # reclaimed land in BUAARE, dykes in DAMCON, dry docks in DRYDOC, etc. # — using only LNDARE leaves visible bleed. from shapely.ops import unary_union from shapely.prepared import prep LAND_LAYERS = ("LNDARE", "BUAARE", "DAMCON", "DRYDOC", "PONTON", "LNDRGN") land_polys = [] for ll in LAND_LAYERS: try: gdf = gpd.read_file(str(enc_path), layer=ll, engine="pyogrio") if gdf.empty: continue gdf = gdf.to_crs(epsg=4326) land_polys.extend(g for g in gdf.geometry if g is not None and not g.is_empty and g.geom_type in ("Polygon", "MultiPolygon")) except Exception: pass land_union = None land_prepared = None if land_polys: try: land_union = unary_union(land_polys).buffer(0) land_prepared = prep(land_union) # Also emit the union as a LANDMASK feature so the frontend can # paint land in the chart's own colour when OSM is hidden. geoms = list(land_union.geoms) if land_union.geom_type.startswith("Multi") else [land_union] for g in geoms: if g.geom_type != "Polygon" or g.is_empty: continue rings = [[list(c[:2]) for c in g.exterior.coords]] rings.extend([list(c[:2]) for c in r.coords] for r in g.interiors) out.append({ "type": "Feature", "geometry": {"type": "Polygon", "coordinates": rings}, "properties": {"layer": "LANDMASK", "bbox": _bbox_of_coords(rings)}, }) except Exception as e: log.warning("Land union failed for %s: %s", enc_path, e) # ── SOUNDG (drop sounding points that fall on land) ── try: gdf = gpd.read_file(str(enc_path), layer="SOUNDG", engine="pyogrio") if not gdf.empty: gdf = gdf.to_crs(epsg=4326) for _, row in gdf.iterrows(): geom = row.geometry if geom is None or geom.is_empty: continue geoms = list(geom.geoms) if geom.geom_type.startswith("Multi") else [geom] for g in geoms: if not getattr(g, "has_z", False): continue if land_prepared is not None and land_prepared.contains(g): continue # sounding lies on land — skip out.append({ "type": "Feature", "geometry": {"type": "Point", "coordinates": [g.x, g.y]}, "properties": {"layer": "SOUNDG", "depth": round(g.z, 1), "unit": unit, "bbox": [g.x, g.y, g.x, g.y]}, }) except Exception: pass # ── DEPCNT (clip contour lines so they don't cross land) ── try: gdf = gpd.read_file(str(enc_path), layer="DEPCNT", engine="pyogrio") if not gdf.empty: gdf = gdf.to_crs(epsg=4326) for _, row in gdf.iterrows(): geom = row.geometry if geom is None or geom.is_empty: continue depth = _safe(row.get("VALDCO")) if land_union is not None and geom.intersects(land_union): try: geom = geom.difference(land_union) except Exception: pass if geom.is_empty: continue geoms = list(geom.geoms) if geom.geom_type.startswith("Multi") else [geom] for g in geoms: if g.geom_type != "LineString" or g.is_empty: continue coords = [list(c[:2]) for c in g.coords] if len(coords) < 2: continue out.append({ "type": "Feature", "geometry": {"type": "LineString", "coordinates": coords}, "properties": {"layer": "DEPCNT", "depth": round(float(depth), 1) if depth is not None else None, "unit": unit, "bbox": _bbox_of_coords(coords)}, }) except Exception: pass def _emit_polygon_feature(g, props): """Write one Polygon (single-part) feature, with bbox + props.""" if g.geom_type != "Polygon" or g.is_empty: return rings = [[list(c[:2]) for c in g.exterior.coords]] rings.extend([list(c[:2]) for c in r.coords] for r in g.interiors) out.append({ "type": "Feature", "geometry": {"type": "Polygon", "coordinates": rings}, "properties": {**props, "bbox": _bbox_of_coords(rings)}, }) # ── DEPARE (clipped to remove land overlap) ── try: gdf = gpd.read_file(str(enc_path), layer="DEPARE", engine="pyogrio") if not gdf.empty: gdf = gdf.to_crs(epsg=4326) for _, row in gdf.iterrows(): geom = row.geometry if geom is None or geom.is_empty: continue dmin = _safe(row.get("DRVAL1")) dmax = _safe(row.get("DRVAL2")) if land_union is not None and geom.intersects(land_union): try: geom = geom.difference(land_union) except Exception: pass if geom.is_empty: continue props = { "layer": "DEPARE", "depth_min": round(float(dmin), 1) if dmin is not None else None, "depth_max": round(float(dmax), 1) if dmax is not None else None, "unit": unit, } geoms = list(geom.geoms) if geom.geom_type.startswith("Multi") else [geom] for g in geoms: _emit_polygon_feature(g, props) except Exception: pass return out def _emit_geometry(geom, props: dict) -> list[dict]: """Flatten a shapely geometry into GeoJSON Feature dicts with bbox. Handles Point / LineString / Polygon and their Multi* variants.""" out = [] geoms = list(geom.geoms) if geom.geom_type.startswith("Multi") else [geom] for g in geoms: if g.is_empty: continue if g.geom_type == "Point": coords = [g.x, g.y] gtype = "Point" bbox = [g.x, g.y, g.x, g.y] elif g.geom_type == "LineString": coords = [list(c[:2]) for c in g.coords] gtype = "LineString" bbox = _bbox_of_coords(coords) elif g.geom_type == "Polygon": rings = [[list(c[:2]) for c in g.exterior.coords]] rings += [[list(c[:2]) for c in r.coords] for r in g.interiors] coords = rings gtype = "Polygon" bbox = _bbox_of_coords(coords) else: continue out.append({ "type": "Feature", "geometry": {"type": gtype, "coordinates": coords}, "properties": {**{k: v for k, v in props.items() if v is not None}, "bbox": bbox}, }) return out # ── Terrain / land features ─────────────────────────────────────────────────── _LAND_POLY_LAYERS = {"LNDARE": "land_area", "BUAARE": "built_up_area", "BUISGL": "building"} # building footprints as polygons _LAND_LINE_LAYERS = {"COALNE": "coastline"} _LAND_POINT_LAYERS = {"BRIDGE": "bridge", "HRBFAC": "harbour", "SILTNK": "silo_tank"} def _parse_land(enc_path: Path) -> list[dict]: out = [] for layer_name, category in _LAND_POLY_LAYERS.items(): try: gdf = gpd.read_file(str(enc_path), layer=layer_name, engine="pyogrio") except Exception: continue if gdf.empty: continue gdf = gdf.to_crs(epsg=4326) for _, row in gdf.iterrows(): geom = row.geometry if geom is None or geom.is_empty: continue props = {"layer": layer_name, "category": category, "name": _safe(row.get("OBJNAM"))} out.extend(_emit_geometry(geom, props)) for layer_name, category in _LAND_LINE_LAYERS.items(): try: gdf = gpd.read_file(str(enc_path), layer=layer_name, engine="pyogrio") except Exception: continue if gdf.empty: continue gdf = gdf.to_crs(epsg=4326) for _, row in gdf.iterrows(): geom = row.geometry if geom is None or geom.is_empty: continue props = {"layer": layer_name, "category": category, "name": _safe(row.get("OBJNAM"))} out.extend(_emit_geometry(geom, props)) for layer_name, category in _LAND_POINT_LAYERS.items(): try: gdf = gpd.read_file(str(enc_path), layer=layer_name, engine="pyogrio") except Exception: continue if gdf.empty: continue gdf = gdf.to_crs(epsg=4326) for _, row in gdf.iterrows(): geom = row.geometry if geom is None or geom.is_empty: continue pt = geom.centroid if geom.geom_type != "Point" else geom props = {"layer": layer_name, "category": category, "name": _safe(row.get("OBJNAM")), "height_m": _safe(row.get("HEIGHT"))} if layer_name == "BRIDGE": props["horclr"] = _safe(row.get("HORCLR")) props["verclr"] = _safe(row.get("VERCLR")) out.append({"type": "Feature", "geometry": {"type": "Point", "coordinates": [pt.x, pt.y]}, "properties": {k: v for k, v in props.items() if v is not None}}) return out # ── Navigational hazards ────────────────────────────────────────────────────── _HAZARD_LAYERS = {"WRECKS": "wreck", "OBSTRN": "obstruction", "UWTROC": "rock"} def _parse_hazards(enc_path: Path) -> list[dict]: out = [] for layer_name, category in _HAZARD_LAYERS.items(): try: gdf = gpd.read_file(str(enc_path), layer=layer_name, engine="pyogrio") except Exception: continue if gdf.empty: continue gdf = gdf.to_crs(epsg=4326) for _, row in gdf.iterrows(): geom = row.geometry if geom is None or geom.is_empty: continue pt = geom.centroid if geom.geom_type != "Point" else geom props = {"layer": layer_name, "category": category, "name": _safe(row.get("OBJNAM")), "depth": _safe(row.get("VALSOU"))} if layer_name == "WRECKS": props["catwrk"] = _safe(row.get("CATWRK")) if layer_name == "UWTROC": props["catuwr"] = _safe(row.get("CATUWR")) if layer_name == "OBSTRN": props["catobs"] = _safe(row.get("CATOBS")) out.append({"type": "Feature", "geometry": {"type": "Point", "coordinates": [pt.x, pt.y]}, "properties": {k: v for k, v in props.items() if v is not None}}) return out # ── Navigation zones / areas ────────────────────────────────────────────────── _ZONE_LAYERS = { "RESARE": "restricted", "CTNARE": "caution", "ACHARE": "anchorage", "TSSLPT": "traffic_lane","PRCARE": "precautionary","FAIRWY": "fairway", "DMPGRD": "dumpground", "PIPARE": "pipeline_area", } def _parse_zones(enc_path: Path) -> list[dict]: out = [] for layer_name, category in _ZONE_LAYERS.items(): try: gdf = gpd.read_file(str(enc_path), layer=layer_name, engine="pyogrio") except Exception: continue if gdf.empty: continue gdf = gdf.to_crs(epsg=4326) for _, row in gdf.iterrows(): geom = row.geometry if geom is None or geom.is_empty: continue props = {"layer": layer_name, "category": category, "name": _safe(row.get("OBJNAM")), "info": _safe(row.get("INFORM"))} if layer_name == "RESARE": props["catrea"] = _safe(row.get("CATREA")) props["restrn"] = _safe_list(row.get("RESTRN")) if layer_name == "ACHARE": props["catach"] = _safe(row.get("CATACH")) out.extend(_emit_geometry(geom, props)) return out def _ensure_meta(cell_id: str): """Write meta.json with auto-detected region if not already set.""" meta = get_meta(cell_id) if "region" not in meta: set_meta(cell_id, region=auto_region(cell_id)) def install_from_zip(zip_path: Path) -> list[str]: installed = [] with zipfile.ZipFile(zip_path) as z: enc_files = [n for n in z.namelist() if n.upper().endswith(".000")] if not enc_files: raise ValueError("No .000 ENC files found in ZIP") for enc_name in enc_files: cell_id = Path(enc_name).stem.upper() cell_dir = CHARTS_DIR / cell_id cell_dir.mkdir(exist_ok=True) enc_dest = cell_dir / f"{cell_id}.000" with z.open(enc_name) as src, open(enc_dest, "wb") as dst: shutil.copyfileobj(src, dst) _build_cache(cell_id, enc_dest) _ensure_meta(cell_id) installed.append(cell_id) return installed def _read_cell_id(enc_path: Path) -> str: """Lee el nombre de celda real del header DSID del S-57. Si el archivo tiene nombre temporal (TMP*), usa DSID_DSNM o DSID_DSNM como fallback.""" stem = enc_path.stem.upper() # Si el nombre ya parece un cell ID válido (no empieza con TMP), usarlo directamente if not stem.startswith("TMP"): return stem # Intentar leer DSNM del header S-57 via GDAL try: from osgeo import gdal ds = gdal.OpenEx(str(enc_path)) if ds: md = ds.GetMetadata() or {} dsnm = (md.get("DSID_DSNM") or "").strip().upper() if dsnm and not dsnm.startswith("TMP"): return dsnm except Exception as e: log.debug("Could not read DSNM from %s: %s", enc_path, e) # Fallback: usar el stem aunque sea TMP (se mostrará en lista) return stem def install_from_enc(enc_path: Path, orig_name: str | None = None) -> str: # Preferir el nombre original del archivo (sin extensión) como cell_id. # Si no viene o empieza con TMP, intentar leerlo del header S-57. if orig_name and not orig_name.upper().startswith("TMP"): cell_id = orig_name.upper() else: cell_id = _read_cell_id(enc_path) cell_dir = CHARTS_DIR / cell_id cell_dir.mkdir(exist_ok=True) enc_dest = cell_dir / f"{cell_id}.000" shutil.copy2(enc_path, enc_dest) _build_cache(cell_id, enc_dest) _ensure_meta(cell_id) return cell_id def _build_cache(cell_id: str, enc_path: Path): log.info("Parsing ENC %s …", cell_id) cell_dir = CHARTS_DIR / cell_id features = _parse_cell(enc_path) with open(cell_dir / "features.geojson", "w") as f: json.dump({"type": "FeatureCollection", "features": features}, f) log.info("ENC %s → %d aid features", cell_id, len(features)) depths = _parse_depths(enc_path) with open(cell_dir / "depths.geojson", "w") as f: json.dump({"type": "FeatureCollection", "features": depths}, f) log.info("ENC %s → %d depth features", cell_id, len(depths)) land = _parse_land(enc_path) land_path = cell_dir / "land.geojson" # Preserve existing land cache when the .000 has no LNDARE layer # (e.g. custom charts built from CSV that only contain nav aids) if land or not land_path.exists(): with open(land_path, "w") as f: json.dump({"type": "FeatureCollection", "features": land}, f) log.info("ENC %s → %d land features%s", cell_id, len(land), " (preserved existing)" if not land and land_path.exists() else "") hazards = _parse_hazards(enc_path) hazards_path = cell_dir / "hazards.geojson" if hazards or not hazards_path.exists(): with open(hazards_path, "w") as f: json.dump({"type": "FeatureCollection", "features": hazards}, f) log.info("ENC %s → %d hazard features", cell_id, len(hazards)) zones = _parse_zones(enc_path) zones_path = cell_dir / "zones.geojson" if zones or not zones_path.exists(): with open(zones_path, "w") as f: json.dump({"type": "FeatureCollection", "features": zones}, f) log.info("ENC %s → %d zone features", cell_id, len(zones)) # Cache count and bbox in meta.json so list_cells() doesn't need to read features.geojson bbox = None if features: try: lons = [f["geometry"]["coordinates"][0] for f in features] lats = [f["geometry"]["coordinates"][1] for f in features] bbox = [min(lons), min(lats), max(lons), max(lats)] except Exception: pass meta = get_meta(cell_id) meta["feature_count"] = len(features) meta["bbox"] = bbox _meta_path(cell_dir).write_text(json.dumps(meta)) def list_cells() -> list[dict]: cells = [] for cell_dir in sorted(CHARTS_DIR.iterdir()): if not cell_dir.is_dir(): continue # Skip leftover temp directories from failed installs if cell_dir.name.startswith("TMP"): continue enc = list(cell_dir.glob("*.000")) cache = cell_dir / "features.geojson" meta = get_meta(cell_dir.name) # Use cached count/bbox from meta.json (fast); fallback reads features.geojson count = meta.get("feature_count") bbox = meta.get("bbox") if count is None and cache.exists(): try: fc = json.loads(cache.read_text()) feats = fc.get("features") or [] count = len(feats) if feats: lons = [f["geometry"]["coordinates"][0] for f in feats] lats = [f["geometry"]["coordinates"][1] for f in feats] bbox = [min(lons), min(lats), max(lons), max(lats)] except Exception: count = 0 cells.append({ "id": cell_dir.name, "enc_file": enc[0].name if enc else None, "features": count or 0, "cached": cache.exists(), "bbox": bbox, "region": get_region(cell_dir.name), }) return cells def delete_cell(cell_id: str): cell_dir = CHARTS_DIR / cell_id.upper() if cell_dir.exists(): shutil.rmtree(cell_dir) def get_all_features() -> dict: all_features = [] for cell_dir in CHARTS_DIR.iterdir(): cache = cell_dir / "features.geojson" if not cache.exists(): continue try: fc = json.loads(cache.read_text()) except Exception: continue cell_id = cell_dir.name region = get_region(cell_id) for f in (fc.get("features") or []): p = f.setdefault("properties", {}) p["cell"] = cell_id p["cell_region"] = region # Backfill aid_type for old caches that pre-date the classifier. if "aid_type" not in p: p["aid_type"] = classify(p.get("layer", ""), p) all_features.extend(fc["features"]) return {"type": "FeatureCollection", "features": all_features} def _aggregate_cache(filename: str, bbox=None) -> dict: """Generic aggregator: read from every installed cell.""" all_features = [] if bbox is not None: w, s, e, n = bbox for cell_dir in CHARTS_DIR.iterdir(): cache = cell_dir / filename if not cache.exists(): continue try: fc = json.loads(cache.read_text()) except Exception: continue for f in (fc.get("features") or []): p = f.setdefault("properties", {}) p["cell"] = cell_dir.name if bbox is not None: fb = p.get("bbox") if fb and (fb[2] < w or fb[0] > e or fb[3] < s or fb[1] > n): continue all_features.append(f) return {"type": "FeatureCollection", "features": all_features} def get_all_depths(bbox: tuple[float, float, float, float] | None = None) -> dict: return _aggregate_cache("depths.geojson", bbox) def get_all_land(bbox: tuple[float, float, float, float] | None = None) -> dict: return _aggregate_cache("land.geojson", bbox) def get_all_hazards() -> dict: return _aggregate_cache("hazards.geojson") def get_all_zones(bbox: tuple[float, float, float, float] | None = None) -> dict: return _aggregate_cache("zones.geojson", bbox)