Skip to content

Commit

Permalink
Fix task queue user latency
Browse files Browse the repository at this point in the history
  • Loading branch information
Ardagan committed Jan 28, 2022
1 parent 6c1f2ff commit 8146eb1
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 5 deletions.
2 changes: 2 additions & 0 deletions service/history/taskProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,13 @@ func (t *taskProcessor) processTaskOnce(
}

ctx := context.Background()
ctx = metrics.AddMetricsContext(ctx)
startTime := t.timeSource.Now()
scopeIdx, err := task.processor.process(ctx, task)
if duration, ok := metrics.ContextCounterGet(ctx, metrics.HistoryWorkflowExecutionCacheLatency); ok {
task.userLatency += time.Duration(duration)
}

scope := t.metricsClient.Scope(scopeIdx).Tagged(t.getNamespaceTagByID(namespace.ID(task.GetNamespaceID())))
if task.shouldProcessTask {
scope.IncCounter(metrics.TaskRequests)
Expand Down
36 changes: 31 additions & 5 deletions service/history/taskProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package history
import (
"context"
"errors"
"reflect"
"testing"
"time"

Expand Down Expand Up @@ -69,6 +70,8 @@ type (
}
)

var typeOfCtx = gomock.AssignableToTypeOf(reflect.TypeOf((*context.Context)(nil)).Elem())

func TestTaskProcessorSuite(t *testing.T) {
s := new(taskProcessorSuite)
suite.Run(t, s)
Expand Down Expand Up @@ -144,7 +147,7 @@ func (s *taskProcessorSuite) TestProcessTaskAndAck_NamespaceErrRetry_ProcessNoEr
}
s.mockProcessor.EXPECT().getTaskFilter().Return(taskFilterErr)
s.mockProcessor.EXPECT().getTaskFilter().Return(taskFilter)
s.mockProcessor.EXPECT().process(context.Background(), task).Return(s.scopeIdx, nil)
s.mockProcessor.EXPECT().process(typeOfCtx, task).Return(s.scopeIdx, nil)
s.mockProcessor.EXPECT().complete(task)
s.mockShard.Resource.NamespaceCache.EXPECT().GetNamespaceName(gomock.Any()).Return(tests.Namespace, nil)
s.taskProcessor.processTaskAndAck(
Expand All @@ -160,7 +163,7 @@ func (s *taskProcessorSuite) TestProcessTaskAndAck_NamespaceFalse_ProcessNoErr()
return false, nil
}
s.mockProcessor.EXPECT().getTaskFilter().Return(taskFilter)
s.mockProcessor.EXPECT().process(context.Background(), task).Return(s.scopeIdx, nil)
s.mockProcessor.EXPECT().process(typeOfCtx, task).Return(s.scopeIdx, nil)
s.mockProcessor.EXPECT().complete(task)
s.mockShard.Resource.NamespaceCache.EXPECT().GetNamespaceName(gomock.Any()).Return(tests.Namespace, nil)
s.taskProcessor.processTaskAndAck(
Expand All @@ -174,8 +177,9 @@ func (s *taskProcessorSuite) TestProcessTaskAndAck_NamespaceTrue_ProcessNoErr()
var taskFilter taskFilter = func(task tasks.Task) (bool, error) {
return true, nil
}

s.mockProcessor.EXPECT().getTaskFilter().Return(taskFilter)
s.mockProcessor.EXPECT().process(context.Background(), task).Return(s.scopeIdx, nil)
s.mockProcessor.EXPECT().process(typeOfCtx, task).Return(s.scopeIdx, nil)
s.mockProcessor.EXPECT().complete(task)
s.mockShard.Resource.NamespaceCache.EXPECT().GetNamespaceName(gomock.Any()).Return(tests.Namespace, nil)
s.taskProcessor.processTaskAndAck(
Expand All @@ -191,8 +195,8 @@ func (s *taskProcessorSuite) TestProcessTaskAndAck_NamespaceTrue_ProcessErrNoErr
return true, nil
}
s.mockProcessor.EXPECT().getTaskFilter().Return(taskFilter)
s.mockProcessor.EXPECT().process(context.Background(), task).Return(s.scopeIdx, err)
s.mockProcessor.EXPECT().process(context.Background(), task).Return(s.scopeIdx, nil)
s.mockProcessor.EXPECT().process(typeOfCtx, task).Return(s.scopeIdx, err)
s.mockProcessor.EXPECT().process(typeOfCtx, task).Return(s.scopeIdx, nil)
s.mockProcessor.EXPECT().complete(task)
s.mockShard.Resource.NamespaceCache.EXPECT().GetNamespaceName(gomock.Any()).Return(tests.Namespace, nil).Times(2)
s.taskProcessor.processTaskAndAck(
Expand Down Expand Up @@ -256,6 +260,28 @@ func (s *taskProcessorSuite) TestHandleTaskError_RandomErr() {
s.Equal(err, s.taskProcessor.handleTaskError(s.scope, taskInfo, s.notificationChan, err))
}

func (s *taskProcessorSuite) TestProcessTaskAndAck_SetsUserLatencyCorrectly() {
task := newTaskInfo(s.mockProcessor, &taskForTest{Key: tasks.Key{TaskID: 12345, FireTime: time.Now().UTC()}}, s.logger)
task.shouldProcessTask = false
var taskFilter taskFilter = func(task tasks.Task) (bool, error) {
return false, nil
}
expectedUserLatency := int64(133)
updateContext := func(ctx context.Context, taskInfo interface{}) {
metrics.ContextCounterAdd(ctx, metrics.HistoryWorkflowExecutionCacheLatency, expectedUserLatency)
}

s.mockProcessor.EXPECT().getTaskFilter().Return(taskFilter)
s.mockProcessor.EXPECT().process(typeOfCtx, task).Do(updateContext).Return(s.scopeIdx, nil)
s.mockProcessor.EXPECT().complete(task)
s.mockShard.Resource.NamespaceCache.EXPECT().GetNamespaceName(gomock.Any()).Return(tests.Namespace, nil)
s.taskProcessor.processTaskAndAck(
s.notificationChan,
task,
)
s.Equal(time.Duration(expectedUserLatency), task.userLatency)
}

func (t *taskForTest) GetKey() tasks.Key {
return t.Key
}
Expand Down

0 comments on commit 8146eb1

Please sign in to comment.