"""scrapers/zillow.py — Zillow MLS scraper via Firecrawl. SOURCE: https://www.zillow.com/{county-slug}-county-{state}/houses/ STACK: Firecrawl (no Playwright — Zillow has aggressive anti-bot) URL PATTERN: https://www.zillow.com/miami-dade-county-fl/houses/ https://www.zillow.com/broward-county-fl/houses/ https://www.zillow.com/palm-beach-county-fl/houses/ COST: ~1 Firecrawl credit per page scrape (~25-40 listings/page) Default: 1 page = 1 credit per county MARKDOWN STRUCTURE (descubierto via exploration): - [$PRICE](URL_zpid) - **N** bds - **N** ba - **N,NNN** sqft [optional badges like "New construction" / "Price reduced"] [STREET, CITY, STATE ZIP](URL_zpid) [REALTOR/BROKERAGE_NAME] DEAL TYPE: 'mls' """ from __future__ import annotations import json import os import re import time from typing import Callable, Optional from scrapers._cache import get_cached, save_cache, DEFAULT_TTL_SECONDS_HOURLY SOURCE = "zillow" ZILLOW_BASE = "https://www.zillow.com" # Rate limit conservador (Zillow es sensible) _REQUEST_INTERVAL_SECONDS = 4.0 # Default cache TTL: 1h (MLS listings se mueven mas rapido que court records) _CACHE_TTL = DEFAULT_TTL_SECONDS_HOURLY def _build_zillow_url(county: str, state: str, page: int = 1) -> str: """Genera URL canonica de Zillow para un county. Args: county: e.g. "Miami-Dade", "Palm Beach" state: 2-letter code, e.g. "FL" page: 1-indexed page number Returns: URL string. """ slug = county.lower().replace(" ", "-") base = f"{ZILLOW_BASE}/{slug}-county-{state.lower()}/houses/" if page > 1: base += f"{page}_p/" return base # ─── Parser ──────────────────────────────────────────────────────────────── # Regex para extraer cada listing block del markdown. # Captura: price, URL, bds, ba, sqft (cuando hay), address line, brokerage line. 
# Markdown format of one listing card:
#   - [$PRICE](URL_TO_DETAIL)
#   (optional text with bds/ba/sqft or badges)
#   [STREET, CITY, FL ZIP](URL_TO_DETAIL)
#   [BROKERAGE]
_LISTING_PATTERN = re.compile(
    r"-\s+\[\$([\d,]+)\]\((https?://[^)]+zpid[^)]*)\)"
    r"([\s\S]*?)"  # body (lazy match up to the next listing)
    r"(?=\n-\s+\[\$[\d,]+\]\(|\Z)",
    re.MULTILINE,
)

# Image URL pattern (Zillow CDN)
_IMG_PATTERN = re.compile(
    r"!\[[^\]]*\]\((https?://photos\.zillowstatic\.com/[^)]+\.(?:webp|jpg|png))\)",
    re.IGNORECASE,
)

_BEDS_RE = re.compile(r"\*\*\s*([\d.]+)\s*\*\*\s*bds", re.IGNORECASE)
_BATHS_RE = re.compile(r"\*\*\s*([\d.]+)\s*\*\*\s*ba\b", re.IGNORECASE)
_SQFT_RE = re.compile(r"\*\*\s*([\d,]+)\s*\*\*\s*sqft", re.IGNORECASE)
_ADDRESS_LINK_RE = re.compile(r"\[([^\]]+,\s*[A-Z]{2}\s+\d{5})\]\(([^)]+zpid[^)]+)\)")
_STATE_ZIP_RE = re.compile(r",\s*([A-Z]{2})\s+(\d{5})")

# zpid segment of a detail URL, e.g. ".../12345_zpid/"
_ZPID_RE = re.compile(r"/(\d+)_zpid")

# Badge keywords looked up (case-insensitively) anywhere in a listing body.
_BADGE_KEYWORDS = (
    "New construction",
    "Price reduced",
    "Price cut",
    "New listing",
    "Open House",
    "Foreclosure",
    "Pre-foreclosure",
    "Coming soon",
    "Auction",
)

# "foreclosure" that is NOT the tail of "pre-foreclosure". Plain substring
# matching would tag every pre-foreclosure card as "Foreclosure" as well.
_STANDALONE_FORECLOSURE_RE = re.compile(r"(?<!pre-)foreclosure", re.IGNORECASE)


def _parse_money(s: str) -> Optional[float]:
    """Convert a price string such as "$1,250,000" to a float.

    Returns:
        The numeric value, or None when `s` is empty or is not a number once
        commas and dollar signs are removed.
    """
    if not s:
        return None
    try:
        return float(s.replace(",", "").replace("$", ""))
    except ValueError:
        return None


def _search_number(pattern: re.Pattern, text: str, convert: Callable):
    """Return `convert(first capture group)` for the first match, else None.

    Thousands separators are stripped before conversion; a capture that is
    not a valid number (e.g. a lone "." or ",") yields None.
    """
    found = pattern.search(text)
    if not found:
        return None
    try:
        return convert(found.group(1).replace(",", ""))
    except (ValueError, OverflowError):
        return None


def _claim_photo(
    photos: list[tuple[int, str]],
    window_start: int,
    window_end: int,
    claimed: set[str],
) -> Optional[str]:
    """Claim the last unclaimed photo positioned in [window_start, window_end).

    `photos` must be sorted by position (ascending), which is how
    `finditer` produces them. The chosen URL is added to `claimed` so it is
    never handed to a second listing.
    """
    for pos, photo_url in reversed(photos):
        if pos >= window_end:
            continue
        if pos < window_start:
            break  # positions only get smaller from here on
        if photo_url in claimed:
            continue
        claimed.add(photo_url)
        return photo_url
    return None


def _split_address(address_full: Optional[str]) -> tuple:
    """Split "STREET, CITY, ST ZIP" into (city, state, zip_code).

    Any component that cannot be determined is returned as None.
    """
    if not address_full:
        return None, None, None

    state = zip_code = city = None
    sz_match = _STATE_ZIP_RE.search(address_full)
    if sz_match:
        state, zip_code = sz_match.group(1), sz_match.group(2)

    # City is the penultimate comma-separated part: [street, city, "ST ZIP"].
    parts = [p.strip() for p in address_full.split(",")]
    if len(parts) >= 3:
        city = parts[-2]
        # An empty penultimate part (e.g. a doubled comma) falls back one slot.
        if not city and len(parts) >= 4:
            city = parts[-3]
    return city, state, zip_code


def _detect_badges(body: str) -> list[str]:
    """Return the badge keywords present in a listing body, in keyword order."""
    lowered = body.lower()
    badges: list[str] = []
    for kw in _BADGE_KEYWORDS:
        if kw == "Foreclosure":
            present = _STANDALONE_FORECLOSURE_RE.search(lowered) is not None
        else:
            present = kw.lower() in lowered
        if present:
            badges.append(kw)
    return badges


def _extract_listings_from_markdown(md: str) -> list[dict]:
    """Parse Zillow search-results markdown into listing dicts.

    History (2026-05-15): an earlier parser looked back 800 chars per
    listing, which let several listings claim the SAME photo. The current
    algorithm:

      1. Index every Zillow CDN image with its position in the markdown.
      2. For each listing, take the closest preceding photo located between
         the previous listing's start and the current listing's start that
         has not been claimed yet.
      3. Mark that photo URL as claimed so it is assigned to one listing only.

    Args:
        md: markdown of a search-results page. Empty/None yields [].

    Returns:
        One dict per listing with keys: price, source_url, zpid, beds, baths,
        sqft, address, city, state, zip, badges, photos_urls. Fields that are
        not found are None (or an empty list for badges/photos_urls).
    """
    if not md:
        return []

    photos = [(img.start(), img.group(1)) for img in _IMG_PATTERN.finditer(md)]
    claimed_photos: set[str] = set()
    listings: list[dict] = []

    # The window starts at the previous listing's START (not its end): each
    # listing's lazy body runs up to the next listing, so the photo that
    # belongs to listing N+1 sits inside listing N's matched text.
    prev_listing_start = 0
    for match in _LISTING_PATTERN.finditer(md):
        price_raw, url, body = match.group(1), match.group(2), match.group(3)

        photo = _claim_photo(photos, prev_listing_start, match.start(), claimed_photos)
        prev_listing_start = match.start()

        addr_match = _ADDRESS_LINK_RE.search(body)
        address_full = addr_match.group(1) if addr_match else None
        city, state, zip_code = _split_address(address_full)

        zpid_match = _ZPID_RE.search(url)

        listings.append({
            "price": _parse_money(price_raw),
            "source_url": url,
            "zpid": zpid_match.group(1) if zpid_match else None,
            "beds": _search_number(_BEDS_RE, body, lambda v: int(float(v))),
            "baths": _search_number(_BATHS_RE, body, float),
            "sqft": _search_number(_SQFT_RE, body, int),
            "address": address_full,
            "city": city,
            "state": state,
            "zip": zip_code,
            "badges": _detect_badges(body),
            "photos_urls": [photo] if photo else [],
        })

    return listings


# ─── Property Detail Parser (individual property page) ────────────────────
# Note (2026-05-15): search-result pages do NOT include condition/features/
# year_built; the individual property page does. The detail scraper below is
# meant to be called on demand during pre-screening, when it is worth spending
# 1 Firecrawl credit to enrich a specific deal.

# Labelled condition tag (Zillow exposes it explicitly)
_CONDITION_TAG_RE = re.compile(
    r"(?:Condition|Property\s*Condition)\s*[:\-]\s*([A-Za-z][A-Za-z0-9\s\-/]+?)(?:\n|$|\|)",
    re.IGNORECASE,
)

# The "Updated/Remodeled" style tag also appears standalone (without a label)
_CONDITION_STANDALONE_RE = re.compile(
    r"\b(Updated/Remodeled|Remodeled|Renovated|Updated|Original|New\s+construction|Newly\s+built|Fixer[- ]upper|Needs\s+work)\b",
    re.IGNORECASE,
)

# Year built (multiple formats)
_YEAR_BUILT_RE = re.compile(
    r"(?:Year\s*built|Built\s*in|Construction\s*year)\s*[:\-]?\s*(\d{4})",
    re.IGNORECASE,
)

# Home status (active, under contract, pending, sold)
_HOME_STATUS_RE = re.compile(
    r"\b(For\s*sale|Active\s+under\s+contract|Active\s+contingent|Pending|Sold|Off\s*market|Coming\s*soon|Auction)\b",
    re.IGNORECASE,
)

# "What's special" extraction — usually H2 + bullet list right after
_WHATS_SPECIAL_RE = re.compile(
    r"(?:##|\*\*)\s*What\W*s?\s+special\W*?\s*(?:##|\*\*)?\s*([\s\S]+?)"
    r"(?=\n##\s|\n\*\*[A-Z]|\Z)",
    re.IGNORECASE,
)

# Bullet item extractor (applied to the matched "What's special" section)
_BULLET_ITEM_RE = re.compile(r"(?:^|\n)\s*(?:-|\*)\s+([^\n]+)")

# "Description" section: header followed by text up to the next header
_DESCRIPTION_RE = re.compile(
    r"(?:##|\*\*)\s*Description\W*?\s*(?:##|\*\*)?\s*([\s\S]+?)(?=\n##\s|\n\*\*[A-Z]|\Z)",
    re.IGNORECASE,
)

# Zestimate
_ZESTIMATE_RE = re.compile(r"Zestimate\W*?\$?([\d,]+)", re.IGNORECASE)

# Tax assessed value
_TAX_ASSESSED_RE = re.compile(
    r"(?:Tax\s+assessed\s+value|Assessed\s+value|Assessment)\W*?\$?([\d,]+)",
    re.IGNORECASE,
)

# HOA monthly fee (multiple formats Zillow uses)
# Examples: "HOA: $350/mo", "HOA fee: $250 monthly", "Monthly HOA: $400"
# Also: "HOA: None", "No HOA", "HOA: 0"
_HOA_RE = re.compile(
    r"HOA(?:\s+fee)?[\s:]+\$?([\d,]+)\s*(?:/\s*(?:mo|month)|monthly)?",
    re.IGNORECASE,
)
_HOA_MONTHLY_RE = re.compile(
    r"Monthly\s+HOA[\s:]+\$?([\d,]+)",
    re.IGNORECASE,
)
_NO_HOA_RE = re.compile(
    r"(?:HOA[\s:]+(?:None|N/A|0|No)\b)|(?:\bNo\s+HOA\b)",
    re.IGNORECASE,
)

# Cache settings for individual property pages (24h per URL)
_DETAIL_CACHE_NAMESPACE = "zillow_detail"
_DETAIL_CACHE_TTL_SECONDS = 86400


def _digits_to_int(text: str) -> Optional[int]:
    """Convert a digit string with optional thousands commas to int, else None."""
    try:
        return int(text.replace(",", ""))
    except ValueError:
        return None


def _parse_property_detail_md(md: str) -> dict:
    """Extract enriched fields from the markdown of a Zillow property page.

    Args:
        md: markdown of an individual property page. Empty/None returns the
            default (all-unknown) result without further work.

    Returns:
        {
          "condition_status": str | None,
          "year_built": int | None,           # only accepted when 1800 < year < 2100
          "home_status": str | None,          # "For sale" | "Active under contract" | ...
          "features_special": [str],          # bullets of the "What's special" section (max 12)
          "description": str,                 # best effort, capped at 2000 chars
          "zestimate": int | None,
          "tax_assessed_value": int | None,
          "active_under_contract": bool,      # convenience flag derived from home_status
          "renovation_keywords_found": [str], # keywords detected in description + features
          "hoa_monthly": int | None,          # None=unknown, 0=confirmed no HOA, >0=fee
        }

    Raises:
        ImportError: if `data_fetchers.property_value` cannot be imported
            (it is imported lazily once `md` is non-empty).
    """
    out: dict = {
        "condition_status": None,
        "year_built": None,
        "home_status": None,
        "features_special": [],
        "description": "",
        "zestimate": None,
        "tax_assessed_value": None,
        "active_under_contract": False,
        "renovation_keywords_found": [],
        "hoa_monthly": None,  # None=unknown, 0=confirmed no-HOA, >0=has HOA
    }
    if not md:
        return out

    # 1. Condition status — explicit label first, then the standalone tag
    m = _CONDITION_TAG_RE.search(md) or _CONDITION_STANDALONE_RE.search(md)
    if m:
        out["condition_status"] = m.group(1).strip()

    # 2. Year built (rejecting implausible years)
    m = _YEAR_BUILT_RE.search(md)
    if m:
        year = int(m.group(1))
        if 1800 < year < 2100:
            out["year_built"] = year

    # 3. Home status — first occurrence wins
    m = _HOME_STATUS_RE.search(md)
    if m:
        status = m.group(1).strip()
        out["home_status"] = status
        status_lower = status.lower()
        if "under contract" in status_lower or "contingent" in status_lower:
            out["active_under_contract"] = True

    # 4. "What's special" features
    m = _WHATS_SPECIAL_RE.search(md)
    if m:
        items = (
            bullet.group(1).strip().rstrip(",.")
            for bullet in _BULLET_ITEM_RE.finditer(m.group(1))
        )
        # Drop very short / very long items and markdown links; keep the
        # first 12 (Zillow usually shows 4-8).
        out["features_special"] = [
            item for item in items
            if 2 <= len(item) <= 100 and not item.startswith("[")
        ][:12]

    # 5. Description — "Description" section if present, otherwise the
    #    longest paragraph (>200 chars) within the first 8KB
    m = _DESCRIPTION_RE.search(md)
    if m:
        out["description"] = m.group(1).strip()[:2000]
    else:
        paragraphs = [p.strip() for p in md[:8000].split("\n\n") if len(p.strip()) > 200]
        if paragraphs:
            out["description"] = max(paragraphs, key=len)[:2000]

    # 6. Zestimate
    m = _ZESTIMATE_RE.search(md)
    if m:
        out["zestimate"] = _digits_to_int(m.group(1))

    # 7. Tax assessed value
    m = _TAX_ASSESSED_RE.search(md)
    if m:
        out["tax_assessed_value"] = _digits_to_int(m.group(1))

    # 7b. HOA monthly fee — an explicit negative ("No HOA") takes precedence
    if _NO_HOA_RE.search(md):
        out["hoa_monthly"] = 0
    else:
        m = _HOA_MONTHLY_RE.search(md) or _HOA_RE.search(md)
        fee = _digits_to_int(m.group(1)) if m else None
        # Sanity: accept only $0-$5000/mo; anything else stays unknown
        if fee is not None and 0 <= fee <= 5000:
            out["hoa_monthly"] = fee

    # 8. Renovation keywords found in description + features (for downstream
    #    consumers). NEW_ITEM_KEYWORDS is iterated as category -> keywords.
    from data_fetchers.property_value import NEW_ITEM_KEYWORDS, RENOVATED_GLOBAL_KEYWORDS

    combined = (out["description"] + " " + " ".join(out["features_special"])).lower()
    found: list[str] = [kw for kw in RENOVATED_GLOBAL_KEYWORDS if kw.lower() in combined]
    for cat, kws in NEW_ITEM_KEYWORDS.items():
        # Only the first matching keyword of each category is reported
        hit = next((kw for kw in kws if kw.lower() in combined), None)
        if hit is not None:
            found.append(f"{cat}:{hit}")
    out["renovation_keywords_found"] = found

    return out


def scrape_zillow_property_detail(
    url_or_zpid: str,
    *,
    status_cb: Optional[Callable[[str], None]] = None,
    use_cache: bool = True,
) -> tuple[dict, int]:
    """Scrape one Zillow property page for condition, features and status.

    Cost: 1 Firecrawl credit per fresh fetch. The markdown is cached for 24h
    per URL.

    Args:
        url_or_zpid: full Zillow URL, or just the zpid (the canonical
            /homedetails/{zpid}_zpid/ URL is built from it).
        status_cb: optional logging callback.
        use_cache: when True, a cached page is parsed instead of calling
            Firecrawl.

    Returns:
        (detail_dict, credits_used)
        detail_dict always has source_url, _fetched_at and _errors; on
        success it also carries every key of `_parse_property_detail_md`
        (plus `_cached: True` on a cache hit). Failures are reported through
        `_errors`, not raised.
        credits_used is 0 when served from cache or when nothing was fetched,
        and 1 once a Firecrawl scrape call has returned — even if the page
        came back empty or a later step failed.
    """
    from datetime import datetime, timezone

    def _log(m: str) -> None:
        if status_cb:
            status_cb(m)

    # Normalize to a URL
    if url_or_zpid.startswith("http"):
        url = url_or_zpid
    else:
        # Assume a zpid; Zillow accepts /homedetails/{zpid}_zpid/ as canonical
        url = f"{ZILLOW_BASE}/homedetails/{url_or_zpid}_zpid/"

    out: dict = {
        "source_url": url,
        "_fetched_at": None,
        "_errors": [],
    }

    # Cache check
    if use_cache:
        cached = get_cached(_DETAIL_CACHE_NAMESPACE, url, ttl_seconds=_DETAIL_CACHE_TTL_SECONDS)
        if cached:
            _log(f" zillow detail cache HIT for {url}")
            out.update(_parse_property_detail_md(cached))
            out["_fetched_at"] = datetime.now(timezone.utc).isoformat()
            out["_cached"] = True
            return out, 0

    # Fresh fetch via Firecrawl
    api_key = os.getenv("FIRECRAWL_API_KEY", "")
    if not api_key:
        out["_errors"].append("FIRECRAWL_API_KEY not configured")
        return out, 0
    try:
        from firecrawl import FirecrawlApp
    except ImportError as e:
        out["_errors"].append(f"firecrawl-py not installed: {e}")
        return out, 0

    from deals_db import record_firecrawl_usage, is_firecrawl_paused

    if is_firecrawl_paused():
        out["_errors"].append("Firecrawl budget paused")
        return out, 0

    app = FirecrawlApp(api_key=api_key)
    credits_used = 0
    try:
        result = app.scrape(url, formats=["markdown"])
        # The credit is spent as soon as the scrape returns, so count and
        # record it before anything else can bail out.
        credits_used = 1
        record_firecrawl_usage(
            source=SOURCE,
            credits=1,
            url=url,
            description=f"Property detail enrichment: {url}",
        )
        md = getattr(result, "markdown", None)
        if not md:
            out["_errors"].append("Firecrawl returned no markdown")
            return out, credits_used
        save_cache(_DETAIL_CACHE_NAMESPACE, url, md, ttl_seconds=_DETAIL_CACHE_TTL_SECONDS)
        out.update(_parse_property_detail_md(md))
        out["_fetched_at"] = datetime.now(timezone.utc).isoformat()
        return out, credits_used
    except Exception as e:
        out["_errors"].append(f"Firecrawl scrape failed: {type(e).__name__}: {e}")
        return out, credits_used


# ─── Public API ────────────────────────────────────────────────────────────


def scrape_zillow_county(
    *,
    county: str,
    state: str,
    pages: int = 1,
    status_cb: Optional[Callable[[str], None]] = None,
    use_cache: bool = True,
    cache_ttl_seconds: int = _CACHE_TTL,
) -> tuple[list[dict], int]:
    """Scrape Zillow search results for one county.

    Args:
        county: county name (e.g. "Miami-Dade").
        state: 2-letter state code.
        pages: how many result pages to scrape (default 1 = ~25-40 listings
            = 1 credit).
        status_cb: optional logging callback.
        use_cache: when True, look each page up in the cache first and store
            freshly fetched pages.
        cache_ttl_seconds: cache TTL.

    Returns:
        (deals, credits_used): deal records compatible with
        deals_db.insert_deal and the number of Firecrawl credits actually
        spent. Every exit path returns this 2-tuple, including the early
        aborts (missing API key, firecrawl-py not installed, budget paused),
        which return ([], 0).
    """

    def _log(m: str) -> None:
        if status_cb:
            status_cb(m)

    deals: list[dict] = []
    credits_used = 0
    cache_namespace = "zillow"

    # NOTE(review): the API key is required up front, even if every page
    # would end up being served from the cache.
    api_key = os.getenv("FIRECRAWL_API_KEY", "")
    if not api_key:
        _log("❌ FIRECRAWL_API_KEY no configurada; abortando")
        return deals, credits_used

    # Lazy import — Firecrawl is heavy
    try:
        from firecrawl import FirecrawlApp
    except ImportError as e:
        _log(f"❌ firecrawl-py no instalado: {e}")
        return deals, credits_used

    from deals_db import record_firecrawl_usage, is_firecrawl_paused

    if is_firecrawl_paused():
        _log("🚨 Firecrawl budget paused (95%+ used) — aborting Zillow scrape")
        return deals, credits_used

    app = FirecrawlApp(api_key=api_key)
    last_request_at = 0.0

    for page in range(1, pages + 1):
        url = _build_zillow_url(county, state, page)
        _log(f" Zillow {county} {state} page {page}: {url}")

        # Cache check
        md: Optional[str] = None
        if use_cache:
            md = get_cached(cache_namespace, url, ttl_seconds=cache_ttl_seconds)
            if md:
                _log(f" cache HIT ({len(md):,} chars)")

        if md is None:
            # Rate limit: keep at least _REQUEST_INTERVAL_SECONDS between requests
            elapsed = time.time() - last_request_at
            if elapsed < _REQUEST_INTERVAL_SECONDS:
                time.sleep(_REQUEST_INTERVAL_SECONDS - elapsed)
            last_request_at = time.time()

            try:
                result = app.scrape(url, formats=["markdown"])
                md = getattr(result, "markdown", None)
                credits_used += 1  # 1 credit per scrape (confirmed via exploration)
                record_firecrawl_usage(
                    source=SOURCE,
                    credits=1,
                    url=url,
                    description=f"Zillow county scrape: {county} {state} page {page}",
                )
            except Exception as e:
                _log(f" Firecrawl error: {type(e).__name__}: {e}")
                continue

            if not md:
                _log(" Firecrawl returned empty markdown")
                continue

            # Only freshly fetched pages are written to the cache
            if use_cache:
                save_cache(cache_namespace, url, md, status_code=200,
                           ttl_seconds=cache_ttl_seconds)

        # Parse listings; keep only those with both an address and a price
        listings = _extract_listings_from_markdown(md)
        _log(f" parsed {len(listings)} listings from page {page}")
        for lst in listings:
            deal = _build_deal_record(lst, county=county, state=state)
            if deal.get("address") and deal.get("listing_price"):
                deals.append(deal)

    _log(f"Zillow {county} {state}: {len(deals)} deals, {credits_used} credits used")
    return deals, credits_used


def _build_deal_record(listing: dict, *, county: str, state: str) -> dict:
    """Convert a parsed Zillow listing into a deal record for deals_db.

    Args:
        listing: dict produced by `_extract_listings_from_markdown`.
        county: county the listing was scraped for (stored as-is).
        state: fallback state used when the listing address had none.

    Returns:
        The deal record. `deal_type` is inferred from the badges: "auction"
        when an auction badge is present, "foreclosure" for any
        (pre-)foreclosure badge, otherwise "mls".
    """
    badges = listing.get("badges") or []
    desc_bits = []
    if badges:
        desc_bits.append("Badges: " + ", ".join(badges))
    if listing.get("zpid"):
        desc_bits.append(f"Zillow zpid: {listing['zpid']}")
    desc_bits.append("Source: Zillow MLS")

    badge_str = " ".join(badges).lower()
    if "auction" in badge_str:
        deal_type = "auction"
    elif "foreclosure" in badge_str:  # also covers "pre-foreclosure"
        deal_type = "foreclosure"
    else:
        deal_type = "mls"

    return {
        "source": SOURCE,
        "source_url": listing.get("source_url"),
        "address": listing.get("address"),
        "city": listing.get("city"),
        "state": listing.get("state") or state,
        "zip": listing.get("zip"),
        "county": county,
        "listing_price": listing.get("price"),
        "deal_type": deal_type,
        "starting_bid": None,
        "estimated_arv": None,
        "beds": listing.get("beds"),
        "baths": listing.get("baths"),
        "sqft": listing.get("sqft"),
        "year_built": None,  # Not available on the search-results card
        # The Zillow zpid is an INTERNAL Zillow id, NOT a court case number.
        # It goes in external_id (case_number is reserved for court cases).
        "case_number": None,
        "external_id": listing.get("zpid"),
        "auction_date": None,
        "listing_description": " | ".join(desc_bits),
        "photos_urls": listing.get("photos_urls") or [],
    }


def run_scraper_to_db(
    *,
    counties: Optional[list[str]] = None,
    state: str = "FL",
    pages_per_county: int = 1,
    auto_classify: bool = True,
    status_cb: Optional[Callable[[str], None]] = None,
) -> dict:
    """Full pipeline: scrape Zillow for the given counties → persist → classify.

    Args:
        counties: county names to scrape; defaults to ["Miami-Dade"].
        state: 2-letter state code applied to every county.
        pages_per_county: result pages to scrape per county.
        auto_classify: when True, newly inserted deals are classified.
        status_cb: optional logging callback.

    Returns:
        Summary dict with source, scraper_run_id, deals_found, deals_new,
        deals_updated, deals_classified, firecrawl_credits_used,
        errors_count and errors. Per-county, per-insert and per-classify
        failures are collected in `errors` rather than raised.
    """
    from deals_db import init_db, insert_deal, record_scraper_run, finish_scraper_run

    init_db()
    if counties is None:
        counties = ["Miami-Dade"]

    run_id = record_scraper_run(SOURCE)
    errors: list[str] = []
    total_credits = 0

    def _log(m: str) -> None:
        if status_cb:
            status_cb(m)

    all_deals: list[dict] = []
    for county in counties:
        try:
            deals, credits_actual = scrape_zillow_county(
                county=county,
                state=state,
                pages=pages_per_county,
                status_cb=status_cb,
            )
            all_deals.extend(deals)
            total_credits += credits_actual
        except Exception as e:
            errors.append(f"scrape failed for {county}: {e}")

    deals_new = 0
    deals_updated = 0
    new_deal_ids: list[int] = []
    for deal in all_deals:
        try:
            deal_id, is_new = insert_deal(deal)
            if is_new:
                deals_new += 1
                new_deal_ids.append(deal_id)
            else:
                deals_updated += 1
        except Exception as e:
            # case_number is always None for Zillow deals, so identify the
            # failing record by its zpid (external_id) or address instead.
            label = deal.get("external_id") or deal.get("address")
            errors.append(f"insert fail for {label}: {e}")

    classified_count = 0
    if auto_classify and new_deal_ids:
        _log(f"Auto-classifying {len(new_deal_ids)} new Zillow deals...")
        from deal_classifier import classify_deal
        from deals_db import get_deal_by_id, update_classification

        for did in new_deal_ids:
            try:
                d = get_deal_by_id(did)
                if not d:
                    continue
                result = classify_deal(d)
                update_classification(
                    deal_id=did,
                    status=result["classification_status"],
                    score=result["score"],
                    reasons=result["reasons"],
                    strategy=result["strategy"],
                )
                classified_count += 1
            except Exception as e:
                errors.append(f"classify fail for {did}: {e}")

    # success = no errors; partial = errors but some deals; failed = errors and no deals
    if not errors:
        run_status = "success"
    elif all_deals:
        run_status = "partial"
    else:
        run_status = "failed"

    finish_scraper_run(
        run_id,
        deals_found=len(all_deals),
        deals_new=deals_new,
        deals_updated=deals_updated,
        errors_count=len(errors),
        errors_summary=errors if errors else None,
        firecrawl_credits_used=total_credits,
        status=run_status,
    )

    return {
        "source": SOURCE,
        "scraper_run_id": run_id,
        "deals_found": len(all_deals),
        "deals_new": deals_new,
        "deals_updated": deals_updated,
        "deals_classified": classified_count,
        "firecrawl_credits_used": total_credits,
        "errors_count": len(errors),
        "errors": errors,
    }