Skip to content

Commit

Permalink
Add feature flag on emitting signal name metric tag (#4434)
Browse files Browse the repository at this point in the history
* Add feature flag for emit signal name tag
  • Loading branch information
yux0 authored and yycptt committed Sep 1, 2021
1 parent db77377 commit d58d346
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 10 deletions.
8 changes: 7 additions & 1 deletion common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,12 @@ const (
// Default value: 0
// Allowed filters: N/A
FrontendErrorInjectionRate
// FrontendEmitSignalNameMetricsTag enables emitting signal name tag in metrics in frontend client
// KeyName: frontend.emitSignalNameMetricsTag
// Value type: Bool
// Default value: false
// Allowed filters: DomainName
FrontendEmitSignalNameMetricsTag

// key for matching

Expand Down Expand Up @@ -2027,7 +2033,7 @@ var Keys = map[Key]string{
DomainFailoverRefreshInterval: "frontend.domainFailoverRefreshInterval",
DomainFailoverRefreshTimerJitterCoefficient: "frontend.domainFailoverRefreshTimerJitterCoefficient",
FrontendErrorInjectionRate: "frontend.errorInjectionRate",

FrontendEmitSignalNameMetricsTag: "frontend.emitSignalNameMetricsTag",
// matching settings
MatchingRPS: "matching.rps",
MatchingPersistenceMaxQPS: "matching.persistenceMaxQPS",
Expand Down
9 changes: 9 additions & 0 deletions common/metrics/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ func TagContext(ctx context.Context, tag Tag) context.Context {
return context.WithValue(ctx, contextTagsKey, tags)
}

func TagContexts(ctx context.Context, ctxTags ...Tag) context.Context {
tags, ok := ctx.Value(contextTagsKey).([]Tag)
if !ok {
tags = []Tag{}
}
tags = append(tags, ctxTags...)
return context.WithValue(ctx, contextTagsKey, tags)
}

func GetContextTags(ctx context.Context) []Tag {
tags, ok := ctx.Value(contextTagsKey).([]Tag)
if !ok {
Expand Down
5 changes: 5 additions & 0 deletions common/metrics/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,9 @@ func TestContextTags(t *testing.T) {
tag2 := ThriftTransportTag()
ctx = TagContext(ctx, tag2)
assert.Equal(t, []Tag{tag1, tag2}, GetContextTags(ctx))

ctx1 := context.Background()
ctx1 = TagContexts(ctx1, tag1, tag2)
assert.Contains(t, GetContextTags(ctx1), tag1)
assert.Contains(t, GetContextTags(ctx1), tag2)
}
6 changes: 3 additions & 3 deletions common/metrics/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (m *metricsScope) StartTimer(id int) Stopwatch {
case !def.metricRollupName.Empty():
return NewStopwatch(timer, m.rootScope.Timer(def.metricRollupName.String()))
case m.isDomainTagged:
timerAll := m.scope.Tagged(map[string]string{domain: domainAllValue}).Timer(def.metricName.String())
timerAll := m.scope.Tagged(map[string]string{domain: allValue}).Timer(def.metricName.String())
return NewStopwatch(timer, timerAll)
default:
return NewStopwatch(timer)
Expand All @@ -99,7 +99,7 @@ func (m *metricsScope) RecordTimer(id int, d time.Duration) {
case m.isDomainTagged:
// N.B. - Dual emit here so that we can see aggregate timer stats across all
// domains along with the individual domains stats
m.scope.Tagged(map[string]string{domain: domainAllValue}).Timer(def.metricName.String()).Record(d)
m.scope.Tagged(map[string]string{domain: allValue}).Timer(def.metricName.String()).Record(d)
}
}

Expand Down Expand Up @@ -139,5 +139,5 @@ func (m *metricsScope) getBuckets(id int) tally.Buckets {
}

func isDomainTagged(tag Tag) bool {
return tag.Key() == domain && tag.Value() != domainAllValue
return tag.Key() == domain && tag.Value() != allValue
}
9 changes: 7 additions & 2 deletions common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ const (
transport = "transport"
signalName = "signalName"

domainAllValue = "all"
unknownValue = "_unknown_"
allValue = "all"
unknownValue = "_unknown_"

transportThrift = "thrift"
transportGRPC = "grpc"
Expand Down Expand Up @@ -159,3 +159,8 @@ func GPRCTransportTag() Tag {
func SignalNameTag(value string) Tag {
return metricWithUnknown(signalName, value)
}

// SignalNameAllTag returns a new SignalName tag with all value
func SignalNameAllTag() Tag {
return metricWithUnknown(signalName, allValue)
}
6 changes: 6 additions & 0 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ type Config struct {

// max number of decisions per RespondDecisionTaskCompleted request (unlimited by default)
DecisionResultCountLimit dynamicconfig.IntPropertyFnWithDomainFilter

//Debugging

// Emit signal related metrics with signal name tag. Be aware of cardinality.
EmitSignalNameMetricsTag dynamicconfig.BoolPropertyFnWithDomainFilter
}

// NewConfig returns new service config with default values
Expand Down Expand Up @@ -149,6 +154,7 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, enableReadFro
DisallowQuery: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.DisallowQuery, false),
SendRawWorkflowHistory: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.SendRawWorkflowHistory, sendRawWorkflowHistory),
DecisionResultCountLimit: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendDecisionResultCountLimit, 0),
EmitSignalNameMetricsTag: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.FrontendEmitSignalNameMetricsTag, false),
domainConfig: domain.Config{
MaxBadBinaryCount: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendMaxBadBinaries, domain.MaxBadBinaries),
MinRetentionDays: dc.GetIntProperty(dynamicconfig.MinRetentionDays, domain.DefaultMinWorkflowRetentionInDays),
Expand Down
13 changes: 10 additions & 3 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2273,8 +2273,15 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
}, nil
}

func withSignalName(ctx context.Context, signalName string) context.Context {
return metrics.TagContext(ctx, metrics.SignalNameTag(signalName))
func (wh *WorkflowHandler) withSignalName(
ctx context.Context,
domainName string,
signalName string,
) context.Context {
if wh.config.EmitSignalNameMetricsTag(domainName) {
return metrics.TagContext(ctx, metrics.SignalNameTag(signalName))
}
return ctx
}

// SignalWorkflowExecution is used to send a signal event to running workflow execution. This results in
Expand All @@ -2285,7 +2292,7 @@ func (wh *WorkflowHandler) SignalWorkflowExecution(
) (retError error) {
defer log.CapturePanic(wh.GetLogger(), &retError)

ctx = withSignalName(ctx, signalRequest.GetSignalName())
ctx = wh.withSignalName(ctx, signalRequest.GetDomain(), signalRequest.GetSignalName())
scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendSignalWorkflowExecutionScope, signalRequest)
defer sw.Stop()

Expand Down
4 changes: 3 additions & 1 deletion service/frontend/workflowHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1459,7 +1459,7 @@ func (s *workflowHandlerSuite) TestSignalMetricHasSignalName() {
}

func (s *workflowHandlerSuite) newConfig(dynamicClient dc.Client) *Config {
return NewConfig(
config := NewConfig(
dc.NewCollection(
dynamicClient,
s.mockResource.GetLogger(),
Expand All @@ -1468,6 +1468,8 @@ func (s *workflowHandlerSuite) newConfig(dynamicClient dc.Client) *Config {
false,
false,
)
config.EmitSignalNameMetricsTag = dc.GetBoolPropertyFnFilteredByDomain(true)
return config
}

func updateRequest(
Expand Down

0 comments on commit d58d346

Please sign in to comment.