From b36bc1146d06c7b9decface9f4ed9edeca61aa56 Mon Sep 17 00:00:00 2001 From: Raphael Manke Date: Tue, 6 Sep 2022 18:23:57 +0200 Subject: [PATCH 1/2] feat(lambda-event-sources): add kafka consumerGroupId support (#21791) This PR adds the capability to specify the consumerGroupId in the event-source-mapping when connection to Kafka. Adds the feature described in issue https://github.com/aws/aws-cdk/issues/21734 This features allows you to specify the consumerGroupId while connecting to a Kafka cluster. This feature was recently annouced https://aws.amazon.com/blogs/compute/using-custom-consumer-group-id-support-for-the-aws-lambda-event-sources-for-msk-and-self-managed-kafka/ and wasn't part of the cdk Kafka construct before. Things done: * Added missing attributes to class EventSourceMapping * amazonManagedKafkaEventSourceConfig * selfManagedKafkaEventSourceConfig * Added validation for the consumerGroupId value based in [CfnSpec](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-selfmanagedkafkaeventsourceconfig.html) * Added a common api `consumerGroupId` for adding the consumerGroupId independant if selfManaged or awsManaged * Updated existing integration test for SelfManagesKafkaConfig * The ManagedKafka Config is not integration tested because it requires a ManagedKafkaCluster which is not possible to deploy right now via cdk * Added Tests for consumerGroupId validation * Added Tests for template synth for SelfManagedKafka and AwsManagedKafka ---- ### All Submissions: * [x] Have you followed the guidelines in our [Contributing guide?](https://github.com/aws/aws-cdk/blob/main/CONTRIBUTING.md) ### Adding new Unconventional Dependencies: * [ ] This PR adds new unconventional dependencies following the process described [here](https://github.com/aws/aws-cdk/blob/main/CONTRIBUTING.md/#adding-new-unconventional-dependencies) ### New Features * [x] Have you added the new feature to an [integration test](https://github.com/aws/aws-cdk/blob/main/INTEGRATION_TESTS.md)? * [x] Did you use `yarn integ` to deploy the infrastructure and generate the snapshot (i.e. `yarn integ` without `--dry-run`)? *By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license* --- .../aws-lambda-event-sources/README.md | 4 ++ .../aws-lambda-event-sources/lib/kafka.ts | 9 +++ .../test/integ.kafka-selfmanaged.ts | 1 + ...nt-source-kafka-self-managed.template.json | 3 + .../manifest.json | 5 +- .../tree.json | 3 + .../test/kafka.test.ts | 56 +++++++++++++++++++ .../aws-lambda/lib/event-source-mapping.ts | 30 ++++++++++ .../test/event-source-mapping.test.ts | 40 +++++++++++++ 9 files changed, 150 insertions(+), 1 deletion(-) diff --git a/packages/@aws-cdk/aws-lambda-event-sources/README.md b/packages/@aws-cdk/aws-lambda-event-sources/README.md index c7143701ea422..dcd9eeb44958e 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/README.md +++ b/packages/@aws-cdk/aws-lambda-event-sources/README.md @@ -264,10 +264,14 @@ declare const secret: Secret; // (Optional) The secret containing the root CA certificate that your Kafka brokers use for TLS encryption declare const encryption: Secret; +// (Optional) The consumer group id to use when connecting to the Kafka broker. If omitted the UUID of the event source mapping will be used. +const consumerGroupId: "my-consumer-group-id"; + declare const myFunction: lambda.Function; myFunction.addEventSource(new SelfManagedKafkaEventSource({ bootstrapServers: bootstrapServers, topic: topic, + consumerGroupId: consumerGroupId, secret: secret, batchSize: 100, // default startingPosition: lambda.StartingPosition.TRIM_HORIZON, diff --git a/packages/@aws-cdk/aws-lambda-event-sources/lib/kafka.ts b/packages/@aws-cdk/aws-lambda-event-sources/lib/kafka.ts index 491c2011f071b..39acc9fd414a4 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/lib/kafka.ts +++ b/packages/@aws-cdk/aws-lambda-event-sources/lib/kafka.ts @@ -22,6 +22,13 @@ export interface KafkaEventSourceProps extends BaseStreamEventSourceProps { * @default none */ readonly secret?: secretsmanager.ISecret + /** + * The identifier for the Kafka consumer group to join. The consumer group ID must be unique among all your Kafka event sources. After creating a Kafka event source mapping with the consumer group ID specified, you cannot update this value. The value must have a lenght between 1 and 200 and full the pattern '[a-zA-Z0-9-\/*:_+=.@-]*'. + * @see https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html#services-msk-consumer-group-id + * + * @default - none + */ + readonly consumerGroupId?: string; } /** @@ -125,6 +132,7 @@ export class ManagedKafkaEventSource extends StreamEventSource { startingPosition: this.innerProps.startingPosition, sourceAccessConfigurations: this.sourceAccessConfigurations(), kafkaTopic: this.innerProps.topic, + kafkaConsumerGroupId: this.innerProps.consumerGroupId, }), ); @@ -199,6 +207,7 @@ export class SelfManagedKafkaEventSource extends StreamEventSource { this.enrichMappingOptions({ kafkaBootstrapServers: this.innerProps.bootstrapServers, kafkaTopic: this.innerProps.topic, + kafkaConsumerGroupId: this.innerProps.consumerGroupId, startingPosition: this.innerProps.startingPosition, sourceAccessConfigurations: this.sourceAccessConfigurations(), }), diff --git a/packages/@aws-cdk/aws-lambda-event-sources/test/integ.kafka-selfmanaged.ts b/packages/@aws-cdk/aws-lambda-event-sources/test/integ.kafka-selfmanaged.ts index 8fa4532e80e16..eca822e362c6e 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/test/integ.kafka-selfmanaged.ts +++ b/packages/@aws-cdk/aws-lambda-event-sources/test/integ.kafka-selfmanaged.ts @@ -48,6 +48,7 @@ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== new SelfManagedKafkaEventSource({ bootstrapServers, topic: 'my-test-topic', + consumerGroupId: 'myTestConsumerGroup', secret: clientCertificatesSecret, authenticationMethod: AuthenticationMethod.CLIENT_CERTIFICATE_TLS_AUTH, rootCACertificate: rootCASecret, diff --git a/packages/@aws-cdk/aws-lambda-event-sources/test/kafka-selfmanaged.integ.snapshot/lambda-event-source-kafka-self-managed.template.json b/packages/@aws-cdk/aws-lambda-event-sources/test/kafka-selfmanaged.integ.snapshot/lambda-event-source-kafka-self-managed.template.json index 498d118a948c0..0c9dd29d0fe28 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/test/kafka-selfmanaged.integ.snapshot/lambda-event-source-kafka-self-managed.template.json +++ b/packages/@aws-cdk/aws-lambda-event-sources/test/kafka-selfmanaged.integ.snapshot/lambda-event-source-kafka-self-managed.template.json @@ -98,6 +98,9 @@ ] } }, + "SelfManagedKafkaEventSourceConfig": { + "ConsumerGroupId": "myTestConsumerGroup" + }, "SourceAccessConfigurations": [ { "Type": "CLIENT_CERTIFICATE_TLS_AUTH", diff --git a/packages/@aws-cdk/aws-lambda-event-sources/test/kafka-selfmanaged.integ.snapshot/manifest.json b/packages/@aws-cdk/aws-lambda-event-sources/test/kafka-selfmanaged.integ.snapshot/manifest.json index 33977182b934f..41a3e0e2372f3 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/test/kafka-selfmanaged.integ.snapshot/manifest.json +++ b/packages/@aws-cdk/aws-lambda-event-sources/test/kafka-selfmanaged.integ.snapshot/manifest.json @@ -60,7 +60,10 @@ "/lambda-event-source-kafka-self-managed/F/KafkaEventSource:838c4d5ff3c99c1a617120adfca83e5b:my-test-topic/Resource": [ { "type": "aws:cdk:logicalId", - "data": "FKafkaEventSource838c4d5ff3c99c1a617120adfca83e5bmytesttopic1E7A7798" + "data": "FKafkaEventSource838c4d5ff3c99c1a617120adfca83e5bmytesttopic1E7A7798", + "trace": [ + "!!DESTRUCTIVE_CHANGES: WILL_REPLACE" + ] } ], "/lambda-event-source-kafka-self-managed/S/Resource": [ diff --git a/packages/@aws-cdk/aws-lambda-event-sources/test/kafka-selfmanaged.integ.snapshot/tree.json b/packages/@aws-cdk/aws-lambda-event-sources/test/kafka-selfmanaged.integ.snapshot/tree.json index 7fbae658dd3ee..b1f236bc57cf9 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/test/kafka-selfmanaged.integ.snapshot/tree.json +++ b/packages/@aws-cdk/aws-lambda-event-sources/test/kafka-selfmanaged.integ.snapshot/tree.json @@ -165,6 +165,9 @@ ] } }, + "selfManagedKafkaEventSourceConfig": { + "consumerGroupId": "myTestConsumerGroup" + }, "sourceAccessConfigurations": [ { "type": "CLIENT_CERTIFICATE_TLS_AUTH", diff --git a/packages/@aws-cdk/aws-lambda-event-sources/test/kafka.test.ts b/packages/@aws-cdk/aws-lambda-event-sources/test/kafka.test.ts index 3d0924e26de67..4688765ca1146 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/test/kafka.test.ts +++ b/packages/@aws-cdk/aws-lambda-event-sources/test/kafka.test.ts @@ -634,6 +634,62 @@ describe('KafkaEventSource', () => { }); }); + test('consumerGroupId can be set for SelfManagedKafkaEventSource', () => { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const kafkaTopic = 'some-topic'; + const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' }); + const bootstrapServers = ['kafka-broker:9092']; + const consumerGroupId = 'my-consumer-group-id'; + const eventSourceMapping = new sources.SelfManagedKafkaEventSource( + { + bootstrapServers: bootstrapServers, + topic: kafkaTopic, + secret: secret, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + consumerGroupId: consumerGroupId, + }); + // WHEN + fn.addEventSource(eventSourceMapping); + + // THEN + const template = Template.fromStack(stack); + template.hasResourceProperties('AWS::Lambda::EventSourceMapping', { + SelfManagedKafkaEventSourceConfig: { ConsumerGroupId: consumerGroupId }, + }); + + }); + + test('consumerGroupId can be set for ManagedKafkaEventSource', () => { + + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + const consumerGroupId = 'my-consumer-group-id'; + + + const mskEventMapping = new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + consumerGroupId, + }); + + // WHEN + fn.addEventSource(mskEventMapping); + expect(mskEventMapping.eventSourceMappingId).toBeDefined(); + + const template = Template.fromStack(stack); + template.hasResourceProperties('AWS::Lambda::EventSourceMapping', { + AmazonManagedKafkaEventSourceConfig: { ConsumerGroupId: consumerGroupId }, + }); + + }); + test('ManagedKafkaEventSource name conforms to construct id rules', () => { // GIVEN const stack = new cdk.Stack(); diff --git a/packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts b/packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts index d5e2d568bdd2e..08e43ca31e310 100644 --- a/packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts +++ b/packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts @@ -215,6 +215,16 @@ export interface EventSourceMappingOptions { */ readonly kafkaBootstrapServers?: string[] + /** + * The identifier for the Kafka consumer group to join. The consumer group ID must be unique among all your Kafka event sources. After creating a Kafka event source mapping with the consumer group ID specified, you cannot update this value. The value must have a lenght between 1 and 200 and full the pattern '[a-zA-Z0-9-\/*:_+=.@-]*'. For more information, see [Customizable consumer group ID](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html#services-msk-consumer-group-id). + * @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-amazonmanagedkafkaeventsourceconfig.html + * @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-selfmanagedkafkaeventsourceconfig.html + * + * @default - none + */ + readonly kafkaConsumerGroupId?: string + + /** * Specific settings like the authentication protocol or the VPC components to secure access to your event source. * @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-sourceaccessconfiguration.html @@ -319,6 +329,10 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp throw new Error('startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP'); } + if (props.kafkaConsumerGroupId) { + this.validateKafkaConsumerGroupIdOrThrow(props.kafkaConsumerGroupId); + } + let destinationConfig; if (props.onFailure) { @@ -332,6 +346,8 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp selfManagedEventSource = { endpoints: { kafkaBootstrapServers: props.kafkaBootstrapServers } }; } + let consumerGroupConfig = props.kafkaConsumerGroupId ? { consumerGroupId: props.kafkaConsumerGroupId } : undefined; + const cfnEventSourceMapping = new CfnEventSourceMapping(this, 'Resource', { batchSize: props.batchSize, bisectBatchOnFunctionError: props.bisectBatchOnError, @@ -350,9 +366,23 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp tumblingWindowInSeconds: props.tumblingWindow?.toSeconds(), sourceAccessConfigurations: props.sourceAccessConfigurations?.map((o) => {return { type: o.type.type, uri: o.uri };}), selfManagedEventSource, + selfManagedKafkaEventSourceConfig: props.kafkaBootstrapServers ? consumerGroupConfig : undefined, + amazonManagedKafkaEventSourceConfig: props.eventSourceArn ? consumerGroupConfig : undefined, }); this.eventSourceMappingId = cfnEventSourceMapping.ref; } + + private validateKafkaConsumerGroupIdOrThrow(kafkaConsumerGroupId: string) { + if (kafkaConsumerGroupId.length > 200 ||kafkaConsumerGroupId.length < 1) { + throw new Error('kafkaConsumerGroupId must be a valid string between 1 and 200 characters'); + } + + const regex = new RegExp(/[a-zA-Z0-9-\/*:_+=.@-]*/); + const patternMatch = regex.exec(kafkaConsumerGroupId); + if (patternMatch === null || patternMatch[0] !== kafkaConsumerGroupId) { + throw new Error('kafkaConsumerGroupId contain ivalid characters. Allowed values are "[a-zA-Z0-9-\/*:_+=.@-]"'); + } + } } /** diff --git a/packages/@aws-cdk/aws-lambda/test/event-source-mapping.test.ts b/packages/@aws-cdk/aws-lambda/test/event-source-mapping.test.ts index 05a6ea811ceba..d283c63f09c09 100644 --- a/packages/@aws-cdk/aws-lambda/test/event-source-mapping.test.ts +++ b/packages/@aws-cdk/aws-lambda/test/event-source-mapping.test.ts @@ -154,6 +154,46 @@ describe('event source mapping', () => { })).toThrow(/kafkaBootStrapServers must not be empty if set/); }); + test('throws if kafkaConsumerGroupId is invalid', () => { + expect(() => new EventSourceMapping(stack, 'test', { + eventSourceArn: 'arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2', + kafkaConsumerGroupId: 'some invalid', + target: fn, + })).toThrow('kafkaConsumerGroupId contain ivalid characters. Allowed values are "[a-zA-Z0-9-\/*:_+=.@-]"'); + }); + + test('throws if kafkaConsumerGroupId is too long', () => { + expect(() => new EventSourceMapping(stack, 'test', { + eventSourceArn: 'arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2', + kafkaConsumerGroupId: 'x'.repeat(201), + target: fn, + })).toThrow('kafkaConsumerGroupId must be a valid string between 1 and 200 characters'); + }); + + test('not throws if kafkaConsumerGroupId is empty', () => { + expect(() => new EventSourceMapping(stack, 'test', { + eventSourceArn: 'arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2', + kafkaConsumerGroupId: '', + target: fn, + })).not.toThrow(); + }); + + test('not throws if kafkaConsumerGroupId is valid for amazon managed kafka', () => { + expect(() => new EventSourceMapping(stack, 'test', { + eventSourceArn: 'arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2', + kafkaConsumerGroupId: 'someValidConsumerGroupId', + target: fn, + })).not.toThrow(); + }); + + test('not throws if kafkaConsumerGroupId is valid for self managed kafka', () => { + expect(() => new EventSourceMapping(stack, 'test', { + kafkaBootstrapServers: ['kafka-broker-1:9092', 'kafka-broker-2:9092'], + kafkaConsumerGroupId: 'someValidConsumerGroupId', + target: fn, + })).not.toThrow(); + }); + test('eventSourceArn appears in stack', () => { const topicNameParam = new cdk.CfnParameter(stack, 'TopicNameParam', { type: 'String', From 9f2ea458609b29a91eb792165be6de596ce1aea9 Mon Sep 17 00:00:00 2001 From: Rico Huijbers Date: Tue, 6 Sep 2022 18:04:27 +0100 Subject: [PATCH 2/2] fix(core): `--debug` doesn't record stack traces (#21931) In v1 we used to record construct stack traces; those have disappeared in v2 because the defaults in the underlying `constructs` library changed. Re-add them when `--debug` is passed. ---- *By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license* --- packages/@aws-cdk/core/lib/cfn-element.ts | 3 +++ .../@aws-cdk/core/test/cfn-resource.test.ts | 20 +++++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/packages/@aws-cdk/core/lib/cfn-element.ts b/packages/@aws-cdk/core/lib/cfn-element.ts index f233288c02aa8..46a6a8f8d2833 100644 --- a/packages/@aws-cdk/core/lib/cfn-element.ts +++ b/packages/@aws-cdk/core/lib/cfn-element.ts @@ -1,6 +1,7 @@ import * as cxschema from '@aws-cdk/cloud-assembly-schema'; import * as cxapi from '@aws-cdk/cx-api'; import { Construct, Node } from 'constructs'; +import { debugModeEnabled } from './debug'; import { Lazy } from './lazy'; const CFN_ELEMENT_SYMBOL = Symbol.for('@aws-cdk/core.CfnElement'); @@ -71,6 +72,7 @@ export abstract class CfnElement extends Construct { if (!this.node.tryGetContext(cxapi.DISABLE_LOGICAL_ID_METADATA)) { Node.of(this).addMetadata(cxschema.ArtifactMetadataEntryType.LOGICAL_ID, this.logicalId, { + stackTrace: debugModeEnabled(), traceFromFunction: this.constructor, }); } @@ -204,3 +206,4 @@ function notTooLong(x: string) { import { CfnReference } from './private/cfn-reference'; import { Stack } from './stack'; import { Token } from './token'; + diff --git a/packages/@aws-cdk/core/test/cfn-resource.test.ts b/packages/@aws-cdk/core/test/cfn-resource.test.ts index 30a240a234a54..4bc11c5ea43a4 100644 --- a/packages/@aws-cdk/core/test/cfn-resource.test.ts +++ b/packages/@aws-cdk/core/test/cfn-resource.test.ts @@ -1,3 +1,4 @@ +import * as cxschema from '@aws-cdk/cloud-assembly-schema'; import { VALIDATE_SNAPSHOT_REMOVAL_POLICY } from '@aws-cdk/cx-api'; import { Construct } from 'constructs'; import * as core from '../lib'; @@ -255,4 +256,23 @@ describe('cfn resource', () => { }); }).toThrow(/should be created in the scope of a Stack, but no Stack found/); }); + + test('CfnResource has logical ID metadata with stack trace attached', () => { + process.env.CDK_DEBUG = '1'; + try { + const app = new core.App(); + const stack = new core.Stack(app, 'Stack'); + const res = new core.CfnResource(stack, 'SomeCfnResource', { + type: 'Some::Resource', + }); + + // THEN + const metadata = res.node.metadata.find(m => m.type === cxschema.ArtifactMetadataEntryType.LOGICAL_ID); + expect(metadata).toBeDefined(); + expect(metadata?.trace).toBeDefined(); + expect(metadata?.trace?.length).toBeGreaterThan(0); + } finally { + delete process.env.CDK_DEBUG; + } + }); });