# Source code for openlifu.io.LIFUDFU

"""LIFU Transmitter Firmware Update (DFU) support.

Provides:
  - :func:`stm32_crc32`          — STM32-compatible CRC32
  - :func:`parse_signed_package` — parse/validate a signed firmware package
  - :class:`STM32USBDFU`         — USB DFU client (PyUSB, for module 0)
  - :class:`STM32I2CDFUviaMaster`— I2C DFU via OW UART master passthrough (modules 1+)
  - :class:`LIFUDFUManager`      — high-level firmware update orchestration
"""

from __future__ import annotations

import errno
import logging
import struct
import time
import unittest
from typing import TYPE_CHECKING, Callable

from openlifu.io.LIFUConfig import OW_ERROR, OW_I2C_PASSTHRU

if TYPE_CHECKING:
    from openlifu.io.LIFUUart import LIFUUart

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Optional USB DFU dependencies (module 0 only)
# ---------------------------------------------------------------------------
try:
    import usb.backend.libusb1 as _usb_libusb1
    import usb.core as _usb_core
    import usb.util as _usb_util
    _USB_DFU_AVAILABLE = True
except ImportError:
    _usb_core = None
    _usb_util = None
    _usb_libusb1 = None
    _USB_DFU_AVAILABLE = False

try:
    import libusb_package as _libusb_package
except ImportError:
    _libusb_package = None

# ---------------------------------------------------------------------------
# DFU protocol constants (shared by USB and I2C paths)
# ---------------------------------------------------------------------------

# USB DFU virtual addresses (must match usbd_dfu_if.c)
USB_DFU_VERSION_VIRT_ADDR = 0xFFFFFF00
USB_DFU_VERSION_READ_LEN  = 64

# I2C DFU command bytes (must match i2c_dfu_if.h)
I2C_DFU_SLAVE_ADDR      = 0x72
I2C_DFU_CMD_DNLOAD      = 0x01
I2C_DFU_CMD_ERASE       = 0x02
I2C_DFU_CMD_GETSTATUS   = 0x03
I2C_DFU_CMD_MANIFEST    = 0x04
I2C_DFU_CMD_RESET       = 0x05
I2C_DFU_CMD_GETVERSION  = 0x06
I2C_DFU_STATUS_OK       = 0x00
I2C_DFU_STATUS_BUSY     = 0x01
I2C_DFU_STATUS_ERROR    = 0x02
I2C_DFU_STATUS_BAD_ADDR = 0x03
I2C_DFU_STATUS_FLASH_ERR= 0x04
I2C_DFU_STATE_DNBUSY    = 0x01
I2C_DFU_STATE_ERROR     = 0x04
# Maximum data bytes per write_block call.  The enclosing OW_I2C_PASSTHRU UART
# packet carries (1 cmd + 4 addr + 2 len) = 7 bytes of I2C-DFU header, so the
# total packet payload is I2C_DFU_MAX_XFER_SIZE + 7.  The master firmware hard-
# rejects any UART packet with data_len > DATA_MAX_SIZE (2048), so this value
# must be ≤ 2041.  Use 512 for a safe, standard I2C block size.
I2C_DFU_MAX_XFER_SIZE   = 512
I2C_DFU_VERSION_STR_MAX = 32

# OW_I2C_PASSTHRU sub-commands (must match firmware if_commands.c handler)
_PASSTHRU_WRITE       = 0x00   # write only
_PASSTHRU_WRITE_READ  = 0x01   # write then delay 5 ms then read

# Signed package format (must match dfu-test.py)
_PKG_MAGIC        = 0x314B4750   # 'PGK1'
_PKG_VERSION      = 1
_PKG_HDR_NOCRC    = "<IHHIIIII"
_PKG_HDR_FULL     = "<IHHIIIIII"


# ---------------------------------------------------------------------------
# Package helpers
# ---------------------------------------------------------------------------

def stm32_crc32(data: bytes, init: int = 0xFFFFFFFF) -> int:
    """Return the bit-serial, MSB-first CRC32 of *data* (poly=0x04C11DB7).

    Each byte is XORed into the top 8 bits of the 32-bit remainder, which is
    then shifted left eight times, applying the polynomial whenever the bit
    shifted out was set.  No input/output reflection and no final XOR are
    applied, so a previous result can be passed as *init* to continue a
    running CRC over further data.

    Args:
        data: Bytes to checksum.
        init: Starting remainder; only the low 32 bits are used.

    Returns:
        The 32-bit CRC value.
    """
    mask = 0xFFFFFFFF
    top_bit = 0x80000000
    polynomial = 0x04C11DB7

    remainder = init & mask
    for byte in data:
        remainder ^= (byte & 0xFF) << 24
        for _ in range(8):
            shifted = (remainder << 1) & mask
            # The polynomial is applied when the bit shifted out was 1.
            remainder = shifted ^ polynomial if remainder & top_bit else shifted
    return remainder
def parse_signed_package(pkg: bytes) -> dict:
    """Parse and integrity-check a signed firmware package.

    The package is a fixed-size little-endian header (``_PKG_HDR_FULL``)
    followed by the firmware image and then the metadata blob.  Checks are
    applied in this order: minimum size, magic, version, declared header
    size, header CRC (over the header minus its trailing CRC word), payload
    length, payload CRC.

    Args:
        pkg: Raw package contents.

    Returns:
        A dict with keys ``fw_address``, ``meta_address``, ``fw``, ``meta``.

    Raises:
        ValueError: If the package is malformed or any CRC fails.
    """
    hdr_size = struct.calcsize(_PKG_HDR_FULL)
    if len(pkg) < hdr_size:
        raise ValueError("signed package too small")

    fields = struct.unpack_from(_PKG_HDR_FULL, pkg)
    magic, version, declared_hdr_size = fields[:3]
    fw_address, fw_len, meta_address, meta_len = fields[3:7]
    payload_crc, header_crc = fields[7:]

    if magic != _PKG_MAGIC:
        raise ValueError(f"signed package magic mismatch: 0x{magic:08X}")
    if version != _PKG_VERSION:
        raise ValueError(f"signed package version mismatch: {version}")
    if declared_hdr_size != hdr_size:
        raise ValueError("signed package header size mismatch")

    # The header CRC covers every header byte except the CRC word itself.
    calc_hdr_crc = stm32_crc32(pkg[:hdr_size - 4])
    if header_crc != calc_hdr_crc:
        raise ValueError(
            f"header CRC mismatch: pkg=0x{header_crc:08X}, calc=0x{calc_hdr_crc:08X}"
        )

    payload = pkg[hdr_size:]
    payload_len = fw_len + meta_len
    if len(payload) != payload_len:
        raise ValueError(
            f"payload size mismatch: expected {payload_len}, got {len(payload)}"
        )

    calc_payload_crc = stm32_crc32(payload)
    if payload_crc != calc_payload_crc:
        raise ValueError(
            f"payload CRC mismatch: pkg=0x{payload_crc:08X}, calc=0x{calc_payload_crc:08X}"
        )

    firmware, metadata = payload[:fw_len], payload[fw_len:]
    return {
        "fw_address": fw_address,
        "meta_address": meta_address,
        "fw": firmware,
        "meta": metadata,
    }
# ---------------------------------------------------------------------------
# Internal tests for DFU package parsing / CRC (deterministic, no hardware)
# ---------------------------------------------------------------------------

def _build_synthetic_signed_package(
    fw: bytes,
    meta: bytes,
    fw_address: int = 0x08000000,
    meta_address: int = 0x08008000,
) -> bytes:
    """Build a minimal, self-consistent signed DFU package for tests.

    The header layout and CRC routine are the module's own, so a package
    produced here exercises :func:`stm32_crc32` and
    :func:`parse_signed_package` end to end.

    Args:
        fw: Firmware payload bytes.
        meta: Metadata payload bytes (placed directly after *fw*).
        fw_address: Firmware load address recorded in the header.
        meta_address: Metadata load address recorded in the header.

    Returns:
        Header followed by ``fw + meta``.
    """
    body = fw + meta

    # Everything in the header except the trailing header-CRC word.  The
    # declared header size is that of the full header, CRC included.
    header_without_crc = struct.pack(
        _PKG_HDR_NOCRC,
        _PKG_MAGIC,
        _PKG_VERSION,
        struct.calcsize(_PKG_HDR_FULL),
        fw_address,
        len(fw),
        meta_address,
        len(meta),
        stm32_crc32(body),
    )

    # The header CRC is computed over the CRC-less header and appended as
    # the final little-endian 32-bit word.
    crc_word = struct.pack("<I", stm32_crc32(header_without_crc))
    return header_without_crc + crc_word + body
class TestSignedPackage(unittest.TestCase):
    """Unit tests for :func:`stm32_crc32` and :func:`parse_signed_package`.

    These tests are deterministic and require no hardware; they can be run by
    any standard Python test runner to guard against regressions that might
    otherwise risk bricking devices during DFU.

    ``unittest`` assertion methods are used instead of bare ``assert`` so the
    checks still run when Python is started with ``-O``.
    """

    def test_parse_signed_package_valid(self) -> None:
        """A well-formed package round-trips addresses and both payloads."""
        fw = b"\x01\x02\x03\x04"
        meta = b"\xAA\xBB"
        fw_addr = 0x08001000
        meta_addr = 0x08009000
        pkg = _build_synthetic_signed_package(
            fw=fw,
            meta=meta,
            fw_address=fw_addr,
            meta_address=meta_addr,
        )

        parsed = parse_signed_package(pkg)

        self.assertEqual(parsed["fw_address"], fw_addr)
        self.assertEqual(parsed["meta_address"], meta_addr)
        self.assertEqual(parsed["fw"], fw)
        self.assertEqual(parsed["meta"], meta)

    def test_parse_signed_package_header_crc_mismatch(self) -> None:
        """Corrupt the header so header CRC verification fails."""
        pkg_bytes = bytearray(
            _build_synthetic_signed_package(fw=b"\x10\x20", meta=b"\x30")
        )

        # Flip a bit in the first byte of fw_address.  The magic, version and
        # declared-size fields (the first 8 bytes) must stay intact because
        # the parser validates them before it reaches the header CRC.
        corrupt_at = struct.calcsize("<IHH")
        pkg_bytes[corrupt_at] ^= 0x01

        with self.assertRaisesRegex(ValueError, "header CRC mismatch"):
            parse_signed_package(bytes(pkg_bytes))

    def test_parse_signed_package_payload_crc_mismatch(self) -> None:
        """Corrupt the payload so payload CRC verification fails."""
        pkg_bytes = bytearray(
            _build_synthetic_signed_package(fw=b"\xDE\xAD\xBE\xEF", meta=b"\x00\x01")
        )

        # Flip a bit in the first payload byte (immediately after the header).
        hdr_size = struct.calcsize(_PKG_HDR_FULL)
        pkg_bytes[hdr_size] ^= 0x01

        with self.assertRaisesRegex(ValueError, "payload CRC mismatch"):
            parse_signed_package(bytes(pkg_bytes))
# ---------------------------------------------------------------------------
# USB DFU client (module 0)
# ---------------------------------------------------------------------------
class STM32USBDFU:
    """Minimal STM32 DfuSe USB client using PyUSB.

    Supports Set-Address-Pointer, page erase, memory write and DFU UPLOAD
    (used to read the bootloader version string).  Usable as a context
    manager: entering opens and claims the DFU interface, leaving releases it.

    Requires: ``pip install pyusb`` plus a libusb-1.0 backend.
    """

    # DFU class requests
    DFU_DNLOAD = 1
    DFU_UPLOAD = 2
    DFU_GETSTATUS = 3
    DFU_CLRSTATUS = 4
    DFU_ABORT = 6

    # DfuSe DNLOAD block 0 sub-commands
    CMD_SET_ADDRESS_POINTER = 0x21
    CMD_ERASE = 0x41

    # DFU state values
    STATE_DFU_DNLOAD_SYNC = 3
    STATE_DFU_DNLOAD_BUSY = 4
    STATE_DFU_DNLOAD_IDLE = 5
    STATE_DFU_MANIFEST_SYNC = 6
    STATE_DFU_MANIFEST = 7
    STATE_DFU_MANIFEST_WAIT_RESET = 8
    STATE_DFU_ERROR = 10

    def __init__(self, vid: int = 0x0483, pid: int = 0xDF11,
                 transfer_size: int = 1024, timeout_ms: int = 4000,
                 libusb_dll: str | None = None):
        """Store connection parameters; no USB I/O happens until :meth:`open`.

        Args:
            vid: USB vendor ID to search for.
            pid: USB product ID to search for.
            transfer_size: Bytes sent per DNLOAD block in :meth:`write_memory`.
            timeout_ms: Timeout applied to every control transfer.
            libusb_dll: Optional explicit path to the libusb library.

        Raises:
            RuntimeError: If PyUSB could not be imported.
        """
        if not _USB_DFU_AVAILABLE:
            raise RuntimeError(
                "PyUSB not available. Install with: pip install pyusb"
            )
        self.vid = vid
        self.pid = pid
        self.transfer_size = transfer_size
        self.timeout_ms = timeout_ms
        self.libusb_dll = libusb_dll
        self.dev = None
        self.intf = None
        self._backend = None

    def _get_backend(self):
        """Return the libusb backend, resolving and caching it on first use.

        Preference order: explicit ``libusb_dll`` path, then the
        ``libusb_package`` helper if importable, then PyUSB's default lookup.
        """
        if self._backend is not None:
            return self._backend
        if self.libusb_dll:
            self._backend = _usb_libusb1.get_backend(
                find_library=lambda _: self.libusb_dll
            )
        elif _libusb_package is not None:
            self._backend = _usb_libusb1.get_backend(
                find_library=_libusb_package.find_library
            )
        else:
            self._backend = _usb_libusb1.get_backend()
        return self._backend

    def get_backend(self):
        """Public accessor for the libusb backend (avoids protected-member access)."""
        return self._get_backend()

    def open(self) -> STM32USBDFU:
        """Find the device, claim its DFU interface and clear any error state.

        Returns:
            ``self``, so the call can be chained or used from ``__enter__``.

        Raises:
            RuntimeError: If the device or a DFU interface cannot be found.
        """
        self.dev = _usb_core.find(
            idVendor=self.vid, idProduct=self.pid, backend=self._get_backend()
        )
        if self.dev is None:
            raise RuntimeError(
                f"USB DFU device not found: VID=0x{self.vid:04X}, PID=0x{self.pid:04X}"
            )
        self.dev.set_configuration()
        cfg = self.dev.get_active_configuration()
        for intf in cfg:
            # Select the interface by class 0xFE / subclass 0x01 / protocol 0x02.
            if (intf.bInterfaceClass == 0xFE
                    and intf.bInterfaceSubClass == 0x01
                    and intf.bInterfaceProtocol == 0x02):
                self.intf = intf
                break
        if self.intf is None:
            raise RuntimeError("No DFU interface found on USB device")
        try:
            if self.dev.is_kernel_driver_active(self.intf.bInterfaceNumber):
                self.dev.detach_kernel_driver(self.intf.bInterfaceNumber)
        except (NotImplementedError, _usb_core.USBError) as e:
            # Best effort: failure to detach is logged and otherwise ignored.
            logger.debug("kernel driver detach not supported or failed: %s", e)
        _usb_util.claim_interface(self.dev, self.intf.bInterfaceNumber)
        self._clear_error_state()
        return self

    def close(self) -> None:
        """Release the interface and dispose of USB resources (if opened)."""
        if self.dev is not None and self.intf is not None:
            try:
                _usb_util.release_interface(self.dev, self.intf.bInterfaceNumber)
            except _usb_core.USBError as e:
                logger.exception("error closing interface: %s", e)
            _usb_util.dispose_resources(self.dev)
        self.dev = None
        self.intf = None

    def __enter__(self) -> STM32USBDFU:
        return self.open()

    def __exit__(self, *args) -> None:
        self.close()

    # --- low-level USB control transfers ---

    def _ctrl_out(self, req: int, value: int, data: bytes = b"") -> int:
        """Issue a host-to-device control transfer (bmRequestType 0x21)."""
        return self.dev.ctrl_transfer(
            0x21, req, value, self.intf.bInterfaceNumber, data,
            timeout=self.timeout_ms
        )

    def _ctrl_in(self, req: int, value: int, length: int) -> bytes:
        """Issue a device-to-host control transfer (bmRequestType 0xA1)."""
        return bytes(self.dev.ctrl_transfer(
            0xA1, req, value, self.intf.bInterfaceNumber, length,
            timeout=self.timeout_ms
        ))

    def get_status(self) -> dict:
        """Send DFU_GETSTATUS and decode the 6-byte reply.

        Returns:
            Dict with ``status``, ``poll_timeout_ms`` (24-bit little-endian)
            and ``state``.
        """
        raw = self._ctrl_in(self.DFU_GETSTATUS, 0, 6)
        poll_ms = raw[1] | (raw[2] << 8) | (raw[3] << 16)
        return {"status": raw[0], "poll_timeout_ms": poll_ms, "state": raw[4]}

    def clear_status(self) -> None:
        """Send DFU_CLRSTATUS."""
        self._ctrl_out(self.DFU_CLRSTATUS, 0, b"")

    def abort(self) -> None:
        """Send DFU_ABORT."""
        self._ctrl_out(self.DFU_ABORT, 0, b"")

    def _clear_error_state(self) -> None:
        """Send CLRSTATUS up to three times while the device reports dfuERROR."""
        for _ in range(3):
            st = self.get_status()
            if st["state"] != self.STATE_DFU_ERROR:
                break
            self.clear_status()

    def _recover_idle(self) -> None:
        """Nudge the device out of DNLOAD-IDLE / MANIFEST-WAIT-RESET / ERROR.

        Makes at most four attempts: ABORT for the first two states,
        CLRSTATUS for the error state, and stops on any other state.
        """
        for _ in range(4):
            st = self.get_status()
            if st["state"] in (
                self.STATE_DFU_DNLOAD_IDLE,
                self.STATE_DFU_MANIFEST_WAIT_RESET,
            ):
                self.abort()
            elif st["state"] == self.STATE_DFU_ERROR:
                self.clear_status()
            else:
                break

    def _wait_while_busy(self, timeout_s: float = 30.0) -> dict:
        """Poll GETSTATUS until the device leaves the sync/busy/manifest states.

        Sleeps for the device-reported poll timeout (minimum 5 ms) between polls.

        Returns:
            The first status dict whose state is not a busy state.

        Raises:
            TimeoutError: If the device is still busy after *timeout_s*.
        """
        busy = {
            self.STATE_DFU_DNLOAD_SYNC,
            self.STATE_DFU_DNLOAD_BUSY,
            self.STATE_DFU_MANIFEST_SYNC,
            self.STATE_DFU_MANIFEST,
        }
        deadline = time.monotonic() + timeout_s
        while time.monotonic() < deadline:
            st = self.get_status()
            if st["state"] not in busy:
                return st
            time.sleep(max(st["poll_timeout_ms"] / 1000.0, 0.005))
        raise TimeoutError(f"USB DFU device stuck in busy state after {timeout_s:.0f} s")

    @staticmethod
    def _is_timeout_error(exc: Exception) -> bool:
        """Return True if *exc* represents a USB transfer timeout.

        ETIMEDOUT is not numerically the same on every platform, so the check
        uses ``errno.ETIMEDOUT`` for the running platform.  The literal 110 is
        still accepted for backward compatibility, and PyUSB's dedicated
        ``USBTimeoutError`` is recognised when the installed version has it.
        """
        timeout_cls = getattr(_usb_core, "USBTimeoutError", None)
        if timeout_cls is not None and isinstance(exc, timeout_cls):
            return True
        return getattr(exc, "errno", None) in (errno.ETIMEDOUT, 110)

    def _dnload(self, block_num: int, payload: bytes) -> dict:
        """Send one DFU_DNLOAD block and wait until the device is no longer busy.

        A transfer timeout on the DNLOAD itself is tolerated (status is polled
        anyway); any other USB error propagates.
        """
        self._recover_idle()
        try:
            self._ctrl_out(
                self.DFU_DNLOAD, block_num,
                bytes(payload) if payload else b""
            )
        except _usb_core.USBError as e:
            # Swallow only genuine transfer timeouts.
            if not self._is_timeout_error(e):
                raise
            logger.debug("_dnload timeout on block %d, polling status anyway: %s", block_num, e)
        return self._wait_while_busy()

    def _set_address(self, address: int) -> None:
        """Send the DfuSe Set-Address-Pointer command (block 0)."""
        payload = bytes([self.CMD_SET_ADDRESS_POINTER]) + struct.pack("<I", address)
        self._dnload(0, payload)

    def _erase_page(self, address: int) -> None:
        """Send the DfuSe Erase command (block 0) for *address*."""
        payload = bytes([self.CMD_ERASE]) + struct.pack("<I", address)
        self._dnload(0, payload)

    def get_version(self) -> str:
        """Read bootloader version string via DFU UPLOAD from the virtual address."""
        self._set_address(USB_DFU_VERSION_VIRT_ADDR)
        self.abort()
        raw = self._ctrl_in(self.DFU_UPLOAD, 2, USB_DFU_VERSION_READ_LEN)
        try:
            self._wait_while_busy()
        except (TimeoutError, RuntimeError) as e:
            logger.warning("busy wait error: %s", e)
        self.abort()
        return raw.rstrip(b"\x00").decode("ascii", errors="replace")

    def write_memory(self, address: int, data: bytes,
                     page_erase: bool = True,
                     progress_callback: Callable | None = None) -> None:
        """Write data to target flash, optionally erasing each 2 KB page first.

        Data is sent in ``transfer_size`` chunks as DNLOAD blocks 2, 3, ...
        after setting the address pointer to *address*.  With *page_erase*
        enabled, every 2 KB page overlapped by a chunk is erased exactly once,
        immediately before the first chunk that touches it.  This holds for
        any ``transfer_size`` and for start addresses that are not
        page-aligned.

        Args:
            address: Target start address.
            data: Bytes to program.
            page_erase: Erase pages before writing into them.
            progress_callback: Optional ``(written, total, label)`` callable
                invoked after each chunk.
        """
        total = len(data)
        page_size = 2048
        self._recover_idle()
        self._set_address(address)
        block = 2
        written = 0
        # Start of the next page that has not been erased yet.
        next_erase = address - (address % page_size)
        for offset in range(0, total, self.transfer_size):
            chunk = data[offset:offset + self.transfer_size]
            if page_erase:
                chunk_end = address + offset + len(chunk)
                while next_erase < chunk_end:
                    self._erase_page(next_erase)
                    next_erase += page_size
            self._dnload(block, chunk)
            block += 1
            written += len(chunk)
            if progress_callback:
                progress_callback(written, total, "USB DFU write")
        self.abort()

    def manifest(self) -> None:
        """Send zero-length DNLOAD to trigger DFU manifestation (launches firmware)."""
        self._recover_idle()
        try:
            self._ctrl_out(self.DFU_DNLOAD, 0, b"")
            self._wait_while_busy()
        except (_usb_core.USBError, TimeoutError) as e:
            # Deliberately best-effort: USB errors and timeouts here are
            # logged as a device disconnect/timeout rather than raised.
            logger.debug("device disconnect/timeout during manifest: %s", e)
# ---------------------------------------------------------------------------
# I2C DFU client via OW master passthrough (modules 1+)
# ---------------------------------------------------------------------------
class STM32I2CDFUviaMaster:
    """I2C DFU client tunnelled through the USB-master module.

    Every I2C transaction is wrapped in an ``OW_I2C_PASSTHRU`` UART packet.
    The master firmware performs the raw I2C write (and optional read) on the
    global I2C bus towards the slave DFU bootloader at *i2c_addr*
    (default 0x72).

    Packet fields as used by this class::

        packetType = OW_I2C_PASSTHRU
        addr       = 7-bit I2C slave address
        command    = 0x00  write-only
                   = 0x01  write, 5 ms delay, read back <reserved> bytes
        reserved   = number of bytes to read back (command 0x01 only, max 255)
        data       = raw bytes to write
    """

    def __init__(self, uart: LIFUUart, i2c_addr: int = I2C_DFU_SLAVE_ADDR,
                 write_read_delay_s: float = 0.005):
        """Bind the client to a UART link and a slave address.

        Args:
            uart: UART transport used to reach the master module.
            i2c_addr: I2C address of the slave DFU bootloader.
            write_read_delay_s: Accepted for interface compatibility; this
                class neither stores nor uses the value.
        """
        self._addr = i2c_addr
        self._uart = uart

    # --- low-level transport primitives ---

    def _passthru(self, command: int, reserved: int, data: bytes):
        """Send one passthrough packet, clear the UART buffer, return the reply."""
        reply = self._uart.send_packet(
            id=None,
            packetType=OW_I2C_PASSTHRU,
            command=command,
            addr=self._addr,
            reserved=reserved,
            data=data,
        )
        self._uart.clear_buffer()
        return reply

    def _write(self, payload: bytes) -> None:
        """Send a write-only passthrough packet to the I2C slave.

        Raises:
            RuntimeError: If no reply arrives or the reply is ``OW_ERROR``.
        """
        reply = self._passthru(_PASSTHRU_WRITE, 0, payload)
        failed = reply is None or reply.packet_type == OW_ERROR
        if failed:
            raise RuntimeError(
                f"I2C passthrough write failed (addr=0x{self._addr:02X}, "
                f"payload={payload[:8].hex()}...)"
            )

    def _exchange(self, payload: bytes, read_len: int,
                  pre_read_delay_s: float | None = None) -> bytes:
        """Write *payload* to the I2C slave and read *read_len* bytes back.

        The firmware executes a combined write+read transaction and inserts a
        fixed 5 ms gap between the two phases itself.  *pre_read_delay_s*, when
        positive, is an additional host-side sleep taken **before** the
        passthrough packet is sent; it does not alter the firmware's internal
        gap and is rarely needed.

        Returns:
            Exactly *read_len* bytes from the reply.

        Raises:
            RuntimeError: If the exchange fails or returns too few bytes.
        """
        if pre_read_delay_s and pre_read_delay_s > 0:
            time.sleep(pre_read_delay_s)

        reply = self._passthru(_PASSTHRU_WRITE_READ, read_len, payload)
        if reply is None or reply.packet_type == OW_ERROR:
            raise RuntimeError(
                f"I2C passthrough exchange failed (addr=0x{self._addr:02X}, "
                f"want_rx={read_len})"
            )

        received = reply.data
        actual = len(received) if received else 0
        if actual < read_len:
            raise RuntimeError(
                f"I2C passthrough exchange returned too few bytes: "
                f"expected {read_len}, got {actual} "
                f"(addr=0x{self._addr:02X})"
            )
        return bytes(received[:read_len])

    # --- DFU protocol commands ---

    def get_status(self) -> dict:
        """Send CMD_GETSTATUS and return status/state."""
        status, state = self._exchange(bytes([I2C_DFU_CMD_GETSTATUS]), 2)
        return {"status": status, "state": state}

    def _wait_while_busy(self, timeout_s: float = 10.0) -> dict:
        """Poll status every 20 ms until the bootloader is neither busy nor failed.

        Returns:
            The first status dict reporting neither a busy status nor the
            download-busy state.

        Raises:
            RuntimeError: If the bootloader reports an error status or state.
            TimeoutError: If it is still busy after *timeout_s*.
        """
        error_statuses = (
            I2C_DFU_STATUS_ERROR,
            I2C_DFU_STATUS_BAD_ADDR,
            I2C_DFU_STATUS_FLASH_ERR,
        )
        give_up_at = time.monotonic() + timeout_s
        while time.monotonic() < give_up_at:
            st = self.get_status()
            if st["state"] == I2C_DFU_STATE_ERROR or st["status"] in error_statuses:
                raise RuntimeError(
                    f"I2C DFU error: status=0x{st['status']:02X}, "
                    f"state=0x{st['state']:02X}"
                )
            still_busy = (st["status"] == I2C_DFU_STATUS_BUSY
                          or st["state"] == I2C_DFU_STATE_DNBUSY)
            if not still_busy:
                return st
            time.sleep(0.020)
        raise TimeoutError(f"I2C DFU timed out after {timeout_s:.0f} s")

    def erase_page(self, address: int) -> None:
        """Erase the flash page containing *address*."""
        command = struct.pack("<BI", I2C_DFU_CMD_ERASE, address)
        self._write(command)
        self._wait_while_busy(timeout_s=10.0)

    def mass_erase(self) -> None:
        """Erase the entire application flash region (sentinel addr = 0xFFFFFFFF)."""
        command = struct.pack("<BI", I2C_DFU_CMD_ERASE, 0xFFFFFFFF)
        self._write(command)
        self._wait_while_busy(timeout_s=120.0)

    def write_block(self, address: int, data: bytes) -> None:
        """Program one block (≤ ``I2C_DFU_MAX_XFER_SIZE`` bytes).

        An empty *data* is a no-op.  The size limit is not enforced here.
        """
        if not data:
            return
        header = struct.pack("<BIH", I2C_DFU_CMD_DNLOAD, address, len(data))
        self._write(header + data)
        self._wait_while_busy(timeout_s=10.0)

    def write_memory(self, address: int, data: bytes,
                     progress_callback: Callable | None = None) -> None:
        """Write arbitrary-length data in ``I2C_DFU_MAX_XFER_SIZE``-byte chunks.

        *progress_callback*, if given, is called as
        ``(written, total, "I2C DFU write")`` after each chunk.
        """
        total = len(data)
        done = 0
        for start in range(0, total, I2C_DFU_MAX_XFER_SIZE):
            piece = data[start:start + I2C_DFU_MAX_XFER_SIZE]
            self.write_block(address + start, piece)
            done += len(piece)
            if progress_callback:
                progress_callback(done, total, "I2C DFU write")

    def manifest(self) -> None:
        """Send CMD_MANIFEST to finalise the download and lock flash."""
        self._write(bytes([I2C_DFU_CMD_MANIFEST]))
        self._wait_while_busy(timeout_s=10.0)

    def reset(self) -> None:
        """Send CMD_RESET; the device reboots immediately (no response)."""
        self._write(bytes([I2C_DFU_CMD_RESET]))

    def get_version(self) -> str:
        """Read the null-terminated bootloader version string.

        Raises:
            RuntimeError: If the first reply byte is neither OK nor BUSY.
        """
        raw = self._exchange(
            bytes([I2C_DFU_CMD_GETVERSION]), 2 + I2C_DFU_VERSION_STR_MAX
        )
        status = raw[0]
        if status not in (I2C_DFU_STATUS_OK, I2C_DFU_STATUS_BUSY):
            raise RuntimeError(
                f"I2C DFU GETVERSION failed: status=0x{raw[0]:02X}"
            )
        # The string starts at byte 2 and ends at the first NUL.
        text, _, _ = raw[2:].partition(b"\x00")
        return text.decode("ascii", errors="replace")
# ---------------------------------------------------------------------------
# High-level firmware update manager
# ---------------------------------------------------------------------------
class LIFUDFUManager:
    """Orchestrates firmware updates for a single LIFU transmitter module.

    Usage::

        from openlifu.io.LIFUDFU import LIFUDFUManager

        mgr = LIFUDFUManager(uart=txdevice.uart)
        mgr.update_module(
            module=1,
            package_file="path/to/lifu-transmitter-fw.bin.signed.bin",
            enter_dfu_fn=txdevice.enter_dfu,
        )
    """

    def __init__(self, uart: LIFUUart):
        """Keep the UART link used for I2C passthrough to slave modules."""
        self._uart = uart

    # --- per-transport helpers ---

    def get_bootloader_version_usb(self, vid: int = 0x0483, pid: int = 0xDF11,
                                   libusb_dll: str | None = None) -> str:
        """Read bootloader version string from module 0 via USB DFU."""
        with STM32USBDFU(vid=vid, pid=pid, libusb_dll=libusb_dll) as dfu:
            return dfu.get_version()

    def get_bootloader_version_i2c(self, i2c_addr: int = I2C_DFU_SLAVE_ADDR) -> str:
        """Read bootloader version string from a slave module via I2C passthrough."""
        dfu = STM32I2CDFUviaMaster(uart=self._uart, i2c_addr=i2c_addr)
        return dfu.get_version()

    def program_usb(self, package_file: str,
                    vid: int = 0x0483, pid: int = 0xDF11,
                    libusb_dll: str | None = None,
                    progress_callback: Callable | None = None) -> None:
        """Program a signed package to module 0 via USB DFU.

        The module must already be in DFU bootloader mode.  The package is
        parsed and CRC-checked before any USB I/O; firmware and metadata are
        then written (with page erase) and a manifest request is sent.

        Raises:
            ValueError: If the package fails validation.
        """
        with open(package_file, "rb") as f:
            pkg_blob = f.read()
        pkg = parse_signed_package(pkg_blob)
        logger.info(
            "USB DFU: fw %d B @ 0x%08X, meta %d B @ 0x%08X",
            len(pkg["fw"]), pkg["fw_address"],
            len(pkg["meta"]), pkg["meta_address"],
        )
        with STM32USBDFU(vid=vid, pid=pid, libusb_dll=libusb_dll) as dfu:
            dfu.write_memory(
                pkg["fw_address"], pkg["fw"],
                page_erase=True, progress_callback=progress_callback
            )
            dfu.write_memory(
                pkg["meta_address"], pkg["meta"],
                page_erase=True, progress_callback=progress_callback
            )
            logger.info("USB DFU: sending manifest...")
            dfu.manifest()
        logger.info("USB DFU: programming complete.")

    def program_i2c(self, package_file: str,
                    i2c_addr: int = I2C_DFU_SLAVE_ADDR,
                    progress_callback: Callable | None = None) -> None:
        """Program a signed package to a slave module via I2C passthrough.

        The slave must already be in DFU bootloader mode at *i2c_addr*.

        Sequence (mirrors dfu-i2c-test.py program-package):

        1. Mass-erase the application flash region.
        2. Erase the metadata page explicitly (it is outside the app region
           and is NOT touched by mass-erase).
        3. Write the firmware payload.
        4. Write the metadata blob.
        5. Send CMD_MANIFEST.

        *progress_callback* is only passed to the firmware write.

        Raises:
            ValueError: If the package fails validation.
        """
        with open(package_file, "rb") as f:
            pkg_blob = f.read()
        pkg = parse_signed_package(pkg_blob)
        logger.info(
            "I2C DFU: fw %d B @ 0x%08X, meta %d B @ 0x%08X",
            len(pkg["fw"]), pkg["fw_address"],
            len(pkg["meta"]), pkg["meta_address"],
        )
        dfu = STM32I2CDFUviaMaster(uart=self._uart, i2c_addr=i2c_addr)
        logger.info("I2C DFU: mass erasing application region...")
        dfu.mass_erase()
        logger.info("I2C DFU: erasing metadata page @ 0x%08X...", pkg["meta_address"])
        dfu.erase_page(pkg["meta_address"])
        dfu.write_memory(
            pkg["fw_address"], pkg["fw"],
            progress_callback=progress_callback
        )
        logger.info("I2C DFU: writing metadata...")
        dfu.write_memory(
            pkg["meta_address"], pkg["meta"]
        )
        logger.info("I2C DFU: sending manifest...")
        dfu.manifest()
        logger.info("I2C DFU: programming complete.")

    def _wait_for_usb_dfu(self, vid: int, pid: int, libusb_dll: str | None,
                          timeout_s: float = 30.0,
                          poll_interval_s: float = 1.0) -> str:
        """Poll for the USB DFU device until it enumerates or *timeout_s* elapses.

        Returns the bootloader version string once the device is found, or
        ``"unknown"`` if the device is found but its version cannot be read.
        Raises RuntimeError if the device does not appear within the timeout.
        """
        # Pre-flight: verify the libusb backend can be loaded before entering
        # the poll loop.  If the DLL is missing or the path is wrong this fails
        # immediately with a clear message instead of silently timing out.
        _probe = STM32USBDFU(vid=vid, pid=pid, libusb_dll=libusb_dll)
        backend = _probe.get_backend()
        if backend is None:
            raise RuntimeError(
                "libusb backend not available — install libusb or pass --libusb-dll "
                "pointing to a valid libusb-1.0.dll."
            )

        deadline = time.monotonic() + timeout_s
        attempt = 0
        while time.monotonic() < deadline:
            attempt += 1
            # Phase 1: check if the DFU device has appeared (no I/O yet).
            try:
                dev = _usb_core.find(idVendor=vid, idProduct=pid, backend=backend)
            except (_usb_core.USBError, OSError) as e:
                logger.warning("USB DFU find error (attempt %d): %s", attempt, e)
                time.sleep(poll_interval_s)
                continue

            if dev is None:
                remaining = deadline - time.monotonic()
                logger.debug(
                    "USB DFU not found yet (attempt %d, %.0f s remaining)...",
                    attempt, max(remaining, 0)
                )
                time.sleep(poll_interval_s)
                continue

            # Phase 2: device is present — open it and read the version string.
            elapsed = timeout_s - (deadline - time.monotonic())
            logger.info(
                "USB DFU device found after %.1f s (attempt %d)", elapsed, attempt
            )
            try:
                with STM32USBDFU(vid=vid, pid=pid, libusb_dll=libusb_dll) as dfu:
                    version = dfu.get_version()
                return version
            except (_usb_core.USBError, RuntimeError, TimeoutError) as e:
                # Device enumerated but version read failed (e.g. DFU state
                # machine not ready yet or bootloader doesn't support virtual
                # version address).  Log visibly and return a placeholder so
                # the update can still proceed.
                logger.warning(
                    "USB DFU device found but version read failed: %s — "
                    "proceeding with version='unknown'", e
                )
                return "unknown"

        raise RuntimeError(
            f"USB DFU device (VID=0x{vid:04X}, PID=0x{pid:04X}) did not "
            f"enumerate within {timeout_s:.0f} s"
        )

    def _wait_for_i2c_dfu(self, module: int, i2c_addr: int,
                          timeout_s: float,
                          poll_interval_s: float = 0.2) -> str:
        """Poll the I2C DFU slave until it returns a non-empty version string.

        Uses the monotonic clock so wall-clock adjustments cannot shorten or
        extend the wait.  An empty version string counts as a failed attempt
        and is retried.

        Returns:
            The bootloader version string.

        Raises:
            RuntimeError: If no non-empty version is obtained within
                *timeout_s*; chained from the last failure when there is one.
        """
        deadline = time.monotonic() + timeout_s
        last_error: Exception | None = None
        while time.monotonic() < deadline:
            try:
                candidate = self.get_bootloader_version_i2c(i2c_addr=i2c_addr)
            except (RuntimeError, TimeoutError) as e:
                last_error = e
            else:
                if candidate:
                    return candidate
                last_error = RuntimeError(
                    "I2C DFU bootloader returned an empty version string"
                )
            # Small delay before retrying to avoid busy-waiting.
            time.sleep(poll_interval_s)

        if last_error is not None:
            raise RuntimeError(
                f"Module {module} did not enter I2C DFU mode at "
                f"0x{i2c_addr:02X} within {timeout_s}s: {last_error}"
            ) from last_error
        raise RuntimeError(
            f"Module {module} did not enter I2C DFU mode at "
            f"0x{i2c_addr:02X} within {timeout_s}s"
        )

    def update_module(self, module: int, package_file: str,
                      enter_dfu_fn: Callable,
                      vid: int = 0x0483, pid: int = 0xDF11,
                      libusb_dll: str | None = None,
                      i2c_addr: int = I2C_DFU_SLAVE_ADDR,
                      dfu_wait_s: float = 3.0,
                      dfu_enum_timeout_s: float = 30.0,
                      progress_callback: Callable | None = None) -> None:
        """High-level firmware update for a single module.

        Steps:

        1. Call *enter_dfu_fn(module=module)* to reboot into the bootloader.
        2. Wait *dfu_wait_s* seconds (initial settling delay).
        3. For module 0: poll for the USB DFU device until it enumerates.
           For modules 1+: poll the I2C DFU slave via passthrough until it
           reports a bootloader version.
        4. Program the signed package.

        Module 0 (USB master) uses USB DFU.  Modules 1+ use I2C DFU through
        the master's ``OW_I2C_PASSTHRU`` path, writing to *i2c_addr*
        (default 0x72).

        Args:
            module: Physical module index (0 = USB master).
            package_file: Path to the signed firmware package.
            enter_dfu_fn: Callable that triggers DFU mode, e.g.
                ``txdevice.enter_dfu``.
            vid: USB VID for module 0 USB DFU.
            pid: USB PID for module 0 USB DFU.
            libusb_dll: Optional path to libusb-1.0.dll (Windows).
            i2c_addr: I2C DFU slave address for modules 1+.
            dfu_wait_s: Initial settling delay after DFU-enter (default 3 s).
            dfu_enum_timeout_s: Time to wait for the bootloader to appear
                (default 30 s).  The countdown starts after the *dfu_wait_s*
                settling delay has elapsed.
            progress_callback: Optional ``(written, total, label)`` callable.

        Raises:
            RuntimeError: If DFU entry cannot be verified or programming fails.
        """
        logger.info("Requesting DFU mode on module %d...", module)
        enter_dfu_fn(module=module)

        if dfu_wait_s > 0:
            logger.info("Initial DFU settling delay: %.1f s...", dfu_wait_s)
            time.sleep(dfu_wait_s)

        if module == 0:
            logger.info(
                "Waiting for USB DFU device (timeout %ds)...", dfu_enum_timeout_s
            )
            try:
                bl_version = self._wait_for_usb_dfu(
                    vid=vid, pid=pid, libusb_dll=libusb_dll,
                    timeout_s=dfu_enum_timeout_s,
                )
            except RuntimeError as e:
                raise RuntimeError(
                    f"Module 0 did not enter USB DFU mode: {e}"
                ) from e
            logger.info("USB DFU bootloader version: %s", bl_version)
            self.program_usb(
                package_file,
                vid=vid, pid=pid, libusb_dll=libusb_dll,
                progress_callback=progress_callback,
            )
        else:
            logger.info(
                "Verifying I2C DFU entry (module %d, addr=0x%02X via master)...",
                module, i2c_addr,
            )
            bl_version = self._wait_for_i2c_dfu(
                module=module, i2c_addr=i2c_addr,
                timeout_s=dfu_enum_timeout_s,
            )
            logger.info("I2C DFU bootloader version: %s", bl_version)
            self.program_i2c(
                package_file,
                i2c_addr=i2c_addr,
                progress_callback=progress_callback,
            )

        logger.info("Firmware update complete for module %d.", module)