From c6405d671181c16ba9703023963cf882096f8e4d Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Thu, 2 Feb 2023 11:34:11 -0800 Subject: [PATCH] Handle CanceledAfterStarted in CanceledEvent (#1027) Handle CanceledAfterStarted in CanceledEvent --- internal/internal_command_state_machine.go | 2 +- internal/internal_worker_test.go | 51 ++++++++++++++++++++++ test/integration_test.go | 33 ++++++++++++++ test/workflow_test.go | 42 ++++++++++++++++++ 4 files changed, 127 insertions(+), 1 deletion(-) diff --git a/internal/internal_command_state_machine.go b/internal/internal_command_state_machine.go index 254606e1f..2b006d423 100644 --- a/internal/internal_command_state_machine.go +++ b/internal/internal_command_state_machine.go @@ -534,7 +534,7 @@ func (d *commandStateMachineBase) handleCancelFailedEvent() { func (d *commandStateMachineBase) handleCanceledEvent() { switch d.state { - case commandStateCancellationCommandSent, commandStateCanceledAfterInitiated, commandStateCancellationCommandAccepted: + case commandStateCancellationCommandSent, commandStateCanceledAfterInitiated, commandStateCanceledAfterStarted, commandStateCancellationCommandAccepted: d.moveState(commandStateCompleted, eventCanceled) default: d.failStateTransition(eventCanceled) diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index 5e44e0763..2e550b513 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -1161,6 +1161,57 @@ func testReplayWorkflowCancelChildWorkflowUnusualOrdering(ctx Context) error { return nil } +func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_ChildWorkflowCancellation_WhenWorkflowCanceled() { + taskQueue := "taskQueue1" + testEvents := []*historypb.HistoryEvent{ + createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{ + WorkflowType: &commonpb.WorkflowType{Name: "testReplayWorkflowCancelChildWorkflowUnusualOrdering"}, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + Input: testEncodeFunctionArgs(converter.GetDefaultDataConverter()), + }), + createTestEventWorkflowTaskScheduled(2, &historypb.WorkflowTaskScheduledEventAttributes{}), + createTestEventWorkflowTaskStarted(3), + createTestEventWorkflowTaskCompleted(4, &historypb.WorkflowTaskCompletedEventAttributes{}), + createTestEventStartChildWorkflowExecutionInitiated(5, &historypb.StartChildWorkflowExecutionInitiatedEventAttributes{ + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + WorkflowId: "workflowId", + WorkflowType: &commonpb.WorkflowType{Name: "testWorkflow"}, + ParentClosePolicy: enumspb.PARENT_CLOSE_POLICY_TERMINATE, + }), + createTestEventTimerStarted(6, 6), + createTestEventChildWorkflowExecutionStarted(7, &historypb.ChildWorkflowExecutionStartedEventAttributes{ + InitiatedEventId: 5, + WorkflowType: &commonpb.WorkflowType{Name: "testWorkflow"}, + WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: "workflowId"}, + }), + + createTestEventWorkflowTaskScheduled(8, &historypb.WorkflowTaskScheduledEventAttributes{}), + createTestEventWorkflowTaskStarted(9), + createTestEventWorkflowTaskCompleted(10, &historypb.WorkflowTaskCompletedEventAttributes{}), + createTestEventWorkflowExecutionCancelRequested(11, &historypb.WorkflowExecutionCancelRequestedEventAttributes{}), + createTestEventTimerFired(12, 6), + createTestEventWorkflowTaskScheduled(13, &historypb.WorkflowTaskScheduledEventAttributes{}), + createTestEventChildWorkflowExecutionCanceled(14, &historypb.ChildWorkflowExecutionCanceledEventAttributes{ + InitiatedEventId: 5, + StartedEventId: 7, + WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: "workflowId"}, + }), + createTestEventWorkflowTaskStarted(15), + createTestEventWorkflowTaskCompleted(16, &historypb.WorkflowTaskCompletedEventAttributes{}), + } + + history := &historypb.History{Events: testEvents} + logger := getLogger() + replayer, err := NewWorkflowReplayer(WorkflowReplayerOptions{}) + require.NoError(s.T(), err) + replayer.RegisterWorkflow(testReplayWorkflowCancelChildWorkflowUnusualOrdering) + err = replayer.ReplayWorkflowHistory(logger, history) + if err != nil { + fmt.Printf("replay failed. Error: %v", err.Error()) + } + require.NoError(s.T(), err) +} + func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_ChildWorkflowCancellation_Unusual_Ordering() { taskQueue := "taskQueue1" testEvents := []*historypb.HistoryEvent{ diff --git a/test/integration_test.go b/test/integration_test.go index 726e8509f..bf0d4756d 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -1003,6 +1003,39 @@ func (ts *IntegrationTestSuite) TestCancelChildWorkflowUnusualTransitions() { ts.NoError(err) } +func (ts *IntegrationTestSuite) TestCancelChildWorkflowAndParentWorkflow() { + wfid := "test-cancel-child-workflow-and-parent-workflow" + run, err := ts.client.ExecuteWorkflow(context.Background(), + ts.startWorkflowOptions(wfid), + ts.workflows.ChildWorkflowAndParentCancel) + ts.NoError(err) + + // Give it a sec to populate the query + <-time.After(1 * time.Second) + + v, err := ts.client.QueryWorkflow(context.Background(), run.GetID(), "", "child-and-parent-cancel-child-workflow-id") + ts.NoError(err) + + var childWorkflowID string + err = v.Get(&childWorkflowID) + ts.NoError(err) + ts.NotNil(childWorkflowID) + ts.NotEmpty(childWorkflowID) + + err = ts.client.CancelWorkflow(context.Background(), childWorkflowID, "") + ts.NoError(err) + + err = ts.client.CancelWorkflow(context.Background(), run.GetID(), "") + ts.NoError(err) + + err = run.Get(context.Background(), nil) + ts.NoError(err) + + err = ts.client.GetWorkflow(context.Background(), childWorkflowID, "").Get(context.Background(), nil) + var canceledError *temporal.CanceledError + ts.ErrorAs(err, &canceledError) +} + func (ts *IntegrationTestSuite) TestChildWorkflowDuplicatePanic_Regression() { wfid := "test-child-workflow-duplicate-panic-regression" run, err := ts.client.ExecuteWorkflow(context.Background(), diff --git a/test/workflow_test.go b/test/workflow_test.go index c60542211..3a3d8cb11 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -487,6 +487,46 @@ func (w *Workflows) ChildWorkflowSuccessWithParentClosePolicyAbandon(ctx workflo return childWE.ID, err } +func (w *Workflows) childWorkflowWaitOnContextCancel(ctx workflow.Context) error { + var err error + // Wait for the workflow to be cancelled + ctx.Done().Receive(ctx, &err) + var canceledError *temporal.CanceledError + if errors.As(ctx.Err(), &canceledError) { + return ctx.Err() + } else { + return errors.New("childWorkflowWaitOnContextCancel was not cancelled") + } +} + +func (w *Workflows) ChildWorkflowAndParentCancel(ctx workflow.Context) error { + var childWorkflowID string + err := workflow.SetQueryHandler(ctx, "child-and-parent-cancel-child-workflow-id", func(input []byte) (string, error) { + return childWorkflowID, nil + }) + if err != nil { + return err + } + + cwo := workflow.ChildWorkflowOptions{ + WorkflowRunTimeout: time.Second * 2, + } + ctx = workflow.WithChildOptions(ctx, cwo) + + childWorkflowFuture := workflow.ExecuteChildWorkflow(ctx, w.childWorkflowWaitOnContextCancel) + + var childWorkflowExecution workflow.Execution + err = childWorkflowFuture.GetChildWorkflowExecution().Get(ctx, &childWorkflowExecution) + if err != nil { + return err + } + childWorkflowID = childWorkflowExecution.ID + + // Wait for the workflow to be cancelled + ctx.Done().Receive(ctx, &err) + return nil +} + func (w *Workflows) childWorkflowWaitOnSignal(ctx workflow.Context) error { workflow.GetSignalChannel(ctx, "unblock").Receive(ctx, nil) return nil @@ -2100,6 +2140,8 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.child) worker.RegisterWorkflow(w.childForMemoAndSearchAttr) worker.RegisterWorkflow(w.childWorkflowWaitOnSignal) + worker.RegisterWorkflow(w.childWorkflowWaitOnContextCancel) + worker.RegisterWorkflow(w.ChildWorkflowAndParentCancel) worker.RegisterWorkflow(w.sleep) worker.RegisterWorkflow(w.timer) }