"""Reconstruye un Project desde un archivo .vmsproj (SQLite).""" from __future__ import annotations import json import sqlite3 from datetime import datetime from pathlib import Path from vmssailor.core.alarm import Alarm # noqa: F401 (re-export consistency) from vmssailor.core.card import Bus, CardInstance, Topology from vmssailor.core.coords import ShipCoord from vmssailor.core.enums import ( AlarmPriority, AuthorityRequired, BusRole, ChannelType, ControlMode, FilterType, Protocol, Quality, SignalType, SystemId, UnitSI, VesselSubtype, VesselType, ) from vmssailor.core.equipment import Equipment from vmssailor.core.permissive import Condition, PermissiveRule from vmssailor.core.persistence.migrations import current_schema_version from vmssailor.core.project import Project from vmssailor.core.tag import AlarmConfig, Scaling, Tag, TagBinding from vmssailor.core.vessel import Bulkhead, Deck, Vessel from vmssailor.version import VMSPROJ_SCHEMA_VERSION class VmsProjError(Exception): """Error al cargar un .vmsproj.""" def load_project(path: str | Path) -> Project: """Reconstruye un `Project` desde un archivo `.vmsproj`. Garantiza: `load_project(save_project(p))` produce un Project equivalente campo por campo. """ p = Path(path) if not p.exists(): raise VmsProjError(f"Archivo .vmsproj no encontrado: {p}") conn = sqlite3.connect(str(p)) conn.row_factory = sqlite3.Row try: ver = current_schema_version(conn) if ver == 0: raise VmsProjError( f"Archivo {p} no tiene tabla schema_version — probablemente no es .vmsproj válido." ) if ver > VMSPROJ_SCHEMA_VERSION: raise VmsProjError( f"Schema version {ver} es más nueva que la soportada por esta versión " f"de vmssailor ({VMSPROJ_SCHEMA_VERSION}). Actualizar el código." ) vessel = _read_vessel(conn) equipment = _read_equipment(conn) topology = _read_topology(conn) tags = _read_tags(conn) rules = _read_permissive_rules(conn) project = _read_project(conn, vessel, equipment, topology, tags, rules) finally: conn.close() return project # --- Readers internos por tabla -------------------------------------------- def _read_project( conn: sqlite3.Connection, vessel: Vessel, equipment: list[Equipment], topology: Topology, tags: list[Tag], rules: list[PermissiveRule], ) -> Project: row = conn.execute( """ SELECT id, name, customer, notes, systems_enabled_json, created_at, updated_at, vmssailor_version FROM project LIMIT 1 """ ).fetchone() if row is None: raise VmsProjError("Tabla 'project' vacía.") systems_raw = json.loads(row["systems_enabled_json"]) systems = [SystemId(s) for s in systems_raw] return Project( id=row["id"], name=row["name"], customer=row["customer"] or "", notes=row["notes"] or "", systems_enabled=systems, vessel=vessel, equipment=equipment, topology=topology, tags=tags, permissive_rules=rules, created_at=datetime.fromisoformat(row["created_at"]), updated_at=datetime.fromisoformat(row["updated_at"]), vmssailor_version=row["vmssailor_version"], ) def _read_vessel(conn: sqlite3.Connection) -> Vessel: row = conn.execute( """ SELECT id, name, type, subtype, length_overall_m, beam_max_m, draft_m, displacement_kg, silhouette_svg, description, data_source FROM vessel LIMIT 1 """ ).fetchone() if row is None: raise VmsProjError("Tabla 'vessel' vacía.") decks_rows = conn.execute( "SELECT id, name, z_bl_bottom, z_bl_top, polygon_xy_json FROM deck " "WHERE vessel_id = ? ORDER BY rowid", (row["id"],), ).fetchall() decks = [ Deck( id=d["id"], name=d["name"], z_bl_bottom=d["z_bl_bottom"], z_bl_top=d["z_bl_top"], polygon_xy=[tuple(p) for p in json.loads(d["polygon_xy_json"])], ) for d in decks_rows ] bulks_rows = conn.execute( "SELECT id, name, x_pp, description FROM bulkhead WHERE vessel_id = ? ORDER BY rowid", (row["id"],), ).fetchall() bulkheads = [ Bulkhead(id=b["id"], name=b["name"], x_pp=b["x_pp"], description=b["description"] or "") for b in bulks_rows ] return Vessel( id=row["id"], name=row["name"], type=VesselType(row["type"]), subtype=VesselSubtype(row["subtype"]), length_overall_m=row["length_overall_m"], beam_max_m=row["beam_max_m"], draft_m=row["draft_m"], displacement_kg=row["displacement_kg"], silhouette_svg=row["silhouette_svg"], description=row["description"] or "", data_source=row["data_source"] or "user_input", decks=decks, bulkheads=bulkheads, ) def _read_equipment(conn: sqlite3.Connection) -> list[Equipment]: rows = conn.execute( """ SELECT id, model_ref, tag_prefix, display_name, x_pp, y_cl, z_bl, deck_id, system_id, description, installed FROM equipment ORDER BY rowid """ ).fetchall() return [ Equipment( id=r["id"], model_ref=r["model_ref"], tag_prefix=r["tag_prefix"], display_name=r["display_name"], location=ShipCoord(x_pp=r["x_pp"], y_cl=r["y_cl"], z_bl=r["z_bl"]), deck_id=r["deck_id"], system_id=SystemId(r["system_id"]), description=r["description"] or "", installed=bool(r["installed"]), ) for r in rows ] def _read_topology(conn: sqlite3.Connection) -> Topology: bus_rows = conn.execute( """ SELECT id, name, protocol, physical_port, baud_rate, parity, stop_bits, termination, description FROM bus ORDER BY rowid """ ).fetchall() buses = [ Bus( id=b["id"], name=b["name"], protocol=Protocol(b["protocol"]), physical_port=b["physical_port"], baud_rate=b["baud_rate"], parity=b["parity"], stop_bits=b["stop_bits"], termination=bool(b["termination"]), description=b["description"] or "", ) for b in bus_rows ] card_rows = conn.execute( """ SELECT id, serial_number, slot_number, bus_id, bus_role, modbus_address, physical_location, x_pp, y_cl, z_bl, firmware_version FROM card_instance ORDER BY rowid """ ).fetchall() cards: list[CardInstance] = [] for r in card_rows: loc = None if r["x_pp"] is not None and r["y_cl"] is not None and r["z_bl"] is not None: loc = ShipCoord(x_pp=r["x_pp"], y_cl=r["y_cl"], z_bl=r["z_bl"]) cards.append( CardInstance( id=r["id"], serial_number=r["serial_number"] or "", slot_number=r["slot_number"], bus_id=r["bus_id"], bus_role=BusRole(r["bus_role"]), modbus_address=r["modbus_address"], physical_location=r["physical_location"] or "", location=loc, firmware_version=r["firmware_version"] or "0.0.0", ) ) return Topology(buses=buses, cards=cards) def _read_tags(conn: sqlite3.Connection) -> list[Tag]: rows = conn.execute( """ SELECT id, equipment_id, description, unit_si, range_normal_min, range_normal_max, quality_default, controllable, control_mode, authority_required, protocol, address, historize, historize_period_s, binding_card_id, binding_channel_type, binding_channel_number, binding_signal_type, binding_filter, binding_filter_param, binding_update_rate_ms, binding_scaling_raw_min, binding_scaling_raw_max, binding_scaling_eng_min, binding_scaling_eng_max FROM tag ORDER BY rowid """ ).fetchall() tags: list[Tag] = [] for r in rows: binding = None if r["binding_card_id"]: scaling = None if r["binding_scaling_raw_min"] is not None: scaling = Scaling( raw_min=r["binding_scaling_raw_min"], raw_max=r["binding_scaling_raw_max"], eng_min=r["binding_scaling_eng_min"], eng_max=r["binding_scaling_eng_max"], ) binding = TagBinding( card_id=r["binding_card_id"], channel_type=ChannelType(r["binding_channel_type"]), channel_number=r["binding_channel_number"], signal_type=SignalType(r["binding_signal_type"]), filter=FilterType(r["binding_filter"] or "none"), filter_param=r["binding_filter_param"], update_rate_ms=r["binding_update_rate_ms"], scaling=scaling, ) alarm_rows = conn.execute( """ SELECT id, threshold, operator, priority, hysteresis, delay_seconds, message, escalation_minutes FROM alarm_config WHERE tag_id = ? ORDER BY rowid """, (r["id"],), ).fetchall() alarms = [ AlarmConfig( id=a["id"], threshold=a["threshold"], operator=a["operator"], priority=AlarmPriority(a["priority"]), hysteresis=a["hysteresis"], delay_seconds=a["delay_seconds"], message=a["message"] or "", escalation_minutes=a["escalation_minutes"], ) for a in alarm_rows ] tags.append( Tag( id=r["id"], equipment_id=r["equipment_id"], description=r["description"] or "", unit_si=UnitSI(r["unit_si"]), range_normal_min=r["range_normal_min"], range_normal_max=r["range_normal_max"], quality_default=Quality(r["quality_default"]), alarms=alarms, controllable=bool(r["controllable"]), control_mode=ControlMode(r["control_mode"]), authority_required=AuthorityRequired(r["authority_required"]), protocol=Protocol(r["protocol"]), address=r["address"], physical_binding=binding, historize=bool(r["historize"]), historize_period_s=r["historize_period_s"], ) ) return tags def _read_permissive_rules(conn: sqlite3.Connection) -> list[PermissiveRule]: rows = conn.execute( "SELECT id, action_id, description, on_fail_message FROM permissive_rule ORDER BY rowid" ).fetchall() out: list[PermissiveRule] = [] for r in rows: cond_rows = conn.execute( """ SELECT seq, tag_ref, operator, threshold, threshold_low, threshold_high, severity, message_on_fail FROM permissive_condition WHERE rule_id = ? ORDER BY seq """, (r["id"],), ).fetchall() conds = [ Condition( tag_ref=c["tag_ref"], operator=c["operator"], threshold=c["threshold"], threshold_low=c["threshold_low"], threshold_high=c["threshold_high"], severity=c["severity"], message_on_fail=c["message_on_fail"] or "", ) for c in cond_rows ] out.append( PermissiveRule( id=r["id"], action_id=r["action_id"], description=r["description"] or "", on_fail_message=r["on_fail_message"] or "", conditions=conds, ) ) return out