Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add request failure code label to metrics #1472

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,9 @@ type (
// worker options, the ones here wrap the ones in worker options. The same
// interceptor should not be set here and in worker options.
Interceptors []ClientInterceptor

/// If set true, error code labels will not be included on request failure metrics.
Sushisource marked this conversation as resolved.
Show resolved Hide resolved
DisableErrorCodeMetricTags bool
}

// HeadersProvider returns a map of gRPC headers that should be used on every request.
Expand Down Expand Up @@ -813,14 +816,8 @@ func newDialParameters(options *ClientOptions, excludeInternalFromRetry *atomic.
return dialParameters{
UserConnectionOptions: options.ConnectionOptions,
HostPort: options.HostPort,
RequiredInterceptors: requiredInterceptors(
options.MetricsHandler,
options.HeadersProvider,
options.TrafficController,
excludeInternalFromRetry,
options.Credentials,
),
DefaultServiceConfig: defaultServiceConfig,
RequiredInterceptors: requiredInterceptors(options, excludeInternalFromRetry),
DefaultServiceConfig: defaultServiceConfig,
}
}

Expand Down
1 change: 1 addition & 0 deletions internal/common/metrics/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ const (
OperationTagName = "operation"
CauseTagName = "cause"
WorkflowTaskFailureReason = "failure_reason"
RequestFailureCode = "status_code"
)

// Metric tag values
Expand Down
10 changes: 7 additions & 3 deletions internal/common/metrics/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type HandlerContextKey struct{}
type LongPollContextKey struct{}

// NewGRPCInterceptor creates a new gRPC unary interceptor to record metrics.
func NewGRPCInterceptor(defaultHandler Handler, suffix string) grpc.UnaryClientInterceptor {
func NewGRPCInterceptor(defaultHandler Handler, suffix string, disableRequestFailCodes bool) grpc.UnaryClientInterceptor {
return func(
ctx context.Context,
method string,
Expand Down Expand Up @@ -78,7 +78,7 @@ func NewGRPCInterceptor(defaultHandler Handler, suffix string) grpc.UnaryClientI
start := time.Now()
recordRequestStart(handler, longPoll, suffix)
err := invoker(ctx, method, req, reply, cc, opts...)
recordRequestEnd(handler, longPoll, suffix, start, err)
recordRequestEnd(handler, longPoll, suffix, start, err, disableRequestFailCodes)
return err
}
}
Expand All @@ -93,7 +93,7 @@ func recordRequestStart(handler Handler, longPoll bool, suffix string) {
handler.Counter(metric).Inc(1)
}

func recordRequestEnd(handler Handler, longPoll bool, suffix string, start time.Time, err error) {
func recordRequestEnd(handler Handler, longPoll bool, suffix string, start time.Time, err error, disableRequestFailCodes bool) {
// Record latency
timerMetric := TemporalRequestLatency
if longPoll {
Expand All @@ -109,6 +109,10 @@ func recordRequestEnd(handler Handler, longPoll bool, suffix string, start time.
failureMetric = TemporalLongRequestFailure
}
failureMetric += suffix
errStatus, _ := status.FromError(err)
if !disableRequestFailCodes {
handler = handler.WithTags(RequestFailureCodeTags(errStatus.Code()))
}
handler.Counter(failureMetric).Inc(1)

// If it's a resource exhausted, extract cause if present and increment
Expand Down
2 changes: 1 addition & 1 deletion internal/common/metrics/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestGRPCInterceptor(t *testing.T) {
handler := metrics.NewCapturingHandler()
cc, err := grpc.Dial(l.Addr().String(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(metrics.NewGRPCInterceptor(handler, "_my_suffix")))
grpc.WithUnaryInterceptor(metrics.NewGRPCInterceptor(handler, "_my_suffix", true)))
require.NoError(t, err)
defer func() { _ = cc.Close() }()
client := grpc_health_v1.NewHealthClient(cc)
Expand Down
56 changes: 56 additions & 0 deletions internal/common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@

package metrics

import (
"strconv"

"google.golang.org/grpc/codes"
)

// RootTags returns a set of base tags for all metrics.
func RootTags(namespace string) map[string]string {
return map[string]string{
Expand Down Expand Up @@ -94,3 +100,53 @@ func WorkflowTaskFailedTags(reason string) map[string]string {
WorkflowTaskFailureReason: reason,
}
}

// RequestFailureCodeTags returns a set of tags for a request failure.
func RequestFailureCodeTags(statusCode codes.Code) map[string]string {
asStr := canonicalString(statusCode)
return map[string]string{
RequestFailureCode: asStr,
}
}

// Annoyingly gRPC defines this, but does not expose it publicly.
func canonicalString(c codes.Code) string {
switch c {
case codes.OK:
return "OK"
case codes.Canceled:
return "CANCELLED"
case codes.Unknown:
return "UNKNOWN"
case codes.InvalidArgument:
return "INVALID_ARGUMENT"
case codes.DeadlineExceeded:
return "DEADLINE_EXCEEDED"
case codes.NotFound:
return "NOT_FOUND"
case codes.AlreadyExists:
return "ALREADY_EXISTS"
case codes.PermissionDenied:
return "PERMISSION_DENIED"
case codes.ResourceExhausted:
return "RESOURCE_EXHAUSTED"
case codes.FailedPrecondition:
return "FAILED_PRECONDITION"
case codes.Aborted:
return "ABORTED"
case codes.OutOfRange:
return "OUT_OF_RANGE"
case codes.Unimplemented:
return "UNIMPLEMENTED"
case codes.Internal:
return "INTERNAL"
case codes.Unavailable:
return "UNAVAILABLE"
case codes.DataLoss:
return "DATA_LOSS"
case codes.Unauthenticated:
return "UNAUTHENTICATED"
default:
return "CODE(" + strconv.FormatInt(int64(c), 10) + ")"
}
}
21 changes: 9 additions & 12 deletions internal/grpc_dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,34 +145,31 @@ func dial(params dialParameters) (*grpc.ClientConn, error) {
}

func requiredInterceptors(
metricsHandler metrics.Handler,
headersProvider HeadersProvider,
controller TrafficController,
clientOptions *ClientOptions,
excludeInternalFromRetry *atomic.Bool,
credentials Credentials,
) []grpc.UnaryClientInterceptor {
interceptors := []grpc.UnaryClientInterceptor{
errorInterceptor,
// Report aggregated metrics for the call, this is done outside of the retry loop.
metrics.NewGRPCInterceptor(metricsHandler, ""),
metrics.NewGRPCInterceptor(clientOptions.MetricsHandler, "", clientOptions.DisableErrorCodeMetricTags),
// By default the grpc retry interceptor *is disabled*, preventing accidental use of retries.
// We add call options for retry configuration based on the values present in the context.
retry.NewRetryOptionsInterceptor(excludeInternalFromRetry),
// Performs retries *IF* retry options are set for the call.
grpc_retry.UnaryClientInterceptor(),
// Report metrics for every call made to the server.
metrics.NewGRPCInterceptor(metricsHandler, attemptSuffix),
metrics.NewGRPCInterceptor(clientOptions.MetricsHandler, attemptSuffix, clientOptions.DisableErrorCodeMetricTags),
}
if headersProvider != nil {
interceptors = append(interceptors, headersProviderInterceptor(headersProvider))
if clientOptions.HeadersProvider != nil {
interceptors = append(interceptors, headersProviderInterceptor(clientOptions.HeadersProvider))
}
if controller != nil {
interceptors = append(interceptors, trafficControllerInterceptor(controller))
if clientOptions.TrafficController != nil {
interceptors = append(interceptors, trafficControllerInterceptor(clientOptions.TrafficController))
}
// Add credentials interceptor. This is intentionally added after headers
// provider to overwrite anything set there.
if credentials != nil {
if interceptor := credentials.gRPCInterceptor(); interceptor != nil {
if clientOptions.Credentials != nil {
if interceptor := clientOptions.Credentials.gRPCInterceptor(); interceptor != nil {
interceptors = append(interceptors, interceptor)
}
}
Expand Down
6 changes: 3 additions & 3 deletions internal/grpc_dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,13 @@ func TestHeadersProvider_Error(t *testing.T) {
}

func TestHeadersProvider_NotIncludedWhenNil(t *testing.T) {
interceptors := requiredInterceptors(nil, nil, nil, nil, nil)
interceptors := requiredInterceptors(&ClientOptions{}, nil)
require.Equal(t, 5, len(interceptors))
}

func TestHeadersProvider_IncludedWithHeadersProvider(t *testing.T) {
interceptors := requiredInterceptors(nil,
authHeadersProvider{token: "test-auth-token"}, nil, nil, nil)
opts := &ClientOptions{HeadersProvider: authHeadersProvider{token: "test-auth-token"}}
interceptors := requiredInterceptors(opts, nil)
require.Equal(t, 6, len(interceptors))
}

Expand Down
12 changes: 12 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4828,6 +4828,18 @@ func (ts *IntegrationTestSuite) TestNondeterministicUpdateRegistertion() {
ts.EqualValues(expected, ts.activities.invoked())
}

func (ts *IntegrationTestSuite) TestRequestFailureMetric() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// Unset namespace field will cause an invalid argument error
_, _ = ts.client.WorkflowService().DescribeNamespace(ctx, &workflowservice.DescribeNamespaceRequest{})

ts.assertMetricCount(metrics.TemporalRequestFailure, 1,
metrics.OperationTagName, "DescribeNamespace",
metrics.RequestFailureCode, "INVALID_ARGUMENT")
}

// executeWorkflow executes a given workflow and waits for the result
func (ts *IntegrationTestSuite) executeWorkflow(
wfID string, wfFunc interface{}, retValPtr interface{}, args ...interface{},
Expand Down
Loading