Source code for sebs.openwhisk.openwhisk

# Copyright 2020-2025 ETH Zurich and the SeBS authors. All rights reserved.
"""
Apache OpenWhisk serverless platform implementation for SeBS.

This module provides the main OpenWhisk system class that integrates OpenWhisk
serverless platform with the SeBS benchmarking framework. It handles function
deployment, execution, monitoring, and resource management for OpenWhisk clusters.
"""

import json
import os
import subprocess
from typing import cast, Callable, Dict, List, Optional, Tuple, Type

import docker

from sebs.benchmark import Benchmark
from sebs.cache import Cache
from sebs.faas import System
from sebs.faas.function import Function, ExecutionResult, Trigger
from sebs.openwhisk.container import OpenWhiskContainer
from sebs.openwhisk.triggers import LibraryTrigger, HTTPTrigger
from sebs.storage.resources import SelfHostedSystemResources
from sebs.storage.minio import Minio
from sebs.storage.scylladb import ScyllaDB
from sebs.utils import LoggingHandlers
from sebs.faas.config import Resources
from .config import OpenWhiskConfig
from .function import OpenWhiskFunction, OpenWhiskFunctionConfig
from ..config import SeBSConfig
from sebs.types import Language


[docs] class OpenWhisk(System): """ Apache OpenWhisk serverless platform implementation for SeBS. This class provides the main integration between SeBS and Apache OpenWhisk, handling function deployment, execution, container management, and resource management (primarily self-hosted storage like Minio/ScyllaDB via SelfHostedSystemResources), and interaction with the `wsk` CLI. It supports OpenWhisk deployments with Docker-based function packaging. We do not use code packages due to low package size limits. Attributes: _config: OpenWhisk-specific configuration settings container_client: Docker container client for function packaging logging_handlers: Logging handlers for the OpenWhisk system Example: >>> openwhisk = OpenWhisk(sys_config, ow_config, cache, docker_client, handlers) >>> function = openwhisk.create_function(benchmark, "test-func", True, "image:tag") """ _config: OpenWhiskConfig def __init__( self, system_config: SeBSConfig, config: OpenWhiskConfig, cache_client: Cache, docker_client: docker.client.DockerClient, logger_handlers: LoggingHandlers, ) -> None: """ Initialize OpenWhisk system with configuration and clients. Will log in to Docker registry. Args: system_config: Global SeBS system configuration config: OpenWhisk-specific configuration settings cache_client: Cache client for storing function and resource data docker_client: Docker client for container operations logger_handlers: Logging handlers for system operations """ super().__init__( system_config, cache_client, docker_client, SelfHostedSystemResources( "openwhisk", config, cache_client, docker_client, logger_handlers ), ) self._config = config self.logging_handlers = logger_handlers self._container_client = OpenWhiskContainer( self.system_config, self.config, self.docker_client, self.config.experimentalManifest, ) if self.config.resources.docker_username: if self.config.resources.docker_registry: docker_client.login( username=self.config.resources.docker_username, password=self.config.resources.docker_password, registry=self.config.resources.docker_registry, ) else: docker_client.login( username=self.config.resources.docker_username, password=self.config.resources.docker_password, )
[docs] def initialize( self, config: Dict[str, str] = {}, resource_prefix: Optional[str] = None ) -> None: """ Initialize OpenWhisk system resources. Args: config: Additional configuration parameters (currently unused) resource_prefix: Optional prefix for resource naming """ self.initialize_resources(select_prefix=resource_prefix)
@property def config(self) -> OpenWhiskConfig: """ Get OpenWhisk configuration. Returns: OpenWhisk configuration instance """ return self._config @property def container_client(self) -> OpenWhiskContainer: """ Get OpenWhisk container client. Returns: OpenWhisk container client """ return self._container_client
[docs] def shutdown(self) -> None: """ Shutdown OpenWhisk system and clean up resources. This method stops storage services if configured and optionally removes the OpenWhisk cluster based on configuration settings. """ if hasattr(self, "storage") and self.config.shutdownStorage: self.storage.stop() if self.config.removeCluster: from tools.openwhisk_preparation import delete_cluster # type: ignore delete_cluster() super().shutdown()
[docs] @staticmethod def name() -> str: """ Get the platform name identifier. Returns: Platform name as string """ return "openwhisk"
[docs] @staticmethod def typename() -> str: """ Get the platform type name. Returns: Platform type name as string """ return "OpenWhisk"
[docs] @staticmethod def function_type() -> "Type[Function]": """ Get the function type for this platform. Returns: OpenWhiskFunction class type """ return OpenWhiskFunction
[docs] def get_wsk_cmd(self) -> List[str]: """ Get the WSK CLI command with appropriate flags. Returns: List of command arguments for WSK CLI execution """ cmd = [self.config.wsk_exec] if self.config.wsk_bypass_security: cmd.append("-i") return cmd
[docs] def package_code( self, directory: str, language: Language, language_version: str, architecture: str, benchmark: str, is_cached: bool, ) -> Tuple[str, int]: """ Package benchmark code for OpenWhisk deployment. Creates a ZIP archive containing the benchmark code. The ZIP archive is required for OpenWhisk function registration even when using Docker-based deployment. It contains only the main handlers (`__main__.py` or `index.js`). For Java, extracts the JAR from the built container image - this a fix since we need to provide it as argument to OpenWhisk action, but we do not want to add extra builder image. Args: directory: Path to the benchmark code directory language: Programming language (e.g., 'python', 'nodejs', 'java') language_version: Language version (e.g., '3.8', '14', '17') architecture: Target architecture (e.g., 'x86_64') benchmark: Benchmark name is_cached: Whether Docker image is already cached Returns: Tuple containing: - Path to created ZIP archive (or JAR for Java) - Size of archive in bytes """ if language == Language.JAVA: # For Java, we need to extract the JAR from the built container # Get the container image URI that was just built _, _, _, image_uri = self._container_client.registry_name( benchmark, language.value, language_version, architecture ) self.logging.info(f"Extracting JAR from container image {image_uri}") # Run container to get the JAR file jar_path = os.path.join(directory, "function.jar") try: # Create and run a temporary container container = self.docker_client.containers.create(image_uri) # Copy JAR from container to build directory # Docker API expects a path to a tar stream import tarfile import io bits, _ = container.get_archive("/function/function.jar") # Extract tar stream to get the file tar_stream = io.BytesIO() for chunk in bits: tar_stream.write(chunk) tar_stream.seek(0) with tarfile.open(fileobj=tar_stream) as tar: # Extract function.jar from the tar jar_member = tar.getmember("function.jar") jar_file = tar.extractfile(jar_member) if jar_file is None: raise RuntimeError("Could not extract function.jar from container!") # Write to destination with open(jar_path, "wb") as f: f.write(jar_file.read()) # Clean up container container.remove() self.logging.info(f"Extracted function JAR to {jar_path}") bytes_size = os.path.getsize(jar_path) self.logging.info(f"JAR size {bytes_size / 1024.0 / 1024.0:.2f} MB") return jar_path, bytes_size except Exception as e: self.logging.error(f"Failed to extract JAR from container: {e}") raise RuntimeError(f"Failed to extract JAR from container {image_uri}: {e}") else: # For Python and Node.js, create a minimal ZIP with handlers # We deploy Minio config in code package since this depends on local # deployment - it cannot be a part of Docker image CONFIG_FILES = { Language.PYTHON: ["__main__.py"], Language.NODEJS: ["index.js"], } package_config = CONFIG_FILES[language] benchmark_archive = os.path.join(directory, f"{benchmark}.zip") subprocess.run( ["zip", benchmark_archive] + package_config, stdout=subprocess.DEVNULL, cwd=directory, ) self.logging.info(f"Created {benchmark_archive} archive") bytes_size = os.path.getsize(benchmark_archive) self.logging.info("Zip archive size {:2f} MB".format(bytes_size / 1024.0 / 1024.0)) return benchmark_archive, bytes_size
[docs] def finalize_container_build( self, ) -> Callable[[str, Language, str, str, str, bool], Tuple[str, int]] | None: """ Regardless of Docker image status, we need to create .zip file to allow registration of function with OpenWhisk. By returning the code package routine, we ensure that a package will be created. Returns: Code packaging function. """ return self.package_code
[docs] def storage_arguments(self, code_package: Benchmark) -> List[str]: """ Generate storage-related arguments for function deployment. Creates WSK CLI parameters for Minio object storage and ScyllaDB NoSQL storage configurations based on the benchmark requirements. Args: code_package: Benchmark configuration requiring storage access Returns: List of WSK CLI parameter arguments for storage configuration """ envs = [] if self.config.resources.storage_config: storage_envs = self.config.resources.storage_config.envs() envs = [ "-p", "MINIO_STORAGE_SECRET_KEY", storage_envs["MINIO_SECRET_KEY"], "-p", "MINIO_STORAGE_ACCESS_KEY", storage_envs["MINIO_ACCESS_KEY"], "-p", "MINIO_STORAGE_CONNECTION_URL", storage_envs["MINIO_ADDRESS"], ] if code_package.uses_nosql: nosql_storage = self.system_resources.get_nosql_storage() for key, value in nosql_storage.envs().items(): envs.append("-p") envs.append(key) envs.append(value) for original_name, actual_name in nosql_storage.get_tables( code_package.benchmark ).items(): envs.append("-p") envs.append(f"NOSQL_STORAGE_TABLE_{original_name}") envs.append(actual_name) return envs
[docs] def create_function( self, code_package: Benchmark, func_name: str, container_deployment: bool, container_uri: str | None, ) -> "OpenWhiskFunction": """ Create or retrieve an OpenWhisk function (action). This method checks if a function already exists and updates it if necessary, or creates a new function with the appropriate configuration, storage settings, and Docker image. Args: code_package: Benchmark configuration and code package func_name: Name for the OpenWhisk action container_deployment: Whether to use container-based deployment container_uri: URI of the Docker image for the function Returns: OpenWhiskFunction instance configured with LibraryTrigger Raises: RuntimeError: If WSK CLI is not accessible or function creation fails """ if not container_deployment: raise RuntimeError("Non-container deployment is not supported in OpenWhisk!") self.logging.info("Creating function as an action in OpenWhisk.") try: actions = subprocess.run( [*self.get_wsk_cmd(), "action", "list"], stderr=subprocess.DEVNULL, stdout=subprocess.PIPE, ) function_found = False docker_image = "" for line in actions.stdout.decode().split("\n"): if line and func_name in line.split()[0]: function_found = True break function_cfg = OpenWhiskFunctionConfig.from_benchmark(code_package) if code_package.uses_storage: function_cfg.object_storage = cast( Minio, self.system_resources.get_storage() ).config if code_package.uses_nosql: function_cfg.nosql_storage = cast( ScyllaDB, self.system_resources.get_nosql_storage() ).config if function_found: # docker image is overwritten by the update res = OpenWhiskFunction( func_name, code_package.benchmark, code_package.hash, function_cfg ) # Update function - we don't know what version is stored self.logging.info(f"Retrieved existing OpenWhisk action {func_name}.") self.update_function(res, code_package, container_deployment, container_uri) else: try: self.logging.info(f"Creating new OpenWhisk action {func_name}") docker_image = self.system_config.benchmark_image_name( self.name(), code_package.benchmark, code_package.language_name, code_package.language_version, code_package.architecture, repository=self.config.dockerhub_repository, ) code_location = code_package.code_location if code_location is None: raise RuntimeError( "Code location must be set for OpenWhisk action!" ) from None run_arguments = [ *self.get_wsk_cmd(), "action", "create", func_name, "--web", "true", "--docker", docker_image, "--memory", str(code_package.benchmark_config.memory), "--timeout", str(code_package.benchmark_config.timeout * 1000), *self.storage_arguments(code_package), code_location, ] if code_package.language == Language.JAVA: run_arguments.extend(["--main", "org.serverlessbench.Handler"]) if code_package.code_location is None: raise RuntimeError( "Code location must be set for OpenWhisk action! " "OpenWhisk requires container deployment with a code package." ) subprocess.run( run_arguments, stderr=subprocess.PIPE, stdout=subprocess.PIPE, check=True, ) function_cfg.docker_image = docker_image res = OpenWhiskFunction( func_name, code_package.benchmark, code_package.hash, function_cfg, ) except subprocess.CalledProcessError as e: self.logging.error(f"Cannot create action {func_name}.") self.logging.error(f"Output: {e.stderr.decode('utf-8')}") raise RuntimeError(e) except FileNotFoundError: self.logging.error("Could not retrieve OpenWhisk functions - is path to wsk correct?") raise RuntimeError("Failed to access wsk binary") # Add LibraryTrigger to a new function trigger = LibraryTrigger(func_name, self.get_wsk_cmd()) trigger.logging_handlers = self.logging_handlers res.add_trigger(trigger) return res
[docs] def update_function( self, function: Function, code_package: Benchmark, container_deployment: bool, container_uri: str | None, ) -> None: """ Update an existing OpenWhisk function with new code and configuration. Args: function: Existing function to update code_package: New benchmark configuration and code package container_deployment: Whether to use container-based deployment container_uri: URI of the new Docker image Raises: RuntimeError: If WSK CLI is not accessible or update fails """ if not container_deployment: raise RuntimeError( "Code location must be set for OpenWhisk action! " "OpenWhisk requires container deployment with a code package." ) self.logging.info(f"Update an existing OpenWhisk action {function.name}.") function = cast(OpenWhiskFunction, function) docker_image = self.system_config.benchmark_image_name( self.name(), code_package.benchmark, code_package.language_name, code_package.language_version, code_package.architecture, repository=self.config.dockerhub_repository, ) if code_package.code_location is None: raise RuntimeError( "Code location must be set for OpenWhisk action! " "OpenWhisk requires container deployment with a code package." ) try: run_arguments = [ *self.get_wsk_cmd(), "action", "update", function.name, "--web", "true", "--docker", docker_image, "--memory", str(code_package.benchmark_config.memory), "--timeout", str(code_package.benchmark_config.timeout * 1000), *self.storage_arguments(code_package), code_package.code_location, ] if code_package.language == Language.JAVA: run_arguments.extend(["--main", "org.serverlessbench.Handler"]) subprocess.run( run_arguments, stderr=subprocess.PIPE, stdout=subprocess.PIPE, check=True, ) function.config.docker_image = docker_image except FileNotFoundError as e: self.logging.error("Could not update OpenWhisk function - is path to wsk correct?") raise RuntimeError(e) except subprocess.CalledProcessError as e: self.logging.error(f"Unknown error when running function update: {e}!") self.logging.error("Make sure to remove SeBS cache after restarting OpenWhisk!") self.logging.error(f"Output: {e.stderr.decode('utf-8')}") raise RuntimeError(e)
[docs] def update_function_configuration(self, function: Function, code_package: Benchmark) -> None: """ Update configuration of an existing OpenWhisk function. Updates memory allocation, timeout, and storage parameters without changing the function code or Docker image. Args: function: Function to update configuration for code_package: New benchmark configuration settings Raises: RuntimeError: If WSK CLI is not accessible or configuration update fails """ self.logging.info(f"Update configuration of an existing OpenWhisk action {function.name}.") try: subprocess.run( [ *self.get_wsk_cmd(), "action", "update", function.name, "--memory", str(code_package.benchmark_config.memory), "--timeout", str(code_package.benchmark_config.timeout * 1000), *self.storage_arguments(code_package), ], stderr=subprocess.PIPE, stdout=subprocess.PIPE, check=True, ) except FileNotFoundError as e: self.logging.error("Could not update OpenWhisk function - is path to wsk correct?") raise RuntimeError(e) except subprocess.CalledProcessError as e: self.logging.error(f"Unknown error when running function update: {e}!") self.logging.error("Make sure to remove SeBS cache after restarting OpenWhisk!") self.logging.error(f"Output: {e.stderr.decode('utf-8')}") raise RuntimeError(e)
[docs] def is_configuration_changed(self, cached_function: Function, benchmark: Benchmark) -> bool: """ Check if function configuration has changed compared to cached version. Compares current benchmark configuration and storage settings with the cached function configuration to determine if an update is needed. Args: cached_function: Previously cached function configuration benchmark: Current benchmark configuration to compare against Returns: True if configuration has changed and function needs updating """ changed = super().is_configuration_changed(cached_function, benchmark) if benchmark.uses_storage: storage = cast(Minio, self.system_resources.get_storage()) function = cast(OpenWhiskFunction, cached_function) # check if now we're using a new storage if function.config.object_storage != storage.config: self.logging.info( "Updating function configuration due to changed storage configuration." ) changed = True function.config.object_storage = storage.config if benchmark.uses_nosql: nosql_storage = cast(ScyllaDB, self.system_resources.get_nosql_storage()) function = cast(OpenWhiskFunction, cached_function) # check if now we're using a new storage if function.config.nosql_storage != nosql_storage.config: self.logging.info( "Updating function configuration due to changed NoSQL storage configuration." ) changed = True function.config.nosql_storage = nosql_storage.config return changed
[docs] def default_function_name( self, code_package: Benchmark, resources: Optional[Resources] = None ) -> str: """ Generate default function name based on benchmark and resource configuration. Args: code_package: Benchmark package containing name and language info resources: Optional specific resources to use for naming Returns: Generated function name string """ resource_id = resources.resources_id if resources else self.config.resources.resources_id return ( f"sebs-{resource_id}-{code_package.benchmark}-" f"{code_package.language_name}-{code_package.language_version}" )
[docs] def enforce_cold_start(self, functions: List[Function], code_package: Benchmark) -> None: """ Enforce cold start for functions (not implemented for OpenWhisk). Args: functions: List of functions to enforce cold start for code_package: Benchmark package configuration Raises: NotImplementedError: Cold start enforcement not implemented for OpenWhisk """ raise NotImplementedError()
[docs] def download_metrics( self, function_name: str, start_time: int, end_time: int, requests: Dict[str, ExecutionResult], metrics: dict, ): """ Download metrics for function executions (no-op for OpenWhisk). Args: function_name: Name of the function to download metrics for start_time: Start time for metrics collection (epoch timestamp) end_time: End time for metrics collection (epoch timestamp) requests: Dictionary mapping request IDs to execution results metrics: Dictionary to store downloaded metrics Note: OpenWhisk metrics collection is not currently implemented. """ self.logging.info( f"OpenWhisk: Starting to download metrics for {len(requests)} invocations" ) processed_count = 0 for request_id in requests: try: result = subprocess.run( [*self.get_wsk_cmd(), "activation", "get", request_id], stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) if result.returncode != 0: self.logging.warning(f"OpenWhisk: Activation {request_id} not found") continue # Parse JSON output (skip first "ok" line) output = result.stdout.decode("utf-8") lines = output.strip().split("\n") if len(lines) <= 1: self.logging.warning(f"OpenWhisk: Activation {request_id} not found") continue json_str = "\n".join(lines[1:]) activation = json.loads(json_str) duration_ms = activation.get("duration") if duration_ms is None: self.logging.error(f"OpenWhisk: Duration not found in activation {request_id}") continue # Update execution time (convert milliseconds to microseconds) requests[request_id].provider_times.execution = int(float(duration_ms) * 1000) # Extract initTime from annotations (optional - only present on cold starts) annotations = activation.get("annotations", []) for annotation in annotations: if annotation.get("key") == "initTime": init_time_ms = annotation.get("value") if init_time_ms is not None: requests[request_id].provider_times.initialization = int( float(init_time_ms) * 1000 ) break processed_count += 1 except json.JSONDecodeError as e: self.logging.error( f"OpenWhisk: Failed to parse activation JSON for {request_id}: {e}" ) except Exception as e: self.logging.error(f"OpenWhisk: Error processing activation {request_id}: {e}") self.logging.info( f"OpenWhisk: Downloaded metrics for {processed_count} " f"out of {len(requests)} invocations" )
[docs] def create_trigger(self, function: Function, trigger_type: Trigger.TriggerType) -> Trigger: """ Create a trigger for function invocation. Args: function: Function to create trigger for trigger_type: Type of trigger to create (LIBRARY or HTTP) Returns: Created trigger instance Raises: RuntimeError: If WSK CLI is not accessible or trigger type not supported """ if trigger_type == Trigger.TriggerType.LIBRARY: return function.triggers(Trigger.TriggerType.LIBRARY)[0] elif trigger_type == Trigger.TriggerType.HTTP: try: response = subprocess.run( [*self.get_wsk_cmd(), "action", "get", function.name, "--url"], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, check=True, ) except FileNotFoundError as e: self.logging.error( "Could not retrieve OpenWhisk configuration - is path to wsk correct?" ) raise RuntimeError(e) stdout = response.stdout.decode("utf-8") url = stdout.strip().split("\n")[-1] + ".json" trigger = HTTPTrigger(function.name, url) trigger.logging_handlers = self.logging_handlers function.add_trigger(trigger) self.cache_client.update_function(function) return trigger else: raise RuntimeError("Not supported!")
[docs] def cached_function(self, function: Function) -> None: """ Configure a cached function with current system settings. Updates triggers with current logging handlers and WSK command configuration. Args: function: Cached function to configure """ for trigger in function.triggers(Trigger.TriggerType.LIBRARY): trigger.logging_handlers = self.logging_handlers cast(LibraryTrigger, trigger).wsk_cmd = self.get_wsk_cmd() for trigger in function.triggers(Trigger.TriggerType.HTTP): trigger.logging_handlers = self.logging_handlers
[docs] def disable_rich_output(self) -> None: """ Disable rich output formatting for container operations. This is useful for non-interactive environments or when plain text output is preferred. """ self.container_client.disable_rich_output = True