From 8da885f994e918daaa438fc9062d22106e4dec44 Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Wed, 12 Jan 2022 00:26:52 -0800 Subject: [PATCH] Use interceptor to record sdk versions --- common/headers/versionChecker.go | 3 +- common/rpc/interceptor/sdk_version.go | 97 +++++++++++++++++++ common/rpc/interceptor/sdk_version_test.go | 55 +++++++++++ service/frontend/dcRedirectionHandler_test.go | 5 - service/frontend/fx.go | 8 +- service/frontend/versionChecker.go | 46 ++------- service/frontend/workflowHandler.go | 84 +++++++--------- service/frontend/workflowHandler_test.go | 34 ------- 8 files changed, 204 insertions(+), 128 deletions(-) create mode 100644 common/rpc/interceptor/sdk_version.go create mode 100644 common/rpc/interceptor/sdk_version_test.go diff --git a/common/headers/versionChecker.go b/common/headers/versionChecker.go index e5b3ea129bfc..362f6d50e6ea 100644 --- a/common/headers/versionChecker.go +++ b/common/headers/versionChecker.go @@ -73,7 +73,6 @@ var ( type ( // VersionChecker is used to check client/server compatibility and client's capabilities VersionChecker interface { - GetClientNameAndVersion(ctx context.Context) (string, string) ClientSupported(ctx context.Context, enableClientVersionCheck bool) error ClientSupportsFeature(ctx context.Context, feature string) bool } @@ -99,7 +98,7 @@ func NewVersionChecker(supportedClients map[string]string, serverVersion string) } } -func (vc *versionChecker) GetClientNameAndVersion(ctx context.Context) (string, string) { +func GetClientNameAndVersion(ctx context.Context) (string, string) { headers := GetValues(ctx, ClientNameHeaderName, ClientVersionHeaderName) clientName := headers[0] clientVersion := headers[1] diff --git a/common/rpc/interceptor/sdk_version.go b/common/rpc/interceptor/sdk_version.go new file mode 100644 index 000000000000..e87c9ce14e61 --- /dev/null +++ b/common/rpc/interceptor/sdk_version.go @@ -0,0 +1,97 @@ +// The MIT License +// +// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package interceptor + +import ( + "context" + "sync" + "sync/atomic" + + "go.temporal.io/server/common/headers" + "go.temporal.io/version/check" + "google.golang.org/grpc" +) + +type SDKInfoRecorder interface { + RecordSDKInfo(name, version string) + GetAndResetSDKInfo() []check.SDKInfo +} + +type SDKVersionInterceptor struct { + // Using a sync map to support effienct concurrent updates. + // Key type is sdkNameVersion and value type is sdkCount. + // Note that we never delete keys from this map as the cardinality of + // client versions should be fairly low. + sdkInfoCounter sync.Map +} + +type sdkNameVersion struct { + name string + version string +} + +type sdkCount struct { + count int64 +} + +var _ grpc.UnaryServerInterceptor = (*TelemetryInterceptor)(nil).Intercept + +func NewSDKVersionInterceptor() *SDKVersionInterceptor { + return &SDKVersionInterceptor{} +} + +func (vi *SDKVersionInterceptor) Intercept( + ctx context.Context, + req interface{}, + info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler, +) (interface{}, error) { + sdkName, sdkVersion := headers.GetClientNameAndVersion(ctx) + if sdkName != "" && sdkVersion != "" { + vi.RecordSDKInfo(sdkName, sdkVersion) + } + return handler(ctx, req) +} + +func (vi *SDKVersionInterceptor) RecordSDKInfo(name, version string) { + info := sdkNameVersion{name, version} + valIface, ok := vi.sdkInfoCounter.Load(info) + if !ok { + // Store if wasn't added racy + valIface, _ = vi.sdkInfoCounter.LoadOrStore(info, &sdkCount{}) + } + atomic.AddInt64(&valIface.(*sdkCount).count, 1) +} + +func (vi *SDKVersionInterceptor) GetAndResetSDKInfo() []check.SDKInfo { + sdkInfo := make([]check.SDKInfo, 0) + vi.sdkInfoCounter.Range(func(key, value interface{}) bool { + timesSeen := atomic.SwapInt64(&value.(*sdkCount).count, 0) + nameVersion := key.(sdkNameVersion) + sdkInfo = append(sdkInfo, check.SDKInfo{Name: nameVersion.name, Version: nameVersion.version, TimesSeen: timesSeen}) + return true + }) + return sdkInfo +} diff --git a/common/rpc/interceptor/sdk_version_test.go b/common/rpc/interceptor/sdk_version_test.go new file mode 100644 index 000000000000..18a9639b94cf --- /dev/null +++ b/common/rpc/interceptor/sdk_version_test.go @@ -0,0 +1,55 @@ +// The MIT License +// +// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package interceptor + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "go.temporal.io/server/common/headers" +) + +func TestSDKVersionRecorder(t *testing.T) { + interceptor := NewSDKVersionInterceptor() + + sdkVersion := "1.10.1" + ctx := headers.SetVersionsForTests(context.Background(), sdkVersion, headers.ClientNameGoSDK, headers.SupportedServerVersions, headers.AllFeatures) + interceptor.Intercept(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) { + return nil, nil + }) + + ctx = headers.SetVersionsForTests(context.Background(), "", headers.ClientNameGoSDK, headers.SupportedServerVersions, headers.AllFeatures) + interceptor.Intercept(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) { + return nil, nil + }) + + info := interceptor.GetAndResetSDKInfo() + assert.Equal(t, 1, len(info)) + assert.Equal(t, headers.ClientNameGoSDK, info[0].Name) + assert.Equal(t, sdkVersion, info[0].Version) + assert.Equal(t, int64(1), info[0].TimesSeen) +} diff --git a/service/frontend/dcRedirectionHandler_test.go b/service/frontend/dcRedirectionHandler_test.go index a19e0558c3ac..1d07738c412c 100644 --- a/service/frontend/dcRedirectionHandler_test.go +++ b/service/frontend/dcRedirectionHandler_test.go @@ -125,7 +125,6 @@ func (s *dcRedirectionHandlerSuite) SetupTest() { s.mockResource.GetSearchAttributesProvider(), s.mockResource.GetClusterMetadata(), s.mockResource.GetArchivalMetadata(), - s, ) s.mockFrontendHandler = workflowservicemock.NewMockWorkflowServiceServer(s.controller) @@ -148,10 +147,6 @@ func (s *dcRedirectionHandlerSuite) TearDownTest() { s.controller.Finish() } -// RecordSDKInfo is a noop in this test suite -func (s *dcRedirectionHandlerSuite) RecordSDKInfo(name, version string) { -} - func (s *dcRedirectionHandlerSuite) TestDescribeTaskQueue() { apiName := "DescribeTaskQueue" diff --git a/service/frontend/fx.go b/service/frontend/fx.go index ebf54fd505c5..e2577b7e7969 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -77,6 +77,7 @@ var Module = fx.Options( fx.Provide(NamespaceCountLimitInterceptorProvider), fx.Provide(NamespaceValidatorInterceptorProvider), fx.Provide(NamespaceRateLimitInterceptorProvider), + fx.Provide(SDKVersionInterceptorProvider), fx.Provide(GrpcServerOptionsProvider), fx.Provide(VisibilityManagerProvider), fx.Provide(ThrottledLoggerRpsFnProvider), @@ -131,6 +132,7 @@ func GrpcServerOptionsProvider( namespaceValidatorInterceptor *interceptor.NamespaceValidatorInterceptor, telemetryInterceptor *interceptor.TelemetryInterceptor, rateLimitInterceptor *interceptor.RateLimitInterceptor, + sdkVersionInterceptor *interceptor.SDKVersionInterceptor, authorizer authorization.Authorizer, claimMapper authorization.ClaimMapper, audienceGetter authorization.JWTAudienceMapper, @@ -168,6 +170,7 @@ func GrpcServerOptionsProvider( logger, audienceGetter, ), + sdkVersionInterceptor.Intercept, } if len(customInterceptors) > 0 { interceptors = append(interceptors, customInterceptors...) @@ -283,6 +286,10 @@ func NamespaceValidatorInterceptorProvider( ) } +func SDKVersionInterceptorProvider() *interceptor.SDKVersionInterceptor { + return interceptor.NewSDKVersionInterceptor() +} + func PersistenceMaxQpsProvider( serviceConfig *Config, ) persistenceClient.PersistenceMaxQps { @@ -433,7 +440,6 @@ func HandlerProvider( saProvider, clusterMetadata, archivalMetadata, - versionChecker, ) handler := NewDCRedirectionHandler(wfHandler, params.DCRedirectionPolicy, logger, clientBean, metricsClient, timeSource, namespaceRegistry, clusterMetadata) return handler diff --git a/service/frontend/versionChecker.go b/service/frontend/versionChecker.go index fb68912b065b..7a5fc27ee1f8 100644 --- a/service/frontend/versionChecker.go +++ b/service/frontend/versionChecker.go @@ -27,7 +27,6 @@ package frontend import ( "runtime" "sync" - "sync/atomic" "time" enumsbp "go.temporal.io/api/enums/v1" @@ -39,47 +38,33 @@ import ( "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/primitives/timestamp" + "go.temporal.io/server/common/rpc/interceptor" ) const VersionCheckInterval = 24 * time.Hour -type sdkNameVersion struct { - name string - version string -} - -type sdkCount struct { - count int64 -} - -type SDKInfoRecorder interface { - RecordSDKInfo(name, version string) -} - type VersionChecker struct { - config *Config - shutdownChan chan struct{} - // Using a sync map to support effienct concurrent updates. - // Key type is sdkNameVersion and value type is sdkCount. - // Note that we never delete keys from this map as the cardinality of - // client versions should be fairly low. - sdkInfoCounter sync.Map + config *Config + shutdownChan chan struct{} metricsScope metrics.Scope clusterMetadataManager persistence.ClusterMetadataManager startOnce sync.Once stopOnce sync.Once + sdkVersionRecorder interceptor.SDKInfoRecorder } func NewVersionChecker( config *Config, metricsClient metrics.Client, clusterMetadataManager persistence.ClusterMetadataManager, + sdkVersionRecorder interceptor.SDKInfoRecorder, ) *VersionChecker { return &VersionChecker{ config: config, shutdownChan: make(chan struct{}), metricsScope: metricsClient.Scope(metrics.VersionCheckScope), clusterMetadataManager: clusterMetadataManager, + sdkVersionRecorder: sdkVersionRecorder, } } @@ -99,16 +84,6 @@ func (vc *VersionChecker) Stop() { } } -func (vc *VersionChecker) RecordSDKInfo(name, version string) { - info := sdkNameVersion{name, version} - valIface, ok := vc.sdkInfoCounter.Load(info) - if !ok { - // Store if wasn't added racy - valIface, _ = vc.sdkInfoCounter.LoadOrStore(info, &sdkCount{}) - } - atomic.AddInt64(&valIface.(*sdkCount).count, 1) -} - func (vc *VersionChecker) versionCheckLoop() { timer := time.NewTicker(VersionCheckInterval) defer timer.Stop() @@ -162,13 +137,6 @@ func isUpdateNeeded(metadata *persistence.GetClusterMetadataResponse) bool { } func (vc *VersionChecker) createVersionCheckRequest(metadata *persistence.GetClusterMetadataResponse) (*check.VersionCheckRequest, error) { - sdkInfo := make([]check.SDKInfo, 0) - vc.sdkInfoCounter.Range(func(key, value interface{}) bool { - timesSeen := atomic.SwapInt64(&value.(*sdkCount).count, 0) - nameVersion := key.(sdkNameVersion) - sdkInfo = append(sdkInfo, check.SDKInfo{Name: nameVersion.name, Version: nameVersion.version, TimesSeen: timesSeen}) - return true - }) return &check.VersionCheckRequest{ Product: headers.ClientNameServer, @@ -178,7 +146,7 @@ func (vc *VersionChecker) createVersionCheckRequest(metadata *persistence.GetClu DB: vc.clusterMetadataManager.GetName(), ClusterID: metadata.ClusterId, Timestamp: time.Now().UnixNano(), - SDKInfo: sdkInfo, + SDKInfo: vc.sdkVersionRecorder.GetAndResetSDKInfo(), }, nil } diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index cbe29d57ede8..e02042548822 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -108,7 +108,6 @@ type ( saMapper searchattribute.Mapper saProvider searchattribute.Provider archivalMetadata archiver.ArchivalMetadata - sdkVersionRecorder SDKInfoRecorder } // HealthStatus is an enum that refers to the rpc handler health status @@ -138,7 +137,6 @@ func NewWorkflowHandler( saProvider searchattribute.Provider, clusterMetadata cluster.Metadata, archivalMetadata archiver.ArchivalMetadata, - sdkInfoRecorder SDKInfoRecorder, ) *WorkflowHandler { handler := &WorkflowHandler{ @@ -170,7 +168,6 @@ func NewWorkflowHandler( saProvider: saProvider, saMapper: saMapper, archivalMetadata: archivalMetadata, - sdkVersionRecorder: sdkInfoRecorder, } return handler @@ -239,13 +236,6 @@ func (wh *WorkflowHandler) Watch(*healthpb.HealthCheckRequest, healthpb.Health_W return serviceerror.NewUnimplemented("Watch is not implemented.") } -func (wh *WorkflowHandler) RecordClientVersionAndCheckIfSupported(ctx context.Context) error { - sdkName, sdkVersion := wh.versionChecker.GetClientNameAndVersion(ctx) - wh.sdkVersionRecorder.RecordSDKInfo(sdkName, sdkVersion) - - return wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()) -} - // RegisterNamespace creates a new namespace which can be used as a container for all resources. Namespace is a top level // entity within Temporal, used as a container for all resources like workflow executions, task queues, etc. Namespace // acts as a sandbox and provides isolation for all resources within the namespace. All resources belong to exactly one @@ -257,7 +247,7 @@ func (wh *WorkflowHandler) RegisterNamespace(ctx context.Context, request *workf return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -285,7 +275,7 @@ func (wh *WorkflowHandler) DescribeNamespace(ctx context.Context, request *workf return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -308,7 +298,7 @@ func (wh *WorkflowHandler) ListNamespaces(ctx context.Context, request *workflow return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -331,7 +321,7 @@ func (wh *WorkflowHandler) UpdateNamespace(ctx context.Context, request *workflo return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -357,7 +347,7 @@ func (wh *WorkflowHandler) DeprecateNamespace(ctx context.Context, request *work return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -383,7 +373,7 @@ func (wh *WorkflowHandler) StartWorkflowExecution(ctx context.Context, request * return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -456,7 +446,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(ctx context.Context, requ return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -745,7 +735,7 @@ func (wh *WorkflowHandler) PollWorkflowTaskQueue(ctx context.Context, request *w callTime := time.Now().UTC() - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -838,7 +828,7 @@ func (wh *WorkflowHandler) RespondWorkflowTaskCompleted( return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -905,7 +895,7 @@ func (wh *WorkflowHandler) RespondWorkflowTaskFailed( return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -978,7 +968,7 @@ func (wh *WorkflowHandler) PollActivityTaskQueue(ctx context.Context, request *w callTime := time.Now().UTC() - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -1073,7 +1063,7 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeat(ctx context.Context, requ return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -1145,7 +1135,7 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeatById(ctx context.Context, return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -1249,7 +1239,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCompleted( return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -1322,7 +1312,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCompletedById(ctx context.Context, return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -1429,7 +1419,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed( return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -1496,7 +1486,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailedById(ctx context.Context, re return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -1587,7 +1577,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceled(ctx context.Context, requ return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -1661,7 +1651,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceledById(ctx context.Context, return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -1762,7 +1752,7 @@ func (wh *WorkflowHandler) RequestCancelWorkflowExecution(ctx context.Context, r return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -1799,7 +1789,7 @@ func (wh *WorkflowHandler) SignalWorkflowExecution(ctx context.Context, request return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -1867,7 +1857,7 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context, return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -1954,7 +1944,7 @@ func (wh *WorkflowHandler) ResetWorkflowExecution(ctx context.Context, request * return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -2008,7 +1998,7 @@ func (wh *WorkflowHandler) TerminateWorkflowExecution(ctx context.Context, reque return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -2044,7 +2034,7 @@ func (wh *WorkflowHandler) ListOpenWorkflowExecutions(ctx context.Context, reque return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -2137,7 +2127,7 @@ func (wh *WorkflowHandler) ListClosedWorkflowExecutions(ctx context.Context, req return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -2245,7 +2235,7 @@ func (wh *WorkflowHandler) ListWorkflowExecutions(ctx context.Context, request * return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -2293,7 +2283,7 @@ func (wh *WorkflowHandler) ListArchivedWorkflowExecutions(ctx context.Context, r return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -2379,7 +2369,7 @@ func (wh *WorkflowHandler) ScanWorkflowExecutions(ctx context.Context, request * return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -2428,7 +2418,7 @@ func (wh *WorkflowHandler) CountWorkflowExecutions(ctx context.Context, request return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -2466,7 +2456,7 @@ func (wh *WorkflowHandler) GetSearchAttributes(ctx context.Context, _ *workflows return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -2494,7 +2484,7 @@ func (wh *WorkflowHandler) RespondQueryTaskCompleted( return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -2565,7 +2555,7 @@ func (wh *WorkflowHandler) ResetStickyTaskQueue(ctx context.Context, request *wo return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -2604,7 +2594,7 @@ func (wh *WorkflowHandler) QueryWorkflow(ctx context.Context, request *workflows return nil, errQueryDisallowedForNamespace } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -2666,7 +2656,7 @@ func (wh *WorkflowHandler) DescribeWorkflowExecution(ctx context.Context, reques return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -2721,7 +2711,7 @@ func (wh *WorkflowHandler) DescribeTaskQueue(ctx context.Context, request *workf return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -2788,7 +2778,7 @@ func (wh *WorkflowHandler) GetSystemInfo(ctx context.Context, request *workflows return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } diff --git a/service/frontend/workflowHandler_test.go b/service/frontend/workflowHandler_test.go index f3f5af39e095..12ab41a7411f 100644 --- a/service/frontend/workflowHandler_test.go +++ b/service/frontend/workflowHandler_test.go @@ -106,8 +106,6 @@ type ( testNamespace namespace.Name testNamespaceID namespace.ID - - versionChecker *VersionChecker } ) @@ -152,17 +150,12 @@ func (s *workflowHandlerSuite) SetupTest() { mockMonitor := s.mockResource.MembershipMonitor mockMonitor.EXPECT().GetMemberCount(common.FrontendServiceName).Return(5, nil).AnyTimes() - s.versionChecker = NewVersionChecker(s.newConfig(), s.mockResource.GetMetricsClient(), s.mockResource.GetClusterMetadataManager()) } func (s *workflowHandlerSuite) TearDownTest() { s.controller.Finish() } -func (s *workflowHandlerSuite) RecordSDKInfo(name, version string) { - s.versionChecker.RecordSDKInfo(name, version) -} - func (s *workflowHandlerSuite) getWorkflowHandler(config *Config) *WorkflowHandler { return NewWorkflowHandler( config, @@ -182,7 +175,6 @@ func (s *workflowHandlerSuite) getWorkflowHandler(config *Config) *WorkflowHandl s.mockResource.GetSearchAttributesProvider(), s.mockResource.GetClusterMetadata(), s.mockResource.GetArchivalMetadata(), - s, ) } @@ -1767,32 +1759,6 @@ func (s *workflowHandlerSuite) TestGetSystemInfo() { s.True(resp.Capabilities.InternalErrorDifferentiation) } -func (s *workflowHandlerSuite) TestSDKInfoCounter() { - config := s.newConfig() - config.EnableReadVisibilityFromES = dc.GetBoolPropertyFnFilteredByNamespace(true) - - wh := s.getWorkflowHandler(config) - - s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return(s.testNamespaceID, nil).AnyTimes() - s.mockResource.ClusterMetadataMgr.EXPECT().GetName().Return("test").AnyTimes() - - sdkVersion := "1.10.1" - ctx := headers.SetVersionsForTests(context.Background(), sdkVersion, headers.ClientNameGoSDK, headers.SupportedServerVersions, headers.AllFeatures) - _, err := wh.GetSystemInfo(ctx, &workflowservice.GetSystemInfoRequest{ - Namespace: s.testNamespace.String(), - }) - s.NoError(err) - metadata := new(persistence.GetClusterMetadataResponse) - metadata.ClusterId = "test" - - req, err := s.versionChecker.createVersionCheckRequest(metadata) - s.NoError(err) - s.Equal(1, len(req.SDKInfo)) - s.Equal(headers.ClientNameGoSDK, req.SDKInfo[0].Name) - s.Equal(sdkVersion, req.SDKInfo[0].Version) - s.Equal(int64(1), req.SDKInfo[0].TimesSeen) -} - func (s *workflowHandlerSuite) newConfig() *Config { return NewConfig(dc.NewCollection(dc.NewNoopClient(), s.mockResource.GetLogger()), numHistoryShards, "", false) }