Source code for openlifu.io.LIFUUart

from __future__ import annotations

import asyncio
import logging
import queue
import threading
import time
from contextlib import suppress

import serial
import serial.tools.list_ports

from openlifu.io.LIFUConfig import OW_ACK, OW_CMD_NOP, OW_DATA, OW_ERROR, OW_RESP
from openlifu.io.LIFUSignal import LIFUSignal

# Packet structure constants
OW_START_BYTE = 0xAA
OW_END_BYTE = 0xDD
ID_COUNTER = 0  # Initializing the ID counter

# Set up logging
log = logging.getLogger("UART")
log.setLevel(logging.ERROR)
log.propagate = False

handler = logging.StreamHandler()
handler.setLevel(logging.ERROR)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") # Format output with timestamp
handler.setFormatter(formatter)
log.addHandler(handler)

# CRC16-ccitt lookup table
crc16_tab = [
	0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7,
	0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef,
	0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6,
	0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de,
	0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485,
	0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d,
	0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4,
	0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc,
	0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823,
	0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b,
	0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12,
	0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a,
	0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41,
	0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49,
	0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70,
	0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78,
	0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f,
	0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067,
	0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e,
	0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256,
	0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d,
	0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
	0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c,
	0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634,
	0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab,
	0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3,
	0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a,
	0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92,
	0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9,
	0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1,
	0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8,
	0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0,
]

[docs] def util_crc16(buf): crc = 0xFFFF for byte in buf: crc = ((crc << 8) & 0xFFFF) ^ crc16_tab[((crc >> 8) ^ byte) & 0xFF] return crc
[docs] class UartPacket: def __init__(self, id=None, packet_type=None, command=None, addr=None, reserved=None, data=None, buffer=None): if buffer: self.from_buffer(buffer) else: self.id = id self.packet_type = packet_type self.command = command self.addr = addr self.reserved = reserved self.data = data self.data_len = len(data) self.crc = self.calculate_crc() def calculate_crc(self) -> int: crc_value = 0xFFFF packet = bytearray() packet.append(OW_START_BYTE) packet.extend(self.id.to_bytes(2, 'big')) packet.append(self.packet_type) packet.append(self.command) packet.append(self.addr) packet.append(self.reserved) packet.extend(self.data_len.to_bytes(2, 'big')) if self.data_len > 0: packet.extend(self.data) crc_value = util_crc16(packet[1:]) return crc_value def to_bytes(self) -> bytes: buffer = bytearray() buffer.append(OW_START_BYTE) buffer.extend(self.id.to_bytes(2, 'big')) buffer.append(self.packet_type) buffer.append(self.command) buffer.append(self.addr) buffer.append(self.reserved) buffer.extend(self.data_len.to_bytes(2, 'big')) if self.data_len > 0: buffer.extend(self.data) crc_value = util_crc16(buffer[1:]) buffer.extend(crc_value.to_bytes(2, 'big')) buffer.append(OW_END_BYTE) return bytes(buffer) def from_buffer(self, buffer: bytes): if buffer[0] != OW_START_BYTE or buffer[-1] != OW_END_BYTE: raise ValueError("Invalid buffer format") self.id = int.from_bytes(buffer[1:3], 'big') self.packet_type = buffer[3] self.command = buffer[4] self.addr = buffer[5] self.reserved = buffer[6] self.data_len = int.from_bytes(buffer[7:9], 'big') self.data = bytearray(buffer[9:9+self.data_len]) crc_value = util_crc16(buffer[1:9+self.data_len]) self.crc = int.from_bytes(buffer[9+self.data_len:11+self.data_len], 'big') if self.crc != crc_value: raise ValueError("CRC mismatch") def print_packet(self): log.info("UartPacket:") log.info(f" Packet ID: {self.id}") log.info(f" Packet Type: {hex(self.packet_type)}") log.info(f" Command: {hex(self.command)}") log.info(f" Address: {hex(self.addr)}") log.info(f" Reserved: {hex(self.reserved)}") log.info(f" Data Length: {self.data_len}") if self.data_len > 0: log.info(f" Data: {self.data.hex()}") else: log.info(" Data: None") log.info(f" CRC: {hex(self.crc)}")
[docs] class LIFUUart:
[docs] def __init__(self, vid, pid, baudrate=921600, timeout=10, align=0, desc="VCP", demo_mode=False, async_mode=False): """ Initialize the UART instance. Args: vid (int): Vendor ID of the USB device. pid (int): Product ID of the USB device. baudrate (int): Communication speed. timeout (int): Read timeout in seconds. align (int): Data alignment parameter. desc (str): Descriptor for the device (e.g. "TX" or "HV"). demo_mode (bool): If True, simulate the connection. async_mode (bool): If True, use asynchronous mode. """ self.vid = vid self.pid = pid self.port = None self.baudrate = baudrate self.timeout = timeout self.align = align self.packet_count = 0 self.serial = None self.running = False self.asyncMode = async_mode self.monitoring_task = None self.demo_mode = demo_mode self.descriptor = desc self.read_thread = None self.read_buffer = [] self.last_rx = time.monotonic() self.demo_responses = [] # Predefined responses for testing # Signals: each signal emits (descriptor, port or data) self.signal_connect = LIFUSignal() self.signal_disconnect = LIFUSignal() self.signal_data_received = LIFUSignal() if async_mode: self.loop = asyncio.get_event_loop() self.response_queues = {} # Dictionary to map packet IDs to response queues self.response_lock = threading.Lock() # Lock for thread-safe access to response_queues
[docs] def connect(self): """Open the serial port.""" if self.demo_mode: log.info("Demo mode: Simulating UART connection.") self.signal_connect.emit(self.descriptor, "demo_mode") return try: self.serial = serial.Serial( port=self.port, baudrate=self.baudrate, timeout=self.timeout ) log.info("Connected to UART on port %s.", self.port) self.signal_connect.emit(self.descriptor, self.port) if self.asyncMode: log.info("Starting read thread for %s.", self.descriptor) self.running = True self.read_thread = threading.Thread(target=self._read_data) self.read_thread.daemon = True self.read_thread.start() except serial.SerialException as se: log.error("Failed to connect to %s: %s", self.port, se) self.running = False self.port = None except Exception as e: raise e
[docs] def disconnect(self): """Close the serial port.""" self.running = False if self.demo_mode: log.info("Demo mode: Simulating UART disconnection.") self.signal_disconnect.emit(self.descriptor, "demo_mode") return if self.read_thread: self.read_thread.join() if self.serial and self.serial.is_open: self.serial.close() self.serial = None log.info("Disconnected from UART.") self.signal_disconnect.emit(self.descriptor, self.port) self.port = None
[docs] def is_connected(self) -> bool: """ Check if the device is connected. Returns: bool: True if connected, False otherwise. """ if self.demo_mode: return True return self.port is not None and self.serial is not None and self.serial.is_open
[docs] def check_usb_status(self): """Check if the USB device is connected or disconnected.""" device = self.list_vcp_with_vid_pid() if device and not self.port: log.debug("Device found; trying to connect.") self.port = device self.connect() elif not device and self.port: log.debug("Device removed; disconnecting.") self.running = False self.disconnect()
[docs] async def monitor_usb_status(self, interval=1): """Periodically check for USB device connection.""" if self.demo_mode: log.debug("Monitoring in demo mode.") self.connect() return while True: self.check_usb_status() await asyncio.sleep(interval)
[docs] def start_monitoring(self, interval=1): """Start the periodic USB device connection check.""" if self.demo_mode: log.debug("Monitoring in demo mode.") return if not self.monitoring_task and self.asyncMode: self.monitoring_task = asyncio.create_task(self.monitor_usb_status(interval))
[docs] def stop_monitoring(self): """Stop the periodic USB device connection check.""" if self.demo_mode: log.info("Monitoring in demo mode.") return if self.monitoring_task: self.monitoring_task.cancel() self.monitoring_task = None
[docs] def list_vcp_with_vid_pid(self): """Find the USB device by VID and PID.""" ports = serial.tools.list_ports.comports() for port in ports: if hasattr(port, 'vid') and hasattr(port, 'pid') and port.vid == self.vid and port.pid == self.pid: return port.device return None
[docs] def reopen_after_reset(self, retries=5, delay=1.0): """ Attempt to reopen the serial port after a device reset or disconnection. Returns True if reconnected, False otherwise. """ try: if self.serial and self.serial.is_open: with suppress(Exception): self.serial.close() self.serial = None self.port = None for _ in range(retries): time.sleep(delay) port = self.list_vcp_with_vid_pid() if port: self.port = port try: self.connect() log.info("Reconnected to UART on %s after reset.", self.port) return True except (OSError, ValueError) as e: log.warning("Reconnect attempt failed: %s", e) log.error("Failed to reconnect UART after reset.") return False except (OSError, ValueError) as e: log.error("reopen_after_reset() error: %s", e) return False
[docs] def _read_data(self, timeout=20): """Read data from the serial port in a separate thread.""" log.debug("Starting data read loop for %s.", self.descriptor) if self.demo_mode: while self.running: if self.demo_responses: data = self.demo_responses.pop(0) log.info("Demo mode: Simulated data received: %s", data) self.signal_data_received.emit(self.descriptor, "Demo Response") time.sleep(10) # Simulate delay (10 second) return # In async mode, run the reading loop in a thread while self.running: try: if self.serial.in_waiting > 0: data = self.serial.read(self.serial.in_waiting) self.read_buffer.extend(data) log.debug("Data received on %s: %s", self.descriptor, data) # Attempt to parse a complete packet from read_buffer. try: # Note: Depending on your protocol, you might need to check for start/end bytes # and possibly handle partial packets. packet = UartPacket(buffer=bytes(self.read_buffer)) # Clear the buffer after a successful parse. self.read_buffer = [] if self.asyncMode: with self.response_lock: # Check if a queue is waiting for this packet ID. if packet.id in self.response_queues: self.response_queues[packet.id].put(packet) elif packet.packet_type == OW_DATA and packet.id == 0: text = packet.data.decode('utf-8', errors='replace') self.signal_data_received.emit(self.descriptor, text) else: log.warning("Received an unsolicited packet with ID %d", packet.id) elif packet.packet_type == OW_DATA: self.signal_data_received.emit(self.descriptor, OW_DATA) except ValueError as ve: log.error("Error parsing packet: %s", ve) else: time.sleep(0.05) # Brief sleep to avoid a busy loop except serial.SerialException as e: log.error("Serial _read_data error on %s: %s", self.descriptor, e) self.running = False
[docs] def _tx(self, data: bytes): """Send data over UART.""" if not self.serial or not self.serial.is_open: log.error("Serial port is not initialized.") return if self.demo_mode: log.info("Demo mode: Simulating data transmission: %s", data) return try: if self.align > 0: while len(data) % self.align != 0: data += bytes([OW_END_BYTE]) self.serial.write(data) except Exception as e: log.error("Error during transmission: %s", e) raise e
[docs] def read_packet(self, timeout=20) -> UartPacket: """ Read a packet from the UART interface. Returns: UartPacket: Parsed packet or an error packet if parsing fails. """ start_time = time.monotonic() raw_data = b"" count = 0 while timeout == -1 or time.monotonic() - start_time < timeout: time.sleep(0.05) try: raw_data += self.serial.read_all() except serial.SerialException as e: if "ClearCommError" in str(e): log.warning("Serial lost during read_packet; attempting reconnect...") self.reopen_after_reset() return None raise if raw_data: count += 1 if count > 1: break try: if not raw_data: raise ValueError("No data received from UART within timeout") packet = UartPacket(buffer=raw_data) except Exception as e: log.error("Error parsing packet: %s", e) packet = UartPacket( id=0, packet_type=OW_ERROR, command=0, addr=0, reserved=0, data=[] ) raise e return packet
[docs] def send_packet(self, id=None, packetType=OW_ACK, command=OW_CMD_NOP, addr=0, reserved=0, data=None, timeout=20): """ Send a packet over UART and, if not running, return a response packet. """ try: if not self.serial or not self.serial.is_open: log.error("Cannot send packet. Serial port is not connected.") return None if id is None: self.packet_count += 1 id = self.packet_count if data: if not isinstance(data, (bytes, bytearray)): raise ValueError("Data must be bytes or bytearray") payload = data payload_length = len(payload) else: payload_length = 0 payload = b'' packet = bytearray() packet.append(OW_START_BYTE) packet.extend(id.to_bytes(2, 'big')) packet.append(packetType) packet.append(command) packet.append(addr) packet.append(reserved) packet.extend(payload_length.to_bytes(2, 'big')) if payload_length > 0: packet.extend(payload) crc_value = util_crc16(packet[1:]) # Exclude start byte packet.extend(crc_value.to_bytes(2, 'big')) packet.append(OW_END_BYTE) self._tx(packet) if not self.asyncMode: try: return self.read_packet(timeout=timeout) except serial.SerialException as e: if "ClearCommError" in str(e): log.warning("Serial handle lost after RESET, attempting reconnect...") self.reopen_after_reset() return None raise else: response_queue = queue.Queue() with self.response_lock: self.response_queues[id] = response_queue try: # Wait for a response that matches the packet ID. response = response_queue.get(timeout=timeout) # Optionally, check that the response has the expected type and command. if response.packet_type == OW_RESP and response.command == command: return response else: log.error("Received unexpected response: %s", response) return response except queue.Empty: log.error("Timeout waiting for response to packet ID %d", id) return None finally: with self.response_lock: # Clean up the queue entry regardless of outcome. self.response_queues.pop(id, None) except ValueError as ve: log.error("Validation error in send_packet: %s", ve) raise except Exception as e: log.error("Unexpected error in send_packet: %s", e) raise
[docs] def clear_buffer(self): """Clear the read buffer.""" self.read_buffer = []
[docs] def run_coroutine(self, coro): """Run a coroutine using the internal event loop.""" if not self.loop.is_running(): return self.loop.run_until_complete(coro) else: return asyncio.create_task(coro)
[docs] def add_demo_response(self, response: bytes): """Add a predefined response for demo mode.""" if self.demo_mode: self.demo_responses.append(response) else: log.warning("Cannot add demo response when not in demo mode.")
[docs] def print(self): """Print the current UART configuration.""" log.info(" Serial Port: %s", self.port) log.info(" Serial Baud: %s", self.baudrate)