From 68b969e6006212b2930844226f7817c81cfd8e18 Mon Sep 17 00:00:00 2001 From: Roger Zhang Date: Thu, 21 Nov 2024 15:34:49 -0800 Subject: [PATCH] feat(lambda): support for Provisioned Pollers (#32205) ### Reason for this change Lambda is introducing a new property in Event Sources named `ProvisionedPollerConfig` to set provisioned pollers that read from the event sources. When specified, it allows control over the minimum and maximum number of pollers that can be provisioned to process events from the source. This feature is currently supported for MSK and Self-managed Kafka event sources. ### Description of changes This new property can be opted in by setting the `ProvisionedPollerConfig` field while creating event sources. The example of setting `ProvisionedPollerConfig` for is shown below: ``` fn.addEventSource(new sources.ManagedKafkaEventSource( { clusterArn, topic: kafkaTopic, startingPosition: lambda.StartingPosition.TRIM_HORIZON, provisionedPollerConfig: { minimumPollers: 1, maximumPollers: 3, }, })) ``` ### Description of how you validated changes Have added unit test and integration test to validate the implementation ### Checklist - [x] My code adheres to the [CONTRIBUTING GUIDE](https://github.com/aws/aws-cdk/blob/main/CONTRIBUTING.md) and [DESIGN GUIDELINES](https://github.com/aws/aws-cdk/blob/main/docs/DESIGN_GUIDELINES.md) ---- *By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license* --- ...efaultTestDeployAssertAF78BD0F.assets.json | 2 +- .../cdk.out | 2 +- .../integ.json | 2 +- ...vent-source-kafka-self-managed.assets.json | 6 +- ...nt-source-kafka-self-managed.template.json | 132 +++++++++++ .../manifest.json | 30 ++- .../tree.json | 212 +++++++++++++++++- .../test/integ.kafka-selfmanaged.ts | 23 ++ .../aws-lambda-event-sources/README.md | 23 ++ .../aws-lambda-event-sources/lib/kafka.ts | 3 + .../aws-lambda-event-sources/lib/stream.ts | 41 ++++ .../test/kafka.test.ts | 188 ++++++++++++++++ .../aws-lambda/lib/event-source-mapping.ts | 43 ++++ .../test/event-source-mapping.test.ts | 73 +++++- packages/aws-cdk-lib/awslint.json | 12 + 15 files changed, 781 insertions(+), 11 deletions(-) diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F.assets.json b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F.assets.json index a01ffb4d5f4c8..18daf0934754d 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F.assets.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F.assets.json @@ -1,5 +1,5 @@ { - "version": "36.0.0", + "version": "38.0.1", "files": { "21fbb51d7b23f6a6c262b46a9caee79d744a3ac019fd45422d988b96d44b2a22": { "source": { diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/cdk.out b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/cdk.out index 1f0068d32659a..c6e612584e352 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/cdk.out +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/cdk.out @@ -1 +1 @@ -{"version":"36.0.0"} \ No newline at end of file +{"version":"38.0.1"} \ No newline at end of file diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/integ.json b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/integ.json index eb53722c5afaf..b2142e0e738f8 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/integ.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/integ.json @@ -1,5 +1,5 @@ { - "version": "36.0.0", + "version": "38.0.1", "testCases": { "LambdaEventSourceKafkaSelfManagedTest/DefaultTest": { "stacks": [ diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.assets.json b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.assets.json index cb4ec6e990114..983f0b88ef58f 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.assets.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.assets.json @@ -1,7 +1,7 @@ { - "version": "36.0.0", + "version": "38.0.1", "files": { - "4bf07b5cad381e52a796b0a42748934cce430e155ffe31f0366eef200d40356f": { + "cb2d7d48eaf634edd0acc6e6475e904f57e491fce9fa3ac839e93cba823616c1": { "source": { "path": "lambda-event-source-kafka-self-managed.template.json", "packaging": "file" @@ -9,7 +9,7 @@ "destinations": { "current_account-current_region": { "bucketName": "cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}", - "objectKey": "4bf07b5cad381e52a796b0a42748934cce430e155ffe31f0366eef200d40356f.json", + "objectKey": "cb2d7d48eaf634edd0acc6e6475e904f57e491fce9fa3ac839e93cba823616c1.json", "assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-file-publishing-role-${AWS::AccountId}-${AWS::Region}" } } diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.template.json b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.template.json index dd921a80f1344..5a5f9daa01865 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.template.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.template.json @@ -322,6 +322,138 @@ "my-test-topic2" ] } + }, + "F3ServiceRole2F65FFC0": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": "lambda.amazonaws.com" + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ] + ] + } + ] + } + }, + "F3ServiceRoleDefaultPolicy1C0463D1": { + "Type": "AWS::IAM::Policy", + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "secretsmanager:DescribeSecret", + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": [ + { + "Ref": "S509448A1" + }, + { + "Ref": "SC0855C491" + } + ] + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "F3ServiceRoleDefaultPolicy1C0463D1", + "Roles": [ + { + "Ref": "F3ServiceRole2F65FFC0" + } + ] + } + }, + "F38FF9B13A": { + "Type": "AWS::Lambda::Function", + "Properties": { + "Code": { + "ZipFile": "exports.handler = async function handler(event) {\n console.log('event:', JSON.stringify(event, undefined, 2));\n return { event };\n}" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "F3ServiceRole2F65FFC0", + "Arn" + ] + }, + "Runtime": "nodejs18.x" + }, + "DependsOn": [ + "F3ServiceRoleDefaultPolicy1C0463D1", + "F3ServiceRole2F65FFC0" + ] + }, + "F3KafkaEventSource838c4d5ff3c99c1a617120adfca83e5bmytesttopic3ED015F25": { + "Type": "AWS::Lambda::EventSourceMapping", + "Properties": { + "BatchSize": 100, + "FilterCriteria": { + "Filters": [ + { + "Pattern": "{\"numericEquals\":[{\"numeric\":[\"=\",1]}]}" + } + ] + }, + "FunctionName": { + "Ref": "F38FF9B13A" + }, + "ProvisionedPollerConfig": { + "MaximumPollers": 3, + "MinimumPollers": 1 + }, + "SelfManagedEventSource": { + "Endpoints": { + "KafkaBootstrapServers": [ + "my-self-hosted-kafka-broker-1:9092", + "my-self-hosted-kafka-broker-2:9092", + "my-self-hosted-kafka-broker-3:9092" + ] + } + }, + "SelfManagedKafkaEventSourceConfig": { + "ConsumerGroupId": "myTestConsumerGroup3" + }, + "SourceAccessConfigurations": [ + { + "Type": "CLIENT_CERTIFICATE_TLS_AUTH", + "URI": { + "Ref": "SC0855C491" + } + }, + { + "Type": "SERVER_ROOT_CA_CERTIFICATE", + "URI": { + "Ref": "S509448A1" + } + } + ], + "StartingPosition": "TRIM_HORIZON", + "Topics": [ + "my-test-topic3" + ] + } } }, "Parameters": { diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/manifest.json b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/manifest.json index 06655a65cd8cb..d41cc1cc8586e 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/manifest.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/manifest.json @@ -1,5 +1,5 @@ { - "version": "36.0.0", + "version": "38.0.1", "artifacts": { "lambda-event-source-kafka-self-managed.assets": { "type": "cdk:asset-manifest", @@ -16,9 +16,10 @@ "templateFile": "lambda-event-source-kafka-self-managed.template.json", "terminationProtection": false, "validateOnSynth": false, + "notificationArns": [], "assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-deploy-role-${AWS::AccountId}-${AWS::Region}", "cloudFormationExecutionRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-cfn-exec-role-${AWS::AccountId}-${AWS::Region}", - "stackTemplateAssetObjectUrl": "s3://cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}/4bf07b5cad381e52a796b0a42748934cce430e155ffe31f0366eef200d40356f.json", + "stackTemplateAssetObjectUrl": "s3://cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}/cb2d7d48eaf634edd0acc6e6475e904f57e491fce9fa3ac839e93cba823616c1.json", "requiresBootstrapStackVersion": 6, "bootstrapStackVersionSsmParameter": "/cdk-bootstrap/hnb659fds/version", "additionalDependencies": [ @@ -100,6 +101,30 @@ "data": "F2KafkaEventSource838c4d5ff3c99c1a617120adfca83e5bmytesttopic20A678189" } ], + "/lambda-event-source-kafka-self-managed/F3/ServiceRole/Resource": [ + { + "type": "aws:cdk:logicalId", + "data": "F3ServiceRole2F65FFC0" + } + ], + "/lambda-event-source-kafka-self-managed/F3/ServiceRole/DefaultPolicy/Resource": [ + { + "type": "aws:cdk:logicalId", + "data": "F3ServiceRoleDefaultPolicy1C0463D1" + } + ], + "/lambda-event-source-kafka-self-managed/F3/Resource": [ + { + "type": "aws:cdk:logicalId", + "data": "F38FF9B13A" + } + ], + "/lambda-event-source-kafka-self-managed/F3/KafkaEventSource:838c4d5ff3c99c1a617120adfca83e5b:my-test-topic3/Resource": [ + { + "type": "aws:cdk:logicalId", + "data": "F3KafkaEventSource838c4d5ff3c99c1a617120adfca83e5bmytesttopic3ED015F25" + } + ], "/lambda-event-source-kafka-self-managed/BootstrapVersion": [ { "type": "aws:cdk:logicalId", @@ -130,6 +155,7 @@ "templateFile": "LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F.template.json", "terminationProtection": false, "validateOnSynth": false, + "notificationArns": [], "assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-deploy-role-${AWS::AccountId}-${AWS::Region}", "cloudFormationExecutionRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-cfn-exec-role-${AWS::AccountId}-${AWS::Region}", "stackTemplateAssetObjectUrl": "s3://cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}/21fbb51d7b23f6a6c262b46a9caee79d744a3ac019fd45422d988b96d44b2a22.json", diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/tree.json b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/tree.json index 54543d8610b3a..168d1caad8aee 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/tree.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/tree.json @@ -531,6 +531,214 @@ "version": "0.0.0" } }, + "F3": { + "id": "F3", + "path": "lambda-event-source-kafka-self-managed/F3", + "children": { + "ServiceRole": { + "id": "ServiceRole", + "path": "lambda-event-source-kafka-self-managed/F3/ServiceRole", + "children": { + "ImportServiceRole": { + "id": "ImportServiceRole", + "path": "lambda-event-source-kafka-self-managed/F3/ServiceRole/ImportServiceRole", + "constructInfo": { + "fqn": "aws-cdk-lib.Resource", + "version": "0.0.0" + } + }, + "Resource": { + "id": "Resource", + "path": "lambda-event-source-kafka-self-managed/F3/ServiceRole/Resource", + "attributes": { + "aws:cdk:cloudformation:type": "AWS::IAM::Role", + "aws:cdk:cloudformation:props": { + "assumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": "lambda.amazonaws.com" + } + } + ], + "Version": "2012-10-17" + }, + "managedPolicyArns": [ + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ] + ] + } + ] + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_iam.CfnRole", + "version": "0.0.0" + } + }, + "DefaultPolicy": { + "id": "DefaultPolicy", + "path": "lambda-event-source-kafka-self-managed/F3/ServiceRole/DefaultPolicy", + "children": { + "Resource": { + "id": "Resource", + "path": "lambda-event-source-kafka-self-managed/F3/ServiceRole/DefaultPolicy/Resource", + "attributes": { + "aws:cdk:cloudformation:type": "AWS::IAM::Policy", + "aws:cdk:cloudformation:props": { + "policyDocument": { + "Statement": [ + { + "Action": [ + "secretsmanager:DescribeSecret", + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": [ + { + "Ref": "S509448A1" + }, + { + "Ref": "SC0855C491" + } + ] + } + ], + "Version": "2012-10-17" + }, + "policyName": "F3ServiceRoleDefaultPolicy1C0463D1", + "roles": [ + { + "Ref": "F3ServiceRole2F65FFC0" + } + ] + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_iam.CfnPolicy", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_iam.Policy", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_iam.Role", + "version": "0.0.0" + } + }, + "Resource": { + "id": "Resource", + "path": "lambda-event-source-kafka-self-managed/F3/Resource", + "attributes": { + "aws:cdk:cloudformation:type": "AWS::Lambda::Function", + "aws:cdk:cloudformation:props": { + "code": { + "zipFile": "exports.handler = async function handler(event) {\n console.log('event:', JSON.stringify(event, undefined, 2));\n return { event };\n}" + }, + "handler": "index.handler", + "role": { + "Fn::GetAtt": [ + "F3ServiceRole2F65FFC0", + "Arn" + ] + }, + "runtime": "nodejs18.x" + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_lambda.CfnFunction", + "version": "0.0.0" + } + }, + "KafkaEventSource:838c4d5ff3c99c1a617120adfca83e5b:my-test-topic3": { + "id": "KafkaEventSource:838c4d5ff3c99c1a617120adfca83e5b:my-test-topic3", + "path": "lambda-event-source-kafka-self-managed/F3/KafkaEventSource:838c4d5ff3c99c1a617120adfca83e5b:my-test-topic3", + "children": { + "Resource": { + "id": "Resource", + "path": "lambda-event-source-kafka-self-managed/F3/KafkaEventSource:838c4d5ff3c99c1a617120adfca83e5b:my-test-topic3/Resource", + "attributes": { + "aws:cdk:cloudformation:type": "AWS::Lambda::EventSourceMapping", + "aws:cdk:cloudformation:props": { + "batchSize": 100, + "filterCriteria": { + "filters": [ + { + "pattern": "{\"numericEquals\":[{\"numeric\":[\"=\",1]}]}" + } + ] + }, + "functionName": { + "Ref": "F38FF9B13A" + }, + "provisionedPollerConfig": { + "minimumPollers": 1, + "maximumPollers": 3 + }, + "selfManagedEventSource": { + "endpoints": { + "kafkaBootstrapServers": [ + "my-self-hosted-kafka-broker-1:9092", + "my-self-hosted-kafka-broker-2:9092", + "my-self-hosted-kafka-broker-3:9092" + ] + } + }, + "selfManagedKafkaEventSourceConfig": { + "consumerGroupId": "myTestConsumerGroup3" + }, + "sourceAccessConfigurations": [ + { + "type": "CLIENT_CERTIFICATE_TLS_AUTH", + "uri": { + "Ref": "SC0855C491" + } + }, + { + "type": "SERVER_ROOT_CA_CERTIFICATE", + "uri": { + "Ref": "S509448A1" + } + } + ], + "startingPosition": "TRIM_HORIZON", + "topics": [ + "my-test-topic3" + ] + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_lambda.CfnEventSourceMapping", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_lambda.EventSourceMapping", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_lambda.Function", + "version": "0.0.0" + } + }, "BootstrapVersion": { "id": "BootstrapVersion", "path": "lambda-event-source-kafka-self-managed/BootstrapVersion", @@ -566,7 +774,7 @@ "path": "LambdaEventSourceKafkaSelfManagedTest/DefaultTest/Default", "constructInfo": { "fqn": "constructs.Construct", - "version": "10.3.0" + "version": "10.4.2" } }, "DeployAssert": { @@ -612,7 +820,7 @@ "path": "Tree", "constructInfo": { "fqn": "constructs.Construct", - "version": "10.3.0" + "version": "10.4.2" } } }, diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.ts b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.ts index 3af619c6f8bc2..a9a8b266a2029 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.ts +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.ts @@ -87,6 +87,29 @@ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== ], filterEncryption: myKey, })); + + const fn3 = new TestFunction(this, 'F3'); + rootCASecret.grantRead(fn3); + clientCertificatesSecret.grantRead(fn3); + + fn3.addEventSource(new SelfManagedKafkaEventSource({ + bootstrapServers, + topic: 'my-test-topic3', + consumerGroupId: 'myTestConsumerGroup3', + secret: clientCertificatesSecret, + authenticationMethod: AuthenticationMethod.CLIENT_CERTIFICATE_TLS_AUTH, + rootCACertificate: rootCASecret, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + filters: [ + lambda.FilterCriteria.filter({ + numericEquals: lambda.FilterRule.isEqual(1), + }), + ], + provisionedPollerConfig: { + minimumPollers: 1, + maximumPollers: 3, + }, + })); } } diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/README.md b/packages/aws-cdk-lib/aws-lambda-event-sources/README.md index 57aa2b3fe3e5c..1d4f12937a812 100644 --- a/packages/aws-cdk-lib/aws-lambda-event-sources/README.md +++ b/packages/aws-cdk-lib/aws-lambda-event-sources/README.md @@ -396,6 +396,29 @@ myFunction.addEventSource(new ManagedKafkaEventSource({ })); ``` +Set configuration for provisioned pollers that read from the event source. + +```ts +import { ManagedKafkaEventSource } from 'aws-cdk-lib/aws-lambda-event-sources'; + +// Your MSK cluster arn +declare const clusterArn: string + +// The Kafka topic you want to subscribe to +const topic = 'some-cool-topic'; + +declare const myFunction: lambda.Function; +myFunction.addEventSource(new ManagedKafkaEventSource({ + clusterArn, + topic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + provisionedPollerConfig: { + minimumPollers: 1, + maximumPollers: 3, + }, +})); +``` + ## Roadmap Eventually, this module will support all the event sources described under diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts index 3c75a45a51447..c7a837b23ff16 100644 --- a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts +++ b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts @@ -164,6 +164,7 @@ export class ManagedKafkaEventSource extends StreamEventSource { kafkaConsumerGroupId: this.innerProps.consumerGroupId, onFailure: this.innerProps.onFailure, supportS3OnFailureDestination: true, + provisionedPollerConfig: this.innerProps.provisionedPollerConfig, }), ); @@ -240,6 +241,7 @@ export class SelfManagedKafkaEventSource extends StreamEventSource { throw new Error('secret must be set if Kafka brokers accessed over Internet'); } this.innerProps = props; + } public bind(target: lambda.IFunction) { @@ -256,6 +258,7 @@ export class SelfManagedKafkaEventSource extends StreamEventSource { sourceAccessConfigurations: this.sourceAccessConfigurations(), onFailure: this.innerProps.onFailure, supportS3OnFailureDestination: true, + provisionedPollerConfig: this.innerProps.provisionedPollerConfig, }), ); diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts index 40b765da22c63..f65cb6a9852a1 100644 --- a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts +++ b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts @@ -44,6 +44,29 @@ export interface BaseStreamEventSourceProps{ * @default true */ readonly enabled?: boolean; + + /** + * Configuration for provisioned pollers that read from the event source. + * When specified, allows control over the minimum and maximum number of pollers + * that can be provisioned to process events from the source. + * @default - no provisioned pollers + */ + readonly provisionedPollerConfig?: ProvisionedPollerConfig; +} + +export interface ProvisionedPollerConfig { + /** + * The minimum number of pollers that should be provisioned. + * + * @default 1 + */ + readonly minimumPollers: number; + /** + * The maximum number of pollers that can be provisioned. + * + * @default 200 + */ + readonly maximumPollers: number; } /** @@ -151,6 +174,24 @@ export interface StreamEventSourceProps extends BaseStreamEventSourceProps { */ export abstract class StreamEventSource implements lambda.IEventSource { protected constructor(protected readonly props: StreamEventSourceProps) { + if (props.provisionedPollerConfig) { + const { minimumPollers, maximumPollers } = props.provisionedPollerConfig; + if (minimumPollers != undefined) { + if (minimumPollers < 1 || minimumPollers > 200) { + throw new Error('Minimum provisioned pollers must be between 1 and 200 inclusive'); + } + } + if (maximumPollers != undefined) { + if (maximumPollers < 1 || maximumPollers > 2000) { + throw new Error('Maximum provisioned pollers must be between 1 and 2000 inclusive'); + } + } + if (minimumPollers != undefined && maximumPollers != undefined) { + if (minimumPollers > maximumPollers) { + throw new Error('Minimum provisioned pollers must be less than or equal to maximum provisioned pollers'); + } + } + } } public abstract bind(_target: lambda.IFunction): void; diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts b/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts index d5b1df2c9a657..5775a8ad1507f 100644 --- a/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts +++ b/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts @@ -308,6 +308,105 @@ describe('KafkaEventSource', () => { }); }); + test('with provisioned pollers', () => { + // GIVEN + const stack = new cdk.Stack(); + const testLambdaFunction = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + const bucket = Bucket.fromBucketName(stack, 'BucketByName', 'my-bucket'); + const s3OnFailureDestination = new sources.S3OnFailureDestination(bucket); + // WHEN + testLambdaFunction.addEventSource(new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + onFailure: s3OnFailureDestination, + provisionedPollerConfig: { + minimumPollers: 1, + maximumPollers: 3, + }, + })); + + // THEN + Template.fromStack(stack).hasResourceProperties('AWS::Lambda::EventSourceMapping', { + DestinationConfig: { + OnFailure: { + Destination: { + 'Fn::Join': ['', ['arn:', { Ref: 'AWS::Partition' }, ':s3:::my-bucket']], + }, + }, + }, + ProvisionedPollerConfig: { + MinimumPollers: 1, + MaximumPollers: 3, + }, + }); + }); + + test('maximum provisioned poller is out of limit', () => { + const stack = new cdk.Stack(); + const testLambdaFunction = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + const bucket = Bucket.fromBucketName(stack, 'BucketByName', 'my-bucket'); + const s3OnFailureDestination = new sources.S3OnFailureDestination(bucket); + + expect(() => testLambdaFunction.addEventSource(new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + onFailure: s3OnFailureDestination, + provisionedPollerConfig: { + minimumPollers: 1, + maximumPollers: 2001, + }, + }))).toThrow(/Maximum provisioned pollers must be between 1 and 2000 inclusive/); + }); + + test('minimum provisioned poller is out of limit', () => { + const stack = new cdk.Stack(); + const testLambdaFunction = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + const bucket = Bucket.fromBucketName(stack, 'BucketByName', 'my-bucket'); + const s3OnFailureDestination = new sources.S3OnFailureDestination(bucket); + + expect(() => testLambdaFunction.addEventSource(new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + onFailure: s3OnFailureDestination, + provisionedPollerConfig: { + minimumPollers: 0, + maximumPollers: 3, + }, + }))).toThrow(/Minimum provisioned pollers must be between 1 and 200 inclusive/); + }); + + test('Minimum provisioned poller greater than maximum provisioned poller', () => { + const stack = new cdk.Stack(); + const testLambdaFunction = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + const bucket = Bucket.fromBucketName(stack, 'BucketByName', 'my-bucket'); + const s3OnFailureDestination = new sources.S3OnFailureDestination(bucket); + + expect(() => testLambdaFunction.addEventSource(new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + onFailure: s3OnFailureDestination, + provisionedPollerConfig: { + minimumPollers: 3, + maximumPollers: 1, + }, + }))).toThrow(/Minimum provisioned pollers must be less than or equal to maximum provisioned pollers/); + }); }); describe('self-managed kafka', () => { @@ -998,5 +1097,94 @@ describe('KafkaEventSource', () => { expect(mskEventMapping.eventSourceMappingId).toBeDefined(); expect(mskEventMapping.eventSourceMappingArn).toBeDefined(); }); + + test('with provisioned pollers', () => { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + + const mskEventMapping = new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + provisionedPollerConfig: { + minimumPollers: 1, + maximumPollers: 3, + }, + }); + + // WHEN + fn.addEventSource(mskEventMapping); + expect(mskEventMapping.eventSourceMappingId).toBeDefined(); + expect(mskEventMapping.eventSourceMappingArn).toBeDefined(); + + const template = Template.fromStack(stack); + template.hasResourceProperties('AWS::Lambda::EventSourceMapping', { + ProvisionedPollerConfig: { + MinimumPollers: 1, + MaximumPollers: 3, + }, + }); + }); + + test('maximum provisioned poller is out of limit', () => { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + + expect(() => fn.addEventSource(new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + provisionedPollerConfig: { + minimumPollers: 1, + maximumPollers: 2001, + }, + }))).toThrow(/Maximum provisioned pollers must be between 1 and 2000 inclusive/); + }); + + test('minimum provisioned poller is out of limit', () => { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + + expect(() => fn.addEventSource(new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + provisionedPollerConfig: { + minimumPollers: 0, + maximumPollers: 3, + }, + }))).toThrow(/Minimum provisioned pollers must be between 1 and 200 inclusive/); + }); + + test('Minimum provisioned poller greater than maximum provisioned poller', () => { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + + expect(() => fn.addEventSource(new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + provisionedPollerConfig: { + minimumPollers: 3, + maximumPollers: 1, + }, + }))).toThrow(/Minimum provisioned pollers must be less than or equal to maximum provisioned pollers/); + }); }); }); diff --git a/packages/aws-cdk-lib/aws-lambda/lib/event-source-mapping.ts b/packages/aws-cdk-lib/aws-lambda/lib/event-source-mapping.ts index 842cdeb66fecc..fe4f4cc78b88f 100644 --- a/packages/aws-cdk-lib/aws-lambda/lib/event-source-mapping.ts +++ b/packages/aws-cdk-lib/aws-lambda/lib/event-source-mapping.ts @@ -81,6 +81,21 @@ export interface SourceAccessConfiguration { readonly uri: string; } +export interface ProvisionedPollerConfig { + /** + * The minimum number of pollers that should be provisioned. + * + * @default - 1 + */ + readonly minimumPollers?: number; + /** + * The maximum number of pollers that can be provisioned. + * + * @default - 200 + */ + readonly maximumPollers?: number; +} + export interface EventSourceMappingOptions { /** * The Amazon Resource Name (ARN) of the event source. Any record added to @@ -270,6 +285,14 @@ export interface EventSourceMappingOptions { */ readonly supportS3OnFailureDestination?: boolean; + /** + * Configuration for provisioned pollers that read from the event source. + * When specified, allows control over the minimum and maximum number of pollers + * that can be provisioned to process events from the source. + * @default - no provisioned pollers + */ + readonly provisionedPollerConfig?: ProvisionedPollerConfig; + /** * Configuration for enhanced monitoring metrics collection * When specified, enables collection of additional metrics for the stream event source @@ -383,6 +406,25 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp throw new Error('eventSourceArn and kafkaBootstrapServers are mutually exclusive'); } + if (props.provisionedPollerConfig) { + const { minimumPollers, maximumPollers } = props.provisionedPollerConfig; + if (minimumPollers != undefined) { + if (minimumPollers < 1 || minimumPollers > 200) { + throw new Error('Minimum provisioned pollers must be between 1 and 200 inclusive'); + } + } + if (maximumPollers != undefined) { + if (maximumPollers < 1 || maximumPollers > 2000) { + throw new Error('Maximum provisioned pollers must be between 1 and 2000 inclusive'); + } + } + if (minimumPollers != undefined && maximumPollers != undefined) { + if (minimumPollers > maximumPollers) { + throw new Error('Minimum provisioned pollers must be less than or equal to maximum provisioned pollers'); + } + } + } + if (props.kafkaBootstrapServers && (props.kafkaBootstrapServers?.length < 1)) { throw new Error('kafkaBootStrapServers must not be empty if set'); } @@ -482,6 +524,7 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp kmsKeyArn: props.filterEncryption?.keyArn, selfManagedKafkaEventSourceConfig: props.kafkaBootstrapServers ? consumerGroupConfig : undefined, amazonManagedKafkaEventSourceConfig: props.eventSourceArn ? consumerGroupConfig : undefined, + provisionedPollerConfig: props.provisionedPollerConfig, metricsConfig: props.metricsConfig, }); this.eventSourceMappingId = cfnEventSourceMapping.ref; diff --git a/packages/aws-cdk-lib/aws-lambda/test/event-source-mapping.test.ts b/packages/aws-cdk-lib/aws-lambda/test/event-source-mapping.test.ts index 315074ec62210..b8baaa24d22dd 100644 --- a/packages/aws-cdk-lib/aws-lambda/test/event-source-mapping.test.ts +++ b/packages/aws-cdk-lib/aws-lambda/test/event-source-mapping.test.ts @@ -2,7 +2,7 @@ import { Match, Template } from '../../assertions'; import { Key } from '../../aws-kms'; import * as cdk from '../../core'; import * as lambda from '../lib'; -import { Code, EventSourceMapping, Function, Runtime, Alias, StartingPosition, FilterRule, FilterCriteria } from '../lib'; +import { Code, EventSourceMapping, Function, Alias, StartingPosition, FilterRule, FilterCriteria } from '../lib'; let stack: cdk.Stack; let fn: Function; @@ -532,4 +532,75 @@ describe('event source mapping', () => { }, }); }); + + test('provisioned pollers is set', () => { + new EventSourceMapping(stack, 'test', { + target: fn, + eventSourceArn: '', + startingPosition: StartingPosition.AT_TIMESTAMP, + startingPositionTimestamp: 1640995200, + provisionedPollerConfig: { + minimumPollers: 1, + maximumPollers: 3, + }, + }); + Template.fromStack(stack).hasResourceProperties('AWS::Lambda::EventSourceMapping', { + StartingPosition: 'AT_TIMESTAMP', + StartingPositionTimestamp: 1640995200, + ProvisionedPollerConfig: { + MinimumPollers: 1, + MaximumPollers: 3, + }, + }); + }); + + test('minimum provisioned poller is out of limit', () => { + expect(() => new EventSourceMapping(stack, 'test', { + target: fn, + eventSourceArn: '', + startingPosition: StartingPosition.AT_TIMESTAMP, + startingPositionTimestamp: 1640995200, + provisionedPollerConfig: { + minimumPollers: 0, + }, + })).toThrow(/Minimum provisioned pollers must be between 1 and 200 inclusive/); + }); + + test('maximum provisioned poller is out of limit', () => { + expect(() => new EventSourceMapping(stack, 'test', { + target: fn, + eventSourceArn: '', + startingPosition: StartingPosition.AT_TIMESTAMP, + startingPositionTimestamp: 1640995200, + provisionedPollerConfig: { + maximumPollers: 2001, + }, + })).toThrow(/Maximum provisioned pollers must be between 1 and 2000 inclusive/); + }); + + test('only maximum provisioned poller is out of limit', () => { + expect(() => new EventSourceMapping(stack, 'test', { + target: fn, + eventSourceArn: '', + startingPosition: StartingPosition.AT_TIMESTAMP, + startingPositionTimestamp: 1640995200, + provisionedPollerConfig: { + minimumPollers: 1, + maximumPollers: 2001, + }, + })).toThrow(/Maximum provisioned pollers must be between 1 and 2000 inclusive/); + }); + + test('Minimum provisioned poller greater than maximum provisioned poller', () => { + expect(() => new EventSourceMapping(stack, 'test', { + target: fn, + eventSourceArn: '', + startingPosition: StartingPosition.AT_TIMESTAMP, + startingPositionTimestamp: 1640995200, + provisionedPollerConfig: { + minimumPollers: 3, + maximumPollers: 2, + }, + })).toThrow(/Minimum provisioned pollers must be less than or equal to maximum provisioned pollers/); + }); }); diff --git a/packages/aws-cdk-lib/awslint.json b/packages/aws-cdk-lib/awslint.json index 528f143682a16..465f57aceeeef 100644 --- a/packages/aws-cdk-lib/awslint.json +++ b/packages/aws-cdk-lib/awslint.json @@ -405,6 +405,18 @@ "docs-public-apis:aws-cdk-lib.aws_lambda_event_sources.S3EventSource.bucket", "docs-public-apis:aws-cdk-lib.aws_lambda_event_sources.SnsEventSource.topic", "docs-public-apis:aws-cdk-lib.aws_lambda_event_sources.SqsEventSource.queue", + "docs-public-apis:aws-cdk-lib.aws_lambda.EventSourceMappingOptions.provisionedPollerConfig", + "props-default-doc:aws-cdk-lib.aws_lambda.EventSourceMappingOptions.provisionedPollerConfig", + "docs-public-apis:aws-cdk-lib.aws_lambda.ProvisionedPollerConfig", + "docs-public-apis:aws-cdk-lib.aws_lambda.ProvisionedPollerConfig.maximumPollers", + "docs-public-apis:aws-cdk-lib.aws_lambda.ProvisionedPollerConfig.minimumPollers", + "props-default-doc:aws-cdk-lib.aws_lambda.ProvisionedPollerConfig.maximumPollers", + "props-default-doc:aws-cdk-lib.aws_lambda.ProvisionedPollerConfig.minimumPollers", + "docs-public-apis:aws-cdk-lib.aws_lambda_event_sources.KafkaEventSourceProps.provisionedPollerConfig", + "props-default-doc:aws-cdk-lib.aws_lambda_event_sources.KafkaEventSourceProps.provisionedPollerConfig", + "docs-public-apis:aws-cdk-lib.aws_lambda_event_sources.ProvisionedPollerConfig", + "docs-public-apis:aws-cdk-lib.aws_lambda_event_sources.ProvisionedPollerConfig.maximumPollers", + "docs-public-apis:aws-cdk-lib.aws_lambda_event_sources.ProvisionedPollerConfig.minimumPollers", "docs-public-apis:aws-cdk-lib.aws_logs.CrossAccountDestination.addToPolicy", "docs-public-apis:aws-cdk-lib.aws_logs.DataIdentifier.*", "docs-public-apis:aws-cdk-lib.aws_logs.JsonPattern.jsonPatternString",