Source code for openlifu.cloud.ws
from __future__ import annotations
from typing import Callable
import socketio
from socketio import exceptions
from openlifu.cloud.const import API_URL
from openlifu.cloud.utils import logger_cloud
DATABASE_UPDATES_NS = "/database_updates"
[docs]
class Websocket:
def __init__(self, update_callback: Callable[[dict], None]):
self._sio: socketio.Client | None = None
self._database_id = None
self._auth = {}
self._update_callback = update_callback
[docs]
def log(self, msg: str):
"""Force message to Slicer console immediately."""
logger_cloud.debug(f"WS_DEBUG: {msg}\n")
def authenticate(self, access_token: str):
self._auth = {"token": f"Bearer {access_token}"}
if self._database_id is not None:
self.connect(self._database_id)
def connect(self, database_id: int):
self.log(f"Attempting connection to {API_URL} for DB {database_id}")
if self._sio is not None:
self.disconnect()
self._database_id = database_id
if len(self._auth) == 0:
self.log("Abort: No authentication token set.")
return
self._sio = socketio.Client(
reconnection=True,
reconnection_attempts=0,
reconnection_delay=1,
reconnection_delay_max=10
)
@self._sio.event(namespace=DATABASE_UPDATES_NS)
def connect():
self.log(f"CONNECTED to namespace {DATABASE_UPDATES_NS}")
try:
self._sio.emit(
"subscribe",
{"database_id": database_id},
namespace=DATABASE_UPDATES_NS
)
self.log(f"Subscribed to database {database_id}")
except exceptions.BadNamespaceError:
self.log("Error: Namespace not ready for emit.")
@self._sio.event(namespace=DATABASE_UPDATES_NS)
def disconnect():
self.log("Disconnected from server.")
@self._sio.on("update", namespace=DATABASE_UPDATES_NS)
def on_update(data):
self.log(f"Update received: {data}")
if self._update_callback is not None:
self._update_callback(data)
try:
self._sio.connect(
f"{API_URL}/socket.io",
auth=self._auth,
namespaces=[DATABASE_UPDATES_NS],
transports=["websocket"],
wait=False,
socketio_path="/socket.io"
)
self.log("Connect call initiated successfully.")
except exceptions.SocketIOError as e:
logger_cloud.error(f"WS_FATAL_ERROR: {e!s}\n")
def disconnect(self):
if self._sio is not None:
try:
self.log("Disconnecting...")
self._sio.disconnect()
except exceptions.SocketIOError as e:
logger_cloud.error(f"WS_FATAL_ERROR: {e!s}\n")
finally:
self._sio = None