Source code for openlifu.io.LIFUHVController

from __future__ import annotations

import logging
import struct

from openlifu.io.LIFUConfig import (
    OW_CMD_ECHO,
    OW_CMD_HWID,
    OW_CMD_PING,
    OW_CMD_RESET,
    OW_CMD_TOGGLE_LED,
    OW_CMD_VERSION,
    OW_ERROR,
    OW_POWER,
    OW_POWER_12V_OFF,
    OW_POWER_12V_ON,
    OW_POWER_GET_12VON,
    OW_POWER_GET_FAN,
    OW_POWER_GET_HV,
    OW_POWER_GET_HVON,
    OW_POWER_GET_RGB,
    OW_POWER_GET_TEMP1,
    OW_POWER_GET_TEMP2,
    OW_POWER_HV_OFF,
    OW_POWER_HV_ON,
    OW_POWER_SET_FAN,
    OW_POWER_SET_HV,
    OW_POWER_SET_RGB,
)
from openlifu.io.LIFUUart import LIFUUart

logger = logging.getLogger(__name__)


[docs] class HVController:
[docs] def __init__(self, uart: LIFUUart = None): """ Initialize the HVController. Args: uart (LIFUUart): The LIFUUart instance for communication. """ self.uart = uart if self.uart and not self.uart.asyncMode: self.uart.check_usb_status() if self.uart.is_connected(): logger.info("HV Console connected.") else: logger.info("HV Console NOT Connected.") # Initialize the high voltage state (should get this from device) self.output_voltage = 0.0 self.is_hv_on = False self.is_12v_on = False self.supply_voltage = None
def is_connected(self): if self.uart: return self.uart.is_connected()
[docs] def ping(self) -> bool: """ Send a ping command to the Console device to verify connectivity. Raises: ValueError: If the UART is not connected. Exception: If an error occurs during the ping process. """ try: if self.uart.demo_mode: return True if not self.uart.is_connected(): raise ValueError("Console Device not connected") logger.info("Send Ping to Device.") r = self.uart.send_packet(id=None, packetType=OW_POWER, command=OW_CMD_PING) self.uart.clear_buffer() logger.info("Received Ping from Device.") # r.print_packet() if r.packet_type == OW_ERROR: logger.error("Error sending ping") return False else: return True except ValueError as v: logger.error("ValueError: %s", v) raise # Re-raise the exception for the caller to handle except Exception as e: logger.error("Unexpected error during process: %s", e) raise # Re-raise the exception for the caller to handle
[docs] def get_version(self) -> str: """ Retrieve the firmware version of the Console device. Returns: str: Firmware version in the format 'vX.Y.Z'. Raises: ValueError: If the UART is not connected. Exception: If an error occurs while fetching the version. """ try: if self.uart.demo_mode: return 'v0.1.1' if not self.uart.is_connected(): raise ValueError("Console Device not connected") r = self.uart.send_packet( id=None, packetType=OW_POWER, command=OW_CMD_VERSION ) self.uart.clear_buffer() # r.print_packet() if r.data_len == 3: ver = f"v{r.data[0]}.{r.data[1]}.{r.data[2]}" else: ver = "v0.0.0" logger.info(ver) return ver except ValueError as v: logger.error("ValueError: %s", v) raise # Re-raise the exception for the caller to handle except Exception as e: logger.error("Unexpected error during process: %s", e) raise # Re-raise the exception for the caller to handle
[docs] def echo(self, echo_data=None) -> tuple[bytes, int]: """ Send an echo command to the device with data and receive the same data in response. Args: echo_data (bytes): The data to send (must be a byte array). Returns: tuple[bytes, int]: The echoed data and its length. Raises: ValueError: If the UART is not connected. TypeError: If the `echo_data` is not a byte array. Exception: If an error occurs during the echo process. """ try: if self.uart.demo_mode: data = b"Hello LIFU!" return data, len(data) if not self.uart.is_connected(): raise ValueError("Console Device not connected") # Check if echo_data is a byte array if echo_data is not None and not isinstance(echo_data, (bytes, bytearray)): raise TypeError("echo_data must be a byte array") r = self.uart.send_packet( id=None, packetType=OW_POWER, command=OW_CMD_ECHO, data=echo_data ) self.uart.clear_buffer() # r.print_packet() if r.data_len > 0: return r.data, r.data_len else: return None, None except ValueError as v: logger.error("ValueError: %s", v) raise # Re-raise the exception for the caller to handle except Exception as e: logger.error("Unexpected error during process: %s", e) raise # Re-raise the exception for the caller to handle
[docs] def toggle_led(self) -> bool: """ Toggle the LED on the Console device. Raises: ValueError: If the UART is not connected. Exception: If an error occurs while toggling the LED. """ try: if self.uart.demo_mode: return True if not self.uart.is_connected(): raise ValueError("Console Device not connected") r = self.uart.send_packet( id=None, packetType=OW_POWER, command=OW_CMD_TOGGLE_LED ) self.uart.clear_buffer() # r.print_packet() if r.packet_type == OW_ERROR: return False return True except ValueError as v: logger.error("ValueError: %s", v) raise # Re-raise the exception for the caller to handle except Exception as e: logger.error("Unexpected error during process: %s", e) raise # Re-raise the exception for the caller to handle
[docs] def get_hardware_id(self) -> str: """ Retrieve the hardware ID of the Console device. Returns: str: Hardware ID in hexadecimal format. Raises: ValueError: If the UART is not connected. Exception: If an error occurs while retrieving the hardware ID. """ try: if self.uart.demo_mode: return bytes.fromhex("deadbeefcafebabe5566778811223344") if not self.uart.is_connected(): raise ValueError("Console Device not connected") r = self.uart.send_packet(id=None, packetType=OW_POWER, command=OW_CMD_HWID) self.uart.clear_buffer() # r.print_packet() if r.data_len == 16: return r.data.hex() else: return None except ValueError as v: logger.error("ValueError: %s", v) raise # Re-raise the exception for the caller to handle except Exception as e: logger.error("Unexpected error during process: %s", e) raise # Re-raise the exception for the caller to handle
[docs] def get_temperature1(self) -> float: """ Retrieve the temperature reading from the TX device. Returns: float: Temperature value in Celsius. Raises: ValueError: If the UART is not connected. Exception: If an error occurs or the received data length is invalid. """ try: if self.uart.demo_mode: return 32.4 if not self.uart.is_connected(): logger.error("TX Device not connected") return 0 # Send the GET_TEMP command r = self.uart.send_packet(id=None, packetType=OW_POWER, command=OW_POWER_GET_TEMP1) self.uart.clear_buffer() # r.print_packet() # Check if the data length matches a float (4 bytes) if r.data_len == 4: # Unpack the float value from the received data (assuming little-endian) temperature = struct.unpack('<f', r.data)[0] # Truncate the temperature to 2 decimal places truncated_temperature = round(temperature, 2) return truncated_temperature else: raise ValueError("Invalid data length received for temperature") except ValueError as v: logger.error("ValueError: %s", v) raise # Re-raise the exception for the caller to handle
[docs] def get_temperature2(self) -> float: """ Retrieve the temperature reading from the TX device. Returns: float: Temperature value in Celsius. Raises: ValueError: If the UART is not connected. Exception: If an error occurs or the received data length is invalid. """ try: if self.uart.demo_mode: return 32.4 if not self.uart.is_connected(): logger.error("TX Device not connected") return 0 # Send the GET_TEMP command r = self.uart.send_packet(id=None, packetType=OW_POWER, command=OW_POWER_GET_TEMP2) self.uart.clear_buffer() # r.print_packet() # Check if the data length matches a float (4 bytes) if r.data_len == 4: # Unpack the float value from the received data (assuming little-endian) temperature = struct.unpack('<f', r.data)[0] # Truncate the temperature to 2 decimal places truncated_temperature = round(temperature, 2) return truncated_temperature else: raise ValueError("Invalid data length received for temperature") except ValueError as v: logger.error("ValueError: %s", v) raise # Re-raise the exception for the caller to handle
def turn_12v_off(self): try: if self.uart.demo_mode: return True if not self.uart.is_connected(): raise ValueError("Console not connected") logger.info("Turning off 12V.") r = self.uart.send_packet( id=None, packetType=OW_POWER, command=OW_POWER_12V_OFF ) self.uart.clear_buffer() # r.print_packet() if r.packet_type == OW_ERROR: logger.error("Error turning off 12V") return False else: self.is_12v_on = False logger.info("12V turned off successfully.") return True except ValueError as v: logger.error("ValueError: %s", v) raise # Re-raise the exception for the caller to handle except Exception as e: logger.error("Unexpected error during process: %s", e) raise # Re-raise the exception for the caller to handle def turn_12v_on(self): try: if self.uart.demo_mode: return True if not self.uart.is_connected(): raise ValueError("Console not connected") logger.info("Turning on 12V.") r = self.uart.send_packet( id=None, packetType=OW_POWER, command=OW_POWER_12V_ON ) self.uart.clear_buffer() # r.print_packet() if r.packet_type == OW_ERROR: logger.error("Error turning on 12V") return False else: self.is_12v_on = True logger.info("12V turned on successfully.") return True except ValueError as v: logger.error("ValueError: %s", v) raise # Re-raise the exception for the caller to handle except Exception as e: logger.error("Unexpected error during process: %s", e) raise # Re-raise the exception for the caller to handle def get_12v_status(self): try: if self.uart.demo_mode: return True if not self.uart.is_connected(): raise ValueError("Console not connected") logger.info("Get 12V voltage status.") r = self.uart.send_packet( id=None, packetType=OW_POWER, command=OW_POWER_GET_12VON ) self.uart.clear_buffer() # r.print_packet() if r.packet_type == OW_ERROR: logger.error("Error retrieving 12V status") return False else: if r.reserved == 1: self.is_12v_on = True else: self.is_12v_on = False return self.is_12v_on except ValueError as v: logger.error("ValueError: %s", v) raise # Re-raise the exception for the caller to handle except Exception as e: logger.error("Unexpected error during process: %s", e) raise # Re-raise the exception for the caller to handle
[docs] def turn_hv_on(self): """ Turn on the high voltage. """ try: if self.uart.demo_mode: return True if not self.uart.is_connected(): raise ValueError("Console not connected") logger.info("Turning on high voltage.") r = self.uart.send_packet( id=None, packetType=OW_POWER, command=OW_POWER_HV_ON ) self.uart.clear_buffer() # r.print_packet() if r.packet_type == OW_ERROR: logger.error("Error turning on HV Supply") return False else: self.is_hv_on = True logger.info("HV Supply turned on successfully.") return True except ValueError as v: logger.error("ValueError: %s", v) raise # Re-raise the exception for the caller to handle except Exception as e: logger.error("Unexpected error during process: %s", e) raise # Re-raise the exception for the caller to handle
[docs] def turn_hv_off(self): """ Turn off the high voltage. """ try: if self.uart.demo_mode: return True if not self.uart.is_connected(): raise ValueError("Console not connected") logger.info("Turning off high voltage.") r = self.uart.send_packet( id=None, packetType=OW_POWER, command=OW_POWER_HV_OFF ) self.uart.clear_buffer() # r.print_packet() if r.packet_type == OW_ERROR: logger.error("Error turning off HV Supply") return False else: self.is_hv_on = False logger.info("HV Supply turned off successfully.") return True except ValueError as v: logger.error("ValueError: %s", v) raise # Re-raise the exception for the caller to handle except Exception as e: logger.error("Unexpected error during process: %s", e) raise # Re-raise the exception for the caller to handle
def get_hv_status(self): try: if self.uart.demo_mode: return True if not self.uart.is_connected(): raise ValueError("Console not connected") logger.info("Get high voltage status.") r = self.uart.send_packet( id=None, packetType=OW_POWER, command=OW_POWER_GET_HVON ) self.uart.clear_buffer() # r.print_packet() if r.packet_type == OW_ERROR: logger.error("Error retrievinging HV Status") return False else: if r.reserved == 1: self.is_hv_on = True else: self.is_hv_on = False return self.is_hv_on except ValueError as v: logger.error("ValueError: %s", v) raise # Re-raise the exception for the caller to handle except Exception as e: logger.error("Unexpected error during process: %s", e) raise # Re-raise the exception for the caller to handle
[docs] def set_voltage(self, voltage: float) -> bool: """ Set the output voltage. Args: voltage (float): The desired output voltage. Raises: ValueError: If the controller is not connected or voltage exceeds supply voltage. """ if self.uart.demo_mode: return True if not self.uart.is_connected(): raise ValueError("High voltage controller not connected") # Validate and process the DAC input if voltage is None: voltage = 0 elif not (5.0 <= voltage <= 100.0): raise ValueError( "Voltage input must be within the valid range 5 to 100 Volts)." ) try: dac_input = int((voltage / 150) * 4095) # logger.info("Setting DAC Value %d.", dac_input) # Pack the 12-bit DAC input into two bytes data = bytes( [ (dac_input >> 8) & 0xFF, # High byte (most significant bits) dac_input & 0xFF, # Low byte (least significant bits) ] ) r = self.uart.send_packet( id=None, packetType=OW_POWER, command=OW_POWER_SET_HV, data=data ) self.uart.clear_buffer() # r.print_packet() if r.packet_type == OW_ERROR: logger.error("Error setting HV") return False else: self.supply_voltage = voltage logger.info("Output voltage set to %.2fV successfully.", voltage) return True except ValueError as v: logger.error("ValueError: %s", v) raise # Re-raise the exception for the caller to handle except Exception as e: logger.error("Unexpected error during process: %s", e) raise # Re-raise the exception for the caller to handle
[docs] def get_voltage(self) -> float: """ Get the current output voltage setting. Returns: float: The current output voltage. Raises: ValueError: If the controller is not connected. """ if not self.uart.is_connected(): raise ValueError("High voltage controller not connected") try: if self.uart.demo_mode: return 18.4 logger.info("Getting current output voltage.") r = self.uart.send_packet( id=None, packetType=OW_POWER, command=OW_POWER_GET_HV ) self.uart.clear_buffer() # r.print_packet() if r.packet_type == OW_ERROR: logger.error("Error setting HV") return 0.0 elif r.data_len == 2: dac_value = r.data[1] << 8 | r.data[0] # logger.info("Got DAC Value %d.", dac_value) voltage = dac_value / 4095 * 150 self.supply_voltage = voltage logger.info("Output voltage set to %.2fV successfully.", voltage) return voltage else: logger.error("Error getting output voltage from device") return 0.0 except ValueError as v: logger.error("ValueError: %s", v) raise # Re-raise the exception for the caller to handle except Exception as e: logger.error("Unexpected error during process: %s", e) raise # Re-raise the exception for the caller to handle
[docs] def set_fan_speed(self, fan_id: int = 0, fan_speed: int = 50) -> int: """ Get the current output fan percentage. Args: fan_id (int): The desired fan to set (default is 0). bottom fans (0), and top fans (1). fan_speed (int): The desired fan speed (default is 50). Returns: int: The current output fan percentage. Raises: ValueError: If the controller is not connected. """ if not self.uart.is_connected(): raise ValueError("High voltage controller not connected") if fan_id not in [0, 1]: raise ValueError("Invalid fan ID. Must be 0 or 1") if fan_speed not in range(101): raise ValueError("Invalid fan speed. Must be 0 to 100") try: if self.uart.demo_mode: return 40 logger.info("Getting current output voltage.") data = bytes( [ fan_speed & 0xFF, # Low byte (least significant bits) ] ) r = self.uart.send_packet( id=None, addr=fan_id, packetType=OW_POWER, command=OW_POWER_SET_FAN, data=data ) self.uart.clear_buffer() # r.print_packet() if r.packet_type == OW_ERROR: logger.error("Error setting Fan Speed") return -1 logger.info(f'Set fan speed to {fan_speed}') return fan_speed except ValueError as v: logger.error("ValueError: %s", v) raise # Re-raise the exception for the caller to handle except Exception as e: logger.error("Unexpected error during process: %s", e) raise # Re-raise the exception for the caller to handle
[docs] def get_fan_speed(self, fan_id: int = 0) -> int: """ Get the current output fan percentage. Args: fan_id (int): The desired fan to read (default is 0). bottom fans (0), and top fans (1). Returns: int: The current output fan percentage. Raises: ValueError: If the controller is not connected. """ if not self.uart.is_connected(): raise ValueError("High voltage controller not connected") if fan_id not in [0, 1]: raise ValueError("Invalid fan ID. Must be 0 or 1") try: if self.uart.demo_mode: return 40.0 logger.info("Getting current output voltage.") r = self.uart.send_packet( id=None, addr=fan_id, packetType=OW_POWER, command=OW_POWER_GET_FAN ) self.uart.clear_buffer() # r.print_packet() if r.packet_type == OW_ERROR: logger.error("Error setting HV") return 0.0 elif r.data_len == 1: fan_value = r.data[0] logger.info(f'Output fan speed is {fan_value}') return fan_value else: logger.error("Error getting output voltage from device") return -1 except ValueError as v: logger.error("ValueError: %s", v) raise # Re-raise the exception for the caller to handle except Exception as e: logger.error("Unexpected error during process: %s", e) raise # Re-raise the exception for the caller to handle
[docs] def set_rgb_led(self, rgb_state: int) -> int: """ Set the RGB LED state. Args: rgb_state (int): The desired RGB state (0 = OFF, 1 = RED, 2 = BLUE, 3 = GREEN). Returns: int: The current RGB state after setting. Raises: ValueError: If the controller is not connected or the RGB state is invalid. """ if not self.uart.is_connected(): raise ValueError("High voltage controller not connected") if rgb_state not in [0, 1, 2, 3]: raise ValueError("Invalid RGB state. Must be 0 (OFF), 1 (RED), 2 (BLUE), or 3 (GREEN)") try: if self.uart.demo_mode: return rgb_state logger.info("Setting RGB LED state.") # Send the RGB state as the reserved byte in the packet r = self.uart.send_packet( id=None, reserved=rgb_state & 0xFF, # Send the RGB state as a single byte packetType=OW_POWER, command=OW_POWER_SET_RGB ) self.uart.clear_buffer() if r.packet_type == OW_ERROR: logger.error("Error setting RGB LED state") return -1 logger.info(f'Set RGB LED state to {rgb_state}') return rgb_state except ValueError as v: logger.error("ValueError: %s", v) raise # Re-raise the exception for the caller to handle except Exception as e: logger.error("Unexpected error during process: %s", e) raise # Re-raise the exception for the caller to handle
[docs] def get_rgb_led(self) -> int: """ Get the current RGB LED state. Returns: int: The current RGB state (0 = OFF, 1 = RED, 2 = BLUE, 3 = GREEN). Raises: ValueError: If the controller is not connected. """ if not self.uart.is_connected(): raise ValueError("High voltage controller not connected") try: if self.uart.demo_mode: return 1 # Default to RED in demo mode logger.info("Getting current RGB LED state.") r = self.uart.send_packet( id=None, packetType=OW_POWER, command=OW_POWER_GET_RGB ) self.uart.clear_buffer() if r.packet_type == OW_ERROR: logger.error("Error getting RGB LED state") return -1 rgb_state = r.reserved logger.info(f'Current RGB LED state is {rgb_state}') return rgb_state except ValueError as v: logger.error("ValueError: %s", v) raise # Re-raise the exception for the caller to handle except Exception as e: logger.error("Unexpected error during process: %s", e) raise # Re-raise the exception for the caller to handle
[docs] def soft_reset(self) -> bool: """ Perform a soft reset on the Console device. Returns: bool: True if the reset was successful, False otherwise. Raises: ValueError: If the UART is not connected. Exception: If an error occurs while resetting the device. """ try: if self.uart.demo_mode: return True if not self.uart.is_connected(): raise ValueError("Console Device not connected") r = self.uart.send_packet( id=None, packetType=OW_POWER, command=OW_CMD_RESET ) self.uart.clear_buffer() # r.print_packet() if r.packet_type == OW_ERROR: logger.error("Error resetting device") return False else: return True except ValueError as v: logger.error("ValueError: %s", v) raise # Re-raise the exception for the caller to handle except Exception as e: logger.error("Unexpected error during process: %s", e) raise # Re-raise the exception for the caller to handle