# Copyright 2020-2025 ETH Zurich and the SeBS authors. All rights reserved.
"""Configuration management for AWS SeBS integration.
This module provides configuration classes for AWS credentials, resources, and settings
used when deploying to AWS Lambda. It handles
AWS authentication, resource management including ECR repositories, IAM roles, and
HTTP APIs, along with caching and serialization capabilities.
Key classes:
AWSCredentials: Manages AWS access credentials and account information
AWSResources: Manages AWS resources like ECR repositories, IAM roles, and HTTP APIs
AWSConfig: Main configuration container combining credentials and resources
"""
import base64
import json
import os
import time
from typing import cast, Dict, List, Optional, Tuple
import boto3
from mypy_boto3_ecr import ECRClient
from sebs.cache import Cache
from sebs.faas.config import Config, Credentials, Resources
from sebs.aws.function import LambdaFunction
from sebs.utils import LoggingHandlers
[docs]
class AWSCredentials(Credentials):
"""AWS authentication credentials for SeBS.
This class manages AWS access credentials including access key, secret key,
and automatically retrieves the associated AWS account ID through STS.
Account ID is cached to retain information on which account was the benchmark
executed. Credentials are not cached.
Attributes:
_access_key: AWS access key ID
_secret_key: AWS secret access key
_account_id: AWS account ID retrieved via STS
"""
def __init__(self, access_key: str, secret_key: str) -> None:
"""Initialize AWS credentials.
Args:
access_key: AWS access key ID
secret_key: AWS secret access key
Raises:
ClientError: If AWS credentials are invalid or STS call fails
"""
super().__init__()
self._access_key = access_key
self._secret_key = secret_key
client = boto3.client(
"sts",
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
)
self._account_id = client.get_caller_identity()["Account"]
[docs]
@staticmethod
def typename() -> str:
"""Get the type name for these credentials.
Returns:
str: The type name 'AWS.Credentials'
"""
return "AWS.Credentials"
@property
def access_key(self) -> str:
"""Get the AWS access key ID.
Returns:
str: AWS access key ID
"""
return self._access_key
@property
def secret_key(self) -> str:
"""Get the AWS secret access key.
Returns:
str: AWS secret access key
"""
return self._secret_key
@property
def account_id(self) -> str:
"""Get the AWS account ID.
Returns:
str: AWS account ID
"""
return self._account_id
[docs]
@staticmethod
def initialize(dct: dict) -> "AWSCredentials":
"""Initialize AWS credentials from a dictionary.
Args:
dct: Dictionary containing 'access_key' and 'secret_key'
Returns:
AWSCredentials: Initialized credentials object
Raises:
KeyError: If required keys are missing from dictionary
"""
return AWSCredentials(dct["access_key"], dct["secret_key"])
[docs]
@staticmethod
def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Credentials:
"""Deserialize AWS credentials from configuration and cache.
Loads AWS credentials from configuration file, environment variables, or cache.
Validates that credentials match cached account ID if available.
Args:
config: Configuration dictionary that may contain credentials
cache: Cache instance for retrieving/storing credentials
handlers: Logging handlers for error reporting
Returns:
Credentials: Deserialized AWSCredentials instance
Raises:
RuntimeError: If credentials are missing or don't match cached account
"""
cached_config = cache.get_config("aws")
ret: AWSCredentials
account_id: Optional[str] = None
# Load cached values
if cached_config and "credentials" in cached_config:
account_id = cached_config["credentials"]["account_id"]
# Check for new config.
# Loading old results might result in not having credentials in the JSON - need to check.
if "credentials" in config and "access_key" in config["credentials"]:
ret = AWSCredentials.initialize(config["credentials"])
elif "AWS_ACCESS_KEY_ID" in os.environ:
ret = AWSCredentials(
os.environ["AWS_ACCESS_KEY_ID"], os.environ["AWS_SECRET_ACCESS_KEY"]
)
else:
raise RuntimeError(
"AWS login credentials are missing! Please set "
"up environmental variables AWS_ACCESS_KEY_ID and "
"AWS_SECRET_ACCESS_KEY"
)
if account_id is not None and account_id != ret.account_id:
ret.logging.error(
f"The account id {ret.account_id} from provided credentials is different "
f"from the account id {account_id} found in the cache! Please change "
"your cache directory or create a new one!"
)
raise RuntimeError(
f"AWS login credentials do not match the account {account_id} in cache!"
)
ret.logging_handlers = handlers
return ret
[docs]
def update_cache(self, cache: Cache) -> None:
"""Update the cache with current credentials.
Args:
cache: Cache instance to update
"""
cache.update_config(val=self.account_id, keys=["aws", "credentials", "account_id"])
[docs]
def serialize(self) -> dict:
"""Serialize credentials to a dictionary.
Returns:
dict: Dictionary containing account_id
"""
out = {"account_id": self._account_id}
return out
[docs]
class AWSResources(Resources):
"""AWS resource management for SeBS.
This class manages AWS-specific resources including ECR repositories,
IAM roles, HTTP APIs, and Docker registry configurations. It provides
methods for creating and managing these resources with caching support.
Attributes:
_docker_registry: Docker registry URL (ECR repository URI)
_docker_username: Docker registry username
_docker_password: Docker registry password
_container_repository: ECR repository name
_lambda_role: IAM role ARN for Lambda execution
_http_apis: Dictionary of HTTP API configurations
"""
[docs]
class HTTPApi:
"""HTTP API configuration for AWS API Gateway.
Represents an HTTP API resource in AWS API Gateway with its ARN and endpoint.
Attributes:
_arn: API Gateway ARN
_endpoint: API Gateway endpoint URL
"""
def __init__(self, arn: str, endpoint: str) -> None:
"""Initialize HTTP API configuration.
Args:
arn: API Gateway ARN
endpoint: API Gateway endpoint URL
"""
self._arn = arn
self._endpoint = endpoint
@property
def arn(self) -> str:
"""Get the API Gateway ARN.
Returns:
str: API Gateway ARN
"""
return self._arn
@property
def endpoint(self) -> str:
"""Get the API Gateway endpoint URL.
Returns:
str: API Gateway endpoint URL
"""
return self._endpoint
[docs]
@staticmethod
def deserialize(dct: dict) -> "AWSResources.HTTPApi":
"""Deserialize HTTP API from dictionary.
Args:
dct: Dictionary containing 'arn' and 'endpoint'
Returns:
AWSResources.HTTPApi: Deserialized HTTP API instance
"""
return AWSResources.HTTPApi(dct["arn"], dct["endpoint"])
[docs]
def serialize(self) -> dict:
"""Serialize HTTP API to dictionary.
Returns:
dict: Dictionary containing arn and endpoint
"""
out = {"arn": self.arn, "endpoint": self.endpoint}
return out
def __init__(
self,
registry: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
) -> None:
"""Initialize AWS resources.
Args:
registry: Docker registry URL (ECR repository URI)
username: Docker registry username
password: Docker registry password
"""
super().__init__(name="aws")
self._docker_registry: Optional[str] = registry if registry != "" else None
self._docker_username: Optional[str] = username if username != "" else None
self._docker_password: Optional[str] = password if password != "" else None
self._container_repository: Optional[str] = None
self._lambda_role = ""
self._http_apis: Dict[str, AWSResources.HTTPApi] = {}
[docs]
@staticmethod
def typename() -> str:
"""Get the type name for these resources.
Returns:
str: The type name 'AWS.Resources'
"""
return "AWS.Resources"
@property
def docker_registry(self) -> Optional[str]:
"""Get the Docker registry URL.
Returns:
Optional[str]: Docker registry URL (ECR repository URI)
"""
return self._docker_registry
@property
def docker_username(self) -> Optional[str]:
"""Get the Docker registry username.
Returns:
Optional[str]: Docker registry username
"""
return self._docker_username
@property
def docker_password(self) -> Optional[str]:
"""Get the Docker registry password.
Returns:
Optional[str]: Docker registry password
"""
return self._docker_password
@property
def container_repository(self) -> Optional[str]:
"""Get the ECR repository name.
Returns:
Optional[str]: ECR repository name
"""
return self._container_repository
[docs]
def lambda_role(self, boto3_session: boto3.session.Session) -> str:
"""Get or create IAM role for Lambda execution.
Creates a Lambda execution role with S3 and basic execution permissions
if it doesn't already exist. The role allows Lambda functions to access
S3 and write CloudWatch logs.
Args:
boto3_session: Boto3 session for AWS API calls
Returns:
str: Lambda execution role ARN
Raises:
ClientError: If IAM operations fail
"""
if not self._lambda_role:
iam_client = boto3_session.client(service_name="iam")
trust_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"Service": "lambda.amazonaws.com"},
"Action": "sts:AssumeRole",
}
],
}
role_name = "sebs-lambda-role"
attached_policies = [
"arn:aws:iam::aws:policy/AmazonS3FullAccess",
"arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
]
try:
out = iam_client.get_role(RoleName=role_name)
self._lambda_role = out["Role"]["Arn"]
self.logging.info(f"AWS: Selected {self._lambda_role} IAM role")
except iam_client.exceptions.NoSuchEntityException:
out = iam_client.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(trust_policy),
)
self._lambda_role = out["Role"]["Arn"]
self.logging.info(
f"AWS: Created {self._lambda_role} IAM role. "
"Sleep 10 seconds to avoid problems when using role immediately."
)
time.sleep(10)
# Attach basic AWS Lambda and S3 policies.
for policy in attached_policies:
iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy)
return self._lambda_role
[docs]
def http_api(
self, api_name: str, func: LambdaFunction, boto3_session: boto3.session.Session
) -> "AWSResources.HTTPApi":
"""Get or create HTTP API for Lambda function.
Creates an HTTP API Gateway that routes requests to the specified Lambda function.
If the API already exists, returns the cached instance.
Args:
api_name: Name of the HTTP API
func: Lambda function to route requests to
boto3_session: Boto3 session for AWS API calls
Returns:
AWSResources.HTTPApi: HTTP API configuration
Raises:
RuntimeError: If API creation fails after retries
TooManyRequestsException: If API Gateway rate limits are exceeded
"""
http_api = self._http_apis.get(api_name)
if not http_api:
# get apigateway client
api_client = boto3_session.client(
service_name="apigatewayv2", region_name=cast(str, self._region)
)
# check existing apis
api_data = None
for api in api_client.get_apis()["Items"]:
if api["Name"] == api_name:
self.logging.info(f"Using existing HTTP API {api_name}")
api_data = api
break
if not api_data:
self.logging.info(f"Creating HTTP API {api_name}")
retries = 0
while retries < 5:
try:
api_data = api_client.create_api( # type: ignore
Name=api_name, ProtocolType="HTTP", Target=func.arn
)
break
except api_client.exceptions.TooManyRequestsException as e:
retries += 1
if retries == 5:
self.logging.error("Failed to create the HTTP API!")
self.logging.error(e)
raise RuntimeError("Failed to create the HTTP API!")
else:
self.logging.info("HTTP API is overloaded, applying backoff.")
time.sleep(retries)
api_id = api_data["ApiId"] # type: ignore
endpoint = api_data["ApiEndpoint"] # type: ignore
# function's arn format is: arn:aws:{region}:{account-id}:{func}
# easier than querying AWS resources to get account id
account_id = func.arn.split(":")[4]
# API arn is:
arn = f"arn:aws:execute-api:{self._region}:{account_id}:{api_id}"
http_api = AWSResources.HTTPApi(arn, endpoint)
self._http_apis[api_name] = http_api
else:
self.logging.info(f"Using cached HTTP API {api_name}")
return http_api
[docs]
def cleanup_http_apis(
self, boto3_session: boto3.session.Session, cache_client: Cache, dry_run: bool = False
) -> List[str]:
"""Remove HTTP APIs allocated for HTTP triggers.
Args:
boto3_session: boto3 session for AWS API calls
cache_client: SeBS cache client
dry_run: when true, skip actual deletion
Returns:
list of deleted HTTP API names
"""
deleted: List[str] = []
dry_run_tag = "[DRY-RUN] " if dry_run else ""
api_client = boto3_session.client(
service_name="apigatewayv2", region_name=cast(str, self._region)
)
http_apis = cache_client.get_config_key(["aws", "resources", "http-apis"])
if http_apis is None:
return deleted
for name, http_api in http_apis.items():
self.logging.info(f"{dry_run_tag}Deleting HTTP API: {name} ({http_api['arn']})")
if not dry_run:
# We need to extract the ID
api_id = http_api["arn"].split(":")[-1]
api_client.delete_api(ApiId=api_id)
deleted.append(name)
if not dry_run:
for api_name in deleted:
cache_client.remove_config_key(["aws", "resources", "http-apis", api_name])
self._http_apis.pop(api_name, None)
return deleted
[docs]
def check_ecr_repository_exists(
self, ecr_client: ECRClient, repository_name: str
) -> Optional[str]:
"""Check if ECR repository exists.
Args:
ecr_client: ECR client instance
repository_name: Name of the ECR repository
Returns:
Optional[str]: Repository URI if exists, None otherwise
Raises:
Exception: If ECR operation fails (other than RepositoryNotFound)
"""
try:
resp = ecr_client.describe_repositories(repositoryNames=[repository_name])
return resp["repositories"][0]["repositoryUri"]
except ecr_client.exceptions.RepositoryNotFoundException:
return None
except Exception as e:
self.logging.error(f"Error checking repository: {e}")
raise e
[docs]
def get_ecr_repository(self, ecr_client: ECRClient) -> str:
"""Get or create ECR repository for container deployments.
Creates an ECR repository with a unique name based on the resource ID
if it doesn't already exist. Updates the docker_registry property.
Args:
ecr_client: ECR client instance
Returns:
str: ECR repository name
Raises:
ClientError: If ECR operations fail
"""
if self._container_repository is not None:
return self._container_repository
self._container_repository = "sebs-benchmarks-{}".format(self._resources_id)
self._docker_registry = self.check_ecr_repository_exists(
ecr_client, self._container_repository
)
if self._docker_registry is None:
try:
resp = ecr_client.create_repository(repositoryName=self._container_repository)
self.logging.info(f"Created ECR repository: {self._container_repository}")
self._docker_registry = resp["repository"]["repositoryUri"]
except ecr_client.exceptions.RepositoryAlreadyExistsException:
# Support the situation where two invocations concurrently initialize it.
self.logging.info(f"ECR repository {self._container_repository} already exists.")
self._docker_registry = self.check_ecr_repository_exists(
ecr_client, self._container_repository
)
return self._container_repository
[docs]
def cleanup_ecr_repository(
self,
boto3_session: boto3.session.Session,
cache_client: Cache,
dry_run: bool = False,
) -> List[str]:
"""Remove ECR repository used for container images.
Args:
boto3_session: boto3 session for AWS API calls
cache_client: SeBS cache instance
dry_run: when true, skip actual deletion
Returns:
list of deleted ECR repositories (currently always one)
"""
deleted: List[str] = []
dry_run_tag = "[DRY-RUN] " if dry_run else ""
repo_name = self._container_repository
if repo_name is None:
return deleted
try:
ecr_client = boto3_session.client("ecr", region_name=cast(str, self._region))
try:
ecr_client.describe_repositories(repositoryNames=[repo_name])
self.logging.info(f"{dry_run_tag}Deleting ECR repository: {repo_name}")
deleted.append(repo_name)
if dry_run:
return deleted
ecr_client.delete_repository(repositoryName=repo_name, force=True)
except ecr_client.exceptions.RepositoryNotFoundException:
self.logging.warning(f"ECR repository {repo_name} does not exist")
except Exception as e:
self.logging.error(f"Failed to delete ECR repository {repo_name}: {e}")
finally:
cache_client.remove_config_key(["aws", "resources", "container_repository"])
cache_client.remove_config_key(["aws", "resources", "docker"])
self._docker_registry = None
self._docker_username = None
self._container_repository = None
except Exception as e:
self.logging.error(f"Failed to create ECR client: {e}")
if not dry_run:
cache_client.invalidate_all_container_uris("aws")
return deleted
[docs]
def cleanup_cloudwatch_logs(
self, function_names: List[str], boto3_session: boto3.session.Session, dry_run: bool
) -> List[str]:
"""Remove CloudWatch logs for selected functions.
Args:
function_names: list of function names to clean up logs for
boto3_session: boto3 session for AWS API calls
dry_run: when true, skip actual deletion
Returns:
list of deleted log group names
"""
deleted: List[str] = []
dry_run_tag = "[DRY-RUN] " if dry_run else ""
if not function_names:
return deleted
logs_client = boto3_session.client("logs", region_name=self._region)
for func_name in function_names:
log_group = f"/aws/lambda/{func_name}"
try:
response = logs_client.describe_log_groups(logGroupNamePrefix=log_group)
for group in response.get("logGroups", []):
group_name = group["logGroupName"]
if group_name != log_group:
continue
self.logging.info(f"{dry_run_tag}Deleting log group: {group_name}")
deleted.append(group_name)
if dry_run:
continue
try:
logs_client.delete_log_group(logGroupName=group_name)
except Exception as e:
self.logging.error(f"Failed to delete log group {group_name}: {e}")
except Exception as e:
self.logging.error(f"Failed to describe log groups for {func_name}: {e}")
return deleted
[docs]
def ecr_repository_authorization(self, ecr_client: ECRClient) -> Tuple[str, str, str]:
"""Get ECR repository authorization credentials.
Retrieves temporary authorization token from ECR and extracts
username and password for Docker registry authentication.
Args:
ecr_client: ECR client instance
Returns:
Tuple[str, str, str]: Username, password, and registry URL
Raises:
AssertionError: If username or registry are None
ClientError: If ECR authorization fails
"""
if self._docker_password is None:
response = ecr_client.get_authorization_token()
auth_token = response["authorizationData"][0]["authorizationToken"]
decoded_token = base64.b64decode(auth_token).decode("utf-8")
# Split username:password
self._docker_username, self._docker_password = decoded_token.split(":")
assert self._docker_username is not None
assert self._docker_registry is not None
return self._docker_username, self._docker_password, self._docker_registry
[docs]
@staticmethod
def initialize(res: Resources, dct: dict) -> None:
"""Initialize AWS resources from dictionary.
Args:
res: Base Resources instance to initialize
dct: Dictionary containing resource configuration
Returns:
AWSResources: Initialized AWS resources instance
"""
ret = cast(AWSResources, res)
super(AWSResources, AWSResources).initialize(ret, dct)
if "docker" in dct:
ret._docker_registry = dct["docker"]["registry"]
ret._docker_username = dct["docker"]["username"]
ret._container_repository = dct["container_repository"]
ret._lambda_role = dct["lambda-role"] if "lambda-role" in dct else ""
if "http-apis" in dct:
for key, value in dct["http-apis"].items():
ret._http_apis[key] = AWSResources.HTTPApi.deserialize(value)
[docs]
def serialize(self) -> dict:
"""Serialize AWS resources to dictionary.
Returns:
dict: Serialized resource configuration
"""
out = {
**super().serialize(),
"lambda-role": self._lambda_role,
"http-apis": {key: value.serialize() for (key, value) in self._http_apis.items()},
"docker": {
"registry": self.docker_registry,
"username": self.docker_username,
},
"container_repository": self.container_repository,
}
return out
[docs]
def update_cache(self, cache: Cache) -> None:
"""Update cache with current resource configuration.
Args:
cache: Cache instance to update
"""
super().update_cache(cache)
cache.update_config(
val=self.docker_registry, keys=["aws", "resources", "docker", "registry"]
)
cache.update_config(
val=self.docker_username, keys=["aws", "resources", "docker", "username"]
)
cache.update_config(
val=self.container_repository,
keys=["aws", "resources", "container_repository"],
)
cache.update_config(val=self._lambda_role, keys=["aws", "resources", "lambda-role"])
for name, api in self._http_apis.items():
cache.update_config(val=api.serialize(), keys=["aws", "resources", "http-apis", name])
[docs]
@staticmethod
def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Resources:
"""Deserialize AWS resources from configuration and cache.
Args:
config: Configuration dictionary
cache: Cache instance for retrieving cached resources
handlers: Logging handlers for status messages
Returns:
Resources: Deserialized AWSResources instance
"""
ret = AWSResources()
cached_config = cache.get_config("aws")
# Load cached values
if cached_config and "resources" in cached_config:
AWSResources.initialize(ret, cached_config["resources"])
ret.logging_handlers = handlers
ret.logging.info("Using cached resources for AWS")
else:
# Check for new config
if "resources" in config:
AWSResources.initialize(ret, config["resources"])
ret.logging_handlers = handlers
ret.logging.info("No cached resources for AWS found, using user configuration.")
else:
AWSResources.initialize(ret, {})
ret.logging_handlers = handlers
ret.logging.info("No resources for AWS found, initialize!")
return ret
[docs]
class AWSConfig(Config):
"""Main AWS configuration container.
Combines AWS credentials and resources into a single configuration object
for use by the AWS SeBS implementation.
Attributes:
_credentials: AWS authentication credentials
_resources: AWS resource management configuration
"""
def __init__(self, credentials: AWSCredentials, resources: AWSResources) -> None:
"""Initialize AWS configuration.
Args:
credentials: AWS authentication credentials
resources: AWS resource management configuration
"""
super().__init__(name="aws")
self._credentials = credentials
self._resources = resources
[docs]
@staticmethod
def typename() -> str:
"""Get the type name for this configuration.
Returns:
str: The type name 'AWS.Config'
"""
return "AWS.Config"
@property
def credentials(self) -> AWSCredentials:
"""Get AWS credentials.
Returns:
AWSCredentials: AWS authentication credentials
"""
return self._credentials
@property
def resources(self) -> AWSResources:
"""Get AWS resources configuration.
Returns:
AWSResources: AWS resource management configuration
"""
return self._resources
[docs]
@staticmethod
def initialize(cfg: Config, dct: dict) -> None:
"""Initialize AWS configuration from dictionary.
Args:
cfg: Base Config instance to initialize
dct: Dictionary containing 'region' configuration
"""
config = cast(AWSConfig, cfg)
config._region = dct["region"]
[docs]
@staticmethod
def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Config:
"""Deserialize AWS configuration from config and cache.
Creates an AWSConfig instance by deserializing credentials and resources,
then loading region configuration from cache or user-provided config.
Args:
config: Configuration dictionary
cache: Cache instance for retrieving cached configuration
handlers: Logging handlers for status messages
Returns:
Config: Deserialized AWSConfig instance
"""
cached_config = cache.get_config("aws")
credentials = cast(AWSCredentials, AWSCredentials.deserialize(config, cache, handlers))
resources = cast(AWSResources, AWSResources.deserialize(config, cache, handlers))
config_obj = AWSConfig(credentials, resources)
config_obj.logging_handlers = handlers
# Load cached values
if cached_config:
config_obj.logging.info("Using cached config for AWS")
AWSConfig.initialize(config_obj, cached_config)
else:
config_obj.logging.info("Using user-provided config for AWS")
AWSConfig.initialize(config_obj, config)
resources.region = config_obj.region
return config_obj
[docs]
def update_cache(self, cache: Cache) -> None:
"""Update the contents of the user cache.
The changes are directly written to the file system.
Updates region, credentials, and resources in the cache.
Args:
cache: Cache instance to update
"""
cache.update_config(val=self.region, keys=["aws", "region"])
self.credentials.update_cache(cache)
self.resources.update_cache(cache)
[docs]
def serialize(self) -> dict:
"""Serialize AWS configuration to dictionary.
Returns:
dict: Serialized configuration including name, region, credentials, and resources
"""
out = {
"name": "aws",
"region": self._region,
"credentials": self._credentials.serialize(),
"resources": self._resources.serialize(),
}
return out