Skip to content

Commit

Permalink
Add integration test for reset with update events (#1507)
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns authored Jun 13, 2024
1 parent 38fe879 commit 30a29ce
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 4 deletions.
59 changes: 58 additions & 1 deletion test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1806,6 +1806,63 @@ func (ts *IntegrationTestSuite) TestResetWorkflowExecution() {
ts.Equal(originalResult, newResult)
}

func (ts *IntegrationTestSuite) TestResetWorkflowExecutionWithUpdate() {
ctx := context.Background()
wfId := "reset-workflow-execution-with-update"
run, err := ts.client.ExecuteWorkflow(ctx,
ts.startWorkflowOptions(wfId), ts.workflows.UpdateBasicWorkflow)
ts.NoError(err)
// Send a few updates to the workflow
for i := 0; i < 2; i++ {
handler, err := ts.client.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{
WorkflowID: run.GetID(),
RunID: run.GetRunID(),
UpdateName: "update",
Args: []interface{}{time.Millisecond},
WaitForStage: client.WorkflowUpdateStageCompleted,
})
ts.NoError(err)
ts.NoError(handler.Get(ctx, nil))
}
// Complete workflow
ts.NoError(ts.client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "finish", "finished"))
var updatesProcessed int
ts.NoError(run.Get(ctx, &updatesProcessed))
ts.Equal(2, updatesProcessed)
// Reset the workflow
resp, err := ts.client.ResetWorkflowExecution(ctx, &workflowservice.ResetWorkflowExecutionRequest{
Namespace: ts.config.Namespace,
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: run.GetID(),
RunId: run.GetRunID(),
},
Reason: "integration test",
WorkflowTaskFinishEventId: 4,
ResetReapplyType: enumspb.RESET_REAPPLY_TYPE_ALL_ELIGIBLE,
ResetReapplyExcludeTypes: []enumspb.ResetReapplyExcludeType{enumspb.RESET_REAPPLY_EXCLUDE_TYPE_SIGNAL},
})
ts.NoError(err)
ts.NotEmpty(resp.GetRunId())
newWf := ts.client.GetWorkflow(ctx, wfId, resp.GetRunId())
// Send a few updates to the new workflow
for i := 0; i < 2; i++ {
handler, err := ts.client.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{
WorkflowID: newWf.GetID(),
RunID: newWf.GetRunID(),
UpdateName: "update",
Args: []interface{}{time.Millisecond},
WaitForStage: client.WorkflowUpdateStageCompleted,
})
ts.NoError(err)
ts.NoError(handler.Get(ctx, nil))
}
// Complete the new workflow
ts.NoError(ts.client.SignalWorkflow(ctx, newWf.GetID(), newWf.GetRunID(), "finish", "finished"))
err = newWf.Get(ctx, &updatesProcessed)
ts.NoError(err)
ts.Equal(4, updatesProcessed)
}

func (ts *IntegrationTestSuite) TestEndToEndLatencyMetrics() {
fetchMetrics := func() (localMetric, nonLocalMetric *metrics.CapturedTimer) {
for _, timer := range ts.metricsHandler.Timers() {
Expand Down Expand Up @@ -2780,7 +2837,7 @@ func (ts *IntegrationTestSuite) TestMaxConcurrentSessionExecutionSizeWithRecreat
func (ts *IntegrationTestSuite) TestUpdateBasic() {
ctx := context.Background()
run, err := ts.client.ExecuteWorkflow(ctx,
ts.startWorkflowOptions("test-update-beee"), ts.workflows.UpdateBasicWorkflow)
ts.startWorkflowOptions("test-update-basic"), ts.workflows.UpdateBasicWorkflow)
ts.Nil(err)
// Send an update request
ts.Run("ShortUpdate", func() {
Expand Down
8 changes: 5 additions & 3 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,19 +326,21 @@ func (w *Workflows) ActivityRetryOnHBTimeout(ctx workflow.Context) ([]string, er
return []string{"heartbeatAndSleep", "heartbeatAndSleep", "heartbeatAndSleep"}, nil
}

func (w *Workflows) UpdateBasicWorkflow(ctx workflow.Context) error {
func (w *Workflows) UpdateBasicWorkflow(ctx workflow.Context) (int, error) {
updatesProcessed := 0
err := workflow.SetUpdateHandler(ctx, "update", func(ctx workflow.Context, t time.Duration) (string, error) {
err := workflow.Sleep(ctx, t)
if err != nil {
return "", err
}
updatesProcessed += 1
return "test", nil
})
if err != nil {
return errors.New("failed to register update handler")
return updatesProcessed, errors.New("failed to register update handler")
}
workflow.GetSignalChannel(ctx, "finish").Receive(ctx, nil)
return nil
return updatesProcessed, nil
}

func (w *Workflows) UpdateCancelableWorkflow(ctx workflow.Context) error {
Expand Down

0 comments on commit 30a29ce

Please sign in to comment.