"""Runner para data fetchers. Flujo: 1. Geocode (sequential) - sin esto no podemos hacer FEMA ni NOAA 2. FEMA + HUD + NOAA en paralelo (ThreadPoolExecutor, max 3 workers) Fail-soft en cada fetcher: si uno falla, el campo queda {} y se anota en fetch_errors. El pipeline nunca aborta. Output schema: { "geocode": {matched_address, lat, lng, city, state, zip, county_name, county_fips, state_fips} | {} "flood": {zone, bfe, sfha, subtype, source} | {} "fmr": {year, county, state, fmr_efficiency, fmr_1br..fmr_4br, source} | {} "hurricanes": [{name, year, category, max_wind_mph, closest_pass_miles}, ...] "hurricanes_summary": {lookback_years, max_distance_mi, total_hurricanes_nearby, source} "fetch_errors": ["geocode: ...", "hud: ...", ...] # strings con explicacion "duration_seconds": float } """ from __future__ import annotations import time from concurrent.futures import ThreadPoolExecutor from pathlib import Path from typing import Callable, Optional, TYPE_CHECKING from .base import FetcherError from .cache import FileCache from .census_geocode import fetch_geocode from .fema_flood import fetch_flood from .hud_fmr import fetch_fmr from .noaa_hurricanes import fetch_hurricanes from .neighborhood_class import fetch_neighborhood from .court_records import fetch_court_records, _enable_court_records # Wave 1.5A # Paths ABSOLUTOS anclados al proyecto (no relativos al CWD del caller). # Asi el cache y data files siempre estan en D:\Proyectos Software\AR-House\ # sin importar desde donde se llama fetch_all(). _PROJECT_ROOT = Path(__file__).resolve().parent.parent DEFAULT_CACHE_DIR = _PROJECT_ROOT / ".cache" / "data_fetchers" DEFAULT_HURDAT2_PATH = _PROJECT_ROOT / "data" / "hurdat2.txt" if TYPE_CHECKING: from orchestrator import DealInputs # TTL por namespace (dias) TTL = { "geocode": 30, "fema": 30, "hud_fmr": 365, # cambia anualmente "hurricanes": 30, "neighborhood": 90, # ACS y crime cambian lentamente "court_records": 7, # Wave 1.5A: procesos judiciales se mueven lento } def _emit(cb: Optional[Callable[[str], None]], msg: str) -> None: if cb: cb(msg) def _safe( cache: FileCache, namespace: str, cache_key: str, ttl: float, func: Callable[[], dict], errors: list, error_prefix: str, ) -> dict: """Wrapper fail-soft: usa cache, llama func si miss, captura errores.""" cached = cache.get(namespace, cache_key, ttl) if cached is not None: return cached try: data = func() cache.set(namespace, cache_key, data) return data except FetcherError as e: errors.append(f"{error_prefix}: {e}") return {} except Exception as e: errors.append(f"{error_prefix}: unexpected {type(e).__name__}: {e}") return {} def fetch_all( deal: "DealInputs", status_cb: Optional[Callable[[str], None]] = None, cache_dir: str | Path | None = None, hurdat2_path: str | Path | None = None, include_neighborhood_dom: bool = False, ) -> dict: """Obtiene todos los datos verificados para un deal. Geocode primero (necesario para FEMA y NOAA). Los demas en paralelo. Si cache_dir o hurdat2_path son None, usa paths absolutos anclados al proyecto (independientes del CWD del caller). """ # Default paths absolutos al proyecto (no relativos al CWD del caller) if cache_dir is None: cache_dir = DEFAULT_CACHE_DIR if hurdat2_path is None: hurdat2_path = DEFAULT_HURDAT2_PATH t0 = time.perf_counter() cache = FileCache(cache_dir) errors: list[str] = [] # --- 1. Geocode (sequential, bloquea a los demas) ---------------------- _emit(status_cb, "Geocodificando direccion (Census)...") geocode = _safe( cache, "geocode", deal.address, TTL["geocode"], lambda: fetch_geocode(deal.address), errors, "geocode", ) if not geocode or not geocode.get("lat") or not geocode.get("lng"): _emit(status_cb, " Geocodificacion fallo - omitiendo FEMA/NOAA/HUD/neighborhood") return { "geocode": geocode, "flood": {}, "fmr": {}, "hurricanes": [], "hurricanes_summary": {}, "neighborhood": {}, "fetch_errors": errors + (["geocode_failed_no_coords"] if not errors else []), "duration_seconds": round(time.perf_counter() - t0, 2), } lat = float(geocode["lat"]) lng = float(geocode["lng"]) state = geocode.get("state", "") county_name = geocode.get("county_name", "") _emit( status_cb, f" OK: {geocode.get('matched_address', '?')} | " f"{county_name}, {state} | ({lat:.4f}, {lng:.4f})" ) # --- 2. FEMA + HUD + NOAA + Neighborhood en paralelo ------------------- _emit(status_cb, "Fetching FEMA / HUD / NOAA / Neighborhood en paralelo...") def task_fema(): return _safe( cache, "fema", f"{lat:.5f},{lng:.5f}", TTL["fema"], lambda: fetch_flood(lat, lng), errors, "fema", ) def task_hud(): if not state or not county_name: errors.append("hud: state o county_name faltantes en geocode") return {} return _safe( cache, "hud_fmr", f"{state}|{county_name}", TTL["hud_fmr"], lambda: fetch_fmr(state, county_name), errors, "hud", ) def task_noaa(): return _safe( cache, "hurricanes", f"{lat:.4f},{lng:.4f}", TTL["hurricanes"], lambda: fetch_hurricanes(lat, lng, years_back=20, hurdat2_path=hurdat2_path), errors, "noaa", ) def task_neighborhood(): tract = geocode.get("tract_geoid") or "no_tract" return _safe( cache, "neighborhood", f"{tract}|dom={include_neighborhood_dom}", TTL["neighborhood"], lambda: fetch_neighborhood(geocode, include_dom=include_neighborhood_dom), errors, "neighborhood", ) # Wave 1.5A: court records (opt-in via ENABLE_COURT_RECORDS=true) # Solo si el county es Duval (Wave 1.5A v1). Otros condados → soft-fail. def task_court_records(): if not _enable_court_records(): return {"status": "DISABLED", "recommendation": "Activar ENABLE_COURT_RECORDS=true en .env para " "deteccion deterministica de foreclosure/lis pendens."} return _safe( cache, "court_records", f"{deal.address}|{county_name}", TTL["court_records"], lambda: fetch_court_records(address=deal.address, county_name=county_name), errors, "court_records", ) with ThreadPoolExecutor(max_workers=5) as ex: f_fema = ex.submit(task_fema) f_hud = ex.submit(task_hud) f_noaa = ex.submit(task_noaa) f_nbh = ex.submit(task_neighborhood) f_court = ex.submit(task_court_records) flood = f_fema.result() fmr = f_hud.result() noaa_data = f_noaa.result() neighborhood = f_nbh.result() court_records = f_court.result() hurricanes = noaa_data.get("hurricanes", []) if isinstance(noaa_data, dict) else [] hurricanes_summary = { k: v for k, v in (noaa_data or {}).items() if k != "hurricanes" } # Log de resumen f_zone = flood.get("zone", "N/A") if flood else "N/A" h3 = fmr.get("fmr_3br", "N/A") if fmr else "N/A" n_hur = len(hurricanes) nbh_class = neighborhood.get("neighborhood_class", "?") if neighborhood else "?" nbh_conf = neighborhood.get("confidence_level", "?") if neighborhood else "?" _emit(status_cb, f" Datos: FEMA={f_zone}, HUD 3BR=${h3}, {n_hur} huracanes, Nbh={nbh_class}({nbh_conf})") if errors: _emit(status_cb, f" Fetcher errors: {len(errors)} (continuamos con datos parciales)") return { "geocode": geocode, "flood": flood, "fmr": fmr, "hurricanes": hurricanes, "hurricanes_summary": hurricanes_summary, "neighborhood": neighborhood, "court_records": court_records, # Wave 1.5A "fetch_errors": errors, "duration_seconds": round(time.perf_counter() - t0, 2), }