# Copyright 2020-2025 ETH Zurich and the SeBS authors. All rights reserved.
"""
Module for handling benchmarks in the Serverless Benchmarking Suite (SeBS).
This module provides classes for benchmark configuration, code packaging, and execution.
It handles the preparation of code packages with dependencies for deployment to
various serverless platforms, including caching mechanisms to avoid redundant builds.
"""
import glob
import hashlib
import json
import subprocess
import os
import shutil
import textwrap
from abc import abstractmethod
from typing import Any, Callable, Dict, List, Optional, Tuple
import docker
from sebs.cpp_dependencies import CppDependencies
from sebs.config import SeBSConfig
from sebs.cache import Cache
from sebs.faas.config import Resources
from sebs.faas.container import DockerContainer
from sebs.faas.resources import SystemResources
from sebs.utils import find_benchmark, project_absolute_path, LoggingBase
from sebs.types import BenchmarkModule, Language
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from sebs.experiments.config import Config as ExperimentConfig
[docs]
class BenchmarkConfig:
    """
    Configuration for a benchmark in the Serverless Benchmarking Suite.

    This class stores the configuration parameters for a benchmark, including
    timeout, memory allocation, supported languages, included modules, and
    optional C++ dependencies.

    Attributes:
        timeout: Maximum execution time in seconds
        memory: Memory allocation in MB
        languages: List of supported programming languages
        modules: List of benchmark modules/features required
        cpp_dependencies: List of C++ dependencies (empty when none are given)
    """

    def __init__(
        self,
        timeout: int,
        memory: int,
        languages: List["Language"],
        modules: List[BenchmarkModule],
        cpp_dependencies: Optional[List[CppDependencies]] = None,
    ):
        """
        Initialize a benchmark configuration.

        Args:
            timeout: Maximum execution time in seconds
            memory: Memory allocation in MB
            languages: List of supported programming languages
            modules: List of benchmark modules/features required
            cpp_dependencies: Optional list of C++ dependencies; ``None`` (or an
                empty list) is stored as an empty list
        """
        self._timeout = timeout
        self._memory = memory
        self._languages = languages
        self._modules = modules
        self._cpp_dependencies = cpp_dependencies or []

    @property
    def timeout(self) -> int:
        """
        Get the maximum execution time in seconds.

        Returns:
            int: The timeout value
        """
        return self._timeout

    @timeout.setter
    def timeout(self, val: int):
        """
        Set the maximum execution time in seconds.

        Args:
            val: The new timeout value
        """
        self._timeout = val

    @property
    def memory(self) -> int:
        """
        Get the memory allocation in MB.

        Returns:
            int: The memory allocation
        """
        return self._memory

    @memory.setter
    def memory(self, val: int):
        """
        Set the memory allocation in MB.

        Args:
            val: The new memory allocation value
        """
        self._memory = val

    @property
    def languages(self) -> List["Language"]:
        """
        Get the list of supported programming languages.

        Returns:
            List[Language]: Supported programming languages
        """
        return self._languages

    @property
    def modules(self) -> List[BenchmarkModule]:
        """
        Get the list of benchmark modules/features required.

        Returns:
            List[BenchmarkModule]: Required benchmark modules
        """
        return self._modules

    @property
    def cpp_dependencies(self) -> List[CppDependencies]:
        """
        Get the C++ dependencies declared for the benchmark.

        Returns:
            List[CppDependencies]: Declared dependencies; empty when none were provided
        """
        return self._cpp_dependencies

    @staticmethod
    def deserialize(json_object: dict) -> "BenchmarkConfig":
        """
        Create a BenchmarkConfig instance from a JSON object.

        The keys "timeout", "memory", "languages" and "modules" are required;
        "cpp_dependencies" is optional and defaults to an empty list.

        Args:
            json_object: Dictionary containing benchmark configuration

        Returns:
            BenchmarkConfig: A new instance with the deserialized data

        Raises:
            KeyError: If one of the required keys is missing
        """
        from sebs.faas.function import Language

        return BenchmarkConfig(
            json_object["timeout"],
            json_object["memory"],
            [Language.deserialize(x) for x in json_object["languages"]],
            [BenchmarkModule(x) for x in json_object["modules"]],
            cpp_dependencies=[
                CppDependencies.deserialize(x) for x in json_object.get("cpp_dependencies", [])
            ],
        )
[docs]
class Benchmark(LoggingBase):
"""
Creates code package representing a benchmark with all code and assets.
This class handles building, packaging, and deploying benchmark code for
serverless platforms.
This includes copying source files, adding deployment-specific wrappers,
adding deployment-specific dependencies, and installing application dependencies
within Docker images corresponding to the target cloud deployment.
Code packages are cached.
The behavior of this class, particularly the `build` method, depends on the
state of the SeBS cache:
1. If no cache entry exists for the benchmark (for the current language, deployment, etc.),
a new code package is built.
2. If a cache entry exists, the hash of the benchmark's source directory is computed
and compared with the hash of cached package. If they differ, or if an update is forced,
the package is rebuilt.
3. Otherwise (cache entry exists and hash matches), the cached code package is used.
Attributes:
benchmark: Name of the benchmark
benchmark_path: Path to the benchmark directory
benchmark_config: Configuration for the benchmark
code_package: Dictionary with code package information
functions: Dictionary of functions for this benchmark
code_location: Location of the code package
is_cached: Whether the benchmark is cached
is_cached_valid: Whether the cached benchmark is valid
code_size: Size of the code package in bytes
container_uri: URI of the container for container deployments
language: Programming language for the benchmark
language_name: Name of the programming language
language_version: Version of the programming language
has_input_processed: Whether input processing has been performed
uses_storage: Whether the benchmark uses cloud storage
uses_nosql: Whether the benchmark uses NoSQL databases
architecture: CPU architecture of the deployment target
container_deployment: Whether using container deployment
"""
_hash_value: Optional[str]
[docs]
@staticmethod
def typename() -> str:
"""
Get the type name of this class.
Returns:
str: The type name
"""
return "Benchmark"
@property
def benchmark(self) -> str:
"""
Get the benchmark name.
Returns:
str: Name of the benchmark
"""
return self._benchmark
@property
def benchmark_path(self) -> str:
"""
Get the path to the benchmark directory.
Returns:
str: Path to the benchmark directory
"""
assert self._benchmark_path is not None
return self._benchmark_path
@property
def benchmark_config(self) -> BenchmarkConfig:
"""
Get the benchmark configuration.
Returns:
BenchmarkConfig: Configuration for the benchmark
"""
return self._benchmark_config
@property
def code_package(self) -> Dict[str, Any]:
"""
Get the cached code package information, if available.
This typically includes 'location' (relative to cache_dir), 'hash', and 'size'.
Returns:
Dict[str, Any]: Dictionary with code package information
"""
assert self._code_package is not None
return self._code_package
@property
def functions(self) -> Dict[str, Any]:
"""
Get the cached information about deployed functions associated
with this benchmark for the current deployment, keyed by function name.
Returns:
Dict[str, Any]: Dictionary of functions
"""
assert self._functions is not None
return self._functions
@property
def code_location(self) -> str | None:
"""
Get the absolute path to the prepared code package.
If cached, it points to the location within the SeBS cache directory.
Otherwise, it points to the build output directory.
Returns:
str: Path to the code package
"""
if self._code_package:
if "location" in self.code_package:
"""
Access cached code package instead of a built one.
"""
return os.path.join(self._cache_client.cache_dir, self.code_package["location"])
return None
else:
return self._code_location
@property
def is_cached(self) -> bool:
"""
Check if the benchmark is cached.
Returns:
bool: True if cached, False otherwise
"""
return self._is_cached
@is_cached.setter
def is_cached(self, val: bool):
"""
Set whether the benchmark is cached.
Args:
val: True if cached, False otherwise
"""
self._is_cached = val
@property
def is_cached_valid(self) -> bool:
"""
True if a cached code package exists and its hash matches the current
benchmark source code hash.
Returns:
bool: True if valid, False otherwise
"""
return self._is_cached_valid
@is_cached_valid.setter
def is_cached_valid(self, val: bool):
"""
Set whether the cached benchmark is valid.
Args:
val: True if valid, False otherwise
"""
self._is_cached_valid = val
@property
def code_size(self) -> int:
"""
Get the size of the code package in bytes.
Returns:
int: Size in bytes
"""
return self._code_size
@property
def container_uri(self) -> str:
"""
Get the URI of the container for container deployments.
Returns:
str: Container URI
Raises:
AssertionError: If container URI is None
"""
assert self._container_uri is not None
return self._container_uri
@property
def language(self) -> "Language":
"""
Get the programming language for the benchmark.
Returns:
Language: Programming language
"""
return self._language
@property
def language_name(self) -> str:
"""
Get the name of the programming language, e.g., "python".
Returns:
str: Name of the language
"""
return self._language.value
@property
def language_version(self) -> str:
"""
Get the version of the programming language, e.g. "3.8".
Returns:
str: Version of the language
"""
return self._language_version
@property
def has_input_processed(self) -> bool:
"""
Check if input processing has been performed.
Returns:
bool: True if processed, False otherwise
"""
return self._input_processed
@property
def uses_storage(self) -> bool:
"""
Check if the benchmark uses cloud storage.
Returns:
bool: True if using storage, False otherwise
"""
return self._uses_storage
@property
def uses_nosql(self) -> bool:
"""
Check if the benchmark uses NoSQL databases.
Returns:
bool: True if using NoSQL, False otherwise
"""
return self._uses_nosql
@property
def architecture(self) -> str:
"""
Get the CPU architecture of the deployment target.
Returns:
str: Architecture name (e.g., 'x86_64', 'arm64')
"""
return self._architecture
@property
def container_deployment(self) -> bool:
"""
Check if using container deployment.
Returns:
bool: True if using container deployment, False otherwise
"""
return self._container_deployment
@property # noqa: A003
def hash(self) -> str:
"""
Get the hash of the benchmark code.
Computes an MD5 hash of the benchmark directory to determine if
the code has changed since the last build.
Returns:
str: MD5 hash as a hexadecimal string
"""
path = os.path.join(self.benchmark_path, self.language_name)
self._hash_value = Benchmark.hash_directory(path, self._deployment_name, self.language)
return self._hash_value
@hash.setter # noqa: A003
def hash(self, val: str):
"""
Set the hash of the benchmark code.
Used only for testing purposes.
Args:
val: MD5 hash as a hexadecimal string
"""
self._hash_value = val
def __init__(
    self,
    benchmark: str,
    deployment_name: str,
    config: "ExperimentConfig",
    system_config: SeBSConfig,
    output_dir: str,
    cache_client: Cache,
    docker_client: docker.client.DockerClient,
):
    """
    Create a benchmark object for one deployment platform.

    Locates the benchmark, loads its `config.json`, checks that the selected
    language is supported, derives the build output directory and finally
    queries the cache for an existing code package.

    Args:
        benchmark: Name of the benchmark
        deployment_name: Name of the deployment platform (e.g., 'aws', 'azure')
        config: Experiment configuration
        system_config: SeBs system configuration
        output_dir: Directory for output files
        cache_client: Cache client for caching code packages
        docker_client: Docker client for building dependencies

    Raises:
        RuntimeError: If the benchmark is not found or doesn't support the language
    """
    super().__init__()

    # Identity of the benchmark and of the runtime it is prepared for.
    self._benchmark = benchmark
    self._deployment_name = deployment_name
    self._experiment_config = config
    runtime = config.runtime
    self._language = runtime.language
    self._language_version = runtime.version
    self._architecture = config.architecture
    self._container_deployment = config.container_deployment

    located_path = find_benchmark(self.benchmark, "benchmarks")
    if not located_path:
        raise RuntimeError("Benchmark {benchmark} not found!".format(benchmark=self._benchmark))
    self._benchmark_path = located_path

    config_file = os.path.join(self.benchmark_path, "config.json")
    with open(config_file) as json_file:
        raw_config = json.load(json_file)
    self._benchmark_config: BenchmarkConfig = BenchmarkConfig.deserialize(raw_config)

    if self.language not in self.benchmark_config.languages:
        raise RuntimeError(
            "Benchmark {} not available for language {}".format(self.benchmark, self.language)
        )

    self._cache_client = cache_client
    self._docker_client = docker_client
    self._system_config = system_config
    self._code_location: Optional[str] = None

    deployment_kind = "container" if self._container_deployment else "package"
    self._output_dir = os.path.join(
        output_dir,
        f"{benchmark}_code",
        self._language.value,
        self._language_version,
        self._architecture,
        deployment_kind,
    )

    # Must be reset before the cache query, which may fill it in.
    self._container_uri: Optional[str] = None

    # verify existence of function in cache
    self.query_cache()
    if config.update_code:
        # A forced code update invalidates whatever the cache reported.
        self._is_cached_valid = False

    # Load input module
    self._benchmark_data_path = find_benchmark(self._benchmark, "benchmarks-data")
    self._benchmark_input_module = load_benchmark_input(self._benchmark_path)

    # Input has not been processed yet
    self._input_processed: bool = False
    self._uses_storage: bool = False
    self._uses_nosql: bool = False
[docs]
@staticmethod
def hash_directory(directory: str, deployment: str, language: Language) -> str:
    """
    Compute MD5 hash of an entire directory.

    Calculates a hash of the benchmark source code by combining the contents
    of all relevant files: language-specific sources found recursively in
    `directory`, shared shell scripts and JSON files, and the deployment
    wrappers of the selected platform and language. Only file contents are
    hashed, not file names.

    Files are visited in sorted order so that the result does not depend on
    the order in which the file system happens to list directory entries.

    Args:
        directory: Path to the directory to hash (absolute or relative)
        deployment: Name of the deployment platform
        language: Programming language of the benchmark

    Returns:
        str: MD5 hash as a hexadecimal string

    Raises:
        KeyError: If `language` has no entry in the file pattern tables
    """
    hash_sum = hashlib.md5()
    FILES = {
        Language.PYTHON: ["*.py", "requirements.txt*"],
        Language.NODEJS: ["*.js", "package.json"],
        Language.JAVA: ["*.java", "pom.xml"],
        Language.CPP: ["*.cpp", "*.hpp", "dependencies.json"],
    }
    WRAPPERS = {
        Language.PYTHON: ["*.py"],
        Language.NODEJS: ["*.js"],
        Language.JAVA: ["src"],
        Language.CPP: ["*.cpp", "*.hpp"],
    }
    NON_LANG_FILES = ["*.sh", "*.json"]

    def update_from_file(path: str) -> None:
        """Feed the content of a single file into the running hash."""
        with open(path, "rb") as opened_file:
            hash_sum.update(opened_file.read())

    selected_files = FILES[language] + NON_LANG_FILES
    for file_type in selected_files:
        search_pattern = os.path.join(directory, "**", file_type)
        # glob results already contain `directory` as a prefix, so they are
        # opened as-is; joining them with `directory` again would produce a
        # non-existent path whenever `directory` is relative.
        for f in sorted(glob.glob(search_pattern, recursive=True)):
            if os.path.isfile(f):
                update_from_file(f)

    # wrappers
    for pattern in WRAPPERS[language]:
        wrappers = project_absolute_path(
            "benchmarks", "wrappers", deployment, language.value, pattern
        )
        for f in sorted(glob.glob(wrappers)):
            if os.path.isdir(f):
                for root, dirs, files in os.walk(f):
                    # Sorting in place fixes the traversal order of os.walk.
                    dirs.sort()
                    for file in sorted(files):
                        update_from_file(os.path.join(root, file))
            else:
                update_from_file(f)
    return hash_sum.hexdigest()
[docs]
def serialize(self) -> dict:
    """
    Produce a dictionary representation of the benchmark code.

    Returns:
        dict: Mapping with the keys "size" (code size) and "hash" (source hash)
    """
    serialized = {"size": self.code_size, "hash": self.hash}
    return serialized
[docs]
def query_cache(self) -> None:
"""
Query the cache for existing benchmark code packages and functions.
Checks if there's a cached code package or container for this benchmark
and deployment combination. Updates the cache status fields based on
whether the cache exists and if it's still valid (hash matches).
"""
if self.container_deployment:
self._code_package = self._cache_client.get_container(
deployment=self._deployment_name,
benchmark=self._benchmark,
language=self.language_name,
language_version=self.language_version,
architecture=self.architecture,
)
if self._code_package is not None:
self._container_uri = self._code_package["image-uri"]
else:
self._code_package = self._cache_client.get_code_package(
deployment=self._deployment_name,
benchmark=self._benchmark,
language=self.language_name,
language_version=self.language_version,
architecture=self.architecture,
)
self._functions = self._cache_client.get_functions(
deployment=self._deployment_name,
benchmark=self._benchmark,
language=self.language_name,
)
if self._code_package is not None:
# compare hashes
current_hash = self.hash
old_hash = self._code_package["hash"]
self._code_size = self._code_package["size"]
self._is_cached = True
self._is_cached_valid = current_hash == old_hash
else:
self._is_cached = False
self._is_cached_valid = False
[docs]
def copy_code(self, output_dir: str) -> None:
    """Copy benchmark source code to output directory.

    Copies language-specific source files and dependency files from the
    language subdirectory of the benchmark to the output directory. For Java,
    the entire directory tree is copied instead of selected files. If a
    version-specific `package.json.<language_version>` exists, it is copied
    as `package.json`, replacing the generic one.

    Args:
        output_dir: Destination directory for copied files
    """
    FILES = {
        Language.PYTHON: ["*.py", "requirements.txt*"],
        Language.NODEJS: ["*.js", "package.json"],
        Language.JAVA: [],
        Language.CPP: ["*.cpp", "*.hpp", "dependencies.json"],
    }
    path = os.path.join(self.benchmark_path, self.language_name)

    if self.language == Language.JAVA:
        # In Java, we copy the entire nested directory.
        shutil.copytree(path, output_dir, dirs_exist_ok=True)
        return

    for file_type in FILES[self.language]:
        for f in glob.glob(os.path.join(path, file_type)):
            # glob already returns paths prefixed with `path`; joining them
            # with `path` again breaks when the benchmark path is relative.
            shutil.copy2(f, output_dir)

    # support node.js benchmarks with language specific packages
    nodejs_package_json = os.path.join(path, f"package.json.{self.language_version}")
    if os.path.exists(nodejs_package_json):
        shutil.copy2(nodejs_package_json, os.path.join(output_dir, "package.json"))
[docs]
def add_benchmark_data(self, output_dir: str) -> None:
    """Add benchmark-specific data and assets to output directory.

    Runs `init.sh` with bash if it is present in the benchmark directory
    and/or in its language-specific subdirectory. Each script receives three
    arguments: the output directory, the literal string "false", and the
    target architecture.

    The script is started directly with an argument list (no shell), so
    paths containing spaces, quotes or other shell metacharacters are passed
    through unchanged.

    Args:
        output_dir: Directory where benchmark data should be added

    Raises:
        RuntimeError: If an init.sh script exits with a non-zero status
    """
    candidate_dirs = [
        self.benchmark_path,
        os.path.join(self.benchmark_path, self.language_name),
    ]
    for path in candidate_dirs:
        init_script = os.path.join(path, "init.sh")
        if not os.path.exists(init_script):
            continue

        full_cmd = ["/bin/bash", init_script, output_dir, "false", self.architecture]
        self.logging.debug("Adding benchmark data with command: {}".format(full_cmd))
        result = subprocess.run(
            full_cmd,
            stdout=subprocess.PIPE,
            # merge stderr so that the log shows output in the order it was produced
            stderr=subprocess.STDOUT,
        )
        output = result.stdout.decode("utf-8", errors="replace").strip()
        if output:
            self.logging.debug("init.sh output:\n{}".format(output))
        if result.returncode != 0:
            raise RuntimeError("init.sh failed (exit {}): {}".format(result.returncode, output))
[docs]
def add_deployment_files(self, output_dir: str) -> None:
    """Add deployment-specific wrapper files to output directory.

    The names of the wrapper files come from the system configuration and
    are resolved against `benchmarks/wrappers/{deployment_name}/{language_name}/`.
    Directories are copied recursively (merging into an existing destination);
    plain files are copied only when the destination does not exist yet.

    Args:
        output_dir: Directory where deployment files should be added
    """
    wrappers_root = project_absolute_path(
        "benchmarks", "wrappers", self._deployment_name, self.language_name
    )
    wrapper_names = self._system_config.deployment_files(
        self._deployment_name, self.language_name
    )
    sources = [os.path.join(wrappers_root, name) for name in wrapper_names]

    for source in sources:
        target = os.path.join(output_dir, os.path.basename(source))
        if os.path.isdir(source):
            shutil.copytree(source, target, dirs_exist_ok=True)
        elif not os.path.exists(target):
            # never overwrite a file that is already part of the package
            shutil.copy2(source, target)
[docs]
def add_deployment_package_python(self, output_dir: str) -> None:
"""Add Python deployment packages to requirements file.
Appends platform-specific Python packages and benchmark module
dependencies to the requirements.txt file for the deployment.
Handles versioned requirements files (e.g., requirements.txt.3.8).
Args:
output_dir: Directory containing the requirements file to modify
"""
destination_file = f"requirements.txt.{self._language_version}"
if not os.path.exists(os.path.join(output_dir, destination_file)):
destination_file = "requirements.txt"
# append to the end of requirements file
with open(os.path.join(output_dir, destination_file), "a") as out:
packages = self._system_config.deployment_packages(
self._deployment_name, self.language_name
)
for package in packages:
out.write(package)
module_packages = self._system_config.deployment_module_packages(
self._deployment_name, self.language_name
)
for bench_module in self._benchmark_config.modules:
if bench_module.value in module_packages:
for package in module_packages[bench_module.value]:
out.write(package)
[docs]
def add_deployment_package_nodejs(self, output_dir: str) -> None:
"""Add Node.js deployment packages to package.json.
Modifies the package.json file to include platform-specific
Node.js dependencies required for deployment.
Handles versioned package.json files (e.g., package.json.12).
Args:
output_dir: Directory containing the package.json file to modify
"""
# modify package.json
packages = self._system_config.deployment_packages(
self._deployment_name, self.language_name
)
if len(packages):
package_config = os.path.join(output_dir, f"package.json.{self._language_version}")
if not os.path.exists(package_config):
package_config = os.path.join(output_dir, "package.json")
with open(package_config, "r") as package_file:
package_json = json.load(package_file)
for key, val in packages.items():
package_json["dependencies"][key] = val
with open(package_config, "w") as package_file:
json.dump(package_json, package_file, indent=2)
[docs]
def add_deployment_package_java(self, output_dir: str):
"""Extend benchmark's pom.xml with system-specific packages.
All Java dependencies for each platform are defined in systems.json.
Args:
output_dir: benchmark directory containing pom.xml to modify
Raises:
ValueError: when benchmark's pom.xml is missing placeholder
"""
pom_path = os.path.join(output_dir, "pom.xml")
with open(pom_path, "r") as f:
pom_content = f.read()
packages = self._system_config.deployment_packages(
self._deployment_name, self.language_name
)
dependency_blocks = ""
if len(packages):
for key, val in packages.items():
dependency_name = key.strip('"').strip("'")
dependency_version = val.strip('"').strip("'")
dependency_blocks += (
self.format_maven_dependency(dependency_name, dependency_version) + "\n"
)
if "<!-- PLATFORM_DEPENDENCIES -->" not in pom_content:
raise ValueError(
"pom.xml template is missing <!-- PLATFORM_DEPENDENCIES --> placeholder"
)
pom_content = pom_content.replace(
"<!-- PLATFORM_DEPENDENCIES -->", dependency_blocks.strip()
)
with open(pom_path, "w") as f:
f.write(pom_content)
[docs]
def add_deployment_package_cpp(self, output_dir: str) -> None:
    """Generates CMakeLists.txt file for C++ benchmark.

    The generated script consists of:

    * the definition of the benchmark target from the wrapper sources
      (plus storage / key-value sources when the matching modules are enabled),
    * linking against the AWS Lambda runtime,
    * the CMake fragments of every C++ dependency of the benchmark,
    * the AWS packaging target (Hiredis support is present but commented out).

    The script is dedented and written to `CMakeLists.txt` in `output_dir`.

    Args:
        output_dir: Benchmark directory
    """
    enabled_modules = self.benchmark_config.modules
    sources = ["handler.cpp", "utils.cpp", "main.cpp"]
    if BenchmarkModule.STORAGE in enabled_modules:
        sources.append("storage.cpp")
    if BenchmarkModule.NOSQL in enabled_modules:
        sources.append("key-value.cpp")
    # TODO: add module for redis
    files_str = " ".join(sources)

    cmake_script = f"""
cmake_minimum_required(VERSION 3.9)
set(CMAKE_CXX_STANDARD 14)
# set(CMAKE_CXX_FLAGS "-Os")
project(benchmark LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 17)
add_executable(
${{PROJECT_NAME}} {files_str}
)
target_include_directories(${{PROJECT_NAME}} PRIVATE ".")
target_compile_features(${{PROJECT_NAME}} PRIVATE "cxx_std_14")
target_compile_options(${{PROJECT_NAME}} PRIVATE "-Wall" "-Wextra")
find_package(aws-lambda-runtime)
target_link_libraries(${{PROJECT_NAME}} PRIVATE AWS::aws-lambda-runtime)
"""

    cpp_dependencies = self._benchmark_config._cpp_dependencies
    cmake_script += "".join(
        CppDependencies.to_cmake_list(dependency) for dependency in cpp_dependencies
    )

    # FIXME: we disabled Hiredis as this is currently not used.
    # We need a proper module for that.
    cmake_script += """
# find_package(PkgConfig REQUIRED)
# set(ENV{PKG_CONFIG_PATH} "/opt/lib/pkgconfig")
# pkg_check_modules(HIREDIS REQUIRED IMPORTED_TARGET hiredis)
# target_include_directories(${PROJECT_NAME} PUBLIC PkgConfig::HIREDIS)
# target_link_libraries(${PROJECT_NAME} PUBLIC PkgConfig::HIREDIS)
# this line creates a target that packages your binary and zips it up
aws_lambda_package_target(${PROJECT_NAME})
"""

    self.logging.info(
        f"CPP benchmark {self.benchmark} has "
        + str(len(cpp_dependencies))
        + " dependencies."
    )

    build_script = os.path.join(output_dir, "CMakeLists.txt")
    with open(build_script, "w") as script_file:
        script_file.write(textwrap.dedent(cmake_script))
[docs]
def add_deployment_package(self, output_dir: str) -> None:
    """Add deployment packages based on programming language.

    Forwards to the language-specific method (Python, Node.js, Java or C++)
    that injects platform-specific dependencies into the package.

    Args:
        output_dir: Directory where deployment packages should be added

    Raises:
        NotImplementedError: If the language is not supported
    """
    from sebs.faas.function import Language

    handlers = (
        (Language.PYTHON, self.add_deployment_package_python),
        (Language.NODEJS, self.add_deployment_package_nodejs),
        (Language.JAVA, self.add_deployment_package_java),
        (Language.CPP, self.add_deployment_package_cpp),
    )
    for supported_language, handler in handlers:
        if self.language == supported_language:
            handler(output_dir)
            return
    raise NotImplementedError
[docs]
@staticmethod
def directory_size(directory: str) -> int:
"""Calculate total size of all files in a directory.
Recursively calculates the total size in bytes of all files
within the specified directory and its subdirectories.
Args:
directory: Path to the directory to measure
Returns:
int: Total size in bytes of all files in the directory
"""
from pathlib import Path
root = Path(directory)
sizes = [f.stat().st_size for f in root.glob("**/*") if f.is_file()]
return sum(sizes)
[docs]
def builder_image_name(self) -> Tuple[str, str]:
"""Image names of builder Docker images for preparing benchmarks.
We are progressively replacing all unversioned image names with versioned ones.
Returns:
Tuple of unversioned and versioned image names.
"""
unversioned_image_name = "build.{deployment}.{language}.{runtime}".format(
deployment=self._deployment_name,
language=self.language_name,
runtime=self.language_version,
)
image_name = "{base_image_name}-{sebs_version}".format(
base_image_name=unversioned_image_name,
sebs_version=self._system_config.version(),
)
return unversioned_image_name, image_name
[docs]
def install_dependencies(self, output_dir: str) -> None:
    """Install benchmark dependencies using Docker.

    Runs the builder image of the deployment/language/runtime to install
    language-specific dependencies in an environment matching the target
    platform. The versioned builder image is preferred; if it can neither be
    found locally nor pulled, the unversioned image is used instead.

    Nothing is done when the system configuration lists no "build" image for
    this deployment and language, or when the package file of the language
    (requirements.txt, package.json, CMakeLists.txt, pom.xml) is not present
    in `output_dir`.

    Two build modes exist:

    * default: `output_dir` (and `package.sh` of the benchmark, if present)
      is mounted into the container;
    * experiment flag `docker_copy_build_files`: files are copied into and out
      of the container as tar archives, for environments where volume
      mounting is problematic (e.g., CircleCI).

    Args:
        output_dir: Directory containing the code package to build

    Raises:
        RuntimeError: If no builder image can be obtained, or if the build
            inside the container finishes with a non-zero exit code
        docker.errors.ContainerError: If raised by Docker while running the build
    """
    # do we have docker image for this run and language?
    if "build" not in self._system_config.docker_image_types(
        self._deployment_name, self.language_name
    ):
        self.logging.info(
            (
                "There is no Docker build image for {deployment} run in {language}, "
                "thus skipping the Docker-based installation of dependencies."
            ).format(deployment=self._deployment_name, language=self.language_name)
        )
        return

    repo_name = self._system_config.docker_repository()
    unversioned_image_name, image_name = self.builder_image_name()

    def ensure_image(name: str) -> None:
        """Make sure the image is available locally, pulling it if needed.

        Args:
            name: image name (tag within the repository)

        Raises:
            RuntimeError: when the image is not present and cannot be pulled.
        """
        try:
            self._docker_client.images.get(repo_name + ":" + name)
        except docker.errors.ImageNotFound:
            try:
                self.logging.info(
                    "Docker pull of image {repo}:{image}".format(repo=repo_name, image=name)
                )
                self._docker_client.images.pull(repo_name, name)
            except docker.errors.APIError as pull_error:
                raise RuntimeError(
                    "Docker pull of image {}:{} failed!".format(repo_name, name)
                ) from pull_error

    try:
        ensure_image(image_name)
    except RuntimeError as e:
        self.logging.warning(
            "Failed to ensure image {}, falling back to {}: {}".format(
                image_name, unversioned_image_name, e
            )
        )
        # A failure of the fallback propagates to the caller as RuntimeError.
        ensure_image(unversioned_image_name)
        # update `image_name` in the context to the fallback image name
        image_name = unversioned_image_name

    copy_build_files = self._experiment_config.check_flag("docker_copy_build_files")

    # Create set of mounted volumes unless Docker volumes are disabled.
    # The dictionary is always defined since error reporting below prints it.
    volumes: Dict[str, Dict[str, str]] = {}
    if not copy_build_files:
        volumes[os.path.abspath(output_dir)] = {"bind": "/mnt/function", "mode": "rw"}
        package_script = os.path.abspath(
            os.path.join(self._benchmark_path, self.language_name, "package.sh")
        )
        # does this benchmark has package.sh script?
        if os.path.exists(package_script):
            volumes[package_script] = {
                "bind": "/mnt/function/package.sh",
                "mode": "ro",
            }

    # run Docker container to install packages
    PACKAGE_FILES = {
        Language.PYTHON: "requirements.txt",
        Language.NODEJS: "package.json",
        Language.CPP: "CMakeLists.txt",
        Language.JAVA: "pom.xml",
    }
    file = os.path.join(output_dir, PACKAGE_FILES[self.language])
    if not os.path.exists(file):
        return

    full_image = "{}:{}".format(repo_name, image_name)
    try:
        self.logging.info(
            "Docker build of benchmark dependencies in container "
            "of image {repo}:{image}".format(repo=repo_name, image=image_name)
        )
        if not copy_build_files:
            # Standard, simplest build
            stdout = self._build_dependencies_mounted(full_image, volumes, output_dir)
        else:
            # Hack to enable builds on platforms where Docker mounted volumes
            # are not supported. Example: CircleCI docker environment
            stdout = self._build_dependencies_copied(full_image, output_dir)

        # Pass to output information on optimizing builds.
        # Useful for AWS where packages have to obey size limits.
        for line in stdout.decode("utf-8", errors="replace").split("\n"):
            if "size" in line:
                self.logging.info("Docker build: {}".format(line))
    except docker.errors.ContainerError as e:
        self.logging.error("Package build failed!")
        self.logging.error(f"Stderr: {e.stderr}")
        self.logging.error(f"Docker mount volumes: {volumes}")
        raise e from None

def _report_failed_build(self, output_dir: str, exit_status: int, logs: bytes) -> None:
    """Store the build logs in `error.log` and raise an error for a failed build."""
    error_log_path = os.path.join(output_dir, "error.log")
    with open(error_log_path, "wb") as error_file:
        error_file.write(logs)
    self.logging.error(f"Build failed! Container exited with " f"code {exit_status}")
    self.logging.error(f"Logs saved to {error_log_path}")
    raise RuntimeError("Package build failed!")

def _build_dependencies_mounted(
    self, image: str, volumes: Dict[str, Dict[str, str]], output_dir: str
) -> bytes:
    """Run the builder container with the code mounted as a volume; return its logs."""
    self.logging.info(
        "Docker mount of benchmark code from path {path}".format(
            path=os.path.abspath(output_dir)
        )
    )
    container = self._docker_client.containers.run(
        image,
        volumes=volumes,
        environment={
            "CONTAINER_UID": str(os.getuid()),
            "CONTAINER_GID": str(os.getgid()),
            "CONTAINER_USER": "docker_user",
            "APP": self.benchmark,
            "PLATFORM": self._deployment_name.upper(),
            "TARGET_ARCHITECTURE": self._experiment_config._architecture,
        },
        # kept after exit so that the logs can still be read
        remove=False,
        detach=True,
    )
    try:
        exit_code = container.wait()
        stdout = container.logs()
        if exit_code["StatusCode"] != 0:
            self._report_failed_build(output_dir, exit_code["StatusCode"], stdout)
    finally:
        container.remove()
    return stdout

def _build_dependencies_copied(self, image: str, output_dir: str) -> bytes:
    """Run the build by copying files to and from the container; return the build output."""
    import tarfile

    container = self._docker_client.containers.run(
        image,
        environment={"APP": self.benchmark},
        # user="1000:1000",
        user=os.getuid(),
        remove=True,
        detach=True,
        tty=True,
        command="/bin/bash",
    )
    try:
        # copy application files
        self.logging.info(
            "Send benchmark code from path {path} to "
            "Docker instance".format(path=os.path.abspath(output_dir))
        )
        tar_archive = os.path.join(output_dir, os.path.pardir, "function.tar")
        with tarfile.open(tar_archive, "w") as tar:
            for f in os.listdir(output_dir):
                tar.add(os.path.join(output_dir, f), arcname=f)
        with open(tar_archive, "rb") as data:
            container.put_archive("/mnt/function", data.read())

        # do the build step
        exit_code, stdout = container.exec_run(
            cmd="/bin/bash /sebs/installer.sh",
            user="docker_user",
            stdout=True,
            stderr=True,
        )
        # A failed installer must not be packaged as if it had succeeded.
        if exit_code not in (0, None):
            self._report_failed_build(output_dir, exit_code, stdout)

        # copy updated code with package
        data, stat = container.get_archive("/mnt/function")
        with open(tar_archive, "wb") as output_filef:
            for chunk in data:
                output_filef.write(chunk)
        with tarfile.open(tar_archive, "r") as tar:
            tar.extractall(output_dir)

        # docker packs the entire directory with basename function
        extracted_dir = os.path.join(output_dir, "function")
        for f in os.listdir(extracted_dir):
            shutil.move(os.path.join(extracted_dir, f), os.path.join(output_dir, f))
        shutil.rmtree(extracted_dir)
    finally:
        # The container idles in bash; stop it even when the build failed
        # (it was started with remove=True).
        container.stop()
    return stdout
[docs]
def recalculate_code_size(self) -> int:
    """Refresh the stored size of the code package directory.

    Queries ``Benchmark.directory_size`` for the benchmark's output
    directory and stores the result as the current code size.

    Returns:
        int: Updated code package size in bytes
    """
    measured_size = Benchmark.directory_size(self._output_dir)
    self._code_size = measured_size
    return self._code_size
[docs]
def build(
self,
package_build_step: Callable[[str, Language, str, str, str, bool], Tuple[str, int]],
container_client: DockerContainer | None,
container_build_step: Callable[[str, Language, str, str, str, bool], Tuple[str, int]]
| None,
) -> Tuple[bool, str | None, bool, str | None]:
"""Build the complete benchmark deployment package.
Orchestrates the entire build process for a benchmark, including:
- Cache validation and reuse if possible
- Code copying and dependency installation
- Adding benchmark data and deployment-specific files
- Running platform-specific build and packaging steps
(e.g., zipping, creating container image).
- Cache updates after successful build
Args:
package_build_step: Platform-specific build function for code package.
It is called with the absolute output directory, language,
language version, architecture, benchmark name and the
cache-validity flag; its result is stored as
(code location, code size).
container_client: Docker client for building container images (if container deployment).
Must not be None when a container image has to be pushed or built.
container_build_step: Platform-specific build function for container deployments.
Within this method it is only checked against None; when it is set,
an additional code package is produced via package_build_step.
Returns:
Tuple containing:
- bool: Whether a new build was performed (False if cached)
- str | None: Path to the built code package; None when a cached
container deployment is reused
- bool: Whether this is a container deployment
- str | None: Container URI; None if not a container deployment
"""
# Skip build if files are up to date and user didn't enforce rebuild
if self.is_cached and self.is_cached_valid:
if self.container_deployment:
# The cache entry is valid but no container URI is known yet:
# obtain it from the container client and record it in the cache.
if self._container_uri is None:
assert container_client is not None
self._container_uri = container_client.push_to_registry(
self.benchmark,
self.language_name,
self.language_version,
self.architecture,
)
self._cache_client.update_container_uri(
self._deployment_name,
self._benchmark,
self.language_name,
self.language_version,
self.architecture,
self._container_uri,
)
self.logging.info(
"Using cached benchmark {} from container image {}".format(
self.benchmark, self.container_uri
)
)
# No code package location is reported for a cached container deployment.
return False, None, self.container_deployment, self.container_uri
else:
self.logging.info(
"Using cached benchmark {} at {}".format(self.benchmark, self.code_location)
)
# No container URI is reported for a cached code package deployment.
return False, self.code_location, self.container_deployment, None
# From here on a fresh build is performed; the message states why.
msg = (
"no cached code package/container."
if not self.is_cached
else "cached code package is not up to date/build enforced."
)
self.logging.info("Building benchmark {}. Reason: {}".format(self.benchmark, msg))
# clear existing cache information
self._code_package = None
# create directory to be deployed
if os.path.exists(self._output_dir):
shutil.rmtree(self._output_dir)
os.makedirs(self._output_dir)
# Populate the fresh output directory before any platform-specific step runs.
self.copy_code(self._output_dir)
self.add_benchmark_data(self._output_dir)
self.add_deployment_files(self._output_dir)
self.add_deployment_package(self._output_dir)
"""
We have two main build paths:
(1) Code Package. There, we finish by installing dependencies,
and let the platform figure out the details of code package,
such as exact directory distribution.
(2) Container build. There, we install all dependencies inside the container.
"""
# Reset the URI so that only the container path below can set it again.
self._container_uri = None
if self.container_deployment:
assert container_client is not None
repo_name = self._system_config.docker_repository()
# Only the image name from builder_image_name() is used; the first element is ignored.
_, image_name = self.builder_image_name()
"""
Generate custom Dockerfile for C++ benchmarks
"""
if self.language == Language.CPP:
from sebs.cpp_dependencies import CppDependencies
from sebs.utils import DOCKER_DIR
# The template is looked up per deployment under <DOCKER_DIR>/<deployment>/cpp/.
template_path = os.path.join(
DOCKER_DIR, self._deployment_name, "cpp", "Dockerfile.function"
)
with open(template_path, "r") as f:
dockerfile_template = f.read()
dockerfile_content = CppDependencies.generate_dockerfile(
self._benchmark_config._cpp_dependencies,
dockerfile_template,
self._system_config.version(),
)
# The generated Dockerfile is written into the output directory.
dockerfile_path = os.path.join(self._output_dir, "Dockerfile")
with open(dockerfile_path, "w") as f:
f.write(dockerfile_content)
# NOTE(review): len() below requires _cpp_dependencies to be a sized
# collection; the constructor default for cpp_dependencies is None - confirm
# it is normalized before reaching this point.
self.logging.info(
f"Generated custom Dockerfile for C++ benchmark with "
f"{len(self._benchmark_config._cpp_dependencies)} explicit dependencies"
)
# The last argument is the builder image reference "<repository>:<image name>".
_, self._container_uri, self._code_size = container_client.build_base_image(
os.path.abspath(self._output_dir),
self.language,
self.language_version,
self.architecture,
self.benchmark,
self.is_cached_valid,
f"{repo_name}:{image_name}",
)
self.logging.info(
f"Created function container {self._container_uri},"
f" code package (source hash: {self.hash}), for run on {self._deployment_name}"
f" with {self.language_name}:{self.language_version}"
)
"""
OpenWhisk requires a code package in addition to the container.
"""
# NOTE(review): container_build_step is only tested for presence here, while
# the call below goes to package_build_step - confirm this is intended.
if container_build_step is not None:
self._code_location, self._code_size = package_build_step(
os.path.abspath(self._output_dir),
self.language,
self.language_version,
self.architecture,
self.benchmark,
self.is_cached_valid,
)
else:
# Code package path: install dependencies first, then let the platform package the directory.
self.install_dependencies(self._output_dir)
self._code_location, self._code_size = package_build_step(
os.path.abspath(self._output_dir),
self.language,
self.language_version,
self.architecture,
self.benchmark,
self.is_cached_valid,
)
self.logging.info(
(
"Created code package (source hash: {hash}), for run on {deployment}"
+ " with {language}:{runtime}"
).format(
hash=self.hash,
deployment=self._deployment_name,
language=self.language_name,
runtime=self.language_version,
)
)
"""
Update can handle both update of an existing cache structure
or creating an entirely new one.
"""
self._cache_client.update_code_package(self._deployment_name, self)
# Re-read the cache after the update above.
self.query_cache()
return (
True,
self._code_location,
self._container_deployment,
self._container_uri,
)
[docs]
def code_package_modify(self, filename: str, data: bytes) -> None:
    """Replace a single file inside an already built code package.

    The package is patched in place instead of being rebuilt. Only ZIP
    archive packages are supported; container deployments and packages
    that are not ZIP archives are rejected. Experiments that vary the
    size of the input package rely on this.

    Args:
        filename: Name of the file to modify within the package
        data: New content for the file as bytes

    Raises:
        NotImplementedError: If this is a container deployment or the
            code package is not a ZIP archive
    """
    # Guard clause: anything other than a ZIP code package is unsupported.
    if self.container_deployment or not self.code_package_is_archive():
        raise NotImplementedError()

    assert self.code_location is not None
    self._update_zip(self.code_location, filename, data)

    # Report the refreshed package size in megabytes.
    size_in_mb = self.code_package_recompute_size() / 1024.0 / 1024.0
    self.logging.info(f"Modified zip package {self.code_location}, new size {size_in_mb} MB")
[docs]
def code_package_is_archive(self) -> bool:
    """Tell whether the code package is a ZIP archive file.

    Container deployments are never treated as archives. Otherwise the
    code location must be an existing regular file whose extension is
    ``.zip``; directories and other files yield False.

    Returns:
        bool: True if package is a ZIP archive, False otherwise
    """
    if self.container_deployment:
        return False

    package_path = self.code_location
    assert package_path is not None
    # The extension is only inspected for existing regular files.
    return os.path.isfile(package_path) and os.path.splitext(package_path)[1] == ".zip"
[docs]
def code_package_recompute_size(self) -> float:
    """Measure the code package file again and store its size.

    Intended to be called after the package file has been modified so
    that the tracked code size matches the file on disk.

    Returns:
        float: Updated package size in bytes

    Raises:
        NotImplementedError: If this is a container deployment
        RuntimeError: If no code location is set
    """
    if self.container_deployment:
        raise NotImplementedError()
    if self.code_location is None:
        raise RuntimeError("Code location is not set!")

    size_on_disk = os.path.getsize(self.code_location)
    self._code_size = size_on_disk
    return size_on_disk
@staticmethod
def _update_zip(zipname: str, filename: str, data: bytes) -> None:
    """Update a file within a ZIP archive.

    Builds the complete new archive in a temporary file located next to
    the original: every entry except ``filename`` is copied over together
    with the archive comment, then ``filename`` is added with the new
    data using DEFLATE compression. Only once the new archive is fully
    written does it replace the original, so a failure at any earlier
    point leaves the original archive untouched and removes the
    temporary file.

    Based on method from:
    https://stackoverflow.com/questions/25738523/how-to-update-one-file-inside-zip-file-using-python

    Args:
        zipname: Path to the ZIP archive to modify
        filename: Name of the file to update within the archive
        data: New content for the file as bytes

    Raises:
        zipfile.BadZipFile: If ``zipname`` is not a valid ZIP archive
        OSError: If the archive cannot be read or replaced
    """
    import zipfile
    import tempfile

    # Create the temp file in the archive's own directory so that the final
    # replacement is a rename within one filesystem.
    tmpfd, tmpname = tempfile.mkstemp(dir=os.path.dirname(zipname))
    os.close(tmpfd)
    try:
        with zipfile.ZipFile(zipname, "r") as zin, zipfile.ZipFile(tmpname, "w") as zout:
            zout.comment = zin.comment  # preserve the comment
            # copy every entry except the one being replaced
            for item in zin.infolist():
                if item.filename != filename:
                    zout.writestr(item, zin.read(item.filename))
            # add filename with its new data
            zout.writestr(filename, data, compress_type=zipfile.ZIP_DEFLATED)
        # os.replace overwrites the destination in one step, so the original
        # archive is never removed before its replacement is in place.
        os.replace(tmpname, zipname)
    finally:
        # After a successful replace the temp name no longer exists; on any
        # failure this removes the partially written temp archive.
        if os.path.exists(tmpname):
            os.remove(tmpname)
[docs]
class BenchmarkModuleInterface:
"""Interface definition for benchmark input modules.
Useful for static type hinting with mypy and documentation.
This class defines the interface that benchmark input modules
must implement to provide input data generation, storage allocation,
and NoSQL database setup for benchmarks.
All methods are static as they operate on benchmark data rather than
instance state. Benchmark modules are dynamically loaded from the
input.py file in each benchmark directory.
"""
[docs]
@staticmethod
@abstractmethod
def buckets_count() -> Tuple[int, int]:
    """Report how many storage buckets the benchmark requires.

    Interface stub: benchmark input modules supply the implementation.

    Returns:
        Tuple[int, int]: Number of (input_buckets, output_buckets) needed
    """
    ...
[docs]
@staticmethod
@abstractmethod
def allocate_nosql() -> Dict[str, Dict[str, str]]:
    """Describe the NoSQL tables the benchmark requires.

    Interface stub: benchmark input modules supply the implementation.

    Returns:
        Dict containing table definitions with primary and secondary keys:
        {
            'table_name': {
                'primary_key': 'key_field_name',
                'secondary_key': 'optional_secondary_key_name'
            }
        }
    """
    ...