# Copyright 2020-2025 ETH Zurich and the SeBS authors. All rights reserved.
"""Google Cloud Datastore/Firestore implementation for SeBS NoSQL storage.
This module provides NoSQL database functionality using Google Cloud Firestore
in Datastore mode. It manages database allocation, table creation, and data
operations for benchmarks requiring NoSQL storage capabilities.
To create databases, we use the gcloud CLI instance since there is no API
that we could access directly.
Classes:
BenchmarkResources: Resource configuration for benchmark databases
Datastore: NoSQL storage implementation using Google Cloud Firestore
Example:
Using Datastore for benchmark NoSQL operations:
datastore = Datastore(cli_instance, cache, resources, region)
table_name = datastore.create_table("benchmark-name", "user-data", "user_id")
datastore.write_to_table("benchmark-name", table_name, data, primary_key, secondary_key)
"""
from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional
from sebs.cache import Cache
from sebs.faas.config import Resources
from sebs.faas.nosql import NoSQLStorage
from sebs.gcp.cli import GCloudCLI
from google.cloud import datastore
[docs]
@dataclass
class BenchmarkResources:
"""Resource configuration for a benchmark's Datastore database.
Tracks the allocated database name, table kinds, and client instance
for a specific benchmark's NoSQL storage requirements.
Attributes:
database: Name of the Firestore database in Datastore mode
kinds: List of entity kinds (table equivalents) in the database
database_client: Optional Datastore client instance (allocated dynamically)
"""
database: str
kinds: List[str]
# We allocate this dynamically - ignore when caching
database_client: Optional[datastore.Client] = None
[docs]
def serialize(self) -> Dict:
"""Serialize benchmark resources for cache storage.
Returns:
Dictionary containing database name and kinds list
"""
return {"database": self.database, "kinds": self.kinds}
[docs]
@staticmethod
def deserialize(config: Dict) -> "BenchmarkResources":
"""Deserialize benchmark resources from cached configuration.
Args:
config: Dictionary containing cached resource configuration
Returns:
BenchmarkResources instance with database and kinds
"""
return BenchmarkResources(database=config["database"], kinds=config["kinds"])
[docs]
class Datastore(NoSQLStorage):
"""Google Cloud Firestore/Datastore implementation for NoSQL storage.
Provides NoSQL database functionality using Google Cloud Firestore in
Datastore mode. Manages database allocation, entity kind creation, and
data operations for benchmarks requiring NoSQL capabilities.
Attributes:
_cli_instance: gcloud CLI interface for database management
_region: GCP region for database allocation
_benchmark_resources: Mapping of benchmarks to their database resources
"""
[docs]
@staticmethod
def typename() -> str:
"""Get the type name for this NoSQL storage implementation.
Returns:
Type name string for GCP Datastore
"""
return "GCP.Datastore"
[docs]
@staticmethod
def deployment_name() -> str:
"""Get the deployment name for this NoSQL storage implementation.
Returns:
Deployment name string 'gcp'
"""
return "gcp"
def __init__(
self, cli_instance: GCloudCLI, cache_client: Cache, resources: Resources, region: str
) -> None:
"""Initialize Datastore NoSQL storage manager.
Args:
cli_instance: gcloud CLI interface for database operations
cache_client: Cache instance for storing resource state
resources: Resource configuration
region: GCP region for database allocation
"""
super().__init__(region, cache_client, resources)
self._cli_instance = cli_instance
self._region = region
# Mapping: benchmark -> Datastore database
self._benchmark_resources: Dict[str, BenchmarkResources] = {}
[docs]
def get_tables(self, benchmark: str) -> Dict[str, str]:
"""Get table name mappings for a benchmark.
GCP Datastore requires no table mappings as the entity kind name
is the same as the benchmark table name.
Args:
benchmark: Name of the benchmark
Returns:
Empty dictionary (no mappings needed for GCP)
"""
return {}
def _get_table_name(self, benchmark: str, table: str) -> Optional[str]:
"""Get the actual table name for a benchmark table.
In Datastore's case, the table alias is the kind name if it's registered
for the benchmark.
Args:
benchmark: Name of the benchmark
table: Logical table name
Returns:
Table name if it exists in benchmark resources, None otherwise
"""
if benchmark not in self._benchmark_resources:
return None
if table not in self._benchmark_resources[benchmark].kinds:
return None
return table
[docs]
def retrieve_cache(self, benchmark: str) -> bool:
"""Retrieve benchmark resources from cache.
Args:
benchmark: Name of the benchmark to retrieve resources for
Returns:
True if resources were found in cache, False otherwise
"""
if benchmark in self._benchmark_resources:
return True
cached_storage = self.cache_client.get_nosql_config(self.deployment_name(), benchmark)
if cached_storage is not None:
self._benchmark_resources[benchmark] = BenchmarkResources.deserialize(cached_storage)
return True
return False
[docs]
def update_cache(self, benchmark: str) -> None:
"""Update cache with current benchmark resources.
Args:
benchmark: Name of the benchmark to cache resources for
"""
self._cache_client.update_nosql(
self.deployment_name(), benchmark, self._benchmark_resources[benchmark].serialize()
)
[docs]
def benchmark_database(self, benchmark: str) -> str:
"""Get the database name for a benchmark.
Args:
benchmark: Name of the benchmark
Returns:
Database name for the benchmark's NoSQL resources
"""
return self._benchmark_resources[benchmark].database
[docs]
def write_to_table(
self,
benchmark: str,
table: str,
data: Dict,
primary_key: Tuple[str, str],
secondary_key: Optional[Tuple[str, str]] = None,
) -> None:
"""Write data to a Datastore entity kind (table).
Args:
benchmark: Name of the benchmark
table: Name of the table (entity kind)
data: Dictionary of data to write
primary_key: Primary key tuple (name, value)
secondary_key: Secondary key tuple (name, value) - required for GCP
Raises:
AssertionError: If secondary_key is None (required for GCP)
"""
res = self._benchmark_resources[benchmark]
table_name = self._get_table_name(benchmark, table)
# FIXME: support both options
assert secondary_key is not None
if res.database_client is None:
res.database_client = datastore.Client(database=res.database)
parent_key = res.database_client.key(secondary_key[0], secondary_key[1])
key = res.database_client.key(
# kind determines the table
table_name,
# main ID key
secondary_key[1],
# organization key
parent=parent_key,
)
val = datastore.Entity(key=key)
val.update(data)
res.database_client.put(val)
[docs]
def create_table(
self, benchmark: str, name: str, primary_key: str, _: Optional[str] = None
) -> str:
"""Create a new entity kind (table) in Datastore.
Creates a new Firestore database in Datastore mode if needed using gloud CLI.
Datastore kinds are schemaless and created implicitly when an entity of that
kind is first written. This method primarily ensures the database exists and
registers the kind name for the benchmark. The `primary_key` is noted but
not directly used to create schema for the kind itself, as Datastore is schemaless.
Args:
benchmark: Name of the benchmark
name: Name of the entity kind (table) to create
primary_key: Primary key field name
_: Unused parameter for compatibility
Returns:
Name of the created entity kind
Raises:
RuntimeError: If database operations fail
"""
benchmark_resources = self._benchmark_resources.get(benchmark, None)
if benchmark_resources is not None and name in benchmark_resources.kinds:
self.logging.info(f"Using cached Datastore kind {name}")
return name
"""
No data for this benchmark -> we need to allocate a new Datastore database.
"""
if benchmark_resources is None:
database_name = f"sebs-benchmarks-{self._cloud_resources.resources_id}-{benchmark}"
try:
self._cli_instance.execute(
"gcloud firestore databases describe "
f" --database='{database_name}' "
" --format='json'"
)
except RuntimeError as e:
if "NOT_FOUND" in str(e):
"""
Allocate a new Firestore database, in datastore mode
"""
self.logging.info(f"Allocating a new Firestore database {database_name}")
self._cli_instance.execute(
"gcloud firestore databases create "
f" --database='{database_name}' "
f" --location={self.region} "
f" --type='datastore-mode' "
)
self.logging.info(f"Allocated a new Firestore database {database_name}")
else:
self.logging.error("Couldn't query Datastore instances!")
self.logging.error(e)
raise RuntimeError("Couldn't query Datastore instances!")
db_client = datastore.Client(database=database_name)
benchmark_resources = BenchmarkResources(
database=database_name, kinds=[], database_client=db_client
)
self._benchmark_resources[benchmark] = benchmark_resources
benchmark_resources.kinds.append(name)
return name
[docs]
def clear_table(self, name: str) -> str:
"""Clear all entities from a table.
Args:
name: Name of the table to clear
Returns:
Table name
Raises:
NotImplementedError: This method is not yet implemented
"""
raise NotImplementedError()
[docs]
def remove_table(self, name: str) -> str:
"""Remove a table from the database.
Args:
name: Name of the table to remove
Returns:
Table name
Raises:
NotImplementedError: This method is not yet implemented
"""
raise NotImplementedError()