diff --git a/common/metrics/tags.go b/common/metrics/tags.go index 4dafb262e57..b96710ed654 100644 --- a/common/metrics/tags.go +++ b/common/metrics/tags.go @@ -26,6 +26,8 @@ package metrics import ( "strconv" + + enumspb "go.temporal.io/api/enums/v1" ) const ( @@ -197,6 +199,10 @@ func VisibilityTypeTag(value string) Tag { return &tagImpl{key: visibilityTypeTagName, value: value} } +func ResourceExhaustedCauseTag(cause enumspb.ResourceExhaustedCause) Tag { + return &tagImpl{key: "resource_exhausted_cause", value: cause.String()} +} + var standardVisibilityTypeTag = VisibilityTypeTag(standardVisibilityTagValue) var advancedVisibilityTypeTag = VisibilityTypeTag(advancedVisibilityTagValue) diff --git a/common/persistence/visibility/visiblity_manager_metrics.go b/common/persistence/visibility/visiblity_manager_metrics.go index 98bf5772b7f..91392a601e8 100644 --- a/common/persistence/visibility/visiblity_manager_metrics.go +++ b/common/persistence/visibility/visiblity_manager_metrics.go @@ -25,6 +25,7 @@ package visibility import ( + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" @@ -180,7 +181,8 @@ func (m *visibilityManagerMetrics) updateErrorMetric(scope metrics.Scope, err er scope.IncCounter(metrics.VisibilityPersistenceTimeout) scope.IncCounter(metrics.VisibilityPersistenceFailures) case *serviceerror.ResourceExhausted: - scope.IncCounter(metrics.VisibilityPersistenceResourceExhausted) + scope.Tagged(metrics.ResourceExhaustedCauseTag(enumspb.RESOURCE_EXHAUSTED_CAUSE_PERSISTENCE_RATE_LIMIT)). + IncCounter(metrics.VisibilityPersistenceResourceExhausted) scope.IncCounter(metrics.VisibilityPersistenceFailures) case *serviceerror.Internal: scope.IncCounter(metrics.VisibilityPersistenceInternal) diff --git a/common/rpc/interceptor/telemetry.go b/common/rpc/interceptor/telemetry.go index d066f22f8cd..4ef8c169fc6 100644 --- a/common/rpc/interceptor/telemetry.go +++ b/common/rpc/interceptor/telemetry.go @@ -170,7 +170,7 @@ func (ti *TelemetryInterceptor) handleError( case *serviceerror.NotFound: scope.IncCounter(metrics.ServiceErrNotFoundCounter) case *serviceerror.ResourceExhausted: - scope.IncCounter(metrics.ServiceErrResourceExhaustedCounter) + scope.Tagged(metrics.ResourceExhaustedCauseTag(err.Cause)).IncCounter(metrics.ServiceErrResourceExhaustedCounter) case *serviceerrors.RetryReplication: scope.IncCounter(metrics.ServiceErrRetryTaskCounter) case *serviceerror.NamespaceAlreadyExists: diff --git a/service/frontend/adminHandler.go b/service/frontend/adminHandler.go index 8fe6a6764b9..3cf341048e0 100644 --- a/service/frontend/adminHandler.go +++ b/service/frontend/adminHandler.go @@ -1704,7 +1704,7 @@ func (adh *AdminHandler) startRequestProfile(scope int) (metrics.Scope, metrics. } func (adh *AdminHandler) error(err error, scope metrics.Scope) error { - switch err.(type) { + switch err := err.(type) { case *serviceerror.Unavailable: adh.logger.Error("unavailable error", tag.Error(err)) scope.IncCounter(metrics.ServiceFailures) @@ -1713,7 +1713,7 @@ func (adh *AdminHandler) error(err error, scope metrics.Scope) error { scope.IncCounter(metrics.ServiceErrInvalidArgumentCounter) return err case *serviceerror.ResourceExhausted: - scope.IncCounter(metrics.ServiceErrResourceExhaustedCounter) + scope.Tagged(metrics.ResourceExhaustedCauseTag(err.Cause)).IncCounter(metrics.ServiceErrResourceExhaustedCounter) return err case *serviceerror.NotFound: return err diff --git a/service/history/replicationTaskProcessor.go b/service/history/replicationTaskProcessor.go index fd068a1edbb..55dda181ea5 100644 --- a/service/history/replicationTaskProcessor.go +++ b/service/history/replicationTaskProcessor.go @@ -514,38 +514,39 @@ func (p *ReplicationTaskProcessorImpl) cleanupReplicationTasks() error { } func (p *ReplicationTaskProcessorImpl) emitTaskMetrics(scope int, err error) { + metricsScope := p.metricsClient.Scope(scope) if common.IsContextDeadlineExceededErr(err) || common.IsContextCanceledErr(err) { - p.metricsClient.IncCounter(scope, metrics.ServiceErrContextTimeoutCounter) + metricsScope.IncCounter(metrics.ServiceErrContextTimeoutCounter) return } // Also update counter to distinguish between type of failures - switch err.(type) { + switch err := err.(type) { case nil: - p.metricsClient.IncCounter(scope, metrics.ReplicationTasksApplied) + metricsScope.IncCounter(metrics.ReplicationTasksApplied) case *serviceerrors.ShardOwnershipLost: - p.metricsClient.IncCounter(scope, metrics.ServiceErrShardOwnershipLostCounter) - p.metricsClient.IncCounter(scope, metrics.ReplicationTasksFailed) + metricsScope.IncCounter(metrics.ServiceErrShardOwnershipLostCounter) + metricsScope.IncCounter(metrics.ReplicationTasksFailed) case *serviceerror.InvalidArgument: - p.metricsClient.IncCounter(scope, metrics.ServiceErrInvalidArgumentCounter) - p.metricsClient.IncCounter(scope, metrics.ReplicationTasksFailed) + metricsScope.IncCounter(metrics.ServiceErrInvalidArgumentCounter) + metricsScope.IncCounter(metrics.ReplicationTasksFailed) case *serviceerror.NamespaceNotActive: - p.metricsClient.IncCounter(scope, metrics.ServiceErrNamespaceNotActiveCounter) - p.metricsClient.IncCounter(scope, metrics.ReplicationTasksFailed) + metricsScope.IncCounter(metrics.ServiceErrNamespaceNotActiveCounter) + metricsScope.IncCounter(metrics.ReplicationTasksFailed) case *serviceerror.WorkflowExecutionAlreadyStarted: - p.metricsClient.IncCounter(scope, metrics.ServiceErrExecutionAlreadyStartedCounter) - p.metricsClient.IncCounter(scope, metrics.ReplicationTasksFailed) + metricsScope.IncCounter(metrics.ServiceErrExecutionAlreadyStartedCounter) + metricsScope.IncCounter(metrics.ReplicationTasksFailed) case *serviceerror.NotFound: - p.metricsClient.IncCounter(scope, metrics.ServiceErrNotFoundCounter) - p.metricsClient.IncCounter(scope, metrics.ReplicationTasksFailed) + metricsScope.IncCounter(metrics.ServiceErrNotFoundCounter) + metricsScope.IncCounter(metrics.ReplicationTasksFailed) case *serviceerror.ResourceExhausted: - p.metricsClient.IncCounter(scope, metrics.ServiceErrResourceExhaustedCounter) - p.metricsClient.IncCounter(scope, metrics.ReplicationTasksFailed) + metricsScope.Tagged(metrics.ResourceExhaustedCauseTag(err.Cause)).IncCounter(metrics.ServiceErrResourceExhaustedCounter) + metricsScope.IncCounter(metrics.ReplicationTasksFailed) case *serviceerrors.RetryReplication: - p.metricsClient.IncCounter(scope, metrics.ServiceErrRetryTaskCounter) - p.metricsClient.IncCounter(scope, metrics.ReplicationTasksFailed) + metricsScope.IncCounter(metrics.ServiceErrRetryTaskCounter) + metricsScope.IncCounter(metrics.ReplicationTasksFailed) default: - p.metricsClient.IncCounter(scope, metrics.ReplicationTasksFailed) + metricsScope.IncCounter(metrics.ReplicationTasksFailed) } } diff --git a/service/matching/matcher_test.go b/service/matching/matcher_test.go index b0147b51048..9fb30c2a47e 100644 --- a/service/matching/matcher_test.go +++ b/service/matching/matcher_test.go @@ -49,7 +49,6 @@ import ( "go.temporal.io/server/common/payloads" ) - var errMatchingHostThrottleTest = serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_SERVICE_RATE_LIMIT, "Matching host RPS exceeded.") type MatcherTestSuite struct { diff --git a/service/matching/taskWriter.go b/service/matching/taskWriter.go index eb2ed23e538..733c6a5adc2 100644 --- a/service/matching/taskWriter.go +++ b/service/matching/taskWriter.go @@ -31,6 +31,7 @@ import ( "time" commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" persistencespb "go.temporal.io/server/api/persistence/v1" @@ -166,7 +167,9 @@ func (w *taskWriter) appendTask( return nil, errShutdown } default: // channel is full, throttle - return nil, serviceerror.NewUnavailable("Too many outstanding appends to the TaskQueue") + return nil, serviceerror.NewResourceExhausted( + enumspb.RESOURCE_EXHAUSTED_CAUSE_SERVICE_RATE_LIMIT, + "Too many outstanding appends to the TaskQueue") } } diff --git a/service/worker/archiver/client.go b/service/worker/archiver/client.go index 1e6e8d08b53..08e306824b9 100644 --- a/service/worker/archiver/client.go +++ b/service/worker/archiver/client.go @@ -278,7 +278,8 @@ func (c *client) sendArchiveSignal(ctx context.Context, request *ArchiveRequest, c.metricsScope.IncCounter(metrics.ArchiverClientSendSignalCount) if ok := c.rateLimiter.Allow(); !ok { c.logger.Error(tooManyRequestsErrMsg) - c.metricsScope.IncCounter(metrics.ServiceErrResourceExhaustedCounter) + c.metricsScope.Tagged(metrics.ResourceExhaustedCauseTag(enumspb.RESOURCE_EXHAUSTED_CAUSE_SERVICE_RATE_LIMIT)). + IncCounter(metrics.ServiceErrResourceExhaustedCounter) return errors.New(tooManyRequestsErrMsg) }