410 lines
17 KiB
Python
410 lines
17 KiB
Python
"""data_fetchers/pa_palm_beach.py — Full Palm Beach PA extractor.
|
|
|
|
Sitio: https://pbcpao.gov (server-rendered HTML + jQuery, no SPA)
|
|
Deep link: /Property/Details?parcelId={parcelId}
|
|
|
|
VENTAJA: NO necesita Playwright. urllib + HTMLParser stdlib = rapidisimo.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
import urllib.request
|
|
from datetime import datetime, timezone
|
|
from html.parser import HTMLParser
|
|
from typing import Optional
|
|
|
|
|
|
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/131"
|
|
_BASE_URL = "https://pbcpao.gov"
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
# HTML text extractor (skip script/style)
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
|
|
class _TextExtractor(HTMLParser):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.in_skip = False
|
|
self.parts: list[str] = []
|
|
|
|
def handle_starttag(self, tag, attrs):
|
|
if tag in ("script", "style", "noscript"):
|
|
self.in_skip = True
|
|
|
|
def handle_endtag(self, tag):
|
|
if tag in ("script", "style", "noscript"):
|
|
self.in_skip = False
|
|
|
|
def handle_data(self, d):
|
|
if not self.in_skip:
|
|
t = d.strip()
|
|
if t:
|
|
self.parts.append(t)
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
# Tables extractor (table → list of rows)
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
|
|
class _TableExtractor(HTMLParser):
|
|
"""Extracts all tables as list of {idx, rows: [[cells]]} dicts."""
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.in_table = False
|
|
self.in_tr = False
|
|
self.in_cell = False
|
|
self.in_skip = False
|
|
self.current_row: list[str] = []
|
|
self.current_cell = ""
|
|
self.current_table: list[list[str]] = []
|
|
self.tables: list[list[list[str]]] = []
|
|
|
|
def handle_starttag(self, tag, attrs):
|
|
if tag in ("script", "style"):
|
|
self.in_skip = True
|
|
elif tag == "table":
|
|
self.in_table = True
|
|
self.current_table = []
|
|
elif tag == "tr" and self.in_table:
|
|
self.in_tr = True
|
|
self.current_row = []
|
|
elif tag in ("td", "th") and self.in_tr:
|
|
self.in_cell = True
|
|
self.current_cell = ""
|
|
elif tag == "br" and self.in_cell:
|
|
self.current_cell += " "
|
|
|
|
def handle_endtag(self, tag):
|
|
if tag in ("script", "style"):
|
|
self.in_skip = False
|
|
elif tag == "table":
|
|
if self.current_table:
|
|
self.tables.append(self.current_table)
|
|
self.in_table = False
|
|
elif tag == "tr" and self.in_tr:
|
|
if self.current_row:
|
|
self.current_table.append(self.current_row)
|
|
self.in_tr = False
|
|
elif tag in ("td", "th") and self.in_cell:
|
|
self.current_row.append(re.sub(r"\s+", " ", self.current_cell).strip())
|
|
self.in_cell = False
|
|
|
|
def handle_data(self, d):
|
|
if self.in_cell and not self.in_skip:
|
|
self.current_cell += d
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
# Helpers
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
|
|
def _grab_after(text: str, label: str, max_len: int = 80) -> Optional[str]:
|
|
"""Find label in flat text, return the next non-empty token cluster."""
|
|
if not text or not label:
|
|
return None
|
|
idx = text.find(label)
|
|
if idx < 0:
|
|
return None
|
|
after = text[idx + len(label): idx + len(label) + max_len].strip()
|
|
# Take up to next " ", " ", end-of-line, or "Property" / "Address" etc.
|
|
# First word/phrase = value until next CAPITALIZED label pattern
|
|
m = re.match(r"\s*([^\n]+?)(?:\s{2,}|\s+[A-Z][A-Z\s]+\s+[A-Za-z]+|$)", after)
|
|
if m:
|
|
return m.group(1).strip()
|
|
return after.split("\n")[0].strip()
|
|
|
|
|
|
def _to_int(s) -> Optional[int]:
|
|
if not s:
|
|
return None
|
|
cleaned = re.sub(r"[^\d-]", "", str(s))
|
|
try:
|
|
return int(cleaned) if cleaned else None
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def _money_to_int(s) -> Optional[int]:
|
|
if not s:
|
|
return None
|
|
cleaned = re.sub(r"[^\d.-]", "", str(s))
|
|
if not cleaned or cleaned == "-":
|
|
return None
|
|
try:
|
|
return int(float(cleaned))
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
# Public API
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
|
|
def _parse_scalar_fields(flat: str, parcel_id: str) -> dict:
    """Extract single-valued property attributes from the flattened page text.

    Args:
        flat: Text nodes of the page joined with single spaces.
        parcel_id: Caller-supplied PCN, used when the page does not show one.

    Returns:
        Dict with the fields that were found. ``parcel_id`` and
        ``homestead_active`` are always present; every other key appears only
        when its label matched.
    """
    fields: dict = {}

    # Owner Name DERMYSHI IRFAN Property Control Number ...
    m = re.search(
        r"Owner Name\s+([A-Z][A-Z\s,'.\-&]+?)(?=\s+(?:Property Control|Mailing|Current|Tax|Subdivision|Total))",
        flat,
    )
    if m:
        fields["owner_name"] = m.group(1).strip()

    # Property Control Number — formatted as XX-XX-XX-XX-XX-XXX-XXXX
    m = re.search(r"Property Control Number\s+([\d\-]+)", flat)
    fields["parcel_id"] = m.group(1).strip() if m else parcel_id

    # Year Built
    m = re.search(r"Year Built\s+(\d{4})", flat)
    if m:
        fields["year_built"] = int(m.group(1))

    # Beds / Baths
    m = re.search(r"Bed\s*Rooms\s+(\d+)", flat, re.IGNORECASE)
    if m:
        fields["bedrooms"] = int(m.group(1))
    m = re.search(r"Full Baths\s+(\d+)", flat, re.IGNORECASE)
    full_b = int(m.group(1)) if m else 0
    m = re.search(r"Half Baths\s+(\d+)", flat, re.IGNORECASE)
    half_b = int(m.group(1)) if m else 0
    if full_b or half_b:
        fields["baths"] = float(full_b) + (0.5 * half_b)
        fields["baths_full"] = full_b
        fields["baths_half"] = half_b

    # Square footage
    m = (re.search(r"Total Square Footage\s+(\d[\d,]*)", flat)
         or re.search(r"Square Footage\s+(\d[\d,]*)", flat))
    if m:
        fields["sqft_total"] = _to_int(m.group(1))
    m = re.search(r"Area Under Air\s+(\d[\d,]*)", flat)
    if m:
        fields["sqft_heated"] = _to_int(m.group(1))

    # Lot acres
    m = re.search(r"Acres\s+([\d.]+)", flat)
    if m:
        try:
            fields["lot_acres"] = float(m.group(1))
        except ValueError:
            # The pattern also admits things like "." or "1.2.3"; best effort,
            # so the field is simply left unset.
            pass

    # Property Use Code + Zoning
    m = re.search(r"Property Use Code\s+([\w\d\?\.\-\s]+?)(?:\s+Zoning)", flat)
    if m:
        fields["use_code"] = m.group(1).replace("?", " - ").strip()
    # The code token must be matched greedily: with a lazy quantifier and
    # nothing after it forcing expansion, only its first character is kept.
    m = re.search(r"Zoning\s+([\w\d\?\-]+(?:\s+\([^)]+\))?)", flat)
    if m:
        fields["zoning"] = m.group(1).replace("?", " - ").strip()

    # Subdivision
    m = re.search(r"Subdivision\s+([A-Z0-9 ,'.\-]+?)(?=\s+Official Records|Sale Date|Legal Description|$)", flat)
    if m:
        sub = m.group(1).strip()
        fields["subdivision"] = sub if sub else None

    # Legal description (capped at 300 characters)
    m = re.search(r"Legal Description\s+([^\n]+?)(?=\s+Show Full Map|Show More|Nearby|Owner INFORMATION|$)", flat)
    if m:
        fields["legal_description"] = m.group(1).strip()[:300]

    # Roof / interior info: an upper-case value that runs until the next
    # Capitalised label.
    for label, key in [
        ("Air Condition Desc.", "ac_description"),
        ("Heat Type", "heat_type"),
        ("Heat Fuel", "heat_fuel"),
        ("Roof Structure", "roof_struct"),
        ("Roof Cover", "roof_cover"),
        ("Interior Wall 1", "interior_wall"),
    ]:
        m = re.search(rf"{re.escape(label)}\s+([A-Z][A-Z &/\-]+?)(?=\s+[A-Z][a-z])", flat)
        if m:
            fields[key] = m.group(1).strip()

    # Site address, looked up after the "Location Address" label.
    m = re.search(r"Location Address\s+([^\n]+?)(?=\s+Subdivision|Owner|Property Use|$)", flat)
    if m:
        fields["site_address"] = m.group(1).strip()

    # Homestead detection: a dollar amount after "Homestead Exemption", or
    # "Current Homestead Yes".
    fields["homestead_active"] = bool(
        re.search(r"Homestead Exemption\s+\$[\d,]+|Current Homestead\s*Yes",
                  flat, re.IGNORECASE)
    )
    return fields


def _parse_assessments(tables: list) -> dict:
    """Build current/previous-year values and full history from value tables.

    Rows are recognised by their first cell, e.g.
    "Tax Year 2025 2024 2023 ...", "Total Market Value $758,298 $762,232 ...",
    "Total Assessed Value ...". Value rows are keyed by the most recently
    seen "Tax Year" row; unparseable amounts are stored as 0.
    """
    tax_years: list = []
    market_vals: dict = {}
    assessed_vals: dict = {}
    improvement_vals: dict = {}

    for tbl in tables:
        for row in tbl:
            if not row:
                continue
            first = row[0].lower()
            if first == "tax year":
                tax_years = [c for c in row[1:] if c]
                continue
            if "market value" in first or "total market" in first:
                target = market_vals
            elif first == "assessed value" or "total assessed" in first:
                target = assessed_vals
            elif "improvement value" in first:
                target = improvement_vals
            else:
                continue
            # zip() stops at the shorter side, so surplus cells are ignored.
            for year, raw in zip(tax_years, row[1:]):
                target[year] = _money_to_int(raw) or 0

    # isdecimal() (unlike isdigit()) accepts only strings int() can parse.
    valid_years = sorted((y for y in tax_years if y.isdecimal()), reverse=True)
    current_year = valid_years[0] if valid_years else None
    last_year = valid_years[1] if len(valid_years) > 1 else None

    return {
        "just_value_current": market_vals.get(current_year) if current_year else None,
        "assessed_value_current": assessed_vals.get(current_year) if current_year else None,
        "just_value_last": market_vals.get(last_year) if last_year else None,
        "assessed_value_last": assessed_vals.get(last_year) if last_year else None,
        "tax_year_current": int(current_year) if current_year else None,
        "tax_year_last": int(last_year) if last_year else None,
        "assessment_history": {
            "market": market_vals,
            "assessed": assessed_vals,
            "improvement": improvement_vals,
        },
    }


def _classify_sale(deed_text: str, price: Optional[int]) -> str:
    """Return "Qualified" or "Unqualified" for one sales row.

    The page shows a "Sale Type" rather than qualified/disqualified, so a
    WARRANTY DEED priced at 50K or more is treated as Qualified. Anything
    else (e.g. CERT OF TITLE, low-priced QUIT CLAIM) is Unqualified unless
    the text itself says "qualified".
    """
    lowered = deed_text.lower()
    if "warranty deed" in lowered and (price or 0) >= 50000:
        return "Qualified"
    # "unqualified"/"disqualified" contain "qualified" as a substring and
    # must not be mistaken for it.
    negated = "unqualified" in lowered or "disqualified" in lowered
    if "qualified" in lowered and not negated:
        return "Qualified"
    return "Unqualified"


def _parse_sales(tables: list) -> list:
    """Collect sales rows from every table that looks like a sales history.

    A table qualifies when its first row has a "Sale Date"/"Sales Date"
    column and a "Price" column. Rows are returned in page order.
    """

    def cell(row, idx):
        # Missing column (-1) or short row -> empty string.
        return row[idx] if 0 <= idx < len(row) else ""

    sales: list = []
    for tbl in tables:
        if not tbl or len(tbl) < 2:
            continue
        hdr = [c.lower() for c in tbl[0]]
        idx_date = next((i for i, h in enumerate(hdr)
                         if "sale date" in h or "sales date" in h), -1)
        idx_price = next((i for i, h in enumerate(hdr) if "price" in h), -1)
        if idx_date < 0 or idx_price < 0:
            continue
        idx_book = next((i for i, h in enumerate(hdr)
                         if "book" in h or h.startswith("or")), -1)
        idx_qual = next((i for i, h in enumerate(hdr)
                         if "qualified" in h or h == "sale type" or h == "type"), -1)

        for row in tbl[1:]:
            if len(row) < 2:
                continue
            sale_date = cell(row, idx_date)
            price_raw = cell(row, idx_price)
            if not sale_date and not price_raw:
                continue
            qual_raw = cell(row, idx_qual)
            price = _money_to_int(price_raw)
            sales.append({
                "date": sale_date,
                "price": price,
                "book_page": cell(row, idx_book),
                "qualification": qual_raw,
                "deed_type": qual_raw,
                "qualified": _classify_sale(qual_raw, price),
            })
    return sales


def fetch_palm_beach_pa_record(
    parcel_id: str,
    timeout_seconds: int = 30,
    listing_price: Optional[float] = None,
) -> dict:
    """Fetch the full Palm Beach PA record for a parcel_id (PCN).

    The details page is downloaded with urllib and parsed with the stdlib
    HTMLParser; no browser automation is involved.

    Args:
        parcel_id: 17-digit PCN (e.g. "00414232000003080") or formatted with
            dashes; dashes are removed before the request.
        timeout_seconds: HTTP timeout.
        listing_price: Forwarded to the renovation-pattern detector
            (enables flip-in-progress detection).

    Returns:
        Rich dict (unified schema). ``county``, ``source``, ``fetched_at``
        and ``errors`` are always present. When no parcel_id is given, the
        HTTP request fails, or the page looks like a "not found" page, the
        reason is appended to ``errors`` and the partial dict is returned
        instead of raising.
    """
    fetched_at = datetime.now(timezone.utc).isoformat()
    result = {
        "county": "Palm Beach",
        "source": "Palm Beach County Property Appraiser (pbcpao.gov)",
        "fetched_at": fetched_at,
        "errors": [],
    }

    if not parcel_id:
        result["errors"].append("no parcel_id provided")
        return result

    from urllib.parse import quote

    pcn_clean = parcel_id.replace("-", "").strip()
    # Percent-encode so an unusual parcel_id cannot alter the query string.
    url = f"{_BASE_URL}/Property/Details?parcelId={quote(pcn_clean, safe='')}"
    result["source_url"] = url

    # HTTP fetch (no Playwright). Any failure is reported through `errors`.
    try:
        req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
        with urllib.request.urlopen(req, timeout=timeout_seconds) as resp:
            html = resp.read().decode("utf-8", errors="ignore")
    except Exception as e:
        result["errors"].append(f"HTTP fetch failed: {type(e).__name__}: {e}")
        return result

    # Detect "no property found" near the top of the page.
    page_head = html.lower()[:5000]
    if "no property" in page_head or "not found" in page_head:
        result["errors"].append("parcel not found in PA records")
        return result

    # Flat text for label/value regexes.
    text_extractor = _TextExtractor()
    text_extractor.feed(html)
    flat = " ".join(text_extractor.parts)

    # Tables for assessment values and sales history.
    tbl_extractor = _TableExtractor()
    tbl_extractor.feed(html)

    result.update(_parse_scalar_fields(flat, parcel_id))
    result.update(_parse_assessments(tbl_extractor.tables))

    sales = _parse_sales(tbl_extractor.tables)
    result["sales_history"] = sales

    # Most recent qualified sale: first qualifying row in page order.
    # NOTE(review): assumes the site lists sales newest first — confirm.
    qualified = [s for s in sales
                 if s["qualified"] == "Qualified" and (s["price"] or 0) >= 1000]
    result["most_recent_qualified_sale"] = qualified[0] if qualified else None

    # Renovation signal (detector lives in the Duval module; imported here,
    # at call time, as in the rest of this function's original design).
    from data_fetchers.pa_duval import _detect_renovation_pattern
    result["renovation_signal"] = _detect_renovation_pattern(
        sales, listing_price=listing_price,
    )

    return result
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
# CLI
|
|
# ════════════════════════════════════════════════════════════════════════════
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: fetch one parcel and print the record as
    # indented JSON (values json cannot encode are stringified).
    import argparse
    import json

    cli = argparse.ArgumentParser(description="Palm Beach PA full record fetcher")
    cli.add_argument(
        "--parcel",
        required=True,
        help="PCN (e.g. '00414232000003080')",
    )
    options = cli.parse_args()

    record = fetch_palm_beach_pa_record(parcel_id=options.parcel)
    output = json.dumps(record, indent=2, default=str)
    print(output)
|