From 16bf19dcb5389f491691e650e17d46f027ede67d Mon Sep 17 00:00:00 2001 From: Mikhail Gryzykhin <12602502+Ardagan@users.noreply.github.com> Date: Fri, 7 Jan 2022 10:37:39 -0800 Subject: [PATCH] Use actual worker in onebox setup. (#2352) --- host/onebox.go | 171 ++++++++++++++++--------------------------- host/taskpoller.go | 1 + host/test_cluster.go | 4 - 3 files changed, 65 insertions(+), 111 deletions(-) diff --git a/host/onebox.go b/host/onebox.go index d2f66fb399a..6d3ddb633f3 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -86,11 +86,12 @@ type ( frontendService *frontend.Service matchingService *matching.Service historyServices []*history.Service - workerService resource.Resource + workerService *worker.Service frontendApp *fx.App matchingApp *fx.App historyApps []*fx.App + workerApp *fx.App matchingNamespaceRegistry namespace.Registry frontendNamespaceRegistry namespace.Registry @@ -226,22 +227,18 @@ func (c *temporalImpl) Start() error { func (c *temporalImpl) Stop() { if c.enableWorker() { - c.shutdownWG.Add(4) - c.workerService.Stop() - } else { - c.shutdownWG.Add(3) + c.shutdownWG.Add(1) + c.workerApp.Stop(context.Background()) } + + c.shutdownWG.Add(3) + c.frontendApp.Stop(context.Background()) for _, historyApp := range c.historyApps { historyApp.Stop(context.Background()) } c.matchingApp.Stop(context.Background()) - if c.workerConfig.EnableReplicator { - c.replicator.Stop() - } - if c.workerConfig.EnableArchiver { - c.clientWorker.Stop() - } + close(c.shutdownCh) c.shutdownWG.Wait() } @@ -625,16 +622,31 @@ func (c *temporalImpl) startMatching(hosts map[string][]string, startWG *sync.Wa } func (c *temporalImpl) startWorker(hosts map[string][]string, startWG *sync.WaitGroup) { + serviceName := common.WorkerServiceName params := &resource.BootstrapParams{} - params.Name = common.WorkerServiceName + params.Name = serviceName params.ThrottledLogger = c.logger - params.RPCFactory = newRPCFactoryImpl(common.WorkerServiceName, c.WorkerGRPCServiceAddress(), c.WorkerServiceRingpopAddress(), c.logger) - tallyMetricsScope := tally.NewTestScope(common.WorkerServiceName, make(map[string]string)) + params.RPCFactory = newRPCFactoryImpl(serviceName, c.WorkerGRPCServiceAddress(), c.WorkerServiceRingpopAddress(), c.logger) + tallyMetricsScope := tally.NewTestScope(serviceName, make(map[string]string)) params.MembershipFactoryInitializer = func(x persistenceClient.Bean, y log.Logger) (resource.MembershipMonitorFactory, error) { return newMembershipFactory(params.Name, hosts), nil } - params.ClusterMetadataConfig = c.clusterMetadataConfig + + clusterConfigCopy := cluster.Config{ + EnableGlobalNamespace: c.clusterMetadataConfig.EnableGlobalNamespace, + FailoverVersionIncrement: c.clusterMetadataConfig.FailoverVersionIncrement, + MasterClusterName: c.clusterMetadataConfig.MasterClusterName, + CurrentClusterName: c.clusterMetadataConfig.CurrentClusterName, + ClusterInformation: make(map[string]cluster.ClusterInformation), + } + for k, v := range c.clusterMetadataConfig.ClusterInformation { + clusterConfigCopy.ClusterInformation[k] = v + } + if c.workerConfig.EnableReplicator { + clusterConfigCopy.EnableGlobalNamespace = true + } + params.ClusterMetadataConfig = &clusterConfigCopy params.MetricsClient = metrics.NewClient(&metrics.ClientConfig{}, tallyMetricsScope, metrics.GetMetricsServiceIdx(params.Name, c.logger)) params.ArchivalMetadata = c.archiverMetadata params.ArchiverProvider = c.archiverProvider @@ -651,101 +663,44 @@ func (c *temporalImpl) startWorker(hosts map[string][]string, startWG *sync.Wait sdk.NewMetricHandler(params.MetricsClient.UserScope()), ) - dcClient := newIntegrationConfigClient(dynamicconfig.NewNoopClient()) - service, err := resource.New( - c.logger, - params, - common.WorkerServiceName, - dcClient, - dynamicconfig.GetIntPropertyFn(5000), - dynamicconfig.GetIntPropertyFn(5000), - dynamicconfig.GetIntPropertyFn(10000), - nil, - resolver.NewNoopResolver(), - nil, - ) + stoppedCh := make(chan struct{}) + var workerService *worker.Service + var clientBean client.Bean + var namespaceRegistry namespace.Registry - if err != nil { - c.logger.Fatal("unable to create worker service", tag.Error(err)) - } - c.workerService = service - service.Start() + app := fx.New( + fx.Supply( + stoppedCh, + params, + params.ClusterMetadataConfig, + ), + fx.Provide(func() client.FactoryProvider { return client.NewFactoryProvider() }), + fx.Provide(func() searchattribute.Mapper { return nil }), + fx.Provide(func() resolver.ServiceResolver { return resolver.NewNoopResolver() }), + fx.Provide(func() persistenceClient.AbstractDataStoreFactory { return nil }), + fx.Provide(func() dynamicconfig.Client { return newIntegrationConfigClient(dynamicconfig.NewNoopClient()) }), + fx.Provide(func() log.Logger { return c.logger }), + fx.Provide(func() esclient.Client { return c.esClient }), - clusterMetadata := cluster.NewMetadataFromConfig(c.clusterMetadataConfig, c.clusterMetadataMgr, c.logger) - var replicatorNamespaceCache namespace.Registry - if c.workerConfig.EnableReplicator { - metadataManager := persistence.NewMetadataPersistenceMetricsClient(c.metadataMgr, service.GetMetricsClient(), c.logger) - replicatorNamespaceCache = namespace.NewRegistry(metadataManager, clusterMetadata.IsGlobalNamespaceEnabled(), service.GetMetricsClient(), service.GetLogger()) - replicatorNamespaceCache.Start() - c.startWorkerReplicator(service, clusterMetadata) + worker.Module, + fx.Populate(&workerService, &clientBean, &namespaceRegistry), + fx.NopLogger, + ) + err = app.Err() + if err != nil { + c.logger.Fatal("unable to start worker service", tag.Error(err)) } - var clientWorkerNamespaceCache namespace.Registry - if c.workerConfig.EnableArchiver { - metadataProxyManager := persistence.NewMetadataPersistenceMetricsClient(c.metadataMgr, service.GetMetricsClient(), c.logger) - clientWorkerNamespaceCache = namespace.NewRegistry(metadataProxyManager, clusterMetadata.IsGlobalNamespaceEnabled(), service.GetMetricsClient(), service.GetLogger()) - clientWorkerNamespaceCache.Start() - c.startWorkerClientWorker(params, service, clientWorkerNamespaceCache, dcClient) - } + c.workerApp = app + c.workerService = workerService + c.workerNamespaceRegistry = namespaceRegistry + app.Start(context.Background()) startWG.Done() <-c.shutdownCh - if c.workerConfig.EnableReplicator { - replicatorNamespaceCache.Stop() - } - if c.workerConfig.EnableArchiver { - clientWorkerNamespaceCache.Stop() - } c.shutdownWG.Done() } -func (c *temporalImpl) startWorkerReplicator(service resource.Resource, clusterMetadata cluster.Metadata) { - serviceResolver, err := service.GetMembershipMonitor().GetResolver(common.WorkerServiceName) - if err != nil { - c.logger.Fatal("Fail to start replicator when start worker", tag.Error(err)) - } - c.replicator = replicator.NewReplicator( - clusterMetadata, - service.GetClientBean(), - c.logger, - service.GetMetricsClient(), - service.GetHostInfo(), - serviceResolver, - c.namespaceReplicationQueue, - c.namespaceReplicationTaskExecutor, - ) - c.replicator.Start() -} - -func (c *temporalImpl) startWorkerClientWorker(params *resource.BootstrapParams, service resource.Resource, namespaceRegistry namespace.Registry, dcClient dynamicconfig.Client) { - workerConfig := worker.NewConfig( - c.logger, - dcClient, - params, - ) - workerConfig.ArchiverConfig.ArchiverConcurrency = dynamicconfig.GetIntPropertyFn(10) - - systemSdkClient, err := params.SdkClientFactory.NewSystemClient(c.logger) - if err != nil { - c.logger.Fatal("Unable to create SDK client", tag.Error(err)) - } - - bc := &archiver.BootstrapContainer{ - SdkSystemClient: systemSdkClient, - MetricsClient: service.GetMetricsClient(), - Logger: c.logger, - HistoryV2Manager: c.executionManager, - NamespaceCache: namespaceRegistry, - Config: workerConfig.ArchiverConfig, - ArchiverProvider: c.archiverProvider, - } - c.clientWorker = archiver.NewClientWorker(bc) - if err := c.clientWorker.Start(); err != nil { - c.clientWorker.Stop() - c.logger.Fatal("Fail to start archiver when start worker", tag.Error(err)) - } -} - func (c *temporalImpl) createSystemNamespace() error { err := c.metadataMgr.InitializeSystemNamespaces(c.clusterMetadataConfig.CurrentClusterName) if err != nil { @@ -817,13 +772,12 @@ func (p *membershipFactoryImpl) GetMembershipMonitor() (membership.Monitor, erro } type rpcFactoryImpl struct { - serviceName string - ringpopServiceName string - grpcHostPort string - ringpopHostPort string - logger log.Logger + serviceName string + grpcHostPort string + ringpopHostPort string + logger log.Logger - sync.Mutex + sync.RWMutex listener net.Listener ringpopChannel *tchannel.Channel serverCfg config.GroupTLS @@ -855,9 +809,12 @@ func newRPCFactoryImpl(sName, grpcHostPort, ringpopHostPort string, logger log.L } func (c *rpcFactoryImpl) GetGRPCListener() net.Listener { + c.RLock() if c.listener != nil { + c.RUnlock() return c.listener } + c.RUnlock() c.Lock() defer c.Unlock() @@ -894,7 +851,7 @@ func (c *rpcFactoryImpl) GetRingpopChannel() *tchannel.Channel { err = c.ringpopChannel.ListenAndServe(c.ringpopHostPort) if err != nil { - c.logger.Fatal("Failed to start ringpop listener", tag.Error(err), tag.Address(c.ringpopHostPort)) + c.logger.Fatal("Failed to start ringpop ListenAndServe", tag.Error(err), tag.Address(c.ringpopHostPort)) } } diff --git a/host/taskpoller.go b/host/taskpoller.go index 723bbf63b15..ed21ac9ba31 100644 --- a/host/taskpoller.go +++ b/host/taskpoller.go @@ -146,6 +146,7 @@ Loop: if pollStickyTaskQueue { taskQueue = p.StickyTaskQueue } + response, err1 := p.Engine.PollWorkflowTaskQueue(NewContext(), &workflowservice.PollWorkflowTaskQueueRequest{ Namespace: p.Namespace, TaskQueue: taskQueue, diff --git a/host/test_cluster.go b/host/test_cluster.go index fc2263e96ab..a71fb39224e 100644 --- a/host/test_cluster.go +++ b/host/test_cluster.go @@ -290,10 +290,6 @@ func (tc *TestCluster) SetFaultInjectionRate(rate float64) { tc.host.frontendService.GetFaultInjection().UpdateRate(rate) } - if tc.host.workerService != nil && tc.host.workerService.GetFaultInjection() != nil { - tc.host.workerService.GetFaultInjection().UpdateRate(rate) - } - for _, s := range tc.host.historyServices { if s.GetFaultInjection() != nil { s.GetFaultInjection().UpdateRate(rate)