(stepfunctions): Add native support for DistributedMap #23216

Closed
2 tasks
ayush987goyal opened this issue Dec 3, 2022 · 16 comments · Fixed by #28821
Labels: @aws-cdk/aws-stepfunctions, effort/medium, feature-request, p1

Comments

@ayush987goyal
Contributor

Describe the feature

https://docs.aws.amazon.com/step-functions/latest/dg/concepts-orchestrate-large-scale-parallel-workloads.html

Currently there is a workaround for this, but we should have built-in support for properly defining ItemReader, ResultWriter, etc.

Use Case

Ability to use the new feature.

Proposed Solution

No response

Other Information

No response

Acknowledgements

  • I may be able to implement this feature request
  • This feature might incur a breaking change

CDK version used

2.53.0

Environment details (OS name and version, etc.)

mac

@ayush987goyal ayush987goyal added feature-request A feature should be added or improved. needs-triage This issue or PR still needs to be triaged. labels Dec 3, 2022
@github-actions github-actions bot added the @aws-cdk/aws-stepfunctions Related to AWS StepFunctions label Dec 3, 2022
@peterwoodworth peterwoodworth added p1 effort/medium Medium work item – several days of effort and removed needs-triage This issue or PR still needs to be triaged. labels Dec 5, 2022
@peterwoodworth
Contributor

Thanks for the feature request!

Currently there is a workaround for this

Could you share an example workaround please in case anyone else is interested?

@peterwoodworth peterwoodworth added p2 and removed p1 labels Dec 5, 2022
@ayush987goyal
Contributor Author

Here is an example:

const dummyMap = new Map(this, "DummyMap");
dummyMap.iterator(someIterationState);

const distributedMap = new CustomState(this, "DistributedMap", {
    stateJson: {
        Type: "Map",
        MaxConcurrency: 100,
        ItemReader: {
            Resource: "arn:aws:states:::s3:getObject",
            ReaderConfig: {
                InputType: "CSV",
                CSVHeaderLocation: "FIRST_ROW",
            },
            Parameters: {
                Bucket: "some-bucket-name",
                "Key.$": "$.my_s3_key",
            },
        },
        ItemSelector: {
            "item.$": "$$.Map.Item.Value",
        },
        ItemProcessor: {
            ...(dummyMap.toStateJson() as any).Iterator,
            ProcessorConfig: {
                Mode: "DISTRIBUTED",
                ExecutionType: "STANDARD",
            },
        },
        ResultWriter: {
            Resource: "arn:aws:states:::s3:putObject",
            Parameters: {
                Bucket: "some-bucket-name",
                Prefix: "process_output",
            },
        },
        ResultPath: "$.map_result",
    },
});
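
For readers unfamiliar with the ItemReader and ItemSelector fields used above, here is a minimal sketch of what they do to the input, written as plain TypeScript rather than CDK code. `readCsvItems` and `applyItemSelector` are hypothetical helpers, not AWS or CDK APIs, and the naive comma splitting ignores quoted fields. The point is only that with CSVHeaderLocation set to FIRST_ROW each row becomes an object keyed by the header names, and that `$$.Map.Item.Value` refers to that object.

```typescript
// Simplified model of an ItemReader with InputType "CSV" and
// CSVHeaderLocation "FIRST_ROW". Hypothetical helper, not an AWS API.
type CsvItem = Record<string, string>;

function readCsvItems(csv: string): CsvItem[] {
  // Drop empty lines so a trailing newline does not produce an empty item.
  const lines = csv.split(/\r?\n/).filter((line) => line.trim() !== "");
  if (lines.length === 0) {
    return [];
  }
  // The first row supplies the keys for every later row.
  const headers = (lines[0] ?? "").split(",").map((h) => h.trim());
  return lines.slice(1).map((line) => {
    const cells = line.split(",");
    const item: CsvItem = {};
    headers.forEach((header, i) => {
      item[header] = (cells[i] ?? "").trim();
    });
    return item;
  });
}

// Mirrors ItemSelector: { "item.$": "$$.Map.Item.Value" }.
// Each child execution receives its row wrapped under the "item" key.
function applyItemSelector(items: CsvItem[]): { item: CsvItem }[] {
  return items.map((value) => ({ item: value }));
}

const childInputs = applyItemSelector(readCsvItems("id,name\n1,alpha\n2,beta\n"));
console.log(JSON.stringify(childInputs, null, 2));
```

Note that every value read from the CSV stays a string in this sketch; any numeric conversion would have to happen inside the iterator states.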

@adeelamin15

@ayush987goyal Any idea when this will be resolved?

@AlessandroVol23

Thanks! I'm not sure why I'd need these three lines:

const dummyMap = new Map(this, "DummyMap");
dummyMap.iterator(someIterationState);
...(dummyMap.toStateJson() as any).Iterator,

I call a Lambda function within the Map task, but I can't assign the role to the custom state via CDK, so the state can't invoke the function. Do you have any hint on how to fix that?
Currently, I assign a role directly to the state machine that gets the definition, but that doesn't help with single Map runs.

Thanks! 😊

@CptTZ

CptTZ commented Jul 13, 2023

The idea of using a dummy map is great! Saved a lot of pain building a correct chain.

but fail to assign the role via CDK to the custom state

I built a StateGraph using that dummyMap, and updated the step function's role policy from the StateGraph's inferred policy statements. Worked perfectly for me.

CDK Code:

  private fixDistributedMapIamPermission() {
    const dummyMapGraph = new sfn.StateGraph(this.dummyMap, "dummyMapGraph");
    dummyMapGraph.policyStatements.map(p => this.stepFunction.addToRolePolicy(p));
    // https://docs.aws.amazon.com/step-functions/latest/dg/iam-policies-eg-dist-map.html#iam-policy-run-dist-map
    this.stepFunction.addToRolePolicy(new iam.PolicyStatement({
      effect: iam.Effect.ALLOW,
      actions: ["states:StartExecution", "states:DescribeExecution", "states:StopExecution"],
      resources: ["*"],
    }));
  }

@mdemarqu

mdemarqu commented Jul 28, 2023

I have been using the "dummy map trick" workaround from @ayush987goyal / @AlessandroVol23 for a while, and it worked fine until I added a Catch to my DistributedMap:

dummy_map.add_catch(error_chain, errors=["States.ALL"])
...
custom_state_json["Catch"] = dummy_map_json["Catch"]

During the CloudFormation deployment, I get this error:
"Invalid State Machine Definition: 'MISSING_TRANSITION_TARGET: Missing 'Next' target: <my error state> at /States/<my iterator state>/Catch[0]/Next'"

Indeed, the produced ASL is not correct: "my error state" is referenced in the Catch, but its definition is not present. This looks very similar to #25798.

Any idea how to fix this?

Nearly 8 months after the launch of distributed maps, it is surprising that CDK still does not support them. Any news regarding the roadmap for this feature?
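
The MISSING_TRANSITION_TARGET error above can be caught before deployment by checking that every transition in the generated ASL points at a state that is actually defined. The following is a minimal sketch under that assumption. `findMissingTargets` and the `Asl*` types are hypothetical names, not part of CDK or the Step Functions API, and the check only covers StartAt, Next, and Catch (not Choice rules or nested Map and Parallel states).

```typescript
// Minimal shapes for the parts of an ASL definition this check inspects.
interface AslCatcher {
  ErrorEquals: string[];
  Next: string;
}
interface AslState {
  Type: string;
  Next?: string;
  End?: boolean;
  Catch?: AslCatcher[];
}
interface AslGraph {
  StartAt: string;
  States: Record<string, AslState>;
}

// Returns one message per transition whose target is not defined in States.
function findMissingTargets(graph: AslGraph): string[] {
  const problems: string[] = [];
  const isDefined = (name: string): boolean =>
    Object.prototype.hasOwnProperty.call(graph.States, name);
  const check = (target: string | undefined, where: string): void => {
    if (target !== undefined && !isDefined(target)) {
      problems.push(`Missing target: ${target} at ${where}`);
    }
  };

  check(graph.StartAt, "/StartAt");
  for (const [name, state] of Object.entries(graph.States)) {
    check(state.Next, `/States/${name}/Next`);
    (state.Catch ?? []).forEach((catcher, i) =>
      check(catcher.Next, `/States/${name}/Catch[${i}]/Next`),
    );
  }
  return problems;
}

// The Catch refers to "HandleError", which is never defined in this graph.
const iteratorGraph: AslGraph = {
  StartAt: "Process",
  States: {
    Process: {
      Type: "Task",
      End: true,
      Catch: [{ ErrorEquals: ["States.ALL"], Next: "HandleError" }],
    },
  },
};
console.log(findMissingTargets(iteratorGraph));
```

Running such a check against the JSON passed to stateJson in a unit test would surface the problem at synth time rather than during the CloudFormation deployment.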

@T-Guerrero

I think a safer approach is to use Workflow Studio directly from AWS Console to design your distributed map and then copy the generated definition into stateJson.

during the Cloudformation deployment, I get this error:
"Invalid State Machine Definition: 'MISSING_TRANSITION_TARGET: Missing 'Next' target: <my error state> at /States/<my iterator state>/Catch[0]/Next'

E.g. in the Workflow Studio, the generated definition for a catch is:

...
"Catch": [
  {
    "ErrorEquals": [
      "States.ALL"
    ],
    "Next": "Fail",
    "ResultPath": "$.failedPayload"
  }
],
...

@olivier-schmitt-sonarsource

Hi,
Is there a plan to implement the native support of this key feature into the CDK?
Thanks.

@dmeehan1968

The 'dummy map' shown above seems to obfuscate what is going on.

ItemProcessor needs the properties StartAt and States. If you create states in the normal way for the CDK, you can reference their ids directly. You need to supply the JSON of every state in the States map:

const myFirstState = new Pass(scope, 'First')
const mySecondState = new Pass(scope, 'Second')
myFirstState.next(mySecondState)

new CustomState(scope, 'Map', {
  stateJson: {
    Type: 'Map',
    ItemProcessor: {
      ProcessorConfig: {
        Mode: 'DISTRIBUTED',
        ExecutionType: 'EXPRESS',
      },
      // ASL field names are case-sensitive: StartAt and States
      StartAt: myFirstState.id,
      States: {
        // stateJson must contain plain JSON, so render each state explicitly
        [myFirstState.id]: myFirstState.toStateJson(),
        [mySecondState.id]: mySecondState.toStateJson(),
      },
      // ...
    },
  },
})

What the StateGraph does is build the same StartAt and States, but it also resolves the chain of states into a single States map and collects the permissions associated with Tasks such as DynamoDB and Lambda.

This makes it much easier to create the ItemProcessor, as we can provide just the start state and spread the graph into the ItemProcessor:

const myFirstState = new Pass(scope, 'First')
const mySecondState = new Pass(scope, 'Second')
myFirstState.next(mySecondState)
// The graph walks the chain starting at the first state
const graph = new StateGraph(myFirstState, 'Graph')

new CustomState(scope, 'Map', {
  stateJson: {
    Type: 'Map',
    ItemProcessor: {
      ProcessorConfig: {
        Mode: 'DISTRIBUTED',
        ExecutionType: 'EXPRESS',
      },
      // Spreads StartAt and States into the ItemProcessor
      ...graph.toGraphJson(),
      // ...
    },
  },
})
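
To make the role of the graph more concrete, here is a minimal sketch of the resolution step in plain TypeScript. `SimpleState` and `toGraphJsonSketch` are hypothetical stand-ins, not CDK types. The real StateGraph also follows Choice branches and Catch handlers and collects policy statements, none of which is modeled here.

```typescript
// Hypothetical, simplified stand-in for a CDK state:
// an id, a JSON body, and an optional successor.
interface SimpleState {
  id: string;
  body: Record<string, unknown>;
  next?: SimpleState;
}

interface GraphJson {
  StartAt: string;
  States: Record<string, unknown>;
}

// Walks the chain from the start state and flattens it into one States map,
// adding Next or End to each state as it goes.
function toGraphJsonSketch(start: SimpleState): GraphJson {
  const states: Record<string, unknown> = {};
  let current: SimpleState | undefined = start;
  // The second condition stops the walk if the chain loops back on itself.
  while (current !== undefined && !(current.id in states)) {
    states[current.id] = current.next
      ? { ...current.body, Next: current.next.id }
      : { ...current.body, End: true };
    current = current.next;
  }
  return { StartAt: start.id, States: states };
}

const secondState: SimpleState = { id: "Second", body: { Type: "Pass" } };
const firstState: SimpleState = { id: "First", body: { Type: "Pass" }, next: secondState };
console.log(JSON.stringify(toGraphJsonSketch(firstState), null, 2));
```

Only the start state has to be passed in; every state reachable from it ends up in the States map, which is why spreading the graph JSON into ItemProcessor is enough.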

Additionally, as noted by @CptTZ, you need to grant the states:StartExecution, states:DescribeExecution, and states:StopExecution permissions to the state machine so that it can start the child executions of the distributed map. The example given grants access to all state machines (*), contrary to the principle of least privilege; you should set the resource to the state machine ARN instead.

Unfortunately, you can't add the state machine ARN to its default role policy: that creates a circular reference and CDK synth fails. You can work around it by adding the PolicyStatement to a new Policy and then attaching that policy to the state machine role:

this.graph.policyStatements.forEach(s => stateMachine.addToRolePolicy(s))

this.policy.addStatements(new PolicyStatement({
    effect: Effect.ALLOW,
    actions: [
        'states:StartExecution',
        'states:DescribeExecution',
        'states:StopExecution',
    ],
    resources: [stateMachine.stateMachineArn],
}))

this.policy.attachToRole(stateMachine.role)

I ended up wrapping this into a (crude for my initial needs) DistributedMap construct:

import { CustomState, JsonPath, State, StateGraph, StateMachine } from "aws-cdk-lib/aws-stepfunctions"
import { Construct } from "constructs"
import { Effect, Policy, PolicyStatement } from "aws-cdk-lib/aws-iam"

export interface DistributedMapProps {
    iterator: State
    itemsPath?: string
    resultPath?: JsonPath
    parameters?: { [key: string]: any }
    maxConcurrency?: number
    label?: string
    executionType: 'EXPRESS' | 'STANDARD'
}

export class DistributedMap extends CustomState {
    graph: StateGraph
    policy: Policy

    constructor(scope: Construct, id: string, props: DistributedMapProps) {

        const graph = new StateGraph(props.iterator, 'Graph')

        super(scope, id, {
            stateJson: {
                Type: 'Map',
                ItemProcessor: {
                    ProcessorConfig: {
                        Mode: 'DISTRIBUTED',
                        ExecutionType: props.executionType,
                    },
                    ...graph.toGraphJson(),
                },
                ItemsPath: props.itemsPath,
                ResultPath: props.resultPath,
                Parameters: props.parameters,
                MaxConcurrency: props.maxConcurrency,
                Label: props.label,
            },
        })

        this.graph = graph

        this.policy = new Policy(this, 'MapPolicy')
    }

    grantNestedPermissions(stateMachine: StateMachine) {
        // this grants the autogenerated permissions in the distributed maps substates to the state machine
        // (e.g) dynamodb, lambda invoke, etc
        this.graph.policyStatements.forEach(s => stateMachine.addToRolePolicy(s))

        // this grants the state machine permission to start, stop, and describe the map's executions
        // NB: we can't add the statement directly to the state machine or it creates a circular
        // reference, which the CDK objects to. By using a separate policy we get around this limitation.

        this.policy.addStatements(new PolicyStatement({
            effect: Effect.ALLOW,
            actions: [
                'states:StartExecution',
                'states:DescribeExecution',
                'states:StopExecution',
            ],
            resources: [stateMachine.stateMachineArn],
        }))

        this.policy.attachToRole(stateMachine.role)
    }
}

@gael-ft

gael-ft commented Nov 27, 2023

If you need to put a DistributedMap inside a StateMachineFragment and still support fragment.prefixStates() (to keep IDs unique within your fragment), the solution above needs a small update.
Here is my version:

Note: my use case is an S3 reader and an S3 writer for the DistributedMap, so the construct props are built that way, but the logic should be the same in other cases.

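import { Chain, IChainable, INextable, JsonPath, State, StateGraph, StateMachine } from 'aws-cdk-lib/aws-stepfunctions';
import { Policy } from 'aws-cdk-lib/aws-iam';
import { Construct } from 'constructs';

export interface S3JsonDistributedMapProps {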
  readonly iterator: State;
  readonly executionType: 'EXPRESS' | 'STANDARD';
  readonly itemReader: {
    readonly Bucket?: JsonPath | string;
    readonly 'Bucket.$'?: string;
    readonly Key?: JsonPath | string;
    readonly 'Key.$'?: string;
  };
  readonly resultWriter: {
    readonly Bucket?: JsonPath | string;
    readonly 'Bucket.$'?: string;
    readonly Prefix?: JsonPath | string;
    readonly 'Prefix.$'?: string;
  };
  readonly itemSelector?: Readonly<Record<string, JsonPath | string>>;
  readonly maxItemPerBatch?: number;
  readonly maxConcurrency?: number;

  readonly resultPath?: JsonPath | string;
  readonly label?: string;
}

export class S3JsonDistributedMap extends State implements IChainable, INextable {
  public readonly endStates: INextable[];
  private readonly props: S3JsonDistributedMapProps;

  private readonly graph: StateGraph;
  private readonly policy: Policy;

  constructor(scope: Construct, id: string, props: S3JsonDistributedMapProps) {
    super(scope, id, {});
    this.props = props;
    this.graph = new StateGraph(props.iterator, `Map ${this.stateId} Iterator`);
    this.policy = new Policy(this, 'IamRole');

    this.endStates = [this];
  }

  grantNestedPermissions(stateMachine: StateMachine) {
    // roughly the same
  }

  protected makeNext(next: State) {
    super.makeNext(next);
    next.bindToGraph(this.graph);
  }

  public next(next: IChainable): Chain {
    super.makeNext(next.startState);
    return Chain.sequence(this, next);
  }

  override toStateJson(): object {
    const stateJson = {
      Type: 'Map',
      ItemProcessor: {
        ProcessorConfig: {
          Mode: 'DISTRIBUTED',
          ExecutionType: this.props.executionType,
        },
        ...this.graph.toGraphJson(),
      },
      ItemReader: {
        Resource: 'arn:aws:states:::s3:getObject',
        ReaderConfig: {
          InputType: 'JSON',
        },
        Parameters: this.props.itemReader,
      },
      ItemSelector: this.props.itemSelector,
      ItemBatcher: this.props.maxItemPerBatch ? { MaxItemsPerBatch: this.props.maxItemPerBatch } : undefined,
      MaxConcurrency: this.props.maxConcurrency,
      Label: this.props.label,
      ResultWriter: {
        Resource: "arn:aws:states:::s3:putObject",
        Parameters: this.props.resultWriter,
      },
      ResultPath: this.props.resultPath,
    };

    return {
      ...this.renderNextEnd(),
      ...stateJson,
    };
  }
}
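
The comment does not say why the earlier construct breaks with prefixStates(). One plausible reading, and it is only an assumption, is that a CustomState captures its JSON in the constructor, so state ids that are prefixed afterwards never reach it, whereas overriding toStateJson() defers rendering until the ids are final. The sketch below illustrates that difference with hypothetical miniature classes (`MiniState`, `EagerMap`, `LazyMap`) that are not CDK types.

```typescript
// Hypothetical miniature of a state whose id can be prefixed after construction.
class MiniState {
  private prefix = "";
  private readonly baseId: string;

  constructor(baseId: string) {
    this.baseId = baseId;
  }

  addPrefix(prefix: string): void {
    this.prefix = prefix + this.prefix;
  }

  get stateId(): string {
    return this.prefix + this.baseId;
  }
}

// Eager: the JSON is captured in the constructor,
// comparable to passing a finished stateJson object.
class EagerMap {
  private readonly json: { StartAt: string };

  constructor(start: MiniState) {
    this.json = { StartAt: start.stateId };
  }

  toStateJson(): { StartAt: string } {
    return this.json;
  }
}

// Lazy: the JSON is built on demand,
// comparable to overriding toStateJson().
class LazyMap {
  private readonly start: MiniState;

  constructor(start: MiniState) {
    this.start = start;
  }

  toStateJson(): { StartAt: string } {
    return { StartAt: this.start.stateId };
  }
}

const work = new MiniState("Work");
const eagerMap = new EagerMap(work);
const lazyMap = new LazyMap(work);
work.addPrefix("Fragment: "); // applied after both maps were constructed

console.log(eagerMap.toStateJson().StartAt); // still the id from construction time
console.log(lazyMap.toStateJson().StartAt); // reflects the prefix
```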

@MorielTurjeman
Copy link

Is there any hope to implement this feature into CDK?

@polamjag
Contributor

polamjag commented Jan 4, 2024

It seems that #27913 (#27878) adds some support of distributed map 👀

https://github.com/aws/aws-cdk/pull/27913/files#diff-d03ddb7ec6247b274dd8e9588abe929db2414cc3c44f02dbfe61d7674ae4f3deR524-R544

https://github.com/aws/aws-cdk/releases/tag/v2.116.0

@kmkhr

kmkhr commented Jan 9, 2024

Is it possible to set the ToleratedFailurePercentage and ToleratedFailurePercentagePath in ItemProcessor?

@dominhhai

It seems that #27913 (#27878) adds some support of distributed map 👀

https://github.com/aws/aws-cdk/pull/27913/files#diff-d03ddb7ec6247b274dd8e9588abe929db2414cc3c44f02dbfe61d7674ae4f3deR524-R544

https://github.com/aws/aws-cdk/releases/tag/v2.116.0

ItemReader, ItemBatcher, ResultWriter, and Label are not supported yet.

@dominhhai

dominhhai commented Jan 12, 2024

I extended the Map class to add the missing properties.

import {
  IChainable,
  JsonPath,
  ProcessorConfig,
  ProcessorMode,
  Map as SfnMap,
} from 'aws-cdk-lib/aws-stepfunctions'

export type DistributedMapS3Parameter =
  | {
      readonly 'Bucket.$': string
      readonly 'Key.$': string
    }
  | {
      readonly Bucket: JsonPath | string
      readonly Key: JsonPath | string
    }

export interface DistributedMapItemReader {
  readonly Resource:
    | 'arn:aws:states:::s3:getObject'
    | 'arn:aws:states:::s3:listObjectsV2'
  readonly ReaderConfig: {
    readonly InputType: 'CSV' | 'JSON' | 'MANIFEST'
    readonly CSVHeaderLocation?: 'FIRST_ROW' | 'GIVEN'
    readonly CSVHeaders?: string[]
    readonly MaxItems?: number
  }
  readonly Parameters: DistributedMapS3Parameter
}

export interface DistributedMapResultWriter {
  readonly Resource: 'arn:aws:states:::s3:putObject'
  readonly Parameters: DistributedMapS3Parameter
}

export interface DistributedMapItemBatcher {
  readonly MaxItemsPerBatch?: number
  readonly MaxItemsPerBatchPath?: string
  readonly MaxInputBytesPerBatch?: number
  readonly MaxInputBytesPerBatchPath?: string
  readonly BatchInput?: Readonly<Record<string, JsonPath | string>>
}

export class DistributedMap extends SfnMap {
  private itemReader?: DistributedMapItemReader
  private resultWriter?: DistributedMapResultWriter
  private itemBatcher?: DistributedMapItemBatcher
  private itemSelector?: Readonly<Record<string, JsonPath | string>>
  private label?: string

  public override toStateJson(): object {
    const mapStateJson = super.toStateJson()
    return {
      ...mapStateJson,
      ItemReader: this.itemReader,
      ResultWriter: this.resultWriter,
      ItemBatcher: this.itemBatcher,
      ItemSelector: this.itemSelector,
      Label: this.label,
    }
  }

  public override itemProcessor(
    processor: IChainable,
    config: ProcessorConfig = {},
  ): DistributedMap {
    super.itemProcessor(processor, {
      ...config,
      mode: ProcessorMode.DISTRIBUTED,
    })
    return this
  }

  public setLabel(label: string): DistributedMap {
    this.label = label
    return this
  }

  public setItemSelector(
    itemSelector: Readonly<Record<string, JsonPath | string>>,
  ): DistributedMap {
    this.itemSelector = itemSelector
    return this
  }

  public setItemBatcher(
    itemBatcher: DistributedMapItemBatcher,
  ): DistributedMap {
    this.itemBatcher = itemBatcher
    return this
  }

  public setResultWriter(
    resultWriter: DistributedMapResultWriter,
  ): DistributedMap {
    this.resultWriter = resultWriter
    return this
  }

  public setItemReader(itemReader: DistributedMapItemReader): DistributedMap {
    this.itemReader = itemReader
    return this
  }
}
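
Because the setters above accept the reader configuration as a plain object, mistakes only show up at deployment. A small check can catch some of them earlier. The following is a sketch, not part of CDK: `validateReaderConfig` and `ReaderConfigSketch` are hypothetical names, and the rules encode my reading of the Step Functions documentation (CSV input needs a CSVHeaderLocation, GIVEN needs CSVHeaders), so treat them as assumptions to verify.

```typescript
// Mirrors the ReaderConfig shape used in DistributedMapItemReader above.
interface ReaderConfigSketch {
  InputType: "CSV" | "JSON" | "MANIFEST";
  CSVHeaderLocation?: "FIRST_ROW" | "GIVEN";
  CSVHeaders?: string[];
  MaxItems?: number;
}

// Returns a list of human-readable problems; an empty list means no rule fired.
function validateReaderConfig(config: ReaderConfigSketch): string[] {
  const errors: string[] = [];

  // Assumption: CSV input must say where its headers come from.
  if (config.InputType === "CSV" && config.CSVHeaderLocation === undefined) {
    errors.push("CSVHeaderLocation is required when InputType is CSV");
  }
  // Assumption: GIVEN means the headers are supplied in CSVHeaders.
  if (config.CSVHeaderLocation === "GIVEN" && (config.CSVHeaders ?? []).length === 0) {
    errors.push("CSVHeaders must be provided when CSVHeaderLocation is GIVEN");
  }
  // Assumption: header settings are meaningless for non-CSV input.
  if (config.InputType !== "CSV" && config.CSVHeaderLocation !== undefined) {
    errors.push("CSVHeaderLocation only applies to CSV input");
  }
  if (
    config.MaxItems !== undefined &&
    (!Number.isInteger(config.MaxItems) || config.MaxItems <= 0)
  ) {
    errors.push("MaxItems must be a positive integer");
  }
  return errors;
}

console.log(validateReaderConfig({ InputType: "CSV", CSVHeaderLocation: "GIVEN" }));
```

Such a check could be called from setItemReader() before the value is stored, turning a failed deployment into a synth-time error.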

@mergify mergify bot closed this as completed in #28821 Feb 9, 2024
mergify bot pushed a commit that referenced this issue Feb 9, 2024
Adds support for Step Functions Map state in Distributed mode. Currently, in order to create a Distributed Map in CDK, CDK users have to define a Custom State containing their Amazon States Language definition. 

This solution consists of the creation of a new L2 construct, `DistributedMap`. This design decision was made due to the fact that some fields are exclusive to Distributed Maps, such as `ItemReader`. Adding support for it through the existing `Map` L2 construct would lead to some fields being conditionally available.

Some design decisions that were made:
- I created an abstract class `MapBase` that encapsulates all fields currently supported by both `inline` and `distributed` maps. This includes all currently supported fields in the CDK except for `iterator` and `parameters` (deprecated fields). Those are now part of the Map subclass which extends `MapBase`. All new Distributed Maps fields are part of the new `DistributedMap` construct (also a subclass of `MapBase`)
- Permissions specific to Distributed Maps are added as part of this new construct
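
The split described in the list above can be pictured with a small model. This is a simplified sketch of the idea only, not the actual CDK classes: `MapBaseSketch`, `InlineMapSketch`, `DistributedMapSketch`, and their option types are hypothetical names, and the real constructs carry many more fields.

```typescript
// Fields shared by both kinds of map live in the base options.
interface MapBaseOptions {
  maxConcurrency?: number;
  itemsPath?: string;
}

// Fields that only make sense in distributed mode live in the subclass options.
interface DistributedMapOptions extends MapBaseOptions {
  label?: string;
  itemReader?: Record<string, unknown>;
}

abstract class MapBaseSketch {
  protected readonly options: MapBaseOptions;

  constructor(options: MapBaseOptions) {
    this.options = options;
  }

  protected abstract processorMode(): "INLINE" | "DISTRIBUTED";

  // Subclasses add their exclusive fields here.
  protected extraFields(): Record<string, unknown> {
    return {};
  }

  toStateJson(): Record<string, unknown> {
    return {
      Type: "Map",
      MaxConcurrency: this.options.maxConcurrency,
      ItemsPath: this.options.itemsPath,
      ItemProcessor: { ProcessorConfig: { Mode: this.processorMode() } },
      ...this.extraFields(),
    };
  }
}

class InlineMapSketch extends MapBaseSketch {
  protected override processorMode(): "INLINE" | "DISTRIBUTED" {
    return "INLINE";
  }
}

class DistributedMapSketch extends MapBaseSketch {
  private readonly distributedOptions: DistributedMapOptions;

  constructor(options: DistributedMapOptions) {
    super(options);
    this.distributedOptions = options;
  }

  protected override processorMode(): "INLINE" | "DISTRIBUTED" {
    return "DISTRIBUTED";
  }

  protected override extraFields(): Record<string, unknown> {
    return {
      ItemReader: this.distributedOptions.itemReader,
      Label: this.distributedOptions.label,
    };
  }
}

const inlineMap = new InlineMapSketch({ maxConcurrency: 2 });
const distributedMapSketch = new DistributedMapSketch({
  maxConcurrency: 100,
  label: "csv-import",
  itemReader: { Resource: "arn:aws:states:::s3:getObject" },
});
console.log(JSON.stringify(inlineMap.toStateJson()));
console.log(JSON.stringify(distributedMapSketch.toStateJson()));
```

With this shape, a caller cannot set an ItemReader on an inline map at all, which avoids the conditionally available fields that a single combined construct would need.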

Thanks to @beck3905 and their PR #24331 for inspiration. A lot of the ideas here are re-used from the PR cited.

Closes #23216 

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*

github-actions bot commented Feb 9, 2024

⚠️COMMENT VISIBILITY WARNING⚠️

Comments on closed issues are hard for our team to see.
If you need more assistance, please either tag a team member or open a new issue that references this one.
If you wish to keep having a conversation with other community members under this issue feel free to do so.

TheRealAmazonKendra pushed a commit that referenced this issue Feb 9, 2024