Files
AR-House/deal_classifier.py
2026-07-03 12:24:58 -04:00

454 lines
17 KiB
Python

"""deal_classifier.py — Lightweight clasificador de deals (Phase 3A).
PURPOSE:
    DealFinder scrapes many deals per day. Before spending cycles on deep analysis
    (8 Ollama agents, 18-22 min/deal), each new deal is classified with a
    lightweight model that decides in ~10 seconds whether it is worth it.
CLASSIFIER OUTPUT:
    - classification_status: potential_winner | maybe | pass | red_flag
    - score: 0-100
    - reasons: list[str]
    - strategy: buy_hold | brrrr | wholesale | section8 | auction | needs_analysis
FLOW:
    1. precompute_heuristics(deal_data) — Python computes $/sqft, cap_rate_rough, etc.
    2. build_classifier_prompt(deal_data, heuristics) — embeds the inputs in the prompt
    3. ollama.chat(model="DealClassifier", prompt) — the LLM classifies
    4. parse_classifier_output(response) — extracts strict JSON, validates fields
    5. deals_db.update_classification(deal_id, ...) — persists the result
       (performed by classify_all_unclassified; classify_deal only returns it)
API:
    classify_deal(deal_data: dict) -> dict — classifies a single deal
    classify_all_unclassified(limit: int = 50) -> dict — batch-processes pending deals
FAIL-SAFE: if the LLM returns invalid JSON or is down, the result is
    'maybe + needs_analysis' with score 50 (safe default). On a timeout the result
    is 'maybe + needs_analysis' with score 0. Does not block the pipeline.
"""
from __future__ import annotations
import concurrent.futures
import json
import re
import time
from typing import Optional
import ollama
# Default per-deal timeout (seconds), used as the default `timeout_seconds`
# of classify_deal() / classify_all_unclassified(). llama3.1:8b-class models
# typically finish in <10s; a call that takes more than 30s is probably
# looping or stuck on an unsatisfied format=json.
CLASSIFIER_TIMEOUT_SECONDS = 30
# Hard ceiling on generated tokens (passed as Ollama's `num_predict`).
# The JSON output should be <300 tokens; a model that generates more is
# rambling, so generation is cut off.
CLASSIFIER_NUM_PREDICT = 400
from config import (
DEAL_CLASSIFIER_MODEL,
RESOURCE_MODE,
KEEP_ALIVE_BY_MODE,
CLASSIFICATION_VALUES,
CLASS_MAYBE,
CLASS_POTENTIAL_WINNER,
CLASS_PASS,
CLASS_RED_FLAG,
)
# Acceptable strategy values (meant to match the classifier's modelfile).
# parse_classifier_output() replaces anything outside this set with
# "needs_analysis".
VALID_STRATEGIES = {
    "buy_hold", "brrrr", "wholesale", "section8", "auction", "needs_analysis",
}
# Typical $/sqft benchmarks per Florida county — used by the heuristics.
# Keys are lower-cased county names without the " county" suffix; each value
# maps a tier letter ("A".."D") to a (low, high) $/sqft range.
# Conservative: take the middle of the range as the reference; the agent may
# recalibrate.
COUNTY_PSQFT_BENCHMARKS_FL = {
    # Miami-Dade
    "miami-dade": {"A": (300, 500), "B": (200, 300), "C": (140, 200), "D": (80, 140)},
    # Broward
    "broward": {"A": (280, 450), "B": (180, 280), "C": (130, 180), "D": (75, 130)},
    # Palm Beach
    "palm beach": {"A": (350, 600), "B": (220, 350), "C": (150, 220), "D": (90, 150)},
    # Duval (Jacksonville)
    "duval": {"A": (200, 350), "B": (140, 200), "C": (100, 140), "D": (60, 100)},
    # Hillsborough (Tampa)
    "hillsborough": {"A": (240, 400), "B": (170, 240), "C": (120, 170), "D": (70, 120)},
    # Orange (Orlando)
    "orange": {"A": (220, 380), "B": (160, 220), "C": (110, 160), "D": (70, 110)},
    # Default (any other FL county)
    "_default": {"A": (220, 380), "B": (160, 220), "C": (110, 160), "D": (70, 110)},
}
def precompute_heuristics(deal_data: dict) -> dict:
    """Compute cheap heuristics in Python BEFORE the LLM is called.

    The returned numbers are meant to be closed inputs for the LLM (they are
    embedded in the prompt; the model should not recompute them).

    Reference price selection:
      * foreclosure WITHOUT a listing_price (pre-auction, bid hidden):
        ``estimated_arv`` (assessed value) is used as a market-value proxy.
        ``final_judgment_amount`` is never used as a price — it is the debt,
        not the buyer's bid.
      * every other case: ``listing_price``, else ``starting_bid``, else 0.

    A foreclosure that DOES carry a listing_price is treated like a standard
    listing, so the semantics label, the reference price and the ARV-upside
    note all agree with each other.

    Args:
        deal_data: deal record. Keys read (all optional): listing_price,
            starting_bid, estimated_arv, sqft, deal_type, county,
            final_judgment_amount, photos_urls, listing_description.

    Returns:
        dict with keys listing_price_semantics, reference_price_used,
        price_per_sqft, estimated_rent_1pct_rule,
        estimated_cap_rate_rough_pct, is_deal_type_distressed,
        county_psqft_benchmarks, arv_upside_dollars, arv_upside_pct,
        arv_upside_note, photos_count, description_length. Values that cannot
        be computed are None.
    """
    listing_price = deal_data.get("listing_price")
    starting_bid = deal_data.get("starting_bid")
    assessed_arv = deal_data.get("estimated_arv")
    sqft = deal_data.get("sqft")
    deal_type = (deal_data.get("deal_type") or "mls").lower()
    county = (deal_data.get("county") or "").lower().replace(" county", "").strip()

    # True only when the real bid is unknown; every foreclosure-specific
    # branch below keys off this single flag so they cannot disagree.
    bid_hidden = deal_type == "foreclosure" and not listing_price

    if bid_hidden:
        # No real bid available — use assessed value as a market-value proxy.
        reference_price = assessed_arv or 0
    else:
        reference_price = listing_price or starting_bid or 0

    if deal_type == "tax_deed":
        semantics = "tax_deed: listing_price IS the Opening Bid (what buyer pays)"
    elif bid_hidden:
        semantics = (
            "foreclosure pre-auction: listing_price hidden; using assessed_value as proxy"
        )
    else:
        semantics = "standard: listing_price is the asking price"

    h: dict = {
        "listing_price_semantics": semantics,
        "reference_price_used": reference_price,
    }

    # $/sqft
    if reference_price and sqft and sqft > 0:
        h["price_per_sqft"] = round(reference_price / sqft, 1)
    else:
        h["price_per_sqft"] = None

    # Estimated monthly rent via the 1% rule (basic).
    h["estimated_rent_1pct_rule"] = (
        round(reference_price * 0.01, 0) if reference_price else None
    )

    # Rough cap rate via the 50% rule (half of gross rent goes to expenses).
    # NOTE: because rent is itself derived as 1% of the same price, this
    # figure is ~6.0 by construction whenever it is defined.
    if reference_price and h.get("estimated_rent_1pct_rule"):
        rent_annual = h["estimated_rent_1pct_rule"] * 12
        h["estimated_cap_rate_rough_pct"] = round(
            rent_annual * 0.5 / reference_price * 100, 2
        )
    else:
        h["estimated_cap_rate_rough_pct"] = None

    # Deal type categorization
    h["is_deal_type_distressed"] = deal_type in (
        "auction", "foreclosure", "tax_deed", "reo"
    )

    # County benchmark. Shallow-copied so callers mutating the result cannot
    # alter the module-level table (the inner ranges are immutable tuples).
    bench = COUNTY_PSQFT_BENCHMARKS_FL.get(county) or COUNTY_PSQFT_BENCHMARKS_FL["_default"]
    h["county_psqft_benchmarks"] = dict(bench)

    # ARV upside:
    #   tax_deed: ARV vs starting_bid (what the buyer really pays) = real upside
    #   foreclosure with hidden bid: ARV vs final_judgment is misleading -> None
    final_judgment = deal_data.get("final_judgment_amount")
    if deal_type == "tax_deed" and assessed_arv and starting_bid and starting_bid > 0:
        h["arv_upside_dollars"] = round(assessed_arv - starting_bid, 0)
        h["arv_upside_pct"] = round((assessed_arv - starting_bid) / starting_bid * 100, 1)
        h["arv_upside_note"] = "tax_deed: real upside (starting_bid is what buyer pays)"
    elif bid_hidden:
        h["arv_upside_dollars"] = None
        h["arv_upside_pct"] = None
        h["arv_upside_note"] = (
            "foreclosure pre-auction: cannot compute real upside (bid hidden). "
            f"Reference data: assessed_value=${assessed_arv}, final_judgment=${final_judgment}"
        )
    elif assessed_arv and reference_price > 0:
        h["arv_upside_dollars"] = round(assessed_arv - reference_price, 0)
        h["arv_upside_pct"] = round((assessed_arv - reference_price) / reference_price * 100, 1)
        h["arv_upside_note"] = "standard: arv - listing_price"
    else:
        h["arv_upside_dollars"] = None
        h["arv_upside_pct"] = None
        h["arv_upside_note"] = "insufficient data"

    # Photo count. photos_urls may arrive as a JSON-encoded string; an
    # undecodable string counts as "no photos" (best effort).
    photos = deal_data.get("photos_urls")
    if isinstance(photos, str):
        try:
            photos = json.loads(photos)
        except (ValueError, RecursionError):
            photos = None
    h["photos_count"] = len(photos) if isinstance(photos, (list, tuple)) else 0

    # Description length
    desc = deal_data.get("listing_description") or ""
    h["description_length"] = len(desc)
    return h
def build_classifier_prompt(deal_data: dict, heuristics: dict) -> str:
    """Build the user prompt for DealClassifier (compact, JSON-friendly).

    Only a fixed set of deal fields is forwarded; fields whose value is None
    or the empty string are omitted. The listing description is truncated to
    its first 500 characters. Both the deal subset and ``heuristics`` are
    rendered as indented JSON with non-ASCII characters left unescaped.

    Args:
        deal_data: deal record to summarise.
        heuristics: pre-computed heuristics, embedded as-is.

    Returns:
        The prompt text.
    """
    passthrough_fields = (
        "source", "deal_type", "address", "city", "county", "state", "zip",
        "listing_price", "starting_bid", "estimated_arv", "beds", "baths",
        "sqft", "year_built", "lot_sqft", "auction_date", "case_number",
    )

    # Keep only the relevant fields that actually carry a value.
    compact: dict = {}
    for field in passthrough_fields:
        value = deal_data.get(field)
        if value is None or value == "":
            continue
        compact[field] = value

    excerpt = (deal_data.get("listing_description") or "")[:500]
    if excerpt != "":
        compact["listing_description_excerpt"] = excerpt

    deal_json = json.dumps(compact, indent=2, ensure_ascii=False)
    heuristics_json = json.dumps(heuristics, indent=2, ensure_ascii=False)

    return f"""Clasifica este deal de real estate USA Florida.
═══ DEAL DATA ═══
{deal_json}
═══ HEURISTIC PRE-CALCULATIONS (Python — NO recalcules) ═══
{heuristics_json}
═══ TU TAREA ═══
Devuelve SOLAMENTE un objeto JSON valido con classification_status, score, reasons, strategy.
No prose. No markdown. No bloques de codigo. Solo JSON.
"""
def parse_classifier_output(raw: str) -> dict:
    """Extract and validate the classifier's JSON from raw LLM output.

    Tolerates markdown code fences and prose before/after the JSON object.
    Never raises on malformed model output: any field of the wrong type is
    replaced by its safe default.

    Args:
        raw: text returned by the LLM (may be empty or None).

    Returns:
        dict with classification_status, score (int 0-100), reasons
        (list of at most 6 strings, each at most 200 chars), strategy and
        ``_parse_error``. When no JSON object can be decoded the safe fallback
        is returned (maybe / score 50 / needs_analysis, ``_parse_error`` True),
        plus ``_parse_error_detail`` when there is something to report.
    """
    fallback = {
        "classification_status": CLASS_MAYBE,
        "score": 50,
        "reasons": ["LLM output no parseable — fallback safe"],
        "strategy": "needs_analysis",
        "_parse_error": True,
    }
    if not raw or not isinstance(raw, str):
        return fallback

    # Strip markdown code fences if present.
    cleaned = re.sub(r"```(?:json)?\s*", "", raw).strip()
    cleaned = re.sub(r"```\s*$", "", cleaned).strip()

    # Locate the first "{" and make sure a "}" follows it.
    start = cleaned.find("{")
    end = cleaned.rfind("}")
    if start < 0 or end < 0 or end <= start:
        return {**fallback, "_parse_error_detail": f"no JSON braces found in: {raw[:200]}"}

    # raw_decode parses the first complete JSON value and ignores whatever
    # follows it, so trailing prose containing "}" no longer breaks parsing.
    # Input starting with "{" can only decode to a dict.
    try:
        data, _ = json.JSONDecoder().raw_decode(cleaned[start:])
    except json.JSONDecodeError as e:
        json_str = cleaned[start:end + 1]
        return {**fallback, "_parse_error_detail": f"JSON decode error: {e}: {json_str[:200]}"}

    # --- classification_status -------------------------------------------
    # A null / numeric status must not crash on .lower().
    status_value = data.get("classification_status")
    status = status_value.lower().strip() if isinstance(status_value, str) else ""
    if status not in CLASSIFICATION_VALUES:
        # Some LLMs return slightly different labels.
        aliases = {
            "winner": CLASS_POTENTIAL_WINNER,
            "potential winner": CLASS_POTENTIAL_WINNER,
            "potentialwinner": CLASS_POTENTIAL_WINNER,
            "good": CLASS_POTENTIAL_WINNER,
            "redflag": CLASS_RED_FLAG,
            "red flag": CLASS_RED_FLAG,
            "warning": CLASS_RED_FLAG,
            "bad": CLASS_PASS,
        }
        # Treat runs of spaces / hyphens / underscores as one space so that
        # e.g. "red-flag" also resolves.
        alias_key = re.sub(r"[\s_-]+", " ", status)
        status = aliases.get(alias_key, CLASS_MAYBE)

    # --- score -------------------------------------------------------------
    score_value = data.get("score", 50)
    try:
        if isinstance(score_value, str):
            score_value = float(score_value)  # accepts "85" and "85.5"
        score = max(0, min(100, int(score_value)))
    except (TypeError, ValueError, OverflowError):
        # None, lists, "abc", NaN and Infinity all land here.
        score = 50

    # --- reasons -----------------------------------------------------------
    reasons = data.get("reasons", [])
    if reasons is None:
        reasons = []  # avoid emitting the literal reason "None"
    elif not isinstance(reasons, list):
        reasons = [str(reasons)]
    reasons = [str(r)[:200] for r in reasons[:6] if r]

    # --- strategy ----------------------------------------------------------
    strategy = data.get("strategy", "needs_analysis")
    if isinstance(strategy, str):
        strategy = (
            strategy.lower().strip()
            .replace("&", "and").replace(" ", "_").replace("-", "_")
        )
        # Spellings the normalisation above produces for valid strategies.
        strategy = {"buy_and_hold": "buy_hold", "section_8": "section8"}.get(
            strategy, strategy
        )
    else:
        # Lists/dicts are unhashable and would raise on the set lookup below.
        strategy = "needs_analysis"
    if strategy not in VALID_STRATEGIES:
        strategy = "needs_analysis"

    return {
        "classification_status": status,
        "score": score,
        "reasons": reasons or ["sin razones provistas"],
        "strategy": strategy,
        "_parse_error": False,
    }
def _call_ollama_chat(prompt: str, keep_alive) -> dict:
    """Send ``prompt`` to the classifier model through ``ollama.chat``.

    Runs inside the worker thread started by classify_deal().

    Args:
        prompt: user message content.
        keep_alive: forwarded unchanged as Ollama's ``keep_alive`` setting.

    Returns:
        Whatever ``ollama.chat`` returns.
    """
    generation_options = {
        "num_predict": CLASSIFIER_NUM_PREDICT,  # cap output length
        "num_ctx": 4096,  # smaller context = faster+safer
        "temperature": 0.2,
    }
    user_message = {"role": "user", "content": prompt}
    return ollama.chat(
        model=DEAL_CLASSIFIER_MODEL,
        messages=[user_message],
        keep_alive=keep_alive,
        format="json",  # forces JSON output
        options=generation_options,
    )
def classify_deal(deal_data: dict, timeout_seconds: int = CLASSIFIER_TIMEOUT_SECONDS) -> dict:
    """Classify a single deal. Main entry point.

    Computes the heuristics, builds the prompt, calls the model in a worker
    thread with a hard timeout, and parses the answer.

    The executor is deliberately NOT used as a context manager: leaving a
    ``with ThreadPoolExecutor(...)`` block calls ``shutdown(wait=True)``,
    which would block until the stuck call returns and defeat the timeout.

    Args:
        deal_data: deal record to classify.
        timeout_seconds: maximum time to wait for the model's answer.

    Returns:
        dict with classification_status, score, reasons, strategy and
        ``_meta``: {duration_seconds, model, tokens, ollama_error, timed_out,
        parse_error, parse_error_detail, heuristics_used}. On timeout the
        result is maybe / score 0 / needs_analysis.
    """
    started = time.perf_counter()
    keep_alive = KEEP_ALIVE_BY_MODE[RESOURCE_MODE]
    heuristics = precompute_heuristics(deal_data)
    prompt = build_classifier_prompt(deal_data, heuristics)
    raw = ""
    tokens = 0
    error = None
    timed_out = False

    # ollama.chat() is not given a per-request timeout here, so one is
    # enforced by waiting on a future instead.
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    try:
        future = executor.submit(_call_ollama_chat, prompt, keep_alive)
        response = future.result(timeout=timeout_seconds)
        # Either field may be missing/None depending on the response.
        raw = response["message"]["content"] or ""
        tokens = response.get("eval_count", 0) or 0
    except concurrent.futures.TimeoutError:
        timed_out = True
        error = f"timeout after {timeout_seconds}s — model stuck or runaway generation"
    except Exception as e:
        error = f"{type(e).__name__}: {e}"
    finally:
        # Do not wait for the worker: a running call cannot be interrupted,
        # so after a timeout its thread is left to finish in the background.
        executor.shutdown(wait=False)

    parsed = parse_classifier_output(raw)
    duration = time.perf_counter() - started

    # On timeout, persistable safe defaults are returned (status "maybe",
    # score 0) so the deal gets a classification and is flagged for review.
    if timed_out:
        parsed["classification_status"] = CLASS_MAYBE  # safe default
        parsed["score"] = 0
        parsed["reasons"] = [f"Classifier timeout {timeout_seconds}s — needs manual review"]
        parsed["strategy"] = "needs_analysis"

    parsed["_meta"] = {
        "duration_seconds": round(duration, 2),
        "model": DEAL_CLASSIFIER_MODEL,
        "tokens": tokens,
        "ollama_error": error,
        "timed_out": timed_out,
        "parse_error": parsed.pop("_parse_error", False),
        "parse_error_detail": parsed.pop("_parse_error_detail", None),
        "heuristics_used": heuristics,
    }
    return parsed
def classify_all_unclassified(
    limit: int = 50,
    status_cb=None,
    timeout_seconds: int = CLASSIFIER_TIMEOUT_SECONDS,
) -> dict:
    """Batch process: classify the deals listed with status='new'.

    Each deal is classified and persisted independently; a failure on one
    deal is logged and counted, and the loop continues with the next one.

    Args:
        limit: maximum number of pending deals to fetch.
        status_cb: optional callable receiving one progress message (str) per
            event; when falsy, nothing is logged.
        timeout_seconds: per-deal timeout forwarded to classify_deal().

    Returns:
        Summary dict: processed, one counter per classification status,
        errors, timeouts, total_pending and total_seconds.
    """
    from deals_db import list_deals, update_classification, init_db

    init_db()
    pending = list_deals(status="new", limit=limit)
    total = len(pending)
    summary = {
        "processed": 0,
        "potential_winner": 0,
        "maybe": 0,
        "pass": 0,
        "red_flag": 0,
        "errors": 0,
        "timeouts": 0,
        "total_pending": total,
        "total_seconds": 0.0,
    }

    def _log(msg: str) -> None:
        if status_cb:
            status_cb(msg)

    batch_start = time.perf_counter()
    for i, deal in enumerate(pending, 1):
        addr_preview = (deal.get("address") or f"id={deal['id']}")[:50]
        _log(f"[{i}/{total}] Classifying deal {deal['id']} — {addr_preview}")
        deal_start = time.perf_counter()
        try:
            result = classify_deal(deal, timeout_seconds=timeout_seconds)
            meta = result.get("_meta", {})
            update_classification(
                deal_id=deal["id"],
                status=result["classification_status"],
                score=result["score"],
                reasons=result["reasons"],
                strategy=result["strategy"],
            )
            summary["processed"] += 1
            cls = result["classification_status"]
            summary[cls] = summary.get(cls, 0) + 1
            if meta.get("timed_out"):
                summary["timeouts"] += 1
                _log(f" [{i}/{total}] TIMEOUT after {timeout_seconds}s — marked maybe+needs_analysis, continuing")
            elif meta.get("parse_error"):
                summary["errors"] += 1
                # parse_error_detail is present but None when the model
                # returned nothing (e.g. Ollama down); fall back to the
                # transport error so slicing never hits None.
                detail = meta.get("parse_error_detail") or meta.get("ollama_error") or ""
                _log(f" [{i}/{total}] parse_error: {str(detail)[:80]}")
            else:
                deal_elapsed = time.perf_counter() - deal_start
                _log(f" [{i}/{total}] OK in {deal_elapsed:.1f}s — {cls} (score {result['score']})")
        except Exception as e:
            # Per-deal boundary: record the failure and keep the batch going.
            summary["errors"] += 1
            _log(f" [{i}/{total}] ERROR {type(e).__name__}: {str(e)[:120]}")

    summary["total_seconds"] = round(time.perf_counter() - batch_start, 1)
    _log(f"Batch complete: {summary['processed']}/{total} processed, "
         f"{summary['timeouts']} timeouts, {summary['errors']} errors, "
         f"{summary['total_seconds']:.0f}s total")
    return summary