From c35ec679a059e4932b847491b14a51e8fe679ac3 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Tue, 16 Jan 2024 08:34:45 -0800 Subject: [PATCH 1/2] Evict workflow from cache on RespondTaskCompleted failure --- internal/internal_task_pollers.go | 2 + internal/internal_task_pollers_test.go | 77 ++++++++++++++++++++++++++ 2 files changed, 79 insertions(+) diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index 2cea8f117..49b0d3a7f 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -379,6 +379,8 @@ func (wtp *workflowTaskPoller) processWorkflowTask(task *workflowTask) error { } response, err := wtp.RespondTaskCompletedWithMetrics(completedRequest, taskErr, task.task, startTime) if err != nil { + // If we get an error responding to the workflow task we need to evict the execution from the cache. + taskErr = err return err } diff --git a/internal/internal_task_pollers_test.go b/internal/internal_task_pollers_test.go index c62573c30..ad4879657 100644 --- a/internal/internal_task_pollers_test.go +++ b/internal/internal_task_pollers_test.go @@ -27,6 +27,7 @@ package internal import ( "context" "encoding/binary" + "errors" "sync/atomic" "testing" @@ -146,3 +147,79 @@ func TestWFTRacePrevention(t *testing.T) { close(completionChans[1]) require.NoError(t, <-resultsChan) } + +func TestWFTCorruption(t *testing.T) { + cache := NewWorkerCache() + params := workerExecutionParameters{cache: cache} + ensureRequiredParams(¶ms) + wfType := commonpb.WorkflowType{Name: t.Name() + "-workflow-type"} + reg := newRegistry() + reg.RegisterWorkflowWithOptions(func(ctx Context) error { + Await(ctx, func() bool { + return false + }) + return nil + }, RegisterWorkflowOptions{ + Name: wfType.Name, + }) + var ( + taskQueue = taskqueuepb.TaskQueue{Name: t.Name() + "task-queue"} + startedAttrs = historypb.WorkflowExecutionStartedEventAttributes{ + TaskQueue: &taskQueue, + } + startedEvent = createTestEventWorkflowExecutionStarted(1, &startedAttrs) + history = historypb.History{Events: []*historypb.HistoryEvent{startedEvent}} + runID = t.Name() + "-run-id" + wfID = t.Name() + "-workflow-id" + wfe = commonpb.WorkflowExecution{RunId: runID, WorkflowId: wfID} + ctrl = gomock.NewController(t) + client = workflowservicemock.NewMockWorkflowServiceClient(ctrl) + innerTaskHandler = newWorkflowTaskHandler(params, nil, reg) + taskHandler = &countingTaskHandler{WorkflowTaskHandler: innerTaskHandler} + contextManager = taskHandler + completionChans = []chan struct{}{make(chan struct{}), make(chan struct{})} + codec = binary.LittleEndian + pollResp0 = workflowservice.PollWorkflowTaskQueueResponse{ + Attempt: 1, + WorkflowExecution: &wfe, + WorkflowType: &wfType, + History: &history, + // encode the task pseudo-ID into the token; 0 here and 1 for + // pollResp1 below. The mock will use this as an index into + // `completionChans` (above) to get a task-specific control channel. + TaskToken: codec.AppendUint32(nil, 0), + } + task0 = workflowTask{task: &pollResp0} + ) + + // Return an error on respond workflow task complete, the SDK should flush the workflow from cache + client.EXPECT().RespondWorkflowTaskCompleted(gomock.Any(), gomock.Any()). + DoAndReturn(func( + _ context.Context, + req *workflowservice.RespondWorkflowTaskCompletedRequest, + _ ...grpc.CallOption, + ) (*workflowservice.RespondWorkflowTaskCompletedResponse, error) { + // find the appropriate channel for this task - the index is encoded + // into the TaskToken + ch := completionChans[int(codec.Uint32(req.TaskToken))] + <-ch + // these two reads ^v allow the test code to capture a task processing + // goroutine exactly here + <-ch + return nil, errors.New("Failure responding to workflow task") + }) + + poller := newWorkflowTaskPoller(taskHandler, contextManager, client, params) + processTaskDone := make(chan struct{}) + go func() { + require.Error(t, poller.processWorkflowTask(&task0)) + close(processTaskDone) + }() + completionChans[0] <- struct{}{} + // Until RespondWorkflowTaskCompleted returns an error the workflow should be in cache + require.True(t, (*cache.sharedCache.workflowCache).Exist(runID)) + close(completionChans[0]) + <-processTaskDone + // Workflow should not be in cache + require.Nil(t, cache.getWorkflowContext(runID)) +} From 2bfd4bfe144d594318fa82eb22b76a57602d4283 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Tue, 16 Jan 2024 08:40:45 -0800 Subject: [PATCH 2/2] Fix errcheck --- internal/internal_task_pollers_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/internal_task_pollers_test.go b/internal/internal_task_pollers_test.go index ad4879657..26692a133 100644 --- a/internal/internal_task_pollers_test.go +++ b/internal/internal_task_pollers_test.go @@ -155,10 +155,9 @@ func TestWFTCorruption(t *testing.T) { wfType := commonpb.WorkflowType{Name: t.Name() + "-workflow-type"} reg := newRegistry() reg.RegisterWorkflowWithOptions(func(ctx Context) error { - Await(ctx, func() bool { + return Await(ctx, func() bool { return false }) - return nil }, RegisterWorkflowOptions{ Name: wfType.Name, })