Deadlock detector (#285)
mfateev authored Nov 10, 2020
1 parent 1d4cebd commit 6e1331b
Showing 10 changed files with 94 additions and 54 deletions.
4 changes: 0 additions & 4 deletions internal/internal_event_handlers.go
@@ -760,10 +760,6 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
weh.metricsScope.Counter(metrics.WorkflowTaskExecutionFailureCounter).Inc(1)
topLine := fmt.Sprintf("process event for %s [panic]:", weh.workflowInfo.TaskQueueName)
st := getStackTraceRaw(topLine, 7, 0)
weh.logger.Error("ProcessEvent panic.",
"PanicError", fmt.Sprintf("%v", p),
"PanicStack", st)

weh.Complete(nil, newWorkflowPanicError(p, st))
}
}()
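The removed lines drop the inline panic logging from ProcessEvent; the panic is still wrapped by newWorkflowPanicError and handed to Complete, and logging (now with the stack trace) happens later in the task handler. For reference, a minimal self-contained sketch of this recover-and-wrap pattern; workflowPanicError here is a hypothetical stand-in for the SDK's internal type:

package main

import (
	"fmt"
	"runtime/debug"
)

// workflowPanicError is a hypothetical stand-in for the SDK's internal error
// type: it carries the panic value together with the captured stack trace.
type workflowPanicError struct {
	value      interface{}
	stackTrace string
}

func (e *workflowPanicError) Error() string      { return fmt.Sprintf("workflow panic: %v", e.value) }
func (e *workflowPanicError) StackTrace() string { return e.stackTrace }

// processEvent converts a panic in the handler into an error for the caller
// instead of logging it on the spot, mirroring the shape of the change above.
func processEvent(handle func()) (err error) {
	defer func() {
		if p := recover(); p != nil {
			err = &workflowPanicError{value: p, stackTrace: string(debug.Stack())}
		}
	}()
	handle()
	return nil
}

func main() {
	fmt.Println(processEvent(func() { panic("boom") })) // workflow panic: boom
}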
1 change: 1 addition & 0 deletions internal/internal_logging_tags.go
@@ -43,4 +43,5 @@ const (
tagQueryType = "QueryType"
tagResult = "Result"
tagError = "Error"
tagStackTrace = "StackTrace"
)
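The new tagStackTrace constant extends the shared set of structured-log keys so the panic handler can attach the captured stack under a consistent field name. A tiny sketch of how such keys are used with a message-plus-keyvals logger; logError is a hypothetical stand-in, not the SDK's logger:

package main

import "fmt"

const (
	tagWorkflowID = "WorkflowID"
	tagError      = "Error"
	tagStackTrace = "StackTrace"
)

// logError is a hypothetical stand-in for a key-value logger's Error method.
func logError(msg string, keyvals ...interface{}) {
	fmt.Println(msg, keyvals)
}

func main() {
	// Shared tag constants keep log keys consistent across call sites, so the
	// stack trace attached by the panic handler is always queryable by the
	// same field name.
	logError("workflow-panic",
		tagWorkflowID, "order-123",
		tagError, "simulated panic",
		tagStackTrace, "goroutine 7 [running]: ...")
}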
61 changes: 30 additions & 31 deletions internal/internal_task_handlers.go
@@ -789,10 +789,11 @@ processWorkflowLoop:
startTime,
)
if err != nil {
return nil, &workflowTaskHeartbeatError{Message: fmt.Sprintf("error sending workflow task heartbeat %v", err)}
errRet = &workflowTaskHeartbeatError{Message: fmt.Sprintf("error sending workflow task heartbeat %v", err)}
return
}
if workflowTask == nil {
return nil, nil
return
}

continue processWorkflowLoop
@@ -811,7 +812,9 @@
break processWorkflowLoop
}
}
return response, err
errRet = err
completeRequest = response
return
}
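The hunk above switches the function to assign its named results (completeRequest, errRet) and use bare returns. A common reason for that pattern, and a plausible one here, is to let a deferred function observe or wrap the results after any return statement has set them; a minimal sketch under that assumption:

package main

import "fmt"

// getResult mirrors the named-return shape adopted above: because the results
// are named, the deferred function can inspect and wrap errRet after any of
// the bare `return` statements in the body has set it.
func getResult(fail bool) (completeRequest interface{}, errRet error) {
	defer func() {
		if errRet != nil {
			errRet = fmt.Errorf("workflow task processing: %w", errRet)
		}
	}()
	if fail {
		errRet = fmt.Errorf("error sending workflow task heartbeat")
		return
	}
	completeRequest = "RespondWorkflowTaskCompletedRequest"
	return
}

func main() {
	if _, err := getResult(true); err != nil {
		fmt.Println(err)
	}
}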

func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflowTask) (interface{}, error) {
@@ -926,37 +929,46 @@ ProcessEvents:
// activity task completed), but the corresponding workflow code that started the event has been removed. In that case
// the replay of that event will panic on the command state machine and the workflow will be marked as completed
// with the panic error.
var nonDeterministicErr error
var panicError error
if !skipReplayCheck && !w.isWorkflowCompleted {
// check if commands from replay match the history events
if err := matchReplayWithHistory(replayCommands, respondEvents); err != nil {
nonDeterministicErr = err
panicError = err
}
}
if nonDeterministicErr == nil && w.err != nil {
if panicErr, ok := w.err.(*PanicError); ok && panicErr.value != nil {
if _, isStateMachinePanic := panicErr.value.(stateMachineIllegalStatePanic); isStateMachinePanic {
nonDeterministicErr = panicErr
}
if panicError == nil && w.err != nil {
if panicErr, ok := w.err.(*workflowPanicError); ok {
panicError = panicErr
}
}

if nonDeterministicErr != nil {
w.wth.logger.Error("non-deterministic-error",
tagWorkflowType, task.WorkflowType.GetName(),
tagWorkflowID, task.WorkflowExecution.GetWorkflowId(),
tagRunID, task.WorkflowExecution.GetRunId(),
tagError, nonDeterministicErr)
if panicError != nil {
if panicErr, ok := w.err.(*workflowPanicError); ok {
w.wth.logger.Error("workflow-panic",
tagWorkflowType, task.WorkflowType.GetName(),
tagWorkflowID, task.WorkflowExecution.GetWorkflowId(),
tagRunID, task.WorkflowExecution.GetRunId(),
tagError, panicError,
tagStackTrace, panicErr.StackTrace())
} else {
w.wth.logger.Error("workflow-panic",
tagWorkflowType, task.WorkflowType.GetName(),
tagWorkflowID, task.WorkflowExecution.GetWorkflowId(),
tagRunID, task.WorkflowExecution.GetRunId(),
tagError, panicError)
}

switch w.wth.workflowPanicPolicy {
case FailWorkflow:
// completing the workflow with a custom error will fail the workflow
eventHandler.Complete(nil, NewApplicationError("Non-determimistic error cause workflow to fail due to FailWorkflow workflow panic policy.", "", false, nonDeterministicErr))
eventHandler.Complete(nil, NewApplicationError(
"Workflow failed on panic due to FailWorkflow workflow panic policy",
"", false, panicError))
case BlockWorkflow:
// returning an error here is converted to WorkflowTaskFailed for the first attempt and ignored for subsequent
// attempts, which causes WorkflowTaskTimeout; the server retries forever until the issue is fixed or the
// workflow times out.
return nil, nonDeterministicErr
return nil, panicError
default:
panic("unknown mismatched workflow history policy.")
}
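The switch above is the core of the policy handling: FailWorkflow completes the workflow with an application error built from the panic, while BlockWorkflow returns the error so only the workflow task fails and the server keeps retrying it. A self-contained sketch of that decision, with hypothetical stand-ins for the SDK types:

package main

import (
	"errors"
	"fmt"
)

// WorkflowPanicPolicy mirrors the SDK option: FailWorkflow fails the workflow
// itself, BlockWorkflow only fails the workflow task so the server retries it.
type WorkflowPanicPolicy int

const (
	BlockWorkflow WorkflowPanicPolicy = iota
	FailWorkflow
)

// handlePanic sketches the branch above. completeWorkflow stands in for
// eventHandler.Complete; returning the error stands in for failing the task.
func handlePanic(policy WorkflowPanicPolicy, panicErr error, completeWorkflow func(err error)) error {
	switch policy {
	case FailWorkflow:
		completeWorkflow(fmt.Errorf("workflow failed on panic due to FailWorkflow policy: %w", panicErr))
		return nil
	case BlockWorkflow:
		return panicErr // surfaces as WorkflowTaskFailed, then retried by the server
	default:
		return errors.New("unknown workflow panic policy")
	}
}

func main() {
	err := handlePanic(BlockWorkflow, errors.New("simulated panic"),
		func(err error) { fmt.Println("completing workflow with failure:", err) })
	fmt.Println("task error:", err)
}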
@@ -1454,19 +1466,6 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(

metricsScope := metrics.GetMetricsScopeForWorkflow(wth.metricsScope, eventHandler.workflowEnvironmentImpl.workflowInfo.WorkflowType.Name)

// fail workflow task on workflow panic
var workflowPanicErr *workflowPanicError
if errors.As(workflowContext.err, &workflowPanicErr) {
// Workflow panic
metricsScope.Counter(metrics.WorkflowTaskExecutionFailureCounter).Inc(1)
wth.logger.Error("Workflow panic.",
tagWorkflowID, task.WorkflowExecution.GetWorkflowId(),
tagRunID, task.WorkflowExecution.GetRunId(),
"PanicError", workflowPanicErr.Error(),
"PanicStack", workflowPanicErr.StackTrace())
return errorToFailWorkflowTask(task.TaskToken, workflowContext.err, wth.identity, wth.dataConverter)
}

// complete workflow task
var closeCommand *commandpb.Command
var canceledErr *CanceledError
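The block removed above special-cased workflow panics inside completeWorkflow via errors.As; with this commit that handling moves earlier, into the ProcessWorkflowTask path shown previously, so the duplicate is dropped. For reference, a minimal sketch of the errors.As pattern against an error type that exposes a stack trace (both names here are hypothetical stand-ins):

package main

import (
	"errors"
	"fmt"
)

// panicError is a hypothetical stand-in for workflowPanicError.
type panicError struct {
	msg   string
	stack string
}

func (e *panicError) Error() string      { return e.msg }
func (e *panicError) StackTrace() string { return e.stack }

func main() {
	var err error = fmt.Errorf("processing failed: %w",
		&panicError{msg: "simulated", stack: "goroutine 1 [running]: ..."})

	// errors.As walks the wrap chain and fills target when a *panicError is
	// found, which is how the removed block detected workflow panics.
	var target *panicError
	if errors.As(err, &target) {
		fmt.Println("panic:", target.Error())
		fmt.Println("stack:", target.StackTrace())
	}
}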
24 changes: 11 additions & 13 deletions internal/internal_task_handlers_test.go
@@ -739,6 +739,8 @@ func (t *TaskHandlersTestSuite) TestWithTruncatedHistory() {
ActivityType: &commonpb.ActivityType{Name: "pkg.Greeter_Activity"},
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue},
}),
createTestEventWorkflowTaskScheduled(9, &historypb.WorkflowTaskScheduledEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}}),
createTestEventWorkflowTaskStarted(10),
}
params := workerExecutionParameters{
Namespace: testNamespace,
@@ -753,13 +755,13 @@
previousStartedEventID int64
isResultErr bool
}{
{0, 0, false},
{0, 3, false},
{10, 0, true},
{10, 6, true},
{10, 6, false},
{15, 10, true},
}

for i, tc := range testCases {
cacheSize := getWorkflowCache().Size()

taskHandler := newWorkflowTaskHandler(params, nil, t.registry)
task := createWorkflowTask(testEvents, tc.previousStartedEventID, "HelloWorld_Workflow")
// Cut the workflow task scheduled and started events
@@ -774,11 +776,12 @@
t.Error(err, "testcase %v failed", i)
t.Nil(request)
t.Contains(err.Error(), "premature end of stream")
t.EqualValues(getWorkflowCache().Size(), 0)
t.EqualValues(getWorkflowCache().Size(), cacheSize)
continue
}

t.NoError(err, "testcase %v failed", i)
t.EqualValues(getWorkflowCache().Size(), cacheSize+1)
}
}

@@ -961,15 +964,10 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_WorkflowPanics() {
}

taskHandler := newWorkflowTaskHandler(params, nil, t.registry)
request, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
t.NoError(err)
t.NotNil(request)
r, ok := request.(*workflowservice.RespondWorkflowTaskFailedRequest)
_, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
t.Error(err)
_, ok := err.(*workflowPanicError)
t.True(ok)
t.EqualValues(enumspb.WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE, r.Cause)
t.NotNil(r.GetFailure().GetApplicationFailureInfo())
t.Equal("PanicError", r.GetFailure().GetApplicationFailureInfo().GetType())
t.Equal("panicError", r.GetFailure().GetMessage())
}

func (t *TaskHandlersTestSuite) TestGetWorkflowInfo() {
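Two patterns are worth noting in the test changes above: TestWorkflowTask_WorkflowPanics now expects ProcessWorkflowTask to return the panic as an error rather than a RespondWorkflowTaskFailedRequest, and TestWithTruncatedHistory asserts cache growth relative to a captured baseline instead of absolute sizes, keeping the cases independent of earlier iterations. A small sketch of the relative-size pattern (the cache type is a hypothetical stand-in):

package main

import "fmt"

// cache is a hypothetical stand-in for the worker's workflow cache.
type cache struct{ items map[string]bool }

func (c *cache) Size() int      { return len(c.items) }
func (c *cache) Put(key string) { c.items[key] = true }

func main() {
	c := &cache{items: map[string]bool{}}

	cases := []struct {
		key       string
		expectErr bool
	}{
		{"wf-1", false},
		{"wf-2", true}, // failing case: nothing should be cached
		{"wf-3", false},
	}

	for i, tc := range cases {
		before := c.Size() // capture the baseline, as the updated test does

		if tc.expectErr {
			// failure path: the cache must be unchanged
			fmt.Printf("case %d: size unchanged: %v\n", i, c.Size() == before)
			continue
		}
		c.Put(tc.key)
		// success path: exactly one new entry relative to the baseline
		fmt.Printf("case %d: size grew by one: %v\n", i, c.Size() == before+1)
	}
}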
2 changes: 1 addition & 1 deletion internal/internal_worker.go
@@ -1260,7 +1260,7 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke
DisableStickyExecution: options.DisableStickyExecution,
StickyScheduleToStartTimeout: options.StickyScheduleToStartTimeout,
TaskQueueActivitiesPerSecond: options.TaskQueueActivitiesPerSecond,
WorkflowPanicPolicy: options.NonDeterministicWorkflowPolicy,
WorkflowPanicPolicy: options.WorkflowPanicPolicy,
DataConverter: client.dataConverter,
WorkerStopTimeout: options.WorkerStopTimeout,
ContextPropagators: client.contextPropagators,
8 changes: 7 additions & 1 deletion internal/internal_workflow.go
@@ -855,7 +855,13 @@ func (s *coroutineState) call() {
s.unblock <- func(status string, stackDepth int) bool {
return false // unblock
}
<-s.aboutToBlock
select {
case <-s.aboutToBlock:
case <-time.After(1 * time.Second):
s.closed = true
panic(fmt.Sprintf("Potential deadlock detected: "+
"workflow goroutine \"%v\" didn't yield for over a second", s.name))
}
}

func (s *coroutineState) close() {
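This select is the deadlock detector itself: instead of waiting indefinitely for the workflow goroutine to signal that it is about to block again, the dispatcher waits one second and then panics, after marking the coroutine closed. A self-contained sketch of the same watchdog shape; the names are illustrative, not the SDK's:

package main

import (
	"fmt"
	"time"
)

// waitForYield waits for the workflow goroutine to report that it is about to
// block again; if that signal doesn't arrive within the timeout, the goroutine
// is assumed to be stuck in non-workflow blocking code (time.Sleep, raw channel
// I/O, network calls) and a potential deadlock is reported.
func waitForYield(name string, aboutToBlock <-chan struct{}, timeout time.Duration) error {
	select {
	case <-aboutToBlock:
		return nil
	case <-time.After(timeout):
		return fmt.Errorf("potential deadlock detected: workflow goroutine %q didn't yield for over %v", name, timeout)
	}
}

func main() {
	aboutToBlock := make(chan struct{})
	go func() {
		time.Sleep(2 * time.Second) // simulates blocking code inside workflow code
		close(aboutToBlock)
	}()
	if err := waitForYield("root", aboutToBlock, time.Second); err != nil {
		fmt.Println(err)
	}
}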
7 changes: 4 additions & 3 deletions internal/worker.go
@@ -116,9 +116,10 @@ type (
BackgroundActivityContext context.Context

// Optional: Sets how workflow worker deals with non-deterministic history events
// (presumably arising from non-deterministic workflow definitions or non-backward compatible workflow definition changes).
// default: BlockWorkflow, which just logs error but reply nothing back to server
NonDeterministicWorkflowPolicy WorkflowPanicPolicy
// (presumably arising from non-deterministic workflow definitions or non-backward compatible workflow
// definition changes) and other panics raised from workflow code.
// default: BlockWorkflow, which just logs the error but doesn't fail the workflow.
WorkflowPanicPolicy WorkflowPanicPolicy

// Optional: worker graceful stop timeout
// default: 0s
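The option is renamed from NonDeterministicWorkflowPolicy to WorkflowPanicPolicy because it now also governs plain panics and the new deadlock-detector panic. A usage sketch against the public worker package, modeled on the integration test below; the client setup and task-queue name are placeholders:

package main

import (
	"log"

	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/worker"
)

func main() {
	c, err := client.NewClient(client.Options{})
	if err != nil {
		log.Fatalln("unable to create Temporal client", err)
	}
	defer c.Close()

	// FailWorkflow turns workflow panics (including the deadlock-detector panic)
	// into a workflow failure; the default BlockWorkflow only fails the workflow
	// task, so the server keeps retrying it until the worker is fixed.
	w := worker.New(c, "my-task-queue", worker.Options{
		WorkflowPanicPolicy: worker.FailWorkflow,
	})

	// Workflows and activities would be registered here before running the worker.
	if err := w.Run(worker.InterruptCh()); err != nil {
		log.Fatalln("worker exited", err)
	}
}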
27 changes: 27 additions & 0 deletions test/integration_test.go
@@ -143,6 +143,7 @@ func (ts *IntegrationTestSuite) SetupTest() {
options := worker.Options{
DisableStickyExecution: ts.config.IsStickyOff,
WorkflowInterceptorChainFactories: []interceptors.WorkflowInterceptor{ts.tracer},
WorkflowPanicPolicy: worker.FailWorkflow,
}

if strings.Contains(ts.T().Name(), "Session") {
@@ -177,6 +178,32 @@ func (ts *IntegrationTestSuite) TestBasic() {
)
}

func (ts *IntegrationTestSuite) TestPanicFailWorkflow() {
var expected []string
wfOpts := ts.startWorkflowOptions("test-panic")
wfOpts.WorkflowTaskTimeout = 5 * time.Second
wfOpts.WorkflowRunTimeout = 5 * time.Minute
err := ts.executeWorkflowWithOption(wfOpts, ts.workflows.Panicked, &expected)
ts.Error(err)
var applicationErr *temporal.ApplicationError
ok := errors.As(err, &applicationErr)
ts.True(ok)
ts.True(strings.Contains(applicationErr.Error(), "simulated"))
}

func (ts *IntegrationTestSuite) TestDeadlockDetection() {
var expected []string
wfOpts := ts.startWorkflowOptions("test-deadlock")
wfOpts.WorkflowTaskTimeout = 5 * time.Second
wfOpts.WorkflowRunTimeout = 5 * time.Minute
err := ts.executeWorkflowWithOption(wfOpts, ts.workflows.Deadlocked, &expected)
ts.Error(err)
var applicationErr *temporal.ApplicationError
ok := errors.As(err, &applicationErr)
ts.True(ok)
ts.True(strings.Contains(applicationErr.Error(), "Potential deadlock detected"))
}

func (ts *IntegrationTestSuite) TestActivityRetryOnError() {
var expected []string
err := ts.executeWorkflow("test-activity-retry-on-error", ts.workflows.ActivityRetryOnError, &expected)
2 changes: 1 addition & 1 deletion test/replaytests/replay_test.go
@@ -116,5 +116,5 @@ func (s *replayTestSuite) TestReplayBadWorkflowHistoryFromFile() {

err := replayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "bad-history.json")
require.Error(s.T(), err)
require.True(s.T(), strings.HasPrefix(err.Error(), "replay workflow failed with failure"))
require.True(s.T(), strings.Contains(err.Error(), "nondeterministic workflow definition"))
}
12 changes: 12 additions & 0 deletions test/workflow_test.go
@@ -65,6 +65,16 @@ func (w *Workflows) Basic(ctx workflow.Context) ([]string, error) {
return []string{"toUpperWithDelay", "toUpper"}, nil
}

func (w *Workflows) Deadlocked(ctx workflow.Context) ([]string, error) {
// Simulates deadlock. Never call time.Sleep in production code!
time.Sleep(2 * time.Second)
return []string{}, nil
}

func (w *Workflows) Panicked(ctx workflow.Context) ([]string, error) {
panic("simulated")
}

func (w *Workflows) ActivityRetryOnError(ctx workflow.Context) ([]string, error) {
ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptionsWithRetry())
startTime := workflow.Now(ctx)
@@ -913,6 +923,8 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.ActivityRetryOnTimeout)
worker.RegisterWorkflow(w.ActivityRetryOptionsChange)
worker.RegisterWorkflow(w.Basic)
worker.RegisterWorkflow(w.Deadlocked)
worker.RegisterWorkflow(w.Panicked)
worker.RegisterWorkflow(w.BasicSession)
worker.RegisterWorkflow(w.CancelActivity)
worker.RegisterWorkflow(w.CancelActivityImmediately)
