Files
2026-07-03 12:24:58 -04:00

410 lines
17 KiB
Python

"""data_fetchers/pa_palm_beach.py — Full Palm Beach PA extractor.
Sitio: https://pbcpao.gov (server-rendered HTML + jQuery, no SPA)
Deep link: /Property/Details?parcelId={parcelId}
VENTAJA: NO necesita Playwright. urllib + HTMLParser stdlib = rapidisimo.
"""
from __future__ import annotations
import re
import urllib.request
from datetime import datetime, timezone
from html.parser import HTMLParser
from typing import Optional
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/131"
_BASE_URL = "https://pbcpao.gov"
# ════════════════════════════════════════════════════════════════════════════
# HTML text extractor (skip script/style)
# ════════════════════════════════════════════════════════════════════════════
class _TextExtractor(HTMLParser):
def __init__(self):
super().__init__()
self.in_skip = False
self.parts: list[str] = []
def handle_starttag(self, tag, attrs):
if tag in ("script", "style", "noscript"):
self.in_skip = True
def handle_endtag(self, tag):
if tag in ("script", "style", "noscript"):
self.in_skip = False
def handle_data(self, d):
if not self.in_skip:
t = d.strip()
if t:
self.parts.append(t)
# ════════════════════════════════════════════════════════════════════════════
# Tables extractor (table → list of rows)
# ════════════════════════════════════════════════════════════════════════════
class _TableExtractor(HTMLParser):
"""Extracts all tables as list of {idx, rows: [[cells]]} dicts."""
def __init__(self):
super().__init__()
self.in_table = False
self.in_tr = False
self.in_cell = False
self.in_skip = False
self.current_row: list[str] = []
self.current_cell = ""
self.current_table: list[list[str]] = []
self.tables: list[list[list[str]]] = []
def handle_starttag(self, tag, attrs):
if tag in ("script", "style"):
self.in_skip = True
elif tag == "table":
self.in_table = True
self.current_table = []
elif tag == "tr" and self.in_table:
self.in_tr = True
self.current_row = []
elif tag in ("td", "th") and self.in_tr:
self.in_cell = True
self.current_cell = ""
elif tag == "br" and self.in_cell:
self.current_cell += " "
def handle_endtag(self, tag):
if tag in ("script", "style"):
self.in_skip = False
elif tag == "table":
if self.current_table:
self.tables.append(self.current_table)
self.in_table = False
elif tag == "tr" and self.in_tr:
if self.current_row:
self.current_table.append(self.current_row)
self.in_tr = False
elif tag in ("td", "th") and self.in_cell:
self.current_row.append(re.sub(r"\s+", " ", self.current_cell).strip())
self.in_cell = False
def handle_data(self, d):
if self.in_cell and not self.in_skip:
self.current_cell += d
# ════════════════════════════════════════════════════════════════════════════
# Helpers
# ════════════════════════════════════════════════════════════════════════════
def _grab_after(text: str, label: str, max_len: int = 80) -> Optional[str]:
"""Find label in flat text, return the next non-empty token cluster."""
if not text or not label:
return None
idx = text.find(label)
if idx < 0:
return None
after = text[idx + len(label): idx + len(label) + max_len].strip()
# Take up to next " ", " ", end-of-line, or "Property" / "Address" etc.
# First word/phrase = value until next CAPITALIZED label pattern
m = re.match(r"\s*([^\n]+?)(?:\s{2,}|\s+[A-Z][A-Z\s]+\s+[A-Za-z]+|$)", after)
if m:
return m.group(1).strip()
return after.split("\n")[0].strip()
def _to_int(s) -> Optional[int]:
if not s:
return None
cleaned = re.sub(r"[^\d-]", "", str(s))
try:
return int(cleaned) if cleaned else None
except ValueError:
return None
def _money_to_int(s) -> Optional[int]:
if not s:
return None
cleaned = re.sub(r"[^\d.-]", "", str(s))
if not cleaned or cleaned == "-":
return None
try:
return int(float(cleaned))
except ValueError:
return None
# ════════════════════════════════════════════════════════════════════════════
# Public API
# ════════════════════════════════════════════════════════════════════════════
def fetch_palm_beach_pa_record(
parcel_id: str,
timeout_seconds: int = 30,
listing_price: Optional[float] = None,
) -> dict:
"""Fetch full Palm Beach PA record by parcel_id (PCN).
Args:
parcel_id: 17-digit PCN (e.g. "00414232000003080") or formatted with dashes
timeout_seconds: HTTP timeout
listing_price: enables flip-in-progress detection
Returns: rich dict (unified schema) with errors list.
"""
fetched_at = datetime.now(timezone.utc).isoformat()
result = {
"county": "Palm Beach",
"source": "Palm Beach County Property Appraiser (pbcpao.gov)",
"fetched_at": fetched_at,
"errors": [],
}
if not parcel_id:
result["errors"].append("no parcel_id provided")
return result
pcn_clean = parcel_id.replace("-", "").strip()
url = f"{_BASE_URL}/Property/Details?parcelId={pcn_clean}"
result["source_url"] = url
# HTTP fetch (no Playwright)
try:
req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
with urllib.request.urlopen(req, timeout=timeout_seconds) as resp:
html = resp.read().decode("utf-8", errors="ignore")
except Exception as e:
result["errors"].append(f"HTTP fetch failed: {type(e).__name__}: {e}")
return result
# Detect "no property found"
if "no property" in html.lower()[:5000] or "not found" in html.lower()[:5000]:
result["errors"].append("parcel not found in PA records")
return result
# Extract flat text
text_extractor = _TextExtractor()
text_extractor.feed(html)
flat = " ".join(text_extractor.parts)
# Extract tables
tbl_extractor = _TableExtractor()
tbl_extractor.feed(html)
# ─── Parse scalars from flat text ──────────────────────────────────────
# Owner Name DERMYSHI IRFAN Property Control Number ...
m = re.search(r"Owner Name\s+([A-Z][A-Z\s,'.\-&]+?)(?=\s+(?:Property Control|Mailing|Current|Tax|Subdivision|Total))",
flat)
if m:
result["owner_name"] = m.group(1).strip()
# Property Control Number — formatted as XX-XX-XX-XX-XX-XXX-XXXX
m = re.search(r"Property Control Number\s+([\d\-]+)", flat)
if m:
result["parcel_id"] = m.group(1).strip()
else:
result["parcel_id"] = parcel_id
# Year Built
m = re.search(r"Year Built\s+(\d{4})", flat)
if m:
result["year_built"] = int(m.group(1))
# Beds / Baths
m = re.search(r"Bed\s*Rooms\s+(\d+)", flat, re.IGNORECASE)
if m:
result["bedrooms"] = int(m.group(1))
m = re.search(r"Full Baths\s+(\d+)", flat, re.IGNORECASE)
full_b = int(m.group(1)) if m else 0
m = re.search(r"Half Baths\s+(\d+)", flat, re.IGNORECASE)
half_b = int(m.group(1)) if m else 0
if full_b or half_b:
result["baths"] = float(full_b) + (0.5 * half_b)
result["baths_full"] = full_b
result["baths_half"] = half_b
# Square footage
m = re.search(r"Total Square Footage\s+(\d[\d,]*)", flat) or re.search(r"Square Footage\s+(\d[\d,]*)", flat)
if m:
result["sqft_total"] = _to_int(m.group(1))
m = re.search(r"Area Under Air\s+(\d[\d,]*)", flat)
if m:
result["sqft_heated"] = _to_int(m.group(1))
# Lot acres
m = re.search(r"Acres\s+([\d.]+)", flat)
if m:
try:
result["lot_acres"] = float(m.group(1))
except ValueError:
pass
# Property Use Code + Zoning
m = re.search(r"Property Use Code\s+([\w\d\?\.\-\s]+?)(?:\s+Zoning)", flat)
if m:
result["use_code"] = m.group(1).replace("?", " - ").strip()
m = re.search(r"Zoning\s+([\w\d\?\-]+?(?:\s+\([^)]+\))?)", flat)
if m:
result["zoning"] = m.group(1).replace("?", " - ").strip()
# Subdivision
m = re.search(r"Subdivision\s+([A-Z0-9 ,'.\-]+?)(?=\s+Official Records|Sale Date|Legal Description|$)", flat)
if m:
sub = m.group(1).strip()
result["subdivision"] = sub if sub else None
# Legal description
m = re.search(r"Legal Description\s+([^\n]+?)(?=\s+Show Full Map|Show More|Nearby|Owner INFORMATION|$)", flat)
if m:
result["legal_description"] = m.group(1).strip()[:300]
# Roof / interior info
for label, key in [
("Air Condition Desc.", "ac_description"),
("Heat Type", "heat_type"),
("Heat Fuel", "heat_fuel"),
("Roof Structure", "roof_struct"),
("Roof Cover", "roof_cover"),
("Interior Wall 1", "interior_wall"),
]:
m = re.search(rf"{re.escape(label)}\s+([A-Z][A-Z &/\-]+?)(?=\s+[A-Z][a-z])", flat)
if m:
result[key] = m.group(1).strip()
# Site Address (Property address line)
# PB format: addresses are usually shown after "Location Address" header
m = re.search(r"Location Address\s+([^\n]+?)(?=\s+Subdivision|Owner|Property Use|$)", flat)
if m:
result["site_address"] = m.group(1).strip()
# Homestead detection: "Current Homestead" or "Homestead Exemption"
# Easier: check if exemption appears in benefits section
result["homestead_active"] = bool(
re.search(r"Homestead Exemption\s+\$[\d,]+|Current Homestead\s*Yes",
flat, re.IGNORECASE)
)
# ─── Tax/Assessment values from tables ─────────────────────────────────
# Look for table with rows like: "Tax Year 2025 2024 2023 ..."
# "Total Market Value $758,298 $762,232 ..."
# "Total Assessed Value ..."
tax_years = []
market_vals: dict[str, int] = {}
assessed_vals: dict[str, int] = {}
improvement_vals: dict[str, int] = {}
for tbl in tbl_extractor.tables:
for row in tbl:
if not row:
continue
first = row[0].lower() if row else ""
if first == "tax year":
tax_years = [c for c in row[1:] if c]
elif "market value" in first or "total market" in first:
for i, v in enumerate(row[1:]):
if i < len(tax_years):
market_vals[tax_years[i]] = _money_to_int(v) or 0
elif first == "assessed value" or "total assessed" in first:
for i, v in enumerate(row[1:]):
if i < len(tax_years):
assessed_vals[tax_years[i]] = _money_to_int(v) or 0
elif "improvement value" in first:
for i, v in enumerate(row[1:]):
if i < len(tax_years):
improvement_vals[tax_years[i]] = _money_to_int(v) or 0
# Pick most recent year
valid_years = sorted([y for y in tax_years if y.isdigit()], reverse=True)
current_year = valid_years[0] if valid_years else None
last_year = valid_years[1] if len(valid_years) > 1 else None
result["just_value_current"] = market_vals.get(current_year) if current_year else None
result["assessed_value_current"] = assessed_vals.get(current_year) if current_year else None
result["just_value_last"] = market_vals.get(last_year) if last_year else None
result["assessed_value_last"] = assessed_vals.get(last_year) if last_year else None
result["tax_year_current"] = int(current_year) if current_year else None
result["tax_year_last"] = int(last_year) if last_year else None
result["assessment_history"] = {
"market": market_vals,
"assessed": assessed_vals,
"improvement": improvement_vals,
}
# ─── Sales history from tables ─────────────────────────────────────────
sales: list[dict] = []
for tbl in tbl_extractor.tables:
if not tbl or len(tbl) < 2:
continue
hdr = [c.lower() for c in tbl[0]]
# Sales table heuristic: header has "Sale[s] Date" and "Price".
# PB uses "Sales Date" (with 's'), some sites use "Sale Date".
has_sale_date = any(("sale date" in h or "sales date" in h) for h in hdr)
if has_sale_date and any("price" in h for h in hdr):
idx_date = next((i for i, h in enumerate(hdr)
if "sale date" in h or "sales date" in h), -1)
idx_price = next((i for i, h in enumerate(hdr) if "price" in h), -1)
idx_book = next((i for i, h in enumerate(hdr) if "book" in h or h.startswith("or")), -1)
idx_qual = next((i for i, h in enumerate(hdr)
if "qualified" in h or h == "sale type" or h == "type"), -1)
for row in tbl[1:]:
if len(row) < 2:
continue
d = row[idx_date] if idx_date >= 0 and idx_date < len(row) else ""
p = row[idx_price] if idx_price >= 0 and idx_price < len(row) else ""
if not d and not p:
continue
qual_raw = row[idx_qual] if idx_qual >= 0 and idx_qual < len(row) else ""
price = _money_to_int(p)
# Palm Beach uses "Sale Type" not "qualified/disqualified".
# Treat WARRANTY DEED with price >= 50K as Qualified (typical PB convention).
# CERT OF TITLE = foreclosure deed = Unqualified.
# QUIT CLAIM with low price = Unqualified.
q_low = qual_raw.lower()
if "warranty deed" in q_low and (price or 0) >= 50000:
qualified_flag = "Qualified"
elif "qualified" in q_low and "disqualified" not in q_low:
qualified_flag = "Qualified"
else:
qualified_flag = "Unqualified"
sales.append({
"date": d,
"price": price,
"book_page": row[idx_book] if idx_book >= 0 and idx_book < len(row) else "",
"qualification": qual_raw,
"deed_type": qual_raw,
"qualified": qualified_flag,
})
result["sales_history"] = sales
# Most recent qualified sale
qualified = [s for s in sales
if s.get("qualified", "").startswith("Qualified")
and s.get("price", 0) and s["price"] >= 1000]
result["most_recent_qualified_sale"] = qualified[0] if qualified else None
# Renovation signal
from data_fetchers.pa_duval import _detect_renovation_pattern
result["renovation_signal"] = _detect_renovation_pattern(
sales, listing_price=listing_price,
)
return result
# ════════════════════════════════════════════════════════════════════════════
# CLI
# ════════════════════════════════════════════════════════════════════════════
if __name__ == "__main__":
import argparse
import json
parser = argparse.ArgumentParser(description="Palm Beach PA full record fetcher")
parser.add_argument("--parcel", required=True, help="PCN (e.g. '00414232000003080')")
args = parser.parse_args()
rec = fetch_palm_beach_pa_record(parcel_id=args.parcel)
print(json.dumps(rec, indent=2, default=str))