Source code for sebs.storage.resources

# Copyright 2020-2025 ETH Zurich and the SeBS authors. All rights reserved.
"""Resource management for self-hosted storage deployments in SeBS.

Its main responsibility is providing consistent interface and cache
behavior of self-hosted storage for the entire SeBS system.

Key Classes:
    SelfHostedResources: Configuration management for self-hosted storage resources
    SelfHostedSystemResources: System-level resource management and service provisioning
"""

import docker
from typing import cast, Dict, Optional, Tuple, Any

from sebs.cache import Cache
from sebs.faas.config import Config, Resources
from sebs.faas.resources import SystemResources
from sebs.faas.storage import PersistentStorage
from sebs.faas.nosql import NoSQLStorage
from sebs.storage.minio import Minio
from sebs.storage.scylladb import ScyllaDB
from sebs.storage.config import (
    NoSQLStorageConfig,
    PersistentStorageConfig,
    ScyllaDBConfig,
    MinioConfig,
)
from sebs.utils import LoggingHandlers


[docs] class SelfHostedResources(Resources): """Resource configuration for self-hosted storage deployments. Attributes: _object_storage: Configuration for object storage (MinIO) _nosql_storage: Configuration for NoSQL storage (ScyllaDB) """ def __init__( self, name: str, storage_cfg: Optional[PersistentStorageConfig] = None, nosql_storage_cfg: Optional[NoSQLStorageConfig] = None, ): """Initialize self-hosted resources configuration. Args: name: Name of the deployment/resource group storage_cfg: Configuration for object storage service nosql_storage_cfg: Configuration for NoSQL storage service """ super().__init__(name=name) self._object_storage = storage_cfg self._nosql_storage = nosql_storage_cfg @property def storage_config(self) -> Optional[PersistentStorageConfig]: """Get the object storage configuration. Returns: Optional[PersistentStorageConfig]: Object storage configuration or None """ return self._object_storage @property def nosql_storage_config(self) -> Optional[NoSQLStorageConfig]: """Get the NoSQL storage configuration. Returns: Optional[NoSQLStorageConfig]: NoSQL storage configuration or None """ return self._nosql_storage
[docs] def serialize(self) -> Dict[str, Any]: """Serialize the resource configuration to a dictionary. Returns: Dict[str, Any]: Serialized configuration containing storage and/or nosql sections """ out: Dict[str, Any] = {} if self._object_storage is not None: out = {**out, "storage": self._object_storage.serialize()} if self._nosql_storage is not None: out = {**out, "nosql": self._nosql_storage.serialize()} return out
[docs] def update_cache(self, cache: Cache) -> None: """Update the configuration cache with current resource settings. Stores both object storage and NoSQL storage configurations in the cache for later retrieval. Args: cache: Cache instance to store configurations in """ super().update_cache(cache) if self._object_storage is not None: cast(MinioConfig, self._object_storage).update_cache( [self._name, "resources", "storage"], cache ) if self._nosql_storage is not None: cast(ScyllaDBConfig, self._nosql_storage).update_cache( [self._name, "resources", "nosql"], cache )
def _deserialize_storage( self, config: Dict[str, Any], cached_config: Optional[Dict[str, Any]], storage_type: str ) -> Tuple[str, Dict[str, Any]]: """Deserialize storage configuration from config or cache. Attempts to load storage configuration from the provided config first, then falls back to cached configuration if available. Args: config: Current configuration dictionary cached_config: Previously cached configuration dictionary storage_type: Type of storage to deserialize ('object' or 'nosql') Returns: Tuple[str, Dict[str, Any]]: Storage implementation name and configuration """ storage_impl = "" storage_config: Dict[str, Any] = {} # Check for new config if "storage" in config and storage_type in config["storage"]: storage_impl = config["storage"][storage_type]["type"] storage_config = config["storage"][storage_type][storage_impl] self.logging.info( "Using user-provided configuration of storage " f"type: {storage_type} for {self._name} containers." ) # Load cached values elif ( cached_config is not None and "resources" in cached_config and "storage" in cached_config["resources"] and "object" in cached_config["resources"]["storage"] ): storage_impl = cached_config["resources"]["storage"][storage_type]["type"] storage_config = cached_config["resources"]["storage"][storage_type][storage_impl] self.logging.info( f"Using cached configuration of storage type: " f"{storage_type} for {self._name} container." ) return storage_impl, storage_config @staticmethod def _deserialize( ret: "SelfHostedResources", config: Dict[str, Any], cached_config: Optional[Dict[str, Any]] ) -> None: """Deserialize storage configurations from config and cache data. Populates the SelfHostedResources instance with storage configurations loaded from the provided configuration and cached data. Args: ret: SelfHostedResources instance to populate config: Current configuration dictionary cached_config: Previously cached configuration dictionary """ obj_storage_impl, obj_storage_cfg = ret._deserialize_storage( config, cached_config, "object" ) if obj_storage_impl == "minio": ret._object_storage = MinioConfig.deserialize(obj_storage_cfg) ret.logging.info("Deserializing access data to Minio storage") elif obj_storage_impl != "": ret.logging.warning(f"Unknown object storage type: {obj_storage_impl}") else: ret.logging.info("No object storage available") nosql_storage_impl, nosql_storage_cfg = ret._deserialize_storage( config, cached_config, "nosql" ) if nosql_storage_impl == "scylladb": ret._nosql_storage = ScyllaDBConfig.deserialize(nosql_storage_cfg) ret.logging.info("Deserializing access data to ScylladB NoSQL storage") elif nosql_storage_impl != "": ret.logging.warning(f"Unknown NoSQL storage type: {nosql_storage_impl}") else: ret.logging.info("No NoSQL storage available")
[docs] class SelfHostedSystemResources(SystemResources): """System-level resource management for self-hosted storage deployments. Attributes: _name: Name of the deployment _logging_handlers: Logging configuration handlers _storage: Active persistent storage instance (MinIO) _nosql_storage: Active NoSQL storage instance (ScyllaDB) """ def __init__( self, name: str, config: Config, cache_client: Cache, docker_client: docker.client.DockerClient, logger_handlers: LoggingHandlers, ): """Initialize system resources for self-hosted storage. Args: name: Name of the deployment config: SeBS configuration object cache_client: Cache client for configuration persistence docker_client: Docker client for container management logger_handlers: Logging configuration handlers """ super().__init__(config, cache_client, docker_client) self._name = name self._logging_handlers = logger_handlers self._storage: Optional[PersistentStorage] = None self._nosql_storage: Optional[NoSQLStorage] = None
[docs] def get_storage(self, replace_existing: Optional[bool] = None) -> PersistentStorage: """Get or create a persistent storage instance. Creates a MinIO storage instance if one doesn't exist, or returns the existing instance. The storage is deserialized from a serialized config of an existing storage deployment. Args: replace_existing: Whether to replace existing buckets (optional) Returns: PersistentStorage: MinIO storage instance Raises: RuntimeError: If storage configuration is missing or unsupported """ if self._storage is None: storage_config = cast(SelfHostedResources, self._config.resources).storage_config if storage_config is None: self.logging.error( f"The {self._name} deployment is missing the " "configuration of pre-allocated storage!" ) raise RuntimeError(f"Cannot run {self._name} deployment without any object storage") if isinstance(storage_config, MinioConfig): self._storage = Minio.deserialize( storage_config, self._cache_client, self._config.resources, ) self._storage.logging_handlers = self._logging_handlers else: self.logging.error( f"The {self._name} deployment does not support " f"the object storage config type: {type(storage_config)}!" ) raise RuntimeError("Cannot work with the provided object storage!") elif replace_existing is not None: self._storage.replace_existing = replace_existing return self._storage
[docs] def get_nosql_storage(self) -> NoSQLStorage: """Get or create a NoSQL storage instance. Creates a ScyllaDB storage instance if one doesn't exist, or returns the existing instance. The storage is deserialized from a serialized config of an existing storage deployment. Returns: NoSQLStorage: ScyllaDB storage instance Raises: RuntimeError: If NoSQL storage configuration is missing or unsupported """ if self._nosql_storage is None: storage_config = cast(SelfHostedResources, self._config.resources).nosql_storage_config if storage_config is None: self.logging.error( f"The {self._name} deployment is missing the configuration " "of pre-allocated NoSQL storage!" ) raise RuntimeError("Cannot allocate NoSQL storage!") if isinstance(storage_config, ScyllaDBConfig): self._nosql_storage = ScyllaDB.deserialize( storage_config, self._cache_client, self._config.resources ) self._nosql_storage.logging_handlers = self._logging_handlers else: self.logging.error( f"The {self._name} deployment does not support " f"the NoSQL storage config type: {type(storage_config)}!" ) raise RuntimeError("Cannot work with the provided NoSQL storage!") return self._nosql_storage