From 8146eb1e4c6266fd61bc5b621fc0105d5d1c74c8 Mon Sep 17 00:00:00 2001 From: Mikhail Gryzykhin Date: Fri, 28 Jan 2022 11:38:12 -0800 Subject: [PATCH] Fix task queue user latency --- service/history/taskProcessor.go | 2 ++ service/history/taskProcessor_test.go | 36 +++++++++++++++++++++++---- 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/service/history/taskProcessor.go b/service/history/taskProcessor.go index 81660115507..dee67685823 100644 --- a/service/history/taskProcessor.go +++ b/service/history/taskProcessor.go @@ -271,11 +271,13 @@ func (t *taskProcessor) processTaskOnce( } ctx := context.Background() + ctx = metrics.AddMetricsContext(ctx) startTime := t.timeSource.Now() scopeIdx, err := task.processor.process(ctx, task) if duration, ok := metrics.ContextCounterGet(ctx, metrics.HistoryWorkflowExecutionCacheLatency); ok { task.userLatency += time.Duration(duration) } + scope := t.metricsClient.Scope(scopeIdx).Tagged(t.getNamespaceTagByID(namespace.ID(task.GetNamespaceID()))) if task.shouldProcessTask { scope.IncCounter(metrics.TaskRequests) diff --git a/service/history/taskProcessor_test.go b/service/history/taskProcessor_test.go index b3b774401aa..fa3df4adbd8 100644 --- a/service/history/taskProcessor_test.go +++ b/service/history/taskProcessor_test.go @@ -27,6 +27,7 @@ package history import ( "context" "errors" + "reflect" "testing" "time" @@ -69,6 +70,8 @@ type ( } ) +var typeOfCtx = gomock.AssignableToTypeOf(reflect.TypeOf((*context.Context)(nil)).Elem()) + func TestTaskProcessorSuite(t *testing.T) { s := new(taskProcessorSuite) suite.Run(t, s) @@ -144,7 +147,7 @@ func (s *taskProcessorSuite) TestProcessTaskAndAck_NamespaceErrRetry_ProcessNoEr } s.mockProcessor.EXPECT().getTaskFilter().Return(taskFilterErr) s.mockProcessor.EXPECT().getTaskFilter().Return(taskFilter) - s.mockProcessor.EXPECT().process(context.Background(), task).Return(s.scopeIdx, nil) + s.mockProcessor.EXPECT().process(typeOfCtx, task).Return(s.scopeIdx, nil) s.mockProcessor.EXPECT().complete(task) s.mockShard.Resource.NamespaceCache.EXPECT().GetNamespaceName(gomock.Any()).Return(tests.Namespace, nil) s.taskProcessor.processTaskAndAck( @@ -160,7 +163,7 @@ func (s *taskProcessorSuite) TestProcessTaskAndAck_NamespaceFalse_ProcessNoErr() return false, nil } s.mockProcessor.EXPECT().getTaskFilter().Return(taskFilter) - s.mockProcessor.EXPECT().process(context.Background(), task).Return(s.scopeIdx, nil) + s.mockProcessor.EXPECT().process(typeOfCtx, task).Return(s.scopeIdx, nil) s.mockProcessor.EXPECT().complete(task) s.mockShard.Resource.NamespaceCache.EXPECT().GetNamespaceName(gomock.Any()).Return(tests.Namespace, nil) s.taskProcessor.processTaskAndAck( @@ -174,8 +177,9 @@ func (s *taskProcessorSuite) TestProcessTaskAndAck_NamespaceTrue_ProcessNoErr() var taskFilter taskFilter = func(task tasks.Task) (bool, error) { return true, nil } + s.mockProcessor.EXPECT().getTaskFilter().Return(taskFilter) - s.mockProcessor.EXPECT().process(context.Background(), task).Return(s.scopeIdx, nil) + s.mockProcessor.EXPECT().process(typeOfCtx, task).Return(s.scopeIdx, nil) s.mockProcessor.EXPECT().complete(task) s.mockShard.Resource.NamespaceCache.EXPECT().GetNamespaceName(gomock.Any()).Return(tests.Namespace, nil) s.taskProcessor.processTaskAndAck( @@ -191,8 +195,8 @@ func (s *taskProcessorSuite) TestProcessTaskAndAck_NamespaceTrue_ProcessErrNoErr return true, nil } s.mockProcessor.EXPECT().getTaskFilter().Return(taskFilter) - s.mockProcessor.EXPECT().process(context.Background(), task).Return(s.scopeIdx, err) - s.mockProcessor.EXPECT().process(context.Background(), task).Return(s.scopeIdx, nil) + s.mockProcessor.EXPECT().process(typeOfCtx, task).Return(s.scopeIdx, err) + s.mockProcessor.EXPECT().process(typeOfCtx, task).Return(s.scopeIdx, nil) s.mockProcessor.EXPECT().complete(task) s.mockShard.Resource.NamespaceCache.EXPECT().GetNamespaceName(gomock.Any()).Return(tests.Namespace, nil).Times(2) s.taskProcessor.processTaskAndAck( @@ -256,6 +260,28 @@ func (s *taskProcessorSuite) TestHandleTaskError_RandomErr() { s.Equal(err, s.taskProcessor.handleTaskError(s.scope, taskInfo, s.notificationChan, err)) } +func (s *taskProcessorSuite) TestProcessTaskAndAck_SetsUserLatencyCorrectly() { + task := newTaskInfo(s.mockProcessor, &taskForTest{Key: tasks.Key{TaskID: 12345, FireTime: time.Now().UTC()}}, s.logger) + task.shouldProcessTask = false + var taskFilter taskFilter = func(task tasks.Task) (bool, error) { + return false, nil + } + expectedUserLatency := int64(133) + updateContext := func(ctx context.Context, taskInfo interface{}) { + metrics.ContextCounterAdd(ctx, metrics.HistoryWorkflowExecutionCacheLatency, expectedUserLatency) + } + + s.mockProcessor.EXPECT().getTaskFilter().Return(taskFilter) + s.mockProcessor.EXPECT().process(typeOfCtx, task).Do(updateContext).Return(s.scopeIdx, nil) + s.mockProcessor.EXPECT().complete(task) + s.mockShard.Resource.NamespaceCache.EXPECT().GetNamespaceName(gomock.Any()).Return(tests.Namespace, nil) + s.taskProcessor.processTaskAndAck( + s.notificationChan, + task, + ) + s.Equal(time.Duration(expectedUserLatency), task.userLatency) +} + func (t *taskForTest) GetKey() tasks.Key { return t.Key }