Source code for sebs.faas.system

# Copyright 2020-2025 ETH Zurich and the SeBS authors. All rights reserved.
"""
Module providing the core abstraction for Function-as-a-Service (FaaS) systems.

This module defines the base System class that provides consistent interfaces for
working with different serverless platforms (AWS Lambda, Azure Functions, Google Cloud
Functions, OpenWhisk, etc.). It handles function lifecycle management, code packaging,
deployment, triggering, and metrics collection while abstracting away platform-specific
details.
"""

from abc import ABC
from abc import abstractmethod
from random import randrange
from typing import Callable, Dict, List, Optional, Tuple, Type
import uuid

import docker

from sebs.benchmark import Benchmark
from sebs.cache import Cache
from sebs.config import SeBSConfig
from sebs.experiments.config import SystemVariant
from sebs.faas.container import DockerContainer
from sebs.faas.resources import SystemResources
from sebs.faas.config import Resources
from sebs.faas.function import Function, Trigger, ExecutionResult
from sebs.utils import LoggingBase
from sebs.sebs_types import Language
from .config import Config


[docs] class System(ABC, LoggingBase): """ Abstract base class for FaaS system implementations. This class provides basic abstractions for all supported FaaS platforms. It defines the interface for system initialization, resource management, function deployment, code packaging, function invocation, and metrics collection. Each cloud provider implements a concrete subclass of this abstract base. The class handles: - System and storage service initialization - Creation and updating of serverless functions - Function code packaging and deployment - Trigger creation and management - Metrics collection and error handling - Caching of functions to avoid redundant deployments - Cold start management Attributes: system_config: Global SeBS configuration docker_client: Docker client for building code packages and containers cache_client: Cache client for storing function and deployment information cold_start_counter: Counter for generating unique function names to force cold starts system_resources: Resources manager for the specific cloud platform """ def __init__( self, system_config: SeBSConfig, cache_client: Cache, docker_client: docker.client.DockerClient, system_resources: SystemResources, ): """ Initialize a FaaS system implementation. Args: system_config: Global SeBS configuration settings cache_client: Cache client for storing function and deployment information docker_client: Docker client for building code packages and containers system_resources: Resources manager for the specific cloud platform """ super().__init__() self._system_config = system_config self._docker_client = docker_client self._cache_client = cache_client # Initialize with random value to help with cold start detection/forcing self._cold_start_counter = randrange(100) self._system_resources = system_resources @property def system_config(self) -> SeBSConfig: """ Get the global SeBS configuration. Returns: SeBSConfig: The system configuration """ return self._system_config @property def docker_client(self) -> docker.client.DockerClient: """ Get the Docker client. Returns: docker.client.DockerClient: The Docker client """ return self._docker_client @property def cache_client(self) -> Cache: """ Get the cache client. Returns: Cache: The cache client """ return self._cache_client @property def cold_start_counter(self) -> int: """ Get the cold start counter. A counter used in attempts to enforce cold starts. Its value might be incorporated into function environment variables. Returns: int: The current cold start counter value """ return self._cold_start_counter @cold_start_counter.setter def cold_start_counter(self, val: int): """ Set the cold start counter. Args: val: The new counter value """ self._cold_start_counter = val @property @abstractmethod def config(self) -> Config: """ Get the platform-specific configuration. Returns: Config: The platform-specific configuration """ pass @property def container_client(self) -> DockerContainer | None: """Get the platform-specific container manager. For example, on OpenWhisk we push to DockerHub, while on AWS we push images to ECR. Returns: Container manager instance. """ return None
[docs] def system_variant_suffix(self, system_variant: SystemVariant) -> Optional[str]: """Return an optional provider-local system variant suffix.""" return None
@property def system_resources(self) -> SystemResources: """ Get the platform-specific resources manager. Returns: SystemResources: The resources manager """ return self._system_resources
[docs] @staticmethod @abstractmethod def function_type() -> "Type[Function]": """ Get the platform-specific Function class type. Returns: Type[Function]: The Function class for this platform """ pass
[docs] def find_deployments(self) -> List[str]: """ Find existing deployments in the cloud platform. Default implementation uses storage buckets to identify deployments. This can be overridden by platform-specific implementations, e.g., Azure that looks for unique storage accounts. Returns: List[str]: List of existing deployment resource IDs """ return self.system_resources.get_storage().find_deployments()
[docs] def cleanup_resources(self, dry_run: bool = False) -> dict: """Discover and delete all SeBS resources of the deployment. Args: dry_run: when true, it does not delete anything. Returns: Dict mapping resource type names to lists of deleted resource identifiers. """ raise NotImplementedError(f"Resource cleanup not implemented for {self.name()}")
[docs] def initialize_resources(self, select_prefix: Optional[str], quiet: bool = False): """ Initialize cloud resources for the deployment. This method either: 1. Uses an existing resource ID from configuration 2. Finds existing deployment in the cloud and reuses it, matching the optional prefix 3. If no suitable existing deployment is found or specified, a new unique resource ID is generated. Args: select_prefix: Optional prefix to match when looking for existing deployments """ # User provided resources or found in cache if self.config.resources.has_resources_id: if not quiet: self.logging.info( f"Using existing resource name: {self.config.resources.resources_id}." ) return # Now search for existing resources deployments = self.find_deployments() # If a prefix is specified, we find the first matching resource ID if select_prefix is not None: for dep in deployments: if select_prefix in dep: if not quiet: self.logging.info( f"Using existing deployment {dep} that matches prefix {select_prefix}!" ) self.config.resources.resources_id = dep return # We warn users that we create a new resource ID # They can use them with a new config if len(deployments) > 0 and not quiet: self.logging.warning( f"We found {len(deployments)} existing deployments! " "If you want to use any of them, please abort, and " "provide the resource id in your input config." ) self.logging.warning(f"Deployment resource IDs in the cloud: {deployments}") # Create a new unique resource ID res_id = "" if select_prefix is not None: res_id = f"{select_prefix}-{str(uuid.uuid1())[0:8]}" else: res_id = str(uuid.uuid1())[0:8] self.config.resources.resources_id = res_id if not quiet: self.logging.info(f"Generating unique resource name {res_id}") # Ensure that the bucket is created - this allocates the new resource self.system_resources.get_storage().get_bucket(Resources.StorageBucketType.BENCHMARKS)
[docs] def initialize( self, config: Dict[str, str] = {}, resource_prefix: Optional[str] = None, quiet: bool = False, ): """ Initialize the system. After this call completes, the local or remote FaaS system should be ready to allocate functions, manage storage resources, and invoke functions. Subclasses should override this to perform provider-specific initialization. Args: config: System-specific parameters resource_prefix: Optional prefix for resource naming """ pass
[docs] @abstractmethod def package_code( self, directory: str, language: Language, language_version: str, architecture: str, benchmark: str, is_cached: bool, ) -> Tuple[str, float]: """ Apply system-specific code packaging to prepare a deployment package. The benchmark creates a code directory with the following structure: - [benchmark sources] - [benchmark resources], e.g., HTML template or ffmpeg binary - [dependence specification], e.g. requirements.txt or package.json - [language-specific wrappers implementation for the specific system] This step transforms that structure to fit platform-specific deployment requirements, such as creating a zip file for AWS or container image. Args: directory: Path to the code directory language: Programming language name language_version: Programming language version architecture: Target architecture (e.g., 'x64', 'arm64') benchmark: Benchmark name is_cached: Whether the code is cached Returns: Tuple containing: - Path to packaged code - Size of the package in bytes """ pass
[docs] def finalize_container_build( self, ) -> Callable[[str, Language, str, str, str, bool], Tuple[str, float]] | None: """Default behavior of container deployment is that no code package is needed. Thus, we return None to signal that. One exception is OpenWhisk: we deploy code package + container. Returns: Null, as no code package. """ return None
[docs] @abstractmethod def create_function( self, code_package: Benchmark, func_name: str, system_variant: SystemVariant, container_uri: str | None, ) -> Function: """ Create a new function in the FaaS platform. The implementation is responsible for creating all necessary cloud resources. Args: code_package: Benchmark containing the function code func_name: Name of the function system_variant: Selected deployment variant container_uri: URI of the container image Returns: Function: Created function instance Raises: NotImplementedError: If the deployment variant is not supported """ pass
[docs] @abstractmethod def cached_function(self, function: Function): """ Perform any necessary operations for a cached function. This method is called when a function is found in the cache. It may perform platform-specific operations such as checking if the function still exists in the cloud, updating permissions, re-initializing transient client objects, or ensuring associated resources (like triggers) are correctly configured. Args: function: The cached function instance """ pass
[docs] @abstractmethod def update_function( self, function: Function, code_package: Benchmark, system_variant: SystemVariant, container_uri: str | None, ): """ Update an existing function in the FaaS platform with new code and/or configuration. Args: function: Existing function instance to update code_package: New benchmark containing the function code system_variant: Selected deployment variant container_uri: URI of the container image Raises: NotImplementedError: If the deployment variant is not supported """ pass
[docs] def build_function(self, code_package: Benchmark, func_name: Optional[str] = None): """Create a build deployment of the selected function. Args: code_package: seleted benchmark func_name: user-specified function name Raises: RuntimeError: unsupported language version for the platform """ if code_package.language_version not in self.system_config.supported_language_versions( self.name(), code_package.language_name, code_package.architecture ): raise Exception( "Unsupported {lang} version {ver} in {sys} for architecture {arch}!".format( lang=code_package.language_name, ver=code_package.language_version, sys=self.name(), arch=code_package.architecture, ) ) if not func_name: func_name = self.default_function_name(code_package) _, code_package_loc, system_variant, container_uri = code_package.build( self.package_code, self.container_client, self.finalize_container_build() ) if code_package_loc is not None: self.logging.info( f"Created code package for function {func_name} at {code_package_loc}" ) if system_variant.is_container: self.logging.info( f"Created container deployment for function {func_name}: {container_uri}" )
[docs] def get_function(self, code_package: Benchmark, func_name: Optional[str] = None) -> Function: """ Get or create a function for a benchmark. This method handles the complete function creation/update workflow: 1. If a cached function with the given name exists and code has not changed, returns the cached function (after potential configuration checks/updates) 2. If a cached function exists but the code hash differs or rebuild is foreced, update the function code in the cloud. 3. If no cached function exists, creates a new function Benchmark code is built (via `code_package.build`) before these steps. The build might be skipped if source code hasn't changed and no update is forced. Args: code_package: The benchmark containing the function code func_name: Optional name for the function (will be generated if not provided) Returns: Function: The function instance Raises: Exception: If the language version is not supported by this platform """ if code_package.language_version not in self.system_config.supported_language_versions( self.name(), code_package.language_name, code_package.architecture ): raise Exception( "Unsupported {lang} version {ver} in {sys} for architecture {arch}!".format( lang=code_package.language_name, ver=code_package.language_version, sys=self.name(), arch=code_package.architecture, ) ) # Generate function name if not provided if not func_name: func_name = self.default_function_name(code_package) # Build the code package rebuilt, _, system_variant, container_uri = code_package.build( self.package_code, self.container_client, self.finalize_container_build() ) # Check if function exists in cache functions = code_package.functions is_function_cached = not (not functions or func_name not in functions) if is_function_cached: # Retrieve function from cache cached_function = functions[func_name] code_location = code_package.code_location try: function = self.function_type().deserialize(cached_function) except RuntimeError as e: self.logging.error( f"Cached function {cached_function['name']} is no longer available." ) self.logging.error(e) is_function_cached = False else: err_msg = self.can_reuse_cached_function(function, code_package) if err_msg is not None: self.logging.info( f"Cached function {func_name} is not compatible with the current " f"deployment configuration and will be replaced. Reason: {err_msg}" ) is_function_cached = False # Create new function if not cached or deserialize failed if not is_function_cached: msg = ( "function name not provided." if not func_name else "function {} not found in cache.".format(func_name) ) self.logging.info("Creating new function! Reason: " + msg) function = self.create_function(code_package, func_name, system_variant, container_uri) self.cache_client.add_function( deployment_name=self.name(), language_name=code_package.language_name, code_package=code_package, function=function, ) code_package.query_cache() return function else: assert function is not None self.cached_function(function) if code_package.system_variant.is_container: self.logging.info( f"Using cached function {func_name} container {code_package.container_uri}" ) else: self.logging.info(f"Using cached function {func_name} in {code_location}") # code up to date, but configuration needs to be updated if self.is_configuration_changed(function, code_package): self.update_function_configuration(function, code_package) self.cache_client.update_function(function) code_package.query_cache() # is the function up-to-date? if function.code_package_hash != code_package.hash or rebuilt: if function.code_package_hash != code_package.hash: self.logging.info( f"Cached function {func_name} with hash " f"{function.code_package_hash} is not up to date with " f"current build {code_package.hash} in " f"{code_location}, updating cloud version!" ) if rebuilt: self.logging.info( f"Enforcing rebuild and update of cached function " f"{func_name} with hash {function.code_package_hash}." ) # Update function code self.update_function(function, code_package, system_variant, container_uri) function.code_package_hash = code_package.hash function.updated_code = True self.cache_client.add_function( deployment_name=self.name(), language_name=code_package.language_name, code_package=code_package, function=function, ) code_package.query_cache() else: self.logging.info(f"Code of cached function: {func_name} is up to date.") return function
[docs] @abstractmethod def update_function_configuration(self, cached_function: Function, benchmark: Benchmark): """ Update the configuration of an existing function on the FaaS plaform. This method is called when a function's code is up-to-date but its configuration (memory, timeout, environment variable, etc.) needs to be updated. Args: cached_function: The function to update benchmark: The benchmark containing the new configuration """ pass
[docs] def is_configuration_changed(self, cached_function: Function, benchmark: Benchmark) -> bool: """ Check if a function's configuration needs to be updated. This function checks for common function parameters to verify if their values are still up to date with the benchmark configuration. Args: cached_function: The existing function benchmark: The benchmark with potential new configuration Returns: bool: True if configuration has changed, False otherwise """ changed = False # Check common configuration attributes for attr in ["timeout", "memory"]: new_val = getattr(benchmark.benchmark_config, attr) old_val = getattr(cached_function.config, attr) if new_val != old_val: self.logging.info( f"Updating function configuration due to changed attribute {attr}: " f"cached function has value {old_val} whereas {new_val} has been requested." ) changed = True setattr(cached_function.config, attr, new_val) # Check language/runtime attributes for lang_attr in [["language"] * 2, ["language_version", "version"]]: new_val = getattr(benchmark, lang_attr[0]) old_val = getattr(cached_function.config.runtime, lang_attr[1]) if new_val != old_val: # FIXME: should this even happen? we should never pick the function with # different runtime - that should be encoded in the name self.logging.info( f"Updating function configuration due to changed runtime attribute " f"{lang_attr}: cached function has value {old_val} whereas " f"{new_val} has been requested." ) changed = True setattr(cached_function.config.runtime, lang_attr[1], new_val) return changed
[docs] def can_reuse_cached_function( self, cached_function: Function, benchmark: Benchmark ) -> Optional[str]: """Check whether a cached function can be reused as-is. Args: cached_function: Cached function selected from SeBS cache. benchmark: Benchmark requesting the function. Returns: string explaining why the function cannot be reused, or None if it can be reused. """ return None
[docs] @abstractmethod def default_function_name( self, code_package: Benchmark, resources: Optional[Resources] = None ) -> str: """ Generate a default function name for a benchmark. Args: code_package: The benchmark to generate a name for resources: Optional resources configuration Returns: str: Generated function name """ pass
[docs] @abstractmethod def enforce_cold_start(self, functions: List[Function], code_package: Benchmark): """ Force cold starts for the specified functions. This method implements platform-specific techniques to ensure that subsequent invocations of the functions will be cold starts. In practice, this usually uses an update of environment variables with new values. Args: functions: List of functions to enforce cold starts for code_package: The benchmark associated with the functions """ pass
[docs] @abstractmethod def download_metrics( self, function_name: str, start_time: int, end_time: int, requests: Dict[str, ExecutionResult], metrics: dict, ): """ Download provider-specific performance metrics from the cloud platform. This typically involves querying a logging or monitoring service (e.g., CloudWatch, Application Insights) for details like actual execution duration, memory usage, etc., and populating the `requests` (ExecutionResult objects) and `metrics` dictionaries. Args: function_name: Name of the function to get metrics for start_time: Start timestamp for metrics collection end_time: End timestamp for metrics collection requests: Dictionary of execution results metrics: Dictionary to store the downloaded metrics """ pass
[docs] @abstractmethod def create_trigger(self, function: Function, trigger_type: Trigger.TriggerType) -> Trigger: """ Create a trigger for a function. Args: function: The function to create a trigger for trigger_type: Type of trigger to create Returns: Trigger: The created trigger """ pass
[docs] def disable_rich_output(self): """ Disable rich output for platforms that support it, e.g, progress of pushing Docker images. This is mostly used in testing environments or CI pipelines. """ pass
[docs] @abstractmethod def shutdown(self) -> None: """ Shutdown the FaaS system. This should release any acquired resources, stop any running local services (like Docker containers started by SeBS for CLI interactions), and update the cache with the final system configuration. This should be called when the system is no longer needed. """ try: self.cache_client.lock() self.config.update_cache(self.cache_client) finally: self.cache_client.unlock()
[docs] @staticmethod @abstractmethod def name() -> str: """ Get the name of the platform. Returns: str: Platform name (e.g., 'aws', 'azure', 'gcp') """ pass
[docs] def delete_function(self, func_name: str, function: Dict) -> None: """Delete cloud deployment of a function. Args: func_name: function name in the cloud. """ raise NotImplementedError(f"Function deletion not implemented for {self.name()}")
[docs] def cleanup_functions(self, dry_run: bool) -> List[str]: """Remove all created cloud functions. Args: dry_run: when true, skips actual deletion Returns: list of deleted function names """ functions = self.cache_client.get_all_functions(self.name()) deleted = [] for name, func in functions.items(): if not dry_run: self.delete_function(name, func) deleted.append(name) if dry_run: return deleted for name, func in functions.items(): if name not in deleted: continue self.cache_client.remove_function( self.name(), func["benchmark"], func["config"]["runtime"]["language"], name ) return deleted