diff --git a/service/history/fx.go b/service/history/fx.go index 194fd26720e..de8a562631d 100644 --- a/service/history/fx.go +++ b/service/history/fx.go @@ -58,12 +58,14 @@ import ( "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/service" "go.temporal.io/server/service/history/configs" + "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/workflow" ) var Module = fx.Options( resource.Module, workflow.Module, + shard.Module, fx.Provide(ParamsExpandProvider), // BootstrapParams should be deprecated fx.Provide(dynamicconfig.NewCollection), fx.Provide(ConfigProvider), // might be worth just using provider for configs.Config directly @@ -78,6 +80,7 @@ var Module = fx.Options( fx.Provide(HandlerProvider), fx.Provide(ServiceProvider), fx.Invoke(ServiceLifetimeHooks), + fx.Invoke(func(h *Handler, c *shard.ControllerImpl) { c.SetEngineFactory(h) }), // create cycle ) func ServiceProvider( @@ -132,6 +135,7 @@ func HandlerProvider( archivalMetadata archiver.ArchivalMetadata, hostInfoProvider resource.HostInfoProvider, archiverProvider provider.ArchiverProvider, + shardController *shard.ControllerImpl, ) *Handler { args := NewHandlerArgs{ config, @@ -157,6 +161,7 @@ func HandlerProvider( archivalMetadata, hostInfoProvider, archiverProvider, + shardController, } return NewHandler(args) } @@ -273,5 +278,4 @@ func ServiceLifetimeHooks( }, }, ) - } diff --git a/service/history/handler.go b/service/history/handler.go index 8629d0b1af1..65ae1ec042b 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -76,7 +76,6 @@ type ( Handler struct { status int32 - controller *shard.ControllerImpl tokenSerializer common.TaskTokenSerializer startWG sync.WaitGroup config *configs.Config @@ -104,6 +103,7 @@ type ( clusterMetadata cluster.Metadata archivalMetadata archiver.ArchivalMetadata hostInfoProvider resource.HostInfoProvider + controller *shard.ControllerImpl } NewHandlerArgs struct { @@ -130,6 +130,7 @@ type ( ArchivalMetadata archiver.ArchivalMetadata HostInfoProvider resource.HostInfoProvider ArchiverProvider provider.ArchiverProvider + ShardController *shard.ControllerImpl } ) @@ -184,6 +185,7 @@ func NewHandler(args NewHandlerArgs) *Handler { archivalMetadata: args.ArchivalMetadata, hostInfoProvider: args.HostInfoProvider, archiverProvider: args.ArchiverProvider, + controller: args.ShardController, } // prevent us from trying to serve requests before shard controller is started and ready @@ -210,26 +212,6 @@ func (h *Handler) Start() { h.replicationTaskFetchers.Start() - h.controller = shard.NewController( - h, - h.config, - h.logger, - h.throttledLogger, - h.persistenceExecutionManager, - h.persistenceShardManager, - h.clientBean, - h.historyClient, - h.historyServiceResolver, - h.metricsClient, - h.payloadSerializer, - h.timeSource, - h.namespaceRegistry, - h.saProvider, - h.saMapper, - h.clusterMetadata, - h.archivalMetadata, - h.hostInfoProvider, - ) h.eventNotifier = events.NewNotifier(h.timeSource, h.metricsClient, h.config.GetShardID) // events notifier must starts before controller h.eventNotifier.Start() diff --git a/service/history/service.go b/service/history/service.go index 749620444e6..bd67a3efb3f 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -30,7 +30,6 @@ import ( "sync/atomic" "time" - "go.uber.org/fx" "google.golang.org/grpc" healthpb "google.golang.org/grpc/health/grpc_health_v1" @@ -48,8 +47,6 @@ import ( // Service represents the history service type ( Service struct { - self *fx.App - status int32 handler *Handler visibilityManager manager.VisibilityManager diff --git a/service/history/shard/controller_impl.go b/service/history/shard/controller_impl.go index fe25974fe16..a49749363bd 100644 --- a/service/history/shard/controller_impl.go +++ b/service/history/shard/controller_impl.go @@ -88,53 +88,8 @@ type ( } ) -func NewController( - factory EngineFactory, - config *configs.Config, - logger log.Logger, - throttledLogger log.Logger, - persistenceExecutionManager persistence.ExecutionManager, - persistenceShardManager persistence.ShardManager, - clientBean client.Bean, - historyClient historyservice.HistoryServiceClient, - historyServiceResolver membership.ServiceResolver, - metricsClient metrics.Client, - payloadSerializer serialization.Serializer, - timeSource clock.TimeSource, - namespaceRegistry namespace.Registry, - saProvider searchattribute.Provider, - saMapper searchattribute.Mapper, - clusterMetadata cluster.Metadata, - archivalMetadata archiver.ArchivalMetadata, - hostInfoProvider resource.HostInfoProvider, -) *ControllerImpl { - hostIdentity := hostInfoProvider.HostInfo().Identity() - return &ControllerImpl{ - status: common.DaemonStatusInitialized, - membershipUpdateCh: make(chan *membership.ChangedEvent, 10), - engineFactory: factory, - historyShards: make(map[int32]*ContextImpl), - shutdownCh: make(chan struct{}), - logger: logger, - contextTaggedLogger: log.With(logger, tag.ComponentShardController, tag.Address(hostIdentity)), - throttledLogger: log.With(throttledLogger, tag.ComponentShardController, tag.Address(hostIdentity)), - config: config, - metricsScope: metricsClient.Scope(metrics.HistoryShardControllerScope), - persistenceExecutionManager: persistenceExecutionManager, - persistenceShardManager: persistenceShardManager, - clientBean: clientBean, - historyClient: historyClient, - historyServiceResolver: historyServiceResolver, - metricsClient: metricsClient, - payloadSerializer: payloadSerializer, - timeSource: timeSource, - namespaceRegistry: namespaceRegistry, - saProvider: saProvider, - saMapper: saMapper, - clusterMetadata: clusterMetadata, - archivalMetadata: archivalMetadata, - hostInfoProvider: hostInfoProvider, - } +func (c *ControllerImpl) SetEngineFactory(engineFactory EngineFactory) { + c.engineFactory = engineFactory } func (c *ControllerImpl) Start() { @@ -146,6 +101,14 @@ func (c *ControllerImpl) Start() { return } + if c.engineFactory == nil { + panic("engineFactory was not injected") + } + + hostIdentity := c.hostInfoProvider.HostInfo().Identity() + c.contextTaggedLogger = log.With(c.logger, tag.ComponentShardController, tag.Address(hostIdentity)) + c.throttledLogger = log.With(c.throttledLogger, tag.ComponentShardController, tag.Address(hostIdentity)) + c.acquireShards() c.shutdownWG.Add(1) go c.shardManagementPump() diff --git a/service/history/shard/controller_test.go b/service/history/shard/controller_test.go index 32e4517d402..8b18076463a 100644 --- a/service/history/shard/controller_test.go +++ b/service/history/shard/controller_test.go @@ -43,6 +43,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "go.temporal.io/server/common" "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" @@ -80,6 +81,41 @@ type ( } ) +func NewTestController( + engineFactory *MockEngineFactory, + config *configs.Config, + resource *resource.Test, + hostInfoProvider *resource.MockHostInfoProvider, +) *ControllerImpl { + return &ControllerImpl{ + config: config, + logger: resource.GetLogger(), + throttledLogger: resource.GetThrottledLogger(), + contextTaggedLogger: log.With(resource.GetLogger(), tag.ComponentShardController, tag.Address(resource.GetHostInfo().Identity())), + persistenceExecutionManager: resource.GetExecutionManager(), + persistenceShardManager: resource.GetShardManager(), + clientBean: resource.GetClientBean(), + historyClient: resource.GetHistoryClient(), + historyServiceResolver: resource.GetHistoryServiceResolver(), + metricsClient: resource.GetMetricsClient(), + payloadSerializer: resource.GetPayloadSerializer(), + timeSource: resource.GetTimeSource(), + namespaceRegistry: resource.GetNamespaceRegistry(), + saProvider: resource.GetSearchAttributesProvider(), + saMapper: resource.GetSearchAttributesMapper(), + clusterMetadata: resource.GetClusterMetadata(), + archivalMetadata: resource.GetArchivalMetadata(), + hostInfoProvider: hostInfoProvider, + + status: common.DaemonStatusInitialized, + membershipUpdateCh: make(chan *membership.ChangedEvent, 10), + engineFactory: engineFactory, + shutdownCh: make(chan struct{}), + metricsScope: resource.GetMetricsClient().Scope(metrics.HistoryShardControllerScope), + historyShards: make(map[int32]*ContextImpl), + } +} + func TestShardControllerSuite(t *testing.T) { s := new(controllerSuite) suite.Run(t, s) @@ -103,24 +139,10 @@ func (s *controllerSuite) SetupTest() { s.logger = s.mockResource.Logger s.config = tests.NewDynamicConfig() - s.shardController = NewController( + s.shardController = NewTestController( s.mockEngineFactory, s.config, - s.mockResource.Logger, - s.mockResource.GetThrottledLogger(), - s.mockResource.GetExecutionManager(), - s.mockResource.GetShardManager(), - s.mockResource.GetClientBean(), - s.mockResource.GetHistoryClient(), - s.mockResource.GetHistoryServiceResolver(), - s.mockResource.GetMetricsClient(), - s.mockResource.GetPayloadSerializer(), - s.mockResource.GetTimeSource(), - s.mockResource.GetNamespaceRegistry(), - s.mockResource.GetSearchAttributesProvider(), - s.mockResource.GetSearchAttributesMapper(), - s.mockResource.GetClusterMetadata(), - s.mockResource.GetArchivalMetadata(), + s.mockResource, s.mockHostInfoProvider, ) } @@ -476,24 +498,10 @@ func (s *controllerSuite) TestAcquireShardRenewLookupFailed() { func (s *controllerSuite) TestHistoryEngineClosed() { numShards := int32(4) s.config.NumberOfShards = numShards - s.shardController = NewController( + s.shardController = NewTestController( s.mockEngineFactory, s.config, - s.mockResource.Logger, - s.mockResource.GetThrottledLogger(), - s.mockResource.GetExecutionManager(), - s.mockResource.GetShardManager(), - s.mockResource.GetClientBean(), - s.mockResource.GetHistoryClient(), - s.mockResource.GetHistoryServiceResolver(), - s.mockResource.GetMetricsClient(), - s.mockResource.GetPayloadSerializer(), - s.mockResource.GetTimeSource(), - s.mockResource.GetNamespaceRegistry(), - s.mockResource.GetSearchAttributesProvider(), - s.mockResource.GetSearchAttributesMapper(), - s.mockResource.GetClusterMetadata(), - s.mockResource.GetArchivalMetadata(), + s.mockResource, s.mockHostInfoProvider, ) historyEngines := make(map[int32]*MockEngine) @@ -586,24 +594,10 @@ func (s *controllerSuite) TestHistoryEngineClosed() { func (s *controllerSuite) TestShardControllerClosed() { numShards := int32(4) s.config.NumberOfShards = numShards - s.shardController = NewController( + s.shardController = NewTestController( s.mockEngineFactory, s.config, - s.mockResource.Logger, - s.mockResource.GetThrottledLogger(), - s.mockResource.GetExecutionManager(), - s.mockResource.GetShardManager(), - s.mockResource.GetClientBean(), - s.mockResource.GetHistoryClient(), - s.mockResource.GetHistoryServiceResolver(), - s.mockResource.GetMetricsClient(), - s.mockResource.GetPayloadSerializer(), - s.mockResource.GetTimeSource(), - s.mockResource.GetNamespaceRegistry(), - s.mockResource.GetSearchAttributesProvider(), - s.mockResource.GetSearchAttributesMapper(), - s.mockResource.GetClusterMetadata(), - s.mockResource.GetArchivalMetadata(), + s.mockResource, s.mockHostInfoProvider, ) diff --git a/service/history/shard/fx.go b/service/history/shard/fx.go new file mode 100644 index 00000000000..ba2a227aeba --- /dev/null +++ b/service/history/shard/fx.go @@ -0,0 +1,95 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package shard + +import ( + "go.uber.org/fx" + + "go.temporal.io/server/api/historyservice/v1" + "go.temporal.io/server/client" + "go.temporal.io/server/common" + "go.temporal.io/server/common/archiver" + "go.temporal.io/server/common/clock" + "go.temporal.io/server/common/cluster" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/membership" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/persistence" + "go.temporal.io/server/common/persistence/serialization" + "go.temporal.io/server/common/resource" + "go.temporal.io/server/common/searchattribute" + "go.temporal.io/server/service/history/configs" +) + +var Module = fx.Options( + fx.Provide(ShardControllerProvider), +) + +func ShardControllerProvider( + config *configs.Config, + logger log.Logger, + throttledLogger resource.ThrottledLogger, + persistenceExecutionManager persistence.ExecutionManager, + persistenceShardManager persistence.ShardManager, + clientBean client.Bean, + historyClient historyservice.HistoryServiceClient, + historyServiceResolver membership.ServiceResolver, + metricsClient metrics.Client, + payloadSerializer serialization.Serializer, + timeSource clock.TimeSource, + namespaceRegistry namespace.Registry, + saProvider searchattribute.Provider, + saMapper searchattribute.Mapper, + clusterMetadata cluster.Metadata, + archivalMetadata archiver.ArchivalMetadata, + hostInfoProvider resource.HostInfoProvider, +) *ControllerImpl { + return &ControllerImpl{ + status: common.DaemonStatusInitialized, + membershipUpdateCh: make(chan *membership.ChangedEvent, 10), + historyShards: make(map[int32]*ContextImpl), + shutdownCh: make(chan struct{}), + logger: logger, + contextTaggedLogger: logger, // will add tags in Start + throttledLogger: throttledLogger, // will add tags in Start + config: config, + metricsScope: metricsClient.Scope(metrics.HistoryShardControllerScope), + persistenceExecutionManager: persistenceExecutionManager, + persistenceShardManager: persistenceShardManager, + clientBean: clientBean, + historyClient: historyClient, + historyServiceResolver: historyServiceResolver, + metricsClient: metricsClient, + payloadSerializer: payloadSerializer, + timeSource: timeSource, + namespaceRegistry: namespaceRegistry, + saProvider: saProvider, + saMapper: saMapper, + clusterMetadata: clusterMetadata, + archivalMetadata: archivalMetadata, + hostInfoProvider: hostInfoProvider, + } +}