Files
2026-07-03 12:24:58 -04:00

631 lines
24 KiB
Python

"""deals_db.py — SQLite persistence layer for Phase 3 DealFinder.
PROPOSITO:
DealFinder scrapea deals de multiples sources (county auctions, MLS, HUD, etc).
Cada deal se persiste en data/deals.db con su lifecycle status:
new → classified → viewed → interesting → dismissed | analyzed
DealClassifier corre automaticamente sobre cada deal nuevo (gratis, local Ollama).
El analisis profundo (5+ agentes) se dispara on-demand cuando user clickea "Analizar".
TABLAS:
deals — deal listings con classification + lifecycle status
scraper_runs — historico de corridas de scrapers
firecrawl_usage — tracking de credits consumidos (para budget alerts)
API publica:
init_db() — crea schema si no existe (idempotent)
insert_deal(deal_data) -> (deal_id, is_new) — persiste deal (UPSERT por hash)
get_deal_by_hash(hash) -> dict | None
get_deal_by_id(deal_id) -> dict | None
update_classification(deal_id, ...)
update_status(deal_id, new_status)
update_analysis(deal_id, analysis_file)
list_deals(status=None, classification=None, limit=100) -> list
list_recent_deals(days=7, limit=100) -> list
record_scraper_run(source) -> run_id
finish_scraper_run(run_id, ...)
record_firecrawl_usage(*, source, credits, url=None, description=None)
get_firecrawl_month_usage(year_month=None) -> int
is_firecrawl_paused() -> bool — True si supero PAUSE threshold
firecrawl_alert_level() -> "ok" | "warn" | "pause"
USAGE:
from deals_db import init_db, insert_deal, list_deals
init_db()
deal_id = insert_deal({"source": "miami_dade_clerk", "address": "...", ...})
"""
from __future__ import annotations
import hashlib
import json
import os
import sqlite3
import threading
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from config import (
DEALS_DB_PATH,
FIRECRAWL_CREDIT_BUDGET,
FIRECRAWL_ALERT_THRESHOLD_PCT,
FIRECRAWL_PAUSE_THRESHOLD_PCT,
DEAL_STATUS_NEW,
DEAL_STATUS_VALUES,
CLASSIFICATION_VALUES,
)
# Absolute paths anchored to this module's directory, not to the caller's CWD.
# NOTE: pathlib's "/" returns DEALS_DB_PATH unchanged when it is already an
# absolute path. _DB_PATH is resolved once, at import time, from the value that
# `from config import DEALS_DB_PATH` bound above, so changing the config value
# after this module is imported has no effect.
_PROJECT_ROOT = Path(__file__).resolve().parent
_DB_PATH = _PROJECT_ROOT / DEALS_DB_PATH
# Per-thread connection cache used by _get_conn(). By default, sqlite3
# connections refuse to be used from a thread other than the one that created
# them (check_same_thread=True), so each thread opens and reuses its own.
_LOCAL = threading.local()
def _get_conn() -> sqlite3.Connection:
    """Return the calling thread's SQLite connection, opening it on first use.

    A new connection is created lazily (creating the DB directory if needed),
    configured for autocommit (``isolation_level=None``), ``sqlite3.Row`` rows,
    WAL journaling and foreign-key enforcement, then cached on ``_LOCAL`` so
    later calls from the same thread reuse it.
    """
    cached = getattr(_LOCAL, "conn", None)
    if cached is not None:
        return cached

    _DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    fresh = sqlite3.connect(str(_DB_PATH), isolation_level=None)  # autocommit
    fresh.row_factory = sqlite3.Row
    # WAL lets readers proceed while a writer is active.
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA foreign_keys=ON"):
        fresh.execute(pragma)
    _LOCAL.conn = fresh
    return fresh
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _utc_month_key(dt: Optional[datetime] = None) -> str:
"""Returns 'YYYY-MM' for the given UTC datetime (default: now)."""
if dt is None:
dt = datetime.now(timezone.utc)
return dt.strftime("%Y-%m")
# ═══════════════════════════════════════════════════════════════════════════
# Schema
# ═══════════════════════════════════════════════════════════════════════════
_SCHEMA = """
CREATE TABLE IF NOT EXISTS deals (
id INTEGER PRIMARY KEY AUTOINCREMENT,
deal_hash TEXT UNIQUE NOT NULL,
source TEXT NOT NULL,
source_url TEXT,
-- Property
address TEXT, -- nullable: vacant lots may have only parcel_id
city TEXT,
state TEXT,
zip TEXT,
county TEXT,
parcel_id TEXT, -- official PA parcel/RE number (fallback identifier for raw lots)
-- Pricing
listing_price REAL, -- nullable: foreclosure pre-auction bids hidden
deal_type TEXT,
starting_bid REAL,
estimated_arv REAL,
final_judgment_amount REAL, -- foreclosure only: max debt owed (NOT buyer price)
-- Auction status (REDEEMED bug fix): 'scheduled' | 'pending' | 'postponed' (live)
-- vs 'Redeemed' | 'Canceled' | 'Sold' | 'Closed' | 'Dismissed' (dead)
auction_status TEXT,
-- Attributes
beds INTEGER,
baths REAL,
sqft INTEGER,
year_built INTEGER,
lot_sqft INTEGER,
-- Property type (display + filtering): sfr | condo | townhome | multi_family
-- | land | mobile_home | commercial | unknown
property_type TEXT,
-- External source ID (NOT a court case). Examples: Zillow zpid, HUD case#,
-- MLS #, internal listing ID. Used for dedup + source URL reconstruction.
-- DISTINCTO de case_number, que es SOLO para court cases reales (foreclosure
-- judicial, tax_deed). Esta separacion es critical: el pre-screening
-- usa case_number como senal de "deal judicial activo" → DEBE correr
-- court records search. external_id NO dispara esa logica.
external_id TEXT,
-- Metadata
photos_urls TEXT,
listing_description TEXT,
case_number TEXT,
auction_date TEXT,
-- DealClassifier output
classification_status TEXT,
classification_score INTEGER,
classification_reasons TEXT,
classification_strategy TEXT,
classified_at TEXT,
-- User interaction
status TEXT NOT NULL DEFAULT 'new',
user_notes TEXT,
-- Lifecycle
scraped_at TEXT NOT NULL,
last_seen_at TEXT NOT NULL,
analyzed_at TEXT,
analysis_file TEXT
);
CREATE INDEX IF NOT EXISTS idx_deals_status ON deals(status);
CREATE INDEX IF NOT EXISTS idx_deals_classification_status ON deals(classification_status);
CREATE INDEX IF NOT EXISTS idx_deals_classification_score ON deals(classification_score);
CREATE INDEX IF NOT EXISTS idx_deals_source ON deals(source);
CREATE INDEX IF NOT EXISTS idx_deals_scraped_at ON deals(scraped_at);
CREATE INDEX IF NOT EXISTS idx_deals_county ON deals(county);
CREATE INDEX IF NOT EXISTS idx_deals_auction_date ON deals(auction_date);
CREATE TABLE IF NOT EXISTS scraper_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source TEXT NOT NULL,
started_at TEXT NOT NULL,
finished_at TEXT,
deals_found INTEGER DEFAULT 0,
deals_new INTEGER DEFAULT 0,
deals_updated INTEGER DEFAULT 0,
errors_count INTEGER DEFAULT 0,
errors_summary TEXT,
firecrawl_credits_used INTEGER DEFAULT 0,
duration_seconds REAL,
status TEXT
);
CREATE INDEX IF NOT EXISTS idx_scraper_runs_source ON scraper_runs(source);
CREATE INDEX IF NOT EXISTS idx_scraper_runs_started_at ON scraper_runs(started_at);
CREATE TABLE IF NOT EXISTS firecrawl_usage (
id INTEGER PRIMARY KEY AUTOINCREMENT,
used_at TEXT NOT NULL,
source TEXT NOT NULL,
credits INTEGER NOT NULL,
url TEXT,
description TEXT,
year_month TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_firecrawl_year_month ON firecrawl_usage(year_month);
CREATE INDEX IF NOT EXISTS idx_firecrawl_source ON firecrawl_usage(source);
"""
def init_db() -> None:
    """Create any missing tables/indexes, then add late columns via migrations.

    Idempotent: every DDL statement is guarded, so it is safe to call on every
    startup (orchestrator, Streamlit app, scrapers).
    """
    connection = _get_conn()
    connection.executescript(_SCHEMA)
    _apply_idempotent_migrations(connection)
def _apply_idempotent_migrations(conn) -> None:
"""ALTER TABLE para columnas agregadas despues del schema inicial.
Cada bloque verifica antes con PRAGMA table_info; safe to call siempre.
Mantener en sync con _SCHEMA arriba.
"""
existing_cols = {r["name"] for r in conn.execute("PRAGMA table_info(deals)").fetchall()}
# Added 2026-05: auction_status (REDEEMED bug fix)
if "auction_status" not in existing_cols:
conn.execute("ALTER TABLE deals ADD COLUMN auction_status TEXT")
# Added 2026-05: property_type (display + filtering)
if "property_type" not in existing_cols:
conn.execute("ALTER TABLE deals ADD COLUMN property_type TEXT")
# Added 2026-05: external_id (Zillow zpid, HUD case#, MLS#) — separated from
# case_number which is now ONLY for real court cases (foreclosure/tax_deed).
if "external_id" not in existing_cols:
conn.execute("ALTER TABLE deals ADD COLUMN external_id TEXT")
# Added 2026-05-15: hoa_monthly captured from Zillow detail + manual input.
# NULL = unknown (not searched yet). 0 = confirmed no HOA. >0 = has HOA.
# Used for financial analysis PITI + "Solo sin HOA" filter in Inventory.
if "hoa_monthly" not in existing_cols:
conn.execute("ALTER TABLE deals ADD COLUMN hoa_monthly REAL")
# ═══════════════════════════════════════════════════════════════════════════
# Deal CRUD
# ═══════════════════════════════════════════════════════════════════════════
# Columns that insert_deal() copies from the scraper-supplied deal_data dict.
# On INSERT every listed column is written (keys missing from deal_data become
# NULL); on UPDATE only the keys actually present in deal_data are overwritten.
# deal_hash, status, scraped_at and last_seen_at are filled in by insert_deal()
# itself; classification and analysis columns are written by
# update_classification() / update_analysis().
_DEAL_INSERT_FIELDS = [
    "source", "source_url", "address", "city", "state", "zip", "county",
    "listing_price", "deal_type", "starting_bid", "estimated_arv",
    "beds", "baths", "sqft", "year_built", "lot_sqft", "property_type",
    "photos_urls", "listing_description", "case_number", "external_id",
    "auction_date", "parcel_id", "final_judgment_amount", "auction_status",
    "hoa_monthly",
]
def compute_deal_hash(source: str, address: Optional[str], listing_price: Optional[float],
                      parcel_id: Optional[str] = None, case_number: Optional[str] = None) -> str:
    """Return a 16-hex-character dedup key for a deal within one source.

    The most specific available identifier wins: ``case_number``, then
    ``parcel_id``, then the upper-cased, whitespace-collapsed ``address`` plus
    the integer part of ``listing_price`` (0 when falsy). With none of these,
    only the price is used. This keeps vacant lots that have no address (but do
    have a parcel or case number) stable across scraper runs.

    NOTE: the payload format is persisted via ``deals.deal_hash``; changing it
    would make every previously stored deal look new.
    """
    if case_number:
        identifier = f"case|{case_number}"
    elif parcel_id:
        identifier = f"parcel|{parcel_id}"
    else:
        if address:
            squeezed = " ".join(address.upper().split())
            prefix = f"addr|{squeezed}|"
        else:
            # Last resort: price only (scoped by source below).
            prefix = "price|"
        whole_price = int(listing_price) if listing_price else 0
        identifier = prefix + str(whole_price)
    digest = hashlib.sha256(f"{source}|{identifier}".encode("utf-8"))
    return digest.hexdigest()[:16]
def insert_deal(deal_data: dict) -> tuple[int, bool]:
    """Insert a scraped deal, or refresh it when its dedup hash is already stored.

    Returns ``(deal_id, is_new)``. ``is_new`` is True when a new row was
    inserted (status ``DEAL_STATUS_NEW``). It is False when a row with the same
    ``deal_hash`` existed: that row then gets every ``_DEAL_INSERT_FIELDS`` key
    present in *deal_data* overwritten (explicit ``None`` included), and its
    ``last_seen_at`` is bumped.

    Requirements on *deal_data*:
      * ``source`` is mandatory.
      * At least one of ``address``, ``parcel_id``, ``case_number`` must be
        truthy so the deal can be identified.
      * ``listing_price`` may be None (e.g. foreclosures with a hidden bid).
      * ``photos_urls`` may be a list/dict (stored as JSON) or a JSON string.

    Side effect: when ``property_type`` is missing or empty, it is inferred via
    ``property_type_inference.infer_property_type`` (``"unknown"`` on any
    error) and written back into *deal_data*.

    Raises:
        ValueError: if ``source`` is missing or no identifier is present.
    """
    source_name = deal_data.get("source")
    address = deal_data.get("address")
    parcel_id = deal_data.get("parcel_id")
    case_number = deal_data.get("case_number")
    listing_price = deal_data.get("listing_price")

    if not source_name:
        raise ValueError("insert_deal requires source")
    if not (address or parcel_id or case_number):
        raise ValueError(
            f"insert_deal requires AT LEAST ONE of address, parcel_id, case_number, got: "
            f"address={address!r} parcel_id={parcel_id!r} case_number={case_number!r}"
        )

    deal_hash = compute_deal_hash(
        source_name, address, listing_price, parcel_id=parcel_id, case_number=case_number
    )
    now = _now_iso()
    conn = _get_conn()

    # Centralized fallback so individual scrapers need not set property_type.
    # Deliberately best-effort: any inference failure degrades to "unknown".
    if not deal_data.get("property_type"):
        try:
            from property_type_inference import infer_property_type
            deal_data["property_type"] = infer_property_type(deal_data)
        except Exception:
            deal_data["property_type"] = "unknown"

    # Serialize structured photo data once; both the INSERT and UPDATE paths reuse it.
    photos = deal_data.get("photos_urls")
    if isinstance(photos, (list, dict)):
        photos = json.dumps(photos, ensure_ascii=False)

    existing = conn.execute(
        "SELECT id FROM deals WHERE deal_hash = ?", (deal_hash,)
    ).fetchone()

    if existing is None:
        record = {name: deal_data.get(name) for name in _DEAL_INSERT_FIELDS}
        record.update(
            photos_urls=photos,
            deal_hash=deal_hash,
            scraped_at=now,
            last_seen_at=now,
            status=DEAL_STATUS_NEW,
        )
        column_list = ", ".join(record)
        marks = ", ".join("?" for _ in record)
        cursor = conn.execute(
            f"INSERT INTO deals ({column_list}) VALUES ({marks})",
            tuple(record.values()),
        )
        return cursor.lastrowid, True

    # Known deal: overwrite only the fields the scraper supplied, then bump last_seen_at.
    deal_id = existing["id"]
    provided = [name for name in _DEAL_INSERT_FIELDS if name in deal_data]
    assignments = [f"{name} = ?" for name in provided] + ["last_seen_at = ?"]
    params = [photos if name == "photos_urls" else deal_data[name] for name in provided]
    conn.execute(
        f"UPDATE deals SET {', '.join(assignments)} WHERE id = ?",
        (*params, now, deal_id),
    )
    return deal_id, False
def get_deal_by_hash(deal_hash: str) -> Optional[dict]:
    """Fetch one deal row (all columns) by its dedup hash; ``None`` if unknown."""
    found = _get_conn().execute(
        "SELECT * FROM deals WHERE deal_hash = ?", (deal_hash,)
    ).fetchone()
    if found is None:
        return None
    return dict(found)
def get_deal_by_id(deal_id: int) -> Optional[dict]:
    """Fetch one deal row (all columns) by primary key; ``None`` if absent."""
    found = _get_conn().execute(
        "SELECT * FROM deals WHERE id = ?", (deal_id,)
    ).fetchone()
    if found is None:
        return None
    return dict(found)
def update_classification(
    deal_id: int,
    *,
    status: str,
    score: int,
    reasons: list[str],
    strategy: Optional[str] = None,
) -> None:
    """Store DealClassifier output for a deal and stamp ``classified_at``.

    ``reasons`` is saved as a JSON array. A deal whose lifecycle ``status`` is
    still ``'new'`` is moved to ``'classified'``; any other lifecycle status is
    left untouched.

    Raises:
        ValueError: if *status* is not in ``CLASSIFICATION_VALUES``.
    """
    if status not in CLASSIFICATION_VALUES:
        raise ValueError(f"Invalid classification status: {status}. Must be one of {CLASSIFICATION_VALUES}")
    conn = _get_conn()
    encoded_reasons = json.dumps(reasons, ensure_ascii=False)
    params = (status, score, encoded_reasons, strategy, _now_iso(), deal_id)
    conn.execute(
        """UPDATE deals SET
            classification_status = ?,
            classification_score = ?,
            classification_reasons = ?,
            classification_strategy = ?,
            classified_at = ?,
            status = CASE WHEN status = 'new' THEN 'classified' ELSE status END
        WHERE id = ?""",
        params,
    )
def update_status(deal_id: int, new_status: str) -> None:
    """Set a deal's lifecycle status after validating it.

    Raises:
        ValueError: if *new_status* is not in ``DEAL_STATUS_VALUES``.
    """
    if new_status in DEAL_STATUS_VALUES:
        _get_conn().execute("UPDATE deals SET status = ? WHERE id = ?", (new_status, deal_id))
        return
    raise ValueError(f"Invalid status: {new_status}. Must be one of {DEAL_STATUS_VALUES}")
def update_analysis(deal_id: int, analysis_file: str) -> None:
    """Mark a deal as ``'analyzed'`` and record the orchestrator's JSON output path.

    The status is set unconditionally (regardless of the previous lifecycle
    status), and ``analyzed_at`` is stamped with the current UTC time.
    """
    sql = "UPDATE deals SET status = 'analyzed', analyzed_at = ?, analysis_file = ? WHERE id = ?"
    _get_conn().execute(sql, (_now_iso(), analysis_file, deal_id))
_ALLOWED_ORDER_BY = {
"scraped_at DESC",
"scraped_at ASC",
"classification_score DESC",
"classification_score ASC",
"classification_score DESC NULLS LAST",
"listing_price ASC",
"listing_price DESC",
"auction_date ASC",
"auction_date DESC",
"last_seen_at DESC",
}
def list_deals(
*,
status: Optional[str] = None,
classification: Optional[str] = None,
source: Optional[str] = None,
county: Optional[str] = None,
min_score: Optional[int] = None,
limit: int = 100,
order_by: str = "scraped_at DESC",
) -> list[dict]:
"""Query deals con filtros opcionales."""
# Guard: whitelist order_by to prevent SQL injection via unsanitised caller input.
if order_by not in _ALLOWED_ORDER_BY:
raise ValueError(
f"Invalid order_by value: {order_by!r}. "
f"Must be one of: {sorted(_ALLOWED_ORDER_BY)}"
)
where = []
params: list = []
if status:
where.append("status = ?")
params.append(status)
if classification:
where.append("classification_status = ?")
params.append(classification)
if source:
where.append("source = ?")
params.append(source)
if county:
where.append("county = ?")
params.append(county)
if min_score is not None:
where.append("classification_score >= ?")
params.append(min_score)
where_clause = f"WHERE {' AND '.join(where)}" if where else ""
sql = f"SELECT * FROM deals {where_clause} ORDER BY {order_by} LIMIT ?"
params.append(limit)
conn = _get_conn()
rows = conn.execute(sql, tuple(params)).fetchall()
return [dict(r) for r in rows]
def get_cities_for_counties(counties: list[str]) -> list[str]:
    """Return distinct, non-empty city names for the given counties.

    Results are sorted by SQLite's default ORDER BY for the ``city`` column.
    An empty *counties* list short-circuits to ``[]`` without touching the DB.
    """
    if not counties:
        return []
    marks = ",".join(["?"] * len(counties))
    query = (
        f"SELECT DISTINCT city FROM deals WHERE county IN ({marks}) "
        f"AND city IS NOT NULL AND city != '' ORDER BY city"
    )
    found = _get_conn().execute(query, tuple(counties)).fetchall()
    return [row["city"] for row in found]
def list_recent_deals(days: int = 7, limit: int = 100) -> list[dict]:
    """Return deals first scraped within the last *days* days, newest first.

    Filters on ``scraped_at`` (when the deal was first stored), not on
    ``last_seen_at``; older deals that were merely re-seen are not included.
    """
    from datetime import timedelta

    window_start = datetime.now(timezone.utc) - timedelta(days=days)
    found = _get_conn().execute(
        "SELECT * FROM deals WHERE scraped_at >= ? ORDER BY scraped_at DESC LIMIT ?",
        (window_start.isoformat(), limit),
    ).fetchall()
    return [dict(row) for row in found]
def count_deals_by_status() -> dict[str, int]:
    """Map each lifecycle status present in ``deals`` to its row count."""
    tally: dict[str, int] = {}
    grouped = _get_conn().execute(
        "SELECT status, COUNT(*) as n FROM deals GROUP BY status"
    ).fetchall()
    for row in grouped:
        tally[row["status"]] = row["n"]
    return tally
# ═══════════════════════════════════════════════════════════════════════════
# Scraper runs tracking
# ═══════════════════════════════════════════════════════════════════════════
def record_scraper_run(source: str) -> int:
    """Open a ``scraper_runs`` row stamped with the current UTC time.

    Returns the new row id, to be passed later to ``finish_scraper_run``.
    """
    insert_sql = "INSERT INTO scraper_runs (source, started_at) VALUES (?, ?)"
    return _get_conn().execute(insert_sql, (source, _now_iso())).lastrowid
def finish_scraper_run(
    run_id: int,
    *,
    deals_found: int,
    deals_new: int,
    deals_updated: int,
    errors_count: int = 0,
    errors_summary: Optional[list] = None,
    firecrawl_credits_used: int = 0,
    status: str = "success",
) -> None:
    """Close a scraper run, recording its counters, finish time and duration.

    The duration is the seconds elapsed since the row's ``started_at`` (rounded
    to 2 decimals). ``errors_summary`` is stored as JSON, or NULL when
    empty/None. An unknown *run_id* is silently ignored.
    """
    conn = _get_conn()
    run_row = conn.execute(
        "SELECT started_at FROM scraper_runs WHERE id = ?", (run_id,)
    ).fetchone()
    if run_row is None:
        return

    began = datetime.fromisoformat(run_row["started_at"])
    elapsed_seconds = (datetime.now(timezone.utc) - began).total_seconds()
    errors_json = None
    if errors_summary:
        errors_json = json.dumps(errors_summary, ensure_ascii=False)

    conn.execute(
        """UPDATE scraper_runs SET
            finished_at = ?, deals_found = ?, deals_new = ?, deals_updated = ?,
            errors_count = ?, errors_summary = ?, firecrawl_credits_used = ?,
            duration_seconds = ?, status = ?
        WHERE id = ?""",
        (
            _now_iso(),
            deals_found,
            deals_new,
            deals_updated,
            errors_count,
            errors_json,
            firecrawl_credits_used,
            round(elapsed_seconds, 2),
            status,
            run_id,
        ),
    )
def list_recent_scraper_runs(source: Optional[str] = None, limit: int = 20) -> list[dict]:
    """Return the most recently started scraper runs, optionally for one source."""
    source_filter = "WHERE source = ? " if source else ""
    args = (source, limit) if source else (limit,)
    query = "SELECT * FROM scraper_runs " + source_filter + "ORDER BY started_at DESC LIMIT ?"
    runs = _get_conn().execute(query, args).fetchall()
    return [dict(run) for run in runs]
# ═══════════════════════════════════════════════════════════════════════════
# Firecrawl credit tracking + budget enforcement
# ═══════════════════════════════════════════════════════════════════════════
def record_firecrawl_usage(
    *,
    source: str,
    credits: int,
    url: Optional[str] = None,
    description: Optional[str] = None,
) -> None:
    """Append one Firecrawl credit-consumption record; call after each API call.

    A single UTC timestamp is used for both ``used_at`` and the ``year_month``
    bucket, so each record is always counted in the month it was stamped with.
    """
    stamp = datetime.now(timezone.utc)
    conn = _get_conn()
    row = (stamp.isoformat(), source, credits, url, description, _utc_month_key(stamp))
    conn.execute(
        """INSERT INTO firecrawl_usage
            (used_at, source, credits, url, description, year_month)
        VALUES (?, ?, ?, ?, ?, ?)""",
        row,
    )
def get_firecrawl_month_usage(year_month: Optional[str] = None) -> int:
    """Sum of Firecrawl credits recorded for *year_month* ('YYYY-MM', default: current UTC month)."""
    month = _utc_month_key() if year_month is None else year_month
    total_row = _get_conn().execute(
        "SELECT COALESCE(SUM(credits), 0) AS total FROM firecrawl_usage WHERE year_month = ?",
        (month,),
    ).fetchone()
    return 0 if not total_row else int(total_row["total"])
def firecrawl_alert_level() -> str:
"""Retorna 'ok' | 'warn' | 'pause' segun consumo actual vs budget."""
used = get_firecrawl_month_usage()
budget = FIRECRAWL_CREDIT_BUDGET
if budget <= 0:
return "ok"
pct = used / budget * 100
if pct >= FIRECRAWL_PAUSE_THRESHOLD_PCT:
return "pause"
elif pct >= FIRECRAWL_ALERT_THRESHOLD_PCT:
return "warn"
return "ok"
def is_firecrawl_paused() -> bool:
"""True si el budget mensual fue superado (auto-pause). Permite que el
scraper layer pregunte 'puedo gastar credits?' antes de cada Firecrawl call."""
return firecrawl_alert_level() == "pause"
def firecrawl_budget_status() -> dict:
"""Snapshot completo del estado del budget Firecrawl. Util para UI."""
used = get_firecrawl_month_usage()
budget = FIRECRAWL_CREDIT_BUDGET
return {
"year_month": _utc_month_key(),
"credits_used": used,
"credits_budget": budget,
"credits_remaining": max(0, budget - used),
"usage_pct": round(used / budget * 100, 1) if budget > 0 else 0,
"alert_level": firecrawl_alert_level(),
"is_paused": is_firecrawl_paused(),
"alert_threshold_pct": FIRECRAWL_ALERT_THRESHOLD_PCT,
"pause_threshold_pct": FIRECRAWL_PAUSE_THRESHOLD_PCT,
}
# ═══════════════════════════════════════════════════════════════════════════
# Auto-init al importar: DESHABILITADO a proposito (ver NOTA abajo)
# ═══════════════════════════════════════════════════════════════════════════
# NOTA: NO auto-inicializamos al import para que tests/CI puedan setear
# DEALS_DB_PATH antes de tocar el disco. Llamar init_db() explicitamente desde
# el main del orchestrator / streamlit app / scrapers.