Skip to content

Commit

Permalink
Don't keep counts of SDK info, cap and monitor SDK info set size
Browse files Browse the repository at this point in the history
  • Loading branch information
bergundy committed Jan 13, 2022
1 parent 22f36a1 commit 63eb27a
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 47 deletions.
2 changes: 2 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,8 @@ const (
VisibilityArchivalQueryMaxQPS
// EnableServerVersionCheck is a flag that controls whether or not periodic version checking is enabled
EnableServerVersionCheck
// MaxSDKVersionsToRecord caps the number of distinct SDK versions to record for version checking purposes
MaxSDKVersionsToRecord
// EnableTokenNamespaceEnforcement enables enforcement that namespace in completion token matches namespace of the request
EnableTokenNamespaceEnforcement
// KeepAliveMinTime is the minimum amount of time a client should wait before sending a keepalive ping.
Expand Down
11 changes: 11 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,10 @@ const (

// VersionCheckScope is scope used by version checker
VersionCheckScope

// SDKVersionRecordScope is the scope used by the version interceptor
SDKVersionRecordScope

// AuthorizationScope is the scope used by all metric emitted by authorization code
AuthorizationScope

Expand Down Expand Up @@ -1556,6 +1560,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
FrontendGetClusterInfoScope: {operation: "GetClusterInfo"},
FrontendGetSystemInfoScope: {operation: "GetSystemInfo"},
VersionCheckScope: {operation: "VersionCheck"},
SDKVersionRecordScope: {operation: "SDKVersionRecord"},
AuthorizationScope: {operation: "Authorization"},
},
// History Scope Names
Expand Down Expand Up @@ -1867,6 +1872,9 @@ const (
VersionCheckFailedCount
VersionCheckLatency

SDKVersionRecordSuccessCount
SDKVersionRecordFailedCount

ParentClosePolicyProcessorSuccess
ParentClosePolicyProcessorFailures

Expand Down Expand Up @@ -2299,6 +2307,9 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
VersionCheckRequestFailedCount: NewCounterDef("version_check_request_failed"),
VersionCheckLatency: NewTimerDef("version_check_latency"),

SDKVersionRecordSuccessCount: NewCounterDef("sdk_version_record_success"),
SDKVersionRecordFailedCount: NewCounterDef("sdk_version_record_faile"),

ParentClosePolicyProcessorSuccess: NewCounterDef("parent_close_policy_processor_requests"),
ParentClosePolicyProcessorFailures: NewCounterDef("parent_close_policy_processor_errors"),

Expand Down
77 changes: 47 additions & 30 deletions common/rpc/interceptor/sdk_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,39 +27,34 @@ package interceptor
import (
"context"
"sync"
"sync/atomic"

"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/metrics"
"go.temporal.io/version/check"
"google.golang.org/grpc"
)

type SDKInfoRecorder interface {
RecordSDKInfo(name, version string)
GetAndResetSDKInfo() []check.SDKInfo
}

type SDKVersionInterceptor struct {
// Using a sync map to support efficient concurrent updates.
// Key type is sdkNameVersion and value type is sdkCount.
// Note that we never delete keys from this map as the cardinality of
// client versions should be fairly low.
sdkInfoCounter sync.Map
}

// sdkNameVersion pairs an SDK name with an SDK version. It is a comparable
// value type, so it can be used directly as a map key.
type sdkNameVersion struct {
	name, version string
}

type sdkCount struct {
count int64
type SDKVersionInterceptor struct {
sdkInfoSet map[sdkNameVersion]bool
lock sync.RWMutex
infoSetSizeCapGetter func() int
metricsClient metrics.Client
}

var _ grpc.UnaryServerInterceptor = (*TelemetryInterceptor)(nil).Intercept

func NewSDKVersionInterceptor() *SDKVersionInterceptor {
return &SDKVersionInterceptor{}
func NewSDKVersionInterceptor(infoSetSizeCapGetter func() int, metricsClient metrics.Client) *SDKVersionInterceptor {
return &SDKVersionInterceptor{
sdkInfoSet: make(map[sdkNameVersion]bool),
lock: sync.RWMutex{},
infoSetSizeCapGetter: infoSetSizeCapGetter,
metricsClient: metricsClient,
}
}

func (vi *SDKVersionInterceptor) Intercept(
Expand All @@ -76,22 +71,44 @@ func (vi *SDKVersionInterceptor) Intercept(
}

func (vi *SDKVersionInterceptor) RecordSDKInfo(name, version string) {
scope := vi.metricsClient.Scope(metrics.SDKVersionRecordScope)
info := sdkNameVersion{name, version}
valIface, ok := vi.sdkInfoCounter.Load(info)
if !ok {
// Store a new counter unless another goroutine added one concurrently
valIface, _ = vi.sdkInfoCounter.LoadOrStore(info, &sdkCount{})
counter := metrics.SDKVersionRecordSuccessCount
setSizeCap := vi.infoSetSizeCapGetter()
shouldUpdate := false

// Increment after unlocking
defer func() { scope.IncCounter(counter) }()

// Update after unlocking read lock
defer func() {
if shouldUpdate {
vi.lock.Lock()
vi.sdkInfoSet[info] = true
vi.lock.Unlock()
}
}()

vi.lock.RLock()
defer vi.lock.RUnlock()

if len(vi.sdkInfoSet) >= setSizeCap {
counter = metrics.SDKVersionRecordFailedCount
return
}
atomic.AddInt64(&valIface.(*sdkCount).count, 1)
_, found := vi.sdkInfoSet[info]
shouldUpdate = !found
}

func (vi *SDKVersionInterceptor) GetAndResetSDKInfo() []check.SDKInfo {
sdkInfo := make([]check.SDKInfo, 0)
vi.sdkInfoCounter.Range(func(key, value interface{}) bool {
timesSeen := atomic.SwapInt64(&value.(*sdkCount).count, 0)
nameVersion := key.(sdkNameVersion)
sdkInfo = append(sdkInfo, check.SDKInfo{Name: nameVersion.name, Version: nameVersion.version, TimesSeen: timesSeen})
return true
})
vi.lock.Lock()
defer vi.lock.Unlock()
sdkInfo := make([]check.SDKInfo, len(vi.sdkInfoSet))
i := 0
for k := range vi.sdkInfoSet {
sdkInfo[i] = check.SDKInfo{Name: k.name, Version: k.version}
i += 1
}
vi.sdkInfoSet = make(map[sdkNameVersion]bool)
return sdkInfo
}
41 changes: 38 additions & 3 deletions common/rpc/interceptor/sdk_version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,65 @@ package interceptor

import (
"context"
"sort"
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"

"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/metrics"
)

func TestSDKVersionRecorder(t *testing.T) {
interceptor := NewSDKVersionInterceptor()
ctrl := gomock.NewController(t)
metricsClient := metrics.NewMockClient(ctrl)
metricsScope := metrics.NewMockScope(ctrl)
// Note that empty SDK names and versions are ignored
metricsClient.EXPECT().Scope(metrics.SDKVersionRecordScope).Times(3).Return(metricsScope)
metricsScope.EXPECT().IncCounter(metrics.SDKVersionRecordSuccessCount).Times(2)
metricsScope.EXPECT().IncCounter(metrics.SDKVersionRecordFailedCount).Times(1)
interceptor := NewSDKVersionInterceptor(func() int { return 2 }, metricsClient)

sdkVersion := "1.10.1"

// Record first tuple
ctx := headers.SetVersionsForTests(context.Background(), sdkVersion, headers.ClientNameGoSDK, headers.SupportedServerVersions, headers.AllFeatures)
interceptor.Intercept(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, nil
})

// Record second tuple
ctx = headers.SetVersionsForTests(context.Background(), sdkVersion, headers.ClientNameTypeScriptSDK, headers.SupportedServerVersions, headers.AllFeatures)
interceptor.Intercept(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, nil
})

// Do not record when over capacity
ctx = headers.SetVersionsForTests(context.Background(), sdkVersion, headers.ClientNameJavaSDK, headers.SupportedServerVersions, headers.AllFeatures)
interceptor.Intercept(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, nil
})

// Empty SDK version should not be recorded
ctx = headers.SetVersionsForTests(context.Background(), "", headers.ClientNameGoSDK, headers.SupportedServerVersions, headers.AllFeatures)
interceptor.Intercept(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, nil
})

// Empty SDK name should not be recorded
ctx = headers.SetVersionsForTests(context.Background(), sdkVersion, "", headers.SupportedServerVersions, headers.AllFeatures)
interceptor.Intercept(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, nil
})

info := interceptor.GetAndResetSDKInfo()
assert.Equal(t, 1, len(info))
sort.SliceStable(info, func(i, j int) bool {
return info[i].Name < info[j].Name
})
assert.Equal(t, 2, len(info))
assert.Equal(t, headers.ClientNameGoSDK, info[0].Name)
assert.Equal(t, sdkVersion, info[0].Version)
assert.Equal(t, int64(1), info[0].TimesSeen)
assert.Equal(t, headers.ClientNameTypeScriptSDK, info[1].Name)
assert.Equal(t, sdkVersion, info[1].Version)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ require (
go.opentelemetry.io/otel/sdk/metric v0.24.0
go.temporal.io/api v1.7.1-0.20211215222122-0be6c74f9c9a
go.temporal.io/sdk v1.12.0
go.temporal.io/version v0.2.1
go.temporal.io/version v0.3.0
go.uber.org/atomic v1.9.0
go.uber.org/fx v1.14.2
go.uber.org/multierr v1.7.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,8 @@ go.temporal.io/api v1.7.1-0.20211215222122-0be6c74f9c9a h1:FYS1fRCzAbcek09VA3wJb
go.temporal.io/api v1.7.1-0.20211215222122-0be6c74f9c9a/go.mod h1:EMC/8OQVvVUeTSfVQ/OpAaIHmVadlzaKwHVjUmgz3tk=
go.temporal.io/sdk v1.12.0 h1:QkqOpmgXVnHHCFP9HbSbyrF3jYgLBKY/3NdZyR7e5nQ=
go.temporal.io/sdk v1.12.0/go.mod h1:lSp3lH1lI0TyOsus0arnO3FYvjVXBZGi/G7DjnAnm6o=
go.temporal.io/version v0.2.1 h1:wP7ha/DvyAtwIZY0xL5TA6BcsR1svK/WsqzFmmwAPhE=
go.temporal.io/version v0.2.1/go.mod h1:UA9S8/1LaKYae6TyD9NaPMJTZb911JcbqghI2CBSP78=
go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=
go.temporal.io/version v0.3.0/go.mod h1:UA9S8/1LaKYae6TyD9NaPMJTZb911JcbqghI2CBSP78=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
Expand Down
7 changes: 5 additions & 2 deletions service/frontend/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,11 @@ func NamespaceValidatorInterceptorProvider(
)
}

func SDKVersionInterceptorProvider() *interceptor.SDKVersionInterceptor {
return interceptor.NewSDKVersionInterceptor()
func SDKVersionInterceptorProvider(
serviceConfig *Config,
metricsClient metrics.Client,
) *interceptor.SDKVersionInterceptor {
return interceptor.NewSDKVersionInterceptor(func() int { return serviceConfig.MaxSDKVersionsToRecord() }, metricsClient)
}

func PersistenceMaxQpsProvider(
Expand Down
4 changes: 4 additions & 0 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ type Config struct {

// EnableServerVersionCheck controls whether periodic version checking is performed by the frontend
EnableServerVersionCheck dynamicconfig.BoolPropertyFn
// MaxSDKVersionsToRecord controls how many SDK versions should be recorded in memory to be sent in the version check request.
// This setting is used to prevent a potential memory leak; the default should be low enough that this value shouldn't need to be changed.
MaxSDKVersionsToRecord dynamicconfig.IntPropertyFn

// EnableTokenNamespaceEnforcement enables enforcement that namespace in completion token matches namespace of the request
EnableTokenNamespaceEnforcement dynamicconfig.BoolPropertyFn
Expand Down Expand Up @@ -170,6 +173,7 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int32, esIndexName
DefaultWorkflowRetryPolicy: dc.GetMapPropertyFnWithNamespaceFilter(dynamicconfig.DefaultWorkflowRetryPolicy, common.GetDefaultRetryPolicyConfigOptions()),
DefaultWorkflowTaskTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowTaskTimeout, common.DefaultWorkflowTaskTimeout),
EnableServerVersionCheck: dc.GetBoolProperty(dynamicconfig.EnableServerVersionCheck, os.Getenv("TEMPORAL_VERSION_CHECK_DISABLED") == ""),
MaxSDKVersionsToRecord: dc.GetIntProperty(dynamicconfig.MaxSDKVersionsToRecord, 1000),
EnableTokenNamespaceEnforcement: dc.GetBoolProperty(dynamicconfig.EnableTokenNamespaceEnforcement, false),
KeepAliveMinTime: dc.GetDurationProperty(dynamicconfig.KeepAliveMinTime, 10*time.Second),
KeepAlivePermitWithoutStream: dc.GetBoolProperty(dynamicconfig.KeepAlivePermitWithoutStream, true),
Expand Down
28 changes: 19 additions & 9 deletions service/frontend/versionChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type VersionChecker struct {
clusterMetadataManager persistence.ClusterMetadataManager
startOnce sync.Once
stopOnce sync.Once
sdkVersionRecorder interceptor.SDKInfoRecorder
sdkVersionRecorder *interceptor.SDKVersionInterceptor
}

func NewVersionChecker(
Expand Down Expand Up @@ -159,7 +159,12 @@ func (vc *VersionChecker) saveVersionInfo(resp *check.VersionCheckResponse) erro
if err != nil {
return err
}
metadata.VersionInfo = toVersionInfo(resp)
// TODO(bergundy): Extract and save version info per SDK
versionInfo, err := toVersionInfo(resp)
if err != nil {
return err
}
metadata.VersionInfo = versionInfo
saved, err := vc.clusterMetadataManager.SaveClusterMetadata(&persistence.SaveClusterMetadataRequest{
ClusterMetadata: metadata.ClusterMetadata, Version: metadata.Version})
if err != nil {
Expand All @@ -171,14 +176,19 @@ func (vc *VersionChecker) saveVersionInfo(resp *check.VersionCheckResponse) erro
return nil
}

func toVersionInfo(resp *check.VersionCheckResponse) *versionpb.VersionInfo {
return &versionpb.VersionInfo{
Current: convertReleaseInfo(resp.Current),
Recommended: convertReleaseInfo(resp.Recommended),
Instructions: resp.Instructions,
Alerts: convertAlerts(resp.Alerts),
LastUpdateTime: timestamp.TimePtr(time.Now().UTC()),
func toVersionInfo(resp *check.VersionCheckResponse) (*versionpb.VersionInfo, error) {
for _, product := range resp.Products {
if product.Product == "server" {
return &versionpb.VersionInfo{
Current: convertReleaseInfo(product.Current),
Recommended: convertReleaseInfo(product.Recommended),
Instructions: product.Instructions,
Alerts: convertAlerts(product.Alerts),
LastUpdateTime: timestamp.TimePtr(time.Now().UTC()),
}, nil
}
}
return nil, serviceerror.NewNotFound("version info update was not found in response")
}

func convertAlerts(alerts []check.Alert) []*versionpb.Alert {
Expand Down

0 comments on commit 63eb27a

Please sign in to comment.