# Copyright 2020-2025 ETH Zurich and the SeBS authors. All rights reserved.
"""
Utility functions and classes for the Serverless Benchmarking Suite (SeBs).
This module provides common utilities used throughout the framework, including:
- File system operations and path management
- Process execution and command handling
- JSON serialization and data manipulation
- Logging configuration and utilities
- Platform detection functions
"""
import json
import logging
import os
import shutil
import subprocess
import uuid
import click
import datetime
import platform
import threading
import re
from pathlib import Path
from typing import List, Optional, Pattern
# Global constants
# Directory that contains this module file.
PROJECT_DIR = Path(__file__).parent
# if we cloned from git, then path above "sebs" will contain .git folder;
# when that folder is missing, the code is treated as a package installation.
IS_PACKAGE_INSTALL = not ((PROJECT_DIR.parent / ".git").exists())
def get_project_root() -> Path:
    """Return the directory under which all project files are located.

    Returns:
        - For git clone: repository root (parent of the package directory)
        - For package install: the package directory itself
    """
    return PROJECT_DIR if IS_PACKAGE_INSTALL else PROJECT_DIR.parent
def get_benchmarks_data_path() -> Path:
    """Return the location of the benchmarks-data directory.

    For a package install, the ``~/.sebs`` directory is created as a side
    effect if it does not exist yet; the returned directory itself is not
    created here.

    Returns:
        - For git clone: ./benchmarks-data/
        - For package install: ~/.sebs/benchmarks-data/
    """
    if not IS_PACKAGE_INSTALL:
        return PROJECT_DIR.parent / "benchmarks-data"

    sebs_home = Path.home() / ".sebs"
    sebs_home.mkdir(parents=True, exist_ok=True)
    return sebs_home / "benchmarks-data"
def get_resource_path(*path_parts: str) -> Path:
    """Resolve the path of a resource (config, benchmarks, dockerfiles, tools).

    In git clone mode the resource is looked up relative to the project root;
    in package mode it is looked up among the resources of the "sebs" package.

    Args:
        *path_parts: Path components (e.g., "config", "systems.json")

    Returns:
        Path to the resource
    """
    if not IS_PACKAGE_INSTALL:
        # Git clone mode: resources live next to the package, under the root.
        return get_project_root() / Path(*path_parts)

    from importlib.resources import files

    # Package mode: descend from the package resource root one part at a time.
    resource = files("sebs")
    for component in path_parts:
        resource = resource / component
    return Path(str(resource))
class JSONSerializer(json.JSONEncoder):
    """
    JSON encoder for objects the standard encoder cannot handle.

    Conversion is attempted in this order:
    1. The object's own serialize() method, if it has one
    2. String representation for dictionaries
    3. The attribute dictionary returned by vars()
    4. String representation as the last resort
    """

    def default(self, o):
        """
        Convert an object into something JSON can encode.

        Args:
            o: Object to serialize

        Returns:
            JSON serializable representation of the object
        """
        if hasattr(o, "serialize"):
            return o.serialize()
        if isinstance(o, dict):
            return str(o)
        try:
            attributes = vars(o)
        except TypeError:
            # Objects without __dict__ (e.g. sets) are stored as text.
            return str(o)
        return attributes
def serialize(obj) -> str:
    """
    Render an object as a pretty-printed JSON string with sorted keys.

    Objects exposing a `serialize` method are converted through it first;
    everything else is encoded with JSONSerializer.

    Args:
        obj: Object to serialize

    Returns:
        str: JSON string representation of the object
    """
    if hasattr(obj, "serialize"):
        payload, encoder = obj.serialize(), None
    else:
        payload, encoder = obj, JSONSerializer
    return json.dumps(payload, cls=encoder, sort_keys=True, indent=2)
def execute(cmd, shell=False, cwd=None) -> str:
    """
    Execute a command and capture its combined stdout/stderr output.

    Args:
        cmd: Command to execute, either a single string or a list of arguments.
            Without `shell`, a string is tokenized with shell-like quoting
            rules, so quoted arguments containing spaces stay intact.
        shell: Whether to use shell execution (enables wildcards, pipes, etc.)
        cwd: Working directory for command execution

    Returns:
        str: Command output (stdout and stderr interleaved) as string

    Raises:
        RuntimeError: If the command exits with a non-zero return code
    """
    if not shell and isinstance(cmd, str):
        import shlex

        # Plain str.split() would break quoted arguments apart and keep
        # the quote characters; shlex follows POSIX shell tokenization.
        cmd = shlex.split(cmd)

    ret = subprocess.run(
        cmd, shell=shell, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
    )
    # Tools may emit bytes that are not valid UTF-8; replace them instead of
    # failing with UnicodeDecodeError and hiding the actual command result.
    output = ret.stdout.decode("utf-8", errors="replace")
    if ret.returncode:
        raise RuntimeError("Running {} failed!\n Output: {}".format(cmd, output))
    return output
def update_nested_dict(cfg: dict, keys: List[str], value: Optional[str]) -> None:
    """
    Store a value inside a nested dictionary, creating missing levels.

    Args:
        cfg: Dictionary to update
        keys: List of keys forming a path to the value
        value: Value to set (nothing happens if None)
    """
    if value is None:
        return

    node = cfg
    # Walk down to the dictionary holding the final key, creating
    # intermediate dictionaries that do not exist yet.
    for parent_key in keys[:-1]:
        node = node.setdefault(parent_key, {})
    node[keys[-1]] = value
def append_nested_dict(cfg: dict, keys: List[str], value: Optional[dict]) -> None:
    """
    Merge a dictionary into a nested location of another dictionary.

    Missing levels along the path, including the final one, are created.
    On key conflicts the entries of `value` override the existing ones.

    Args:
        cfg: Dictionary to update
        keys: List of keys forming a path to the value
        value: Dictionary to append (skipped if None or empty)
    """
    if not value:
        return

    node = cfg
    # make sure parent keys exist
    for key in keys[:-1]:
        node = node.setdefault(key, {})

    leaf = keys[-1]
    # The target itself may be absent (or None); start from an empty
    # dictionary in that case instead of failing with KeyError.
    node[leaf] = {**(node.get(leaf) or {}), **value}
def find(name: str, path: str) -> Optional[str]:
    """
    Search a directory tree for a directory with the given name.

    Args:
        name: Directory name to find
        path: Path to search in

    Returns:
        str: Path to the first matching directory in walk order,
            or None if not found
    """
    matches = (
        os.path.join(current, name)
        for current, subdirs, _ in os.walk(path)
        if name in subdirs
    )
    return next(matches, None)
def create_output(directory: str, preserve_dir: bool, verbose: bool) -> str:
    """
    Prepare the output directory for benchmark results.

    An existing directory is deleted first unless it should be preserved;
    afterwards the directory is created if it is missing.

    Args:
        directory: Path to create
        preserve_dir: Whether to preserve existing directory
        verbose: Verbosity level for logging (not used by this function)

    Returns:
        str: Absolute path to the output directory
    """
    target = os.path.abspath(directory)
    discard_existing = not preserve_dir

    if discard_existing and os.path.exists(target):
        shutil.rmtree(target)
    if not os.path.exists(target):
        os.makedirs(target, exist_ok=True)

    # NOTE(review): configure_logging is not defined in the visible part of
    # this module - confirm it is provided elsewhere.
    configure_logging()
    return target
def find_benchmark(benchmark: str, path: str) -> Optional[str]:
    """
    Find the directory of a benchmark inside the repository.

    The lookup root is the benchmarks-data directory when `path` is
    "benchmarks-data"; any other value is resolved as a resource path.

    Args:
        benchmark: Benchmark name
        path: Path for lookup, relative to repository (usually 'benchmarks' or 'benchmarks-data')

    Returns:
        str: Path to benchmark directory, or None if not found
    """
    if path == "benchmarks-data":
        search_root = get_benchmarks_data_path()
    else:
        search_root = get_resource_path(path)
    return find(benchmark, str(search_root))
def global_logging() -> None:
    """
    Apply the basic logging configuration to the root logger.

    Requests INFO level and a format consisting of timestamp with
    milliseconds, level, logger name, and message.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s,%(msecs)d %(levelname)s %(name)s: %(message)s",
        datefmt="%H:%M:%S",
    )
class SensitiveDataFilter(logging.Filter):
    """Logging filter that removes function URLs and resource IDs from output.

    Redacts serverless endpoint URLs and configurable resource IDs from logs.
    Resource IDs can be registered at runtime as deployments happen.
    This allows hiding exact resource names in the cloud from publicly visible
    logs, e.g., in CI workers.

    Attributes:
        _DEFAULT_URL_PATTERNS: List of patterns for URLs of functions.
        REDACTED: String to replace with.
    """

    _DEFAULT_URL_PATTERNS: tuple[str, ...] = (
        r"https?://[a-zA-Z0-9-]+\.execute-api\.[a-z0-9-]+\.amazonaws\.com[^\s\"']*",  # API Gateway
        r"https?://[a-zA-Z0-9-]+\.lambda-url\.[a-z0-9-]+\.on\.aws[^\s\"']*",  # Lambda URLs
        r"https?://[a-zA-Z0-9-]+\.azurewebsites\.net[^\s\"']*",  # Azure Functions
        r"https?://[a-z0-9-]+-[a-z0-9-]+\.[a-z0-9-]+\.run\.app[^\s\"']*",  # GCP Cloud Run
        r"https?://[a-zA-Z0-9-]+\.cloudfunctions\.net[^\s\"']*",  # GCP Functions
        r"https?://[a-zA-Z0-9-]+\.workers\.dev[^\s\"']*",  # Cloudflare Workers
    )
    REDACTED = "[REDACTED]"

    def __init__(self) -> None:
        """Initialize logging filter with URL redaction only."""
        super().__init__()
        self._url_re: Pattern[str] = re.compile("|".join(SensitiveDataFilter._DEFAULT_URL_PATTERNS))
        self._resource_id: Optional[str] = None
        self._resource_re: Optional[Pattern[str]] = None
        self._lock = threading.Lock()

    @staticmethod
    def _compile_resource_pattern(resource_ids: List[str]) -> Optional[Pattern[str]]:
        """Build a regex matching any of the given identifiers literally.

        Args:
            resource_ids: identifiers to redact; duplicates and empty
                strings are ignored

        Returns:
            compiled pattern, or None when no usable identifier is left
        """
        # An empty alternative would match at every position of every message.
        candidates = {rid for rid in resource_ids if rid}
        if not candidates:
            return None
        # Alternation picks the first matching branch, so longer identifiers
        # go first; otherwise an identifier that is a prefix of another one
        # would leave the remaining suffix visible.
        ordered = sorted(candidates, key=lambda rid: (-len(rid), rid))
        return re.compile("|".join(re.escape(rid) for rid in ordered))

    def set_resource_id(self, resource_id: str, cloud_id: Optional[str] = None) -> None:
        """Set filtering for a specific resource ID.

        Only the first successful call has an effect; later calls return
        without changing anything. The update happens under a lock, so
        several threads may call this method concurrently.

        Args:
            resource_id: resource ID to redact, together with its AWS and GCP
                function-name formatted variants
            cloud_id: optional additional identifier to redact
        """
        with self._lock:
            if self._resource_id is not None:
                return

            from sebs.aws.aws import AWS
            from sebs.gcp.gcp import GCP

            resource_ids = [
                resource_id,
                AWS.format_function_name(resource_id),
                GCP.format_function_name(resource_id),
            ]
            if cloud_id is not None:
                resource_ids.append(cloud_id)

            self._resource_re = self._compile_resource_pattern(resource_ids)
            # Recorded last: if anything above fails, a later call can retry.
            self._resource_id = resource_id

    def _scrub(self, text: str) -> str:
        """Replace secrets with redacted.

        Args:
            text: logged messages

        Returns:
            logged messages with secrets replaced
        """
        text = self._url_re.sub(SensitiveDataFilter.REDACTED, text)
        if self._resource_re is not None:
            text = self._resource_re.sub(SensitiveDataFilter.REDACTED, text)
        return text

    def filter_string(self, msg: str) -> str:
        """Apply redaction to custom messages.

        Args:
            msg: message

        Returns:
            message with data redacted
        """
        return self._scrub(msg)

    def filter(self, record: logging.LogRecord) -> bool:
        """Apply redaction to logging messages.

        Args:
            record: logging record, rewritten in place

        Returns:
            always true
        """
        # Redact the format string itself (covers pre-formatted f-strings).
        # Non-string messages, e.g. a logged exception, are rendered with
        # str() exactly as logging would do later, so they get scrubbed too.
        if isinstance(record.msg, str):
            record.msg = self._scrub(record.msg)
        else:
            record.msg = self._scrub(str(record.msg))

        # Redact lazy-formatting args: logger.info("deployed %s", url).
        # Only string arguments are rewritten to keep %d-style formatting valid.
        if record.args:
            if isinstance(record.args, dict):
                record.args = {
                    k: self._scrub(v) if isinstance(v, str) else v for k, v in record.args.items()
                }
            else:
                record.args = tuple(
                    self._scrub(a) if isinstance(a, str) else a for a in record.args
                )

        # never drop the record, just rewrite it
        return True
class ColoredWrapper:
    """
    Wrapper for logging with colored console output.

    This class provides formatted, colorized logging output for better readability
    in terminal environments. It optionally propagates messages to the standard
    Python logger.

    Attributes:
        SUCCESS: Green color code for success messages
        STATUS: Blue color code for status/info messages
        WARNING: Yellow color code for warnings
        ERROR: Red color code for errors
        BOLD: Bold text formatting code
        END: Code to reset text formatting
    """

    SUCCESS = "\033[92m"
    STATUS = "\033[94m"
    WARNING = "\033[93m"
    ERROR = "\033[91m"
    BOLD = "\033[1m"
    END = "\033[0m"

    def __init__(self, prefix, logger, verbose=True, propagte=False):
        """
        Initialize the colored logging wrapper.

        Args:
            prefix: Prefix for log messages (usually class name)
            logger: Python logger to propagate to
            verbose: Whether to show debug messages
            propagte: Whether to propagate messages to the Python logger
        """
        self.verbose = verbose
        self.propagte = propagte
        self.prefix = prefix
        self._logging = logger
        # Optional redaction applied to console output; see set_filter().
        self._filter: Optional[SensitiveDataFilter] = None

    def debug(self, message):
        """
        Log a debug message.

        The message is printed only in verbose mode; propagation to the
        Python logger does not depend on verbosity.

        Args:
            message: The message to log
        """
        if self.verbose:
            self._print(message, ColoredWrapper.STATUS)
        if self.propagte:
            self._logging.debug(message)

    def info(self, message):
        """
        Log an informational message.

        Args:
            message: The message to log
        """
        self._print(message, ColoredWrapper.SUCCESS)
        if self.propagte:
            self._logging.info(message)

    def warning(self, message):
        """
        Log a warning message.

        Args:
            message: The message to log
        """
        self._print(message, ColoredWrapper.WARNING)
        if self.propagte:
            self._logging.warning(message)

    def error(self, message):
        """
        Log an error message.

        Args:
            message: The message to log
        """
        self._print(message, ColoredWrapper.ERROR)
        if self.propagte:
            self._logging.error(message)

    def critical(self, message):
        """
        Log a critical error message.

        Args:
            message: The message to log
        """
        self._print(message, ColoredWrapper.ERROR)
        if self.propagte:
            self._logging.critical(message)

    def _print(self, message, color):
        """
        Print a formatted message to the console.

        Args:
            message: The message to print; any object with a string
                representation is accepted
            color: ANSI color code to use
        """
        timestamp = datetime.datetime.now().strftime("%H:%M:%S.%f")
        if self._filter is not None:
            # Redaction runs regex substitution, which accepts only strings;
            # callers may pass exceptions or other objects, so render first.
            message = self._filter.filter_string(str(message))
        click.echo(
            f"{color}{ColoredWrapper.BOLD}[{timestamp}]{ColoredWrapper.END} "
            f"{ColoredWrapper.BOLD}{self.prefix}{ColoredWrapper.END} {message}"
        )

    def set_filter(self, filter: SensitiveDataFilter):
        """Set custom data filter.

        Args:
            filter: filter used to redact messages before they are printed
        """
        self._filter = filter
class LoggingHandlers:
    """
    Holds the logging configuration shared with ColoredWrapper.

    Keeps the verbosity flag and, when a file name is given, a file handler
    that writes formatted log records to that file.

    Attributes:
        handler: FileHandler for logging to a file, or None without a file
        verbosity: Whether to include debug-level messages
    """

    def __init__(self, verbose: bool = False, filename: Optional[str] = None):
        """
        Initialize logging handlers.

        Args:
            verbose: Whether to include debug-level messages
            filename: Optional file to log to; opened in write mode
        """
        # Remember verbosity for colored wrapper
        self.verbosity = verbose
        self.handler: Optional[logging.FileHandler] = None

        if not filename:
            return

        # File output requested: DEBUG level when verbose, INFO otherwise.
        level = logging.DEBUG if verbose else logging.INFO
        file_handler = logging.FileHandler(filename=filename, mode="w")
        file_handler.setFormatter(
            logging.Formatter(
                "%(asctime)s,%(msecs)d %(levelname)s %(name)s: %(message)s",
                "%H:%M:%S",
            )
        )
        file_handler.setLevel(level)
        self.handler = file_handler
class LoggingBase:
    """
    Base class giving every component the same logging facilities.

    Each instance owns a uniquely named Python logger and a ColoredWrapper
    that prints colored console output; file logging is enabled by assigning
    a LoggingHandlers object that carries a file handler.

    Attributes:
        log_name: Unique identifier for this logger
        logging: ColoredWrapper for formatted console output
        REDACTION_FILTER: Filter shared by all instances, None when disabled
    """

    REDACTION_FILTER: Optional[SensitiveDataFilter] = None

    def __init__(self):
        """
        Initialize the logging base with a unique identifier.

        The logger name is built from typename() when the instance provides
        it, otherwise from the class name, followed by four characters of a
        random UUID.
        """
        suffix = str(uuid.uuid4())[0:4]
        owner = self.typename() if hasattr(self, "typename") else self.__class__.__name__
        self.log_name = f"{owner}-{suffix}"

        self._logging = logging.getLogger(self.log_name)
        self._logging.setLevel(logging.INFO)
        self.wrapper = ColoredWrapper(self.log_name, self._logging)
        self._install_redaction_filter()

    def _install_redaction_filter(self) -> None:
        """Attach the shared redaction filter to the logger and the wrapper."""
        redaction = LoggingBase.REDACTION_FILTER
        if redaction is None:
            return
        self._logging.addFilter(redaction)
        self.wrapper.set_filter(redaction)

    @classmethod
    def enable_filtering(cls) -> None:
        """Enable sensitive data filtering for all loggers.

        Creates the shared filter on first use; repeated calls keep it.
        """
        if cls.REDACTION_FILTER is None:
            cls.REDACTION_FILTER = SensitiveDataFilter()

    @classmethod
    def set_filtering_resource_id(cls, resource_id: str, cloud_id: Optional[str] = None) -> None:
        """Add resource ID and cloud user IDs to logging filtering.

        Args:
            resource_id: SeBS cloud ID
            cloud_id: cloud-specific user ID (e.g., AWS account ID or GCP project name)
        """
        assert (
            cls.REDACTION_FILTER is not None
        ), "Filtering must be enabled before setting resource ID"
        cls.REDACTION_FILTER.set_resource_id(resource_id, cloud_id)

    @property
    def logging(self) -> ColoredWrapper:
        """
        Get the colored logging wrapper.

        Returns:
            ColoredWrapper: The logging wrapper for this instance
        """
        # The wrapper always prints colored output to the console; messages
        # reach a file only when the configured LoggingHandlers has a handler.
        return self.wrapper

    @property
    def logging_handlers(self) -> LoggingHandlers:
        """
        Get the logging handlers configuration.

        Returns:
            LoggingHandlers: The current handlers configuration
        """
        return self._logging_handlers

    @logging_handlers.setter
    def logging_handlers(self, handlers: LoggingHandlers):
        """
        Set new logging handlers configuration.

        Replaces the wrapper with one that follows the verbosity of the new
        configuration and propagates to the Python logger only when a file
        handler is present.

        Args:
            handlers: The new handlers configuration to use
        """
        self._logging_handlers = handlers
        self._logging.propagate = False

        file_handler = handlers.handler
        self.wrapper = ColoredWrapper(
            self.log_name,
            self._logging,
            verbose=handlers.verbosity,
            propagte=file_handler is not None,
        )
        self._install_redaction_filter()

        if file_handler is not None:
            self._logging.addHandler(file_handler)
def is_linux() -> bool:
    """
    Tell whether the code runs on native Linux.

    Windows Subsystem for Linux is excluded by looking for "microsoft"
    in the platform release string.

    Returns:
        bool: True if native Linux, False otherwise
    """
    if platform.system() != "Linux":
        return False
    kernel_release = platform.release().lower()
    return "microsoft" not in kernel_release
def catch_interrupt() -> None:
    """
    Install a SIGINT (Ctrl+C) handler that dumps the stack and exits.

    The printed stack trace shows where the program was executing when
    the interrupt arrived, which helps with debugging.
    """
    import signal
    import sys
    import traceback

    def _dump_stack_and_exit(signum, frame):
        """
        Print the current stack trace and terminate the process.

        Args:
            signum: Signal number
            frame: Frame object
        """
        traceback.print_stack()
        sys.exit(signal.SIGINT)

    signal.signal(signal.SIGINT, _dump_stack_and_exit)
def ensure_benchmarks_data(logger: ColoredWrapper) -> Path:
    """Make sure benchmarks-data is available, fetching it when missing.

    A non-empty benchmarks-data directory is returned as is. Otherwise, a
    package installation clones the data repository into the target
    directory, while a git clone of the project initializes its submodules.

    Args:
        logger: wrapper used to report progress

    Returns:
        Path to benchmarks-data directory

    Raises:
        RuntimeError: If the git command fails or git is not installed
    """
    data_dir = get_benchmarks_data_path()

    # Nothing to do when the data is already in place.
    if data_dir.exists() and any(data_dir.iterdir()):
        return data_dir

    data_dir.parent.mkdir(parents=True, exist_ok=True)

    if IS_PACKAGE_INSTALL:
        # In package: clone to a home directory
        url = "https://github.com/spcl/serverless-benchmarks-data.git"
        command = ["git", "clone", url, str(data_dir)]
        workdir = None
        start_msg = f"Initialize benchmarks data to {data_dir} from {url}..."
        done_msg = f"Benchmarks-data cloned from {url} successfully"
        failure_prefix = "Failed to clone benchmarks-data"
    else:
        # Git clone mode: use submodule
        command = ["git", "submodule", "update", "--init", "--recursive"]
        workdir = get_project_root()
        start_msg = "Initializing benchmarks data submodule..."
        done_msg = "Benchmarks-data submodule initialized successfully"
        failure_prefix = "Failed to initialize benchmarks-data submodule"

    logger.info(start_msg)
    try:
        subprocess.run(command, cwd=workdir, check=True, capture_output=True, text=True)
        logger.info(done_msg)
        return data_dir
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"{failure_prefix}: {e.stderr}") from e
    except FileNotFoundError:
        raise RuntimeError("git command not found. Please install git to use SeBS") from None