Source code for sebs.faas.nosql

# Copyright 2020-2025 ETH Zurich and the SeBS authors. All rights reserved.
"""
Module for NoSQL database storage abstraction in the Serverless Benchmarking Suite.

This module provides an abstract base class for NoSQL database implementations
across different cloud platforms (AWS DynamoDB, Azure CosmosDB, Google Cloud Datastore)
and local development environments. It handles table creation, data writing, and
cache management for benchmark data stored in NoSQL databases.
"""

from abc import ABC
from abc import abstractmethod
from typing import Dict, List, Optional, Tuple

from sebs.faas.config import Resources
from sebs.cache import Cache
from sebs.utils import LoggingBase


class NoSQLStorage(ABC, LoggingBase):
    """
    Abstract base class for NoSQL database storage implementations.

    This class defines the interface for NoSQL database operations across
    different cloud platforms and local environments. Concrete implementations
    handle the platform-specific details of creating tables, writing data,
    and managing resources.

    Attributes:
        cache_client: Client for caching database information
        region: Cloud region where the database is deployed
    """

    @staticmethod
    @abstractmethod
    def deployment_name() -> str:
        """
        Get the name of the deployment platform.

        Returns:
            str: Name of the deployment platform (e.g., 'aws', 'azure', 'gcp')
        """
        pass

    @property
    def cache_client(self) -> Cache:
        """
        Get the cache client.

        Returns:
            Cache: The cache client for database information
        """
        return self._cache_client

    @property
    def region(self) -> str:
        """
        Get the cloud region.

        Returns:
            str: The cloud region where the database is deployed
        """
        return self._region

    def __init__(self, region: str, cache_client: Cache, resources: Resources):
        """
        Initialize a NoSQL storage instance.

        Args:
            region: Cloud region where the database is deployed
            cache_client: Client for caching database information
            resources: Resource configuration for the database
        """
        super().__init__()
        self._cache_client = cache_client
        self._cached = False
        self._region = region
        self._cloud_resources = resources

    @abstractmethod
    def get_tables(self, benchmark: str) -> Dict[str, str]:
        """
        Get a mapping of benchmark-defined table names to actual cloud
        provider table names.

        Args:
            benchmark: Name of the benchmark

        Returns:
            Dict[str, str]: Dictionary mapping table logical names to
                physical table names
        """
        pass

    @abstractmethod
    def _get_table_name(self, benchmark: str, table: str) -> Optional[str]:
        """
        Get the physical table name for a benchmark's logical table.

        Args:
            benchmark: Name of the benchmark
            table: Logical name of the table

        Returns:
            Optional[str]: Physical table name if it exists, None otherwise
        """
        pass

    @abstractmethod
    def retrieve_cache(self, benchmark: str) -> bool:
        """
        Retrieve cached table information for a benchmark.

        Implementations should populate internal structures with cached
        table names/details.

        Args:
            benchmark: Name of the benchmark

        Returns:
            bool: True if cache was successfully retrieved, False otherwise
        """
        pass

    @abstractmethod
    def update_cache(self, benchmark: str):
        """
        Update the cache with the latest table information for a benchmark.

        Args:
            benchmark: Name of the benchmark
        """
        pass

    def envs(self) -> dict:
        """
        Return a dictionary of environment variables that are required by
        functions to access this NoSQL storage (e.g., connection strings,
        table names).

        Default implementation returns an empty dictionary. Subclasses should
        override if they need to expose environment variables.

        Returns:
            dict: Dictionary of environment variables
        """
        return {}

    def create_benchmark_tables(
        self,
        benchmark: str,
        name: str,
        primary_key: str,
        secondary_key: Optional[str] = None,
    ):
        """
        Checks if the table already exists in the cache. If not, creates a
        new table with the specified keys.

        Each table name follows this pattern:
        sebs-benchmarks-{resource_id}-{benchmark-name}-{table-name}

        Each implementation should do the following:
        1. Retrieve cached data
        2. Create missing tables that do not exist
        3. Update cached data if anything new was created (done separately
           in benchmark.py once the data is uploaded by the benchmark)

        Args:
            benchmark: Name of the benchmark
            name: Logical name of the table
            primary_key: Primary key field name
            secondary_key: Optional secondary key field name
        """
        if self.retrieve_cache(benchmark):
            table_name = self._get_table_name(benchmark, name)
            if table_name is not None:
                self.logging.info(
                    f"Using cached NoSQL table {table_name} for benchmark {benchmark}"
                )
                return

        # Either nothing is cached for this benchmark, or the cache does not
        # know this particular table: create it.
        self.logging.info(f"Preparing to create a NoSQL table {name} for benchmark {benchmark}")
        self.create_table(benchmark, name, primary_key, secondary_key)

    @abstractmethod
    def create_table(
        self,
        benchmark: str,
        name: str,
        primary_key: str,
        secondary_key: Optional[str] = None,
    ) -> str:
        """
        Create a new table for a benchmark.

        Provider-specific implementation details:
        - AWS: DynamoDB Table
        - Azure: CosmosDB Container
        - Google Cloud: Firestore in Datastore Mode, Database/Collection

        Args:
            benchmark: Name of the benchmark
            name: Logical name of the table
            primary_key: Primary key field name
            secondary_key: Optional secondary key field name

        Returns:
            str: Physical name of the created table
        """
        pass

    @abstractmethod
    def write_to_table(
        self,
        benchmark: str,
        table: str,
        data: dict,
        primary_key: Tuple[str, str],
        secondary_key: Optional[Tuple[str, str]] = None,
    ):
        """
        Write an item/document to the specified table/container.

        This is used by benchmarks to populate tables with test data.

        Args:
            benchmark: Name of the benchmark
            table: Logical name of the table
            data: Dictionary of data to write
            primary_key: Tuple of (key_name, key_value) for the primary key
            secondary_key: Optional tuple of (key_name, key_value) for the
                secondary key
        """
        pass

    @abstractmethod
    def clear_table(self, name: str) -> str:
        """
        Clear all items from a table/container.

        Currently not implemented for any of the providers.

        Provider-specific implementation details:
        - AWS DynamoDB: Removing & recreating table looks like the cheapest
          & fastest option.
        - Azure CosmosDB: Recreate container or use specific API to delete
          items.
        - Google Cloud: Likely recreate collection or use specific API.

        Args:
            name: Name of the table to clear

        Returns:
            str: Result message or status
        """
        pass

    @abstractmethod
    def remove_table(self, name: str) -> str:
        """
        Remove a table completely.

        Args:
            name: Name of the table to remove

        Returns:
            str: Result message or status
        """
        pass

    def _get_tables(self) -> Dict[str, List[str]]:
        """Get list of all allocated NoSQL tables.

        The base implementation always raises; subclasses that want to
        support `cleanup_tables` must override it.

        Returns:
            mapping of benchmark names to lists of actual NoSQL table names.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()

    def cleanup_tables(self, dry_run: bool = False) -> List[str]:
        """Remove all allocated NoSQL tables.

        Every table reported by `_get_tables` is passed to `remove_table`.
        A failure to remove one table is logged and does not stop the
        remaining tables from being processed. After a non-dry run, the
        cache client is asked to remove the NoSQL entries of this deployment
        (`remove_nosql`), regardless of individual failures.

        Args:
            dry_run: when true, skips actual deletion and leaves the cache
                untouched

        Returns:
            list of deleted table names; in dry-run mode, the names of the
            tables that would have been deleted. Tables whose removal failed
            are not included.
        """
        deleted: List[str] = []

        for tables in self._get_tables().values():
            for table in tables:
                if not dry_run:
                    try:
                        self.remove_table(table)
                    except Exception as e:
                        # Best effort: report the failure and keep going, but
                        # do not claim the table was deleted.
                        self.logging.error(f"Failed to delete NoSQL table: {table}: {e}")
                        continue
                deleted.append(table)

        if not dry_run:
            self._cache_client.remove_nosql(self.deployment_name())

        return deleted