Source code for sebs.experiments.invocation_overhead

# Copyright 2020-2025 ETH Zurich and the SeBS authors. All rights reserved.
"""Invocation overhead measurement experiment implementation.

This module provides the InvocationOverhead experiment implementation, which
measures the overhead associated with invoking serverless functions. It can
measure:

- Overhead of different invocation methods (HTTP, SDK)
- Impact of code package size on deployment and invocation time
- Overhead of different input data sizes
- Cold vs. warm start invocation times

The experiment is designed to help identify performance bottlenecks and
optimize function deployment and invocation.
We deploy the 030.clock-synchronization microbenchmark to precisely measure the
network latency between the client and the function.
"""

import csv
import os
import random
import time
from datetime import datetime
from typing import Dict, List, TYPE_CHECKING, Union

from sebs.benchmark import Benchmark
from sebs.faas.system import System as FaaSSystem
from sebs.experiments.experiment import Experiment
from sebs.experiments.config import Config as ExperimentConfig

if TYPE_CHECKING:
    from sebs import SeBS


class CodePackageSize:
    """Helper class for code package size experiments.

    Adds a file of random bytes to the benchmark's code package and redeploys
    the function, so that the impact of the package size on invocation
    overhead can be measured.

    Attributes:
        _benchmark_path: Path to the 030.clock-synchronization benchmark code
        _benchmark: Benchmark instance whose code package is modified
        _deployment_client: Deployment client used to update the function
        pts: Sizes (in bytes) of the random data file generated for each sample
    """

    def __init__(self, deployment_client: FaaSSystem, benchmark: Benchmark, settings: dict):
        """Initialize a new code package size experiment.

        Args:
            deployment_client: Deployment client to use
            benchmark: Benchmark instance
            settings: Experiment settings with code_package_begin,
                code_package_end, and code_package_points values
        """
        import math

        from numpy import linspace

        # Target code package sizes to test, evenly spaced.
        points = linspace(
            settings["code_package_begin"],
            settings["code_package_end"],
            settings["code_package_points"],
        )
        from sebs.utils import find_benchmark

        # Use the clock synchronization benchmark as a base
        self._benchmark_path = find_benchmark("030.clock-synchronization", "benchmarks")
        self._benchmark = benchmark
        # Fixed seed so that the generated package contents are reproducible.
        random.seed(1410)

        # Estimate the size of the random file from the target package size
        # (the original intent was to estimate the size after zip compression).
        # NOTE(review): the formula subtracts a fixed 123-byte overhead and
        # scales by 3/4 (apparently undoing base64's 4/3 expansion) — TODO
        # confirm. A 4 KiB allowance for the remaining package files was
        # computed here before but always overwritten, so it has never been
        # applied; add it to this expression if it is actually intended.
        self.pts = [math.floor((pt - 123) * 3 / 4) for pt in points]

        self._deployment_client = deployment_client

    def before_sample(self, size: int, input_benchmark: dict) -> None:
        """Prepare the benchmark with a specific code package size.

        Writes a file named 'randomdata.bin' containing ``size`` random bytes
        into the benchmark's code package, then updates the deployed function.
        A non-positive ``size`` yields an empty file.

        Args:
            size: Number of random bytes to add to the code package
            input_benchmark: Benchmark input configuration (unused)
        """
        # One big getrandbits() call instead of a per-byte Python loop: the
        # data is only incompressible filler, and large packages (MBs) were
        # very slow to generate byte by byte.
        nbytes = max(size, 0)
        data = random.getrandbits(8 * nbytes).to_bytes(nbytes, "little") if nbytes else b""
        self._benchmark.code_package_modify("randomdata.bin", data)
        function = self._deployment_client.get_function(self._benchmark)
        # FIXME: we might want a change in the future - support containers
        self._deployment_client.update_function(
            function, self._benchmark, self._benchmark.system_variant, ""
        )
class PayloadSize:
    """Helper class for payload size experiments.

    Generates benchmark inputs carrying payloads of different sizes to measure
    the impact of input data size on function invocation overhead.

    Attributes:
        pts: Raw payload sizes to test, in bytes before base64 encoding
    """

    def __init__(self, settings: dict) -> None:
        """Initialize a new payload size experiment.

        Args:
            settings: Experiment settings with payload_begin, payload_end,
                and payload_points values
        """
        from numpy import linspace

        points = linspace(
            settings["payload_begin"],
            settings["payload_end"],
            settings["payload_points"],
        )
        self.pts = [int(pt) for pt in points]

    def before_sample(self, size: int, input_benchmark: dict) -> None:
        """Prepare the benchmark input with a specific payload size.

        Stores ``size`` zero bytes, base64-encoded as an ASCII string, under
        the ``"data"`` key of ``input_benchmark``. The encoded string is
        therefore about 4/3 of ``size`` characters long.

        Args:
            size: Number of raw payload bytes to generate
            input_benchmark: Benchmark input configuration to modify in place
        """
        import base64

        input_benchmark["data"] = base64.b64encode(bytes(size)).decode()
class InvocationOverhead(Experiment):
    """Invocation overhead measurement experiment.

    Invokes the 030.clock-synchronization benchmark through an HTTP trigger
    and measures the invocation overhead. Depending on the ``type`` setting,
    either the code package size (``"code"``) or the input payload size (any
    other value) is varied, and both warm and cold invocations are recorded.

    Attributes:
        settings: Experiment-specific settings
        _benchmark: Benchmark to use (030.clock-synchronization)
        benchmark_input: Input data for the benchmark
        _storage: Storage service to use
        _function: Function to invoke
        _trigger: HTTP trigger used to invoke the function
        _out_dir: Directory for storing results
        _deployment_client: Deployment client to use
    """

    def __init__(self, config: ExperimentConfig):
        """Initialize a new InvocationOverhead experiment.

        Args:
            config: Experiment configuration
        """
        super().__init__(config)
        self.settings = self.config.experiment_settings(self.name())

    def prepare(self, sebs_client: "SeBS", deployment_client: FaaSSystem) -> None:
        """Prepare the experiment for execution.

        Sets up the 030.clock-synchronization benchmark and its input, the
        storage client, the function together with an HTTP trigger (created if
        none exists), and the output directory.

        Args:
            sebs_client: The SeBS client to use
            deployment_client: The deployment client to use
        """
        # Import needed modules
        from sebs import SeBS  # noqa
        from sebs.faas.function import Trigger

        # Get the clock-synchronization benchmark
        self._benchmark = sebs_client.get_benchmark(
            "030.clock-synchronization", deployment_client, self.config
        )

        # Prepare benchmark input
        self.benchmark_input = self._benchmark.prepare_input(
            deployment_client.system_resources, size="test", replace_existing=True
        )

        # Get storage for testing
        self._storage = deployment_client.system_resources.get_storage(replace_existing=True)

        self._function = deployment_client.get_function(self._benchmark)

        # Reuse an existing HTTP trigger when one is available.
        triggers = self._function.triggers(Trigger.TriggerType.HTTP)
        if len(triggers) == 0:
            self._trigger = deployment_client.create_trigger(
                self._function, Trigger.TriggerType.HTTP
            )
        else:
            self._trigger = triggers[0]

        self._out_dir = os.path.join(
            sebs_client.output_dir, "invocation-overhead", self.settings["type"]
        )
        os.makedirs(self._out_dir, exist_ok=True)

        self._deployment_client = deployment_client

    def run(self) -> None:
        """Execute the invocation overhead experiment.

        1. Sets up either a code package size or a payload size experiment.
        2. For warm invocations, performs one warm-up invocation first; for
           cold invocations, enforces a cold start before every attempt.
        3. For every size and repetition, repeats the invocation until it has
           the requested start type (cold or warm).
        4. Writes client-side results to ``result.csv`` and downloads the
           function-side measurements from the output bucket.
        """
        from requests import get

        # Public IP address of this machine; the function sends its UDP
        # datagrams here. The timeout keeps the run from hanging forever if
        # the lookup service does not respond.
        ip = get("http://checkip.amazonaws.com/", timeout=30).text.rstrip()
        repetitions = self.settings["repetitions"]
        N = self.settings["N"]
        port = 12000

        experiment: Union[CodePackageSize, PayloadSize]
        if self.settings["type"] == "code":
            experiment = CodePackageSize(self._deployment_client, self._benchmark, self.settings)
        else:
            experiment = PayloadSize(self.settings)

        input_benchmark = {
            "server-address": ip,
            "repetitions": N,
            **self.benchmark_input,
        }

        output_file = os.path.join(self._out_dir, "result.csv")
        # newline="" as required by the csv module.
        with open(output_file, "w", newline="") as csvfile:
            writer = csv.writer(csvfile, delimiter=",")
            writer.writerow(
                [
                    "size",
                    "repetition",
                    "is_cold",
                    "connection_time",
                    "start_timestamp",
                    "finish_timestamp",
                    "request_id",
                ]
            )
            for result_type in ["warm", "cold"]:
                # warm up
                if result_type == "warm":
                    self.logging.info("Warm up invocation!")
                    self.receive_datagrams(input_benchmark, N, port, ip)

                for size in experiment.pts:
                    experiment.before_sample(size, input_benchmark)
                    for i in range(repetitions):
                        successful = False
                        while not successful:
                            self.logging.info(f"Starting with {size} bytes, repetition {i}")
                            if result_type == "cold":
                                self._deployment_client.enforce_cold_start(
                                    [self._function], self._benchmark
                                )
                                time.sleep(1)
                            row = self.receive_datagrams(input_benchmark, N, port, ip)
                            # row[0] is the is_cold flag; retry until the
                            # invocation has the requested start type.
                            if result_type == "cold":
                                if not row[0]:
                                    self.logging.info("Not cold!")
                                    continue
                            elif row[0]:
                                self.logging.info("cold!")
                                continue
                            writer.writerow([size, i] + row)
                            successful = True
                            time.sleep(5)

        self._storage.download_bucket(self.benchmark_input["output-bucket"], self._out_dir)

    def process(
        self,
        sebs_client: "SeBS",
        deployment_client,
        directory: str,
        logging_filename: str,
        extend_time_interval: int,
    ) -> None:
        """Process experiment results and generate summary statistics.

        1. Loads the per-invocation datagram timestamps: ``server_*`` columns
           recorded by this host and ``client_*`` columns downloaded from the
           output bucket. Files are named ``<prefix>-<request_id>.csv``.
        2. Computes the round-trip time and the clock drift of every exchange.
        3. Writes ``result-processed.csv``: each row of ``result.csv`` plus
           the mean and standard deviation of the clock drift and the
           drift-corrected invocation time.

        Args:
            sebs_client: SeBS client instance (unused)
            deployment_client: Deployment client instance (unused)
            directory: Directory containing experiment results
            logging_filename: Name of the logging file (unused)
            extend_time_interval: Unused by this experiment
        """
        import glob

        import pandas as pd

        from sebs import SeBS  # noqa

        results_dir = os.path.join(directory, "invocation-overhead", self.settings["type"])

        full_data: Dict[str, pd.DataFrame] = {}
        for f in glob.glob(os.path.join(results_dir, "*.csv")):
            if "result.csv" in f or "result-processed.csv" in f:
                continue
            # Strip the extension before splitting off the prefix so request
            # IDs that contain dots are kept intact.
            request_id = os.path.splitext(os.path.basename(f))[0].split("-", 1)[1]
            data = pd.read_csv(f, sep=",").drop(["id"], axis=1)
            if request_id in full_data:
                # Second file of the pair: put host and function columns side by side.
                full_data[request_id] = pd.concat([full_data[request_id], data], axis=1)
                full_data[request_id]["id"] = request_id
            else:
                full_data[request_id] = data
        df = pd.concat(full_data.values()).reset_index(drop=True)

        df["rtt"] = (df["server_rcv"] - df["client_send"]) + (df["client_rcv"] - df["server_send"])
        # NTP-style offset estimate; the sign is (client clock - server clock).
        # The "client" columns come from the function, so this is approximately
        # (function clock - host clock).
        df["clock_drift"] = (
            (df["client_send"] - df["server_rcv"]) + (df["client_rcv"] - df["server_send"])
        ) / 2

        with open(os.path.join(results_dir, "result.csv"), newline="") as csvfile:
            with open(
                os.path.join(results_dir, "result-processed.csv"), "w", newline=""
            ) as csvfile2:
                reader = csv.reader(csvfile, delimiter=",")
                writer = csv.writer(csvfile2, delimiter=",")
                writer.writerow(
                    [
                        "payload_size",
                        "repetition",
                        "is_cold",
                        "connection_time",
                        "start_timestamp",
                        "finish_timestamp",
                        "request_id",
                        "clock_drift_mean",
                        "clock_drift_std",
                        "invocation_time",
                    ]
                )
                rows = iter(reader)
                next(rows)  # skip the header of result.csv
                for row in rows:
                    request_id = row[-1]
                    drift = df.loc[df["id"] == request_id, "clock_drift"]
                    clock_drift = drift.mean()
                    clock_drift_std = drift.std()
                    # finish_timestamp (row[5]) comes from the function's
                    # output, i.e. the function's clock. start_timestamp
                    # (row[4]) is on this host's clock. Subtract the drift
                    # (function - host) to convert finish to host time. Adding
                    # it would double the clock offset instead of cancelling it.
                    invocation_time = (
                        float(row[5]) - clock_drift - float(row[4]) - float(row[3])
                    )
                    writer.writerow(row + [clock_drift, clock_drift_std, invocation_time])

    def receive_datagrams(
        self, input_benchmark: dict, repetitions: int, port: int, ip: str
    ) -> List:
        """Receive UDP datagrams from the function for clock synchronization.

        Opens a UDP socket on ``port`` and triggers an asynchronous function
        invocation. It then echoes every datagram back to the function,
        recording receive and send timestamps, until the function sends
        "stop" or 5 consecutive receive timeouts occur. Host-side timestamps
        are saved to ``server-{request_id}.csv`` in the output directory.

        Args:
            input_benchmark: Benchmark input configuration (its "server-port"
                entry is set to ``port``)
            repetitions: Number of repetitions (used for logging here; the
                function receives the count through ``input_benchmark``)
            port: UDP port to listen on
            ip: IP address of the client

        Returns:
            List containing invocation results:
            [is_cold, connection_time, start_timestamp, finish_timestamp, request_id]

        Raises:
            RuntimeError: If a receive times out after the invocation has
                already completed; the invocation's exception, if any, is
                chained as the cause.
        """
        import socket

        input_benchmark["server-port"] = port
        self.logging.info(f"Starting invocation with {repetitions} repetitions on port {port}")

        times = []
        # The context manager closes the socket and releases the port, even
        # when the invocation fails with an exception.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as server_socket:
            # Timeout on this socket only: socket.setdefaulttimeout() would
            # change the default of every socket the process creates later.
            server_socket.settimeout(4)
            server_socket.bind(("", port))

            fut = self._trigger.async_invoke(input_benchmark)
            begin = datetime.now()

            i = 0  # number of datagrams echoed so far
            failures = 0  # consecutive receive timeouts
            while True:
                try:
                    message, address = server_socket.recvfrom(1024)
                    timestamp_rcv = datetime.now().timestamp()
                    if message.decode() == "stop":
                        break
                    timestamp_send = datetime.now().timestamp()
                    server_socket.sendto(message, address)
                except socket.timeout:
                    failures += 1
                    self.logging.warning("Packet loss!")
                    # stop after 5 consecutive unsuccessful attempts
                    if failures == 5:
                        self.logging.error(
                            "Failing after 5 unsuccessfull attempts to "
                            "communicate with the function!"
                        )
                        break
                    # If the invocation already finished, no more datagrams
                    # will arrive; chain its exception (if any) as the cause.
                    if fut.done():
                        raise RuntimeError("Function invocation failed") from fut.exception()
                    # continue to next iteration
                    continue

                # A successful exchange resets the consecutive-failure counter.
                failures = 0
                # The first exchange (i == 0) is not recorded; presumably it
                # is a warm-up round trip. TODO confirm against the function side.
                if i > 0:
                    times.append([i, timestamp_rcv, timestamp_send])
                i += 1

            end = datetime.now()

            # Save results even in case of failure - it might have happened in n-th iteration
            res = fut.result()

        server_timestamp = res.output["result"]["output"]["timestamp"]
        request_id = res.output["request_id"]
        is_cold = 1 if res.output["is_cold"] else 0

        output_file = os.path.join(self._out_dir, f"server-{request_id}.csv")
        with open(output_file, "w", newline="") as csvfile:
            writer = csv.writer(csvfile, delimiter=",")
            writer.writerow(["id", "server_rcv", "server_send"])
            writer.writerows(times)

        self.logging.info(f"Finished {request_id} in {end - begin} [s]")

        return [
            is_cold,
            res.times.http_startup,
            res.times.client_begin.timestamp(),
            server_timestamp,
            request_id,
        ]

    @staticmethod
    def name() -> str:
        """Get the name of the experiment.

        Returns:
            The name "invocation-overhead"
        """
        return "invocation-overhead"

    @staticmethod
    def typename() -> str:
        """Get the type name of the experiment.

        Returns:
            The type name "Experiment.InvocOverhead"
        """
        return "Experiment.InvocOverhead"