alro65 6ad76a89fa sprint-2: rule engine + auto-assigner + equipment editor + library
Wizard steps 5-7 are now functional (steps 1-4 were already working in Sprint 1).

vmssailor/studio/designer/rule_engine.py
- RuleContext, RuleEngine, EquipmentProposal
- Reads library/rules/*.yaml and applies heuristic rules
- Filters by vessel_type, vessel_subtype, and length_overall_m range
- Selects a candidate according to 'when' conditions (LOA min/max)
- Generates tag_prefix with {side}/{idx} substitution
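
The filtering, candidate selection, and tag_prefix substitution listed above can be sketched roughly as follows. This is a minimal sketch under assumptions: the rule layout (`vessel_types`, `loa_min`, `loa_max`, `sides`, `candidates`, `when`), the `Ctx` class, and `propose()` are hypothetical names chosen for illustration, not the actual schema of library/rules/*.yaml or the real RuleEngine API. Rules are given as plain dicts instead of being loaded from YAML, and the LOA windows attached to each engine are made up.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Ctx:
    """Hypothetical stand-in for RuleContext."""
    vessel_type: str
    length_overall_m: float


def in_range(value: float, lo: float | None, hi: float | None) -> bool:
    """True when value lies in [lo, hi]; a None bound is treated as open."""
    return (lo is None or value >= lo) and (hi is None or value <= hi)


def propose(rule: dict, ctx: Ctx) -> list[dict]:
    """Apply one rule: filter by vessel, pick a candidate by LOA, expand tag prefixes."""
    if ctx.vessel_type not in rule["vessel_types"]:
        return []
    if not in_range(ctx.length_overall_m, rule.get("loa_min"), rule.get("loa_max")):
        return []
    # The first candidate whose 'when' window contains the vessel's LOA wins.
    model = next(
        (
            c["model_ref"]
            for c in rule["candidates"]
            if in_range(ctx.length_overall_m, c["when"].get("loa_min"), c["when"].get("loa_max"))
        ),
        None,
    )
    if model is None:
        return []
    # One proposal per side; {side} and {idx} are substituted into the template.
    return [
        {"model_ref": model, "tag_prefix": rule["tag_prefix"].format(side=side, idx=idx)}
        for idx, side in enumerate(rule["sides"], start=1)
    ]


# Illustrative rule only; the LOA thresholds are invented for this example.
RULE = {
    "vessel_types": ["yacht"],
    "loa_min": 20.0,
    "loa_max": 40.0,
    "tag_prefix": "ME_{side}{idx}",
    "sides": ["PS", "SB"],
    "candidates": [
        {"model_ref": "yanmar_8lv_370", "when": {"loa_max": 25.0}},
        {"model_ref": "cat_c32_acert", "when": {"loa_min": 25.0}},
    ],
}

proposals = propose(RULE, Ctx(vessel_type="yacht", length_overall_m=32.0))
```

With a 32 m yacht, the second candidate matches and the sketch yields two proposals with prefixes `ME_PS1` and `ME_SB2`.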

vmssailor/studio/designer/port_auto_assigner.py
- auto_assign() greedy: 1 Modbus RTU bus + dedicated cards for engines/gensets
- Shared auxiliary card for the remaining equipment
- Maps SignalType -> ChannelType (AI/DI/DO/RPM)
- Generates TagBindings with appropriate scaling per signal type
- Respects the 10/5/4/1 capacities (DO/DI/AI/RPM) of AR-NMEA-IO-v1.0
- AssignmentReport with cards + tags + warnings
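
The greedy placement with a fallback to the shared auxiliary card might look roughly like this. It is a minimal sketch with simplified stand-ins: `Channel`, `Card`, and `assign()` are hypothetical names, not the module's `ChannelType`, `CardSlot`, or `auto_assign()`. Only the per-card capacities (10 DO, 5 DI, 4 AI, 1 RPM) are taken from the module.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum


class Channel(Enum):
    AI = "AI"
    DI = "DI"
    DO = "DO"
    RPM = "RPM"


# Per-card channel capacities quoted for AR-NMEA-IO-v1.0.
CAPACITY = {Channel.DO: 10, Channel.DI: 5, Channel.AI: 4, Channel.RPM: 1}


@dataclass
class Card:
    """Hypothetical card with a usage counter per channel type."""
    card_id: str
    used: dict[Channel, int] = field(default_factory=lambda: dict.fromkeys(Channel, 0))

    def has_space(self, channel: Channel) -> bool:
        return self.used[channel] < CAPACITY[channel]

    def take(self, channel: Channel) -> int:
        """Reserve the next free channel and return its 1-based number."""
        self.used[channel] += 1
        return self.used[channel]


def assign(channels, dedicated, aux):
    """Place each request on the dedicated card, then on aux, else record a warning."""
    placed, warnings = [], []
    for channel in channels:
        card = next((c for c in (dedicated, aux) if c.has_space(channel)), None)
        if card is None:
            warnings.append(f"no {channel.value} capacity left")
            continue
        placed.append((card.card_id, channel.value, card.take(channel)))
    return placed, warnings


# Six analog sensors: four fit on the dedicated card, two spill to the auxiliary one.
placed, warnings = assign([Channel.AI] * 6, Card("card_001"), Card("card_002"))
```

Because each card has a single RPM input, a third tachometer request against the same pair of cards would end up in `warnings` rather than in `placed`.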

vmssailor/studio/wizard/step_05_equipment.py
- Table with the rule engine's proposals
- Accept/reject checkboxes + inline column editing
- 'Regenerar' (regenerate) button to re-apply the rules

vmssailor/studio/wizard/step_06_refinement.py
- Summary view of accepted equipment

vmssailor/studio/wizard/step_07_topology.py
- Calls auto_assign on the materialized equipment
- Shows a table of cards with per-channel usage (DO/DI/AI/RPM)
- Lists capacity warnings
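
One way to derive a per-channel usage table like the one described for this step is sketched below. It assumes the assignments are available as `(card_id, channel)` pairs. `usage_rows()` and that tuple layout are hypothetical and are not how step_07_topology.py is implemented.

```python
from collections import Counter

# Per-card capacities quoted for AR-NMEA-IO-v1.0, in DO/DI/AI/RPM order.
CAPACITY = {"DO": 10, "DI": 5, "AI": 4, "RPM": 1}


def usage_rows(bindings):
    """Return one row of 'used/capacity' cells per card, sorted by card id."""
    counts = Counter(bindings)  # keys are (card_id, channel) pairs
    card_ids = sorted({card_id for card_id, _ in bindings})
    return [
        [card_id] + [f"{counts[(card_id, ch)]}/{cap}" for ch, cap in CAPACITY.items()]
        for card_id in card_ids
    ]


bindings = [("card_001", "AI")] * 4 + [("card_001", "RPM"), ("card_002", "DI")]
rows = usage_rows(bindings)
for row in rows:
    print("  ".join(row))
```

A cell such as `4/4` marks a channel type that is full on that card, which is where a capacity warning would be expected.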

vmssailor/studio/editors/equipment_editor.py
- CRUD for the active project's Equipment
- Inline-editable table (tag_prefix, name, model_ref, system_id, coords, deck)
- Modal dialog for adding equipment
- projectMutated signal to refresh canvas + sidebar

vmssailor/studio/main_window.py
- Updated layout: vertical splitter in the right panel
  (canvas on top + equipment editor below)
- _on_project_mutated() propagates the change to the sidebar and canvas

Expanded library (Sprint 2 brief: 5-7 yachts, 10+ engines, gensets, pumps):
- vessels: + azimut_grande_32m, princess_y85, trawler_32m_offshore, patrol_coastal_30m (total: 6)
- engines: + cat_c32_acert, mtu_16v_2000_m96, yanmar_8lv_370 (total: 5)
- gensets: + kohler_28efkozd, onan_qd13500 (total: 3)
- pumps: + jabsco_36800, grundfos_cm10 (NEW pumps category)

Tests (tests/studio/test_designer.py, 10 new, total 120/120):
- Rule engine: load default, propose engines, candidate picking by LOA
- auto_assign builds topology compatible with Project (Pydantic validation)
- Equipment editor smoke

VesselWizard.build_project() now materializes equipment + topology + tags
from the proposals and the automatic assignment in step 7.

Sprint 2 criteria:
- uv run vms-studio creates a complete project from the wizard with equipment + tags + topology
- vms-validate-library: OK 6 vessels, 10 equipment, 1 rules
- 120/120 pytest green, ruff clean

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-17 09:50:33 -04:00

257 lines
9.1 KiB
Python

"""Automatic assigner of physical AR-NMEA-IO ports.

Given a Project with equipment (each item with its default_sensors according to
its EquipmentModel), generates:
- The cards required (estimate per system → count)
- TagBindings that map each sensor to a specific channel
- Detected conflicts (capacity exceeded)

Sprint 2: greedy implementation based on proximity.
"""

from __future__ import annotations

from dataclasses import dataclass, field

from vmssailor.core.card import Bus, CardInstance, Topology
from vmssailor.core.coords import ShipCoord
from vmssailor.core.enums import (
    BusRole,
    ChannelType,
    FilterType,
    Protocol,
    SignalType,
)
from vmssailor.core.equipment import Equipment, EquipmentModel, Sensor
from vmssailor.core.tag import Scaling, Tag, TagBinding

# SignalType -> ChannelType mapping (used for auto-assignment)
_SIGNAL_TO_CHANNEL: dict[SignalType, ChannelType] = {
    # Analog → AI
    SignalType.SIG_4_20_MA: ChannelType.AI,
    SignalType.SIG_0_10_V: ChannelType.AI,
    SignalType.SIG_0_5_V: ChannelType.AI,
    SignalType.RTD_PT100: ChannelType.AI,
    SignalType.RTD_PT1000: ChannelType.AI,
    SignalType.THERMOCOUPLE_K: ChannelType.AI,
    SignalType.THERMOCOUPLE_J: ChannelType.AI,
    SignalType.RESISTIVE_SENDER: ChannelType.AI,
    SignalType.VOLTAGE_DIVIDER: ChannelType.AI,
    # Pulse → RPM
    SignalType.PULSE_MAGNETIC_PICKUP: ChannelType.RPM,
    SignalType.PULSE_INDUCTIVE: ChannelType.RPM,
    SignalType.PULSE_TACHO: ChannelType.RPM,
    # Digital: relays are DO, contacts are DI
    SignalType.RELAY_NO: ChannelType.DO,
    SignalType.RELAY_NC: ChannelType.DO,
    SignalType.DRY_CONTACT: ChannelType.DI,
    SignalType.CONTACT_24VDC: ChannelType.DI,
}

# Fixed channel capacities of the AR-NMEA-IO-v1.0 hardware (per card)
_CAPACITY: dict[ChannelType, int] = {
    ChannelType.AI: 4,
    ChannelType.DI: 5,
    ChannelType.DO: 10,
    ChannelType.RPM: 1,
}


# Default scaling per signal type
def _default_scaling(signal: SignalType, sensor: Sensor) -> Scaling | None:
    # Fall back to a 0-100 engineering range when the sensor declares none.
    rng_min = sensor.range_normal_min if sensor.range_normal_min is not None else 0.0
    rng_max = sensor.range_normal_max if sensor.range_normal_max is not None else 100.0
    if signal == SignalType.SIG_4_20_MA:
        return Scaling(raw_min=4.0, raw_max=20.0, eng_min=rng_min, eng_max=rng_max)
    if signal == SignalType.SIG_0_10_V:
        return Scaling(raw_min=0.0, raw_max=10.0, eng_min=rng_min, eng_max=rng_max)
    if signal == SignalType.SIG_0_5_V:
        return Scaling(raw_min=0.0, raw_max=5.0, eng_min=rng_min, eng_max=rng_max)
    if signal == SignalType.RTD_PT100:
        return Scaling(raw_min=0.0, raw_max=4095.0, eng_min=-50.0, eng_max=200.0)
    if signal == SignalType.VOLTAGE_DIVIDER:
        # Reasonable range for batteries + alternators
        return Scaling(raw_min=0.0, raw_max=4095.0, eng_min=0.0, eng_max=32.0)
    # All other signal types get no default scaling.
    return None


@dataclass(slots=True)
class CardSlot:
    """Slot used internally by the assigner. Keeps a counter per channel type."""

    card: CardInstance
    used: dict[ChannelType, int] = field(default_factory=lambda: dict.fromkeys(ChannelType, 0))

    def has_space(self, channel: ChannelType) -> bool:
        return self.used[channel] < _CAPACITY[channel]

    def take(self, channel: ChannelType) -> int:
        """Assign the next free channel and return its number (1-based).

        Does not check capacity; callers are expected to call has_space() first.
        """
        self.used[channel] += 1
        return self.used[channel]


@dataclass(slots=True)
class AssignmentReport:
    cards: list[CardInstance] = field(default_factory=list)
    tags: list[Tag] = field(default_factory=list)
    bus: Bus | None = None
    n_skipped: int = 0
    warnings: list[str] = field(default_factory=list)

    def topology(self) -> Topology:
        return Topology(buses=[self.bus] if self.bus else [], cards=self.cards)


def auto_assign(
    equipment_list: list[Equipment],
    model_lookup: dict[str, EquipmentModel],
) -> AssignmentReport:
    """Greedy assignment of tags to distributed cards.

    Strategy:
    - 1 main Modbus RTU bus
    - 1 card per equipment item in the main_engine / genset systems (next to the equipment)
    - 1 shared card for auxiliary equipment
    """
    # Enums needed only when building Tag objects.
    from vmssailor.core.enums import (
        AuthorityRequired,
        ControlMode,
        UnitSI,
    )

    report = AssignmentReport()
    if not equipment_list:
        return report

    bus = Bus(
        id="bus_main",
        name="Bus principal Modbus RTU",
        protocol=Protocol.MODBUS_RTU,
        physical_port="COM3",
        baud_rate=115200,
    )
    report.bus = bus
    cards: list[CardSlot] = []

    # Dedicated cards for main engines + gensets
    next_slot = 1
    next_addr = 1
    card_by_eq: dict[str, CardSlot] = {}
    for eq in equipment_list:
        if eq.system_id.value in ("main_engine", "genset"):
            card = CardInstance(
                id=f"card_{next_slot:03d}",
                slot_number=next_slot,
                bus_id=bus.id,
                bus_role=BusRole.MODBUS_SLAVE,
                modbus_address=next_addr,
                physical_location=f"Junto a {eq.display_name}",
                location=ShipCoord(
                    x_pp=eq.location.x_pp,
                    y_cl=eq.location.y_cl,
                    z_bl=eq.location.z_bl + 0.2,
                ),
                firmware_version="1.0.0",
            )
            slot = CardSlot(card=card)
            cards.append(slot)
            card_by_eq[eq.id] = slot
            next_slot += 1
            next_addr += 1

    # Shared auxiliary card for everything else
    aux_slot: CardSlot | None = None
    if any(
        eq.system_id.value not in ("main_engine", "genset") for eq in equipment_list
    ):
        aux_card = CardInstance(
            id=f"card_{next_slot:03d}",
            slot_number=next_slot,
            bus_id=bus.id,
            bus_role=BusRole.MODBUS_SLAVE,
            modbus_address=next_addr,
            physical_location="Sala máquinas — panel auxiliar",
            firmware_version="1.0.0",
        )
        aux_slot = CardSlot(card=aux_card)
        cards.append(aux_slot)

    # Build tags
    for eq in equipment_list:
        model = model_lookup.get(eq.model_ref)
        if model is None:
            report.warnings.append(
                f"Equipo '{eq.id}' referencia model_ref='{eq.model_ref}' "
                "que no existe en biblioteca cargada."
            )
            continue
        target_slot = card_by_eq.get(eq.id) or aux_slot
        if target_slot is None:
            continue
        for sensor in model.default_sensors:
            if sensor.default_signal_type is None:
                # Without a signal type, no physical binding is possible
                report.n_skipped += 1
                continue
            signal = sensor.default_signal_type
            channel = _SIGNAL_TO_CHANNEL.get(signal)
            if channel is None:
                report.n_skipped += 1
                continue
            slot = target_slot
            if not slot.has_space(channel):
                # If the dedicated card is full, try the auxiliary one
                if aux_slot and aux_slot is not slot and aux_slot.has_space(channel):
                    slot = aux_slot
                else:
                    report.warnings.append(
                        f"Sensor {eq.tag_prefix}.{sensor.id} no asignado: "
                        f"sin capacidad {channel.value} en tarjetas dedicadas ni aux."
                    )
                    continue
            ch_num = slot.take(channel)
            scaling = _default_scaling(signal, sensor)
            binding = TagBinding(
                card_id=slot.card.id,
                channel_type=channel,
                channel_number=ch_num,
                signal_type=signal,
                scaling=scaling,
                filter=FilterType.MOVING_AVG if channel == ChannelType.AI else FilterType.NONE,
                filter_param=4.0 if channel == ChannelType.AI else None,
                update_rate_ms=200 if channel == ChannelType.AI else 100,
            )
            tag_id = f"{eq.tag_prefix}.{sensor.id.upper()}"
            # Controllable tags (DO relays) are flagged as such
            controllable = channel == ChannelType.DO
            try:
                unit_si = UnitSI(sensor.unit_si.value)
            except Exception:
                # Missing or unknown unit: fall back to UnitSI.NONE
                unit_si = UnitSI.NONE
            tag = Tag(
                id=tag_id,
                equipment_id=eq.id,
                description=f"{sensor.name} ({eq.display_name})",
                unit_si=unit_si,
                range_normal_min=sensor.range_normal_min,
                range_normal_max=sensor.range_normal_max,
                controllable=controllable,
                control_mode=ControlMode.MANUAL if controllable else ControlMode.MONITOR,
                authority_required=(
                    AuthorityRequired.BRIDGE
                    if controllable
                    else AuthorityRequired.EITHER
                ),
                protocol=Protocol.MODBUS_RTU,
                physical_binding=binding,
            )
            report.tags.append(tag)

    report.cards = [s.card for s in cards]
    return report