From 170cb7d6eb38f7f0edf09988d43d242ceefe87f5 Mon Sep 17 00:00:00 2001 From: Yimin Chen Date: Wed, 26 Jan 2022 18:48:47 -0800 Subject: [PATCH 1/5] Add cause to ResourceExhaustedError --- common/archiver/filestore/historyArchiver_test.go | 3 ++- common/archiver/gcloud/historyArchiver_test.go | 3 ++- common/archiver/s3store/historyArchiver_test.go | 3 ++- common/persistence/client/fault_injection.go | 4 +++- .../nosql/nosqlplugin/cassandra/gocql/errors.go | 6 ++++-- common/persistence/persistenceRateLimitedClients.go | 3 ++- common/rpc/interceptor/namespace_count_limit.go | 3 ++- common/rpc/interceptor/namespace_rate_limit.go | 3 ++- common/rpc/interceptor/rate_limit.go | 3 ++- go.mod | 2 +- go.sum | 4 ++-- host/sizelimit_test.go | 7 +++---- service/history/consts/const.go | 4 ++-- service/matching/handler.go | 2 -- service/matching/matcher_test.go | 10 +++++++--- service/matching/taskWriter.go | 2 +- 16 files changed, 37 insertions(+), 25 deletions(-) diff --git a/common/archiver/filestore/historyArchiver_test.go b/common/archiver/filestore/historyArchiver_test.go index 18c027e1b4c..54d139ffc86 100644 --- a/common/archiver/filestore/historyArchiver_test.go +++ b/common/archiver/filestore/historyArchiver_test.go @@ -32,6 +32,7 @@ import ( "testing" "time" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/server/tests/testhelper" "github.com/golang/mock/gomock" @@ -188,7 +189,7 @@ func (s *historyArchiverSuite) TestArchive_Fail_TimeoutWhenReadingHistory() { historyIterator := archiver.NewMockHistoryIterator(mockCtrl) gomock.InOrder( historyIterator.EXPECT().HasNext().Return(true), - historyIterator.EXPECT().Next().Return(nil, serviceerror.NewResourceExhausted("")), + historyIterator.EXPECT().Next().Return(nil, serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_SERVICE_RATE_LIMIT, "")), ) historyArchiver := s.newTestHistoryArchiver(historyIterator) diff --git a/common/archiver/gcloud/historyArchiver_test.go b/common/archiver/gcloud/historyArchiver_test.go index 4b044077ab0..7eccb604805 100644 --- a/common/archiver/gcloud/historyArchiver_test.go +++ b/common/archiver/gcloud/historyArchiver_test.go @@ -33,6 +33,7 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + enumspb "go.temporal.io/api/enums/v1" historypb "go.temporal.io/api/history/v1" "go.temporal.io/api/serviceerror" @@ -221,7 +222,7 @@ func (h *historyArchiverSuite) TestArchive_Fail_TimeoutWhenReadingHistory() { historyIterator := archiver.NewMockHistoryIterator(h.controller) gomock.InOrder( historyIterator.EXPECT().HasNext().Return(true), - historyIterator.EXPECT().Next().Return(nil, serviceerror.NewResourceExhausted("")), + historyIterator.EXPECT().Next().Return(nil, serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_SERVICE_RATE_LIMIT, "")), ) historyArchiver := newHistoryArchiver(h.container, historyIterator, storageWrapper) diff --git a/common/archiver/s3store/historyArchiver_test.go b/common/archiver/s3store/historyArchiver_test.go index 965e53244e0..2d7cfcfe5c8 100644 --- a/common/archiver/s3store/historyArchiver_test.go +++ b/common/archiver/s3store/historyArchiver_test.go @@ -44,6 +44,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/uber-go/tally/v4" + enumspb "go.temporal.io/api/enums/v1" historypb "go.temporal.io/api/history/v1" "go.temporal.io/api/serviceerror" @@ -327,7 +328,7 @@ func (s *historyArchiverSuite) TestArchive_Fail_TimeoutWhenReadingHistory() { historyIterator := archiver.NewMockHistoryIterator(mockCtrl) gomock.InOrder( historyIterator.EXPECT().HasNext().Return(true), - historyIterator.EXPECT().Next().Return(nil, serviceerror.NewResourceExhausted("")), + historyIterator.EXPECT().Next().Return(nil, serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_SERVICE_RATE_LIMIT, "")), ) historyArchiver := s.newTestHistoryArchiver(historyIterator) diff --git a/common/persistence/client/fault_injection.go b/common/persistence/client/fault_injection.go index 80fffbc9b63..deb31a57b7d 100644 --- a/common/persistence/client/fault_injection.go +++ b/common/persistence/client/fault_injection.go @@ -28,6 +28,7 @@ import ( "fmt" commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/server/common/config" @@ -95,7 +96,8 @@ var defaultErrors = []FaultWeight{ }, { errFactory: func(msg string) error { - return serviceerror.NewResourceExhausted(fmt.Sprintf("serviceerror.NewResourceExhausted: %s", msg)) + return serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_PERSISTENCE_RATE_LIMIT, + fmt.Sprintf("serviceerror.NewResourceExhausted: %s", msg)) }, weight: 1, }, diff --git a/common/persistence/nosql/nosqlplugin/cassandra/gocql/errors.go b/common/persistence/nosql/nosqlplugin/cassandra/gocql/errors.go index 3bd7468cd8a..9af11cb2170 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/gocql/errors.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/gocql/errors.go @@ -29,6 +29,7 @@ import ( "fmt" "github.com/gocql/gocql" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/server/common/persistence" @@ -52,8 +53,9 @@ func ConvertError( case *gocql.RequestErrWriteTimeout: return &persistence.TimeoutError{Msg: fmt.Sprintf("operation %v encountered %v", operation, err.Error())} case gocql.RequestError: - if v.Code() == 0x1001 { - return serviceerror.NewResourceExhausted(fmt.Sprintf("operation %v encountered %v", operation, err.Error())) + if v.Code() == gocql.ErrCodeOverloaded { + return serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_PERSISTENCE_RATE_LIMIT, + fmt.Sprintf("operation %v encountered %v", operation, err.Error())) } return serviceerror.NewUnavailable(fmt.Sprintf("operation %v encountered %v", operation, err.Error())) default: diff --git a/common/persistence/persistenceRateLimitedClients.go b/common/persistence/persistenceRateLimitedClients.go index 4c9b93606d0..2e9945ca5e5 100644 --- a/common/persistence/persistenceRateLimitedClients.go +++ b/common/persistence/persistenceRateLimitedClients.go @@ -26,6 +26,7 @@ package persistence import ( commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/server/common/log" @@ -34,7 +35,7 @@ import ( var ( // ErrPersistenceLimitExceeded is the error indicating QPS limit reached. - ErrPersistenceLimitExceeded = serviceerror.NewResourceExhausted("Persistence Max QPS Reached.") + ErrPersistenceLimitExceeded = serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_PERSISTENCE_RATE_LIMIT, "Persistence Max QPS Reached.") ) type ( diff --git a/common/rpc/interceptor/namespace_count_limit.go b/common/rpc/interceptor/namespace_count_limit.go index 2606806a276..664de66ec52 100644 --- a/common/rpc/interceptor/namespace_count_limit.go +++ b/common/rpc/interceptor/namespace_count_limit.go @@ -29,6 +29,7 @@ import ( "sync" "sync/atomic" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" "google.golang.org/grpc" @@ -38,7 +39,7 @@ import ( ) var ( - ErrNamespaceCountLimitServerBusy = serviceerror.NewResourceExhausted("namespace count limit exceeded") + ErrNamespaceCountLimitServerBusy = serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_CONCURRENT_POLLER_LIMIT, "namespace concurrent poller limit exceeded") ) type ( diff --git a/common/rpc/interceptor/namespace_rate_limit.go b/common/rpc/interceptor/namespace_rate_limit.go index 80708b175bc..02ec3ae756a 100644 --- a/common/rpc/interceptor/namespace_rate_limit.go +++ b/common/rpc/interceptor/namespace_rate_limit.go @@ -28,6 +28,7 @@ import ( "context" "time" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" "google.golang.org/grpc" @@ -40,7 +41,7 @@ const ( ) var ( - ErrNamespaceRateLimitServerBusy = serviceerror.NewResourceExhausted("namespace rate limit exceeded") + ErrNamespaceRateLimitServerBusy = serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_SERVICE_RATE_LIMIT, "namespace rate limit exceeded") ) type ( diff --git a/common/rpc/interceptor/rate_limit.go b/common/rpc/interceptor/rate_limit.go index 8b755db392f..e8ef174d3e5 100644 --- a/common/rpc/interceptor/rate_limit.go +++ b/common/rpc/interceptor/rate_limit.go @@ -28,6 +28,7 @@ import ( "context" "time" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" "google.golang.org/grpc" @@ -39,7 +40,7 @@ const ( ) var ( - RateLimitServerBusy = serviceerror.NewResourceExhausted("service rate limit exceeded") + RateLimitServerBusy = serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_SERVICE_RATE_LIMIT, "service rate limit exceeded") ) type ( diff --git a/go.mod b/go.mod index f2d679aa45d..197c49884b3 100644 --- a/go.mod +++ b/go.mod @@ -44,7 +44,7 @@ require ( go.opentelemetry.io/otel/sdk v1.2.0 go.opentelemetry.io/otel/sdk/export/metric v0.25.0 go.opentelemetry.io/otel/sdk/metric v0.25.0 - go.temporal.io/api v1.7.1-0.20220126215723-f2aa2e2ad71d + go.temporal.io/api v1.7.1-0.20220127031327-912a8000a4fa go.temporal.io/sdk v1.12.0 go.temporal.io/version v0.3.0 go.uber.org/atomic v1.9.0 diff --git a/go.sum b/go.sum index af7b82ad85b..de98b90ad83 100644 --- a/go.sum +++ b/go.sum @@ -461,8 +461,8 @@ go.opentelemetry.io/otel/trace v1.2.0 h1:Ys3iqbqZhcf28hHzrm5WAquMkDHNZTUkw7KHbuN go.opentelemetry.io/otel/trace v1.2.0/go.mod h1:N5FLswTubnxKxOJHM7XZC074qpeEdLy3CgAVsdMucK0= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.temporal.io/api v1.5.0/go.mod h1:BqKxEJJYdxb5dqf0ODfzfMxh8UEQ5L3zKS51FiIYYkA= -go.temporal.io/api v1.7.1-0.20220126215723-f2aa2e2ad71d h1:uqiiUinsGlZ7R6M74B8Md2XXbjkoinQdhbFrY7coh1o= -go.temporal.io/api v1.7.1-0.20220126215723-f2aa2e2ad71d/go.mod h1:Qy3l0Bw9C1RcToB+kfsI0lkrZYbDLgC9pzi6OYYJ/aE= +go.temporal.io/api v1.7.1-0.20220127031327-912a8000a4fa h1:ynXFCvhVcA+216z+4vbdrOXHyIx5UYz9iAAYWap3m38= +go.temporal.io/api v1.7.1-0.20220127031327-912a8000a4fa/go.mod h1:Qy3l0Bw9C1RcToB+kfsI0lkrZYbDLgC9pzi6OYYJ/aE= go.temporal.io/sdk v1.12.0 h1:QkqOpmgXVnHHCFP9HbSbyrF3jYgLBKY/3NdZyR7e5nQ= go.temporal.io/sdk v1.12.0/go.mod h1:lSp3lH1lI0TyOsus0arnO3FYvjVXBZGi/G7DjnAnm6o= go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig= diff --git a/host/sizelimit_test.go b/host/sizelimit_test.go index 98c51f247c6..0331fd4dc6a 100644 --- a/host/sizelimit_test.go +++ b/host/sizelimit_test.go @@ -197,10 +197,9 @@ SignalLoop: } } // Signalling workflow should result in force terminating the workflow execution and returns with ResourceExhausted - // error. ResourceExhausted is retried by the client and eventually results in NotFound error returned back to the - // caller as workflow execution is no longer running. - s.EqualError(signalErr, "workflow execution already completed") - s.IsType(&serviceerror.NotFound{}, signalErr) + // error. InvalidArgument is returned by the client. + s.EqualError(signalErr, "Workflow history size / count exceeds limit.") + s.IsType(&serviceerror.InvalidArgument{}, signalErr) s.printWorkflowHistory(s.namespace, &commonpb.WorkflowExecution{ WorkflowId: id, diff --git a/service/history/consts/const.go b/service/history/consts/const.go index c79ac50d5bf..242f585aa63 100644 --- a/service/history/consts/const.go +++ b/service/history/consts/const.go @@ -63,7 +63,7 @@ var ( // ErrDeserializingToken is the error to indicate task token is invalid ErrDeserializingToken = serviceerror.NewInvalidArgument("error deserializing task token") // ErrSignalsLimitExceeded is the error indicating limit reached for maximum number of signal events - ErrSignalsLimitExceeded = serviceerror.NewResourceExhausted("exceeded workflow execution limit for signal events") + ErrSignalsLimitExceeded = serviceerror.NewInvalidArgument("exceeded workflow execution limit for signal events") // ErrEventsAterWorkflowFinish is the error indicating server error trying to write events after workflow finish event ErrEventsAterWorkflowFinish = serviceerror.NewInternal("error validating last event being workflow finish event") // ErrQueryEnteredInvalidState is error indicating query entered invalid state @@ -73,7 +73,7 @@ var ( // ErrEmptyHistoryRawEventBatch indicate that one single batch of history raw events is of size 0 ErrEmptyHistoryRawEventBatch = serviceerror.NewInvalidArgument("encountered empty history batch") // ErrSizeExceedsLimit is error indicating workflow execution has exceeded system defined limit - ErrSizeExceedsLimit = serviceerror.NewResourceExhausted(common.FailureReasonSizeExceedsLimit) + ErrSizeExceedsLimit = serviceerror.NewInvalidArgument(common.FailureReasonSizeExceedsLimit) // ErrUnknownCluster is error indicating unknown cluster ErrUnknownCluster = serviceerror.NewInvalidArgument("unknown cluster") // ErrBufferedQueryCleared is error indicating mutable state is cleared while buffered query is pending diff --git a/service/matching/handler.go b/service/matching/handler.go index 41859acca00..c32ee9181a1 100644 --- a/service/matching/handler.go +++ b/service/matching/handler.go @@ -64,8 +64,6 @@ const ( var ( _ matchingservice.MatchingServiceServer = (*Handler)(nil) - - errMatchingHostThrottle = serviceerror.NewResourceExhausted("Matching host RPS exceeded.") ) // NewHandler creates a gRPC handler for the matchingservice diff --git a/service/matching/matcher_test.go b/service/matching/matcher_test.go index c911d75b69e..b0147b51048 100644 --- a/service/matching/matcher_test.go +++ b/service/matching/matcher_test.go @@ -36,6 +36,7 @@ import ( "github.com/stretchr/testify/suite" enumspb "go.temporal.io/api/enums/v1" querypb "go.temporal.io/api/query/v1" + "go.temporal.io/api/serviceerror" "go.uber.org/atomic" enumsspb "go.temporal.io/server/api/enums/v1" @@ -48,6 +49,9 @@ import ( "go.temporal.io/server/common/payloads" ) + +var errMatchingHostThrottleTest = serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_SERVICE_RATE_LIMIT, "Matching host RPS exceeded.") + type MatcherTestSuite struct { suite.Suite controller *gomock.Controller @@ -199,7 +203,7 @@ func (t *MatcherTestSuite) TestSyncMatchFailure() { func(arg0 context.Context, arg1 *matchingservice.AddWorkflowTaskRequest, arg2 ...interface{}) { req = arg1 }, - ).Return(&matchingservice.AddWorkflowTaskResponse{}, errMatchingHostThrottle) + ).Return(&matchingservice.AddWorkflowTaskResponse{}, errMatchingHostThrottleTest) syncMatch, err := t.matcher.Offer(ctx, task) cancel() @@ -322,7 +326,7 @@ func (t *MatcherTestSuite) TestQueryRemoteSyncMatchError() { close(pollSigC) time.Sleep(10 * time.Millisecond) }, - ).Return(nil, errMatchingHostThrottle) + ).Return(nil, errMatchingHostThrottleTest) result, err := t.matcher.OfferQuery(ctx, task) cancel() @@ -400,7 +404,7 @@ func (t *MatcherTestSuite) TestMustOfferRemoteMatch() { var err error var remoteSyncMatch bool var req *matchingservice.AddWorkflowTaskRequest - t.client.EXPECT().AddWorkflowTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(&matchingservice.AddWorkflowTaskResponse{}, errMatchingHostThrottle) + t.client.EXPECT().AddWorkflowTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(&matchingservice.AddWorkflowTaskResponse{}, errMatchingHostThrottleTest) t.client.EXPECT().AddWorkflowTask(gomock.Any(), gomock.Any(), gomock.Any()).Do( func(arg0 context.Context, arg1 *matchingservice.AddWorkflowTaskRequest, arg2 ...interface{}) { req = arg1 diff --git a/service/matching/taskWriter.go b/service/matching/taskWriter.go index 2d5597b6194..eb2ed23e538 100644 --- a/service/matching/taskWriter.go +++ b/service/matching/taskWriter.go @@ -166,7 +166,7 @@ func (w *taskWriter) appendTask( return nil, errShutdown } default: // channel is full, throttle - return nil, serviceerror.NewResourceExhausted("Too many outstanding appends to the TaskQueue") + return nil, serviceerror.NewUnavailable("Too many outstanding appends to the TaskQueue") } } From 8cb82bd2c7466b64f5ca76e01238f10d07c29628 Mon Sep 17 00:00:00 2001 From: Yimin Chen Date: Wed, 26 Jan 2022 19:30:18 -0800 Subject: [PATCH 2/5] add resource_exhausted_cause metrics tag --- common/metrics/tags.go | 6 +++ .../visibility/visiblity_manager_metrics.go | 4 +- common/rpc/interceptor/telemetry.go | 2 +- service/frontend/adminHandler.go | 4 +- service/history/replicationTaskProcessor.go | 37 ++++++++++--------- service/matching/matcher_test.go | 1 - service/matching/taskWriter.go | 5 ++- service/worker/archiver/client.go | 3 +- 8 files changed, 37 insertions(+), 25 deletions(-) diff --git a/common/metrics/tags.go b/common/metrics/tags.go index 4dafb262e57..b96710ed654 100644 --- a/common/metrics/tags.go +++ b/common/metrics/tags.go @@ -26,6 +26,8 @@ package metrics import ( "strconv" + + enumspb "go.temporal.io/api/enums/v1" ) const ( @@ -197,6 +199,10 @@ func VisibilityTypeTag(value string) Tag { return &tagImpl{key: visibilityTypeTagName, value: value} } +func ResourceExhaustedCauseTag(cause enumspb.ResourceExhaustedCause) Tag { + return &tagImpl{key: "resource_exhausted_cause", value: cause.String()} +} + var standardVisibilityTypeTag = VisibilityTypeTag(standardVisibilityTagValue) var advancedVisibilityTypeTag = VisibilityTypeTag(advancedVisibilityTagValue) diff --git a/common/persistence/visibility/visiblity_manager_metrics.go b/common/persistence/visibility/visiblity_manager_metrics.go index 98bf5772b7f..91392a601e8 100644 --- a/common/persistence/visibility/visiblity_manager_metrics.go +++ b/common/persistence/visibility/visiblity_manager_metrics.go @@ -25,6 +25,7 @@ package visibility import ( + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" @@ -180,7 +181,8 @@ func (m *visibilityManagerMetrics) updateErrorMetric(scope metrics.Scope, err er scope.IncCounter(metrics.VisibilityPersistenceTimeout) scope.IncCounter(metrics.VisibilityPersistenceFailures) case *serviceerror.ResourceExhausted: - scope.IncCounter(metrics.VisibilityPersistenceResourceExhausted) + scope.Tagged(metrics.ResourceExhaustedCauseTag(enumspb.RESOURCE_EXHAUSTED_CAUSE_PERSISTENCE_RATE_LIMIT)). + IncCounter(metrics.VisibilityPersistenceResourceExhausted) scope.IncCounter(metrics.VisibilityPersistenceFailures) case *serviceerror.Internal: scope.IncCounter(metrics.VisibilityPersistenceInternal) diff --git a/common/rpc/interceptor/telemetry.go b/common/rpc/interceptor/telemetry.go index d066f22f8cd..4ef8c169fc6 100644 --- a/common/rpc/interceptor/telemetry.go +++ b/common/rpc/interceptor/telemetry.go @@ -170,7 +170,7 @@ func (ti *TelemetryInterceptor) handleError( case *serviceerror.NotFound: scope.IncCounter(metrics.ServiceErrNotFoundCounter) case *serviceerror.ResourceExhausted: - scope.IncCounter(metrics.ServiceErrResourceExhaustedCounter) + scope.Tagged(metrics.ResourceExhaustedCauseTag(err.Cause)).IncCounter(metrics.ServiceErrResourceExhaustedCounter) case *serviceerrors.RetryReplication: scope.IncCounter(metrics.ServiceErrRetryTaskCounter) case *serviceerror.NamespaceAlreadyExists: diff --git a/service/frontend/adminHandler.go b/service/frontend/adminHandler.go index 8fe6a6764b9..3cf341048e0 100644 --- a/service/frontend/adminHandler.go +++ b/service/frontend/adminHandler.go @@ -1704,7 +1704,7 @@ func (adh *AdminHandler) startRequestProfile(scope int) (metrics.Scope, metrics. } func (adh *AdminHandler) error(err error, scope metrics.Scope) error { - switch err.(type) { + switch err := err.(type) { case *serviceerror.Unavailable: adh.logger.Error("unavailable error", tag.Error(err)) scope.IncCounter(metrics.ServiceFailures) @@ -1713,7 +1713,7 @@ func (adh *AdminHandler) error(err error, scope metrics.Scope) error { scope.IncCounter(metrics.ServiceErrInvalidArgumentCounter) return err case *serviceerror.ResourceExhausted: - scope.IncCounter(metrics.ServiceErrResourceExhaustedCounter) + scope.Tagged(metrics.ResourceExhaustedCauseTag(err.Cause)).IncCounter(metrics.ServiceErrResourceExhaustedCounter) return err case *serviceerror.NotFound: return err diff --git a/service/history/replicationTaskProcessor.go b/service/history/replicationTaskProcessor.go index fd068a1edbb..55dda181ea5 100644 --- a/service/history/replicationTaskProcessor.go +++ b/service/history/replicationTaskProcessor.go @@ -514,38 +514,39 @@ func (p *ReplicationTaskProcessorImpl) cleanupReplicationTasks() error { } func (p *ReplicationTaskProcessorImpl) emitTaskMetrics(scope int, err error) { + metricsScope := p.metricsClient.Scope(scope) if common.IsContextDeadlineExceededErr(err) || common.IsContextCanceledErr(err) { - p.metricsClient.IncCounter(scope, metrics.ServiceErrContextTimeoutCounter) + metricsScope.IncCounter(metrics.ServiceErrContextTimeoutCounter) return } // Also update counter to distinguish between type of failures - switch err.(type) { + switch err := err.(type) { case nil: - p.metricsClient.IncCounter(scope, metrics.ReplicationTasksApplied) + metricsScope.IncCounter(metrics.ReplicationTasksApplied) case *serviceerrors.ShardOwnershipLost: - p.metricsClient.IncCounter(scope, metrics.ServiceErrShardOwnershipLostCounter) - p.metricsClient.IncCounter(scope, metrics.ReplicationTasksFailed) + metricsScope.IncCounter(metrics.ServiceErrShardOwnershipLostCounter) + metricsScope.IncCounter(metrics.ReplicationTasksFailed) case *serviceerror.InvalidArgument: - p.metricsClient.IncCounter(scope, metrics.ServiceErrInvalidArgumentCounter) - p.metricsClient.IncCounter(scope, metrics.ReplicationTasksFailed) + metricsScope.IncCounter(metrics.ServiceErrInvalidArgumentCounter) + metricsScope.IncCounter(metrics.ReplicationTasksFailed) case *serviceerror.NamespaceNotActive: - p.metricsClient.IncCounter(scope, metrics.ServiceErrNamespaceNotActiveCounter) - p.metricsClient.IncCounter(scope, metrics.ReplicationTasksFailed) + metricsScope.IncCounter(metrics.ServiceErrNamespaceNotActiveCounter) + metricsScope.IncCounter(metrics.ReplicationTasksFailed) case *serviceerror.WorkflowExecutionAlreadyStarted: - p.metricsClient.IncCounter(scope, metrics.ServiceErrExecutionAlreadyStartedCounter) - p.metricsClient.IncCounter(scope, metrics.ReplicationTasksFailed) + metricsScope.IncCounter(metrics.ServiceErrExecutionAlreadyStartedCounter) + metricsScope.IncCounter(metrics.ReplicationTasksFailed) case *serviceerror.NotFound: - p.metricsClient.IncCounter(scope, metrics.ServiceErrNotFoundCounter) - p.metricsClient.IncCounter(scope, metrics.ReplicationTasksFailed) + metricsScope.IncCounter(metrics.ServiceErrNotFoundCounter) + metricsScope.IncCounter(metrics.ReplicationTasksFailed) case *serviceerror.ResourceExhausted: - p.metricsClient.IncCounter(scope, metrics.ServiceErrResourceExhaustedCounter) - p.metricsClient.IncCounter(scope, metrics.ReplicationTasksFailed) + metricsScope.Tagged(metrics.ResourceExhaustedCauseTag(err.Cause)).IncCounter(metrics.ServiceErrResourceExhaustedCounter) + metricsScope.IncCounter(metrics.ReplicationTasksFailed) case *serviceerrors.RetryReplication: - p.metricsClient.IncCounter(scope, metrics.ServiceErrRetryTaskCounter) - p.metricsClient.IncCounter(scope, metrics.ReplicationTasksFailed) + metricsScope.IncCounter(metrics.ServiceErrRetryTaskCounter) + metricsScope.IncCounter(metrics.ReplicationTasksFailed) default: - p.metricsClient.IncCounter(scope, metrics.ReplicationTasksFailed) + metricsScope.IncCounter(metrics.ReplicationTasksFailed) } } diff --git a/service/matching/matcher_test.go b/service/matching/matcher_test.go index b0147b51048..9fb30c2a47e 100644 --- a/service/matching/matcher_test.go +++ b/service/matching/matcher_test.go @@ -49,7 +49,6 @@ import ( "go.temporal.io/server/common/payloads" ) - var errMatchingHostThrottleTest = serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_SERVICE_RATE_LIMIT, "Matching host RPS exceeded.") type MatcherTestSuite struct { diff --git a/service/matching/taskWriter.go b/service/matching/taskWriter.go index eb2ed23e538..733c6a5adc2 100644 --- a/service/matching/taskWriter.go +++ b/service/matching/taskWriter.go @@ -31,6 +31,7 @@ import ( "time" commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" persistencespb "go.temporal.io/server/api/persistence/v1" @@ -166,7 +167,9 @@ func (w *taskWriter) appendTask( return nil, errShutdown } default: // channel is full, throttle - return nil, serviceerror.NewUnavailable("Too many outstanding appends to the TaskQueue") + return nil, serviceerror.NewResourceExhausted( + enumspb.RESOURCE_EXHAUSTED_CAUSE_SERVICE_RATE_LIMIT, + "Too many outstanding appends to the TaskQueue") } } diff --git a/service/worker/archiver/client.go b/service/worker/archiver/client.go index 1e6e8d08b53..08e306824b9 100644 --- a/service/worker/archiver/client.go +++ b/service/worker/archiver/client.go @@ -278,7 +278,8 @@ func (c *client) sendArchiveSignal(ctx context.Context, request *ArchiveRequest, c.metricsScope.IncCounter(metrics.ArchiverClientSendSignalCount) if ok := c.rateLimiter.Allow(); !ok { c.logger.Error(tooManyRequestsErrMsg) - c.metricsScope.IncCounter(metrics.ServiceErrResourceExhaustedCounter) + c.metricsScope.Tagged(metrics.ResourceExhaustedCauseTag(enumspb.RESOURCE_EXHAUSTED_CAUSE_SERVICE_RATE_LIMIT)). + IncCounter(metrics.ServiceErrResourceExhaustedCounter) return errors.New(tooManyRequestsErrMsg) } From e285721d208560775dd1808ddfcf89261aa15d88 Mon Sep 17 00:00:00 2001 From: Yimin Chen Date: Thu, 27 Jan 2022 13:39:05 -0800 Subject: [PATCH 3/5] update resource exhausted cause names --- common/archiver/filestore/historyArchiver_test.go | 2 +- common/archiver/gcloud/historyArchiver_test.go | 2 +- common/archiver/s3store/historyArchiver_test.go | 2 +- common/metrics/defs.go | 1 + common/metrics/tags.go | 8 ++++---- common/persistence/client/fault_injection.go | 2 +- .../nosql/nosqlplugin/cassandra/gocql/errors.go | 2 +- common/persistence/persistenceRateLimitedClients.go | 2 +- common/rpc/interceptor/namespace_count_limit.go | 2 +- common/rpc/interceptor/namespace_rate_limit.go | 2 +- common/rpc/interceptor/rate_limit.go | 2 +- go.mod | 4 ++-- go.sum | 8 ++++---- service/matching/matcher_test.go | 2 +- service/matching/taskWriter.go | 4 ++-- 15 files changed, 23 insertions(+), 22 deletions(-) diff --git a/common/archiver/filestore/historyArchiver_test.go b/common/archiver/filestore/historyArchiver_test.go index 54d139ffc86..4b999277904 100644 --- a/common/archiver/filestore/historyArchiver_test.go +++ b/common/archiver/filestore/historyArchiver_test.go @@ -189,7 +189,7 @@ func (s *historyArchiverSuite) TestArchive_Fail_TimeoutWhenReadingHistory() { historyIterator := archiver.NewMockHistoryIterator(mockCtrl) gomock.InOrder( historyIterator.EXPECT().HasNext().Return(true), - historyIterator.EXPECT().Next().Return(nil, serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_SERVICE_RATE_LIMIT, "")), + historyIterator.EXPECT().Next().Return(nil, serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_RPS_LIMIT, "")), ) historyArchiver := s.newTestHistoryArchiver(historyIterator) diff --git a/common/archiver/gcloud/historyArchiver_test.go b/common/archiver/gcloud/historyArchiver_test.go index 7eccb604805..8b05153ff98 100644 --- a/common/archiver/gcloud/historyArchiver_test.go +++ b/common/archiver/gcloud/historyArchiver_test.go @@ -222,7 +222,7 @@ func (h *historyArchiverSuite) TestArchive_Fail_TimeoutWhenReadingHistory() { historyIterator := archiver.NewMockHistoryIterator(h.controller) gomock.InOrder( historyIterator.EXPECT().HasNext().Return(true), - historyIterator.EXPECT().Next().Return(nil, serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_SERVICE_RATE_LIMIT, "")), + historyIterator.EXPECT().Next().Return(nil, serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_RPS_LIMIT, "")), ) historyArchiver := newHistoryArchiver(h.container, historyIterator, storageWrapper) diff --git a/common/archiver/s3store/historyArchiver_test.go b/common/archiver/s3store/historyArchiver_test.go index 2d7cfcfe5c8..f5b8bb6ec3e 100644 --- a/common/archiver/s3store/historyArchiver_test.go +++ b/common/archiver/s3store/historyArchiver_test.go @@ -328,7 +328,7 @@ func (s *historyArchiverSuite) TestArchive_Fail_TimeoutWhenReadingHistory() { historyIterator := archiver.NewMockHistoryIterator(mockCtrl) gomock.InOrder( historyIterator.EXPECT().HasNext().Return(true), - historyIterator.EXPECT().Next().Return(nil, serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_SERVICE_RATE_LIMIT, "")), + historyIterator.EXPECT().Next().Return(nil, serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_RPS_LIMIT, "")), ) historyArchiver := s.newTestHistoryArchiver(historyIterator) diff --git a/common/metrics/defs.go b/common/metrics/defs.go index fe9300680fd..fe6ef29105d 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -101,6 +101,7 @@ const ( QueueTypeTagName = "queue_type" visibilityTypeTagName = "visibility_type" httpStatusTagName = "http_status" + resourceExhaustedTag = "resource_exhausted_cause" ) // This package should hold all the metrics and tags for temporal diff --git a/common/metrics/tags.go b/common/metrics/tags.go index b96710ed654..30ad369093f 100644 --- a/common/metrics/tags.go +++ b/common/metrics/tags.go @@ -199,10 +199,6 @@ func VisibilityTypeTag(value string) Tag { return &tagImpl{key: visibilityTypeTagName, value: value} } -func ResourceExhaustedCauseTag(cause enumspb.ResourceExhaustedCause) Tag { - return &tagImpl{key: "resource_exhausted_cause", value: cause.String()} -} - var standardVisibilityTypeTag = VisibilityTypeTag(standardVisibilityTagValue) var advancedVisibilityTypeTag = VisibilityTypeTag(advancedVisibilityTagValue) @@ -218,3 +214,7 @@ func AdvancedVisibilityTypeTag() Tag { func HttpStatusTag(value int) Tag { return &tagImpl{key: httpStatusTagName, value: strconv.Itoa(value)} } + +func ResourceExhaustedCauseTag(cause enumspb.ResourceExhaustedCause) Tag { + return &tagImpl{key: resourceExhaustedTag, value: cause.String()} +} diff --git a/common/persistence/client/fault_injection.go b/common/persistence/client/fault_injection.go index deb31a57b7d..7f144dff98e 100644 --- a/common/persistence/client/fault_injection.go +++ b/common/persistence/client/fault_injection.go @@ -96,7 +96,7 @@ var defaultErrors = []FaultWeight{ }, { errFactory: func(msg string) error { - return serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_PERSISTENCE_RATE_LIMIT, + return serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_SYSTEM_OVERLOADED, fmt.Sprintf("serviceerror.NewResourceExhausted: %s", msg)) }, weight: 1, diff --git a/common/persistence/nosql/nosqlplugin/cassandra/gocql/errors.go b/common/persistence/nosql/nosqlplugin/cassandra/gocql/errors.go index 9af11cb2170..0ca7751a923 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/gocql/errors.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/gocql/errors.go @@ -54,7 +54,7 @@ func ConvertError( return &persistence.TimeoutError{Msg: fmt.Sprintf("operation %v encountered %v", operation, err.Error())} case gocql.RequestError: if v.Code() == gocql.ErrCodeOverloaded { - return serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_PERSISTENCE_RATE_LIMIT, + return serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_SYSTEM_OVERLOADED, fmt.Sprintf("operation %v encountered %v", operation, err.Error())) } return serviceerror.NewUnavailable(fmt.Sprintf("operation %v encountered %v", operation, err.Error())) diff --git a/common/persistence/persistenceRateLimitedClients.go b/common/persistence/persistenceRateLimitedClients.go index 2e9945ca5e5..7599b24e513 100644 --- a/common/persistence/persistenceRateLimitedClients.go +++ b/common/persistence/persistenceRateLimitedClients.go @@ -35,7 +35,7 @@ import ( var ( // ErrPersistenceLimitExceeded is the error indicating QPS limit reached. - ErrPersistenceLimitExceeded = serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_PERSISTENCE_RATE_LIMIT, "Persistence Max QPS Reached.") + ErrPersistenceLimitExceeded = serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_SYSTEM_OVERLOADED, "Persistence Max QPS Reached.") ) type ( diff --git a/common/rpc/interceptor/namespace_count_limit.go b/common/rpc/interceptor/namespace_count_limit.go index 664de66ec52..e0307f5f431 100644 --- a/common/rpc/interceptor/namespace_count_limit.go +++ b/common/rpc/interceptor/namespace_count_limit.go @@ -39,7 +39,7 @@ import ( ) var ( - ErrNamespaceCountLimitServerBusy = serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_CONCURRENT_POLLER_LIMIT, "namespace concurrent poller limit exceeded") + ErrNamespaceCountLimitServerBusy = serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_CONCURRENT_LIMIT, "namespace concurrent poller limit exceeded") ) type ( diff --git a/common/rpc/interceptor/namespace_rate_limit.go b/common/rpc/interceptor/namespace_rate_limit.go index 02ec3ae756a..6ee3f0e0ea7 100644 --- a/common/rpc/interceptor/namespace_rate_limit.go +++ b/common/rpc/interceptor/namespace_rate_limit.go @@ -41,7 +41,7 @@ const ( ) var ( - ErrNamespaceRateLimitServerBusy = serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_SERVICE_RATE_LIMIT, "namespace rate limit exceeded") + ErrNamespaceRateLimitServerBusy = serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_RPS_LIMIT, "namespace rate limit exceeded") ) type ( diff --git a/common/rpc/interceptor/rate_limit.go b/common/rpc/interceptor/rate_limit.go index e8ef174d3e5..052b9146d5d 100644 --- a/common/rpc/interceptor/rate_limit.go +++ b/common/rpc/interceptor/rate_limit.go @@ -40,7 +40,7 @@ const ( ) var ( - RateLimitServerBusy = serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_SERVICE_RATE_LIMIT, "service rate limit exceeded") + RateLimitServerBusy = serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_RPS_LIMIT, "service rate limit exceeded") ) type ( diff --git a/go.mod b/go.mod index 197c49884b3..572aad11913 100644 --- a/go.mod +++ b/go.mod @@ -44,7 +44,7 @@ require ( go.opentelemetry.io/otel/sdk v1.2.0 go.opentelemetry.io/otel/sdk/export/metric v0.25.0 go.opentelemetry.io/otel/sdk/metric v0.25.0 - go.temporal.io/api v1.7.1-0.20220127031327-912a8000a4fa + go.temporal.io/api v1.7.1-0.20220127205414-5eb92fdbe9b1 go.temporal.io/sdk v1.12.0 go.temporal.io/version v0.3.0 go.uber.org/atomic v1.9.0 @@ -90,7 +90,7 @@ require ( go.opencensus.io v0.23.0 // indirect go.opentelemetry.io/otel/internal/metric v0.25.0 // indirect go.opentelemetry.io/otel/trace v1.2.0 // indirect - golang.org/x/net v0.0.0-20220121210141-e204ce36a2ba // indirect + golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect diff --git a/go.sum b/go.sum index de98b90ad83..1c6bee00acd 100644 --- a/go.sum +++ b/go.sum @@ -461,8 +461,8 @@ go.opentelemetry.io/otel/trace v1.2.0 h1:Ys3iqbqZhcf28hHzrm5WAquMkDHNZTUkw7KHbuN go.opentelemetry.io/otel/trace v1.2.0/go.mod h1:N5FLswTubnxKxOJHM7XZC074qpeEdLy3CgAVsdMucK0= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.temporal.io/api v1.5.0/go.mod h1:BqKxEJJYdxb5dqf0ODfzfMxh8UEQ5L3zKS51FiIYYkA= -go.temporal.io/api v1.7.1-0.20220127031327-912a8000a4fa h1:ynXFCvhVcA+216z+4vbdrOXHyIx5UYz9iAAYWap3m38= -go.temporal.io/api v1.7.1-0.20220127031327-912a8000a4fa/go.mod h1:Qy3l0Bw9C1RcToB+kfsI0lkrZYbDLgC9pzi6OYYJ/aE= +go.temporal.io/api v1.7.1-0.20220127205414-5eb92fdbe9b1 h1:IgE1LVlgBwT5/GJmNmiTzjUxZqGRljTfzo/ubGj2MYc= +go.temporal.io/api v1.7.1-0.20220127205414-5eb92fdbe9b1/go.mod h1:sKL6Cu/CbleQ+tjl2/GnMAIk/yvVbOnpgzAlV6WyEso= go.temporal.io/sdk v1.12.0 h1:QkqOpmgXVnHHCFP9HbSbyrF3jYgLBKY/3NdZyR7e5nQ= go.temporal.io/sdk v1.12.0/go.mod h1:lSp3lH1lI0TyOsus0arnO3FYvjVXBZGi/G7DjnAnm6o= go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig= @@ -580,8 +580,8 @@ golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210913180222-943fd674d43e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20220121210141-e204ce36a2ba h1:6u6sik+bn/y7vILcYkK3iwTBWN7WtBvB0+SZswQnbf8= -golang.org/x/net v0.0.0-20220121210141-e204ce36a2ba/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd h1:O7DYs+zxREGLKzKoMQrtrEacpb0ZVXA5rIwylE2Xchk= +golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= diff --git a/service/matching/matcher_test.go b/service/matching/matcher_test.go index 9fb30c2a47e..e088eb9b04d 100644 --- a/service/matching/matcher_test.go +++ b/service/matching/matcher_test.go @@ -49,7 +49,7 @@ import ( "go.temporal.io/server/common/payloads" ) -var errMatchingHostThrottleTest = serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_SERVICE_RATE_LIMIT, "Matching host RPS exceeded.") +var errMatchingHostThrottleTest = serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_RPS_LIMIT, "Matching host RPS exceeded.") type MatcherTestSuite struct { suite.Suite diff --git a/service/matching/taskWriter.go b/service/matching/taskWriter.go index 733c6a5adc2..bbf7394f877 100644 --- a/service/matching/taskWriter.go +++ b/service/matching/taskWriter.go @@ -168,8 +168,8 @@ func (w *taskWriter) appendTask( } default: // channel is full, throttle return nil, serviceerror.NewResourceExhausted( - enumspb.RESOURCE_EXHAUSTED_CAUSE_SERVICE_RATE_LIMIT, - "Too many outstanding appends to the TaskQueue") + enumspb.RESOURCE_EXHAUSTED_CAUSE_SYSTEM_OVERLOADED, + "Too many outstanding appends to the task queue") } } From 25adfb369f414059fef181ff2002912304355fc7 Mon Sep 17 00:00:00 2001 From: Yimin Chen Date: Thu, 27 Jan 2022 13:43:59 -0800 Subject: [PATCH 4/5] rename --- common/persistence/visibility/visiblity_manager_metrics.go | 2 +- service/worker/archiver/client.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/common/persistence/visibility/visiblity_manager_metrics.go b/common/persistence/visibility/visiblity_manager_metrics.go index 91392a601e8..e6ec503135b 100644 --- a/common/persistence/visibility/visiblity_manager_metrics.go +++ b/common/persistence/visibility/visiblity_manager_metrics.go @@ -181,7 +181,7 @@ func (m *visibilityManagerMetrics) updateErrorMetric(scope metrics.Scope, err er scope.IncCounter(metrics.VisibilityPersistenceTimeout) scope.IncCounter(metrics.VisibilityPersistenceFailures) case *serviceerror.ResourceExhausted: - scope.Tagged(metrics.ResourceExhaustedCauseTag(enumspb.RESOURCE_EXHAUSTED_CAUSE_PERSISTENCE_RATE_LIMIT)). + scope.Tagged(metrics.ResourceExhaustedCauseTag(enumspb.RESOURCE_EXHAUSTED_CAUSE_SYSTEM_OVERLOADED)). IncCounter(metrics.VisibilityPersistenceResourceExhausted) scope.IncCounter(metrics.VisibilityPersistenceFailures) case *serviceerror.Internal: diff --git a/service/worker/archiver/client.go b/service/worker/archiver/client.go index 08e306824b9..aa9347b6714 100644 --- a/service/worker/archiver/client.go +++ b/service/worker/archiver/client.go @@ -278,7 +278,7 @@ func (c *client) sendArchiveSignal(ctx context.Context, request *ArchiveRequest, c.metricsScope.IncCounter(metrics.ArchiverClientSendSignalCount) if ok := c.rateLimiter.Allow(); !ok { c.logger.Error(tooManyRequestsErrMsg) - c.metricsScope.Tagged(metrics.ResourceExhaustedCauseTag(enumspb.RESOURCE_EXHAUSTED_CAUSE_SERVICE_RATE_LIMIT)). + c.metricsScope.Tagged(metrics.ResourceExhaustedCauseTag(enumspb.RESOURCE_EXHAUSTED_CAUSE_RPS_LIMIT)). IncCounter(metrics.ServiceErrResourceExhaustedCounter) return errors.New(tooManyRequestsErrMsg) } From d0f1c23ac1b7454a646f525e729af5a9381a88fb Mon Sep 17 00:00:00 2001 From: Yimin Chen Date: Thu, 27 Jan 2022 13:50:45 -0800 Subject: [PATCH 5/5] update api version --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 572aad11913..f5224ec8ddc 100644 --- a/go.mod +++ b/go.mod @@ -44,7 +44,7 @@ require ( go.opentelemetry.io/otel/sdk v1.2.0 go.opentelemetry.io/otel/sdk/export/metric v0.25.0 go.opentelemetry.io/otel/sdk/metric v0.25.0 - go.temporal.io/api v1.7.1-0.20220127205414-5eb92fdbe9b1 + go.temporal.io/api v1.7.1-0.20220127213442-107e361dceeb go.temporal.io/sdk v1.12.0 go.temporal.io/version v0.3.0 go.uber.org/atomic v1.9.0 diff --git a/go.sum b/go.sum index 1c6bee00acd..db057c0470f 100644 --- a/go.sum +++ b/go.sum @@ -461,8 +461,8 @@ go.opentelemetry.io/otel/trace v1.2.0 h1:Ys3iqbqZhcf28hHzrm5WAquMkDHNZTUkw7KHbuN go.opentelemetry.io/otel/trace v1.2.0/go.mod h1:N5FLswTubnxKxOJHM7XZC074qpeEdLy3CgAVsdMucK0= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.temporal.io/api v1.5.0/go.mod h1:BqKxEJJYdxb5dqf0ODfzfMxh8UEQ5L3zKS51FiIYYkA= -go.temporal.io/api v1.7.1-0.20220127205414-5eb92fdbe9b1 h1:IgE1LVlgBwT5/GJmNmiTzjUxZqGRljTfzo/ubGj2MYc= -go.temporal.io/api v1.7.1-0.20220127205414-5eb92fdbe9b1/go.mod h1:sKL6Cu/CbleQ+tjl2/GnMAIk/yvVbOnpgzAlV6WyEso= +go.temporal.io/api v1.7.1-0.20220127213442-107e361dceeb h1:snXrLlZzZxtPBAdEg4h29IJzjvsZNbMHTsQL+MK2TCU= +go.temporal.io/api v1.7.1-0.20220127213442-107e361dceeb/go.mod h1:sKL6Cu/CbleQ+tjl2/GnMAIk/yvVbOnpgzAlV6WyEso= go.temporal.io/sdk v1.12.0 h1:QkqOpmgXVnHHCFP9HbSbyrF3jYgLBKY/3NdZyR7e5nQ= go.temporal.io/sdk v1.12.0/go.mod h1:lSp3lH1lI0TyOsus0arnO3FYvjVXBZGi/G7DjnAnm6o= go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=