Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Sep 6, 2022
2 parents 120dd73 + 9f2ea45 commit 4653e24
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 1 deletion.
4 changes: 4 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,14 @@ declare const secret: Secret;
// (Optional) The secret containing the root CA certificate that your Kafka brokers use for TLS encryption
declare const encryption: Secret;

// (Optional) The consumer group id to use when connecting to the Kafka broker. If omitted the UUID of the event source mapping will be used.
const consumerGroupId: "my-consumer-group-id";

declare const myFunction: lambda.Function;
myFunction.addEventSource(new SelfManagedKafkaEventSource({
bootstrapServers: bootstrapServers,
topic: topic,
consumerGroupId: consumerGroupId,
secret: secret,
batchSize: 100, // default
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
Expand Down
9 changes: 9 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ export interface KafkaEventSourceProps extends BaseStreamEventSourceProps {
* @default none
*/
readonly secret?: secretsmanager.ISecret
/**
* The identifier for the Kafka consumer group to join. The consumer group ID must be unique among all your Kafka event sources. After creating a Kafka event source mapping with the consumer group ID specified, you cannot update this value. The value must have a lenght between 1 and 200 and full the pattern '[a-zA-Z0-9-\/*:_+=.@-]*'.
* @see https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html#services-msk-consumer-group-id
*
* @default - none
*/
readonly consumerGroupId?: string;
}

/**
Expand Down Expand Up @@ -125,6 +132,7 @@ export class ManagedKafkaEventSource extends StreamEventSource {
startingPosition: this.innerProps.startingPosition,
sourceAccessConfigurations: this.sourceAccessConfigurations(),
kafkaTopic: this.innerProps.topic,
kafkaConsumerGroupId: this.innerProps.consumerGroupId,
}),
);

Expand Down Expand Up @@ -199,6 +207,7 @@ export class SelfManagedKafkaEventSource extends StreamEventSource {
this.enrichMappingOptions({
kafkaBootstrapServers: this.innerProps.bootstrapServers,
kafkaTopic: this.innerProps.topic,
kafkaConsumerGroupId: this.innerProps.consumerGroupId,
startingPosition: this.innerProps.startingPosition,
sourceAccessConfigurations: this.sourceAccessConfigurations(),
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==
new SelfManagedKafkaEventSource({
bootstrapServers,
topic: 'my-test-topic',
consumerGroupId: 'myTestConsumerGroup',
secret: clientCertificatesSecret,
authenticationMethod: AuthenticationMethod.CLIENT_CERTIFICATE_TLS_AUTH,
rootCACertificate: rootCASecret,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@
]
}
},
"SelfManagedKafkaEventSourceConfig": {
"ConsumerGroupId": "myTestConsumerGroup"
},
"SourceAccessConfigurations": [
{
"Type": "CLIENT_CERTIFICATE_TLS_AUTH",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@
"/lambda-event-source-kafka-self-managed/F/KafkaEventSource:838c4d5ff3c99c1a617120adfca83e5b:my-test-topic/Resource": [
{
"type": "aws:cdk:logicalId",
"data": "FKafkaEventSource838c4d5ff3c99c1a617120adfca83e5bmytesttopic1E7A7798"
"data": "FKafkaEventSource838c4d5ff3c99c1a617120adfca83e5bmytesttopic1E7A7798",
"trace": [
"!!DESTRUCTIVE_CHANGES: WILL_REPLACE"
]
}
],
"/lambda-event-source-kafka-self-managed/S/Resource": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@
]
}
},
"selfManagedKafkaEventSourceConfig": {
"consumerGroupId": "myTestConsumerGroup"
},
"sourceAccessConfigurations": [
{
"type": "CLIENT_CERTIFICATE_TLS_AUTH",
Expand Down
56 changes: 56 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/kafka.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,62 @@ describe('KafkaEventSource', () => {
});
});

test('consumerGroupId can be set for SelfManagedKafkaEventSource', () => {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const kafkaTopic = 'some-topic';
const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' });
const bootstrapServers = ['kafka-broker:9092'];
const consumerGroupId = 'my-consumer-group-id';
const eventSourceMapping = new sources.SelfManagedKafkaEventSource(
{
bootstrapServers: bootstrapServers,
topic: kafkaTopic,
secret: secret,
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
consumerGroupId: consumerGroupId,
});
// WHEN
fn.addEventSource(eventSourceMapping);

// THEN
const template = Template.fromStack(stack);
template.hasResourceProperties('AWS::Lambda::EventSourceMapping', {
SelfManagedKafkaEventSourceConfig: { ConsumerGroupId: consumerGroupId },
});

});

test('consumerGroupId can be set for ManagedKafkaEventSource', () => {

// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const clusterArn = 'some-arn';
const kafkaTopic = 'some-topic';
const consumerGroupId = 'my-consumer-group-id';


const mskEventMapping = new sources.ManagedKafkaEventSource(
{
clusterArn,
topic: kafkaTopic,
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
consumerGroupId,
});

// WHEN
fn.addEventSource(mskEventMapping);
expect(mskEventMapping.eventSourceMappingId).toBeDefined();

const template = Template.fromStack(stack);
template.hasResourceProperties('AWS::Lambda::EventSourceMapping', {
AmazonManagedKafkaEventSourceConfig: { ConsumerGroupId: consumerGroupId },
});

});

test('ManagedKafkaEventSource name conforms to construct id rules', () => {
// GIVEN
const stack = new cdk.Stack();
Expand Down
30 changes: 30 additions & 0 deletions packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,16 @@ export interface EventSourceMappingOptions {
*/
readonly kafkaBootstrapServers?: string[]

/**
* The identifier for the Kafka consumer group to join. The consumer group ID must be unique among all your Kafka event sources. After creating a Kafka event source mapping with the consumer group ID specified, you cannot update this value. The value must have a lenght between 1 and 200 and full the pattern '[a-zA-Z0-9-\/*:_+=.@-]*'. For more information, see [Customizable consumer group ID](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html#services-msk-consumer-group-id).
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-amazonmanagedkafkaeventsourceconfig.html
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-selfmanagedkafkaeventsourceconfig.html
*
* @default - none
*/
readonly kafkaConsumerGroupId?: string


/**
* Specific settings like the authentication protocol or the VPC components to secure access to your event source.
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-sourceaccessconfiguration.html
Expand Down Expand Up @@ -319,6 +329,10 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp
throw new Error('startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP');
}

if (props.kafkaConsumerGroupId) {
this.validateKafkaConsumerGroupIdOrThrow(props.kafkaConsumerGroupId);
}

let destinationConfig;

if (props.onFailure) {
Expand All @@ -332,6 +346,8 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp
selfManagedEventSource = { endpoints: { kafkaBootstrapServers: props.kafkaBootstrapServers } };
}

let consumerGroupConfig = props.kafkaConsumerGroupId ? { consumerGroupId: props.kafkaConsumerGroupId } : undefined;

const cfnEventSourceMapping = new CfnEventSourceMapping(this, 'Resource', {
batchSize: props.batchSize,
bisectBatchOnFunctionError: props.bisectBatchOnError,
Expand All @@ -350,9 +366,23 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp
tumblingWindowInSeconds: props.tumblingWindow?.toSeconds(),
sourceAccessConfigurations: props.sourceAccessConfigurations?.map((o) => {return { type: o.type.type, uri: o.uri };}),
selfManagedEventSource,
selfManagedKafkaEventSourceConfig: props.kafkaBootstrapServers ? consumerGroupConfig : undefined,
amazonManagedKafkaEventSourceConfig: props.eventSourceArn ? consumerGroupConfig : undefined,
});
this.eventSourceMappingId = cfnEventSourceMapping.ref;
}

private validateKafkaConsumerGroupIdOrThrow(kafkaConsumerGroupId: string) {
if (kafkaConsumerGroupId.length > 200 ||kafkaConsumerGroupId.length < 1) {
throw new Error('kafkaConsumerGroupId must be a valid string between 1 and 200 characters');
}

const regex = new RegExp(/[a-zA-Z0-9-\/*:_+=.@-]*/);
const patternMatch = regex.exec(kafkaConsumerGroupId);
if (patternMatch === null || patternMatch[0] !== kafkaConsumerGroupId) {
throw new Error('kafkaConsumerGroupId contain ivalid characters. Allowed values are "[a-zA-Z0-9-\/*:_+=.@-]"');
}
}
}

/**
Expand Down
40 changes: 40 additions & 0 deletions packages/@aws-cdk/aws-lambda/test/event-source-mapping.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,46 @@ describe('event source mapping', () => {
})).toThrow(/kafkaBootStrapServers must not be empty if set/);
});

test('throws if kafkaConsumerGroupId is invalid', () => {
expect(() => new EventSourceMapping(stack, 'test', {
eventSourceArn: 'arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2',
kafkaConsumerGroupId: 'some invalid',
target: fn,
})).toThrow('kafkaConsumerGroupId contain ivalid characters. Allowed values are "[a-zA-Z0-9-\/*:_+=.@-]"');
});

test('throws if kafkaConsumerGroupId is too long', () => {
expect(() => new EventSourceMapping(stack, 'test', {
eventSourceArn: 'arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2',
kafkaConsumerGroupId: 'x'.repeat(201),
target: fn,
})).toThrow('kafkaConsumerGroupId must be a valid string between 1 and 200 characters');
});

test('not throws if kafkaConsumerGroupId is empty', () => {
expect(() => new EventSourceMapping(stack, 'test', {
eventSourceArn: 'arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2',
kafkaConsumerGroupId: '',
target: fn,
})).not.toThrow();
});

test('not throws if kafkaConsumerGroupId is valid for amazon managed kafka', () => {
expect(() => new EventSourceMapping(stack, 'test', {
eventSourceArn: 'arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2',
kafkaConsumerGroupId: 'someValidConsumerGroupId',
target: fn,
})).not.toThrow();
});

test('not throws if kafkaConsumerGroupId is valid for self managed kafka', () => {
expect(() => new EventSourceMapping(stack, 'test', {
kafkaBootstrapServers: ['kafka-broker-1:9092', 'kafka-broker-2:9092'],
kafkaConsumerGroupId: 'someValidConsumerGroupId',
target: fn,
})).not.toThrow();
});

test('eventSourceArn appears in stack', () => {
const topicNameParam = new cdk.CfnParameter(stack, 'TopicNameParam', {
type: 'String',
Expand Down
3 changes: 3 additions & 0 deletions packages/@aws-cdk/core/lib/cfn-element.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as cxschema from '@aws-cdk/cloud-assembly-schema';
import * as cxapi from '@aws-cdk/cx-api';
import { Construct, Node } from 'constructs';
import { debugModeEnabled } from './debug';
import { Lazy } from './lazy';

const CFN_ELEMENT_SYMBOL = Symbol.for('@aws-cdk/core.CfnElement');
Expand Down Expand Up @@ -71,6 +72,7 @@ export abstract class CfnElement extends Construct {

if (!this.node.tryGetContext(cxapi.DISABLE_LOGICAL_ID_METADATA)) {
Node.of(this).addMetadata(cxschema.ArtifactMetadataEntryType.LOGICAL_ID, this.logicalId, {
stackTrace: debugModeEnabled(),
traceFromFunction: this.constructor,
});
}
Expand Down Expand Up @@ -204,3 +206,4 @@ function notTooLong(x: string) {
import { CfnReference } from './private/cfn-reference';
import { Stack } from './stack';
import { Token } from './token';

20 changes: 20 additions & 0 deletions packages/@aws-cdk/core/test/cfn-resource.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import * as cxschema from '@aws-cdk/cloud-assembly-schema';
import { VALIDATE_SNAPSHOT_REMOVAL_POLICY } from '@aws-cdk/cx-api';
import { Construct } from 'constructs';
import * as core from '../lib';
Expand Down Expand Up @@ -255,4 +256,23 @@ describe('cfn resource', () => {
});
}).toThrow(/should be created in the scope of a Stack, but no Stack found/);
});

test('CfnResource has logical ID metadata with stack trace attached', () => {
process.env.CDK_DEBUG = '1';
try {
const app = new core.App();
const stack = new core.Stack(app, 'Stack');
const res = new core.CfnResource(stack, 'SomeCfnResource', {
type: 'Some::Resource',
});

// THEN
const metadata = res.node.metadata.find(m => m.type === cxschema.ArtifactMetadataEntryType.LOGICAL_ID);
expect(metadata).toBeDefined();
expect(metadata?.trace).toBeDefined();
expect(metadata?.trace?.length).toBeGreaterThan(0);
} finally {
delete process.env.CDK_DEBUG;
}
});
});

0 comments on commit 4653e24

Please sign in to comment.