Files
2026-07-03 12:24:58 -04:00

125 lines
3.6 KiB
Python

"""Sub-agente: Criminalidad.
Fuentes: SpotCrime (scraping) + FBI UCR API (key opcional).
Retorna datos fail-soft — si falla, devuelve dict vacío con error.
"""
from __future__ import annotations
import os
import time
import requests
from bs4 import BeautifulSoup
from data_fetchers.base import USER_AGENT, DEFAULT_TIMEOUT
FBI_API_KEY = os.getenv("FBI_UCR_API_KEY", "")
FBI_BASE = "https://api.usa.gov/crime/fbi/cde"
def run(lat: float, lon: float, address: str) -> dict:
"""Recopila datos de criminalidad para la ubicación."""
result = {
"score_input": {},
"crimes_recent": [],
"crime_types": {},
"trend": "desconocido",
"sources": [],
"errors": [],
}
# --- SpotCrime scraping ---
try:
spot = _spotcrime(lat, lon)
result["crimes_recent"] = spot.get("crimes", [])
result["crime_types"] = spot.get("by_type", {})
result["sources"].append("SpotCrime.com")
except Exception as e:
result["errors"].append(f"SpotCrime: {e}")
# --- FBI UCR API (solo si hay key) ---
if FBI_API_KEY:
try:
fbi = _fbi_ucr(address)
result["fbi_data"] = fbi
result["sources"].append("FBI UCR API")
except Exception as e:
result["errors"].append(f"FBI UCR: {e}")
# Score input: cantidad de crímenes en los últimos 30 días
total = len(result["crimes_recent"])
result["score_input"]["total_crimes_30d"] = total
result["score_input"]["has_violent"] = any(
c.get("type", "").lower() in ("assault", "robbery", "shooting", "homicide")
for c in result["crimes_recent"]
)
return result
def _spotcrime(lat: float, lon: float) -> dict:
"""Scraping básico de SpotCrime para el área."""
url = f"https://spotcrime.com/crimes.json?lat={lat}&lon={lon}&callback=spotcrime"
headers = {"User-Agent": USER_AGENT, "Referer": "https://spotcrime.com"}
time.sleep(2)
r = requests.get(url, headers=headers, timeout=DEFAULT_TIMEOUT)
r.raise_for_status()
# SpotCrime devuelve JSONP — extraer JSON interior
text = r.text
if text.startswith("spotcrime("):
text = text[len("spotcrime("):-1]
import json
data = json.loads(text)
crimes = data.get("crimes", [])
by_type: dict = {}
for c in crimes:
t = c.get("type", "Other")
by_type[t] = by_type.get(t, 0) + 1
return {"crimes": crimes[:50], "by_type": by_type}
def _fbi_ucr(address: str) -> dict:
"""FBI UCR API — estadísticas por estado/ciudad."""
# Extraer estado de la dirección (últimas 2 letras antes del ZIP)
import re
m = re.search(r",\s*([A-Z]{2})\s+\d{5}", address.upper())
state = m.group(1) if m else "FL"
url = f"{FBI_BASE}/summarized/state/{state}/all?API_KEY={FBI_API_KEY}&from=2020&to=2023"
r = requests.get(url, timeout=DEFAULT_TIMEOUT, headers={"User-Agent": USER_AGENT})
r.raise_for_status()
return r.json()
def score(data: dict) -> int:
"""Calcula score 0-100 de seguridad (100 = muy seguro)."""
if not data or not data.get("score_input"):
return 50 # neutral si no hay datos
total = data["score_input"].get("total_crimes_30d", 0)
has_violent = data["score_input"].get("has_violent", False)
# Base score inversamente proporcional a crímenes
if total == 0:
base = 90
elif total <= 5:
base = 75
elif total <= 15:
base = 60
elif total <= 30:
base = 45
elif total <= 50:
base = 30
else:
base = 15
# Penalización por crimen violento
if has_violent:
base = max(0, base - 15)
return min(100, max(0, base))