Files
AR-QGISS57Converter/nga_fetch.py
T

424 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
nga_fetch.py — Descarga ayudas a la navegacion del API NGA MSI (Pub. 110)
y genera CSVs listos para importar en QGIS, uno por tipo de objeto S-57.
Uso:
python -X utf8 nga_fetch.py
python -X utf8 nga_fetch.py --lat0 10.0 --lat1 12.0 --lon0 -76.5 --lon1 -73.5
"""
import csv, json, re, ssl, sys, urllib.request
from pathlib import Path
from datetime import datetime
# ── Bounding box por defecto: Costa Caribe colombiana ───────────────────────
DEFAULT_LAT0, DEFAULT_LON0 = 9.5, -77.0
DEFAULT_LAT1, DEFAULT_LON1 = 11.8, -73.5
OUT_DIR = Path(__file__).parent
# ── Headers NGA (requiere origen mismo dominio) ──────────────────────────────
NGA_HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
),
"Referer": "https://msi.nga.mil/Publications/NGALOL",
"Origin": "https://msi.nga.mil",
"Accept": "application/json, text/plain, */*",
"Accept-Language": "en-US,en;q=0.9",
"sec-fetch-site": "same-origin",
"sec-fetch-mode": "cors",
}
NGA_URL = (
"https://msi.nga.mil/api/publications/ngalol/lights-buoys"
"?latitudeLeft={lat0}&longitudeLeft={lon0}"
"&latitudeRight={lat1}&longitudeRight={lon1}"
"&includeRemovals=false&output=json"
)
_SSL_CTX = ssl.create_default_context() # verifica certificado SSL por defecto
# ── Rumbos conocidos de enfilaciones Bocas de Ceniza / Rio Magdalena ─────────
# Fuente: NGA Pub.110 Vol.D (Colombia) + geometria del canal DIMAR.
# E-1 y E-3: canal exterior (barra), E-6/E-8: primer tramo, etc.
# ATENCION: valores aproximados +/-1 grado. Verificar contra DIMAR Lista Faros.
KNOWN_ORIENT = {
'E-1': '176.0', # Bocas de Ceniza, barra exterior
'E-3': '176.0', # rear de E-1
'E-3A': '135.6', # ramal alternativo
'E-6': '167.7', # primer codo (visible 166-170 segun NGA)
'E-8': '167.7', # rear de E-6
'E-16': '122.0', # codo hacia Barranquilla
'E-14': '122.0', # rear de E-16 (302-122 opuesto)
'E-18': '142.2', # Dir luz W/R/G (sector W: 141.5-142.5)
'E-20': '142.2', # rear de E-18
'E-4': '141.0', # Dir Iso W (interior)
'E-7': '120.0', # Dir Iso W/R/G sector W 118.5-121.5
}
# ── Conversion DMS -> decimal ─────────────────────────────────────────────────
_DMS = re.compile(r'(\d+)[deg°º]\s*(\d+)[min\'"]\s*([\d.]+)[sec"″]?\s*([NSEW])', re.I)
_DMS2 = re.compile(r'(\d+)[°º]\s*([\d.]+)[\'"]\s*([NSEW])', re.I)
def dms_dec(txt: str) -> float | None:
txt = txt.replace('\n', ' ').replace('\\u00b0', '°')
m = _DMS.search(txt)
if m:
d, mi, s, h = int(m[1]), int(m[2]), float(m[3]), m[4].upper()
v = d + mi/60 + s/3600
return round(-v if h in ('S','W') else v, 6)
m = _DMS2.search(txt)
if m:
d, mi, h = int(m[1]), float(m[2]), m[3].upper()
v = d + mi/60
return round(-v if h in ('S','W') else v, 6)
# Unicode grado (JSON de NGA viene con °)
m2 = re.search(r'(\d+)°(\d+)\'([\d.]+)"([NSEW])', txt)
if m2:
d, mi, s, h = int(m2[1]), int(m2[2]), float(m2[3]), m2[4].upper()
v = d + mi/60 + s/3600
return round(-v if h in ('S','W') else v, 6)
nums = re.findall(r'[-\d.]+', txt)
return float(nums[0]) if nums else None
def parse_pos(pos: str):
parts = re.split(r'\n', pos)
if len(parts) < 2:
return None, None
return dms_dec(parts[0]), dms_dec(parts[1])
# ── Tablas S-57 ───────────────────────────────────────────────────────────────
LITCHR_FIXED = {
'F':1, 'Fl':2, 'LFl':4, 'Q':5, 'VQ':6, 'UQ':7,
'Iso':8, 'Oc':9, 'IQ':10, 'Mo':13, 'FFl':14,
'Al.Oc':18, 'Al.LFl':19, 'Al.Fl':20, 'Dir':28,
'Dir.Iso':8, 'Dir.Fl':2, 'Dir.Oc':9, 'Dir.LFl':4,
'Q+LFl':25, 'VQ+LFl':26,
}
_COLORES = {'Bu':5, 'Or':11, 'Am':6, 'Vi':7, 'W':1, 'R':3, 'G':4, 'Y':6, 'B':2}
COLOUR_TXT_MAP = {
'1':'W','2':'B','3':'R','4':'G','5':'Bu','6':'Y','7':'Vi','11':'Or',
}
_TIPOS = [
'Al.Fl','Al.Oc','Al.LFl','Dir.Iso','Dir.Fl','Dir.Oc','Dir.LFl',
'VQ+LFl','Q+LFl','FFl','LFl','VQ','UQ','IQ','Mo','Iso','Oc','Fl','Q','F',
]
# ── Parser de tokens de caracteristica ───────────────────────────────────────
def _tokenize(s: str) -> list:
"""'Fl.(4)W.R.' -> ['Fl','(4)','W','R'] sin puntos separadores."""
tokens, cur, depth = [], '', 0
for ch in s.rstrip('.'):
if ch == '(':
depth += 1; cur += ch
elif ch == ')':
depth -= 1; cur += ch
if depth == 0:
tokens.append(cur); cur = ''
elif ch == '.' and depth == 0:
if cur: tokens.append(cur); cur = ''
else:
cur += ch
if cur: tokens.append(cur)
return tokens
_PER_RE = re.compile(r'period\s+([\d.]+)\s*s', re.I)
_FL_RE = re.compile(r'\bfl\.\s*([\d.]+)\s*s', re.I)
_SEC_RE = re.compile(
r'([WRGBY])\.\s*(\d+[°º]\d+[\'`])\s*[-]\s*(\d+[°º]\d+[\'`])', re.I
)
_VIS_RE = re.compile(r'[Vv]isible\s+([\d.]+)[°º]\s*[-]\s*([\d.]+)[°º]')
_BRG_RE = re.compile(r'(\d{2,3})\^([\d.]*)', re.I) # "139^18'" formato NGA
def _deg_min(txt: str) -> str:
txt = txt.strip().replace('`',"'")
m = re.match(r'(\d+)[°º](\d+)', txt)
if m: return str(round(int(m[1]) + int(m[2])/60, 2))
nums = re.findall(r'[\d.]+', txt)
return nums[0] if nums else txt
def parse_char(char_str: str, remarks: str = '', name: str = '') -> dict:
"""Extrae todos los campos S-57 de la caracteristica + remarks del NGA."""
out = {
'LITCHR':'', 'SIGGRP':'', 'COLOUR':'', 'SIGPER':'',
'SIGPER2':'', 'SECTR1':'', 'SECTR2':'', 'MLTYLT':'', 'ORIENT':'',
}
if not char_str:
return out
lines = char_str.split('\n')
base = lines[0].strip()
full = ' '.join(lines)
tokens = _tokenize(base)
remaining = list(tokens)
# 1) Tipo de luz (compuestos primero)
tipo_found = ''
for tipo in _TIPOS:
tp_tokens = tipo.split('.')
if remaining[:len(tp_tokens)] == tp_tokens:
tipo_found = tipo
remaining = remaining[len(tp_tokens):]
break
if tipo_found:
out['LITCHR'] = LITCHR_FIXED.get(tipo_found, tipo_found)
# 2) Grupo opcional "(N)" o "(N+M)"
if remaining and remaining[0].startswith('('):
out['SIGGRP'] = remaining.pop(0)
# 3) Colores: tokens de solo letras
colours = []
for t in remaining:
if re.match(r'^[A-Za-z]+$', t):
# 2 letras primero, luego 1
c = _COLORES.get(t) or _COLORES.get(t[:2]) or _COLORES.get(t[:1])
if c: colours.append(str(c))
out['COLOUR'] = ','.join(colours) if colours else '1'
# 4) Periodo
pm = _PER_RE.search(full)
if pm: out['SIGPER'] = pm.group(1)
# 5) Duracion destello
fm = _FL_RE.search(full)
if fm: out['SIGPER2'] = fm.group(1)
# 6) Sectores y orientacion desde remarks
rem = (remarks or '').replace('`',"'").replace('',"'")
secs = _SEC_RE.findall(rem)
if secs:
lst = [f"{c.upper()}:{_deg_min(s1)}-{_deg_min(s2)}"
for c, s1, s2 in secs]
out['MLTYLT'] = ' | '.join(lst)
_, s1, s2 = secs[0]
out['SECTR1'] = _deg_min(s1)
out['SECTR2'] = _deg_min(s2)
if not out['SECTR1']:
vm = _VIS_RE.search(rem + ' ' + full)
if vm:
out['SECTR1'] = vm.group(1)
out['SECTR2'] = vm.group(2)
# 7) Rumbo de enfilacion desde formato NGA "310 meters 139^18' from front"
bm = _BRG_RE.search(rem + ' ' + full)
if bm:
deg = int(bm.group(1))
frac = bm.group(2)
if frac:
deg_dec = round(deg + float(frac)/60, 1)
else:
deg_dec = float(deg)
# El rumbo NGA es desde el frente al trasero -> invertir para ORIENT
# (ORIENT = rumbo inbound = desde el mar hacia el muelle)
orient = round((deg_dec + 180) % 360, 1)
out['ORIENT'] = str(orient)
# 8) Rumbo conocido de tabla local (mas preciso que calculo)
# Buscar el ID de la enfilacion en el nombre
for key, val in KNOWN_ORIENT.items():
if key in name:
out['ORIENT'] = val
# Si el sector visible es estrecho y ORIENT conocido, derivar sectores
if not out['SECTR1'] and out['LITCHR'] == 28: # Dir light
ang = float(val)
out['SECTR1'] = str(round(ang - 1.5, 1))
out['SECTR2'] = str(round(ang + 1.5, 1))
break
return out
# ── Altura en metros desde "heightFeetMeters" ───────────────────────────────
def parse_height(hfm: str) -> str:
if not hfm: return ''
parts = hfm.strip().split('\n')
if len(parts) >= 2 and parts[1].strip():
return parts[1].strip()
nums = re.findall(r'[\d.]+', parts[0])
return nums[0] if nums else ''
# ── Clasificacion S-57 ───────────────────────────────────────────────────────
def classify(rec: dict) -> str:
name = (rec.get('name') or '').lower()
struc = (rec.get('structure') or '').lower()
aid = (rec.get('aidType') or '').lower()
if 'buoy' in struc or 'buoy' in aid:
if any(w in name+struc for w in ['north','south','east','west','cardinal']):
return 'BOYCAR'
if any(w in name+struc for w in ['safe water','fairway','spherical']):
return 'BOYSAW'
return 'BOYLAT'
if 'beacon' in struc or 'beacon' in aid:
return 'BCNLAT'
return 'LIGHTS'
# ── Fetch ────────────────────────────────────────────────────────────────────
def fetch(lat0, lon0, lat1, lon1) -> list:
url = NGA_URL.format(lat0=lat0, lon0=lon0, lat1=lat1, lon1=lon1)
print(f"Consultando NGA MSI...\n{url}\n")
req = urllib.request.Request(url, headers=NGA_HEADERS)
try:
with urllib.request.urlopen(req, timeout=25, context=_SSL_CTX) as r:
data = json.loads(r.read().decode('utf-8'))
except urllib.error.HTTPError as e:
print(f"Error HTTP {e.code}: {e.reason}"); sys.exit(1)
except Exception as e:
print(f"Error conexion: {e}"); sys.exit(1)
recs = data.get('ngalol') or []
print(f" -> {len(recs)} registros recibidos")
return recs
# ── Campos CSV (S-57 completo) ───────────────────────────────────────────────
FIELDS = [
'lon','lat',
'OBJNAM','NOBJNM',
# Luz
'LITCHR','LITCHR_TXT',
'SIGGRP',
'COLOUR','COLOUR_TXT',
'SIGPER','SIGPER2',
'VALNMR',
'HEIGHT',
# Sectores / orientacion
'SECTR1','SECTR2',
'MLTYLT',
'ORIENT',
# Boya
'BOYSHP','CATLBR','CATCAM',
# Texto
'INFORM','TXTDSC',
# Referencia NGA
'_nga_feature','_nga_volume','_nga_region',
'_nga_char_raw','_nga_hfm_raw','_nga_range_raw','_nga_notice',
]
def write_csv(path: Path, rows: list):
with open(path, 'w', newline='', encoding='utf-8-sig') as f:
w = csv.DictWriter(f, fieldnames=FIELDS, extrasaction='ignore')
w.writeheader()
w.writerows(rows)
print(f" OK {path.name} ({len(rows)} registros)")
# ── Main ─────────────────────────────────────────────────────────────────────
def main():
lat0, lon0, lat1, lon1 = DEFAULT_LAT0, DEFAULT_LON0, DEFAULT_LAT1, DEFAULT_LON1
args = sys.argv[1:]
for i, a in enumerate(args):
if a=='--lat0' and i+1<len(args): lat0=float(args[i+1])
if a=='--lat1' and i+1<len(args): lat1=float(args[i+1])
if a=='--lon0' and i+1<len(args): lon0=float(args[i+1])
if a=='--lon1' and i+1<len(args): lon1=float(args[i+1])
records = fetch(lat0, lon0, lat1, lon1)
buckets = {
'LIGHTS':[], 'BOYLAT':[], 'BOYCAR':[],
'BOYSAW':[], 'BCNLAT':[], 'OTHER':[],
}
for rec in records:
lat, lon = parse_pos(rec.get('position',''))
if lat is None: continue
char_raw = rec.get('characteristic','')
remarks = rec.get('remarks','') or ''
name_raw = (rec.get('name') or '').strip().lstrip('-').strip().rstrip('.')
cp = parse_char(char_raw, remarks, name_raw)
height = parse_height(rec.get('heightFeetMeters',''))
valnmr = (rec.get('range') or '').strip()
s57t = classify(rec)
col_txt = ','.join(
COLOUR_TXT_MAP.get(c.strip(), c.strip())
for c in cp['COLOUR'].split(',') if c.strip()
)
row = {
'lon': lon,
'lat': lat,
'OBJNAM': name_raw,
'NOBJNM': '',
'LITCHR': cp['LITCHR'],
'LITCHR_TXT': char_raw.split('\n')[0].strip(),
'SIGGRP': cp['SIGGRP'],
'COLOUR': cp['COLOUR'],
'COLOUR_TXT': col_txt,
'SIGPER': cp['SIGPER'],
'SIGPER2': cp['SIGPER2'],
'VALNMR': valnmr,
'HEIGHT': height,
'SECTR1': cp['SECTR1'],
'SECTR2': cp['SECTR2'],
'MLTYLT': cp['MLTYLT'],
'ORIENT': cp['ORIENT'],
'BOYSHP': '',
'CATLBR': '',
'CATCAM': '',
'INFORM': remarks.replace('\n',' '),
'TXTDSC': (rec.get('structure') or '').replace('\n',' ').strip(),
'_nga_feature': (rec.get('featureNumber') or '').replace('\n',' '),
'_nga_volume': rec.get('volumeNumber',''),
'_nga_region': (rec.get('regionHeading') or ''),
'_nga_char_raw': char_raw.replace('\n',' | '),
'_nga_hfm_raw': (rec.get('heightFeetMeters') or '').replace('\n','ft/'),
'_nga_range_raw': valnmr,
'_nga_notice': str(rec.get('noticeNumber','')),
}
# Clasificacion detallada boyas
struc_low = (rec.get('structure') or '').lower()
if s57t == 'BOYLAT':
if any(w in struc_low for w in ['red','port','can']):
row.update({'CATLBR':'1','COLOUR':'3','COLOUR_TXT':'R','BOYSHP':'2'})
elif any(w in struc_low for w in ['green','starboard','conical']):
row.update({'CATLBR':'2','COLOUR':'4','COLOUR_TXT':'G','BOYSHP':'1'})
elif s57t == 'BOYCAR':
nm = name_raw.lower()
if 'north' in nm: row['CATCAM']='1'
elif 'east' in nm: row['CATCAM']='2'
elif 'south' in nm: row['CATCAM']='3'
elif 'west' in nm: row['CATCAM']='4'
row.update({'COLOUR':'6,1','COLOUR_TXT':'Y,W'})
elif s57t == 'BOYSAW':
row.update({'COLOUR':'1,3','COLOUR_TXT':'W,R','BOYSHP':'3'})
buckets[s57t if s57t in buckets else 'OTHER'].append(row)
# Resumen
print("\nResumen por tipo S-57:")
for k,v in buckets.items():
if v: print(f" {k:<10} {len(v):>3} registros")
# Escribir CSVs
print(f"\nEscribiendo CSVs en: {OUT_DIR}\n")
ts = datetime.now().strftime('%Y%m%d')
for k, rows in buckets.items():
if rows:
write_csv(OUT_DIR / f'nga_{k}_{ts}.csv', rows)
# Preview tabla completa
print(f"\n{'OBJNAM':<45} {'CHAR':<15} {'COL':<8} {'PER':>5} {'RNG':>4} {'HGT':>4} {'ORI':>6} {'SEC':}")
print('-'*115)
for r in buckets['LIGHTS']:
sec = f"{r['SECTR1']}-{r['SECTR2']}" if r['SECTR1'] else ''
print(f"{r['OBJNAM'][:44]:<45} {r['LITCHR_TXT'][:14]:<15} "
f"{r['COLOUR_TXT']:<8} {r['SIGPER']:>5}s {r['VALNMR']:>4}NM "
f"{r['HEIGHT']:>4}m {r['ORIENT']:>6} {sec}")
print("\nListo. En QGIS: Capa -> Anadir capa -> Texto delimitado")
print("X=lon Y=lat CRS=EPSG:4326 Renombre capa con codigo S-57")
print("ATENCION: ORIENT de enfilaciones son aproximados.")
print("Verificar contra DIMAR Lista de Faros antes de publicar carta.")
if __name__ == '__main__':
main()