Skip to content

Commit

Permalink
Update matching simulation test to support round robin load balancer
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll committed Sep 26, 2024
1 parent c9c8c14 commit e14653a
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 6 deletions.
15 changes: 12 additions & 3 deletions host/matching_simulation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func TestMatchingSimulationSuite(t *testing.T) {
dynamicconfig.LocalTaskWaitTime: clusterConfig.MatchingConfig.SimulationConfig.LocalTaskWaitTime,
dynamicconfig.EnableTasklistIsolation: len(isolationGroups) > 0,
dynamicconfig.AllIsolationGroups: isolationGroups,
dynamicconfig.TasklistLoadBalancerStrategy: getTasklistLoadBalancerStrategy(clusterConfig.MatchingConfig.SimulationConfig.TasklistLoadBalancerStrategy),
}

ctrl := gomock.NewController(t)
Expand Down Expand Up @@ -179,7 +180,7 @@ func (s *MatchingSimulationSuite) TearDownSuite() {
}

func (s *MatchingSimulationSuite) TestMatchingSimulation() {
matchingClient := s.testCluster.GetMatchingClient()
matchingClients := s.testCluster.GetMatchingClients()

ctx, cancel := context.WithCancel(context.Background())

Expand All @@ -206,7 +207,7 @@ func (s *MatchingSimulationSuite) TestMatchingSimulation() {
pollerWG.Add(1)
pollerID := fmt.Sprintf("[%d]-%s-%d", idx, pollerConfig.getIsolationGroup(), i)
config := pollerConfig
go s.poll(ctx, matchingClient, domainID, tasklist, pollerID, &pollerWG, statsCh, &tasksToReceive, &config)
go s.poll(ctx, matchingClients[i%len(matchingClients)], domainID, tasklist, pollerID, &pollerWG, statsCh, &tasksToReceive, &config)
}
}

Expand All @@ -225,7 +226,7 @@ func (s *MatchingSimulationSuite) TestMatchingSimulation() {
numGenerators++
generatorWG.Add(1)
config := taskConfig
go s.generate(ctx, matchingClient, domainID, tasklist, rateLimiter, &tasksGenerated, &lastTaskScheduleID, &generatorWG, statsCh, &config)
go s.generate(ctx, matchingClients[i%len(matchingClients)], domainID, tasklist, rateLimiter, &tasksGenerated, &lastTaskScheduleID, &generatorWG, statsCh, &config)
}
}

Expand Down Expand Up @@ -258,6 +259,7 @@ func (s *MatchingSimulationSuite) TestMatchingSimulation() {
testSummary = append(testSummary, fmt.Sprintf("Record Decision Task Started Time: %v", s.testClusterConfig.MatchingConfig.SimulationConfig.RecordDecisionTaskStartedTime))
testSummary = append(testSummary, fmt.Sprintf("Num of Write Partitions: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingNumTasklistWritePartitions]))
testSummary = append(testSummary, fmt.Sprintf("Num of Read Partitions: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingNumTasklistReadPartitions]))
testSummary = append(testSummary, fmt.Sprintf("Tasklist load balancer strategy: %v", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.TasklistLoadBalancerStrategy]))
testSummary = append(testSummary, fmt.Sprintf("Forwarder Max Outstanding Polls: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingForwarderMaxOutstandingPolls]))
testSummary = append(testSummary, fmt.Sprintf("Forwarder Max Outstanding Tasks: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingForwarderMaxOutstandingTasks]))
testSummary = append(testSummary, fmt.Sprintf("Forwarder Max RPS: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingForwarderMaxRatePerSecond]))
Expand Down Expand Up @@ -591,3 +593,10 @@ func getRecordDecisionTaskStartedTime(duration time.Duration) time.Duration {

return duration
}

// getTasklistLoadBalancerStrategy normalizes the configured tasklist load
// balancer strategy, falling back to "random" when none is specified.
func getTasklistLoadBalancerStrategy(strategy string) string {
	if strategy != "" {
		return strategy
	}
	return "random"
}
15 changes: 12 additions & 3 deletions host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type Cadence interface {
FrontendHost() membership.HostInfo
GetHistoryClient() historyClient.Client
GetMatchingClient() matchingClient.Client
GetMatchingClients() []matchingClient.Client
GetExecutionManagerFactory() persistence.ExecutionManagerFactory
}

Expand All @@ -97,7 +98,7 @@ type (
adminClient adminClient.Client
frontendClient frontendClient.Client
historyClient historyClient.Client
matchingClient matchingClient.Client
matchingClients []matchingClient.Client
logger log.Logger
clusterMetadata cluster.Metadata
persistenceConfig config.Persistence
Expand Down Expand Up @@ -180,6 +181,9 @@ type (
// RecordDecisionTaskStartedTime. The amount of time spent by History to complete RecordDecisionTaskStarted
RecordDecisionTaskStartedTime time.Duration

// TasklistLoadBalancerStrategy is the load balancer strategy for task lists. Defaults to "random".
TasklistLoadBalancerStrategy string

// The pollers that will be created to process tasks
Pollers []SimulationPollerConfiguration

Expand Down Expand Up @@ -543,7 +547,11 @@ func (c *cadenceImpl) GetHistoryClient() historyClient.Client {
}

// GetMatchingClient returns the client for the first matching host. Callers
// that need to address every matching host should use GetMatchingClients.
// Assumes at least one matching host has been started; panics otherwise.
func (c *cadenceImpl) GetMatchingClient() matchingClient.Client {
	// The diff artifact left both the old (`c.matchingClient`) and new return
	// statements in the body; only the slice-backed form is valid now that
	// clients are stored per host in matchingClients.
	return c.matchingClients[0]
}

// GetMatchingClients returns the matching clients collected while starting
// the matching hosts, one per host, in start order.
func (c *cadenceImpl) GetMatchingClients() []matchingClient.Client {
	return c.matchingClients
}

func (c *cadenceImpl) startFrontend(hosts map[string][]membership.HostInfo, startWG *sync.WaitGroup) {
Expand Down Expand Up @@ -750,10 +758,11 @@ func (c *cadenceImpl) startMatching(hosts map[string][]membership.HostInfo, star

// One client is created per matching host and appended to matchingClients,
// so no client overwrites another; the underlying client bean logic still picks the right destination per request.
c.matchingClient, err = clientBean.GetMatchingClient(matchingService.GetDomainCache().GetDomainName)
matchingClient, err := clientBean.GetMatchingClient(matchingService.GetDomainCache().GetDomainName)
if err != nil {
params.Logger.Fatal("unable to get matching client", tag.Error(err))
}
c.matchingClients = append(c.matchingClients, matchingClient)
c.matchingServices = append(c.matchingServices, matchingService)
go matchingService.Start()
}
Expand Down
9 changes: 9 additions & 0 deletions host/testcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,15 @@ func (tc *TestCluster) GetMatchingClient() MatchingClient {
return tc.host.GetMatchingClient()
}

// GetMatchingClients returns one matching client per matching host in the
// test cluster, converted to the local MatchingClient type.
func (tc *TestCluster) GetMatchingClients() []MatchingClient {
	hostClients := tc.host.GetMatchingClients()
	converted := make([]MatchingClient, len(hostClients))
	for i, hc := range hostClients {
		converted[i] = hc
	}
	return converted
}

// GetExecutionManagerFactory returns an execution manager factory from the test cluster
func (tc *TestCluster) GetExecutionManagerFactory() persistence.ExecutionManagerFactory {
return tc.host.GetExecutionManagerFactory()
Expand Down
30 changes: 30 additions & 0 deletions host/testdata/matching_simulation_round_robin_load_balancer.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Matching simulation scenario exercising the round-robin tasklist load
# balancer strategy across multiple matching hosts.
enablearchival: false
clusterno: 1
messagingclientconfig:
  usemock: true
historyconfig:
  numhistoryshards: 4
  numhistoryhosts: 1
matchingconfig:
  nummatchinghosts: 8
  simulationconfig:
    tasklistwritepartitions: 2
    tasklistreadpartitions: 2
    forwardermaxoutstandingpolls: 1
    forwardermaxoutstandingtasks: 1
    forwardermaxratepersecond: 10
    forwardermaxchildrenpernode: 20
    localpollwaittime: 0ms
    localtaskwaittime: 0ms
    recorddecisiontaskstartedtime: 13ms
    # The setting under test; non-empty values override the "random" default.
    tasklistloadbalancerstrategy: round-robin
    tasks:
      - numtaskgenerators: 100
        taskspersecond: 300
        maxtasktogenerate: 30000
    pollers:
      - taskprocesstime: 1ms
        numpollers: 8
        polltimeout: 60s
workerconfig:
  enableasyncwfconsumer: false

0 comments on commit e14653a

Please sign in to comment.