Skip to content

Commit

Permalink
feat(lambda-event-sources): add kafka consumerGroupId support (#21791)
Browse files Browse the repository at this point in the history
This PR adds the capability to specify the consumerGroupId in the event-source-mapping when connection to Kafka.
Adds the feature described in issue #21734

This features allows you to specify the consumerGroupId while connecting to a Kafka cluster. This feature was recently annouced https://aws.amazon.com/blogs/compute/using-custom-consumer-group-id-support-for-the-aws-lambda-event-sources-for-msk-and-self-managed-kafka/ and wasn't part of the cdk Kafka construct before.

Things done: 
* Added missing attributes to class EventSourceMapping
  * amazonManagedKafkaEventSourceConfig
  * selfManagedKafkaEventSourceConfig
* Added validation for the consumerGroupId value based in [CfnSpec](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-selfmanagedkafkaeventsourceconfig.html)
* Added a common api `consumerGroupId` for adding the consumerGroupId independant if selfManaged or awsManaged
* Updated existing integration test for SelfManagesKafkaConfig
  * The ManagedKafka Config is not integration tested because it requires a ManagedKafkaCluster which is not possible to deploy right now via cdk
* Added Tests for consumerGroupId validation
* Added Tests for template synth for SelfManagedKafka and AwsManagedKafka

----

### All Submissions:

* [x] Have you followed the guidelines in our [Contributing guide?](https://github.com/aws/aws-cdk/blob/main/CONTRIBUTING.md)

### Adding new Unconventional Dependencies:

* [ ] This PR adds new unconventional dependencies following the process described [here](https://github.com/aws/aws-cdk/blob/main/CONTRIBUTING.md/#adding-new-unconventional-dependencies)

### New Features

* [x] Have you added the new feature to an [integration test](https://github.com/aws/aws-cdk/blob/main/INTEGRATION_TESTS.md)?
	* [x] Did you use `yarn integ` to deploy the infrastructure and generate the snapshot (i.e. `yarn integ` without `--dry-run`)?

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
RaphaelManke authored Sep 6, 2022
1 parent bcdd2a8 commit b36bc11
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 1 deletion.
4 changes: 4 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,14 @@ declare const secret: Secret;
// (Optional) The secret containing the root CA certificate that your Kafka brokers use for TLS encryption
declare const encryption: Secret;

// (Optional) The consumer group id to use when connecting to the Kafka broker. If omitted the UUID of the event source mapping will be used.
const consumerGroupId: "my-consumer-group-id";

declare const myFunction: lambda.Function;
myFunction.addEventSource(new SelfManagedKafkaEventSource({
bootstrapServers: bootstrapServers,
topic: topic,
consumerGroupId: consumerGroupId,
secret: secret,
batchSize: 100, // default
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
Expand Down
9 changes: 9 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ export interface KafkaEventSourceProps extends BaseStreamEventSourceProps {
* @default none
*/
readonly secret?: secretsmanager.ISecret
/**
* The identifier for the Kafka consumer group to join. The consumer group ID must be unique among all your Kafka event sources. After creating a Kafka event source mapping with the consumer group ID specified, you cannot update this value. The value must have a lenght between 1 and 200 and full the pattern '[a-zA-Z0-9-\/*:_+=.@-]*'.
* @see https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html#services-msk-consumer-group-id
*
* @default - none
*/
readonly consumerGroupId?: string;
}

/**
Expand Down Expand Up @@ -125,6 +132,7 @@ export class ManagedKafkaEventSource extends StreamEventSource {
startingPosition: this.innerProps.startingPosition,
sourceAccessConfigurations: this.sourceAccessConfigurations(),
kafkaTopic: this.innerProps.topic,
kafkaConsumerGroupId: this.innerProps.consumerGroupId,
}),
);

Expand Down Expand Up @@ -199,6 +207,7 @@ export class SelfManagedKafkaEventSource extends StreamEventSource {
this.enrichMappingOptions({
kafkaBootstrapServers: this.innerProps.bootstrapServers,
kafkaTopic: this.innerProps.topic,
kafkaConsumerGroupId: this.innerProps.consumerGroupId,
startingPosition: this.innerProps.startingPosition,
sourceAccessConfigurations: this.sourceAccessConfigurations(),
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==
new SelfManagedKafkaEventSource({
bootstrapServers,
topic: 'my-test-topic',
consumerGroupId: 'myTestConsumerGroup',
secret: clientCertificatesSecret,
authenticationMethod: AuthenticationMethod.CLIENT_CERTIFICATE_TLS_AUTH,
rootCACertificate: rootCASecret,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@
]
}
},
"SelfManagedKafkaEventSourceConfig": {
"ConsumerGroupId": "myTestConsumerGroup"
},
"SourceAccessConfigurations": [
{
"Type": "CLIENT_CERTIFICATE_TLS_AUTH",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@
"/lambda-event-source-kafka-self-managed/F/KafkaEventSource:838c4d5ff3c99c1a617120adfca83e5b:my-test-topic/Resource": [
{
"type": "aws:cdk:logicalId",
"data": "FKafkaEventSource838c4d5ff3c99c1a617120adfca83e5bmytesttopic1E7A7798"
"data": "FKafkaEventSource838c4d5ff3c99c1a617120adfca83e5bmytesttopic1E7A7798",
"trace": [
"!!DESTRUCTIVE_CHANGES: WILL_REPLACE"
]
}
],
"/lambda-event-source-kafka-self-managed/S/Resource": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@
]
}
},
"selfManagedKafkaEventSourceConfig": {
"consumerGroupId": "myTestConsumerGroup"
},
"sourceAccessConfigurations": [
{
"type": "CLIENT_CERTIFICATE_TLS_AUTH",
Expand Down
56 changes: 56 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/kafka.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,62 @@ describe('KafkaEventSource', () => {
});
});

test('consumerGroupId can be set for SelfManagedKafkaEventSource', () => {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const kafkaTopic = 'some-topic';
const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' });
const bootstrapServers = ['kafka-broker:9092'];
const consumerGroupId = 'my-consumer-group-id';
const eventSourceMapping = new sources.SelfManagedKafkaEventSource(
{
bootstrapServers: bootstrapServers,
topic: kafkaTopic,
secret: secret,
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
consumerGroupId: consumerGroupId,
});
// WHEN
fn.addEventSource(eventSourceMapping);

// THEN
const template = Template.fromStack(stack);
template.hasResourceProperties('AWS::Lambda::EventSourceMapping', {
SelfManagedKafkaEventSourceConfig: { ConsumerGroupId: consumerGroupId },
});

});

test('consumerGroupId can be set for ManagedKafkaEventSource', () => {

// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const clusterArn = 'some-arn';
const kafkaTopic = 'some-topic';
const consumerGroupId = 'my-consumer-group-id';


const mskEventMapping = new sources.ManagedKafkaEventSource(
{
clusterArn,
topic: kafkaTopic,
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
consumerGroupId,
});

// WHEN
fn.addEventSource(mskEventMapping);
expect(mskEventMapping.eventSourceMappingId).toBeDefined();

const template = Template.fromStack(stack);
template.hasResourceProperties('AWS::Lambda::EventSourceMapping', {
AmazonManagedKafkaEventSourceConfig: { ConsumerGroupId: consumerGroupId },
});

});

test('ManagedKafkaEventSource name conforms to construct id rules', () => {
// GIVEN
const stack = new cdk.Stack();
Expand Down
30 changes: 30 additions & 0 deletions packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,16 @@ export interface EventSourceMappingOptions {
*/
readonly kafkaBootstrapServers?: string[]

/**
* The identifier for the Kafka consumer group to join. The consumer group ID must be unique among all your Kafka event sources. After creating a Kafka event source mapping with the consumer group ID specified, you cannot update this value. The value must have a lenght between 1 and 200 and full the pattern '[a-zA-Z0-9-\/*:_+=.@-]*'. For more information, see [Customizable consumer group ID](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html#services-msk-consumer-group-id).
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-amazonmanagedkafkaeventsourceconfig.html
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-selfmanagedkafkaeventsourceconfig.html
*
* @default - none
*/
readonly kafkaConsumerGroupId?: string


/**
* Specific settings like the authentication protocol or the VPC components to secure access to your event source.
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-sourceaccessconfiguration.html
Expand Down Expand Up @@ -319,6 +329,10 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp
throw new Error('startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP');
}

if (props.kafkaConsumerGroupId) {
this.validateKafkaConsumerGroupIdOrThrow(props.kafkaConsumerGroupId);
}

let destinationConfig;

if (props.onFailure) {
Expand All @@ -332,6 +346,8 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp
selfManagedEventSource = { endpoints: { kafkaBootstrapServers: props.kafkaBootstrapServers } };
}

let consumerGroupConfig = props.kafkaConsumerGroupId ? { consumerGroupId: props.kafkaConsumerGroupId } : undefined;

const cfnEventSourceMapping = new CfnEventSourceMapping(this, 'Resource', {
batchSize: props.batchSize,
bisectBatchOnFunctionError: props.bisectBatchOnError,
Expand All @@ -350,9 +366,23 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp
tumblingWindowInSeconds: props.tumblingWindow?.toSeconds(),
sourceAccessConfigurations: props.sourceAccessConfigurations?.map((o) => {return { type: o.type.type, uri: o.uri };}),
selfManagedEventSource,
selfManagedKafkaEventSourceConfig: props.kafkaBootstrapServers ? consumerGroupConfig : undefined,
amazonManagedKafkaEventSourceConfig: props.eventSourceArn ? consumerGroupConfig : undefined,
});
this.eventSourceMappingId = cfnEventSourceMapping.ref;
}

private validateKafkaConsumerGroupIdOrThrow(kafkaConsumerGroupId: string) {
if (kafkaConsumerGroupId.length > 200 ||kafkaConsumerGroupId.length < 1) {
throw new Error('kafkaConsumerGroupId must be a valid string between 1 and 200 characters');
}

const regex = new RegExp(/[a-zA-Z0-9-\/*:_+=.@-]*/);
const patternMatch = regex.exec(kafkaConsumerGroupId);
if (patternMatch === null || patternMatch[0] !== kafkaConsumerGroupId) {
throw new Error('kafkaConsumerGroupId contain ivalid characters. Allowed values are "[a-zA-Z0-9-\/*:_+=.@-]"');
}
}
}

/**
Expand Down
40 changes: 40 additions & 0 deletions packages/@aws-cdk/aws-lambda/test/event-source-mapping.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,46 @@ describe('event source mapping', () => {
})).toThrow(/kafkaBootStrapServers must not be empty if set/);
});

test('throws if kafkaConsumerGroupId is invalid', () => {
expect(() => new EventSourceMapping(stack, 'test', {
eventSourceArn: 'arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2',
kafkaConsumerGroupId: 'some invalid',
target: fn,
})).toThrow('kafkaConsumerGroupId contain ivalid characters. Allowed values are "[a-zA-Z0-9-\/*:_+=.@-]"');
});

test('throws if kafkaConsumerGroupId is too long', () => {
expect(() => new EventSourceMapping(stack, 'test', {
eventSourceArn: 'arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2',
kafkaConsumerGroupId: 'x'.repeat(201),
target: fn,
})).toThrow('kafkaConsumerGroupId must be a valid string between 1 and 200 characters');
});

test('not throws if kafkaConsumerGroupId is empty', () => {
expect(() => new EventSourceMapping(stack, 'test', {
eventSourceArn: 'arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2',
kafkaConsumerGroupId: '',
target: fn,
})).not.toThrow();
});

test('not throws if kafkaConsumerGroupId is valid for amazon managed kafka', () => {
expect(() => new EventSourceMapping(stack, 'test', {
eventSourceArn: 'arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2',
kafkaConsumerGroupId: 'someValidConsumerGroupId',
target: fn,
})).not.toThrow();
});

test('not throws if kafkaConsumerGroupId is valid for self managed kafka', () => {
expect(() => new EventSourceMapping(stack, 'test', {
kafkaBootstrapServers: ['kafka-broker-1:9092', 'kafka-broker-2:9092'],
kafkaConsumerGroupId: 'someValidConsumerGroupId',
target: fn,
})).not.toThrow();
});

test('eventSourceArn appears in stack', () => {
const topicNameParam = new cdk.CfnParameter(stack, 'TopicNameParam', {
type: 'String',
Expand Down

0 comments on commit b36bc11

Please sign in to comment.