From daf51141401bd3b80c7e3230d22d209d78f08442 Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Tue, 11 Jan 2022 10:45:02 -0800 Subject: [PATCH 1/6] Check SDK versions via version check call --- common/headers/versionChecker.go | 8 ++ go.mod | 2 +- go.sum | 4 +- service/frontend/dcRedirectionHandler_test.go | 5 ++ service/frontend/fx.go | 2 + service/frontend/versionChecker.go | 43 +++++++++- service/frontend/workflowHandler.go | 84 +++++++++++-------- service/frontend/workflowHandler_test.go | 34 ++++++++ 8 files changed, 140 insertions(+), 42 deletions(-) diff --git a/common/headers/versionChecker.go b/common/headers/versionChecker.go index 3f14a3df915..e5b3ea129bf 100644 --- a/common/headers/versionChecker.go +++ b/common/headers/versionChecker.go @@ -73,6 +73,7 @@ var ( type ( // VersionChecker is used to check client/server compatibility and client's capabilities VersionChecker interface { + GetClientNameAndVersion(ctx context.Context) (string, string) ClientSupported(ctx context.Context, enableClientVersionCheck bool) error ClientSupportsFeature(ctx context.Context, feature string) bool } @@ -98,6 +99,13 @@ func NewVersionChecker(supportedClients map[string]string, serverVersion string) } } +func (vc *versionChecker) GetClientNameAndVersion(ctx context.Context) (string, string) { + headers := GetValues(ctx, ClientNameHeaderName, ClientVersionHeaderName) + clientName := headers[0] + clientVersion := headers[1] + return clientName, clientVersion +} + // ClientSupported returns an error if client is unsupported, nil otherwise. func (vc *versionChecker) ClientSupported(ctx context.Context, enableClientVersionCheck bool) error { if !enableClientVersionCheck { diff --git a/go.mod b/go.mod index b4d9db397cc..ee92d50e9be 100644 --- a/go.mod +++ b/go.mod @@ -46,7 +46,7 @@ require ( go.opentelemetry.io/otel/sdk/metric v0.25.0 go.temporal.io/api v1.7.1-0.20211215222122-0be6c74f9c9a go.temporal.io/sdk v1.12.0 - go.temporal.io/version v0.0.0-20201015012359-4d3bb966d193 + go.temporal.io/version v0.2.1 go.uber.org/atomic v1.9.0 go.uber.org/fx v1.14.2 go.uber.org/multierr v1.7.0 diff --git a/go.sum b/go.sum index 6adeb738722..c0538eea943 100644 --- a/go.sum +++ b/go.sum @@ -465,8 +465,8 @@ go.temporal.io/api v1.7.1-0.20211215222122-0be6c74f9c9a h1:FYS1fRCzAbcek09VA3wJb go.temporal.io/api v1.7.1-0.20211215222122-0be6c74f9c9a/go.mod h1:EMC/8OQVvVUeTSfVQ/OpAaIHmVadlzaKwHVjUmgz3tk= go.temporal.io/sdk v1.12.0 h1:QkqOpmgXVnHHCFP9HbSbyrF3jYgLBKY/3NdZyR7e5nQ= go.temporal.io/sdk v1.12.0/go.mod h1:lSp3lH1lI0TyOsus0arnO3FYvjVXBZGi/G7DjnAnm6o= -go.temporal.io/version v0.0.0-20201015012359-4d3bb966d193 h1:jhIqHkAE74DnEXipymFTzmTxyboMYmv6iVkkCFC1pas= -go.temporal.io/version v0.0.0-20201015012359-4d3bb966d193/go.mod h1:UA9S8/1LaKYae6TyD9NaPMJTZb911JcbqghI2CBSP78= +go.temporal.io/version v0.2.1 h1:wP7ha/DvyAtwIZY0xL5TA6BcsR1svK/WsqzFmmwAPhE= +go.temporal.io/version v0.2.1/go.mod h1:UA9S8/1LaKYae6TyD9NaPMJTZb911JcbqghI2CBSP78= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= diff --git a/service/frontend/dcRedirectionHandler_test.go b/service/frontend/dcRedirectionHandler_test.go index 1d07738c412..a19e0558c3a 100644 --- a/service/frontend/dcRedirectionHandler_test.go +++ b/service/frontend/dcRedirectionHandler_test.go @@ -125,6 +125,7 @@ func (s *dcRedirectionHandlerSuite) SetupTest() { s.mockResource.GetSearchAttributesProvider(), s.mockResource.GetClusterMetadata(), s.mockResource.GetArchivalMetadata(), + s, ) s.mockFrontendHandler = workflowservicemock.NewMockWorkflowServiceServer(s.controller) @@ -147,6 +148,10 @@ func (s *dcRedirectionHandlerSuite) TearDownTest() { s.controller.Finish() } +// RecordSDKInfo is a noop in this test suite +func (s *dcRedirectionHandlerSuite) RecordSDKInfo(name, version string) { +} + func (s *dcRedirectionHandlerSuite) TestDescribeTaskQueue() { apiName := "DescribeTaskQueue" diff --git a/service/frontend/fx.go b/service/frontend/fx.go index 5706afba542..ebf54fd505c 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -394,6 +394,7 @@ func AdminHandlerProvider( func HandlerProvider( params *resource.BootstrapParams, serviceConfig *Config, + versionChecker *VersionChecker, namespaceReplicationQueue FEReplicatorNamespaceReplicationQueue, visibilityMgr manager.VisibilityManager, logger resource.SnTaggedLogger, @@ -432,6 +433,7 @@ func HandlerProvider( saProvider, clusterMetadata, archivalMetadata, + versionChecker, ) handler := NewDCRedirectionHandler(wfHandler, params.DCRedirectionPolicy, logger, clientBean, metricsClient, timeSource, namespaceRegistry, clusterMetadata) return handler diff --git a/service/frontend/versionChecker.go b/service/frontend/versionChecker.go index 683e76f991f..fb68912b065 100644 --- a/service/frontend/versionChecker.go +++ b/service/frontend/versionChecker.go @@ -27,6 +27,7 @@ package frontend import ( "runtime" "sync" + "sync/atomic" "time" enumsbp "go.temporal.io/api/enums/v1" @@ -42,9 +43,27 @@ import ( const VersionCheckInterval = 24 * time.Hour +type sdkNameVersion struct { + name string + version string +} + +type sdkCount struct { + count int64 +} + +type SDKInfoRecorder interface { + RecordSDKInfo(name, version string) +} + type VersionChecker struct { - config *Config - shutdownChan chan struct{} + config *Config + shutdownChan chan struct{} + // Using a sync map to support effienct concurrent updates. + // Key type is sdkNameVersion and value type is sdkCount. + // Note that we never delete keys from this map as the cardinality of + // client versions should be fairly low. + sdkInfoCounter sync.Map metricsScope metrics.Scope clusterMetadataManager persistence.ClusterMetadataManager startOnce sync.Once @@ -80,9 +99,20 @@ func (vc *VersionChecker) Stop() { } } +func (vc *VersionChecker) RecordSDKInfo(name, version string) { + info := sdkNameVersion{name, version} + valIface, ok := vc.sdkInfoCounter.Load(info) + if !ok { + // Store if wasn't added racy + valIface, _ = vc.sdkInfoCounter.LoadOrStore(info, &sdkCount{}) + } + atomic.AddInt64(&valIface.(*sdkCount).count, 1) +} + func (vc *VersionChecker) versionCheckLoop() { timer := time.NewTicker(VersionCheckInterval) defer timer.Stop() + vc.performVersionCheck() for { select { @@ -132,6 +162,14 @@ func isUpdateNeeded(metadata *persistence.GetClusterMetadataResponse) bool { } func (vc *VersionChecker) createVersionCheckRequest(metadata *persistence.GetClusterMetadataResponse) (*check.VersionCheckRequest, error) { + sdkInfo := make([]check.SDKInfo, 0) + vc.sdkInfoCounter.Range(func(key, value interface{}) bool { + timesSeen := atomic.SwapInt64(&value.(*sdkCount).count, 0) + nameVersion := key.(sdkNameVersion) + sdkInfo = append(sdkInfo, check.SDKInfo{Name: nameVersion.name, Version: nameVersion.version, TimesSeen: timesSeen}) + return true + }) + return &check.VersionCheckRequest{ Product: headers.ClientNameServer, Version: headers.ServerVersion, @@ -140,6 +178,7 @@ func (vc *VersionChecker) createVersionCheckRequest(metadata *persistence.GetClu DB: vc.clusterMetadataManager.GetName(), ClusterID: metadata.ClusterId, Timestamp: time.Now().UnixNano(), + SDKInfo: sdkInfo, }, nil } diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index e0204254882..cbe29d57ede 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -108,6 +108,7 @@ type ( saMapper searchattribute.Mapper saProvider searchattribute.Provider archivalMetadata archiver.ArchivalMetadata + sdkVersionRecorder SDKInfoRecorder } // HealthStatus is an enum that refers to the rpc handler health status @@ -137,6 +138,7 @@ func NewWorkflowHandler( saProvider searchattribute.Provider, clusterMetadata cluster.Metadata, archivalMetadata archiver.ArchivalMetadata, + sdkInfoRecorder SDKInfoRecorder, ) *WorkflowHandler { handler := &WorkflowHandler{ @@ -168,6 +170,7 @@ func NewWorkflowHandler( saProvider: saProvider, saMapper: saMapper, archivalMetadata: archivalMetadata, + sdkVersionRecorder: sdkInfoRecorder, } return handler @@ -236,6 +239,13 @@ func (wh *WorkflowHandler) Watch(*healthpb.HealthCheckRequest, healthpb.Health_W return serviceerror.NewUnimplemented("Watch is not implemented.") } +func (wh *WorkflowHandler) RecordClientVersionAndCheckIfSupported(ctx context.Context) error { + sdkName, sdkVersion := wh.versionChecker.GetClientNameAndVersion(ctx) + wh.sdkVersionRecorder.RecordSDKInfo(sdkName, sdkVersion) + + return wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()) +} + // RegisterNamespace creates a new namespace which can be used as a container for all resources. Namespace is a top level // entity within Temporal, used as a container for all resources like workflow executions, task queues, etc. Namespace // acts as a sandbox and provides isolation for all resources within the namespace. All resources belong to exactly one @@ -247,7 +257,7 @@ func (wh *WorkflowHandler) RegisterNamespace(ctx context.Context, request *workf return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -275,7 +285,7 @@ func (wh *WorkflowHandler) DescribeNamespace(ctx context.Context, request *workf return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -298,7 +308,7 @@ func (wh *WorkflowHandler) ListNamespaces(ctx context.Context, request *workflow return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -321,7 +331,7 @@ func (wh *WorkflowHandler) UpdateNamespace(ctx context.Context, request *workflo return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -347,7 +357,7 @@ func (wh *WorkflowHandler) DeprecateNamespace(ctx context.Context, request *work return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -373,7 +383,7 @@ func (wh *WorkflowHandler) StartWorkflowExecution(ctx context.Context, request * return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -446,7 +456,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(ctx context.Context, requ return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -735,7 +745,7 @@ func (wh *WorkflowHandler) PollWorkflowTaskQueue(ctx context.Context, request *w callTime := time.Now().UTC() - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -828,7 +838,7 @@ func (wh *WorkflowHandler) RespondWorkflowTaskCompleted( return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -895,7 +905,7 @@ func (wh *WorkflowHandler) RespondWorkflowTaskFailed( return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -968,7 +978,7 @@ func (wh *WorkflowHandler) PollActivityTaskQueue(ctx context.Context, request *w callTime := time.Now().UTC() - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -1063,7 +1073,7 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeat(ctx context.Context, requ return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -1135,7 +1145,7 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeatById(ctx context.Context, return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -1239,7 +1249,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCompleted( return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -1312,7 +1322,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCompletedById(ctx context.Context, return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -1419,7 +1429,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed( return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -1486,7 +1496,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailedById(ctx context.Context, re return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -1577,7 +1587,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceled(ctx context.Context, requ return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -1651,7 +1661,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceledById(ctx context.Context, return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -1752,7 +1762,7 @@ func (wh *WorkflowHandler) RequestCancelWorkflowExecution(ctx context.Context, r return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -1789,7 +1799,7 @@ func (wh *WorkflowHandler) SignalWorkflowExecution(ctx context.Context, request return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -1857,7 +1867,7 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context, return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -1944,7 +1954,7 @@ func (wh *WorkflowHandler) ResetWorkflowExecution(ctx context.Context, request * return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -1998,7 +2008,7 @@ func (wh *WorkflowHandler) TerminateWorkflowExecution(ctx context.Context, reque return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -2034,7 +2044,7 @@ func (wh *WorkflowHandler) ListOpenWorkflowExecutions(ctx context.Context, reque return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -2127,7 +2137,7 @@ func (wh *WorkflowHandler) ListClosedWorkflowExecutions(ctx context.Context, req return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -2235,7 +2245,7 @@ func (wh *WorkflowHandler) ListWorkflowExecutions(ctx context.Context, request * return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -2283,7 +2293,7 @@ func (wh *WorkflowHandler) ListArchivedWorkflowExecutions(ctx context.Context, r return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -2369,7 +2379,7 @@ func (wh *WorkflowHandler) ScanWorkflowExecutions(ctx context.Context, request * return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -2418,7 +2428,7 @@ func (wh *WorkflowHandler) CountWorkflowExecutions(ctx context.Context, request return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -2456,7 +2466,7 @@ func (wh *WorkflowHandler) GetSearchAttributes(ctx context.Context, _ *workflows return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -2484,7 +2494,7 @@ func (wh *WorkflowHandler) RespondQueryTaskCompleted( return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -2555,7 +2565,7 @@ func (wh *WorkflowHandler) ResetStickyTaskQueue(ctx context.Context, request *wo return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -2594,7 +2604,7 @@ func (wh *WorkflowHandler) QueryWorkflow(ctx context.Context, request *workflows return nil, errQueryDisallowedForNamespace } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -2656,7 +2666,7 @@ func (wh *WorkflowHandler) DescribeWorkflowExecution(ctx context.Context, reques return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -2711,7 +2721,7 @@ func (wh *WorkflowHandler) DescribeTaskQueue(ctx context.Context, request *workf return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } @@ -2778,7 +2788,7 @@ func (wh *WorkflowHandler) GetSystemInfo(ctx context.Context, request *workflows return nil, errShuttingDown } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { return nil, err } diff --git a/service/frontend/workflowHandler_test.go b/service/frontend/workflowHandler_test.go index 12ab41a7411..f3f5af39e09 100644 --- a/service/frontend/workflowHandler_test.go +++ b/service/frontend/workflowHandler_test.go @@ -106,6 +106,8 @@ type ( testNamespace namespace.Name testNamespaceID namespace.ID + + versionChecker *VersionChecker } ) @@ -150,12 +152,17 @@ func (s *workflowHandlerSuite) SetupTest() { mockMonitor := s.mockResource.MembershipMonitor mockMonitor.EXPECT().GetMemberCount(common.FrontendServiceName).Return(5, nil).AnyTimes() + s.versionChecker = NewVersionChecker(s.newConfig(), s.mockResource.GetMetricsClient(), s.mockResource.GetClusterMetadataManager()) } func (s *workflowHandlerSuite) TearDownTest() { s.controller.Finish() } +func (s *workflowHandlerSuite) RecordSDKInfo(name, version string) { + s.versionChecker.RecordSDKInfo(name, version) +} + func (s *workflowHandlerSuite) getWorkflowHandler(config *Config) *WorkflowHandler { return NewWorkflowHandler( config, @@ -175,6 +182,7 @@ func (s *workflowHandlerSuite) getWorkflowHandler(config *Config) *WorkflowHandl s.mockResource.GetSearchAttributesProvider(), s.mockResource.GetClusterMetadata(), s.mockResource.GetArchivalMetadata(), + s, ) } @@ -1759,6 +1767,32 @@ func (s *workflowHandlerSuite) TestGetSystemInfo() { s.True(resp.Capabilities.InternalErrorDifferentiation) } +func (s *workflowHandlerSuite) TestSDKInfoCounter() { + config := s.newConfig() + config.EnableReadVisibilityFromES = dc.GetBoolPropertyFnFilteredByNamespace(true) + + wh := s.getWorkflowHandler(config) + + s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return(s.testNamespaceID, nil).AnyTimes() + s.mockResource.ClusterMetadataMgr.EXPECT().GetName().Return("test").AnyTimes() + + sdkVersion := "1.10.1" + ctx := headers.SetVersionsForTests(context.Background(), sdkVersion, headers.ClientNameGoSDK, headers.SupportedServerVersions, headers.AllFeatures) + _, err := wh.GetSystemInfo(ctx, &workflowservice.GetSystemInfoRequest{ + Namespace: s.testNamespace.String(), + }) + s.NoError(err) + metadata := new(persistence.GetClusterMetadataResponse) + metadata.ClusterId = "test" + + req, err := s.versionChecker.createVersionCheckRequest(metadata) + s.NoError(err) + s.Equal(1, len(req.SDKInfo)) + s.Equal(headers.ClientNameGoSDK, req.SDKInfo[0].Name) + s.Equal(sdkVersion, req.SDKInfo[0].Version) + s.Equal(int64(1), req.SDKInfo[0].TimesSeen) +} + func (s *workflowHandlerSuite) newConfig() *Config { return NewConfig(dc.NewCollection(dc.NewNoopClient(), s.mockResource.GetLogger()), numHistoryShards, "", false) } From 947dd379c5e7eeaf461ae7c943c61a799feab05b Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Wed, 12 Jan 2022 00:26:52 -0800 Subject: [PATCH 2/6] Use interceptor to record sdk versions --- common/headers/versionChecker.go | 3 +- common/rpc/interceptor/sdk_version.go | 97 +++++++++++++++++++ common/rpc/interceptor/sdk_version_test.go | 55 +++++++++++ service/frontend/dcRedirectionHandler_test.go | 5 - service/frontend/fx.go | 8 +- service/frontend/versionChecker.go | 46 ++------- service/frontend/workflowHandler.go | 84 +++++++--------- service/frontend/workflowHandler_test.go | 34 ------- 8 files changed, 204 insertions(+), 128 deletions(-) create mode 100644 common/rpc/interceptor/sdk_version.go create mode 100644 common/rpc/interceptor/sdk_version_test.go diff --git a/common/headers/versionChecker.go b/common/headers/versionChecker.go index e5b3ea129bf..362f6d50e6e 100644 --- a/common/headers/versionChecker.go +++ b/common/headers/versionChecker.go @@ -73,7 +73,6 @@ var ( type ( // VersionChecker is used to check client/server compatibility and client's capabilities VersionChecker interface { - GetClientNameAndVersion(ctx context.Context) (string, string) ClientSupported(ctx context.Context, enableClientVersionCheck bool) error ClientSupportsFeature(ctx context.Context, feature string) bool } @@ -99,7 +98,7 @@ func NewVersionChecker(supportedClients map[string]string, serverVersion string) } } -func (vc *versionChecker) GetClientNameAndVersion(ctx context.Context) (string, string) { +func GetClientNameAndVersion(ctx context.Context) (string, string) { headers := GetValues(ctx, ClientNameHeaderName, ClientVersionHeaderName) clientName := headers[0] clientVersion := headers[1] diff --git a/common/rpc/interceptor/sdk_version.go b/common/rpc/interceptor/sdk_version.go new file mode 100644 index 00000000000..e87c9ce14e6 --- /dev/null +++ b/common/rpc/interceptor/sdk_version.go @@ -0,0 +1,97 @@ +// The MIT License +// +// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package interceptor + +import ( + "context" + "sync" + "sync/atomic" + + "go.temporal.io/server/common/headers" + "go.temporal.io/version/check" + "google.golang.org/grpc" +) + +type SDKInfoRecorder interface { + RecordSDKInfo(name, version string) + GetAndResetSDKInfo() []check.SDKInfo +} + +type SDKVersionInterceptor struct { + // Using a sync map to support effienct concurrent updates. + // Key type is sdkNameVersion and value type is sdkCount. + // Note that we never delete keys from this map as the cardinality of + // client versions should be fairly low. + sdkInfoCounter sync.Map +} + +type sdkNameVersion struct { + name string + version string +} + +type sdkCount struct { + count int64 +} + +var _ grpc.UnaryServerInterceptor = (*TelemetryInterceptor)(nil).Intercept + +func NewSDKVersionInterceptor() *SDKVersionInterceptor { + return &SDKVersionInterceptor{} +} + +func (vi *SDKVersionInterceptor) Intercept( + ctx context.Context, + req interface{}, + info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler, +) (interface{}, error) { + sdkName, sdkVersion := headers.GetClientNameAndVersion(ctx) + if sdkName != "" && sdkVersion != "" { + vi.RecordSDKInfo(sdkName, sdkVersion) + } + return handler(ctx, req) +} + +func (vi *SDKVersionInterceptor) RecordSDKInfo(name, version string) { + info := sdkNameVersion{name, version} + valIface, ok := vi.sdkInfoCounter.Load(info) + if !ok { + // Store if wasn't added racy + valIface, _ = vi.sdkInfoCounter.LoadOrStore(info, &sdkCount{}) + } + atomic.AddInt64(&valIface.(*sdkCount).count, 1) +} + +func (vi *SDKVersionInterceptor) GetAndResetSDKInfo() []check.SDKInfo { + sdkInfo := make([]check.SDKInfo, 0) + vi.sdkInfoCounter.Range(func(key, value interface{}) bool { + timesSeen := atomic.SwapInt64(&value.(*sdkCount).count, 0) + nameVersion := key.(sdkNameVersion) + sdkInfo = append(sdkInfo, check.SDKInfo{Name: nameVersion.name, Version: nameVersion.version, TimesSeen: timesSeen}) + return true + }) + return sdkInfo +} diff --git a/common/rpc/interceptor/sdk_version_test.go b/common/rpc/interceptor/sdk_version_test.go new file mode 100644 index 00000000000..18a9639b94c --- /dev/null +++ b/common/rpc/interceptor/sdk_version_test.go @@ -0,0 +1,55 @@ +// The MIT License +// +// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package interceptor + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "go.temporal.io/server/common/headers" +) + +func TestSDKVersionRecorder(t *testing.T) { + interceptor := NewSDKVersionInterceptor() + + sdkVersion := "1.10.1" + ctx := headers.SetVersionsForTests(context.Background(), sdkVersion, headers.ClientNameGoSDK, headers.SupportedServerVersions, headers.AllFeatures) + interceptor.Intercept(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) { + return nil, nil + }) + + ctx = headers.SetVersionsForTests(context.Background(), "", headers.ClientNameGoSDK, headers.SupportedServerVersions, headers.AllFeatures) + interceptor.Intercept(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) { + return nil, nil + }) + + info := interceptor.GetAndResetSDKInfo() + assert.Equal(t, 1, len(info)) + assert.Equal(t, headers.ClientNameGoSDK, info[0].Name) + assert.Equal(t, sdkVersion, info[0].Version) + assert.Equal(t, int64(1), info[0].TimesSeen) +} diff --git a/service/frontend/dcRedirectionHandler_test.go b/service/frontend/dcRedirectionHandler_test.go index a19e0558c3a..1d07738c412 100644 --- a/service/frontend/dcRedirectionHandler_test.go +++ b/service/frontend/dcRedirectionHandler_test.go @@ -125,7 +125,6 @@ func (s *dcRedirectionHandlerSuite) SetupTest() { s.mockResource.GetSearchAttributesProvider(), s.mockResource.GetClusterMetadata(), s.mockResource.GetArchivalMetadata(), - s, ) s.mockFrontendHandler = workflowservicemock.NewMockWorkflowServiceServer(s.controller) @@ -148,10 +147,6 @@ func (s *dcRedirectionHandlerSuite) TearDownTest() { s.controller.Finish() } -// RecordSDKInfo is a noop in this test suite -func (s *dcRedirectionHandlerSuite) RecordSDKInfo(name, version string) { -} - func (s *dcRedirectionHandlerSuite) TestDescribeTaskQueue() { apiName := "DescribeTaskQueue" diff --git a/service/frontend/fx.go b/service/frontend/fx.go index ebf54fd505c..e2577b7e796 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -77,6 +77,7 @@ var Module = fx.Options( fx.Provide(NamespaceCountLimitInterceptorProvider), fx.Provide(NamespaceValidatorInterceptorProvider), fx.Provide(NamespaceRateLimitInterceptorProvider), + fx.Provide(SDKVersionInterceptorProvider), fx.Provide(GrpcServerOptionsProvider), fx.Provide(VisibilityManagerProvider), fx.Provide(ThrottledLoggerRpsFnProvider), @@ -131,6 +132,7 @@ func GrpcServerOptionsProvider( namespaceValidatorInterceptor *interceptor.NamespaceValidatorInterceptor, telemetryInterceptor *interceptor.TelemetryInterceptor, rateLimitInterceptor *interceptor.RateLimitInterceptor, + sdkVersionInterceptor *interceptor.SDKVersionInterceptor, authorizer authorization.Authorizer, claimMapper authorization.ClaimMapper, audienceGetter authorization.JWTAudienceMapper, @@ -168,6 +170,7 @@ func GrpcServerOptionsProvider( logger, audienceGetter, ), + sdkVersionInterceptor.Intercept, } if len(customInterceptors) > 0 { interceptors = append(interceptors, customInterceptors...) @@ -283,6 +286,10 @@ func NamespaceValidatorInterceptorProvider( ) } +func SDKVersionInterceptorProvider() *interceptor.SDKVersionInterceptor { + return interceptor.NewSDKVersionInterceptor() +} + func PersistenceMaxQpsProvider( serviceConfig *Config, ) persistenceClient.PersistenceMaxQps { @@ -433,7 +440,6 @@ func HandlerProvider( saProvider, clusterMetadata, archivalMetadata, - versionChecker, ) handler := NewDCRedirectionHandler(wfHandler, params.DCRedirectionPolicy, logger, clientBean, metricsClient, timeSource, namespaceRegistry, clusterMetadata) return handler diff --git a/service/frontend/versionChecker.go b/service/frontend/versionChecker.go index fb68912b065..b7c15bb4005 100644 --- a/service/frontend/versionChecker.go +++ b/service/frontend/versionChecker.go @@ -27,7 +27,6 @@ package frontend import ( "runtime" "sync" - "sync/atomic" "time" enumsbp "go.temporal.io/api/enums/v1" @@ -39,47 +38,33 @@ import ( "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/primitives/timestamp" + "go.temporal.io/server/common/rpc/interceptor" ) const VersionCheckInterval = 24 * time.Hour -type sdkNameVersion struct { - name string - version string -} - -type sdkCount struct { - count int64 -} - -type SDKInfoRecorder interface { - RecordSDKInfo(name, version string) -} - type VersionChecker struct { - config *Config - shutdownChan chan struct{} - // Using a sync map to support effienct concurrent updates. - // Key type is sdkNameVersion and value type is sdkCount. - // Note that we never delete keys from this map as the cardinality of - // client versions should be fairly low. - sdkInfoCounter sync.Map + config *Config + shutdownChan chan struct{} metricsScope metrics.Scope clusterMetadataManager persistence.ClusterMetadataManager startOnce sync.Once stopOnce sync.Once + sdkVersionRecorder interceptor.SDKInfoRecorder } func NewVersionChecker( config *Config, metricsClient metrics.Client, clusterMetadataManager persistence.ClusterMetadataManager, + sdkVersionRecorder *interceptor.SDKVersionInterceptor, ) *VersionChecker { return &VersionChecker{ config: config, shutdownChan: make(chan struct{}), metricsScope: metricsClient.Scope(metrics.VersionCheckScope), clusterMetadataManager: clusterMetadataManager, + sdkVersionRecorder: sdkVersionRecorder, } } @@ -99,16 +84,6 @@ func (vc *VersionChecker) Stop() { } } -func (vc *VersionChecker) RecordSDKInfo(name, version string) { - info := sdkNameVersion{name, version} - valIface, ok := vc.sdkInfoCounter.Load(info) - if !ok { - // Store if wasn't added racy - valIface, _ = vc.sdkInfoCounter.LoadOrStore(info, &sdkCount{}) - } - atomic.AddInt64(&valIface.(*sdkCount).count, 1) -} - func (vc *VersionChecker) versionCheckLoop() { timer := time.NewTicker(VersionCheckInterval) defer timer.Stop() @@ -162,13 +137,6 @@ func isUpdateNeeded(metadata *persistence.GetClusterMetadataResponse) bool { } func (vc *VersionChecker) createVersionCheckRequest(metadata *persistence.GetClusterMetadataResponse) (*check.VersionCheckRequest, error) { - sdkInfo := make([]check.SDKInfo, 0) - vc.sdkInfoCounter.Range(func(key, value interface{}) bool { - timesSeen := atomic.SwapInt64(&value.(*sdkCount).count, 0) - nameVersion := key.(sdkNameVersion) - sdkInfo = append(sdkInfo, check.SDKInfo{Name: nameVersion.name, Version: nameVersion.version, TimesSeen: timesSeen}) - return true - }) return &check.VersionCheckRequest{ Product: headers.ClientNameServer, @@ -178,7 +146,7 @@ func (vc *VersionChecker) createVersionCheckRequest(metadata *persistence.GetClu DB: vc.clusterMetadataManager.GetName(), ClusterID: metadata.ClusterId, Timestamp: time.Now().UnixNano(), - SDKInfo: sdkInfo, + SDKInfo: vc.sdkVersionRecorder.GetAndResetSDKInfo(), }, nil } diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index cbe29d57ede..e0204254882 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -108,7 +108,6 @@ type ( saMapper searchattribute.Mapper saProvider searchattribute.Provider archivalMetadata archiver.ArchivalMetadata - sdkVersionRecorder SDKInfoRecorder } // HealthStatus is an enum that refers to the rpc handler health status @@ -138,7 +137,6 @@ func NewWorkflowHandler( saProvider searchattribute.Provider, clusterMetadata cluster.Metadata, archivalMetadata archiver.ArchivalMetadata, - sdkInfoRecorder SDKInfoRecorder, ) *WorkflowHandler { handler := &WorkflowHandler{ @@ -170,7 +168,6 @@ func NewWorkflowHandler( saProvider: saProvider, saMapper: saMapper, archivalMetadata: archivalMetadata, - sdkVersionRecorder: sdkInfoRecorder, } return handler @@ -239,13 +236,6 @@ func (wh *WorkflowHandler) Watch(*healthpb.HealthCheckRequest, healthpb.Health_W return serviceerror.NewUnimplemented("Watch is not implemented.") } -func (wh *WorkflowHandler) RecordClientVersionAndCheckIfSupported(ctx context.Context) error { - sdkName, sdkVersion := wh.versionChecker.GetClientNameAndVersion(ctx) - wh.sdkVersionRecorder.RecordSDKInfo(sdkName, sdkVersion) - - return wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()) -} - // RegisterNamespace creates a new namespace which can be used as a container for all resources. Namespace is a top level // entity within Temporal, used as a container for all resources like workflow executions, task queues, etc. Namespace // acts as a sandbox and provides isolation for all resources within the namespace. All resources belong to exactly one @@ -257,7 +247,7 @@ func (wh *WorkflowHandler) RegisterNamespace(ctx context.Context, request *workf return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -285,7 +275,7 @@ func (wh *WorkflowHandler) DescribeNamespace(ctx context.Context, request *workf return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -308,7 +298,7 @@ func (wh *WorkflowHandler) ListNamespaces(ctx context.Context, request *workflow return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -331,7 +321,7 @@ func (wh *WorkflowHandler) UpdateNamespace(ctx context.Context, request *workflo return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -357,7 +347,7 @@ func (wh *WorkflowHandler) DeprecateNamespace(ctx context.Context, request *work return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -383,7 +373,7 @@ func (wh *WorkflowHandler) StartWorkflowExecution(ctx context.Context, request * return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -456,7 +446,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(ctx context.Context, requ return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -745,7 +735,7 @@ func (wh *WorkflowHandler) PollWorkflowTaskQueue(ctx context.Context, request *w callTime := time.Now().UTC() - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -838,7 +828,7 @@ func (wh *WorkflowHandler) RespondWorkflowTaskCompleted( return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -905,7 +895,7 @@ func (wh *WorkflowHandler) RespondWorkflowTaskFailed( return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -978,7 +968,7 @@ func (wh *WorkflowHandler) PollActivityTaskQueue(ctx context.Context, request *w callTime := time.Now().UTC() - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -1073,7 +1063,7 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeat(ctx context.Context, requ return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -1145,7 +1135,7 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeatById(ctx context.Context, return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -1249,7 +1239,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCompleted( return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -1322,7 +1312,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCompletedById(ctx context.Context, return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -1429,7 +1419,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed( return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -1496,7 +1486,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailedById(ctx context.Context, re return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -1587,7 +1577,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceled(ctx context.Context, requ return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -1661,7 +1651,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceledById(ctx context.Context, return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -1762,7 +1752,7 @@ func (wh *WorkflowHandler) RequestCancelWorkflowExecution(ctx context.Context, r return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -1799,7 +1789,7 @@ func (wh *WorkflowHandler) SignalWorkflowExecution(ctx context.Context, request return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -1867,7 +1857,7 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context, return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -1954,7 +1944,7 @@ func (wh *WorkflowHandler) ResetWorkflowExecution(ctx context.Context, request * return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -2008,7 +1998,7 @@ func (wh *WorkflowHandler) TerminateWorkflowExecution(ctx context.Context, reque return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -2044,7 +2034,7 @@ func (wh *WorkflowHandler) ListOpenWorkflowExecutions(ctx context.Context, reque return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -2137,7 +2127,7 @@ func (wh *WorkflowHandler) ListClosedWorkflowExecutions(ctx context.Context, req return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -2245,7 +2235,7 @@ func (wh *WorkflowHandler) ListWorkflowExecutions(ctx context.Context, request * return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -2293,7 +2283,7 @@ func (wh *WorkflowHandler) ListArchivedWorkflowExecutions(ctx context.Context, r return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -2379,7 +2369,7 @@ func (wh *WorkflowHandler) ScanWorkflowExecutions(ctx context.Context, request * return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -2428,7 +2418,7 @@ func (wh *WorkflowHandler) CountWorkflowExecutions(ctx context.Context, request return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -2466,7 +2456,7 @@ func (wh *WorkflowHandler) GetSearchAttributes(ctx context.Context, _ *workflows return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -2494,7 +2484,7 @@ func (wh *WorkflowHandler) RespondQueryTaskCompleted( return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -2565,7 +2555,7 @@ func (wh *WorkflowHandler) ResetStickyTaskQueue(ctx context.Context, request *wo return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -2604,7 +2594,7 @@ func (wh *WorkflowHandler) QueryWorkflow(ctx context.Context, request *workflows return nil, errQueryDisallowedForNamespace } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -2666,7 +2656,7 @@ func (wh *WorkflowHandler) DescribeWorkflowExecution(ctx context.Context, reques return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -2721,7 +2711,7 @@ func (wh *WorkflowHandler) DescribeTaskQueue(ctx context.Context, request *workf return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } @@ -2788,7 +2778,7 @@ func (wh *WorkflowHandler) GetSystemInfo(ctx context.Context, request *workflows return nil, errShuttingDown } - if err := wh.RecordClientVersionAndCheckIfSupported(ctx); err != nil { + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } diff --git a/service/frontend/workflowHandler_test.go b/service/frontend/workflowHandler_test.go index f3f5af39e09..12ab41a7411 100644 --- a/service/frontend/workflowHandler_test.go +++ b/service/frontend/workflowHandler_test.go @@ -106,8 +106,6 @@ type ( testNamespace namespace.Name testNamespaceID namespace.ID - - versionChecker *VersionChecker } ) @@ -152,17 +150,12 @@ func (s *workflowHandlerSuite) SetupTest() { mockMonitor := s.mockResource.MembershipMonitor mockMonitor.EXPECT().GetMemberCount(common.FrontendServiceName).Return(5, nil).AnyTimes() - s.versionChecker = NewVersionChecker(s.newConfig(), s.mockResource.GetMetricsClient(), s.mockResource.GetClusterMetadataManager()) } func (s *workflowHandlerSuite) TearDownTest() { s.controller.Finish() } -func (s *workflowHandlerSuite) RecordSDKInfo(name, version string) { - s.versionChecker.RecordSDKInfo(name, version) -} - func (s *workflowHandlerSuite) getWorkflowHandler(config *Config) *WorkflowHandler { return NewWorkflowHandler( config, @@ -182,7 +175,6 @@ func (s *workflowHandlerSuite) getWorkflowHandler(config *Config) *WorkflowHandl s.mockResource.GetSearchAttributesProvider(), s.mockResource.GetClusterMetadata(), s.mockResource.GetArchivalMetadata(), - s, ) } @@ -1767,32 +1759,6 @@ func (s *workflowHandlerSuite) TestGetSystemInfo() { s.True(resp.Capabilities.InternalErrorDifferentiation) } -func (s *workflowHandlerSuite) TestSDKInfoCounter() { - config := s.newConfig() - config.EnableReadVisibilityFromES = dc.GetBoolPropertyFnFilteredByNamespace(true) - - wh := s.getWorkflowHandler(config) - - s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return(s.testNamespaceID, nil).AnyTimes() - s.mockResource.ClusterMetadataMgr.EXPECT().GetName().Return("test").AnyTimes() - - sdkVersion := "1.10.1" - ctx := headers.SetVersionsForTests(context.Background(), sdkVersion, headers.ClientNameGoSDK, headers.SupportedServerVersions, headers.AllFeatures) - _, err := wh.GetSystemInfo(ctx, &workflowservice.GetSystemInfoRequest{ - Namespace: s.testNamespace.String(), - }) - s.NoError(err) - metadata := new(persistence.GetClusterMetadataResponse) - metadata.ClusterId = "test" - - req, err := s.versionChecker.createVersionCheckRequest(metadata) - s.NoError(err) - s.Equal(1, len(req.SDKInfo)) - s.Equal(headers.ClientNameGoSDK, req.SDKInfo[0].Name) - s.Equal(sdkVersion, req.SDKInfo[0].Version) - s.Equal(int64(1), req.SDKInfo[0].TimesSeen) -} - func (s *workflowHandlerSuite) newConfig() *Config { return NewConfig(dc.NewCollection(dc.NewNoopClient(), s.mockResource.GetLogger()), numHistoryShards, "", false) } From c57858e1a7aa081973fd9fe1a1cf2d4bf5e06375 Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Wed, 12 Jan 2022 21:32:33 -0800 Subject: [PATCH 3/6] Don't keep counts of SDK info, cap and monitor SDK info set size --- common/dynamicconfig/constants.go | 2 + common/metrics/defs.go | 11 ++++ common/rpc/interceptor/sdk_version.go | 77 +++++++++++++--------- common/rpc/interceptor/sdk_version_test.go | 41 +++++++++++- go.mod | 2 +- go.sum | 4 +- service/frontend/fx.go | 7 +- service/frontend/service.go | 4 ++ service/frontend/versionChecker.go | 28 +++++--- 9 files changed, 129 insertions(+), 47 deletions(-) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 4a15b8bc6f3..afcfe316c7e 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -487,6 +487,8 @@ const ( VisibilityArchivalQueryMaxQPS // EnableServerVersionCheck is a flag that controls whether or not periodic version checking is enabled EnableServerVersionCheck + // MaxSDKVersionsToRecord caps the number of distinct SDK versions to record for version checking purposes + MaxSDKVersionsToRecord // EnableTokenNamespaceEnforcement enables enforcement that namespace in completion token matches namespace of the request EnableTokenNamespaceEnforcement // KeepAliveMinTime is the minimum amount of time a client should wait before sending a keepalive ping. diff --git a/common/metrics/defs.go b/common/metrics/defs.go index e7ec665229b..509a45b3fc3 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -881,6 +881,10 @@ const ( // VersionCheckScope is scope used by version checker VersionCheckScope + + // SDKVersionRecordScope is the scope used by the version interceptor + SDKVersionRecordScope + // AuthorizationScope is the scope used by all metric emitted by authorization code AuthorizationScope @@ -1562,6 +1566,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ FrontendGetClusterInfoScope: {operation: "GetClusterInfo"}, FrontendGetSystemInfoScope: {operation: "GetSystemInfo"}, VersionCheckScope: {operation: "VersionCheck"}, + SDKVersionRecordScope: {operation: "SDKVersionRecord"}, AuthorizationScope: {operation: "Authorization"}, }, // History Scope Names @@ -1873,6 +1878,9 @@ const ( VersionCheckFailedCount VersionCheckLatency + SDKVersionRecordSuccessCount + SDKVersionRecordFailedCount + ParentClosePolicyProcessorSuccess ParentClosePolicyProcessorFailures @@ -2305,6 +2313,9 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ VersionCheckRequestFailedCount: NewCounterDef("version_check_request_failed"), VersionCheckLatency: NewTimerDef("version_check_latency"), + SDKVersionRecordSuccessCount: NewCounterDef("sdk_version_record_success"), + SDKVersionRecordFailedCount: NewCounterDef("sdk_version_record_faile"), + ParentClosePolicyProcessorSuccess: NewCounterDef("parent_close_policy_processor_requests"), ParentClosePolicyProcessorFailures: NewCounterDef("parent_close_policy_processor_errors"), diff --git a/common/rpc/interceptor/sdk_version.go b/common/rpc/interceptor/sdk_version.go index e87c9ce14e6..765d24765ec 100644 --- a/common/rpc/interceptor/sdk_version.go +++ b/common/rpc/interceptor/sdk_version.go @@ -27,39 +27,34 @@ package interceptor import ( "context" "sync" - "sync/atomic" "go.temporal.io/server/common/headers" + "go.temporal.io/server/common/metrics" "go.temporal.io/version/check" "google.golang.org/grpc" ) -type SDKInfoRecorder interface { - RecordSDKInfo(name, version string) - GetAndResetSDKInfo() []check.SDKInfo -} - -type SDKVersionInterceptor struct { - // Using a sync map to support effienct concurrent updates. - // Key type is sdkNameVersion and value type is sdkCount. - // Note that we never delete keys from this map as the cardinality of - // client versions should be fairly low. - sdkInfoCounter sync.Map -} - type sdkNameVersion struct { name string version string } -type sdkCount struct { - count int64 +type SDKVersionInterceptor struct { + sdkInfoSet map[sdkNameVersion]bool + lock sync.RWMutex + infoSetSizeCapGetter func() int + metricsClient metrics.Client } var _ grpc.UnaryServerInterceptor = (*TelemetryInterceptor)(nil).Intercept -func NewSDKVersionInterceptor() *SDKVersionInterceptor { - return &SDKVersionInterceptor{} +func NewSDKVersionInterceptor(infoSetSizeCapGetter func() int, metricsClient metrics.Client) *SDKVersionInterceptor { + return &SDKVersionInterceptor{ + sdkInfoSet: make(map[sdkNameVersion]bool), + lock: sync.RWMutex{}, + infoSetSizeCapGetter: infoSetSizeCapGetter, + metricsClient: metricsClient, + } } func (vi *SDKVersionInterceptor) Intercept( @@ -76,22 +71,44 @@ func (vi *SDKVersionInterceptor) Intercept( } func (vi *SDKVersionInterceptor) RecordSDKInfo(name, version string) { + scope := vi.metricsClient.Scope(metrics.SDKVersionRecordScope) info := sdkNameVersion{name, version} - valIface, ok := vi.sdkInfoCounter.Load(info) - if !ok { - // Store if wasn't added racy - valIface, _ = vi.sdkInfoCounter.LoadOrStore(info, &sdkCount{}) + counter := metrics.SDKVersionRecordSuccessCount + setSizeCap := vi.infoSetSizeCapGetter() + shouldUpdate := false + + // Increment after unlocking + defer func() { scope.IncCounter(counter) }() + + // Update after unlocking read lock + defer func() { + if shouldUpdate { + vi.lock.Lock() + vi.sdkInfoSet[info] = true + vi.lock.Unlock() + } + }() + + vi.lock.RLock() + defer vi.lock.RUnlock() + + if len(vi.sdkInfoSet) >= setSizeCap { + counter = metrics.SDKVersionRecordFailedCount + return } - atomic.AddInt64(&valIface.(*sdkCount).count, 1) + _, found := vi.sdkInfoSet[info] + shouldUpdate = !found } func (vi *SDKVersionInterceptor) GetAndResetSDKInfo() []check.SDKInfo { - sdkInfo := make([]check.SDKInfo, 0) - vi.sdkInfoCounter.Range(func(key, value interface{}) bool { - timesSeen := atomic.SwapInt64(&value.(*sdkCount).count, 0) - nameVersion := key.(sdkNameVersion) - sdkInfo = append(sdkInfo, check.SDKInfo{Name: nameVersion.name, Version: nameVersion.version, TimesSeen: timesSeen}) - return true - }) + vi.lock.Lock() + defer vi.lock.Unlock() + sdkInfo := make([]check.SDKInfo, len(vi.sdkInfoSet)) + i := 0 + for k := range vi.sdkInfoSet { + sdkInfo[i] = check.SDKInfo{Name: k.name, Version: k.version} + i += 1 + } + vi.sdkInfoSet = make(map[sdkNameVersion]bool) return sdkInfo } diff --git a/common/rpc/interceptor/sdk_version_test.go b/common/rpc/interceptor/sdk_version_test.go index 18a9639b94c..f8936aac0cd 100644 --- a/common/rpc/interceptor/sdk_version_test.go +++ b/common/rpc/interceptor/sdk_version_test.go @@ -26,30 +26,65 @@ package interceptor import ( "context" + "sort" "testing" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "go.temporal.io/server/common/headers" + "go.temporal.io/server/common/metrics" ) func TestSDKVersionRecorder(t *testing.T) { - interceptor := NewSDKVersionInterceptor() + ctrl := gomock.NewController(t) + metricsClient := metrics.NewMockClient(ctrl) + metricsScope := metrics.NewMockScope(ctrl) + // Note that that empty SDK names and versions are ignored + metricsClient.EXPECT().Scope(metrics.SDKVersionRecordScope).Times(3).Return(metricsScope) + metricsScope.EXPECT().IncCounter(metrics.SDKVersionRecordSuccessCount).Times(2) + metricsScope.EXPECT().IncCounter(metrics.SDKVersionRecordFailedCount).Times(1) + interceptor := NewSDKVersionInterceptor(func() int { return 2 }, metricsClient) sdkVersion := "1.10.1" + + // Record first tuple ctx := headers.SetVersionsForTests(context.Background(), sdkVersion, headers.ClientNameGoSDK, headers.SupportedServerVersions, headers.AllFeatures) interceptor.Intercept(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) { return nil, nil }) + // Record second tuple + ctx = headers.SetVersionsForTests(context.Background(), sdkVersion, headers.ClientNameTypeScriptSDK, headers.SupportedServerVersions, headers.AllFeatures) + interceptor.Intercept(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) { + return nil, nil + }) + + // Do not record when over capacity + ctx = headers.SetVersionsForTests(context.Background(), sdkVersion, headers.ClientNameJavaSDK, headers.SupportedServerVersions, headers.AllFeatures) + interceptor.Intercept(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) { + return nil, nil + }) + + // Empty SDK version should not be recorded ctx = headers.SetVersionsForTests(context.Background(), "", headers.ClientNameGoSDK, headers.SupportedServerVersions, headers.AllFeatures) interceptor.Intercept(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) { return nil, nil }) + // Empty SDK name should not be recorded + ctx = headers.SetVersionsForTests(context.Background(), sdkVersion, "", headers.SupportedServerVersions, headers.AllFeatures) + interceptor.Intercept(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) { + return nil, nil + }) + info := interceptor.GetAndResetSDKInfo() - assert.Equal(t, 1, len(info)) + sort.SliceStable(info, func(i, j int) bool { + return info[i].Name < info[j].Name + }) + assert.Equal(t, 2, len(info)) assert.Equal(t, headers.ClientNameGoSDK, info[0].Name) assert.Equal(t, sdkVersion, info[0].Version) - assert.Equal(t, int64(1), info[0].TimesSeen) + assert.Equal(t, headers.ClientNameTypeScriptSDK, info[1].Name) + assert.Equal(t, sdkVersion, info[1].Version) } diff --git a/go.mod b/go.mod index ee92d50e9be..f7f7f29a899 100644 --- a/go.mod +++ b/go.mod @@ -46,7 +46,7 @@ require ( go.opentelemetry.io/otel/sdk/metric v0.25.0 go.temporal.io/api v1.7.1-0.20211215222122-0be6c74f9c9a go.temporal.io/sdk v1.12.0 - go.temporal.io/version v0.2.1 + go.temporal.io/version v0.3.0 go.uber.org/atomic v1.9.0 go.uber.org/fx v1.14.2 go.uber.org/multierr v1.7.0 diff --git a/go.sum b/go.sum index c0538eea943..f48a88c5aaa 100644 --- a/go.sum +++ b/go.sum @@ -465,8 +465,8 @@ go.temporal.io/api v1.7.1-0.20211215222122-0be6c74f9c9a h1:FYS1fRCzAbcek09VA3wJb go.temporal.io/api v1.7.1-0.20211215222122-0be6c74f9c9a/go.mod h1:EMC/8OQVvVUeTSfVQ/OpAaIHmVadlzaKwHVjUmgz3tk= go.temporal.io/sdk v1.12.0 h1:QkqOpmgXVnHHCFP9HbSbyrF3jYgLBKY/3NdZyR7e5nQ= go.temporal.io/sdk v1.12.0/go.mod h1:lSp3lH1lI0TyOsus0arnO3FYvjVXBZGi/G7DjnAnm6o= -go.temporal.io/version v0.2.1 h1:wP7ha/DvyAtwIZY0xL5TA6BcsR1svK/WsqzFmmwAPhE= -go.temporal.io/version v0.2.1/go.mod h1:UA9S8/1LaKYae6TyD9NaPMJTZb911JcbqghI2CBSP78= +go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig= +go.temporal.io/version v0.3.0/go.mod h1:UA9S8/1LaKYae6TyD9NaPMJTZb911JcbqghI2CBSP78= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= diff --git a/service/frontend/fx.go b/service/frontend/fx.go index e2577b7e796..6aea530f12e 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -286,8 +286,11 @@ func NamespaceValidatorInterceptorProvider( ) } -func SDKVersionInterceptorProvider() *interceptor.SDKVersionInterceptor { - return interceptor.NewSDKVersionInterceptor() +func SDKVersionInterceptorProvider( + serviceConfig *Config, + metricsClient metrics.Client, +) *interceptor.SDKVersionInterceptor { + return interceptor.NewSDKVersionInterceptor(func() int { return serviceConfig.MaxSDKVersionsToRecord() }, metricsClient) } func PersistenceMaxQpsProvider( diff --git a/service/frontend/service.go b/service/frontend/service.go index 1bd1f2d94a9..ae1baefdc01 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -108,6 +108,9 @@ type Config struct { // EnableServerVersionCheck disables periodic version checking performed by the frontend EnableServerVersionCheck dynamicconfig.BoolPropertyFn + // MaxSDKVersionsToRecord controls how many SDK versions should be recorded in memory to be sent in the version check request. + // This setting is used to prevent potential memory leak, the default should be low enough that this value shouldn't be changed. + MaxSDKVersionsToRecord dynamicconfig.IntPropertyFn // EnableTokenNamespaceEnforcement enables enforcement that namespace in completion token matches namespace of the request EnableTokenNamespaceEnforcement dynamicconfig.BoolPropertyFn @@ -170,6 +173,7 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int32, esIndexName DefaultWorkflowRetryPolicy: dc.GetMapPropertyFnWithNamespaceFilter(dynamicconfig.DefaultWorkflowRetryPolicy, common.GetDefaultRetryPolicyConfigOptions()), DefaultWorkflowTaskTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowTaskTimeout, common.DefaultWorkflowTaskTimeout), EnableServerVersionCheck: dc.GetBoolProperty(dynamicconfig.EnableServerVersionCheck, os.Getenv("TEMPORAL_VERSION_CHECK_DISABLED") == ""), + MaxSDKVersionsToRecord: dc.GetIntProperty(dynamicconfig.MaxSDKVersionsToRecord, 1000), EnableTokenNamespaceEnforcement: dc.GetBoolProperty(dynamicconfig.EnableTokenNamespaceEnforcement, false), KeepAliveMinTime: dc.GetDurationProperty(dynamicconfig.KeepAliveMinTime, 10*time.Second), KeepAlivePermitWithoutStream: dc.GetBoolProperty(dynamicconfig.KeepAlivePermitWithoutStream, true), diff --git a/service/frontend/versionChecker.go b/service/frontend/versionChecker.go index b7c15bb4005..976c49b5247 100644 --- a/service/frontend/versionChecker.go +++ b/service/frontend/versionChecker.go @@ -50,7 +50,7 @@ type VersionChecker struct { clusterMetadataManager persistence.ClusterMetadataManager startOnce sync.Once stopOnce sync.Once - sdkVersionRecorder interceptor.SDKInfoRecorder + sdkVersionRecorder *interceptor.SDKVersionInterceptor } func NewVersionChecker( @@ -159,7 +159,12 @@ func (vc *VersionChecker) saveVersionInfo(resp *check.VersionCheckResponse) erro if err != nil { return err } - metadata.VersionInfo = toVersionInfo(resp) + // TODO(bergundy): Extract and save version info per SDK + versionInfo, err := toVersionInfo(resp) + if err != nil { + return err + } + metadata.VersionInfo = versionInfo saved, err := vc.clusterMetadataManager.SaveClusterMetadata(&persistence.SaveClusterMetadataRequest{ ClusterMetadata: metadata.ClusterMetadata, Version: metadata.Version}) if err != nil { @@ -171,14 +176,19 @@ func (vc *VersionChecker) saveVersionInfo(resp *check.VersionCheckResponse) erro return nil } -func toVersionInfo(resp *check.VersionCheckResponse) *versionpb.VersionInfo { - return &versionpb.VersionInfo{ - Current: convertReleaseInfo(resp.Current), - Recommended: convertReleaseInfo(resp.Recommended), - Instructions: resp.Instructions, - Alerts: convertAlerts(resp.Alerts), - LastUpdateTime: timestamp.TimePtr(time.Now().UTC()), +func toVersionInfo(resp *check.VersionCheckResponse) (*versionpb.VersionInfo, error) { + for _, product := range resp.Products { + if product.Product == "server" { + return &versionpb.VersionInfo{ + Current: convertReleaseInfo(product.Current), + Recommended: convertReleaseInfo(product.Recommended), + Instructions: product.Instructions, + Alerts: convertAlerts(product.Alerts), + LastUpdateTime: timestamp.TimePtr(time.Now().UTC()), + }, nil + } } + return nil, serviceerror.NewNotFound("version info update was not found in response") } func convertAlerts(alerts []check.Alert) []*versionpb.Alert { From 0a9038e9ef167dd8931415a7e5790630b391b7a1 Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Thu, 13 Jan 2022 14:25:25 -0800 Subject: [PATCH 4/6] Address review comments --- common/rpc/interceptor/sdk_version.go | 47 +++++++--------------- common/rpc/interceptor/sdk_version_test.go | 11 +---- service/frontend/fx.go | 3 +- 3 files changed, 17 insertions(+), 44 deletions(-) diff --git a/common/rpc/interceptor/sdk_version.go b/common/rpc/interceptor/sdk_version.go index 765d24765ec..854addb2376 100644 --- a/common/rpc/interceptor/sdk_version.go +++ b/common/rpc/interceptor/sdk_version.go @@ -29,7 +29,6 @@ import ( "sync" "go.temporal.io/server/common/headers" - "go.temporal.io/server/common/metrics" "go.temporal.io/version/check" "google.golang.org/grpc" ) @@ -43,17 +42,15 @@ type SDKVersionInterceptor struct { sdkInfoSet map[sdkNameVersion]bool lock sync.RWMutex infoSetSizeCapGetter func() int - metricsClient metrics.Client } var _ grpc.UnaryServerInterceptor = (*TelemetryInterceptor)(nil).Intercept -func NewSDKVersionInterceptor(infoSetSizeCapGetter func() int, metricsClient metrics.Client) *SDKVersionInterceptor { +func NewSDKVersionInterceptor(infoSetSizeCapGetter func() int) *SDKVersionInterceptor { return &SDKVersionInterceptor{ sdkInfoSet: make(map[sdkNameVersion]bool), lock: sync.RWMutex{}, infoSetSizeCapGetter: infoSetSizeCapGetter, - metricsClient: metricsClient, } } @@ -71,44 +68,30 @@ func (vi *SDKVersionInterceptor) Intercept( } func (vi *SDKVersionInterceptor) RecordSDKInfo(name, version string) { - scope := vi.metricsClient.Scope(metrics.SDKVersionRecordScope) info := sdkNameVersion{name, version} - counter := metrics.SDKVersionRecordSuccessCount setSizeCap := vi.infoSetSizeCapGetter() - shouldUpdate := false - - // Increment after unlocking - defer func() { scope.IncCounter(counter) }() - - // Update after unlocking read lock - defer func() { - if shouldUpdate { - vi.lock.Lock() - vi.sdkInfoSet[info] = true - vi.lock.Unlock() - } - }() vi.lock.RLock() - defer vi.lock.RUnlock() + overCap := len(vi.sdkInfoSet) >= setSizeCap + _, found := vi.sdkInfoSet[info] + vi.lock.RUnlock() - if len(vi.sdkInfoSet) >= setSizeCap { - counter = metrics.SDKVersionRecordFailedCount - return + if !overCap && !found { + vi.lock.Lock() + vi.sdkInfoSet[info] = true + vi.lock.Unlock() } - _, found := vi.sdkInfoSet[info] - shouldUpdate = !found } func (vi *SDKVersionInterceptor) GetAndResetSDKInfo() []check.SDKInfo { vi.lock.Lock() - defer vi.lock.Unlock() - sdkInfo := make([]check.SDKInfo, len(vi.sdkInfoSet)) - i := 0 - for k := range vi.sdkInfoSet { - sdkInfo[i] = check.SDKInfo{Name: k.name, Version: k.version} - i += 1 - } + currSet := vi.sdkInfoSet vi.sdkInfoSet = make(map[sdkNameVersion]bool) + vi.lock.Unlock() + + sdkInfo := make([]check.SDKInfo, 0, len(currSet)) + for k := range currSet { + sdkInfo = append(sdkInfo, check.SDKInfo{Name: k.name, Version: k.version}) + } return sdkInfo } diff --git a/common/rpc/interceptor/sdk_version_test.go b/common/rpc/interceptor/sdk_version_test.go index f8936aac0cd..ba33c562633 100644 --- a/common/rpc/interceptor/sdk_version_test.go +++ b/common/rpc/interceptor/sdk_version_test.go @@ -29,22 +29,13 @@ import ( "sort" "testing" - "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "go.temporal.io/server/common/headers" - "go.temporal.io/server/common/metrics" ) func TestSDKVersionRecorder(t *testing.T) { - ctrl := gomock.NewController(t) - metricsClient := metrics.NewMockClient(ctrl) - metricsScope := metrics.NewMockScope(ctrl) - // Note that that empty SDK names and versions are ignored - metricsClient.EXPECT().Scope(metrics.SDKVersionRecordScope).Times(3).Return(metricsScope) - metricsScope.EXPECT().IncCounter(metrics.SDKVersionRecordSuccessCount).Times(2) - metricsScope.EXPECT().IncCounter(metrics.SDKVersionRecordFailedCount).Times(1) - interceptor := NewSDKVersionInterceptor(func() int { return 2 }, metricsClient) + interceptor := NewSDKVersionInterceptor(func() int { return 2 }) sdkVersion := "1.10.1" diff --git a/service/frontend/fx.go b/service/frontend/fx.go index 6aea530f12e..4be4c2bd1c9 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -288,9 +288,8 @@ func NamespaceValidatorInterceptorProvider( func SDKVersionInterceptorProvider( serviceConfig *Config, - metricsClient metrics.Client, ) *interceptor.SDKVersionInterceptor { - return interceptor.NewSDKVersionInterceptor(func() int { return serviceConfig.MaxSDKVersionsToRecord() }, metricsClient) + return interceptor.NewSDKVersionInterceptor(func() int { return serviceConfig.MaxSDKVersionsToRecord() }) } func PersistenceMaxQpsProvider( From c1eb1fb25ebee2cb87ab4c5729e0382598df7166 Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Thu, 13 Jan 2022 17:49:18 -0800 Subject: [PATCH 5/6] Simplify version interceptor, remove dynamicconfig and metric --- common/dynamicconfig/constants.go | 2 -- common/headers/versionChecker.go | 1 + common/metrics/defs.go | 11 ----------- common/rpc/interceptor/sdk_version.go | 22 ++++++++++++---------- common/rpc/interceptor/sdk_version_test.go | 5 ++++- service/frontend/fx.go | 6 ++---- service/frontend/service.go | 4 ---- service/frontend/versionChecker.go | 4 +--- 8 files changed, 20 insertions(+), 35 deletions(-) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index afcfe316c7e..4a15b8bc6f3 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -487,8 +487,6 @@ const ( VisibilityArchivalQueryMaxQPS // EnableServerVersionCheck is a flag that controls whether or not periodic version checking is enabled EnableServerVersionCheck - // MaxSDKVersionsToRecord caps the number of distinct SDK versions to record for version checking purposes - MaxSDKVersionsToRecord // EnableTokenNamespaceEnforcement enables enforcement that namespace in completion token matches namespace of the request EnableTokenNamespaceEnforcement // KeepAliveMinTime is the minimum amount of time a client should wait before sending a keepalive ping. diff --git a/common/headers/versionChecker.go b/common/headers/versionChecker.go index 362f6d50e6e..eb4a116ad99 100644 --- a/common/headers/versionChecker.go +++ b/common/headers/versionChecker.go @@ -98,6 +98,7 @@ func NewVersionChecker(supportedClients map[string]string, serverVersion string) } } +// GetClientNameAndVersion extracts SDK name and version from context headers func GetClientNameAndVersion(ctx context.Context) (string, string) { headers := GetValues(ctx, ClientNameHeaderName, ClientVersionHeaderName) clientName := headers[0] diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 509a45b3fc3..e7ec665229b 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -881,10 +881,6 @@ const ( // VersionCheckScope is scope used by version checker VersionCheckScope - - // SDKVersionRecordScope is the scope used by the version interceptor - SDKVersionRecordScope - // AuthorizationScope is the scope used by all metric emitted by authorization code AuthorizationScope @@ -1566,7 +1562,6 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ FrontendGetClusterInfoScope: {operation: "GetClusterInfo"}, FrontendGetSystemInfoScope: {operation: "GetSystemInfo"}, VersionCheckScope: {operation: "VersionCheck"}, - SDKVersionRecordScope: {operation: "SDKVersionRecord"}, AuthorizationScope: {operation: "Authorization"}, }, // History Scope Names @@ -1878,9 +1873,6 @@ const ( VersionCheckFailedCount VersionCheckLatency - SDKVersionRecordSuccessCount - SDKVersionRecordFailedCount - ParentClosePolicyProcessorSuccess ParentClosePolicyProcessorFailures @@ -2313,9 +2305,6 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ VersionCheckRequestFailedCount: NewCounterDef("version_check_request_failed"), VersionCheckLatency: NewTimerDef("version_check_latency"), - SDKVersionRecordSuccessCount: NewCounterDef("sdk_version_record_success"), - SDKVersionRecordFailedCount: NewCounterDef("sdk_version_record_faile"), - ParentClosePolicyProcessorSuccess: NewCounterDef("parent_close_policy_processor_requests"), ParentClosePolicyProcessorFailures: NewCounterDef("parent_close_policy_processor_errors"), diff --git a/common/rpc/interceptor/sdk_version.go b/common/rpc/interceptor/sdk_version.go index 854addb2376..c6586c98c35 100644 --- a/common/rpc/interceptor/sdk_version.go +++ b/common/rpc/interceptor/sdk_version.go @@ -39,21 +39,22 @@ type sdkNameVersion struct { } type SDKVersionInterceptor struct { - sdkInfoSet map[sdkNameVersion]bool - lock sync.RWMutex - infoSetSizeCapGetter func() int + sdkInfoSet map[sdkNameVersion]bool + lock sync.RWMutex + maxSetSize int } -var _ grpc.UnaryServerInterceptor = (*TelemetryInterceptor)(nil).Intercept +const defaultMaxSetSize = 100 -func NewSDKVersionInterceptor(infoSetSizeCapGetter func() int) *SDKVersionInterceptor { +// NewSDKVersionInterceptor creates a new SDKVersionInterceptor with default max set size +func NewSDKVersionInterceptor() *SDKVersionInterceptor { return &SDKVersionInterceptor{ - sdkInfoSet: make(map[sdkNameVersion]bool), - lock: sync.RWMutex{}, - infoSetSizeCapGetter: infoSetSizeCapGetter, + sdkInfoSet: make(map[sdkNameVersion]bool), + maxSetSize: defaultMaxSetSize, } } +// Intercept a grpc request func (vi *SDKVersionInterceptor) Intercept( ctx context.Context, req interface{}, @@ -67,12 +68,12 @@ func (vi *SDKVersionInterceptor) Intercept( return handler(ctx, req) } +// RecordSDKInfo records name and version tuple in memory func (vi *SDKVersionInterceptor) RecordSDKInfo(name, version string) { info := sdkNameVersion{name, version} - setSizeCap := vi.infoSetSizeCapGetter() vi.lock.RLock() - overCap := len(vi.sdkInfoSet) >= setSizeCap + overCap := len(vi.sdkInfoSet) >= vi.maxSetSize _, found := vi.sdkInfoSet[info] vi.lock.RUnlock() @@ -83,6 +84,7 @@ func (vi *SDKVersionInterceptor) RecordSDKInfo(name, version string) { } } +// GetAndResetSDKInfo gets all recorded name, version tuples and resets internal records func (vi *SDKVersionInterceptor) GetAndResetSDKInfo() []check.SDKInfo { vi.lock.Lock() currSet := vi.sdkInfoSet diff --git a/common/rpc/interceptor/sdk_version_test.go b/common/rpc/interceptor/sdk_version_test.go index ba33c562633..1601da00b1d 100644 --- a/common/rpc/interceptor/sdk_version_test.go +++ b/common/rpc/interceptor/sdk_version_test.go @@ -35,7 +35,10 @@ import ( ) func TestSDKVersionRecorder(t *testing.T) { - interceptor := NewSDKVersionInterceptor(func() int { return 2 }) + interceptor := &SDKVersionInterceptor{ + sdkInfoSet: make(map[sdkNameVersion]bool), + maxSetSize: 2, + } sdkVersion := "1.10.1" diff --git a/service/frontend/fx.go b/service/frontend/fx.go index 4be4c2bd1c9..e2577b7e796 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -286,10 +286,8 @@ func NamespaceValidatorInterceptorProvider( ) } -func SDKVersionInterceptorProvider( - serviceConfig *Config, -) *interceptor.SDKVersionInterceptor { - return interceptor.NewSDKVersionInterceptor(func() int { return serviceConfig.MaxSDKVersionsToRecord() }) +func SDKVersionInterceptorProvider() *interceptor.SDKVersionInterceptor { + return interceptor.NewSDKVersionInterceptor() } func PersistenceMaxQpsProvider( diff --git a/service/frontend/service.go b/service/frontend/service.go index ae1baefdc01..1bd1f2d94a9 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -108,9 +108,6 @@ type Config struct { // EnableServerVersionCheck disables periodic version checking performed by the frontend EnableServerVersionCheck dynamicconfig.BoolPropertyFn - // MaxSDKVersionsToRecord controls how many SDK versions should be recorded in memory to be sent in the version check request. - // This setting is used to prevent potential memory leak, the default should be low enough that this value shouldn't be changed. - MaxSDKVersionsToRecord dynamicconfig.IntPropertyFn // EnableTokenNamespaceEnforcement enables enforcement that namespace in completion token matches namespace of the request EnableTokenNamespaceEnforcement dynamicconfig.BoolPropertyFn @@ -173,7 +170,6 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int32, esIndexName DefaultWorkflowRetryPolicy: dc.GetMapPropertyFnWithNamespaceFilter(dynamicconfig.DefaultWorkflowRetryPolicy, common.GetDefaultRetryPolicyConfigOptions()), DefaultWorkflowTaskTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowTaskTimeout, common.DefaultWorkflowTaskTimeout), EnableServerVersionCheck: dc.GetBoolProperty(dynamicconfig.EnableServerVersionCheck, os.Getenv("TEMPORAL_VERSION_CHECK_DISABLED") == ""), - MaxSDKVersionsToRecord: dc.GetIntProperty(dynamicconfig.MaxSDKVersionsToRecord, 1000), EnableTokenNamespaceEnforcement: dc.GetBoolProperty(dynamicconfig.EnableTokenNamespaceEnforcement, false), KeepAliveMinTime: dc.GetDurationProperty(dynamicconfig.KeepAliveMinTime, 10*time.Second), KeepAlivePermitWithoutStream: dc.GetBoolProperty(dynamicconfig.KeepAlivePermitWithoutStream, true), diff --git a/service/frontend/versionChecker.go b/service/frontend/versionChecker.go index 976c49b5247..0f048140930 100644 --- a/service/frontend/versionChecker.go +++ b/service/frontend/versionChecker.go @@ -87,7 +87,6 @@ func (vc *VersionChecker) Stop() { func (vc *VersionChecker) versionCheckLoop() { timer := time.NewTicker(VersionCheckInterval) defer timer.Stop() - vc.performVersionCheck() for { select { @@ -137,7 +136,6 @@ func isUpdateNeeded(metadata *persistence.GetClusterMetadataResponse) bool { } func (vc *VersionChecker) createVersionCheckRequest(metadata *persistence.GetClusterMetadataResponse) (*check.VersionCheckRequest, error) { - return &check.VersionCheckRequest{ Product: headers.ClientNameServer, Version: headers.ServerVersion, @@ -178,7 +176,7 @@ func (vc *VersionChecker) saveVersionInfo(resp *check.VersionCheckResponse) erro func toVersionInfo(resp *check.VersionCheckResponse) (*versionpb.VersionInfo, error) { for _, product := range resp.Products { - if product.Product == "server" { + if product.Product == headers.ClientNameServer { return &versionpb.VersionInfo{ Current: convertReleaseInfo(product.Current), Recommended: convertReleaseInfo(product.Recommended), From b01a99f55ef02712e0594b1d50bdca1f85b6ddce Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Tue, 18 Jan 2022 09:17:25 -0800 Subject: [PATCH 6/6] Use struct{} for sdkInfoSet --- common/rpc/interceptor/sdk_version.go | 8 ++++---- common/rpc/interceptor/sdk_version_test.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/common/rpc/interceptor/sdk_version.go b/common/rpc/interceptor/sdk_version.go index c6586c98c35..484edecb5b8 100644 --- a/common/rpc/interceptor/sdk_version.go +++ b/common/rpc/interceptor/sdk_version.go @@ -39,7 +39,7 @@ type sdkNameVersion struct { } type SDKVersionInterceptor struct { - sdkInfoSet map[sdkNameVersion]bool + sdkInfoSet map[sdkNameVersion]struct{} lock sync.RWMutex maxSetSize int } @@ -49,7 +49,7 @@ const defaultMaxSetSize = 100 // NewSDKVersionInterceptor creates a new SDKVersionInterceptor with default max set size func NewSDKVersionInterceptor() *SDKVersionInterceptor { return &SDKVersionInterceptor{ - sdkInfoSet: make(map[sdkNameVersion]bool), + sdkInfoSet: make(map[sdkNameVersion]struct{}), maxSetSize: defaultMaxSetSize, } } @@ -79,7 +79,7 @@ func (vi *SDKVersionInterceptor) RecordSDKInfo(name, version string) { if !overCap && !found { vi.lock.Lock() - vi.sdkInfoSet[info] = true + vi.sdkInfoSet[info] = struct{}{} vi.lock.Unlock() } } @@ -88,7 +88,7 @@ func (vi *SDKVersionInterceptor) RecordSDKInfo(name, version string) { func (vi *SDKVersionInterceptor) GetAndResetSDKInfo() []check.SDKInfo { vi.lock.Lock() currSet := vi.sdkInfoSet - vi.sdkInfoSet = make(map[sdkNameVersion]bool) + vi.sdkInfoSet = make(map[sdkNameVersion]struct{}) vi.lock.Unlock() sdkInfo := make([]check.SDKInfo, 0, len(currSet)) diff --git a/common/rpc/interceptor/sdk_version_test.go b/common/rpc/interceptor/sdk_version_test.go index 1601da00b1d..817557c0714 100644 --- a/common/rpc/interceptor/sdk_version_test.go +++ b/common/rpc/interceptor/sdk_version_test.go @@ -36,7 +36,7 @@ import ( func TestSDKVersionRecorder(t *testing.T) { interceptor := &SDKVersionInterceptor{ - sdkInfoSet: make(map[sdkNameVersion]bool), + sdkInfoSet: make(map[sdkNameVersion]struct{}), maxSetSize: 2, }