Source code for sebs.aws.dynamodb

# Copyright 2020-2025 ETH Zurich and the SeBS authors. All rights reserved.
"""AWS DynamoDB NoSQL storage implementation for SeBS.

This module provides the DynamoDB class which implements NoSQL storage functionality
for the Serverless Benchmarking Suite using Amazon DynamoDB. It handles table
creation, data operations, and caching for benchmark data storage.

Key classes:
    DynamoDB: AWS DynamoDB NoSQL storage implementation
"""

from collections import defaultdict
from typing import Dict, List, Optional, Tuple

from sebs.cache import Cache
from sebs.faas.config import Resources
from sebs.faas.nosql import NoSQLStorage

import boto3
from boto3.dynamodb.types import TypeSerializer


[docs] class DynamoDB(NoSQLStorage): """AWS DynamoDB NoSQL storage implementation for SeBS. This class provides NoSQL storage functionality using Amazon DynamoDB. It handles table creation, data operations, caching, and provides a unified interface for benchmark data storage. Attributes: client: DynamoDB client for AWS API operations _tables: Mapping of benchmark names to table configurations _serializer: DynamoDB type serializer for data conversion """
[docs] @staticmethod def typename() -> str: """Get the type name for this storage system. Returns: str: Type name ('AWS.DynamoDB') """ return "AWS.DynamoDB"
[docs] @staticmethod def deployment_name() -> str: """Get the deployment name for this storage system. Returns: str: Deployment name ('aws') """ return "aws"
def __init__( self, session: boto3.session.Session, cache_client: Cache, resources: Resources, region: str, access_key: str, secret_key: str, ) -> None: """Initialize DynamoDB NoSQL storage. Args: session: AWS boto3 session cache_client: Cache client for storing table configurations resources: Cloud resource configuration region: AWS region name access_key: AWS access key ID secret_key: AWS secret access key """ super().__init__(region, cache_client, resources) self.client = session.client( "dynamodb", region_name=region, aws_access_key_id=access_key, aws_secret_access_key=secret_key, ) # Map benchmark -> name used by benchmark -> actual table_name in AWS # Example "shopping_cart" -> "sebs-benchmarks-<resource-id>-130.crud-api-shopping_cart" self._tables: Dict[str, Dict[str, str]] = defaultdict(dict) self._serializer = TypeSerializer() def _get_tables(self) -> Dict[str, List[str]]: """Get list of all allocated DynamoDB tables. Returns: mapping of benchmark names to lists of actual DynamoDB table names. """ tables = self.cache_client.get_nosql_configs(self.deployment_name()) return {benchmark: list(v.values()) for benchmark, v in tables.items()}
[docs] def retrieve_cache(self, benchmark: str) -> bool: """Retrieve table configuration from cache. Args: benchmark: Name of the benchmark Returns: bool: True if cache was found and loaded, False otherwise """ if benchmark in self._tables: return True cached_storage = self.cache_client.get_nosql_config(self.deployment_name(), benchmark) if cached_storage is not None: self._tables[benchmark] = cached_storage["tables"] return True return False
[docs] def update_cache(self, benchmark: str) -> None: """Update cache with current table configuration. Args: benchmark: Name of the benchmark to update cache for """ self._cache_client.update_nosql( self.deployment_name(), benchmark, { "tables": self._tables[benchmark], }, )
[docs] def get_tables(self, benchmark: str) -> Dict[str, str]: """Get table mappings for a benchmark. Args: benchmark: Name of the benchmark Returns: Dict[str, str]: Mapping of logical table names to actual DynamoDB table names """ return self._tables[benchmark]
def _get_table_name(self, benchmark: str, table: str) -> Optional[str]: """Get the actual DynamoDB table name for a logical table. Args: benchmark: Name of the benchmark table: Logical table name used by the benchmark Returns: Optional[str]: Actual DynamoDB table name, or None if not found """ if benchmark not in self._tables: return None if table not in self._tables[benchmark]: return None return self._tables[benchmark][table]
[docs] def write_to_table( self, benchmark: str, table: str, data: dict, primary_key: Tuple[str, str], secondary_key: Optional[Tuple[str, str]] = None, ) -> None: """Write data to a DynamoDB table. Args: benchmark: Name of the benchmark table: Logical table name data: Data to write to the table primary_key: Primary key as (attribute_name, value) tuple secondary_key: Optional secondary key as (attribute_name, value) tuple Raises: AssertionError: If the table name is not found """ table_name = self._get_table_name(benchmark, table) assert table_name is not None for key in (primary_key, secondary_key): if key is not None: data[key[0]] = key[1] serialized_data = {k: self._serializer.serialize(v) for k, v in data.items()} self.client.put_item(TableName=table_name, Item=serialized_data)
[docs] def create_table( self, benchmark: str, name: str, primary_key: str, secondary_key: Optional[str] = None, ) -> str: """Create a DynamoDB table for benchmark data. Creates a unique DynamoDB table name using resource ID, benchmark name, and provided name. Unlike Azure (account -> database -> container) and GCP (database per benchmark), AWS requires unique table names across the account. The function handles cases where the table already exists or is being created. Uses PAY_PER_REQUEST billing mode. Args: benchmark: Name of the benchmark name: Logical table name primary_key: Name of the primary key attribute secondary_key: Optional name of the secondary key attribute Returns: str: Name of the created table Raises: RuntimeError: If table creation fails for unknown reasons """ table_name = f"sebs-benchmarks-{self._cloud_resources.resources_id}-{benchmark}-{name}" try: definitions = [{"AttributeName": primary_key, "AttributeType": "S"}] key_schema = [{"AttributeName": primary_key, "KeyType": "HASH"}] if secondary_key is not None: definitions.append({"AttributeName": secondary_key, "AttributeType": "S"}) key_schema.append({"AttributeName": secondary_key, "KeyType": "RANGE"}) ret = self.client.create_table( TableName=table_name, BillingMode="PAY_PER_REQUEST", AttributeDefinitions=definitions, # type: ignore KeySchema=key_schema, # type: ignore ) if ret["TableDescription"]["TableStatus"] == "CREATING": self.logging.info(f"Waiting for creation of DynamoDB table {name}") waiter = self.client.get_waiter("table_exists") waiter.wait(TableName=table_name, WaiterConfig={"Delay": 1}) self.logging.info(f"Created DynamoDB table {name} for benchmark {benchmark}") self._tables[benchmark][name] = table_name return ret["TableDescription"]["TableName"] except self.client.exceptions.ResourceInUseException as e: if "already exists" in e.response["Error"]["Message"]: # We need this waiter. # Otheriwise, we still might get later `ResourceNotFoundException` # when uploading benchmark data. self.logging.info(f"Waiting for the existing table {table_name} to be created") waiter = self.client.get_waiter("table_exists") waiter.wait(TableName=table_name, WaiterConfig={"Delay": 1}) ret = self.client.describe_table(TableName=table_name) self.logging.info( f"Using existing DynamoDB table {table_name} for benchmark {benchmark}" ) self._tables[benchmark][name] = table_name return name if "being created" in e.response["Error"]["Message"]: self.logging.info(f"Waiting for the existing table {table_name} to be created") waiter = self.client.get_waiter("table_exists") waiter.wait(TableName=table_name, WaiterConfig={"Delay": 1}) ret = self.client.describe_table(TableName=table_name) self.logging.info( f"Using existing DynamoDB table {table_name} for benchmark {benchmark}" ) self._tables[benchmark][name] = table_name return name raise RuntimeError(f"Creating DynamoDB failed, unknown reason! Error: {e}")
[docs] def clear_table(self, name: str) -> str: """Clear all data from a table. Args: name: Name of the table to clear Returns: str: Result of the operation Raises: NotImplementedError: This operation is not yet implemented """ raise NotImplementedError()
[docs] def remove_table(self, name: str) -> str: """Remove a table completely. Args: name: Name of the table to remove Returns: str: Result of the operation """ self.client.delete_table(TableName=name) self.logging.info(f"Waiting for deletion of DynamoDB table {name}") waiter = self.client.get_waiter("table_not_exists") waiter.wait(TableName=name, WaiterConfig={"Delay": 1}) self.logging.info(f"Deleted DynamoDB table {name}") return name