Skip to content

Commit

Permalink
Separate history engine factory from handler (#2409)
Browse files Browse the repository at this point in the history
- Separate history engine factory implementation from history handler
- Clean up some unnecessary parameters after the separation
  • Loading branch information
yycptt authored Jan 24, 2022
1 parent 87dc0ef commit 410b593
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 81 deletions.
81 changes: 64 additions & 17 deletions service/history/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/service"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/events"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
)
Expand All @@ -77,10 +78,12 @@ var Module = fx.Options(
fx.Provide(ThrottledLoggerRpsFnProvider),
fx.Provide(PersistenceMaxQpsProvider),
fx.Provide(ServiceResolverProvider),
fx.Provide(EventNotifierProvider),
fx.Provide(ReplicationTaskFetchersProvider),
fx.Provide(HistoryEngineFactoryProvider),
fx.Provide(HandlerProvider),
fx.Provide(ServiceProvider),
fx.Invoke(ServiceLifetimeHooks),
fx.Invoke(func(h *Handler, c *shard.ControllerImpl) { c.SetEngineFactory(h) }), // create cycle
)

func ServiceProvider(
Expand Down Expand Up @@ -113,17 +116,10 @@ func ServiceResolverProvider(membershipMonitor membership.Monitor) (membership.S

func HandlerProvider(
config *configs.Config,
visibilityMrg manager.VisibilityManager,
newCacheFn workflow.NewCacheFn,
logger resource.SnTaggedLogger,
throttledLogger resource.ThrottledLogger,
persistenceExecutionManager persistence.ExecutionManager,
persistenceShardManager persistence.ShardManager,
clientBean client.Bean,
historyClient historyservice.HistoryServiceClient,
matchingRawClient resource.MatchingRawClient,
matchingClient resource.MatchingClient,
sdkSystemClient sdkclient.Client,
historyServiceResolver membership.ServiceResolver,
metricsClient metrics.Client,
payloadSerializer serialization.Serializer,
Expand All @@ -134,22 +130,16 @@ func HandlerProvider(
clusterMetadata cluster.Metadata,
archivalMetadata archiver.ArchivalMetadata,
hostInfoProvider resource.HostInfoProvider,
archiverProvider provider.ArchiverProvider,
shardController *shard.ControllerImpl,
eventNotifier events.Notifier,
replicationTaskFetchers ReplicationTaskFetchers,
) *Handler {
args := NewHandlerArgs{
config,
visibilityMrg,
newCacheFn,
logger,
throttledLogger,
persistenceExecutionManager,
persistenceShardManager,
clientBean,
historyClient,
matchingRawClient,
matchingClient,
sdkSystemClient,
historyServiceResolver,
metricsClient,
payloadSerializer,
Expand All @@ -160,12 +150,43 @@ func HandlerProvider(
clusterMetadata,
archivalMetadata,
hostInfoProvider,
archiverProvider,
shardController,
eventNotifier,
replicationTaskFetchers,
}
return NewHandler(args)
}

func HistoryEngineFactoryProvider(
visibilityMgr manager.VisibilityManager,
matchingClient resource.MatchingClient,
historyClient historyservice.HistoryServiceClient,
publicClient sdkclient.Client,
eventNotifier events.Notifier,
config *configs.Config,
replicationTaskFetchers ReplicationTaskFetchers,
rawMatchingClient resource.MatchingRawClient,
newCacheFn workflow.NewCacheFn,
clientBean client.Bean,
archiverProvider provider.ArchiverProvider,
registry namespace.Registry,
) shard.EngineFactory {
return NewEngineFactory(
visibilityMgr,
matchingClient,
historyClient,
publicClient,
eventNotifier,
config,
replicationTaskFetchers,
rawMatchingClient,
newCacheFn,
clientBean,
archiverProvider,
registry,
)
}

func ParamsExpandProvider(params *resource.BootstrapParams) common.RPCFactory {
return params.RPCFactory
}
Expand Down Expand Up @@ -259,6 +280,32 @@ func VisibilityManagerProvider(
)
}

func EventNotifierProvider(
timeSource clock.TimeSource,
metricsClient metrics.Client,
config *configs.Config,
) events.Notifier {
return events.NewNotifier(
timeSource,
metricsClient,
config.GetShardID,
)
}

func ReplicationTaskFetchersProvider(
logger log.Logger,
config *configs.Config,
clusterMetadata cluster.Metadata,
clientBean client.Bean,
) ReplicationTaskFetchers {
return NewReplicationTaskFetchers(
logger,
config,
clusterMetadata,
clientBean,
)
}

func ServiceLifetimeHooks(
lc fx.Lifecycle,
svcStoppedCh chan struct{},
Expand Down
64 changes: 4 additions & 60 deletions service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,15 @@ import (
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
sdkclient "go.temporal.io/sdk/client"
healthpb "google.golang.org/grpc/health/grpc_health_v1"

enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/api/matchingservice/v1"
namespacespb "go.temporal.io/server/api/namespace/v1"
replicationspb "go.temporal.io/server/api/replication/v1"
tokenspb "go.temporal.io/server/api/token/v1"
"go.temporal.io/server/client"
"go.temporal.io/server/common"
"go.temporal.io/server/common/archiver"
"go.temporal.io/server/common/archiver/provider"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
Expand All @@ -58,7 +54,6 @@ import (
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/persistence/visibility/manager"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/resource"
"go.temporal.io/server/common/searchattribute"
Expand All @@ -67,7 +62,6 @@ import (
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/events"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
)

type (
Expand All @@ -81,19 +75,11 @@ type (
config *configs.Config
eventNotifier events.Notifier
replicationTaskFetchers ReplicationTaskFetchers
visibilityMrg manager.VisibilityManager
newCacheFn workflow.NewCacheFn
logger log.Logger
throttledLogger log.Logger
persistenceExecutionManager persistence.ExecutionManager
persistenceShardManager persistence.ShardManager
clientBean client.Bean
historyClient historyservice.HistoryServiceClient
matchingRawClient matchingservice.MatchingServiceClient
matchingClient matchingservice.MatchingServiceClient
sdkClient sdkclient.Client
historyServiceResolver membership.ServiceResolver
archiverProvider provider.ArchiverProvider
metricsClient metrics.Client
payloadSerializer serialization.Serializer
timeSource clock.TimeSource
Expand All @@ -108,17 +94,10 @@ type (

NewHandlerArgs struct {
Config *configs.Config
VisibilityMrg manager.VisibilityManager
NewCacheFn workflow.NewCacheFn
Logger log.Logger
ThrottledLogger log.Logger
PersistenceExecutionManager persistence.ExecutionManager
PersistenceShardManager persistence.ShardManager
ClientBean client.Bean
HistoryClient historyservice.HistoryServiceClient
MatchingRawClient matchingservice.MatchingServiceClient
MatchingClient matchingservice.MatchingServiceClient
SdkSystemClient sdkclient.Client
HistoryServiceResolver membership.ServiceResolver
MetricsClient metrics.Client
PayloadSerializer serialization.Serializer
Expand All @@ -129,8 +108,9 @@ type (
ClusterMetadata cluster.Metadata
ArchivalMetadata archiver.ArchivalMetadata
HostInfoProvider resource.HostInfoProvider
ArchiverProvider provider.ArchiverProvider
ShardController *shard.ControllerImpl
EventNotifier events.Notifier
ReplicationTaskFetchers ReplicationTaskFetchers
}
)

Expand All @@ -139,7 +119,6 @@ const (
)

var (
_ shard.EngineFactory = (*Handler)(nil)
_ historyservice.HistoryServiceServer = (*Handler)(nil)

errNamespaceNotSet = serviceerror.NewInvalidArgument("Namespace not set on request.")
Expand All @@ -163,17 +142,10 @@ func NewHandler(args NewHandlerArgs) *Handler {
status: common.DaemonStatusInitialized,
config: args.Config,
tokenSerializer: common.NewProtoTaskTokenSerializer(),
visibilityMrg: args.VisibilityMrg,
newCacheFn: args.NewCacheFn,
logger: args.Logger,
throttledLogger: args.ThrottledLogger,
persistenceExecutionManager: args.PersistenceExecutionManager,
persistenceShardManager: args.PersistenceShardManager,
clientBean: args.ClientBean,
historyClient: args.HistoryClient,
matchingRawClient: args.MatchingRawClient,
matchingClient: args.MatchingClient,
sdkClient: args.SdkSystemClient,
historyServiceResolver: args.HistoryServiceResolver,
metricsClient: args.MetricsClient,
payloadSerializer: args.PayloadSerializer,
Expand All @@ -184,8 +156,9 @@ func NewHandler(args NewHandlerArgs) *Handler {
clusterMetadata: args.ClusterMetadata,
archivalMetadata: args.ArchivalMetadata,
hostInfoProvider: args.HostInfoProvider,
archiverProvider: args.ArchiverProvider,
controller: args.ShardController,
eventNotifier: args.EventNotifier,
replicationTaskFetchers: args.ReplicationTaskFetchers,
}

// prevent us from trying to serve requests before shard controller is started and ready
Expand All @@ -203,16 +176,8 @@ func (h *Handler) Start() {
return
}

h.replicationTaskFetchers = NewReplicationTaskFetchers(
h.logger,
h.config,
h.clusterMetadata,
h.clientBean,
)

h.replicationTaskFetchers.Start()

h.eventNotifier = events.NewNotifier(h.timeSource, h.metricsClient, h.config.GetShardID)
// events notifier must starts before controller
h.eventNotifier.Start()
h.controller.Start()
Expand All @@ -239,27 +204,6 @@ func (h *Handler) isStopped() bool {
return atomic.LoadInt32(&h.status) == common.DaemonStatusStopped
}

// CreateEngine is implementation for HistoryEngineFactory used for creating the engine instance for shard
func (h *Handler) CreateEngine(
shardContext shard.Context,
) shard.Engine {
return NewEngineWithShardContext(
shardContext,
h.visibilityMrg,
h.matchingClient,
h.historyClient,
h.sdkClient,
h.eventNotifier,
h.config,
h.replicationTaskFetchers,
h.matchingRawClient,
h.newCacheFn,
h.clientBean,
h.archiverProvider,
h.namespaceRegistry,
)
}

// Check is from: https://github.com/grpc/grpc/blob/master/doc/health-checking.md
func (h *Handler) Check(_ context.Context, request *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
h.logger.Debug("History service health check endpoint (gRPC) reached.")
Expand Down
Loading

0 comments on commit 410b593

Please sign in to comment.