"""LocationAgent — Orquestador principal. Flujo: 1. Geocode (Census → Nominatim fallback) 2. 7 sub-agentes en paralelo (ThreadPoolExecutor) 3. Calcular scores parciales y score general 4. Síntesis narrativa via Ollama 5. Guardar en SQLite 6. Exportar PDF Uso: from location_agent import run_location_agent result = run_location_agent("1234 W 49th St, Hialeah FL 33012", status_cb=print) """ from __future__ import annotations import time from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Callable, Optional from .utils.geocoder import geocode from .utils import ollama_client from .sub_agents import ( crime_agent, property_agent, schools_agent, amenities_agent, demographics_agent, maritime_agent, lifestyle_agent, ) from .report.report_generator import build_report from .report.pdf_generator import export_pdf from .db import save_report SCORE_WEIGHTS = { "crime": 0.20, "property": 0.20, "schools": 0.10, "amenities": 0.15, "demographics": 0.10, "maritime": 0.15, "lifestyle": 0.10, } def _emit(cb: Optional[Callable], msg: str) -> None: if cb: cb(msg) class LocationAgent: def __init__(self, status_cb: Optional[Callable[[str], None]] = None): self.status_cb = status_cb def analyze(self, address: str) -> dict: """Ejecuta el análisis completo para una dirección.""" t0 = time.perf_counter() _emit(self.status_cb, f"🔍 Geocodificando: {address}") # 1. Geocode try: geo = geocode(address) except ValueError as e: return {"error": str(e), "address": address} lat = geo["lat"] lon = geo["lon"] _emit(self.status_cb, f" ✅ {geo['address']} | {geo['county']}, {geo['state']} | ({lat:.4f}, {lon:.4f})") # 2. Sub-agentes en paralelo _emit(self.status_cb, "🔄 Ejecutando sub-agentes en paralelo...") sub_results = self._run_sub_agents(lat, lon, geo) # 3. Calcular scores scores = self._calculate_scores(sub_results) overall = self._overall_score(scores) _emit(self.status_cb, f" 📊 Score general: {overall}/100") # 4. Síntesis Ollama _emit(self.status_cb, "🤖 Generando análisis narrativo (Ollama)...") narratives = self._build_narratives(sub_results, geo["address"]) exec_summary = ollama_client.executive_summary(scores, geo["address"], overall) # 5. Construir reporte completo report = build_report( geo=geo, sub_results=sub_results, scores=scores, overall_score=overall, narratives=narratives, exec_summary=exec_summary, ) # 6. Guardar en SQLite try: report_id = save_report(geo, scores, overall, sub_results, report) report["report_id"] = report_id _emit(self.status_cb, f" 💾 Guardado en BD (id={report_id})") except Exception as e: _emit(self.status_cb, f" ⚠️ Error guardando en BD: {e}") # 7. Exportar PDF try: pdf_path = export_pdf(report) report["pdf_path"] = str(pdf_path) _emit(self.status_cb, f" 📄 PDF: {pdf_path}") except Exception as e: _emit(self.status_cb, f" ⚠️ Error generando PDF: {e}") report["pdf_path"] = None report["duration_seconds"] = round(time.perf_counter() - t0, 2) _emit(self.status_cb, f"✅ Análisis completo en {report['duration_seconds']}s") return report def _run_sub_agents(self, lat: float, lon: float, geo: dict) -> dict: address = geo["address"] county = geo.get("county", "") state = geo.get("state", "FL") tract = geo.get("tract_geoid", "") state_fips = geo.get("state_fips", "") county_fips = geo.get("county_fips", "") tasks = { "crime": lambda: crime_agent.run(lat, lon, address), "property": lambda: property_agent.run(lat, lon, address, county), "schools": lambda: schools_agent.run(lat, lon, address), "amenities": lambda: amenities_agent.run(lat, lon, address), "demographics": lambda: demographics_agent.run( lat, lon, address, tract, state_fips, county_fips), "maritime": lambda: maritime_agent.run(lat, lon, address, state), "lifestyle": lambda: lifestyle_agent.run(lat, lon, address), } results = {} with ThreadPoolExecutor(max_workers=7) as ex: futures = {ex.submit(fn): name for name, fn in tasks.items()} for future in as_completed(futures): name = futures[future] try: results[name] = future.result() _emit(self.status_cb, f" ✅ {name}") except Exception as e: results[name] = {"errors": [str(e)]} _emit(self.status_cb, f" ⚠️ {name}: {e}") return results def _calculate_scores(self, sub_results: dict) -> dict: score_fns = { "crime": crime_agent.score, "property": property_agent.score, "schools": schools_agent.score, "amenities": amenities_agent.score, "demographics": demographics_agent.score, "maritime": maritime_agent.score, "lifestyle": lifestyle_agent.score, } scores = {} for name, fn in score_fns.items(): try: scores[name] = fn(sub_results.get(name, {})) except Exception: scores[name] = 50 return scores def _overall_score(self, scores: dict) -> int: total = sum(scores.get(k, 50) * w for k, w in SCORE_WEIGHTS.items()) return round(total) def _build_narratives(self, sub_results: dict, address: str) -> dict: section_names = { "crime": "criminalidad", "property": "valoración inmobiliaria", "schools": "escuelas", "amenities": "amenities y walkability", "demographics": "demografía", "maritime": "mercado laboral marítimo", "lifestyle": "estilo de vida náutico", } narratives = {} for key, section in section_names.items(): data = sub_results.get(key, {}) narratives[key] = ollama_client.analyze_section(data, section, address) return narratives def run_location_agent( address: str, status_cb: Optional[Callable[[str], None]] = None, ) -> dict: """Función de conveniencia para ejecutar el agente.""" return LocationAgent(status_cb=status_cb).analyze(address)