From 6e1331bc793e7b4b4e329966eb4a60f3614f63ea Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Tue, 10 Nov 2020 12:13:32 -0800 Subject: [PATCH] Deadlock detector (#285) --- internal/internal_event_handlers.go | 4 -- internal/internal_logging_tags.go | 1 + internal/internal_task_handlers.go | 61 ++++++++++++------------- internal/internal_task_handlers_test.go | 24 +++++----- internal/internal_worker.go | 2 +- internal/internal_workflow.go | 8 +++- internal/worker.go | 7 +-- test/integration_test.go | 27 +++++++++++ test/replaytests/replay_test.go | 2 +- test/workflow_test.go | 12 +++++ 10 files changed, 94 insertions(+), 54 deletions(-) diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index e102290f1..7b5880823 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -760,10 +760,6 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent( weh.metricsScope.Counter(metrics.WorkflowTaskExecutionFailureCounter).Inc(1) topLine := fmt.Sprintf("process event for %s [panic]:", weh.workflowInfo.TaskQueueName) st := getStackTraceRaw(topLine, 7, 0) - weh.logger.Error("ProcessEvent panic.", - "PanicError", fmt.Sprintf("%v", p), - "PanicStack", st) - weh.Complete(nil, newWorkflowPanicError(p, st)) } }() diff --git a/internal/internal_logging_tags.go b/internal/internal_logging_tags.go index 44f6868ac..5533fb38f 100644 --- a/internal/internal_logging_tags.go +++ b/internal/internal_logging_tags.go @@ -43,4 +43,5 @@ const ( tagQueryType = "QueryType" tagResult = "Result" tagError = "Error" + tagStackTrace = "StackTrace" ) diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 6948c8dff..7f70cfc78 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -789,10 +789,11 @@ processWorkflowLoop: startTime, ) if err != nil { - return nil, &workflowTaskHeartbeatError{Message: fmt.Sprintf("error sending workflow task heartbeat %v", err)} + errRet = &workflowTaskHeartbeatError{Message: fmt.Sprintf("error sending workflow task heartbeat %v", err)} + return } if workflowTask == nil { - return nil, nil + return } continue processWorkflowLoop @@ -811,7 +812,9 @@ processWorkflowLoop: break processWorkflowLoop } } - return response, err + errRet = err + completeRequest = response + return } func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflowTask) (interface{}, error) { @@ -926,37 +929,46 @@ ProcessEvents: // activity task completed), but the corresponding workflow code that start the event has been removed. In that case // the replay of that event will panic on the command state machine and the workflow will be marked as completed // with the panic error. - var nonDeterministicErr error + var panicError error if !skipReplayCheck && !w.isWorkflowCompleted { // check if commands from reply matches to the history events if err := matchReplayWithHistory(replayCommands, respondEvents); err != nil { - nonDeterministicErr = err + panicError = err } } - if nonDeterministicErr == nil && w.err != nil { - if panicErr, ok := w.err.(*PanicError); ok && panicErr.value != nil { - if _, isStateMachinePanic := panicErr.value.(stateMachineIllegalStatePanic); isStateMachinePanic { - nonDeterministicErr = panicErr - } + if panicError == nil && w.err != nil { + if panicErr, ok := w.err.(*workflowPanicError); ok { + panicError = panicErr } } - if nonDeterministicErr != nil { - w.wth.logger.Error("non-deterministic-error", - tagWorkflowType, task.WorkflowType.GetName(), - tagWorkflowID, task.WorkflowExecution.GetWorkflowId(), - tagRunID, task.WorkflowExecution.GetRunId(), - tagError, nonDeterministicErr) + if panicError != nil { + if panicErr, ok := w.err.(*workflowPanicError); ok { + w.wth.logger.Error("workflow-panic", + tagWorkflowType, task.WorkflowType.GetName(), + tagWorkflowID, task.WorkflowExecution.GetWorkflowId(), + tagRunID, task.WorkflowExecution.GetRunId(), + tagError, panicError, + tagStackTrace, panicErr.StackTrace()) + } else { + w.wth.logger.Error("workflow-panic", + tagWorkflowType, task.WorkflowType.GetName(), + tagWorkflowID, task.WorkflowExecution.GetWorkflowId(), + tagRunID, task.WorkflowExecution.GetRunId(), + tagError, panicError) + } switch w.wth.workflowPanicPolicy { case FailWorkflow: // complete workflow with custom error will fail the workflow - eventHandler.Complete(nil, NewApplicationError("Non-determimistic error cause workflow to fail due to FailWorkflow workflow panic policy.", "", false, nonDeterministicErr)) + eventHandler.Complete(nil, NewApplicationError( + "Workflow failed on panic due to FailWorkflow workflow panic policy", + "", false, panicError)) case BlockWorkflow: // return error here will be convert to WorkflowTaskFailed for the first time, and ignored for subsequent // attempts which will cause WorkflowTaskTimeout and server will retry forever until issue got fixed or // workflow timeout. - return nil, nonDeterministicErr + return nil, panicError default: panic("unknown mismatched workflow history policy.") } @@ -1454,19 +1466,6 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow( metricsScope := metrics.GetMetricsScopeForWorkflow(wth.metricsScope, eventHandler.workflowEnvironmentImpl.workflowInfo.WorkflowType.Name) - // fail workflow task on workflow panic - var workflowPanicErr *workflowPanicError - if errors.As(workflowContext.err, &workflowPanicErr) { - // Workflow panic - metricsScope.Counter(metrics.WorkflowTaskExecutionFailureCounter).Inc(1) - wth.logger.Error("Workflow panic.", - tagWorkflowID, task.WorkflowExecution.GetWorkflowId(), - tagRunID, task.WorkflowExecution.GetRunId(), - "PanicError", workflowPanicErr.Error(), - "PanicStack", workflowPanicErr.StackTrace()) - return errorToFailWorkflowTask(task.TaskToken, workflowContext.err, wth.identity, wth.dataConverter) - } - // complete workflow task var closeCommand *commandpb.Command var canceledErr *CanceledError diff --git a/internal/internal_task_handlers_test.go b/internal/internal_task_handlers_test.go index 4c07519fb..dd82dd340 100644 --- a/internal/internal_task_handlers_test.go +++ b/internal/internal_task_handlers_test.go @@ -739,6 +739,8 @@ func (t *TaskHandlersTestSuite) TestWithTruncatedHistory() { ActivityType: &commonpb.ActivityType{Name: "pkg.Greeter_Activity"}, TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, }), + createTestEventWorkflowTaskScheduled(9, &historypb.WorkflowTaskScheduledEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}}), + createTestEventWorkflowTaskStarted(10), } params := workerExecutionParameters{ Namespace: testNamespace, @@ -753,13 +755,13 @@ func (t *TaskHandlersTestSuite) TestWithTruncatedHistory() { previousStartedEventID int64 isResultErr bool }{ - {0, 0, false}, - {0, 3, false}, - {10, 0, true}, - {10, 6, true}, + {10, 6, false}, + {15, 10, true}, } for i, tc := range testCases { + cacheSize := getWorkflowCache().Size() + taskHandler := newWorkflowTaskHandler(params, nil, t.registry) task := createWorkflowTask(testEvents, tc.previousStartedEventID, "HelloWorld_Workflow") // Cut the workflow task scheduled ans started events @@ -774,11 +776,12 @@ func (t *TaskHandlersTestSuite) TestWithTruncatedHistory() { t.Error(err, "testcase %v failed", i) t.Nil(request) t.Contains(err.Error(), "premature end of stream") - t.EqualValues(getWorkflowCache().Size(), 0) + t.EqualValues(getWorkflowCache().Size(), cacheSize) continue } t.NoError(err, "testcase %v failed", i) + t.EqualValues(getWorkflowCache().Size(), cacheSize+1) } } @@ -961,15 +964,10 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_WorkflowPanics() { } taskHandler := newWorkflowTaskHandler(params, nil, t.registry) - request, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) - t.NoError(err) - t.NotNil(request) - r, ok := request.(*workflowservice.RespondWorkflowTaskFailedRequest) + _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) + t.Error(err) + _, ok := err.(*workflowPanicError) t.True(ok) - t.EqualValues(enumspb.WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE, r.Cause) - t.NotNil(r.GetFailure().GetApplicationFailureInfo()) - t.Equal("PanicError", r.GetFailure().GetApplicationFailureInfo().GetType()) - t.Equal("panicError", r.GetFailure().GetMessage()) } func (t *TaskHandlersTestSuite) TestGetWorkflowInfo() { diff --git a/internal/internal_worker.go b/internal/internal_worker.go index f0cc9580b..c804b4e62 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -1260,7 +1260,7 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke DisableStickyExecution: options.DisableStickyExecution, StickyScheduleToStartTimeout: options.StickyScheduleToStartTimeout, TaskQueueActivitiesPerSecond: options.TaskQueueActivitiesPerSecond, - WorkflowPanicPolicy: options.NonDeterministicWorkflowPolicy, + WorkflowPanicPolicy: options.WorkflowPanicPolicy, DataConverter: client.dataConverter, WorkerStopTimeout: options.WorkerStopTimeout, ContextPropagators: client.contextPropagators, diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index f0aecadfc..af209fdaa 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -855,7 +855,13 @@ func (s *coroutineState) call() { s.unblock <- func(status string, stackDepth int) bool { return false // unblock } - <-s.aboutToBlock + select { + case <-s.aboutToBlock: + case <-time.After(1 * time.Second): + s.closed = true + panic(fmt.Sprintf("Potential deadlock detected: "+ + "workflow goroutine \"%v\" didn't yield for over a second", s.name)) + } } func (s *coroutineState) close() { diff --git a/internal/worker.go b/internal/worker.go index ce9b2ff2c..e506afca0 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -116,9 +116,10 @@ type ( BackgroundActivityContext context.Context // Optional: Sets how workflow worker deals with non-deterministic history events - // (presumably arising from non-deterministic workflow definitions or non-backward compatible workflow definition changes). - // default: BlockWorkflow, which just logs error but reply nothing back to server - NonDeterministicWorkflowPolicy WorkflowPanicPolicy + // (presumably arising from non-deterministic workflow definitions or non-backward compatible workflow + // definition changes) and other panics raised from workflow code. + // default: BlockWorkflow, which just logs error but doesn't fail workflow. + WorkflowPanicPolicy WorkflowPanicPolicy // Optional: worker graceful stop timeout // default: 0s diff --git a/test/integration_test.go b/test/integration_test.go index 467a79867..bdc4b4325 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -143,6 +143,7 @@ func (ts *IntegrationTestSuite) SetupTest() { options := worker.Options{ DisableStickyExecution: ts.config.IsStickyOff, WorkflowInterceptorChainFactories: []interceptors.WorkflowInterceptor{ts.tracer}, + WorkflowPanicPolicy: worker.FailWorkflow, } if strings.Contains(ts.T().Name(), "Session") { @@ -177,6 +178,32 @@ func (ts *IntegrationTestSuite) TestBasic() { ) } +func (ts *IntegrationTestSuite) TestPanicFailWorkflow() { + var expected []string + wfOpts := ts.startWorkflowOptions("test-panic") + wfOpts.WorkflowTaskTimeout = 5 * time.Second + wfOpts.WorkflowRunTimeout = 5 * time.Minute + err := ts.executeWorkflowWithOption(wfOpts, ts.workflows.Panicked, &expected) + ts.Error(err) + var applicationErr *temporal.ApplicationError + ok := errors.As(err, &applicationErr) + ts.True(ok) + ts.True(strings.Contains(applicationErr.Error(), "simulated")) +} + +func (ts *IntegrationTestSuite) TestDeadlockDetection() { + var expected []string + wfOpts := ts.startWorkflowOptions("test-deadlock") + wfOpts.WorkflowTaskTimeout = 5 * time.Second + wfOpts.WorkflowRunTimeout = 5 * time.Minute + err := ts.executeWorkflowWithOption(wfOpts, ts.workflows.Deadlocked, &expected) + ts.Error(err) + var applicationErr *temporal.ApplicationError + ok := errors.As(err, &applicationErr) + ts.True(ok) + ts.True(strings.Contains(applicationErr.Error(), "Potential deadlock detected")) +} + func (ts *IntegrationTestSuite) TestActivityRetryOnError() { var expected []string err := ts.executeWorkflow("test-activity-retry-on-error", ts.workflows.ActivityRetryOnError, &expected) diff --git a/test/replaytests/replay_test.go b/test/replaytests/replay_test.go index 7c4c5e4f4..9fc498879 100644 --- a/test/replaytests/replay_test.go +++ b/test/replaytests/replay_test.go @@ -116,5 +116,5 @@ func (s *replayTestSuite) TestReplayBadWorkflowHistoryFromFile() { err := replayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "bad-history.json") require.Error(s.T(), err) - require.True(s.T(), strings.HasPrefix(err.Error(), "replay workflow failed with failure")) + require.True(s.T(), strings.Contains(err.Error(), "nondeterministic workflow definition")) } diff --git a/test/workflow_test.go b/test/workflow_test.go index 2401b906a..598c3d7bc 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -65,6 +65,16 @@ func (w *Workflows) Basic(ctx workflow.Context) ([]string, error) { return []string{"toUpperWithDelay", "toUpper"}, nil } +func (w *Workflows) Deadlocked(ctx workflow.Context) ([]string, error) { + // Simulates deadlock. Never call time.Sleep in production code! + time.Sleep(2 * time.Second) + return []string{}, nil +} + +func (w *Workflows) Panicked(ctx workflow.Context) ([]string, error) { + panic("simulated") +} + func (w *Workflows) ActivityRetryOnError(ctx workflow.Context) ([]string, error) { ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptionsWithRetry()) startTime := workflow.Now(ctx) @@ -913,6 +923,8 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.ActivityRetryOnTimeout) worker.RegisterWorkflow(w.ActivityRetryOptionsChange) worker.RegisterWorkflow(w.Basic) + worker.RegisterWorkflow(w.Deadlocked) + worker.RegisterWorkflow(w.Panicked) worker.RegisterWorkflow(w.BasicSession) worker.RegisterWorkflow(w.CancelActivity) worker.RegisterWorkflow(w.CancelActivityImmediately)