State machine and handlers working as expected
1 parent 05698b8, commit cf1dbc2
Showing 4 changed files with 245 additions and 21 deletions.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,40 +1,64 @@ | ||
=================================== | ||
Serverless Stepfunctions Callback | ||
=================================== | ||
==================================== | ||
Serverless Step Functions Callback | ||
==================================== | ||
|
||
This is a demo of how you can use the "callback" pattern to restart a | ||
StepFunctions statemachine from within a Lambda function. It took me a | ||
while to dig through the AWS docs, sample code, and examples to unlock | ||
the mysteries, so I hope it saves you some time. | ||
`Step Functions <https://aws.amazon.com/step-functions/>`_ | ||
statemachine from within a Lambda function. It took me a while to dig | ||
through the AWS docs, sample code, and examples to unlock the | ||
mysteries, so I hope it saves you some time. | ||
|
||
It is inspired by Ross Rhodes' tweet on the same topic: | ||
|
||
https://twitter.com/trrhodes/status/1160958680537489408 | ||
|
||
He used the AWS Cloud Development Kit and SQS, but I'll be using the | ||
It is inspired by `Ross Rhodes' tweet | ||
<https://twitter.com/trrhodes/status/1160958680537489408>`_ on the | ||
same topic. He used the AWS Cloud Development Kit and SQS, but I'll be using the | ||
Serverless Framework with direct Lambda calls because it's a pattern | ||
that comes up repeatedly in our use cases. | ||
|
||
Ben Kehoe has an excellent AWS Blog post on the same topic, and he's | ||
using SNS Email for human approvals: | ||
|
||
https://aws.amazon.com/blogs/aws/using-callback-urls-for-approval-emails-with-aws-step-functions/ | ||
`Ben Kehoe wrote an excellent AWS Blog post | ||
<https://aws.amazon.com/blogs/aws/using-callback-urls-for-approval-emails-with-aws-step-functions/>`_ | ||
on the same topic; he's using SNS Email for human approvals. | ||
|
||
SNS is also not exactly aligned with our current use cases, but | ||
SQS- and SNS-driven restarts are both likely something we'll need at | ||
some point. | ||
|
||
Our Real Use Case | ||
================= | ||
|
||
Our application takes a document and uses a Lambda to split it up into | ||
chunks which are dropped onto S3; each of those chunk CreateObject | ||
chunks which are dropped onto S3. Each of those chunks' S3 ``CreateObject`` | ||
events trigger a Lambda to process the chunk, so all the chunks get | ||
processed in parallel. Some chunks take longer than others, so once | ||
we determine that all the chunks are done, we want to restart our | ||
state machine. We do this by calling StepFunction API directly, | ||
state machine. We do this by calling the Step Functions API directly, | ||
indicating success. | ||
|
||
Demo Implementation | ||
=================== | ||
|
||
This demo code skips the complexity of our real app, allowing us to | ||
focus on the statemachine stop and restart. We add to it a chance | ||
that the processing function may fail, so we signal the failure. Our | ||
statemachine has a handler for this failure, so it can do different | ||
things on success and failure. | ||
focus on the statemachine stop and restart. We'll use a random chance | ||
to decide when we're done, and add to it a chance that the processing | ||
function fails, so we can signal the failure. Our statemachine has a | ||
handler for this, so it can do different things on success and | ||
failure. | ||
|
||
Our preferred backend language is Python, so that's what we'll use for | ||
our Lambda handler. Translating to Node or some other Lambda language | ||
should be trivial: just map the few API calls we make to your Step | ||
Functions SDK. | ||
|
||
We've been using the `Serverless Framework <https://serverless.com/>`_ | ||
for a while for our commercial and government projects and really like | ||
it: it's a pleasure to use and makes all the boring stuff go away. It | ||
takes care of the infrastructure so we don't need to do our own | ||
CloudFormation, nor its shiny new cousin, Cloud Development Kit. | ||
Under the covers, Serverless does CloudFormation for us, and that's | ||
just where it should be -- under the covers, so we can inspect it if | ||
we need to, and ignore it most of the time. | ||
|
||
Takahiro Horike's `Step Function plugin | ||
<https://github.com/horike37/serverless-step-functions>`_ for the | ||
Serverless Framework makes it a breeze to describe state machines | ||
directly in our ``serverless.yml`` file. | ||
|
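The README's "Our Real Use Case" section says the state machine is restarted only "once we determine that all the chunks are done", but the demo replaces that check with a random roll. A minimal sketch of what such a check might look like follows. It assumes a hypothetical naming convention (chunks under ``chunks/``, results under ``results/``) that does not come from the original project, and it works on plain lists of keys so it runs without AWS access.

```python
def all_chunks_done(chunk_keys, result_keys,
                    chunk_prefix='chunks/', result_prefix='results/'):
    """Return True once every chunk key has a result key with the same name.

    The two prefixes are hypothetical conventions chosen for illustration.
    """
    chunks = {key[len(chunk_prefix):] for key in chunk_keys
              if key.startswith(chunk_prefix)}
    results = {key[len(result_prefix):] for key in result_keys
               if key.startswith(result_prefix)}
    # An empty chunk set means nothing has been split yet, so do not resume.
    return bool(chunks) and chunks <= results


# Only when this returns True would the handler send the success callback.
still_waiting = all_chunks_done(['chunks/1', 'chunks/2'], ['results/1'])
ready = all_chunks_done(['chunks/1', 'chunks/2'], ['results/2', 'results/1'])
```

In a real handler the two key lists would probably come from listing the bucket or from a counter in a datastore; which of those fits depends on how many chunks there are and how often they finish at the same moment.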
@@ -0,0 +1,33 @@
import json
from random import random

import boto3

SFN = boto3.client('stepfunctions')


def split_doc(event, context):
    print('split_doc: simulate initial processing by splitting the doc')
    return {'msg': 'OK, the doc is split; next each chunk should be processed'}


def process_and_check_completion(event, context):
    print(f'# event={json.dumps(event)}')
    task_token = event['taskToken']  # named by the state machine Payload
    print('Simulate processing a chunk and check for all chunks done...')
    # In a real application, we'd process a chunk and check if they're all
    # done; if they are not all completed, we'd just exit without triggering
    # the state machine; here, we pretend we're done, or maybe we encountered
    # an error.
    chance = random()
    if chance < 0.7:  # most of the time we succeed
        print('Great, our chunks finished ok, restart the machine happy path')
        SFN.send_task_success(
            taskToken=task_token,
            output=json.dumps({'msg': 'this goes to the next state',
                               'status': 'looking good'}))
    else:  # but randomly simulate a failure
        SFN.send_task_failure(
            taskToken=task_token,
            error='ProcessingFailed',  # match in state machine ErrorEquals
            cause=f'Something broke in our chunk processing chance={chance}')
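Because the handler rolls a random number and talks to AWS directly, both branches are hard to exercise on purpose. One possible way to try them locally is sketched below: the branching is copied into a helper that takes the client and the roll as arguments. ``FakeStepFunctions`` and ``report_outcome`` are hypothetical names introduced here, not part of the commit; the fake only mirrors the keyword arguments of the two boto3 calls the handler makes.

```python
import json


class FakeStepFunctions:
    """Hypothetical test double for boto3.client('stepfunctions')."""

    def __init__(self):
        self.calls = []

    def send_task_success(self, taskToken, output):
        # The real API takes output as a JSON string; decode it for inspection.
        self.calls.append(('success', taskToken, json.loads(output)))

    def send_task_failure(self, taskToken, error, cause):
        self.calls.append(('failure', taskToken, error, cause))


def report_outcome(sfn, task_token, chance, threshold=0.7):
    """Same branching as the handler, with the client and the roll injected."""
    if chance < threshold:
        sfn.send_task_success(
            taskToken=task_token,
            output=json.dumps({'msg': 'this goes to the next state',
                               'status': 'looking good'}))
    else:
        sfn.send_task_failure(
            taskToken=task_token,
            error='ProcessingFailed',
            cause=f'Something broke in our chunk processing chance={chance}')


fake = FakeStepFunctions()
report_outcome(fake, 'token-1', chance=0.1)  # below the threshold: success
report_outcome(fake, 'token-2', chance=0.9)  # above the threshold: failure
```

After these two calls, ``fake.calls`` holds one success record and one failure record, which is enough to check that the error name sent on failure is the one the state machine's ``Catch`` expects.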
@@ -0,0 +1,72 @@
service: serverless-stepfunctions-callback

plugins:
  - serverless-pseudo-parameters
  - serverless-step-functions

provider:
  name: aws
  runtime: python3.7
  logRetentionInDays: 7
  iamRoleStatements:
    - Effect: Allow
      Action:
        - states:SendTaskFailure
        - states:SendTaskSuccess
      Resource:
        - "arn:aws:states:#{AWS::Region}:#{AWS::AccountId}:states:SendTaskSuccess"
        - Ref: CallbackExampleStepFunctionsStateMachine

package:
  include:
    - handler.py
  exclude:
    - "*~"
    - .venv3/**
    - README.rst
    - node_modules/**
    - package-lock.json

functions:
  SplitDoc:
    handler: handler.split_doc
  ProcessAndCheckCompletion:
    handler: handler.process_and_check_completion

stepFunctions:
  stateMachines:
    CallbackExample:
      events:  # start the state machine with a simulated event GET /start
        - http:
            path: start
            method: GET
      definition:
        StartAt: SplitDoc
        States:
          SplitDoc:
            Type: Task  # we could use Pass and skip the Lambda, but simulate the split
            Resource: "arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:${self:service}-${opt:stage}-SplitDoc"
            Next: WaitForCompletion
          WaitForCompletion:
            Type: Task
            Resource: arn:aws:states:::lambda:invoke.waitForTaskToken  # this is the magick incantation
            Parameters:
              FunctionName: ${self:service}-${opt:stage}-ProcessAndCheckCompletion
              Payload:
                taskToken.$: $$.Task.Token
            Next: ContinueProcess  # the happy path
            Catch:  # put our named failures before generic TaskFailed
              - ErrorEquals: ["ProcessingFailed"]
                Next: ProcessingFailed
              - ErrorEquals: ["States.TaskFailed"]  # any other problem including Lambda Python exception
                Next: UnexpectedFailure
          ContinueProcess:
            Type: Succeed
          ProcessingFailed:
            Type: Fail
            Error: Processing failed
            Cause: We caught a problem when processing so we should clean up the artifacts
          UnexpectedFailure:
            Type: Fail
            Error: Unexpected failure
            Cause: Some unexpected error occurred, possibly a Lambda function exception, check logs
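The comment on ``Catch`` ("put our named failures before generic TaskFailed") matters because catchers are tried in order and, as I understand the Amazon States Language, ``States.TaskFailed`` acts as a wildcard for task errors other than ``States.Timeout``. The sketch below is a deliberately simplified model of that matching, not the service's implementation; ``error_matches`` and ``pick_catcher`` are hypothetical helpers, and the real rules for ``States.ALL`` have exceptions this model ignores.

```python
def error_matches(error_name, pattern):
    """Simplified model of how one ErrorEquals entry matches an error name."""
    if pattern == 'States.ALL':
        return True
    if pattern == 'States.TaskFailed':
        # Assumed wildcard behaviour: any task error except a timeout.
        return error_name != 'States.Timeout'
    return error_name == pattern


def pick_catcher(error_name, catchers):
    """Return the Next state of the first matching catcher, or None."""
    for catcher in catchers:
        if any(error_matches(error_name, p) for p in catcher['ErrorEquals']):
            return catcher['Next']
    return None


# The two catchers from the WaitForCompletion state, in the same order.
CATCHERS = [
    {'ErrorEquals': ['ProcessingFailed'], 'Next': 'ProcessingFailed'},
    {'ErrorEquals': ['States.TaskFailed'], 'Next': 'UnexpectedFailure'},
]

named = pick_catcher('ProcessingFailed', CATCHERS)
generic = pick_catcher('KeyError', CATCHERS)
swapped = pick_catcher('ProcessingFailed', list(reversed(CATCHERS)))
```

Under this model, swapping the two catchers would route the named ``ProcessingFailed`` error to ``UnexpectedFailure``, which is the situation the ordering in the definition avoids.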