Skip to content
This repository has been archived by the owner on Nov 22, 2022. It is now read-only.

Commit

Permalink
feat(lambda-event-sources): add filters to SQS, DynamoDB, and Kinesis…
Browse files Browse the repository at this point in the history
… event sources (aws#21917)

## Description

Adds the ability to create filters for SQS, DynamoDB and Kinesis, enabling filter criteria settings for event sources

## Use Cases

With this PR will be possible, for example, to filter events from a DynamoDB Stream allowing only INSERT events to be transmitted as shown in the example below

```typescript
    const fn = new NodejsFunction(this, 'Fn');
    const table = new dynamodb.Table(this, 'T', {
      partitionKey: {
        name: 'id',
        type: dynamodb.AttributeType.STRING,
      },
      stream: dynamodb.StreamViewType.NEW_IMAGE,
    });

    fn.addEventSource(new sources.DynamoEventSource(table, {
      startingPosition: lambda.StartingPosition.LATEST,
      filters: [
         lambda.FilterCriteria.filter({
            eventName: FilterRule.isEqual('INSERT'),
         }),
      ],
    }));
```

Closes aws#17874 
----

### All Submissions:

* [x] Have you followed the guidelines in our [Contributing guide?](https://github.com/aws/aws-cdk/blob/main/CONTRIBUTING.md)

### Adding new Unconventional Dependencies:

* [ ] This PR adds new unconventional dependencies following the process described [here](https://github.com/aws/aws-cdk/blob/main/CONTRIBUTING.md/#adding-new-unconventional-dependencies)

### New Features

* [x] Have you added the new feature to an [integration test](https://github.com/aws/aws-cdk/blob/main/INTEGRATION_TESTS.md)?
	* [x] Did you use `yarn integ` to deploy the infrastructure and generate the snapshot (i.e. `yarn integ` without `--dry-run`)?

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
marciocadev authored and Kruspe committed Sep 13, 2022
1 parent e643c1a commit 8755dec
Show file tree
Hide file tree
Showing 32 changed files with 2,006 additions and 1 deletion.
8 changes: 8 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ export interface SqsEventSourceProps {
* @default true
*/
readonly enabled?: boolean;

/**
* Add filter criteria option
*
* @default - None
*/
readonly filters?: Array<{[key: string]: any}>;
}

/**
Expand Down Expand Up @@ -73,6 +80,7 @@ export class SqsEventSource implements lambda.IEventSource {
reportBatchItemFailures: this.props.reportBatchItemFailures,
enabled: this.props.enabled,
eventSourceArn: this.queue.queueArn,
filters: this.props.filters,
});
this._eventSourceMappingId = eventSourceMapping.eventSourceMappingId;

Expand Down
8 changes: 8 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ export interface StreamEventSourceProps extends BaseStreamEventSourceProps {
* @default - discarded records are ignored
*/
readonly onFailure?: lambda.IEventSourceDlq;

/**
* Add filter criteria option
*
* @default - None
*/
readonly filters?: Array<{[key: string]: any}>;
}

/**
Expand All @@ -132,6 +139,7 @@ export abstract class StreamEventSource implements lambda.IEventSource {
onFailure: this.props.onFailure,
tumblingWindow: this.props.tumblingWindow,
enabled: this.props.enabled,
filters: this.props.filters,
};
}
}
51 changes: 51 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/dynamo.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,57 @@ describe('DynamoEventSource', () => {

});

test('adding filter criteria', () => {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const table = new dynamodb.Table(stack, 'T', {
partitionKey: {
name: 'id',
type: dynamodb.AttributeType.STRING,
},
stream: dynamodb.StreamViewType.NEW_IMAGE,
});

// WHEN
fn.addEventSource(new sources.DynamoEventSource(table, {
startingPosition: lambda.StartingPosition.LATEST,
filters: [
lambda.FilterCriteria.filter({
eventName: lambda.FilterRule.isEqual('INSERT'),
dynamodb: {
Keys: {
id: {
S: lambda.FilterRule.exists(),
},
},
},
}),
],
}));

// THEN
Template.fromStack(stack).hasResourceProperties('AWS::Lambda::EventSourceMapping', {
'EventSourceArn': {
'Fn::GetAtt': [
'TD925BC7E',
'StreamArn',
],
},
'FunctionName': {
'Ref': 'Fn9270CBC0',
},
'FilterCriteria': {
'Filters': [
{
'Pattern': '{"eventName":["INSERT"],"dynamodb":{"Keys":{"id":{"S":[{"exists":true}]}}}}',
},
],
},
'StartingPosition': 'LATEST',
});
});

test('specific maxBatchingWindow', () => {
// GIVEN
const stack = new cdk.Stack();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"version": "21.0.0",
"files": {
"21fbb51d7b23f6a6c262b46a9caee79d744a3ac019fd45422d988b96d44b2a22": {
"source": {
"path": "DynamoDBFilterCriteriaDefaultTestDeployAssert448231D5.template.json",
"packaging": "file"
},
"destinations": {
"current_account-current_region": {
"bucketName": "cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}",
"objectKey": "21fbb51d7b23f6a6c262b46a9caee79d744a3ac019fd45422d988b96d44b2a22.json",
"assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-file-publishing-role-${AWS::AccountId}-${AWS::Region}"
}
}
}
},
"dockerImages": {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"Parameters": {
"BootstrapVersion": {
"Type": "AWS::SSM::Parameter::Value<String>",
"Default": "/cdk-bootstrap/hnb659fds/version",
"Description": "Version of the CDK Bootstrap resources in this environment, automatically retrieved from SSM Parameter Store. [cdk:skip]"
}
},
"Rules": {
"CheckBootstrapVersion": {
"Assertions": [
{
"Assert": {
"Fn::Not": [
{
"Fn::Contains": [
[
"1",
"2",
"3",
"4",
"5"
],
{
"Ref": "BootstrapVersion"
}
]
}
]
},
"AssertDescription": "CDK bootstrap stack version 6 required. Please run 'cdk bootstrap' with a recent version of the CDK CLI."
}
]
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"version":"21.0.0"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"version": "21.0.0",
"testCases": {
"DynamoDBFilterCriteria/DefaultTest": {
"stacks": [
"lambda-event-source-filter-criteria-dynamodb"
],
"assertionStack": "DynamoDBFilterCriteria/DefaultTest/DeployAssert",
"assertionStackName": "DynamoDBFilterCriteriaDefaultTestDeployAssert448231D5"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"version": "21.0.0",
"files": {
"8ddecd300b64f1e8196cec45f3e0f752e4a04060a8cd32fbcd88c192b14499fc": {
"source": {
"path": "lambda-event-source-filter-criteria-dynamodb.template.json",
"packaging": "file"
},
"destinations": {
"current_account-current_region": {
"bucketName": "cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}",
"objectKey": "8ddecd300b64f1e8196cec45f3e0f752e4a04060a8cd32fbcd88c192b14499fc.json",
"assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-file-publishing-role-${AWS::AccountId}-${AWS::Region}"
}
}
}
},
"dockerImages": {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
{
"Resources": {
"FServiceRole3AC82EE1": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
}
}
],
"Version": "2012-10-17"
},
"ManagedPolicyArns": [
{
"Fn::Join": [
"",
[
"arn:",
{
"Ref": "AWS::Partition"
},
":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
]
]
}
]
}
},
"FServiceRoleDefaultPolicy17A19BFA": {
"Type": "AWS::IAM::Policy",
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": "dynamodb:ListStreams",
"Effect": "Allow",
"Resource": "*"
},
{
"Action": [
"dynamodb:DescribeStream",
"dynamodb:GetRecords",
"dynamodb:GetShardIterator"
],
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"TD925BC7E",
"StreamArn"
]
}
}
],
"Version": "2012-10-17"
},
"PolicyName": "FServiceRoleDefaultPolicy17A19BFA",
"Roles": [
{
"Ref": "FServiceRole3AC82EE1"
}
]
}
},
"FC4345940": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Code": {
"ZipFile": "exports.handler = async function handler(event) {\n console.log('event:', JSON.stringify(event, undefined, 2));\n return { event };\n}"
},
"Role": {
"Fn::GetAtt": [
"FServiceRole3AC82EE1",
"Arn"
]
},
"Handler": "index.handler",
"Runtime": "nodejs14.x"
},
"DependsOn": [
"FServiceRoleDefaultPolicy17A19BFA",
"FServiceRole3AC82EE1"
]
},
"FDynamoDBEventSourcelambdaeventsourcefiltercriteriadynamodbT9CFE7D0657833F11": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
"FunctionName": {
"Ref": "FC4345940"
},
"BatchSize": 5,
"EventSourceArn": {
"Fn::GetAtt": [
"TD925BC7E",
"StreamArn"
]
},
"FilterCriteria": {
"Filters": [
{
"Pattern": "{\"eventName\":[\"INSERT\"],\"dynamodb\":{\"Keys\":{\"id\":{\"S\":[{\"exists\":true}]}}}}"
}
]
},
"StartingPosition": "LATEST"
}
},
"TD925BC7E": {
"Type": "AWS::DynamoDB::Table",
"Properties": {
"KeySchema": [
{
"AttributeName": "id",
"KeyType": "HASH"
}
],
"AttributeDefinitions": [
{
"AttributeName": "id",
"AttributeType": "S"
}
],
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
},
"StreamSpecification": {
"StreamViewType": "NEW_IMAGE"
}
},
"UpdateReplacePolicy": "Delete",
"DeletionPolicy": "Delete"
}
},
"Parameters": {
"BootstrapVersion": {
"Type": "AWS::SSM::Parameter::Value<String>",
"Default": "/cdk-bootstrap/hnb659fds/version",
"Description": "Version of the CDK Bootstrap resources in this environment, automatically retrieved from SSM Parameter Store. [cdk:skip]"
}
},
"Rules": {
"CheckBootstrapVersion": {
"Assertions": [
{
"Assert": {
"Fn::Not": [
{
"Fn::Contains": [
[
"1",
"2",
"3",
"4",
"5"
],
{
"Ref": "BootstrapVersion"
}
]
}
]
},
"AssertDescription": "CDK bootstrap stack version 6 required. Please run 'cdk bootstrap' with a recent version of the CDK CLI."
}
]
}
}
}
Loading

0 comments on commit 8755dec

Please sign in to comment.