# Copyright 2020-2025 ETH Zurich and the SeBS authors. All rights reserved.
"""Performance and cost measurement experiment implementation.
This module provides the PerfCost experiment implementation, which measures
the performance characteristics and execution costs of serverless functions.
It can run several experiment types:
- Cold: Measures cold start performance by enforcing container recreation
- Warm: Measures warm execution performance with reused containers
- Burst: Measures performance under concurrent burst load
- Sequential: Measures performance with sequential invocations
The experiment collects detailed metrics about execution time, memory usage,
and costs, and provides statistical analysis of the results.
"""
import json
import os
import time
from enum import Enum
from multiprocessing.pool import ThreadPool
from typing import List, TYPE_CHECKING
from sebs.faas.system import System as FaaSSystem
from sebs.faas.function import Trigger
from sebs.experiments.experiment import Experiment
from sebs.experiments.result import Result as ExperimentResult
from sebs.experiments.config import Config as ExperimentConfig
from sebs.utils import serialize
from sebs.statistics import basic_stats, ci_tstudents, ci_le_boudec
# import cycle
if TYPE_CHECKING:
from sebs import SeBS
[docs]
class PerfCost(Experiment):
"""Performance and cost measurement experiment.
This experiment measures the performance characteristics and execution
costs of serverless functions under different execution conditions.
It can measure cold starts, warm execution, burst load, and sequential
execution patterns.
The experiment can be configured to run with different memory sizes,
allowing for comparison of performance across different resource allocations.
Attributes:
_benchmark: The benchmark to execute
_benchmark_input: The input data for the benchmark
_function: The function to invoke
_trigger: The trigger to use for invocation
_out_dir: Directory for storing results
_deployment_client: The deployment client to use
_sebs_client: The SeBS client
"""
def __init__(self, config: ExperimentConfig):
    """Construct the experiment by handing the configuration to the base class.

    No experiment-specific state is created here; the attributes used by
    the other methods are assigned later, in ``prepare``.

    Args:
        config: Experiment configuration, forwarded unchanged to ``Experiment``.
    """
    super().__init__(config)
[docs]
@staticmethod
def name() -> str:
    """Return the identifier of this experiment.

    The same string is used to look up this experiment's settings in the
    configuration (see ``prepare`` and ``run``).

    Returns:
        The literal string "perf-cost".
    """
    experiment_name = "perf-cost"
    return experiment_name
[docs]
@staticmethod
def typename() -> str:
    """Return the qualified type name of this experiment.

    Returns:
        The literal string "Experiment.PerfCost".
    """
    qualified_name = "Experiment.PerfCost"
    return qualified_name
[docs]
class RunType(Enum):
    """Kinds of measurement run supported by the experiment.

    Members:
        WARM: warm executions; cold results are pruned by the runner.
        COLD: cold starts; the runner enforces a cold start before each batch.
        BURST: concurrent burst; a cold start is also enforced before each batch.
        SEQUENTIAL: invocations issued one at a time.
    """

    WARM = 0
    COLD = 1
    BURST = 2
    SEQUENTIAL = 3

    def str(self) -> str:
        """Return the member name in lower case, e.g. ``"cold"`` for ``COLD``.

        The value is used as the prefix of the result file names.

        Returns:
            The lowercase name of the run type.
        """
        member_name = self.name
        return member_name.lower()
[docs]
def prepare(self, sebs_client: "SeBS", deployment_client: FaaSSystem) -> None:
    """Set up everything the experiment needs before ``run`` is called.

    Resolves the benchmark named in the experiment settings, prepares its
    input data, obtains the function from the deployment, makes sure an
    HTTP trigger is available, and creates the ``perf-cost`` output
    directory below the SeBS output directory.

    Args:
        sebs_client: The SeBS client; supplies the benchmark and output directory.
        deployment_client: The deployment client used for the function and trigger.
    """
    settings = self.config.experiment_settings(self.name())

    # Benchmark instance and its input data.
    self._benchmark = sebs_client.get_benchmark(
        settings["benchmark"], deployment_client, self.config
    )
    self._benchmark_input = self._benchmark.prepare_input(
        deployment_client.system_resources,
        size=settings["input-size"],
        replace_existing=self.config.update_storage,
    )

    # Get or create the function.
    self._function = deployment_client.get_function(self._benchmark)

    # Reuse the first HTTP trigger if there is one, otherwise create it.
    triggers = self._function.triggers(Trigger.TriggerType.HTTP)
    if len(triggers) > 0:
        self._trigger = triggers[0]
    else:
        self._trigger = deployment_client.create_trigger(
            self._function, Trigger.TriggerType.HTTP
        )

    # makedirs(exist_ok=True) replaces the exists()/mkdir() pair: it has no
    # check-then-create race and also creates a missing parent directory.
    self._out_dir = os.path.join(sebs_client.output_dir, "perf-cost")
    os.makedirs(self._out_dir, exist_ok=True)

    # Keep the clients for the later run() stage.
    self._deployment_client = deployment_client
    self._sebs_client = sebs_client
[docs]
def run(self) -> None:
    """Execute the experiment for every configured memory size.

    With an empty ``memory-sizes`` setting the experiment runs once on the
    function's current memory configuration. Otherwise, for each size, the
    function's memory is changed, the deployment and the cache are updated,
    and the experiment runs with the size as the result-file suffix.
    """
    settings = self.config.experiment_settings(self.name())
    memory_sizes = settings["memory-sizes"]

    # No explicit sizes: a single run with whatever memory is configured.
    if len(memory_sizes) == 0:
        self.logging.info("Begin experiment with default memory configuration")
        self.run_configuration(settings, settings["repetitions"])
        return

    for memory in memory_sizes:
        self.logging.info(f"Begin experiment on memory size {memory}")

        # Push the new memory size to the deployed function.
        self._function.config.memory = memory
        benchmark = self._benchmark
        variant = benchmark.system_variant
        # A container URI is only passed for container-based variants.
        container_uri = benchmark.container_uri if variant.is_container else ""
        self._deployment_client.update_function(
            self._function, benchmark, variant, container_uri
        )
        self._sebs_client.cache_client.update_function(self._function)

        self.run_configuration(settings, settings["repetitions"], suffix=str(memory))
[docs]
def compute_statistics(self, times: List[float]) -> None:
    """Log summary statistics and confidence intervals for the given times.

    Logs mean, median, standard deviation and coefficient of variation, then,
    for confidence levels 0.95 and 0.99, a parametric (Student's t) interval
    and, when more than 20 samples are available, a non-parametric interval.
    Each interval is reported with its half-width as a percentage of the
    mean (parametric) or the median (non-parametric).

    Nothing is returned; all results go to the experiment's logger.

    Args:
        times: List of execution times in milliseconds. An empty list is
            reported with a warning and otherwise ignored.
    """
    # Statistics of an empty sample set are undefined; this happens when no
    # samples were requested or gathered, so report it instead of computing.
    if len(times) == 0:
        self.logging.warning("No samples available, skipping statistics computation")
        return

    mean, median, std, cv = basic_stats(times)
    self.logging.info(f"Mean {mean} [ms], median {median} [ms], std {std}, CV {cv}")

    for alpha in [0.95, 0.99]:
        # Parametric confidence interval (Student's t-distribution).
        ci_interval = ci_tstudents(alpha, times)
        interval_width = ci_interval[1] - ci_interval[0]
        # Half of the interval width, relative to the mean, in percent.
        ratio = 100 * interval_width / mean / 2.0
        self.logging.info(
            f"Parametric CI (Student's t-distribution) {alpha} from "
            f"{ci_interval[0]} to {ci_interval[1]}, within {ratio}% of mean"
        )

        # Non-parametric confidence interval (Le Boudec's method);
        # only computed with more than 20 samples.
        if len(times) > 20:
            ci_interval = ci_le_boudec(alpha, times)
            interval_width = ci_interval[1] - ci_interval[0]
            ratio = 100 * interval_width / median / 2.0
            self.logging.info(
                f"Non-parametric CI {alpha} from {ci_interval[0]} to "
                f"{ci_interval[1]}, within {ratio}% of median"
            )
def _run_configuration(
    self,
    run_type: "PerfCost.RunType",
    settings: dict,
    invocations: int,
    repetitions: int,
    suffix: str = "",
) -> None:
    """Run one experiment type and write its results to a JSON file.

    Invocations are issued in batches of ``invocations`` parallel calls
    until at least ``repetitions`` valid samples are gathered. The first
    batch is always treated as warm-up and its results are discarded. For
    COLD and BURST runs a cold start is enforced before every batch. In
    COLD runs, results that are not cold are recorded as incorrect; in WARM
    runs, cold results are dropped.

    The output file is ``<type>_results[_<suffix>].json`` in the experiment
    output directory. Next to the serialized results it has a ``statistics``
    object with these keys:

    - ``samples_generated``: all invocations issued, including warm-up
    - ``samples_gathered``: invocations accepted as valid samples
    - ``failures`` / ``failures_count``: error messages and their number
    - ``incorrect`` / ``incorrect_count``: rejected results and their number
    - ``cold_count``: number of accepted samples that were cold starts

    NOTE(review): the loop only ends once enough valid samples exist, so
    a function that fails on every invocation keeps it running forever.

    Args:
        run_type: Type of run (cold, warm, burst, sequential)
        settings: Experiment settings (not read by this method)
        invocations: Number of concurrent invocations per batch
        repetitions: Number of valid samples to gather
        suffix: Optional suffix for the output file name (e.g., memory size)
    """
    from random import randrange

    # Randomize starting value to ensure that it's not the same
    # as in the previous run.
    # Otherwise we could not change anything and containers won't be killed.
    self._deployment_client.cold_start_counter = randrange(100)

    type_label = run_type.str()
    file_name = (
        f"{type_label}_results_{suffix}.json" if suffix else f"{type_label}_results.json"
    )
    self.logging.info(f"Begin {type_label} experiments")

    incorrect_executions = []
    error_executions = []
    error_count = 0
    incorrect_count = 0
    colds_count = 0
    samples_gathered = 0
    samples_generated = 0
    client_times = []
    enforce_cold = run_type in (PerfCost.RunType.COLD, PerfCost.RunType.BURST)

    with open(os.path.join(self._out_dir, file_name), "w") as out_f:
        with ThreadPool(invocations) as pool:
            result = ExperimentResult(self.config, self._deployment_client.config)
            result.begin()

            # Warm up container
            # For "warm" runs, we do it automatically by pruning cold results
            if run_type == PerfCost.RunType.SEQUENTIAL:
                self._trigger.sync_invoke(self._benchmark_input)

            first_iteration = True
            while samples_gathered < repetitions:
                if enforce_cold:
                    self._deployment_client.enforce_cold_start(
                        [self._function], self._benchmark
                    )
                    time.sleep(5)

                # Schedule the whole batch of invocations in parallel.
                pending = [
                    pool.apply_async(
                        self._trigger.sync_invoke, args=(self._benchmark_input,)
                    )
                    for _ in range(invocations)
                ]

                incorrect = []
                for res in pending:
                    # Best effort: a failed invocation is recorded and the
                    # experiment continues with the remaining ones.
                    try:
                        ret = res.get()
                        if first_iteration:
                            continue
                        if run_type == PerfCost.RunType.COLD and not ret.stats.cold_start:
                            self.logging.info(f"Invocation {ret.request_id} is not cold!")
                            incorrect.append(ret)
                        elif run_type == PerfCost.RunType.WARM and ret.stats.cold_start:
                            self.logging.info(f"Invocation {ret.request_id} is cold!")
                        else:
                            result.add_invocation(self._function, ret)
                            colds_count += ret.stats.cold_start
                            client_times.append(ret.times.client / 1000.0)
                            samples_gathered += 1
                    except Exception as e:
                        error_count += 1
                        error_executions.append(str(e))

                samples_generated += invocations
                if first_iteration:
                    self.logging.info(
                        f"Processed {invocations} warm-up samples, ignoring these results."
                    )
                else:
                    self.logging.info(
                        f"Processed {samples_gathered} samples out of {repetitions},"
                        f" {error_count} errors"
                    )
                first_iteration = False

                if len(incorrect) > 0:
                    incorrect_executions.extend(incorrect)
                    incorrect_count += len(incorrect)

                time.sleep(5)

            result.end()
            self.compute_statistics(client_times)

            out_f.write(
                serialize(
                    {
                        **json.loads(serialize(result)),
                        "statistics": {
                            # Previously this key was filled from
                            # samples_gathered, leaving the generated
                            # counter unused; both are reported now.
                            "samples_generated": samples_generated,
                            "samples_gathered": samples_gathered,
                            "failures": error_executions,
                            "failures_count": error_count,
                            "incorrect": incorrect_executions,
                            "incorrect_count": incorrect_count,
                            "cold_count": colds_count,
                        },
                    }
                )
            )
[docs]
def run_configuration(self, settings: dict, repetitions: int, suffix: str = "") -> None:
    """Run every experiment type listed in ``settings["experiments"]``.

    The types "cold", "warm" and "burst" run with the number of parallel
    invocations given by ``settings["concurrent-invocations"]``; the type
    "sequential" always runs with a single invocation at a time. Types are
    processed in the order in which they are listed.

    Args:
        settings: Experiment settings
        repetitions: Number of repetitions to run
        suffix: Optional suffix for output file names (e.g., memory size)

    Raises:
        RuntimeError: If an unknown experiment type is specified
    """
    for experiment_type in settings["experiments"]:
        if experiment_type == "sequential":
            # One invocation at a time.
            run_type = PerfCost.RunType.SEQUENTIAL
            invocations = 1
        elif experiment_type in ("cold", "warm", "burst"):
            # The enum member names are the upper-case experiment names.
            run_type = PerfCost.RunType[experiment_type.upper()]
            invocations = settings["concurrent-invocations"]
        else:
            raise RuntimeError(f"Unknown experiment type {experiment_type} for Perf-Cost!")

        self._run_configuration(run_type, settings, invocations, repetitions, suffix)
[docs]
def process(
    self,
    sebs_client: "SeBS",
    deployment_client: FaaSSystem,
    directory: str,
    logging_filename: str,
    extend_time_interval: int,
) -> None:
    """Post-process result files and write ``perf-cost/result.csv``.

    Every ``*.json`` file in ``<directory>/perf-cost`` is visited:

    - Files whose name contains "processed" are loaded as they are.
    - Other files are skipped when a "-processed" sibling already exists.
      Otherwise provider metrics are downloaded, the invocation output is
      stripped, and the result is saved as ``<name>-processed.json``.

    One CSV row per invocation is written for every file that was loaded.
    Memory size and experiment type are taken from the file name
    (``<type>_results[_<memory>][-processed].json``); memory is 0 when the
    name carries no memory suffix.

    Args:
        sebs_client: The SeBS client to use
        deployment_client: The deployment client used to download metrics
        directory: Directory where results are stored
        logging_filename: Filename for logs
        extend_time_interval: Minutes by which the metrics query window is
            widened on both sides; values <= 0 leave it unchanged
    """
    import glob
    import csv

    results_dir = os.path.join(directory, "perf-cost")

    def parse_file_name(path):
        """Return (memory, experiment type) encoded in a result file name."""
        parts = os.path.splitext(os.path.basename(path))[0].split("_")
        # The third part is "<memory>" or "<memory>-processed".
        memory_size = int(parts[2].split("-")[0]) if len(parts) > 2 else 0
        return memory_size, parts[0]

    def load_results(path):
        """Return the raw JSON config of a result file and its deserialized form."""
        with open(path, "r") as in_f:
            raw = json.load(in_f)
        deserialized = ExperimentResult.deserialize(
            raw,
            sebs_client.cache_client,
            sebs_client.generate_logging_handlers(logging_filename),
        )
        return raw, deserialized

    # newline="" is what the csv module requires for files given to a writer.
    with open(os.path.join(results_dir, "result.csv"), "w", newline="") as csvfile:
        writer = csv.writer(csvfile, delimiter=",")
        writer.writerow(
            [
                "memory",
                "type",
                "is_cold",
                "exec_time",
                "connection_time",
                "client_time",
                "provider_time",
                "mem_used",
            ]
        )

        for f in glob.glob(os.path.join(results_dir, "*.json")):
            # Test the file name only: a directory called e.g.
            # "processed-data" must not mark every file as processed.
            if "processed" in os.path.basename(f):
                _, experiments = load_results(f)
                memory, exp_type = parse_file_name(f)
            else:
                # `f` already contains the directory, so the sibling path is
                # derived from it directly. Joining it onto the results
                # directory again broke relative paths.
                root, extension = os.path.splitext(f)
                processed_path = f"{root}-processed{extension}"
                if os.path.exists(processed_path):
                    self.logging.info(f"Skipping already processed {f}")
                    continue

                self.logging.info(f"Processing data in {f}")
                memory, exp_type = parse_file_name(f)
                config, experiments = load_results(f)
                statistics = config["statistics"]

                for func in experiments.functions():
                    if extend_time_interval > 0:
                        times = (
                            -extend_time_interval * 60 + experiments.times()[0],
                            extend_time_interval * 60 + experiments.times()[1],
                        )
                    else:
                        times = experiments.times()
                    deployment_client.download_metrics(
                        func,
                        int(times[0]),
                        int(times[1]),
                        experiments.invocations(func),
                        experiments.metrics(func),
                    )

                # compress! remove output since it can be large but it's useless for us
                for func in experiments.functions():
                    for invoc in experiments.invocations(func).values():
                        # FIXME: compatibility with old results
                        # Only process if result is a dict
                        # (some languages return primitives directly)
                        payload = invoc.output["result"]
                        if isinstance(payload, dict):
                            if "output" in payload:
                                del payload["output"]
                            elif "result" in payload:
                                del payload["result"]

                with open(processed_path, "w") as out_f:
                    out_f.write(
                        serialize(
                            {**json.loads(serialize(experiments)), "statistics": statistics}
                        )
                    )

            for func in experiments.functions():
                for invoc in experiments.invocations(func).values():
                    writer.writerow(
                        [
                            memory,
                            exp_type,
                            invoc.stats.cold_start,
                            invoc.times.benchmark,
                            invoc.times.http_startup,
                            invoc.times.client,
                            invoc.provider_times.execution,
                            invoc.stats.memory_used,
                        ]
                    )