# Source code for sebs.experiments.eviction_model

# Copyright 2020-2025 ETH Zurich and the SeBS authors. All rights reserved.
"""Container eviction model experiment implementation.

This module provides the EvictionModel experiment implementation, which
measures how serverless platforms manage function container eviction.
It determines how long idle containers are kept alive before being
recycled by the platform, which affects cold start frequency.

The experiment involves invoking functions at increasing time intervals
and observing when cold starts occur, thus inferring the platform's
container caching and eviction policies.

This implementation differs slightly from the original one,
which used the 010.sleep benchmark. Here, we use 040.server-reply
to double-check that all functions are "alive" at the same time.
However, the sleep logic is not currently implemented in 040.server-reply.
"""

import logging
import os
import time
from datetime import datetime
from typing import List, Optional, Tuple, TYPE_CHECKING, Dict, Any
import multiprocessing
from multiprocessing.pool import AsyncResult, ThreadPool

from sebs.faas.system import System as FaaSSystem
from sebs.faas.function import Function, Trigger
from sebs.experiments import Experiment, ExperimentResult
from sebs.experiments.config import Config as ExperimentConfig
from sebs.utils import serialize

if TYPE_CHECKING:
    from sebs import SeBS


class EvictionModel(Experiment):
    """Container eviction model experiment.

    This experiment measures how serverless platforms manage function
    container eviction. It determines how long idle containers are kept
    alive before being recycled by the platform, which affects cold start
    frequency.

    The experiment invokes functions at different time intervals (defined in
    the 'times' list) and observes when cold starts occur, thus inferring the
    platform's container caching and eviction policies.

    Attributes:
        times: List of time intervals (in seconds) between invocations
        function_copies_per_time: Number of function copies created per interval
        functions_names: Names of all deployed function copies (set in prepare)
        functions: Function objects matching functions_names (set in prepare)
        _benchmark: The '040.server-reply' benchmark (set in prepare)
        _deployment_client: Deployment client to use (set in prepare)
        _result: Experiment result container (set in prepare)
        _out_dir: Directory for storing results (set in prepare)
    """

    # Time intervals (in seconds) between invocations
    # Uncomment additional intervals as needed for longer tests
    times = [
        1,
        # 2,
        # 4,
        # 8,
        # 15,
        # 30,
        # 60,
        # 120,
        # 180,
        # 240,
        # 300,
        # 360,
        # 480,
        # 600,
        # 720,
        # 900,
        # 1080,
        # 1200,
    ]
    # TODO: temporal fix
    # function_copies_per_time = 5
    function_copies_per_time = 1

    def __init__(self, config: ExperimentConfig):
        """Initialize a new EvictionModel experiment.

        Args:
            config: Experiment configuration
        """
        super().__init__(config)

    @staticmethod
    def name() -> str:
        """Get the name of the experiment.

        Returns:
            The name "eviction-model"
        """
        return "eviction-model"

    @staticmethod
    def typename() -> str:
        """Get the type name of the experiment.

        Returns:
            The type name "Experiment.EvictionModel"
        """
        return "Experiment.EvictionModel"

    @staticmethod
    def accept_replies(port: int, invocations: int) -> None:
        """Accept TCP connections from functions and respond to them.

        This static method acts as a TCP server, accepting connections from
        functions and responding to them. It runs two rounds of connection
        acceptance (one per function invocation performed by
        execute_instance). All activity is logged to the file
        ``server_{invocations}.log`` in the current working directory.

        Args:
            port: TCP port to listen on
            invocations: Number of expected function invocations per round
        """
        import socket

        with open(f"server_{invocations}.log", "w") as f:
            # The context manager guarantees the listening socket is closed
            # even when bind/accept/send raises.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                # SO_REUSEPORT is not defined on every platform.
                if hasattr(socket, "SO_REUSEPORT"):
                    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
                s.bind(("", port))
                s.listen(invocations + 1)
                print(f"Listen on {port} and wait for {invocations}", file=f)

                # Two identical rounds: first and second repetition.
                for _ in range(2):
                    connections: List[Tuple[Any, Any]] = []
                    try:
                        # Wait until every function has connected before
                        # replying to any of them.
                        while len(connections) < invocations:
                            c, addr = s.accept()
                            print(f"Accept connection from {addr}", file=f)
                            connections.append((c, addr))

                        for connection, addr in connections:
                            print(f"Send message to {addr}", file=f)
                            # sendall retries until the full message is written.
                            connection.sendall(b"accepted")
                            connection.close()
                    finally:
                        # Closing an already closed socket is a no-op; this
                        # only matters when the round was interrupted.
                        for connection, _ in connections:
                            connection.close()

    @staticmethod
    def execute_instance(
        sleep_time: int, pid: int, tid: int, func: Function, payload: dict
    ) -> dict:
        """Execute a single instance of the eviction model test.

        This method performs two synchronous HTTP invocations of a function
        with a sleep interval between them. The first invocation is expected
        to be a cold start (an error is logged otherwise); the second one
        indicates whether the container was evicted during the sleep period.

        This function is intended to be run in a separate thread.

        Args:
            sleep_time: Time to sleep between invocations (seconds)
            pid: Process ID for logging
            tid: Thread ID for logging
            func: Function to invoke
            payload: Payload to send to the function

        Returns:
            Dictionary with both invocation results, their begin/end
            timestamps, and the invocation (process) index

        Raises:
            RuntimeError: If the first or the second invocation fails
        """
        try:
            print(f"Process {pid} Thread {tid} Invoke function {func.name} with {payload} now!")
            begin = datetime.now()
            res = func.triggers(Trigger.TriggerType.HTTP)[0].sync_invoke(payload)
            end = datetime.now()
            if not res.stats.cold_start:
                logging.error(
                    f"First invocation not cold on func {func.name} time {sleep_time} "
                    f"pid {pid} tid {tid} id {res.request_id}"
                )
        except Exception as e:
            logging.error(f"First Invocation Failed at function {func.name}, {e}")
            raise RuntimeError(f"First invocation failed at function {func.name}") from e

        # Subtract the time already elapsed since the first invocation ended.
        # Clamp at zero: time.sleep() rejects negative values.
        time_spent = (datetime.now() - end).total_seconds()
        seconds_sleep = max(0.0, sleep_time - time_spent)
        print(f"PID {pid} TID {tid} with time {sleep_time}, sleep {seconds_sleep}")
        time.sleep(seconds_sleep)

        try:
            second_begin = datetime.now()
            second_res = func.triggers(Trigger.TriggerType.HTTP)[0].sync_invoke(payload)
            second_end = datetime.now()
        except Exception as e:
            logging.error(f"Second Invocation Failed at function {func.name}")
            # Without a second result there is nothing meaningful to return.
            raise RuntimeError(f"Second invocation failed at function {func.name}") from e

        return {
            "first": res,
            "first_times": [begin.timestamp(), end.timestamp()],
            "second": second_res,
            "second_times": [second_begin.timestamp(), second_end.timestamp()],
            "invocation": pid,
        }

    @staticmethod
    def process_function(
        repetition: int,
        pid: int,
        invocations: int,
        functions: List[Function],
        times: List[int],
        payload: dict,
    ) -> List[dict]:
        """Process a set of functions with different sleep times in parallel.

        One thread is started per function. Executions are submitted starting
        with the largest index (the largest sleep time when 'times' is sorted
        in ascending order) so that executions overlap; the total time should
        then be equal to the maximum execution time.

        Args:
            repetition: Current repetition number
            pid: Process ID for logging
            invocations: Number of parallel invocations (unused here; kept
                for interface compatibility)
            functions: List of functions to invoke
            times: List of sleep times corresponding to functions
            payload: Payload to send to functions; its "port" entry is
                offset by the function index for each execution

        Returns:
            List of dictionaries containing invocation results, ordered like
            'functions'

        Raises:
            RuntimeError: If any execution fails
        """
        print(f"Begin at PID {pid}, repetition {repetition}")
        threads = len(functions)
        if threads == 0:
            # ThreadPool rejects a size of zero; there is nothing to run.
            return []

        final_results: List[dict] = []
        with ThreadPool(threads) as pool:
            # Submit from the last function to the first one.
            pending: List[AsyncResult] = []
            for idx in reversed(range(threads)):
                payload_copy = payload.copy()
                payload_copy["port"] += idx
                pending.append(
                    pool.apply_async(
                        EvictionModel.execute_instance,
                        args=(times[idx], pid, idx, functions[idx], payload_copy),
                    )
                )
            # Restore function order so results line up with 'functions'.
            pending.reverse()

            failed = False
            for result in pending:
                try:
                    res = result.get()
                    res["repetition"] = repetition
                    final_results.append(res)
                except Exception as e:
                    print(e)
                    failed = True
            if failed:
                print("Execution failed!")
                raise RuntimeError(f"Execution failed at PID {pid}, repetition {repetition}")
        return final_results

    def prepare(self, sebs_client: "SeBS", deployment_client: FaaSSystem) -> None:
        """Prepare the experiment for execution.

        Retrieves the '040.server-reply' benchmark, sets up result storage,
        creates the output directory, and obtains a separate function for
        each time interval and copy combination.

        Args:
            sebs_client: The SeBS client to use
            deployment_client: The deployment client to use
        """
        # Get the server-reply benchmark
        self._benchmark = sebs_client.get_benchmark(
            "040.server-reply", deployment_client, self.config
        )
        self._deployment_client = deployment_client
        self._result = ExperimentResult(self.config, deployment_client.config)

        # Create function names for each time interval and copy
        name = deployment_client.default_function_name(self._benchmark)
        self.functions_names = [
            f"{name}-{interval}-{copy_idx}"
            for interval in self.times
            for copy_idx in range(self.function_copies_per_time)
        ]

        # Create output directory (including missing parents, no race on exists)
        self._out_dir = os.path.join(sebs_client.output_dir, "eviction-model")
        os.makedirs(self._out_dir, exist_ok=True)

        self.functions = [
            deployment_client.get_function(self._benchmark, func_name=fname)
            for fname in self.functions_names
        ]

    def run(self) -> None:
        """Execute the eviction model experiment.

        This method runs the main eviction model experiment by:
        1. Setting up server instances to handle function responses
        2. Executing parallel invocations with different sleep times
        3. Collecting results and writing them to a JSON file in the
           experiment output directory

        Settings read from the experiment configuration: "invocations",
        "sleep", "repetitions", "function_copy_idx" and "client-port".
        """
        import sys

        from requests import get

        settings = self.config.experiment_settings(self.name())
        invocations = settings["invocations"]
        sleep = settings["sleep"]
        repetitions = settings["repetitions"]
        invocation_idx = settings["function_copy_idx"]
        port = settings["client-port"]

        # Public IP of this machine; functions connect back to it.
        # Use a timeout so an unreachable service cannot hang the experiment,
        # and reject HTTP errors instead of using an error page as the address.
        response = get("http://checkip.amazonaws.com/", timeout=30)
        response.raise_for_status()
        ip = response.text.rstrip()

        # function_names = self.functions_names[invocation_idx :: self.function_copies_per_time]
        # flake8 issue
        # https://github.com/PyCQA/pycodestyle/issues/373
        functions = self.functions[invocation_idx :: self.function_copies_per_time]  # noqa
        results: Dict[int, List[List[Dict[str, Any]]]] = {t: [] for t in self.times}

        # Disable logging - otherwise we have RLock that can't get be pickled
        for func in functions:
            # func.disable_logging()
            for tr in func.triggers_all():
                del tr._logging_handlers
        # self.disable_logging()
        # del self.logging

        fname = f"results_{invocations}_{repetitions}_{sleep}.json"

        # Allocate one process for each invocation => process N invocations
        # in parallel. Each process uses M threads to execute in parallel
        # invocations, with a different time sleep between executions.
        # The result: repeated N invocations for M different times.
        threads = len(self.times)
        with multiprocessing.Pool(processes=(invocations + threads)) as pool:
            for repetition in range(repetitions):
                # Attempt to kill all existing containers.
                # for func in functions:
                #    self._deployment_client.enforce_cold_start(func)
                # time.sleep(5)
                for t in self.times:
                    results[t].append([])

                # Start M server instances. Each one handles one set of invocations.
                servers_results: List[AsyncResult] = [
                    pool.apply_async(EvictionModel.accept_replies, args=(port + j, invocations))
                    for j in range(threads)
                ]

                # Start N parallel invocations
                local_results: List[AsyncResult] = []
                for j in range(invocations):
                    payload = {"ip-address": ip, "port": port}
                    print(payload)
                    local_results.append(
                        pool.apply_async(
                            EvictionModel.process_function,
                            args=(repetition, j, invocations, functions, self.times, payload),
                        )
                    )

                time.sleep(10)
                sys.stdout.flush()

                # Rethrow exceptions if appear
                for result in servers_results:
                    result.get()

                for result in local_results:
                    local_ret = result.get()
                    # Entry k of the returned list belongs to self.times[k].
                    for time_idx, val in enumerate(local_ret):
                        results[self.times[time_idx]][-1].append(val)

                # Make sure that parallel invocations are truly parallel,
                # i.e. no execution happens after another one finished.
                # verify_results(results)

        out_path = os.path.join(self._out_dir, fname)
        with open(out_path, "w") as out_f:
            print(f"Write results to {out_path}")
            out_f.write(serialize(results))