From 82e3f2415c85dedc06cf2bce93a4310cfe2d3034 Mon Sep 17 00:00:00 2001 From: Niko Virtala Date: Fri, 13 Sep 2024 14:57:30 +0300 Subject: [PATCH 1/2] chore(lambda-event-sources): move tests under correct describe block --- .../test/kafka.test.ts | 98 +++++++++---------- 1 file changed, 49 insertions(+), 49 deletions(-) diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts b/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts index d5b1df2c9a657..122b17e6f9bda 100644 --- a/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts +++ b/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts @@ -308,6 +308,55 @@ describe('KafkaEventSource', () => { }); }); + test('consumerGroupId can be set for ManagedKafkaEventSource', () => { + + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + const consumerGroupId = 'my-consumer-group-id'; + + const mskEventMapping = new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + consumerGroupId, + }); + + // WHEN + fn.addEventSource(mskEventMapping); + expect(mskEventMapping.eventSourceMappingId).toBeDefined(); + expect(mskEventMapping.eventSourceMappingArn).toBeDefined(); + + const template = Template.fromStack(stack); + template.hasResourceProperties('AWS::Lambda::EventSourceMapping', { + AmazonManagedKafkaEventSourceConfig: { ConsumerGroupId: consumerGroupId }, + }); + + }); + + test('ManagedKafkaEventSource name conforms to construct id rules', () => { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + + const mskEventMapping = new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + }); + + // WHEN + fn.addEventSource(mskEventMapping); + expect(mskEventMapping.eventSourceMappingId).toBeDefined(); + expect(mskEventMapping.eventSourceMappingArn).toBeDefined(); + }); + }); describe('self-managed kafka', () => { @@ -949,54 +998,5 @@ describe('KafkaEventSource', () => { }); }); - - test('consumerGroupId can be set for ManagedKafkaEventSource', () => { - - // GIVEN - const stack = new cdk.Stack(); - const fn = new TestFunction(stack, 'Fn'); - const clusterArn = 'some-arn'; - const kafkaTopic = 'some-topic'; - const consumerGroupId = 'my-consumer-group-id'; - - const mskEventMapping = new sources.ManagedKafkaEventSource( - { - clusterArn, - topic: kafkaTopic, - startingPosition: lambda.StartingPosition.TRIM_HORIZON, - consumerGroupId, - }); - - // WHEN - fn.addEventSource(mskEventMapping); - expect(mskEventMapping.eventSourceMappingId).toBeDefined(); - expect(mskEventMapping.eventSourceMappingArn).toBeDefined(); - - const template = Template.fromStack(stack); - template.hasResourceProperties('AWS::Lambda::EventSourceMapping', { - AmazonManagedKafkaEventSourceConfig: { ConsumerGroupId: consumerGroupId }, - }); - - }); - - test('ManagedKafkaEventSource name conforms to construct id rules', () => { - // GIVEN - const stack = new cdk.Stack(); - const fn = new TestFunction(stack, 'Fn'); - const clusterArn = 'some-arn'; - const kafkaTopic = 'some-topic'; - - const mskEventMapping = new sources.ManagedKafkaEventSource( - { - clusterArn, - topic: kafkaTopic, - startingPosition: lambda.StartingPosition.TRIM_HORIZON, - }); - - // WHEN - fn.addEventSource(mskEventMapping); - expect(mskEventMapping.eventSourceMappingId).toBeDefined(); - expect(mskEventMapping.eventSourceMappingArn).toBeDefined(); - }); }); }); From 499898d46da2040528d0b59698520efe87f1c3f3 Mon Sep 17 00:00:00 2001 From: Niko Virtala Date: Fri, 13 Sep 2024 15:11:27 +0300 Subject: [PATCH 2/2] chore(lambda-event-sources): expose eventSourceMappingArn and eventSourceMappingId for SelfManagedKafkaEventSource --- .../aws-lambda-event-sources/lib/kafka.ts | 27 ++++++++++++++++++- .../test/kafka.test.ts | 23 ++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts index 3c75a45a51447..0afbb63a38fbd 100644 --- a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts +++ b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts @@ -226,6 +226,8 @@ export class ManagedKafkaEventSource extends StreamEventSource { export class SelfManagedKafkaEventSource extends StreamEventSource { // This is to work around JSII inheritance problems private innerProps: SelfManagedKafkaEventSourceProps; + private _eventSourceMappingId?: string = undefined; + private _eventSourceMappingArn?: string = undefined; constructor(props: SelfManagedKafkaEventSourceProps) { super(props); @@ -244,7 +246,7 @@ export class SelfManagedKafkaEventSource extends StreamEventSource { public bind(target: lambda.IFunction) { if (!(target instanceof Construct)) { throw new Error('Function is not a construct. Unexpected error.'); } - target.addEventSourceMapping( + const eventSourceMapping = target.addEventSourceMapping( this.mappingId(target), this.enrichMappingOptions({ filters: this.innerProps.filters, @@ -259,6 +261,9 @@ export class SelfManagedKafkaEventSource extends StreamEventSource { }), ); + this._eventSourceMappingId = eventSourceMapping.eventSourceMappingId; + this._eventSourceMappingArn = eventSourceMapping.eventSourceMappingArn; + if (this.innerProps.secret !== undefined) { this.innerProps.secret.grantRead(target); } @@ -314,4 +319,24 @@ export class SelfManagedKafkaEventSource extends StreamEventSource { ? undefined : sourceAccessConfigurations; } + + /** + * The identifier for this EventSourceMapping + */ + public get eventSourceMappingId(): string { + if (!this._eventSourceMappingId) { + throw new Error('KafkaEventSource is not yet bound to an event source mapping'); + } + return this._eventSourceMappingId; + } + + /** + * The ARN for this EventSourceMapping + */ + public get eventSourceMappingArn(): string { + if (!this._eventSourceMappingArn) { + throw new Error('KafkaEventSource is not yet bound to an event source mapping'); + } + return this._eventSourceMappingArn; + } } diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts b/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts index 122b17e6f9bda..278338d1d7c4b 100644 --- a/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts +++ b/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts @@ -998,5 +998,28 @@ describe('KafkaEventSource', () => { }); }); + + test('eventSourceMappingArn and eventSourceMappingId are defined', () => { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const kafkaTopic = 'some-topic'; + const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' }); + const bootstrapServers = ['kafka-broker:9092']; + + const eventSource = new sources.SelfManagedKafkaEventSource( + { + bootstrapServers: bootstrapServers, + topic: kafkaTopic, + secret: secret, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + }); + + // WHEN + fn.addEventSource(eventSource); + expect(eventSource.eventSourceMappingArn).toBeDefined(); + expect(eventSource.eventSourceMappingId).toBeDefined(); + }); + }); });