Source code for openlifu.cloud.sync_thread

from __future__ import annotations

import threading
import time
import traceback
from pathlib import Path
from typing import Any, Callable, List

from openlifu.cloud.status import Status
from openlifu.cloud.utils import logger_cloud


[docs] class SyncThread: DEBOUNCE_SEC = 1.0 def __init__(self, status_callback: Callable[[Status], None]): self._pending_syncs: dict[tuple[Any, Path | None], float] = {} self._lock = threading.Lock() self._running = False self._worker_thread = None self._status_callback = status_callback self._ignore_paths: List[Path] = [] def add_path_to_ignore_list(self, path: Path): self._ignore_paths.append(path) def is_path_in_ignore_list(self, path: Path) -> bool: return path in self._ignore_paths def post(self, item: Any, path: Path | None): with self._lock: self._pending_syncs[(item, path)] = time.time() def emit_status(self, status: Status): if self._status_callback is not None: self._status_callback(status) def is_running(self) -> bool: return self._running def start(self): if self._running: return self._running = True self._worker_thread = threading.Thread( target=self._worker, daemon=False, name="SyncThread") self._worker_thread.start() def stop(self): if not self._running: return self._worker_thread.join(timeout=5) self._running = False self._worker_thread = None def _worker(self): from openlifu.cloud.components.abstract_component import AbstractComponent self.emit_status(Status(Status.STATUS_IDLE)) while self._running: time.sleep(0.01) #logger_cloud.debug('SyncThread: Checking for pending syncs...') now = time.time() item_to_process = None with self._lock: ready = [ item for item, ts in self._pending_syncs.items() if now - ts > self.DEBOUNCE_SEC ] if ready: item_to_process = ready[0] del self._pending_syncs[item_to_process] if not item_to_process: continue component, path = item_to_process if isinstance(component, AbstractComponent): try: component.sync(path) except Exception as e: # pylint: disable=broad-exception-caught traceback.print_exc() logger_cloud.error(f"Error syncing component {component.get_component_type_plural()} for path {path}: {e}") #self._pending_syncs.clear() #self.emit_status(Status( # Status.STATUS_ERROR, component_type=component.get_component_type_plural(), ex=e)) continue idle = len(self._pending_syncs) == 0 if idle and len(ready) > 0: self._ignore_paths.clear() self.emit_status(Status(Status.STATUS_IDLE)) logger_cloud.info("SyncThread: Worker thread exiting.")