Source code for sebs.azure.azure

# Copyright 2020-2025 ETH Zurich and the SeBS authors. All rights reserved.
"""Azure serverless platform implementation for SeBS benchmarking.

This module provides the Azure implementation of the SeBS serverless
benchmarking system. It handles Azure Functions deployment, resource
management, code packaging, and benchmark execution on Microsoft Azure.

Key features:
    - Azure Functions deployment and management
    - Azure Storage integration for code and data
    - CosmosDB support for NoSQL benchmarks
    - HTTP trigger configuration and invocation
    - Performance metrics collection via Application Insights
    - Resource lifecycle management

The main class Azure extends the base System class to provide Azure-specific
functionality for serverless function benchmarking.

Example:
    Basic usage for Azure benchmarking::

        from sebs.azure.azure import Azure
        from sebs.azure.config import AzureConfig

        # Initialize Azure system with configuration
        azure_system = Azure(sebs_config, azure_config, cache, docker_client, handlers)
        azure_system.initialize()

        # Deploy and benchmark functions
        function = azure_system.create_function(code_package, func_name, False, "")
        result = function.invoke(payload)
"""

import datetime
import json
import random
import re
import os
import shutil
import time
import uuid
from typing import cast, Dict, List, Optional, Set, Tuple, Type  # noqa

import docker

from sebs.azure.blob_storage import BlobStorage
from sebs.azure.cli import AzureCLI
from sebs.azure.cosmosdb import CosmosDB
from sebs.azure.function import AzureFunction
from sebs.azure.config import AzureConfig, AzureResources
from sebs.azure.system_resources import AzureSystemResources
from sebs.azure.triggers import AzureTrigger, HTTPTrigger
from sebs.faas.function import Trigger
from sebs.benchmark import Benchmark
from sebs.cache import Cache
from sebs.config import SeBSConfig
from sebs.experiments.config import SystemVariant
from sebs.utils import LoggingHandlers, execute
from sebs.faas.function import Function, FunctionConfig, ExecutionResult
from sebs.faas.system import System
from sebs.faas.config import Resources
from sebs.sebs_types import Language


[docs] class Azure(System): """Azure serverless platform implementation. This class implements the Azure-specific functionality for the SeBS benchmarking suite. It handles Azure Functions deployment, resource management, and benchmark execution on Microsoft Azure platform. Attributes: logs_client: Azure logs client (currently unused) storage: BlobStorage instance for Azure Blob Storage operations cached: Flag indicating if resources are cached _config: Azure configuration containing credentials and resources AZURE_RUNTIMES: Mapping of language names to Azure runtime identifiers """ logs_client = None storage: BlobStorage cached: bool = False _config: AzureConfig # runtime mapping AZURE_RUNTIMES = {"python": "python", "nodejs": "node", "java": "java"}
[docs] @staticmethod def name() -> str: """Get the platform name. Returns: Platform name 'azure'. """ return "azure"
@staticmethod def _normalize_runtime_version(language: str, version: str) -> str: """ Azure Functions Java expects versions with a minor component (e.g. 17.0 instead of 17). Other languages can keep the version as-is. """ if language == "java" and re.match(r"^\d+$", str(version)): return f"{version}.0" return version @property def config(self) -> AzureConfig: """Get Azure configuration. Returns: Azure configuration containing credentials and resources. """ return self._config
[docs] @staticmethod def function_type() -> Type[Function]: """Get the function type for Azure. Returns: AzureFunction class type. """ return AzureFunction
@property def cli_instance(self) -> AzureCLI: """Get Azure CLI instance. Returns: Azure CLI instance for executing Azure commands. """ return cast(AzureSystemResources, self._system_resources).cli_instance def __init__( self, sebs_config: SeBSConfig, config: AzureConfig, cache_client: Cache, docker_client: docker.client.DockerClient, logger_handlers: LoggingHandlers, ) -> None: """Initialize Azure system. Args: sebs_config: SeBS configuration settings config: Azure-specific configuration cache_client: Cache for storing function and resource data docker_client: Docker client for container operations logger_handlers: Logging handlers for output management """ super().__init__( sebs_config, cache_client, docker_client, AzureSystemResources(sebs_config, config, cache_client, docker_client, logger_handlers), ) self.logging_handlers = logger_handlers self._config = config
[docs] def initialize( self, config: Dict[str, str] = {}, resource_prefix: Optional[str] = None, quiet: bool = False, ) -> None: """Initialize Azure system and start CLI container. Initializes Azure resources and allocates shared resources like data storage account. Starts the Docker container with Azure CLI tools. Args: config: Additional configuration parameters resource_prefix: Optional prefix for resource naming """ self.initialize_resources(select_prefix=resource_prefix, quiet=quiet) self.allocate_shared_resource()
[docs] def shutdown(self) -> None: """Shutdown Azure system and cleanup resources. Stops the Azure CLI container and performs cleanup of system resources. """ cast(AzureSystemResources, self._system_resources).shutdown() super().shutdown()
[docs] def find_deployments(self) -> List[str]: """Find existing SeBS deployments by scanning resource groups. Looks for Azure resource groups matching the SeBS naming pattern - sebs_resource_group_(.*) - to identify existing deployments that can be reused. Returns: List of deployment identifiers found in resource groups. """ resource_groups = self.config.resources.list_resource_groups(self.cli_instance) deployments = [] for group in resource_groups: # The benchmarks bucket must exist in every deployment. deployment_search = re.match("sebs_resource_group_(.*)", group) if deployment_search: deployments.append(deployment_search.group(1)) return deployments
[docs] def allocate_shared_resource(self) -> None: """Allocate shared data storage account. Creates or retrieves the shared data storage account used for benchmark input/output data. This allows multiple deployment clients to share the same storage, simplifying regression testing. """ self.config.resources.data_storage_account(self.cli_instance)
[docs] def package_code( self, directory: str, language: Language, language_version: str, architecture: str, benchmark: str, is_cached: bool, ) -> Tuple[str, float]: """Package function code for Azure Functions deployment. Creates the proper directory structure and configuration files required for Azure Functions deployment. The structure includes: - handler/ directory with source files and Azure wrappers - function.json with trigger and binding configuration - host.json with runtime configuration - requirements.txt or package.json with dependencies Args: directory: Directory containing the function code language: Programming language (python, nodejs) language_version: Language runtime version architecture: Target architecture (currently unused) benchmark: Name of the benchmark is_cached: Whether the package is from cache system_variant: Selected deployment variant Returns: Tuple of (directory_path, code_size_bytes, container_uri) """ # In previous step we ran a Docker container which installed packages # Python packages are in .python_packages because this is expected by Azure EXEC_FILES = { Language.PYTHON: "handler.py", Language.NODEJS: "handler.js", Language.JAVA: "../lib/function.jar", } CONFIG_FILES = { Language.PYTHON: ["requirements.txt", ".python_packages"], Language.NODEJS: ["package.json", "node_modules"], Language.JAVA: ["lib", "src", "pom.xml", "target", ".mvn", "mvnw", "mvnw.cmd"], } package_config = CONFIG_FILES[language] handler_dir = os.path.join(directory, "handler") os.makedirs(handler_dir) # For Java, create lib directory for JARs and exclude build artifacts if language == Language.JAVA: lib_dir = os.path.join(directory, "lib") os.makedirs(lib_dir, exist_ok=True) # Move function.jar to lib directory if os.path.exists(os.path.join(directory, "function.jar")): shutil.move( os.path.join(directory, "function.jar"), os.path.join(lib_dir, "function.jar") ) # move all files to 'handler' except package config for f in os.listdir(directory): if f not in package_config: source_file = os.path.join(directory, f) shutil.move(source_file, handler_dir) # For Java, clean up build artifacts that we don't want to deploy if language == Language.JAVA: for artifact in ["src", "pom.xml", "target", ".mvn", "mvnw", "mvnw.cmd"]: artifact_path = os.path.join(directory, artifact) if os.path.exists(artifact_path): if os.path.isdir(artifact_path): shutil.rmtree(artifact_path) else: os.remove(artifact_path) # generate function.json # TODO: extension to other triggers than HTTP if language == Language.JAVA: # Java Azure Functions - For annotation-based functions, function.json # should include scriptFile and entryPoint # The @FunctionName annotation determines the function name default_function_json = { "scriptFile": "../lib/function.jar", "entryPoint": "org.serverlessbench.Handler.handleRequest", "bindings": [ { "type": "httpTrigger", "direction": "in", "name": "req", "methods": ["get", "post"], "authLevel": "anonymous", }, {"type": "http", "direction": "out", "name": "$return"}, ], } else: default_function_json = { "scriptFile": EXEC_FILES[language], "bindings": [ { "authLevel": "anonymous", "type": "httpTrigger", "direction": "in", "name": "req", "methods": ["get", "post"], }, {"type": "http", "direction": "out", "name": "$return"}, ], } json_out = os.path.join(directory, "handler", "function.json") json.dump(default_function_json, open(json_out, "w"), indent=2) # generate host.json default_host_json = { "version": "2.0", "extensionBundle": { "id": "Microsoft.Azure.Functions.ExtensionBundle", "version": "[4.0.0, 5.0.0)", }, } json.dump(default_host_json, open(os.path.join(directory, "host.json"), "w"), indent=2) code_size = Benchmark.directory_size(directory) execute("zip -qu -r9 {}.zip * .".format(benchmark), shell=True, cwd=directory) return directory, code_size
def _execute_cli_with_retry( self, cmd: str, max_retries: int = 5, base_delay: float = 1.0, max_delay: float = 32.0, retryable_errors: Optional[Set[str]] = None, ) -> bytes: """Execute Azure CLI command with retry logic for transient errors. Handles transient CLI errors by retrying with exponential backoff and jitter. Specific error patterns can be configured for retry. Args: cmd: Azure CLI command to execute max_retries: Maximum number of retry attempts (default: 5) base_delay: Base delay in seconds for exponential backoff (default: 1.0) max_delay: Maximum delay between retries in seconds (default: 32.0) retryable_errors: Set of error patterns to trigger retries (default: NotFound, TooManyRequests, find app with name) Returns: Command output as bytes Raises: RuntimeError: If the command fails with a non-retryable error or after exhausting all retry attempts """ if retryable_errors is None: retryable_errors = { "NotFound", "TooManyRequests", "find app with name", "ServiceUnavailable", "InternalServerError", } attempt = 0 last_error = None while attempt <= max_retries: try: result = self.cli_instance.execute(cmd) if attempt > 0: self.logging.info(f"CLI command succeeded after {attempt} retries") return result except RuntimeError as e: error_message = str(e) last_error = e # Check if error is retryable is_retryable = any(pattern in error_message for pattern in retryable_errors) if not is_retryable: raise # Check if we have retries left if attempt >= max_retries: self.logging.error( f"Max retries ({max_retries}) exhausted for CLI command, " f"failing with error: {error_message}" ) raise # Calculate delay with exponential backoff and jitter delay = min(base_delay * (2**attempt) + random.uniform(0, 1), max_delay) if attempt == 0: self.logging.warning( f"Transient CLI error, retrying (attempt {attempt + 1}/{max_retries}): " f"{error_message[:100]}" ) else: self.logging.info( f"Retry {attempt + 1}/{max_retries} after {delay:.1f}s backoff" ) time.sleep(delay) attempt += 1 # This should not be reached, but just in case if last_error: raise last_error raise RuntimeError("Unexpected state in retry logic")
[docs] def publish_function( self, function: Function, code_package: Benchmark, container_dest: str, repeat_on_failure: bool = False, ) -> str: """Publish function code to Azure Functions. Deploys the packaged function code to Azure Functions using the Azure Functions CLI tools. Handles retries with exponential backoff and jitter for transient errors. This is useful to handle delays in Azure cache updates and service availability issues. Args: function: Function instance to publish code_package: Benchmark code package to deploy container_dest: Destination path in the CLI container repeat_on_failure: Whether to retry on failure Returns: URL for invoking the published function. Raises: RuntimeError: If function publication fails or URL cannot be found. """ self.logging.info("Attempting publish of function {}".format(function.name)) publish_cmd = ( f"bash -c 'cd {container_dest} " "&& func azure functionapp publish {} --{} --no-build'".format( function.name, self.AZURE_RUNTIMES[code_package.language_name] ) ) # Execute publish command with retry if requested if repeat_on_failure: ret = self._execute_cli_with_retry(publish_cmd) else: ret = self.cli_instance.execute(publish_cmd) self.logging.debug(f"Function app publish of {function.name}, ret {ret.decode('utf-8')}") # Extract URL from publish output url = "" ret_str = ret.decode("utf-8") for line in ret_str.split("\n"): if "Invoke url:" in line: url = line.split("Invoke url:")[1].strip() break # Fallback: query function details if URL not found in publish output if url == "": self.logging.warning( "Couldn't find function URL in the publish output: {}".format(ret.decode("utf-8")) ) self.logging.info("Querying function details to retrieve URL") resource_group = self.config.resources.resource_group(self.cli_instance) query_cmd = ( "az functionapp function show --function-name handler " f"--name {function.name} --resource-group {resource_group}" ) # Use retry for the query as well if repeat_on_failure is enabled if repeat_on_failure: ret = self._execute_cli_with_retry(query_cmd) else: ret = self.cli_instance.execute(query_cmd) self.logging.debug(f"Function query for {function.name}! Return {ret.decode('utf-8')}") try: url = json.loads(ret.decode("utf-8"))["invokeUrlTemplate"] except json.decoder.JSONDecodeError: raise RuntimeError(f"Couldn't find the function URL in {ret.decode('utf-8')}") return url
[docs] def update_function( self, function: Function, code_package: Benchmark, system_variant: SystemVariant, container_uri: str | None, ) -> None: """Update existing Azure Function with new code. Updates an existing Azure Function with new code package, including environment variables and function configuration. It also ensures an HTTP trigger is correctly associated with the function's URL. Args: function: Function instance to update code_package: New benchmark code package system_variant: Selected deployment variant container_uri: Container URI (unused for Azure) Raises: NotImplementedError: If container deployment is requested. """ if system_variant.is_container: raise NotImplementedError("Container deployment is not supported in Azure") assert code_package.has_input_processed # Update environment variables first since it has a non-deterministic # processing time. self.update_envs(function, code_package) # Mount code package in Docker instance container_dest = self._mount_function_code(code_package) function_url = self.publish_function(function, code_package, container_dest, True) # Avoid duplication of HTTP trigger found_trigger = False for trigger in function.triggers_all(): if isinstance(trigger, HTTPTrigger): found_trigger = True trigger.url = function_url break if not found_trigger: trigger = HTTPTrigger( function_url, self.config.resources.data_storage_account(self.cli_instance) ) trigger.logging_handlers = self.logging_handlers function.add_trigger(trigger)
[docs] def update_envs( self, function: Function, code_package: Benchmark, env_variables: dict = {} ) -> None: """Update environment variables for Azure Function. Sets up environment variables required for benchmark execution, including storage connection strings and NoSQL database credentials. Preserves existing environment variables while adding new ones. Args: function: Function instance to update code_package: Benchmark code package with requirements env_variables: Additional environment variables to set Raises: RuntimeError: If environment variable operations fail. """ envs = env_variables.copy() if code_package.uses_nosql: nosql_storage = cast(CosmosDB, self._system_resources.get_nosql_storage()) # If we use NoSQL, then the handle must be allocated _, url, creds = nosql_storage.credentials() db = nosql_storage.benchmark_database(code_package.benchmark) envs["NOSQL_STORAGE_DATABASE"] = db envs["NOSQL_STORAGE_URL"] = url envs["NOSQL_STORAGE_CREDS"] = creds for original_name, actual_name in nosql_storage.get_tables( code_package.benchmark ).items(): envs[f"NOSQL_STORAGE_TABLE_{original_name}"] = actual_name if code_package.uses_storage: envs["STORAGE_CONNECTION_STRING"] = self.config.resources.data_storage_account( self.cli_instance ).connection_string resource_group = self.config.resources.resource_group(self.cli_instance) # Retrieve existing environment variables to prevent accidental overwrite if len(envs) > 0: try: self.logging.info( f"Retrieving existing environment variables for function {function.name}" ) # First read existing properties response = self.cli_instance.execute( f"az functionapp config appsettings list --name {function.name} " f" --resource-group {resource_group} " ) old_envs = json.loads(response.decode()) # Find custom envs and copy them - unless they are overwritten now for env in old_envs: # Ignore vars set automatically by Azure found = False for prefix in ["FUNCTIONS_", "WEBSITE_", "APPINSIGHTS_", "Azure"]: if env["name"].startswith(prefix): found = True break # do not overwrite new value if not found and env["name"] not in envs: envs[env["name"]] = env["value"] except RuntimeError as e: self.logging.error("Failed to retrieve environment variables!") self.logging.error(e) raise e if len(envs) > 0: try: env_string = "" for k, v in envs.items(): env_string += f" {k}={v}" self.logging.info(f"Exporting environment variables for function {function.name}") self.cli_instance.execute( f"az functionapp config appsettings set --name {function.name} " f" --resource-group {resource_group} " f" --settings {env_string} " ) # if we don't do that, next invocation might still see old values # Disabled since we swapped the order - we first update envs, then we publish. # self.logging.info( # "Sleeping for 10 seconds - Azure needs more time to propagate changes. " # "Otherwise, functions might not see new variables and fail unexpectedly." # ) except RuntimeError as e: self.logging.error("Failed to set environment variable!") self.logging.error(e) raise e
[docs] def update_function_configuration(self, function: Function, code_package: Benchmark) -> None: """Update Azure Function configuration. Currently not implemented for Azure Functions as memory and timeout configuration is handled at the consumption plan level. Args: function: Function instance to configure code_package: Benchmark code package with requirements """ # FIXME: this does nothing currently - we don't specify timeout self.logging.warning( "Updating function's memory and timeout configuration is not supported." )
[docs] def delete_function(self, func_name: str, function: Dict) -> None: """Delete an Azure Function App and its associated storage account. Args: func_name: Name of the Azure Function App to delete """ self.logging.info(f"Deleting function app {func_name}") """ For Azure, we need to retrieve the associated storage account. Each function has its own storage account. """ function_obj = cast(AzureFunction, self.function_type().deserialize(function)) try: self.cli_instance.execute( f"az functionapp delete --name {func_name} " f"--resource-group {self.config.resources.resource_group(self.cli_instance)}" ) self.logging.info(f"Function app {func_name} deleted successfully") except RuntimeError as e: self.logging.error(f"Failed to delete the function app {func_name}!") raise e self.logging.info( f"Deleting storage account {function_obj.function_storage.account_name} " f"associated with function {func_name}" ) self.config.resources.delete_storage_account( self.cli_instance, function_obj.function_storage )
def _mount_function_code(self, code_package: Benchmark) -> str: """Mount function code package in Azure CLI container. Uploads the function code package to a temporary location in the Azure CLI container for deployment operations. Args: code_package: Benchmark code package to mount Returns: Path to mounted code in the CLI container. """ dest = os.path.join("/mnt", "function", uuid.uuid4().hex) if code_package.code_location is None: raise RuntimeError("Code location is not set") self.cli_instance.upload_package(code_package.code_location, dest) return dest
[docs] def default_function_name( self, code_package: Benchmark, resources: Optional[Resources] = None ) -> str: """Generate default function name for Azure. Creates a globally unique function name based on resource ID, benchmark name, language, and version. Function app names must be globally unique across all of Azure. Args: code_package: Benchmark code package resources: Optional resources (unused) Returns: Globally unique function name for Azure. """ func_name = ( "sebs-{}-{}-{}-{}".format( self.config.resources.resources_id, code_package.benchmark, code_package.language_name, code_package.language_version, ) .replace(".", "-") .replace("_", "-") ) return func_name
[docs] def create_function( self, code_package: Benchmark, func_name: str, system_variant: SystemVariant, container_uri: str | None, ) -> AzureFunction: """Create new Azure Function. Creates a new Azure Function App and deploys the provided code package. Handles function app creation, storage account allocation, and initial deployment with proper configuration. Args: code_package: Benchmark code package to deploy func_name: Name for the Azure Function App system_variant: Selected deployment variant container_uri: Container URI (unused for Azure) Returns: AzureFunction instance representing the created function. Raises: NotImplementedError: If container deployment is requested. RuntimeError: If function creation fails. """ if system_variant.is_container: raise NotImplementedError("Container deployment is not supported in Azure") language = code_package.language_name language_runtime = self._normalize_runtime_version(language, code_package.language_version) # ensure string form is passed to Azure CLI language_runtime = str(language_runtime) if language == "java" and "." not in language_runtime: language_runtime = f"{language_runtime}.0" resource_group = self.config.resources.resource_group(self.cli_instance) region = self.config.region function_cfg = FunctionConfig.from_benchmark(code_package) config = { "resource_group": resource_group, "func_name": func_name, "region": region, "runtime": self.AZURE_RUNTIMES[language], "runtime_version": language_runtime, } # check if function does not exist # no API to verify existence try: ret = self.cli_instance.execute( ( " az functionapp config appsettings list " " --resource-group {resource_group} " " --name {func_name} " ).format(**config) ) for setting in json.loads(ret.decode()): if setting["name"] == "AzureWebJobsStorage": connection_string = setting["value"] elems = [z for y in connection_string.split(";") for z in y.split("=")] account_name = elems[elems.index("AccountName") + 1] function_storage_account = AzureResources.Storage.from_cache( account_name, connection_string ) self.logging.info("Azure: Selected {} function app".format(func_name)) except RuntimeError: function_storage_account = self.config.resources.add_storage_account(self.cli_instance) config["storage_account"] = function_storage_account.account_name # FIXME: only Linux type is supported while True: try: # create function app ret = self.cli_instance.execute( ( " az functionapp create --resource-group {resource_group} " " --os-type Linux --consumption-plan-location {region} " " --runtime {runtime} --runtime-version {runtime_version} " " --name {func_name} --storage-account {storage_account}" " --functions-version 4 " ).format(**config) ) self.logging.debug(f"Function app {func_name}, ret {ret.decode('utf-8')}") self.logging.info("Azure: Created function app {}".format(func_name)) break except RuntimeError as e: # Azure does not allow some concurrent operations if "another operation is in progress" in str(e): self.logging.info( f"Repeat {func_name} creation, another operation in progress" ) # Rethrow -> another error else: raise e from None function = AzureFunction( name=func_name, benchmark=code_package.benchmark, code_hash=code_package.hash, function_storage=function_storage_account, cfg=function_cfg, ) # update existing function app self.update_function(function, code_package, system_variant, container_uri) self.cache_client.add_function( deployment_name=self.name(), language_name=language, code_package=code_package, function=function, ) return function
[docs] def cached_function(self, function: Function) -> None: """Initialize cached function with current configuration. Sets up a cached function with current data storage account and logging handlers for all triggers. Args: function: Function instance loaded from cache """ data_storage_account = self.config.resources.data_storage_account(self.cli_instance) for trigger in function.triggers_all(): azure_trigger = cast(AzureTrigger, trigger) azure_trigger.logging_handlers = self.logging_handlers azure_trigger.data_storage_account = data_storage_account
[docs] def download_metrics( self, function_name: str, start_time: int, end_time: int, requests: Dict[str, ExecutionResult], metrics: Dict[str, dict], ) -> None: """Download execution metrics from Azure Application Insights. Retrieves performance metrics for function executions from Azure Application Insights and updates the execution results with provider-specific timing information. Args: function_name: Name of the Azure Function start_time: Start timestamp for metrics collection end_time: End timestamp for metrics collection requests: Dictionary of execution results to update metrics: Additional metrics dictionary (unused) """ self.cli_instance.install_insights() resource_group = self.config.resources.resource_group(self.cli_instance) # Avoid warnings in the next step self.cli_instance.execute( "az feature register --name AIWorkspacePreview " "--namespace microsoft.insights" ) app_id_query = self.cli_instance.execute( ("az monitor app-insights component show " "--app {} --resource-group {}").format( function_name, resource_group ) ).decode("utf-8") application_id = json.loads(app_id_query)["appId"] # Azure CLI requires date in the following format # Format: date (yyyy-mm-dd) time (hh:mm:ss.xxxxx) timezone (+/-hh:mm) # Include microseconds time to make sure we're not affected by # miliseconds precision. start_time_str = datetime.datetime.fromtimestamp(start_time).strftime( "%Y-%m-%d %H:%M:%S.%f" ) end_time_str = datetime.datetime.fromtimestamp(end_time + 1).strftime("%Y-%m-%d %H:%M:%S") from tzlocal import get_localzone timezone_str = datetime.datetime.now(get_localzone()).strftime("%z") query = ( "requests | project timestamp, operation_Name, success, " "resultCode, duration, cloud_RoleName, " "invocationId=customDimensions['InvocationId'], " "functionTime=customDimensions['FunctionExecutionTimeMs']" ) invocations_processed: Set[str] = set() invocations_to_process = set(requests.keys()) # while len(invocations_processed) < len(requests.keys()): self.logging.info("Azure: Running App Insights query.") ret_bytes = self.cli_instance.execute( ( 'az monitor app-insights query --app {} --analytics-query "{}" ' "--start-time {} {} --end-time {} {}" ).format( application_id, query, start_time_str, timezone_str, end_time_str, timezone_str, ) ) ret_str = ret_bytes.decode("utf-8") json_data = json.loads(ret_str) table_data = json_data["tables"][0] # time is last, invocation is second to last for request in table_data["rows"]: invocation_id = request[-2] # might happen that we get invocation from another experiment if invocation_id not in requests: continue # duration = request[4] func_exec_time = request[-1] invocations_processed.add(invocation_id) requests[invocation_id].provider_times.execution = int(float(func_exec_time) * 1000) self.logging.info( f"Azure: Found time metrics for {len(invocations_processed)} " f"out of {len(requests.keys())} invocations." ) if len(invocations_processed) < len(requests.keys()): time.sleep(5) self.logging.info(f"Missing the requests: {invocations_to_process - invocations_processed}")
# TODO: query performance counters for mem def _enforce_cold_start(self, function: Function, code_package: Benchmark) -> None: """Enforce cold start for a single function. Updates environment variable to force cold start behavior. Args: function: Function instance to update code_package: Benchmark code package """ self.update_envs(function, code_package, {"ForceColdStart": str(self.cold_start_counter)}) # FIXME: is this sufficient to enforce cold starts? # self.update_function(function, code_package, False, "")
[docs] def enforce_cold_start(self, functions: List[Function], code_package: Benchmark) -> None: """Enforce cold start for multiple functions. Forces cold start behavior for all provided functions by updating environment variables and waiting for changes to propagate: sleep is added to allow changes to propagate. Args: functions: List of functions to enforce cold start for code_package: Benchmark code package """ self.cold_start_counter += 1 for func in functions: self._enforce_cold_start(func, code_package) import time time.sleep(20)
[docs] def create_trigger(self, function: Function, trigger_type: Trigger.TriggerType) -> Trigger: """Create trigger for Azure Function. Currently not implemented as HTTP triggers are automatically created for each function during deployment. Args: function: Function to create trigger for trigger_type: Type of trigger to create Raises: NotImplementedError: Trigger creation is not supported. """ raise NotImplementedError()