add resource_exhausted_cause metrics tag
yiminc committed Jan 27, 2022
1 parent 170cb7d commit 8cb82bd
Showing 8 changed files with 37 additions and 25 deletions.
6 changes: 6 additions & 0 deletions common/metrics/tags.go
@@ -26,6 +26,8 @@ package metrics
 
 import (
     "strconv"
+
+    enumspb "go.temporal.io/api/enums/v1"
 )
 
 const (
@@ -197,6 +199,10 @@ func VisibilityTypeTag(value string) Tag {
     return &tagImpl{key: visibilityTypeTagName, value: value}
 }
 
+func ResourceExhaustedCauseTag(cause enumspb.ResourceExhaustedCause) Tag {
+    return &tagImpl{key: "resource_exhausted_cause", value: cause.String()}
+}
+
 var standardVisibilityTypeTag = VisibilityTypeTag(standardVisibilityTagValue)
 var advancedVisibilityTypeTag = VisibilityTypeTag(advancedVisibilityTagValue)
 
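For context, a minimal usage sketch of the new tag (not part of this commit; the emitResourceExhausted helper and its package name are illustrative only): any call site that increments a resource-exhausted counter can attach the cause through the tag, which is the pattern the files below adopt.

package metricsexample // hypothetical package, for illustration only

import (
    enumspb "go.temporal.io/api/enums/v1"

    "go.temporal.io/server/common/metrics"
)

// emitResourceExhausted tags the scope with resource_exhausted_cause=<cause>
// before bumping the counter, so the metric is partitioned by cause.
func emitResourceExhausted(scope metrics.Scope, cause enumspb.ResourceExhaustedCause) {
    scope.Tagged(metrics.ResourceExhaustedCauseTag(cause)).
        IncCounter(metrics.ServiceErrResourceExhaustedCounter)
}

A caller handling a *serviceerror.ResourceExhausted would pass err.Cause, as telemetry.go does below; sites without an error in hand pass an explicit cause such as RESOURCE_EXHAUSTED_CAUSE_SERVICE_RATE_LIMIT.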
4 changes: 3 additions & 1 deletion common/persistence/visibility/visiblity_manager_metrics.go
@@ -25,6 +25,7 @@
 package visibility
 
 import (
+    enumspb "go.temporal.io/api/enums/v1"
     "go.temporal.io/api/serviceerror"
     "go.temporal.io/server/common/log"
     "go.temporal.io/server/common/log/tag"
@@ -180,7 +181,8 @@ func (m *visibilityManagerMetrics) updateErrorMetric(scope metrics.Scope, err er
         scope.IncCounter(metrics.VisibilityPersistenceTimeout)
         scope.IncCounter(metrics.VisibilityPersistenceFailures)
     case *serviceerror.ResourceExhausted:
-        scope.IncCounter(metrics.VisibilityPersistenceResourceExhausted)
+        scope.Tagged(metrics.ResourceExhaustedCauseTag(enumspb.RESOURCE_EXHAUSTED_CAUSE_PERSISTENCE_RATE_LIMIT)).
+            IncCounter(metrics.VisibilityPersistenceResourceExhausted)
         scope.IncCounter(metrics.VisibilityPersistenceFailures)
     case *serviceerror.Internal:
         scope.IncCounter(metrics.VisibilityPersistenceInternal)
2 changes: 1 addition & 1 deletion common/rpc/interceptor/telemetry.go
@@ -170,7 +170,7 @@ func (ti *TelemetryInterceptor) handleError(
     case *serviceerror.NotFound:
         scope.IncCounter(metrics.ServiceErrNotFoundCounter)
     case *serviceerror.ResourceExhausted:
-        scope.IncCounter(metrics.ServiceErrResourceExhaustedCounter)
+        scope.Tagged(metrics.ResourceExhaustedCauseTag(err.Cause)).IncCounter(metrics.ServiceErrResourceExhaustedCounter)
     case *serviceerrors.RetryReplication:
         scope.IncCounter(metrics.ServiceErrRetryTaskCounter)
     case *serviceerror.NamespaceAlreadyExists:
4 changes: 2 additions & 2 deletions service/frontend/adminHandler.go
@@ -1704,7 +1704,7 @@ func (adh *AdminHandler) startRequestProfile(scope int) (metrics.Scope, metrics.
 }
 
 func (adh *AdminHandler) error(err error, scope metrics.Scope) error {
-    switch err.(type) {
+    switch err := err.(type) {
     case *serviceerror.Unavailable:
         adh.logger.Error("unavailable error", tag.Error(err))
         scope.IncCounter(metrics.ServiceFailures)
@@ -1713,7 +1713,7 @@ func (adh *AdminHandler) error(err error, scope metrics.Scope) error {
         scope.IncCounter(metrics.ServiceErrInvalidArgumentCounter)
         return err
     case *serviceerror.ResourceExhausted:
-        scope.IncCounter(metrics.ServiceErrResourceExhaustedCounter)
+        scope.Tagged(metrics.ResourceExhaustedCauseTag(err.Cause)).IncCounter(metrics.ServiceErrResourceExhaustedCounter)
         return err
     case *serviceerror.NotFound:
         return err
37 changes: 19 additions & 18 deletions service/history/replicationTaskProcessor.go
@@ -514,38 +514,39 @@ func (p *ReplicationTaskProcessorImpl) cleanupReplicationTasks() error {
 }
 
 func (p *ReplicationTaskProcessorImpl) emitTaskMetrics(scope int, err error) {
+    metricsScope := p.metricsClient.Scope(scope)
     if common.IsContextDeadlineExceededErr(err) || common.IsContextCanceledErr(err) {
-        p.metricsClient.IncCounter(scope, metrics.ServiceErrContextTimeoutCounter)
+        metricsScope.IncCounter(metrics.ServiceErrContextTimeoutCounter)
         return
     }
 
     // Also update counter to distinguish between type of failures
-    switch err.(type) {
+    switch err := err.(type) {
     case nil:
-        p.metricsClient.IncCounter(scope, metrics.ReplicationTasksApplied)
+        metricsScope.IncCounter(metrics.ReplicationTasksApplied)
     case *serviceerrors.ShardOwnershipLost:
-        p.metricsClient.IncCounter(scope, metrics.ServiceErrShardOwnershipLostCounter)
-        p.metricsClient.IncCounter(scope, metrics.ReplicationTasksFailed)
+        metricsScope.IncCounter(metrics.ServiceErrShardOwnershipLostCounter)
+        metricsScope.IncCounter(metrics.ReplicationTasksFailed)
     case *serviceerror.InvalidArgument:
-        p.metricsClient.IncCounter(scope, metrics.ServiceErrInvalidArgumentCounter)
-        p.metricsClient.IncCounter(scope, metrics.ReplicationTasksFailed)
+        metricsScope.IncCounter(metrics.ServiceErrInvalidArgumentCounter)
+        metricsScope.IncCounter(metrics.ReplicationTasksFailed)
     case *serviceerror.NamespaceNotActive:
-        p.metricsClient.IncCounter(scope, metrics.ServiceErrNamespaceNotActiveCounter)
-        p.metricsClient.IncCounter(scope, metrics.ReplicationTasksFailed)
+        metricsScope.IncCounter(metrics.ServiceErrNamespaceNotActiveCounter)
+        metricsScope.IncCounter(metrics.ReplicationTasksFailed)
     case *serviceerror.WorkflowExecutionAlreadyStarted:
-        p.metricsClient.IncCounter(scope, metrics.ServiceErrExecutionAlreadyStartedCounter)
-        p.metricsClient.IncCounter(scope, metrics.ReplicationTasksFailed)
+        metricsScope.IncCounter(metrics.ServiceErrExecutionAlreadyStartedCounter)
+        metricsScope.IncCounter(metrics.ReplicationTasksFailed)
     case *serviceerror.NotFound:
-        p.metricsClient.IncCounter(scope, metrics.ServiceErrNotFoundCounter)
-        p.metricsClient.IncCounter(scope, metrics.ReplicationTasksFailed)
+        metricsScope.IncCounter(metrics.ServiceErrNotFoundCounter)
+        metricsScope.IncCounter(metrics.ReplicationTasksFailed)
     case *serviceerror.ResourceExhausted:
-        p.metricsClient.IncCounter(scope, metrics.ServiceErrResourceExhaustedCounter)
-        p.metricsClient.IncCounter(scope, metrics.ReplicationTasksFailed)
+        metricsScope.Tagged(metrics.ResourceExhaustedCauseTag(err.Cause)).IncCounter(metrics.ServiceErrResourceExhaustedCounter)
+        metricsScope.IncCounter(metrics.ReplicationTasksFailed)
     case *serviceerrors.RetryReplication:
-        p.metricsClient.IncCounter(scope, metrics.ServiceErrRetryTaskCounter)
-        p.metricsClient.IncCounter(scope, metrics.ReplicationTasksFailed)
+        metricsScope.IncCounter(metrics.ServiceErrRetryTaskCounter)
+        metricsScope.IncCounter(metrics.ReplicationTasksFailed)
     default:
-        p.metricsClient.IncCounter(scope, metrics.ReplicationTasksFailed)
+        metricsScope.IncCounter(metrics.ReplicationTasksFailed)
     }
 }

1 change: 0 additions & 1 deletion service/matching/matcher_test.go
@@ -49,7 +49,6 @@ import (
     "go.temporal.io/server/common/payloads"
 )
 
-
 var errMatchingHostThrottleTest = serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_SERVICE_RATE_LIMIT, "Matching host RPS exceeded.")
 
 type MatcherTestSuite struct {
5 changes: 4 additions & 1 deletion service/matching/taskWriter.go
@@ -31,6 +31,7 @@ import (
     "time"
 
     commonpb "go.temporal.io/api/common/v1"
+    enumspb "go.temporal.io/api/enums/v1"
     "go.temporal.io/api/serviceerror"
 
     persistencespb "go.temporal.io/server/api/persistence/v1"
@@ -166,7 +167,9 @@ func (w *taskWriter) appendTask(
             return nil, errShutdown
         }
     default: // channel is full, throttle
-        return nil, serviceerror.NewUnavailable("Too many outstanding appends to the TaskQueue")
+        return nil, serviceerror.NewResourceExhausted(
+            enumspb.RESOURCE_EXHAUSTED_CAUSE_SERVICE_RATE_LIMIT,
+            "Too many outstanding appends to the TaskQueue")
     }
 
 }
3 changes: 2 additions & 1 deletion service/worker/archiver/client.go
@@ -278,7 +278,8 @@ func (c *client) sendArchiveSignal(ctx context.Context, request *ArchiveRequest,
     c.metricsScope.IncCounter(metrics.ArchiverClientSendSignalCount)
     if ok := c.rateLimiter.Allow(); !ok {
         c.logger.Error(tooManyRequestsErrMsg)
-        c.metricsScope.IncCounter(metrics.ServiceErrResourceExhaustedCounter)
+        c.metricsScope.Tagged(metrics.ResourceExhaustedCauseTag(enumspb.RESOURCE_EXHAUSTED_CAUSE_SERVICE_RATE_LIMIT)).
+            IncCounter(metrics.ServiceErrResourceExhaustedCounter)
         return errors.New(tooManyRequestsErrMsg)
     }
 
