"""Tests del driver NMEA 2000 (Sprint 5 stub).""" from __future__ import annotations import asyncio import pytest from vmssailor.core.enums import Protocol, UnitSI from vmssailor.core.tag import Tag from vmssailor.runtime.server.nmea2000 import ( PGN_ATTITUDE, PGN_ENGINE_RAPID, N2KPublisher, N2KSubscriber, PgnFrame, encode_pgn_attitude, encode_pgn_engine_rapid, parse_pgn_attitude, parse_pgn_engine_rapid, ) from vmssailor.runtime.server.tag_store import TagStore def test_attitude_pgn_roundtrip(): data = encode_pgn_attitude(yaw_deg=15.5, pitch_deg=-2.3, roll_deg=8.1) parsed = parse_pgn_attitude(data) assert abs(parsed["roll_deg"] - 8.1) < 0.1 assert abs(parsed["pitch_deg"] - (-2.3)) < 0.1 assert abs(parsed["yaw_deg"] - 15.5) < 0.1 def test_engine_rapid_pgn_roundtrip(): data = encode_pgn_engine_rapid(instance=0, rpm=1520.5) parsed = parse_pgn_engine_rapid(data) assert parsed["instance"] == 0 assert abs(parsed["rpm"] - 1520.5) < 1.0 @pytest.mark.asyncio async def test_n2k_publisher_emits_engine_rapid(): store = TagStore() store.register_tag(Tag(id="ME_PORT.RPM", unit_si=UnitSI.RPM, protocol=Protocol.MODBUS_RTU, address=1)) frames: list[PgnFrame] = [] pub = N2KPublisher(store, transport=frames.append) await pub.start(period_s=0.05) try: await store.update("ME_PORT.RPM", 1500.0) await asyncio.sleep(0.2) engine_frames = [f for f in frames if f.pgn == PGN_ENGINE_RAPID] assert len(engine_frames) >= 1 finally: await pub.stop() @pytest.mark.asyncio async def test_n2k_subscriber_handles_attitude(): store = TagStore() store.register_tag(Tag(id="VESSEL.ROLL_DEG", unit_si=UnitSI.DEGREE, protocol=Protocol.INTERNAL)) store.register_tag(Tag(id="VESSEL.PITCH_DEG", unit_si=UnitSI.DEGREE, protocol=Protocol.INTERNAL)) sub = N2KSubscriber(store) await sub.start() try: data = encode_pgn_attitude(yaw_deg=0.0, pitch_deg=-2.0, roll_deg=5.0) sub.inject_frame( PgnFrame( pgn=PGN_ATTITUDE, source=0, data=data, timestamp=__import__("datetime").datetime.now(__import__("datetime").timezone.utc), ) ) await asyncio.sleep(0.1) roll = store.get("VESSEL.ROLL_DEG") pitch = store.get("VESSEL.PITCH_DEG") assert roll is not None and roll.value is not None assert abs(float(roll.value) - 5.0) < 0.1 assert pitch is not None and pitch.value is not None assert abs(float(pitch.value) - (-2.0)) < 0.1 finally: await sub.stop()