Skip to content

Commit

Permalink
feat(events): DLQ support for EventBus target (#16383)
Browse files Browse the repository at this point in the history
Closes #15954.

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
DaWyz authored Oct 27, 2021
1 parent 3cfe8a2 commit dbb3f25
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 12 deletions.
22 changes: 21 additions & 1 deletion packages/@aws-cdk/aws-events-targets/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Currently supported are:
* Put a record to a Kinesis stream
* [Log an event into a LogGroup](#log-an-event-into-a-loggroup)
* Put a record to a Kinesis Data Firehose stream
* Put an event on an EventBridge bus
* [Put an event on an EventBridge bus](#put-an-event-on-an-eventbridge-bus)

See the README of the `@aws-cdk/aws-events` library for more information on
EventBridge.
Expand Down Expand Up @@ -266,3 +266,23 @@ rule.addTarget(
} ),
)
```

## Put an event on an EventBridge bus

Use the `EventBus` target to route event to a different EventBus.

The code snippet below creates the scheduled event rule that route events to an imported event bus.

```ts
const rule = new events.Rule(this, 'Rule', {
schedule: events.Schedule.expression('rate(1 minute)'),
});

rule.addTarget(new targets.EventBus(
events.EventBus.fromEventBusArn(
this,
'External',
`arn:aws:events:eu-west-1:999999999999:event-bus/test-bus`,
),
));
```
35 changes: 26 additions & 9 deletions packages/@aws-cdk/aws-events-targets/lib/event-bus.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import * as events from '@aws-cdk/aws-events';
import * as iam from '@aws-cdk/aws-iam';
import { singletonEventRole } from './util';
import * as sqs from '@aws-cdk/aws-sqs';
import { singletonEventRole, addToDeadLetterQueueResourcePolicy } from './util';

/**
* Configuration properties of an Event Bus event
*
* Cannot extend TargetBaseProps. Retry policy is not supported for Event bus targets.
*/
export interface EventBusProps {
/**
Expand All @@ -12,25 +15,39 @@ export interface EventBusProps {
* @default a new role is created.
*/
readonly role?: iam.IRole;

/**
* The SQS queue to be used as deadLetterQueue.
* Check out the [considerations for using a dead-letter queue](https://docs.aws.amazon.com/eventbridge/latest/userguide/rule-dlq.html#dlq-considerations).
*
* The events not successfully delivered are automatically retried for a specified period of time,
* depending on the retry policy of the target.
* If an event is not delivered before all retry attempts are exhausted, it will be sent to the dead letter queue.
*
* @default - no dead-letter queue
*/
readonly deadLetterQueue?: sqs.IQueue;
}

/**
* Notify an existing Event Bus of an event
*/
export class EventBus implements events.IRuleTarget {
private readonly role?: iam.IRole;

constructor(private readonly eventBus: events.IEventBus, props: EventBusProps = {}) {
this.role = props.role;
}
constructor(private readonly eventBus: events.IEventBus, private readonly props: EventBusProps = {}) { }

bind(rule: events.IRule, _id?: string): events.RuleTargetConfig {
if (this.role) {
this.role.addToPrincipalPolicy(this.putEventStatement());
if (this.props.role) {
this.props.role.addToPrincipalPolicy(this.putEventStatement());
}
const role = this.role ?? singletonEventRole(rule, [this.putEventStatement()]);
const role = this.props.role ?? singletonEventRole(rule, [this.putEventStatement()]);

if (this.props.deadLetterQueue) {
addToDeadLetterQueueResourcePolicy(rule, this.props.deadLetterQueue);
}

return {
arn: this.eventBus.eventBusArn,
deadLetterConfig: this.props.deadLetterQueue ? { arn: this.props.deadLetterQueue?.queueArn } : undefined,
role,
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import '@aws-cdk/assert-internal/jest';
import * as events from '@aws-cdk/aws-events';
import * as iam from '@aws-cdk/aws-iam';
import * as sqs from '@aws-cdk/aws-sqs';
import { Stack } from '@aws-cdk/core';
import * as targets from '../../lib';

Expand Down Expand Up @@ -90,4 +91,79 @@ test('with supplied role', () => {
Ref: 'Role1ABCC5F0',
}],
});
});
});

test('with a Dead Letter Queue specified', () => {
const stack = new Stack();
const rule = new events.Rule(stack, 'Rule', {
schedule: events.Schedule.expression('rate(1 min)'),
});
const queue = new sqs.Queue(stack, 'Queue');

rule.addTarget(new targets.EventBus(
events.EventBus.fromEventBusArn(
stack,
'External',
'arn:aws:events:us-east-1:123456789012:default',
),
{ deadLetterQueue: queue },
));

expect(stack).toHaveResource('AWS::Events::Rule', {
Targets: [{
Arn: 'arn:aws:events:us-east-1:123456789012:default',
Id: 'Target0',
RoleArn: {
'Fn::GetAtt': [
'RuleEventsRoleC51A4248',
'Arn',
],
},
DeadLetterConfig: {
Arn: {
'Fn::GetAtt': [
'Queue4A7E3555',
'Arn',
],
},
},
}],
});

expect(stack).toHaveResource('AWS::SQS::QueuePolicy', {
PolicyDocument: {
Statement: [
{
Action: 'sqs:SendMessage',
Condition: {
ArnEquals: {
'aws:SourceArn': {
'Fn::GetAtt': [
'Rule4C995B7F',
'Arn',
],
},
},
},
Effect: 'Allow',
Principal: {
Service: 'events.amazonaws.com',
},
Resource: {
'Fn::GetAtt': [
'Queue4A7E3555',
'Arn',
],
},
Sid: 'AllowEventRuleRule',
},
],
Version: '2012-10-17',
},
Queues: [
{
Ref: 'Queue4A7E3555',
},
],
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@
]
]
},
"DeadLetterConfig": {
"Arn": {
"Fn::GetAtt": [
"Queue4A7E3555",
"Arn"
]
}
},
"Id": "Target0",
"RoleArn": {
"Fn::GetAtt": [
Expand Down Expand Up @@ -78,6 +86,50 @@
}
]
}
},
"Queue4A7E3555": {
"Type": "AWS::SQS::Queue",
"UpdateReplacePolicy": "Delete",
"DeletionPolicy": "Delete"
},
"QueuePolicy25439813": {
"Type": "AWS::SQS::QueuePolicy",
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": "sqs:SendMessage",
"Condition": {
"ArnEquals": {
"aws:SourceArn": {
"Fn::GetAtt": [
"Rule4C995B7F",
"Arn"
]
}
}
},
"Effect": "Allow",
"Principal": {
"Service": "events.amazonaws.com"
},
"Resource": {
"Fn::GetAtt": [
"Queue4A7E3555",
"Arn"
]
},
"Sid": "AllowEventRuleeventsourcestackRuleFCA41174"
}
],
"Version": "2012-10-17"
},
"Queues": [
{
"Ref": "Queue4A7E3555"
}
]
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/// !cdk-integ pragma:ignore-assets
import * as events from '@aws-cdk/aws-events';
import * as sqs from '@aws-cdk/aws-sqs';
import * as cdk from '@aws-cdk/core';
import * as targets from '../../lib';

Expand All @@ -12,12 +13,18 @@ class EventSourceStack extends cdk.Stack {
const rule = new events.Rule(this, 'Rule', {
schedule: events.Schedule.expression('rate(1 minute)'),
});

const queue = new sqs.Queue(this, 'Queue');

rule.addTarget(new targets.EventBus(
events.EventBus.fromEventBusArn(
this,
'External',
`arn:aws:events:${this.region}:999999999999:event-bus/test-bus`,
),
{
deadLetterQueue: queue,
},
));
}
}
Expand Down

0 comments on commit dbb3f25

Please sign in to comment.