Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pipes-targets): add SageMaker #30696

Merged
merged 10 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Pipe targets are the end point of an EventBridge Pipe. The following targets are
* `targets.EventBridgeTarget`: [Send event source to an EventBridge event bus](#amazon-eventbridge-event-bus)
* `targets.KinesisTarget`: [Send event source to a Kinesis data stream](#amazon-kinesis-data-stream)
* `targets.LambdaFunction`: [Send event source to a Lambda function](#aws-lambda-function)
* `targets.SageMakerTarget`: [Send event source to a SageMaker pipeline](#amazon-sagemaker-pipeline)
* `targets.SfnStateMachine`: [Invoke a Step Functions state machine from an event source](#aws-step-functions-state-machine)
* `targets.SqsTarget`: [Send event source to an SQS queue](#amazon-sqs)

Expand Down Expand Up @@ -217,6 +218,39 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
});
```

### Amazon SageMaker Pipeline

A SageMaker pipeline can be used as a target for a pipe.
The pipeline will receive the (enriched/filtered) source payload.

```ts
declare const sourceQueue: sqs.Queue;
declare const targetPipeline: sagemaker.IPipeline;

const pipelineTarget = new targets.SageMakerTarget(targetPipeline);

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SqsSource(sourceQueue),
target: pipelineTarget,
});
```

The input to the target pipeline can be transformed:

```ts
declare const sourceQueue: sqs.Queue;
declare const targetPipeline: sagemaker.IPipeline;

const pipelineTarget = new targets.SageMakerTarget(targetPipeline, {
inputTransformation: pipes.InputTransformation.fromObject({ body: "👀" }),
});

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SqsSource(sourceQueue),
target: pipelineTarget,
});
```

### AWS Step Functions State Machine

A Step Functions state machine can be used as a target for a pipe.
Expand Down
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ export * from './cloudwatch-logs';
export * from './event-bridge';
export * from './kinesis';
export * from './lambda';
export * from './sagemaker';
export * from './sqs';
export * from './stepfunctions';
2 changes: 1 addition & 1 deletion packages/@aws-cdk/aws-pipes-targets-alpha/lib/lambda.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export interface LambdaFunctionParameters {
* The input transformation to apply to the message before sending it to the target.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetparameters.html#cfn-pipes-pipe-pipetargetparameters-inputtemplate
* @default none
* @default - none
*/
readonly inputTransformation?: IInputTransformation;

Expand Down
66 changes: 66 additions & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/lib/sagemaker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { IInputTransformation, IPipe, ITarget, TargetConfig } from '@aws-cdk/aws-pipes-alpha';
import { IRole } from 'aws-cdk-lib/aws-iam';
import { IPipeline } from 'aws-cdk-lib/aws-sagemaker';

/**
* SageMaker target properties.
*/
export interface SageMakerTargetParameters {
/**
* The input transformation to apply to the message before sending it to the target.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetparameters.html#cfn-pipes-pipe-pipetargetparameters-inputtemplate
* @default - none
*/
readonly inputTransformation?: IInputTransformation;

/**
* List of parameter names and values for SageMaker Model Building Pipeline execution.
msambol marked this conversation as resolved.
Show resolved Hide resolved
*
* The Name/Value pairs are passed to start execution of the pipeline.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetsagemakerpipelineparameters.html#cfn-pipes-pipe-pipetargetsagemakerpipelineparameters-pipelineparameterlist
* @default - none
*/
readonly pipelineParameters?: Record<string, string>;
msambol marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* An EventBridge Pipes target that sends messages to a SageMaker pipeline.
*/
export class SageMakerTarget implements ITarget {
private pipeline: IPipeline;
private sageMakerParameters?: SageMakerTargetParameters;
private pipelineParameters?: Record<string, string>;
public readonly targetArn: string;

constructor(pipeline: IPipeline, parameters?: SageMakerTargetParameters) {
this.pipeline = pipeline;
this.targetArn = pipeline.pipelineArn;
this.sageMakerParameters = parameters;
this.pipelineParameters = this.sageMakerParameters?.pipelineParameters;
}

grantPush(grantee: IRole): void {
this.pipeline.grantStartPipelineExecution(grantee);
}

bind(pipe: IPipe): TargetConfig {
if (!this.sageMakerParameters) {
return { targetParameters: {} };
}

return {
targetParameters: {
inputTemplate: this.sageMakerParameters.inputTransformation?.bind(pipe).inputTemplate,
sageMakerPipelineParameters: {
pipelineParameterList: this.pipelineParameters ?
Object.entries(this.pipelineParameters).map(([key, value]) => ({
name: key,
value: value,
})) : undefined,
},
},
};
}
}
6 changes: 3 additions & 3 deletions packages/@aws-cdk/aws-pipes-targets-alpha/lib/sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export interface SqsTargetParameters {
* The input transformation to apply to the message before sending it to the target.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetparameters.html#cfn-pipes-pipe-pipetargetparameters-inputtemplate
* @default none
* @default - none
*/
readonly inputTransformation?: IInputTransformation;

Expand All @@ -20,15 +20,15 @@ export interface SqsTargetParameters {
* The token used for deduplication of sent messages.
*
* @see http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetsqsqueueparameters.html#cfn-pipes-pipe-pipetargetsqsqueueparameters-messagededuplicationid
* @default none
* @default - none
*/
readonly messageDeduplicationId?: string;

/**
* The FIFO message group ID to use as the target.
*
* @see http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetsqsqueueparameters.html#cfn-pipes-pipe-pipetargetsqsqueueparameters-messagegroupid
* @default none
* @default - none
*/
readonly messageGroupId?: string;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export interface SfnStateMachineParameters {
* The input transformation to apply to the message before sending it to the target.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetparameters.html#cfn-pipes-pipe-pipetargetparameters-inputtemplate
* @default none
* @default - none
*/
readonly inputTransformation?: IInputTransformation;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as cdk from 'aws-cdk-lib';
import * as events from 'aws-cdk-lib/aws-events';
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
import * as logs from 'aws-cdk-lib/aws-logs';
import * as sagemaker from 'aws-cdk-lib/aws-sagemaker';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as lambda from 'aws-cdk-lib/aws-lambda';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`SageMaker should grant pipe role push access 1`] = `
{
"MyPipeRoleCBC8E9AB": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "pipes.amazonaws.com",
},
},
],
"Version": "2012-10-17",
},
},
"Type": "AWS::IAM::Role",
},
}
`;

exports[`SageMaker should grant pipe role push access 2`] = `
{
"MyPipeRoleDefaultPolicy31387C20": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": "sagemaker:StartPipelineExecution",
"Effect": "Allow",
"Resource": "MyPipeline",
},
],
"Version": "2012-10-17",
},
"PolicyName": "MyPipeRoleDefaultPolicy31387C20",
"Roles": [
{
"Ref": "MyPipeRoleCBC8E9AB",
},
],
},
"Type": "AWS::IAM::Policy",
},
}
`;

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading