Skip to content

Commit

Permalink
Merge branch 'main' into pipelines-cli-binary-format
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorharwell authored Aug 5, 2022
2 parents 3f2809e + a02ef9c commit b0465e3
Show file tree
Hide file tree
Showing 40 changed files with 2,745 additions and 413 deletions.
4 changes: 4 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -261,13 +261,17 @@ const topic = 'some-cool-topic';
// The secret that allows access to your self hosted Kafka cluster
declare const secret: Secret;

// (Optional) The secret containing the root CA certificate that your Kafka brokers use for TLS encryption
declare const encryption: Secret;

declare const myFunction: lambda.Function;
myFunction.addEventSource(new SelfManagedKafkaEventSource({
bootstrapServers: bootstrapServers,
topic: topic,
secret: secret,
batchSize: 100, // default
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
encryption: encryption // optional
}));
```

Expand Down
17 changes: 16 additions & 1 deletion packages/@aws-cdk/aws-lambda-event-sources/lib/kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { StreamEventSource, BaseStreamEventSourceProps } from './stream';
/**
* Properties for a Kafka event source
*/
export interface KafkaEventSourceProps extends BaseStreamEventSourceProps{
export interface KafkaEventSourceProps extends BaseStreamEventSourceProps {
/**
* The Kafka topic to subscribe to
*/
Expand Down Expand Up @@ -94,6 +94,14 @@ export interface SelfManagedKafkaEventSourceProps extends KafkaEventSourceProps
* @default AuthenticationMethod.SASL_SCRAM_512_AUTH
*/
readonly authenticationMethod?: AuthenticationMethod

/**
* The secret with the root CA certificate used by your Kafka brokers for TLS encryption
* This field is required if your Kafka brokers use certificates signed by a private CA
*
* @default - none
*/
readonly rootCACertificate?: secretsmanager.Secret;
}

/**
Expand Down Expand Up @@ -231,6 +239,13 @@ export class SelfManagedKafkaEventSource extends StreamEventSource {
sourceAccessConfigurations.push({ type: authType, uri: this.innerProps.secret.secretArn });
}

if (this.innerProps.rootCACertificate !== undefined) {
sourceAccessConfigurations.push({
type: lambda.SourceAccessConfigurationType.SERVER_ROOT_CA_CERTIFICATE,
uri: this.innerProps.rootCACertificate.secretArn,
});
}

if (this.innerProps.vpcSubnets !== undefined && this.innerProps.securityGroup !== undefined) {
sourceAccessConfigurations.push({
type: lambda.SourceAccessConfigurationType.VPC_SECURITY_GROUP,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import * as lambda from '@aws-cdk/aws-lambda';
import * as secretsmanager from '@aws-cdk/aws-secretsmanager';
import * as cdk from '@aws-cdk/core';
import * as integ from '@aws-cdk/integ-tests';
import { AuthenticationMethod, SelfManagedKafkaEventSource } from '../lib';
import { TestFunction } from './test-function';

class KafkaSelfManagedEventSourceTest extends cdk.Stack {
constructor(scope: cdk.App, id: string) {
super(scope, id);

const dummyCertString = `-----BEGIN CERTIFICATE-----
MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw
cmUuiAii9R0=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb
c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==
-----END CERTIFICATE-----"
`;

const dummyPrivateKey = `-----BEGIN ENCRYPTED PRIVATE KEY-----
zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==
-----END ENCRYPTED PRIVATE KEY-----`;

const fn = new TestFunction(this, 'F');
const rootCASecret = new secretsmanager.Secret(this, 'S', {
secretObjectValue: {
certificate: cdk.SecretValue.unsafePlainText(dummyCertString),
},
});
const clientCertificatesSecret = new secretsmanager.Secret(this, 'SC', {
secretObjectValue: {
certificate: cdk.SecretValue.unsafePlainText(dummyCertString),
privateKey: cdk.SecretValue.unsafePlainText(dummyPrivateKey),
},
});
rootCASecret.grantRead(fn);
clientCertificatesSecret.grantRead(fn);

const bootstrapServers = [
'my-self-hosted-kafka-broker-1:9092',
'my-self-hosted-kafka-broker-2:9092',
'my-self-hosted-kafka-broker-3:9092',
];

fn.addEventSource(
new SelfManagedKafkaEventSource({
bootstrapServers,
topic: 'my-test-topic',
secret: clientCertificatesSecret,
authenticationMethod: AuthenticationMethod.CLIENT_CERTIFICATE_TLS_AUTH,
rootCACertificate: rootCASecret,
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
}),
);
}
}

const app = new cdk.App();
const stack = new KafkaSelfManagedEventSourceTest(
app,
'lambda-event-source-kafka-self-managed',
);
new integ.IntegTest(app, 'LambdaEventSourceKafkaSelfManagedTest', {
testCases: [stack],
});
app.synth();
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"version":"20.0.0"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"version": "20.0.0",
"testCases": {
"LambdaEventSourceKafkaSelfManagedTest/DefaultTest": {
"stacks": [
"lambda-event-source-kafka-self-managed"
],
"assertionStack": "LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
{
"Resources": {
"FServiceRole3AC82EE1": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
}
}
],
"Version": "2012-10-17"
},
"ManagedPolicyArns": [
{
"Fn::Join": [
"",
[
"arn:",
{
"Ref": "AWS::Partition"
},
":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
]
]
}
]
}
},
"FServiceRoleDefaultPolicy17A19BFA": {
"Type": "AWS::IAM::Policy",
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"secretsmanager:DescribeSecret",
"secretsmanager:GetSecretValue"
],
"Effect": "Allow",
"Resource": [
{
"Ref": "S509448A1"
},
{
"Ref": "SC0855C491"
}
]
}
],
"Version": "2012-10-17"
},
"PolicyName": "FServiceRoleDefaultPolicy17A19BFA",
"Roles": [
{
"Ref": "FServiceRole3AC82EE1"
}
]
}
},
"FC4345940": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Code": {
"ZipFile": "exports.handler = async function handler(event) {\n console.log('event:', JSON.stringify(event, undefined, 2));\n return { event };\n}"
},
"Role": {
"Fn::GetAtt": [
"FServiceRole3AC82EE1",
"Arn"
]
},
"Handler": "index.handler",
"Runtime": "nodejs14.x"
},
"DependsOn": [
"FServiceRoleDefaultPolicy17A19BFA",
"FServiceRole3AC82EE1"
]
},
"FKafkaEventSource838c4d5ff3c99c1a617120adfca83e5bmytesttopic1E7A7798": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
"FunctionName": {
"Ref": "FC4345940"
},
"BatchSize": 100,
"SelfManagedEventSource": {
"Endpoints": {
"KafkaBootstrapServers": [
"my-self-hosted-kafka-broker-1:9092",
"my-self-hosted-kafka-broker-2:9092",
"my-self-hosted-kafka-broker-3:9092"
]
}
},
"SourceAccessConfigurations": [
{
"Type": "CLIENT_CERTIFICATE_TLS_AUTH",
"URI": {
"Ref": "SC0855C491"
}
},
{
"Type": "SERVER_ROOT_CA_CERTIFICATE",
"URI": {
"Ref": "S509448A1"
}
}
],
"StartingPosition": "TRIM_HORIZON",
"Topics": [
"my-test-topic"
]
}
},
"S509448A1": {
"Type": "AWS::SecretsManager::Secret",
"Properties": {
"SecretString": "{\"certificate\":\"-----BEGIN CERTIFICATE-----\\nMIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw\\ncmUuiAii9R0=\\n-----END CERTIFICATE-----\\n-----BEGIN CERTIFICATE-----\\nMIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb\\nc8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==\\n-----END CERTIFICATE-----\\\"\\n\"}"
},
"UpdateReplacePolicy": "Delete",
"DeletionPolicy": "Delete"
},
"SC0855C491": {
"Type": "AWS::SecretsManager::Secret",
"Properties": {
"SecretString": "{\"certificate\":\"-----BEGIN CERTIFICATE-----\\nMIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw\\ncmUuiAii9R0=\\n-----END CERTIFICATE-----\\n-----BEGIN CERTIFICATE-----\\nMIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb\\nc8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==\\n-----END CERTIFICATE-----\\\"\\n\",\"privateKey\":\"-----BEGIN ENCRYPTED PRIVATE KEY-----\\nzp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==\\n-----END ENCRYPTED PRIVATE KEY-----\"}"
},
"UpdateReplacePolicy": "Delete",
"DeletionPolicy": "Delete"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
{
"version": "20.0.0",
"artifacts": {
"Tree": {
"type": "cdk:tree",
"properties": {
"file": "tree.json"
}
},
"lambda-event-source-kafka-self-managed": {
"type": "aws:cloudformation:stack",
"environment": "aws://unknown-account/unknown-region",
"properties": {
"templateFile": "lambda-event-source-kafka-self-managed.template.json",
"validateOnSynth": false
},
"metadata": {
"/lambda-event-source-kafka-self-managed/F/ServiceRole/Resource": [
{
"type": "aws:cdk:logicalId",
"data": "FServiceRole3AC82EE1"
}
],
"/lambda-event-source-kafka-self-managed/F/ServiceRole/DefaultPolicy/Resource": [
{
"type": "aws:cdk:logicalId",
"data": "FServiceRoleDefaultPolicy17A19BFA"
}
],
"/lambda-event-source-kafka-self-managed/F/Resource": [
{
"type": "aws:cdk:logicalId",
"data": "FC4345940"
}
],
"/lambda-event-source-kafka-self-managed/F/KafkaEventSource:838c4d5ff3c99c1a617120adfca83e5b:my-test-topic/Resource": [
{
"type": "aws:cdk:logicalId",
"data": "FKafkaEventSource838c4d5ff3c99c1a617120adfca83e5bmytesttopic1E7A7798"
}
],
"/lambda-event-source-kafka-self-managed/S/Resource": [
{
"type": "aws:cdk:logicalId",
"data": "S509448A1"
}
],
"/lambda-event-source-kafka-self-managed/SC/Resource": [
{
"type": "aws:cdk:logicalId",
"data": "SC0855C491"
}
]
},
"displayName": "lambda-event-source-kafka-self-managed"
},
"LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F": {
"type": "aws:cloudformation:stack",
"environment": "aws://unknown-account/unknown-region",
"properties": {
"templateFile": "LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F.template.json",
"validateOnSynth": false
},
"displayName": "LambdaEventSourceKafkaSelfManagedTest/DefaultTest/DeployAssert"
}
}
}
Loading

0 comments on commit b0465e3

Please sign in to comment.