Skip to content

Commit

Permalink
Handle CanceledAfterStarted in CanceledEvent (#1027)
Browse files Browse the repository at this point in the history
Handle CanceledAfterStarted in CanceledEvent
  • Loading branch information
Quinn-With-Two-Ns authored Feb 2, 2023
1 parent c423fc8 commit c6405d6
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 1 deletion.
2 changes: 1 addition & 1 deletion internal/internal_command_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ func (d *commandStateMachineBase) handleCancelFailedEvent() {

func (d *commandStateMachineBase) handleCanceledEvent() {
switch d.state {
case commandStateCancellationCommandSent, commandStateCanceledAfterInitiated, commandStateCancellationCommandAccepted:
case commandStateCancellationCommandSent, commandStateCanceledAfterInitiated, commandStateCanceledAfterStarted, commandStateCancellationCommandAccepted:
d.moveState(commandStateCompleted, eventCanceled)
default:
d.failStateTransition(eventCanceled)
Expand Down
51 changes: 51 additions & 0 deletions internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1161,6 +1161,57 @@ func testReplayWorkflowCancelChildWorkflowUnusualOrdering(ctx Context) error {
return nil
}

func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_ChildWorkflowCancellation_WhenWorkflowCanceled() {
taskQueue := "taskQueue1"
testEvents := []*historypb.HistoryEvent{
createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{
WorkflowType: &commonpb.WorkflowType{Name: "testReplayWorkflowCancelChildWorkflowUnusualOrdering"},
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue},
Input: testEncodeFunctionArgs(converter.GetDefaultDataConverter()),
}),
createTestEventWorkflowTaskScheduled(2, &historypb.WorkflowTaskScheduledEventAttributes{}),
createTestEventWorkflowTaskStarted(3),
createTestEventWorkflowTaskCompleted(4, &historypb.WorkflowTaskCompletedEventAttributes{}),
createTestEventStartChildWorkflowExecutionInitiated(5, &historypb.StartChildWorkflowExecutionInitiatedEventAttributes{
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue},
WorkflowId: "workflowId",
WorkflowType: &commonpb.WorkflowType{Name: "testWorkflow"},
ParentClosePolicy: enumspb.PARENT_CLOSE_POLICY_TERMINATE,
}),
createTestEventTimerStarted(6, 6),
createTestEventChildWorkflowExecutionStarted(7, &historypb.ChildWorkflowExecutionStartedEventAttributes{
InitiatedEventId: 5,
WorkflowType: &commonpb.WorkflowType{Name: "testWorkflow"},
WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: "workflowId"},
}),

createTestEventWorkflowTaskScheduled(8, &historypb.WorkflowTaskScheduledEventAttributes{}),
createTestEventWorkflowTaskStarted(9),
createTestEventWorkflowTaskCompleted(10, &historypb.WorkflowTaskCompletedEventAttributes{}),
createTestEventWorkflowExecutionCancelRequested(11, &historypb.WorkflowExecutionCancelRequestedEventAttributes{}),
createTestEventTimerFired(12, 6),
createTestEventWorkflowTaskScheduled(13, &historypb.WorkflowTaskScheduledEventAttributes{}),
createTestEventChildWorkflowExecutionCanceled(14, &historypb.ChildWorkflowExecutionCanceledEventAttributes{
InitiatedEventId: 5,
StartedEventId: 7,
WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: "workflowId"},
}),
createTestEventWorkflowTaskStarted(15),
createTestEventWorkflowTaskCompleted(16, &historypb.WorkflowTaskCompletedEventAttributes{}),
}

history := &historypb.History{Events: testEvents}
logger := getLogger()
replayer, err := NewWorkflowReplayer(WorkflowReplayerOptions{})
require.NoError(s.T(), err)
replayer.RegisterWorkflow(testReplayWorkflowCancelChildWorkflowUnusualOrdering)
err = replayer.ReplayWorkflowHistory(logger, history)
if err != nil {
fmt.Printf("replay failed. Error: %v", err.Error())
}
require.NoError(s.T(), err)
}

func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_ChildWorkflowCancellation_Unusual_Ordering() {
taskQueue := "taskQueue1"
testEvents := []*historypb.HistoryEvent{
Expand Down
33 changes: 33 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1003,6 +1003,39 @@ func (ts *IntegrationTestSuite) TestCancelChildWorkflowUnusualTransitions() {
ts.NoError(err)
}

func (ts *IntegrationTestSuite) TestCancelChildWorkflowAndParentWorkflow() {
wfid := "test-cancel-child-workflow-and-parent-workflow"
run, err := ts.client.ExecuteWorkflow(context.Background(),
ts.startWorkflowOptions(wfid),
ts.workflows.ChildWorkflowAndParentCancel)
ts.NoError(err)

// Give it a sec to populate the query
<-time.After(1 * time.Second)

v, err := ts.client.QueryWorkflow(context.Background(), run.GetID(), "", "child-and-parent-cancel-child-workflow-id")
ts.NoError(err)

var childWorkflowID string
err = v.Get(&childWorkflowID)
ts.NoError(err)
ts.NotNil(childWorkflowID)
ts.NotEmpty(childWorkflowID)

err = ts.client.CancelWorkflow(context.Background(), childWorkflowID, "")
ts.NoError(err)

err = ts.client.CancelWorkflow(context.Background(), run.GetID(), "")
ts.NoError(err)

err = run.Get(context.Background(), nil)
ts.NoError(err)

err = ts.client.GetWorkflow(context.Background(), childWorkflowID, "").Get(context.Background(), nil)
var canceledError *temporal.CanceledError
ts.ErrorAs(err, &canceledError)
}

func (ts *IntegrationTestSuite) TestChildWorkflowDuplicatePanic_Regression() {
wfid := "test-child-workflow-duplicate-panic-regression"
run, err := ts.client.ExecuteWorkflow(context.Background(),
Expand Down
42 changes: 42 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,46 @@ func (w *Workflows) ChildWorkflowSuccessWithParentClosePolicyAbandon(ctx workflo
return childWE.ID, err
}

func (w *Workflows) childWorkflowWaitOnContextCancel(ctx workflow.Context) error {
var err error
// Wait for the workflow to be cancelled
ctx.Done().Receive(ctx, &err)
var canceledError *temporal.CanceledError
if errors.As(ctx.Err(), &canceledError) {
return ctx.Err()
} else {
return errors.New("childWorkflowWaitOnContextCancel was not cancelled")
}
}

func (w *Workflows) ChildWorkflowAndParentCancel(ctx workflow.Context) error {
var childWorkflowID string
err := workflow.SetQueryHandler(ctx, "child-and-parent-cancel-child-workflow-id", func(input []byte) (string, error) {
return childWorkflowID, nil
})
if err != nil {
return err
}

cwo := workflow.ChildWorkflowOptions{
WorkflowRunTimeout: time.Second * 2,
}
ctx = workflow.WithChildOptions(ctx, cwo)

childWorkflowFuture := workflow.ExecuteChildWorkflow(ctx, w.childWorkflowWaitOnContextCancel)

var childWorkflowExecution workflow.Execution
err = childWorkflowFuture.GetChildWorkflowExecution().Get(ctx, &childWorkflowExecution)
if err != nil {
return err
}
childWorkflowID = childWorkflowExecution.ID

// Wait for the workflow to be cancelled
ctx.Done().Receive(ctx, &err)
return nil
}

func (w *Workflows) childWorkflowWaitOnSignal(ctx workflow.Context) error {
workflow.GetSignalChannel(ctx, "unblock").Receive(ctx, nil)
return nil
Expand Down Expand Up @@ -2100,6 +2140,8 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.child)
worker.RegisterWorkflow(w.childForMemoAndSearchAttr)
worker.RegisterWorkflow(w.childWorkflowWaitOnSignal)
worker.RegisterWorkflow(w.childWorkflowWaitOnContextCancel)
worker.RegisterWorkflow(w.ChildWorkflowAndParentCancel)
worker.RegisterWorkflow(w.sleep)
worker.RegisterWorkflow(w.timer)
}
Expand Down

0 comments on commit c6405d6

Please sign in to comment.