Skip to content

Commit

Permalink
Simplify version interceptor, remove dynamicconfig and metric
Browse files Browse the repository at this point in the history
  • Loading branch information
bergundy committed Jan 14, 2022
1 parent f793198 commit 31c6e67
Show file tree
Hide file tree
Showing 8 changed files with 19 additions and 34 deletions.
2 changes: 0 additions & 2 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,6 @@ const (
VisibilityArchivalQueryMaxQPS
// EnableServerVersionCheck is a flag that controls whether or not periodic version checking is enabled
EnableServerVersionCheck
// MaxSDKVersionsToRecord caps the number of distinct SDK versions to record for version checking purposes
MaxSDKVersionsToRecord
// EnableTokenNamespaceEnforcement enables enforcement that namespace in completion token matches namespace of the request
EnableTokenNamespaceEnforcement
// KeepAliveMinTime is the minimum amount of time a client should wait before sending a keepalive ping.
Expand Down
1 change: 1 addition & 0 deletions common/headers/versionChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func NewVersionChecker(supportedClients map[string]string, serverVersion string)
}
}

// GetClientNameAndVersion extracts SDK name and version from context headers
func GetClientNameAndVersion(ctx context.Context) (string, string) {
headers := GetValues(ctx, ClientNameHeaderName, ClientVersionHeaderName)
clientName := headers[0]
Expand Down
11 changes: 0 additions & 11 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,10 +877,6 @@ const (

// VersionCheckScope is scope used by version checker
VersionCheckScope

// SDKVersionRecordScope is the scope used by the version interceptor
SDKVersionRecordScope

// AuthorizationScope is the scope used by all metric emitted by authorization code
AuthorizationScope

Expand Down Expand Up @@ -1560,7 +1556,6 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
FrontendGetClusterInfoScope: {operation: "GetClusterInfo"},
FrontendGetSystemInfoScope: {operation: "GetSystemInfo"},
VersionCheckScope: {operation: "VersionCheck"},
SDKVersionRecordScope: {operation: "SDKVersionRecord"},
AuthorizationScope: {operation: "Authorization"},
},
// History Scope Names
Expand Down Expand Up @@ -1872,9 +1867,6 @@ const (
VersionCheckFailedCount
VersionCheckLatency

SDKVersionRecordSuccessCount
SDKVersionRecordFailedCount

ParentClosePolicyProcessorSuccess
ParentClosePolicyProcessorFailures

Expand Down Expand Up @@ -2307,9 +2299,6 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
VersionCheckRequestFailedCount: NewCounterDef("version_check_request_failed"),
VersionCheckLatency: NewTimerDef("version_check_latency"),

SDKVersionRecordSuccessCount: NewCounterDef("sdk_version_record_success"),
SDKVersionRecordFailedCount: NewCounterDef("sdk_version_record_faile"),

ParentClosePolicyProcessorSuccess: NewCounterDef("parent_close_policy_processor_requests"),
ParentClosePolicyProcessorFailures: NewCounterDef("parent_close_policy_processor_errors"),

Expand Down
22 changes: 12 additions & 10 deletions common/rpc/interceptor/sdk_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,22 @@ type sdkNameVersion struct {
}

type SDKVersionInterceptor struct {
sdkInfoSet map[sdkNameVersion]bool
lock sync.RWMutex
infoSetSizeCapGetter func() int
sdkInfoSet map[sdkNameVersion]bool
lock sync.RWMutex
maxSetSize int
}

var _ grpc.UnaryServerInterceptor = (*TelemetryInterceptor)(nil).Intercept
const defaultMaxSetSize = 100

func NewSDKVersionInterceptor(infoSetSizeCapGetter func() int) *SDKVersionInterceptor {
// NewSDKVersionInterceptor creates a new SDKVersionInterceptor with default max set size
func NewSDKVersionInterceptor() *SDKVersionInterceptor {
return &SDKVersionInterceptor{
sdkInfoSet: make(map[sdkNameVersion]bool),
lock: sync.RWMutex{},
infoSetSizeCapGetter: infoSetSizeCapGetter,
sdkInfoSet: make(map[sdkNameVersion]bool),
maxSetSize: defaultMaxSetSize,
}
}

// Intercept a grpc request
func (vi *SDKVersionInterceptor) Intercept(
ctx context.Context,
req interface{},
Expand All @@ -67,12 +68,12 @@ func (vi *SDKVersionInterceptor) Intercept(
return handler(ctx, req)
}

// RecordSDKInfo records name and version tuple in memory
func (vi *SDKVersionInterceptor) RecordSDKInfo(name, version string) {
info := sdkNameVersion{name, version}
setSizeCap := vi.infoSetSizeCapGetter()

vi.lock.RLock()
overCap := len(vi.sdkInfoSet) >= setSizeCap
overCap := len(vi.sdkInfoSet) >= vi.maxSetSize
_, found := vi.sdkInfoSet[info]
vi.lock.RUnlock()

Expand All @@ -83,6 +84,7 @@ func (vi *SDKVersionInterceptor) RecordSDKInfo(name, version string) {
}
}

// GetAndResetSDKInfo gets all recorded name, version tuples and resets internal records
func (vi *SDKVersionInterceptor) GetAndResetSDKInfo() []check.SDKInfo {
vi.lock.Lock()
currSet := vi.sdkInfoSet
Expand Down
5 changes: 4 additions & 1 deletion common/rpc/interceptor/sdk_version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ import (
)

func TestSDKVersionRecorder(t *testing.T) {
interceptor := NewSDKVersionInterceptor(func() int { return 2 })
interceptor := &SDKVersionInterceptor{
sdkInfoSet: make(map[sdkNameVersion]bool),
maxSetSize: 2,
}

sdkVersion := "1.10.1"

Expand Down
6 changes: 2 additions & 4 deletions service/frontend/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,10 +286,8 @@ func NamespaceValidatorInterceptorProvider(
)
}

func SDKVersionInterceptorProvider(
serviceConfig *Config,
) *interceptor.SDKVersionInterceptor {
return interceptor.NewSDKVersionInterceptor(func() int { return serviceConfig.MaxSDKVersionsToRecord() })
func SDKVersionInterceptorProvider() *interceptor.SDKVersionInterceptor {
return interceptor.NewSDKVersionInterceptor()
}

func PersistenceMaxQpsProvider(
Expand Down
4 changes: 0 additions & 4 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,6 @@ type Config struct {

// EnableServerVersionCheck disables periodic version checking performed by the frontend
EnableServerVersionCheck dynamicconfig.BoolPropertyFn
// MaxSDKVersionsToRecord controls how many SDK versions should be recorded in memory to be sent in the version check request.
// This setting is used to prevent potential memory leak, the default should be low enough that this value shouldn't be changed.
MaxSDKVersionsToRecord dynamicconfig.IntPropertyFn

// EnableTokenNamespaceEnforcement enables enforcement that namespace in completion token matches namespace of the request
EnableTokenNamespaceEnforcement dynamicconfig.BoolPropertyFn
Expand Down Expand Up @@ -173,7 +170,6 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int32, esIndexName
DefaultWorkflowRetryPolicy: dc.GetMapPropertyFnWithNamespaceFilter(dynamicconfig.DefaultWorkflowRetryPolicy, common.GetDefaultRetryPolicyConfigOptions()),
DefaultWorkflowTaskTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowTaskTimeout, common.DefaultWorkflowTaskTimeout),
EnableServerVersionCheck: dc.GetBoolProperty(dynamicconfig.EnableServerVersionCheck, os.Getenv("TEMPORAL_VERSION_CHECK_DISABLED") == ""),
MaxSDKVersionsToRecord: dc.GetIntProperty(dynamicconfig.MaxSDKVersionsToRecord, 1000),
EnableTokenNamespaceEnforcement: dc.GetBoolProperty(dynamicconfig.EnableTokenNamespaceEnforcement, false),
KeepAliveMinTime: dc.GetDurationProperty(dynamicconfig.KeepAliveMinTime, 10*time.Second),
KeepAlivePermitWithoutStream: dc.GetBoolProperty(dynamicconfig.KeepAlivePermitWithoutStream, true),
Expand Down
2 changes: 0 additions & 2 deletions service/frontend/versionChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ func (vc *VersionChecker) Stop() {
func (vc *VersionChecker) versionCheckLoop() {
timer := time.NewTicker(VersionCheckInterval)
defer timer.Stop()

vc.performVersionCheck()
for {
select {
Expand Down Expand Up @@ -137,7 +136,6 @@ func isUpdateNeeded(metadata *persistence.GetClusterMetadataResponse) bool {
}

func (vc *VersionChecker) createVersionCheckRequest(metadata *persistence.GetClusterMetadataResponse) (*check.VersionCheckRequest, error) {

return &check.VersionCheckRequest{
Product: headers.ClientNameServer,
Version: headers.ServerVersion,
Expand Down

0 comments on commit 31c6e67

Please sign in to comment.