Skip to content

Commit

Permalink
Fix an event ordering bug possible with outstanding commands during W…
Browse files Browse the repository at this point in the history
…F cancel (#382)

Increment the next command id for every command which is cancelled as a result of the workflow itself being cancelled.

This is a bit of a tragic workaround for the event counting Go SDK does which is a fundamentally flawed approach. We
should switch to core-sdk as soon as we can for the state machines.
  • Loading branch information
Sushisource authored Mar 16, 2021
1 parent 738df4e commit 0ec1d86
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 17 deletions.
36 changes: 22 additions & 14 deletions internal/internal_decision_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,12 @@ type (
orderedCommands *list.List
commands map[commandID]*list.Element

scheduledEventIDToActivityID map[int64]string
scheduledEventIDToCancellationID map[int64]string
scheduledEventIDToSignalID map[int64]string
versionMarkerLookup map[int64]string
scheduledEventIDToActivityID map[int64]string
scheduledEventIDToCancellationID map[int64]string
scheduledEventIDToSignalID map[int64]string
versionMarkerLookup map[int64]string
commandsCancelledDuringWFCancellation int64
workflowExecutionIsCancelling bool
}

// panic when command state machine is in illegal state
Expand Down Expand Up @@ -414,6 +416,9 @@ func (d *commandStateMachineBase) cancel() {
case commandStateCommandSent:
d.moveState(commandStateCancellationCommandSent, eventCancel)
case commandStateInitiated:
if d.helper.workflowExecutionIsCancelling {
d.helper.commandsCancelledDuringWFCancellation++
}
d.moveState(commandStateCanceledAfterInitiated, eventCancel)
default:
d.failStateTransition(eventCancel)
Expand Down Expand Up @@ -789,10 +794,11 @@ func newCommandsHelper() *commandsHelper {
orderedCommands: list.New(),
commands: make(map[commandID]*list.Element),

scheduledEventIDToActivityID: make(map[int64]string),
scheduledEventIDToCancellationID: make(map[int64]string),
scheduledEventIDToSignalID: make(map[int64]string),
versionMarkerLookup: make(map[int64]string),
scheduledEventIDToActivityID: make(map[int64]string),
scheduledEventIDToCancellationID: make(map[int64]string),
scheduledEventIDToSignalID: make(map[int64]string),
versionMarkerLookup: make(map[int64]string),
commandsCancelledDuringWFCancellation: 0,
}
}

Expand All @@ -801,12 +807,14 @@ func (h *commandsHelper) incrementNextCommandEventID() {
}

func (h *commandsHelper) setCurrentWorkflowTaskStartedEventID(workflowTaskStartedEventID int64) {
// Server always processes the commands in the same order it is generated by client and each command results
// in coresponding history event after procesing. So we can use workflow task started event id + 2
// as the offset as workflow task completed event is always the first event in the workflow task followed by
// events generated from commands. This allows client sdk to deterministically predict history event ids
// generated by processing of the command.
h.nextCommandEventID = workflowTaskStartedEventID + 2
// Server always processes the commands in the same order it is generated by client and each command results in
// corresponding history event after processing. So we can use workflow task started event id + 2 as the offset as
// workflow task completed event is always the first event in the workflow task followed by events generated from
// commands. This allows client sdk to deterministically predict history event ids generated by processing of the
// command. We must also add the number of cancel commands that were spawned during cancellation of the workflow
// execution as those canceled command events will show up *after* the workflow task completed event.
h.nextCommandEventID = workflowTaskStartedEventID + 2 + h.commandsCancelledDuringWFCancellation
h.commandsCancelledDuringWFCancellation = 0
}

func (h *commandsHelper) getNextID() int64 {
Expand Down
6 changes: 5 additions & 1 deletion internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,11 @@ func validateAndSerializeSearchAttributes(attributes map[string]interface{}) (*c
}

func (wc *workflowEnvironmentImpl) RegisterCancelHandler(handler func()) {
wc.cancelHandler = handler
wrappedHandler := func() {
wc.commandsHelper.workflowExecutionIsCancelling = true
handler()
}
wc.cancelHandler = wrappedHandler
}

func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow(
Expand Down
7 changes: 7 additions & 0 deletions internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,13 @@ func createTestEventRequestCancelExternalWorkflowExecutionInitiated(eventID int6
Attributes: &historypb.HistoryEvent_RequestCancelExternalWorkflowExecutionInitiatedEventAttributes{RequestCancelExternalWorkflowExecutionInitiatedEventAttributes: attr}}
}

func createTestEventWorkflowExecutionCancelRequested(eventID int64, attr *historypb.WorkflowExecutionCancelRequestedEventAttributes) *historypb.HistoryEvent {
return &historypb.HistoryEvent{
EventId: eventID,
EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED,
Attributes: &historypb.HistoryEvent_WorkflowExecutionCancelRequestedEventAttributes{WorkflowExecutionCancelRequestedEventAttributes: attr}}
}

func createTestEventExternalWorkflowExecutionCancelRequested(eventID int64, attr *historypb.ExternalWorkflowExecutionCancelRequestedEventAttributes) *historypb.HistoryEvent {
return &historypb.HistoryEvent{
EventId: eventID,
Expand Down
77 changes: 77 additions & 0 deletions internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,73 @@ func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_ChildWorkflowCancell
require.NoError(s.T(), err)
}

func testReplayWorkflowCancelWorkflowWhileSleepingWithActivities(ctx Context) error {
defer func() {
// When workflow is canceled, it has to get a new disconnected context to execute any activities
newCtx, _ := NewDisconnectedContext(ctx)
err := ExecuteActivity(newCtx, testInfiniteActivity).Get(ctx, nil)
if err != nil {
panic("Cleanup activity errored")
}
}()

if err := Sleep(ctx, time.Minute*1); err != nil {
return err
}

// This is the activity that should get cancelled
_ = ExecuteActivity(ctx, "testActivityNoResult").Get(ctx, nil)

_ = ExecuteActivity(ctx, "testActivityNoResult").Get(ctx, nil)

return nil
}

func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_CancelWorkflowWhileSleepingWithActivities() {
taskQueue := "taskQueue1"
testEvents := []*historypb.HistoryEvent{

createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{
WorkflowType: &commonpb.WorkflowType{Name: "testReplayWorkflowCancelWorkflowWhileSleepingWithActivities"},
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue},
Input: testEncodeFunctionArgs(converter.GetDefaultDataConverter()),
}),
createTestEventWorkflowTaskScheduled(2, &historypb.WorkflowTaskScheduledEventAttributes{}),
createTestEventWorkflowTaskStarted(3),
createTestEventWorkflowTaskCompleted(4, &historypb.WorkflowTaskCompletedEventAttributes{}),
createTestEventTimerStarted(5, 5),
createTestEventWorkflowExecutionCancelRequested(6, &historypb.WorkflowExecutionCancelRequestedEventAttributes{}),
createTestEventWorkflowTaskScheduled(7, &historypb.WorkflowTaskScheduledEventAttributes{}),
createTestEventWorkflowTaskStarted(8),
createTestEventWorkflowTaskCompleted(9, &historypb.WorkflowTaskCompletedEventAttributes{}),
createTestEventTimerCanceled(10, 5),
createTestEventActivityTaskScheduled(11, &historypb.ActivityTaskScheduledEventAttributes{
ActivityId: "11",
ActivityType: &commonpb.ActivityType{Name: "testInfiniteActivity"},
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue},
}),
createTestEventActivityTaskStarted(12, &historypb.ActivityTaskStartedEventAttributes{
ScheduledEventId: 11,
}),
createTestEventActivityTaskCompleted(13, &historypb.ActivityTaskCompletedEventAttributes{
ScheduledEventId: 11,
StartedEventId: 12,
}),
createTestEventWorkflowTaskScheduled(14, &historypb.WorkflowTaskScheduledEventAttributes{}),
createTestEventWorkflowTaskStarted(15),
}

history := &historypb.History{Events: testEvents}
logger := getLogger()
replayer := NewWorkflowReplayer()
replayer.RegisterWorkflow(testReplayWorkflowCancelWorkflowWhileSleepingWithActivities)
err := replayer.ReplayWorkflowHistory(logger, history)
if err != nil {
fmt.Printf("replay failed. Error: %v", err.Error())
}
require.NoError(s.T(), err)
}

func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_LocalActivity_Result_Mismatch() {
taskQueue := "taskQueue1"
result, _ := converter.GetDefaultDataConverter().ToPayloads("some-incorrect-result")
Expand Down Expand Up @@ -1823,6 +1890,16 @@ func testActivityReturnStructPtrPtr() (**testActivityResult, error) {
return &r, nil
}

func testInfiniteActivity(ctx context.Context) error {
for {
select {
case <-time.After(1 * time.Second):
case <-ctx.Done():
return nil
}
}
}

type testActivityStructWithFns struct{}

func (t *testActivityStructWithFns) ValidActivity(context.Context) error { return nil }
Expand Down
17 changes: 16 additions & 1 deletion test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,13 +642,28 @@ func (ts *IntegrationTestSuite) TestCancelChildWorkflowUnusualTransitions() {
}

func (ts *IntegrationTestSuite) TestCancelActivityImmediately() {
ts.T().Skip(`Currently fails with "PanicError": "unknown command internal.commandID{commandType:0, id:"5"}, possible causes are nondeterministic workflow definition code or incompatible change in the workflow definition`)
var expected []string
err := ts.executeWorkflow("test-cancel-activity-immediately", ts.workflows.CancelActivityImmediately, &expected)
ts.NoError(err)
ts.EqualValues(expected, ts.activities.invoked())
}

func (ts *IntegrationTestSuite) TestCancelMultipleCommandsOverMultipleTasks() {
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
defer cancel()
run, err := ts.client.ExecuteWorkflow(ctx,
ts.startWorkflowOptions("test-cancel-multiple-commands-over-multiple-tasks"),
ts.workflows.CancelMultipleCommandsOverMultipleTasks)
ts.NoError(err)
ts.NotNil(run)
// We need to wait a beat before firing the cancellation
time.Sleep(time.Second)
ts.Nil(ts.client.CancelWorkflow(ctx, "test-cancel-multiple-commands-over-multiple-tasks",
run.GetRunID()))
err = run.Get(ctx, nil)
ts.NoError(err)
}

func (ts *IntegrationTestSuite) TestWorkflowWithLocalActivityCtxPropagation() {
var expected string
err := ts.executeWorkflow("test-wf-local-activity-ctx-prop", ts.workflows.WorkflowWithLocalActivityCtxPropagation, &expected)
Expand Down
41 changes: 40 additions & 1 deletion test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,45 @@ func (w *Workflows) CancelActivityImmediately(ctx workflow.Context) ([]string, e
})
_ = workflow.ExecuteActivity(activityCtx2, "Prefix_ToUpper", "hello").Get(activityCtx2, nil)

return []string{"toUpperWithDelay", "toUpper"}, nil
return []string{"toUpper"}, nil
}

func (w *Workflows) CancelMultipleCommandsOverMultipleTasks(ctx workflow.Context) error {
ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions())
// We want this "cleanup" activity to be run when the whole workflow is cancelled
defer func() {
// When workflow is canceled, it has to get a new disconnected context to execute any activities
newCtx, _ := workflow.NewDisconnectedContext(ctx)
err := workflow.ExecuteActivity(newCtx, "Prefix_ToUpper", "hello").Get(newCtx, nil)
if err != nil {
panic("Cleanup activity error")
}
}()

// Start a timer that will be canceled when the workflow is
_ = workflow.NewTimer(ctx, time.Minute*10)
// Throw in a side effect for fun
_ = workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return "hi!"
})
// Include a timer we cancel across the wf task
timerCtx, cancelTimer := workflow.WithCancel(ctx)
_ = workflow.NewTimer(timerCtx, time.Second*3)
// Actually wait on a real timer to trigger a wf task
_ = workflow.Sleep(ctx, time.Millisecond*500)
cancelTimer()
// Another timers we expect to get cancelled
_ = workflow.NewTimer(ctx, time.Minute*10)

// Include a timer we cancel immediately
timerCtx2, cancelTimer2 := workflow.WithCancel(ctx)
_ = workflow.NewTimer(timerCtx2, time.Second*3)
cancelTimer2()

// We need to be cancelled by test runner here
_ = workflow.Sleep(ctx, time.Minute*10)

return nil
}

func (w *Workflows) SimplestWorkflow(_ workflow.Context) (string, error) {
Expand Down Expand Up @@ -1142,6 +1180,7 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.SignalWorkflow)
worker.RegisterWorkflow(w.CronWorkflow)
worker.RegisterWorkflow(w.CancelTimerConcurrentWithOtherCommandWorkflow)
worker.RegisterWorkflow(w.CancelMultipleCommandsOverMultipleTasks)

worker.RegisterWorkflow(w.child)
worker.RegisterWorkflow(w.childForMemoAndSearchAttr)
Expand Down

0 comments on commit 0ec1d86

Please sign in to comment.