""" nga_fetch.py — Descarga ayudas a la navegacion del API NGA MSI (Pub. 110) y genera CSVs listos para importar en QGIS, uno por tipo de objeto S-57. Uso: python -X utf8 nga_fetch.py python -X utf8 nga_fetch.py --lat0 10.0 --lat1 12.0 --lon0 -76.5 --lon1 -73.5 """ import csv, json, re, ssl, sys, urllib.request from pathlib import Path from datetime import datetime # ── Bounding box por defecto: Costa Caribe colombiana ─────────────────────── DEFAULT_LAT0, DEFAULT_LON0 = 9.5, -77.0 DEFAULT_LAT1, DEFAULT_LON1 = 11.8, -73.5 OUT_DIR = Path(__file__).parent # ── Headers NGA (requiere origen mismo dominio) ────────────────────────────── NGA_HEADERS = { "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/120.0.0.0 Safari/537.36" ), "Referer": "https://msi.nga.mil/Publications/NGALOL", "Origin": "https://msi.nga.mil", "Accept": "application/json, text/plain, */*", "Accept-Language": "en-US,en;q=0.9", "sec-fetch-site": "same-origin", "sec-fetch-mode": "cors", } NGA_URL = ( "https://msi.nga.mil/api/publications/ngalol/lights-buoys" "?latitudeLeft={lat0}&longitudeLeft={lon0}" "&latitudeRight={lat1}&longitudeRight={lon1}" "&includeRemovals=false&output=json" ) _SSL_CTX = ssl.create_default_context() # verifica certificado SSL por defecto # ── Rumbos conocidos de enfilaciones Bocas de Ceniza / Rio Magdalena ───────── # Fuente: NGA Pub.110 Vol.D (Colombia) + geometria del canal DIMAR. # E-1 y E-3: canal exterior (barra), E-6/E-8: primer tramo, etc. # ATENCION: valores aproximados +/-1 grado. Verificar contra DIMAR Lista Faros. KNOWN_ORIENT = { 'E-1': '176.0', # Bocas de Ceniza, barra exterior 'E-3': '176.0', # rear de E-1 'E-3A': '135.6', # ramal alternativo 'E-6': '167.7', # primer codo (visible 166-170 segun NGA) 'E-8': '167.7', # rear de E-6 'E-16': '122.0', # codo hacia Barranquilla 'E-14': '122.0', # rear de E-16 (302-122 opuesto) 'E-18': '142.2', # Dir luz W/R/G (sector W: 141.5-142.5) 'E-20': '142.2', # rear de E-18 'E-4': '141.0', # Dir Iso W (interior) 'E-7': '120.0', # Dir Iso W/R/G sector W 118.5-121.5 } # ── Conversion DMS -> decimal ───────────────────────────────────────────────── _DMS = re.compile(r'(\d+)[deg°º]\s*(\d+)[min′\'"]\s*([\d.]+)[sec"″]?\s*([NSEW])', re.I) _DMS2 = re.compile(r'(\d+)[°º]\s*([\d.]+)[′\'"]\s*([NSEW])', re.I) def dms_dec(txt: str) -> float | None: txt = txt.replace('\n', ' ').replace('\\u00b0', '°') m = _DMS.search(txt) if m: d, mi, s, h = int(m[1]), int(m[2]), float(m[3]), m[4].upper() v = d + mi/60 + s/3600 return round(-v if h in ('S','W') else v, 6) m = _DMS2.search(txt) if m: d, mi, h = int(m[1]), float(m[2]), m[3].upper() v = d + mi/60 return round(-v if h in ('S','W') else v, 6) # Unicode grado (JSON de NGA viene con °) m2 = re.search(r'(\d+)°(\d+)\'([\d.]+)"([NSEW])', txt) if m2: d, mi, s, h = int(m2[1]), int(m2[2]), float(m2[3]), m2[4].upper() v = d + mi/60 + s/3600 return round(-v if h in ('S','W') else v, 6) nums = re.findall(r'[-\d.]+', txt) return float(nums[0]) if nums else None def parse_pos(pos: str): parts = re.split(r'\n', pos) if len(parts) < 2: return None, None return dms_dec(parts[0]), dms_dec(parts[1]) # ── Tablas S-57 ─────────────────────────────────────────────────────────────── LITCHR_FIXED = { 'F':1, 'Fl':2, 'LFl':4, 'Q':5, 'VQ':6, 'UQ':7, 'Iso':8, 'Oc':9, 'IQ':10, 'Mo':13, 'FFl':14, 'Al.Oc':18, 'Al.LFl':19, 'Al.Fl':20, 'Dir':28, 'Dir.Iso':8, 'Dir.Fl':2, 'Dir.Oc':9, 'Dir.LFl':4, 'Q+LFl':25, 'VQ+LFl':26, } _COLORES = {'Bu':5, 'Or':11, 'Am':6, 'Vi':7, 'W':1, 'R':3, 'G':4, 'Y':6, 'B':2} COLOUR_TXT_MAP = { '1':'W','2':'B','3':'R','4':'G','5':'Bu','6':'Y','7':'Vi','11':'Or', } _TIPOS = [ 'Al.Fl','Al.Oc','Al.LFl','Dir.Iso','Dir.Fl','Dir.Oc','Dir.LFl', 'VQ+LFl','Q+LFl','FFl','LFl','VQ','UQ','IQ','Mo','Iso','Oc','Fl','Q','F', ] # ── Parser de tokens de caracteristica ─────────────────────────────────────── def _tokenize(s: str) -> list: """'Fl.(4)W.R.' -> ['Fl','(4)','W','R'] sin puntos separadores.""" tokens, cur, depth = [], '', 0 for ch in s.rstrip('.'): if ch == '(': depth += 1; cur += ch elif ch == ')': depth -= 1; cur += ch if depth == 0: tokens.append(cur); cur = '' elif ch == '.' and depth == 0: if cur: tokens.append(cur); cur = '' else: cur += ch if cur: tokens.append(cur) return tokens _PER_RE = re.compile(r'period\s+([\d.]+)\s*s', re.I) _FL_RE = re.compile(r'\bfl\.\s*([\d.]+)\s*s', re.I) _SEC_RE = re.compile( r'([WRGBY])\.\s*(\d+[°º]\d+[\'`])\s*[–-]\s*(\d+[°º]\d+[\'`])', re.I ) _VIS_RE = re.compile(r'[Vv]isible\s+([\d.]+)[°º]\s*[–-]\s*([\d.]+)[°º]') _BRG_RE = re.compile(r'(\d{2,3})\^([\d.]*)', re.I) # "139^18'" formato NGA def _deg_min(txt: str) -> str: txt = txt.strip().replace('`',"'") m = re.match(r'(\d+)[°º](\d+)', txt) if m: return str(round(int(m[1]) + int(m[2])/60, 2)) nums = re.findall(r'[\d.]+', txt) return nums[0] if nums else txt def parse_char(char_str: str, remarks: str = '', name: str = '') -> dict: """Extrae todos los campos S-57 de la caracteristica + remarks del NGA.""" out = { 'LITCHR':'', 'SIGGRP':'', 'COLOUR':'', 'SIGPER':'', 'SIGPER2':'', 'SECTR1':'', 'SECTR2':'', 'MLTYLT':'', 'ORIENT':'', } if not char_str: return out lines = char_str.split('\n') base = lines[0].strip() full = ' '.join(lines) tokens = _tokenize(base) remaining = list(tokens) # 1) Tipo de luz (compuestos primero) tipo_found = '' for tipo in _TIPOS: tp_tokens = tipo.split('.') if remaining[:len(tp_tokens)] == tp_tokens: tipo_found = tipo remaining = remaining[len(tp_tokens):] break if tipo_found: out['LITCHR'] = LITCHR_FIXED.get(tipo_found, tipo_found) # 2) Grupo opcional "(N)" o "(N+M)" if remaining and remaining[0].startswith('('): out['SIGGRP'] = remaining.pop(0) # 3) Colores: tokens de solo letras colours = [] for t in remaining: if re.match(r'^[A-Za-z]+$', t): # 2 letras primero, luego 1 c = _COLORES.get(t) or _COLORES.get(t[:2]) or _COLORES.get(t[:1]) if c: colours.append(str(c)) out['COLOUR'] = ','.join(colours) if colours else '1' # 4) Periodo pm = _PER_RE.search(full) if pm: out['SIGPER'] = pm.group(1) # 5) Duracion destello fm = _FL_RE.search(full) if fm: out['SIGPER2'] = fm.group(1) # 6) Sectores y orientacion desde remarks rem = (remarks or '').replace('`',"'").replace('',"'") secs = _SEC_RE.findall(rem) if secs: lst = [f"{c.upper()}:{_deg_min(s1)}-{_deg_min(s2)}" for c, s1, s2 in secs] out['MLTYLT'] = ' | '.join(lst) _, s1, s2 = secs[0] out['SECTR1'] = _deg_min(s1) out['SECTR2'] = _deg_min(s2) if not out['SECTR1']: vm = _VIS_RE.search(rem + ' ' + full) if vm: out['SECTR1'] = vm.group(1) out['SECTR2'] = vm.group(2) # 7) Rumbo de enfilacion desde formato NGA "310 meters 139^18' from front" bm = _BRG_RE.search(rem + ' ' + full) if bm: deg = int(bm.group(1)) frac = bm.group(2) if frac: deg_dec = round(deg + float(frac)/60, 1) else: deg_dec = float(deg) # El rumbo NGA es desde el frente al trasero -> invertir para ORIENT # (ORIENT = rumbo inbound = desde el mar hacia el muelle) orient = round((deg_dec + 180) % 360, 1) out['ORIENT'] = str(orient) # 8) Rumbo conocido de tabla local (mas preciso que calculo) # Buscar el ID de la enfilacion en el nombre for key, val in KNOWN_ORIENT.items(): if key in name: out['ORIENT'] = val # Si el sector visible es estrecho y ORIENT conocido, derivar sectores if not out['SECTR1'] and out['LITCHR'] == 28: # Dir light ang = float(val) out['SECTR1'] = str(round(ang - 1.5, 1)) out['SECTR2'] = str(round(ang + 1.5, 1)) break return out # ── Altura en metros desde "heightFeetMeters" ─────────────────────────────── def parse_height(hfm: str) -> str: if not hfm: return '' parts = hfm.strip().split('\n') if len(parts) >= 2 and parts[1].strip(): return parts[1].strip() nums = re.findall(r'[\d.]+', parts[0]) return nums[0] if nums else '' # ── Clasificacion S-57 ─────────────────────────────────────────────────────── def classify(rec: dict) -> str: name = (rec.get('name') or '').lower() struc = (rec.get('structure') or '').lower() aid = (rec.get('aidType') or '').lower() if 'buoy' in struc or 'buoy' in aid: if any(w in name+struc for w in ['north','south','east','west','cardinal']): return 'BOYCAR' if any(w in name+struc for w in ['safe water','fairway','spherical']): return 'BOYSAW' return 'BOYLAT' if 'beacon' in struc or 'beacon' in aid: return 'BCNLAT' return 'LIGHTS' # ── Fetch ──────────────────────────────────────────────────────────────────── def fetch(lat0, lon0, lat1, lon1) -> list: url = NGA_URL.format(lat0=lat0, lon0=lon0, lat1=lat1, lon1=lon1) print(f"Consultando NGA MSI...\n{url}\n") req = urllib.request.Request(url, headers=NGA_HEADERS) try: with urllib.request.urlopen(req, timeout=25, context=_SSL_CTX) as r: data = json.loads(r.read().decode('utf-8')) except urllib.error.HTTPError as e: print(f"Error HTTP {e.code}: {e.reason}"); sys.exit(1) except Exception as e: print(f"Error conexion: {e}"); sys.exit(1) recs = data.get('ngalol') or [] print(f" -> {len(recs)} registros recibidos") return recs # ── Campos CSV (S-57 completo) ─────────────────────────────────────────────── FIELDS = [ 'lon','lat', 'OBJNAM','NOBJNM', # Luz 'LITCHR','LITCHR_TXT', 'SIGGRP', 'COLOUR','COLOUR_TXT', 'SIGPER','SIGPER2', 'VALNMR', 'HEIGHT', # Sectores / orientacion 'SECTR1','SECTR2', 'MLTYLT', 'ORIENT', # Boya 'BOYSHP','CATLBR','CATCAM', # Texto 'INFORM','TXTDSC', # Referencia NGA '_nga_feature','_nga_volume','_nga_region', '_nga_char_raw','_nga_hfm_raw','_nga_range_raw','_nga_notice', ] def write_csv(path: Path, rows: list): with open(path, 'w', newline='', encoding='utf-8-sig') as f: w = csv.DictWriter(f, fieldnames=FIELDS, extrasaction='ignore') w.writeheader() w.writerows(rows) print(f" OK {path.name} ({len(rows)} registros)") # ── Main ───────────────────────────────────────────────────────────────────── def main(): lat0, lon0, lat1, lon1 = DEFAULT_LAT0, DEFAULT_LON0, DEFAULT_LAT1, DEFAULT_LON1 args = sys.argv[1:] for i, a in enumerate(args): if a=='--lat0' and i+13} registros") # Escribir CSVs print(f"\nEscribiendo CSVs en: {OUT_DIR}\n") ts = datetime.now().strftime('%Y%m%d') for k, rows in buckets.items(): if rows: write_csv(OUT_DIR / f'nga_{k}_{ts}.csv', rows) # Preview tabla completa print(f"\n{'OBJNAM':<45} {'CHAR':<15} {'COL':<8} {'PER':>5} {'RNG':>4} {'HGT':>4} {'ORI':>6} {'SEC':}") print('-'*115) for r in buckets['LIGHTS']: sec = f"{r['SECTR1']}-{r['SECTR2']}" if r['SECTR1'] else '' print(f"{r['OBJNAM'][:44]:<45} {r['LITCHR_TXT'][:14]:<15} " f"{r['COLOUR_TXT']:<8} {r['SIGPER']:>5}s {r['VALNMR']:>4}NM " f"{r['HEIGHT']:>4}m {r['ORIENT']:>6} {sec}") print("\nListo. En QGIS: Capa -> Anadir capa -> Texto delimitado") print("X=lon Y=lat CRS=EPSG:4326 Renombre capa con codigo S-57") print("ATENCION: ORIENT de enfilaciones son aproximados.") print("Verificar contra DIMAR Lista de Faros antes de publicar carta.") if __name__ == '__main__': main()