diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts index 3c75a45a51447..0afbb63a38fbd 100644 --- a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts +++ b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts @@ -226,6 +226,8 @@ export class ManagedKafkaEventSource extends StreamEventSource { export class SelfManagedKafkaEventSource extends StreamEventSource { // This is to work around JSII inheritance problems private innerProps: SelfManagedKafkaEventSourceProps; + private _eventSourceMappingId?: string = undefined; + private _eventSourceMappingArn?: string = undefined; constructor(props: SelfManagedKafkaEventSourceProps) { super(props); @@ -244,7 +246,7 @@ export class SelfManagedKafkaEventSource extends StreamEventSource { public bind(target: lambda.IFunction) { if (!(target instanceof Construct)) { throw new Error('Function is not a construct. Unexpected error.'); } - target.addEventSourceMapping( + const eventSourceMapping = target.addEventSourceMapping( this.mappingId(target), this.enrichMappingOptions({ filters: this.innerProps.filters, @@ -259,6 +261,9 @@ export class SelfManagedKafkaEventSource extends StreamEventSource { }), ); + this._eventSourceMappingId = eventSourceMapping.eventSourceMappingId; + this._eventSourceMappingArn = eventSourceMapping.eventSourceMappingArn; + if (this.innerProps.secret !== undefined) { this.innerProps.secret.grantRead(target); } @@ -314,4 +319,24 @@ export class SelfManagedKafkaEventSource extends StreamEventSource { ? undefined : sourceAccessConfigurations; } + + /** + * The identifier for this EventSourceMapping + */ + public get eventSourceMappingId(): string { + if (!this._eventSourceMappingId) { + throw new Error('KafkaEventSource is not yet bound to an event source mapping'); + } + return this._eventSourceMappingId; + } + + /** + * The ARN for this EventSourceMapping + */ + public get eventSourceMappingArn(): string { + if (!this._eventSourceMappingArn) { + throw new Error('KafkaEventSource is not yet bound to an event source mapping'); + } + return this._eventSourceMappingArn; + } } diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts b/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts index d5b1df2c9a657..278338d1d7c4b 100644 --- a/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts +++ b/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts @@ -308,6 +308,55 @@ describe('KafkaEventSource', () => { }); }); + test('consumerGroupId can be set for ManagedKafkaEventSource', () => { + + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + const consumerGroupId = 'my-consumer-group-id'; + + const mskEventMapping = new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + consumerGroupId, + }); + + // WHEN + fn.addEventSource(mskEventMapping); + expect(mskEventMapping.eventSourceMappingId).toBeDefined(); + expect(mskEventMapping.eventSourceMappingArn).toBeDefined(); + + const template = Template.fromStack(stack); + template.hasResourceProperties('AWS::Lambda::EventSourceMapping', { + AmazonManagedKafkaEventSourceConfig: { ConsumerGroupId: consumerGroupId }, + }); + + }); + + test('ManagedKafkaEventSource name conforms to construct id rules', () => { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + + const mskEventMapping = new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + }); + + // WHEN + fn.addEventSource(mskEventMapping); + expect(mskEventMapping.eventSourceMappingId).toBeDefined(); + expect(mskEventMapping.eventSourceMappingArn).toBeDefined(); + }); + }); describe('self-managed kafka', () => { @@ -950,53 +999,27 @@ describe('KafkaEventSource', () => { }); - test('consumerGroupId can be set for ManagedKafkaEventSource', () => { - + test('eventSourceMappingArn and eventSourceMappingId are defined', () => { // GIVEN const stack = new cdk.Stack(); const fn = new TestFunction(stack, 'Fn'); - const clusterArn = 'some-arn'; const kafkaTopic = 'some-topic'; - const consumerGroupId = 'my-consumer-group-id'; + const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' }); + const bootstrapServers = ['kafka-broker:9092']; - const mskEventMapping = new sources.ManagedKafkaEventSource( + const eventSource = new sources.SelfManagedKafkaEventSource( { - clusterArn, + bootstrapServers: bootstrapServers, topic: kafkaTopic, + secret: secret, startingPosition: lambda.StartingPosition.TRIM_HORIZON, - consumerGroupId, }); // WHEN - fn.addEventSource(mskEventMapping); - expect(mskEventMapping.eventSourceMappingId).toBeDefined(); - expect(mskEventMapping.eventSourceMappingArn).toBeDefined(); - - const template = Template.fromStack(stack); - template.hasResourceProperties('AWS::Lambda::EventSourceMapping', { - AmazonManagedKafkaEventSourceConfig: { ConsumerGroupId: consumerGroupId }, - }); - + fn.addEventSource(eventSource); + expect(eventSource.eventSourceMappingArn).toBeDefined(); + expect(eventSource.eventSourceMappingId).toBeDefined(); }); - test('ManagedKafkaEventSource name conforms to construct id rules', () => { - // GIVEN - const stack = new cdk.Stack(); - const fn = new TestFunction(stack, 'Fn'); - const clusterArn = 'some-arn'; - const kafkaTopic = 'some-topic'; - - const mskEventMapping = new sources.ManagedKafkaEventSource( - { - clusterArn, - topic: kafkaTopic, - startingPosition: lambda.StartingPosition.TRIM_HORIZON, - }); - - // WHEN - fn.addEventSource(mskEventMapping); - expect(mskEventMapping.eventSourceMappingId).toBeDefined(); - expect(mskEventMapping.eventSourceMappingArn).toBeDefined(); - }); }); });