"""Generate firmware C++ header and Python module from modbus_registers.yaml. The YAML at ``firmware/ar_autopilot_v1/modbus_registers.yaml`` is the single source of truth for the Modbus RTU register map shared between the ESP32 firmware and the Python tooling / Studio simulator. This script regenerates: - ``firmware/ar_autopilot_v1/src/protocols/modbus_registers.h`` - ``arautopilot/shared/modbus_register_map.py`` Run from the repository root: python tools/gen_modbus_registers.py """ from __future__ import annotations import argparse import sys from dataclasses import dataclass from pathlib import Path from typing import Any import yaml REPO_ROOT = Path(__file__).resolve().parent.parent YAML_PATH = REPO_ROOT / "firmware/ar_autopilot_v1/modbus_registers.yaml" HEADER_PATH = ( REPO_ROOT / "firmware/ar_autopilot_v1/src/protocols/modbus_registers.h" ) PY_PATH = REPO_ROOT / "arautopilot/shared/modbus_register_map.py" @dataclass(frozen=True) class Entry: addr: int name: str desc: str unit: str = "" scale: float = 1.0 offset: float = 0.0 @dataclass(frozen=True) class RegisterMap: schema_version: str slave_id: int baudrate: int parity: str data_bits: int stop_bits: int discretes: list[Entry] coils: list[Entry] inputs: list[Entry] holdings: list[Entry] def load_yaml(path: Path) -> RegisterMap: raw: dict[str, Any] = yaml.safe_load(path.read_text(encoding="utf-8")) def to_entries(items: list[dict[str, Any]]) -> list[Entry]: out: list[Entry] = [] seen: set[int] = set() for item in items: addr = int(item["addr"]) if addr in seen: raise ValueError(f"duplicate address {addr} in {path.name}") seen.add(addr) out.append( Entry( addr=addr, name=str(item["name"]), desc=str(item.get("desc", "")), unit=str(item.get("unit", "")), scale=float(item.get("scale", 1.0)), offset=float(item.get("offset", 0.0)), ) ) out.sort(key=lambda e: e.addr) return out return RegisterMap( schema_version=str(raw["schema_version"]), slave_id=int(raw["slave_id"]), baudrate=int(raw["baudrate"]), parity=str(raw["parity"]), data_bits=int(raw["data_bits"]), stop_bits=int(raw["stop_bits"]), discretes=to_entries(raw.get("discretes", [])), coils=to_entries(raw.get("coils", [])), inputs=to_entries(raw.get("inputs", [])), holdings=to_entries(raw.get("holdings", [])), ) def _max_addr(entries: list[Entry]) -> int: return max((e.addr for e in entries), default=-1) def render_header(rm: RegisterMap) -> str: lines: list[str] = [] a = lines.append a("// =============================================================================") a("// modbus_registers.h -- AR-Autopilot Modbus RTU register map") a("// =============================================================================") a("//") a("// AUTO-GENERATED. Do not edit by hand.") a("// Source: firmware/ar_autopilot_v1/modbus_registers.yaml") a("// Regenerate with: python tools/gen_modbus_registers.py") a("// =============================================================================") a("") a("#pragma once") a("") a("#include ") a("") a("namespace arautopilot::protocols::modbus {") a("") a(f'constexpr const char* MAP_SCHEMA_VERSION = "{rm.schema_version}";') a(f"constexpr uint8_t SLAVE_ID = {rm.slave_id};") a(f"constexpr uint32_t BAUDRATE = {rm.baudrate};") a(f'constexpr char PARITY = \'{rm.parity}\';') a(f"constexpr uint8_t DATA_BITS = {rm.data_bits};") a(f"constexpr uint8_t STOP_BITS = {rm.stop_bits};") a("") def block(kind: str, prefix: str, entries: list[Entry]) -> None: a(f"// ----- {kind} -----") a(f"constexpr uint16_t {prefix}_COUNT = {len(entries)};") max_a = _max_addr(entries) a(f"constexpr uint16_t {prefix}_MAX_ADDR = {max_a};") a("") for e in entries: a(f"// {e.desc}") extras = [] if e.unit: extras.append(f"unit={e.unit}") if e.scale != 1.0: extras.append(f"scale={e.scale}") if e.offset != 0.0: extras.append(f"offset={e.offset}") if extras: a("// " + ", ".join(extras)) a(f"constexpr uint16_t {prefix}_{e.name} = {e.addr};") a("") block("Discrete inputs (read-only bits)", "DISCRETE", rm.discretes) block("Coils (read-write bits)", "COIL", rm.coils) block("Input registers (read-only words)", "INPUT", rm.inputs) block("Holding registers (read-write words)", "HOLDING", rm.holdings) a("} // namespace arautopilot::protocols::modbus") a("") return "\n".join(lines) def render_python(rm: RegisterMap) -> str: lines: list[str] = [] a = lines.append a('"""AR-Autopilot Modbus RTU register map -- generated mirror of the firmware.') a("") a("AUTO-GENERATED. Do not edit by hand.") a("Source: firmware/ar_autopilot_v1/modbus_registers.yaml") a("Regenerate with: python tools/gen_modbus_registers.py") a('"""') a("") a("from __future__ import annotations") a("") a("from dataclasses import dataclass") a("") a("") a("@dataclass(frozen=True)") a("class Reg:") a(' """One Modbus register entry as seen from Python tooling."""') a(" addr: int") a(" name: str") a(" desc: str = \"\"") a(" unit: str = \"\"") a(" scale: float = 1.0") a(" offset: float = 0.0") a("") a("") a(f'MAP_SCHEMA_VERSION = "{rm.schema_version}"') a(f"SLAVE_ID = {rm.slave_id}") a(f"BAUDRATE = {rm.baudrate}") a(f'PARITY = "{rm.parity}"') a(f"DATA_BITS = {rm.data_bits}") a(f"STOP_BITS = {rm.stop_bits}") a("") def block(group_name: str, entries: list[Entry]) -> None: a(f"# {group_name.upper()}") a(f"{group_name.upper()}: dict[str, Reg] = {{") for e in entries: a( f' "{e.name}": Reg(' f"addr={e.addr}, " f'name="{e.name}", ' f'desc={e.desc!r}, ' f'unit="{e.unit}", ' f"scale={e.scale}, " f"offset={e.offset})," ) a("}") a("") block("discretes", rm.discretes) block("coils", rm.coils) block("inputs", rm.inputs) block("holdings", rm.holdings) a("ALL_GROUPS: dict[str, dict[str, Reg]] = {") a(' "discretes": DISCRETES,') a(' "coils": COILS,') a(' "inputs": INPUTS,') a(' "holdings": HOLDINGS,') a("}") a("") return "\n".join(lines) def main(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( "--check", action="store_true", help="Exit non-zero if the generated files are out of date; do not write.", ) args = parser.parse_args(argv) rm = load_yaml(YAML_PATH) header = render_header(rm) pymod = render_python(rm) if args.check: ok = True for path, expected in ((HEADER_PATH, header), (PY_PATH, pymod)): actual = path.read_text(encoding="utf-8") if path.exists() else "" if actual != expected: print(f"OUT-OF-DATE: {path.relative_to(REPO_ROOT)}", file=sys.stderr) ok = False return 0 if ok else 1 HEADER_PATH.parent.mkdir(parents=True, exist_ok=True) PY_PATH.parent.mkdir(parents=True, exist_ok=True) HEADER_PATH.write_text(header, encoding="utf-8") PY_PATH.write_text(pymod, encoding="utf-8") print(f"wrote {HEADER_PATH.relative_to(REPO_ROOT)}") print(f"wrote {PY_PATH.relative_to(REPO_ROOT)}") print( f" discretes={len(rm.discretes)} coils={len(rm.coils)} " f"inputs={len(rm.inputs)} holdings={len(rm.holdings)}" ) return 0 if __name__ == "__main__": raise SystemExit(main())