# Copyright 2020-2025 ETH Zurich and the SeBS authors. All rights reserved.
"""ScyllaDB NoSQL storage implementation for the Serverless Benchmarking Suite.
This module implements NoSQL database storage using ScyllaDB, which provides a
DynamoDB-compatible API through its Alternator interface. ScyllaDB runs in a
local Docker container and is accessed through boto3; this setup is intended
for development and testing purposes.
"""
import json
import platform
import time
from collections import defaultdict
from typing import Any, Dict, Optional, Tuple, Type, TypeVar
import boto3
from boto3.dynamodb.types import TypeSerializer
import docker
import docker.models.containers
from sebs.cache import Cache
from sebs.faas.config import Resources
from sebs.faas.nosql import NoSQLStorage
from sebs.sebs_types import NoSQLStorage as StorageType
from sebs.storage.config import ScyllaDBConfig
[docs]
class ScyllaDB(NoSQLStorage):
    """ScyllaDB implementation for DynamoDB-compatible NoSQL storage.

    This class manages a ScyllaDB instance running in a Docker container,
    providing DynamoDB-compatible NoSQL storage through ScyllaDB's Alternator
    interface. It handles table creation, data operations, and container
    lifecycle management.

    Attributes:
        _docker_client: Docker client for container management
        _storage_container: Docker container running ScyllaDB (None until the
            container is started or recovered from cached configuration)
        _cfg: ScyllaDB configuration settings
        _tables: Mapping of benchmark names to table mappings
        _serializer: DynamoDB type serializer for data conversion
        client: Boto3 DynamoDB client configured for ScyllaDB; created in the
            constructor only when the configured address is non-empty
    """

    @staticmethod
    def typename() -> str:
        """Get the qualified type name of this class.

        Returns:
            str: Full type name including deployment name
        """
        return f"{ScyllaDB.deployment_name()}.ScyllaDB"

    @staticmethod
    def deployment_name() -> str:
        """Get the deployment platform name.

        Returns:
            str: Deployment name ('scylladb')
        """
        return "scylladb"

    @property
    def config(self) -> ScyllaDBConfig:
        """Get the ScyllaDB configuration.

        Returns:
            ScyllaDBConfig: The configuration object
        """
        return self._cfg

    # The region setting is required by DynamoDB API but not used for local ScyllaDB
    SCYLLADB_REGION = "None"

    def __init__(
        self,
        docker_client: docker.DockerClient,
        cache_client: Cache,
        config: ScyllaDBConfig,
        resources: Optional[Resources] = None,
    ):
        """Initialize a ScyllaDB storage instance.

        It will initialize a boto3 client if the ScyllaDB
        address is provided in the configuration.

        Args:
            docker_client: Docker client for managing the ScyllaDB container
            cache_client: Cache client for storing storage configuration
            config: ScyllaDB configuration settings
            resources: Resources configuration (optional)
        """
        super().__init__(self.SCYLLADB_REGION, cache_client, resources)  # type: ignore
        self._docker_client = docker_client
        self._storage_container: Optional[docker.models.containers.Container] = None
        self._cfg = config

        # Map benchmark -> orig_name -> table_name
        self._tables: Dict[str, Dict[str, str]] = defaultdict(dict)
        self._serializer = TypeSerializer()

        if config.address != "":
            # Credentials and region are placeholders: the DynamoDB API requires
            # them to be present, but they are not meaningful for a local instance.
            self.client = boto3.client(
                "dynamodb",
                region_name=self.SCYLLADB_REGION,
                aws_access_key_id="None",
                aws_secret_access_key="None",
                endpoint_url=f"http://{config.address}",
            )

    def start(self) -> None:
        """Start a ScyllaDB storage container.

        Creates and runs a Docker container with ScyllaDB, configuring it with
        the specified CPU and memory resources. The container runs in detached
        mode and maps the Alternator port 8000 to the configured host port.
        The method then polls ``nodetool status`` inside the container, once
        per second for up to 30 attempts, until the command succeeds.

        Raises:
            RuntimeError: If starting the ScyllaDB container fails or if ScyllaDB
                fails to initialize within the timeout period
        """
        # We use a named Docker volume rather than a directory on the local
        # filesystem. ScyllaDB cannot work inside the container as a non-root
        # user, so a bind-mounted local directory would end up owned by root.
        # ScyllaDB issue: https://github.com/scylladb/scylladb/issues/16253
        scylladb_volume = self._cfg.data_volume
        if scylladb_volume == "":
            scylladb_volume = "scylladb-volume"
        volumes = {
            scylladb_volume: {
                "bind": "/var/lib/scylla/",
                "mode": "rw",
            }
        }

        try:
            scylladb_args = (
                f"--smp {self._cfg.cpus} "
                f"--memory {self._cfg.memory}M "
                "--overprovisioned 1 "
                "--alternator-port 8000 "
                "--alternator-write-isolation=only_rmw_uses_lwt "
            )

            self.logging.info("Starting ScyllaDB storage")
            self._storage_container = self._docker_client.containers.run(
                f"scylladb/scylla:{self._cfg.version}",
                command=scylladb_args,
                name="some-scylla",
                hostname="some-scylla",
                network_mode="bridge",
                volumes=volumes,
                ports={"8000": self._cfg.mapped_port},
                remove=self.config.remove_containers,
                stdout=True,
                stderr=True,
                detach=True,
            )
            assert self._storage_container.id is not None
            self._cfg.instance_id = self._storage_container.id

            # Wait until it boots up
            max_attempts = 30
            for _ in range(max_attempts):
                exit_code, out = self._storage_container.exec_run("nodetool status")
                if exit_code == 0:
                    self.logging.info("Started ScyllaDB successfully!")
                    break
                time.sleep(1.0)
            else:
                # Every attempt failed: report the last nodetool output and give up.
                self.logging.error("Failed to launch ScyllaDB!")
                # exec_run without stream=True always returns bytes
                assert isinstance(out, bytes)
                self.logging.error(f"Last result of nodetool status: {out.decode('utf-8')}")
                raise RuntimeError("Failed to launch ScyllaDB!")

            self.configure_connection()
        except docker.errors.APIError as e:
            self.logging.error("Starting ScyllaDB storage failed! Reason: {}".format(e))
            raise RuntimeError("Starting ScyllaDB storage unsuccessful") from e
        except Exception as e:
            # Also reached by the RuntimeError raised above when the boot times out;
            # the original error is logged and deliberately not chained.
            self.logging.error("Starting ScyllaDB storage failed! Unknown error: {}".format(e))
            raise RuntimeError("Starting ScyllaDB storage unsuccessful") from None

    def stop(self) -> None:
        """Stop the ScyllaDB container.

        Stops the running ScyllaDB container if it is known to this instance;
        otherwise only logs an error.
        """
        if self._storage_container is None:
            self.logging.error("Stopping ScyllaDB was not successful, storage container not known!")
            return

        self.logging.info(f"Stopping ScyllaDB container at {self._cfg.address}.")
        self._storage_container.stop()
        self.logging.info(f"Stopped ScyllaDB container at {self._cfg.address}.")

    def envs(self) -> Dict[str, str]:
        """Generate environment variables for ScyllaDB configuration.

        Creates environment variables that can be used by benchmark functions
        to connect to the ScyllaDB storage instance.

        Returns:
            Dict[str, str]: Environment variables for ScyllaDB connection
        """
        return {"NOSQL_STORAGE_TYPE": "scylladb", "NOSQL_STORAGE_ENDPOINT": self._cfg.address}

    def serialize(self) -> Tuple[StorageType, Dict[str, Any]]:
        """Serialize ScyllaDB configuration to a tuple.

        Returns:
            Tuple[StorageType, Dict[str, Any]]: Storage type and serialized configuration
        """
        return StorageType.SCYLLADB, self._cfg.serialize()

    T = TypeVar("T", bound="ScyllaDB")

    @staticmethod
    def _deserialize(
        cached_config: ScyllaDBConfig, cache_client: Cache, resources: Resources, obj_type: Type[T]
    ) -> T:
        """Deserialize a ScyllaDB instance from cached configuration with custom type.

        Creates a new instance of the specified class type from cached configuration
        data. This allows platform-specific versions to be deserialized correctly
        while sharing the core implementation.

        FIXME: is this still needed? It looks like we stopped using
        platform-specific implementations.

        Args:
            cached_config: Cached ScyllaDB configuration
            cache_client: Cache client
            resources: Resources configuration
            obj_type: Type of object to create (a ScyllaDB subclass)

        Returns:
            T: Deserialized instance of the specified type

        Raises:
            RuntimeError: If the storage container does not exist
        """
        docker_client = docker.from_env()
        obj = obj_type(docker_client, cache_client, cached_config, resources)

        instance_id = cached_config.instance_id
        if instance_id:
            # Re-attach to the container recorded in the cached configuration.
            try:
                obj._storage_container = docker_client.containers.get(instance_id)
            except docker.errors.NotFound as e:
                raise RuntimeError(f"Storage container {instance_id} does not exist!") from e
        else:
            obj._storage_container = None
        return obj

    @staticmethod
    def deserialize(
        cached_config: ScyllaDBConfig, cache_client: Cache, resources: Resources
    ) -> "ScyllaDB":
        """Deserialize a ScyllaDB instance from cached configuration.

        Creates a new ScyllaDB instance from cached configuration data.

        Args:
            cached_config: Cached ScyllaDB configuration
            cache_client: Cache client
            resources: Resources configuration

        Returns:
            ScyllaDB: Deserialized ScyllaDB instance
        """
        return ScyllaDB._deserialize(cached_config, cache_client, resources, ScyllaDB)

    def retrieve_cache(self, benchmark: str) -> bool:
        """Retrieve cached table configuration for a benchmark.

        Checks if table configuration for the given benchmark is already loaded
        in memory, and if not, attempts to load it from the cache.

        Args:
            benchmark: Name of the benchmark

        Returns:
            bool: True if table configuration was found, False otherwise
        """
        if benchmark in self._tables:
            return True

        cached_storage = self.cache_client.get_nosql_config(self.deployment_name(), benchmark)
        if cached_storage is None:
            return False

        self._tables[benchmark] = cached_storage["tables"]
        return True

    def update_cache(self, benchmark: str) -> None:
        """Update the cache with table configuration for a benchmark.

        Stores the table configuration for the specified benchmark in the cache
        for future retrieval.

        Args:
            benchmark: Name of the benchmark
        """
        self._cache_client.update_nosql(
            self.deployment_name(),
            benchmark,
            {
                "tables": self._tables[benchmark],
            },
        )

    def get_tables(self, benchmark: str) -> Dict[str, str]:
        """Get the table name mappings for a benchmark.

        Args:
            benchmark: Name of the benchmark

        Returns:
            Dict[str, str]: Mapping from original table names to actual table names
        """
        # NOTE: _tables is a defaultdict, so looking up an unknown benchmark
        # inserts (and returns) an empty mapping for it.
        return self._tables[benchmark]

    def _get_table_name(self, benchmark: str, table: str) -> Optional[str]:
        """Get the actual table name for a benchmark's logical table name.

        Args:
            benchmark: Name of the benchmark
            table: Logical table name

        Returns:
            Optional[str]: Actual table name or None if not found
        """
        # .get() is used on purpose: unlike indexing, it does not insert
        # an empty entry into the defaultdict for unknown benchmarks.
        return self._tables.get(benchmark, {}).get(table)

    def write_to_table(
        self,
        benchmark: str,
        table: str,
        data: Dict[str, Any],
        primary_key: Tuple[str, str],
        secondary_key: Optional[Tuple[str, str]] = None,
    ) -> None:
        """Write data to a DynamoDB table in ScyllaDB.

        Serializes the data using DynamoDB type serialization and writes it
        to the specified table with the provided primary and optional secondary keys.
        The caller's ``data`` dictionary is left unmodified.

        Args:
            benchmark: Name of the benchmark
            table: Logical table name
            data: Data to write to the table
            primary_key: Tuple of (key_name, key_value) for the primary key
            secondary_key: Optional tuple of (key_name, key_value) for the secondary key

        Raises:
            AssertionError: If the table name is not found
        """
        table_name = self._get_table_name(benchmark, table)
        assert table_name is not None

        # Add the key attributes to a shallow copy so that the caller's
        # dictionary is not changed as a side effect of the write.
        item = dict(data)
        for key in (primary_key, secondary_key):
            if key is not None:
                item[key[0]] = key[1]

        serialized_data = {k: self._serializer.serialize(v) for k, v in item.items()}
        self.client.put_item(TableName=table_name, Item=serialized_data)

    def create_table(
        self, benchmark: str, name: str, primary_key: str, secondary_key: Optional[str] = None
    ) -> str:
        """Create a DynamoDB table in ScyllaDB.

        Creates a new DynamoDB table with the specified primary key and optional
        secondary key. The table name is constructed to be unique across benchmarks
        and resource groups. If a table with that name already exists, it is reused.
        In both cases the mapping from the logical name to the actual table name
        is recorded for the benchmark.

        Note: Unlike cloud providers with hierarchical database structures,
        ScyllaDB requires unique table names at the cluster level.

        Note: PAY_PER_REQUEST billing mode has no effect here.

        Args:
            benchmark: Name of the benchmark
            name: Logical table name
            primary_key: Name of the primary key attribute
            secondary_key: Optional name of the secondary key attribute

        Returns:
            str: The actual table name that was created or reused

        Raises:
            RuntimeError: If table creation fails for unknown reasons
        """
        table_name = f"sebs-benchmarks-{self._cloud_resources.resources_id}-{benchmark}-{name}"

        # Both keys are declared as string attributes.
        definitions = [{"AttributeName": primary_key, "AttributeType": "S"}]
        key_schema = [{"AttributeName": primary_key, "KeyType": "HASH"}]
        if secondary_key is not None:
            definitions.append({"AttributeName": secondary_key, "AttributeType": "S"})
            key_schema.append({"AttributeName": secondary_key, "KeyType": "RANGE"})

        try:
            ret = self.client.create_table(
                TableName=table_name,
                BillingMode="PAY_PER_REQUEST",
                AttributeDefinitions=definitions,  # type: ignore
                KeySchema=key_schema,  # type: ignore
            )
        except self.client.exceptions.ResourceInUseException as e:
            if "already exists" in e.response["Error"]["Message"]:
                self.logging.info(
                    f"Using existing DynamoDB table {table_name} for benchmark {benchmark}"
                )
                self._tables[benchmark][name] = table_name
                # Return the actual table name, matching the creation path.
                return table_name
            raise RuntimeError(f"Creating DynamoDB failed, unknown reason! Error: {e}") from e

        if ret["TableDescription"]["TableStatus"] == "CREATING":
            self.logging.info(f"Waiting for creation of DynamoDB table {name}")
            waiter = self.client.get_waiter("table_exists")
            # The waiter must poll the table that was actually created,
            # not the benchmark's logical table name.
            waiter.wait(TableName=table_name)

        self.logging.info(f"Created DynamoDB table {name} for benchmark {benchmark}")
        self._tables[benchmark][name] = table_name
        return ret["TableDescription"]["TableName"]

    def clear_table(self, name: str) -> str:
        """Clear all data from a table.

        Args:
            name: Name of the table to clear

        Returns:
            str: Table name

        Raises:
            NotImplementedError: This method is not yet implemented
        """
        raise NotImplementedError()

    def remove_table(self, name: str) -> str:
        """Remove a table completely.

        Args:
            name: Name of the table to remove

        Returns:
            str: Table name

        Raises:
            NotImplementedError: This method is not yet implemented
        """
        raise NotImplementedError()