Feature request: Parser support from multiple event sources using the same Lambda #2237
Comments
Does this mean that the same Lambda would be connected to different event sources? |
Exactly! From my point of view, a single Lambda should only support a single event source, because then you can keep the single responsibility for that Lambda and focus your business logic on it. But I see more and more users writing fat Lambdas, and Lambda supports multiple event sources, so I think we must hear from customers what the use case is and whether Powertools can help them. What do you think about this, Ruben? |
This would be horrible in Go :) The thing I'm worried about is that you would need a clear identifying mark for each kind of event source, so you could distinguish SQS/SNS/etc. from a generic dictionary... that isn't fun, is it? It would work like a "reverse-parser": here's a dict, tell me what kind of payload it is. Really not sure how to feel about this... |
I thought the same, but it seems difficult to do and likely to perform poorly. |
I've done this before in Python... it works, but the code needs to understand what type of event it's handling and parse it appropriately. I don't want to imagine the pain-in-the-backside this would be in a statically typed language like Go or .NET. |
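The "reverse-parser" idea described above — given a raw dict, tell me which service produced it — can be sketched in plain Python by probing each service's marker field. This is only an illustration (the function name `detect_source` is hypothetical), but the field names are the ones AWS actually emits:

```python
def detect_source(event: dict) -> str:
    """Guess which AWS service produced this payload from its marker fields."""
    records = event.get("Records") or [{}]
    record = records[0]
    # SQS/DynamoDB/S3 use lowercase "eventSource"; SNS uses "EventSource".
    source = record.get("eventSource") or record.get("EventSource")
    return source or "unknown"
```

A statically typed language would need a tagged union or interface hierarchy to express the same thing, which is the pain point raised above.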
Does it mean the message within SQS and SNS will have the same structure, so we have to unwrap the body cleanly with one signature and no additional conditional expressions? I think with Python 3.10 pydantic can handle this type of situation:

```python
from typing import List, Union

import pytest
from pydantic import BaseModel


class SQSRecord(BaseModel):
    eventSource: str  # "aws:sqs"
    awsRegion: str
    messageId: str
    body: str


class SNSPayload(BaseModel):
    Timestamp: str
    MessageId: str
    Message: str


class SNSRecord(BaseModel):
    EventSource: str  # "aws:sns"
    EventVersion: str
    Sns: SNSPayload


class Events(BaseModel):
    Records: Union[List[SQSRecord], List[SNSRecord]]


def lambda_handler(event, context):
    match Events(**event).Records[0]:
        case SNSRecord(EventSource="aws:sns"):
            return handle_sns(event, context)
        case SQSRecord(eventSource="aws:sqs"):
            return handle_sqs(event, context)
        case _:
            print("unknown event, you probably don't know what you are doing.")


def handle_sns(event, context):
    print("handling sns event")
    return Events(**event).Records[0].Sns.Message


def handle_sqs(event, context):
    print("handling sqs event")
    return Events(**event).Records[0].body


@pytest.fixture()
def sns_event():
    """Generates an SNS event."""
    return {
        "Records": [
            {
                "EventVersion": "1.0",
                "EventSource": "aws:sns",
                "Sns": {
                    "Timestamp": "2019-01-02T12:45:07.000Z",
                    "MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e",
                    "Message": "Hello from SNS!",
                },
            }
        ]
    }


@pytest.fixture()
def sqs_event():
    """Generates an SQS event."""
    return {
        "Records": [
            {
                "eventVersion": "2.0",
                "eventSource": "aws:sqs",
                "awsRegion": "us-east-1",
                "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
                "body": "Hello from SQS!",
            }
        ]
    }


def test_lambda_handler(sns_event, sqs_event):
    ret = lambda_handler(sns_event, "")
    assert ret == "Hello from SNS!"
    ret = lambda_handler(sqs_event, "")
    assert ret == "Hello from SQS!"
```

Since we already have built-in models, we can add this use case to our docs. |
I've worked with Union before in pydantic, and it has some issues when two schemas share common fields (it results in invalid parsing/exceptions, though that might be fixed by now). I think it's really important to understand why someone would connect one Lambda to multiple sources, and to decide whether it's something that AWS/the community should recommend. Usually, different outputs come from different pipelines, require different permissions, and process the records differently.
Can it be done technically? Yes. Should you? In my opinion, no, but there's no 100% truth here - it's all about pros and cons. But again, since it can be achieved today with the parser, I think it should be left alone. |
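One way to sidestep the union-ambiguity concern raised above (schemas sharing common fields being tried left to right) is to dispatch on an explicit discriminator instead of letting the parser guess. A minimal sketch — the `PARSERS` table and `parse_record` helper are hypothetical, not Powertools APIs:

```python
# Hypothetical dispatch table keyed on the event-source discriminator field.
PARSERS = {
    "aws:sqs": lambda record: record["body"],
    "aws:sns": lambda record: record["Sns"]["Message"],
}


def parse_record(record: dict):
    """Select a parser by the record's event-source marker instead of trial parsing."""
    # SNS capitalizes "EventSource"; SQS uses lowercase "eventSource".
    source = record.get("eventSource") or record.get("EventSource")
    parser = PARSERS.get(source)
    if parser is None:
        raise ValueError(f"unsupported event source: {source!r}")
    return parser(record)
```

Because the discriminator is checked explicitly, overlapping fields between schemas can never cause the wrong model to win.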
so where did we land here @leandrodamascena ? |
My $0.02: some aspects of best practice are language-ecosystem-dependent, and rejecting this would be too opinionated when Python (maybe Pydantic less so...), TypeScript, etc. have reasonable support for union types. What about environments where legacy and new stacks need to co-exist? What about the value of a tested, re-usable function component that 'just works' whether the rest of the architecture invokes it directly, via SNS, or through SNS->SQS? The use case where I came across this was trying to statically type and re-use a function for post-processing Amazon Textract results, regardless of whether it's connected to a Textract notification SNS topic, fed through an SQS queue to help manage concurrency, or called directly with a similar payload. |
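The direct / SNS / SNS->SQS scenario described above can also be handled by normalizing the envelope before any typed parsing happens. A rough sketch under stated assumptions — `unwrap` is a hypothetical helper, and it assumes the inner payload is a JSON string (the envelope field names are the real AWS ones):

```python
import json


def unwrap(event: dict) -> dict:
    """Normalize direct, SNS, and SNS-through-SQS invocations to the inner payload."""
    records = event.get("Records")
    if not records:
        return event  # direct invocation: already the payload
    record = records[0]
    if record.get("EventSource") == "aws:sns":
        return json.loads(record["Sns"]["Message"])
    if record.get("eventSource") == "aws:sqs":
        body = json.loads(record["body"])
        # An SNS notification delivered through SQS is wrapped once more.
        if isinstance(body, dict) and body.get("Type") == "Notification":
            return json.loads(body["Message"])
        return body
    return event
```

After normalization, a single typed model for the inner payload suffices, regardless of delivery path.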
Hey @athewsey, great to hear from you -- do you have example pseudocode in mind? @seshubaws is working on an RFC for the idea of a pipeline (similar to Apache Beam), allowing you to combine data sources indefinitely w/ generics -- I'm on mobile now, but she can share it later. |
Wondering if there are still any plans to add this feature; I have a use case which could benefit from it (although possibly odd). A bit of context on my use case: I have a Lambda connected to a DynamoDB stream, and on processing failure I manually push the DynamoDB record payload to a DLQ. After fixing the bug in the Lambda, I'd like to reprocess the DLQ messages, so I redrive them to a source queue, which the Lambda is also connected to. The problem is that the payload for a redriven message is an SQS event whose body wraps the DynamoDB stream record, rather than a raw DynamoDB stream event.
What I'm currently doing is checking what the raw payload format is, and then wrapping it in the appropriate data class.

```python
def handler(event: dict, context: LambdaContext) -> PartialItemFailureResponse:
    try:
        if event["Records"][0]["eventSource"] == "aws:sqs":
            # The body payload is a DynamoDB stream event
            event = json.loads(event["Records"][0]["body"])
    except Exception:
        pass
    return process(event, context)


@event_source(data_class=DynamoDBStreamEvent)
def process(event: DynamoDBStreamEvent, context: LambdaContext) -> PartialItemFailureResponse:
    ...
```

I was thinking in an ideal world something like the following could be possible with the help of Powertools:

```python
@event_source(data_classes=(DynamoDBStreamEvent, SQSEvent))
def handler(event: DynamoDBStreamEvent | SQSEvent, context: LambdaContext) -> PartialItemFailureResponse:
    if isinstance(event, DynamoDBStreamEvent):
        ...
    if isinstance(event, SQSEvent):
        ...
``` |
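The `data_classes` decorator proposed above does not exist in Powertools. Purely as an illustration of how such a dispatcher might work, here is a hypothetical sketch; `multi_event_source`, the detector tuples, and the stand-in classes are all invented for this example and replace the real Powertools data classes:

```python
def multi_event_source(*detectors):
    """Try (predicate, data_class) pairs in order; wrap the event with the first match."""
    def decorator(handler):
        def wrapper(event, context):
            for matches, data_class in detectors:
                if matches(event):
                    return handler(data_class(event), context)
            return handler(event, context)  # no match: pass the raw event through
        return wrapper
    return decorator


# Stand-ins for the real Powertools data classes, for illustration only.
class SQSEvent(dict):
    pass


class DynamoDBStreamEvent(dict):
    pass


def _source_is(name):
    return lambda e: e.get("Records", [{}])[0].get("eventSource") == name


@multi_event_source(
    (_source_is("aws:sqs"), SQSEvent),
    (_source_is("aws:dynamodb"), DynamoDBStreamEvent),
)
def handler(event, context):
    return type(event).__name__
```

The handler then branches with `isinstance`, exactly as in the comment above; the decorator only decides which wrapper type the raw dict gets.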
hey @abbasyadollahi - adding @seshubaws and @rafaelgsr who are working on this |
We acknowledge that some customers use multiple event sources for a single Lambda function, despite it not being a recommended practice. While this approach can serve specific use cases, implementing support for it in Powertools would significantly increase the complexity of our codebase and could impact performance in certain scenarios. However, we are addressing a related concern: we plan to resume work on PR #4069 soon, which will enable customers to unwrap nested event sources. This feature will provide a more structured way to handle complex event patterns like S3 -> SQS -> SNS -> Lambda. I'm closing this issue as not planned for now; please reopen if you have any other ideas on how to implement this. |
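For context, "unwrapping nested event sources" can be pictured as repeatedly peeling known envelopes until the innermost payload remains. The sketch below is a rough stdlib-only illustration of that idea; the helper `unwrap_nested` is hypothetical, assumes JSON string bodies, and the actual PR #4069 may take a different approach:

```python
import json


def unwrap_nested(event):
    """Peel SQS bodies and SNS messages until no known envelope remains."""
    while isinstance(event, dict):
        # A raw SNS notification, e.g. an SNS message delivered via SQS.
        if event.get("Type") == "Notification" and "Message" in event:
            event = json.loads(event["Message"])
            continue
        records = event.get("Records")
        if records:
            record = records[0]
            if record.get("eventSource") == "aws:sqs":
                event = json.loads(record["body"])
                continue
            if record.get("EventSource") == "aws:sns":
                event = json.loads(record["Sns"]["Message"])
                continue
        break
    return event
```

For an S3 -> SNS -> SQS -> Lambda chain, this would peel the SQS record, then the SNS notification, and stop at the S3 event, whose source is not a known envelope.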
|
Use case
Context: AWS Lambda supports multiple event source mappings for the same Lambda - https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html
I'm opening this issue to discuss whether Lambda Powertools can support different parsers in a Lambda that has multiple event sources. This thread started in our Discord server, and it's interesting to hear customer feedback on it.
Original discussion: https://discord.com/channels/1006478942305263677/1006527338621710376/1103290540226785342
Solution/User Experience
We need to investigate this.
Alternative solutions
No response
Acknowledgment