diff --git a/packages/@aws-cdk/aws-iot-actions/README.md b/packages/@aws-cdk/aws-iot-actions/README.md index f9043d86bd447..578334b0b6b41 100644 --- a/packages/@aws-cdk/aws-iot-actions/README.md +++ b/packages/@aws-cdk/aws-iot-actions/README.md @@ -24,6 +24,7 @@ Currently supported are: - Invoke a Lambda function - Put objects to a S3 bucket - Put logs to CloudWatch Logs +- Put records to Kinesis Data Firehose stream ## Invoke a Lambda function @@ -121,3 +122,34 @@ new iot.TopicRule(this, 'TopicRule', { actions: [new actions.CloudWatchLogsAction(logGroup)], }); ``` + + +## Put records to Kinesis Data Firehose stream + +The code snippet below creates an AWS IoT Rule that put records to Put records +to Kinesis Data Firehose stream when it is triggered. + +```ts +import * as iot from '@aws-cdk/aws-iot'; +import * as actions from '@aws-cdk/aws-iot-actions'; +import * as s3 from '@aws-cdk/aws-s3'; +import * as firehose from '@aws-cdk/aws-kinesisfirehose'; +import * as destinations from '@aws-cdk/aws-kinesisfirehose-destinations'; + +const bucket = new s3.Bucket(this, 'MyBucket', { + removalPolicy: cdk.RemovalPolicy.DESTROY, +}); +const stream = new firehose.DeliveryStream(this, 'MyStream', { + destinations: [new destinations.S3Bucket(bucket)], +}); + +const topicRule = new iot.TopicRule(this, 'TopicRule', { + sql: iot.IotSql.fromStringAsVer20160323("SELECT * FROM 'device/+/data'"), + actions: [ + new actions.FirehoseStreamAction(stream, { + batchMode: true, + recordSeparator: actions.FirehoseStreamRecordSeparator.NEWLINE, + }) + ], +}); +``` diff --git a/packages/@aws-cdk/aws-iot-actions/lib/firehose-stream-action.ts b/packages/@aws-cdk/aws-iot-actions/lib/firehose-stream-action.ts new file mode 100644 index 0000000000000..69c8a124a3afc --- /dev/null +++ b/packages/@aws-cdk/aws-iot-actions/lib/firehose-stream-action.ts @@ -0,0 +1,88 @@ +import * as iam from '@aws-cdk/aws-iam'; +import * as iot from '@aws-cdk/aws-iot'; +import * as firehose from '@aws-cdk/aws-kinesisfirehose'; +import { CommonActionProps } from './common-action-props'; +import { singletonActionRole } from './private/role'; + +/** + * Record Separator to be used to separate records. + */ +export enum FirehoseStreamRecordSeparator { + /** + * Separate by a new line + */ + NEWLINE = '\n', + + /** + * Separate by a tab + */ + TAB = '\t', + + /** + * Separate by a windows new line + */ + WINDOWS_NEWLINE = '\r\n', + + /** + * Separate by a commma + */ + COMMA = ',', +} + +/** + * Configuration properties of an action for the Kinesis Data Firehose stream. + */ +export interface FirehoseStreamProps extends CommonActionProps { + /** + * Whether to deliver the Kinesis Data Firehose stream as a batch by using `PutRecordBatch`. + * When batchMode is true and the rule's SQL statement evaluates to an Array, each Array + * element forms one record in the PutRecordBatch request. The resulting array can't have + * more than 500 records. + * + * @default false + */ + readonly batchMode?: boolean; + + /** + * A character separator that will be used to separate records written to the Kinesis Data Firehose stream. + * + * @default - none -- the stream does not use a separator + */ + readonly recordSeparator?: FirehoseStreamRecordSeparator; +} + + +/** + * The action to put the record from an MQTT message to the Kinesis Data Firehose stream. + */ +export class FirehoseStreamAction implements iot.IAction { + private readonly batchMode?: boolean; + private readonly recordSeparator?: string; + private readonly role?: iam.IRole; + + /** + * @param stream The Kinesis Data Firehose stream to which to put records. + * @param props Optional properties to not use default + */ + constructor(private readonly stream: firehose.IDeliveryStream, props: FirehoseStreamProps = {}) { + this.batchMode = props.batchMode; + this.recordSeparator = props.recordSeparator; + this.role = props.role; + } + + bind(rule: iot.ITopicRule): iot.ActionConfig { + const role = this.role ?? singletonActionRole(rule); + this.stream.grantPutRecords(role); + + return { + configuration: { + firehose: { + batchMode: this.batchMode, + deliveryStreamName: this.stream.deliveryStreamName, + roleArn: role.roleArn, + separator: this.recordSeparator, + }, + }, + }; + } +} diff --git a/packages/@aws-cdk/aws-iot-actions/lib/index.ts b/packages/@aws-cdk/aws-iot-actions/lib/index.ts index 88521265228d4..ce74a2ff2b685 100644 --- a/packages/@aws-cdk/aws-iot-actions/lib/index.ts +++ b/packages/@aws-cdk/aws-iot-actions/lib/index.ts @@ -1,4 +1,5 @@ export * from './cloudwatch-logs-action'; export * from './common-action-props'; +export * from './firehose-stream-action'; export * from './lambda-function-action'; export * from './s3-put-object-action'; diff --git a/packages/@aws-cdk/aws-iot-actions/package.json b/packages/@aws-cdk/aws-iot-actions/package.json index ca5ca2bf1b1f5..b996897b7719d 100644 --- a/packages/@aws-cdk/aws-iot-actions/package.json +++ b/packages/@aws-cdk/aws-iot-actions/package.json @@ -71,6 +71,7 @@ "license": "Apache-2.0", "devDependencies": { "@aws-cdk/assertions": "0.0.0", + "@aws-cdk/aws-kinesisfirehose-destinations": "0.0.0", "@aws-cdk/cdk-build-tools": "0.0.0", "@aws-cdk/cdk-integ-tools": "0.0.0", "@aws-cdk/pkglint": "0.0.0", @@ -81,6 +82,7 @@ "dependencies": { "@aws-cdk/aws-iam": "0.0.0", "@aws-cdk/aws-iot": "0.0.0", + "@aws-cdk/aws-kinesisfirehose": "0.0.0", "@aws-cdk/aws-lambda": "0.0.0", "@aws-cdk/aws-logs": "0.0.0", "@aws-cdk/aws-s3": "0.0.0", @@ -92,6 +94,7 @@ "peerDependencies": { "@aws-cdk/aws-iam": "0.0.0", "@aws-cdk/aws-iot": "0.0.0", + "@aws-cdk/aws-kinesisfirehose": "0.0.0", "@aws-cdk/aws-lambda": "0.0.0", "@aws-cdk/aws-logs": "0.0.0", "@aws-cdk/aws-s3": "0.0.0", diff --git a/packages/@aws-cdk/aws-iot-actions/test/firehose-stream-action/firehose-stream-action.test.ts b/packages/@aws-cdk/aws-iot-actions/test/firehose-stream-action/firehose-stream-action.test.ts new file mode 100644 index 0000000000000..2941cc1db270c --- /dev/null +++ b/packages/@aws-cdk/aws-iot-actions/test/firehose-stream-action/firehose-stream-action.test.ts @@ -0,0 +1,143 @@ +import { Template, Match } from '@aws-cdk/assertions'; +import * as iam from '@aws-cdk/aws-iam'; +import * as iot from '@aws-cdk/aws-iot'; +import * as firehose from '@aws-cdk/aws-kinesisfirehose'; +import * as cdk from '@aws-cdk/core'; +import * as actions from '../../lib'; + +test('Default firehose stream action', () => { + // GIVEN + const stack = new cdk.Stack(); + const topicRule = new iot.TopicRule(stack, 'MyTopicRule', { + sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"), + }); + const stream = firehose.DeliveryStream.fromDeliveryStreamArn(stack, 'MyStream', 'arn:aws:firehose:xx-west-1:111122223333:deliverystream/my-stream'); + + // WHEN + topicRule.addAction( + new actions.FirehoseStreamAction(stream), + ); + + // THEN + Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', { + TopicRulePayload: { + Actions: [ + { + Firehose: { + DeliveryStreamName: 'my-stream', + RoleArn: { + 'Fn::GetAtt': ['MyTopicRuleTopicRuleActionRoleCE2D05DA', 'Arn'], + }, + }, + }, + ], + }, + }); + + Template.fromStack(stack).hasResourceProperties('AWS::IAM::Role', { + AssumeRolePolicyDocument: { + Statement: [ + { + Action: 'sts:AssumeRole', + Effect: 'Allow', + Principal: { + Service: 'iot.amazonaws.com', + }, + }, + ], + Version: '2012-10-17', + }, + }); + + Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', { + PolicyDocument: { + Statement: [ + { + Action: ['firehose:PutRecord', 'firehose:PutRecordBatch'], + Effect: 'Allow', + Resource: 'arn:aws:firehose:xx-west-1:111122223333:deliverystream/my-stream', + }, + ], + Version: '2012-10-17', + }, + PolicyName: 'MyTopicRuleTopicRuleActionRoleDefaultPolicy54A701F7', + Roles: [ + { Ref: 'MyTopicRuleTopicRuleActionRoleCE2D05DA' }, + ], + }); +}); + +test('can set batchMode', () => { + // GIVEN + const stack = new cdk.Stack(); + const topicRule = new iot.TopicRule(stack, 'MyTopicRule', { + sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"), + }); + const stream = firehose.DeliveryStream.fromDeliveryStreamArn(stack, 'MyStream', 'arn:aws:firehose:xx-west-1:111122223333:deliverystream/my-stream'); + + // WHEN + topicRule.addAction( + new actions.FirehoseStreamAction(stream, { batchMode: true }), + ); + + // THEN + Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', { + TopicRulePayload: { + Actions: [ + Match.objectLike({ Firehose: { BatchMode: true } }), + ], + }, + }); +}); + +test('can set separotor', () => { + // GIVEN + const stack = new cdk.Stack(); + const topicRule = new iot.TopicRule(stack, 'MyTopicRule', { + sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"), + }); + const stream = firehose.DeliveryStream.fromDeliveryStreamArn(stack, 'MyStream', 'arn:aws:firehose:xx-west-1:111122223333:deliverystream/my-stream'); + + // WHEN + topicRule.addAction( + new actions.FirehoseStreamAction(stream, { recordSeparator: actions.FirehoseStreamRecordSeparator.NEWLINE }), + ); + + // THEN + Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', { + TopicRulePayload: { + Actions: [ + Match.objectLike({ Firehose: { Separator: '\n' } }), + ], + }, + }); +}); + +test('can set role', () => { + // GIVEN + const stack = new cdk.Stack(); + const topicRule = new iot.TopicRule(stack, 'MyTopicRule', { + sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"), + }); + const stream = firehose.DeliveryStream.fromDeliveryStreamArn(stack, 'MyStream', 'arn:aws:firehose:xx-west-1:111122223333:deliverystream/my-stream'); + const role = iam.Role.fromRoleArn(stack, 'MyRole', 'arn:aws:iam::123456789012:role/ForTest'); + + // WHEN + topicRule.addAction( + new actions.FirehoseStreamAction(stream, { role }), + ); + + // THEN + Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', { + TopicRulePayload: { + Actions: [ + Match.objectLike({ Firehose: { RoleArn: 'arn:aws:iam::123456789012:role/ForTest' } }), + ], + }, + }); + + Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', { + PolicyName: 'MyRolePolicy64AB00A5', + Roles: ['ForTest'], + }); +}); diff --git a/packages/@aws-cdk/aws-iot-actions/test/firehose-stream-action/integ.firehose-stream-action.expected.json b/packages/@aws-cdk/aws-iot-actions/test/firehose-stream-action/integ.firehose-stream-action.expected.json new file mode 100644 index 0000000000000..d1565669b5c04 --- /dev/null +++ b/packages/@aws-cdk/aws-iot-actions/test/firehose-stream-action/integ.firehose-stream-action.expected.json @@ -0,0 +1,306 @@ +{ + "Resources": { + "TopicRule40A4EA44": { + "Type": "AWS::IoT::TopicRule", + "Properties": { + "TopicRulePayload": { + "Actions": [ + { + "Firehose": { + "BatchMode": true, + "DeliveryStreamName": { + "Ref": "MyStream5C050E93" + }, + "RoleArn": { + "Fn::GetAtt": [ + "TopicRuleTopicRuleActionRole246C4F77", + "Arn" + ] + }, + "Separator": "\n" + } + } + ], + "AwsIotSqlVersion": "2016-03-23", + "Sql": "SELECT * FROM 'device/+/data'" + } + } + }, + "TopicRuleTopicRuleActionRole246C4F77": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": "iot.amazonaws.com" + } + } + ], + "Version": "2012-10-17" + } + } + }, + "TopicRuleTopicRuleActionRoleDefaultPolicy99ADD687": { + "Type": "AWS::IAM::Policy", + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "firehose:PutRecord", + "firehose:PutRecordBatch" + ], + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "MyStream5C050E93", + "Arn" + ] + } + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "TopicRuleTopicRuleActionRoleDefaultPolicy99ADD687", + "Roles": [ + { + "Ref": "TopicRuleTopicRuleActionRole246C4F77" + } + ] + } + }, + "MyBucketF68F3FF0": { + "Type": "AWS::S3::Bucket", + "UpdateReplacePolicy": "Delete", + "DeletionPolicy": "Delete" + }, + "MyStreamServiceRole8C50608A": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": "firehose.amazonaws.com" + } + } + ], + "Version": "2012-10-17" + } + } + }, + "MyStreamS3DestinationRole5E0BA960": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": "firehose.amazonaws.com" + } + } + ], + "Version": "2012-10-17" + } + } + }, + "MyStreamS3DestinationRoleDefaultPolicy401EF6F2": { + "Type": "AWS::IAM::Policy", + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "s3:GetObject*", + "s3:GetBucket*", + "s3:List*", + "s3:DeleteObject*", + "s3:PutObject", + "s3:Abort*" + ], + "Effect": "Allow", + "Resource": [ + { + "Fn::GetAtt": [ + "MyBucketF68F3FF0", + "Arn" + ] + }, + { + "Fn::Join": [ + "", + [ + { + "Fn::GetAtt": [ + "MyBucketF68F3FF0", + "Arn" + ] + }, + "/*" + ] + ] + } + ] + }, + { + "Action": [ + "logs:CreateLogStream", + "logs:PutLogEvents" + ], + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "MyStreamLogGroupAB67AB09", + "Arn" + ] + } + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "MyStreamS3DestinationRoleDefaultPolicy401EF6F2", + "Roles": [ + { + "Ref": "MyStreamS3DestinationRole5E0BA960" + } + ] + } + }, + "MyStreamLogGroupAB67AB09": { + "Type": "AWS::Logs::LogGroup", + "Properties": { + "RetentionInDays": 731 + }, + "UpdateReplacePolicy": "Retain", + "DeletionPolicy": "Retain" + }, + "MyStreamLogGroupS3Destination423E82A8": { + "Type": "AWS::Logs::LogStream", + "Properties": { + "LogGroupName": { + "Ref": "MyStreamLogGroupAB67AB09" + } + }, + "UpdateReplacePolicy": "Retain", + "DeletionPolicy": "Retain" + }, + "MyStream5C050E93": { + "Type": "AWS::KinesisFirehose::DeliveryStream", + "Properties": { + "DeliveryStreamType": "DirectPut", + "ExtendedS3DestinationConfiguration": { + "BucketARN": { + "Fn::GetAtt": [ + "MyBucketF68F3FF0", + "Arn" + ] + }, + "CloudWatchLoggingOptions": { + "Enabled": true, + "LogGroupName": { + "Ref": "MyStreamLogGroupAB67AB09" + }, + "LogStreamName": { + "Ref": "MyStreamLogGroupS3Destination423E82A8" + } + }, + "RoleARN": { + "Fn::GetAtt": [ + "MyStreamS3DestinationRole5E0BA960", + "Arn" + ] + } + } + }, + "DependsOn": [ + "MyStreamS3DestinationRoleDefaultPolicy401EF6F2" + ] + } + }, + "Mappings": { + "awscdkawskinesisfirehoseCidrBlocks": { + "af-south-1": { + "FirehoseCidrBlock": "13.244.121.224/27" + }, + "ap-east-1": { + "FirehoseCidrBlock": "18.162.221.32/27" + }, + "ap-northeast-1": { + "FirehoseCidrBlock": "13.113.196.224/27" + }, + "ap-northeast-2": { + "FirehoseCidrBlock": "13.209.1.64/27" + }, + "ap-northeast-3": { + "FirehoseCidrBlock": "13.208.177.192/27" + }, + "ap-south-1": { + "FirehoseCidrBlock": "13.232.67.32/27" + }, + "ap-southeast-1": { + "FirehoseCidrBlock": "13.228.64.192/27" + }, + "ap-southeast-2": { + "FirehoseCidrBlock": "13.210.67.224/27" + }, + "ca-central-1": { + "FirehoseCidrBlock": "35.183.92.128/27" + }, + "cn-north-1": { + "FirehoseCidrBlock": "52.81.151.32/27" + }, + "cn-northwest-1": { + "FirehoseCidrBlock": "161.189.23.64/27" + }, + "eu-central-1": { + "FirehoseCidrBlock": "35.158.127.160/27" + }, + "eu-north-1": { + "FirehoseCidrBlock": "13.53.63.224/27" + }, + "eu-south-1": { + "FirehoseCidrBlock": "15.161.135.128/27" + }, + "eu-west-1": { + "FirehoseCidrBlock": "52.19.239.192/27" + }, + "eu-west-2": { + "FirehoseCidrBlock": "18.130.1.96/27" + }, + "eu-west-3": { + "FirehoseCidrBlock": "35.180.1.96/27" + }, + "me-south-1": { + "FirehoseCidrBlock": "15.185.91.0/27" + }, + "sa-east-1": { + "FirehoseCidrBlock": "18.228.1.128/27" + }, + "us-east-1": { + "FirehoseCidrBlock": "52.70.63.192/27" + }, + "us-east-2": { + "FirehoseCidrBlock": "13.58.135.96/27" + }, + "us-gov-east-1": { + "FirehoseCidrBlock": "18.253.138.96/27" + }, + "us-gov-west-1": { + "FirehoseCidrBlock": "52.61.204.160/27" + }, + "us-west-1": { + "FirehoseCidrBlock": "13.57.135.192/27" + }, + "us-west-2": { + "FirehoseCidrBlock": "52.89.255.224/27" + } + } + } +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-iot-actions/test/firehose-stream-action/integ.firehose-stream-action.ts b/packages/@aws-cdk/aws-iot-actions/test/firehose-stream-action/integ.firehose-stream-action.ts new file mode 100644 index 0000000000000..9287f1294b4dd --- /dev/null +++ b/packages/@aws-cdk/aws-iot-actions/test/firehose-stream-action/integ.firehose-stream-action.ts @@ -0,0 +1,38 @@ +/// !cdk-integ pragma:ignore-assets +import * as iot from '@aws-cdk/aws-iot'; +import * as firehose from '@aws-cdk/aws-kinesisfirehose'; +import * as destinations from '@aws-cdk/aws-kinesisfirehose-destinations'; +import * as s3 from '@aws-cdk/aws-s3'; +import * as cdk from '@aws-cdk/core'; +import * as actions from '../../lib'; + + +const app = new cdk.App(); + +class TestStack extends cdk.Stack { + constructor(scope: cdk.App, id: string, props?: cdk.StackProps) { + super(scope, id, props); + + const topicRule = new iot.TopicRule(this, 'TopicRule', { + sql: iot.IotSql.fromStringAsVer20160323( + "SELECT * FROM 'device/+/data'", + ), + }); + + const bucket = new s3.Bucket(this, 'MyBucket', { + removalPolicy: cdk.RemovalPolicy.DESTROY, + }); + const stream = new firehose.DeliveryStream(this, 'MyStream', { + destinations: [new destinations.S3Bucket(bucket)], + }); + topicRule.addAction( + new actions.FirehoseStreamAction(stream, { + batchMode: true, + recordSeparator: actions.FirehoseStreamRecordSeparator.NEWLINE, + }), + ); + } +} + +new TestStack(app, 'test-stack'); +app.synth(); diff --git a/tools/@aws-cdk/pkglint/lib/rules.ts b/tools/@aws-cdk/pkglint/lib/rules.ts index eda6436884d77..dab684878d98f 100644 --- a/tools/@aws-cdk/pkglint/lib/rules.ts +++ b/tools/@aws-cdk/pkglint/lib/rules.ts @@ -1653,7 +1653,7 @@ export class NoExperimentalDependents extends ValidationRule { ['@aws-cdk/aws-apigatewayv2-authorizers', ['@aws-cdk/aws-apigatewayv2']], ['@aws-cdk/aws-events-targets', ['@aws-cdk/aws-kinesisfirehose']], ['@aws-cdk/aws-kinesisfirehose-destinations', ['@aws-cdk/aws-kinesisfirehose']], - ['@aws-cdk/aws-iot-actions', ['@aws-cdk/aws-iot']], + ['@aws-cdk/aws-iot-actions', ['@aws-cdk/aws-iot', '@aws-cdk/aws-kinesisfirehose']], ]); private readonly excludedModules = ['@aws-cdk/cloudformation-include'];