Source code for sebs.gcp.gcp

# Copyright 2020-2025 ETH Zurich and the SeBS authors. All rights reserved.
"""Google Cloud Platform (GCP) serverless system implementation.

This module provides the main GCP implementation with function deployment, management,
monitoring, and resource allocation. It integrates with Google Cloud Functions,
Cloud Storage, Cloud Monitoring, and Cloud Logging.

The module handles:
- Function creation, updating, and lifecycle management
- Code packaging and deployment to Cloud Functions
- HTTP and library trigger management
- Performance metrics collection via Cloud Monitoring
- Execution logs retrieval via Cloud Logging
- Cold start enforcement for benchmarking
- Storage bucket management for code deployment

Classes:
    GCP: Main system class implementing the FaaS System interface

Example:
    Basic GCP system initialization:

        config = GCPConfig(credentials, resources)
        gcp_system = GCP(system_config, config, cache, docker_client, logging_handlers)
        gcp_system.initialize()
"""

import docker
import os
import logging
import re
import shutil
import time
import math
import zipfile
from datetime import datetime, timezone
from typing import cast, Dict, Optional, Tuple, List, Type

from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import google.cloud.monitoring_v3 as monitoring_v3
from google.cloud.devtools import cloudbuild_v1

from sebs.cache import Cache
from sebs.config import SeBSConfig
from sebs.benchmark import Benchmark
from sebs.faas.function import Function, FunctionConfig, Trigger
from sebs.faas.config import Resources
from sebs.faas.system import System
from sebs.gcp.config import GCPConfig
from sebs.gcp.resources import GCPSystemResources
from sebs.gcp.storage import GCPStorage
from sebs.gcp.function import GCPFunction
from sebs.utils import LoggingHandlers
from sebs.types import Language


class GCP(System):
    """FaaS system implementation targeting Google Cloud Platform.

    Integrates with Google Cloud Functions to cover deployment, monitoring,
    logging, and resource management: code packaging, function lifecycle
    management, trigger creation, and collection of performance metrics.

    Attributes:
        _config: GCP-specific configuration including credentials and region
        function_client: Google Cloud Functions API client
        cold_start_counter: Counter for enforcing cold starts in benchmarking
        logging_handlers: Logging configuration for status reporting
    """

    def __init__(
        self,
        system_config: SeBSConfig,
        config: GCPConfig,
        cache_client: Cache,
        docker_client: docker.client.DockerClient,
        logging_handlers: LoggingHandlers,
    ) -> None:
        """Construct the GCP system and its resource manager.

        Args:
            system_config: General SeBS system configuration
            config: GCP-specific configuration with credentials and settings
            cache_client: Cache instance for storing function and resource state
            docker_client: Docker client for container operations (if needed)
            logging_handlers: Logging configuration for status reporting
        """
        # The resource manager is built first and handed to the base class.
        system_resources = GCPSystemResources(
            system_config, config, cache_client, docker_client, logging_handlers
        )
        super().__init__(system_config, cache_client, docker_client, system_resources)

        self._config = config
        self.logging_handlers = logging_handlers

    @property
    def config(self) -> GCPConfig:
        """GCP configuration passed to the constructor.

        Returns:
            GCP configuration with credentials and region settings
        """
        return self._config
[docs] @staticmethod def name() -> str: """Get the platform name identifier. Returns: Platform name string 'gcp' """ return "gcp"
[docs] @staticmethod def typename() -> str: """Get the platform type name for display. Returns: Platform type string 'GCP' """ return "GCP"
@staticmethod
def function_type() -> "Type[Function]":
    """Return the class used to represent functions on this platform.

    Returns:
        The GCPFunction class itself (not an instance)
    """
    return GCPFunction
def initialize(
    self, config: Optional[Dict[str, str]] = None, resource_prefix: Optional[str] = None
) -> None:
    """Initialize the GCP system for function deployment and management.

    Builds the Cloud Functions (v1) API client, stores it as
    `self.function_client`, and then initializes system resources through
    `initialize_resources`.

    Args:
        config: Additional system-specific configuration parameters. This
            implementation does not read the value; it defaults to None
            instead of a shared mutable dictionary.
        resource_prefix: Optional prefix for resource naming, forwarded to
            `initialize_resources` as `select_prefix`
    """
    # cache_discovery=False: do not use the discovery-document file cache.
    self.function_client = build("cloudfunctions", "v1", cache_discovery=False)
    self.initialize_resources(select_prefix=resource_prefix)
def get_function_client(self):
    """Return the Google Cloud Functions API client.

    The attribute is assigned by `initialize`; calling this method earlier
    raises AttributeError because the attribute does not exist yet.

    Returns:
        The Cloud Functions API client stored on this instance
    """
    client = self.function_client
    return client
def default_function_name(
    self, code_package: Benchmark, resources: Optional[Resources] = None
) -> str:
    """Build the default function name for a benchmark.

    The name combines the resource ID, benchmark name, language name, and
    language version, prefixed with `sebs`, and is then passed through
    `GCP.format_function_name`.

    Args:
        code_package: Benchmark package containing metadata
        resources: Optional resources whose ID is used; when falsy, the ID
            of the resources from this system's configuration is used

    Returns:
        Formatted function name suitable for GCP Cloud Functions
    """
    if resources:
        resource_id = resources.resources_id
    else:
        resource_id = self.config.resources.resources_id

    raw_name = (
        f"sebs-{resource_id}-{code_package.benchmark}"
        f"-{code_package.language_name}-{code_package.language_version}"
    )
    return GCP.format_function_name(raw_name)
[docs] @staticmethod def format_function_name(func_name: str) -> str: """Format function name according to GCP Cloud Functions requirements. Converts function names to comply with GCP naming rules by replacing hyphens and dots with underscores. GCP functions must begin with a letter and can only contain letters, numbers, and underscores. Args: func_name: Raw function name to format Returns: GCP-compliant function name """ # GCP functions must begin with a letter # however, we now add by default `sebs` in the beginning func_name = func_name.replace("-", "_") func_name = func_name.replace(".", "_") return func_name
def _poll_build_status(self, build_name: str, func_name: str, timeout: int = 300) -> None:
    """Block until a Cloud Build operation succeeds, fails, or times out.

    The build is queried every 3 seconds. A successful build ends the wait;
    a failed, cancelled, or timed-out build is logged together with the URL
    of the build log and reported as an error.

    Args:
        build_name: Fully qualified build name from GCP API
        func_name: Function name for logging purposes
        timeout: Maximum time to wait in seconds (default: 300)

    Raises:
        RuntimeError: If build fails or timeout is reached
    """
    build_states = cloudbuild_v1.Build.Status
    client = cloudbuild_v1.CloudBuildClient()
    started = time.time()

    while True:
        build = client.get_build(name=build_name)
        state = build.status

        if state == build_states.SUCCESS:
            self.logging.info(f"Function {func_name} - build completed successfully!")
            return

        if state == build_states.FAILURE:
            self.logging.error(f"Failed to build function: {func_name}")
            self.logging.error(f"Reasons: {build.failure_info.detail}")
            self.logging.error(f"URL for detailed error: {build.log_url}")
            raise RuntimeError(f"Build failed for function {func_name}!") from None

        if state in (build_states.CANCELLED, build_states.TIMEOUT):
            self.logging.error(f"Build was cancelled or timed out for function: {func_name}")
            self.logging.error(f"URL for detailed error: {build.log_url}")
            raise RuntimeError(f"Build failed for function {func_name}!") from None

        # Any other state is treated as "still running" until the deadline.
        if time.time() - started > timeout:
            self.logging.error(
                f"Failed to build function: {func_name} after {timeout} seconds!"
            )
            raise RuntimeError(f"Build timeout for function {func_name}!") from None

        time.sleep(3)


def _wait_for_build_and_poll(
    self, func_name: str, timeout: int = 300, poll_interval: int = 2
) -> bool:
    """Wait for a new build to appear for a function, then follow it to completion.

    The build ID recorded on the function is read once up front. The
    function metadata is then polled until a build ID shows up that differs
    from that initial one (or any build ID, if none was recorded). The
    build name is derived from that ID and handed to `_poll_build_status`.

    Args:
        func_name: Name of the function being built
        timeout: Maximum time in seconds to wait for a build to appear; the
            same value is passed on as the timeout of the build polling
            (default: 300)
        poll_interval: Seconds between polling attempts (default: 2)

    Returns:
        True if a build was found and completed successfully,
        False if no new build appeared within the timeout

    Raises:
        RuntimeError: If build fails
    """
    full_func_name = GCP.get_full_function_name(
        self.config.project_name, self.config.region, func_name
    )
    started = time.time()

    # Remember the build ID currently attached to the function (if any), so
    # that only a different ID is recognized as the new build.
    known_build_id = None
    try:
        lookup = (
            self.function_client.projects().locations().functions().get(name=full_func_name)
        )
        snapshot = lookup.execute()
        if "buildId" in snapshot:
            known_build_id = snapshot["buildId"]
    except HttpError:
        # Best effort: without a baseline, the first build ID seen is used.
        pass

    self.logging.info(f"Waiting for build to start for function {func_name}...")

    while True:
        if time.time() - started > timeout:
            self.logging.warning(
                f"No build found for {func_name} after {timeout}s - "
                "might be a configuration-only update"
            )
            return False

        try:
            lookup = (
                self.function_client.projects().locations().functions().get(name=full_func_name)
            )
            snapshot = lookup.execute()

            if "buildId" in snapshot:
                candidate_id = snapshot["buildId"]
                if known_build_id is None or candidate_id != known_build_id:
                    build_name = (
                        f"projects/{self.config.project_name}/locations/"
                        f"{self.config.region}/builds/{candidate_id}"
                    )
                    self.logging.info(f"Found build {candidate_id} for function {func_name}!")
                    break
        except HttpError as e:
            self.logging.debug(f"Error getting function details: {e}")

        time.sleep(poll_interval)

    # The loop is only left through `break`, i.e. with a build name in hand.
    self._poll_build_status(build_name, func_name, timeout)
    return True


def _wait_for_active_status(
    self, func_name: str, expected_version: Optional[int] = None, timeout: int = 60
) -> int:
    """Poll a function until it reports the ACTIVE status.

    The function metadata is fetched every 2 seconds. DEPLOY_IN_PROGRESS
    keeps the loop going, ACTIVE with an acceptable version ends it, and
    any other status is reported as a failed deployment.

    Args:
        func_name: Name of the function to check
        expected_version: Optional version ID to verify (None to skip version check)
        timeout: Maximum time to wait in seconds (default: 60)

    Returns:
        Current version ID of the function

    Raises:
        RuntimeError: If deployment fails or timeout is reached
    """
    full_func_name = GCP.get_full_function_name(
        self.config.project_name, self.config.region, func_name
    )
    started = time.time()
    self.logging.info(f"Waiting for function {func_name} to become ACTIVE...")

    while True:
        lookup = (
            self.function_client.projects().locations().functions().get(name=full_func_name)
        )
        details = lookup.execute()
        status = details["status"]
        current_version = int(details["versionId"])

        if status not in ("ACTIVE", "DEPLOY_IN_PROGRESS"):
            self.logging.error(f"Function {func_name} has unexpected status: {status}")
            raise RuntimeError(f"Function {func_name} deployment failed with status: {status}")

        if status == "ACTIVE":
            version_ok = expected_version is None or current_version == expected_version
            if version_ok:
                self.logging.info(f"Function {func_name} is ACTIVE (version {current_version})")
                return current_version
            # Keep waiting: the version might still be updating.
            self.logging.warning(
                f"Function {func_name} is ACTIVE but version mismatch: "
                f"expected {expected_version}, got {current_version}"
            )
        else:
            self.logging.debug(f"Function {func_name} deployment in progress...")

        if time.time() - started > timeout:
            raise RuntimeError(
                f"Timeout waiting for function {func_name} to become ACTIVE. "
                f"Current status: {status}"
            )

        time.sleep(2)
def package_code(
    self,
    directory: str,
    language: Language,
    language_version: str,
    architecture: str,
    benchmark: str,
    is_cached: bool,
) -> Tuple[str, int]:
    """Package benchmark code for GCP Cloud Functions deployment.

    Rearranges the benchmark directory into the layout expected by GCP
    Cloud Functions and zips it.

    The packaging process:
    1. Creates a 'function' subdirectory and moves into it every entry that
       is not one of the language's top-level files
    2. Renames the handler to the GCP-required name
       (handler.py -> main.py, handler.js -> index.js)
    3. Creates a zip archive of the directory
    4. Restores the original handler name, even if archiving fails

    Args:
        directory: Path to the benchmark code directory
        language: Programming language of the benchmark
        language_version: Language version (not used by this implementation)
        architecture: Target architecture (not used by this implementation)
        benchmark: Benchmark name, used as the archive file name
        is_cached: Whether this package is from cache (not used by this
            implementation)

    Returns:
        Tuple of (archive_path, archive_size_bytes)

    Raises:
        NotImplementedError: If the language is C++
    """
    if language == Language.CPP:
        raise NotImplementedError("C++ packaging is not supported on GCP!")

    # For Java we already produce an archive (JAR), but it still has to be
    # packed into a zip file: the GCP build system unzips the upload and
    # complains when it finds classes instead of a JAR.
    CONFIG_FILES = {
        Language.PYTHON: ["handler.py", ".python_packages"],
        Language.NODEJS: ["handler.js", "node_modules"],
        Language.JAVA: ["function.jar"],
    }
    HANDLER = {
        Language.PYTHON: ("handler.py", "main.py"),
        Language.NODEJS: ("handler.js", "index.js"),
    }
    package_config = CONFIG_FILES[language]

    function_dirname = "function"
    function_dir = os.path.join(directory, function_dirname)
    os.makedirs(function_dir)
    for entry in os.listdir(directory):
        # The listing includes the freshly created target directory; it must
        # not be moved into itself.
        if entry == function_dirname or entry in package_config:
            continue
        shutil.move(os.path.join(directory, entry), function_dir)

    # Rename the handler, since GCP requires a fixed entry file name
    # (e.g. main.py for Python).
    renamed_handler = None
    if language in HANDLER:
        old_name, new_name = HANDLER[language]
        old_path = os.path.join(directory, old_name)
        new_path = os.path.join(directory, new_name)
        shutil.move(old_path, new_path)
        renamed_handler = (old_path, new_path)

    # Zip the whole directory (the zip file gets uploaded to GCP later).
    # GCP.recursive_zip is slower than e.g.
    # `utils.execute("zip -qu -r9 {}.zip * .".format(benchmark), shell=True)`
    # or `shutil.make_archive(benchmark_archive, directory, directory)`,
    # but both alternatives need a change of the working directory
    # (shutil.make_archive does it internally). That leads to a race
    # condition when several benchmarks run in parallel, because the current
    # directory is NOT thread-specific.
    benchmark_archive = "{}.zip".format(os.path.join(directory, benchmark))
    try:
        GCP.recursive_zip(directory, benchmark_archive)
        self.logging.info("Created {} archive".format(benchmark_archive))

        bytes_size = os.path.getsize(benchmark_archive)
        mbytes = bytes_size / 1024.0 / 1024.0
        # Two digits after the decimal point.
        self.logging.info("Zip archive size {:.2f} MB".format(mbytes))
    finally:
        # Always give the handler its original name back, so a failed
        # archive step does not leave the directory in the renamed state.
        if renamed_handler is not None:
            old_path, new_path = renamed_handler
            shutil.move(new_path, old_path)

    return benchmark_archive, bytes_size
def create_function(
    self,
    code_package: Benchmark,
    func_name: str,
    container_deployment: bool,
    container_uri: str | None,
) -> "GCPFunction":
    """Create a new GCP Cloud Function or update an existing one.

    The code package is uploaded to the deployment bucket first. The
    function is then looked up: if the lookup raises HttpError, the function
    is created with an HTTPS trigger, its build and deployment are awaited,
    and an IAM policy allowing unauthenticated invocations is applied (up
    to 5 attempts). If the lookup succeeds, the existing function is updated
    through `update_function`. A library trigger is attached in both cases.

    Args:
        code_package: Benchmark package with code and configuration
        func_name: Name for the Cloud Function
        container_deployment: Whether to use container deployment (unsupported)
        container_uri: Container image URI (only forwarded to `update_function`)

    Returns:
        GCPFunction instance representing the deployed function

    Raises:
        NotImplementedError: If container_deployment is True
        RuntimeError: If the code location is missing, no build is found,
            or the IAM configuration fails
    """
    if container_deployment:
        raise NotImplementedError("Container deployment is not supported in GCP")

    package = code_package.code_location
    if package is None:
        raise RuntimeError("Code location is not set for GCP deployment")

    benchmark = code_package.benchmark
    language_runtime = code_package.language_version
    timeout = code_package.benchmark_config.timeout
    memory = code_package.benchmark_config.memory
    code_bucket: Optional[str] = None
    storage_client = self._system_resources.get_storage()
    location = self.config.region
    project_name = self.config.project_name
    function_cfg = FunctionConfig.from_benchmark(code_package)

    # Object name in the bucket: <benchmark>/<architecture>-<package file name>
    architecture = function_cfg.architecture.value
    code_package_name = f"{architecture}-{os.path.basename(package)}"
    code_bucket = storage_client.get_bucket(Resources.StorageBucketType.DEPLOYMENT)
    code_prefix = os.path.join(benchmark, code_package_name)
    storage_client.upload(code_bucket, package, code_prefix)

    self.logging.info("Uploading function {} code to {}".format(func_name, code_bucket))

    full_func_name = GCP.get_full_function_name(project_name, location, func_name)
    get_req = self.function_client.projects().locations().functions().get(name=full_func_name)

    try:
        get_req.execute()
    except HttpError:
        # The lookup failed: treat the function as not existing and create it.
        envs = self._generate_function_envs(code_package)

        if code_package.language == Language.JAVA:
            entry_point = "org.serverlessbench.Handler"
        else:
            entry_point = "handler"

        function_body = {
            "name": full_func_name,
            "entryPoint": entry_point,
            "runtime": code_package.language_name + language_runtime.replace(".", ""),
            "availableMemoryMb": memory,
            "timeout": str(timeout) + "s",
            "httpsTrigger": {},
            "ingressSettings": "ALLOW_ALL",
            "sourceArchiveUrl": "gs://" + code_bucket + "/" + code_prefix,
            "environmentVariables": envs,
        }
        parent = "projects/{project_name}/locations/{location}".format(
            project_name=project_name, location=location
        )
        create_req = (
            self.function_client.projects()
            .locations()
            .functions()
            .create(location=parent, body=function_body)
        )
        create_req.execute()
        self.logging.info(
            f"Function {func_name} is creating - GCP build&deployment is started!"
        )

        # Follow the build until it completes or fails.
        if not self._wait_for_build_and_poll(func_name):
            raise RuntimeError(f"No build operation found for {func_name}!")

        # Wait for deployment to become ACTIVE.
        self._wait_for_active_status(func_name)

        invoker_policy = {
            "policy": {
                "bindings": [
                    {
                        "role": "roles/cloudfunctions.invoker",
                        "members": ["allUsers"],
                    }
                ]
            }
        }
        allow_unauthenticated_req = (
            self.function_client.projects()
            .locations()
            .functions()
            .setIamPolicy(resource=full_func_name, body=invoker_policy)
        )

        # Bounded number of attempts to avoid an infinite loop.
        MAX_RETRIES = 5
        for _attempt in range(MAX_RETRIES):
            try:
                allow_unauthenticated_req.execute()
                break
            except HttpError:
                self.logging.info(
                    "Sleeping for 5 seconds because the created functions is not yet available!"
                )
                time.sleep(5)
        else:
            raise RuntimeError(
                f"Failed to configure function {full_func_name} "
                "for unauthenticated invocations!"
            )

        self.logging.info(f"Function {func_name} accepts now unauthenticated invocations!")

        function = GCPFunction(
            func_name, benchmark, code_package.hash, function_cfg, code_bucket
        )
    else:
        # The lookup succeeded, so the function already exists.
        self.logging.info("Function {} exists on GCP, update the instance.".format(func_name))

        function = GCPFunction(
            name=func_name,
            benchmark=benchmark,
            code_package_hash=code_package.hash,
            cfg=function_cfg,
            bucket=code_bucket,
        )
        self.update_function(function, code_package, container_deployment, container_uri)

    # Every function gets a library trigger, whether created or updated.
    from sebs.gcp.triggers import LibraryTrigger

    library_trigger = LibraryTrigger(func_name, self)
    library_trigger.logging_handlers = self.logging_handlers
    function.add_trigger(library_trigger)

    return function
def create_trigger(self, function: Function, trigger_type: Trigger.TriggerType) -> Trigger:
    """Create a trigger for the given function.

    Only HTTP triggers can be created here. The invocation URL is read from
    the `httpsTrigger` entry of the function's metadata, the new trigger is
    attached to the function, and the function is updated in the cache.

    Args:
        function: Function instance to create trigger for
        trigger_type: Type of trigger to create (only HTTP supported)

    Returns:
        Created HTTP trigger

    Raises:
        RuntimeError: If trigger type is not supported
    """
    from sebs.gcp.triggers import HTTPTrigger

    if trigger_type != Trigger.TriggerType.HTTP:
        raise RuntimeError("Not supported!")

    location = self.config.region
    project_name = self.config.project_name
    full_func_name = GCP.get_full_function_name(project_name, location, function.name)

    # Fetch the function metadata to obtain the HTTPS trigger URL.
    lookup = self.function_client.projects().locations().functions().get(name=full_func_name)
    details = lookup.execute()
    invoke_url = details["httpsTrigger"]["url"]
    self.logging.info(f"Function {function.name} - HTTP trigger ready at {invoke_url}")

    http_trigger = HTTPTrigger(invoke_url)
    http_trigger.logging_handlers = self.logging_handlers
    function.add_trigger(http_trigger)
    self.cache_client.update_function(function)
    return http_trigger
def cached_function(self, function: Function) -> None:
    """Prepare a function loaded from cache for use with this system.

    Each library trigger of the function receives this system's logging
    handlers and this system as its deployment client.

    Args:
        function: Cached function instance to configure
    """
    from sebs.faas.function import Trigger
    from sebs.gcp.triggers import LibraryTrigger

    library_triggers = function.triggers(Trigger.TriggerType.LIBRARY)
    for cached_trigger in library_triggers:
        library_trigger = cast(LibraryTrigger, cached_trigger)
        library_trigger.logging_handlers = self.logging_handlers
        library_trigger.deployment_client = self
def update_function(
    self,
    function: Function,
    code_package: Benchmark,
    container_deployment: bool,
    container_uri: str | None,
) -> None:
    """Update an existing Cloud Function with new code and configuration.

    Uploads the code package to the function's code bucket and patches the
    function with the entry point, runtime, memory, timeout, HTTPS trigger,
    source archive, and environment variables. Afterwards the build is
    followed and the function is awaited to become ACTIVE with the version
    reported by the patch request.

    Args:
        function: Existing function instance to update
        code_package: New benchmark package with updated code
        container_deployment: Whether to use container deployment (unsupported)
        container_uri: Container image URI (unused)

    Raises:
        NotImplementedError: If container_deployment is True
        RuntimeError: If the code location is missing, the build fails, or
            the function does not become ACTIVE
    """
    if container_deployment:
        raise NotImplementedError("Container deployment is not supported in GCP")

    if code_package.code_location is None:
        raise RuntimeError("Code location is not set for GCP deployment")

    function = cast(GCPFunction, function)
    language_runtime = code_package.language_version

    function_cfg = FunctionConfig.from_benchmark(code_package)
    architecture = function_cfg.architecture.value
    package_basename = os.path.basename(code_package.code_location)
    storage = cast(GCPStorage, self._system_resources.get_storage())
    code_package_name = f"{architecture}-{package_basename}"

    bucket = function.code_bucket(code_package.benchmark, storage)
    storage.upload(bucket, code_package.code_location, code_package_name)

    envs = self._generate_function_envs(code_package)

    self.logging.info(f"Uploaded new code package to {bucket}/{code_package_name}")
    full_func_name = GCP.get_full_function_name(
        self.config.project_name, self.config.region, function.name
    )

    if code_package.language == Language.JAVA:
        entry_point = "org.serverlessbench.Handler"
    else:
        entry_point = "handler"

    patch_body = {
        "name": full_func_name,
        "entryPoint": entry_point,
        "runtime": code_package.language_name + language_runtime.replace(".", ""),
        "availableMemoryMb": function.config.memory,
        "timeout": str(function.config.timeout) + "s",
        "httpsTrigger": {},
        "sourceArchiveUrl": "gs://" + bucket + "/" + code_package_name,
        "environmentVariables": envs,
    }
    patch_req = (
        self.function_client.projects()
        .locations()
        .functions()
        .patch(name=full_func_name, body=patch_body)
    )
    res = patch_req.execute()

    self.logging.info(f"Function {function.name} code update initiated")

    # The patch response carries no build name, so the build has to be
    # discovered by polling the function metadata.
    expected_version = int(res["metadata"]["versionId"])
    if not self._wait_for_build_and_poll(function.name):
        self.logging.warning(
            f"No build operation found for {function.name} - "
            "this is unexpected for code updates"
        )

    # Wait for deployment to become ACTIVE with the expected version.
    self._wait_for_active_status(function.name, expected_version)

    self.logging.info("Published new function code and configuration.")
def _update_envs(self, full_function_name: str, envs: Dict) -> Dict:
    """Combine new environment variables with those already set on a function.

    The function's current metadata is fetched; if it contains environment
    variables, they are kept and the new ones are layered on top, so a new
    value wins whenever a key exists in both.

    Args:
        full_function_name: Fully qualified function name
        envs: New environment variables to add/update

    Returns:
        Merged environment variables dictionary (the input dictionary itself
        when the function has no environment variables)
    """
    lookup = (
        self.function_client.projects().locations().functions().get(name=full_function_name)
    )
    current = lookup.execute()

    if "environmentVariables" not in current:
        return envs

    # Start from the existing variables, then let the new ones override.
    merged = dict(current["environmentVariables"])
    merged.update(envs)
    return merged


def _generate_function_envs(self, code_package: Benchmark) -> Dict:
    """Derive the environment variables a benchmark needs.

    Currently only NoSQL usage adds a variable: the name of the benchmark's
    database is exported as NOSQL_STORAGE_DATABASE.

    Args:
        code_package: Benchmark package with module requirements

    Returns:
        Dictionary of environment variables for the function (empty when the
        benchmark does not use NoSQL storage)
    """
    if not code_package.uses_nosql:
        return {}

    resources = cast(GCPSystemResources, self._system_resources)
    database = resources.get_nosql_storage().benchmark_database(code_package.benchmark)
    return {"NOSQL_STORAGE_DATABASE": database}
def update_function_configuration(
    self, function: Function, code_package: Benchmark, env_variables: Optional[Dict] = None
) -> int:
    """Update function configuration including memory, timeout, and environment.

    Patches the Cloud Function's memory allocation and timeout without
    changing its code. Environment variables are part of the patch only
    when there is at least one variable to set; in that case they are first
    merged with the variables already present on the function, because the
    patch replaces the whole set. The call returns once
    `_wait_for_active_status` reports the function ACTIVE with the version
    from the patch response.

    Args:
        function: Function instance to update
        code_package: Benchmark package with configuration requirements
        env_variables: Additional environment variables to set; they take
            precedence over generated ones. None (the default) means no
            additional variables.

    Returns:
        Version ID of the updated function

    Raises:
        RuntimeError: If the function reports an unexpected status or does
            not become ACTIVE within 60 seconds
    """
    assert code_package.has_input_processed

    function = cast(GCPFunction, function)
    full_func_name = GCP.get_full_function_name(
        self.config.project_name, self.config.region, function.name
    )

    envs = {**self._generate_function_envs(code_package), **(env_variables or {})}

    body = {
        "availableMemoryMb": function.config.memory,
        "timeout": str(function.config.timeout) + "s",
    }
    update_mask = "availableMemoryMb,timeout"

    # GCP might overwrite existing variables. If we modify them, we first
    # need to read the existing ones and append to them.
    if envs:
        body["environmentVariables"] = self._update_envs(full_func_name, envs)
        update_mask += ",environmentVariables"

    req = (
        self.function_client.projects()
        .locations()
        .functions()
        .patch(name=full_func_name, updateMask=update_mask, body=body)
    )
    res = req.execute()
    expected_version = int(res["metadata"]["versionId"])

    self.logging.info(f"Function {function.name} configuration update initiated")

    # Configuration updates don't trigger builds but still need deployment
    # time, so only the ACTIVE status (with the expected version) is awaited.
    current_version = self._wait_for_active_status(function.name, expected_version, timeout=60)
    self.logging.info("Published new function configuration.")

    return current_version
[docs] @staticmethod def get_full_function_name(project_name: str, location: str, func_name: str) -> str: """Generate the fully qualified function name for GCP API calls. Args: project_name: GCP project ID location: GCP region/location func_name: Function name Returns: Fully qualified function name in GCP format """ return f"projects/{project_name}/locations/{location}/functions/{func_name}"
def shutdown(self) -> None:
    """Shut down the GCP system.

    Shuts down the GCP system resources first and then delegates to the
    parent class's shutdown.
    """
    resources = cast(GCPSystemResources, self._system_resources)
    resources.shutdown()
    super().shutdown()
def download_metrics(
    self,
    function_name: str,
    start_time: int,
    end_time: int,
    requests: Dict,
    metrics: Dict,
) -> None:
    """Download execution times and performance metrics for a function.

    Execution times are extracted from Cloud Logging entries whose payload
    contains "execution took" and stored, in microseconds, on the matching
    request. Afterwards the time series of selected Cloud Functions metrics
    are read from Cloud Monitoring and stored in `metrics`.

    Args:
        function_name: Name of the function to collect metrics for
        start_time: Start timestamp for metric collection (Unix timestamp)
        end_time: End timestamp for metric collection (Unix timestamp)
        requests: Dictionary of requests keyed by execution ID; updated in place
        metrics: Dictionary populated with one list of data points per metric
    """
    from google.api_core import exceptions
    from time import sleep

    def wrapper(gen):
        """Yield every item of an iterator of GCP API responses.

        When the API reports exhausted resources, sleep 30 seconds and ask
        the iterator for the next item again.

        Args:
            gen: iterator of API responses

        Yields:
            each API response
        """
        while True:
            try:
                yield next(gen)
            except StopIteration:
                break
            except exceptions.ResourceExhausted:
                self.logging.info("Google Cloud resources exhausted, sleeping 30s")
                sleep(30)

    # Use GCP's logging system to find the execution time of each function
    # invocation. Waiting for complete results should not be a problem,
    # since logs appear very quickly here.
    import google.cloud.logging as gcp_logging

    logging_client = gcp_logging.Client()
    logger = logging_client.logger("cloudfunctions.googleapis.com%2Fcloud-functions")

    # GCP accepts only a single date format: 'YYYY-MM-DDTHH:MM:SSZ'.
    # Timestamps are therefore converted to UTC and formatted accordingly.
    # One second is added to the end time so that dropping the milliseconds
    # does not affect the query.
    first_ts, last_ts = [
        datetime.fromtimestamp(moment, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        for moment in (start_time, end_time + 1)
    ]

    invocations = logger.list_entries(
        filter_=(
            f'resource.labels.function_name = "{function_name}" '
            f'timestamp >= "{first_ts}" '
            f'timestamp <= "{last_ts}"'
        ),
        page_size=1000,
    )

    # Iterate page by page when the result exposes pages; otherwise treat
    # the whole result as a single page.
    if hasattr(invocations, "pages"):
        pages = list(wrapper(invocations.pages))
    else:
        pages = [list(wrapper(invocations))]

    entries = 0
    invocations_processed = 0
    for page in pages:
        for entry in page:
            entries += 1
            if "execution took" not in entry.payload:
                continue

            execution_id = entry.labels["execution_id"]
            # It might happen that we get an invocation from another experiment.
            if execution_id not in requests:
                continue

            # Find the number of milliseconds.
            duration_match = re.search(r"\d+ ms", entry.payload)
            assert duration_match
            millis = duration_match.group().split()[0]
            # Convert into microseconds.
            requests[execution_id].provider_times.execution = int(millis) * 1000
            invocations_processed += 1

    self.logging.info(
        f"GCP: Received {entries} entries, found time metrics for {invocations_processed} "
        f"out of {len(requests)} invocations."
    )

    # Use metrics to find estimated values for maximum memory used, active
    # instances and network traffic.
    # https://cloud.google.com/monitoring/api/metrics_gcp#gcp-cloudfunctions

    # Set expected metrics here.
    available_metrics = ["execution_times", "user_memory_bytes", "network_egress"]

    client = monitoring_v3.MetricServiceClient()
    project_name = client.common_project_path(self.config.project_name)

    # Only the whole-second part of each timestamp is used; the queried
    # interval is extended by 60 seconds past the end time.
    _, end_seconds = math.modf(end_time)
    _, start_seconds = math.modf(start_time)
    interval = monitoring_v3.TimeInterval(
        {
            "end_time": {"seconds": int(end_seconds) + 60},
            "start_time": {"seconds": int(start_seconds)},
        }
    )

    for metric in available_metrics:
        data_points: list = []
        metrics[metric] = data_points

        list_request = monitoring_v3.ListTimeSeriesRequest(
            name=project_name,
            filter='metric.type = "cloudfunctions.googleapis.com/function/{}"'.format(metric),
            interval=interval,
        )

        for series in client.list_time_series(list_request):
            # The query covers the whole project; keep only this function.
            if series.resource.labels.get("function_name") != function_name:
                continue
            for point in series.points:
                data_points.append(
                    {
                        "mean_value": point.value.distribution_value.mean,
                        "executions_count": point.value.distribution_value.count,
                    }
                )
def _enforce_cold_start(self, function: Function, code_package: Benchmark) -> int:
    """Trigger a cold start for a single function.

    Advances ``cold_start_counter`` and pushes the new value to the function
    as the ``cold_start`` environment variable through
    ``update_function_configuration``.

    Args:
        function: Function whose configuration is updated
        code_package: Benchmark package forwarded to the configuration update

    Returns:
        Value returned by ``update_function_configuration`` (the version ID
        of the updated function)
    """
    self.cold_start_counter += 1
    cold_start_env = {"cold_start": str(self.cold_start_counter)}
    return self.update_function_configuration(function, code_package, cold_start_env)
def enforce_cold_start(self, functions: List[Function], code_package: Benchmark) -> None:
    """Force a cold start on every given function and wait for redeployment.

    All functions are updated with the same counter value: the increment
    performed by ``_enforce_cold_start`` is rolled back after each update and
    the counter is advanced once at the very end. Afterwards the method polls
    ``is_deployed`` every 5 seconds until each function reports the version
    produced by its update.

    Args:
        functions: Functions to update
        code_package: Benchmark package forwarded to each configuration update
    """
    pending = []
    for func in functions:
        version_id = self._enforce_cold_start(func, code_package)
        # Undo the helper's increment so the next function gets the same value.
        self.cold_start_counter -= 1
        pending.append((version_id, func))

    # verify deployment
    while True:
        still_pending = [
            (version_id, func)
            for version_id, func in pending
            if not self.is_deployed(func.name, version_id)[0]
        ]
        deployed = len(pending) - len(still_pending)
        self.logging.info(f"Redeployed {deployed} out of {len(pending)}")
        if not still_pending:
            break
        time.sleep(5)
        # Only the functions that are still behind are checked in the next round.
        pending = still_pending

    self.cold_start_counter += 1
def get_functions(self, code_package: Benchmark, function_names: List[str]) -> List["Function"]:
    """Fetch several functions and block until all of them are deployed.

    Each name is resolved through ``get_function``; the method then polls
    ``is_deployed`` every 5 seconds until no function is reported as
    undeployed.

    Args:
        code_package: Benchmark package passed to ``get_function``
        function_names: Names of the functions to retrieve

    Returns:
        Function instances in the same order as ``function_names``
    """
    functions: List["Function"] = [
        self.get_function(code_package, func_name) for func_name in function_names
    ]

    # verify deployment
    pending = list(functions)
    while True:
        still_pending = [func for func in pending if not self.is_deployed(func.name)[0]]
        deployed = len(pending) - len(still_pending)
        self.logging.info(f"Deployed {deployed} out of {len(pending)}")
        if not still_pending:
            break
        time.sleep(5)
        # Only the functions that are still behind are checked in the next round.
        pending = still_pending
        self.logging.info(f"Waiting on {pending}")

    return functions
def is_deployed(self, func_name: str, versionId: int = -1) -> Tuple[bool, int]:
    """Check whether a function is deployed, optionally at a given version.

    Queries the function resource and inspects its ``status`` and
    ``versionId`` fields.

    Args:
        func_name: Name of the function to check
        versionId: Version the function is expected to have reached; with the
            default of -1 the function only has to be in the ``ACTIVE`` state

    Returns:
        Tuple of (is_deployed, current_version_id). Without an expected
        version, ``is_deployed`` is True when the status is ``ACTIVE``;
        otherwise it is True when the reported version equals ``versionId``.
    """
    name = GCP.get_full_function_name(self.config.project_name, self.config.region, func_name)
    function_client = self.get_function_client()
    status_req = function_client.projects().locations().functions().get(name=name)
    status_res = status_req.execute()

    # The version comes back in the JSON response and has to be cast before it
    # can be used as an int (deployment_version does the same). Normalizing both
    # sides keeps the comparison from silently failing on a str/int mismatch
    # and makes the returned version match the annotated type.
    current_version = int(status_res["versionId"])
    if versionId == -1:
        return (status_res["status"] == "ACTIVE", current_version)
    return (current_version == int(versionId), current_version)
def deployment_version(self, func: Function) -> int:
    """Return the version ID currently reported for a function.

    Args:
        func: Function whose deployment is queried

    Returns:
        The ``versionId`` field of the function resource, converted to int
    """
    full_name = GCP.get_full_function_name(
        self.config.project_name, self.config.region, func.name
    )
    functions_api = self.get_function_client().projects().locations().functions()
    response = functions_api.get(name=full_name).execute()
    return int(response["versionId"])
[docs] @staticmethod def helper_zip(base_directory: str, path: str, archive: zipfile.ZipFile) -> None: """Recursively add files and directories to a zip archive. Helper method for recursive_zip that handles directory traversal and adds files with relative paths to the archive. Args: base_directory: Base directory path for relative path calculation path: Current path being processed (file or directory) archive: ZipFile object to add files to """ paths = os.listdir(path) for p in paths: directory = os.path.join(path, p) if os.path.isdir(directory): GCP.helper_zip(base_directory, directory, archive) else: if directory != archive.filename: # prevent form including itself archive.write(directory, os.path.relpath(directory, base_directory))
[docs] @staticmethod def recursive_zip(directory: str, archname: str) -> bool: """Create a zip archive of a directory with relative paths. Creates a compressed zip archive of the specified directory, preserving the relative directory structure. Uses maximum compression level. Args: directory: Absolute path to the directory to be zipped archname: Path where the zip file should be created Returns: True if archiving was successful """ archive = zipfile.ZipFile(archname, "w", zipfile.ZIP_DEFLATED, compresslevel=9) if os.path.isdir(directory): GCP.helper_zip(directory, directory, archive) else: # if the passed directory is actually a file we just add the file to the zip archive _, name = os.path.split(directory) archive.write(directory, name) archive.close() return True