Source code for sebs.cache

# Copyright 2020-2025 ETH Zurich and the SeBS authors. All rights reserved.
"""Caching system for SeBS (Serverless Benchmarking Suite).

This module provides comprehensive caching functionality for the SeBS framework,
including configuration caching, code package management, function deployment
tracking, and storage resource management.

The Cache class manages persistent storage of benchmark configurations, compiled
code packages, Docker containers, deployed functions, and cloud resource
configurations to optimize repeated benchmark executions and deployments.

This class is essential for efficient benchmarking - we avoid regenerating
cloud resources, and we do not have to keep querying them every time
we start the benchmark. This is particularly important for cloud platforms
like Azure, where queries require CLI tool running in a container and can
take long time to resolve.

Example:
    Basic cache usage:
        cache = Cache("/path/to/cache", docker_client)
        config = cache.get_benchmark_config("aws", "110.dynamic-html")
        cache.add_code_package("aws", benchmark_instance)
"""

import collections.abc
import docker
import datetime
import json
import os
import shutil
import threading
import tempfile
from typing import Any, Dict, List, Mapping, Optional, Tuple, TYPE_CHECKING  # noqa

from sebs.utils import LoggingBase, serialize

if TYPE_CHECKING:
    from sebs.benchmark import Benchmark
    from sebs.faas.function import Function


[docs] def update(d: Dict[str, Any], u: Mapping[str, Any]) -> Dict[str, Any]: """Recursively update nested dictionary with another dictionary. This function performs deep merge of two dictionaries, merging nested dictionary values rather than replacing them entirely. Args: d (Dict[str, Any]): The target dictionary to update. u (Mapping[str, Any]): The source dictionary with updates. Returns: Dict[str, Any]: The updated dictionary. """ # https://stackoverflow.com/questions/3232943/update-value-of-a-nested-dictionary-of-varying-depth for k, v in u.items(): if isinstance(v, collections.abc.Mapping): d[k] = update(d.get(k, {}), v) else: d[k] = v return d
[docs] def update_dict(cfg: Dict[str, Any], val: Any, keys: List[str]) -> None: """Update dictionary value at nested key path. Updates a nested dictionary by setting a value at a path specified by a list of keys. Creates intermediate dictionaries as needed. Args: cfg (Dict[str, Any]): The dictionary to update. val (Any): The value to set at the key path. keys (List[str]): List of keys forming the path to the target location. """ def map_keys(obj: Dict[str, Any], val: Any, keys: List[str]) -> Dict[str, Any]: """Helper to construct the nested dictionary structure for the update. First element of `keys` becomes the key for the current level, and the value is either the final value (if no more keys), or a result of a recursive call to map the remaining keys. Args: obj: Main dictionary. val: value to insert keys: list of nested keys Returns: [TODO:return] """ if len(keys): return {keys[0]: map_keys(obj, val, keys[1:])} else: return val update(cfg, map_keys(cfg, val, keys))
[docs] def keys_exist(obj: Dict, keys: List[Any]) -> bool: """Find if a nested object exists in a dictionary. example: {key1: {key2: {key3: value}}} for [key1, key2, key3] -> True Args: obj: dictionary keys: dynamic list of nested keys Returns: true if the nested object exists """ for key in keys: if isinstance(obj, Dict) and key in obj: obj = obj[key] else: return False return True
[docs] def keys_get(obj: Dict, keys: List[Any]) -> Any: """Find if a nested object exists in a dictionary. example: {key1: {key2: {key3: value}}} for [key1, key2, key3] -> True Args: obj: dictionary keys: dynamic list of nested keys Returns: true if the nested object exists """ current = obj for key in keys: if isinstance(current, Dict) and key in current: current = current[key] return current
[docs] class Cache(LoggingBase): """Persistent caching system for SeBS benchmark configurations and deployments. This class provides comprehensive caching functionality for SeBS benchmarks, including configuration management, code package storage, function tracking, and cloud resource management. It uses a file-based cache system with thread-safe operations. Attributes: cached_config (Dict[str, Any]): In-memory cache of cloud configurations. config_updated (bool): Flag indicating if configuration needs to be saved. cache_dir (str): Absolute path to the cache directory. ignore_functions (bool): Flag to skip function caching operations. ignore_storage (bool): Flag to skip storage resource caching. docker_client (docker.DockerClient): Docker client for container operations. """ _lock_registry_guard = threading.Lock() _lock_registry: Dict[str, threading.RLock] = {} def __init__(self, cache_dir: str, docker_client: docker.DockerClient) -> None: """Initialize the Cache with directory and Docker client. Sets up the cache directory structure and loads existing configurations. Creates the cache directory if it doesn't exist, otherwise loads existing cached configurations. Args: cache_dir (str): Path to the cache directory. docker_client (docker.DockerClient): Docker client for container operations. """ super().__init__() self.cached_config: Dict[str, Any] = {} self.config_updated: bool = False self.docker_client = docker_client self.cache_dir = os.path.abspath(cache_dir) self.ignore_functions: bool = False self.ignore_storage: bool = False self._lock = self._cache_dir_lock(self.cache_dir) if not os.path.exists(self.cache_dir): os.makedirs(self.cache_dir, exist_ok=True) else: self.load_config()
[docs] @staticmethod def typename() -> str: """Get the typename for this cache. Returns: str: The cache type name. """ return "Cache"
@classmethod def _cache_dir_lock(cls, cache_dir: str) -> threading.RLock: """Return a shared lock for all Cache instances pointing at one cache dir.""" with cls._lock_registry_guard: if cache_dir not in cls._lock_registry: cls._lock_registry[cache_dir] = threading.RLock() return cls._lock_registry[cache_dir] @staticmethod def _write_json_atomic(path: str, data: Any) -> None: """Atomically replace a JSON file after fully writing it to a temp file.""" directory = os.path.dirname(path) os.makedirs(directory, exist_ok=True) fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-", suffix=".json") try: with os.fdopen(fd, "w") as fp: json.dump(data, fp, indent=2) fp.flush() os.fsync(fp.fileno()) os.replace(tmp_path, path) except Exception: if os.path.exists(tmp_path): os.unlink(tmp_path) raise @staticmethod def _write_serialized_atomic(path: str, data: Dict[str, Any]) -> None: """Atomically replace a JSON file using the SeBS serializer.""" directory = os.path.dirname(path) os.makedirs(directory, exist_ok=True) fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-", suffix=".json") try: with os.fdopen(fd, "w") as fp: fp.write(serialize(data)) fp.flush() os.fsync(fp.fileno()) os.replace(tmp_path, path) except Exception: if os.path.exists(tmp_path): os.unlink(tmp_path) raise
[docs] def load_config(self) -> None: """Load cached cloud configurations from disk. Reads configuration files for all supported cloud platforms from the cache directory and loads them into memory. """ with self._lock: for cloud in ["azure", "aws", "gcp", "openwhisk", "local"]: cloud_config_file = os.path.join(self.cache_dir, "{}.json".format(cloud)) if os.path.exists(cloud_config_file): with open(cloud_config_file, "r") as f: self.cached_config[cloud] = json.load(f)
[docs] def get_config(self, cloud: str) -> Optional[Dict[str, Any]]: """Get cached configuration for a specific cloud provider. Args: cloud (str): Cloud provider name (e.g., 'aws', 'azure', 'gcp'). Returns: Optional[Dict[str, Any]]: The cached configuration or None if not found. """ return self.cached_config[cloud] if cloud in self.cached_config else None
[docs] def update_config(self, val: Any, keys: List[str]) -> None: """Update configuration values at nested key path. Updates cached configuration by setting a value at the specified nested key path. Sets the config_updated flag to ensure changes are persisted to disk. Args: val (Any): New value to store. keys (List[str]): Array of consecutive keys for multi-level dictionary. """ with self._lock: update_dict(self.cached_config, val, keys) self.config_updated = True
[docs] def lock(self) -> None: """Acquire the cache lock for thread-safe operations.""" self._lock.acquire()
[docs] def unlock(self) -> None: """Release the cache lock.""" self._lock.release()
[docs] def shutdown(self) -> None: """Save cached configurations to disk if they were updated. Writes all updated cloud configurations back to their respective JSON files in the cache directory. """ if self.config_updated: with self._lock: for cloud in ["azure", "aws", "gcp", "openwhisk", "local"]: if cloud in self.cached_config: cloud_config_file = os.path.join(self.cache_dir, "{}.json".format(cloud)) self.logging.info("Update cached config {}".format(cloud_config_file)) self._write_json_atomic(cloud_config_file, self.cached_config[cloud])
[docs] def get_benchmark_config(self, deployment: str, benchmark: str) -> Optional[Dict[str, Any]]: """Access cached configuration of a benchmark. Args: deployment (str): Deployment platform ('aws', 'azure', 'gcp', 'openwhisk', 'local'). benchmark (str): Benchmark name (e.g., '110.dynamic-html'). Returns: Optional[Dict[str, Any]]: Benchmark configuration or None if not found. """ with self._lock: benchmark_dir = os.path.join(self.cache_dir, benchmark) if os.path.exists(benchmark_dir): config_file = os.path.join(benchmark_dir, "config.json") if os.path.exists(config_file): with open(config_file, "r") as fp: cfg = json.load(fp) return cfg[deployment] if deployment in cfg else None return None
[docs] def get_code_package( self, deployment_name: str, code_package: "Benchmark" ) -> Optional[Dict[str, Any]]: """Access cached version of benchmark code package. Args: deployment (str): Deployment platform name. code_package (Benchmark): Benchmark package. Returns: Optional[Dict[str, Any]]: Code package configuration or None if not found. """ cfg = self.get_benchmark_config(deployment_name, code_package.benchmark) base_keys, extra_keys = self.code_cache_keys(code_package) if cfg and keys_exist(cfg, [*base_keys, *extra_keys]): return keys_get(cfg, [*base_keys, *extra_keys]) else: return None
[docs] def get_container( self, deployment_name: str, code_package: "Benchmark" ) -> Optional[Dict[str, Any]]: """Access cached container configuration for a benchmark. Args: deployment (str): Deployment platform name. code_package (Benchmark): Benchmark package. Returns: Optional[Dict[str, Any]]: Container configuration or None if not found. """ cfg = self.get_benchmark_config(deployment_name, code_package.benchmark) base_keys, extra_keys = self.code_cache_keys(code_package) if cfg and keys_exist(cfg, [*base_keys, *extra_keys]): return keys_get(cfg, [*base_keys, *extra_keys]) else: return None
[docs] def invalidate_all_container_uris(self, deployment: str) -> None: """Set image-uri to None for all cached containers of a deployment. Walks all benchmark directories and clears the image-uri field for every container entry under the given deployment. This forces a re-push to the registry on next use without invalidating the rest of the cached state. This function is used primarily after cleaning up cloud resources. Args: deployment (str): Deployment platform name. """ def clear_nested_container_uris(obj: Dict[str, Any]) -> bool: """Internal method - recursively check for all image-uri. Simpler method then walking all nested variants of containers.""" modified = False if "image-uri" in obj: obj["image-uri"] = None return True for value in obj.values(): if isinstance(value, dict): modified = clear_nested_container_uris(value) or modified return modified with self._lock: if not os.path.exists(self.cache_dir): return for entry in os.listdir(self.cache_dir): config_path = os.path.join(self.cache_dir, entry, "config.json") if not os.path.exists(config_path): continue with open(config_path, "r") as fp: config = json.load(fp) dep_cfg = config.get(deployment) if dep_cfg is None: continue modified = False for lang_cfg in dep_cfg.values(): if lang_cfg is None: continue containers = lang_cfg.get("containers") if containers is None: continue modified = clear_nested_container_uris(containers) or modified if modified: self._write_json_atomic(config_path, config)
[docs] def update_container_uri( self, deployment_name: str, code_package: "Benchmark", uri: str ) -> None: """Update the image-uri for a specific cached container entry. Used when the image is cached locally, but needs to be pushed to the registry to be accessible for cloud deployment. Args: deployment_name (str): Deployment platform name. code_package (Benchmark): Benchmark package identifying the cache entry. uri (str): New image URI to store. """ with self._lock: config_path = os.path.join(self.cache_dir, code_package.benchmark, "config.json") if not os.path.exists(config_path): return with open(config_path, "r") as fp: config = json.load(fp) base_keys, extra_keys = self.code_cache_keys(code_package) keys = [deployment_name, *base_keys, *extra_keys] if not keys_exist(config, keys): return keys_get(config, keys)["image-uri"] = uri self._write_json_atomic(config_path, config)
[docs] def get_functions( self, deployment: str, benchmark: str, language: str ) -> Optional[Dict[str, Any]]: """Get cached function configurations for a benchmark. Args: deployment (str): Deployment platform name. benchmark (str): Benchmark name. language (str): Programming language. Returns: Optional[Dict[str, Any]]: Function configurations or None if not found. """ cfg = self.get_benchmark_config(deployment, benchmark) if cfg and language in cfg and not self.ignore_functions: return cfg[language]["functions"] else: return None
[docs] def get_all_functions(self, deployment: str) -> Dict[str, Any]: """Get all cached function configurations for a given deployment. Iterates all benchmarks and languages in the cache to collect every function deployed to the specified platform. Args: deployment (str): Deployment platform name Returns: Mapping of function name to function configuration, aggregated across benchmarks and languages. """ result: Dict[str, Any] = {} if not os.path.exists(self.cache_dir) or self.ignore_functions: return result with self._lock: for entry in os.listdir(self.cache_dir): config_path = os.path.join(self.cache_dir, entry, "config.json") if not os.path.exists(config_path): continue with open(config_path, "r") as fp: config = json.load(fp) dep_cfg = config.get(deployment) if dep_cfg is None: continue for lang_cfg in dep_cfg.values(): if lang_cfg is None: continue functions = lang_cfg.get("functions") if functions is not None: result.update(functions) return result
[docs] def get_storage_config(self, deployment: str, benchmark: str) -> Optional[Dict[str, Any]]: """Access cached storage configuration of a benchmark. Args: deployment (str): Deployment platform name. benchmark (str): Benchmark name. Returns: Optional[Dict[str, Any]]: Storage configuration or None if not found. """ return self._get_resource_config(deployment, benchmark, "storage")
[docs] def get_nosql_config(self, deployment: str, benchmark: str) -> Optional[Dict[str, Any]]: """Access cached NoSQL configuration of a benchmark. Args: deployment (str): Deployment platform name. benchmark (str): Benchmark name. Returns: Optional[Dict[str, Any]]: NoSQL configuration or None if not found. """ return self._get_resource_config(deployment, benchmark, "nosql")
[docs] def get_nosql_configs(self, deployment: str) -> Dict[str, Any]: """Access cached NoSQL configuration for all benchmarks. Iterates all benchmark directories in the cache and merges their NoSQL table configurations for the given deployment into a single dict. Args: deployment (str): Deployment platform name. Returns: NoSQL configurations across all benchmarks """ result: Dict[str, Any] = {} if not os.path.exists(self.cache_dir): return result with self._lock: for entry in os.listdir(self.cache_dir): config_path = os.path.join(self.cache_dir, entry, "config.json") if not os.path.exists(config_path): continue with open(config_path, "r") as fp: config = json.load(fp) dep_cfg = config.get(deployment) if dep_cfg is None: continue nosql = dep_cfg.get("nosql") if nosql is not None: result.update(nosql) return result
def _get_resource_config( self, deployment: str, benchmark: str, resource: str ) -> Optional[Dict[str, Any]]: """Helper to retrieve a specific type of resource configuration from the benchmark's cache. Args: deployment (str): Deployment platform name. benchmark (str): Benchmark name. resource (str): Resource type ('storage' or 'nosql'). Returns: Optional[Dict[str, Any]]: Resource configuration or None if not found. """ cfg = self.get_benchmark_config(deployment, benchmark) return cfg[resource] if cfg and resource in cfg and not self.ignore_storage else None
[docs] def update_storage(self, deployment: str, benchmark: str, config: Dict[str, Any]) -> None: """Update cached storage configuration for a benchmark. Args: deployment (str): Deployment platform name. benchmark (str): Benchmark name. config (Dict[str, Any]): Storage configuration to cache. """ if self.ignore_storage: return self._update_resources(deployment, benchmark, "storage", config)
[docs] def update_nosql(self, deployment: str, benchmark: str, config: Dict[str, Any]) -> None: """Update cached NoSQL configuration for a benchmark. Args: deployment (str): Deployment platform name. benchmark (str): Benchmark name. config (Dict[str, Any]): NoSQL configuration to cache. """ if self.ignore_storage: return self._update_resources(deployment, benchmark, "nosql", config)
[docs] def remove_function(self, deployment: str, benchmark: str, language: str, function_name: str): """Remove a function entry from all benchmark cache configs. Args: function_name: function for removal """ with self._lock: if not os.path.exists(self.cache_dir): return config_path = os.path.join(self.cache_dir, benchmark, "config.json") with open(config_path, "r") as fp: config = json.load(fp) if deployment not in config: return if language not in config[deployment]: return lang_cfg = config[deployment][language] if function_name not in lang_cfg["functions"]: return self.logging.info(f"Deleting function {function_name} from cache") del lang_cfg["functions"][function_name] self._write_json_atomic(config_path, config)
[docs] def remove_storage(self, deployment: str): """Remove storage config entries across all benchmarks for a deployment. Args: deployment: cloud platform name """ self._remove_resource_config(deployment, "storage")
[docs] def remove_nosql(self, deployment: str): """Remove nosql config entries across all benchmarks for a deployment. Args: deployment: cloud platform name """ self._remove_resource_config(deployment, "nosql")
def _remove_resource_config(self, deployment: str, resource: str): """Remove a resource configuration entry from all benchmark cache configs. Args: deployment: Deployment platform name. resource: Resource type ('storage' or 'nosql'). """ with self._lock: if not os.path.exists(self.cache_dir): return for entry in os.listdir(self.cache_dir): config_path = os.path.join(self.cache_dir, entry, "config.json") if not os.path.exists(config_path): continue with open(config_path, "r") as fp: config = json.load(fp) if deployment in config and resource in config[deployment]: del config[deployment][resource] self._write_json_atomic(config_path, config)
[docs] def get_config_key(self, keys: List[str]) -> Optional[Any]: """Return the value at a nested key path in the cached configuration. Does not throw an error if the key path does not exist. Args: keys: key path needed to access the config value Returns: The value at the specified key path, or None if not found. """ with self._lock: cfg = self.cached_config for key in keys[:-1]: if not isinstance(cfg, dict) or key not in cfg: return None cfg = cfg[key] if isinstance(cfg, dict): return cfg.get(keys[-1]) return None
[docs] def remove_config_key(self, keys: List[str]): """Removes a configuration entry nested within cache dictiariony. Used after deleting a specific cloud resource. Does not throw an error if the key path does not exist. Args: keys: key path needed to access the config value """ with self._lock: cfg = self.cached_config for key in keys[:-1]: if not isinstance(cfg, dict) or key not in cfg: return cfg = cfg[key] if isinstance(cfg, dict) and keys[-1] in cfg: del cfg[keys[-1]] self.config_updated = True
def _update_resources( self, deployment: str, benchmark: str, resource: str, config: Dict[str, Any] ) -> None: """Internal helper to update a resource configuration (storage or NoSQL) in the cache. Since the benchmark data is prepared before creating and caching a function, it ensures the benchmark's cache directory exists and updates the `config.json` file within it. Args: deployment (str): Deployment platform name. benchmark (str): Benchmark name. resource (str): Resource type ('storage' or 'nosql'). config (Dict[str, Any]): Resource configuration to cache. """ if self.ignore_storage: return benchmark_dir = os.path.join(self.cache_dir, benchmark) os.makedirs(benchmark_dir, exist_ok=True) with self._lock: config_file = os.path.join(benchmark_dir, "config.json") if os.path.exists(config_file): with open(config_file, "r") as fp: cached_config = json.load(fp) else: cached_config = {} if deployment in cached_config: cached_config[deployment][resource] = config else: cached_config[deployment] = {resource: config} self._write_json_atomic(config_file, cached_config)
[docs] @staticmethod def code_cache_keys(code_package: "Benchmark") -> Tuple[List[str], List[str]]: """ Add language and system variant suffixes to the package cache key so differing build artifacts do not conflict in cache. """ base_key = [code_package.language.value] base_key.append( "containers" if code_package.system_variant.is_container else "code_package" ) extra_keys = [] extra_keys.append(code_package.language_variant) if code_package.system_variant_suffix is not None: extra_keys.append(code_package.system_variant_suffix) extra_keys.append(code_package.language_version) extra_keys.append(code_package.architecture) return base_key, extra_keys
[docs] def add_code_package( self, deployment_name: str, code_package: "Benchmark", ) -> None: """Add a new code package to the cache. Copies the code package (directory or zip file) into the cache structure. Records metadata (hash, size, location, timestamps, image details if container) in the benchmark's `config.json` within the cache. Handles both package and container deployments. Args: deployment_name (str): Name of the deployment platform. code_package (Benchmark): The benchmark code package to cache. Raises: RuntimeError: If cached application already exists for the deployment. """ with self._lock: benchmark_dir = os.path.join(self.cache_dir, code_package.benchmark) os.makedirs(benchmark_dir, exist_ok=True) # Check if cache directory for this deployment exist base_keys, extra_keys = self.code_cache_keys(code_package) cached_dir = os.path.join(benchmark_dir, deployment_name, *base_keys, *extra_keys) if not os.path.exists(cached_dir): os.makedirs(cached_dir, exist_ok=True) language_config = code_package.serialize() if code_package.code_location is not None: self.logging.info( f"Caching code package created at {code_package.code_location}" ) # copy code if os.path.isdir(code_package.code_location): cached_location = os.path.join(cached_dir, "code") shutil.copytree(code_package.code_location, cached_location) # copy zip file else: package_name = os.path.basename(code_package.code_location) cached_location = os.path.join(cached_dir, package_name) shutil.copy2(code_package.code_location, cached_dir) # don't store absolute path to avoid problems with moving cache dir relative_cached_loc = os.path.relpath(cached_location, self.cache_dir) language_config["location"] = relative_cached_loc self.logging.info(f"Updating cached code package {cached_location}") else: self.logging.info(f"Caching container pushed to: {code_package.container_uri}") date = str(datetime.datetime.now()) language_config["date"] = { "created": date, "modified": date, } config: Dict[str, Any] = { "containers": {}, "code_package": {}, "functions": {}, } if code_package.system_variant.is_container: image = self.docker_client.images.get(code_package.container_uri) language_config["image-uri"] = code_package.container_uri language_config["image-id"] = image.id update_dict(config["containers"], language_config, extra_keys) else: update_dict(config["code_package"], language_config, extra_keys) # make sure to not replace other entries if os.path.exists(os.path.join(benchmark_dir, "config.json")): with open(os.path.join(benchmark_dir, "config.json"), "r") as fp: cached_config = json.load(fp) keys = [*base_keys, *extra_keys] language = keys[0] # if we produced no code package (which can happen, e.g., on OpenWhisk), # there will be no "deployment" etrny at all: if deployment_name not in cached_config: cached_config[deployment_name] = {language: config} if language in cached_config[deployment_name]: # language known - add code package, # but do not overwrite existing entries update_dict( cached_config[deployment_name][language], language_config, keys[1:] ) else: # language unknown - add new dictionary # everything else needs to be initialized cached_config[deployment_name][language] = config config = cached_config else: # entirely new entry language = base_keys[0] config = {deployment_name: {language: config}} self._write_json_atomic(os.path.join(benchmark_dir, "config.json"), config) else: raise RuntimeError( "Cached application {} for {} already exists!".format( code_package.benchmark, deployment_name ) )
[docs] def update_code_package( self, deployment_name: str, code_package: "Benchmark", ) -> None: """Update an existing code package in the cache. Copies the new code package version over the old one. Updates metadata (hash, size, modification timestamp, image details if container) in the benchmark's `config.json`. If the cached package doesn't exist, adds it as a new package. Args: deployment_name (str): Name of the deployment platform. code_package (Benchmark): The benchmark code package to update. """ with self._lock: benchmark_dir = os.path.join(self.cache_dir, code_package.benchmark) # Check if cache directory for this deployment exist base_keys, extra_keys = self.code_cache_keys(code_package) cached_dir = os.path.join(benchmark_dir, deployment_name, *base_keys, *extra_keys) config_location = os.path.join(benchmark_dir, "config.json") if not os.path.exists(config_location): self.add_code_package(deployment_name, code_package) return with open(config_location, "r") as fp: config = json.load(fp) date = str(datetime.datetime.now()) """ Check that a package of this type - code package or container - exists. A simple check of directory existence is insufficient, as we might have created a code package earlier (which creates a directory), but not a container. """ package_exists = keys_exist(config, [deployment_name, *base_keys, *extra_keys]) if not package_exists: """ We have no such cache entry - fallback. However, we still have directory, a possible leftover after crash. Whatever was left, we remove it since we have no information what is there. """ if os.path.exists(cached_dir): shutil.rmtree(cached_dir) if package_exists: if code_package.code_location is not None: self.logging.info( f"Caching code package created at {code_package.code_location}" ) # copy code if os.path.isdir(code_package.code_location): cached_location = os.path.join(cached_dir, "code") # could be replaced with dirs_exists_ok in copytree # available in 3.8 shutil.rmtree(cached_location) shutil.copytree(src=code_package.code_location, dst=cached_location) # copy zip file else: package_name = os.path.basename(code_package.code_location) cached_location = os.path.join(cached_dir, package_name) if code_package.code_location != cached_location: shutil.copy2(code_package.code_location, cached_dir) self.logging.info(f"Updated cached code package {cached_location}") else: self.logging.info(f"Caching container pushed to: {code_package.container_uri}") cached_config = keys_get(config, [deployment_name, *base_keys, *extra_keys]) cached_config["date"]["modified"] = date cached_config["hash"] = code_package.hash cached_config["size"] = code_package.code_size if code_package.system_variant.is_container: image = self.docker_client.images.get(code_package.container_uri) cached_config["image-id"] = image.id cached_config["image-uri"] = code_package.container_uri self._write_json_atomic(os.path.join(benchmark_dir, "config.json"), config) else: self.add_code_package(deployment_name, code_package)
[docs] def add_function( self, deployment_name: str, language_name: str, code_package: "Benchmark", function: "Function", ) -> None: """Add new function to cache. Caches a deployed function configuration for a benchmark. Links the function to its corresponding code package. Args: deployment_name (str): Name of the deployment platform. language_name (str): Programming language name. code_package (Benchmark): The benchmark code package. function (Function): The deployed function to cache. Raises: RuntimeError: If code package doesn't exist in cache. """ if self.ignore_functions: return with self._lock: benchmark_dir = os.path.join(self.cache_dir, code_package.benchmark) language = language_name cache_config = os.path.join(benchmark_dir, "config.json") if os.path.exists(cache_config): functions_config: Dict[str, Any] = {function.name: {**function.serialize()}} with open(cache_config, "r") as fp: cached_config = json.load(fp) if language not in cached_config[deployment_name]: cached_config[deployment_name][language] = { "functions": functions_config, "code_package": {}, "containers": {}, } elif "functions" not in cached_config[deployment_name][language]: cached_config[deployment_name][language]["functions"] = functions_config else: cached_config[deployment_name][language]["functions"].update( functions_config ) config = cached_config self._write_serialized_atomic(cache_config, config) else: raise RuntimeError( "Can't cache function {} for a non-existing code package!".format(function.name) )
[docs] def update_function(self, function: "Function") -> None: """Update an existing function in the cache. Updates cached function configuration with new metadata. Searches across all deployments and languages to find the function by name. Args: function (Function): The function with updated configuration. Raises: RuntimeError: If function's code package doesn't exist in cache. """ if self.ignore_functions: return with self._lock: benchmark_dir = os.path.join(self.cache_dir, function.benchmark) cache_config = os.path.join(benchmark_dir, "config.json") if os.path.exists(cache_config): with open(cache_config, "r") as fp: cached_config = json.load(fp) for deployment, cfg in cached_config.items(): for language, cfg2 in cfg.items(): if "functions" not in cfg2: continue for name, func in cfg2["functions"].items(): if name == function.name: cached_config[deployment][language]["functions"][ name ] = function.serialize() self._write_serialized_atomic(cache_config, cached_config) else: raise RuntimeError( "Can't cache function {} for a non-existing code package!".format(function.name) )