from __future__ import annotations
import json
import shutil
from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path
from typing import Any, List
from openlifu.cloud.api.api import Api
from openlifu.cloud.status import Status
from openlifu.cloud.sync_thread import SyncThread
from openlifu.cloud.utils import logger_cloud, mtime
class AbstractComponent(ABC):
    """
    Base class for one kind of item that is mirrored between disk and the cloud.

    On disk a component owns the directory
    ``<parent_path>/<component_type_plural>/``. That directory holds an ids
    index file ``<component_type_plural>.json`` and, per item, a config file
    ``<local_id>/<local_id>.json``. Subclasses supply the cloud-side
    operations through the abstract methods below.

    Components may be nested: children are re-parented under every cloud item
    of this component and synchronised after it (see :meth:`sync`).
    """

    def __init__(self, api: Api, parent_path: Path, database_id: int, sync_thread: SyncThread, download_only: bool):
        """
        Args:
            api: Cloud API client, stored as ``self.api`` for subclasses.
            parent_path: Directory that contains this component's directory.
            database_id: Stored as ``self.db_id``.
            sync_thread: Receives status events, ignore-list entries and
                posted sync requests.
            download_only: When true, nothing is uploaded and cloud items are
                deleted from the cloud once they have been handled locally.
        """
        self.api = api
        self.db_id = database_id
        self._sync_thread = sync_thread
        # Local sync date recorded by the most recent sync(); None before the first one.
        self._last_sync_date: datetime | None = None
        self._parent_path = parent_path
        # Remote id of the parent item; set by the parent component during sync().
        self.parent_id: int | None = None
        self._children: List[AbstractComponent] = []
        self.download_only = download_only

    def add_child(self, component: AbstractComponent) -> AbstractComponent:
        """Register a child component and return ``self`` so calls can be chained."""
        self._children.append(component)
        return self

    def remove_child(self, component: AbstractComponent):
        """Unregister a child component (raises ``ValueError`` if it is not registered)."""
        self._children.remove(component)

    def get_children(self) -> List[AbstractComponent]:
        """Return the internal (live, not copied) list of child components."""
        return self._children

    def set_parent_path(self, parent_path: Path):
        """Change the directory that contains this component's directory."""
        self._parent_path = parent_path

    def set_parent_id(self, parent_id: int):
        """Set the remote id of the parent item this component currently belongs to."""
        self.parent_id = parent_id

    @abstractmethod
    def get_config_ids_key(self) -> str:
        """Return the JSON key under which the list of local ids is stored in the ids file."""

    @abstractmethod
    def get_component_type_plural(self) -> str:
        """Return the plural component name; used as directory name, ids file stem and status label."""

    @abstractmethod
    def get_sync_date_from_cloud(self) -> datetime | None:
        """Return the sync date stored on the cloud, or ``None`` if there is none yet."""

    @abstractmethod
    def send_sync_date_to_cloud(self, sync_date: datetime):
        """Store ``sync_date`` on the cloud as this component's sync date."""

    @abstractmethod
    def upload_config(self, data: bytes, modification_date: datetime, local_id: str, remote_id: int | None) -> int:
        """Upload an item's config bytes and return the remote id to use for its data files."""

    @abstractmethod
    def download_config(self, local_id: str, remote_id: int) -> bytes:
        """Return the raw config bytes of the given cloud item."""

    @abstractmethod
    def delete_on_cloud(self, local_id: str, remote_id: int):
        """Delete the given item from the cloud."""

    @abstractmethod
    def get_cloud_items(self) -> List[Any]:
        """Return the items of this component currently present on the cloud."""

    def get_cloud_item_id(self, cloud_item) -> int:
        """Return the remote id of a cloud item (default: its ``id`` attribute)."""
        return cloud_item.id

    def get_cloud_item_local_id(self, cloud_item) -> str:
        """Return the local id of a cloud item (default: its ``local_id`` attribute)."""
        return cloud_item.local_id

    def upload_data_files(self, local_id: str, remote_id: int, config: dict, modification_date: datetime):
        """
        Default implementation does nothing.
        Override in subclasses that manage data files.
        """
        return

    def download_data_files(self, local_id: str, remote_id: int, config: dict):
        """
        Default implementation does nothing.
        Override in subclasses that manage data files.
        """
        return

    def should_ignore_path(self, path: Path) -> bool:
        """Return whether changes to ``path`` should not trigger a sync (default: never ignore)."""
        return False

    def on_update_from_cloud(self, path: Path, update_date: datetime):
        """Handle a cloud-side update, skipping it if the last sync is at least as recent."""
        if self._last_sync_date is not None and self._last_sync_date >= update_date:
            return
        self.on_filesystem_change(path)

    def on_filesystem_change(self, path: Path):
        """Post a sync request for ``path`` if it lies under this component and is not ignored."""
        if self.is_path_relative_to_component_path(path) and not self.should_ignore_path(path):
            self._sync_thread.post(self, path)

    def get_directory_path(self) -> Path:
        """Return ``<parent_path>/<component_type_plural>``."""
        return self._parent_path / self.get_component_type_plural()

    def get_ids_file_name(self) -> str:
        """Return the file name of the ids index: ``<component_type_plural>.json``."""
        return f"{self.get_component_type_plural()}.json"

    def read_local_ids(self) -> List[str]:
        """Read the list of local ids from the ids file (raises if the file or key is missing)."""
        with open(self.get_directory_path() / self.get_ids_file_name()) as f:
            data = json.load(f)
        return data[self.get_config_ids_key()]

    def write_local_ids(self, local_ids: List[str]):
        """Overwrite the ids file so that it contains exactly ``local_ids``."""
        with open(self.get_directory_path() / self.get_ids_file_name(), "w") as f:
            f.write(json.dumps({self.get_config_ids_key(): local_ids}))

    def get_local_modification_date(self) -> datetime:
        """Return the modification time of the ids file, used as the local sync date."""
        return mtime(self.get_directory_path() / self.get_ids_file_name())

    def upload(self, local_id: str, remote_id: int | None):
        """
        Upload the config (and data files) of one local item.

        Does nothing in download-only mode or when the item's config file
        does not exist. Upload failures are logged and not re-raised.
        """
        if self.download_only:
            return

        item_dir = self.get_directory_path() / local_id
        path = item_dir / f"{local_id}.json"
        if not path.is_file():
            return

        self._sync_thread.add_path_to_ignore_list(item_dir)
        self.emit_status(local_id, Status.STATUS_UPLOADING)
        data = path.read_text()
        config_mtime = mtime(path)
        try:
            remote_id = self.upload_config(data.encode(), config_mtime, local_id, remote_id)
            self.upload_data_files(local_id, remote_id, json.loads(data), config_mtime)
        except Exception as e:  # pylint: disable=broad-exception-caught
            # Best effort: one failing item must not abort the rest of the sync.
            logger_cloud.error(f"Error uploading {self.get_component_type_plural()} with local_id {local_id}: {e}")
            # TODO: emit a Status.STATUS_ERROR event for this item (currently disabled).

    def download(self, local_id: str, remote_id: int):
        """Download one cloud item's config into ``<local_id>/<local_id>.json``, then its data files."""
        self.emit_status(local_id, Status.STATUS_DOWNLOADING)
        dir_path = self.get_directory_path() / local_id
        self._sync_thread.add_path_to_ignore_list(dir_path)
        # exist_ok avoids the check-then-create race of a separate exists() test.
        dir_path.mkdir(parents=True, exist_ok=True)

        data = self.download_config(local_id, remote_id).decode()
        file_path = dir_path / f"{local_id}.json"
        self._sync_thread.add_path_to_ignore_list(file_path)
        with open(file_path, "w") as f:
            f.write(data)

        self.download_data_files(local_id, remote_id, json.loads(data))

    def emit_status(self, local_id: str | None, status: str):
        """Send a status event for this component type (and optionally one item) to the sync thread."""
        self._sync_thread.emit_status(Status(status, self.get_component_type_plural(), local_id))

    def delete_local(self, local_id: str):
        """Emit a deleting status and remove the item's local directory, ignoring removal errors."""
        self.emit_status(local_id, Status.STATUS_DELETING)
        dir_path = self.get_directory_path() / local_id
        if dir_path.exists():
            shutil.rmtree(dir_path, ignore_errors=True)

    def sync_item(self, local_id: str, remote_id: int, remote_mtime: datetime | None):
        """
        Bring one item that exists on the cloud in line with its local copy.

        * No local config file: download it.
        * Local config exists and the cloud has no modification date: upload
          the local copy (a missing remote timestamp is treated as older than
          any local one, mirroring the first-time rule in :meth:`sync`).
        * Otherwise the side with the newer modification date wins; equal
          dates mean nothing is transferred.

        In download-only mode the item is afterwards deleted from the cloud.
        """
        path = self.get_directory_path() / local_id / f"{local_id}.json"
        if not path.exists():
            self.download(local_id, remote_id)
        elif remote_mtime is None:
            # Comparing None with a datetime would raise TypeError.
            self.upload(local_id, remote_id)
        else:
            local_mtime = mtime(path)
            if remote_mtime > local_mtime:
                self.download(local_id, remote_id)
            elif remote_mtime < local_mtime:
                self.upload(local_id, remote_id)

        # NOTE(review): nesting reconstructed from unindented source — confirm
        # this runs for every item, not only for freshly downloaded ones.
        if self.download_only:
            self.delete_on_cloud(local_id, remote_id)

    def sync(self, path: Path | None):
        """
        Synchronise this component and then its children.

        The ids file's modification time (local) is compared with the sync
        date stored on the cloud (remote):

        * equal: item sets are taken to match, each cloud item is synced;
        * no remote date: first sync, every local item is uploaded;
        * remote newer: :meth:`sync_from_cloud`;
        * local newer: :meth:`sync_to_cloud`.

        Args:
            path: Path that triggered the sync; children are only synced if it
                lies under them. ``None`` syncs every child.
        """
        self.emit_status(None, Status.STATUS_SYNCHRONIZING)
        local_ids = self.read_local_ids()
        remote_items = self.get_cloud_items()
        local_mtime = self.get_local_modification_date()
        remote_mtime = self.get_sync_date_from_cloud()

        logger_cloud.debug(f"================== {self.get_component_type_plural()} SYNC ================= "
                           f"REMOTE: {remote_mtime}, LOCAL: {local_mtime}")

        # no added/deleted items, all components known
        if local_mtime == remote_mtime:
            for cloud_item in remote_items:
                remote_id = self.get_cloud_item_id(cloud_item)
                local_id = self.get_cloud_item_local_id(cloud_item)
                self.sync_item(local_id, remote_id, cloud_item.modification_date)

        # first time components
        if not remote_mtime:
            # If several cloud items share a local id, the last one wins.
            remote_id_by_local_id = {
                self.get_cloud_item_local_id(cloud_item): self.get_cloud_item_id(cloud_item)
                for cloud_item in remote_items
            }
            for local_id in local_ids:
                self.upload(local_id, remote_id_by_local_id.get(local_id))
        elif remote_mtime > local_mtime:  # cloud has more recent data
            logger_cloud.debug(
                f"===== SYNC FROM CLOUD ===== {self.get_component_type_plural()}: {local_ids}"
            )
            self.sync_from_cloud(local_ids, remote_items)
        elif remote_mtime < local_mtime:  # local data is more recent than on cloud
            logger_cloud.debug(f"===== SYNC TO CLOUD ===== {self.get_component_type_plural()}: {local_ids}")
            self.sync_to_cloud(local_ids, remote_items)

        if local_mtime != remote_mtime:
            self.send_sync_date_to_cloud(local_mtime)

        # NOTE(review): nesting reconstructed from unindented source — assumed
        # to be recorded on every sync, not only when the dates differed.
        self._last_sync_date = local_mtime

        if self._children:
            local_path = self.get_directory_path()
            # Cloud items are fetched again here rather than reusing remote_items.
            for cloud_item in self.get_cloud_items():
                if not self._sync_thread.is_running():
                    break
                local_id = self.get_cloud_item_local_id(cloud_item)
                remote_id = self.get_cloud_item_id(cloud_item)
                for child in self._children:
                    if not self._sync_thread.is_running():
                        break
                    # The same child objects are re-pointed at each parent item in turn.
                    child.set_parent_path(local_path / local_id)
                    child.set_parent_id(remote_id)
                    if path is None or child.is_path_relative_to_component_path(path):
                        child.sync(path)

    def sync_from_cloud(self, local_ids: List[str], cloud_items: List[Any]):
        """
        Make the local item set match the cloud.

        Local items missing from the cloud are deleted (unless download-only),
        items present on both sides go through :meth:`sync_item`, new cloud
        items are downloaded, and the ids file is rewritten with the cloud ids.
        """
        local_ids_from_cloud = [self.get_cloud_item_local_id(item) for item in cloud_items]
        known_on_cloud = set(local_ids_from_cloud)
        known_locally = set(local_ids)

        # delete local items that don't exist on cloud
        if not self.download_only:
            for local_id in local_ids:
                if local_id not in known_on_cloud:
                    self.delete_local(local_id)

        for cloud_item in cloud_items:
            remote_id = self.get_cloud_item_id(cloud_item)
            local_id = self.get_cloud_item_local_id(cloud_item)
            if local_id in known_locally:  # update local item
                self.sync_item(local_id, remote_id, cloud_item.modification_date)
            else:  # create new local item
                self.download(local_id, remote_id)
                if self.download_only:
                    self.delete_on_cloud(local_id, remote_id)

        # NOTE(review): in download-only mode local items that are absent from
        # the cloud stay on disk but are dropped from the ids file — confirm.
        self.write_local_ids(local_ids_from_cloud)

    def sync_to_cloud(self, local_ids: List[str], cloud_items: List[Any]):
        """
        Make the cloud item set match the local one.

        Cloud items that are unknown locally (or every cloud item, in
        download-only mode) are deleted from the cloud, the others go through
        :meth:`sync_item`. Local items missing from the cloud are uploaded
        unless in download-only mode.
        """
        known_on_cloud = {self.get_cloud_item_local_id(item) for item in cloud_items}
        known_locally = set(local_ids)

        for cloud_item in cloud_items:
            remote_id = self.get_cloud_item_id(cloud_item)
            local_id = self.get_cloud_item_local_id(cloud_item)
            if local_id not in known_locally or self.download_only:  # delete item on cloud
                self.emit_status(local_id, Status.STATUS_DELETING)
                self.delete_on_cloud(local_id, remote_id)
            else:  # item exists locally and on cloud
                self.sync_item(local_id, remote_id, cloud_item.modification_date)

        if not self.download_only:
            for local_id in local_ids:
                # create new item on cloud
                if local_id not in known_on_cloud:
                    self.upload(local_id, None)

    def is_path_relative_to_component_path(self, path: Path) -> bool:
        """Return whether ``path``, once resolved, lies inside this component's resolved directory."""
        return path.resolve().is_relative_to(self.get_directory_path().resolve())