Skip to content

Commit

Permalink
Fix parent close policy (#6307)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored and dkrotx committed Sep 26, 2024
1 parent 59dee0c commit 876dab1
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 6 deletions.
7 changes: 7 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,7 @@ const (
// Default value: 10
// Allowed filters: DomainName
ParentClosePolicyThreshold
ParentClosePolicyBatchSize
// NumParentClosePolicySystemWorkflows is key for number of parentClosePolicy system workflows running in total
// KeyName: history.numParentClosePolicySystemWorkflows
// Value type: Int
Expand Down Expand Up @@ -3694,6 +3695,12 @@ var IntKeys = map[IntKey]DynamicInt{
Description: "ParentClosePolicyThreshold is decides that parent close policy will be processed by sys workers(if enabled) ifthe number of children greater than or equal to this threshold",
DefaultValue: 10,
},
ParentClosePolicyBatchSize: {
KeyName: "history.parentClosePolicyBatchSize",
Filters: []Filter{DomainName},
Description: "ParentClosePolicyBatchSize is the batch size of parent close policy processed by sys workers",
DefaultValue: 200,
},
NumParentClosePolicySystemWorkflows: {
KeyName: "history.numParentClosePolicySystemWorkflows",
Description: "NumParentClosePolicySystemWorkflows is key for number of parentClosePolicy system workflows running in total",
Expand Down
3 changes: 3 additions & 0 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ type Config struct {
// parent close policy will be processed by sys workers(if enabled) if
// the number of children greater than or equal to this threshold
ParentClosePolicyThreshold dynamicconfig.IntPropertyFnWithDomainFilter
// the batch size of parent close policy processed by sys workers
ParentClosePolicyBatchSize dynamicconfig.IntPropertyFnWithDomainFilter
// total number of parentClosePolicy system workflows
NumParentClosePolicySystemWorkflows dynamicconfig.IntPropertyFn

Expand Down Expand Up @@ -515,6 +517,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, s
NumParentClosePolicySystemWorkflows: dc.GetIntProperty(dynamicconfig.NumParentClosePolicySystemWorkflows),
EnableParentClosePolicyWorker: dc.GetBoolProperty(dynamicconfig.EnableParentClosePolicyWorker),
ParentClosePolicyThreshold: dc.GetIntPropertyFilteredByDomain(dynamicconfig.ParentClosePolicyThreshold),
ParentClosePolicyBatchSize: dc.GetIntPropertyFilteredByDomain(dynamicconfig.ParentClosePolicyBatchSize),

NumArchiveSystemWorkflows: dc.GetIntProperty(dynamicconfig.NumArchiveSystemWorkflows),
ArchiveRequestRPS: dc.GetIntProperty(dynamicconfig.ArchiveRequestRPS),
Expand Down
21 changes: 17 additions & 4 deletions service/history/task/transfer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1777,8 +1777,11 @@ func (t *transferActiveTaskExecutor) processParentClosePolicy(
if t.shard.GetConfig().EnableParentClosePolicyWorker() &&
len(childInfos) >= t.shard.GetConfig().ParentClosePolicyThreshold(domainName) {

executions := make([]parentclosepolicy.RequestDetail, 0, len(childInfos))
batchSize := t.shard.GetConfig().ParentClosePolicyBatchSize(domainName)
executions := make([]parentclosepolicy.RequestDetail, 0, common.MinInt(len(childInfos), batchSize))
count := 0
for _, childInfo := range childInfos {
count++
if childInfo.ParentClosePolicy == types.ParentClosePolicyAbandon {
continue
}
Expand All @@ -1790,17 +1793,27 @@ func (t *transferActiveTaskExecutor) processParentClosePolicy(
RunID: childInfo.StartedRunID,
Policy: childInfo.ParentClosePolicy,
})

if len(executions) == batchSize {
err := t.parentClosePolicyClient.SendParentClosePolicyRequest(ctx, parentclosepolicy.Request{
DomainName: domainName,
Executions: executions,
})
if err != nil {
return err
}
executions = make([]parentclosepolicy.RequestDetail, 0, common.MinInt(len(childInfos)-count, batchSize))
}
}

if len(executions) == 0 {
return nil
}

request := parentclosepolicy.Request{
return t.parentClosePolicyClient.SendParentClosePolicyRequest(ctx, parentclosepolicy.Request{
DomainName: domainName,
Executions: executions,
}
return t.parentClosePolicyClient.SendParentClosePolicyRequest(ctx, request)
})
}

for _, childInfo := range childInfos {
Expand Down
17 changes: 15 additions & 2 deletions service/history/task/transfer_active_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessCloseExecution_NoParent_Has
workflowExecution, mutableState, decisionCompletionID, err := test.SetupWorkflowWithCompletedDecision(s.mockShard, s.domainID)
s.NoError(err)

numChildWorkflows := 10
numChildWorkflows := 500
for i := 0; i < numChildWorkflows; i++ {
_, _, err = mutableState.AddStartChildWorkflowExecutionInitiatedEvent(decisionCompletionID, uuid.New(), &types.StartChildWorkflowExecutionDecisionAttributes{
Domain: s.domainName,
Expand Down Expand Up @@ -870,7 +870,20 @@ func (s *transferActiveTaskExecutorSuite) TestProcessCloseExecution_NoParent_Has
s.mockArchivalMetadata.On("GetVisibilityConfig").Return(archiver.NewDisabledArchvialConfig())
s.mockParentClosePolicyClient.On("SendParentClosePolicyRequest", mock.Anything, mock.MatchedBy(
func(request parentclosepolicy.Request) bool {
if len(request.Executions) != numChildWorkflows {
if len(request.Executions) != s.mockShard.GetConfig().ParentClosePolicyBatchSize(constants.TestDomainName) {
return false
}
for _, executions := range request.Executions {
if executions.DomainName != constants.TestDomainName {
return false
}
}
return true
},
)).Return(nil).Times(numChildWorkflows / s.mockShard.GetConfig().ParentClosePolicyBatchSize(constants.TestDomainName))
s.mockParentClosePolicyClient.On("SendParentClosePolicyRequest", mock.Anything, mock.MatchedBy(
func(request parentclosepolicy.Request) bool {
if len(request.Executions) != numChildWorkflows%s.mockShard.GetConfig().ParentClosePolicyBatchSize(constants.TestDomainName) {
return false
}
for _, executions := range request.Executions {
Expand Down

0 comments on commit 876dab1

Please sign in to comment.