deb04c9315
Sprint 0 completo del producto VMS-Sailor (Vessel Management System integrado para buques 30-40m). Brief de referencia en VMS_Sailor_v2_Parte_*.md (intacto). Core (vmssailor.core, 95.17% coverage, 99 tests verde): - ShipCoord: sistema naval x_pp/y_cl/z_bl frozen - Vessel, Deck, Bulkhead - Equipment, EquipmentModel, Sensor, EquipmentSpec - Tag, AlarmConfig, TagBinding, Scaling - CardInstance, Bus, Topology con validacion 21 puntos I/O AR-NMEA-IO-v1.0 - Alarm, PermissiveRule, Condition - Project agregado raiz con validacion cross-entity - Persistencia portable .vmsproj (SQLite) con roundtrip verificable Biblioteca curada seed (vmssailor.library): - systems_catalog.json completo (catalogo maestro Parte 1 sec 7) - 2 vessels: Sunseeker 76, Ferretti 850 - 2 motores: MTU 12V 2000 M96, Volvo D13-900 - 1 genset: Northern Lights M65C13 - yacht_motor_planeo.yaml (reglas heuristicas) - TODO marcado data_source=seed_estimate - requiere validacion datasheets Tools: - vms-validate-library: CLI valida biblioteca completa - vms-generate-test-project: CLI demo + verificacion roundtrip persistencia Design System + 8 mockups HTML estaticos: - docs/design_system.md (paleta Deep Ocean, gradientes, typography, motion) - docs/brand/ (logo + variantes SVG) - docs/mockups/splash, studio_main, runtime_overview, runtime_mimic_fuel (P&ID animado), runtime_alarms, runtime_trim (panel estrella con horizonte artificial), mobile_overview, mobile_trim - docs/mockups/index.html (galeria) Firmware (Sprint 12+ implementacion): - firmware/ar_nmea_io_v1/src/config/pinout.h con macros GPIO Decisiones autonomas documentadas en docs/decisions_sprint0.md. Stack: Python 3.11 + uv + Pydantic v2 + SQLite stdlib + hatchling + pytest 9 + ruff + mypy. Sin PySide6, FastAPI, Flutter ni firmware funcional (entran en sprints siguientes). Criterio de aceptacion Sprint 0: cumplido. - uv sync: OK - pytest: 99/99 verde - cov vmssailor.core: 95.17% (objetivo >=80%) - ruff: clean - vms-validate-library: OK - vms-generate-test-project: INTEGRIDAD OK Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
256 lines
8.9 KiB
Python
256 lines
8.9 KiB
Python
"""Validación cross-entity y reglas de negocio del proyecto.
|
|
|
|
Las validaciones intra-entidad viven en cada `BaseModel` (validators de
|
|
Pydantic). Aquí se agrupan chequeos que cruzan múltiples entidades o
|
|
que no son hard-errors sino *warnings* informativos para el integrador.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
from enum import StrEnum
|
|
|
|
from vmssailor.core.enums import ChannelType, ControlMode
|
|
from vmssailor.core.project import Project
|
|
|
|
|
|
class Severity(StrEnum):
|
|
ERROR = "error"
|
|
WARNING = "warning"
|
|
INFO = "info"
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class ValidationIssue:
|
|
"""Un hallazgo de la validación cross-entity."""
|
|
|
|
severity: Severity
|
|
code: str
|
|
message: str
|
|
entity_id: str = ""
|
|
|
|
def __str__(self) -> str:
|
|
eid = f" [{self.entity_id}]" if self.entity_id else ""
|
|
return f"{self.severity.value.upper():7s} {self.code:24s} {self.message}{eid}"
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class ValidationReport:
|
|
"""Reporte agregado de validación de un Project."""
|
|
|
|
issues: list[ValidationIssue] = field(default_factory=list)
|
|
|
|
@property
|
|
def errors(self) -> list[ValidationIssue]:
|
|
return [i for i in self.issues if i.severity == Severity.ERROR]
|
|
|
|
@property
|
|
def warnings(self) -> list[ValidationIssue]:
|
|
return [i for i in self.issues if i.severity == Severity.WARNING]
|
|
|
|
@property
|
|
def infos(self) -> list[ValidationIssue]:
|
|
return [i for i in self.issues if i.severity == Severity.INFO]
|
|
|
|
def ok(self) -> bool:
|
|
return len(self.errors) == 0
|
|
|
|
def add(self, severity: Severity, code: str, message: str, entity_id: str = "") -> None:
|
|
self.issues.append(
|
|
ValidationIssue(
|
|
severity=severity, code=code, message=message, entity_id=entity_id
|
|
)
|
|
)
|
|
|
|
def format(self) -> str:
|
|
if not self.issues:
|
|
return "Validation: OK (no issues)."
|
|
lines = [str(i) for i in self.issues]
|
|
lines.append("")
|
|
lines.append(
|
|
f"Total: {len(self.errors)} errors, {len(self.warnings)} warnings, "
|
|
f"{len(self.infos)} info."
|
|
)
|
|
return "\n".join(lines)
|
|
|
|
|
|
def validate_project(project: Project) -> ValidationReport:
|
|
"""Validación cross-entity de un Project (warnings + errors informativos).
|
|
|
|
Las validaciones que son hard-errors (referencias rotas, IDs duplicados)
|
|
ya están en los model_validators de Pydantic y harán fallar la
|
|
construcción del objeto. Aquí se chequean reglas más laxas.
|
|
"""
|
|
report = ValidationReport()
|
|
|
|
_check_card_capacity_utilization(project, report)
|
|
_check_orphan_systems(project, report)
|
|
_check_controllable_tags_authority(project, report)
|
|
_check_tag_alarms_within_range(project, report)
|
|
_check_equipment_coords_within_vessel(project, report)
|
|
_check_unbound_modbus_tags(project, report)
|
|
|
|
return report
|
|
|
|
|
|
# --- chequeos específicos ---
|
|
|
|
|
|
def _check_card_capacity_utilization(project: Project, report: ValidationReport) -> None:
|
|
"""Avisa si una carta tiene canales del mismo tipo asignados que se solapan o exceden."""
|
|
# Cuenta de canales usados por (card_id, channel_type, channel_number)
|
|
seen: dict[tuple[str, ChannelType, int], str] = {}
|
|
for t in project.tags:
|
|
b = t.physical_binding
|
|
if b is None:
|
|
continue
|
|
key = (b.card_id, b.channel_type, b.channel_number)
|
|
if key in seen:
|
|
report.add(
|
|
Severity.ERROR,
|
|
"DUPLICATE_CHANNEL_BINDING",
|
|
f"Canal {b.channel_type.value}{b.channel_number} de la tarjeta "
|
|
f"'{b.card_id}' ya está asignado a tag '{seen[key]}', "
|
|
f"colisiona con '{t.id}'.",
|
|
entity_id=t.id,
|
|
)
|
|
else:
|
|
seen[key] = t.id
|
|
|
|
# Cuenta agregada por carta y tipo
|
|
use: dict[tuple[str, ChannelType], int] = {}
|
|
for t in project.tags:
|
|
b = t.physical_binding
|
|
if b is None:
|
|
continue
|
|
use[(b.card_id, b.channel_type)] = use.get((b.card_id, b.channel_type), 0) + 1
|
|
|
|
caps = {
|
|
ChannelType.AI: 4,
|
|
ChannelType.DI: 5,
|
|
ChannelType.DO: 10,
|
|
ChannelType.RPM: 1,
|
|
}
|
|
for (card_id, ch_type), count in use.items():
|
|
cap = caps[ch_type]
|
|
pct = (count / cap) * 100.0
|
|
if count > cap:
|
|
report.add(
|
|
Severity.ERROR,
|
|
"CARD_CAPACITY_EXCEEDED",
|
|
f"Tarjeta '{card_id}' usa {count} canales {ch_type.value} "
|
|
f"(capacidad {cap}).",
|
|
entity_id=card_id,
|
|
)
|
|
elif pct >= 100.0:
|
|
report.add(
|
|
Severity.WARNING,
|
|
"CARD_CAPACITY_AT_LIMIT",
|
|
f"Tarjeta '{card_id}' a 100% de capacidad {ch_type.value} "
|
|
f"({count}/{cap}). Considere expandir.",
|
|
entity_id=card_id,
|
|
)
|
|
elif pct >= 80.0:
|
|
report.add(
|
|
Severity.INFO,
|
|
"CARD_CAPACITY_HIGH",
|
|
f"Tarjeta '{card_id}' al {pct:.0f}% de {ch_type.value} ({count}/{cap}).",
|
|
entity_id=card_id,
|
|
)
|
|
|
|
|
|
def _check_orphan_systems(project: Project, report: ValidationReport) -> None:
|
|
"""Avisa si hay sistemas habilitados que no tienen ningún equipment."""
|
|
used = {e.system_id for e in project.equipment}
|
|
for sys_id in project.systems_enabled:
|
|
if sys_id not in used:
|
|
report.add(
|
|
Severity.WARNING,
|
|
"SYSTEM_WITHOUT_EQUIPMENT",
|
|
f"Sistema '{sys_id.value}' habilitado pero sin equipos asignados.",
|
|
entity_id=sys_id.value,
|
|
)
|
|
|
|
|
|
def _check_controllable_tags_authority(
|
|
project: Project, report: ValidationReport
|
|
) -> None:
|
|
"""Sanity-check de tags controllable con control_mode AUTO sin permissive."""
|
|
permissive_actions = {r.action_id for r in project.permissive_rules}
|
|
for t in project.tags:
|
|
if not t.controllable:
|
|
continue
|
|
if t.control_mode == ControlMode.AUTO:
|
|
# Heurística: acciones AUTO sin permissive son sospechosas.
|
|
implied_action = f"AUTO_{t.id}"
|
|
if implied_action not in permissive_actions:
|
|
report.add(
|
|
Severity.INFO,
|
|
"AUTO_TAG_NO_PERMISSIVE",
|
|
f"Tag '{t.id}' en control_mode=AUTO sin PermissiveRule "
|
|
f"con action_id='{implied_action}'. Considere agregar.",
|
|
entity_id=t.id,
|
|
)
|
|
|
|
|
|
def _check_tag_alarms_within_range(project: Project, report: ValidationReport) -> None:
|
|
"""Avisa si una alarma está claramente fuera del rango normal del tag."""
|
|
for t in project.tags:
|
|
if t.range_normal_min is None or t.range_normal_max is None:
|
|
continue
|
|
for a in t.alarms:
|
|
inside = t.range_normal_min <= a.threshold <= t.range_normal_max
|
|
if inside and a.operator in (">", ">="):
|
|
report.add(
|
|
Severity.INFO,
|
|
"ALARM_THRESHOLD_IN_NORMAL_RANGE",
|
|
f"Tag '{t.id}' alarm '{a.id}' tiene threshold={a.threshold} "
|
|
f"dentro del rango normal [{t.range_normal_min},{t.range_normal_max}]. "
|
|
"Verifique que esto es intencional.",
|
|
entity_id=t.id,
|
|
)
|
|
|
|
|
|
def _check_equipment_coords_within_vessel(
|
|
project: Project, report: ValidationReport
|
|
) -> None:
|
|
"""Avisa si un equipo está claramente fuera de la envolvente del buque."""
|
|
v = project.vessel
|
|
for eq in project.equipment:
|
|
if eq.location.x_pp > v.length_overall_m + 1.0:
|
|
report.add(
|
|
Severity.WARNING,
|
|
"EQUIPMENT_OUT_OF_HULL",
|
|
f"Equipment '{eq.id}' x_pp={eq.location.x_pp:.2f} excede "
|
|
f"eslora total {v.length_overall_m:.2f}.",
|
|
entity_id=eq.id,
|
|
)
|
|
if abs(eq.location.y_cl) > v.beam_max_m / 2.0 + 0.5:
|
|
report.add(
|
|
Severity.WARNING,
|
|
"EQUIPMENT_OUT_OF_HULL",
|
|
f"Equipment '{eq.id}' y_cl={eq.location.y_cl:+.2f} excede "
|
|
f"semi-manga {v.beam_max_m / 2.0:.2f}.",
|
|
entity_id=eq.id,
|
|
)
|
|
|
|
|
|
def _check_unbound_modbus_tags(project: Project, report: ValidationReport) -> None:
|
|
"""Info: tags Modbus sin physical_binding (válido pero raro)."""
|
|
from vmssailor.core.enums import Protocol
|
|
|
|
for t in project.tags:
|
|
if (
|
|
t.protocol in (Protocol.MODBUS_RTU, Protocol.MODBUS_TCP)
|
|
and t.physical_binding is None
|
|
and t.address is not None
|
|
):
|
|
report.add(
|
|
Severity.INFO,
|
|
"MODBUS_TAG_NO_PHYSICAL_BINDING",
|
|
f"Tag '{t.id}' es Modbus con address={t.address} pero sin "
|
|
"physical_binding. Esto es OK para equipos externos "
|
|
"(no AR-NMEA-IO).",
|
|
entity_id=t.id,
|
|
)
|