"""data_fetchers/pa_palm_beach.py — Full Palm Beach PA extractor. Sitio: https://pbcpao.gov (server-rendered HTML + jQuery, no SPA) Deep link: /Property/Details?parcelId={parcelId} VENTAJA: NO necesita Playwright. urllib + HTMLParser stdlib = rapidisimo. """ from __future__ import annotations import re import urllib.request from datetime import datetime, timezone from html.parser import HTMLParser from typing import Optional USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/131" _BASE_URL = "https://pbcpao.gov" # ════════════════════════════════════════════════════════════════════════════ # HTML text extractor (skip script/style) # ════════════════════════════════════════════════════════════════════════════ class _TextExtractor(HTMLParser): def __init__(self): super().__init__() self.in_skip = False self.parts: list[str] = [] def handle_starttag(self, tag, attrs): if tag in ("script", "style", "noscript"): self.in_skip = True def handle_endtag(self, tag): if tag in ("script", "style", "noscript"): self.in_skip = False def handle_data(self, d): if not self.in_skip: t = d.strip() if t: self.parts.append(t) # ════════════════════════════════════════════════════════════════════════════ # Tables extractor (table → list of rows) # ════════════════════════════════════════════════════════════════════════════ class _TableExtractor(HTMLParser): """Extracts all tables as list of {idx, rows: [[cells]]} dicts.""" def __init__(self): super().__init__() self.in_table = False self.in_tr = False self.in_cell = False self.in_skip = False self.current_row: list[str] = [] self.current_cell = "" self.current_table: list[list[str]] = [] self.tables: list[list[list[str]]] = [] def handle_starttag(self, tag, attrs): if tag in ("script", "style"): self.in_skip = True elif tag == "table": self.in_table = True self.current_table = [] elif tag == "tr" and self.in_table: self.in_tr = True self.current_row = [] elif tag in ("td", "th") and self.in_tr: self.in_cell = True self.current_cell = "" elif tag == "br" and self.in_cell: self.current_cell += " " def handle_endtag(self, tag): if tag in ("script", "style"): self.in_skip = False elif tag == "table": if self.current_table: self.tables.append(self.current_table) self.in_table = False elif tag == "tr" and self.in_tr: if self.current_row: self.current_table.append(self.current_row) self.in_tr = False elif tag in ("td", "th") and self.in_cell: self.current_row.append(re.sub(r"\s+", " ", self.current_cell).strip()) self.in_cell = False def handle_data(self, d): if self.in_cell and not self.in_skip: self.current_cell += d # ════════════════════════════════════════════════════════════════════════════ # Helpers # ════════════════════════════════════════════════════════════════════════════ def _grab_after(text: str, label: str, max_len: int = 80) -> Optional[str]: """Find label in flat text, return the next non-empty token cluster.""" if not text or not label: return None idx = text.find(label) if idx < 0: return None after = text[idx + len(label): idx + len(label) + max_len].strip() # Take up to next " ", " ", end-of-line, or "Property" / "Address" etc. # First word/phrase = value until next CAPITALIZED label pattern m = re.match(r"\s*([^\n]+?)(?:\s{2,}|\s+[A-Z][A-Z\s]+\s+[A-Za-z]+|$)", after) if m: return m.group(1).strip() return after.split("\n")[0].strip() def _to_int(s) -> Optional[int]: if not s: return None cleaned = re.sub(r"[^\d-]", "", str(s)) try: return int(cleaned) if cleaned else None except ValueError: return None def _money_to_int(s) -> Optional[int]: if not s: return None cleaned = re.sub(r"[^\d.-]", "", str(s)) if not cleaned or cleaned == "-": return None try: return int(float(cleaned)) except ValueError: return None # ════════════════════════════════════════════════════════════════════════════ # Public API # ════════════════════════════════════════════════════════════════════════════ def fetch_palm_beach_pa_record( parcel_id: str, timeout_seconds: int = 30, listing_price: Optional[float] = None, ) -> dict: """Fetch full Palm Beach PA record by parcel_id (PCN). Args: parcel_id: 17-digit PCN (e.g. "00414232000003080") or formatted with dashes timeout_seconds: HTTP timeout listing_price: enables flip-in-progress detection Returns: rich dict (unified schema) with errors list. """ fetched_at = datetime.now(timezone.utc).isoformat() result = { "county": "Palm Beach", "source": "Palm Beach County Property Appraiser (pbcpao.gov)", "fetched_at": fetched_at, "errors": [], } if not parcel_id: result["errors"].append("no parcel_id provided") return result pcn_clean = parcel_id.replace("-", "").strip() url = f"{_BASE_URL}/Property/Details?parcelId={pcn_clean}" result["source_url"] = url # HTTP fetch (no Playwright) try: req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT}) with urllib.request.urlopen(req, timeout=timeout_seconds) as resp: html = resp.read().decode("utf-8", errors="ignore") except Exception as e: result["errors"].append(f"HTTP fetch failed: {type(e).__name__}: {e}") return result # Detect "no property found" if "no property" in html.lower()[:5000] or "not found" in html.lower()[:5000]: result["errors"].append("parcel not found in PA records") return result # Extract flat text text_extractor = _TextExtractor() text_extractor.feed(html) flat = " ".join(text_extractor.parts) # Extract tables tbl_extractor = _TableExtractor() tbl_extractor.feed(html) # ─── Parse scalars from flat text ────────────────────────────────────── # Owner Name DERMYSHI IRFAN Property Control Number ... m = re.search(r"Owner Name\s+([A-Z][A-Z\s,'.\-&]+?)(?=\s+(?:Property Control|Mailing|Current|Tax|Subdivision|Total))", flat) if m: result["owner_name"] = m.group(1).strip() # Property Control Number — formatted as XX-XX-XX-XX-XX-XXX-XXXX m = re.search(r"Property Control Number\s+([\d\-]+)", flat) if m: result["parcel_id"] = m.group(1).strip() else: result["parcel_id"] = parcel_id # Year Built m = re.search(r"Year Built\s+(\d{4})", flat) if m: result["year_built"] = int(m.group(1)) # Beds / Baths m = re.search(r"Bed\s*Rooms\s+(\d+)", flat, re.IGNORECASE) if m: result["bedrooms"] = int(m.group(1)) m = re.search(r"Full Baths\s+(\d+)", flat, re.IGNORECASE) full_b = int(m.group(1)) if m else 0 m = re.search(r"Half Baths\s+(\d+)", flat, re.IGNORECASE) half_b = int(m.group(1)) if m else 0 if full_b or half_b: result["baths"] = float(full_b) + (0.5 * half_b) result["baths_full"] = full_b result["baths_half"] = half_b # Square footage m = re.search(r"Total Square Footage\s+(\d[\d,]*)", flat) or re.search(r"Square Footage\s+(\d[\d,]*)", flat) if m: result["sqft_total"] = _to_int(m.group(1)) m = re.search(r"Area Under Air\s+(\d[\d,]*)", flat) if m: result["sqft_heated"] = _to_int(m.group(1)) # Lot acres m = re.search(r"Acres\s+([\d.]+)", flat) if m: try: result["lot_acres"] = float(m.group(1)) except ValueError: pass # Property Use Code + Zoning m = re.search(r"Property Use Code\s+([\w\d\?\.\-\s]+?)(?:\s+Zoning)", flat) if m: result["use_code"] = m.group(1).replace("?", " - ").strip() m = re.search(r"Zoning\s+([\w\d\?\-]+?(?:\s+\([^)]+\))?)", flat) if m: result["zoning"] = m.group(1).replace("?", " - ").strip() # Subdivision m = re.search(r"Subdivision\s+([A-Z0-9 ,'.\-]+?)(?=\s+Official Records|Sale Date|Legal Description|$)", flat) if m: sub = m.group(1).strip() result["subdivision"] = sub if sub else None # Legal description m = re.search(r"Legal Description\s+([^\n]+?)(?=\s+Show Full Map|Show More|Nearby|Owner INFORMATION|$)", flat) if m: result["legal_description"] = m.group(1).strip()[:300] # Roof / interior info for label, key in [ ("Air Condition Desc.", "ac_description"), ("Heat Type", "heat_type"), ("Heat Fuel", "heat_fuel"), ("Roof Structure", "roof_struct"), ("Roof Cover", "roof_cover"), ("Interior Wall 1", "interior_wall"), ]: m = re.search(rf"{re.escape(label)}\s+([A-Z][A-Z &/\-]+?)(?=\s+[A-Z][a-z])", flat) if m: result[key] = m.group(1).strip() # Site Address (Property address line) # PB format: addresses are usually shown after "Location Address" header m = re.search(r"Location Address\s+([^\n]+?)(?=\s+Subdivision|Owner|Property Use|$)", flat) if m: result["site_address"] = m.group(1).strip() # Homestead detection: "Current Homestead" or "Homestead Exemption" # Easier: check if exemption appears in benefits section result["homestead_active"] = bool( re.search(r"Homestead Exemption\s+\$[\d,]+|Current Homestead\s*Yes", flat, re.IGNORECASE) ) # ─── Tax/Assessment values from tables ───────────────────────────────── # Look for table with rows like: "Tax Year 2025 2024 2023 ..." # "Total Market Value $758,298 $762,232 ..." # "Total Assessed Value ..." tax_years = [] market_vals: dict[str, int] = {} assessed_vals: dict[str, int] = {} improvement_vals: dict[str, int] = {} for tbl in tbl_extractor.tables: for row in tbl: if not row: continue first = row[0].lower() if row else "" if first == "tax year": tax_years = [c for c in row[1:] if c] elif "market value" in first or "total market" in first: for i, v in enumerate(row[1:]): if i < len(tax_years): market_vals[tax_years[i]] = _money_to_int(v) or 0 elif first == "assessed value" or "total assessed" in first: for i, v in enumerate(row[1:]): if i < len(tax_years): assessed_vals[tax_years[i]] = _money_to_int(v) or 0 elif "improvement value" in first: for i, v in enumerate(row[1:]): if i < len(tax_years): improvement_vals[tax_years[i]] = _money_to_int(v) or 0 # Pick most recent year valid_years = sorted([y for y in tax_years if y.isdigit()], reverse=True) current_year = valid_years[0] if valid_years else None last_year = valid_years[1] if len(valid_years) > 1 else None result["just_value_current"] = market_vals.get(current_year) if current_year else None result["assessed_value_current"] = assessed_vals.get(current_year) if current_year else None result["just_value_last"] = market_vals.get(last_year) if last_year else None result["assessed_value_last"] = assessed_vals.get(last_year) if last_year else None result["tax_year_current"] = int(current_year) if current_year else None result["tax_year_last"] = int(last_year) if last_year else None result["assessment_history"] = { "market": market_vals, "assessed": assessed_vals, "improvement": improvement_vals, } # ─── Sales history from tables ───────────────────────────────────────── sales: list[dict] = [] for tbl in tbl_extractor.tables: if not tbl or len(tbl) < 2: continue hdr = [c.lower() for c in tbl[0]] # Sales table heuristic: header has "Sale[s] Date" and "Price". # PB uses "Sales Date" (with 's'), some sites use "Sale Date". has_sale_date = any(("sale date" in h or "sales date" in h) for h in hdr) if has_sale_date and any("price" in h for h in hdr): idx_date = next((i for i, h in enumerate(hdr) if "sale date" in h or "sales date" in h), -1) idx_price = next((i for i, h in enumerate(hdr) if "price" in h), -1) idx_book = next((i for i, h in enumerate(hdr) if "book" in h or h.startswith("or")), -1) idx_qual = next((i for i, h in enumerate(hdr) if "qualified" in h or h == "sale type" or h == "type"), -1) for row in tbl[1:]: if len(row) < 2: continue d = row[idx_date] if idx_date >= 0 and idx_date < len(row) else "" p = row[idx_price] if idx_price >= 0 and idx_price < len(row) else "" if not d and not p: continue qual_raw = row[idx_qual] if idx_qual >= 0 and idx_qual < len(row) else "" price = _money_to_int(p) # Palm Beach uses "Sale Type" not "qualified/disqualified". # Treat WARRANTY DEED with price >= 50K as Qualified (typical PB convention). # CERT OF TITLE = foreclosure deed = Unqualified. # QUIT CLAIM with low price = Unqualified. q_low = qual_raw.lower() if "warranty deed" in q_low and (price or 0) >= 50000: qualified_flag = "Qualified" elif "qualified" in q_low and "disqualified" not in q_low: qualified_flag = "Qualified" else: qualified_flag = "Unqualified" sales.append({ "date": d, "price": price, "book_page": row[idx_book] if idx_book >= 0 and idx_book < len(row) else "", "qualification": qual_raw, "deed_type": qual_raw, "qualified": qualified_flag, }) result["sales_history"] = sales # Most recent qualified sale qualified = [s for s in sales if s.get("qualified", "").startswith("Qualified") and s.get("price", 0) and s["price"] >= 1000] result["most_recent_qualified_sale"] = qualified[0] if qualified else None # Renovation signal from data_fetchers.pa_duval import _detect_renovation_pattern result["renovation_signal"] = _detect_renovation_pattern( sales, listing_price=listing_price, ) return result # ════════════════════════════════════════════════════════════════════════════ # CLI # ════════════════════════════════════════════════════════════════════════════ if __name__ == "__main__": import argparse import json parser = argparse.ArgumentParser(description="Palm Beach PA full record fetcher") parser.add_argument("--parcel", required=True, help="PCN (e.g. '00414232000003080')") args = parser.parse_args() rec = fetch_palm_beach_pa_record(parcel_id=args.parcel) print(json.dumps(rec, indent=2, default=str))