Skip to content

Commit

Permalink
feat(iot): Action to send messages to SQS queues (#18087)
Browse files Browse the repository at this point in the history
Fixes #17699

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
dyoshikawa authored Jan 4, 2022
1 parent 82fa742 commit 37537fe
Show file tree
Hide file tree
Showing 7 changed files with 315 additions and 0 deletions.
25 changes: 25 additions & 0 deletions packages/@aws-cdk/aws-iot-actions/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ Currently supported are:
- Capture CloudWatch metrics
- Change state for a CloudWatch alarm
- Put records to Kinesis Data Firehose stream
- Send messages to SQS queues

## Invoke a Lambda function

Expand Down Expand Up @@ -209,3 +210,27 @@ const topicRule = new iot.TopicRule(this, 'TopicRule', {
],
});
```

## Send messages to an SQS queue

The code snippet below creates an AWS IoT Rule that send messages
to an SQS queue when it is triggered:

```ts
import * as iot from '@aws-cdk/aws-iot';
import * as actions from '@aws-cdk/aws-iot-actions';
import * as sqs from '@aws-cdk/aws-sqs';

const queue = new sqs.Queue(this, 'MyQueue');

const topicRule = new iot.TopicRule(this, 'TopicRule', {
sql: iot.IotSql.fromStringAsVer20160323(
"SELECT topic(2) as device_id, year, month, day FROM 'device/+/data'",
),
actions: [
new actions.SqsQueueAction(queue, {
useBase64: true, // optional property, default is 'false'
}),
]
});
```
2 changes: 2 additions & 0 deletions packages/@aws-cdk/aws-iot-actions/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ export * from './common-action-props';
export * from './firehose-stream-action';
export * from './lambda-function-action';
export * from './s3-put-object-action';
export * from './sqs-queue-action';

54 changes: 54 additions & 0 deletions packages/@aws-cdk/aws-iot-actions/lib/sqs-queue-action.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import * as iam from '@aws-cdk/aws-iam';
import * as iot from '@aws-cdk/aws-iot';
import * as sqs from '@aws-cdk/aws-sqs';
import { CommonActionProps } from './common-action-props';
import { singletonActionRole } from './private/role';

/**
* Configuration properties of an action for SQS.
*/
export interface SqsQueueActionProps extends CommonActionProps {
/**
* Specifies whether to use Base64 encoding.
*
* @default false
*/
readonly useBase64?: boolean
}

/**
* The action to write the data from an MQTT message to an Amazon SQS queue.
*/
export class SqsQueueAction implements iot.IAction {
private readonly role?: iam.IRole;
private readonly queue: sqs.IQueue;
private readonly useBase64?: boolean;

/**
* @param queue The Amazon SQS queue to which to write data.
* @param props Optional properties to not use default
*/
constructor(queue: sqs.IQueue, props: SqsQueueActionProps = {}) {
this.queue = queue;
this.role = props.role;
this.useBase64 = props.useBase64;
}

bind(rule: iot.ITopicRule): iot.ActionConfig {
const role = this.role ?? singletonActionRole(rule);
role.addToPrincipalPolicy(new iam.PolicyStatement({
actions: ['sqs:SendMessage'],
resources: [this.queue.queueArn],
}));

return {
configuration: {
sqs: {
queueUrl: this.queue.queueUrl,
useBase64: this.useBase64,
roleArn: role.roleArn,
},
},
};
}
}
2 changes: 2 additions & 0 deletions packages/@aws-cdk/aws-iot-actions/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
"@aws-cdk/aws-lambda": "0.0.0",
"@aws-cdk/aws-logs": "0.0.0",
"@aws-cdk/aws-s3": "0.0.0",
"@aws-cdk/aws-sqs": "0.0.0",
"@aws-cdk/core": "0.0.0",
"case": "1.6.3",
"constructs": "^3.3.69"
Expand All @@ -100,6 +101,7 @@
"@aws-cdk/aws-lambda": "0.0.0",
"@aws-cdk/aws-logs": "0.0.0",
"@aws-cdk/aws-s3": "0.0.0",
"@aws-cdk/aws-sqs": "0.0.0",
"@aws-cdk/core": "0.0.0",
"constructs": "^3.3.69"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
{
"Resources": {
"TopicRule40A4EA44": {
"Type": "AWS::IoT::TopicRule",
"Properties": {
"TopicRulePayload": {
"Actions": [
{
"Sqs": {
"QueueUrl": {
"Ref": "MyQueueE6CA6235"
},
"RoleArn": {
"Fn::GetAtt": [
"TopicRuleTopicRuleActionRole246C4F77",
"Arn"
]
}
}
}
],
"AwsIotSqlVersion": "2016-03-23",
"Sql": "SELECT topic(2) as device_id, year, month, day FROM 'device/+/data'"
}
}
},
"TopicRuleTopicRuleActionRole246C4F77": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "iot.amazonaws.com"
}
}
],
"Version": "2012-10-17"
}
}
},
"TopicRuleTopicRuleActionRoleDefaultPolicy99ADD687": {
"Type": "AWS::IAM::Policy",
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": "sqs:SendMessage",
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"MyQueueE6CA6235",
"Arn"
]
}
}
],
"Version": "2012-10-17"
},
"PolicyName": "TopicRuleTopicRuleActionRoleDefaultPolicy99ADD687",
"Roles": [
{
"Ref": "TopicRuleTopicRuleActionRole246C4F77"
}
]
}
},
"MyQueueE6CA6235": {
"Type": "AWS::SQS::Queue",
"UpdateReplacePolicy": "Delete",
"DeletionPolicy": "Delete"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/// !cdk-integ pragma:ignore-assets
import * as iot from '@aws-cdk/aws-iot';
import * as sqs from '@aws-cdk/aws-sqs';
import * as cdk from '@aws-cdk/core';
import * as actions from '../../lib';

const app = new cdk.App();

class TestStack extends cdk.Stack {
constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
super(scope, id, props);

const topicRule = new iot.TopicRule(this, 'TopicRule', {
sql: iot.IotSql.fromStringAsVer20160323(
"SELECT topic(2) as device_id, year, month, day FROM 'device/+/data'",
),
});

const queue = new sqs.Queue(this, 'MyQueue', {
removalPolicy: cdk.RemovalPolicy.DESTROY,
});
topicRule.addAction(new actions.SqsQueueAction(queue));
}
}

new TestStack(app, 'test-stack');
app.synth();
129 changes: 129 additions & 0 deletions packages/@aws-cdk/aws-iot-actions/test/sqs/sqs-queue-action.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import { Template, Match } from '@aws-cdk/assertions';
import * as iam from '@aws-cdk/aws-iam';
import * as iot from '@aws-cdk/aws-iot';
import * as sqs from '@aws-cdk/aws-sqs';
import * as cdk from '@aws-cdk/core';
import * as actions from '../../lib';

test('Default SQS queue action', () => {
// GIVEN
const stack = new cdk.Stack();
const topicRule = new iot.TopicRule(stack, 'MyTopicRule', {
sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"),
});
const queue = sqs.Queue.fromQueueArn(stack, 'MyQueue', 'arn:aws:sqs::123456789012:test-queue');

// WHEN
topicRule.addAction(new actions.SqsQueueAction(queue));

// THEN
Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', {
TopicRulePayload: {
Actions: [
{
Sqs: {
QueueUrl: {
'Fn::Join': ['', [
'https://sqs..',
{ Ref: 'AWS::URLSuffix' },
'/123456789012/test-queue',
]],
},
RoleArn: {
'Fn::GetAtt': [
'MyTopicRuleTopicRuleActionRoleCE2D05DA',
'Arn',
],
},
},
},
],
},
});

Template.fromStack(stack).hasResourceProperties('AWS::IAM::Role', {
AssumeRolePolicyDocument: {
Statement: [
{
Action: 'sts:AssumeRole',
Effect: 'Allow',
Principal: {
Service: 'iot.amazonaws.com',
},
},
],
Version: '2012-10-17',
},
});

Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', {
PolicyDocument: {
Statement: [
{
Action: 'sqs:SendMessage',
Effect: 'Allow',
Resource: 'arn:aws:sqs::123456789012:test-queue',
},
],
Version: '2012-10-17',
},
PolicyName: 'MyTopicRuleTopicRuleActionRoleDefaultPolicy54A701F7',
Roles: [
{ Ref: 'MyTopicRuleTopicRuleActionRoleCE2D05DA' },
],
});
});

test('Can set useBase64', () => {
// GIVEN
const stack = new cdk.Stack();
const topicRule = new iot.TopicRule(stack, 'MyTopicRule', {
sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"),
});
const queue = sqs.Queue.fromQueueArn(stack, 'MyQueue', 'arn:aws:sqs::123456789012:test-queue');

// WHEN
topicRule.addAction(new actions.SqsQueueAction(queue, {
useBase64: true,
}));

// THEN
Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', {
TopicRulePayload: {
Actions: [
Match.objectLike({ Sqs: { UseBase64: true } }),
],
},
});
});

test('Can set role', () => {
// GIVEN
const stack = new cdk.Stack();
const topicRule = new iot.TopicRule(stack, 'MyTopicRule', {
sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"),
});
const queue = sqs.Queue.fromQueueArn(stack, 'MyQueue', 'arn:aws:sqs::123456789012:test-queue');
const role = iam.Role.fromRoleArn(stack, 'MyRole', 'arn:aws:iam::123456789012:role/ForTest');

// WHEN
topicRule.addAction(new actions.SqsQueueAction(queue, { role }));

// THEN
Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', {
TopicRulePayload: {
Actions: [
Match.objectLike({
Sqs: {
RoleArn: 'arn:aws:iam::123456789012:role/ForTest',
},
}),
],
},
});

Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', {
PolicyName: 'MyRolePolicy64AB00A5',
Roles: ['ForTest'],
});
});

0 comments on commit 37537fe

Please sign in to comment.