deb04c9315
Sprint 0 completo del producto VMS-Sailor (Vessel Management System integrado para buques 30-40m). Brief de referencia en VMS_Sailor_v2_Parte_*.md (intacto). Core (vmssailor.core, 95.17% coverage, 99 tests verde): - ShipCoord: sistema naval x_pp/y_cl/z_bl frozen - Vessel, Deck, Bulkhead - Equipment, EquipmentModel, Sensor, EquipmentSpec - Tag, AlarmConfig, TagBinding, Scaling - CardInstance, Bus, Topology con validacion 21 puntos I/O AR-NMEA-IO-v1.0 - Alarm, PermissiveRule, Condition - Project agregado raiz con validacion cross-entity - Persistencia portable .vmsproj (SQLite) con roundtrip verificable Biblioteca curada seed (vmssailor.library): - systems_catalog.json completo (catalogo maestro Parte 1 sec 7) - 2 vessels: Sunseeker 76, Ferretti 850 - 2 motores: MTU 12V 2000 M96, Volvo D13-900 - 1 genset: Northern Lights M65C13 - yacht_motor_planeo.yaml (reglas heuristicas) - TODO marcado data_source=seed_estimate - requiere validacion datasheets Tools: - vms-validate-library: CLI valida biblioteca completa - vms-generate-test-project: CLI demo + verificacion roundtrip persistencia Design System + 8 mockups HTML estaticos: - docs/design_system.md (paleta Deep Ocean, gradientes, typography, motion) - docs/brand/ (logo + variantes SVG) - docs/mockups/splash, studio_main, runtime_overview, runtime_mimic_fuel (P&ID animado), runtime_alarms, runtime_trim (panel estrella con horizonte artificial), mobile_overview, mobile_trim - docs/mockups/index.html (galeria) Firmware (Sprint 12+ implementacion): - firmware/ar_nmea_io_v1/src/config/pinout.h con macros GPIO Decisiones autonomas documentadas en docs/decisions_sprint0.md. Stack: Python 3.11 + uv + Pydantic v2 + SQLite stdlib + hatchling + pytest 9 + ruff + mypy. Sin PySide6, FastAPI, Flutter ni firmware funcional (entran en sprints siguientes). Criterio de aceptacion Sprint 0: cumplido. 
- uv sync: OK - pytest: 99/99 verde - cov vmssailor.core: 95.17% (objetivo >=80%) - ruff: clean - vms-validate-library: OK - vms-generate-test-project: INTEGRIDAD OK Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
215 lines
7.8 KiB
Python
215 lines
7.8 KiB
Python
"""Carga y validación de la biblioteca curada.
|
|
|
|
Funciones públicas:
|
|
|
|
- `load_systems_catalog()` — devuelve el dict del catálogo maestro
|
|
- `load_library(root=None)` — carga TODA la biblioteca y devuelve un
|
|
`LibraryLoadResult` con vessels, equipment,
|
|
rules y la lista de issues.
|
|
|
|
El loader es defensivo: NO levanta excepciones por archivos individuales
|
|
malos. Acumula issues y deja al caller decidir si abortar.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import yaml
|
|
from pydantic import ValidationError
|
|
|
|
from vmssailor.core.equipment import EquipmentModel
|
|
from vmssailor.core.vessel import Vessel
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class LibraryIssue:
|
|
"""Un problema encontrado al cargar la biblioteca."""
|
|
|
|
severity: str # "error" | "warning" | "info"
|
|
path: str
|
|
message: str
|
|
|
|
def __str__(self) -> str:
|
|
return f"{self.severity.upper():7s} {self.path:60s} {self.message}"
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class LibraryLoadResult:
|
|
"""Resultado de cargar la biblioteca completa."""
|
|
|
|
vessels: list[Vessel] = field(default_factory=list)
|
|
equipment_models: list[EquipmentModel] = field(default_factory=list)
|
|
rules: dict[str, dict[str, Any]] = field(default_factory=dict)
|
|
systems_catalog: dict[str, Any] = field(default_factory=dict)
|
|
issues: list[LibraryIssue] = field(default_factory=list)
|
|
|
|
@property
|
|
def errors(self) -> list[LibraryIssue]:
|
|
return [i for i in self.issues if i.severity == "error"]
|
|
|
|
@property
|
|
def warnings(self) -> list[LibraryIssue]:
|
|
return [i for i in self.issues if i.severity == "warning"]
|
|
|
|
def ok(self) -> bool:
|
|
return len(self.errors) == 0
|
|
|
|
def format(self) -> str:
|
|
lines: list[str] = []
|
|
lines.append("Library load summary:")
|
|
lines.append(f" Vessels : {len(self.vessels)}")
|
|
lines.append(f" EquipmentModels: {len(self.equipment_models)}")
|
|
lines.append(f" Rules : {len(self.rules)}")
|
|
lines.append(f" Issues : {len(self.errors)} errors, {len(self.warnings)} warnings")
|
|
if self.issues:
|
|
lines.append("")
|
|
for i in self.issues:
|
|
lines.append(f" {i}")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def load_systems_catalog(root: Path | None = None) -> dict[str, Any]:
    """Load the master systems catalog.

    Args:
        root: Directory that contains ``systems_catalog.json``. When ``None``,
            the directory returned by ``_default_root()`` is used.

    Returns:
        The parsed JSON content of ``systems_catalog.json``.

    Raises:
        FileNotFoundError: If ``systems_catalog.json`` does not exist under the
            chosen directory.
    """
    library_dir = root if root else _default_root()
    catalog_file = library_dir / "systems_catalog.json"
    if catalog_file.exists():
        raw_text = catalog_file.read_text(encoding="utf-8")
        return json.loads(raw_text)
    raise FileNotFoundError(f"systems_catalog.json no encontrado en {library_dir}")
|
|
|
|
|
|
def load_library(root: Path | None = None) -> LibraryLoadResult:
    """Load the whole library and return a `LibraryLoadResult`.

    The loader is defensive: a bad individual file never raises. Each problem
    is appended to ``result.issues`` and the caller decides whether to abort
    (for example by checking ``result.ok()``).

    Args:
        root: Library root directory. When ``None``, ``_default_root()`` is used.

    Returns:
        A ``LibraryLoadResult`` holding every vessel, equipment model and rule
        that could be parsed, the systems catalog (left empty when it could
        not be loaded), and the list of issues found along the way.
    """
    base = root or _default_root()
    result = LibraryLoadResult()

    # systems_catalog
    try:
        catalog = load_systems_catalog(base)
    except Exception as exc:
        result.issues.append(
            LibraryIssue("error", "systems_catalog.json", f"{type(exc).__name__}: {exc}")
        )
    else:
        if isinstance(catalog, dict):
            result.systems_catalog = catalog
        else:
            # Valid JSON with a non-object top level (e.g. a list) would break
            # every later ``.get`` lookup, so it is reported and not stored.
            result.issues.append(
                LibraryIssue(
                    "error",
                    "systems_catalog.json",
                    "Se esperaba un objeto JSON en la raíz, se obtuvo "
                    f"{type(catalog).__name__}.",
                )
            )

    # vessels/
    vessels_dir = base / "vessels"
    if vessels_dir.exists():
        _load_json_models(
            result,
            base,
            sorted(vessels_dir.glob("*.json")),
            Vessel,
            result.vessels,
            "data_source=seed_estimate — requiere validación de Álvaro.",
        )

    # equipment/**/*.json
    eq_dir = base / "equipment"
    if eq_dir.exists():
        _load_json_models(
            result,
            base,
            sorted(eq_dir.glob("**/*.json")),
            EquipmentModel,
            result.equipment_models,
            "data_source=seed_estimate — requiere validación.",
        )

    # rules/*.yaml
    rules_dir = base / "rules"
    if rules_dir.exists():
        for f in sorted(rules_dir.glob("*.yaml")):
            rel_path = str(f.relative_to(base))
            try:
                # An empty YAML document parses to None; treat it as an empty rule.
                data = yaml.safe_load(f.read_text(encoding="utf-8")) or {}
                meta = data.get("meta") or {}
                # Fall back to the file name when the rule declares no id.
                rule_id = meta.get("rule_id") or f.stem
                if rule_id in result.rules:
                    # The later file still wins, but the overwrite is reported
                    # instead of silently dropping the earlier rule.
                    result.issues.append(
                        LibraryIssue(
                            "warning",
                            rel_path,
                            f"rule_id='{rule_id}' duplicado; reemplaza la regla "
                            "cargada previamente.",
                        )
                    )
                result.rules[rule_id] = data
            except Exception as exc:
                result.issues.append(
                    LibraryIssue("error", rel_path, f"{type(exc).__name__}: {exc}")
                )

    # Cross-reference validation. Structurally malformed catalog/rule data
    # must not abort the load, so shape-related failures become an issue.
    try:
        _validate_cross_refs(result)
    except (AttributeError, KeyError, TypeError) as exc:
        result.issues.append(
            LibraryIssue(
                "error",
                "(cross-references)",
                f"Validación cross-references abortada — {type(exc).__name__}: {exc}",
            )
        )

    return result


def _load_json_models(
    result: LibraryLoadResult,
    base: Path,
    files: list[Path],
    model_cls: type[Vessel] | type[EquipmentModel],
    target: list[Any],
    seed_message: str,
) -> None:
    """Parse each JSON file into ``model_cls``, append it to ``target``, record issues.

    A file that fails to read, parse or validate adds an "error" issue and is
    skipped. A model whose ``data_source`` is ``"seed_estimate"`` is kept and
    additionally flagged with an "info" issue carrying ``seed_message``.
    """
    for f in files:
        rel_path = str(f.relative_to(base))
        try:
            data = json.loads(f.read_text(encoding="utf-8"))
            model = model_cls(**data)
            target.append(model)
            if model.data_source == "seed_estimate":
                result.issues.append(LibraryIssue("info", rel_path, seed_message))
        except ValidationError as exc:
            result.issues.append(
                LibraryIssue("error", rel_path, f"Pydantic: {exc.errors()}")
            )
        except Exception as exc:
            result.issues.append(
                LibraryIssue("error", rel_path, f"{type(exc).__name__}: {exc}")
            )
|
|
|
|
|
|
def _validate_cross_refs(result: LibraryLoadResult) -> None:
    """Check references between rules -> equipment models -> systems.

    Two checks are performed, and every finding is appended to
    ``result.issues`` (nothing is returned):

    * each ``typical_systems`` entry of an equipment model must name a system
      id present in ``result.systems_catalog`` (reported as "error");
    * each ``model_ref`` found under a rule's ``equipment_proposals`` must
      match the id of a loaded equipment model (reported as "warning").

    Malformed catalog entries are reported as "error" rather than raising,
    because a silently dropped system would surface later as a misleading
    "does not exist in systems_catalog.json" error on every model using it.
    Malformed rule structures are skipped without an issue.

    Args:
        result: Load result to inspect; its ``issues`` list is extended in place.
    """
    em_ids = {em.id for em in result.equipment_models}

    def catalog_error(message: str) -> None:
        result.issues.append(LibraryIssue("error", "systems_catalog.json", message))

    catalog = result.systems_catalog
    categories = catalog.get("categories") if isinstance(catalog, dict) else None
    if categories is None:
        # A missing or null "categories" key means "no systems declared".
        categories = []
    elif not isinstance(categories, list):
        catalog_error("'categories' debe ser una lista.")
        categories = []

    valid_systems = set()
    for cat_index, cat in enumerate(categories):
        if not isinstance(cat, dict):
            catalog_error(f"categories[{cat_index}] no es un objeto.")
            continue
        systems = cat.get("systems")
        if systems is None:
            continue
        if not isinstance(systems, list):
            catalog_error(f"categories[{cat_index}].systems debe ser una lista.")
            continue
        for sys_index, system in enumerate(systems):
            if not isinstance(system, dict) or "id" not in system:
                catalog_error(
                    f"categories[{cat_index}].systems[{sys_index}] no tiene 'id'."
                )
                continue
            valid_systems.add(system["id"])

    # Every equipment.typical_systems entry must reference a known system.
    for em in result.equipment_models:
        for sys_id in em.typical_systems:
            if sys_id.value not in valid_systems:
                result.issues.append(
                    LibraryIssue(
                        "error",
                        f"equipment/.../{em.id}.json",
                        f"typical_systems contiene '{sys_id.value}' que no existe en "
                        "systems_catalog.json.",
                    )
                )

    # Every rule candidate must reference an existing EquipmentModel.id.
    for rule_id, rule_data in result.rules.items():
        proposals = (
            rule_data.get("equipment_proposals") if isinstance(rule_data, dict) else None
        )
        if not isinstance(proposals, dict):
            continue
        for sys_proposals in proposals.values():
            if not isinstance(sys_proposals, dict):
                continue
            candidates = sys_proposals.get("candidates")
            if not isinstance(candidates, list):
                continue
            for cand in candidates:
                if not isinstance(cand, dict):
                    continue
                model_ref = cand.get("model_ref")
                if model_ref and model_ref not in em_ids:
                    result.issues.append(
                        LibraryIssue(
                            "warning",
                            f"rules/{rule_id}.yaml",
                            f"Rule referencia model_ref='{model_ref}' que no existe en "
                            "equipment_models cargados.",
                        )
                    )
|
|
|
|
|
|
def _default_root() -> Path:
    """Return the directory that contains this module file."""
    module_file = Path(__file__)
    return module_file.parent
|