from __future__ import annotations
import logging
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, Dict, List
from openlifu.cloud.api.api import Api
from openlifu.cloud.api.dto import ClaimDbDto, DatabaseDto
from openlifu.cloud.components.abstract_component import AbstractComponent
from openlifu.cloud.components.photoscans import Photoscans
from openlifu.cloud.components.protocols import Protocols
from openlifu.cloud.components.runs import Runs
from openlifu.cloud.components.sessions import Sessions
from openlifu.cloud.components.solutions import Solutions
from openlifu.cloud.components.subjects import Subjects
from openlifu.cloud.components.systems import Systems
from openlifu.cloud.components.transducers import Transducers
from openlifu.cloud.components.users import Users
from openlifu.cloud.components.volumes import Volumes
from openlifu.cloud.filesystem_observer import FilesystemObserver
from openlifu.cloud.status import Status
from openlifu.cloud.sync_thread import SyncThread
from openlifu.cloud.utils import from_isoformat, get_mac_address, logger_cloud
from openlifu.cloud.ws import Websocket
[docs]
class Cloud:
def __init__(self):
self._filesystem_observer = FilesystemObserver(self._on_file_system_update)
self._api = Api()
self._websocket = Websocket(self._on_websocket_update)
self._components: List[AbstractComponent] = []
self._sync_thread = SyncThread(self._on_status_changed)
self._db_path: Path | None = None
self._db: DatabaseDto | None = None
self._status_callback: Callable[[Status], None] | None = None
self._sync_idle = True
self._pending_updates: Dict[Path, datetime] = {}
def set_access_token(self, token: str):
self._api.authenticate(token)
self._websocket.authenticate(token)
def set_status_callback(self, callback: Callable[[Status], None]):
self._status_callback = callback
def start(self, db_path: Path):
if not db_path.exists():
raise ValueError("Database path does not exist.")
if not db_path.is_dir():
raise ValueError("Database path is not a directory.")
self._db_path = db_path
mac_address = get_mac_address()
if mac_address is None:
raise ValueError("MAC address is unavailable.")
logger_cloud.debug(f"Using DB path: {db_path}")
logger_cloud.debug(f"Mac address: {mac_address}")
self._db = self._api.databases().claim_database(
ClaimDbDto(
db_path=str(db_path),
mac_address=mac_address,
description=None
)
)
self._websocket.connect(self._db.id)
if self._sync_thread is not None and self._sync_thread.is_running():
self._sync_thread.stop()
self._sync_thread = SyncThread(self._on_status_changed)
self._create_components()
self._sync_thread.start()
def stop(self):
if self._sync_thread is not None:
self._sync_thread.stop()
self._sync_thread = None
self._websocket.disconnect()
self.stop_background_sync()
self._db_path = None
self._db = None
def sync(self):
for component in self._components:
self._sync_thread.post(component, None)
def start_background_sync(self):
if self._db_path is None:
raise ValueError("Database path does not exist.")
self._filesystem_observer.start(self._db_path)
def stop_background_sync(self):
self._filesystem_observer.stop()
def _on_status_changed(self, status: Status):
self._sync_idle = status.status == Status.STATUS_IDLE
if self._status_callback is not None:
self._status_callback(status)
if self._sync_idle and len(self._pending_updates) > 0:
logger_cloud.debug("Syncing pending updates...")
for path, update_date in self._pending_updates.items():
for component in self._components:
component.on_update_from_cloud(path, update_date)
self._pending_updates.clear()
def _on_file_system_update(self, path: Path):
if self._sync_thread.is_path_in_ignore_list(path) or path.name == '.DS_Store':
return
if self._sync_idle:
for component in self._components:
component.on_filesystem_change(path)
else:
self._pending_updates[path] = datetime.now(timezone.utc)
def _on_websocket_update(self, data: dict):
if self._db_path is None:
return
update_date = from_isoformat(data["update_date"])
updated_path = data["path"]
if updated_path == '/':
return
path = self._db_path / updated_path
if self._sync_thread.is_path_in_ignore_list(path):
return
if self._sync_idle:
for component in self._components:
component.on_update_from_cloud(path, update_date)
else:
self._pending_updates[path] = update_date
def _create_components(self):
self._components.clear()
self._components.append(Users(self._api, self._db_path, self._db.id, self._sync_thread))
self._components.append(Protocols(self._api, self._db_path, self._db.id, self._sync_thread))
self._components.append(Systems(self._api, self._db_path, self._db.id, self._sync_thread))
self._components.append(Transducers(self._api, self._db_path, self._db.id, self._sync_thread))
self._components.append(
Subjects(self._api, self._db_path, self._db.id, self._sync_thread)
.add_child(
Volumes(self._api, self._db_path, self._db.id, self._sync_thread)
)
.add_child(
Sessions(self._api, self._db_path, self._db.id, self._sync_thread)
.add_child(
Photoscans(self._api, self._db_path, self._db.id, self._sync_thread)
)
.add_child(
Runs(self._api, self._db_path, self._db.id, self._sync_thread)
)
.add_child(
Solutions(self._api, self._db_path, self._db.id, self._sync_thread)
)
)
)
if __name__ == "__main__":
logger_cloud.setLevel(logging.DEBUG)
logger_cloud.addHandler(logging.StreamHandler(sys.stdout))
cloud = Cloud()
token = os.getenv("TOKEN")
db_path = os.getenv("DB_PATH")
cloud.set_access_token(token)
cloud.set_status_callback(lambda status: logger_cloud.debug(f"Status: {status}"))
cloud.start(Path(db_path))
cloud.sync()
cloud.start_background_sync()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
cloud.stop()