Files
alro65 deb04c9315 sprint-0: fundaciones VMS-Sailor
Sprint 0 completo del producto VMS-Sailor (Vessel Management System
integrado para buques 30-40m). Brief de referencia en
VMS_Sailor_v2_Parte_*.md (intacto).

Core (vmssailor.core, 95.17% coverage, 99 tests verde):
- ShipCoord: sistema naval x_pp/y_cl/z_bl frozen
- Vessel, Deck, Bulkhead
- Equipment, EquipmentModel, Sensor, EquipmentSpec
- Tag, AlarmConfig, TagBinding, Scaling
- CardInstance, Bus, Topology con validacion 21 puntos I/O AR-NMEA-IO-v1.0
- Alarm, PermissiveRule, Condition
- Project agregado raiz con validacion cross-entity
- Persistencia portable .vmsproj (SQLite) con roundtrip verificable

Biblioteca curada seed (vmssailor.library):
- systems_catalog.json completo (catalogo maestro Parte 1 sec 7)
- 2 vessels: Sunseeker 76, Ferretti 850
- 2 motores: MTU 12V 2000 M96, Volvo D13-900
- 1 genset: Northern Lights M65C13
- yacht_motor_planeo.yaml (reglas heuristicas)
- TODO marcado data_source=seed_estimate - requiere validacion datasheets

Tools:
- vms-validate-library: CLI valida biblioteca completa
- vms-generate-test-project: CLI demo + verificacion roundtrip persistencia

Design System + 8 mockups HTML estaticos:
- docs/design_system.md (paleta Deep Ocean, gradientes, typography, motion)
- docs/brand/ (logo + variantes SVG)
- docs/mockups/splash, studio_main, runtime_overview,
  runtime_mimic_fuel (P&ID animado), runtime_alarms, runtime_trim (panel
  estrella con horizonte artificial), mobile_overview, mobile_trim
- docs/mockups/index.html (galeria)

Firmware (Sprint 12+ implementacion):
- firmware/ar_nmea_io_v1/src/config/pinout.h con macros GPIO

Decisiones autonomas documentadas en docs/decisions_sprint0.md.

Stack: Python 3.11 + uv + Pydantic v2 + SQLite stdlib + hatchling +
pytest 9 + ruff + mypy. Sin PySide6, FastAPI, Flutter ni firmware
funcional (entran en sprints siguientes).

Criterio de aceptacion Sprint 0: cumplido.
- uv sync: OK
- pytest: 99/99 verde
- cov vmssailor.core: 95.17% (objetivo >=80%)
- ruff: clean
- vms-validate-library: OK
- vms-generate-test-project: INTEGRIDAD OK

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-17 07:26:06 -04:00

366 lines
12 KiB
Python

"""Reconstruye un Project desde un archivo .vmsproj (SQLite)."""
from __future__ import annotations
import json
import sqlite3
from datetime import datetime
from pathlib import Path
from vmssailor.core.alarm import Alarm # noqa: F401 (re-export consistency)
from vmssailor.core.card import Bus, CardInstance, Topology
from vmssailor.core.coords import ShipCoord
from vmssailor.core.enums import (
AlarmPriority,
AuthorityRequired,
BusRole,
ChannelType,
ControlMode,
FilterType,
Protocol,
Quality,
SignalType,
SystemId,
UnitSI,
VesselSubtype,
VesselType,
)
from vmssailor.core.equipment import Equipment
from vmssailor.core.permissive import Condition, PermissiveRule
from vmssailor.core.persistence.migrations import current_schema_version
from vmssailor.core.project import Project
from vmssailor.core.tag import AlarmConfig, Scaling, Tag, TagBinding
from vmssailor.core.vessel import Bulkhead, Deck, Vessel
from vmssailor.version import VMSPROJ_SCHEMA_VERSION
class VmsProjError(Exception):
    """Raised when a ``.vmsproj`` file cannot be loaded."""


def load_project(path: str | Path) -> Project:
    """Rebuild a ``Project`` from a ``.vmsproj`` (SQLite) file.

    This is the read side of the persistence layer:
    ``load_project(save_project(p))`` is meant to yield a Project equivalent
    to ``p`` field by field.

    Args:
        path: Location of the ``.vmsproj`` file.

    Returns:
        The reconstructed project.

    Raises:
        VmsProjError: If the path does not exist, if SQLite cannot open or
            query the file (not a database, a directory, missing tables, ...),
            if ``current_schema_version`` reports 0, or if the stored schema
            version is newer than ``VMSPROJ_SCHEMA_VERSION``. SQLite failures
            are chained as ``__cause__``.

    Exceptions that do not come from SQLite (for example JSON decoding or
    model construction errors raised by the readers) are not translated.
    """
    p = Path(path)
    if not p.exists():
        raise VmsProjError(f"Archivo .vmsproj no encontrado: {p}")

    conn: sqlite3.Connection | None = None
    try:
        # connect() itself can fail (e.g. when the path is a directory), so it
        # must live inside the handler; a regular file that is not a SQLite
        # database is typically rejected only once the first query runs.
        conn = sqlite3.connect(str(p))
        conn.row_factory = sqlite3.Row

        ver = current_schema_version(conn)
        if ver == 0:
            raise VmsProjError(
                f"Archivo {p} no tiene tabla schema_version — probablemente no es .vmsproj válido."
            )
        if ver > VMSPROJ_SCHEMA_VERSION:
            raise VmsProjError(
                f"Schema version {ver} es más nueva que la soportada por esta versión "
                f"de vmssailor ({VMSPROJ_SCHEMA_VERSION}). Actualizar el código."
            )

        # The parts are read first and then handed to the root aggregate.
        vessel = _read_vessel(conn)
        equipment = _read_equipment(conn)
        topology = _read_topology(conn)
        tags = _read_tags(conn)
        rules = _read_permissive_rules(conn)
        return _read_project(conn, vessel, equipment, topology, tags, rules)
    except sqlite3.Error as err:
        # Keep the module's error contract: loader failures surface as
        # VmsProjError, with the SQLite error preserved as the cause.
        raise VmsProjError(f"No se pudo leer {p} como .vmsproj (SQLite): {err}") from err
    finally:
        if conn is not None:
            conn.close()
# --- Internal per-table readers ---------------------------------------------
def _read_project(
    conn: sqlite3.Connection,
    vessel: Vessel,
    equipment: list[Equipment],
    topology: Topology,
    tags: list[Tag],
    rules: list[PermissiveRule],
) -> Project:
    """Assemble the root ``Project`` from the ``project`` table.

    Only one row is read (``LIMIT 1``). The already-loaded ``vessel``,
    ``equipment``, ``topology``, ``tags`` and ``rules`` are attached as given.
    NULL or empty ``customer`` / ``notes`` values become ``""``.

    Raises:
        VmsProjError: If the ``project`` table has no rows.
    """
    header = conn.execute(
        """
        SELECT id, name, customer, notes, systems_enabled_json,
               created_at, updated_at, vmssailor_version
        FROM project
        LIMIT 1
        """
    ).fetchone()
    if header is None:
        raise VmsProjError("Tabla 'project' vacía.")

    # The enabled systems are stored JSON-encoded; each decoded item becomes a SystemId.
    enabled = list(map(SystemId, json.loads(header["systems_enabled_json"])))
    # Timestamps are parsed back with datetime.fromisoformat.
    created = datetime.fromisoformat(header["created_at"])
    updated = datetime.fromisoformat(header["updated_at"])

    return Project(
        id=header["id"],
        name=header["name"],
        customer=header["customer"] or "",
        notes=header["notes"] or "",
        systems_enabled=enabled,
        vessel=vessel,
        equipment=equipment,
        topology=topology,
        tags=tags,
        permissive_rules=rules,
        created_at=created,
        updated_at=updated,
        vmssailor_version=header["vmssailor_version"],
    )
def _read_vessel(conn: sqlite3.Connection) -> Vessel:
    """Load the vessel together with its decks and bulkheads.

    Only the first ``vessel`` row is read (``LIMIT 1``). Decks and bulkheads
    are selected by that row's id and kept in ``rowid`` order. Each deck
    polygon is decoded from JSON and its points are converted to tuples.
    A NULL or empty ``description`` becomes ``""`` and a NULL or empty
    ``data_source`` becomes ``"user_input"``.

    Raises:
        VmsProjError: If the ``vessel`` table has no rows.
    """
    hull = conn.execute(
        """
        SELECT id, name, type, subtype, length_overall_m, beam_max_m, draft_m,
               displacement_kg, silhouette_svg, description, data_source
        FROM vessel
        LIMIT 1
        """
    ).fetchone()
    if hull is None:
        raise VmsProjError("Tabla 'vessel' vacía.")

    owner = (hull["id"],)

    deck_records = conn.execute(
        "SELECT id, name, z_bl_bottom, z_bl_top, polygon_xy_json FROM deck "
        "WHERE vessel_id = ? ORDER BY rowid",
        owner,
    ).fetchall()
    decks: list[Deck] = []
    for rec in deck_records:
        outline = [tuple(vertex) for vertex in json.loads(rec["polygon_xy_json"])]
        decks.append(
            Deck(
                id=rec["id"],
                name=rec["name"],
                z_bl_bottom=rec["z_bl_bottom"],
                z_bl_top=rec["z_bl_top"],
                polygon_xy=outline,
            )
        )

    bulkhead_records = conn.execute(
        "SELECT id, name, x_pp, description FROM bulkhead WHERE vessel_id = ? ORDER BY rowid",
        owner,
    ).fetchall()
    bulkheads: list[Bulkhead] = []
    for rec in bulkhead_records:
        bulkheads.append(
            Bulkhead(
                id=rec["id"],
                name=rec["name"],
                x_pp=rec["x_pp"],
                description=rec["description"] or "",
            )
        )

    return Vessel(
        id=hull["id"],
        name=hull["name"],
        type=VesselType(hull["type"]),
        subtype=VesselSubtype(hull["subtype"]),
        length_overall_m=hull["length_overall_m"],
        beam_max_m=hull["beam_max_m"],
        draft_m=hull["draft_m"],
        displacement_kg=hull["displacement_kg"],
        silhouette_svg=hull["silhouette_svg"],
        description=hull["description"] or "",
        data_source=hull["data_source"] or "user_input",
        decks=decks,
        bulkheads=bulkheads,
    )
def _read_equipment(conn: sqlite3.Connection) -> list[Equipment]:
rows = conn.execute(
"""
SELECT id, model_ref, tag_prefix, display_name,
x_pp, y_cl, z_bl, deck_id, system_id, description, installed
FROM equipment
ORDER BY rowid
"""
).fetchall()
return [
Equipment(
id=r["id"],
model_ref=r["model_ref"],
tag_prefix=r["tag_prefix"],
display_name=r["display_name"],
location=ShipCoord(x_pp=r["x_pp"], y_cl=r["y_cl"], z_bl=r["z_bl"]),
deck_id=r["deck_id"],
system_id=SystemId(r["system_id"]),
description=r["description"] or "",
installed=bool(r["installed"]),
)
for r in rows
]
def _read_topology(conn: sqlite3.Connection) -> Topology:
    """Load all buses and card instances into a ``Topology``.

    Both tables are read in ``rowid`` order. A card receives a ``ShipCoord``
    only when all three of its coordinate columns are non-NULL; otherwise its
    location is ``None``. NULL or empty ``serial_number`` and
    ``physical_location`` become ``""``, and a NULL or empty
    ``firmware_version`` becomes ``"0.0.0"``.
    """
    bus_records = conn.execute(
        """
        SELECT id, name, protocol, physical_port, baud_rate, parity, stop_bits,
               termination, description
        FROM bus ORDER BY rowid
        """
    ).fetchall()
    buses: list[Bus] = []
    for rec in bus_records:
        buses.append(
            Bus(
                id=rec["id"],
                name=rec["name"],
                protocol=Protocol(rec["protocol"]),
                physical_port=rec["physical_port"],
                baud_rate=rec["baud_rate"],
                parity=rec["parity"],
                stop_bits=rec["stop_bits"],
                termination=bool(rec["termination"]),
                description=rec["description"] or "",
            )
        )

    card_records = conn.execute(
        """
        SELECT id, serial_number, slot_number, bus_id, bus_role, modbus_address,
               physical_location, x_pp, y_cl, z_bl, firmware_version
        FROM card_instance
        ORDER BY rowid
        """
    ).fetchall()
    cards: list[CardInstance] = []
    for rec in card_records:
        x_pp, y_cl, z_bl = rec["x_pp"], rec["y_cl"], rec["z_bl"]
        # A partially filled coordinate is treated the same as no coordinate.
        if x_pp is None or y_cl is None or z_bl is None:
            position = None
        else:
            position = ShipCoord(x_pp=x_pp, y_cl=y_cl, z_bl=z_bl)
        cards.append(
            CardInstance(
                id=rec["id"],
                serial_number=rec["serial_number"] or "",
                slot_number=rec["slot_number"],
                bus_id=rec["bus_id"],
                bus_role=BusRole(rec["bus_role"]),
                modbus_address=rec["modbus_address"],
                physical_location=rec["physical_location"] or "",
                location=position,
                firmware_version=rec["firmware_version"] or "0.0.0",
            )
        )

    return Topology(buses=buses, cards=cards)
def _read_tags(conn: sqlite3.Connection) -> list[Tag]:
rows = conn.execute(
"""
SELECT
id, equipment_id, description, unit_si,
range_normal_min, range_normal_max, quality_default,
controllable, control_mode, authority_required,
protocol, address, historize, historize_period_s,
binding_card_id, binding_channel_type, binding_channel_number,
binding_signal_type, binding_filter, binding_filter_param,
binding_update_rate_ms,
binding_scaling_raw_min, binding_scaling_raw_max,
binding_scaling_eng_min, binding_scaling_eng_max
FROM tag ORDER BY rowid
"""
).fetchall()
tags: list[Tag] = []
for r in rows:
binding = None
if r["binding_card_id"]:
scaling = None
if r["binding_scaling_raw_min"] is not None:
scaling = Scaling(
raw_min=r["binding_scaling_raw_min"],
raw_max=r["binding_scaling_raw_max"],
eng_min=r["binding_scaling_eng_min"],
eng_max=r["binding_scaling_eng_max"],
)
binding = TagBinding(
card_id=r["binding_card_id"],
channel_type=ChannelType(r["binding_channel_type"]),
channel_number=r["binding_channel_number"],
signal_type=SignalType(r["binding_signal_type"]),
filter=FilterType(r["binding_filter"] or "none"),
filter_param=r["binding_filter_param"],
update_rate_ms=r["binding_update_rate_ms"],
scaling=scaling,
)
alarm_rows = conn.execute(
"""
SELECT id, threshold, operator, priority, hysteresis,
delay_seconds, message, escalation_minutes
FROM alarm_config WHERE tag_id = ? ORDER BY rowid
""",
(r["id"],),
).fetchall()
alarms = [
AlarmConfig(
id=a["id"],
threshold=a["threshold"],
operator=a["operator"],
priority=AlarmPriority(a["priority"]),
hysteresis=a["hysteresis"],
delay_seconds=a["delay_seconds"],
message=a["message"] or "",
escalation_minutes=a["escalation_minutes"],
)
for a in alarm_rows
]
tags.append(
Tag(
id=r["id"],
equipment_id=r["equipment_id"],
description=r["description"] or "",
unit_si=UnitSI(r["unit_si"]),
range_normal_min=r["range_normal_min"],
range_normal_max=r["range_normal_max"],
quality_default=Quality(r["quality_default"]),
alarms=alarms,
controllable=bool(r["controllable"]),
control_mode=ControlMode(r["control_mode"]),
authority_required=AuthorityRequired(r["authority_required"]),
protocol=Protocol(r["protocol"]),
address=r["address"],
physical_binding=binding,
historize=bool(r["historize"]),
historize_period_s=r["historize_period_s"],
)
)
return tags
def _read_permissive_rules(conn: sqlite3.Connection) -> list[PermissiveRule]:
rows = conn.execute(
"SELECT id, action_id, description, on_fail_message FROM permissive_rule ORDER BY rowid"
).fetchall()
out: list[PermissiveRule] = []
for r in rows:
cond_rows = conn.execute(
"""
SELECT seq, tag_ref, operator, threshold, threshold_low,
threshold_high, severity, message_on_fail
FROM permissive_condition WHERE rule_id = ? ORDER BY seq
""",
(r["id"],),
).fetchall()
conds = [
Condition(
tag_ref=c["tag_ref"],
operator=c["operator"],
threshold=c["threshold"],
threshold_low=c["threshold_low"],
threshold_high=c["threshold_high"],
severity=c["severity"],
message_on_fail=c["message_on_fail"] or "",
)
for c in cond_rows
]
out.append(
PermissiveRule(
id=r["id"],
action_id=r["action_id"],
description=r["description"] or "",
on_fail_message=r["on_fail_message"] or "",
conditions=conds,
)
)
return out