diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index c46aee7b232..f17f10ab876 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -1197,6 +1197,7 @@ const ( // Default value: 10 // Allowed filters: DomainName ParentClosePolicyThreshold + ParentClosePolicyBatchSize // NumParentClosePolicySystemWorkflows is key for number of parentClosePolicy system workflows running in total // KeyName: history.numParentClosePolicySystemWorkflows // Value type: Int @@ -3694,6 +3695,12 @@ var IntKeys = map[IntKey]DynamicInt{ Description: "ParentClosePolicyThreshold is decides that parent close policy will be processed by sys workers(if enabled) ifthe number of children greater than or equal to this threshold", DefaultValue: 10, }, + ParentClosePolicyBatchSize: { + KeyName: "history.parentClosePolicyBatchSize", + Filters: []Filter{DomainName}, + Description: "ParentClosePolicyBatchSize is the batch size of parent close policy processed by sys workers", + DefaultValue: 200, + }, NumParentClosePolicySystemWorkflows: { KeyName: "history.numParentClosePolicySystemWorkflows", Description: "NumParentClosePolicySystemWorkflows is key for number of parentClosePolicy system workflows running in total", diff --git a/service/history/config/config.go b/service/history/config/config.go index 1cdc5acf124..df9aff1457f 100644 --- a/service/history/config/config.go +++ b/service/history/config/config.go @@ -222,6 +222,8 @@ type Config struct { // parent close policy will be processed by sys workers(if enabled) if // the number of children greater than or equal to this threshold ParentClosePolicyThreshold dynamicconfig.IntPropertyFnWithDomainFilter + // the batch size of parent close policy processed by sys workers + ParentClosePolicyBatchSize dynamicconfig.IntPropertyFnWithDomainFilter // total number of parentClosePolicy system workflows NumParentClosePolicySystemWorkflows dynamicconfig.IntPropertyFn @@ -515,6 +517,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, s NumParentClosePolicySystemWorkflows: dc.GetIntProperty(dynamicconfig.NumParentClosePolicySystemWorkflows), EnableParentClosePolicyWorker: dc.GetBoolProperty(dynamicconfig.EnableParentClosePolicyWorker), ParentClosePolicyThreshold: dc.GetIntPropertyFilteredByDomain(dynamicconfig.ParentClosePolicyThreshold), + ParentClosePolicyBatchSize: dc.GetIntPropertyFilteredByDomain(dynamicconfig.ParentClosePolicyBatchSize), NumArchiveSystemWorkflows: dc.GetIntProperty(dynamicconfig.NumArchiveSystemWorkflows), ArchiveRequestRPS: dc.GetIntProperty(dynamicconfig.ArchiveRequestRPS), diff --git a/service/history/task/transfer_active_task_executor.go b/service/history/task/transfer_active_task_executor.go index 0d2d14d4ccc..e060b1a1310 100644 --- a/service/history/task/transfer_active_task_executor.go +++ b/service/history/task/transfer_active_task_executor.go @@ -1777,8 +1777,11 @@ func (t *transferActiveTaskExecutor) processParentClosePolicy( if t.shard.GetConfig().EnableParentClosePolicyWorker() && len(childInfos) >= t.shard.GetConfig().ParentClosePolicyThreshold(domainName) { - executions := make([]parentclosepolicy.RequestDetail, 0, len(childInfos)) + batchSize := t.shard.GetConfig().ParentClosePolicyBatchSize(domainName) + executions := make([]parentclosepolicy.RequestDetail, 0, common.MinInt(len(childInfos), batchSize)) + count := 0 for _, childInfo := range childInfos { + count++ if childInfo.ParentClosePolicy == types.ParentClosePolicyAbandon { continue } @@ -1790,17 +1793,27 @@ func (t *transferActiveTaskExecutor) processParentClosePolicy( RunID: childInfo.StartedRunID, Policy: childInfo.ParentClosePolicy, }) + + if len(executions) == batchSize { + err := t.parentClosePolicyClient.SendParentClosePolicyRequest(ctx, parentclosepolicy.Request{ + DomainName: domainName, + Executions: executions, + }) + if err != nil { + return err + } + executions = make([]parentclosepolicy.RequestDetail, 0, common.MinInt(len(childInfos)-count, batchSize)) + } } if len(executions) == 0 { return nil } - request := parentclosepolicy.Request{ + return t.parentClosePolicyClient.SendParentClosePolicyRequest(ctx, parentclosepolicy.Request{ DomainName: domainName, Executions: executions, - } - return t.parentClosePolicyClient.SendParentClosePolicyRequest(ctx, request) + }) } for _, childInfo := range childInfos { diff --git a/service/history/task/transfer_active_task_executor_test.go b/service/history/task/transfer_active_task_executor_test.go index 2a12318cb9d..7f2f895e0c0 100644 --- a/service/history/task/transfer_active_task_executor_test.go +++ b/service/history/task/transfer_active_task_executor_test.go @@ -833,7 +833,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessCloseExecution_NoParent_Has workflowExecution, mutableState, decisionCompletionID, err := test.SetupWorkflowWithCompletedDecision(s.mockShard, s.domainID) s.NoError(err) - numChildWorkflows := 10 + numChildWorkflows := 500 for i := 0; i < numChildWorkflows; i++ { _, _, err = mutableState.AddStartChildWorkflowExecutionInitiatedEvent(decisionCompletionID, uuid.New(), &types.StartChildWorkflowExecutionDecisionAttributes{ Domain: s.domainName, @@ -870,7 +870,20 @@ func (s *transferActiveTaskExecutorSuite) TestProcessCloseExecution_NoParent_Has s.mockArchivalMetadata.On("GetVisibilityConfig").Return(archiver.NewDisabledArchvialConfig()) s.mockParentClosePolicyClient.On("SendParentClosePolicyRequest", mock.Anything, mock.MatchedBy( func(request parentclosepolicy.Request) bool { - if len(request.Executions) != numChildWorkflows { + if len(request.Executions) != s.mockShard.GetConfig().ParentClosePolicyBatchSize(constants.TestDomainName) { + return false + } + for _, executions := range request.Executions { + if executions.DomainName != constants.TestDomainName { + return false + } + } + return true + }, + )).Return(nil).Times(numChildWorkflows / s.mockShard.GetConfig().ParentClosePolicyBatchSize(constants.TestDomainName)) + s.mockParentClosePolicyClient.On("SendParentClosePolicyRequest", mock.Anything, mock.MatchedBy( + func(request parentclosepolicy.Request) bool { + if len(request.Executions) != numChildWorkflows%s.mockShard.GetConfig().ParentClosePolicyBatchSize(constants.TestDomainName) { return false } for _, executions := range request.Executions {