Source code for sebs.openwhisk.triggers

# Copyright 2020-2025 ETH Zurich and the SeBS authors. All rights reserved.
"""Trigger implementations for OpenWhisk function invocation in SeBS.

This module provides different trigger types for invoking OpenWhisk functions,
including library-based (CLI) triggers and HTTP-based triggers.

Classes:
    LibraryTrigger: CLI-based function invocation using wsk tool
    HTTPTrigger: HTTP-based function invocation using web actions
"""

import concurrent.futures
import datetime
import json
import subprocess
from typing import Dict, List, Optional, Any  # noqa

from sebs.faas.function import ExecutionResult, Trigger


[docs] class LibraryTrigger(Trigger): """ CLI-based trigger for OpenWhisk function invocation. This trigger uses the wsk CLI tool to invoke OpenWhisk actions directly, providing synchronous and asynchronous invocation capabilities. It handles parameter passing and result parsing for CLI-based invocations. Attributes: fname: Name of the OpenWhisk action to invoke _wsk_cmd: Complete WSK CLI command for function invocation Example: >>> trigger = LibraryTrigger("my-function", ["wsk", "-i"]) >>> result = trigger.sync_invoke({"key": "value"}) """ def __init__(self, fname: str, wsk_cmd: Optional[List[str]] = None) -> None: """ Initialize library trigger for OpenWhisk function. Args: fname: Name of the OpenWhisk action to invoke wsk_cmd: Optional WSK CLI command prefix (including flags) """ super().__init__() self.fname = fname if wsk_cmd: self._wsk_cmd = [*wsk_cmd, "action", "invoke", "--result", self.fname]
[docs] @staticmethod def trigger_type() -> "Trigger.TriggerType": """ Get the trigger type identifier. Returns: TriggerType.LIBRARY for CLI-based invocation """ return Trigger.TriggerType.LIBRARY
@property def wsk_cmd(self) -> List[str]: """ Get the complete WSK CLI command for invocation. Returns: List of command arguments for WSK CLI invocation Raises: AssertionError: If wsk_cmd has not been set """ assert self._wsk_cmd return self._wsk_cmd @wsk_cmd.setter def wsk_cmd(self, wsk_cmd: List[str]) -> None: """ Set the WSK CLI command prefix. Args: wsk_cmd: WSK CLI command prefix (including any flags) """ self._wsk_cmd = [*wsk_cmd, "action", "invoke", "--result", self.fname]
[docs] @staticmethod def get_command(payload: Dict[str, Any]) -> List[str]: """ Convert payload dictionary to WSK CLI parameter arguments. Args: payload: Dictionary of parameters to pass to the function Returns: List of CLI arguments for passing parameters to WSK Example: >>> get_command({"key1": "value1", "key2": 42}) ["--param", "key1", '"value1"', "--param", "key2", "42"] """ params = [] for key, value in payload.items(): params.append("--param") params.append(key) params.append(json.dumps(value)) return params
[docs] def sync_invoke(self, payload: Dict[str, Any]) -> ExecutionResult: """ Synchronously invoke the OpenWhisk function via CLI. Args: payload: Dictionary of parameters to pass to the function Returns: ExecutionResult containing timing information and function output """ command = self.wsk_cmd + self.get_command(payload) error = None try: begin = datetime.datetime.now() response = subprocess.run( command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, check=True, ) end = datetime.datetime.now() parsed_response = response.stdout.decode("utf-8") except (subprocess.CalledProcessError, FileNotFoundError) as e: end = datetime.datetime.now() error = e openwhisk_result = ExecutionResult.from_times(begin, end) if error is not None: self.logging.error("Invocation of {} failed!".format(self.fname)) openwhisk_result.stats.failure = True return openwhisk_result return_content = json.loads(parsed_response) openwhisk_result.parse_benchmark_output(return_content) return openwhisk_result
[docs] def async_invoke(self, payload: Dict[str, Any]) -> concurrent.futures.Future: """ Asynchronously invoke the OpenWhisk function via CLI. Args: payload: Dictionary of parameters to pass to the function Returns: Future object that will contain the ExecutionResult """ pool = concurrent.futures.ThreadPoolExecutor() fut = pool.submit(self.sync_invoke, payload) return fut
[docs] def serialize(self) -> Dict[str, str]: """ Serialize trigger configuration to dictionary. Returns: Dictionary containing trigger type and function name """ return {"type": "Library", "name": self.fname}
[docs] @staticmethod def deserialize(obj: Dict[str, str]) -> Trigger: """ Deserialize trigger from configuration dictionary. Args: obj: Dictionary containing serialized trigger data Returns: LibraryTrigger instance """ return LibraryTrigger(obj["name"])
[docs] @staticmethod def typename() -> str: """ Get the trigger type name. Returns: String identifier for this trigger type """ return "OpenWhisk.LibraryTrigger"
[docs] class HTTPTrigger(Trigger): """ HTTP-based trigger for OpenWhisk web action invocation. This trigger uses HTTP requests to invoke OpenWhisk web actions, providing an alternative to CLI-based invocation. It inherits HTTP invocation capabilities from the base Trigger class. Attributes: fname: Name of the OpenWhisk action url: HTTP URL for the web action endpoint Example: >>> trigger = HTTPTrigger( ... "my-function", ... "https://openwhisk.example.com/api/v1/web/guest/default/my-function.json" ... ) >>> result = trigger.sync_invoke({"key": "value"}) """ def __init__(self, fname: str, url: str) -> None: """ Initialize HTTP trigger for OpenWhisk web action. Args: fname: Name of the OpenWhisk action url: HTTP URL for the web action endpoint """ super().__init__() self.fname = fname self.url = url
[docs] @staticmethod def typename() -> str: """ Get the trigger type name. Returns: String identifier for this trigger type """ return "OpenWhisk.HTTPTrigger"
[docs] @staticmethod def trigger_type() -> Trigger.TriggerType: """ Get the trigger type identifier. Returns: TriggerType.HTTP for HTTP-based invocation """ return Trigger.TriggerType.HTTP
[docs] def sync_invoke(self, payload: Dict[str, Any]) -> ExecutionResult: """ Synchronously invoke the OpenWhisk function via HTTP. Args: payload: Dictionary of parameters to pass to the function Returns: ExecutionResult containing timing information and function output """ self.logging.debug(f"Invoke function {self.url}") return self._http_invoke(payload, self.url, False)
[docs] def async_invoke(self, payload: Dict[str, Any]) -> concurrent.futures.Future: """ Asynchronously invoke the OpenWhisk function via HTTP. Args: payload: Dictionary of parameters to pass to the function Returns: Future object that will contain the ExecutionResult """ pool = concurrent.futures.ThreadPoolExecutor() fut = pool.submit(self.sync_invoke, payload) return fut
[docs] def serialize(self) -> Dict[str, str]: """ Serialize trigger configuration to dictionary. Returns: Dictionary containing trigger type, function name, and URL """ return {"type": "HTTP", "fname": self.fname, "url": self.url}
[docs] @staticmethod def deserialize(obj: Dict[str, str]) -> Trigger: """ Deserialize trigger from configuration dictionary. Args: obj: Dictionary containing serialized trigger data Returns: HTTPTrigger instance """ return HTTPTrigger(obj["fname"], obj["url"])