Skip to content

Commit

Permalink
Add resource exhausted metric with cause (#940)
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz authored Oct 31, 2022
1 parent 3f3eca4 commit 40f7da0
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 6 deletions.
15 changes: 9 additions & 6 deletions internal/common/metrics/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,14 @@ const (
WorkerTaskSlotsAvailable = TemporalMetricsPrefix + "worker_task_slots_available"
PollerStartCounter = TemporalMetricsPrefix + "poller_start"

TemporalRequest = TemporalMetricsPrefix + "request"
TemporalRequestFailure = TemporalRequest + "_failure"
TemporalRequestLatency = TemporalRequest + "_latency"
TemporalLongRequest = TemporalMetricsPrefix + "long_request"
TemporalLongRequestFailure = TemporalLongRequest + "_failure"
TemporalLongRequestLatency = TemporalLongRequest + "_latency"
TemporalRequest = TemporalMetricsPrefix + "request"
TemporalRequestFailure = TemporalRequest + "_failure"
TemporalRequestLatency = TemporalRequest + "_latency"
TemporalLongRequest = TemporalMetricsPrefix + "long_request"
TemporalLongRequestFailure = TemporalLongRequest + "_failure"
TemporalLongRequestLatency = TemporalLongRequest + "_latency"
TemporalRequestResourceExhausted = TemporalRequest + "_resource_exhausted"
TemporalLongRequestResourceExhausted = TemporalLongRequest + "_resource_exhausted"

StickyCacheHit = TemporalMetricsPrefix + "sticky_cache_hit"
StickyCacheMiss = TemporalMetricsPrefix + "sticky_cache_miss"
Expand All @@ -87,6 +89,7 @@ const (
ActivityTypeNameTagName = "activity_type"
TaskQueueTagName = "task_queue"
OperationTagName = "operation"
CauseTagName = "cause"
)

// Metric tag values
Expand Down
17 changes: 17 additions & 0 deletions internal/common/metrics/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ import (
"strings"
"time"

"github.com/gogo/status"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)

// HandlerContextKey is the context key for a MetricHandler value.
Expand Down Expand Up @@ -105,5 +109,18 @@ func recordRequestEnd(handler Handler, longPoll bool, suffix string, start time.
}
failureMetric += suffix
handler.Counter(failureMetric).Inc(1)

// If it's a resource exhausted, extract cause if present and increment
if s := status.Convert(err); s.Code() == codes.ResourceExhausted {
resMetric := TemporalRequestResourceExhausted
if longPoll {
resMetric = TemporalLongRequestResourceExhausted
}
var cause enumspb.ResourceExhaustedCause
if resErr, _ := serviceerror.FromStatus(s).(*serviceerror.ResourceExhausted); resErr != nil {
cause = resErr.Cause
}
handler.WithTags(map[string]string{CauseTagName: cause.String()}).Counter(resMetric).Inc(1)
}
}
}
35 changes: 35 additions & 0 deletions internal/grpc_dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@ import (
"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
"go.temporal.io/api/common/v1"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/errordetails/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/internal/common/metrics"
"go.temporal.io/sdk/internal/common/retry"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -401,6 +403,39 @@ func TestCustomResolver(t *testing.T) {
require.Equal(t, 4, s2.signalWorkflowInvokeCount())
}

func TestResourceExhaustedCause(t *testing.T) {
// Start gRPC server
srv, err := startTestGRPCServer()
require.NoError(t, err)
defer srv.Stop()
handler := metrics.NewCapturingHandler()

// Attempt dial with a resource exhausted cause
s, _ := status.New(codes.ResourceExhausted, "some resource exhausted").WithDetails(&errordetails.ResourceExhaustedFailure{
Cause: enums.RESOURCE_EXHAUSTED_CAUSE_CONCURRENT_LIMIT,
})
srv.getSystemInfoResponseError = s.Err()
_, err = DialClient(ClientOptions{HostPort: srv.addr, MetricsHandler: handler})
require.Error(t, err)

// Attempt dial with a cause-less resource exhausted
srv.getSystemInfoResponseError = status.New(codes.ResourceExhausted, "some resource exhausted").Err()
_, err = DialClient(ClientOptions{HostPort: srv.addr, MetricsHandler: handler})
require.Error(t, err)

// Make sure we have 1 metric with cause and 1 without
var foundWithCause, foundWithoutCause bool
for _, counter := range handler.Counters() {
if counter.Tags["operation"] == "GetSystemInfo" && counter.Tags["cause"] == "ConcurrentLimit" {
foundWithCause = true
} else if counter.Tags["operation"] == "GetSystemInfo" && counter.Tags["cause"] == "Unspecified" {
foundWithoutCause = true
}
}
require.True(t, foundWithCause)
require.True(t, foundWithoutCause)
}

type testGRPCServer struct {
workflowservice.UnimplementedWorkflowServiceServer
*grpc.Server
Expand Down

0 comments on commit 40f7da0

Please sign in to comment.