Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add lambda streaming support for remote invoke #5307

Merged
merged 17 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions requirements/base.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
chevron~=0.12
click~=8.0
Flask<2.3
#Need to add Schemas latest SDK.
boto3>=1.19.5,==1.*
#Need to add latest lambda changes which will return invoke mode details
boto3>=1.26.109,==1.*
mndeveci marked this conversation as resolved.
Show resolved Hide resolved
mndeveci marked this conversation as resolved.
Show resolved Hide resolved
jmespath~=1.0.1
ruamel_yaml~=0.17.21
PyYAML>=5.4.1,==5.*
Expand Down
132 changes: 118 additions & 14 deletions samcli/lib/remote_invoke/lambda_invoke_executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import base64
import json
import logging
from abc import ABC, abstractmethod
from json import JSONDecodeError
from typing import Any, Dict, cast
from typing import Any, cast

from botocore.eventstream import EventStream
from botocore.exceptions import ClientError, ParamValidationError
from botocore.response import StreamingBody

Expand All @@ -26,12 +28,19 @@
LOG = logging.getLogger(__name__)
FUNCTION_NAME = "FunctionName"
PAYLOAD = "Payload"
EVENT_STREAM = "EventStream"
PAYLOAD_CHUNK = "PayloadChunk"
INVOKE_COMPLETE = "InvokeComplete"
LOG_RESULT = "LogResult"

INVOKE_MODE = "InvokeMode"
RESPONSE_STREAM = "RESPONSE_STREAM"

class LambdaInvokeExecutor(BotoActionExecutor):

class AbstractLambdaInvokeExecutor(BotoActionExecutor, ABC):
"""
Calls "invoke" method of "lambda" service with given input.
If a file location provided, the file handle will be passed as Payload object
Abstract class for different lambda invocation executors, see implementation for details.
For Payload parameter, if a file location provided, the file handle will be passed as Payload object
"""

_lambda_client: Any
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably out of scope of this PR - I'm not a big fan of using Any for annotation. Can we look into using https://youtype.github.io/boto3_stubs_docs/mypy_boto3_lambda/#lambdaclient?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tried setting its type to LambdaClient but then I've faced following issues;

  • First is the parameters. What we are doing today is to send some pre-defined and some arbitrary ones that can be set by customers. We are relying on boto3 errors for the case customer provides an invalid option for not keeping source of truth in our codebase. But if I set client to LambdaClient then I can't keep using decomposing a dictionary to use as parameters. I need to extract them one by one to use in function call since there is no type definition for the function parameters.
  • Second is the response, in mypy it has typed dictionary but mypy somehow doesn't like that to be returned by Dict[Any, Any] even though it should cover wider.

We will have a refactoring task next week, we will try to re-introduce typing there.

Expand Down Expand Up @@ -59,14 +68,9 @@ def validate_action_parameters(self, parameters: dict) -> None:
def _execute_action(self, payload: str):
self.request_parameters[FUNCTION_NAME] = self._function_name
self.request_parameters[PAYLOAD] = payload
LOG.debug(
"Calling lambda_client.invoke with FunctionName:%s, Payload:%s, parameters:%s",
self._function_name,
payload,
self.request_parameters,
)

try:
response = self._lambda_client.invoke(**self.request_parameters)
return self._execute_lambda_invoke(payload)
except ParamValidationError as param_val_ex:
raise InvalidResourceBotoParameterException(
f"Invalid parameter key provided."
Expand All @@ -80,7 +84,40 @@ def _execute_action(self, payload: str):
elif boto_utils.get_client_error_code(client_ex) == "InvalidRequestContentException":
raise InvalidResourceBotoParameterException(client_ex) from client_ex
raise ErrorBotoApiCallException(client_ex) from client_ex
return response

@abstractmethod
def _execute_lambda_invoke(self, payload: str):
pass


class LambdaInvokeExecutor(AbstractLambdaInvokeExecutor):
"""
Calls "invoke" method of "lambda" service with given input.
"""

def _execute_lambda_invoke(self, payload: str) -> dict:
LOG.debug(
"Calling lambda_client.invoke with FunctionName:%s, Payload:%s, parameters:%s",
self._function_name,
payload,
self.request_parameters,
)
return cast(dict, self._lambda_client.invoke(**self.request_parameters))
mndeveci marked this conversation as resolved.
Show resolved Hide resolved


class LambdaInvokeWithResponseStreamExecutor(AbstractLambdaInvokeExecutor):
"""
Calls "invoke_with_response_stream" method of "lambda" service with given input.
"""

def _execute_lambda_invoke(self, payload: str) -> dict:
LOG.debug(
"Calling lambda_client.invoke_with_response_stream with FunctionName:%s, Payload:%s, parameters:%s",
self._function_name,
payload,
self.request_parameters,
)
return cast(dict, self._lambda_client.invoke_with_response_stream(**self.request_parameters))
mndeveci marked this conversation as resolved.
Show resolved Hide resolved


class DefaultConvertToJSON(RemoteInvokeRequestResponseMapper):
Expand Down Expand Up @@ -124,6 +161,31 @@ def map(self, remote_invoke_input: RemoteInvokeExecutionInfo) -> RemoteInvokeExe
return remote_invoke_input


class LambdaStreamResponseConverter(RemoteInvokeRequestResponseMapper):
"""
This class helps to convert response from lambda invoke_with_response_stream API call.
That API call returns 'EventStream' which yields 'PayloadChunk's and 'InvokeComplete' as they become available.
This mapper, gets all 'PayloadChunk's and 'InvokeComplete' events and decodes them for next mapper.
"""

def map(self, remote_invoke_input: RemoteInvokeExecutionInfo) -> RemoteInvokeExecutionInfo:
LOG.debug("Mapping Lambda response to string object")
if not isinstance(remote_invoke_input.response, dict):
raise InvalideBotoResponseException("Invalid response type received from Lambda service, expecting dict")

event_stream: EventStream = remote_invoke_input.response.get(EVENT_STREAM, [])
mndeveci marked this conversation as resolved.
Show resolved Hide resolved
decoded_event_stream = []
for event in event_stream:
if PAYLOAD_CHUNK in event:
decoded_payload_chunk = event.get(PAYLOAD_CHUNK).get(PAYLOAD).decode("utf-8")
decoded_event_stream.append({PAYLOAD_CHUNK: {PAYLOAD: decoded_payload_chunk}})
if INVOKE_COMPLETE in event:
log_output = event.get(INVOKE_COMPLETE).get(LOG_RESULT, b"")
decoded_event_stream.append({INVOKE_COMPLETE: {LOG_RESULT: log_output}})
remote_invoke_input.response[EVENT_STREAM] = decoded_event_stream
return remote_invoke_input


class LambdaResponseOutputFormatter(RemoteInvokeRequestResponseMapper):
"""
This class helps to format output response for lambda service that will be printed on the CLI.
Expand All @@ -139,8 +201,8 @@ def map(self, remote_invoke_input: RemoteInvokeExecutionInfo) -> RemoteInvokeExe
"""
if remote_invoke_input.output_format == RemoteInvokeOutputFormat.DEFAULT:
LOG.debug("Formatting Lambda output response")
boto_response = cast(Dict, remote_invoke_input.response)
log_field = boto_response.get("LogResult")
boto_response = cast(dict, remote_invoke_input.response)
log_field = boto_response.get(LOG_RESULT)
if log_field:
log_result = base64.b64decode(log_field).decode("utf-8")
remote_invoke_input.log_output = log_result
Expand All @@ -152,3 +214,45 @@ def map(self, remote_invoke_input: RemoteInvokeExecutionInfo) -> RemoteInvokeExe
remote_invoke_input.response = boto_response.get(PAYLOAD)

return remote_invoke_input


class LambdaStreamResponseOutputFormatter(RemoteInvokeRequestResponseMapper):
"""
This class helps to format streaming output response for lambda service that will be printed on the CLI.
It loops through EventStream elements and adds them to response, and once InvokeComplete is reached, it updates
log_output and response objects in remote_invoke_input.
"""

def map(self, remote_invoke_input: RemoteInvokeExecutionInfo) -> RemoteInvokeExecutionInfo:
"""
Maps the lambda response output to the type of output format specified as user input.
If output_format is original-boto-response, write the original boto API response
to stdout.
"""
if remote_invoke_input.output_format == RemoteInvokeOutputFormat.DEFAULT:
LOG.debug("Formatting Lambda output response")
boto_response = cast(dict, remote_invoke_input.response)
combined_response = ""
for event in boto_response.get(EVENT_STREAM, []):
if PAYLOAD_CHUNK in event:
payload_chunk = event.get(PAYLOAD_CHUNK).get(PAYLOAD)
combined_response = f"{combined_response}{payload_chunk}"
if INVOKE_COMPLETE in event:
log_result = base64.b64decode(event.get(INVOKE_COMPLETE).get(LOG_RESULT)).decode("utf-8")
remote_invoke_input.log_output = log_result
remote_invoke_input.response = combined_response
return remote_invoke_input


def _is_function_invoke_mode_response_stream(lambda_client: Any, function_name: str):
"""
Returns True if given function has RESPONSE_STREAM as InvokeMode, False otherwise
"""
try:
function_url_config = lambda_client.get_function_url_config(FunctionName=function_name)
function_invoke_mode = function_url_config.get(INVOKE_MODE)
LOG.debug("InvokeMode of function %s: %s", function_name, function_invoke_mode)
return function_invoke_mode == RESPONSE_STREAM
except ClientError as ex:
LOG.debug("Function %s, doesn't have Function URL configured, using regular invoke", function_name, exc_info=ex)
return False
22 changes: 21 additions & 1 deletion samcli/lib/remote_invoke/remote_invoke_executor_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@
from samcli.lib.remote_invoke.lambda_invoke_executors import (
DefaultConvertToJSON,
LambdaInvokeExecutor,
LambdaInvokeWithResponseStreamExecutor,
LambdaResponseConverter,
LambdaResponseOutputFormatter,
LambdaStreamResponseConverter,
LambdaStreamResponseOutputFormatter,
_is_function_invoke_mode_response_stream,
)
from samcli.lib.remote_invoke.remote_invoke_executors import RemoteInvokeExecutor, ResponseObjectToJsonStringMapper
from samcli.lib.utils.cloudformation import CloudFormationResourceSummary
Expand Down Expand Up @@ -64,6 +68,22 @@ def _create_lambda_boto_executor(self, cfn_resource_summary: CloudFormationResou

:return: Returns the created remote invoke Executor
"""
lambda_client = self._boto_client_provider("lambda")
if _is_function_invoke_mode_response_stream(lambda_client, cfn_resource_summary.physical_resource_id):
LOG.debug("Creating response stream invocator for function %s", cfn_resource_summary.physical_resource_id)
return RemoteInvokeExecutor(
request_mappers=[DefaultConvertToJSON()],
response_mappers=[
LambdaStreamResponseConverter(),
LambdaStreamResponseOutputFormatter(),
ResponseObjectToJsonStringMapper(),
],
boto_action_executor=LambdaInvokeWithResponseStreamExecutor(
lambda_client,
cfn_resource_summary.physical_resource_id,
),
)

return RemoteInvokeExecutor(
request_mappers=[DefaultConvertToJSON()],
response_mappers=[
Expand All @@ -72,7 +92,7 @@ def _create_lambda_boto_executor(self, cfn_resource_summary: CloudFormationResou
ResponseObjectToJsonStringMapper(),
],
boto_action_executor=LambdaInvokeExecutor(
self._boto_client_provider("lambda"),
lambda_client,
cfn_resource_summary.physical_resource_id,
),
)
Expand Down
Loading