From eec8f6dd9e08311bc76ad6a5f640dd366837c0e8 Mon Sep 17 00:00:00 2001 From: sankari gopalakrishnan Date: Fri, 9 Feb 2024 16:34:28 +0100 Subject: [PATCH] Emit metrics when transfer tasks could be ratelimited (#5652) * Emit metrics when transfer tasks could be ratelimited * lint * Update transfer_standby_task_executor.go --- config/dynamicconfig/development.yaml | 2 +- service/history/handler.go | 1 + service/history/historyEngine.go | 5 +++++ service/history/queue/transfer_queue_processor.go | 4 ++++ service/history/task/transfer_active_task_executor.go | 3 +++ service/history/task/transfer_active_task_executor_test.go | 6 +++++- service/history/task/transfer_standby_task_executor.go | 3 +++ .../history/task/transfer_standby_task_executor_test.go | 6 +++++- service/history/task/transfer_task_executor_base.go | 7 +++++++ 9 files changed, 34 insertions(+), 3 deletions(-) diff --git a/config/dynamicconfig/development.yaml b/config/dynamicconfig/development.yaml index ca6b0b202e9..6805841d27f 100644 --- a/config/dynamicconfig/development.yaml +++ b/config/dynamicconfig/development.yaml @@ -46,4 +46,4 @@ frontend.validSearchAttributes: service: 1 user: 1 IsDeleted: 4 - constraints: {} \ No newline at end of file + constraints: {} diff --git a/service/history/handler.go b/service/history/handler.go index 881606b6a00..e773dd2c03d 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -245,6 +245,7 @@ func (h *handlerImpl) CreateEngine( h.GetMatchingRawClient(), h.queueTaskProcessor, h.failoverCoordinator, + h.workflowIDCache, ) } diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 424eff20bd8..8a0d9d8d51c 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -66,6 +66,7 @@ import ( "github.com/uber/cadence/service/history/shard" "github.com/uber/cadence/service/history/task" "github.com/uber/cadence/service/history/workflow" + "github.com/uber/cadence/service/history/workflowcache" warchiver "github.com/uber/cadence/service/worker/archiver" ) @@ -123,6 +124,7 @@ type ( clientChecker client.VersionChecker replicationDLQHandler replication.DLQHandler failoverMarkerNotifier failover.MarkerNotifier + wfIDCache workflowcache.WFCache } ) @@ -152,6 +154,7 @@ func NewEngineWithShardContext( rawMatchingClient matching.Client, queueTaskProcessor task.Processor, failoverCoordinator failover.Coordinator, + wfIDCache workflowcache.WFCache, ) engine.Engine { currentClusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName() @@ -234,6 +237,7 @@ func NewEngineWithShardContext( replicationTaskStore: replicationTaskStore, replicationMetricsEmitter: replication.NewMetricsEmitter( shard.GetShardID(), shard, replicationReader, shard.GetMetricsClient()), + wfIDCache: wfIDCache, } historyEngImpl.decisionHandler = decision.NewHandler( shard, @@ -255,6 +259,7 @@ func NewEngineWithShardContext( historyEngImpl.workflowResetter, historyEngImpl.archivalClient, openExecutionCheck, + historyEngImpl.wfIDCache, ) historyEngImpl.timerProcessor = queue.NewTimerQueueProcessor( diff --git a/service/history/queue/transfer_queue_processor.go b/service/history/queue/transfer_queue_processor.go index 54c5af68b65..21830bd9af8 100644 --- a/service/history/queue/transfer_queue_processor.go +++ b/service/history/queue/transfer_queue_processor.go @@ -46,6 +46,7 @@ import ( "github.com/uber/cadence/service/history/reset" "github.com/uber/cadence/service/history/shard" "github.com/uber/cadence/service/history/task" + "github.com/uber/cadence/service/history/workflowcache" "github.com/uber/cadence/service/worker/archiver" ) @@ -87,6 +88,7 @@ func NewTransferQueueProcessor( workflowResetter reset.WorkflowResetter, archivalClient archiver.Client, executionCheck invariant.Invariant, + wfIDCache workflowcache.WFCache, ) Processor { logger := shard.GetLogger().WithTags(tag.ComponentTransferQueue) currentClusterName := shard.GetClusterMetadata().GetCurrentClusterName() @@ -100,6 +102,7 @@ func NewTransferQueueProcessor( workflowResetter, logger, config, + wfIDCache, ) activeQueueProcessor := newTransferQueueActiveProcessor( @@ -131,6 +134,7 @@ func NewTransferQueueProcessor( logger, clusterName, config, + wfIDCache, ) standbyQueueProcessors[clusterName] = newTransferQueueStandbyProcessor( clusterName, diff --git a/service/history/task/transfer_active_task_executor.go b/service/history/task/transfer_active_task_executor.go index 68284953384..714bca3b0ff 100644 --- a/service/history/task/transfer_active_task_executor.go +++ b/service/history/task/transfer_active_task_executor.go @@ -43,6 +43,7 @@ import ( "github.com/uber/cadence/service/history/execution" "github.com/uber/cadence/service/history/reset" "github.com/uber/cadence/service/history/shard" + "github.com/uber/cadence/service/history/workflowcache" "github.com/uber/cadence/service/worker/archiver" "github.com/uber/cadence/service/worker/parentclosepolicy" ) @@ -84,6 +85,7 @@ func NewTransferActiveTaskExecutor( workflowResetter reset.WorkflowResetter, logger log.Logger, config *config.Config, + wfIDCache workflowcache.WFCache, ) Executor { return &transferActiveTaskExecutor{ @@ -93,6 +95,7 @@ func NewTransferActiveTaskExecutor( executionCache, logger, config, + wfIDCache, ), historyClient: shard.GetService().GetHistoryClient(), parentClosePolicyClient: parentclosepolicy.NewClient( diff --git a/service/history/task/transfer_active_task_executor_test.go b/service/history/task/transfer_active_task_executor_test.go index c6a3c72b5a5..cf6a92d695f 100644 --- a/service/history/task/transfer_active_task_executor_test.go +++ b/service/history/task/transfer_active_task_executor_test.go @@ -53,6 +53,7 @@ import ( "github.com/uber/cadence/service/history/execution" "github.com/uber/cadence/service/history/shard" test "github.com/uber/cadence/service/history/testing" + "github.com/uber/cadence/service/history/workflowcache" warchiver "github.com/uber/cadence/service/worker/archiver" "github.com/uber/cadence/service/worker/parentclosepolicy" ) @@ -66,6 +67,7 @@ type ( mockShard *shard.TestContext mockEngine *engine.MockEngine mockDomainCache *cache.MockDomainCache + mockWFCache *workflowcache.MockWFCache mockHistoryClient *hclient.MockClient mockMatchingClient *matching.MockClient @@ -168,6 +170,7 @@ func (s *transferActiveTaskExecutorSuite) SetupTest() { s.mockArchivalMetadata = s.mockShard.Resource.ArchivalMetadata s.mockArchiverProvider = s.mockShard.Resource.ArchiverProvider s.mockDomainCache = s.mockShard.Resource.DomainCache + s.mockWFCache = workflowcache.NewMockWFCache(s.controller) s.mockDomainCache.EXPECT().GetDomainByID(s.domainID).Return(s.domainEntry, nil).AnyTimes() s.mockDomainCache.EXPECT().GetDomainName(s.domainID).Return(s.domainName, nil).AnyTimes() s.mockDomainCache.EXPECT().GetDomainID(s.domainName).Return(s.domainID, nil).AnyTimes() @@ -197,6 +200,7 @@ func (s *transferActiveTaskExecutorSuite) SetupTest() { nil, s.logger, config, + s.mockWFCache, ).(*transferActiveTaskExecutor) s.transferActiveTaskExecutor.parentClosePolicyClient = s.mockParentClosePolicyClient } @@ -239,7 +243,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessActivityTask_Success() { s.NoError(err) s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) s.mockMatchingClient.EXPECT().AddActivityTask(gomock.Any(), createAddActivityTaskRequest(transferTask, ai, mutableState.GetExecutionInfo().PartitionConfig)).Return(nil).Times(1) - + s.mockWFCache.EXPECT().AllowInternal(constants.TestDomainID, constants.TestWorkflowID).Return(true).Times(1) err = s.transferActiveTaskExecutor.Execute(transferTask, true) s.Nil(err) } diff --git a/service/history/task/transfer_standby_task_executor.go b/service/history/task/transfer_standby_task_executor.go index c13e6594768..56f432b318d 100644 --- a/service/history/task/transfer_standby_task_executor.go +++ b/service/history/task/transfer_standby_task_executor.go @@ -35,6 +35,7 @@ import ( "github.com/uber/cadence/service/history/config" "github.com/uber/cadence/service/history/execution" "github.com/uber/cadence/service/history/shard" + "github.com/uber/cadence/service/history/workflowcache" "github.com/uber/cadence/service/worker/archiver" ) @@ -56,6 +57,7 @@ func NewTransferStandbyTaskExecutor( logger log.Logger, clusterName string, config *config.Config, + wfIDCache workflowcache.WFCache, ) Executor { return &transferStandbyTaskExecutor{ transferTaskExecutorBase: newTransferTaskExecutorBase( @@ -64,6 +66,7 @@ func NewTransferStandbyTaskExecutor( executionCache, logger, config, + wfIDCache, ), clusterName: clusterName, historyResender: historyResender, diff --git a/service/history/task/transfer_standby_task_executor_test.go b/service/history/task/transfer_standby_task_executor_test.go index eb364951429..6149f6fa1aa 100644 --- a/service/history/task/transfer_standby_task_executor_test.go +++ b/service/history/task/transfer_standby_task_executor_test.go @@ -49,6 +49,7 @@ import ( "github.com/uber/cadence/service/history/execution" "github.com/uber/cadence/service/history/shard" test "github.com/uber/cadence/service/history/testing" + "github.com/uber/cadence/service/history/workflowcache" warchiver "github.com/uber/cadence/service/worker/archiver" ) @@ -60,6 +61,7 @@ type ( controller *gomock.Controller mockShard *shard.TestContext mockDomainCache *cache.MockDomainCache + mockWFCache *workflowcache.MockWFCache mockNDCHistoryResender *ndc.MockHistoryResender mockMatchingClient *matching.MockClient @@ -136,6 +138,7 @@ func (s *transferStandbyTaskExecutorSuite) SetupTest() { s.mockArchivalMetadata = s.mockShard.Resource.ArchivalMetadata s.mockArchiverProvider = s.mockShard.Resource.ArchiverProvider s.mockDomainCache = s.mockShard.Resource.DomainCache + s.mockWFCache = workflowcache.NewMockWFCache(s.controller) s.mockDomainCache.EXPECT().GetDomainByID(constants.TestDomainID).Return(constants.TestGlobalDomainEntry, nil).AnyTimes() s.mockDomainCache.EXPECT().GetDomainName(constants.TestDomainID).Return(constants.TestDomainName, nil).AnyTimes() s.mockDomainCache.EXPECT().GetDomain(constants.TestDomainName).Return(constants.TestGlobalDomainEntry, nil).AnyTimes() @@ -159,6 +162,7 @@ func (s *transferStandbyTaskExecutorSuite) SetupTest() { s.logger, s.clusterName, config, + s.mockWFCache, ).(*transferStandbyTaskExecutor) } @@ -236,7 +240,7 @@ func (s *transferStandbyTaskExecutorSuite) TestProcessActivityTask_Pending_PushT s.NoError(err) s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) s.mockMatchingClient.EXPECT().AddActivityTask(gomock.Any(), createAddActivityTaskRequest(transferTask, ai, mutableState.GetExecutionInfo().PartitionConfig)).Return(nil).Times(1) - + s.mockWFCache.EXPECT().AllowInternal(constants.TestDomainID, constants.TestWorkflowID).Return(true).Times(1) s.mockShard.SetCurrentTime(s.clusterName, now) err = s.transferStandbyTaskExecutor.Execute(transferTask, true) s.Nil(err) diff --git a/service/history/task/transfer_task_executor_base.go b/service/history/task/transfer_task_executor_base.go index da9470c5ac0..d2c2782ca05 100644 --- a/service/history/task/transfer_task_executor_base.go +++ b/service/history/task/transfer_task_executor_base.go @@ -36,6 +36,7 @@ import ( "github.com/uber/cadence/service/history/config" "github.com/uber/cadence/service/history/execution" "github.com/uber/cadence/service/history/shard" + "github.com/uber/cadence/service/history/workflowcache" "github.com/uber/cadence/service/worker/archiver" ) @@ -59,6 +60,7 @@ type ( visibilityMgr persistence.VisibilityManager config *config.Config throttleRetry *backoff.ThrottleRetry + wfIDCache workflowcache.WFCache } ) @@ -68,6 +70,7 @@ func newTransferTaskExecutorBase( executionCache *execution.Cache, logger log.Logger, config *config.Config, + wfIDCache workflowcache.WFCache, ) *transferTaskExecutorBase { return &transferTaskExecutorBase{ shard: shard, @@ -82,6 +85,7 @@ func newTransferTaskExecutorBase( backoff.WithRetryPolicy(taskRetryPolicy), backoff.WithRetryableError(common.IsServiceTransientError), ), + wfIDCache: wfIDCache, } } @@ -99,6 +103,9 @@ func (t *transferTaskExecutorBase) pushActivity( t.logger.Fatal("Cannot process non activity task", tag.TaskType(task.GetTaskType())) } + // Ratelimiting is not done. This is only to count the number of requests via metrics + t.wfIDCache.AllowInternal(task.DomainID, task.WorkflowID) + return t.matchingClient.AddActivityTask(ctx, &types.AddActivityTaskRequest{ DomainUUID: task.TargetDomainID, SourceDomainUUID: task.DomainID,