"""Manual Modbus RTU client for poking the AR-Autopilot ESP32 slave. Usage (with the AR-NMEA-IO connected via a USB-to-RS485 adapter): python firmware/ar_autopilot_v1/tools/modbus_client_test.py \ --port COM7 It does NOT require any extra dependency beyond ``pymodbus``, which is NOT installed by default. Install on demand with: pip install 'pymodbus>=3.6,<4' The script: 1. Connects to the slave at the baudrate / framing declared in ``arautopilot.shared.modbus_register_map``. 2. Reads firmware version and uptime (input registers). 3. Reads the current pilot mode. 4. Reads the rudder angle (input register). 5. Writes a heading setpoint to a holding register, then reads it back. 6. Pulses the disengage coil and verifies the discrete bit goes high (PILOT_ENGAGED is expected to remain false in Sprint 1). """ from __future__ import annotations import argparse import sys import time from pathlib import Path # Make the project package importable when run from a fresh shell. REPO_ROOT = Path(__file__).resolve().parents[3] sys.path.insert(0, str(REPO_ROOT)) from arautopilot.shared import modbus_register_map as m # noqa: E402 try: from pymodbus.client import ModbusSerialClient except ImportError: sys.stderr.write( "ERROR: pymodbus is not installed. Run:\n" " pip install 'pymodbus>=3.6,<4'\n" ) sys.exit(2) def main() -> int: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--port", required=True, help="Serial port (e.g. COM7 or /dev/ttyUSB0)") parser.add_argument("--baudrate", type=int, default=m.BAUDRATE) parser.add_argument("--slave-id", type=int, default=m.SLAVE_ID) parser.add_argument("--write-setpoint", type=float, default=180.0, help="Heading setpoint to write, in degrees (default 180.0)") args = parser.parse_args() print(f"[connect] {args.port} @ {args.baudrate} 8{m.PARITY}1, slave={args.slave_id}") client = ModbusSerialClient( port=args.port, baudrate=args.baudrate, parity=m.PARITY, stopbits=m.STOP_BITS, bytesize=m.DATA_BITS, timeout=1.0, ) if not client.connect(): print("[connect] FAILED") return 1 try: # --- Firmware version + uptime --------------------------------------- rr = client.read_input_registers( address=m.INPUTS["FW_VERSION_MAJOR"].addr, count=6, slave=args.slave_id ) if rr.isError(): print("[error] read input registers (version): ", rr) return 1 major, minor, patch, schema, up_lo, up_hi = rr.registers uptime_s = up_lo | (up_hi << 16) print( f"[fw ] version v{major}.{minor}.{patch} schema={schema} " f"uptime={uptime_s}s" ) # --- Mode ----------------------------------------------------------- rr = client.read_input_registers( address=m.INPUTS["CURRENT_MODE"].addr, count=1, slave=args.slave_id ) mode_names = {0: "STANDBY", 1: "HEADING_HOLD", 2: "TRUE_COURSE", 3: "TRACK_KEEPING", 4: "DODGE"} print(f"[mode ] {mode_names.get(rr.registers[0], '?')} ({rr.registers[0]})") # --- Rudder angle --------------------------------------------------- rr = client.read_input_registers( address=m.INPUTS["RUDDER_ANGLE_DEG_X100"].addr, count=3, slave=args.slave_id ) angle_x100, raw_adc, valid = rr.registers # interpret as signed int16 if angle_x100 >= 0x8000: angle_x100 -= 0x10000 print( f"[rudd ] angle={angle_x100 / 100.0:+.2f} deg raw_adc={raw_adc} " f"valid={'yes' if valid else 'no'}" ) # --- Heading + ROT (Sprint 1 wires these via NMEA 2000) ------------- rr = client.read_input_registers( address=m.INPUTS["HEADING_DEG_X100"].addr, count=3, slave=args.slave_id ) heading_x100, rot_x100, age_ms = rr.registers if rot_x100 >= 0x8000: rot_x100 -= 0x10000 print( f"[n2k ] heading={heading_x100 / 100.0:7.2f} deg " f"rot={rot_x100 / 100.0:+.2f} deg/s age={age_ms} ms" ) # --- Write heading setpoint ----------------------------------------- sp_x100 = int(round(args.write_setpoint * 100.0)) if sp_x100 < -0x8000 or sp_x100 > 0x7FFF: print(f"[error] setpoint out of int16 range") return 1 wr_value = sp_x100 if sp_x100 >= 0 else (sp_x100 + 0x10000) wr = client.write_register( address=m.HOLDINGS["HEADING_SETPOINT_X100"].addr, value=wr_value, slave=args.slave_id, ) if wr.isError(): print(f"[error] write holding (heading setpoint): {wr}") return 1 rr = client.read_holding_registers( address=m.HOLDINGS["HEADING_SETPOINT_X100"].addr, count=1, slave=args.slave_id, ) read_back = rr.registers[0] if read_back >= 0x8000: read_back -= 0x10000 print( f"[hold ] wrote heading setpoint {args.write_setpoint:.2f} deg, " f"read back {read_back / 100.0:+.2f} deg" ) # --- Disengage coil pulse ------------------------------------------- client.write_coil( address=m.COILS["CMD_DISENGAGE_REQUEST"].addr, value=True, slave=args.slave_id, ) time.sleep(0.05) rr = client.read_discrete_inputs( address=m.DISCRETES["PILOT_ENGAGED"].addr, count=1, slave=args.slave_id ) print( f"[coil ] disengage pulsed; PILOT_ENGAGED = " f"{'YES (unexpected)' if rr.bits[0] else 'no (correct -- Sprint 1 is always STANDBY)'}" ) return 0 finally: client.close() if __name__ == "__main__": raise SystemExit(main())