Files
AR-House/dd_orchestrator.py
2026-07-03 12:24:58 -04:00

616 lines
27 KiB
Python

"""dd_orchestrator.py — Due Diligence orchestrator (Phase 3.5.A foundation).
PURPOSE:
    On-demand due diligence (DD). Any deal with potential can go through:

    Pre-DD  → 2-3 min, ~1 Firecrawl credit
        Photo lookup + court records (where applicable) + LienPositionAnalyzer
        Output: analysis_report.json + briefing_pre_dd.md in the property folder

    Full DD → 10-20 min, ~5 Firecrawl credits
        Pre-DD + tax assessed + FEMA flood + NOAA hurricanes + HUD FMR +
        neighborhood class + Zillow comps (if budget allows).
        Output: same + due_diligence/full_dd_summary.json
        (Phase 3.5.D adds the DueDiligenceAgent synthesis on top of this.)

AUTO-TRIGGER:
    should_auto_trigger_dd(deal) → True if deal_type is distressed
    (auction/foreclosure/tax_deed/reo) AND DD has not been run yet.
    The caller decides whether to fire it (e.g. background job, UI confirmation).

API:
    run_pre_dd(deal_id, status_cb=None)  → dict summary
    run_full_dd(deal_id, status_cb=None) → dict summary
    should_auto_trigger_dd(deal_dict)    → bool
"""
from __future__ import annotations
import json
import time
from datetime import datetime, timezone
from typing import Callable, Optional
DISTRESSED_DEAL_TYPES = {"auction", "foreclosure", "tax_deed", "reo"}


# ────────────────────────────────────────────────────────────────────────────
# Triggers
# ────────────────────────────────────────────────────────────────────────────
def should_auto_trigger_dd(deal: dict) -> bool:
    """Tell whether a deal is distressed and has no recorded DD run yet.

    The function only answers the question; the caller decides whether to
    actually launch DD (e.g. the scraper in its post-loop, or the UI behind an
    'Auto-DD for distressed' checkbox).

    Args:
        deal: Deal record; ``deal_type`` is read here and the whole dict is
            handed to ``properties_store.get_property_folder``.

    Returns:
        False when the normalized ``deal_type`` is not in
        ``DISTRESSED_DEAL_TYPES``. Otherwise True unless the folder's
        ``.meta.json`` holds a truthy ``last_dd_run_at``.
    """
    kind = (deal.get("deal_type") or "").lower().strip()
    if kind in DISTRESSED_DEAL_TYPES:
        # Imported lazily: non-distressed deals never touch the property store.
        from properties_store import get_property_folder

        meta_file = get_property_folder(deal) / ".meta.json"
        if meta_file.exists():
            try:
                recorded_run = json.loads(
                    meta_file.read_text(encoding="utf-8")
                ).get("last_dd_run_at")
            except Exception:
                # Unreadable or malformed meta counts as "DD never ran".
                return True
            return not recorded_run
        # No meta file yet, so DD has never been run for this property.
        return True
    return False
# ────────────────────────────────────────────────────────────────────────────
# Pre-DD: quick (1 credit, ~2-3 min)
# ────────────────────────────────────────────────────────────────────────────
def run_pre_dd(deal_id: int, status_cb: Optional[Callable[[str], None]] = None) -> dict:
    """Run the quick Pre-DD pass for one deal and persist its artifacts.

    Steps:
        1. Load the deal from the DB and ensure its property folder exists.
        2. Photo lookup (Zillow address search) → photos/zillow_lookup.json
        3. Court records (needs county + address) → court_records/raw.json
        4. Pre-screening → due_diligence/pre_screening.json
        5. Compose briefing_pre_dd.md
        6. Write analysis_report.json and update .meta.json
           (last_dd_run_at, last_dd_kind='pre_dd').

    Steps 2-5 are best-effort: a failure is appended to ``errors`` and the
    run continues with whatever data was collected.

    Args:
        deal_id: Primary key of the deal in the deals DB.
        status_cb: Optional callback that receives progress messages.

    Returns:
        Summary dict with keys ``kind``, ``deal_id``, ``folder``,
        ``photos_count``, ``court_data_status``, ``pre_screening_verdict``,
        ``pre_screening_score``, ``pre_screen`` (the full pre-screening
        result, or None), ``credits_used``, ``elapsed_seconds``, ``errors``
        and ``run_at``. If the deal does not exist, a short dict with
        ``error='deal not found'`` is returned instead.
    """
    from deals_db import init_db, get_deal_by_id
    from properties_store import (
        ensure_property_folder, save_json, save_markdown, write_meta,
    )

    init_db()
    t0 = time.perf_counter()
    errors: list[str] = []
    credits_used = 0

    def _log(msg: str) -> None:
        if status_cb:
            status_cb(msg)

    deal = get_deal_by_id(deal_id)
    if not deal:
        # "elapsed" is kept for existing callers; "elapsed_seconds" matches
        # the key used by the normal summary.
        return {
            "deal_id": deal_id,
            "error": "deal not found",
            "errors": [],
            "elapsed": 0,
            "elapsed_seconds": 0,
        }

    folder = ensure_property_folder(deal)
    _log(f"Pre-DD: deal #{deal_id} {(deal.get('address') or '?')[:50]} → folder ready")

    # ─── Step 2: Photo lookup ──────────────────────────────────────────────
    photos_count = 0
    try:
        from data_fetchers.zillow_photo_lookup import fetch_zillow_photos_by_address

        addr_query = _build_zillow_query(deal)
        if addr_query:
            _log(" Photo lookup (Zillow address-search)...")
            photos, photo_meta = fetch_zillow_photos_by_address(addr_query)
            photo_meta = photo_meta or {}
            # `or 0` guards against the fetcher reporting credits_used=None.
            credits_used += photo_meta.get("credits_used") or 0
            save_json(folder / "photos" / "zillow_lookup.json", {
                "query_address": addr_query,
                "photos": photos,
                "meta": photo_meta,
                "fetched_at": _now_iso(),
            })
            if photos:
                photos_count = len(photos)
                # Update deals.db photos_urls (so cards show them)
                _update_deal_photos(deal_id, photos)
                _log(f" {photos_count} photos saved")
            else:
                # The error may be present but None or non-string; normalize
                # before slicing so the log line itself cannot raise.
                reason = str(photo_meta.get("error") or "empty")[:60]
                _log(f" no photos found ({reason})")
    except Exception as e:
        errors.append(f"photo lookup: {type(e).__name__}: {e}")
        _log(f" photo lookup error: {e}")

    # ─── Step 3: Court records (county-dependent) ──────────────────────────
    court_data = {}
    try:
        from data_fetchers.court_records import fetch_court_records

        county = deal.get("county")
        if county and deal.get("address"):
            _log(f" Court records lookup ({county})...")
            court_data = fetch_court_records(
                address=deal["address"], county_name=county,
            ) or {}
            save_json(folder / "court_records" / "raw.json", court_data)
            status = court_data.get("status", "?")
            _log(f" status={status} sources={court_data.get('sources_used', [])}")
        else:
            _log(" Court records: missing county or address, skipped")
    except Exception as e:
        errors.append(f"court records: {type(e).__name__}: {e}")
        _log(f" court records error: {e}")

    # ─── Step 4: Pre-screening (lien position analysis) ────────────────────
    pre_screen_result = None
    try:
        from pre_screening_orchestrator import run_pre_screening

        _log(" Pre-screening (LienPositionAnalyzer)...")
        pre_screen_result = run_pre_screening(deal=deal)
        save_json(folder / "due_diligence" / "pre_screening.json", pre_screen_result)
        verdict = pre_screen_result.get("verdict", "?")
        score = pre_screen_result.get("score", "?")
        _log(f" verdict={verdict} score={score}/10 margin={pre_screen_result.get('margin_pct')}%")
    except Exception as e:
        errors.append(f"pre-screening: {type(e).__name__}: {e}")
        _log(f" pre-screening error: {e}")

    # ─── Step 5: Compose briefing markdown ─────────────────────────────────
    # Guarded like the fetch steps: a formatting problem in the briefing must
    # not discard the data already collected (and the credits already spent).
    try:
        briefing = _compose_pre_dd_briefing(
            deal=deal,
            photos_count=photos_count,
            court_data=court_data,
            pre_screen=pre_screen_result,
        )
        save_markdown(folder / "briefing_pre_dd.md", briefing)
    except Exception as e:
        errors.append(f"briefing: {type(e).__name__}: {e}")
        _log(f" briefing error: {e}")

    # ─── Step 6: Update meta + global analysis_report ──────────────────────
    elapsed = round(time.perf_counter() - t0, 1)
    run_at = _now_iso()  # one timestamp shared by the report and the meta
    summary = {
        "kind": "pre_dd",
        "deal_id": deal_id,
        "folder": str(folder),
        "photos_count": photos_count,
        "court_data_status": court_data.get("status") if court_data else None,
        "pre_screening_verdict": pre_screen_result.get("verdict") if pre_screen_result else None,
        "pre_screening_score": pre_screen_result.get("score") if pre_screen_result else None,
        # run_full_dd reads summary["pre_screen"] to feed its synthesis step;
        # without this key every synthesis input derived from it is None.
        "pre_screen": pre_screen_result,
        "credits_used": credits_used,
        "elapsed_seconds": elapsed,
        "errors": errors,
        "run_at": run_at,
    }
    save_json(folder / "analysis_report.json", summary)
    write_meta(folder, last_dd_run_at=run_at, last_dd_kind="pre_dd")
    _log(f"Pre-DD done in {elapsed}s — {len(errors)} errors")
    return summary
# ────────────────────────────────────────────────────────────────────────────
# Full DD: comprehensive (Phase 3.5.A: extends pre-DD with more fetchers;
# DueDiligenceAgent synthesis comes in Phase 3.5.D)
# ────────────────────────────────────────────────────────────────────────────
def run_full_dd(deal_id: int, status_cb: Optional[Callable[[str], None]] = None) -> dict:
    """Run the Full DD pipeline for one deal and persist its artifacts.

    Steps:
        1. Run Pre-DD first (photos + court records + pre-screening).
        2. Geocode the address (Census).
        3. FEMA flood zone (needs coordinates).
        4. NOAA hurricane history (needs coordinates).
        5. HUD Fair Market Rent (by county).
        6. Neighborhood class (needs coordinates).
        7. Tax assessed value (Property Appraiser).
        8. DueDiligenceCoordinator synthesis (Phase 3.5.D).
        9. Compose briefing_full_dd.md.
        10. Save full_dd_summary.json / analysis_report.json and update meta
            with last_dd_kind='full_dd'.

    Steps 2-9 are best-effort: a failure is appended to ``errors`` and the
    run continues. Zillow comps are not fetched by this function.

    Args:
        deal_id: Primary key of the deal in the deals DB.
        status_cb: Optional callback that receives progress messages; it is
            also forwarded to Pre-DD and to the synthesis agent.

    Returns:
        Summary dict with the Pre-DD summary (``pre_dd``), ``synthesis``,
        ``geo_coords``, ``fema_zone``, ``noaa_hurricanes_count``, ``hud_fmr``,
        ``neighborhood_class``, ``tax_assessed_value``, ``credits_used``,
        ``elapsed_seconds``, ``errors`` and ``run_at``. If the deal does not
        exist, a short dict with ``error='deal not found'`` is returned.
    """
    from deals_db import init_db, get_deal_by_id
    from properties_store import (
        ensure_property_folder, save_json, save_markdown, write_meta,
    )

    init_db()
    t0 = time.perf_counter()
    errors: list[str] = []
    credits_used = 0

    def _log(msg: str) -> None:
        if status_cb:
            status_cb(msg)

    deal = get_deal_by_id(deal_id)
    if not deal:
        return {"deal_id": deal_id, "error": "deal not found", "errors": []}

    # ─── Step 1: Re-use Pre-DD ─────────────────────────────────────────────
    _log("Full DD — running Pre-DD first")
    pre_dd_summary = run_pre_dd(deal_id, status_cb=status_cb)
    credits_used += pre_dd_summary.get("credits_used") or 0
    pre_dd_errors = pre_dd_summary.get("errors") or []
    errors.extend(f"pre_dd: {e}" for e in pre_dd_errors)
    folder = ensure_property_folder(deal)

    # ─── Step 2: Geocode ───────────────────────────────────────────────────
    geo = {}
    try:
        from data_fetchers.census_geocode import fetch_geocode

        _log(" Geocoding (Census)...")
        full_addr = _full_address_string(deal)
        if full_addr:
            geo = fetch_geocode(full_addr) or {}
            save_json(folder / "due_diligence" / "geocode.json", geo)
            _log(f" {geo.get('matched_address', '?')[:60]} lat={geo.get('lat')} lng={geo.get('lng')}")
    except Exception as e:
        errors.append(f"geocode: {type(e).__name__}: {e}")
        _log(f" geocode error: {e}")
    lat = geo.get("lat")
    lng = geo.get("lng")
    # Compare against None: a coordinate of 0.0 is valid but falsy.
    has_coords = lat is not None and lng is not None

    # ─── Step 3: FEMA flood ────────────────────────────────────────────────
    fema = {}
    try:
        if has_coords:
            from data_fetchers.fema_flood import fetch_flood

            _log(" FEMA flood zone...")
            fema = fetch_flood(lat, lng) or {}
            save_json(folder / "due_diligence" / "fema_flood.json", fema)
            _log(f" zone={fema.get('flood_zone', '?')}")
    except Exception as e:
        errors.append(f"fema: {type(e).__name__}: {e}")

    # ─── Step 4: NOAA hurricanes ───────────────────────────────────────────
    noaa = {}
    try:
        if has_coords:
            from data_fetchers.noaa_hurricanes import fetch_hurricanes

            _log(" NOAA hurricane history...")
            noaa = fetch_hurricanes(lat, lng) or {}
            save_json(folder / "due_diligence" / "noaa_hurricanes.json", noaa)
            _log(f" {noaa.get('count', 0)} hurricanes within radius")
    except Exception as e:
        errors.append(f"noaa: {type(e).__name__}: {e}")

    # ─── Step 5: HUD FMR ───────────────────────────────────────────────────
    hud = {}
    try:
        from data_fetchers.hud_fmr import fetch_fmr

        _log(" HUD Fair Market Rent...")
        beds = deal.get("beds") or 3
        county = deal.get("county")
        state = deal.get("state") or "FL"
        if county:
            hud = fetch_fmr(county_name=county, state=state, beds=beds) or {}
            save_json(folder / "due_diligence" / "hud_fmr.json", hud)
            _log(f" {beds}BR FMR=${hud.get('fmr_amount', '?')}")
    except Exception as e:
        errors.append(f"hud_fmr: {type(e).__name__}: {e}")

    # ─── Step 6: Neighborhood class ────────────────────────────────────────
    nbh = {}
    try:
        if has_coords:
            from data_fetchers.neighborhood_class import fetch_neighborhood_class

            _log(" Neighborhood class...")
            nbh = fetch_neighborhood_class(lat=lat, lng=lng) or {}
            save_json(folder / "due_diligence" / "neighborhood.json", nbh)
            _log(f" class={nbh.get('class', '?')}")
    except Exception as e:
        errors.append(f"neighborhood: {type(e).__name__}: {e}")

    # ─── Step 7: Tax assessed (PA) ─────────────────────────────────────────
    tax_assessed = None
    try:
        from data_fetchers.property_value import fetch_tax_assessed

        _log(" Tax assessed (Property Appraiser)...")
        tax_assessed = fetch_tax_assessed(
            # `or ""` also covers an address key that is present but None.
            address=deal.get("address") or "",
            county_name=deal.get("county"),
            state=deal.get("state") or "FL",
        )
        if tax_assessed:
            save_json(folder / "property_appraiser" / "tax_assessed.json", tax_assessed)
            _log(f" assessed=${tax_assessed.get('assessed_value', '?')}")
    except Exception as e:
        errors.append(f"tax_assessed: {type(e).__name__}: {e}")

    # ─── Step 8: DueDiligenceCoordinator AGENT (Phase 3.5.D) ───────────────
    # Synthesizes all collected inputs into a reasoned report.
    #
    # The pre-screening result is taken from the Pre-DD summary when present.
    # If the summary does not carry it, fall back to the artifact Pre-DD
    # writes to due_diligence/pre_screening.json, but only when pre-screening
    # did not fail in this run, so a stale file from an earlier run is never
    # picked up.
    pre_screen = pre_dd_summary.get("pre_screen")
    pre_screen_failed = any(str(e).startswith("pre-screening:") for e in pre_dd_errors)
    if not pre_screen and not pre_screen_failed:
        try:
            ps_path = folder / "due_diligence" / "pre_screening.json"
            if ps_path.exists():
                pre_screen = json.loads(ps_path.read_text(encoding="utf-8"))
        except Exception as e:
            errors.append(f"pre_screen reload: {type(e).__name__}: {e}")
    if not isinstance(pre_screen, dict):
        pre_screen = {}

    synthesis = {}
    pa_data = pre_screen.get("property_appraiser")
    cr_data = pre_screen.get("court_records_raw")
    oc_data = pre_screen.get("owner_classification")
    reo_data = pre_screen.get("reo_signal")
    renov_data = (pa_data or {}).get("renovation_signal")
    fin_data = pre_screen.get("financial_analysis")
    try:
        from dd_agent import run_due_diligence_synthesis

        _log(" DueDiligenceCoordinator synthesis (~75-120s)...")
        # `or {}` keeps `synthesis` a dict even if the agent returns None, so
        # the summary / meta / final log below cannot fail on `.get`.
        synthesis = run_due_diligence_synthesis(
            pa_record=pa_data,
            court_records=cr_data,
            owner_classification=oc_data,
            reo_signal=reo_data,
            renovation_signal=renov_data,
            financial_analysis=fin_data,
            environmental={"fema": fema, "noaa": noaa},
            market_context={"hud_fmr": hud, "neighborhood": nbh},
            listing_info={
                "listing_price": deal.get("listing_price"),
                "deal_type": deal.get("deal_type"),
                "source": deal.get("source"),
                "source_url": deal.get("source_url"),
                "address": deal.get("address"),
            },
            status_cb=status_cb,
        ) or {}
        if synthesis.get("error"):
            errors.append(f"dd_agent: {synthesis['error']}")
            _log(f" DD synthesis FAILED: {synthesis['error']}")
        else:
            save_json(folder / "due_diligence" / "synthesis.json", synthesis)
            _log(f" DD synthesis OK — verdict={synthesis.get('verdict')} confidence={synthesis.get('confidence_score')}")
    except Exception as e:
        errors.append(f"dd_agent: {type(e).__name__}: {e}")
        _log(f" DD synthesis exception: {e}")

    # ─── Step 9: Compose Full DD briefing (markdown) ───────────────────────
    # Guarded like the fetch steps so a formatting problem cannot discard the
    # data collected above.
    try:
        briefing = _compose_full_dd_briefing(
            deal=deal,
            pre_dd_summary=pre_dd_summary,
            geo=geo, fema=fema, noaa=noaa, hud=hud, nbh=nbh,
            tax_assessed=tax_assessed,
        )
        save_markdown(folder / "briefing_full_dd.md", briefing)
    except Exception as e:
        errors.append(f"briefing: {type(e).__name__}: {e}")
        _log(f" briefing error: {e}")

    # ─── Step 10: Save full summary + update meta ──────────────────────────
    elapsed = round(time.perf_counter() - t0, 1)
    run_at = _now_iso()  # one timestamp shared by the report and the meta
    summary = {
        "kind": "full_dd",
        "deal_id": deal_id,
        "folder": str(folder),
        "pre_dd": pre_dd_summary,
        "synthesis": synthesis,  # DueDiligenceCoordinator output
        "geo_coords": (geo.get("lat"), geo.get("lng")),
        "fema_zone": fema.get("flood_zone"),
        "noaa_hurricanes_count": noaa.get("count"),
        "hud_fmr": hud.get("fmr_amount"),
        "neighborhood_class": nbh.get("class"),
        "tax_assessed_value": tax_assessed.get("assessed_value") if tax_assessed else None,
        "credits_used": credits_used,
        "elapsed_seconds": elapsed,
        "errors": errors,
        "run_at": run_at,
    }
    save_json(folder / "due_diligence" / "full_dd_summary.json", summary)
    save_json(folder / "analysis_report.json", summary)
    write_meta(folder, last_dd_run_at=run_at, last_dd_kind="full_dd",
               last_verdict=synthesis.get("verdict") if synthesis else None,
               last_confidence=synthesis.get("confidence_score") if synthesis else None)
    # A separator was missing between the verdict and the error count, which
    # produced messages like "verdict=GO3 errors".
    _log(f"Full DD done in {elapsed}s — verdict={synthesis.get('verdict', '?')} — {len(errors)} errors")
    return summary
# ────────────────────────────────────────────────────────────────────────────
# Helpers
# ────────────────────────────────────────────────────────────────────────────
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _build_zillow_query(deal: dict) -> Optional[str]:
"""Build the address string to use for Zillow lookup."""
addr = (deal.get("address") or "").strip()
if not addr:
return None
county = (deal.get("county") or "").strip().replace(" County", "")
state = (deal.get("state") or "FL").strip()
if county:
return f"{addr}, {county} COUNTY, {state}"
return f"{addr}, {state}"
def _full_address_string(deal: dict) -> Optional[str]:
"""Full address for geocoding."""
addr = (deal.get("address") or "").strip()
if not addr:
return None
city = (deal.get("city") or "").strip()
state = (deal.get("state") or "FL").strip()
zip_code = (deal.get("zip") or "").strip()
parts = [addr]
if city:
parts.append(city)
parts.append(state)
if zip_code:
parts.append(zip_code)
return ", ".join(parts)
def _update_deal_photos(deal_id: int, photos: list[str]) -> None:
"""Update deals.photos_urls column with the fetched list."""
try:
from deals_db import _get_conn
_get_conn().execute(
"UPDATE deals SET photos_urls = ? WHERE id = ?",
(json.dumps(photos), deal_id),
)
except Exception:
pass # non-fatal
def _compose_pre_dd_briefing(*, deal: dict, photos_count: int, court_data: dict,
                             pre_screen: Optional[dict]) -> str:
    """Generate the Pre-DD markdown briefing.

    Args:
        deal: Deal record (address, source, deal_type, county, state,
            case_number, parcel_id and price fields are read).
        photos_count: Number of photos captured by the Zillow lookup.
        court_data: Court-records result; empty/falsy when the lookup did
            not run.
        pre_screen: Pre-screening result, or None when that step failed.

    Returns:
        The briefing as a single markdown string.
    """
    def _money(value) -> str:
        # The ',' format option is only valid for numbers; None or a string
        # would raise and abort the whole briefing.
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return f"{value:,}"
        return "0" if value is None else str(value)

    lines = [
        f"# Pre-DD — {deal.get('address') or '(sin dirección)'}",
        "",
        f"_Generado: {_now_iso()}_",
        "",
        "## Deal context",
        f"- **Source**: `{deal.get('source')}`",
        f"- **Type**: `{deal.get('deal_type')}`",
        f"- **County**: {deal.get('county')}, {deal.get('state')}",
        f"- **Case #**: `{deal.get('case_number') or '—'}`",
        f"- **Parcel**: `{deal.get('parcel_id') or '—'}`",
        f"- **Listing price**: ${(deal.get('listing_price') or 0):,.0f}",
        f"- **Starting bid**: ${(deal.get('starting_bid') or 0):,.0f}",
        # NOTE(review): the label says "Assessed value" but the field read is
        # `estimated_arv` — confirm this mapping is intentional.
        f"- **Assessed value**: ${(deal.get('estimated_arv') or 0):,.0f}",
        "",
    ]

    # Photos
    lines.append("## Photos")
    lines.append(f"- Capturados {photos_count} foto(s) vía Zillow address-search")
    if photos_count == 0:
        lines.append("- _Zillow no encontró fotos para esta address; ver `photos/zillow_lookup.json` para detalle._")
    lines.append("")

    # Court records
    lines.append("## Court records")
    if court_data:
        status = court_data.get("status", "?")
        lines.append(f"- **Status**: {status}")
        sources = court_data.get("sources_used") or []
        if sources:
            # map(str, ...) so a non-string source entry cannot break join().
            lines.append(f"- Sources consultados: {', '.join(map(str, sources))}")
        lis = court_data.get("lis_pendens") or []
        if lis:
            lines.append(f"- **Lis pendens activos**: {len(lis)}")
            # Only the first three are listed to keep the briefing short.
            for lp in lis[:3]:
                lines.append(f" - {lp.get('plaintiff', '?')} vs {lp.get('defendant', '?')} (case `{lp.get('case_number', '?')}`)")
    else:
        lines.append("- No se ejecutó (county no soportado o address incompleta)")
    lines.append("")

    # Pre-screening
    lines.append("## Pre-screening (LienPositionAnalyzer)")
    if pre_screen:
        verdict = pre_screen.get("verdict", "?")
        score = pre_screen.get("score", "?")
        margin = pre_screen.get("margin_pct", 0)
        lines.append(f"- **Verdict**: `{verdict}` · Score `{score}/10` · Margin `{margin}%`")
        lines.append(f"- Surviving debt: ${_money(pre_screen.get('surviving_debt_total', 0))}")
        lines.append(f"- Effective cost: ${_money(pre_screen.get('effective_cost', 0))}")
        lines.append("")
        lines.append(f"**Razonamiento**: {pre_screen.get('reasoning', '')}")
        red_flags = pre_screen.get("red_flags") or []
        if red_flags:
            lines.append("")
            lines.append("**Red flags**:")
            lines.extend(f"- {rf}" for rf in red_flags[:10])
    else:
        lines.append("- _No disponible (error en ejecución)_")
    lines.append("")

    # Next steps
    lines.append("## Próximos pasos sugeridos")
    final_verdict = pre_screen.get("verdict") if pre_screen else None
    if final_verdict == "NO-GO":
        lines.append("- **NO-GO**: este deal probablemente no vale la pena. Revisar razonamiento arriba.")
    elif final_verdict == "GO":
        lines.append("- **GO preliminar**: correr `Full DD` para análisis completo + estrategia de oferta.")
    else:
        lines.append("- **MAYBE**: correr `Full DD` con datos completos para decidir.")
    lines.append("- Si decidís perseguir el deal: cargar **escrituras** en `deeds/`, **liens** detallados en `liens/`")
    lines.append("- Escribir tu análisis en `notes.md`")
    return "\n".join(lines)
def _compose_full_dd_briefing(*, deal: dict, pre_dd_summary: dict,
                              geo: dict, fema: dict, noaa: dict,
                              hud: dict, nbh: dict,
                              tax_assessed: Optional[dict]) -> str:
    """Generate the Full DD markdown briefing.

    Each data section is emitted only when its input dict is non-empty.

    Args:
        deal: Deal record (``address`` and ``beds`` are read).
        pre_dd_summary: Summary dict returned by ``run_pre_dd``.
        geo: Geocoder result (matched_address, lat, lng, county_fips).
        fema: Flood lookup result (flood_zone).
        noaa: Hurricane history (count, radius_miles, years_back,
            category_3_plus).
        hud: Fair Market Rent result (fmr_amount, year).
        nbh: Neighborhood classification (class, tier_label, source).
        tax_assessed: Property Appraiser data, or None.

    Returns:
        The briefing as a single markdown string.
    """
    def _money(value) -> str:
        # The ',' format option is only valid for numbers; the previous
        # '?' default (or a None value) raised and aborted the briefing.
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return f"{value:,}"
        return "?" if value is None else str(value)

    def _is_high_risk_zone(zone_code) -> bool:
        # FEMA Special Flood Hazard Areas: the named A/V zones plus the
        # legacy numbered designations (A1-A30, A99, V1-V30).
        code = str(zone_code or "").strip().upper()
        if code in {"A", "AE", "AH", "AO", "AR", "V", "VE"}:
            return True
        return code[:1] in ("A", "V") and code[1:].isdigit()

    lines = [
        f"# Full DD — {deal.get('address') or '(sin dirección)'}",
        "",
        f"_Generado: {_now_iso()}_",
        "",
        "## Datos verificados (fuentes oficiales)",
        "",
    ]

    if geo:
        lines.append("### Ubicación (Census Geocoder)")
        lines.append(f"- Address normalizada: `{geo.get('matched_address', '?')}`")
        lines.append(f"- Coords: ({geo.get('lat')}, {geo.get('lng')})")
        lines.append(f"- County FIPS: `{geo.get('county_fips', '?')}`")
        lines.append("")

    if fema:
        zone = fema.get("flood_zone", "?")
        lines.append("### FEMA flood zone")
        lines.append(f"- **Zone**: `{zone}`")
        if _is_high_risk_zone(zone):
            lines.append("- ⚠️ **Zona de alto riesgo**: flood insurance es OBLIGATORIO. Costo anual ~$2K-$8K.")
        lines.append("")

    if noaa:
        lines.append("### NOAA hurricanes")
        lines.append(f"- {noaa.get('count', 0)} huracanes within {noaa.get('radius_miles', '?')} mi en últimos {noaa.get('years_back', '?')} años")
        cat3plus = noaa.get("category_3_plus", 0)
        if cat3plus:
            lines.append(f"- {cat3plus} fueron Cat-3+ (vientos sostenidos ≥111 mph)")
        lines.append("")

    if hud:
        # Same default as the FMR fetch in run_full_dd (`beds or 3`), so the
        # label matches the bedroom count that was actually queried.
        beds = deal.get("beds") or 3
        lines.append("### HUD Fair Market Rent")
        lines.append(f"- {beds}BR FMR: **${hud.get('fmr_amount', '?')}/mo**")
        lines.append(f"- Year: {hud.get('year', '?')}")
        lines.append("")

    if nbh:
        cls = nbh.get("class", "?")
        lines.append("### Neighborhood class")
        lines.append(f"- Class: **{cls}**")
        lines.append(f"- Tier: {nbh.get('tier_label', '?')}")
        lines.append(f"- Source: {nbh.get('source', '?')}")
        lines.append("")

    if tax_assessed:
        lines.append("### Property Appraiser")
        lines.append(f"- **Assessed value**: ${_money(tax_assessed.get('assessed_value'))}")
        lines.append(f"- Year built: {tax_assessed.get('year_built', '?')}")
        lines.append(f"- Sqft: {tax_assessed.get('sqft', '?')}")
        lines.append(f"- Source: {tax_assessed.get('source', '?')}")
        lines.append("")

    # Reuse pre-DD content
    lines.append("---")
    lines.append("")
    lines.append("## Pre-DD findings")
    lines.append("")
    pre_screen_verdict = pre_dd_summary.get("pre_screening_verdict")
    pre_screen_score = pre_dd_summary.get("pre_screening_score")
    if pre_screen_verdict:
        lines.append(f"- **Verdict**: `{pre_screen_verdict}` (score `{pre_screen_score}/10`)")
    lines.append(f"- Photos: {pre_dd_summary.get('photos_count', 0)}")
    lines.append(f"- Court records: {pre_dd_summary.get('court_data_status', '?')}")
    lines.append("")
    lines.append("_Ver `briefing_pre_dd.md` para detalle del análisis de liens._")
    lines.append("")

    # Note on Phase 3.5.D
    # NOTE(review): run_full_dd already invokes the synthesis agent, so this
    # "pending" section looks stale; the synthesis result is not passed to
    # this function, so it cannot be rendered here yet.
    lines.append("---")
    lines.append("")
    lines.append("## Phase 3.5.D pendiente")
    lines.append("")
    lines.append("- `DueDiligenceAgent`: agente Ollama que sintetiza TODOS los datos arriba")
    lines.append("- Output: PROCEED / CAUTION / DO NOT BID recommendation + action items list")
    lines.append("- ETA: próxima sesión")
    return "\n".join(lines)