Skip to content

Commit

Permalink
Emit metrics when transfer tasks could be ratelimited (#5652)
Browse files Browse the repository at this point in the history
* Emit metrics when transfer tasks could be ratelimited

* lint

* Update transfer_standby_task_executor.go
  • Loading branch information
sankari165 authored Feb 9, 2024
1 parent d660630 commit eec8f6d
Show file tree
Hide file tree
Showing 9 changed files with 34 additions and 3 deletions.
2 changes: 1 addition & 1 deletion config/dynamicconfig/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,4 @@ frontend.validSearchAttributes:
service: 1
user: 1
IsDeleted: 4
constraints: {}
constraints: {}
1 change: 1 addition & 0 deletions service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ func (h *handlerImpl) CreateEngine(
h.GetMatchingRawClient(),
h.queueTaskProcessor,
h.failoverCoordinator,
h.workflowIDCache,
)
}

Expand Down
5 changes: 5 additions & 0 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import (
"github.com/uber/cadence/service/history/shard"
"github.com/uber/cadence/service/history/task"
"github.com/uber/cadence/service/history/workflow"
"github.com/uber/cadence/service/history/workflowcache"
warchiver "github.com/uber/cadence/service/worker/archiver"
)

Expand Down Expand Up @@ -123,6 +124,7 @@ type (
clientChecker client.VersionChecker
replicationDLQHandler replication.DLQHandler
failoverMarkerNotifier failover.MarkerNotifier
wfIDCache workflowcache.WFCache
}
)

Expand Down Expand Up @@ -152,6 +154,7 @@ func NewEngineWithShardContext(
rawMatchingClient matching.Client,
queueTaskProcessor task.Processor,
failoverCoordinator failover.Coordinator,
wfIDCache workflowcache.WFCache,
) engine.Engine {
currentClusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName()

Expand Down Expand Up @@ -234,6 +237,7 @@ func NewEngineWithShardContext(
replicationTaskStore: replicationTaskStore,
replicationMetricsEmitter: replication.NewMetricsEmitter(
shard.GetShardID(), shard, replicationReader, shard.GetMetricsClient()),
wfIDCache: wfIDCache,
}
historyEngImpl.decisionHandler = decision.NewHandler(
shard,
Expand All @@ -255,6 +259,7 @@ func NewEngineWithShardContext(
historyEngImpl.workflowResetter,
historyEngImpl.archivalClient,
openExecutionCheck,
historyEngImpl.wfIDCache,
)

historyEngImpl.timerProcessor = queue.NewTimerQueueProcessor(
Expand Down
4 changes: 4 additions & 0 deletions service/history/queue/transfer_queue_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/uber/cadence/service/history/reset"
"github.com/uber/cadence/service/history/shard"
"github.com/uber/cadence/service/history/task"
"github.com/uber/cadence/service/history/workflowcache"
"github.com/uber/cadence/service/worker/archiver"
)

Expand Down Expand Up @@ -87,6 +88,7 @@ func NewTransferQueueProcessor(
workflowResetter reset.WorkflowResetter,
archivalClient archiver.Client,
executionCheck invariant.Invariant,
wfIDCache workflowcache.WFCache,
) Processor {
logger := shard.GetLogger().WithTags(tag.ComponentTransferQueue)
currentClusterName := shard.GetClusterMetadata().GetCurrentClusterName()
Expand All @@ -100,6 +102,7 @@ func NewTransferQueueProcessor(
workflowResetter,
logger,
config,
wfIDCache,
)

activeQueueProcessor := newTransferQueueActiveProcessor(
Expand Down Expand Up @@ -131,6 +134,7 @@ func NewTransferQueueProcessor(
logger,
clusterName,
config,
wfIDCache,
)
standbyQueueProcessors[clusterName] = newTransferQueueStandbyProcessor(
clusterName,
Expand Down
3 changes: 3 additions & 0 deletions service/history/task/transfer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/uber/cadence/service/history/execution"
"github.com/uber/cadence/service/history/reset"
"github.com/uber/cadence/service/history/shard"
"github.com/uber/cadence/service/history/workflowcache"
"github.com/uber/cadence/service/worker/archiver"
"github.com/uber/cadence/service/worker/parentclosepolicy"
)
Expand Down Expand Up @@ -84,6 +85,7 @@ func NewTransferActiveTaskExecutor(
workflowResetter reset.WorkflowResetter,
logger log.Logger,
config *config.Config,
wfIDCache workflowcache.WFCache,
) Executor {

return &transferActiveTaskExecutor{
Expand All @@ -93,6 +95,7 @@ func NewTransferActiveTaskExecutor(
executionCache,
logger,
config,
wfIDCache,
),
historyClient: shard.GetService().GetHistoryClient(),
parentClosePolicyClient: parentclosepolicy.NewClient(
Expand Down
6 changes: 5 additions & 1 deletion service/history/task/transfer_active_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/uber/cadence/service/history/execution"
"github.com/uber/cadence/service/history/shard"
test "github.com/uber/cadence/service/history/testing"
"github.com/uber/cadence/service/history/workflowcache"
warchiver "github.com/uber/cadence/service/worker/archiver"
"github.com/uber/cadence/service/worker/parentclosepolicy"
)
Expand All @@ -66,6 +67,7 @@ type (
mockShard *shard.TestContext
mockEngine *engine.MockEngine
mockDomainCache *cache.MockDomainCache
mockWFCache *workflowcache.MockWFCache
mockHistoryClient *hclient.MockClient
mockMatchingClient *matching.MockClient

Expand Down Expand Up @@ -168,6 +170,7 @@ func (s *transferActiveTaskExecutorSuite) SetupTest() {
s.mockArchivalMetadata = s.mockShard.Resource.ArchivalMetadata
s.mockArchiverProvider = s.mockShard.Resource.ArchiverProvider
s.mockDomainCache = s.mockShard.Resource.DomainCache
s.mockWFCache = workflowcache.NewMockWFCache(s.controller)
s.mockDomainCache.EXPECT().GetDomainByID(s.domainID).Return(s.domainEntry, nil).AnyTimes()
s.mockDomainCache.EXPECT().GetDomainName(s.domainID).Return(s.domainName, nil).AnyTimes()
s.mockDomainCache.EXPECT().GetDomainID(s.domainName).Return(s.domainID, nil).AnyTimes()
Expand Down Expand Up @@ -197,6 +200,7 @@ func (s *transferActiveTaskExecutorSuite) SetupTest() {
nil,
s.logger,
config,
s.mockWFCache,
).(*transferActiveTaskExecutor)
s.transferActiveTaskExecutor.parentClosePolicyClient = s.mockParentClosePolicyClient
}
Expand Down Expand Up @@ -239,7 +243,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessActivityTask_Success() {
s.NoError(err)
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
s.mockMatchingClient.EXPECT().AddActivityTask(gomock.Any(), createAddActivityTaskRequest(transferTask, ai, mutableState.GetExecutionInfo().PartitionConfig)).Return(nil).Times(1)

s.mockWFCache.EXPECT().AllowInternal(constants.TestDomainID, constants.TestWorkflowID).Return(true).Times(1)
err = s.transferActiveTaskExecutor.Execute(transferTask, true)
s.Nil(err)
}
Expand Down
3 changes: 3 additions & 0 deletions service/history/task/transfer_standby_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/uber/cadence/service/history/config"
"github.com/uber/cadence/service/history/execution"
"github.com/uber/cadence/service/history/shard"
"github.com/uber/cadence/service/history/workflowcache"
"github.com/uber/cadence/service/worker/archiver"
)

Expand All @@ -56,6 +57,7 @@ func NewTransferStandbyTaskExecutor(
logger log.Logger,
clusterName string,
config *config.Config,
wfIDCache workflowcache.WFCache,
) Executor {
return &transferStandbyTaskExecutor{
transferTaskExecutorBase: newTransferTaskExecutorBase(
Expand All @@ -64,6 +66,7 @@ func NewTransferStandbyTaskExecutor(
executionCache,
logger,
config,
wfIDCache,
),
clusterName: clusterName,
historyResender: historyResender,
Expand Down
6 changes: 5 additions & 1 deletion service/history/task/transfer_standby_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/uber/cadence/service/history/execution"
"github.com/uber/cadence/service/history/shard"
test "github.com/uber/cadence/service/history/testing"
"github.com/uber/cadence/service/history/workflowcache"
warchiver "github.com/uber/cadence/service/worker/archiver"
)

Expand All @@ -60,6 +61,7 @@ type (
controller *gomock.Controller
mockShard *shard.TestContext
mockDomainCache *cache.MockDomainCache
mockWFCache *workflowcache.MockWFCache
mockNDCHistoryResender *ndc.MockHistoryResender
mockMatchingClient *matching.MockClient

Expand Down Expand Up @@ -136,6 +138,7 @@ func (s *transferStandbyTaskExecutorSuite) SetupTest() {
s.mockArchivalMetadata = s.mockShard.Resource.ArchivalMetadata
s.mockArchiverProvider = s.mockShard.Resource.ArchiverProvider
s.mockDomainCache = s.mockShard.Resource.DomainCache
s.mockWFCache = workflowcache.NewMockWFCache(s.controller)
s.mockDomainCache.EXPECT().GetDomainByID(constants.TestDomainID).Return(constants.TestGlobalDomainEntry, nil).AnyTimes()
s.mockDomainCache.EXPECT().GetDomainName(constants.TestDomainID).Return(constants.TestDomainName, nil).AnyTimes()
s.mockDomainCache.EXPECT().GetDomain(constants.TestDomainName).Return(constants.TestGlobalDomainEntry, nil).AnyTimes()
Expand All @@ -159,6 +162,7 @@ func (s *transferStandbyTaskExecutorSuite) SetupTest() {
s.logger,
s.clusterName,
config,
s.mockWFCache,
).(*transferStandbyTaskExecutor)
}

Expand Down Expand Up @@ -236,7 +240,7 @@ func (s *transferStandbyTaskExecutorSuite) TestProcessActivityTask_Pending_PushT
s.NoError(err)
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
s.mockMatchingClient.EXPECT().AddActivityTask(gomock.Any(), createAddActivityTaskRequest(transferTask, ai, mutableState.GetExecutionInfo().PartitionConfig)).Return(nil).Times(1)

s.mockWFCache.EXPECT().AllowInternal(constants.TestDomainID, constants.TestWorkflowID).Return(true).Times(1)
s.mockShard.SetCurrentTime(s.clusterName, now)
err = s.transferStandbyTaskExecutor.Execute(transferTask, true)
s.Nil(err)
Expand Down
7 changes: 7 additions & 0 deletions service/history/task/transfer_task_executor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/uber/cadence/service/history/config"
"github.com/uber/cadence/service/history/execution"
"github.com/uber/cadence/service/history/shard"
"github.com/uber/cadence/service/history/workflowcache"
"github.com/uber/cadence/service/worker/archiver"
)

Expand All @@ -59,6 +60,7 @@ type (
visibilityMgr persistence.VisibilityManager
config *config.Config
throttleRetry *backoff.ThrottleRetry
wfIDCache workflowcache.WFCache
}
)

Expand All @@ -68,6 +70,7 @@ func newTransferTaskExecutorBase(
executionCache *execution.Cache,
logger log.Logger,
config *config.Config,
wfIDCache workflowcache.WFCache,
) *transferTaskExecutorBase {
return &transferTaskExecutorBase{
shard: shard,
Expand All @@ -82,6 +85,7 @@ func newTransferTaskExecutorBase(
backoff.WithRetryPolicy(taskRetryPolicy),
backoff.WithRetryableError(common.IsServiceTransientError),
),
wfIDCache: wfIDCache,
}
}

Expand All @@ -99,6 +103,9 @@ func (t *transferTaskExecutorBase) pushActivity(
t.logger.Fatal("Cannot process non activity task", tag.TaskType(task.GetTaskType()))
}

// Ratelimiting is not done. This is only to count the number of requests via metrics
t.wfIDCache.AllowInternal(task.DomainID, task.WorkflowID)

return t.matchingClient.AddActivityTask(ctx, &types.AddActivityTaskRequest{
DomainUUID: task.TargetDomainID,
SourceDomainUUID: task.DomainID,
Expand Down

0 comments on commit eec8f6d

Please sign in to comment.