Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(filemanager): move SQS and DLQ to stateful stack #101

Merged
merged 10 commits into from
Feb 15, 2024
20 changes: 16 additions & 4 deletions config/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { OrcaBusStatefulConfig } from '../lib/workload/orcabus-stateful-stack';
import { AuroraPostgresEngineVersion } from 'aws-cdk-lib/aws-rds';
import { OrcaBusStatelessConfig } from '../lib/workload/orcabus-stateless-stack';
import { Duration, aws_lambda, RemovalPolicy } from 'aws-cdk-lib';
import { EventSourceProps } from '../lib/workload/stateful/event_source/component';

const regName = 'OrcaBusSchemaRegistry';
const eventBusName = 'OrcaBusMain';
Expand Down Expand Up @@ -62,6 +63,16 @@ const orcaBusStatelessConfig = {
rdsMasterSecretName: rdsMasterSecretName,
};

const eventSourceConfig: EventSourceProps = {
queueName: 'orcabus-event-source-queue',
maxReceiveCount: 3,
rules: [
{
bucket: 'umccr-temp-dev',
},
],
};

interface EnvironmentConfig {
name: string;
accountId: string;
Expand All @@ -83,7 +94,6 @@ export const getEnvironmentConfig = (
schemaRegistryProps: {
...orcaBusStatefulConfig.schemaRegistryProps,
},

eventBusProps: {
...orcaBusStatefulConfig.eventBusProps,
},
Expand All @@ -99,8 +109,12 @@ export const getEnvironmentConfig = (
securityGroupProps: {
...orcaBusStatefulConfig.securityGroupProps,
},
eventSourceProps: eventSourceConfig,
},
orcaBusStatelessConfig: {
...orcaBusStatelessConfig,
eventSourceQueueName: eventSourceConfig.queueName,
},
orcaBusStatelessConfig: orcaBusStatelessConfig,
},
};

Expand All @@ -113,7 +127,6 @@ export const getEnvironmentConfig = (
schemaRegistryProps: {
...orcaBusStatefulConfig.schemaRegistryProps,
},

eventBusProps: {
...orcaBusStatefulConfig.eventBusProps,
},
Expand Down Expand Up @@ -143,7 +156,6 @@ export const getEnvironmentConfig = (
schemaRegistryProps: {
...orcaBusStatefulConfig.schemaRegistryProps,
},

eventBusProps: {
...orcaBusStatefulConfig.eventBusProps,
},
Expand Down
7 changes: 7 additions & 0 deletions lib/workload/orcabus-stateful-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,22 @@ import { EventBusConstruct, EventBusProps } from './stateful/eventbridge/compone
import { DatabaseConstruct, DatabaseProps } from './stateful/database/component';
import { SecurityGroupConstruct, SecurityGroupProps } from './stateful/securitygroup/component';
import { SchemaRegistryConstruct, SchemaRegistryProps } from './stateful/schemaregistry/component';
import { EventSource, EventSourceProps } from './stateful/event_source/component';

export interface OrcaBusStatefulConfig {
schemaRegistryProps: SchemaRegistryProps;
eventBusProps: EventBusProps;
databaseProps: DatabaseProps;
securityGroupProps: SecurityGroupProps;
eventSourceProps?: EventSourceProps;
}

export class OrcaBusStatefulStack extends cdk.Stack {
readonly eventBus: EventBusConstruct;
readonly database: DatabaseConstruct;
readonly securityGroup: SecurityGroupConstruct;
readonly schemaRegistry: SchemaRegistryConstruct;
readonly eventSource?: EventSource;

constructor(scope: Construct, id: string, props: cdk.StackProps & OrcaBusStatefulConfig) {
super(scope, id, props);
Expand Down Expand Up @@ -46,5 +49,9 @@ export class OrcaBusStatefulStack extends cdk.Stack {
'SchemaRegistryConstruct',
props.schemaRegistryProps
);

if (props.eventSourceProps) {
this.eventSource = new EventSource(this, 'EventSourceConstruct', props.eventSourceProps);
}
}
}
1 change: 1 addition & 0 deletions lib/workload/orcabus-stateless-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export interface OrcaBusStatelessConfig {
lambdaRuntimePythonVersion: aws_lambda.Runtime;
bclConvertFunctionName: string;
rdsMasterSecretName: string;
eventSourceQueueName?: string;
}

export class OrcaBusStatelessStack extends cdk.Stack {
Expand Down
120 changes: 120 additions & 0 deletions lib/workload/stateful/event_source/component.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import { Construct } from 'constructs';
import { Rule } from 'aws-cdk-lib/aws-events';
import { Queue } from 'aws-cdk-lib/aws-sqs';
import { SqsQueue } from 'aws-cdk-lib/aws-events-targets';
import { Alarm, ComparisonOperator, MathExpression } from 'aws-cdk-lib/aws-cloudwatch';
import { ServicePrincipal } from 'aws-cdk-lib/aws-iam';

/**
* Properties for defining an S3 EventBridge rule.
*/
export type EventSourceRule = {
/**
* Bucket to receive events from. If not specified, captures events from all buckets.
*/
bucket?: string;
/**
* The types of events to capture for the bucket. If not specified, captures all events.
* This should be from the list S3 EventBridge events:
* https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventBridge.html
*/
eventTypes?: string[];
/**
* A prefix of the objects that are matched when receiving events from the buckets.
*/
prefix?: string;
};

/**
* Properties for the EventSource construct.
*/
export type EventSourceProps = {
/**
* The name of the queue to construct.
*/
queueName: string;
/**
* The maximum number of times a message can be unsuccessfully received before
* pushing it to the DLQ.
*/
maxReceiveCount: number;
/**
* A set of EventBridge rules to define..
*/
rules: EventSourceRule[];
};

/**
* A construct that defines an SQS S3 event source, along with a DLQ and CloudWatch alarms.
*/
export class EventSource extends Construct {
readonly queue: Queue;
readonly deadLetterQueue: Queue;
readonly alarm: Alarm;

constructor(scope: Construct, id: string, props: EventSourceProps) {
super(scope, id);

this.deadLetterQueue = new Queue(this, 'DeadLetterQueue');
this.queue = new Queue(this, 'Queue', {
queueName: props.queueName,
deadLetterQueue: {
maxReceiveCount: props.maxReceiveCount,
queue: this.deadLetterQueue,
},
});

for (const prop of props.rules) {
const rule = new Rule(scope, 'Rule', {
eventPattern: {
source: ['aws.s3'],
detailType: prop.eventTypes,
detail: {
...(prop.bucket && {
bucket: {
name: prop.bucket,
},
}),
...(prop.prefix && {
object: {
key: [
{
prefix: prop.prefix,
},
],
},
}),
},
},
});

rule.addTarget(new SqsQueue(this.queue));
}

this.queue.grantSendMessages(new ServicePrincipal('events.amazonaws.com'));

const rateOfMessages = new MathExpression({
expression: 'RATE(visible + notVisible)',
usingMetrics: {
visible: this.deadLetterQueue.metricApproximateNumberOfMessagesVisible(),
notVisible: this.deadLetterQueue.metricApproximateNumberOfMessagesVisible(),
},
});

this.alarm = new Alarm(this, 'Alarm', {
metric: rateOfMessages,
comparisonOperator: ComparisonOperator.GREATER_THAN_THRESHOLD,
threshold: 0,
evaluationPeriods: 1,
alarmName: 'Orcabus EventSource Alarm',
alarmDescription: 'An event has been received in the dead letter queue.',
});
}

/**
* Get the SQS queue ARN.
*/
get queueArn(): string {
return this.queue.queueArn;
}
}
121 changes: 121 additions & 0 deletions test/stateful/eventSourceConstruct.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import * as cdk from 'aws-cdk-lib';
import { Match, Template } from 'aws-cdk-lib/assertions';
import { EventSource } from '../../lib/workload/stateful/event_source/component';

let stack: cdk.Stack;

function assert_common(template: Template) {
template.resourceCountIs('AWS::SQS::Queue', 2);

template.hasResourceProperties('AWS::SQS::Queue', {
QueueName: 'queue',
RedrivePolicy: {
deadLetterTargetArn: Match.anyValue(),
maxReceiveCount: 100,
},
});

template.hasResourceProperties('AWS::CloudWatch::Alarm', {
ComparisonOperator: 'GreaterThanThreshold',
EvaluationPeriods: 1,
Threshold: 0,
});

template.hasResourceProperties('AWS::Events::Rule', {
EventPattern: {
source: ['aws.s3'],
detail: {
bucket: {
name: 'bucket',
},
},
},
});
}

beforeEach(() => {
stack = new cdk.Stack();
});

test('Test EventSource created props', () => {
new EventSource(stack, 'TestEventSourceConstruct', {
queueName: 'queue',
maxReceiveCount: 100,
rules: [
{
bucket: 'bucket',
},
],
});
const template = Template.fromStack(stack);

console.log(JSON.stringify(template, undefined, 2));

assert_common(template);
});

test('Test EventSource created props with event types', () => {
new EventSource(stack, 'TestEventSourceConstruct', {
queueName: 'queue',
maxReceiveCount: 100,
rules: [
{
bucket: 'bucket',
eventTypes: ['Object Created'],
},
],
});
const template = Template.fromStack(stack);

assert_common(template);
template.hasResourceProperties('AWS::Events::Rule', {
EventPattern: {
'detail-type': ['Object Created'],
},
});
});

test('Test EventSource created props with prefix', () => {
new EventSource(stack, 'TestEventSourceConstruct', {
queueName: 'queue',
maxReceiveCount: 100,
rules: [
{
bucket: 'bucket',
prefix: 'prefix',
},
],
});
const template = Template.fromStack(stack);

assert_common(template);
template.hasResourceProperties('AWS::Events::Rule', {
EventPattern: {
detail: {
object: {
key: [
{
prefix: 'prefix',
},
],
},
},
},
});
});

test('Test EventSource created props with rules matching any bucket', () => {
new EventSource(stack, 'TestEventSourceConstruct', {
queueName: 'queue',
maxReceiveCount: 100,
rules: [{}],
});
const template = Template.fromStack(stack);

template.hasResourceProperties('AWS::Events::Rule', {
EventPattern: {
source: ['aws.s3'],
detail: {},
},
});
});
Loading