196 lines
6.8 KiB
Python
196 lines
6.8 KiB
Python
"""LocationAgent — Orquestador principal.

Flow:
    1. Geocode (Census → Nominatim fallback)
    2. 7 sub-agents in parallel (ThreadPoolExecutor)
    3. Compute the partial scores and the overall score
    4. Narrative synthesis via Ollama
    5. Save to SQLite
    6. Export PDF

Usage:
    from location_agent import run_location_agent

    result = run_location_agent("1234 W 49th St, Hialeah FL 33012",
                                status_cb=print)
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from typing import Callable, Optional
|
|
|
|
from .utils.geocoder import geocode
|
|
from .utils import ollama_client
|
|
from .sub_agents import (
|
|
crime_agent,
|
|
property_agent,
|
|
schools_agent,
|
|
amenities_agent,
|
|
demographics_agent,
|
|
maritime_agent,
|
|
lifestyle_agent,
|
|
)
|
|
from .report.report_generator import build_report
|
|
from .report.pdf_generator import export_pdf
|
|
from .db import save_report
|
|
|
|
# Relative weight of each sub-agent's partial score in the overall score.
# Keys match the sub-agent names used throughout this module; the weights
# add up to 1.0, so the weighted sum stays on the same scale as the
# partial scores.
SCORE_WEIGHTS = {
    "crime": 0.20,
    "property": 0.20,
    "schools": 0.10,
    "amenities": 0.15,
    "demographics": 0.10,
    "maritime": 0.15,
    "lifestyle": 0.10,
}
|
|
|
|
|
|
def _emit(cb: Optional[Callable], msg: str) -> None:
|
|
if cb:
|
|
cb(msg)
|
|
|
|
|
|
class LocationAgent:
    """Orchestrates a full location analysis for a single address.

    The pipeline geocodes the address, runs the seven sub-agents in
    parallel, scores their results, asks Ollama for narrative text, builds
    the report, then stores it in the database and exports it to PDF.
    Progress messages are sent to the optional ``status_cb`` callback.
    """

    # Score used when a scorer raises or a section has no score at all.
    _NEUTRAL_SCORE = 50

    def __init__(self, status_cb: Optional[Callable[[str], None]] = None):
        """Store the optional progress callback (called with one string)."""
        self.status_cb = status_cb

    def analyze(self, address: str) -> dict:
        """Run the complete analysis for *address*.

        Returns:
            The report produced by ``build_report`` with three extra keys:
            ``report_id`` (``None`` when saving failed and the report did
            not already carry one), ``pdf_path`` (``None`` when the export
            failed) and ``duration_seconds``. When ``geocode`` raises
            ``ValueError`` the result is ``{"error": ..., "address": ...}``.
        """
        t0 = time.perf_counter()
        _emit(self.status_cb, f"🔍 Geocodificando: {address}")

        # 1. Geocode
        try:
            geo = geocode(address)
        except ValueError as e:
            return {"error": str(e), "address": address}

        lat = geo["lat"]
        lon = geo["lon"]
        # County/state are treated as optional by _run_sub_agents; use the
        # same defaults here so a status message cannot abort the analysis.
        county = geo.get("county", "")
        state = geo.get("state", "FL")
        _emit(
            self.status_cb,
            f" ✅ {geo['address']} | {county}, {state} | ({lat:.4f}, {lon:.4f})",
        )

        # 2. Sub-agents in parallel
        _emit(self.status_cb, "🔄 Ejecutando sub-agentes en paralelo...")
        sub_results = self._run_sub_agents(lat, lon, geo)

        # 3. Compute scores
        scores = self._calculate_scores(sub_results)
        overall = self._overall_score(scores)
        _emit(self.status_cb, f" 📊 Score general: {overall}/100")

        # 4. Ollama synthesis
        # NOTE(review): unlike the DB/PDF steps below, an exception raised
        # by ollama_client propagates to the caller — confirm whether
        # ollama_client already handles its own failures.
        _emit(self.status_cb, "🤖 Generando análisis narrativo (Ollama)...")
        narratives = self._build_narratives(sub_results, geo["address"])
        exec_summary = ollama_client.executive_summary(scores, geo["address"], overall)

        # 5. Build the full report
        report = build_report(
            geo=geo,
            sub_results=sub_results,
            scores=scores,
            overall_score=overall,
            narratives=narratives,
            exec_summary=exec_summary,
        )

        # 6. Save to SQLite (best effort: a failure is reported, not raised)
        try:
            report_id = save_report(geo, scores, overall, sub_results, report)
            report["report_id"] = report_id
            _emit(self.status_cb, f" 💾 Guardado en BD (id={report_id})")
        except Exception as e:
            _emit(self.status_cb, f" ⚠️ Error guardando en BD: {e}")
            # Keep the report shape stable, mirroring pdf_path below, without
            # overwriting an id the report may already carry.
            report.setdefault("report_id", None)

        # 7. Export PDF (best effort as well)
        try:
            pdf_path = export_pdf(report)
            report["pdf_path"] = str(pdf_path)
            _emit(self.status_cb, f" 📄 PDF: {pdf_path}")
        except Exception as e:
            _emit(self.status_cb, f" ⚠️ Error generando PDF: {e}")
            report["pdf_path"] = None

        report["duration_seconds"] = round(time.perf_counter() - t0, 2)
        _emit(self.status_cb, f"✅ Análisis completo en {report['duration_seconds']}s")
        return report

    def _run_sub_agents(self, lat: float, lon: float, geo: dict) -> dict:
        """Run every sub-agent concurrently and collect the results by name.

        A sub-agent that raises contributes ``{"errors": [message]}`` instead
        of aborting the run. The returned dict is always in declared task
        order, regardless of which sub-agent finished first.
        """
        address = geo["address"]
        county = geo.get("county", "")
        state = geo.get("state", "FL")
        tract = geo.get("tract_geoid", "")
        state_fips = geo.get("state_fips", "")
        county_fips = geo.get("county_fips", "")

        tasks = {
            "crime": lambda: crime_agent.run(lat, lon, address),
            "property": lambda: property_agent.run(lat, lon, address, county),
            "schools": lambda: schools_agent.run(lat, lon, address),
            "amenities": lambda: amenities_agent.run(lat, lon, address),
            "demographics": lambda: demographics_agent.run(
                lat, lon, address, tract, state_fips, county_fips),
            "maritime": lambda: maritime_agent.run(lat, lon, address, state),
            "lifestyle": lambda: lifestyle_agent.run(lat, lon, address),
        }

        completed = {}
        # One worker per task so that all sub-agents start immediately.
        with ThreadPoolExecutor(max_workers=len(tasks)) as ex:
            futures = {ex.submit(fn): name for name, fn in tasks.items()}
            # Status lines are emitted as each sub-agent finishes.
            for future in as_completed(futures):
                name = futures[future]
                try:
                    completed[name] = future.result()
                    _emit(self.status_cb, f" ✅ {name}")
                except Exception as e:
                    completed[name] = {"errors": [str(e)]}
                    _emit(self.status_cb, f" ⚠️ {name}: {e}")

        # Re-key in task order: completion order varies from run to run.
        return {name: completed[name] for name in tasks}

    def _calculate_scores(self, sub_results: dict) -> dict:
        """Compute the partial score of every section.

        Each sub-agent module's ``score`` function receives that section's
        result (or ``{}`` when the section is missing). If a scorer raises,
        the section gets the neutral score and the failure is reported
        through the status callback instead of being silently dropped.
        """
        score_fns = {
            "crime": crime_agent.score,
            "property": property_agent.score,
            "schools": schools_agent.score,
            "amenities": amenities_agent.score,
            "demographics": demographics_agent.score,
            "maritime": maritime_agent.score,
            "lifestyle": lifestyle_agent.score,
        }
        scores = {}
        for name, fn in score_fns.items():
            try:
                scores[name] = fn(sub_results.get(name, {}))
            except Exception as e:
                scores[name] = self._NEUTRAL_SCORE
                _emit(
                    self.status_cb,
                    f" ⚠️ score {name}: {e} (usando {self._NEUTRAL_SCORE})",
                )
        return scores

    def _overall_score(self, scores: dict) -> int:
        """Return the weighted sum of the partial scores, rounded to an int.

        Weights come from ``SCORE_WEIGHTS``; a section absent from *scores*
        counts as the neutral score.
        """
        total = sum(
            scores.get(key, self._NEUTRAL_SCORE) * weight
            for key, weight in SCORE_WEIGHTS.items()
        )
        return round(total)

    def _build_narratives(self, sub_results: dict, address: str) -> dict:
        """Ask Ollama for one narrative per section, keyed by section name.

        The section labels are passed to ``ollama_client.analyze_section``
        at runtime, so they are kept exactly as the original Spanish text.
        """
        section_names = {
            "crime": "criminalidad",
            "property": "valoración inmobiliaria",
            "schools": "escuelas",
            "amenities": "amenities y walkability",
            "demographics": "demografía",
            "maritime": "mercado laboral marítimo",
            "lifestyle": "estilo de vida náutico",
        }
        # Sections are processed one after another, in the order above.
        return {
            key: ollama_client.analyze_section(
                sub_results.get(key, {}), section, address)
            for key, section in section_names.items()
        }
|
|
|
|
|
|
def run_location_agent(
    address: str,
    status_cb: Optional[Callable[[str], None]] = None,
) -> dict:
    """Convenience entry point: analyze *address* with a fresh agent.

    Creates a ``LocationAgent`` with the given status callback and returns
    whatever its ``analyze`` method returns.
    """
    agent = LocationAgent(status_cb=status_cb)
    return agent.analyze(address)
|