# Source code for sebs.faas.storage

# Copyright 2020-2025 ETH Zurich and the SeBS authors. All rights reserved.
"""Object storage abstraction for serverless benchmarks.

This module provides the PersistentStorage abstract base class for managing
object storage across different cloud platforms and local deployments. It
handles bucket management, file operations, and benchmark data organization.

The storage abstraction supports:
- Cross-platform object storage (S3, Azure Blob, GCS, MinIO)
- Benchmark data organization with input/output separation
- Bucket lifecycle management and naming conventions
- Benchmark files upload/download operations with caching
- Deployment discovery and resource management

Each platform provides concrete implementations that handle platform-specific
API calls while following the common interface defined here.
"""

import os
import re

from abc import ABC
from abc import abstractmethod
from typing import List, Optional, Tuple

from sebs.faas.config import Resources
from sebs.cache import Cache
from sebs.utils import LoggingBase


class PersistentStorage(ABC, LoggingBase):
    """Abstract base class for persistent object storage implementations.

    This class defines the interface for object storage services across
    different cloud platforms. It manages buckets, files, and benchmark data
    organization while providing a consistent API regardless of the underlying
    storage service.

    Attributes:
        cached: Whether bucket configuration is cached
        _cache_client: Cache client for storing configuration
        _input_prefixes: List of input data prefixes for benchmarks
        _output_prefixes: List of output data prefixes for benchmarks
        input_prefixes_files: Files associated with input prefixes
        _replace_existing: Whether to replace existing files during uploads
        _region: Cloud region for storage operations
        _cloud_resources: Resource configuration for the platform
    """

    @staticmethod
    @abstractmethod
    def deployment_name() -> str:
        """Return the name of the FaaS deployment this storage belongs to.

        Returns:
            str: Platform name (e.g., 'aws', 'azure', 'gcp', 'minio')
        """

    @property
    def cache_client(self) -> Cache:
        """Get the cache client for configuration storage.

        Returns:
            Cache: Cache client instance
        """
        return self._cache_client

    @property
    def replace_existing(self) -> bool:
        """Flag indicating whether to replace existing files during operations.

        Returns:
            bool: True if existing files should be replaced, False otherwise
        """
        return self._replace_existing

    @replace_existing.setter
    def replace_existing(self, val: bool) -> None:
        """Set flag indicating whether to replace existing files during operations.

        Args:
            val: True to replace existing files, False to skip
        """
        self._replace_existing = val

    @property
    def region(self) -> str:
        """Get the cloud region for storage operations.

        Returns:
            str: Cloud region identifier
        """
        return self._region

    def __init__(
        self, region: str, cache_client: Cache, resources: Resources, replace_existing: bool
    ) -> None:
        """Initialize the persistent storage instance.

        Args:
            region: Cloud region for storage operations
            cache_client: Cache client for configuration persistence
            resources: Resource configuration for the platform
            replace_existing: Whether to replace existing files during uploads
        """
        super().__init__()
        self._cache_client = cache_client
        self.cached = False
        self._input_prefixes: List[str] = []
        self._output_prefixes: List[str] = []
        # One entry per input prefix: the object keys listed under that prefix.
        self.input_prefixes_files: List[List[str]] = []
        self._replace_existing = replace_existing
        self._region = region
        self._cloud_resources = resources

    @property
    def input_prefixes(self) -> List[str]:
        """Get the list of input data prefixes for benchmarks.

        These are paths within the benchmark data bucket.

        Returns:
            List[str]: List of input prefix names
        """
        return self._input_prefixes

    @property
    def output_prefixes(self) -> List[str]:
        """Get the list of output data prefixes for benchmarks.

        These are paths within the benchmark data bucket.

        Returns:
            List[str]: List of output prefix names
        """
        return self._output_prefixes

    @abstractmethod
    def correct_name(self, name: str) -> str:
        """Correct a bucket name to comply with platform naming requirements.

        Different platforms have different naming restrictions (character sets,
        length limits, etc.). This method applies platform-specific corrections.

        Args:
            name: Original bucket name

        Returns:
            str: Corrected bucket name that complies with platform requirements
        """

    def find_deployments(self) -> List[str]:
        """Find existing SeBS deployments by scanning bucket names.

        Lists all buckets and extracts the deployment ID from every bucket
        whose name starts with "sebs-benchmarks-". The benchmarks bucket must
        exist in every deployment, which makes it a reliable marker.

        Returns:
            List[str]: List of deployment resource IDs found in bucket names
        """
        matches = (re.match(r"sebs-benchmarks-(.*)", bucket) for bucket in self.list_buckets())
        return [found.group(1) for found in matches if found]

    @abstractmethod
    def _create_bucket(
        self, name: str, buckets: Optional[List[str]] = None, randomize_name: bool = False
    ) -> str:
        """Create a new storage bucket with platform-specific implementation.

        Args:
            name: Desired bucket name
            buckets: Optional list of existing buckets to check against
            randomize_name: Whether to add random suffix for uniqueness

        Returns:
            str: Name of the created bucket

        Raises:
            Platform-specific exceptions for bucket creation failures
        """

    @abstractmethod
    def download(self, bucket_name: str, key: str, filepath: str) -> None:
        """Download a file from a storage bucket.

        Args:
            bucket_name: Name of the source bucket
            key: Storage source filepath (object key)
            filepath: Local destination filepath

        Raises:
            Platform-specific exceptions for download failures
        """

    @abstractmethod
    def upload(self, bucket_name: str, filepath: str, key: str) -> None:
        """Upload a file to a storage bucket.

        Bypasses caching and directly uploads the file. Useful for uploading
        code packages to storage when required by the deployment platform.

        Args:
            bucket_name: Name of the destination bucket
            filepath: Local source filepath
            key: Storage destination filepath (object key)

        Raises:
            Platform-specific exceptions for upload failures
        """

    @abstractmethod
    def list_bucket(self, bucket_name: str, prefix: str = "") -> List[str]:
        """Retrieve list of files in a storage bucket.

        Args:
            bucket_name: Name of the bucket to list
            prefix: Optional prefix to filter objects

        Returns:
            List[str]: List of file keys in the bucket matching the prefix

        Raises:
            Platform-specific exceptions for listing failures
        """

    @abstractmethod
    def list_buckets(self, bucket_name: Optional[str] = None) -> List[str]:
        """List all storage buckets/containers, optionally filtering them with a prefix.

        Args:
            bucket_name: Optional specific bucket prefix name to check for

        Returns:
            List[str]: List of bucket names. If bucket_name is provided,
            returns [bucket_name] if it exists, empty list otherwise.

        Raises:
            Platform-specific exceptions for listing failures
        """

    @abstractmethod
    def exists_bucket(self, bucket_name: str) -> bool:
        """Check if a storage bucket/container exists.

        Args:
            bucket_name: Name of the bucket to check

        Returns:
            bool: True if bucket exists, False otherwise

        Raises:
            Platform-specific exceptions for access failures
        """

    @abstractmethod
    def clean_bucket(self, bucket_name: str) -> None:
        """Remove all objects from a storage bucket.

        Args:
            bucket_name: Name of the bucket to clean

        Raises:
            Platform-specific exceptions for deletion failures
        """

    @abstractmethod
    def remove_bucket(self, bucket: str) -> None:
        """Delete a storage bucket completely.

        The bucket must often be emptied beforehand; cleanup_buckets calls
        clean_bucket before calling this method.

        Args:
            bucket: Name of the bucket to remove

        Raises:
            Platform-specific exceptions for deletion failures
        """

    def cleanup_buckets(self, dry_run: bool = False) -> List[str]:
        """Remove all allocated object storage buckets.

        Only buckets that are both registered in the cloud resources and
        present in the storage service are touched. Each one is cleaned and
        then removed; a failure is logged and does not stop the processing of
        the remaining buckets.

        Args:
            dry_run: when true, skips actual deletion

        Returns:
            List of buckets that were deleted successfully. In dry-run mode,
            the list of buckets that would have been deleted.
        """
        deleted: List[str] = []
        dry_run_tag = "[DRY-RUN] " if dry_run else ""

        bucket_names = self._cloud_resources.get_buckets()
        # A set keeps the membership test constant-time for many buckets.
        existing_buckets = set(self.list_buckets())

        for bucket in bucket_names:
            if bucket not in existing_buckets:
                continue

            # NOTE(review): the messages say "S3" although this class is
            # platform-generic; kept unchanged in case logs are parsed.
            self.logging.info(f"{dry_run_tag}Deleting S3 bucket: {bucket}")
            if dry_run:
                deleted.append(bucket)
                continue

            try:
                self.clean_bucket(bucket)
                self.remove_bucket(bucket)
            except Exception as e:
                # Best-effort cleanup: report and keep going with other buckets.
                self.logging.error(f"Failed to delete S3 bucket {bucket}: {e}")
            else:
                # Record the bucket only after it was actually removed.
                deleted.append(bucket)

        if not dry_run:
            self.cache_client.remove_storage(self.deployment_name())
            self._cloud_resources.cleanup_deleted_buckets(self._cache_client)

        return deleted

    def benchmark_data(
        self, benchmark: str, requested_buckets: Tuple[int, int]
    ) -> Tuple[List[str], List[str]]:
        """Allocate storage prefixes for benchmark input and output data.

        Appends the requested number of prefixes to this instance's input and
        output prefix lists, then compares them against the cached storage
        configuration of the benchmark. The configuration counts as cached
        only if every prefix is present in the cache and the cache does not
        mark the input as not uploaded. When it is not cached, the benchmarks
        bucket is listed once per input prefix and the result is stored in
        `input_prefixes_files`. The cache entry is updated in all cases.

        Prefix naming format:
        - Input: "{benchmark}-{idx}-input"
        - Output: "{benchmark}-{idx}-output"

        Note that prefixes are appended to instance state, so repeated calls
        accumulate prefixes.

        Args:
            benchmark: Name of the benchmark
            requested_buckets: Tuple of (input_prefix_count, output_prefix_count)

        Returns:
            Tuple[List[str], List[str]]: Lists of (input_prefixes, output_prefixes)
        """
        # Prefixes live inside the benchmarks bucket.
        self.input_prefixes.extend(
            f"{benchmark}-{idx}-input" for idx in range(requested_buckets[0])
        )
        self.output_prefixes.extend(
            f"{benchmark}-{idx}-output" for idx in range(requested_buckets[1])
        )

        cached_storage = self.cache_client.get_storage_config(self.deployment_name(), benchmark)

        if cached_storage is None:
            self.cached = False
        else:
            cached_buckets = cached_storage["buckets"]
            # Verify that the cache knows every prefix requested so far.
            inputs_known = all(
                prefix in cached_buckets["input"] for prefix in self.input_prefixes
            )
            outputs_known = all(
                prefix in cached_buckets["output"] for prefix in self.output_prefixes
            )
            # Only an explicit False invalidates the cache entry.
            input_uploaded = cached_buckets["input_uploaded"] is not False
            self.cached = inputs_known and outputs_known and input_uploaded

        # Query the bucket if the prefixes changed or the input is not up to date.
        # The bucket is resolved only when there is something to list, so no
        # bucket gets created for a benchmark without input prefixes.
        if not self.cached and self.input_prefixes:
            benchmarks_bucket = self.get_bucket(Resources.StorageBucketType.BENCHMARKS)
            # List each prefix itself so that input_prefixes_files[i] holds
            # the files of input_prefixes[i].
            self.input_prefixes_files.extend(
                self.list_bucket(benchmarks_bucket, prefix) for prefix in self.input_prefixes
            )

        self._cache_client.update_storage(
            self.deployment_name(),
            benchmark,
            {
                "buckets": {
                    "input": self.input_prefixes,
                    "output": self.output_prefixes,
                    "input_uploaded": self.cached,
                }
            },
        )

        return self.input_prefixes, self.output_prefixes

    def get_bucket(self, bucket_type: Resources.StorageBucketType) -> str:
        """Get or create a storage bucket for the specified type.

        Returns the bucket registered in `_cloud_resources` if there is one.
        Otherwise asks `_cloud_resources` for the bucket name, reuses the
        bucket if it already exists, creates it if it does not, and registers
        the result in `_cloud_resources`.

        Args:
            bucket_type: Type of bucket to retrieve (BENCHMARKS, EXPERIMENTS, DEPLOYMENT)

        Returns:
            str: Name of the bucket for the specified type

        Raises:
            Platform-specific exceptions for bucket operations
        """
        bucket = self._cloud_resources.get_storage_bucket(bucket_type)
        if bucket is not None:
            return bucket

        description = {
            Resources.StorageBucketType.BENCHMARKS: "benchmarks",
            Resources.StorageBucketType.EXPERIMENTS: "experiment results",
            Resources.StorageBucketType.DEPLOYMENT: "code deployment",
        }

        name = self._cloud_resources.get_storage_bucket_name(bucket_type)

        # NOTE(review): existence is checked with the uncorrected name while
        # creation uses correct_name(name); these differ whenever correct_name
        # alters the name - confirm this is intended.
        if self.exists_bucket(name):
            self.logging.info(f"Using existing bucket {name} for {description[bucket_type]}")
            bucket = name
        else:
            self.logging.info(f"Initialize a new bucket for {description[bucket_type]}")
            bucket = self._create_bucket(
                self.correct_name(name),
                randomize_name=False,
            )

        self._cloud_resources.set_storage_bucket(bucket_type, bucket)
        return bucket

    @abstractmethod
    def uploader_func(self, bucket_idx: int, file: str, filepath: str) -> None:
        """Upload benchmark input data to storage with smart caching.

        Implements a utility function for uploading benchmark input data that
        respects caching preferences. Skips uploading existing files unless
        the storage client has been configured to override existing data.

        This is used by each benchmark to prepare input benchmark files.

        Args:
            bucket_idx: Index of the input prefix/bucket
            file: Name of the file to upload
            filepath: Storage destination filepath (object key).
                NOTE(review): `upload` and `download` in this class use
                `filepath` for the local path and `key` for the object key -
                confirm which meaning implementations of this method expect.

        Raises:
            Platform-specific exceptions for upload failures
        """

    def download_bucket(self, bucket_name: str, output_dir: str) -> None:
        """Download all files from a storage bucket to a local directory.

        Downloads every file listed in the bucket into `output_dir`, using the
        object key as the relative path. Files that already exist locally are
        skipped. Missing local directories, including `output_dir` itself and
        directories implied by separators in an object key (e.g.,
        'dir1/dir2/file'), are created before downloading.

        Args:
            bucket_name: Name of the bucket to download from
            output_dir: Local directory to download files to

        Raises:
            OSError: if a required local directory cannot be created
            Platform-specific exceptions for download failures
        """
        for file_key in self.list_bucket(bucket_name):
            output_file = os.path.join(output_dir, file_key)
            if os.path.exists(output_file):
                continue
            # dirname is empty for a bare file name with an empty output_dir.
            os.makedirs(os.path.dirname(output_file) or ".", exist_ok=True)
            self.download(bucket_name, file_key, output_file)