Source code for sebs.sebs

# Copyright 2020-2025 ETH Zurich and the SeBS authors. All rights reserved.
"""Main SeBS (Serverless Benchmarking Suite) client implementation.

This module provides the main interface for SeBS:
- Deployment client creation for different platforms (AWS, Azure, GCP, OpenWhisk, local)
- Benchmark execution and configuration
- Experiment setup and execution
- Storage access (object storage and NoSQL)
- Caching and Docker management
- Logging and output handling

The SeBS client is the central point of interaction for both the CLI and programmatic use.
"""

import os
from typing import Optional, Dict, Type

import docker

from sebs import types
from sebs.local import Local
from sebs.cache import Cache
from sebs.config import SeBSConfig
from sebs.benchmark import Benchmark
from sebs.faas.system import System as FaaSSystem
from sebs.faas.storage import PersistentStorage
from sebs.faas.nosql import NoSQLStorage
from sebs.faas.config import Config
from sebs.storage import minio, config, scylladb
from sebs.utils import has_platform, LoggingHandlers, LoggingBase

from sebs.experiments.config import Config as ExperimentConfig
from sebs.experiments import Experiment


[docs] class SeBS(LoggingBase): """Main client for the Serverless Benchmarking Suite. Attributes: cache_client: Client for managing cached artifacts (code packages, etc.) docker_client: Docker client for container operations output_dir: Directory for storing output files and logs verbose: Whether to enable verbose logging logging_filename: Default log file name config: Global SeBS configuration """ @property def cache_client(self) -> Cache: """Get the cache client. Returns: Cache client for managing cached artifacts """ return self._cache_client @property def docker_client(self) -> docker.client.DockerClient: """Get the Docker client. Returns: Docker client for container operations """ return self._docker_client @property def output_dir(self) -> str: """Get the output directory. Returns: Path to the output directory """ return self._output_dir @property def verbose(self) -> bool: """Get the verbose flag. Returns: Whether verbose logging is enabled """ return self._verbose @property def logging_filename(self) -> Optional[str]: """Get the default logging filename. Returns: Default logging filename or None if not set """ return self._logging_filename @property def config(self) -> SeBSConfig: """Get the global SeBS configuration. Returns: Global configuration object """ return self._config
[docs] def generate_logging_handlers(self, logging_filename: Optional[str] = None) -> LoggingHandlers: """Generate logging handlers for a specific file. This method creates or retrieves cached logging handlers for a given filename. If no filename is provided, the default logging filename is used. Args: logging_filename: Optional filename for logs, defaults to self.logging_filename Returns: LoggingHandlers configured for the specified file """ filename = logging_filename if logging_filename else self.logging_filename if filename in self._handlers: return self._handlers[filename] else: handlers = LoggingHandlers(verbose=self.verbose, filename=filename) self._handlers[filename] = handlers return handlers
def __init__( self, cache_dir: str, output_dir: str, verbose: bool = False, logging_filename: Optional[str] = None, ): """Initialize the SeBS client. Creates a new SeBS client with the specified configuration. This sets up: - Docker client - Cache client - Global configuration - Logging handlers - Output directory Args: cache_dir: Directory for caching artifacts output_dir: Directory for storing output files and logs verbose: Whether to enable verbose logging (default: False) logging_filename: Default log file name (default: None) """ super().__init__() self._docker_client = docker.from_env() self._cache_client = Cache(cache_dir, self._docker_client) self._config = SeBSConfig() self._output_dir = output_dir self._verbose = verbose self._logging_filename = logging_filename self._handlers: Dict[Optional[str], LoggingHandlers] = {} self.logging_handlers = self.generate_logging_handlers() # Create output directory if it doesn't exist os.makedirs(self.output_dir, exist_ok=True)
[docs] def ignore_cache(self): """Configure the cache to only store code packages. After calling this method, the cache will only store code packages and won't update or use cached functions and storage. This is useful when you want to ensure that functions are redeployed and storage is recreated, but still want to reuse code packages. """ self._cache_client.ignore_storage = True self._cache_client.ignore_functions = True
[docs] def get_deployment( self, config: dict, logging_filename: Optional[str] = None, deployment_config: Optional[Config] = None, ) -> FaaSSystem: """Get a deployment client for a specific cloud platform. This method creates and returns a deployment client for the specified cloud platform. It validates that the requested platform and configuration are supported, and initializes the client with the appropriate resources. The method dynamically imports the necessary modules for each platform based on what's available in the environment, determined by has_platform(). Args: config: Configuration dictionary with deployment and experiment settings logging_filename: Optional filename for logs deployment_config: Optional pre-configured deployment config Returns: An initialized FaaS system deployment client Raises: RuntimeError: If the requested deployment is not supported or if the configuration is invalid (unsupported architecture, deployment type, etc.) """ dep_config = config["deployment"] name = dep_config["name"] implementations: Dict[str, Type[FaaSSystem]] = {"local": Local} # Dynamically import platform-specific modules as needed if has_platform("aws"): from sebs.aws import AWS implementations["aws"] = AWS if has_platform("azure"): from sebs.azure.azure import Azure implementations["azure"] = Azure if has_platform("gcp"): from sebs.gcp import GCP implementations["gcp"] = GCP if has_platform("openwhisk"): from sebs.openwhisk import OpenWhisk implementations["openwhisk"] = OpenWhisk # Validate deployment platform if name not in implementations: raise RuntimeError("Deployment {name} not supported!".format(name=name)) # Validate architecture if config["experiments"]["architecture"] not in self._config.supported_architecture(name): raise RuntimeError( "{architecture} is not supported in {name}".format( architecture=config["experiments"]["architecture"], name=name ) ) # Validate deployment type - container if config["experiments"][ "container_deployment" ] and not self._config.supported_container_deployment(name): raise RuntimeError(f"Container deployment is not supported in {name}.") # Validate deployment type - package if not config["experiments"][ "container_deployment" ] and not self._config.supported_package_deployment(name): raise RuntimeError(f"Code package deployment is not supported in {name}.") # Set up logging and create deployment configuration handlers = self.generate_logging_handlers(logging_filename) if not deployment_config: deployment_config = Config.deserialize(dep_config, self.cache_client, handlers) # Create and return the deployment client deployment_client = implementations[name]( self._config, deployment_config, # type: ignore self.cache_client, # type: ignore self.docker_client, # type: ignore handlers, # type: ignore ) return deployment_client
[docs] def get_deployment_config( self, config: dict, logging_filename: Optional[str] = None, ) -> Config: """Create a deployment configuration from a dictionary. This method deserializes a deployment configuration from a dictionary, setting up logging handlers and connecting it to the cache client. Args: config: Configuration dictionary logging_filename: Optional filename for logs Returns: A deserialized deployment configuration object """ handlers = self.generate_logging_handlers(logging_filename) return Config.deserialize(config, self.cache_client, handlers)
[docs] def get_experiment_config(self, config: dict) -> ExperimentConfig: """Create an experiment configuration from a dictionary. This method deserializes an experiment configuration from a dictionary. The experiment configuration contains settings specific to the experiment being run, such as the number of iterations, timeout, etc. Args: config: Configuration dictionary Returns: A deserialized experiment configuration object """ return ExperimentConfig.deserialize(config)
[docs] def get_experiment( self, experiment_type: str, config: dict, logging_filename: Optional[str] = None ) -> Experiment: """Get an experiment implementation for a specific experiment type. This method creates and returns an experiment implementation for the specified experiment type. It validates that the requested experiment type is supported and initializes the experiment with the appropriate configuration. Args: experiment_type: Type of experiment to create (e.g., "perf-cost") config: Configuration dictionary logging_filename: Optional filename for logs Returns: An initialized experiment implementation Raises: RuntimeError: If the requested experiment type is not supported """ from sebs.experiments import ( Experiment, PerfCost, NetworkPingPong, InvocationOverhead, EvictionModel, ) # Map of supported experiment types to their implementations implementations: Dict[str, Type[Experiment]] = { "perf-cost": PerfCost, "network-ping-pong": NetworkPingPong, "invocation-overhead": InvocationOverhead, "eviction-model": EvictionModel, } # Validate experiment type if experiment_type not in implementations: raise RuntimeError(f"Experiment {experiment_type} not supported!") # Create and configure the experiment experiment = implementations[experiment_type](self.get_experiment_config(config)) experiment.logging_handlers = self.generate_logging_handlers( logging_filename=logging_filename ) return experiment
[docs] def get_benchmark( self, name: str, deployment: FaaSSystem, config: ExperimentConfig, logging_filename: Optional[str] = None, ) -> Benchmark: """Get a benchmark implementation for a specific benchmark. This method creates and returns a benchmark implementation for the specified benchmark name. It configures the benchmark with the appropriate deployment, configuration, and resources. Args: name: Name of the benchmark to create (e.g., "210.thumbnailer") deployment: FaaS system deployment client config: Experiment configuration logging_filename: Optional filename for logs Returns: An initialized benchmark implementation """ # Create and configure the benchmark benchmark = Benchmark( name, deployment.name(), config, self._config, self._output_dir, self.cache_client, self.docker_client, ) # Set up logging benchmark.logging_handlers = self.generate_logging_handlers( logging_filename=logging_filename ) return benchmark
[docs] @staticmethod def get_storage_implementation(storage_type: types.Storage) -> Type[PersistentStorage]: """Get a storage implementation for a specific storage type. This method returns the class for a persistent storage implementation for the specified storage type. Args: storage_type: Type of storage to get implementation for Returns: Storage implementation class Raises: AssertionError: If the requested storage type is not supported """ _storage_implementations = {types.Storage.MINIO: minio.Minio} impl = _storage_implementations.get(storage_type) assert impl, f"Storage type {storage_type} not supported" return impl
[docs] @staticmethod def get_nosql_implementation(storage_type: types.NoSQLStorage) -> Type[NoSQLStorage]: """Get a NoSQL storage implementation for a specific storage type. This method returns the class for a NoSQL storage implementation for the specified storage type. Args: storage_type: Type of NoSQL storage to get implementation for Returns: NoSQL storage implementation class Raises: AssertionError: If the requested storage type is not supported """ _storage_implementations = {types.NoSQLStorage.SCYLLADB: scylladb.ScyllaDB} impl = _storage_implementations.get(storage_type) assert impl, f"NoSQL storage type {storage_type} not supported" return impl
[docs] @staticmethod def get_storage_config_implementation(storage_type: types.Storage): """Get a storage configuration implementation for a specific storage type. This method returns the class for a storage configuration implementation for the specified storage type. Args: storage_type: Type of storage to get configuration for Returns: Storage configuration implementation class Raises: AssertionError: If the requested storage type is not supported """ _storage_implementations = {types.Storage.MINIO: config.MinioConfig} impl = _storage_implementations.get(storage_type) assert impl, f"Storage configuration for type {storage_type} not supported" return impl
[docs] @staticmethod def get_nosql_config_implementation(storage_type: types.NoSQLStorage): """Get a NoSQL configuration implementation for a specific storage type. This method returns the class for a NoSQL configuration implementation for the specified storage type. Args: storage_type: Type of NoSQL storage to get configuration for Returns: NoSQL configuration implementation class Raises: AssertionError: If the requested storage type is not supported """ _storage_implementations = {types.NoSQLStorage.SCYLLADB: config.ScyllaDBConfig} impl = _storage_implementations.get(storage_type) assert impl, f"NoSQL configuration for type {storage_type} not supported" return impl
[docs] def shutdown(self): """Shutdown the SeBS client and release resources. This method shuts down the cache client and releases any resources that need to be cleaned up when the client is no longer needed. It is automatically called when using the client as a context manager. """ self.cache_client.shutdown()
def __enter__(self): """Enter context manager. This method allows the SeBS client to be used as a context manager using the 'with' statement, which ensures proper cleanup of resources. Returns: The SeBS client instance """ return self def __exit__(self, exc_type=None, exc_val=None, exc_tb=None): """Exit context manager. This method is called when exiting a 'with' block. It ensures that resources are properly cleaned up by calling shutdown(). Args: exc_type: Exception type if an exception occurred, None otherwise exc_val: Exception value if an exception occurred, None otherwise exc_tb: Exception traceback if an exception occurred, None otherwise """ self.shutdown()