From 53d1b47682702cf0a320ee5b85e0057eaa5959a4 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Fri, 10 Dec 2021 10:24:36 -0800 Subject: [PATCH 1/8] initial --- service/history/fx.go | 4 ++ service/history/handler.go | 25 ++------ service/history/shard/controller_impl.go | 49 +--------------- service/history/shard/fx.go | 74 ++++++++++++++++++++++++ 4 files changed, 85 insertions(+), 67 deletions(-) create mode 100644 service/history/shard/fx.go diff --git a/service/history/fx.go b/service/history/fx.go index 194fd26720e..aa3bf400719 100644 --- a/service/history/fx.go +++ b/service/history/fx.go @@ -58,12 +58,14 @@ import ( "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/service" "go.temporal.io/server/service/history/configs" + "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/workflow" ) var Module = fx.Options( resource.Module, workflow.Module, + shard.Module, fx.Provide(ParamsExpandProvider), // BootstrapParams should be deprecated fx.Provide(dynamicconfig.NewCollection), fx.Provide(ConfigProvider), // might be worth just using provider for configs.Config directly @@ -132,6 +134,7 @@ func HandlerProvider( archivalMetadata archiver.ArchivalMetadata, hostInfoProvider resource.HostInfoProvider, archiverProvider provider.ArchiverProvider, + shardController *shard.ControllerImpl, ) *Handler { args := NewHandlerArgs{ config, @@ -157,6 +160,7 @@ func HandlerProvider( archivalMetadata, hostInfoProvider, archiverProvider, + shardController, } return NewHandler(args) } diff --git a/service/history/handler.go b/service/history/handler.go index 8629d0b1af1..d1aa47c6f7c 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -130,6 +130,7 @@ type ( ArchivalMetadata archiver.ArchivalMetadata HostInfoProvider resource.HostInfoProvider ArchiverProvider provider.ArchiverProvider + ShardController *shard.ControllerImpl } ) @@ -184,8 +185,12 @@ func NewHandler(args NewHandlerArgs) *Handler { archivalMetadata: args.ArchivalMetadata, hostInfoProvider: args.HostInfoProvider, archiverProvider: args.ArchiverProvider, + controller: args.ShardController, } + // fx can't create a cycle, so fix it up manually + handler.controller.SetEngineFactory(handler) + // prevent us from trying to serve requests before shard controller is started and ready handler.startWG.Add(1) return handler @@ -210,26 +215,6 @@ func (h *Handler) Start() { h.replicationTaskFetchers.Start() - h.controller = shard.NewController( - h, - h.config, - h.logger, - h.throttledLogger, - h.persistenceExecutionManager, - h.persistenceShardManager, - h.clientBean, - h.historyClient, - h.historyServiceResolver, - h.metricsClient, - h.payloadSerializer, - h.timeSource, - h.namespaceRegistry, - h.saProvider, - h.saMapper, - h.clusterMetadata, - h.archivalMetadata, - h.hostInfoProvider, - ) h.eventNotifier = events.NewNotifier(h.timeSource, h.metricsClient, h.config.GetShardID) // events notifier must starts before controller h.eventNotifier.Start() diff --git a/service/history/shard/controller_impl.go b/service/history/shard/controller_impl.go index fe25974fe16..91171bcfd6a 100644 --- a/service/history/shard/controller_impl.go +++ b/service/history/shard/controller_impl.go @@ -88,53 +88,8 @@ type ( } ) -func NewController( - factory EngineFactory, - config *configs.Config, - logger log.Logger, - throttledLogger log.Logger, - persistenceExecutionManager persistence.ExecutionManager, - persistenceShardManager persistence.ShardManager, - clientBean client.Bean, - historyClient historyservice.HistoryServiceClient, - historyServiceResolver membership.ServiceResolver, - metricsClient metrics.Client, - payloadSerializer serialization.Serializer, - timeSource clock.TimeSource, - namespaceRegistry namespace.Registry, - saProvider searchattribute.Provider, - saMapper searchattribute.Mapper, - clusterMetadata cluster.Metadata, - archivalMetadata archiver.ArchivalMetadata, - hostInfoProvider resource.HostInfoProvider, -) *ControllerImpl { - hostIdentity := hostInfoProvider.HostInfo().Identity() - return &ControllerImpl{ - status: common.DaemonStatusInitialized, - membershipUpdateCh: make(chan *membership.ChangedEvent, 10), - engineFactory: factory, - historyShards: make(map[int32]*ContextImpl), - shutdownCh: make(chan struct{}), - logger: logger, - contextTaggedLogger: log.With(logger, tag.ComponentShardController, tag.Address(hostIdentity)), - throttledLogger: log.With(throttledLogger, tag.ComponentShardController, tag.Address(hostIdentity)), - config: config, - metricsScope: metricsClient.Scope(metrics.HistoryShardControllerScope), - persistenceExecutionManager: persistenceExecutionManager, - persistenceShardManager: persistenceShardManager, - clientBean: clientBean, - historyClient: historyClient, - historyServiceResolver: historyServiceResolver, - metricsClient: metricsClient, - payloadSerializer: payloadSerializer, - timeSource: timeSource, - namespaceRegistry: namespaceRegistry, - saProvider: saProvider, - saMapper: saMapper, - clusterMetadata: clusterMetadata, - archivalMetadata: archivalMetadata, - hostInfoProvider: hostInfoProvider, - } +func (c *ControllerImpl) SetEngineFactory(engineFactory EngineFactory) { + c.engineFactory = engineFactory } func (c *ControllerImpl) Start() { diff --git a/service/history/shard/fx.go b/service/history/shard/fx.go new file mode 100644 index 00000000000..be59b28a832 --- /dev/null +++ b/service/history/shard/fx.go @@ -0,0 +1,74 @@ +// FIXME: copyright + +package shard + +import ( + "go.temporal.io/server/api/historyservice/v1" + "go.temporal.io/server/client" + "go.temporal.io/server/common" + "go.temporal.io/server/common/archiver" + "go.temporal.io/server/common/clock" + "go.temporal.io/server/common/cluster" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/log/tag" + "go.temporal.io/server/common/membership" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/persistence" + "go.temporal.io/server/common/persistence/serialization" + "go.temporal.io/server/common/resource" + "go.temporal.io/server/common/searchattribute" + "go.temporal.io/server/service/history/configs" + "go.uber.org/fx" +) + +var Module = fx.Options( + fx.Provide(ShardControllerProvider), +) + +func ShardControllerProvider( + config *configs.Config, + logger log.Logger, + throttledLogger log.Logger, + persistenceExecutionManager persistence.ExecutionManager, + persistenceShardManager persistence.ShardManager, + clientBean client.Bean, + historyClient historyservice.HistoryServiceClient, + historyServiceResolver membership.ServiceResolver, + metricsClient metrics.Client, + payloadSerializer serialization.Serializer, + timeSource clock.TimeSource, + namespaceRegistry namespace.Registry, + saProvider searchattribute.Provider, + saMapper searchattribute.Mapper, + clusterMetadata cluster.Metadata, + archivalMetadata archiver.ArchivalMetadata, + hostInfoProvider resource.HostInfoProvider, +) *ControllerImpl { + hostIdentity := hostInfoProvider.HostInfo().Identity() + return &ControllerImpl{ + status: common.DaemonStatusInitialized, + membershipUpdateCh: make(chan *membership.ChangedEvent, 10), + historyShards: make(map[int32]*ContextImpl), + shutdownCh: make(chan struct{}), + logger: logger, + contextTaggedLogger: log.With(logger, tag.ComponentShardController, tag.Address(hostIdentity)), + throttledLogger: log.With(throttledLogger, tag.ComponentShardController, tag.Address(hostIdentity)), + config: config, + metricsScope: metricsClient.Scope(metrics.HistoryShardControllerScope), + persistenceExecutionManager: persistenceExecutionManager, + persistenceShardManager: persistenceShardManager, + clientBean: clientBean, + historyClient: historyClient, + historyServiceResolver: historyServiceResolver, + metricsClient: metricsClient, + payloadSerializer: payloadSerializer, + timeSource: timeSource, + namespaceRegistry: namespaceRegistry, + saProvider: saProvider, + saMapper: saMapper, + clusterMetadata: clusterMetadata, + archivalMetadata: archivalMetadata, + hostInfoProvider: hostInfoProvider, + } +} From bce43d1412cfd12bb88ef9e77eb45371c159c955 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Fri, 10 Dec 2021 10:24:44 -0800 Subject: [PATCH 2/8] move hostinfo thing into start --- service/history/shard/controller_impl.go | 4 ++++ service/history/shard/fx.go | 6 ++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/service/history/shard/controller_impl.go b/service/history/shard/controller_impl.go index 91171bcfd6a..66d6624ccbd 100644 --- a/service/history/shard/controller_impl.go +++ b/service/history/shard/controller_impl.go @@ -101,6 +101,10 @@ func (c *ControllerImpl) Start() { return } + hostIdentity := c.hostInfoProvider.HostInfo().Identity() + c.contextTaggedLogger = log.With(c.logger, tag.ComponentShardController, tag.Address(hostIdentity)) + c.throttledLogger = log.With(c.throttledLogger, tag.ComponentShardController, tag.Address(hostIdentity)) + c.acquireShards() c.shutdownWG.Add(1) go c.shardManagementPump() diff --git a/service/history/shard/fx.go b/service/history/shard/fx.go index be59b28a832..f3b3e313286 100644 --- a/service/history/shard/fx.go +++ b/service/history/shard/fx.go @@ -10,7 +10,6 @@ import ( "go.temporal.io/server/common/clock" "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/log" - "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/membership" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" @@ -45,15 +44,14 @@ func ShardControllerProvider( archivalMetadata archiver.ArchivalMetadata, hostInfoProvider resource.HostInfoProvider, ) *ControllerImpl { - hostIdentity := hostInfoProvider.HostInfo().Identity() return &ControllerImpl{ status: common.DaemonStatusInitialized, membershipUpdateCh: make(chan *membership.ChangedEvent, 10), historyShards: make(map[int32]*ContextImpl), shutdownCh: make(chan struct{}), logger: logger, - contextTaggedLogger: log.With(logger, tag.ComponentShardController, tag.Address(hostIdentity)), - throttledLogger: log.With(throttledLogger, tag.ComponentShardController, tag.Address(hostIdentity)), + contextTaggedLogger: logger, // will add tags in Start + throttledLogger: throttledLogger, // will add tags in Start config: config, metricsScope: metricsClient.Scope(metrics.HistoryShardControllerScope), persistenceExecutionManager: persistenceExecutionManager, From efb846af522f157fa98f0bf95822fbcc7cde2024 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Fri, 10 Dec 2021 10:48:37 -0800 Subject: [PATCH 3/8] do cycle with invoke --- service/history/fx.go | 2 +- service/history/handler.go | 3 --- service/history/shard/controller_impl.go | 4 ++++ service/history/shard/fx.go | 3 ++- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/service/history/fx.go b/service/history/fx.go index aa3bf400719..de8a562631d 100644 --- a/service/history/fx.go +++ b/service/history/fx.go @@ -80,6 +80,7 @@ var Module = fx.Options( fx.Provide(HandlerProvider), fx.Provide(ServiceProvider), fx.Invoke(ServiceLifetimeHooks), + fx.Invoke(func(h *Handler, c *shard.ControllerImpl) { c.SetEngineFactory(h) }), // create cycle ) func ServiceProvider( @@ -277,5 +278,4 @@ func ServiceLifetimeHooks( }, }, ) - } diff --git a/service/history/handler.go b/service/history/handler.go index d1aa47c6f7c..8adb28703f7 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -188,9 +188,6 @@ func NewHandler(args NewHandlerArgs) *Handler { controller: args.ShardController, } - // fx can't create a cycle, so fix it up manually - handler.controller.SetEngineFactory(handler) - // prevent us from trying to serve requests before shard controller is started and ready handler.startWG.Add(1) return handler diff --git a/service/history/shard/controller_impl.go b/service/history/shard/controller_impl.go index 66d6624ccbd..a49749363bd 100644 --- a/service/history/shard/controller_impl.go +++ b/service/history/shard/controller_impl.go @@ -101,6 +101,10 @@ func (c *ControllerImpl) Start() { return } + if c.engineFactory == nil { + panic("engineFactory was not injected") + } + hostIdentity := c.hostInfoProvider.HostInfo().Identity() c.contextTaggedLogger = log.With(c.logger, tag.ComponentShardController, tag.Address(hostIdentity)) c.throttledLogger = log.With(c.throttledLogger, tag.ComponentShardController, tag.Address(hostIdentity)) diff --git a/service/history/shard/fx.go b/service/history/shard/fx.go index f3b3e313286..fb0beca11ee 100644 --- a/service/history/shard/fx.go +++ b/service/history/shard/fx.go @@ -3,6 +3,8 @@ package shard import ( + "go.uber.org/fx" + "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/client" "go.temporal.io/server/common" @@ -18,7 +20,6 @@ import ( "go.temporal.io/server/common/resource" "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/service/history/configs" - "go.uber.org/fx" ) var Module = fx.Options( From fd2c1e95f8942717277e45f4e3ac92773e52dc83 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Fri, 10 Dec 2021 10:49:04 -0800 Subject: [PATCH 4/8] remove unused Service.self --- service/history/service.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/service/history/service.go b/service/history/service.go index 749620444e6..bd67a3efb3f 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -30,7 +30,6 @@ import ( "sync/atomic" "time" - "go.uber.org/fx" "google.golang.org/grpc" healthpb "google.golang.org/grpc/health/grpc_health_v1" @@ -48,8 +47,6 @@ import ( // Service represents the history service type ( Service struct { - self *fx.App - status int32 handler *Handler visibilityManager manager.VisibilityManager From 08adc127fa864a12bb913493156318908fdfda31 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Fri, 10 Dec 2021 13:27:56 -0800 Subject: [PATCH 5/8] move to bottom to match args struct --- service/history/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/history/handler.go b/service/history/handler.go index 8adb28703f7..65ae1ec042b 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -76,7 +76,6 @@ type ( Handler struct { status int32 - controller *shard.ControllerImpl tokenSerializer common.TaskTokenSerializer startWG sync.WaitGroup config *configs.Config @@ -104,6 +103,7 @@ type ( clusterMetadata cluster.Metadata archivalMetadata archiver.ArchivalMetadata hostInfoProvider resource.HostInfoProvider + controller *shard.ControllerImpl } NewHandlerArgs struct { From edbf9a72dfc459f053d0d0a85bc6d2482d08874a Mon Sep 17 00:00:00 2001 From: David Reiss Date: Fri, 10 Dec 2021 13:35:20 -0800 Subject: [PATCH 6/8] need to use resource.throttledLogger type --- service/history/shard/fx.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/history/shard/fx.go b/service/history/shard/fx.go index fb0beca11ee..0c897470308 100644 --- a/service/history/shard/fx.go +++ b/service/history/shard/fx.go @@ -29,7 +29,7 @@ var Module = fx.Options( func ShardControllerProvider( config *configs.Config, logger log.Logger, - throttledLogger log.Logger, + throttledLogger resource.ThrottledLogger, persistenceExecutionManager persistence.ExecutionManager, persistenceShardManager persistence.ShardManager, clientBean client.Bean, From 111f59e96009ee5bcafb090791a4ab459f36c0ae Mon Sep 17 00:00:00 2001 From: David Reiss Date: Tue, 21 Dec 2021 11:54:24 -0800 Subject: [PATCH 7/8] copyright --- service/history/shard/fx.go | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/service/history/shard/fx.go b/service/history/shard/fx.go index 0c897470308..ba2a227aeba 100644 --- a/service/history/shard/fx.go +++ b/service/history/shard/fx.go @@ -1,4 +1,26 @@ -// FIXME: copyright +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. package shard From 18f8ce99e56116c170d169c6bb93fedfb48481ee Mon Sep 17 00:00:00 2001 From: David Reiss Date: Tue, 21 Dec 2021 12:04:23 -0800 Subject: [PATCH 8/8] NewTestController --- service/history/shard/controller_test.go | 90 +++++++++++------------- 1 file changed, 42 insertions(+), 48 deletions(-) diff --git a/service/history/shard/controller_test.go b/service/history/shard/controller_test.go index 32e4517d402..8b18076463a 100644 --- a/service/history/shard/controller_test.go +++ b/service/history/shard/controller_test.go @@ -43,6 +43,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "go.temporal.io/server/common" "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" @@ -80,6 +81,41 @@ type ( } ) +func NewTestController( + engineFactory *MockEngineFactory, + config *configs.Config, + resource *resource.Test, + hostInfoProvider *resource.MockHostInfoProvider, +) *ControllerImpl { + return &ControllerImpl{ + config: config, + logger: resource.GetLogger(), + throttledLogger: resource.GetThrottledLogger(), + contextTaggedLogger: log.With(resource.GetLogger(), tag.ComponentShardController, tag.Address(resource.GetHostInfo().Identity())), + persistenceExecutionManager: resource.GetExecutionManager(), + persistenceShardManager: resource.GetShardManager(), + clientBean: resource.GetClientBean(), + historyClient: resource.GetHistoryClient(), + historyServiceResolver: resource.GetHistoryServiceResolver(), + metricsClient: resource.GetMetricsClient(), + payloadSerializer: resource.GetPayloadSerializer(), + timeSource: resource.GetTimeSource(), + namespaceRegistry: resource.GetNamespaceRegistry(), + saProvider: resource.GetSearchAttributesProvider(), + saMapper: resource.GetSearchAttributesMapper(), + clusterMetadata: resource.GetClusterMetadata(), + archivalMetadata: resource.GetArchivalMetadata(), + hostInfoProvider: hostInfoProvider, + + status: common.DaemonStatusInitialized, + membershipUpdateCh: make(chan *membership.ChangedEvent, 10), + engineFactory: engineFactory, + shutdownCh: make(chan struct{}), + metricsScope: resource.GetMetricsClient().Scope(metrics.HistoryShardControllerScope), + historyShards: make(map[int32]*ContextImpl), + } +} + func TestShardControllerSuite(t *testing.T) { s := new(controllerSuite) suite.Run(t, s) @@ -103,24 +139,10 @@ func (s *controllerSuite) SetupTest() { s.logger = s.mockResource.Logger s.config = tests.NewDynamicConfig() - s.shardController = NewController( + s.shardController = NewTestController( s.mockEngineFactory, s.config, - s.mockResource.Logger, - s.mockResource.GetThrottledLogger(), - s.mockResource.GetExecutionManager(), - s.mockResource.GetShardManager(), - s.mockResource.GetClientBean(), - s.mockResource.GetHistoryClient(), - s.mockResource.GetHistoryServiceResolver(), - s.mockResource.GetMetricsClient(), - s.mockResource.GetPayloadSerializer(), - s.mockResource.GetTimeSource(), - s.mockResource.GetNamespaceRegistry(), - s.mockResource.GetSearchAttributesProvider(), - s.mockResource.GetSearchAttributesMapper(), - s.mockResource.GetClusterMetadata(), - s.mockResource.GetArchivalMetadata(), + s.mockResource, s.mockHostInfoProvider, ) } @@ -476,24 +498,10 @@ func (s *controllerSuite) TestAcquireShardRenewLookupFailed() { func (s *controllerSuite) TestHistoryEngineClosed() { numShards := int32(4) s.config.NumberOfShards = numShards - s.shardController = NewController( + s.shardController = NewTestController( s.mockEngineFactory, s.config, - s.mockResource.Logger, - s.mockResource.GetThrottledLogger(), - s.mockResource.GetExecutionManager(), - s.mockResource.GetShardManager(), - s.mockResource.GetClientBean(), - s.mockResource.GetHistoryClient(), - s.mockResource.GetHistoryServiceResolver(), - s.mockResource.GetMetricsClient(), - s.mockResource.GetPayloadSerializer(), - s.mockResource.GetTimeSource(), - s.mockResource.GetNamespaceRegistry(), - s.mockResource.GetSearchAttributesProvider(), - s.mockResource.GetSearchAttributesMapper(), - s.mockResource.GetClusterMetadata(), - s.mockResource.GetArchivalMetadata(), + s.mockResource, s.mockHostInfoProvider, ) historyEngines := make(map[int32]*MockEngine) @@ -586,24 +594,10 @@ func (s *controllerSuite) TestHistoryEngineClosed() { func (s *controllerSuite) TestShardControllerClosed() { numShards := int32(4) s.config.NumberOfShards = numShards - s.shardController = NewController( + s.shardController = NewTestController( s.mockEngineFactory, s.config, - s.mockResource.Logger, - s.mockResource.GetThrottledLogger(), - s.mockResource.GetExecutionManager(), - s.mockResource.GetShardManager(), - s.mockResource.GetClientBean(), - s.mockResource.GetHistoryClient(), - s.mockResource.GetHistoryServiceResolver(), - s.mockResource.GetMetricsClient(), - s.mockResource.GetPayloadSerializer(), - s.mockResource.GetTimeSource(), - s.mockResource.GetNamespaceRegistry(), - s.mockResource.GetSearchAttributesProvider(), - s.mockResource.GetSearchAttributesMapper(), - s.mockResource.GetClusterMetadata(), - s.mockResource.GetArchivalMetadata(), + s.mockResource, s.mockHostInfoProvider, )