Files
2026-07-03 12:24:58 -04:00

300 lines
12 KiB
Python

"""dd_agent.py — DueDiligenceCoordinator agent synthesis layer.
Toma TODOS los inputs publicos recopilados sobre una propiedad (PA + court +
financial + flip signals + environmental + market) y los sintetiza via
DueDiligenceCoordinator LLM en un reporte estructurado JSON.
ESTE es el cerebro del producto $150 — sin esto, el full report es solo data
dump. Con esto, el cliente paga por sintesis razonada profesional.
USAGE:
    from dd_agent import run_due_diligence_synthesis
    synthesis = run_due_diligence_synthesis(
        pa_record=...,
        court_records=...,
        owner_classification=...,
        reo_signal=...,
        renovation_signal=...,
        financial_analysis=...,
        environmental={...},
        market_context={...},
        listing_info={...},
        status_cb=print,  # optional progress callback
    )
    # → {executive_summary, verdict, confidence_score, top_risks, ...}
    # on failure → {"error": "...", ...}
"""
from __future__ import annotations

import json
import time
from typing import Callable, Optional
# Model name passed to ollama.chat for the synthesis call.
MODEL_NAME: str = "DueDiligenceCoordinator"

# Upper bound (seconds) on one synthesis call: budgets ~30-60 s of model load
# on the first call plus ~60-120 s of generation for a ~2500-token answer.
DEFAULT_TIMEOUT_SECONDS: int = 4 * 60
def run_due_diligence_synthesis(
    *,
    pa_record: Optional[dict] = None,
    court_records: Optional[dict] = None,
    owner_classification: Optional[dict] = None,
    reo_signal: Optional[dict] = None,
    renovation_signal: Optional[dict] = None,
    financial_analysis: Optional[dict] = None,
    environmental: Optional[dict] = None,
    market_context: Optional[dict] = None,
    listing_info: Optional[dict] = None,
    status_cb: Optional[Callable[[str], None]] = None,
) -> dict:
    """Run DueDiligenceCoordinator synthesis over all collected public data.

    Every source is optional; the inputs are trimmed into a compact payload,
    embedded in the prompt as JSON, and sent to the ``MODEL_NAME`` model via
    ``ollama.chat`` with JSON output mode.

    Args:
        pa_record, court_records, owner_classification, reo_signal,
        renovation_signal, financial_analysis, environmental, market_context,
        listing_info: Per-source dicts gathered upstream (may be None/empty).
        status_cb: Optional callback that receives human-readable progress lines.

    Returns:
        The model's parsed JSON object plus a ``_meta`` entry (model name,
        elapsed seconds, which input sources were present), or a dict with an
        ``error`` key (and ``raw_output`` when the model's text could not be
        parsed). Ollama-call and parse failures are reported this way rather
        than raised.
    """
    import concurrent.futures

    t0 = time.perf_counter()

    def _log(m: str) -> None:
        if status_cb:
            status_cb(m)

    try:
        import ollama
    except ImportError:
        return {"error": "ollama package not installed"}

    # Build a trimmed input payload to keep the context window manageable.
    user_payload = _build_input_payload(
        pa_record=pa_record,
        court_records=court_records,
        owner_classification=owner_classification,
        reo_signal=reo_signal,
        renovation_signal=renovation_signal,
        financial_analysis=financial_analysis,
        environmental=environmental,
        market_context=market_context,
        listing_info=listing_info,
    )
    prompt = (
        "Sintetiza este JSON de inputs publicos sobre una propiedad en el "
        "reporte JSON estructurado segun tu system prompt. Solo output JSON, "
        "sin markdown wrapper.\n\n"
        f"```json\n{json.dumps(user_payload, indent=2, default=str)}\n```\n"
    )
    _log(" Calling DueDiligenceCoordinator (qwen2.5:14b)...")

    # Run the blocking ollama call in a worker thread so the wait can be bounded.
    # The executor is deliberately NOT used as a context manager: its __exit__
    # calls shutdown(wait=True), which would block until ollama.chat returns and
    # make the timeout useless. A timed-out worker thread cannot be killed; it
    # finishes in the background (and is joined at interpreter exit).
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    try:
        future = executor.submit(
            ollama.chat,
            model=MODEL_NAME,
            messages=[{"role": "user", "content": prompt}],
            options={"temperature": 0.15, "num_ctx": 16384, "num_predict": 1800},
            format="json",  # ask ollama to constrain the output to JSON
        )
        response = future.result(timeout=DEFAULT_TIMEOUT_SECONDS)
    except concurrent.futures.TimeoutError:
        return {"error": f"DueDiligenceCoordinator timeout after {DEFAULT_TIMEOUT_SECONDS}s"}
    except Exception as e:  # report any client/server failure as an error dict
        return {"error": f"ollama call failed: {type(e).__name__}: {e}"}
    finally:
        executor.shutdown(wait=False)

    # Guard against a missing message or an explicit null content field.
    message = response.get("message") or {}
    content = (message.get("content") or "").strip()
    if not content:
        return {"error": "DueDiligenceCoordinator returned empty content"}

    content = _strip_markdown_fence(content)
    try:
        synthesis = json.loads(content)
    except json.JSONDecodeError as e:
        return {
            "error": f"JSON parse failed: {e}",
            "raw_output": content[:1000],
        }
    # json.loads may legally return a list/str/number; the code below needs an object.
    if not isinstance(synthesis, dict):
        return {
            "error": f"JSON parse failed: expected object, got {type(synthesis).__name__}",
            "raw_output": content[:1000],
        }

    elapsed = round(time.perf_counter() - t0, 1)
    synthesis["_meta"] = {
        "model": MODEL_NAME,
        "elapsed_seconds": elapsed,
        "input_sources_present": _summarize_inputs(user_payload),
    }
    _log(f" DueDiligence synthesis done in {elapsed}s — verdict={synthesis.get('verdict','?')} confidence={synthesis.get('confidence_score','?')}")
    return synthesis


def _strip_markdown_fence(text: str) -> str:
    """Return *text* without a leading Markdown code fence, whitespace-stripped.

    Handles ```json ... ``` (tag in any case) and bare ``` ... ```; the closing
    fence is optional. Text not starting with a fence is only stripped.
    """
    if text.startswith("```"):
        # Keep what lies between the opening fence and the next fence (or the end).
        text = text.split("```", 2)[1]
        # Drop a language tag such as "json\n{...}".
        if text[:4].lower() == "json":
            text = text[4:]
    return text.strip()
# ════════════════════════════════════════════════════════════════════════════
# Input payload builder — trim to essential fields, keep context small
# ════════════════════════════════════════════════════════════════════════════
def _build_input_payload(
*,
pa_record: Optional[dict] = None,
court_records: Optional[dict] = None,
owner_classification: Optional[dict] = None,
reo_signal: Optional[dict] = None,
renovation_signal: Optional[dict] = None,
financial_analysis: Optional[dict] = None,
environmental: Optional[dict] = None,
market_context: Optional[dict] = None,
listing_info: Optional[dict] = None,
) -> dict:
"""Build trimmed input dict for DueDiligenceCoordinator. Avoid context bloat."""
payload = {}
if pa_record:
# PA key fields only (skip raw + meta)
payload["property_appraiser"] = {
k: pa_record.get(k) for k in (
"county", "parcel_id", "owner_name", "co_owners", "mailing_address",
"site_address", "owner_address_mismatch",
"year_built", "effective_year_built", "sqft_heated", "sqft_total",
"lot_acres", "lot_total_sqft", "bedrooms", "baths", "stories",
"building_type", "use_code", "use_description", "zoning",
"roof_type", "roofing_cover", "exterior_wall", "interior_wall",
"just_value_current", "assessed_value_current", "exemption_current",
"just_value_last", "assessed_value_last", "taxes_paid_last",
"tax_year_current", "tax_year_last",
"homestead_active", "homestead_amount",
"subdivision", "legal_description",
)
}
# Trim sales history to last 8 records
sales = pa_record.get("sales_history") or []
payload["property_appraiser"]["sales_history"] = sales[:8]
payload["property_appraiser"]["most_recent_qualified_sale"] = pa_record.get("most_recent_qualified_sale")
if court_records:
payload["court_records"] = {
k: court_records.get(k) for k in (
"status", "county", "plaintiff", "case_number",
"lis_pendens_count", "lis_pendens",
"liens_inventory", "owner_name",
)
}
if owner_classification:
payload["owner_classification"] = {
"type": owner_classification.get("type"),
"category": owner_classification.get("category"),
"is_lender": owner_classification.get("is_lender"),
"is_government": owner_classification.get("is_government"),
"is_individual": owner_classification.get("is_individual"),
"is_investor_entity": owner_classification.get("is_investor_entity"),
"matched_keyword": owner_classification.get("matched_keyword"),
"confidence": owner_classification.get("confidence"),
}
if reo_signal and reo_signal.get("is_reo_opportunity"):
payload["reo_signal"] = reo_signal
if renovation_signal:
payload["renovation_signal"] = {
"is_flip_pattern": renovation_signal.get("is_flip_pattern"),
"is_flip_in_progress": renovation_signal.get("is_flip_in_progress"),
"evidence": renovation_signal.get("evidence"),
"interpretation_es": renovation_signal.get("interpretation_es"),
"value_increase_pct": renovation_signal.get("value_increase_pct"),
"months_between": renovation_signal.get("months_between"),
"listing_premium_pct": renovation_signal.get("listing_premium_pct"),
"months_since_recent_sale": renovation_signal.get("months_since_recent_sale"),
}
if financial_analysis:
payload["financial_analysis"] = {
"max_profitable_offer": financial_analysis.get("max_profitable_offer"),
"payment_table": financial_analysis.get("payment_table"),
"recommendation": financial_analysis.get("recommendation"),
"inputs": financial_analysis.get("inputs"),
}
# Only include live_in si tiene income (DTI relevant)
li = financial_analysis.get("live_in_scenario") or {}
if (li.get("dti_evaluation") or {}).get("monthly_income"):
payload["financial_analysis"]["live_in_dti"] = li.get("dti_evaluation")
if environmental:
payload["environmental"] = environmental
if market_context:
payload["market_context"] = market_context
if listing_info:
payload["listing_info"] = listing_info
return payload
def _summarize_inputs(payload: dict) -> dict:
"""Return which input sources are present (for confidence calibration)."""
return {k: bool(payload.get(k)) for k in (
"property_appraiser", "court_records", "owner_classification",
"reo_signal", "renovation_signal", "financial_analysis",
"environmental", "market_context", "listing_info",
)}
# ════════════════════════════════════════════════════════════════════════════
# CLI for manual testing
# ════════════════════════════════════════════════════════════════════════════
if __name__ == "__main__":
    # Manual end-to-end check: load a deal from the local SQLite store, run the
    # pre-screening pipeline to gather every signal, synthesize, and save the
    # result as JSON. Paths are relative, so run this from the project root.
    import argparse
    import contextlib
    import os
    import sqlite3
    import sys

    def _fmt_amount(value) -> str:
        """Format a bid amount with thousands separators.

        Bid values come from LLM-generated JSON and may be strings, null, or
        missing; anything that is not a real number is printed as-is instead
        of crashing the ``:,`` format spec.
        """
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return f"{value:,}"
        return str(value)

    parser = argparse.ArgumentParser(description="DueDiligenceCoordinator manual test")
    parser.add_argument("--deal-id", type=int, required=True, help="Deal id from deals.db")
    args = parser.parse_args()

    db_path = "data/deals.db"
    # sqlite3.connect would silently create an empty database file here; fail clearly instead.
    if not os.path.exists(db_path):
        sys.exit(f"Database not found: {db_path} (run from the project root)")

    # closing() releases the connection on every path, including errors.
    with contextlib.closing(sqlite3.connect(db_path)) as conn:
        conn.row_factory = sqlite3.Row
        row = conn.execute("SELECT * FROM deals WHERE id=?", (args.deal_id,)).fetchone()
        deal = dict(row) if row is not None else None
    if deal is None:
        print(f"Deal id={args.deal_id} not found")
        sys.exit(1)

    from pre_screening_orchestrator import run_pre_screening

    print(f"Running pre-screening for deal {args.deal_id} to gather inputs...")
    ps = run_pre_screening(deal=deal)
    print("\nRunning DueDiligenceCoordinator synthesis...")
    synth = run_due_diligence_synthesis(
        pa_record=ps.get("property_appraiser"),
        court_records=ps.get("court_records_raw"),
        owner_classification=ps.get("owner_classification"),
        reo_signal=ps.get("reo_signal"),
        renovation_signal=(ps.get("property_appraiser") or {}).get("renovation_signal"),
        financial_analysis=ps.get("financial_analysis"),
        listing_info={
            "listing_price": deal.get("listing_price"),
            "deal_type": deal.get("deal_type"),
            "source": deal.get("source"),
            "source_url": deal.get("source_url"),
        },
        status_cb=print,
    )
    print()

    # Write to a UTF-8 file to avoid Windows cp1252 console encoding issues.
    out_path = f"_probe_out/dd_synth_{args.deal_id}.json"
    os.makedirs(os.path.dirname(out_path), exist_ok=True)
    with open(out_path, "w", encoding="utf-8") as fh:
        json.dump(synth, fh, indent=2, default=str, ensure_ascii=False)
    print(f"Synthesis saved to {out_path}")

    # Short console summary; every field comes from LLM output, so tolerate
    # missing/null/odd-typed values instead of crashing after the file is saved.
    if "error" in synth:
        print(f" error: {synth['error']}")
    bid = synth.get("bid_recommendation")
    if not isinstance(bid, dict):
        bid = {}
    print(f" verdict: {synth.get('verdict')}")
    print(f" confidence_score: {synth.get('confidence_score')}")
    print(f" top_risks: {len(synth.get('top_risks') or [])} items")
    print(
        f" bid: ${_fmt_amount(bid.get('low', 0))} - ${_fmt_amount(bid.get('high', 0))}"
        f" (mid ${_fmt_amount(bid.get('mid', 0))})"
    )
    print(f" action_items: {len(synth.get('action_items') or [])}")