Source code for sebs.gcp.config

# Copyright 2020-2025 ETH Zurich and the SeBS authors. All rights reserved.
"""Configuration classes for Google Cloud Platform (GCP) integration.

This module provides configuration classes for GCP,
including credentials management, resource allocation, and cloud region configuration.
It handles authentication through service account JSON files and manages project-specific
settings required for Cloud Functions deployment and execution.

Classes:
    GCPCredentials: Handles authentication and project identification
    GCPResources: Manages allocated cloud resources
    GCPConfig: Main configuration container for GCP deployment

Example:
    Basic GCP configuration setup:

        credentials = GCPCredentials("/path/to/service-account.json")
        resources = GCPResources()
        config = GCPConfig(credentials, resources)
"""
from __future__ import annotations

import json
import os
from typing import cast, Any, Dict, List, Optional, Tuple
import time
from googleapiclient.errors import HttpError

from sebs.cache import Cache
from sebs.faas.config import Config, Credentials, Resources
from sebs.utils import LoggingHandlers


[docs] class GCPCredentials(Credentials): """Credentials manager for Google Cloud Platform authentication. Handles authentication to GCP services using service account JSON files. Automatically extracts project ID from credentials and manages environment variable setup for Google Cloud SDK authentication. The class supports multiple credential sources in priority order: 1. User-provided credentials file path 2. GOOGLE_APPLICATION_CREDENTIALS environment variable 3. GCP_SECRET_APPLICATION_CREDENTIALS environment variable Attributes: _gcp_credentials: Path to the service account JSON file _project_id: GCP project ID extracted from credentials """ def __init__(self, gcp_credentials: str) -> None: """Initialize GCP credentials with service account file. Args: gcp_credentials: Path to the GCP service account JSON file Raises: FileNotFoundError: If the credentials file doesn't exist json.JSONDecodeError: If the credentials file is not valid JSON KeyError: If the credentials file doesn't contain project_id """ super().__init__() self._gcp_credentials = gcp_credentials gcp_data = json.load(open(self._gcp_credentials, "r")) self._project_id = gcp_data["project_id"] @property def gcp_credentials(self) -> str: """Get the path to the GCP service account credentials file. Returns: Path to the service account JSON file """ return self._gcp_credentials @property def project_name(self) -> str: """Get the GCP project ID from the credentials. Returns: The GCP project ID string """ return self._project_id
[docs] @staticmethod def initialize(gcp_credentials: str) -> "GCPCredentials": """Create a new GCPCredentials instance. Args: gcp_credentials: Path to the GCP service account JSON file Returns: A new GCPCredentials instance """ return GCPCredentials(gcp_credentials)
[docs] @staticmethod def deserialize(config: Dict, cache: Cache, handlers: LoggingHandlers) -> Credentials: """Deserialize GCP credentials from configuration and cache. Loads credentials from multiple sources in priority order: 1. User-provided config with credentials-json path 2. GOOGLE_APPLICATION_CREDENTIALS environment variable 3. GCP_SECRET_APPLICATION_CREDENTIALS environment variable Sets the `GOOGLE_APPLICATION_CREDENTIALS` environment variable if credentials are loaded from SeBS config or SeBS-specific environment variables. Args: config: Configuration dictionary potentially containing credentials cache: Cache instance for storing/retrieving credentials handlers: Logging handlers for error reporting Returns: Initialized GCPCredentials instance Raises: RuntimeError: If no valid credentials are found or if project ID mismatch occurs between cache and new credentials """ cached_config = cache.get_config("gcp") ret: GCPCredentials project_id: Optional[str] = None # Load cached values if cached_config and "credentials" in cached_config: project_id = cached_config["credentials"]["project_id"] # Check for new config if "credentials" in config and "credentials-json" in config["credentials"]: ret = GCPCredentials.initialize(config["credentials"]["credentials-json"]) os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = ret.gcp_credentials # Look for default GCP credentials elif "GOOGLE_APPLICATION_CREDENTIALS" in os.environ: ret = GCPCredentials(os.environ["GOOGLE_APPLICATION_CREDENTIALS"]) # Look for our environment variables elif "GCP_SECRET_APPLICATION_CREDENTIALS" in os.environ: ret = GCPCredentials(os.environ["GCP_SECRET_APPLICATION_CREDENTIALS"]) os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = ret.gcp_credentials else: raise RuntimeError( "GCP login credentials are missing! Please set the path to .json " "with cloud credentials in config or in the GCP_SECRET_APPLICATION_CREDENTIALS " "environmental variable" ) ret.logging_handlers = handlers if project_id is not None and project_id != ret._project_id: ret.logging.error( f"The project id {ret._project_id} from provided " f"credentials is different from the ID {project_id} in the cache! " "Please change your cache directory or create a new one!" ) raise RuntimeError( f"GCP login credentials do not match the project {project_id} in cache!" ) return ret
[docs] def serialize(self) -> Dict: """Serialize credentials to dictionary for cache storage. Only stores the project_id, as the path to credentials might change or be environment-dependent. It also avoids any potential security issues. Returns: Dictionary containing project_id for cache storage """ out = {"project_id": self._project_id} return out
[docs] def update_cache(self, cache: Cache) -> None: """Update the cache with current GCP project id. Args: cache: Cache instance to update with project ID """ cache.update_config(val=self._project_id, keys=["gcp", "credentials", "project_id"])
[docs] class GCPFunctionGen1Config: """Configuration for Cloud Functions Gen1 deployments.""" def __init__(self, min_instances: int = 0, max_instances: int = 20): """Initialize Gen1 scaling settings. Args: min_instances: Minimum number of instances to keep warm. max_instances: Maximum number of instances allowed. """ self.min_instances = min_instances self.max_instances = max_instances
[docs] def serialize(self) -> Dict: """Serialize the Gen1 configuration. Returns: Dictionary containing the serialized Gen1 settings. """ return {"min-instances": self.min_instances, "max-instances": self.max_instances}
[docs] @staticmethod def deserialize(dct: Dict) -> "GCPFunctionGen1Config": """Deserialize a Gen1 configuration. Args: dct: Serialized Gen1 configuration. Returns: Deserialized Gen1 configuration object. """ return GCPFunctionGen1Config( min_instances=dct.get("min-instances", 0), max_instances=dct.get("max-instances", 20) )
def __eq__(self, other: object) -> bool: """Compare two Gen1 configurations. Args: other: Configuration object to compare against. Returns: True if both objects have the same Gen1 settings, otherwise False. """ if not isinstance(other, GCPFunctionGen1Config): return False return ( self.min_instances == other.min_instances and self.max_instances == other.max_instances )
[docs] class GCPFunctionGen2Config: """Configuration for Cloud Functions Gen2 deployments.""" def __init__( self, vcpus: int, gcp_concurrency: int, worker_concurrency: int, worker_threads: int, min_instances: int, max_instances: int, cpu_boost: bool, cpu_throttle: bool, ): """Initialize Gen2 runtime and scaling settings. Args: vcpus: Number of virtual CPUs assigned to the function. gcp_concurrency: Cloud Functions concurrency setting. worker_concurrency: Worker count used by the runtime server. worker_threads: Thread count used by the runtime server. min_instances: Minimum number of warm instances. max_instances: Maximum number of instances allowed. cpu_boost: Whether startup CPU boost is enabled. cpu_throttle: Whether CPU throttling is enabled when idle. """ self.vcpus = vcpus self.gcp_concurrency = gcp_concurrency self.worker_concurrency = worker_concurrency self.worker_threads = worker_threads self.min_instances = min_instances self.max_instances = max_instances self.cpu_boost = cpu_boost self.cpu_throttle = cpu_throttle
[docs] def serialize(self) -> Dict: """Serialize the Gen2 configuration. Returns: Dictionary containing the serialized Gen2 settings. """ return { "vcpus": self.vcpus, "gcp-concurrency": self.gcp_concurrency, "worker-concurrency": self.worker_concurrency, "worker-threads": self.worker_threads, "min-instances": self.min_instances, "max-instances": self.max_instances, "cpu-boost": self.cpu_boost, "cpu-throttle": self.cpu_throttle, }
[docs] @staticmethod def deserialize(dct: Dict) -> GCPFunctionGen2Config: """Deserialize a Gen2 configuration. Args: dct: Serialized Gen2 configuration. Returns: Deserialized Gen2 configuration object. """ return GCPFunctionGen2Config( vcpus=dct["vcpus"], gcp_concurrency=dct["gcp-concurrency"], worker_concurrency=dct["worker-concurrency"], worker_threads=dct["worker-threads"], min_instances=dct["min-instances"], max_instances=dct["max-instances"], cpu_boost=dct["cpu-boost"], cpu_throttle=dct["cpu-throttle"], )
def __eq__(self, other: object) -> bool: """Compare two Gen2 configurations. Args: other: Configuration object to compare against. Returns: True if both objects have the same Gen2 settings, otherwise False. """ if not isinstance(other, GCPFunctionGen2Config): return False return ( self.vcpus == other.vcpus and self.gcp_concurrency == other.gcp_concurrency and self.worker_concurrency == other.worker_concurrency and self.worker_threads == other.worker_threads and self.min_instances == other.min_instances and self.max_instances == other.max_instances and self.cpu_boost == other.cpu_boost and self.cpu_throttle == other.cpu_throttle )
[docs] class GCPContainerConfig(GCPFunctionGen2Config): """Configuration for Cloud Run container deployments.""" def __init__( self, environment: str, vcpus: int, gcp_concurrency: int, worker_concurrency: int, worker_threads: int, min_instances: int, max_instances: int, cpu_boost: bool, cpu_throttle: bool, ): """Initialize Cloud Run container settings. Args: environment: Cloud Run execution environment name. vcpus: Number of virtual CPUs assigned to the container. gcp_concurrency: Cloud Run request concurrency limit. worker_concurrency: Worker count used by the runtime server. worker_threads: Thread count used by the runtime server. min_instances: Minimum number of warm instances. max_instances: Maximum number of instances allowed. cpu_boost: Whether startup CPU boost is enabled. cpu_throttle: Whether CPU throttling is enabled when idle. """ super().__init__( vcpus, gcp_concurrency, worker_concurrency, worker_threads, min_instances, max_instances, cpu_boost, cpu_throttle, ) self.environment = environment
[docs] def serialize(self) -> Dict: """Serialize the container configuration. Returns: Dictionary containing the serialized container settings. """ return {**super().serialize(), "environment": self.environment}
[docs] @staticmethod def deserialize(dct: Dict) -> GCPContainerConfig: """Deserialize a container configuration. Args: dct: Serialized container configuration. Returns: Deserialized container configuration object. """ return GCPContainerConfig( environment=dct["environment"], vcpus=dct["vcpus"], gcp_concurrency=dct["gcp-concurrency"], worker_concurrency=dct["worker-concurrency"], worker_threads=dct["worker-threads"], min_instances=dct["min-instances"], max_instances=dct["max-instances"], cpu_boost=dct["cpu-boost"], cpu_throttle=dct["cpu-throttle"], )
def __eq__(self, other: object) -> bool: """Compare two container configurations. Args: other: Configuration object to compare against. Returns: True if both objects have the same container settings, otherwise False. """ if not isinstance(other, GCPContainerConfig): return False return ( self.environment == other.environment and self.vcpus == other.vcpus and self.gcp_concurrency == other.gcp_concurrency and self.worker_concurrency == other.worker_concurrency and self.worker_threads == other.worker_threads and self.min_instances == other.min_instances and self.max_instances == other.max_instances and self.cpu_boost == other.cpu_boost and self.cpu_throttle == other.cpu_throttle )
[docs] class GCPConfiguration: """User-provided configuration of workloads on the GCP. Currently, this class primarily inherits functionality from the base `Resources` class, as we do not need more GCP-specific resources beyond standard storage buckets. Attributes: Inherits all attributes from the base Resources class """ def __init__(self) -> None: """Initialize default GCP deployment configuration.""" self._function_gen1_config: GCPFunctionGen1Config self._function_gen2_config: GCPFunctionGen2Config self._container_config: GCPContainerConfig
[docs] @staticmethod def initialize(config: GCPConfiguration, dct: Dict) -> GCPConfiguration: """Populate a configuration object from serialized data. Args: config: Configuration object to populate. dct: Serialized deployment configuration. Returns: The populated configuration object. """ config._function_gen1_config = GCPFunctionGen1Config.deserialize(dct["function-gen1"]) config._function_gen2_config = GCPFunctionGen2Config.deserialize(dct["function-gen2"]) config._container_config = GCPContainerConfig.deserialize(dct["container"]) return config
[docs] def serialize(self) -> Dict: """Serialize resources to dictionary for cache storage. Returns: Dictionary representation of resources for cache storage """ out: Dict[str, Any] = {} out["function-gen1"] = self._function_gen1_config.serialize() out["function-gen2"] = self._function_gen2_config.serialize() out["container"] = self._container_config.serialize() return out
@property def function_gen1_config(self) -> GCPFunctionGen1Config: """Get the Gen1 deployment configuration. Returns: Gen1 deployment configuration object. """ return self._function_gen1_config @property def function_gen2_config(self) -> GCPFunctionGen2Config: """Get the Gen2 deployment configuration. Returns: Gen2 deployment configuration object. """ return self._function_gen2_config @property def container_config(self) -> GCPContainerConfig: """Get the Cloud Run container configuration. Returns: Cloud Run container deployment configuration object. """ return self._container_config
[docs] class GCPResources(Resources): """Resource manager for serverless resources on Google Cloud Platform. Currently, this class primarily inherits functionality from the base `Resources` class, as we do not need more GCP-specific resources beyond standard storage buckets. Attributes: Inherits all attributes from the base Resources class """ def __init__(self) -> None: """Initialize GCP resources manager.""" super().__init__(name="gcp") self._container_repository: Optional[str] = None
[docs] @staticmethod def initialize(res: Resources, dct: Dict) -> "GCPResources": """Initialize GCP resources from a dictionary configuration. Args: res: Base Resources instance to initialize dct: Dictionary containing resource configuration Returns: Initialized GCPResources instance """ ret = cast(GCPResources, res) super(GCPResources, GCPResources).initialize(ret, dct) return ret
[docs] def serialize(self) -> Dict: """Serialize resources to dictionary for cache storage. Returns: Dictionary representation of resources for cache storage """ out = super().serialize() out["container_repository"] = self._container_repository return out
[docs] @staticmethod def deserialize(config: Dict, cache: Cache, handlers: LoggingHandlers) -> "Resources": """Deserialize GCP resources from configuration and cache. Loads resources from cache if available, otherwise initializes from user configuration or creates empty resource set. Args: config: Configuration dictionary potentially containing resources cache: Cache instance for storing/retrieving resources handlers: Logging handlers for status reporting Returns: Initialized GCPResources instance """ cached_config = cache.get_config("gcp") ret = GCPResources() if cached_config and "resources" in cached_config: GCPResources.initialize(ret, cached_config["resources"]) ret.logging_handlers = handlers ret.logging.info("Using cached resources for GCP") else: if "resources" in config: GCPResources.initialize(ret, config["resources"]) ret.logging_handlers = handlers ret.logging.info("No cached resources for GCP found, using user configuration.") else: GCPResources.initialize(ret, {}) ret.logging_handlers = handlers ret.logging.info("No resources for GCP found, initialize!") return ret
[docs] def update_cache(self, cache: Cache) -> None: """Update the cache with current resource information. Args: cache: Cache instance to update with resource data """ super().update_cache(cache)
@property def container_repository(self) -> str: """Get the Artifact Registry repository name for containers.""" assert self._container_repository is not None, "Container repository has not been set yet!" return self._container_repository
[docs] def check_container_repository_exists(self, config: Config, ar_client): """Check whether the configured container repository exists.""" try: credentials = cast(GCPCredentials, config.credentials) parent = f"projects/{credentials.project_name}/locations/{config.region}" repo_full_name = f"{parent}/repositories/{self._container_repository}" self.logging.info("Checking if container repository exists...") ar_client.projects().locations().repositories().get(name=repo_full_name).execute() return True except HttpError as e: if e.resp.status == 404: self.logging.info("Container repository does not exist.") return False else: raise e
[docs] def create_container_repository(self, ar_client, parent): """Create the Artifact Registry repository for benchmark containers.""" request_body = {"format": "DOCKER", "description": "Container repository for SEBS"} self._container_repository = f"sebs-benchmarks-{self._resources_id}" operation = ( ar_client.projects() .locations() .repositories() .create(parent=parent, body=request_body, repositoryId=self._container_repository) .execute() ) attempt = 0 max_retries = 10 while attempt < max_retries: # Operations for AR are global or location specific op_name = operation["name"] op = ar_client.projects().locations().operations().get(name=op_name).execute() if op.get("done"): if "error" in op: raise Exception(f"Failed to create repo: {op['error']}") self.logging.info("Repository created successfully.") break time.sleep(2) attempt += 1 if attempt == max_retries: raise TimeoutError("Timed out waiting for container repository creation.")
[docs] def get_container_repository(self, config: Config, ar_client): """Get or create the Artifact Registry repository for container images.""" if self._container_repository is not None: return self._container_repository self._container_repository = f"sebs-benchmarks-{self._resources_id}" if self.check_container_repository_exists(config, ar_client): return self._container_repository credentials = cast(GCPCredentials, config.credentials) parent = f"projects/{credentials.project_name}/locations/{config.region}" self.create_container_repository(ar_client, parent) return self._container_repository
[docs] class GCPConfig(Config): """Main configuration class for Google Cloud Platform deployment. Combines credentials and resources into a complete configuration for GCP serverless function deployment. Manages cloud region settings, authentication, and resource allocation for the benchmarking suite. This class handles serialization/deserialization for cache persistence and provides validation for configuration consistency across sessions. Attributes: _project_name: GCP project identifier _region: GCP region for resource deployment _credentials: GCP authentication credentials _resources: Allocated GCP resources """ _project_name: str def __init__(self, credentials: GCPCredentials, resources: GCPResources) -> None: """Initialize GCP configuration with credentials and resources. Args: credentials: GCP authentication credentials resources: GCP resource allocation settings """ super().__init__(name="gcp") self._credentials = credentials self._resources = resources self._deployment_config = GCPConfiguration() @property def region(self) -> str: """Get the GCP region for resource deployment. Returns: GCP region identifier (e.g., 'us-central1') """ return self._region @property def project_name(self) -> str: """Get the GCP project name from credentials. Returns: GCP project identifier string """ return self.credentials.project_name @property def credentials(self) -> GCPCredentials: """Get the GCP credentials instance. Returns: GCP authentication credentials """ return self._credentials @property def resources(self) -> GCPResources: """Get the GCP resources instance. Returns: GCP resource allocation settings """ return self._resources @property def deployment_config(self) -> GCPConfiguration: """Get the deployment configuration for GCP workloads. Returns: GCPConfiguration instance containing workload deployment settings """ return self._deployment_config
[docs] @staticmethod def deserialize(config: Dict, cache: Cache, handlers: LoggingHandlers) -> "Config": """Deserialize GCP configuration from dictionary and cache. Loads complete GCP configuration including credentials and resources. Validates consistency between cached and provided configuration values, updating cache with new user-provided values when they differ. Args: config: Configuration dictionary with GCP settings cache: Cache instance for storing/retrieving configuration handlers: Logging handlers for status reporting Returns: Initialized GCPConfig instance """ cached_config = cache.get_config("gcp") credentials = cast(GCPCredentials, GCPCredentials.deserialize(config, cache, handlers)) resources = cast(GCPResources, GCPResources.deserialize(config, cache, handlers)) config_obj = GCPConfig(credentials, resources) config_obj.logging_handlers = handlers if cached_config: config_obj.logging.info("Loading cached config for GCP") GCPConfig.initialize(config_obj, cached_config) else: config_obj.logging.info("Using user-provided config for GCP") GCPConfig.initialize(config_obj, config) # deployment configuration is never loaded from cache - always fresh! if "configuration" in config: GCPConfiguration.initialize(config_obj.deployment_config, config["configuration"]) # mypy makes a mistake here updated_keys: List[Tuple[str, List[str]]] = [("region", ["gcp", "region"])] # type: ignore # for each attribute here, check if its version is different than the one provided by # user; if yes, then update the value for config_key, keys in updated_keys: old_value = getattr(config_obj, config_key) # ignore empty values if getattr(config_obj, config_key) != config[config_key] and config[config_key]: config_obj.logging.info( f"Updating cached key {config_key} with {old_value} " f"to user-provided value {config[config_key]}." ) setattr(config_obj, f"_{config_key}", config[config_key]) cache.update_config(val=getattr(config_obj, config_key), keys=keys) return config_obj
[docs] @staticmethod def initialize(cfg: Config, dct: Dict) -> None: """Initialize GCP configuration from dictionary. Args: cfg: Config instance to initialize (will be cast to GCPConfig) dct: Dictionary containing configuration values including region """ config = cast(GCPConfig, cfg) config._region = dct["region"]
[docs] def serialize(self) -> Dict: """Serialize configuration to dictionary for cache storage. Returns: Dictionary containing complete GCP configuration including name, region, credentials, and resources """ out = { "name": "gcp", "region": self._region, "credentials": self._credentials.serialize(), "resources": self._resources.serialize(), } return out
[docs] def update_cache(self, cache: Cache) -> None: """Update cache with current configuration values. Updates region, credentials, and resources in the cache. Args: cache: Cache instance to update with configuration data """ cache.update_config(val=self.region, keys=["gcp", "region"]) self.credentials.update_cache(cache) self.resources.update_cache(cache)