# Copyright 2020-2025 ETH Zurich and the SeBS authors. All rights reserved.
"""Google Cloud Platform (GCP) serverless system implementation.
This module provides the main GCP implementation with function deployment, management,
monitoring, and resource allocation. It integrates with Google Cloud Functions,
Cloud Storage, Cloud Monitoring, and Cloud Logging.
The module handles:
- Function creation, updating, and lifecycle management
- Code packaging and deployment to Cloud Functions
- HTTP and library trigger management
- Performance metrics collection via Cloud Monitoring
- Execution logs retrieval via Cloud Logging
- Cold start enforcement for benchmarking
- Storage bucket management for code deployment
Classes:
GCP: Main system class implementing the FaaS System interface
Example:
Basic GCP system initialization:
config = GCPConfig(credentials, resources)
gcp_system = GCP(system_config, config, cache, docker_client, logging_handlers)
gcp_system.initialize()
"""
import docker
import os
import logging
import re
import shutil
import time
import math
import zipfile
from datetime import datetime, timezone
from typing import cast, Dict, Optional, Tuple, List, Type
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import google.cloud.monitoring_v3 as monitoring_v3
from google.cloud.devtools import cloudbuild_v1
from sebs.cache import Cache
from sebs.config import SeBSConfig
from sebs.benchmark import Benchmark
from sebs.faas.function import Function, FunctionConfig, Trigger
from sebs.faas.config import Resources
from sebs.faas.system import System
from sebs.gcp.config import GCPConfig
from sebs.gcp.resources import GCPSystemResources
from sebs.gcp.storage import GCPStorage
from sebs.gcp.function import GCPFunction
from sebs.utils import LoggingHandlers
from sebs.types import Language
[docs]
class GCP(System):
"""Google Cloud Platform serverless system implementation.
Provides complete integration with Google Cloud Functions including deployment,
monitoring, logging, and resource management. Handles code packaging, function
lifecycle management, trigger creation, and performance metrics collection.
Attributes:
_config: GCP-specific configuration including credentials and region
function_client: Google Cloud Functions API client
cold_start_counter: Counter for enforcing cold starts in benchmarking
logging_handlers: Logging configuration for status reporting
"""
def __init__(
    self,
    system_config: SeBSConfig,
    config: GCPConfig,
    cache_client: Cache,
    docker_client: docker.client.DockerClient,
    logging_handlers: LoggingHandlers,
) -> None:
    """Set up the GCP system together with its GCP-specific system resources.

    A ``GCPSystemResources`` instance is built from the same arguments and
    handed to the base ``System`` constructor. The GCP configuration and the
    logging handlers are then stored on the instance.

    Args:
        system_config: General SeBS system configuration
        config: GCP-specific configuration with credentials and settings
        cache_client: Cache instance for storing function and resource state
        docker_client: Docker client, forwarded to the base class and resources
        logging_handlers: Logging configuration; later attached to triggers
    """
    gcp_resources = GCPSystemResources(
        system_config, config, cache_client, docker_client, logging_handlers
    )
    super().__init__(system_config, cache_client, docker_client, gcp_resources)
    self._config = config
    self.logging_handlers = logging_handlers
@property
def config(self) -> GCPConfig:
    """GCP configuration given at construction.

    The rest of this class reads ``project_name``, ``region`` and
    ``resources`` from it.
    """
    gcp_config = self._config
    return gcp_config
[docs]
@staticmethod
def name() -> str:
    """Return the lowercase platform identifier, ``"gcp"``."""
    platform_id = "gcp"
    return platform_id
[docs]
@staticmethod
def typename() -> str:
    """Return the human-readable platform name, ``"GCP"``."""
    display_name = "GCP"
    return display_name
[docs]
@staticmethod
def function_type() -> "Type[Function]":
    """Return the Function subclass used for GCP deployments.

    Returns:
        The ``GCPFunction`` class object itself, not an instance.
    """
    function_class = GCPFunction
    return function_class
[docs]
def initialize(
    self, config: Dict[str, str] = {}, resource_prefix: Optional[str] = None
) -> None:
    """Create the Cloud Functions API client and initialize system resources.

    Builds a discovery-based client for the Cloud Functions v1 API, with the
    discovery-document cache disabled. It then calls ``initialize_resources``.
    After this call the system is expected to be able to deploy, update and
    invoke functions.

    Args:
        config: Accepted for interface compatibility; not read by this method.
        resource_prefix: Optional prefix forwarded to ``initialize_resources``
            as ``select_prefix``.
    """
    functions_api = build("cloudfunctions", "v1", cache_discovery=False)
    self.function_client = functions_api
    self.initialize_resources(select_prefix=resource_prefix)
[docs]
def get_function_client(self):
    """Return the Cloud Functions v1 API client created by ``initialize``.

    The client attribute is only assigned in ``initialize`` within this
    class. Calling this earlier presumably raises AttributeError.
    """
    client = self.function_client
    return client
[docs]
def default_function_name(
    self, code_package: Benchmark, resources: Optional[Resources] = None
) -> str:
    """Build the default Cloud Function name for a benchmark.

    The raw name is ``sebs-<resources id>-<benchmark>-<language>-<version>``.
    It is passed through ``GCP.format_function_name`` before being returned,
    presumably to satisfy Cloud Functions naming rules.

    Args:
        code_package: Benchmark package providing the name, language and version
        resources: Resources whose id is used; when not given (or falsy), the
            id from the system configuration is used instead

    Returns:
        The formatted function name.
    """
    id_source = resources if resources else self.config.resources
    raw_name = "sebs-{}-{}-{}-{}".format(
        id_source.resources_id,
        code_package.benchmark,
        code_package.language_name,
        code_package.language_version,
    )
    return GCP.format_function_name(raw_name)
def _poll_build_status(
    self, build_name: str, func_name: str, timeout: int = 300, poll_interval: int = 3
) -> None:
    """Poll a Cloud Build run until it reaches a terminal state.

    Every terminal failure state ends the wait immediately: FAILURE,
    CANCELLED, TIMEOUT, INTERNAL_ERROR and EXPIRED. The last two were
    previously treated as "still running", so the caller waited for the full
    timeout. Any other status (e.g. QUEUED or WORKING) keeps polling until
    ``timeout`` elapses.

    Args:
        build_name: Fully qualified build name,
            ``projects/<project>/locations/<region>/builds/<id>``
        func_name: Function name, used only in log and error messages
        timeout: Maximum time to wait in seconds (default: 300)
        poll_interval: Seconds to sleep between status queries (default: 3)

    Raises:
        RuntimeError: If the build ends in a failure state or ``timeout``
            elapses first.
    """
    build_states = cloudbuild_v1.Build.Status
    build_client = cloudbuild_v1.CloudBuildClient()
    begin = time.time()
    while True:
        build_status = build_client.get_build(name=build_name)
        status = build_status.status
        if status == build_states.SUCCESS:
            self.logging.info(f"Function {func_name} - build completed successfully!")
            return
        if status == build_states.FAILURE:
            self.logging.error(f"Failed to build function: {func_name}")
            self.logging.error(f"Reasons: {build_status.failure_info.detail}")
            self.logging.error(f"URL for detailed error: {build_status.log_url}")
            raise RuntimeError(f"Build failed for function {func_name}!") from None
        if status in (build_states.CANCELLED, build_states.TIMEOUT):
            self.logging.error(f"Build was cancelled or timed out for function: {func_name}")
            self.logging.error(f"URL for detailed error: {build_status.log_url}")
            raise RuntimeError(f"Build failed for function {func_name}!") from None
        if status in (build_states.INTERNAL_ERROR, build_states.EXPIRED):
            # Terminal states too: the build will never progress, so waiting
            # for the timeout would only hide the real cause.
            self.logging.error(
                f"Build ended with an internal error or expired for function: {func_name}"
            )
            self.logging.error(f"URL for detailed error: {build_status.log_url}")
            raise RuntimeError(f"Build failed for function {func_name}!") from None
        # Build still queued/working: keep polling until the deadline.
        if time.time() - begin > timeout:
            self.logging.error(
                f"Failed to build function: {func_name} after {timeout} seconds!"
            )
            raise RuntimeError(f"Build timeout for function {func_name}!") from None
        time.sleep(poll_interval)
def _wait_for_build_and_poll(
    self, func_name: str, timeout: int = 300, poll_interval: int = 2
) -> bool:
    """Find the Cloud Build run started for a function and wait for it to finish.

    The build name is not returned directly, so this method first records the
    ``buildId`` the function currently reports. It then re-reads the function
    every ``poll_interval`` seconds until it reports a ``buildId`` different
    from the recorded one (any ``buildId`` counts if none was recorded). That
    build is then polled with ``_poll_build_status``.

    NOTE(review): the callers in this class send their create/patch request
    before calling this method. If the new ``buildId`` is already visible at
    the first lookup, it is recorded as the "previous" build. The method can
    then wait the full ``timeout`` and return False. Confirm whether GCP can
    expose the id that quickly.

    Args:
        func_name: Name of the function being built
        timeout: Seconds to wait for a new build to appear; the same value is
            passed on to ``_poll_build_status`` (default: 300)
        poll_interval: Seconds between function lookups (default: 2)

    Returns:
        True if a new build was found and completed successfully; False if
        no new build appeared within ``timeout`` seconds.

    Raises:
        RuntimeError: If the build fails or times out (from ``_poll_build_status``).
    """
    full_func_name = GCP.get_full_function_name(
        self.config.project_name, self.config.region, func_name
    )

    def fetch_function():
        # One GET on the function resource; raises HttpError on API errors.
        return (
            self.function_client.projects()
            .locations()
            .functions()
            .get(name=full_func_name)
            .execute()
        )

    started = time.time()

    # Remember the build that existed before this deployment, if any.
    previous_build_id = None
    try:
        previous_build_id = fetch_function().get("buildId")
    except HttpError:
        pass

    self.logging.info(f"Waiting for build to start for function {func_name}...")
    while True:
        if time.time() - started > timeout:
            self.logging.warning(
                f"No build found for {func_name} after {timeout}s - "
                "might be a configuration-only update"
            )
            return False

        found = False
        new_build_id = None
        try:
            details = fetch_function()
            if "buildId" in details:
                candidate = details["buildId"]
                # Only an id that differs from the recorded one marks a new build.
                if previous_build_id is None or candidate != previous_build_id:
                    new_build_id, found = candidate, True
        except HttpError as e:
            self.logging.debug(f"Error getting function details: {e}")

        if found:
            break
        time.sleep(poll_interval)

    build_name = (
        f"projects/{self.config.project_name}/locations/"
        f"{self.config.region}/builds/{new_build_id}"
    )
    self.logging.info(f"Found build {new_build_id} for function {func_name}!")
    self._poll_build_status(build_name, func_name, timeout)
    return True
def _wait_for_active_status(
    self, func_name: str, expected_version: Optional[int] = None, timeout: int = 60
) -> int:
    """Poll the function until it reports ACTIVE, optionally at a given version.

    After a build finishes, the function can stay in DEPLOY_IN_PROGRESS for a
    while, so this method re-reads it every 2 seconds. If the function is
    ACTIVE but at a different version than ``expected_version``, a warning is
    logged and polling continues.

    Args:
        func_name: Name of the function to check
        expected_version: Version id the function must report; None accepts any
        timeout: Seconds before giving up (default: 60)

    Returns:
        The function's version id once it is ACTIVE (and matching, if requested).

    Raises:
        RuntimeError: If the status is neither ACTIVE nor DEPLOY_IN_PROGRESS,
            or if ``timeout`` elapses first.
    """
    full_func_name = GCP.get_full_function_name(
        self.config.project_name, self.config.region, func_name
    )
    started = time.time()
    self.logging.info(f"Waiting for function {func_name} to become ACTIVE...")
    while True:
        details = (
            self.function_client.projects()
            .locations()
            .functions()
            .get(name=full_func_name)
            .execute()
        )
        status = details["status"]
        current_version = int(details["versionId"])

        if status not in ("ACTIVE", "DEPLOY_IN_PROGRESS"):
            # Anything else (e.g. OFFLINE or UNKNOWN) is treated as a failed deployment.
            self.logging.error(f"Function {func_name} has unexpected status: {status}")
            raise RuntimeError(f"Function {func_name} deployment failed with status: {status}")

        if status == "ACTIVE":
            if expected_version is None or current_version == expected_version:
                self.logging.info(f"Function {func_name} is ACTIVE (version {current_version})")
                return current_version
            # The new version may not have been rolled out yet; keep waiting.
            self.logging.warning(
                f"Function {func_name} is ACTIVE but version mismatch: "
                f"expected {expected_version}, got {current_version}"
            )
        else:
            self.logging.debug(f"Function {func_name} deployment in progress...")

        if time.time() - started > timeout:
            raise RuntimeError(
                f"Timeout waiting for function {func_name} to become ACTIVE. "
                f"Current status: {status}"
            )
        time.sleep(2)
[docs]
def package_code(
    self,
    directory: str,
    language: Language,
    language_version: str,
    architecture: str,
    benchmark: str,
    is_cached: bool,
) -> Tuple[str, int]:
    """Package benchmark code into a zip archive for GCP Cloud Functions.

    The packaging process:
    1. Creates a ``function`` subdirectory and moves into it every entry of
       ``directory`` that is not a runtime-specific root file (handler,
       dependency directory, or JAR).
    2. Renames the handler to the file name GCP requires
       (``handler.py`` -> ``main.py``, ``handler.js`` -> ``index.js``).
    3. Zips the whole directory into ``<directory>/<benchmark>.zip``.
    4. Renames the handler back. This also happens if zipping fails. The
       files moved into ``function/`` stay there.

    Java code is already a JAR, but it is still wrapped in a zip: GCP's build
    system unzips the upload and would otherwise find loose classes instead
    of a JAR.

    Args:
        directory: Path to the benchmark code directory
        language: Programming language (python, nodejs, java)
        language_version: Language version (unused here)
        architecture: Target architecture (unused here)
        benchmark: Benchmark name, used as the archive file name
        is_cached: Whether this package is from cache (unused here)

    Returns:
        Tuple of (archive_path, archive_size_bytes)

    Raises:
        NotImplementedError: For C++ code.
    """
    if language == Language.CPP:
        raise NotImplementedError("C++ packaging is not supported on GCP!")

    # Entries that must stay at the archive root for each runtime.
    CONFIG_FILES = {
        Language.PYTHON: ["handler.py", ".python_packages"],
        Language.NODEJS: ["handler.js", "node_modules"],
        Language.JAVA: ["function.jar"],
    }
    # GCP requires a fixed entry-point file name per runtime.
    HANDLER = {
        Language.PYTHON: ("handler.py", "main.py"),
        Language.NODEJS: ("handler.js", "index.js"),
    }

    package_config = CONFIG_FILES[language]
    function_dir = os.path.join(directory, "function")
    os.makedirs(function_dir)
    for file in os.listdir(directory):
        # Skip the freshly created target directory itself (moving it into
        # itself would be a no-op rename anyway).
        if file == "function" or file in package_config:
            continue
        shutil.move(os.path.join(directory, file), function_dir)

    # Rename the handler, since GCP expects e.g. main.py instead of handler.py.
    old_path, new_path = None, None
    if language in HANDLER:
        old_name, new_name = HANDLER[language]
        old_path = os.path.join(directory, old_name)
        new_path = os.path.join(directory, new_name)
        shutil.move(old_path, new_path)

    # GCP.recursive_zip is slower than `zip -qu -r9` or shutil.make_archive.
    # Both alternatives change the current working directory, which is
    # process-wide, not per-thread. That would race when several benchmarks
    # are packaged in parallel.
    benchmark_archive = "{}.zip".format(os.path.join(directory, benchmark))
    try:
        GCP.recursive_zip(directory, benchmark_archive)
    finally:
        # Restore the original handler name even if zipping fails.
        if new_path is not None and old_path is not None:
            shutil.move(new_path, old_path)

    self.logging.info("Created {} archive".format(benchmark_archive))
    bytes_size = os.path.getsize(benchmark_archive)
    mbytes = bytes_size / 1024.0 / 1024.0
    self.logging.info("Zip archive size {:.2f} MB".format(mbytes))
    return benchmark_archive, bytes_size
[docs]
def create_function(
    self,
    code_package: Benchmark,
    func_name: str,
    container_deployment: bool,
    container_uri: str | None,
) -> "GCPFunction":
    """Create a new GCP Cloud Function, or update it if it already exists.

    The code package is uploaded to the deployment bucket under
    ``<benchmark>/<architecture>-<archive name>``.

    If the function does not exist (the lookup returns 404), it is created
    with an HTTPS trigger and ``ALLOW_ALL`` ingress. The method waits for the
    Cloud Build run and for the ACTIVE state. It then grants
    ``roles/cloudfunctions.invoker`` to ``allUsers``, so the HTTP endpoint
    accepts unauthenticated calls.

    If the function already exists, it is redeployed through
    ``update_function``. In both cases a LibraryTrigger is attached to the
    returned function.

    Args:
        code_package: Benchmark package with code and configuration
        func_name: Name for the Cloud Function
        container_deployment: Whether to use container deployment (unsupported)
        container_uri: Container image URI (unused for GCP)

    Returns:
        GCPFunction instance representing the deployed function

    Raises:
        NotImplementedError: If container_deployment is True
        RuntimeError: If the code location is unset, no build is found for a
            newly created function, or the IAM policy cannot be set after retries
        HttpError: If the existence lookup fails with an error other than
            404 Not Found, or if the create request itself fails
    """
    if container_deployment:
        raise NotImplementedError("Container deployment is not supported in GCP")

    package = code_package.code_location
    if package is None:
        raise RuntimeError("Code location is not set for GCP deployment")

    benchmark = code_package.benchmark
    language_runtime = code_package.language_version
    timeout = code_package.benchmark_config.timeout
    memory = code_package.benchmark_config.memory
    storage_client = self._system_resources.get_storage()
    location = self.config.region
    project_name = self.config.project_name
    function_cfg = FunctionConfig.from_benchmark(code_package)
    architecture = function_cfg.architecture.value

    code_package_name = f"{architecture}-{os.path.basename(package)}"
    code_bucket = storage_client.get_bucket(Resources.StorageBucketType.DEPLOYMENT)
    code_prefix = os.path.join(benchmark, code_package_name)
    self.logging.info("Uploading function {} code to {}".format(func_name, code_bucket))
    storage_client.upload(code_bucket, package, code_prefix)

    full_func_name = GCP.get_full_function_name(project_name, location, func_name)
    get_req = self.function_client.projects().locations().functions().get(name=full_func_name)
    try:
        get_req.execute()
        function_exists = True
    except HttpError as e:
        # Only "not found" means the function must be created. Permission,
        # quota or server errors must not be mistaken for a missing function.
        if e.resp.status != 404:
            raise
        function_exists = False

    if function_exists:
        self.logging.info("Function {} exists on GCP, update the instance.".format(func_name))
        function = GCPFunction(
            name=func_name,
            benchmark=benchmark,
            code_package_hash=code_package.hash,
            cfg=function_cfg,
            bucket=code_bucket,
        )
        self.update_function(function, code_package, container_deployment, container_uri)
    else:
        envs = self._generate_function_envs(code_package)
        create_req = (
            self.function_client.projects()
            .locations()
            .functions()
            .create(
                location=f"projects/{project_name}/locations/{location}",
                body={
                    "name": full_func_name,
                    "entryPoint": (
                        "org.serverlessbench.Handler"
                        if code_package.language == Language.JAVA
                        else "handler"
                    ),
                    "runtime": code_package.language_name + language_runtime.replace(".", ""),
                    "availableMemoryMb": memory,
                    "timeout": str(timeout) + "s",
                    "httpsTrigger": {},
                    "ingressSettings": "ALLOW_ALL",
                    "sourceArchiveUrl": "gs://" + code_bucket + "/" + code_prefix,
                    "environmentVariables": envs,
                },
            )
        )
        create_req.execute()
        self.logging.info(
            f"Function {func_name} is creating - GCP build&deployment is started!"
        )

        # Poll build status until completion or failure.
        if not self._wait_for_build_and_poll(func_name):
            raise RuntimeError(f"No build operation found for {func_name}!")
        # Wait for the deployment to become ACTIVE.
        self._wait_for_active_status(func_name)

        allow_unauthenticated_req = (
            self.function_client.projects()
            .locations()
            .functions()
            .setIamPolicy(
                resource=full_func_name,
                body={
                    "policy": {
                        "bindings": [
                            {
                                "role": "roles/cloudfunctions.invoker",
                                "members": ["allUsers"],
                            }
                        ]
                    }
                },
            )
        )
        # Setting the policy can fail briefly right after deployment; retry a
        # bounded number of times instead of looping forever.
        MAX_RETRIES = 5
        for _ in range(MAX_RETRIES):
            try:
                allow_unauthenticated_req.execute()
                break
            except HttpError:
                self.logging.info(
                    "Sleeping for 5 seconds because the created functions is not yet available!"
                )
                time.sleep(5)
        else:
            raise RuntimeError(
                f"Failed to configure function {full_func_name} "
                "for unauthenticated invocations!"
            )
        self.logging.info(f"Function {func_name} accepts now unauthenticated invocations!")

        function = GCPFunction(
            name=func_name,
            benchmark=benchmark,
            code_package_hash=code_package.hash,
            cfg=function_cfg,
            bucket=code_bucket,
        )

    # Every function gets a LibraryTrigger bound to this deployment client.
    from sebs.gcp.triggers import LibraryTrigger

    trigger = LibraryTrigger(func_name, self)
    trigger.logging_handlers = self.logging_handlers
    function.add_trigger(trigger)
    return function
[docs]
def create_trigger(self, function: Function, trigger_type: Trigger.TriggerType) -> Trigger:
    """Attach an HTTP trigger to a deployed function.

    The invocation URL is read from the function's ``httpsTrigger`` field.
    The new trigger is added to the function, and the function is written
    back to the cache. Library triggers are not created here; they are
    attached when the function is created.

    Args:
        function: Function instance to create the trigger for
        trigger_type: Type of trigger to create; only HTTP is supported

    Returns:
        The created HTTPTrigger.

    Raises:
        RuntimeError: If ``trigger_type`` is not HTTP.
    """
    from sebs.gcp.triggers import HTTPTrigger

    if trigger_type != Trigger.TriggerType.HTTP:
        raise RuntimeError("Not supported!")

    full_func_name = GCP.get_full_function_name(
        self.config.project_name, self.config.region, function.name
    )
    details = (
        self.function_client.projects()
        .locations()
        .functions()
        .get(name=full_func_name)
        .execute()
    )
    invoke_url = details["httpsTrigger"]["url"]
    self.logging.info(f"Function {function.name} - HTTP trigger ready at {invoke_url}")

    http_trigger = HTTPTrigger(invoke_url)
    http_trigger.logging_handlers = self.logging_handlers
    function.add_trigger(http_trigger)
    self.cache_client.update_function(function)
    return http_trigger
[docs]
def cached_function(self, function: Function) -> None:
    """Re-attach runtime state to a function restored from the cache.

    Each library trigger of the function gets this system as its deployment
    client, plus the current logging handlers.

    Args:
        function: Cached function instance to configure
    """
    from sebs.faas.function import Trigger
    from sebs.gcp.triggers import LibraryTrigger

    library_triggers = function.triggers(Trigger.TriggerType.LIBRARY)
    for library_trigger in library_triggers:
        gcp_trigger = cast(LibraryTrigger, library_trigger)
        gcp_trigger.logging_handlers = self.logging_handlers
        gcp_trigger.deployment_client = self
[docs]
def update_function(
    self,
    function: Function,
    code_package: Benchmark,
    container_deployment: bool,
    container_uri: str | None,
) -> None:
    """Redeploy an existing Cloud Function with a freshly uploaded code package.

    The archive is uploaded as ``<architecture>-<archive name>`` into the
    function's code bucket. The function is then patched with the entry
    point, runtime, memory, timeout, source URL and environment. Finally the
    method waits for the resulting build and for the ACTIVE state at the
    version reported by the patch response.

    NOTE(review): ``create_function`` stores archives under
    ``<benchmark>/<architecture>-<name>``, while this key has no benchmark
    prefix. TODO confirm this is intended.

    Args:
        function: Existing function instance to update
        code_package: Benchmark package with the new code
        container_deployment: Whether to use container deployment (unsupported)
        container_uri: Container image URI (unused)

    Raises:
        NotImplementedError: If container_deployment is True
        RuntimeError: If the code location is unset, the build fails, or the
            function does not become ACTIVE at the expected version in time
    """
    if container_deployment:
        raise NotImplementedError("Container deployment is not supported in GCP")
    code_location = code_package.code_location
    if code_location is None:
        raise RuntimeError("Code location is not set for GCP deployment")

    gcp_function = cast(GCPFunction, function)
    function_cfg = FunctionConfig.from_benchmark(code_package)
    archive_name = f"{function_cfg.architecture.value}-{os.path.basename(code_location)}"

    storage = cast(GCPStorage, self._system_resources.get_storage())
    bucket = gcp_function.code_bucket(code_package.benchmark, storage)
    storage.upload(bucket, code_location, archive_name)
    envs = self._generate_function_envs(code_package)
    self.logging.info(f"Uploaded new code package to {bucket}/{archive_name}")

    full_func_name = GCP.get_full_function_name(
        self.config.project_name, self.config.region, gcp_function.name
    )
    entry_point = (
        "org.serverlessbench.Handler" if code_package.language == Language.JAVA else "handler"
    )
    patch_body = {
        "name": full_func_name,
        "entryPoint": entry_point,
        "runtime": code_package.language_name + code_package.language_version.replace(".", ""),
        "availableMemoryMb": gcp_function.config.memory,
        "timeout": str(gcp_function.config.timeout) + "s",
        "httpsTrigger": {},
        "sourceArchiveUrl": "gs://" + bucket + "/" + archive_name,
        "environmentVariables": envs,
    }
    response = (
        self.function_client.projects()
        .locations()
        .functions()
        .patch(name=full_func_name, body=patch_body)
        .execute()
    )
    self.logging.info(f"Function {gcp_function.name} code update initiated")

    # The patch response carries the version being rolled out, but no build
    # name, so the build has to be located separately.
    # NOTE(review): the patch is sent before the build lookup records the
    # current buildId, so a very fast build start may go undetected (which
    # surfaces as the warning below).
    expected_version = int(response["metadata"]["versionId"])
    if not self._wait_for_build_and_poll(gcp_function.name):
        self.logging.warning(
            f"No build operation found for {gcp_function.name} - "
            "this is unexpected for code updates"
        )
    self._wait_for_active_status(gcp_function.name, expected_version)
    self.logging.info("Published new function code and configuration.")
def _update_envs(self, full_function_name: str, envs: Dict) -> Dict:
    """Overlay ``envs`` on the function's currently deployed environment.

    Args:
        full_function_name: Fully qualified function name
        envs: Environment variables to add or override

    Returns:
        ``envs`` unchanged if the function has no environment variables yet.
        Otherwise a new dict holding the deployed variables plus ``envs``,
        where ``envs`` wins on key conflicts.
    """
    deployed = (
        self.function_client.projects()
        .locations()
        .functions()
        .get(name=full_function_name)
        .execute()
    )
    if "environmentVariables" not in deployed:
        return envs
    return {**deployed["environmentVariables"], **envs}
def _generate_function_envs(self, code_package: Benchmark) -> Dict:
    """Build the environment variables a benchmark needs at runtime.

    Only benchmarks that use NoSQL storage get a variable,
    ``NOSQL_STORAGE_DATABASE``, holding the benchmark's database name.

    Args:
        code_package: Benchmark package with module requirements

    Returns:
        The environment variables; empty if the benchmark does not use NoSQL.
    """
    if not code_package.uses_nosql:
        return {}
    nosql_storage = cast(GCPSystemResources, self._system_resources).get_nosql_storage()
    return {"NOSQL_STORAGE_DATABASE": nosql_storage.benchmark_database(code_package.benchmark)}
[docs]
def update_function_configuration(
    self, function: Function, code_package: Benchmark, env_variables: Dict = {}
) -> int:
    """Patch a function's memory, timeout and environment without new code.

    Environment variables from the benchmark and ``env_variables`` are merged
    with those already deployed, because GCP may otherwise overwrite them.
    ``environmentVariables`` is only included in the update mask when there
    is something to set. The method then waits until the function is ACTIVE
    at the version reported by the patch response.

    Args:
        function: Function instance to update
        code_package: Benchmark package; its input must already be processed
        env_variables: Additional environment variables to set

    Returns:
        Version id of the updated function.

    Raises:
        AssertionError: If the benchmark input has not been processed
        RuntimeError: If the function reaches an unexpected status or is not
            ACTIVE at the expected version within 60 seconds
    """
    assert code_package.has_input_processed
    gcp_function = cast(GCPFunction, function)
    full_func_name = GCP.get_full_function_name(
        self.config.project_name, self.config.region, gcp_function.name
    )

    # GCP may overwrite existing variables, so merge with the deployed ones first.
    envs = {**self._generate_function_envs(code_package), **env_variables}
    if envs:
        envs = self._update_envs(full_func_name, envs)

    update_fields = ["availableMemoryMb", "timeout"]
    patch_body = {
        "availableMemoryMb": gcp_function.config.memory,
        "timeout": str(gcp_function.config.timeout) + "s",
    }
    if envs:
        update_fields.append("environmentVariables")
        patch_body["environmentVariables"] = envs

    response = (
        self.function_client.projects()
        .locations()
        .functions()
        .patch(name=full_func_name, updateMask=",".join(update_fields), body=patch_body)
        .execute()
    )
    expected_version = int(response["metadata"]["versionId"])
    self.logging.info(f"Function {gcp_function.name} configuration update initiated")

    # No build happens for configuration-only updates, but the rollout still takes time.
    current_version = self._wait_for_active_status(
        gcp_function.name, expected_version, timeout=60
    )
    self.logging.info("Published new function configuration.")
    return current_version
[docs]
@staticmethod
def get_full_function_name(project_name: str, location: str, func_name: str) -> str:
    """Build the resource path the Cloud Functions API uses for a function.

    Args:
        project_name: GCP project ID
        location: GCP region/location
        func_name: Function name

    Returns:
        ``projects/<project>/locations/<location>/functions/<name>``
    """
    template = "projects/{}/locations/{}/functions/{}"
    return template.format(project_name, location, func_name)
[docs]
def shutdown(self) -> None:
    """Shut down the GCP system resources, then run the base-class shutdown."""
    gcp_resources = cast(GCPSystemResources, self._system_resources)
    gcp_resources.shutdown()
    super().shutdown()
[docs]
def download_metrics(
    self,
    function_name: str,
    start_time: int,
    end_time: int,
    requests: Dict,
    metrics: Dict,
) -> None:
    """Download execution times from Cloud Logging and metrics from Cloud Monitoring.

    Log entries mentioning "execution took <N> ms" are matched to
    ``requests`` by their ``execution_id`` label. The time is stored in
    microseconds in ``requests[execution_id].provider_times.execution``;
    entries whose id is not in ``requests`` are skipped.

    Then, for each of ``execution_times``, ``user_memory_bytes`` and
    ``network_egress``, ``metrics[<name>]`` is reset to a list. The list
    receives one ``{"mean_value", "executions_count"}`` dict per time-series
    point belonging to ``function_name``.

    Args:
        function_name: Name of the function to collect metrics for
        start_time: Start timestamp for metric collection (Unix timestamp)
        end_time: End timestamp for metric collection (Unix timestamp)
        requests: Requests keyed by execution ID; updated in place
        metrics: Dictionary populated in place with the collected metrics
    """
    from google.api_core import exceptions
    from time import sleep

    def wrapper(gen):
        """Yield items from ``gen``, pausing 30 s after ResourceExhausted.

        After a ResourceExhausted error, ``next(gen)`` is simply called again.
        NOTE(review): if ``gen`` is a Python generator, an exception raised
        inside it finishes the generator. The retry then most likely hits
        StopIteration and silently drops the remaining pages. Confirm for the
        google-cloud-logging page iterator.

        Args:
            gen: iterator over HTTP responses / pages

        Yields:
            each item produced by ``gen``
        """
        while True:
            try:
                yield next(gen)
            except StopIteration:
                break
            except exceptions.ResourceExhausted:
                self.logging.info("Google Cloud resources exhausted, sleeping 30s")
                sleep(30)

    """
    Use GCP's logging system to find execution time of each function invocation.
    There shouldn't be problem of waiting for complete results,
    since logs appear very quickly here.
    """
    import google.cloud.logging as gcp_logging

    logging_client = gcp_logging.Client()
    logger = logging_client.logger("cloudfunctions.googleapis.com%2Fcloud-functions")

    """
    GCP accepts only single date format: 'YYYY-MM-DDTHH:MM:SSZ'.
    Thus, we first convert timestamp to UTC timezone.
    Then, we generate correct format.
    Add 1 second to end time to ensure that removing
    milliseconds doesn't affect query.
    """
    timestamps = []
    for timestamp in [start_time, end_time + 1]:
        utc_date = datetime.fromtimestamp(timestamp, tz=timezone.utc)
        timestamps.append(utc_date.strftime("%Y-%m-%dT%H:%M:%SZ"))
    # Whitespace-separated comparisons are implicitly AND-ed in the Cloud Logging query language.
    invocations = logger.list_entries(
        filter_=(
            f'resource.labels.function_name = "{function_name}" '
            f'timestamp >= "{timestamps[0]}" '
            f'timestamp <= "{timestamps[1]}"'
        ),
        page_size=1000,
    )
    invocations_processed = 0
    # If the result exposes no ``pages``, all entries are treated as a single page.
    if hasattr(invocations, "pages"):
        pages = list(wrapper(invocations.pages))
    else:
        pages = [list(wrapper(invocations))]
    entries = 0
    for page in pages:  # invocations.pages:
        for invoc in page:
            entries += 1
            # NOTE(review): assumes a text (str) payload. For dict payloads `in`
            # tests keys, and a None payload would raise TypeError. Confirm.
            if "execution took" in invoc.payload:
                execution_id = invoc.labels["execution_id"]
                # might happen that we get invocation from another experiment
                if execution_id not in requests:
                    continue
                # find number of milliseconds
                regex_result = re.search(r"\d+ ms", invoc.payload)
                assert regex_result
                exec_time = regex_result.group().split()[0]
                # convert into microseconds
                requests[execution_id].provider_times.execution = int(exec_time) * 1000
                invocations_processed += 1
    self.logging.info(
        f"GCP: Received {entries} entries, found time metrics for {invocations_processed} "
        f"out of {len(requests.keys())} invocations."
    )

    """
    Use metrics to find estimated values for maximum memory used, active instances
    and network traffic.
    https://cloud.google.com/monitoring/api/metrics_gcp#gcp-cloudfunctions
    """

    # Set expected metrics here
    available_metrics = ["execution_times", "user_memory_bytes", "network_egress"]

    client = monitoring_v3.MetricServiceClient()
    project_name = client.common_project_path(self.config.project_name)

    # math.modf returns (fractional part, integral part). Only the integral
    # seconds are used below; the "*_nanos" values are unused.
    end_time_nanos, end_time_seconds = math.modf(end_time)
    start_time_nanos, start_time_seconds = math.modf(start_time)

    # The query window extends 60 s past end_time, presumably so that points
    # reported with a delay are still included.
    interval = monitoring_v3.TimeInterval(
        {
            "end_time": {"seconds": int(end_time_seconds) + 60},
            "start_time": {"seconds": int(start_time_seconds)},
        }
    )

    for metric in available_metrics:
        metrics[metric] = []

        list_request = monitoring_v3.ListTimeSeriesRequest(
            name=project_name,
            filter='metric.type = "cloudfunctions.googleapis.com/function/{}"'.format(metric),
            interval=interval,
        )

        results = client.list_time_series(list_request)
        # The filter selects only the metric type; the function is matched by its label here.
        for result in results:
            if result.resource.labels.get("function_name") == function_name:
                for point in result.points:
                    # NOTE(review): only distribution_value is read. A metric
                    # reported as a scalar (e.g. network_egress, if INT64) would
                    # presumably yield mean 0 and count 0. Verify against the
                    # Cloud Monitoring metric list.
                    metrics[metric] += [
                        {
                            "mean_value": point.value.distribution_value.mean,
                            "executions_count": point.value.distribution_value.count,
                        }
                    ]
def _enforce_cold_start(self, function: Function, code_package: Benchmark) -> int:
    """Trigger a cold start for a single function via a configuration update.

    Bumps ``self.cold_start_counter`` and pushes the new value as the
    ``cold_start`` environment variable through
    ``update_function_configuration``. Changing the configuration is meant to
    make GCP start fresh instances for the next invocation.

    Args:
        function: Function whose configuration is updated
        code_package: Benchmark package passed on to the configuration update

    Returns:
        The version ID reported by ``update_function_configuration``
    """
    self.cold_start_counter += 1
    env_variables = {"cold_start": str(self.cold_start_counter)}
    return self.update_function_configuration(function, code_package, env_variables)
[docs]
def enforce_cold_start(self, functions: List[Function], code_package: Benchmark) -> None:
    """Force cold starts on several functions and wait until all redeployments finish.

    All functions receive the same ``cold_start`` value. ``_enforce_cold_start``
    increments ``self.cold_start_counter`` on every call, so the counter is
    rolled back after each function and advanced exactly once at the end.

    Redeployment is verified by polling ``is_deployed`` with the version ID
    returned for each function, every 5 seconds, until all have reached it.

    Args:
        functions: Functions to force into a cold start
        code_package: Benchmark package passed on to the configuration update
    """
    pending = []
    for func in functions:
        target_version = self._enforce_cold_start(func, code_package)
        pending.append((target_version, func))
        # undo the per-call bump so the next function gets the same value
        self.cold_start_counter -= 1

    # NOTE(review): no upper bound on waiting; a version that never appears blocks forever.
    while True:
        lagging = []
        for target_version, func in pending:
            reached, _ = self.is_deployed(func.name, target_version)
            if not reached:
                lagging.append((target_version, func))
        redeployed = len(pending) - len(lagging)
        self.logging.info(f"Redeployed {redeployed} out of {len(pending)}")
        if redeployed == len(pending):
            break
        time.sleep(5)
        pending = lagging

    # commit the single increment shared by every function updated above
    self.cold_start_counter += 1
[docs]
def get_functions(self, code_package: Benchmark, function_names: List[str]) -> List["Function"]:
    """Fetch several functions and block until every one of them is ACTIVE.

    Each name is resolved through ``get_function`` in the given order. The
    resulting functions are then polled with ``is_deployed`` every 5 seconds.
    Functions that report ACTIVE are dropped from the wait set, and polling
    stops once the set is empty.

    Args:
        code_package: Benchmark package used when resolving each function
        function_names: Names of the functions to fetch

    Returns:
        The functions, in the same order as ``function_names``
    """
    functions: List["Function"] = [
        self.get_function(code_package, func_name) for func_name in function_names
    ]
    pending = list(functions)

    # NOTE(review): no upper bound on waiting; a function that never becomes
    # ACTIVE blocks forever.
    while True:
        still_pending = []
        for func in pending:
            active, _ = self.is_deployed(func.name)
            if not active:
                still_pending.append(func)
        ready = len(pending) - len(still_pending)
        self.logging.info(f"Deployed {ready} out of {len(pending)}")
        if ready == len(pending):
            break
        time.sleep(5)
        pending = still_pending
        self.logging.info(f"Waiting on {pending}")
    return functions
[docs]
def is_deployed(self, func_name: str, versionId: int = -1) -> Tuple[bool, int]:
    """Check if a function is deployed and optionally verify its version.

    With the default ``versionId`` of -1, the function counts as deployed once
    the status reported by the Cloud Functions API is ``"ACTIVE"``. Otherwise
    it counts as deployed once the reported ``versionId`` matches the requested
    one.

    The two version IDs are compared by their string forms. The API value may
    not be an ``int`` (``deployment_version`` has to convert it with ``int()``),
    while callers may pass an ``int`` as annotated. A raw ``==`` between the two
    types would never succeed, and the polling loops built on this method
    (e.g. ``enforce_cold_start``) have no timeout.

    Args:
        func_name: Name of the function to check
        versionId: Optional specific version ID to verify (-1 to check any)

    Returns:
        Tuple of (is_deployed, current_version_id). The version ID is returned
        exactly as reported by the API, without conversion.
    """
    name = GCP.get_full_function_name(self.config.project_name, self.config.region, func_name)
    function_client = self.get_function_client()
    status_req = function_client.projects().locations().functions().get(name=name)
    status_res = status_req.execute()
    current_version = status_res["versionId"]
    if versionId == -1:
        return (status_res["status"] == "ACTIVE", current_version)
    # Normalise both sides so that an int and a string-encoded version compare equal.
    return (str(current_version) == str(versionId), current_version)
[docs]
def deployment_version(self, func: Function) -> int:
    """Return the version ID currently deployed for ``func``.

    Queries the Cloud Functions API for the function resource and converts its
    ``versionId`` field to ``int``.

    Args:
        func: Function whose deployed version is requested

    Returns:
        The current version ID as an integer
    """
    full_name = GCP.get_full_function_name(
        self.config.project_name, self.config.region, func.name
    )
    functions_api = self.get_function_client().projects().locations().functions()
    response = functions_api.get(name=full_name).execute()
    return int(response["versionId"])
[docs]
@staticmethod
def helper_zip(base_directory: str, path: str, archive: zipfile.ZipFile) -> None:
    """Recursively add every file below ``path`` to ``archive``.

    Helper for ``recursive_zip``.

    - Each file is stored under its path relative to ``base_directory``.
    - Entries are visited in sorted order, so repeated runs over the same tree
      add the files in the same order.
    - Directories are not stored as entries, so empty directories do not
      appear in the archive.
    - The archive's own file is skipped when it lives inside the tree being
      zipped. The check compares resolved paths, so it also works when
      ``archive.filename`` is relative or not normalised (e.g.
      ``dir/./code.zip``).

    Args:
        base_directory: Base directory path for relative path calculation
        path: Current directory being processed
        archive: ZipFile object to add files to
    """
    # ZipFile.filename may be None when the archive was opened on a file object;
    # then there is no on-disk file to exclude.
    own_file = os.path.realpath(archive.filename) if archive.filename else None
    for entry in sorted(os.listdir(path)):
        entry_path = os.path.join(path, entry)
        if os.path.isdir(entry_path):
            GCP.helper_zip(base_directory, entry_path, archive)
        elif own_file is None or os.path.realpath(entry_path) != own_file:
            # prevent the archive from including itself
            archive.write(entry_path, os.path.relpath(entry_path, base_directory))
[docs]
@staticmethod
def recursive_zip(directory: str, archname: str) -> bool:
    """Create a zip archive of a directory with relative paths.

    Creates a DEFLATE-compressed archive (compression level 9) of
    ``directory``, preserving its relative structure. If ``directory`` is a
    regular file, that single file is stored under its base name.

    Args:
        directory: Absolute path to the directory (or file) to be zipped
        archname: Path where the zip file should be created

    Returns:
        True once the archive has been written. Errors while reading input
        files propagate as exceptions.
    """
    # The context manager closes and finalises the archive even if adding a file
    # raises, instead of leaking the handle and leaving a zip without a
    # central directory.
    with zipfile.ZipFile(archname, "w", zipfile.ZIP_DEFLATED, compresslevel=9) as archive:
        if os.path.isdir(directory):
            GCP.helper_zip(directory, directory, archive)
        else:
            # the path is a single file: store it under its base name
            archive.write(directory, os.path.basename(directory))
    return True