Initial commit — QGIS S-57 Converter

This commit is contained in:
2026-05-04 23:03:19 -04:00
commit eb12a58cb7
41 changed files with 8896 additions and 0 deletions
+881
View File
@@ -0,0 +1,881 @@
#!/usr/bin/env python3
"""
QGIS -> S-57 ENC Converter
Zero external dependencies for geometry: reads SHP/DBF using struct only.
pyproj is used for reprojection (optional — falls back to passthrough if missing).
Usage:
python converter.py myproject.qgs
python converter.py myproject.qgz --output my_chart.000
python converter.py myproject.qgs --list
python converter.py myproject.qgs --config cell_config.json
"""
import argparse
import json
import math
import struct
import sys
import zipfile
import xml.etree.ElementTree as ET
from pathlib import Path
from datetime import datetime
SCRIPT_DIR = Path(__file__).parent
sys.path.insert(0, str(SCRIPT_DIR))
from s57_writer import (
S57Cell,
OBJL_BY_ACRONYM, ATTR_CODE,
OBJL_LIGHTS,
ATTL_CATCOV, ATTL_VALSOU, ATTL_LITCHR, ATTL_COLOUR,
)
try:
from pyproj import CRS, Transformer
PYPROJ_AVAILABLE = True
except Exception:
PYPROJ_AVAILABLE = False
# ── SHP shape-type sets ───────────────────────────────────────────────────────
_PT = {1, 11, 21}
_MPT = {8, 18, 28}
_LINE = {3, 13, 23}
_POLY = {5, 15, 25}
# ── minimal DBF reader (stdlib only) ─────────────────────────────────────────
def _read_dbf(dbf_path: Path):
"""Return (field_names, list_of_dicts) from a dBASE III .dbf file."""
for enc in ("utf-8", "latin-1", "cp1252"):
try:
with open(dbf_path, "rb") as f:
f.read(4) # version + date
nrec = struct.unpack("<I", f.read(4))[0]
hdrsz = struct.unpack("<H", f.read(2))[0]
recsz = struct.unpack("<H", f.read(2))[0]
f.read(20) # reserved
fields = []
while True:
raw = f.read(32)
if not raw or raw[0] == 0x0D:
break
name = raw[:11].rstrip(b"\x00").decode("ascii", errors="replace")
flen = raw[16]
fields.append((name, flen))
f.seek(hdrsz)
rows = []
for _ in range(nrec):
rec = f.read(recsz)
if not rec:
break
if rec[0] == 0x2A: # deleted
continue
row = {}
off = 1
for fname, flen in fields:
raw_val = rec[off:off+flen]
try:
val = raw_val.decode(enc).strip()
except UnicodeDecodeError:
val = raw_val.decode("latin-1").strip()
if val:
row[fname.lower()] = val
off += flen
rows.append(row)
return [f[0].lower() for f in fields], rows
except UnicodeDecodeError:
continue
return [], []
# ── minimal SHP reader (stdlib only) ─────────────────────────────────────────
def _read_shp(shp_path: Path):
"""Yield (shape_type, points, parts) from a .shp file.
points: list of (x, y) tuples
parts: list of part start indices (for Polyline/Polygon)
"""
with open(shp_path, "rb") as f:
f.read(100) # skip file header
while True:
hdr = f.read(8)
if len(hdr) < 8:
break
content_len = struct.unpack(">ii", hdr)[1] * 2
content = f.read(content_len)
if len(content) < 4:
break
stype = struct.unpack_from("<i", content, 0)[0]
if stype == 0:
yield stype, [], []
continue
if stype in _PT:
x, y = struct.unpack_from("<dd", content, 4)
yield stype, [(x, y)], []
elif stype in _MPT:
npts = struct.unpack_from("<i", content, 36)[0]
pts = [struct.unpack_from("<dd", content, 40 + i*16)
for i in range(npts)]
yield stype, pts, []
elif stype in _LINE | _POLY:
nparts = struct.unpack_from("<i", content, 36)[0]
npts = struct.unpack_from("<i", content, 40)[0]
parts = [struct.unpack_from("<i", content, 44 + i*4)[0]
for i in range(nparts)]
off = 44 + nparts * 4
pts = [struct.unpack_from("<dd", content, off + i*16)
for i in range(npts)]
yield stype, pts, parts
# ── signed area (Shoelace) — ESRI outer rings are CW → negative ──────────────
def _signed_area(pts):
n = len(pts)
return sum(
pts[i][0] * pts[(i+1)%n][1] - pts[(i+1)%n][0] * pts[i][1]
for i in range(n)
) / 2.0
# ── Geometry simplification ───────────────────────────────────────────────────
# ISO 8211 leader field = 5 digits → DR max 99,999 bytes.
# SG2D: each coord pair = 8 bytes (2 × int32). Max ~12,000 pairs per record.
# For safety, keep max 8,000 vertices per ring/line with RDP simplification.
_MAX_VERTICES = 8000 # hard limit per ring or line feature
_RDP_TOL_DEG = 1e-5 # ~1 m at equator — safe for 1:50 000 charts
def _perp_dist_sq(p, a, b):
"""Squared perpendicular distance from p to segment a→b (2-D, degrees)."""
ax, ay = a; bx, by = b; px, py = p
dx, dy = bx - ax, by - ay
if dx == 0 and dy == 0:
return (px - ax) ** 2 + (py - ay) ** 2
t = ((px - ax) * dx + (py - ay) * dy) / (dx * dx + dy * dy)
t = max(0.0, min(1.0, t))
return (px - ax - t * dx) ** 2 + (py - ay - t * dy) ** 2
def _rdp(pts, tol_sq):
"""Ramer-Douglas-Peucker (iterative stack version, no recursion limit)."""
if len(pts) <= 2:
return list(pts)
keep = [False] * len(pts)
keep[0] = keep[-1] = True
stack = [(0, len(pts) - 1)]
while stack:
s, e = stack.pop()
if e - s < 2:
continue
max_d, max_i = 0.0, s
for i in range(s + 1, e):
d = _perp_dist_sq(pts[i], pts[s], pts[e])
if d > max_d:
max_d, max_i = d, i
if max_d > tol_sq:
keep[max_i] = True
stack.append((s, max_i))
stack.append((max_i, e))
return [p for p, k in zip(pts, keep) if k]
def _simplify_coords(coords, max_verts=_MAX_VERTICES, tol_deg=_RDP_TOL_DEG):
"""
Simplify a coordinate list so it fits within the ISO 8211 record limit.
1) Apply RDP at tol_deg.
2) If still > max_verts, apply RDP at escalating tolerance until fits.
"""
if len(coords) <= max_verts:
return coords
tol_sq = tol_deg ** 2
result = _rdp(coords, tol_sq)
# Escalate tolerance if still too many
factor = 10.0
while len(result) > max_verts and factor < 1e6:
result = _rdp(coords, (tol_deg * factor) ** 2)
factor *= 10.0
# Last resort: uniform decimation
if len(result) > max_verts:
step = math.ceil(len(result) / max_verts)
result = result[::step]
if result[-1] != coords[-1]:
result.append(coords[-1]) # keep last point
n_in, n_out = len(coords), len(result)
if n_out < n_in:
print(f" [simplify] {n_in} -> {n_out} vertices (tol ~{tol_deg*factor:.6f}°)")
return result
# ── S-57 object catalog ───────────────────────────────────────────────────────
def load_s57_objects():
path = SCRIPT_DIR / "s57_objects.json"
if path.exists():
with open(path, encoding="utf-8") as f:
data = json.load(f)
return {k: v for k, v in data.items() if not k.startswith("_")}
return {}
S57_OBJECTS = load_s57_objects()
# ── config ────────────────────────────────────────────────────────────────────
def load_config(path=None):
cfg_path = Path(path) if path else SCRIPT_DIR / "cell_config.json"
if not cfg_path.exists():
print(f"[WARN] Config not found: {cfg_path}. Using defaults.")
return _default_config()
with open(cfg_path, encoding="utf-8") as f:
cfg = json.load(f)
cfg = {k: v for k, v in cfg.items() if not k.startswith("_")}
cfg.setdefault("layer_mappings", {})
cfg.setdefault("attribute_mappings", {})
return cfg
def _default_config():
return {
"cell_name": "XX1XX01M", "cell_edition": 1, "update_number": 0,
"issue_date": datetime.now().strftime("%Y%m%d"),
"producer_code": "XX", "producer_name": "Custom",
"data_set_name": "ENC Chart", "scale": 50000, "comment": "",
"horizontal_datum": "WGS84", "vertical_datum": "MLLW",
"sounding_datum": "MLLW", "compilation_scale": 50000,
"layer_mappings": {}, "attribute_mappings": {},
}
# ── QGIS project parser ───────────────────────────────────────────────────────
class QGISProject:
def __init__(self, project_path):
self.project_path = Path(project_path)
self.base_dir = self.project_path.parent
self.layers = []
self._parse()
def _parse(self):
if self.project_path.suffix.lower() == ".qgz":
self._parse_qgz()
else:
self._parse_qgs(self.project_path)
def _parse_qgz(self):
with zipfile.ZipFile(self.project_path, "r") as z:
qgs_files = [f for f in z.namelist() if f.endswith(".qgs")]
if not qgs_files:
raise ValueError("No .qgs file inside .qgz")
with z.open(qgs_files[0]) as f:
content = f.read().decode("utf-8")
tmp = self.project_path.parent / "_tmp_project.qgs"
tmp.write_text(content, encoding="utf-8")
self._parse_qgs(tmp)
tmp.unlink(missing_ok=True)
def _parse_qgs(self, qgs_path):
tree = ET.parse(qgs_path)
root = tree.getroot()
for ltl in root.iter("layer-tree-layer"):
lid = ltl.get("id", "")
name = ltl.get("name", "unnamed")
vis = ltl.get("checked", "Qt::Checked") != "Qt::Unchecked"
ml = self._find_maplayer(root, lid)
if ml is None or ml.get("type", "") != "vector":
continue
ds = ml.find("datasource")
if ds is None:
continue
ds_text = (ds.text or "").strip()
crs_el = ml.find(".//srs/spatialrefsys/authid")
crs = crs_el.text if crs_el is not None else "EPSG:4326"
# ── Capa SHP ──────────────────────────────────────────────────
shp = self._resolve_path(ds_text.split("|")[0].strip())
if shp is not None and str(shp).lower().endswith(".shp"):
self.layers.append({
"id": lid, "name": name, "path": shp,
"crs": crs, "visible": vis, "layer_type": "shp",
})
continue
# ── Capa CSV / texto delimitado ───────────────────────────────
# QGIS guarda: file:///ruta/al/archivo.csv?delimiter=,&xField=lon&yField=lat&...
csv_path = self._resolve_csv_path(ds_text)
if csv_path is not None:
# Leer xField / yField del URI
x_field = "lon"
y_field = "lat"
for part in ds_text.split("?")[-1].split("&"):
if part.startswith("xField="):
x_field = part.split("=", 1)[1]
elif part.startswith("yField="):
y_field = part.split("=", 1)[1]
self.layers.append({
"id": lid, "name": name, "path": csv_path,
"crs": crs, "visible": vis, "layer_type": "csv",
"x_field": x_field, "y_field": y_field,
})
def _resolve_csv_path(self, ds_text):
"""Extrae y resuelve la ruta de una datasource CSV de QGIS."""
import urllib.parse
# Formatos: file:///C:/ruta/file.csv?... o /ruta/file.csv
raw = ds_text.split("?")[0]
if raw.startswith("file:///"):
raw = raw[8:] # quitar file:///
elif raw.startswith("file://"):
raw = raw[7:]
raw = urllib.parse.unquote(raw)
p = Path(raw)
if p.exists() and p.suffix.lower() == ".csv":
return p
# Intentar resolver relativo al proyecto
rel = self.base_dir / raw
if rel.exists() and rel.suffix.lower() == ".csv":
return rel.resolve()
return None
def _find_maplayer(self, root, lid):
for ml in root.iter("maplayer"):
el = ml.find("id")
if el is not None and el.text == lid:
return ml
return None
def _resolve_path(self, path_str):
p = Path(path_str)
if p.is_absolute() and p.exists():
return p
rel = self.base_dir / path_str
if rel.exists():
return rel.resolve()
for c in self.base_dir.rglob(p.name):
return c
return None
# ── layer -> S-57 class resolver ─────────────────────────────────────────────
def resolve_s57_class(layer_name, layer_mappings):
nl = layer_name.lower().strip()
if nl in layer_mappings:
return layer_mappings[nl].upper()
for key, val in layer_mappings.items():
if key in nl or nl in key:
return val.upper()
nu = layer_name.upper().strip()
if nu in S57_OBJECTS:
return nu
for acro, info in S57_OBJECTS.items():
if any(w in nl for w in info["desc"].lower().split() if len(w) > 3):
return acro
return None
# ── SHP feature iterator (stdlib only) ───────────────────────────────────────
def iter_shapes(shp_path: Path, crs_str: str, attr_map: dict):
"""Yield (geom_type, coords_wgs84, mapped_attrs) using only stdlib."""
# Reprojection
transformer = None
if PYPROJ_AVAILABLE and crs_str:
try:
src = CRS.from_user_input(crs_str)
wgs84 = CRS.from_epsg(4326)
if not src.equals(wgs84):
transformer = Transformer.from_crs(src, wgs84, always_xy=True)
except Exception as e:
print(f" [WARN] Reprojection unavailable ({e}); assuming WGS84")
def tr(x, y):
return transformer.transform(x, y) if transformer else (x, y)
def tr_pts(pts):
return [tr(p[0], p[1]) for p in pts]
# Read attributes from .dbf
dbf_path = shp_path.with_suffix(".dbf")
_, dbf_rows = _read_dbf(dbf_path) if dbf_path.exists() else ([], [])
# Iterate geometry
for idx, (stype, pts, parts) in enumerate(_read_shp(shp_path)):
if stype == 0:
continue
raw = dbf_rows[idx] if idx < len(dbf_rows) else {}
mapped = []
for shp_col, s57_acro in attr_map.items():
if shp_col in raw:
attl = ATTR_CODE.get(s57_acro.upper())
if attl is not None:
mapped.append((attl, raw[shp_col]))
# ── Auto-detect: SHP column name == S-57 attribute acronym ──────────
already_attls = {a for a, _ in mapped}
raw_upper = {k.upper(): v for k, v in raw.items()}
for s57_acro, attl in ATTR_CODE.items():
if attl in already_attls:
continue
if s57_acro in raw_upper:
val = raw_upper[s57_acro]
val_str = str(val).strip() if val is not None else ""
# Skip DBF nulls: empty, all-asterisks, or all-zeros (numeric null)
if val_str and not all(c in "*0 " for c in val_str):
mapped.append((attl, val_str))
already_attls.add(attl)
# ── COLOUR_TXT override: text name → correct S-57 colour code ───────
# SHP/QGIS may store COLOUR as 0-indexed or wrong-offset numeric;
# COLOUR_TXT (the human name) is the ground truth.
_CNAME = {
"white": "1", "black": "2", "red": "3", "green": "4",
"blue": "5", "yellow": "6", "grey": "7", "gray": "7",
"brown": "8", "amber": "9", "orange": "11",
"magenta": "12", "violet": "13",
}
attl_colour = ATTR_CODE.get("COLOUR")
if attl_colour is not None and "COLOUR_TXT" in raw_upper:
cname = raw_upper["COLOUR_TXT"].lower().strip()
s57c = _CNAME.get(cname)
if s57c:
mapped = [(a, v) for a, v in mapped if a != attl_colour]
mapped.append((attl_colour, s57c))
already_attls.discard(attl_colour)
already_attls.add(attl_colour)
# ── Infer CATLAM from colour when absent (IALA B: green=port, red=stbd)
attl_catlam = ATTR_CODE.get("CATLAM")
if attl_catlam is not None and attl_catlam not in already_attls and attl_colour is not None:
colour_val = next((v for a, v in mapped if a == attl_colour), None)
if colour_val == "4": # green
mapped.append((attl_catlam, "1")) # port-hand
elif colour_val == "3": # red
mapped.append((attl_catlam, "2")) # starboard-hand
if stype in _PT and pts:
yield "point", [tr(*pts[0])], mapped
elif stype in _MPT:
for pt in pts:
yield "point", [tr(*pt)], mapped
elif stype in _LINE:
bounds = list(parts) + [len(pts)]
for i in range(len(bounds) - 1):
seg = pts[bounds[i]:bounds[i+1]]
if len(seg) >= 2:
yield "line", tr_pts(seg), mapped
elif stype in _POLY:
bounds = list(parts) + [len(pts)]
for i in range(len(bounds) - 1):
ring = pts[bounds[i]:bounds[i+1]]
if len(ring) < 3:
continue
# ESRI outer rings are CW (negative shoelace area); skip CCW holes
if _signed_area([(p[0], p[1]) for p in ring]) > 0:
continue
yield "polygon", tr_pts(ring), mapped
# ── S-57 cell writer ──────────────────────────────────────────────────────────
class S57CellWriter:
def __init__(self, output_path, config):
self.output_path = Path(output_path)
self.cfg = config
self._cell = None
self._bbox = None
self._feature_counter = {}
def open(self):
cfg = self.cfg
issue = cfg.get("issue_date") or datetime.now().strftime("%Y%m%d")
self._cell = S57Cell(
dsnm = cfg.get("cell_name", "CHART01") + ".000",
edition = int(cfg.get("cell_edition", 1)),
intu = 5,
scale = int(cfg.get("scale", 50000)),
agen = 999,
comt = cfg.get("comment", "Generated by QGISS57Converter"),
issue_date = issue,
)
def _update_bbox(self, coords):
for x, y in coords:
if self._bbox is None:
self._bbox = [x, y, x, y]
else:
if x < self._bbox[0]: self._bbox[0] = x
if y < self._bbox[1]: self._bbox[1] = y
if x > self._bbox[2]: self._bbox[2] = x
if y > self._bbox[3]: self._bbox[3] = y
def add_features_from_csv(self, csv_path: Path, s57_class: str,
attr_map: dict, x_field: str = "lon",
y_field: str = "lat") -> int:
"""Lee una capa CSV de QGIS (puntos) y la convierte a features S-57.
Estándar IHO S-57: usa los nombres de atributo S-57 como cabeceras de
columna (LITCHR, COLOUR, VALNMR, BOYSHP, CATLAM, …). El converter los
recoge automáticamente sin necesidad de ningún mapeo adicional.
Columna especial feat_type:
Si una fila tiene la columna 'feat_type' con un acrónimo S-57
válido (BCNLAT, BOYLAT, LIGHTS, …), esa fila usa ese objeto en
lugar del s57_class del nivel de capa. Esto permite mezclar tipos
en un solo CSV (p.ej. la carta de Barranquilla que incluye BCNLAT,
BOYLAT, LIGHTS y BOYCAR en el mismo archivo).
Luces compañeras (companion LIGHTS):
Cuando una estructura física (BCNLAT, BOYLAT, BCNCAR, BOYCAR,
BOYISD, BOYSAW, BOYSPP, LNDMRK) tiene LITCHR definido, el
converter emite además un objeto LIGHTS co-ubicado con solo los
atributos de luz. Así, el ECDIS puede mostrar tanto el símbolo 3D
de la estructura como la descripción de luz en el tooltip, igual
que en las cartas NOAA.
Columnas privadas (prefijo _):
Las columnas que empiezan por _ (p.ej. _source, _dimar_char_raw)
se ignoran y nunca se escriben al S-57.
"""
import csv as _csv
# LITCHR_TXT → código S-57 oficial (para CSVs con texto legible)
_LITCHR_TXT = {
"f": "1", "fl": "2", "lfl": "3", "q": "4",
"vq": "5", "uq": "6", "iso": "7", "oc": "8",
"iq": "9", "ivq": "10", "iuq": "11", "mo": "12",
"ffl": "13", "fl+lfl":"14", "oc+fl": "15",
"al.oc": "25", "al.lfl":"26", "al.fl": "27", "al.grp":"28",
}
# COLOUR_TXT → código S-57 oficial
_COLOUR_TXT = {
"white":"1", "black":"2", "red":"3", "green":"4",
"blue":"5", "yellow":"6", "grey":"7", "gray":"7",
"brown":"8", "amber":"9", "violet":"10", "orange":"11", "magenta":"12",
}
# S-57 classes that represent physical structures and may carry light attrs
_STRUCT_CLASSES = {
"BCNLAT","BCNCAR","BCNISD","BCNSAW","BCNSPP","BCNWTW",
"BOYLAT","BOYCAR","BOYISD","BOYSAW","BOYSPP",
"LNDMRK","LITFLT","LITVES",
}
# Attribute codes for companion LIGHTS
_LIGHT_ATTL = {ATTR_CODE[a] for a in
("LITCHR","SIGGRP","SIGPER","COLOUR","VALNMR","HEIGHT",
"SECTR1","SECTR2","ORIENT","MLTYLT","CATLIT","OBJNAM")
if a in ATTR_CODE}
if self._cell is None:
raise RuntimeError("call open() first")
count = 0
with open(csv_path, newline="", encoding="utf-8-sig") as f:
reader = _csv.DictReader(f)
for row in reader:
try:
lon = float(row[x_field])
lat = float(row[y_field])
except (KeyError, ValueError):
continue
# Determine S-57 object class for this row
row_class = (row.get("feat_type") or "").strip().upper()
if not row_class:
row_class = s57_class.upper()
objl = OBJL_BY_ACRONYM.get(row_class)
if objl is None:
print(f" [WARN] Unknown S-57 class '{row_class}' in row, skipping")
continue
# Build attribute list using S-57 column name auto-detection
already_attls: set[int] = set()
mapped: list[tuple[int, str]] = []
raw_upper = {k.upper(): v for k, v in row.items()
if not k.startswith("_") and k not in (x_field, y_field, "feat_type")}
for s57_acro, attl in ATTR_CODE.items():
if attl in already_attls:
continue
if s57_acro in raw_upper:
val = raw_upper[s57_acro].strip()
if val and not all(c in "*0 " for c in val):
mapped.append((attl, val))
already_attls.add(attl)
# Also apply attribute_mappings from config
for shp_col, s57_acro in attr_map.items():
attl = ATTR_CODE.get(s57_acro.upper())
if attl and attl not in already_attls and shp_col.upper() in raw_upper:
val = raw_upper[shp_col.upper()].strip()
if val:
mapped.append((attl, val))
already_attls.add(attl)
# LITCHR_TXT override — parse readable chars like "Q(4)G" → 4
attl_litchr = ATTR_CODE.get("LITCHR")
if attl_litchr and "LITCHR_TXT" in raw_upper and attl_litchr not in already_attls:
txt = raw_upper["LITCHR_TXT"].lower().split("(")[0].strip()
code = _LITCHR_TXT.get(txt)
if code:
mapped.append((attl_litchr, code))
already_attls.add(attl_litchr)
elif attl_litchr and "LITCHR_TXT" in raw_upper and attl_litchr in already_attls:
# Correct an already-set LITCHR if TXT provides a better value
txt = raw_upper["LITCHR_TXT"].lower().split("(")[0].strip()
code = _LITCHR_TXT.get(txt)
if code:
mapped = [(a, v) for a, v in mapped if a != attl_litchr]
mapped.append((attl_litchr, code))
# COLOUR_TXT override
attl_colour = ATTR_CODE.get("COLOUR")
if attl_colour and "COLOUR_TXT" in raw_upper:
cname = raw_upper["COLOUR_TXT"].lower().strip()
s57c = _COLOUR_TXT.get(cname)
if s57c:
mapped = [(a, v) for a, v in mapped if a != attl_colour]
mapped.append((attl_colour, s57c))
# Write the main object
self._cell.add_point_feature(objl=objl, lon=lon, lat=lat,
attrs=mapped if mapped else None)
self._update_bbox([(lon, lat)])
count += 1
# ── Companion LIGHTS ─────────────────────────────────────────
# When a physical structure carries light data, emit a co-located
# LIGHTS object so ECDIS proximity-merge picks up the light desc.
if row_class in _STRUCT_CLASSES and attl_litchr in already_attls:
light_attrs = [(a, v) for a, v in mapped if a in _LIGHT_ATTL]
if light_attrs:
self._cell.add_point_feature(
objl=OBJL_LIGHTS, lon=lon, lat=lat,
attrs=light_attrs,
)
self._feature_counter[s57_class] = (
self._feature_counter.get(s57_class, 0) + count
)
return count
def add_features_from_shp(self, shp_path: Path, crs_str: str,
s57_class: str, attr_map: dict) -> int:
if self._cell is None:
raise RuntimeError("call open() first")
objl = OBJL_BY_ACRONYM.get(s57_class.upper())
if objl is None:
print(f" [WARN] Unknown S-57 class '{s57_class}', skipping")
return 0
count = 0
try:
for geom_type, coords, attrs in iter_shapes(shp_path, crs_str, attr_map):
if not coords:
continue
self._update_bbox(coords)
a = attrs or None
if geom_type == "point":
self._cell.add_point_feature(objl=objl, lon=coords[0][0],
lat=coords[0][1], attrs=a)
count += 1
elif geom_type == "line" and len(coords) >= 2:
coords = _simplify_coords(list(coords))
if len(coords) >= 2:
self._cell.add_line_feature(objl=objl, coords=coords, attrs=a)
count += 1
elif geom_type == "polygon" and len(coords) >= 3:
coords = _simplify_coords(list(coords))
if len(coords) >= 3:
self._cell.add_area_feature(objl=objl, ring=coords, attrs=a)
count += 1
except AssertionError as e:
print(f" [ERR] ISO 8211 record too large in {shp_path.name}: {e}")
print(f" [ERR] Reduce geometry complexity or increase _MAX_VERTICES in converter.py")
except Exception as e:
print(f" [ERR] {shp_path.name}: {e}")
self._feature_counter[s57_class] = (
self._feature_counter.get(s57_class, 0) + count
)
return count
def close(self):
if self._cell is None:
return
if self._bbox:
w, s, e, n = self._bbox
buf = 0.001
ring = [(w-buf, s-buf), (e+buf, s-buf), (e+buf, n+buf),
(w-buf, n+buf), (w-buf, s-buf)]
self._cell.add_area_feature(
objl = OBJL_BY_ACRONYM["M_COVR"],
ring = ring,
attrs = [(ATTL_CATCOV, "1")],
)
print(" M_COVR 1 Coverage (auto-generated)")
self._cell.write(self.output_path)
self._cell = None
def summary(self):
print("\nFeatures written:")
total = 0
for cls, cnt in self._feature_counter.items():
desc = S57_OBJECTS.get(cls, {}).get("desc", "")
# Replace non-cp1252 chars (e.g. →) to avoid UnicodeEncodeError on Windows console
desc = desc.encode("cp1252", errors="replace").decode("cp1252")
print(f" {cls:<12} {cnt:>5} {desc}")
total += cnt
print(f" {'TOTAL':<12} {total:>5}")
S57Writer = S57CellWriter # backward-compat alias
# ── main converter ────────────────────────────────────────────────────────────
def convert(project_path, output_path, config_path, list_only, force, verbose,
extra_csv_dir=None):
print(f"\nQGIS -> S-57 Converter")
print(f"{'='*50}")
print(f"Project : {project_path}")
cfg = load_config(config_path)
attr_map = {k.lower(): v.upper() for k, v in cfg.get("attribute_mappings", {}).items()}
layer_map = {k.lower(): v.upper() for k, v in cfg.get("layer_mappings", {}).items()}
print(f"\nParsing QGIS project...")
try:
project = QGISProject(project_path)
except Exception as e:
print(f"[ERROR] Could not parse project: {e}")
sys.exit(1)
# ── Inyectar CSVs de directorio extra ──────────────────────────────────
extra_layers = []
if extra_csv_dir:
csv_dir = Path(extra_csv_dir)
if csv_dir.is_dir():
for csv_file in sorted(csv_dir.glob("*.csv")):
# Evitar duplicados: si la capa ya está en el proyecto, ignorar
already = any(
Path(l["path"]).resolve() == csv_file.resolve()
for l in project.layers if l.get("layer_type") == "csv"
)
if not already:
extra_layers.append({
"id": "extra_" + csv_file.stem,
"name": csv_file.stem,
"path": csv_file,
"crs": "EPSG:4326",
"visible": True,
"layer_type": "csv",
"x_field": "lon",
"y_field": "lat",
})
else:
print(f"[WARN] extra_csv_dir not found: {csv_dir}")
if not project.layers and not extra_layers:
print("[ERROR] No layers found in project.")
sys.exit(1)
all_layers = project.layers + extra_layers
shp_count = sum(1 for l in all_layers if l.get("layer_type","shp") == "shp")
csv_count = sum(1 for l in all_layers if l.get("layer_type") == "csv")
print(f"Found {shp_count} SHP + {csv_count} CSV layer(s):\n")
layer_assignments = []
for layer in all_layers:
s57_class = resolve_s57_class(layer["name"], layer_map)
status = "+" if layer["visible"] else "o"
assigned = s57_class or "?? (unmapped)"
src_tag = " [extra]" if layer["id"].startswith("extra_") else ""
print(f" {status} {layer['name']:<30} -> {assigned}{src_tag}")
layer_assignments.append((layer, s57_class))
if list_only:
print("\n[INFO] --list mode. No conversion performed.")
return
unmapped = [(l, c) for l, c in layer_assignments if c is None]
if unmapped and not force:
print(f"\n[WARN] {len(unmapped)} unmapped layer(s).")
ans = input("Continue anyway? [y/N]: ")
if ans.lower() != "y":
print("Aborted.")
return
if output_path is None:
cell_name = cfg.get("cell_name", "CHART01").upper()
output_path = Path(project_path).parent / (cell_name + ".000")
else:
output_path = Path(output_path)
print(f"\nOutput : {output_path}")
print(f"Cell : {cfg.get('cell_name','?')} Scale 1:{cfg.get('scale','?')}")
print()
output_path.parent.mkdir(parents=True, exist_ok=True)
writer = S57CellWriter(str(output_path), cfg)
writer.open()
for layer, s57_class in layer_assignments:
if s57_class is None:
print(f" SKIP {layer['name']} (no S-57 mapping)")
continue
ltype = layer.get("layer_type", "shp")
crs_str = layer.get("crs", "EPSG:4326")
print(f" Converting: {layer['name']} -> {s57_class}")
if ltype == "csv":
if verbose:
print(f" CSV: {layer['path']}")
count = writer.add_features_from_csv(
layer["path"], s57_class, attr_map,
x_field=layer.get("x_field", "lon"),
y_field=layer.get("y_field", "lat"),
)
else:
shp_path = layer["path"]
if verbose:
print(f" CRS: {crs_str} | SHP: {shp_path}")
if not shp_path.exists():
print(f" [ERR] File not found: {shp_path}")
continue
count = writer.add_features_from_shp(shp_path, crs_str, s57_class, attr_map)
print(f" {count} feature(s) written")
writer.close()
writer.summary()
if output_path.exists():
print(f"\nOutput file: {output_path} ({output_path.stat().st_size // 1024} KB)")
print("Done.")
# ── interactive mode ──────────────────────────────────────────────────────────
def interactive_mode():
print("\n=== QGIS -> S-57 Converter (Interactive) ===\n")
qgs = input("QGIS project (.qgs or .qgz): ").strip().strip('"')
if not Path(qgs).exists():
print(f"File not found: {qgs}"); sys.exit(1)
out = input("Output .000 [blank=auto]: ").strip().strip('"') or None
cfg = input("Config file [blank=default]: ").strip().strip('"') or None
convert(qgs, out, cfg, list_only=False, force=False, verbose=True)
# ── CLI ───────────────────────────────────────────────────────────────────────
def main():
if len(sys.argv) == 1:
interactive_mode()
return
parser = argparse.ArgumentParser(
description="Convert QGIS project SHP layers to S-57 ENC format")
parser.add_argument("project", help=".qgs or .qgz QGIS project file")
parser.add_argument("--output", "-o", help="Output .000 file")
parser.add_argument("--config", "-c", help="JSON config file")
parser.add_argument("--list", "-l", action="store_true",
help="List layers then exit")
parser.add_argument("--force", "-f", action="store_true",
help="Skip prompts for unmapped layers")
parser.add_argument("--verbose", "-v", action="store_true")
args = parser.parse_args()
if not Path(args.project).exists():
print(f"[ERROR] Not found: {args.project}"); sys.exit(1)
convert(args.project, args.output, args.config,
args.list, args.force, args.verbose)
if __name__ == "__main__":
main()