diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 4a15b8bc6f3..afcfe316c7e 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -487,6 +487,8 @@ const ( VisibilityArchivalQueryMaxQPS // EnableServerVersionCheck is a flag that controls whether or not periodic version checking is enabled EnableServerVersionCheck + // MaxSDKVersionsToRecord caps the number of distinct SDK versions to record for version checking purposes + MaxSDKVersionsToRecord // EnableTokenNamespaceEnforcement enables enforcement that namespace in completion token matches namespace of the request EnableTokenNamespaceEnforcement // KeepAliveMinTime is the minimum amount of time a client should wait before sending a keepalive ping. diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 2f05560866a..0ea5934e43f 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -877,6 +877,10 @@ const ( // VersionCheckScope is scope used by version checker VersionCheckScope + + // SDKVersionRecordScope is the scope used by the version interceptor + SDKVersionRecordScope + // AuthorizationScope is the scope used by all metric emitted by authorization code AuthorizationScope @@ -1556,6 +1560,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ FrontendGetClusterInfoScope: {operation: "GetClusterInfo"}, FrontendGetSystemInfoScope: {operation: "GetSystemInfo"}, VersionCheckScope: {operation: "VersionCheck"}, + SDKVersionRecordScope: {operation: "SDKVersionRecord"}, AuthorizationScope: {operation: "Authorization"}, }, // History Scope Names @@ -1867,6 +1872,9 @@ const ( VersionCheckFailedCount VersionCheckLatency + SDKVersionRecordSuccessCount + SDKVersionRecordFailedCount + ParentClosePolicyProcessorSuccess ParentClosePolicyProcessorFailures @@ -2299,6 +2307,9 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ VersionCheckRequestFailedCount: NewCounterDef("version_check_request_failed"), VersionCheckLatency: NewTimerDef("version_check_latency"), + SDKVersionRecordSuccessCount: NewCounterDef("sdk_version_record_success"), + SDKVersionRecordFailedCount: NewCounterDef("sdk_version_record_faile"), + ParentClosePolicyProcessorSuccess: NewCounterDef("parent_close_policy_processor_requests"), ParentClosePolicyProcessorFailures: NewCounterDef("parent_close_policy_processor_errors"), diff --git a/common/rpc/interceptor/sdk_version.go b/common/rpc/interceptor/sdk_version.go index e87c9ce14e6..765d24765ec 100644 --- a/common/rpc/interceptor/sdk_version.go +++ b/common/rpc/interceptor/sdk_version.go @@ -27,39 +27,34 @@ package interceptor import ( "context" "sync" - "sync/atomic" "go.temporal.io/server/common/headers" + "go.temporal.io/server/common/metrics" "go.temporal.io/version/check" "google.golang.org/grpc" ) -type SDKInfoRecorder interface { - RecordSDKInfo(name, version string) - GetAndResetSDKInfo() []check.SDKInfo -} - -type SDKVersionInterceptor struct { - // Using a sync map to support effienct concurrent updates. - // Key type is sdkNameVersion and value type is sdkCount. - // Note that we never delete keys from this map as the cardinality of - // client versions should be fairly low. - sdkInfoCounter sync.Map -} - type sdkNameVersion struct { name string version string } -type sdkCount struct { - count int64 +type SDKVersionInterceptor struct { + sdkInfoSet map[sdkNameVersion]bool + lock sync.RWMutex + infoSetSizeCapGetter func() int + metricsClient metrics.Client } var _ grpc.UnaryServerInterceptor = (*TelemetryInterceptor)(nil).Intercept -func NewSDKVersionInterceptor() *SDKVersionInterceptor { - return &SDKVersionInterceptor{} +func NewSDKVersionInterceptor(infoSetSizeCapGetter func() int, metricsClient metrics.Client) *SDKVersionInterceptor { + return &SDKVersionInterceptor{ + sdkInfoSet: make(map[sdkNameVersion]bool), + lock: sync.RWMutex{}, + infoSetSizeCapGetter: infoSetSizeCapGetter, + metricsClient: metricsClient, + } } func (vi *SDKVersionInterceptor) Intercept( @@ -76,22 +71,44 @@ func (vi *SDKVersionInterceptor) Intercept( } func (vi *SDKVersionInterceptor) RecordSDKInfo(name, version string) { + scope := vi.metricsClient.Scope(metrics.SDKVersionRecordScope) info := sdkNameVersion{name, version} - valIface, ok := vi.sdkInfoCounter.Load(info) - if !ok { - // Store if wasn't added racy - valIface, _ = vi.sdkInfoCounter.LoadOrStore(info, &sdkCount{}) + counter := metrics.SDKVersionRecordSuccessCount + setSizeCap := vi.infoSetSizeCapGetter() + shouldUpdate := false + + // Increment after unlocking + defer func() { scope.IncCounter(counter) }() + + // Update after unlocking read lock + defer func() { + if shouldUpdate { + vi.lock.Lock() + vi.sdkInfoSet[info] = true + vi.lock.Unlock() + } + }() + + vi.lock.RLock() + defer vi.lock.RUnlock() + + if len(vi.sdkInfoSet) >= setSizeCap { + counter = metrics.SDKVersionRecordFailedCount + return } - atomic.AddInt64(&valIface.(*sdkCount).count, 1) + _, found := vi.sdkInfoSet[info] + shouldUpdate = !found } func (vi *SDKVersionInterceptor) GetAndResetSDKInfo() []check.SDKInfo { - sdkInfo := make([]check.SDKInfo, 0) - vi.sdkInfoCounter.Range(func(key, value interface{}) bool { - timesSeen := atomic.SwapInt64(&value.(*sdkCount).count, 0) - nameVersion := key.(sdkNameVersion) - sdkInfo = append(sdkInfo, check.SDKInfo{Name: nameVersion.name, Version: nameVersion.version, TimesSeen: timesSeen}) - return true - }) + vi.lock.Lock() + defer vi.lock.Unlock() + sdkInfo := make([]check.SDKInfo, len(vi.sdkInfoSet)) + i := 0 + for k := range vi.sdkInfoSet { + sdkInfo[i] = check.SDKInfo{Name: k.name, Version: k.version} + i += 1 + } + vi.sdkInfoSet = make(map[sdkNameVersion]bool) return sdkInfo } diff --git a/common/rpc/interceptor/sdk_version_test.go b/common/rpc/interceptor/sdk_version_test.go index 18a9639b94c..f8936aac0cd 100644 --- a/common/rpc/interceptor/sdk_version_test.go +++ b/common/rpc/interceptor/sdk_version_test.go @@ -26,30 +26,65 @@ package interceptor import ( "context" + "sort" "testing" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "go.temporal.io/server/common/headers" + "go.temporal.io/server/common/metrics" ) func TestSDKVersionRecorder(t *testing.T) { - interceptor := NewSDKVersionInterceptor() + ctrl := gomock.NewController(t) + metricsClient := metrics.NewMockClient(ctrl) + metricsScope := metrics.NewMockScope(ctrl) + // Note that that empty SDK names and versions are ignored + metricsClient.EXPECT().Scope(metrics.SDKVersionRecordScope).Times(3).Return(metricsScope) + metricsScope.EXPECT().IncCounter(metrics.SDKVersionRecordSuccessCount).Times(2) + metricsScope.EXPECT().IncCounter(metrics.SDKVersionRecordFailedCount).Times(1) + interceptor := NewSDKVersionInterceptor(func() int { return 2 }, metricsClient) sdkVersion := "1.10.1" + + // Record first tuple ctx := headers.SetVersionsForTests(context.Background(), sdkVersion, headers.ClientNameGoSDK, headers.SupportedServerVersions, headers.AllFeatures) interceptor.Intercept(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) { return nil, nil }) + // Record second tuple + ctx = headers.SetVersionsForTests(context.Background(), sdkVersion, headers.ClientNameTypeScriptSDK, headers.SupportedServerVersions, headers.AllFeatures) + interceptor.Intercept(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) { + return nil, nil + }) + + // Do not record when over capacity + ctx = headers.SetVersionsForTests(context.Background(), sdkVersion, headers.ClientNameJavaSDK, headers.SupportedServerVersions, headers.AllFeatures) + interceptor.Intercept(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) { + return nil, nil + }) + + // Empty SDK version should not be recorded ctx = headers.SetVersionsForTests(context.Background(), "", headers.ClientNameGoSDK, headers.SupportedServerVersions, headers.AllFeatures) interceptor.Intercept(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) { return nil, nil }) + // Empty SDK name should not be recorded + ctx = headers.SetVersionsForTests(context.Background(), sdkVersion, "", headers.SupportedServerVersions, headers.AllFeatures) + interceptor.Intercept(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) { + return nil, nil + }) + info := interceptor.GetAndResetSDKInfo() - assert.Equal(t, 1, len(info)) + sort.SliceStable(info, func(i, j int) bool { + return info[i].Name < info[j].Name + }) + assert.Equal(t, 2, len(info)) assert.Equal(t, headers.ClientNameGoSDK, info[0].Name) assert.Equal(t, sdkVersion, info[0].Version) - assert.Equal(t, int64(1), info[0].TimesSeen) + assert.Equal(t, headers.ClientNameTypeScriptSDK, info[1].Name) + assert.Equal(t, sdkVersion, info[1].Version) } diff --git a/go.mod b/go.mod index 8ed2debdf8f..65ae4e024c4 100644 --- a/go.mod +++ b/go.mod @@ -46,7 +46,7 @@ require ( go.opentelemetry.io/otel/sdk/metric v0.24.0 go.temporal.io/api v1.7.1-0.20211215222122-0be6c74f9c9a go.temporal.io/sdk v1.12.0 - go.temporal.io/version v0.2.1 + go.temporal.io/version v0.3.0 go.uber.org/atomic v1.9.0 go.uber.org/fx v1.14.2 go.uber.org/multierr v1.7.0 diff --git a/go.sum b/go.sum index d125801e92a..a91c866eead 100644 --- a/go.sum +++ b/go.sum @@ -466,8 +466,8 @@ go.temporal.io/api v1.7.1-0.20211215222122-0be6c74f9c9a h1:FYS1fRCzAbcek09VA3wJb go.temporal.io/api v1.7.1-0.20211215222122-0be6c74f9c9a/go.mod h1:EMC/8OQVvVUeTSfVQ/OpAaIHmVadlzaKwHVjUmgz3tk= go.temporal.io/sdk v1.12.0 h1:QkqOpmgXVnHHCFP9HbSbyrF3jYgLBKY/3NdZyR7e5nQ= go.temporal.io/sdk v1.12.0/go.mod h1:lSp3lH1lI0TyOsus0arnO3FYvjVXBZGi/G7DjnAnm6o= -go.temporal.io/version v0.2.1 h1:wP7ha/DvyAtwIZY0xL5TA6BcsR1svK/WsqzFmmwAPhE= -go.temporal.io/version v0.2.1/go.mod h1:UA9S8/1LaKYae6TyD9NaPMJTZb911JcbqghI2CBSP78= +go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig= +go.temporal.io/version v0.3.0/go.mod h1:UA9S8/1LaKYae6TyD9NaPMJTZb911JcbqghI2CBSP78= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= diff --git a/service/frontend/fx.go b/service/frontend/fx.go index e2577b7e796..6aea530f12e 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -286,8 +286,11 @@ func NamespaceValidatorInterceptorProvider( ) } -func SDKVersionInterceptorProvider() *interceptor.SDKVersionInterceptor { - return interceptor.NewSDKVersionInterceptor() +func SDKVersionInterceptorProvider( + serviceConfig *Config, + metricsClient metrics.Client, +) *interceptor.SDKVersionInterceptor { + return interceptor.NewSDKVersionInterceptor(func() int { return serviceConfig.MaxSDKVersionsToRecord() }, metricsClient) } func PersistenceMaxQpsProvider( diff --git a/service/frontend/service.go b/service/frontend/service.go index 1bd1f2d94a9..ae1baefdc01 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -108,6 +108,9 @@ type Config struct { // EnableServerVersionCheck disables periodic version checking performed by the frontend EnableServerVersionCheck dynamicconfig.BoolPropertyFn + // MaxSDKVersionsToRecord controls how many SDK versions should be recorded in memory to be sent in the version check request. + // This setting is used to prevent potential memory leak, the default should be low enough that this value shouldn't be changed. + MaxSDKVersionsToRecord dynamicconfig.IntPropertyFn // EnableTokenNamespaceEnforcement enables enforcement that namespace in completion token matches namespace of the request EnableTokenNamespaceEnforcement dynamicconfig.BoolPropertyFn @@ -170,6 +173,7 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int32, esIndexName DefaultWorkflowRetryPolicy: dc.GetMapPropertyFnWithNamespaceFilter(dynamicconfig.DefaultWorkflowRetryPolicy, common.GetDefaultRetryPolicyConfigOptions()), DefaultWorkflowTaskTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowTaskTimeout, common.DefaultWorkflowTaskTimeout), EnableServerVersionCheck: dc.GetBoolProperty(dynamicconfig.EnableServerVersionCheck, os.Getenv("TEMPORAL_VERSION_CHECK_DISABLED") == ""), + MaxSDKVersionsToRecord: dc.GetIntProperty(dynamicconfig.MaxSDKVersionsToRecord, 1000), EnableTokenNamespaceEnforcement: dc.GetBoolProperty(dynamicconfig.EnableTokenNamespaceEnforcement, false), KeepAliveMinTime: dc.GetDurationProperty(dynamicconfig.KeepAliveMinTime, 10*time.Second), KeepAlivePermitWithoutStream: dc.GetBoolProperty(dynamicconfig.KeepAlivePermitWithoutStream, true), diff --git a/service/frontend/versionChecker.go b/service/frontend/versionChecker.go index b7c15bb4005..976c49b5247 100644 --- a/service/frontend/versionChecker.go +++ b/service/frontend/versionChecker.go @@ -50,7 +50,7 @@ type VersionChecker struct { clusterMetadataManager persistence.ClusterMetadataManager startOnce sync.Once stopOnce sync.Once - sdkVersionRecorder interceptor.SDKInfoRecorder + sdkVersionRecorder *interceptor.SDKVersionInterceptor } func NewVersionChecker( @@ -159,7 +159,12 @@ func (vc *VersionChecker) saveVersionInfo(resp *check.VersionCheckResponse) erro if err != nil { return err } - metadata.VersionInfo = toVersionInfo(resp) + // TODO(bergundy): Extract and save version info per SDK + versionInfo, err := toVersionInfo(resp) + if err != nil { + return err + } + metadata.VersionInfo = versionInfo saved, err := vc.clusterMetadataManager.SaveClusterMetadata(&persistence.SaveClusterMetadataRequest{ ClusterMetadata: metadata.ClusterMetadata, Version: metadata.Version}) if err != nil { @@ -171,14 +176,19 @@ func (vc *VersionChecker) saveVersionInfo(resp *check.VersionCheckResponse) erro return nil } -func toVersionInfo(resp *check.VersionCheckResponse) *versionpb.VersionInfo { - return &versionpb.VersionInfo{ - Current: convertReleaseInfo(resp.Current), - Recommended: convertReleaseInfo(resp.Recommended), - Instructions: resp.Instructions, - Alerts: convertAlerts(resp.Alerts), - LastUpdateTime: timestamp.TimePtr(time.Now().UTC()), +func toVersionInfo(resp *check.VersionCheckResponse) (*versionpb.VersionInfo, error) { + for _, product := range resp.Products { + if product.Product == "server" { + return &versionpb.VersionInfo{ + Current: convertReleaseInfo(product.Current), + Recommended: convertReleaseInfo(product.Recommended), + Instructions: product.Instructions, + Alerts: convertAlerts(product.Alerts), + LastUpdateTime: timestamp.TimePtr(time.Now().UTC()), + }, nil + } } + return nil, serviceerror.NewNotFound("version info update was not found in response") } func convertAlerts(alerts []check.Alert) []*versionpb.Alert {