Use actual worker in onebox setup. (#2352)

Ardagan authored Jan 7, 2022
1 parent b0e2c05 commit 16bf19d
Showing 3 changed files with 65 additions and 111 deletions.
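In short: the onebox test host stops hand-assembling a worker out of resource.Resource plus manually started replicator and archiver components, and instead boots the real *worker.Service through an fx application — the same dependency-injection wiring already used for the frontend, matching, and history apps.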
171 changes: 64 additions & 107 deletions host/onebox.go
@@ -86,11 +86,12 @@ type (
 		frontendService *frontend.Service
 		matchingService *matching.Service
 		historyServices []*history.Service
-		workerService   resource.Resource
+		workerService   *worker.Service

 		frontendApp *fx.App
 		matchingApp *fx.App
 		historyApps []*fx.App
+		workerApp   *fx.App

 		matchingNamespaceRegistry namespace.Registry
 		frontendNamespaceRegistry namespace.Registry
@@ -226,22 +227,18 @@ func (c *temporalImpl) Start() error {

 func (c *temporalImpl) Stop() {
 	if c.enableWorker() {
-		c.shutdownWG.Add(4)
-		c.workerService.Stop()
-	} else {
-		c.shutdownWG.Add(3)
+		c.shutdownWG.Add(1)
+		c.workerApp.Stop(context.Background())
 	}
+
+	c.shutdownWG.Add(3)
+
 	c.frontendApp.Stop(context.Background())
 	for _, historyApp := range c.historyApps {
 		historyApp.Stop(context.Background())
 	}
 	c.matchingApp.Stop(context.Background())
-	if c.workerConfig.EnableReplicator {
-		c.replicator.Stop()
-	}
-	if c.workerConfig.EnableArchiver {
-		c.clientWorker.Stop()
-	}
+
 	close(c.shutdownCh)
 	c.shutdownWG.Wait()
 }
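The worker now shuts down the way the other services do: a single app.Stop call with a context runs the OnStop hooks of everything worker.Module registered, in reverse start order, so the hand-rolled c.replicator.Stop() and c.clientWorker.Stop() calls disappear. A minimal, self-contained sketch of that fx lifecycle pattern (names here are illustrative, not from this commit):

package main

import (
	"context"
	"log"
	"time"

	"go.uber.org/fx"
)

func main() {
	app := fx.New(fx.NopLogger) // real code would install modules, e.g. worker.Module

	if err := app.Start(context.Background()); err != nil {
		log.Fatal(err)
	}

	// One Stop call with a deadline tears down every component the app
	// started, in reverse order of their OnStart hooks.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := app.Stop(ctx); err != nil {
		log.Fatal(err)
	}
}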
@@ -625,16 +622,31 @@ func (c *temporalImpl) startMatching(hosts map[string][]string, startWG *sync.Wa
 }

 func (c *temporalImpl) startWorker(hosts map[string][]string, startWG *sync.WaitGroup) {
+	serviceName := common.WorkerServiceName
+
 	params := &resource.BootstrapParams{}
-	params.Name = common.WorkerServiceName
+	params.Name = serviceName
 	params.ThrottledLogger = c.logger
-	params.RPCFactory = newRPCFactoryImpl(common.WorkerServiceName, c.WorkerGRPCServiceAddress(), c.WorkerServiceRingpopAddress(), c.logger)
-	tallyMetricsScope := tally.NewTestScope(common.WorkerServiceName, make(map[string]string))
+	params.RPCFactory = newRPCFactoryImpl(serviceName, c.WorkerGRPCServiceAddress(), c.WorkerServiceRingpopAddress(), c.logger)
+	tallyMetricsScope := tally.NewTestScope(serviceName, make(map[string]string))
 	params.MembershipFactoryInitializer = func(x persistenceClient.Bean, y log.Logger) (resource.MembershipMonitorFactory, error) {
 		return newMembershipFactory(params.Name, hosts), nil
 	}
-	params.ClusterMetadataConfig = c.clusterMetadataConfig

+	clusterConfigCopy := cluster.Config{
+		EnableGlobalNamespace:    c.clusterMetadataConfig.EnableGlobalNamespace,
+		FailoverVersionIncrement: c.clusterMetadataConfig.FailoverVersionIncrement,
+		MasterClusterName:        c.clusterMetadataConfig.MasterClusterName,
+		CurrentClusterName:       c.clusterMetadataConfig.CurrentClusterName,
+		ClusterInformation:       make(map[string]cluster.ClusterInformation),
+	}
+	for k, v := range c.clusterMetadataConfig.ClusterInformation {
+		clusterConfigCopy.ClusterInformation[k] = v
+	}
+	if c.workerConfig.EnableReplicator {
+		clusterConfigCopy.EnableGlobalNamespace = true
+	}
+	params.ClusterMetadataConfig = &clusterConfigCopy
 	params.MetricsClient = metrics.NewClient(&metrics.ClientConfig{}, tallyMetricsScope, metrics.GetMetricsServiceIdx(params.Name, c.logger))
 	params.ArchivalMetadata = c.archiverMetadata
 	params.ArchiverProvider = c.archiverProvider
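Worth noting in the hunk above: the bootstrap params no longer alias the shared c.clusterMetadataConfig. Because the replicator case flips EnableGlobalNamespace, the code builds a field-by-field copy and clones the ClusterInformation map — a plain struct copy would still share the map. A runnable sketch of that copy-before-mutate pattern, with stand-in types (the real ones live in the cluster package):

package main

import "fmt"

// Minimal stand-ins for cluster.Config / cluster.ClusterInformation, so the
// copy-before-mutate pattern from the hunk above runs on its own.
type ClusterInformation struct{ Enabled bool }

type Config struct {
	EnableGlobalNamespace bool
	CurrentClusterName    string
	ClusterInformation    map[string]ClusterInformation
}

func main() {
	shared := Config{
		CurrentClusterName: "active",
		ClusterInformation: map[string]ClusterInformation{"active": {Enabled: true}},
	}

	// A struct copy isolates value fields like EnableGlobalNamespace, but the
	// map field still aliases the original, so it must be cloned by hand.
	cfgCopy := shared
	cfgCopy.ClusterInformation = make(map[string]ClusterInformation, len(shared.ClusterInformation))
	for k, v := range shared.ClusterInformation {
		cfgCopy.ClusterInformation[k] = v
	}
	cfgCopy.EnableGlobalNamespace = true

	fmt.Println(shared.EnableGlobalNamespace, cfgCopy.EnableGlobalNamespace) // false true
}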
@@ -651,101 +663,44 @@ func (c *temporalImpl) startWorker(hosts map[string][]string, startWG *sync.Wait
 		sdk.NewMetricHandler(params.MetricsClient.UserScope()),
 	)

-	dcClient := newIntegrationConfigClient(dynamicconfig.NewNoopClient())
-	service, err := resource.New(
-		c.logger,
-		params,
-		common.WorkerServiceName,
-		dcClient,
-		dynamicconfig.GetIntPropertyFn(5000),
-		dynamicconfig.GetIntPropertyFn(5000),
-		dynamicconfig.GetIntPropertyFn(10000),
-		nil,
-		resolver.NewNoopResolver(),
-		nil,
-	)
+	stoppedCh := make(chan struct{})
+	var workerService *worker.Service
+	var clientBean client.Bean
+	var namespaceRegistry namespace.Registry

-	if err != nil {
-		c.logger.Fatal("unable to create worker service", tag.Error(err))
-	}
-	c.workerService = service
-	service.Start()
+	app := fx.New(
+		fx.Supply(
+			stoppedCh,
+			params,
+			params.ClusterMetadataConfig,
+		),
+		fx.Provide(func() client.FactoryProvider { return client.NewFactoryProvider() }),
+		fx.Provide(func() searchattribute.Mapper { return nil }),
+		fx.Provide(func() resolver.ServiceResolver { return resolver.NewNoopResolver() }),
+		fx.Provide(func() persistenceClient.AbstractDataStoreFactory { return nil }),
+		fx.Provide(func() dynamicconfig.Client { return newIntegrationConfigClient(dynamicconfig.NewNoopClient()) }),
+		fx.Provide(func() log.Logger { return c.logger }),
+		fx.Provide(func() esclient.Client { return c.esClient }),

-	clusterMetadata := cluster.NewMetadataFromConfig(c.clusterMetadataConfig, c.clusterMetadataMgr, c.logger)
-	var replicatorNamespaceCache namespace.Registry
-	if c.workerConfig.EnableReplicator {
-		metadataManager := persistence.NewMetadataPersistenceMetricsClient(c.metadataMgr, service.GetMetricsClient(), c.logger)
-		replicatorNamespaceCache = namespace.NewRegistry(metadataManager, clusterMetadata.IsGlobalNamespaceEnabled(), service.GetMetricsClient(), service.GetLogger())
-		replicatorNamespaceCache.Start()
-		c.startWorkerReplicator(service, clusterMetadata)
+		worker.Module,
+		fx.Populate(&workerService, &clientBean, &namespaceRegistry),
+		fx.NopLogger,
+	)
+	err = app.Err()
+	if err != nil {
+		c.logger.Fatal("unable to start worker service", tag.Error(err))
 	}

-	var clientWorkerNamespaceCache namespace.Registry
-	if c.workerConfig.EnableArchiver {
-		metadataProxyManager := persistence.NewMetadataPersistenceMetricsClient(c.metadataMgr, service.GetMetricsClient(), c.logger)
-		clientWorkerNamespaceCache = namespace.NewRegistry(metadataProxyManager, clusterMetadata.IsGlobalNamespaceEnabled(), service.GetMetricsClient(), service.GetLogger())
-		clientWorkerNamespaceCache.Start()
-		c.startWorkerClientWorker(params, service, clientWorkerNamespaceCache, dcClient)
-	}
+	c.workerApp = app
+	c.workerService = workerService
+	c.workerNamespaceRegistry = namespaceRegistry
+	app.Start(context.Background())

 	startWG.Done()
 	<-c.shutdownCh
-	if c.workerConfig.EnableReplicator {
-		replicatorNamespaceCache.Stop()
-	}
-	if c.workerConfig.EnableArchiver {
-		clientWorkerNamespaceCache.Stop()
-	}
 	c.shutdownWG.Done()
 }
-
-func (c *temporalImpl) startWorkerReplicator(service resource.Resource, clusterMetadata cluster.Metadata) {
-	serviceResolver, err := service.GetMembershipMonitor().GetResolver(common.WorkerServiceName)
-	if err != nil {
-		c.logger.Fatal("Fail to start replicator when start worker", tag.Error(err))
-	}
-	c.replicator = replicator.NewReplicator(
-		clusterMetadata,
-		service.GetClientBean(),
-		c.logger,
-		service.GetMetricsClient(),
-		service.GetHostInfo(),
-		serviceResolver,
-		c.namespaceReplicationQueue,
-		c.namespaceReplicationTaskExecutor,
-	)
-	c.replicator.Start()
-}
-
-func (c *temporalImpl) startWorkerClientWorker(params *resource.BootstrapParams, service resource.Resource, namespaceRegistry namespace.Registry, dcClient dynamicconfig.Client) {
-	workerConfig := worker.NewConfig(
-		c.logger,
-		dcClient,
-		params,
-	)
-	workerConfig.ArchiverConfig.ArchiverConcurrency = dynamicconfig.GetIntPropertyFn(10)
-
-	systemSdkClient, err := params.SdkClientFactory.NewSystemClient(c.logger)
-	if err != nil {
-		c.logger.Fatal("Unable to create SDK client", tag.Error(err))
-	}
-
-	bc := &archiver.BootstrapContainer{
-		SdkSystemClient:  systemSdkClient,
-		MetricsClient:    service.GetMetricsClient(),
-		Logger:           c.logger,
-		HistoryV2Manager: c.executionManager,
-		NamespaceCache:   namespaceRegistry,
-		Config:           workerConfig.ArchiverConfig,
-		ArchiverProvider: c.archiverProvider,
-	}
-	c.clientWorker = archiver.NewClientWorker(bc)
-	if err := c.clientWorker.Start(); err != nil {
-		c.clientWorker.Stop()
-		c.logger.Fatal("Fail to start archiver when start worker", tag.Error(err))
-	}
-}

 func (c *temporalImpl) createSystemNamespace() error {
 	err := c.metadataMgr.InitializeSystemNamespaces(c.clusterMetadataConfig.CurrentClusterName)
 	if err != nil {
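The heart of the change is above: instead of constructing a resource.Resource and starting replicator/archiver workers by hand, the harness declares a dependency graph. fx.Supply injects ready-made values, fx.Provide registers constructors, worker.Module contributes the real worker service, and fx.Populate pulls built objects back out so the test host can keep references to them. A toy but runnable version of the same pattern (types and names are stand-ins, not Temporal's):

package main

import (
	"context"
	"fmt"
	"log"

	"go.uber.org/fx"
)

// Stand-ins for illustration; the real graph supplies *resource.BootstrapParams
// and populates *worker.Service, client.Bean and namespace.Registry.
type Params struct{ Name string }

type Service struct{ params Params }

func NewService(p Params) *Service { return &Service{params: p} }

func main() {
	var svc *Service
	app := fx.New(
		fx.Supply(Params{Name: "worker"}), // ready-made values, like params above
		fx.Provide(NewService),            // constructors, like the fx.Provide lines
		fx.Populate(&svc),                 // extract built objects, like fx.Populate(&workerService, ...)
		fx.NopLogger,
	)
	if err := app.Err(); err != nil { // construction errors, mirroring err = app.Err()
		log.Fatal(err)
	}
	if err := app.Start(context.Background()); err != nil {
		log.Fatal(err)
	}
	defer app.Stop(context.Background())

	fmt.Println(svc.params.Name) // "worker"
}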
@@ -817,13 +772,12 @@ func (p *membershipFactoryImpl) GetMembershipMonitor() (membership.Monitor, erro
 }

 type rpcFactoryImpl struct {
-	serviceName        string
-	ringpopServiceName string
-	grpcHostPort       string
-	ringpopHostPort    string
-	logger             log.Logger
+	serviceName     string
+	grpcHostPort    string
+	ringpopHostPort string
+	logger          log.Logger

-	sync.Mutex
+	sync.RWMutex
 	listener       net.Listener
 	ringpopChannel *tchannel.Channel
 	serverCfg      config.GroupTLS
@@ -855,9 +809,12 @@ func newRPCFactoryImpl(sName, grpcHostPort, ringpopHostPort string, logger log.L
 }

 func (c *rpcFactoryImpl) GetGRPCListener() net.Listener {
+	c.RLock()
 	if c.listener != nil {
+		c.RUnlock()
 		return c.listener
 	}
+	c.RUnlock()

 	c.Lock()
 	defer c.Unlock()
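The mutex became a sync.RWMutex so GetGRPCListener can take the cheap read lock for the common already-created case and fall back to the write lock only when the listener still has to be built. The collapsed remainder of the function creates the listener under the write lock; a correct version of this double-checked pattern re-checks after acquiring it, since another goroutine may have won the race. A self-contained sketch:

package main

import (
	"fmt"
	"net"
	"sync"
)

type lazyListener struct {
	sync.RWMutex
	listener net.Listener
}

func (l *lazyListener) Get(addr string) net.Listener {
	l.RLock()
	if l.listener != nil {
		l.RUnlock()
		return l.listener
	}
	l.RUnlock()

	l.Lock()
	defer l.Unlock()
	// Re-check: another goroutine may have created the listener between
	// our RUnlock and Lock.
	if l.listener == nil {
		ln, err := net.Listen("tcp", addr)
		if err != nil {
			panic(err) // sketch only; real code logs and fails
		}
		l.listener = ln
	}
	return l.listener
}

func main() {
	var l lazyListener
	fmt.Println(l.Get("127.0.0.1:0").Addr())
}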
@@ -894,7 +851,7 @@ func (c *rpcFactoryImpl) GetRingpopChannel() *tchannel.Channel {

 	err = c.ringpopChannel.ListenAndServe(c.ringpopHostPort)
 	if err != nil {
-		c.logger.Fatal("Failed to start ringpop listener", tag.Error(err), tag.Address(c.ringpopHostPort))
+		c.logger.Fatal("Failed to start ringpop ListenAndServe", tag.Error(err), tag.Address(c.ringpopHostPort))
 	}
 }

1 change: 1 addition & 0 deletions host/taskpoller.go
@@ -146,6 +146,7 @@ Loop:
 		if pollStickyTaskQueue {
 			taskQueue = p.StickyTaskQueue
 		}
+
 		response, err1 := p.Engine.PollWorkflowTaskQueue(NewContext(), &workflowservice.PollWorkflowTaskQueueRequest{
 			Namespace: p.Namespace,
 			TaskQueue: taskQueue,
4 changes: 0 additions & 4 deletions host/test_cluster.go
@@ -290,10 +290,6 @@ func (tc *TestCluster) SetFaultInjectionRate(rate float64) {
 		tc.host.frontendService.GetFaultInjection().UpdateRate(rate)
 	}

-	if tc.host.workerService != nil && tc.host.workerService.GetFaultInjection() != nil {
-		tc.host.workerService.GetFaultInjection().UpdateRate(rate)
-	}
-
 	for _, s := range tc.host.historyServices {
 		if s.GetFaultInjection() != nil {
 			s.GetFaultInjection().UpdateRate(rate)
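This block goes away because the host's workerService is now the real *worker.Service built by fx rather than the test-specific resource.Resource wrapper, and — judging by this deletion — it does not expose the GetFaultInjection hook that the frontend and history services still provide.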
