"""Load and validate the curated library.

Public functions:

- `load_systems_catalog()` — returns the master catalog dict.
- `load_library(root=None)` — loads the WHOLE library and returns a
  `LibraryLoadResult` with vessels, equipment, rules and the list of issues.

The loader is defensive: it does NOT raise for individual bad files. It
accumulates issues and lets the caller decide whether to abort.
"""

from __future__ import annotations

import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

import yaml
from pydantic import ValidationError

from vmssailor.core.equipment import EquipmentModel
from vmssailor.core.vessel import Vessel


@dataclass(slots=True)
class LibraryIssue:
    """A single problem found while loading the library."""

    severity: str  # "error" | "warning" | "info"
    path: str
    message: str

    def __str__(self) -> str:
        return f"{self.severity.upper():7s} {self.path:60s} {self.message}"


@dataclass(slots=True)
class LibraryLoadResult:
    """Outcome of loading the complete library."""

    vessels: list[Vessel] = field(default_factory=list)
    equipment_models: list[EquipmentModel] = field(default_factory=list)
    rules: dict[str, dict[str, Any]] = field(default_factory=dict)
    systems_catalog: dict[str, Any] = field(default_factory=dict)
    issues: list[LibraryIssue] = field(default_factory=list)

    @property
    def errors(self) -> list[LibraryIssue]:
        """Issues whose severity is ``"error"``."""
        return [i for i in self.issues if i.severity == "error"]

    @property
    def warnings(self) -> list[LibraryIssue]:
        """Issues whose severity is ``"warning"``."""
        return [i for i in self.issues if i.severity == "warning"]

    def ok(self) -> bool:
        """Return True when no error-level issue was recorded."""
        return not self.errors

    def format(self) -> str:
        """Render a summary followed by one line per recorded issue."""
        lines: list[str] = [
            "Library load summary:",
            f" Vessels : {len(self.vessels)}",
            f" EquipmentModels: {len(self.equipment_models)}",
            f" Rules : {len(self.rules)}",
            f" Issues : {len(self.errors)} errors, {len(self.warnings)} warnings",
        ]
        if self.issues:
            lines.append("")
            lines.extend(f" {i}" for i in self.issues)
        return "\n".join(lines)


def load_systems_catalog(root: Path | None = None) -> dict[str, Any]:
    """Load the master systems catalog.

    Args:
        root: Library root directory; falls back to this module's directory.

    Returns:
        The parsed content of ``systems_catalog.json``.

    Raises:
        FileNotFoundError: if ``systems_catalog.json`` is not under the root.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    base = root or _default_root()
    catalog_path = base / "systems_catalog.json"
    if not catalog_path.exists():
        raise FileNotFoundError(f"systems_catalog.json no encontrado en {base}")
    return json.loads(catalog_path.read_text(encoding="utf-8"))


def load_library(root: Path | None = None) -> LibraryLoadResult:
    """Load the complete library and return a `LibraryLoadResult`.

    Problems with individual files are recorded in ``result.issues`` instead
    of being raised, so the caller can inspect ``result.ok()`` and decide
    whether to abort.

    Args:
        root: Library root directory; falls back to this module's directory.

    Returns:
        The populated result, including cross-reference validation issues.
    """
    base = root or _default_root()
    result = LibraryLoadResult()

    # systems_catalog
    try:
        catalog = load_systems_catalog(base)
    except Exception as exc:
        result.issues.append(
            LibraryIssue("error", "systems_catalog.json", f"{type(exc).__name__}: {exc}")
        )
    else:
        if isinstance(catalog, dict):
            result.systems_catalog = catalog
        else:
            # A non-object root would crash cross-reference validation later;
            # report it and keep the empty default catalog instead.
            result.issues.append(
                LibraryIssue(
                    "error",
                    "systems_catalog.json",
                    f"La raíz debe ser un objeto JSON, no {type(catalog).__name__}.",
                )
            )

    # vessels/*.json
    _load_json_models(
        base,
        base / "vessels",
        "*.json",
        Vessel,
        "data_source=seed_estimate — requiere validación de Álvaro.",
        result.vessels,
        result.issues,
    )

    # equipment/**/*.json
    _load_json_models(
        base,
        base / "equipment",
        "**/*.json",
        EquipmentModel,
        "data_source=seed_estimate — requiere validación.",
        result.equipment_models,
        result.issues,
    )

    # rules/*.yaml
    _load_rules(base, result)

    # Cross-reference validation
    _validate_cross_refs(result)
    return result


def _load_json_models(
    base: Path,
    folder: Path,
    pattern: str,
    model_cls: Any,
    seed_message: str,
    target: list[Any],
    issues: list[LibraryIssue],
) -> None:
    """Parse every JSON file matching *pattern* under *folder* into *model_cls*.

    Successfully built models are appended to *target*; failures are appended
    to *issues* as errors. Models whose ``data_source`` is ``"seed_estimate"``
    also get an info-level issue carrying *seed_message*.
    """
    if not folder.exists():
        return
    for f in sorted(folder.glob(pattern)):
        rel = str(f.relative_to(base))
        try:
            data = json.loads(f.read_text(encoding="utf-8"))
            model = model_cls(**data)
            target.append(model)
            if model.data_source == "seed_estimate":
                issues.append(LibraryIssue("info", rel, seed_message))
        except ValidationError as exc:
            issues.append(LibraryIssue("error", rel, f"Pydantic: {exc.errors()}"))
        except Exception as exc:
            issues.append(LibraryIssue("error", rel, f"{type(exc).__name__}: {exc}"))


def _load_rules(base: Path, result: LibraryLoadResult) -> None:
    """Load ``rules/*.yaml`` into ``result.rules`` keyed by rule id.

    The rule id is ``meta.rule_id`` when present, otherwise the file stem.
    """
    rules_dir = base / "rules"
    if not rules_dir.exists():
        return
    for f in sorted(rules_dir.glob("*.yaml")):
        rel = str(f.relative_to(base))
        try:
            data = yaml.safe_load(f.read_text(encoding="utf-8")) or {}
            meta = data.get("meta") or {}
            rule_id = meta.get("rule_id") or f.stem
            if rule_id in result.rules:
                # The later file still wins (unchanged behavior), but the
                # collision is no longer silent.
                result.issues.append(
                    LibraryIssue(
                        "warning",
                        rel,
                        f"rule_id='{rule_id}' duplicado; reemplaza a una regla ya cargada.",
                    )
                )
            result.rules[rule_id] = data
        except Exception as exc:
            result.issues.append(
                LibraryIssue("error", rel, f"{type(exc).__name__}: {exc}")
            )


def _collect_system_ids(
    catalog: dict[str, Any], issues: list[LibraryIssue]
) -> set[Any]:
    """Return every system id declared in *catalog*.

    Malformed categories or systems are reported as errors on
    ``systems_catalog.json`` rather than raising.
    """
    system_ids: set[Any] = set()
    categories = catalog.get("categories") or []
    if not isinstance(categories, list):
        issues.append(
            LibraryIssue("error", "systems_catalog.json", "'categories' debe ser una lista.")
        )
        return system_ids

    for cat in categories:
        if not isinstance(cat, dict):
            issues.append(
                LibraryIssue("error", "systems_catalog.json", f"Categoría malformada: {cat!r}")
            )
            continue
        systems = cat.get("systems") or []
        if not isinstance(systems, list):
            issues.append(
                LibraryIssue("error", "systems_catalog.json", "'systems' debe ser una lista.")
            )
            continue
        for s in systems:
            try:
                system_ids.add(s["id"])
            except (KeyError, TypeError):
                # Missing "id", non-mapping entry, or unhashable id.
                issues.append(
                    LibraryIssue(
                        "error", "systems_catalog.json", f"Sistema sin 'id' válido: {s!r}"
                    )
                )
    return system_ids


def _validate_cross_refs(result: LibraryLoadResult) -> None:
    """Check references between rules → equipment models → systems.

    Appends issues to ``result.issues``; never raises on malformed catalog or
    rule structures, so the loader stays defensive.
    """
    em_ids = {em.id for em in result.equipment_models}
    valid_systems = _collect_system_ids(result.systems_catalog, result.issues)

    # equipment.typical_systems must reference systems that exist in the catalog
    for em in result.equipment_models:
        for sys_id in em.typical_systems:
            if sys_id.value not in valid_systems:
                result.issues.append(
                    LibraryIssue(
                        "error",
                        f"equipment/.../{em.id}.json",
                        f"typical_systems contiene '{sys_id.value}' que no existe en "
                        "systems_catalog.json.",
                    )
                )

    # rules must reference EquipmentModel.id values that were loaded
    # NOTE(review): the reported path uses the rule id, which may differ from
    # the file stem when meta.rule_id is set — confirm whether that matters.
    for rule_id, rule_data in result.rules.items():
        rule_path = f"rules/{rule_id}.yaml"
        proposals = rule_data.get("equipment_proposals") or {}
        if not isinstance(proposals, dict):
            result.issues.append(
                LibraryIssue("warning", rule_path, "equipment_proposals debe ser un mapeo.")
            )
            continue
        for _sys_id, sys_proposals in proposals.items():
            if not isinstance(sys_proposals, dict):
                continue
            candidates = sys_proposals.get("candidates") or []
            if not isinstance(candidates, list):
                result.issues.append(
                    LibraryIssue("warning", rule_path, "candidates debe ser una lista.")
                )
                continue
            for cand in candidates:
                if not isinstance(cand, dict):
                    result.issues.append(
                        LibraryIssue("warning", rule_path, f"Candidato malformado: {cand!r}")
                    )
                    continue
                model_ref = cand.get("model_ref")
                if not model_ref:
                    continue
                try:
                    known = model_ref in em_ids
                except TypeError:
                    # Unhashable value (e.g. a YAML list) can never match an id.
                    known = False
                if not known:
                    result.issues.append(
                        LibraryIssue(
                            "warning",
                            rule_path,
                            f"Rule referencia model_ref='{model_ref}' que no existe en "
                            "equipment_models cargados.",
                        )
                    )


def _default_root() -> Path:
    """Return the directory that contains this module."""
    return Path(__file__).parent