"""Asignador automático de puertos físicos AR-NMEA-IO. Dado un Project con equipos (cada uno con sus default_sensors según EquipmentModel), genera: - Tarjetas necesarias (estimación por sistema → cantidad) - TagBindings que mapean cada sensor a un canal concreto - Conflictos detectados (capacidad excedida) Sprint 2: implementación greedy basada en proximidad. """ from __future__ import annotations from dataclasses import dataclass, field from vmssailor.core.card import Bus, CardInstance, Topology from vmssailor.core.coords import ShipCoord from vmssailor.core.enums import ( BusRole, ChannelType, FilterType, Protocol, SignalType, ) from vmssailor.core.equipment import Equipment, EquipmentModel, Sensor from vmssailor.core.tag import Scaling, Tag, TagBinding # Mapeo SignalType -> ChannelType (para auto-asignar) _SIGNAL_TO_CHANNEL: dict[SignalType, ChannelType] = { # Analógicas → AI SignalType.SIG_4_20_MA: ChannelType.AI, SignalType.SIG_0_10_V: ChannelType.AI, SignalType.SIG_0_5_V: ChannelType.AI, SignalType.RTD_PT100: ChannelType.AI, SignalType.RTD_PT1000: ChannelType.AI, SignalType.THERMOCOUPLE_K: ChannelType.AI, SignalType.THERMOCOUPLE_J: ChannelType.AI, SignalType.RESISTIVE_SENDER: ChannelType.AI, SignalType.VOLTAGE_DIVIDER: ChannelType.AI, # Pulso → RPM SignalType.PULSE_MAGNETIC_PICKUP: ChannelType.RPM, SignalType.PULSE_INDUCTIVE: ChannelType.RPM, SignalType.PULSE_TACHO: ChannelType.RPM, # Digitales: relés son DO, contactos son DI SignalType.RELAY_NO: ChannelType.DO, SignalType.RELAY_NC: ChannelType.DO, SignalType.DRY_CONTACT: ChannelType.DI, SignalType.CONTACT_24VDC: ChannelType.DI, } # Capacidades fijas del hardware AR-NMEA-IO-v1.0 _CAPACITY: dict[ChannelType, int] = { ChannelType.AI: 4, ChannelType.DI: 5, ChannelType.DO: 10, ChannelType.RPM: 1, } # Escalado por defecto por tipo de señal def _default_scaling(signal: SignalType, sensor: Sensor) -> Scaling | None: rng_min = sensor.range_normal_min if sensor.range_normal_min is not None else 0.0 rng_max = sensor.range_normal_max if sensor.range_normal_max is not None else 100.0 if signal == SignalType.SIG_4_20_MA: return Scaling(raw_min=4.0, raw_max=20.0, eng_min=rng_min, eng_max=rng_max) if signal == SignalType.SIG_0_10_V: return Scaling(raw_min=0.0, raw_max=10.0, eng_min=rng_min, eng_max=rng_max) if signal == SignalType.SIG_0_5_V: return Scaling(raw_min=0.0, raw_max=5.0, eng_min=rng_min, eng_max=rng_max) if signal == SignalType.RTD_PT100: return Scaling(raw_min=0.0, raw_max=4095.0, eng_min=-50.0, eng_max=200.0) if signal == SignalType.VOLTAGE_DIVIDER: # rango razonable para baterías + alternadores return Scaling(raw_min=0.0, raw_max=4095.0, eng_min=0.0, eng_max=32.0) return None @dataclass(slots=True) class CardSlot: """Slot de uso interno del asignador. Mantiene contadores por canal.""" card: CardInstance used: dict[ChannelType, int] = field(default_factory=lambda: dict.fromkeys(ChannelType, 0)) def has_space(self, channel: ChannelType) -> bool: return self.used[channel] < _CAPACITY[channel] def take(self, channel: ChannelType) -> int: """Asigna el siguiente canal libre y devuelve su número (1-based).""" self.used[channel] += 1 return self.used[channel] @dataclass(slots=True) class AssignmentReport: cards: list[CardInstance] = field(default_factory=list) tags: list[Tag] = field(default_factory=list) bus: Bus | None = None n_skipped: int = 0 warnings: list[str] = field(default_factory=list) def topology(self) -> Topology: return Topology(buses=[self.bus] if self.bus else [], cards=self.cards) def auto_assign( equipment_list: list[Equipment], model_lookup: dict[str, EquipmentModel], ) -> AssignmentReport: """Asignación greedy de tags a tarjetas distribuidas. Estrategia: - 1 bus Modbus RTU principal - 1 tarjeta por equipo del sistema main_engine / genset (pegada al equipo) - 1 tarjeta compartida para equipos auxiliares """ report = AssignmentReport() if not equipment_list: return report bus = Bus( id="bus_main", name="Bus principal Modbus RTU", protocol=Protocol.MODBUS_RTU, physical_port="COM3", baud_rate=115200, ) report.bus = bus cards: list[CardSlot] = [] # Tarjetas dedicadas para motor principal + genset next_slot = 1 next_addr = 1 card_by_eq: dict[str, CardSlot] = {} for eq in equipment_list: if eq.system_id.value in ("main_engine", "genset"): card = CardInstance( id=f"card_{next_slot:03d}", slot_number=next_slot, bus_id=bus.id, bus_role=BusRole.MODBUS_SLAVE, modbus_address=next_addr, physical_location=f"Junto a {eq.display_name}", location=ShipCoord( x_pp=eq.location.x_pp, y_cl=eq.location.y_cl, z_bl=eq.location.z_bl + 0.2, ), firmware_version="1.0.0", ) slot = CardSlot(card=card) cards.append(slot) card_by_eq[eq.id] = slot next_slot += 1 next_addr += 1 # Tarjeta auxiliar compartida para el resto aux_slot: CardSlot | None = None if any( eq.system_id.value not in ("main_engine", "genset") for eq in equipment_list ): aux_card = CardInstance( id=f"card_{next_slot:03d}", slot_number=next_slot, bus_id=bus.id, bus_role=BusRole.MODBUS_SLAVE, modbus_address=next_addr, physical_location="Sala máquinas — panel auxiliar", firmware_version="1.0.0", ) aux_slot = CardSlot(card=aux_card) cards.append(aux_slot) # Construir tags for eq in equipment_list: model = model_lookup.get(eq.model_ref) if model is None: report.warnings.append( f"Equipo '{eq.id}' referencia model_ref='{eq.model_ref}' " "que no existe en biblioteca cargada." ) continue target_slot = card_by_eq.get(eq.id) or aux_slot if target_slot is None: continue for sensor in model.default_sensors: if sensor.default_signal_type is None: # Sin tipo de señal, no podemos hacer binding físico report.n_skipped += 1 continue signal = sensor.default_signal_type channel = _SIGNAL_TO_CHANNEL.get(signal) if channel is None: report.n_skipped += 1 continue slot = target_slot if not slot.has_space(channel): # Si la tarjeta dedicada se llenó, intentar la auxiliar if aux_slot and aux_slot is not slot and aux_slot.has_space(channel): slot = aux_slot else: report.warnings.append( f"Sensor {eq.tag_prefix}.{sensor.id} no asignado: " f"sin capacidad {channel.value} en tarjetas dedicadas ni aux." ) continue ch_num = slot.take(channel) scaling = _default_scaling(signal, sensor) binding = TagBinding( card_id=slot.card.id, channel_type=channel, channel_number=ch_num, signal_type=signal, scaling=scaling, filter=FilterType.MOVING_AVG if channel == ChannelType.AI else FilterType.NONE, filter_param=4.0 if channel == ChannelType.AI else None, update_rate_ms=200 if channel == ChannelType.AI else 100, ) tag_id = f"{eq.tag_prefix}.{sensor.id.upper()}" # Tags controllable (relés DO) marcados como tales controllable = channel == ChannelType.DO from vmssailor.core.enums import ( AuthorityRequired, ControlMode, UnitSI, ) try: unit_si = UnitSI(sensor.unit_si.value) except Exception: unit_si = UnitSI.NONE tag = Tag( id=tag_id, equipment_id=eq.id, description=f"{sensor.name} ({eq.display_name})", unit_si=unit_si, range_normal_min=sensor.range_normal_min, range_normal_max=sensor.range_normal_max, controllable=controllable, control_mode=ControlMode.MANUAL if controllable else ControlMode.MONITOR, authority_required=( AuthorityRequired.BRIDGE if controllable else AuthorityRequired.EITHER ), protocol=Protocol.MODBUS_RTU, physical_binding=binding, ) report.tags.append(tag) report.cards = [s.card for s in cards] return report