# Copyright 2020-2025 ETH Zurich and the SeBS authors. All rights reserved.
"""Invocation overhead measurement experiment implementation.
This module provides the InvocationOverhead experiment implementation, which
measures the overhead associated with invoking serverless functions. It can
measure:
- Overhead of different invocation methods (HTTP, SDK)
- Impact of code package size on deployment and invocation time
- Overhead of different input data sizes
- Cold vs. warm start invocation times
The experiment is designed to help identify performance bottlenecks and
optimize function deployment and invocation.
We deploy microbenchmark 030.clock-synchronization to exactly measure the
network latency between client and function.
"""
import csv
import os
import random
import time
from datetime import datetime
from typing import Dict, List, TYPE_CHECKING, Union
from sebs.benchmark import Benchmark
from sebs.faas.system import System as FaaSSystem
from sebs.experiments.experiment import Experiment
from sebs.experiments.config import Config as ExperimentConfig
if TYPE_CHECKING:
from sebs import SeBS
class CodePackageSize:
    """Helper class for code package size experiments.

    For every sample point, a file with random bytes is added to the code
    package of the benchmark and the deployed function is updated, so that
    the impact of the package size on invocation overhead can be measured.

    Attributes:
        _benchmark_path: Path returned by ``find_benchmark`` for the
            030.clock-synchronization benchmark
        _benchmark: Benchmark instance whose code package is modified
        _deployment_client: Deployment client used to update the function
        pts: List of random-data sizes (in bytes) to generate, one per sample
    """

    def __init__(self, deployment_client: FaaSSystem, benchmark: Benchmark, settings: dict):
        """Initialize a new code package size experiment.

        Seeds the global ``random`` generator with a fixed value so that the
        generated random data is reproducible between runs.

        Args:
            deployment_client: Deployment client to use
            benchmark: Benchmark instance
            settings: Experiment settings with code_package_begin, code_package_end,
                and code_package_points values
        """
        import math

        from numpy import linspace

        # Evenly spaced target sizes between the configured bounds (inclusive).
        points = linspace(
            settings["code_package_begin"],
            settings["code_package_end"],
            settings["code_package_points"],
        )

        from sebs.utils import find_benchmark

        # Use the clock synchronization benchmark as a base
        self._benchmark_path = find_benchmark("030.clock-synchronization", "benchmarks")
        random.seed(1410)

        # Estimate the size after zip compression.
        # NOTE(review): the constants 123 and 3/4 presumably model a fixed archive
        # overhead and the compression ratio of random data - confirm with the authors.
        self.pts = [math.floor((pt - 123) * 3 / 4) for pt in points]

        self._deployment_client = deployment_client
        self._benchmark = benchmark

    def before_sample(self, size: int, input_benchmark: dict) -> None:
        """Prepare the benchmark with a specific code package size.

        Adds a file named 'randomdata.bin' with ``size`` random bytes to the
        benchmark's code package and then updates the deployed function.

        Args:
            size: Number of random bytes to place in the code package
            input_benchmark: Benchmark input configuration (unused)
        """
        random_data = bytes(random.getrandbits(8) for _ in range(size))
        self._benchmark.code_package_modify("randomdata.bin", random_data)
        function = self._deployment_client.get_function(self._benchmark)
        self._deployment_client.update_function(function, self._benchmark, False, "")
class PayloadSize:
    """Helper class for payload size experiments.

    This class handles creating different payload sizes to measure the impact
    of input data size on function invocation overhead.

    Attributes:
        pts: List of payload sizes (in bytes, before base64 encoding) to test
    """

    def __init__(self, settings: dict) -> None:
        """Initialize a new payload size experiment.

        Args:
            settings: Experiment settings with payload_begin, payload_end,
                and payload_points values
        """
        from numpy import linspace

        # Evenly spaced sizes between the configured bounds (inclusive),
        # truncated to whole bytes.
        points = linspace(
            settings["payload_begin"],
            settings["payload_end"],
            settings["payload_points"],
        )
        self.pts = [int(pt) for pt in points]

    def before_sample(self, size: int, input_benchmark: dict) -> None:
        """Prepare the benchmark input with a specific payload size.

        Stores ``size`` zero bytes, base64-encoded as text, under the "data"
        key of the benchmark input.

        Args:
            size: Size of the payload to create, in bytes before encoding
            input_benchmark: Benchmark input configuration to modify in place
        """
        import base64

        input_benchmark["data"] = base64.b64encode(bytes(size)).decode()
class InvocationOverhead(Experiment):
    """Invocation overhead measurement experiment.

    This experiment measures the overhead associated with invoking serverless
    functions. Depending on the "type" setting it varies either the code
    package size or the input payload size, and records results for both
    warm and cold invocations.

    Attributes:
        settings: Experiment-specific settings
        _benchmark: Benchmark to use (set in ``prepare``)
        benchmark_input: Input data for the benchmark (set in ``prepare``)
        _storage: Storage service to use (set in ``prepare``)
        _function: Function to invoke (set in ``prepare``)
        _trigger: HTTP trigger used for invocations (set in ``prepare``)
        _out_dir: Directory for storing results (set in ``prepare``)
        _deployment_client: Deployment client to use (set in ``prepare``)
    """

    def __init__(self, config: ExperimentConfig):
        """Initialize a new InvocationOverhead experiment.

        Args:
            config: Experiment configuration
        """
        super().__init__(config)
        self.settings = self.config.experiment_settings(self.name())

    def prepare(self, sebs_client: "SeBS", deployment_client: FaaSSystem) -> None:
        """Prepare the experiment for execution.

        Sets up the clock-synchronization benchmark, its input, the storage,
        the deployed function with an HTTP trigger, and the output directory.

        Args:
            sebs_client: The SeBS client to use
            deployment_client: The deployment client to use
        """
        # Import needed modules
        from sebs import SeBS  # noqa
        from sebs.faas.function import Trigger

        # Get the clock-synchronization benchmark
        self._benchmark = sebs_client.get_benchmark(
            "030.clock-synchronization", deployment_client, self.config
        )

        # Prepare benchmark input
        self.benchmark_input = self._benchmark.prepare_input(
            deployment_client.system_resources, size="test", replace_existing=True
        )

        # Get storage for testing
        self._storage = deployment_client.system_resources.get_storage(replace_existing=True)

        # Reuse an existing HTTP trigger of the function, or create one.
        self._function = deployment_client.get_function(self._benchmark)
        triggers = self._function.triggers(Trigger.TriggerType.HTTP)
        if len(triggers) == 0:
            self._trigger = deployment_client.create_trigger(
                self._function, Trigger.TriggerType.HTTP
            )
        else:
            self._trigger = triggers[0]

        self._out_dir = os.path.join(
            sebs_client.output_dir, "invocation-overhead", self.settings["type"]
        )
        # exist_ok avoids a race between checking for and creating the directory.
        os.makedirs(self._out_dir, exist_ok=True)

        self._deployment_client = deployment_client

    def run(self) -> None:
        """Execute the invocation overhead experiment.

        This method runs the main experiment by:
        1. Setting up either code package size or payload size experiments
        2. Running a warm-up invocation, then warm and cold measurement series
        3. Repeating each measurement until the function reports the expected
           warm/cold state
        4. Writing one row per accepted invocation to ``result.csv``
        5. Downloading the benchmark's output bucket into the output directory
        """
        from requests import get

        # Address passed to the function as "server-address". A timeout keeps a
        # stalled lookup from blocking the experiment forever.
        ip = get("http://checkip.amazonaws.com/", timeout=30).text.rstrip()

        repetitions = self.settings["repetitions"]
        N = self.settings["N"]
        udp_port = 12000

        experiment: Union[CodePackageSize, PayloadSize]
        if self.settings["type"] == "code":
            experiment = CodePackageSize(self._deployment_client, self._benchmark, self.settings)
        else:
            experiment = PayloadSize(self.settings)

        input_benchmark = {
            "server-address": ip,
            "repetitions": N,
            **self.benchmark_input,
        }

        output_file = os.path.join(self._out_dir, "result.csv")
        # newline="" is what the csv module expects for files it writes to.
        with open(output_file, "w", newline="") as csvfile:
            writer = csv.writer(csvfile, delimiter=",")
            writer.writerow(
                [
                    "size",
                    "repetition",
                    "is_cold",
                    "connection_time",
                    "start_timestamp",
                    "finish_timestamp",
                    "request_id",
                ]
            )

            for result_type in ["warm", "cold"]:
                expect_cold = result_type == "cold"

                # warm up
                if not expect_cold:
                    self.logging.info("Warm up invocation!")
                    self.receive_datagrams(input_benchmark, N, udp_port, ip)

                for size in experiment.pts:
                    experiment.before_sample(size, input_benchmark)

                    for i in range(repetitions):
                        # Retry until the invocation has the expected warm/cold state.
                        successful = False
                        while not successful:
                            self.logging.info(f"Starting with {size} bytes, repetition {i}")
                            if expect_cold:
                                self._deployment_client.enforce_cold_start(
                                    [self._function], self._benchmark
                                )
                                time.sleep(1)

                            row = self.receive_datagrams(input_benchmark, N, udp_port, ip)
                            # row[0] is the is_cold flag reported by the function.
                            if expect_cold:
                                if not row[0]:
                                    self.logging.info("Not cold!")
                                    continue
                            else:
                                if row[0]:
                                    self.logging.info("cold!")
                                    continue

                            writer.writerow([size, i] + row)
                            successful = True

        time.sleep(5)
        self._storage.download_bucket(self.benchmark_input["output-bucket"], self._out_dir)

    def process(
        self,
        sebs_client: "SeBS",
        deployment_client,
        directory: str,
        logging_filename: str,
        extend_time_interval: int,
    ) -> None:
        """Process experiment results and generate summary statistics.

        This method processes the raw experiment results by:
        1. Loading the per-request timestamp CSV files and joining the files
           that belong to the same request id column-wise
        2. Computing clock drift and Round-Trip Time (RTT) for every exchange
        3. Writing ``result-processed.csv``, which extends each row of
           ``result.csv`` with drift statistics and the invocation time

        Args:
            sebs_client: SeBS client instance (unused)
            deployment_client: Deployment client instance (unused)
            directory: Directory containing experiment results
            logging_filename: Name of the logging file (unused)
            extend_time_interval: Unused by this experiment
        """
        import glob

        import pandas as pd

        from sebs import SeBS  # noqa

        results_dir = os.path.join(directory, "invocation-overhead", self.settings["type"])

        full_data: Dict[str, pd.DataFrame] = {}
        for f in glob.glob(os.path.join(results_dir, "*.csv")):
            if "result.csv" in f or "result-processed.csv" in f:
                continue
            # File names have the form "<prefix>-<request_id>.csv".
            request_id = os.path.basename(f).split("-", 1)[1].split(".")[0]
            data = pd.read_csv(f, sep=",").drop(["id"], axis=1)
            if request_id in full_data:
                # Second file of the same request: put its columns side by side
                # with the first one and tag the rows with the request id.
                full_data[request_id] = pd.concat([full_data[request_id], data], axis=1)
                full_data[request_id]["id"] = request_id
            else:
                full_data[request_id] = data

        df = pd.concat(full_data.values()).reset_index(drop=True)
        df["rtt"] = (df["server_rcv"] - df["client_send"]) + (df["client_rcv"] - df["server_send"])
        df["clock_drift"] = (
            (df["client_send"] - df["server_rcv"]) + (df["client_rcv"] - df["server_send"])
        ) / 2

        raw_path = os.path.join(results_dir, "result.csv")
        processed_path = os.path.join(results_dir, "result-processed.csv")
        with open(raw_path, newline="") as csvfile, open(
            processed_path, "w", newline=""
        ) as csvfile2:
            reader = csv.reader(csvfile, delimiter=",")
            writer = csv.writer(csvfile2, delimiter=",")
            writer.writerow(
                [
                    "payload_size",
                    "repetition",
                    "is_cold",
                    "connection_time",
                    "start_timestamp",
                    "finish_timestamp",
                    "request_id",
                    "clock_drift_mean",
                    "clock_drift_std",
                    "invocation_time",
                ]
            )

            # Skip the header row of result.csv.
            next(reader, None)
            for row in reader:
                request_id = row[-1]
                drift_samples = df[df["id"] == request_id]["clock_drift"]
                clock_drift = drift_samples.mean()
                clock_drift_std = drift_samples.std()
                # finish - start - connection time, corrected by the mean clock drift.
                invocation_time = float(row[5]) - float(row[4]) - float(row[3]) + clock_drift
                writer.writerow(row + [clock_drift, clock_drift_std, invocation_time])

    def receive_datagrams(
        self, input_benchmark: dict, repetitions: int, port: int, ip: str
    ) -> List:
        """Receive UDP datagrams from the function for clock synchronization.

        Opens a UDP socket on ``port``, triggers an asynchronous function
        invocation, and echoes every received datagram back to its sender until
        a "stop" message arrives. The receive and send timestamps of every
        exchange except the first one are recorded and saved to
        ``server-{request_id}.csv`` in the output directory.

        The loop gives up after 5 consecutive receive timeouts (4 seconds
        each). The socket is always closed before this method returns or raises.

        Args:
            input_benchmark: Benchmark input configuration; "server-port" is
                set to ``port`` in place
            repetitions: Number of repetitions (only used for logging here)
            port: UDP port to listen on
            ip: IP address of the client (unused by this method)

        Returns:
            List containing invocation results: [is_cold, connection_time,
            start_timestamp, finish_timestamp, request_id]

        Raises:
            RuntimeError: If a receive times out and the invocation has
                already completed
        """
        import socket

        input_benchmark["server-port"] = port
        self.logging.info(f"Starting invocation with {repetitions} repetitions on port {port}")

        times = []
        # The context manager releases the port even when an exception is raised,
        # so that the next invocation can bind to it again.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as server_socket:
            # Timeout only for this socket; the process-wide default is left alone.
            server_socket.settimeout(4)
            server_socket.bind(("", port))

            fut = self._trigger.async_invoke(input_benchmark)

            begin = datetime.now()
            exchange = 0
            consecutive_timeouts = 0
            while True:
                try:
                    message, address = server_socket.recvfrom(1024)
                    timestamp_rcv = datetime.now().timestamp()
                    # Compare raw bytes so that an undecodable datagram cannot raise.
                    if message == b"stop":
                        break
                    timestamp_send = datetime.now().timestamp()
                    server_socket.sendto(message, address)
                    consecutive_timeouts = 0
                except socket.timeout:
                    consecutive_timeouts += 1
                    self.logging.warning("Packet loss!")
                    # stop after 5 attempts
                    if consecutive_timeouts == 5:
                        self.logging.error(
                            "Failing after 5 unsuccessfull attempts to "
                            "communicate with the function!"
                        )
                        break
                    # check if function invocation failed, and if yes: raise the exception
                    if fut.done():
                        raise RuntimeError("Function invocation failed") from None
                    # continue to next iteration
                    continue
                # The very first exchange is not recorded.
                if exchange > 0:
                    times.append([exchange, timestamp_rcv, timestamp_send])
                exchange += 1
            end = datetime.now()

        # Save results even in case of failure - it might have happened in n-th iteration
        res = fut.result()
        server_timestamp = res.output["result"]["output"]["timestamp"]
        request_id = res.output["request_id"]
        is_cold = 1 if res.output["is_cold"] else 0

        output_file = os.path.join(self._out_dir, f"server-{request_id}.csv")
        with open(output_file, "w", newline="") as csvfile:
            writer = csv.writer(csvfile, delimiter=",")
            writer.writerow(["id", "server_rcv", "server_send"])
            writer.writerows(times)

        self.logging.info(f"Finished {request_id} in {end - begin} [s]")

        return [
            is_cold,
            res.times.http_startup,
            res.times.client_begin.timestamp(),
            server_timestamp,
            request_id,
        ]

    @staticmethod
    def name() -> str:
        """Get the name of the experiment.

        Returns:
            The name "invocation-overhead"
        """
        return "invocation-overhead"

    @staticmethod
    def typename() -> str:
        """Get the type name of the experiment.

        Returns:
            The type name "Experiment.InvocOverhead"
        """
        return "Experiment.InvocOverhead"