Files

675 lines
29 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
S-57 ENC writer — produces structurally valid `.000` files that the GDAL
S-57 driver can read. Reuses a NOAA DDR template verbatim (the schema is fixed
by IHO S-57 ed. 3.1) and only encodes data records.
ISO 8211 record format (data records, leader entry-map "3404"):
Leader (24 bytes) + Directory (11 bytes per entry + FT) + Field area.
Directory entry layout: tag(4) + length(3) + position(4).
"""
from __future__ import annotations
import struct
from pathlib import Path
# ── ISO 8211 control characters ────────────────────────────────────────────────
FT = b"\x1e" # Field terminator
UT = b"\x1f" # Unit terminator (subfield delimiter)
DDR_TEMPLATE = Path(__file__).parent / "noaa_ddr_template.bin"
# S-57 record names (RCNM)
RCNM_VI = 110 # Isolated Node (point primitive)
RCNM_VC = 120 # Connected Node
RCNM_VE = 130 # Edge (line primitive)
RCNM_VF = 140 # Face
RCNM_FE = 100 # Feature
# Object class codes — from s57objectclasses.csv (IHO S-57 Ed.3.1, Google Earth GDAL copy)
# fmt: off
OBJL_ACHBRT = 3 # Anchor berth
OBJL_ACHARE = 4 # Anchorage area
OBJL_BCNCAR = 5 # Beacon, cardinal
OBJL_BCNISD = 6 # Beacon, isolated danger
OBJL_BCNLAT = 7 # Beacon, lateral
OBJL_BCNSAW = 8 # Beacon, safe water
OBJL_BCNSPP = 9 # Beacon, special purpose/general
OBJL_BERTHS = 10 # Berth
OBJL_BRIDGE = 11 # Bridge
OBJL_BUISGL = 12 # Building, single
OBJL_BUAARE = 13 # Built-up area
OBJL_BOYCAR = 14 # Buoy, cardinal
OBJL_BOYISD = 16 # Buoy, isolated danger
OBJL_BOYLAT = 17 # Buoy, lateral
OBJL_BOYSAW = 18 # Buoy, safe water ← was 19 (BUG fixed)
OBJL_BOYSPP = 19 # Buoy, special purpose/general (S-57 std acronym)
OBJL_CBLOHD = 21 # Cable, overhead
OBJL_CBLSUB = 22 # Cable, submarine
OBJL_CANALS = 23 # Canal
OBJL_CTSARE = 25 # Cargo transshipment area
OBJL_COALNE = 30 # Coastline
OBJL_CRANES = 35 # Crane
OBJL_DAYMAR = 39 # Daymark
OBJL_DWRTCL = 40 # Deep water route centre line
OBJL_DWRTPT = 41 # Deep water route part
OBJL_DEPARE = 42 # Depth area
OBJL_DEPCNT = 43 # Depth contour
OBJL_DOCARE = 45 # Dock area
OBJL_DRGARE = 46 # Dredged area
OBJL_DRYDOC = 47 # Dry dock
OBJL_DMPGRD = 48 # Dumping ground
OBJL_EXEZNE = 50 # Exclusive Economic Zone
OBJL_FAIRWY = 51 # Fairway
OBJL_FERYRT = 53 # Ferry route
OBJL_FSHZNE = 54 # Fishery zone
OBJL_FOGSIG = 58 # Fog signal
OBJL_FRPARE = 60 # Free port area
OBJL_GATCON = 61 # Gate / sluice
OBJL_HRBARE = 63 # Harbour area (administrative)
OBJL_HRBFAC = 64 # Harbour facility
OBJL_HULKES = 65 # Hulk
OBJL_ISTZNE = 68 # Inshore traffic zone
OBJL_LAKARE = 69 # Lake
OBJL_LNDARE = 71 # Land area
OBJL_LNDRGN = 73 # Land region
OBJL_LNDMRK = 74 # Landmark
OBJL_LIGHTS = 75 # Light
OBJL_LITFLT = 76 # Light float
OBJL_LITVES = 77 # Light vessel
OBJL_LOKBSN = 79 # Lock basin
OBJL_MAGVAR = 81 # Magnetic variation
OBJL_MARCUL = 82 # Marine farm/culture
OBJL_MIPARE = 83 # Military practice area
OBJL_MORFAC = 84 # Mooring/warping facility
OBJL_NAVLNE = 85 # Navigation line
OBJL_OBSTRN = 86 # Obstruction
OBJL_OFSPLF = 87 # Offshore platform
OBJL_OSPARE = 88 # Offshore production area
OBJL_PILPNT = 90 # Pile
OBJL_PILBOP = 91 # Pilot boarding place
OBJL_PIPARE = 92 # Pipeline area
OBJL_PIPOHD = 93 # Pipeline, overhead
OBJL_PIPSOL = 94 # Pipeline, submarine/on land
OBJL_PONTON = 95 # Pontoon
OBJL_PRCARE = 96 # Precautionary area
OBJL_PYLONS = 98 # Pylon/bridge support
OBJL_RADRFL = 101 # Radar reflector
OBJL_RADSTA = 102 # Radar station
OBJL_RDOCAL = 104 # Radio calling-in point
OBJL_RDOSTA = 105 # Radio station
OBJL_RECTRC = 109 # Recommended track
OBJL_RESARE = 112 # Restricted area
OBJL_RETRFL = 113 # Retro-reflector
OBJL_RIVERS = 114 # River
OBJL_RIVBNK = 115 # River bank
OBJL_ROADWY = 116 # Road
OBJL_RUNWAY = 117 # Runway
OBJL_SEAARE = 119 # Sea area / named water area
OBJL_SPLARE = 120 # Sea-plane landing area (SPLARE)
OBJL_SBDARE = 121 # Seabed area
OBJL_SLCONS = 122 # Shoreline construction
OBJL_SLOTOP = 126 # Slope topline
OBJL_SLOGRD = 127 # Sloping ground
OBJL_SOUNDG = 129 # Sounding
OBJL_STSLNE = 132 # Straight territorial sea baseline
OBJL_SUBTLN = 133 # Submarine transit lane
OBJL_SWPARE = 134 # Swept area
OBJL_TESARE = 135 # Territorial sea area
OBJL_TIDEWY = 143 # Tideway
OBJL_TOPMAR = 144 # Top mark
OBJL_TSELNE = 145 # Traffic Separation Line
OBJL_TSSBND = 146 # Traffic Separation Scheme Boundary
OBJL_TSSCRS = 147 # Traffic Separation Scheme Crossing
OBJL_TSSLPT = 148 # Traffic Separation Scheme Lane part
OBJL_TSSRON = 149 # Traffic Separation Scheme Roundabout
OBJL_TUNNEL = 151 # Tunnel
OBJL_TWRTPT = 152 # Two-way route part
OBJL_UWTROC = 153 # Underwater rock / awash rock
OBJL_VEGATN = 155 # Vegetation
OBJL_WATTUR = 156 # Water turbulence
OBJL_WEDKLP = 158 # Weed/Kelp
OBJL_WRECKS = 159 # Wreck
OBJL_M_ACCY = 300 # Accuracy of data (meta)
OBJL_M_CSCL = 301 # Compilation scale (meta)
OBJL_M_COVR = 302 # Coverage (meta)
OBJL_M_HDAT = 303 # Horizontal datum (meta)
OBJL_M_NSYS = 306 # Navigational system of marks (meta)
OBJL_M_QUAL = 308 # Quality of data (meta)
OBJL_M_SDAT = 309 # Sounding datum (meta)
OBJL_M_UNIT = 311 # Units of measurement (meta)
# fmt: on
# Lookup: S-57 acronym → OBJL code (complete IHO S-57 Ed 3.1 set)
OBJL_BY_ACRONYM: dict[str, int] = {
"ACHBRT": OBJL_ACHBRT, "ACHARE": OBJL_ACHARE,
"BCNCAR": OBJL_BCNCAR, "BCNISD": OBJL_BCNISD, "BCNLAT": OBJL_BCNLAT,
"BCNSAW": OBJL_BCNSAW, "BCNSPP": OBJL_BCNSPP,
"BERTHS": OBJL_BERTHS, "BRIDGE": OBJL_BRIDGE,
"BUISGL": OBJL_BUISGL, "BUAARE": OBJL_BUAARE,
"BOYCAR": OBJL_BOYCAR, "BOYISD": OBJL_BOYISD, "BOYLAT": OBJL_BOYLAT,
"BOYSAW": OBJL_BOYSAW, "BOYSPP": OBJL_BOYSPP,
"BOYSPEC": OBJL_BOYSPP, # user alias → BOYSPP
"CBLOHD": OBJL_CBLOHD, "CBLSUB": OBJL_CBLSUB,
"CANALS": OBJL_CANALS, "CTSARE": OBJL_CTSARE,
"COALNE": OBJL_COALNE, "CRANES": OBJL_CRANES,
"DAYMAR": OBJL_DAYMAR, "DWRTCL": OBJL_DWRTCL, "DWRTPT": OBJL_DWRTPT,
"DEPARE": OBJL_DEPARE, "DEPCNT": OBJL_DEPCNT,
"DOCARE": OBJL_DOCARE, "DRGARE": OBJL_DRGARE, "DRYDOC": OBJL_DRYDOC,
"DMPGRD": OBJL_DMPGRD,
"EXEZNE": OBJL_EXEZNE, "FAIRWY": OBJL_FAIRWY,
"FERYRT": OBJL_FERYRT, "FSHZNE": OBJL_FSHZNE, "FRPARE": OBJL_FRPARE,
"FOGSIG": OBJL_FOGSIG, "GATCON": OBJL_GATCON,
"HRBARE": OBJL_HRBARE, "HRBFAC": OBJL_HRBFAC, "HULKES": OBJL_HULKES,
"ISTZNE": OBJL_ISTZNE,
"LAKARE": OBJL_LAKARE, "LNDARE": OBJL_LNDARE, "LNDRGN": OBJL_LNDRGN,
"LNDMRK": OBJL_LNDMRK, "LIGHTS": OBJL_LIGHTS,
"LITFLT": OBJL_LITFLT, "LITVES": OBJL_LITVES,
"LOKBSN": OBJL_LOKBSN, "MAGVAR": OBJL_MAGVAR,
"MARCUL": OBJL_MARCUL, "MIPARE": OBJL_MIPARE, "MORFAC": OBJL_MORFAC,
"NAVLNE": OBJL_NAVLNE, "OBSTRN": OBJL_OBSTRN,
"OFSPLF": OBJL_OFSPLF, "OSPARE": OBJL_OSPARE,
"PILPNT": OBJL_PILPNT, "PILBOP": OBJL_PILBOP, "PIPARE": OBJL_PIPARE,
"PIPOHD": OBJL_PIPOHD, "PIPSOL": OBJL_PIPSOL, "PONTON": OBJL_PONTON,
"PRCARE": OBJL_PRCARE, "PYLONS": OBJL_PYLONS,
"RADRFL": OBJL_RADRFL, "RADSTA": OBJL_RADSTA,
"RDOCAL": OBJL_RDOCAL, "RDOSTA": OBJL_RDOSTA,
"RECTRC": OBJL_RECTRC, "RESARE": OBJL_RESARE, "RETRFL": OBJL_RETRFL,
"RIVERS": OBJL_RIVERS, "RIVBNK": OBJL_RIVBNK,
"ROADWY": OBJL_ROADWY, "RUNWAY": OBJL_RUNWAY,
"SEAARE": OBJL_SEAARE, "SPLARE": OBJL_SPLARE, "SBDARE": OBJL_SBDARE,
"SLCONS": OBJL_SLCONS, "SLOTOP": OBJL_SLOTOP, "SLOGRD": OBJL_SLOGRD,
"SOUNDG": OBJL_SOUNDG, "STSLNE": OBJL_STSLNE, "SUBTLN": OBJL_SUBTLN,
"SWPARE": OBJL_SWPARE, "TESARE": OBJL_TESARE, "TIDEWY": OBJL_TIDEWY,
"TOPMAR": OBJL_TOPMAR,
"TSELNE": OBJL_TSELNE, "TSSBND": OBJL_TSSBND, "TSSCRS": OBJL_TSSCRS,
"TSSLPT": OBJL_TSSLPT, "TSSRON": OBJL_TSSRON,
"TUNNEL": OBJL_TUNNEL, "TWRTPT": OBJL_TWRTPT,
"UWTROC": OBJL_UWTROC, "VEGATN": OBJL_VEGATN,
"WATTUR": OBJL_WATTUR, "WEDKLP": OBJL_WEDKLP, "WRECKS": OBJL_WRECKS,
"M_ACCY": OBJL_M_ACCY, "M_CSCL": OBJL_M_CSCL, "M_COVR": OBJL_M_COVR,
"M_HDAT": OBJL_M_HDAT, "M_NSYS": OBJL_M_NSYS, "M_QUAL": OBJL_M_QUAL,
"M_SDAT": OBJL_M_SDAT, "M_UNIT": OBJL_M_UNIT,
}
# PRIM codes for FRID
PRIM_POINT = 1
PRIM_LINE = 2
PRIM_AREA = 3
# ── S-57 attribute codes (ATTL) — from s57attributes.csv (IHO S-57 Ed.3.1) ───
# fmt: off
ATTL_AGENCY = 1 # AGENCY — agency responsible for production
ATTL_BCNSHP = 2 # BCNSHP — beacon shape
ATTL_BOYSHP = 4 # BOYSHP — buoy shape
ATTL_BURDEP = 5 # BURDEP — buried depth
ATTL_CATCAM = 13 # CATCAM — category of cardinal mark
ATTL_CATCOV = 18 # CATCOV — category of coverage (1=coverage, 2=no coverage)
ATTL_CATFOG = 27 # CATFOG — category of fog signal
ATTL_CATLAM = 36 # CATLAM — category of lateral mark (1=port, 2=starboard)
ATTL_CATLMK = 35 # CATLMK — category of landmark
ATTL_CATLIT = 37 # CATLIT — category of light
ATTL_CATOBS = 42 # CATOBS — category of obstruction
ATTL_CATREA = 54 # CATREA — category of restricted area
ATTL_CATSIW = 63 # CATSIW — category of signal station warning
ATTL_COLOUR = 75 # COLOUR — colour (list)
ATTL_COLPAT = 76 # COLPAT — colour pattern
ATTL_CONDTN = 81 # CONDTN — condition
ATTL_DRVAL1 = 90 # DRVAL1 — draft value 1
ATTL_DRVAL2 = 91 # DRVAL2 — draft value 2
ATTL_HEIGHT = 95 # HEIGHT — height above sea surface
ATTL_LITCHR = 107 # LITCHR — light characteristic
ATTL_MLTYLT = 110 # MLTYLT — multiplicity of lights
ATTL_NATCON = 112 # NATCON — nature of construction
ATTL_OBJNAM = 116 # OBJNAM — object name (free text label)
ATTL_ORIENT = 117 # ORIENT — orientation (degrees)
ATTL_PEREND = 119 # PEREND — period end (season)
ATTL_PERSTA = 120 # PERSTA — period start (season)
ATTL_QUASOU = 126 # QUASOU — quality of sounding measurement
ATTL_SIGGRP = 141 # SIGGRP — signal group ("(2)", etc.)
ATTL_SIGPER = 142 # SIGPER — signal period (seconds)
ATTL_STATUS = 149 # STATUS — status (permanent, occasional…)
ATTL_VALSOU = 179 # VALSOU — value of sounding (metres)
ATTL_VERACC = 180 # VERACC — vertical accuracy
ATTL_VERCLR = 181 # VERCLR — vertical clearance
ATTL_VALNMR = 178 # VALNMR — value of nominal range (nautical miles)
ATTL_VERDAT = 187 # VERDAT — vertical datum
# fmt: on
# Convenience dict: acronym → ATTL code (superset of the named constants above)
ATTR_CODE: dict[str, int] = {
"AGENCY": 1, "BCNSHP": 2, "BUISHP": 3, "BOYSHP": 4, "BURDEP": 5,
"CALSGN": 6, "CATAIR": 7, "CATACH": 8, "CATBRG": 9, "CATBUA": 10,
"CATCBL": 11, "CATCAN": 12, "CATCAM": 13, "CATCHP": 14, "CATCOA": 15,
"CATCTR": 16, "CATCON": 17, "CATCOV": 18, "CATCRN": 19, "CATDAM": 20,
"CATDIS": 21, "CATDOC": 22, "CATDPG": 23, "CATFNC": 24, "CATFRY": 25,
"CATFIF": 26, "CATFOG": 27, "CATFOR": 28, "CATGAT": 29, "CATHAF": 30,
"CATHLK": 31, "CATICE": 32, "CATINB": 33, "CATLND": 34, "CATLMK": 35,
"CATLAM": 36, "CATLIT": 37, "CATMFA": 38, "CATMPA": 39, "CATMOR": 40,
"CATNAV": 41, "CATOBS": 42, "CATOFP": 43, "CATOLB": 44, "CATPLE": 45,
"CATPIL": 46, "CATPIP": 47, "CATPRA": 48, "CATPYL": 49, "CATQUA": 50,
"CATRAS": 51, "CATRTB": 52, "CATROS": 53, "CATREA": 54, "CATSEA": 55,
"CATSIT": 56, "CATSLC": 57, "CATSPM": 58, "CATSCF": 59, "CATSUB": 60,
"CAATTS": 61, "CATTSS": 62, "CATSIW": 63, "CATTRK": 64, "CATVEG": 65,
"CATWED": 66, "CATWRK": 67, "CATZOC": 68, "CATWAT": 69, "COLOUR": 75,
"COLPAT": 76, "CONDTN": 81, "CONRAD": 82, "CONVIS": 83, "CURVEL": 84,
"DATEND": 85, "DATSTA": 86, "DRVAL1": 90, "DRVAL2": 91, "ELEVAT": 93,
"ESTRNG": 94, "HEIGHT": 95, "HORACC": 96, "HORCLR": 97, "HORLEN": 98,
"HORWID": 99, "ICEFAC": 100,"INFORM": 101,"JRSDTN": 102,"LIFCAP": 103,
"LITCHR": 107, "LITVIS": 108,"MARSYS": 109,"MLTYLT": 110,"NATION": 111,
"NATCON": 112, "NATQUA": 113,"NATSUR": 114,"NOBJNM": 115,"OBJNAM": 116,
"ORIENT": 117, "PEREND": 119,"PERSTA": 120,"PICREP": 121,"POSACC": 122,
"PRCTRY": 124, "PRODCT": 125,"QUASOU": 126,"RADWAL": 127,"RADIUS": 128,
"RYRMGV": 133, "SCAMAX": 134,"SCAMIN": 135,"SCVAL1": 136,"SCVAL2": 137,
"SECTR1": 138, "SECTR2": 139,"SHIPAM": 140,"SIGGRP": 141,"SIGPER": 142,
"SIGSEQ": 143, "SOUACC": 144,"SDISMX": 145,"SDISMN": 146,"SORDAT": 147,
"SORIND": 148, "STATUS": 149,"SURATH": 150,"SUREND": 151,"SURSTA": 152,
"SURTYP": 153, "TECSOU": 156,"TXTDSC": 157,"TS_TSP": 158,"TS_TSV": 159,
"T_ACWL": 160, "T_HWLW": 161,"T_MTOD": 162,"T_THDF": 163,"T_TIMS": 164,
"T_TRNP": 165, "T_VAHF": 166,"T_VAVL": 167,"TIMEND": 168,"TIMSTA": 169,
"TOPSHP": 170, "TRAFIC": 171,"VALDCO": 172,"VERACC": 180,"VERCLR": 181,
"VERCCL": 182, "VERCOP": 183,"VERCSA": 184,"VERDAT": 187,"VERLEN": 188,
"WATLEV": 187, "VALSOU": 179, "VALNMR": 178,
}
# ── Binary primitives (little-endian, S-57 convention) ─────────────────────────
def b11(n): return int(n).to_bytes(1, "little", signed=False)
def b12(n): return int(n).to_bytes(2, "little", signed=False)
def b14(n): return int(n).to_bytes(4, "little", signed=False)
def b21(n): return int(n).to_bytes(1, "little", signed=True)
def b22(n): return int(n).to_bytes(2, "little", signed=True)
def b24(n): return int(n).to_bytes(4, "little", signed=True)
def A_var(s: str) -> bytes:
"""Variable-length ASCII subfield, delimited by UT."""
return s.encode("latin-1") + UT
def A_fixed(s: str, n: int) -> bytes:
"""Fixed-width ASCII (no UT)."""
return s.encode("latin-1").ljust(n, b" ")[:n]
def R_fixed(value: float, n: int = 4) -> bytes:
"""Fixed-width ASCII real, e.g. R(4) = '03.1'."""
s = f"{value:.{max(0, n-2)}f}"
return s.encode("latin-1").ljust(n, b" ")[:n]
def name_5(rcnm: int, rcid: int) -> bytes:
"""S-57 NAME field: RCNM (1 byte) + RCID (4 bytes LE) = 5 bytes (= 40 bits)."""
return b11(rcnm) + b14(rcid)
# ── Field builders (one per S-57 field tag) ────────────────────────────────────
def field_0001(rcid: int) -> bytes:
"""Record header (b12 RCID) + FT."""
return b12(rcid) + FT
def field_DSID(*, rcnm=10, rcid=1, expp=1, intu=3,
dsnm="", edtn="1", updn="0",
uadt="", isdt="", sted=3.1,
prsp=1, psdn="", pred="2.0",
prof=1, agen=999, comt="") -> bytes:
return (b11(rcnm) + b14(rcid) + b11(expp) + b11(intu) +
A_var(dsnm) + A_var(edtn) + A_var(updn) +
A_fixed(uadt, 8) + A_fixed(isdt, 8) +
R_fixed(sted, 4) +
b11(prsp) + A_var(psdn) + A_var(pred) +
b11(prof) + b12(agen) +
A_var(comt) + FT)
def field_DSSI(*, dstr=2, aall=1, nall=1, nomr=0, nocr=0,
nogr=0, nolr=0, noin=0, nocn=0, noed=0, nofa=0) -> bytes:
"""Format: (3b11, 8b14)"""
return (b11(dstr) + b11(aall) + b11(nall) +
b14(nomr) + b14(nocr) + b14(nogr) + b14(nolr) +
b14(noin) + b14(nocn) + b14(noed) + b14(nofa) + FT)
def field_DSPM(*, rcnm=20, rcid=1, hdat=2, vdat=17, sdat=23,
cscl=50000, duni=1, huni=1, puni=1, coun=1,
comf=10000000, somf=10, comt="") -> bytes:
"""Format: (b11, b14, 3b11, b14, 4b11, 2b14, A)
HDAT=2 (WGS84), VDAT=17 (MLLW), SDAT=23 (MLLW),
DUNI=1 (m), HUNI=1 (m), PUNI=1 (m), COUN=1 (lat/long)
COMF=10^7 means coords stored as int = (degrees * 10^7).
SOMF=10 means depths stored as int = (metres * 10)."""
return (b11(rcnm) + b14(rcid) +
b11(hdat) + b11(vdat) + b11(sdat) +
b14(cscl) +
b11(duni) + b11(huni) + b11(puni) + b11(coun) +
b14(comf) + b14(somf) +
A_var(comt) + FT)
def field_VRID(*, rcnm: int, rcid: int, rver: int = 1, ruin: int = 1) -> bytes:
"""Format: (b11, b14, b12, b11). RUIN=1 means insert."""
return b11(rcnm) + b14(rcid) + b12(rver) + b11(ruin) + FT
def field_SG2D(coords_deg: list[tuple[float, float]], comf: int) -> bytes:
"""Format: *(b24 YCOO, b24 XCOO). Coords are (lon, lat) in degrees → ints
multiplied by COMF (typically 10^7). Note S-57 stores YCOO (lat) before
XCOO (lon) for each pair."""
out = b""
for lon, lat in coords_deg:
y_int = int(round(lat * comf))
x_int = int(round(lon * comf))
out += b24(y_int) + b24(x_int)
return out + FT
def field_VRPT(pointers: list[tuple[int, int, int, int, int, int]]) -> bytes:
"""Format: *(B(40) NAME, b11 ORNT, b11 USAG, b11 TOPI, b11 MASK).
Each item: (rcnm, rcid, ornt, usag, topi, mask)."""
out = b""
for rcnm, rcid, ornt, usag, topi, mask in pointers:
out += name_5(rcnm, rcid) + b11(ornt) + b11(usag) + b11(topi) + b11(mask)
return out + FT
def field_FRID(*, rcnm: int = RCNM_FE, rcid: int, prim: int, grup: int = 1,
objl: int, rver: int = 1, ruin: int = 1) -> bytes:
"""Format: (b11, b14, 2b11, 2b12, b11)."""
return (b11(rcnm) + b14(rcid) +
b11(prim) + b11(grup) +
b12(objl) + b12(rver) +
b11(ruin) + FT)
def field_FOID(*, agen: int = 999, fidn: int, fids: int = 1) -> bytes:
"""Format: (b12, b14, b12)."""
return b12(agen) + b14(fidn) + b12(fids) + FT
def field_ATTF(attrs: list[tuple[int, str]]) -> bytes:
"""Format: *(b12 ATTL, A ATVL). Empty list → no field at all (caller skips)."""
out = b""
for attl, atvl in attrs:
out += b12(attl) + A_var(str(atvl))
return out + FT
def field_FSPT(pointers: list[tuple[int, int, int, int, int]]) -> bytes:
"""Format: *(B(40) NAME, b11 ORNT, b11 USAG, b11 MASK).
Each: (rcnm, rcid, ornt, usag, mask). ORNT=1=forward,2=reverse,255=null;
USAG=1=exterior,2=interior,3=both,255=null; MASK=1=mask,2=show,255=null."""
out = b""
for rcnm, rcid, ornt, usag, mask in pointers:
out += name_5(rcnm, rcid) + b11(ornt) + b11(usag) + b11(mask)
return out + FT
# ── Record builder (data record with entry-map 5504) ───────────────────────────
def build_record(fields: list[tuple[str, bytes]]) -> bytes:
"""fields: list of (4-char tag, field bytes). Field bytes include trailing FT.
Uses entry-map "5504": field-length=5 digits (max 99999 bytes per field),
position=5 digits (max 99999). Each directory entry is 14 bytes: tag(4)+len(5)+pos(5).
History of bugs:
"3404" (original): 3-digit length = max 999 bytes → LNDARE polygons produce
SG2D fields of 2000-53000 bytes → length truncated → GDAL: "Not enough byte
to initialize field 'SG2D'".
"4404": 4-digit = max 9999 bytes → still not enough for large coastal outlines.
"5504": 5-digit = max 99999 bytes → handles any realistic polygon after RDP.
The DDR template uses "3404" for its own directory, but that only applies to
how the DDR is parsed. GDAL reads each data record's leader independently, so
data records may use a different entry-map than the DDR — this is valid ISO 8211.
"""
field_area = b""
dir_str = ""
pos = 0
for tag, data in fields:
assert len(tag) == 4, f"tag must be 4 chars: {tag!r}"
flen = len(data)
assert flen <= 99999, f"field {tag} too large ({flen} bytes); apply more RDP"
assert pos <= 99999, f"field area position overflow at {tag} (pos={pos})"
dir_str += f"{tag:<4s}{flen:05d}{pos:05d}"
field_area += data
pos += flen
directory = dir_str.encode("latin-1") + FT
base_addr = 24 + len(directory)
record_len = base_addr + len(field_area)
assert record_len <= 99999, f"record too large ({record_len} bytes)"
leader = (
f"{record_len:05d}" # 0-4: record length
" " # 5: interchange level (space for DR)
"D" # 6: leader id
" " # 7: in-line code extension (space for DR)
" " # 8: version
" " # 9: app indicator
" " # 10-11: field control length (00 for DR)
f"{base_addr:05d}" # 12-16: base address of field area
" " # 17-19: extended character set indicator
"5504" # 20-23: entry map (len=5, pos=5, reserved=0, tag=4)
).encode("latin-1")
assert len(leader) == 24, f"leader len {len(leader)}"
return leader + directory + field_area
# ── High-level builder for a complete `.000` ──────────────────────────────────
class S57Cell:
"""Builds a complete S-57 cell. Call add_* methods then write()."""
def __init__(self, *, dsnm: str, edition: int = 1, intu: int = 5,
scale: int = 50000, agen: int = 999, comt: str = "AR ECDIS custom",
issue_date: str = "20260428"):
self.dsnm = dsnm
self.edition = edition
self.intu = intu
self.scale = scale
self.agen = agen
self.comt = comt
self.issue_date = issue_date
self.comf = 10_000_000 # 10^7 — coords stored as int(deg * COMF)
self.somf = 10 # depths × 10
# Spatial primitives & features accumulated by callers
self._next_vi = 1
self._next_vc = 1
self._next_ve = 1
self._next_fid = 1
self._records: list[tuple[str, list[tuple[str, bytes]]]] = []
# Counters for DSSI
self._n_meta = 0
self._n_pt = 0 # NOMR isolated nodes
self._n_cn = 0 # NOCN connected nodes
self._n_ed = 0 # NOED edges
self._n_geo = 0
self._n_col = 0 # NOCR collection
self._n_in = 0 # NOIN isolated nodes alt count
self._n_lr = 0 # NOLR
self._n_gr = 0 # NOGR
self._n_fa = 0 # NOFA faces
# --- Add an isolated node (for points: buoys, lights, landmarks) ---
def add_isolated_node(self, lon: float, lat: float) -> tuple[int, int]:
rcid = self._next_vi
self._next_vi += 1
fields = [
("0001", field_0001(rcid)),
("VRID", field_VRID(rcnm=RCNM_VI, rcid=rcid)),
("SG2D", field_SG2D([(lon, lat)], self.comf)),
]
self._records.append(("VI", fields))
self._n_pt += 1
self._n_in += 1
return (RCNM_VI, rcid)
# --- Add an edge connecting (start_lon, start_lat) → (end_lon, end_lat) ---
# First creates two connected nodes, then the edge that points to them with
# intermediate coordinates.
def add_edge(self, coords: list[tuple[float, float]]) -> tuple[int, int]:
if len(coords) < 2:
raise ValueError("edge needs ≥2 coordinates")
# Create connected nodes for the endpoints
cn_start = self._add_connected_node(*coords[0])
cn_end = self._add_connected_node(*coords[-1])
# The edge: VRID + VRPT (pointers to both CNs) + SG2D (intermediate coords)
rcid = self._next_ve
self._next_ve += 1
# Intermediate points only (S-57 stores edges as: start_CN, intermediates..., end_CN)
intermediate = coords[1:-1]
fields = [
("0001", field_0001(rcid)),
("VRID", field_VRID(rcnm=RCNM_VE, rcid=rcid)),
# VRPT: two pointers — to start node, to end node
("VRPT", field_VRPT([
(RCNM_VC, cn_start[1], 1, 1, 1, 255), # ORNT=1, USAG=1=ext, TOPI=1=begin, MASK=255
(RCNM_VC, cn_end[1], 2, 1, 2, 255), # ORNT=2=reverse, TOPI=2=end
])),
]
if intermediate:
fields.append(("SG2D", field_SG2D(intermediate, self.comf)))
self._records.append(("VE", fields))
self._n_ed += 1
return (RCNM_VE, rcid)
def _add_connected_node(self, lon: float, lat: float) -> tuple[int, int]:
rcid = self._next_vc
self._next_vc += 1
fields = [
("0001", field_0001(rcid)),
("VRID", field_VRID(rcnm=RCNM_VC, rcid=rcid)),
("SG2D", field_SG2D([(lon, lat)], self.comf)),
]
self._records.append(("VC", fields))
self._n_cn += 1
return (RCNM_VC, rcid)
# --- Feature: point ---
def add_point_feature(self, *, objl: int,
lon: float, lat: float,
attrs: list[tuple[int, str]] | None = None) -> int:
rcid = self._next_fid
self._next_fid += 1
fid = rcid
node_name = self.add_isolated_node(lon, lat)
fields = [
("0001", field_0001(rcid)),
("FRID", field_FRID(rcid=rcid, prim=PRIM_POINT, objl=objl)),
("FOID", field_FOID(agen=self.agen, fidn=fid)),
]
if attrs:
fields.append(("ATTF", field_ATTF(attrs)))
fields.append(("FSPT", field_FSPT([
(node_name[0], node_name[1], 255, 255, 255), # ORNT/USAG/MASK = NULL for point
])))
self._records.append(("FE", fields))
self._n_geo += 1
return rcid
# --- Feature: line (e.g. COALNE, DEPCNT) ---
def add_line_feature(self, *, objl: int,
coords: list[tuple[float, float]],
attrs: list[tuple[int, str]] | None = None) -> int:
"""coords: ordered list of (lon, lat) vertices (at least 2). The whole
polyline is encoded as a single VE edge so that GDAL reassembles it into
a LineString geometry. For very long coastlines you can split into multiple
calls, each producing a separate feature."""
if len(coords) < 2:
raise ValueError("line feature needs ≥2 coordinates")
edge_name = self.add_edge(coords)
rcid = self._next_fid
self._next_fid += 1
fields = [
("0001", field_0001(rcid)),
("FRID", field_FRID(rcid=rcid, prim=PRIM_LINE, objl=objl)),
("FOID", field_FOID(agen=self.agen, fidn=rcid)),
]
if attrs:
fields.append(("ATTF", field_ATTF(attrs)))
fields.append(("FSPT", field_FSPT([
(edge_name[0], edge_name[1], 1, 255, 255), # ORNT=1=fwd, USAG=null, MASK=null
])))
self._records.append(("FE", fields))
self._n_geo += 1
return rcid
# --- Feature: area defined by closed boundary polygon ---
def add_area_feature(self, *, objl: int,
ring: list[tuple[float, float]],
attrs: list[tuple[int, str]] | None = None) -> int:
"""ring: list of (lon, lat) — must be closed (first == last) or will be."""
if ring[0] != ring[-1]:
ring = ring + [ring[0]]
edge_name = self.add_edge(ring)
rcid = self._next_fid
self._next_fid += 1
fields = [
("0001", field_0001(rcid)),
("FRID", field_FRID(rcid=rcid, prim=PRIM_AREA, objl=objl)),
("FOID", field_FOID(agen=self.agen, fidn=rcid)),
]
if attrs:
fields.append(("ATTF", field_ATTF(attrs)))
fields.append(("FSPT", field_FSPT([
(edge_name[0], edge_name[1], 1, 1, 255), # ORNT=1=fwd, USAG=1=exterior, MASK=null
])))
self._records.append(("FE", fields))
self._n_geo += 1
return rcid
# --- Build the full file ---
def write(self, output_path: str | Path) -> None:
if not DDR_TEMPLATE.exists():
raise FileNotFoundError(
f"DDR template not found: {DDR_TEMPLATE}. "
"Extract noaa_ddr_template.bin from a real ENC first."
)
with open(DDR_TEMPLATE, "rb") as f:
ddr = f.read()
# Build DSID record (record 2)
dsid_record = build_record([
("0001", field_0001(2)),
("DSID", field_DSID(
rcnm=10, rcid=1, expp=1, intu=self.intu,
dsnm=self.dsnm, edtn=str(self.edition), updn="0",
uadt=self.issue_date, isdt=self.issue_date,
sted=3.1, prsp=1, psdn="", pred="2.0",
prof=1, agen=self.agen, comt=self.comt,
)),
("DSSI", field_DSSI(
dstr=2, aall=1, nall=1,
nomr=self._n_meta, nocr=self._n_col, nogr=self._n_gr,
nolr=self._n_lr, noin=self._n_pt, nocn=self._n_cn,
noed=self._n_ed, nofa=self._n_fa,
)),
])
# Build DSPM record (record 3)
dspm_record = build_record([
("0001", field_0001(3)),
("DSPM", field_DSPM(
rcnm=20, rcid=1, hdat=2, vdat=17, sdat=23,
cscl=self.scale, duni=1, huni=1, puni=1, coun=1,
comf=self.comf, somf=self.somf, comt="",
)),
])
# Build all data records collected by the caller
rcid_seq = 4 # records 1=DDR, 2=DSID, 3=DSPM, then sequential
body_records = b""
for kind, fields in self._records:
# Replace the 0001 RCID with global record sequence (per-record-type
# numbering would also be valid; simplest is global sequence)
new_fields = [
("0001", field_0001(rcid_seq)) if t == "0001" else (t, d)
for t, d in fields
]
body_records += build_record(new_fields)
rcid_seq += 1
with open(output_path, "wb") as f:
f.write(ddr + dsid_record + dspm_record + body_records)