"""CLI demo de Sprint 0: crea un proyecto completo, lo guarda y lo relee. Demuestra el roundtrip de persistencia (criterio de aceptación Sprint 0). Uso: uv run vms-generate-test-project [--out PATH] Salida: - Crea `projects/_demo/test_project.vmsproj` por defecto. - Imprime stats antes y después de roundtrip. - Exit 0 si la integridad se verifica, 1 si algo diverge. """ from __future__ import annotations import argparse import sys from pathlib import Path from vmssailor.core import ( AlarmConfig, AlarmPriority, AuthorityRequired, Bus, BusRole, CardInstance, ChannelType, ControlMode, Equipment, FilterType, PermissiveRule, Project, Protocol, Scaling, ShipCoord, SignalType, SystemId, Tag, TagBinding, Topology, UnitSI, Vessel, VesselSubtype, VesselType, ) from vmssailor.core.permissive import Condition from vmssailor.core.persistence import load_project, save_project from vmssailor.core.validation import validate_project from vmssailor.core.vessel import Bulkhead, Deck from vmssailor.shared.logging_setup import setup_logging def build_demo_project() -> Project: """Construye un proyecto demo completo basado en Sunseeker 76 + MTU 12V.""" vessel = Vessel( id="sunseeker_76_demo", name="M/Y Aurora — Demo Sprint 0", type=VesselType.YACHT_MOTOR, subtype=VesselSubtype.PLANING, length_overall_m=23.45, beam_max_m=5.65, draft_m=1.85, displacement_kg=55000, description="Buque de demo para verificar persistencia .vmsproj.", data_source="seed_estimate", decks=[ Deck(id="lower", name="Lower Deck", z_bl_bottom=0.5, z_bl_top=2.6), Deck(id="main", name="Main Deck", z_bl_bottom=2.6, z_bl_top=4.8), Deck(id="flybridge", name="Flybridge", z_bl_bottom=4.8, z_bl_top=6.4), ], bulkheads=[ Bulkhead(id="collision", name="Mamparo de colisión", x_pp=21.0), Bulkhead(id="er_fwd", name="Mamparo proa SM", x_pp=7.0), ], ) # ---- Equipos ---- me_port = Equipment( id="eq_me_port", model_ref="mtu_12v_2000_m96", tag_prefix="ME_PORT", display_name="Motor principal babor", location=ShipCoord(x_pp=5.5, y_cl=-0.9, z_bl=1.2), deck_id="lower", system_id=SystemId.MAIN_ENGINE, ) me_stbd = Equipment( id="eq_me_stbd", model_ref="mtu_12v_2000_m96", tag_prefix="ME_STBD", display_name="Motor principal estribor", location=ShipCoord(x_pp=5.5, y_cl=0.9, z_bl=1.2), deck_id="lower", system_id=SystemId.MAIN_ENGINE, ) gen_1 = Equipment( id="eq_gen_1", model_ref="northern_lights_m65c13", tag_prefix="GEN_1", display_name="Genset 1", location=ShipCoord(x_pp=4.5, y_cl=0.0, z_bl=1.0), deck_id="lower", system_id=SystemId.GENSET, ) # ---- Buses y cartas ---- bus_main = Bus( id="bus_main", name="Bus principal Modbus RTU", protocol=Protocol.MODBUS_RTU, physical_port="COM3", baud_rate=115200, ) bus_n2k = Bus( id="bus_n2k", name="Backbone NMEA 2000", protocol=Protocol.NMEA2000, physical_port="USB-CAN0", ) cards = [ CardInstance( id="card_001", slot_number=1, bus_id="bus_main", bus_role=BusRole.MODBUS_SLAVE, modbus_address=1, physical_location="Sala máquinas — junto a ME_PORT", location=ShipCoord(x_pp=5.5, y_cl=-0.5, z_bl=1.4), ), CardInstance( id="card_002", slot_number=2, bus_id="bus_main", bus_role=BusRole.MODBUS_SLAVE, modbus_address=2, physical_location="Sala máquinas — junto a ME_STBD", location=ShipCoord(x_pp=5.5, y_cl=0.5, z_bl=1.4), ), CardInstance( id="card_003", slot_number=3, bus_id="bus_main", bus_role=BusRole.MODBUS_SLAVE, modbus_address=3, physical_location="Sala máquinas — junto a Genset 1", location=ShipCoord(x_pp=4.5, y_cl=0.0, z_bl=1.2), ), ] topology = Topology(buses=[bus_main, bus_n2k], cards=cards) # ---- Tags ---- tags: list[Tag] = [] for eq, card_id in [(me_port, "card_001"), (me_stbd, "card_002")]: tags.append( Tag( id=f"{eq.tag_prefix}.OIL_PRESS", equipment_id=eq.id, description=f"Presión de aceite {eq.display_name}", unit_si=UnitSI.BAR, range_normal_min=3.5, range_normal_max=6.5, alarms=[ AlarmConfig( id=f"{eq.tag_prefix}.OIL_PRESS.LOW", threshold=1.5, operator="<", priority=AlarmPriority.EMERGENCY, hysteresis=0.2, delay_seconds=2.0, message="Presión aceite crítica baja.", ) ], protocol=Protocol.MODBUS_RTU, physical_binding=TagBinding( card_id=card_id, channel_type=ChannelType.AI, channel_number=1, signal_type=SignalType.SIG_4_20_MA, scaling=Scaling(raw_min=4.0, raw_max=20.0, eng_min=0.0, eng_max=10.0), filter=FilterType.MOVING_AVG, filter_param=4.0, update_rate_ms=200, ), ) ) tags.append( Tag( id=f"{eq.tag_prefix}.COOLANT_TEMP", equipment_id=eq.id, description=f"Temperatura refrigerante {eq.display_name}", unit_si=UnitSI.DEGREE_CELSIUS, range_normal_min=65.0, range_normal_max=95.0, alarms=[ AlarmConfig( id=f"{eq.tag_prefix}.COOLANT_TEMP.HIGH", threshold=100.0, operator=">", priority=AlarmPriority.EMERGENCY, hysteresis=2.0, delay_seconds=3.0, message="Temperatura refrigerante crítica alta.", ) ], protocol=Protocol.MODBUS_RTU, physical_binding=TagBinding( card_id=card_id, channel_type=ChannelType.AI, channel_number=2, signal_type=SignalType.RTD_PT100, scaling=Scaling(raw_min=0.0, raw_max=4095.0, eng_min=-50.0, eng_max=200.0), ), ) ) tags.append( Tag( id=f"{eq.tag_prefix}.RPM", equipment_id=eq.id, description=f"RPM {eq.display_name}", unit_si=UnitSI.RPM, range_normal_min=0.0, range_normal_max=2600.0, protocol=Protocol.MODBUS_RTU, physical_binding=TagBinding( card_id=card_id, channel_type=ChannelType.RPM, channel_number=1, signal_type=SignalType.PULSE_MAGNETIC_PICKUP, ), ) ) tags.append( Tag( id=f"{eq.tag_prefix}.ESTOP_ACTIVE", equipment_id=eq.id, description="Pulsador E-stop activo", unit_si=UnitSI.BOOL, protocol=Protocol.MODBUS_RTU, physical_binding=TagBinding( card_id=card_id, channel_type=ChannelType.DI, channel_number=1, signal_type=SignalType.DRY_CONTACT, ), ) ) tags.append( Tag( id=f"{eq.tag_prefix}.START_CMD", equipment_id=eq.id, description=f"Arranque {eq.display_name}", unit_si=UnitSI.BOOL, controllable=True, control_mode=ControlMode.MANUAL, authority_required=AuthorityRequired.BRIDGE, protocol=Protocol.MODBUS_RTU, physical_binding=TagBinding( card_id=card_id, channel_type=ChannelType.DO, channel_number=1, signal_type=SignalType.RELAY_NO, ), ) ) # Tags del genset tags.append( Tag( id="GEN_1.VOLTAGE_L1", equipment_id=gen_1.id, description="Tensión L1 genset 1", unit_si=UnitSI.VOLT, range_normal_min=220.0, range_normal_max=240.0, alarms=[ AlarmConfig( id="GEN_1.VOLTAGE_L1.LOW", threshold=200.0, operator="<", priority=AlarmPriority.HIGH, hysteresis=5.0, ), AlarmConfig( id="GEN_1.VOLTAGE_L1.HIGH", threshold=250.0, operator=">", priority=AlarmPriority.HIGH, hysteresis=5.0, ), ], protocol=Protocol.MODBUS_RTU, physical_binding=TagBinding( card_id="card_003", channel_type=ChannelType.AI, channel_number=1, signal_type=SignalType.VOLTAGE_DIVIDER, scaling=Scaling(raw_min=0.0, raw_max=4095.0, eng_min=0.0, eng_max=300.0), ), ) ) # ---- Permissives ---- permissive_start_me_port = PermissiveRule( id="rule_start_me_port", action_id="START_ME_PORT", description="Pre-condiciones para arrancar motor principal babor.", conditions=[ Condition( tag_ref="ME_PORT.OIL_PRESS", operator=">", threshold=0.3, severity="fail", message_on_fail="Presión aceite muy baja para arranque seguro.", ), Condition( tag_ref="ME_PORT.COOLANT_TEMP", operator=">", threshold=5.0, severity="fail", message_on_fail="Refrigerante < 5°C — pre-calentar.", ), Condition( tag_ref="ME_PORT.ESTOP_ACTIVE", operator="is_false", severity="fail", message_on_fail="E-stop activado — desbloquear.", ), ], on_fail_message="No es seguro arrancar ME_PORT.", ) project = Project( id="demo_sprint0_aurora", name="M/Y Aurora — Demo Sprint 0", customer="Cliente demo (no real)", notes="Proyecto sintético para verificar persistencia roundtrip.", vessel=vessel, systems_enabled=[SystemId.MAIN_ENGINE, SystemId.GENSET], equipment=[me_port, me_stbd, gen_1], tags=tags, topology=topology, permissive_rules=[permissive_start_me_port], ) return project def _projects_equal_for_demo(a: Project, b: Project) -> tuple[bool, str]: """Compara dos Project para roundtrip. Ignora updated_at por design.""" a_dump = a.model_dump(mode="json", exclude={"updated_at"}) b_dump = b.model_dump(mode="json", exclude={"updated_at"}) if a_dump == b_dump: return True, "match" # Localizar la primera diferencia diffs: list[str] = [] keys = set(a_dump.keys()) | set(b_dump.keys()) for k in sorted(keys): if a_dump.get(k) != b_dump.get(k): diffs.append(f" - {k}: differs") return False, "\n".join(diffs) or "structures differ" def main(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser( prog="vms-generate-test-project", description=( "Genera un Project demo, lo guarda como .vmsproj y verifica el roundtrip. " "Criterio de aceptación del Sprint 0." ), ) parser.add_argument( "--out", type=Path, default=Path("projects/_demo/test_project.vmsproj"), help="Ruta del .vmsproj a escribir.", ) parser.add_argument("--verbose", action="store_true") args = parser.parse_args(argv) setup_logging(verbose=args.verbose) print("VMS-Sailor — Generador de proyecto demo (roundtrip)") print("=" * 60) project = build_demo_project() stats_in = project.stats() print("Proyecto en memoria:") for k, v in stats_in.items(): print(f" {k:25s}: {v}") # Validación cross-entity (informativa) report = validate_project(project) print() print("Validación cross-entity:") print(" " + report.format().replace("\n", "\n ")) print() final = save_project(project, args.out) print(f"Guardado en: {final}") print(f"Tamaño : {final.stat().st_size:,} bytes") print() print("Releyendo desde disco...") loaded = load_project(final) stats_out = loaded.stats() print("Proyecto releído:") for k, v in stats_out.items(): print(f" {k:25s}: {v}") ok, msg = _projects_equal_for_demo(project, loaded) print() if ok: print("INTEGRIDAD OK — roundtrip perfecto.") return 0 print("INTEGRIDAD FAIL — diferencias encontradas:") print(msg) return 1 if __name__ == "__main__": sys.exit(main())