Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pipes-targets): add step functions state machine target #29987

Merged
merged 1 commit into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 60 additions & 1 deletion packages/@aws-cdk/aws-pipes-targets-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ For more details see the service documentation:

Pipe targets are the end point of a EventBridge Pipe.

The following targets are supported:

1. `targets.SqsTarget`: [Send event source to a Queue](#amazon-sqs)
2. `targets.SfnStateMachine`: [Invoke a State Machine from an event source](#aws-step-functions)

### Amazon SQS

A SQS message queue can be used as a target for a pipe. Messages will be pushed to the queue.
Expand All @@ -43,7 +48,7 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
});
```

The target configuration can be transformed:
The target input can be transformed:

```ts
declare const sourceQueue: sqs.Queue;
Expand All @@ -63,3 +68,57 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
target: pipeTarget
});
```

### AWS Step Functions State Machine

A State Machine can be used as a target for a pipe. The State Machine will be invoked with the (enriched/filtered) source payload.

```ts
declare const sourceQueue: sqs.Queue;
declare const targetStateMachine: sfn.IStateMachine;

const pipeTarget = new targets.SfnStateMachine(targetStateMachine,{});

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SomeSource(sourceQueue),
target: pipeTarget
});
```

Specifying the Invocation Type when the target State Machine is invoked:

```ts
declare const sourceQueue: sqs.Queue;
declare const targetStateMachine: sfn.IStateMachine;

const pipeTarget = new targets.SfnStateMachine(targetStateMachine,
{
invocationType: targets.StateMachineInvocationType.FIRE_AND_FORGET,
}
);


const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SomeSource(sourceQueue),
target: pipeTarget
});
```

The input to the target State Machine can be transformed:

```ts
declare const sourceQueue: sqs.Queue;
declare const targetStateMachine: sfn.IStateMachine;

const pipeTarget = new targets.SfnStateMachine(targetStateMachine,
{
inputTransformation: pipes.InputTransformation.fromObject({ body: '<$.body>' }),
invocationType: targets.StateMachineInvocationType.FIRE_AND_FORGET,
}
);

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SomeSource(sourceQueue),
target: pipeTarget
});
```
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from './sqs';
export * from './stepfunctions';
84 changes: 84 additions & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/lib/stepfunctions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import { IInputTransformation, IPipe, ITarget, TargetConfig } from '@aws-cdk/aws-pipes-alpha';
import { IRole } from 'aws-cdk-lib/aws-iam';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import { StateMachine, StateMachineType } from 'aws-cdk-lib/aws-stepfunctions';

/**
* Parameters for the SfnStateMachine target
*/
export interface SfnStateMachineParameters {
/**
* The input transformation to apply to the message before sending it to the target.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetparameters.html#cfn-pipes-pipe-pipetargetparameters-inputtemplate
* @default none
*/
readonly inputTransformation?: IInputTransformation;

/**
* Specify whether to invoke the State Machine synchronously (`REQUEST_RESPONSE`) or asynchronously (`FIRE_AND_FORGET`).
*
* @see http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetsqsqueueparameters.html#cfn-pipes-pipe-pipetargetsqsqueueparameters-messagededuplicationid
mrgrain marked this conversation as resolved.
Show resolved Hide resolved
* @default StateMachineInvocationType.FIRE_AND_FORGET
*/
readonly invocationType?: StateMachineInvocationType;
}

/**
* InvocationType for invoking the State Machine.
* @see https://docs.aws.amazon.com/eventbridge/latest/pipes-reference/API_PipeTargetStateMachineParameters.html
*/
export enum StateMachineInvocationType {
/**
* Invoke StepFunction asynchronously (`StartExecution`). See https://docs.aws.amazon.com/step-functions/latest/apireference/API_StartExecution.html for more details.
*/
FIRE_AND_FORGET = 'FIRE_AND_FORGET',

/**
* Invoke StepFunction synchronously (`StartSyncExecution`) and wait for the execution to complete. See https://docs.aws.amazon.com/step-functions/latest/apireference/API_StartSyncExecution.html for more details.
*/
REQUEST_RESPONSE = 'REQUEST_RESPONSE',
}

/**
* An EventBridge Pipes target that sends messages to an AWS Step Functions State Machine.
*/
export class SfnStateMachine implements ITarget {
public readonly targetArn: string;

private readonly stateMachine: sfn.IStateMachine;
private readonly invocationType: StateMachineInvocationType;
private readonly inputTemplate?: IInputTransformation;

constructor(stateMachine: sfn.IStateMachine, parameters: SfnStateMachineParameters) {
this.stateMachine = stateMachine;
this.targetArn = stateMachine.stateMachineArn;
this.invocationType = parameters.invocationType?? StateMachineInvocationType.FIRE_AND_FORGET;
this.inputTemplate = parameters.inputTransformation;

if (this.stateMachine instanceof StateMachine
&& this.stateMachine.stateMachineType === StateMachineType.STANDARD
&& this.invocationType === StateMachineInvocationType.REQUEST_RESPONSE) {
throw new Error('STANDARD state machine workflows do not support the REQUEST_RESPONSE invocation type. Use FIRE_AND_FORGET instead.');
}
}

grantPush(grantee: IRole): void {
if (this.invocationType === StateMachineInvocationType.FIRE_AND_FORGET) {
this.stateMachine.grantStartExecution(grantee);
} else {
this.stateMachine.grantStartSyncExecution(grantee);
}
}

bind(pipe: IPipe): TargetConfig {
return {
targetParameters: {
inputTemplate: this.inputTemplate?.bind(pipe).inputTemplate,
stepFunctionStateMachineParameters: {
invocationType: this.invocationType,
},
},
};
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Fixture with packages imported, but nothing else
import * as cdk from 'aws-cdk-lib';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import { Construct } from 'constructs';
import * as pipes from '@aws-cdk/aws-pipes-alpha';
import * as targets from '@aws-cdk/aws-pipes-targets-alpha';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`step-function should grant pipe role push access (StartAsyncExecution) with default invocation type (FIRE_AND_FORGET) 1`] = `
{
"MySfnPipeRoleF1D0F697": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "pipes.amazonaws.com",
},
},
],
"Version": "2012-10-17",
},
},
"Type": "AWS::IAM::Role",
},
"MyStateMachineRoleD59FFEBC": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": {
"Fn::FindInMap": [
"ServiceprincipalMap",
{
"Ref": "AWS::Region",
},
"states",
],
},
},
},
],
"Version": "2012-10-17",
},
},
"Type": "AWS::IAM::Role",
},
}
`;

exports[`step-function should grant pipe role push access (StartAsyncExecution) with invocation type FIRE_AND_FORGET 1`] = `
{
"MySfnPipeRoleF1D0F697": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "pipes.amazonaws.com",
},
},
],
"Version": "2012-10-17",
},
},
"Type": "AWS::IAM::Role",
},
"MyStateMachineRoleD59FFEBC": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": {
"Fn::FindInMap": [
"ServiceprincipalMap",
{
"Ref": "AWS::Region",
},
"states",
],
},
},
},
],
"Version": "2012-10-17",
},
},
"Type": "AWS::IAM::Role",
},
}
`;

exports[`step-function should grant pipe role push access (StartSyncExecution) with invocation type REQUEST-RESPONSE 1`] = `
{
"MySfnPipeRoleF1D0F697": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "pipes.amazonaws.com",
},
},
],
"Version": "2012-10-17",
},
},
"Type": "AWS::IAM::Role",
},
"MyStateMachineRoleD59FFEBC": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": {
"Fn::FindInMap": [
"ServiceprincipalMap",
{
"Ref": "AWS::Region",
},
"states",
],
},
},
},
],
"Version": "2012-10-17",
},
},
"Type": "AWS::IAM::Role",
},
}
`;
Loading
Loading