diff --git a/internal/common/backoff/retry.go b/internal/common/backoff/retry.go index 9c46d624f..57ed2c3a5 100644 --- a/internal/common/backoff/retry.go +++ b/internal/common/backoff/retry.go @@ -58,6 +58,13 @@ func (c *ConcurrentRetrier) Throttle(doneCh <-chan struct{}) { c.throttleInternal(doneCh) } +// GetElapsedTime gets the amount of time since that last ConcurrentRetrier.Succeeded call +func (c *ConcurrentRetrier) GetElapsedTime() time.Duration { + c.Lock() + defer c.Unlock() + return c.retrier.GetElapsedTime() +} + func (c *ConcurrentRetrier) throttleInternal(doneCh <-chan struct{}) time.Duration { next := done diff --git a/internal/common/backoff/retrypolicy.go b/internal/common/backoff/retrypolicy.go index 133e9ec3f..dd4e7791c 100644 --- a/internal/common/backoff/retrypolicy.go +++ b/internal/common/backoff/retrypolicy.go @@ -45,6 +45,7 @@ type ( // Retrier manages the state of retry operation Retrier interface { + GetElapsedTime() time.Duration NextBackOff() time.Duration Reset() } @@ -195,13 +196,15 @@ func (r *retrierImpl) Reset() { // NextBackOff returns the next delay interval. This is used by Retry to delay calling the operation again func (r *retrierImpl) NextBackOff() time.Duration { - nextInterval := r.policy.ComputeNextDelay(r.getElapsedTime(), r.currentAttempt) + nextInterval := r.policy.ComputeNextDelay(r.GetElapsedTime(), r.currentAttempt) // Now increment the current attempt r.currentAttempt++ return nextInterval } -func (r *retrierImpl) getElapsedTime() time.Duration { +// GetElapsedTime returns the amount of time since the retrier was created or the last reset, +// whatever was sooner. +func (r *retrierImpl) GetElapsedTime() time.Duration { return r.clock.Now().Sub(r.startTime) } diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index 7fbb75a4b..00f6ec7f3 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -58,6 +58,7 @@ const ( var ( pollOperationRetryPolicy = createPollRetryPolicy() pollResourceExhaustedRetryPolicy = createPollResourceExhaustedRetryPolicy() + retryLongPollGracePeriod = 2 * time.Minute ) var errStop = errors.New("worker stopping") @@ -193,6 +194,17 @@ type ( } ) +// SetRetryLongPollGracePeriod sets the amount of time a long poller retrys on +// fatal errors before it actually fails. For test use only, +// not safe to call with a running worker. +func SetRetryLongPollGracePeriod(period time.Duration) { + retryLongPollGracePeriod = period +} + +func getRetryLongPollGracePeriod() time.Duration { + return retryLongPollGracePeriod +} + func createPollRetryPolicy() backoff.RetryPolicy { policy := backoff.NewExponentialRetryPolicy(retryPollOperationInitialInterval) policy.SetMaximumInterval(retryPollOperationMaxInterval) @@ -332,7 +344,9 @@ func (bw *baseWorker) pollTask() { task, err = bw.options.taskWorker.PollTask() bw.logPollTaskError(err) if err != nil { - if isNonRetriableError(err) { + // We retry "non retriable" errors while long polling for a while, because some proxies return + // unexpected values causing unnecessary downtime. + if isNonRetriableError(err) && bw.retrier.GetElapsedTime() > getRetryLongPollGracePeriod() { bw.logger.Error("Worker received non-retriable error. Shutting down.", tagError, err) if bw.fatalErrCb != nil { bw.fatalErrCb(err) diff --git a/internal/internal_workers_test.go b/internal/internal_workers_test.go index 3364cd9e9..3331c9d37 100644 --- a/internal/internal_workers_test.go +++ b/internal/internal_workers_test.go @@ -142,7 +142,8 @@ func (s *WorkersTestSuite) TestActivityWorkerStop() { TaskToken: []byte("token"), WorkflowExecution: &commonpb.WorkflowExecution{ WorkflowId: "wID", - RunId: "rID"}, + RunId: "rID", + }, ActivityType: &commonpb.ActivityType{Name: "test"}, ActivityId: uuid.New(), ScheduledTime: &now, @@ -224,7 +225,6 @@ func (s *WorkersTestSuite) TestLongRunningWorkflowTask() { } ctx = WithLocalActivityOptions(ctx, lao) err := ExecuteLocalActivity(ctx, localActivitySleep, time.Second).Get(ctx, nil) - if err != nil { return err } @@ -290,6 +290,7 @@ func (s *WorkersTestSuite) TestLongRunningWorkflowTask() { History: &historypb.History{Events: testEvents[0:3]}, NextPageToken: nil, } + s.service.EXPECT().PollWorkflowTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.PollWorkflowTaskQueueResponse{}, serviceerror.NewInvalidArgument("")).Times(1) s.service.EXPECT().PollWorkflowTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()).Return(task, nil).Times(1) s.service.EXPECT().PollWorkflowTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.PollWorkflowTaskQueueResponse{}, serviceerror.NewInternal("")).AnyTimes() s.service.EXPECT().PollActivityTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.PollActivityTaskQueueResponse{}, nil).AnyTimes() @@ -362,7 +363,6 @@ func (s *WorkersTestSuite) TestMultipleLocalActivities() { } ctx = WithLocalActivityOptions(ctx, lao) err := ExecuteLocalActivity(ctx, localActivitySleep, time.Second).Get(ctx, nil) - if err != nil { return err } diff --git a/test/integration_test.go b/test/integration_test.go index bf0d4756d..9705dfde7 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -62,6 +62,7 @@ import ( "go.temporal.io/sdk/client" contribtally "go.temporal.io/sdk/contrib/tally" "go.temporal.io/sdk/interceptor" + "go.temporal.io/sdk/internal" "go.temporal.io/sdk/internal/common" "go.temporal.io/sdk/internal/common/metrics" "go.temporal.io/sdk/internal/interceptortest" @@ -2276,6 +2277,8 @@ func (ts *IntegrationTestSuite) TestWorkerFatalErrorOnStart() { } func (ts *IntegrationTestSuite) testWorkerFatalError(useWorkerRun bool) { + // Allow the worker to fail faster so the test does not take 2 minutes. + internal.SetRetryLongPollGracePeriod(5 * time.Second) // Make a new client that will fail a poll with a namespace not found c, err := client.Dial(client.Options{ HostPort: ts.config.ServiceAddr,