Skip to content

Commit

Permalink
Revert "Treat serviceerror.Internal as non-retryable" (#6729)
Browse files Browse the repository at this point in the history
Reverts #6622
  • Loading branch information
alexshtin authored Oct 30, 2024
1 parent 57ee0ec commit 571b0ba
Show file tree
Hide file tree
Showing 13 changed files with 75 additions and 4 deletions.
6 changes: 6 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2195,6 +2195,12 @@ Do not turn this on if you aren't using Cassandra as the history task DLQ is not
70, // 70 attempts takes about an hour
`HistoryTaskDLQUnexpectedErrorAttempts is the number of task execution attempts before sending the task to DLQ.`,
)
HistoryTaskDLQInternalErrors = NewGlobalBoolSetting(
"history.TaskDLQInternalErrors",
false,
`HistoryTaskDLQInternalErrors causes history task processing to send tasks failing with serviceerror.Internal to
the dlq (or will drop them if not enabled)`,
)
HistoryTaskDLQErrorPattern = NewGlobalStringSetting(
"history.TaskDLQErrorPattern",
"",
Expand Down
3 changes: 2 additions & 1 deletion common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,8 @@ func IsServiceHandlerRetryableError(err error) bool {
}

switch err.(type) {
case *serviceerror.Unavailable:
case *serviceerror.Internal,
*serviceerror.Unavailable:
return true
}

Expand Down
1 change: 1 addition & 0 deletions service/history/archival_queue_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func (f *archivalQueueFactory) newScheduledQueue(shard shard.Context, executor q
f.DLQWriter,
f.Config.TaskDLQEnabled,
f.Config.TaskDLQUnexpectedErrorAttempts,
f.Config.TaskDLQInternalErrors,
f.Config.TaskDLQErrorPattern,
)
return queues.NewScheduledQueue(
Expand Down
2 changes: 2 additions & 0 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ type Config struct {

TaskDLQEnabled dynamicconfig.BoolPropertyFn
TaskDLQUnexpectedErrorAttempts dynamicconfig.IntPropertyFn
TaskDLQInternalErrors dynamicconfig.BoolPropertyFn
TaskDLQErrorPattern dynamicconfig.StringPropertyFn

TaskSchedulerEnableRateLimiter dynamicconfig.BoolPropertyFn
Expand Down Expand Up @@ -456,6 +457,7 @@ func NewConfig(

TaskDLQEnabled: dynamicconfig.HistoryTaskDLQEnabled.Get(dc),
TaskDLQUnexpectedErrorAttempts: dynamicconfig.HistoryTaskDLQUnexpectedErrorAttempts.Get(dc),
TaskDLQInternalErrors: dynamicconfig.HistoryTaskDLQInternalErrors.Get(dc),
TaskDLQErrorPattern: dynamicconfig.HistoryTaskDLQErrorPattern.Get(dc),

TaskSchedulerEnableRateLimiter: dynamicconfig.TaskSchedulerEnableRateLimiter.Get(dc),
Expand Down
1 change: 1 addition & 0 deletions service/history/outbound_queue_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ func (f *outboundQueueFactory) CreateQueue(
f.DLQWriter,
f.Config.TaskDLQEnabled,
f.Config.TaskDLQUnexpectedErrorAttempts,
f.Config.TaskDLQInternalErrors,
f.Config.TaskDLQErrorPattern,
)
return queues.NewImmediateQueue(
Expand Down
12 changes: 10 additions & 2 deletions service/history/queues/executable.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,14 @@ type (
terminalFailureCause error
unexpectedErrorAttempts int
maxUnexpectedErrorAttempts dynamicconfig.IntPropertyFn
dlqInternalErrors dynamicconfig.BoolPropertyFn
dlqErrorPattern dynamicconfig.StringPropertyFn
}
ExecutableParams struct {
DLQEnabled dynamicconfig.BoolPropertyFn
DLQWriter *DLQWriter
MaxUnexpectedErrorAttempts dynamicconfig.IntPropertyFn
DLQInternalErrors dynamicconfig.BoolPropertyFn
DLQErrorPattern dynamicconfig.StringPropertyFn
}
ExecutableOption func(*ExecutableParams)
Expand Down Expand Up @@ -204,6 +206,9 @@ func NewExecutable(
MaxUnexpectedErrorAttempts: func() int {
return math.MaxInt
},
DLQInternalErrors: func() bool {
return false
},
DLQErrorPattern: func() string {
return ""
},
Expand Down Expand Up @@ -235,6 +240,7 @@ func NewExecutable(
dlqWriter: params.DLQWriter,
dlqEnabled: params.DLQEnabled,
maxUnexpectedErrorAttempts: params.MaxUnexpectedErrorAttempts,
dlqInternalErrors: params.DLQInternalErrors,
dlqErrorPattern: params.DLQErrorPattern,
}
executable.updatePriority()
Expand Down Expand Up @@ -446,14 +452,14 @@ func (e *executableImpl) isUnexpectedNonRetryableError(err error) bool {
}

if _, isDataLoss := err.(*serviceerror.DataLoss); isDataLoss {
metrics.TaskCorruptionCounter.With(e.taggedMetricsHandler).Record(1)
return true
}

isInternalError := common.IsInternalError(err)
if isInternalError {
metrics.TaskInternalErrorCounter.With(e.taggedMetricsHandler).Record(1)
return true
// Only DQL/drop when configured to
return e.dlqInternalErrors()
}

return false
Expand Down Expand Up @@ -520,8 +526,10 @@ func (e *executableImpl) HandleErr(err error) (retErr error) {
e.logger.Warn("Fail to process task", tag.Error(err), tag.ErrorType(err), tag.UnexpectedErrorAttempts(int32(e.unexpectedErrorAttempts)), tag.LifeCycleProcessingFailed)

if e.isUnexpectedNonRetryableError(err) {
// Terminal errors are likely due to data corruption.
// Drop the task by returning nil so that task will be marked as completed,
// or send it to the DLQ if that is enabled.
metrics.TaskCorruptionCounter.With(e.taggedMetricsHandler).Record(1)
if e.dlqEnabled() {
// Keep this message in sync with the log line mentioned in Investigation section of docs/admin/dlq.md
e.logger.Error("Marking task as terminally failed, will send to DLQ", tag.Error(err), tag.ErrorType(err))
Expand Down
3 changes: 3 additions & 0 deletions service/history/queues/executable_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func NewExecutableFactory(
dlqWriter *DLQWriter,
dlqEnabled dynamicconfig.BoolPropertyFn,
attemptsBeforeSendingToDlq dynamicconfig.IntPropertyFn,
dlqInternalErrors dynamicconfig.BoolPropertyFn,
dlqErrorPattern dynamicconfig.StringPropertyFn,
) *executableFactoryImpl {
return &executableFactoryImpl{
Expand All @@ -92,6 +93,7 @@ func NewExecutableFactory(
dlqWriter: dlqWriter,
dlqEnabled: dlqEnabled,
attemptsBeforeSendingToDlq: attemptsBeforeSendingToDlq,
dlqInternalErrors: dlqInternalErrors,
dlqErrorPattern: dlqErrorPattern,
}
}
Expand All @@ -113,6 +115,7 @@ func (f *executableFactoryImpl) NewExecutable(task tasks.Task, readerID int64) E
params.DLQEnabled = f.dlqEnabled
params.DLQWriter = f.dlqWriter
params.MaxUnexpectedErrorAttempts = f.attemptsBeforeSendingToDlq
params.DLQInternalErrors = f.dlqInternalErrors
params.DLQErrorPattern = f.dlqErrorPattern
},
)
Expand Down
42 changes: 41 additions & 1 deletion service/history/queues/executable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type (
dlqEnabled dynamicconfig.BoolPropertyFn
priorityAssigner queues.PriorityAssigner
maxUnexpectedErrorAttempts dynamicconfig.IntPropertyFn
dlqInternalErrors dynamicconfig.BoolPropertyFn
dlqErrorPattern dynamicconfig.StringPropertyFn
}
option func(*params)
Expand Down Expand Up @@ -539,13 +540,16 @@ func (s *executableSuite) TestExecute_SendToDLQAfterMaxAttemptsThenDisable() {
s.Empty(queueWriter.EnqueueTaskRequests)
}

func (s *executableSuite) TestExecute_SendsInternalErrorsToDLQ() {
func (s *executableSuite) TestExecute_SendsInternalErrorsToDLQ_WhenEnabled() {
queueWriter := &queuestest.FakeQueueWriter{}
executable := s.newTestExecutable(func(p *params) {
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry)
p.dlqEnabled = func() bool {
return true
}
p.dlqInternalErrors = func() bool {
return true
}
})

s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).Return(queues.ExecuteResponse{
Expand All @@ -560,6 +564,35 @@ func (s *executableSuite) TestExecute_SendsInternalErrorsToDLQ() {
s.Len(queueWriter.EnqueueTaskRequests, 1)
}

func (s *executableSuite) TestExecute_DoesntSendInternalErrorsToDLQ_WhenDisabled() {
queueWriter := &queuestest.FakeQueueWriter{}
executable := s.newTestExecutable(func(p *params) {
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry)
p.dlqEnabled = func() bool {
return true
}
p.dlqInternalErrors = func() bool {
return false
}
})

s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).Return(queues.ExecuteResponse{
ExecutionMetricTags: nil,
ExecutedAsActive: false,
ExecutionErr: serviceerror.NewInternal("injected error"),
}).Times(2)

// Attempt 1
err := executable.Execute()
s.Error(executable.HandleErr(err))

// Attempt 2
err = executable.Execute()
s.Error(err)
s.Error(executable.HandleErr(err))
s.Empty(queueWriter.EnqueueTaskRequests)
}

func (s *executableSuite) TestExecute_SendInternalErrorsToDLQ_ThenDisable() {
queueWriter := &queuestest.FakeQueueWriter{}
dlqEnabled := true
Expand All @@ -568,6 +601,9 @@ func (s *executableSuite) TestExecute_SendInternalErrorsToDLQ_ThenDisable() {
p.dlqEnabled = func() bool {
return dlqEnabled
}
p.dlqInternalErrors = func() bool {
return true
}
})

injectedErr := serviceerror.NewInternal("injected error")
Expand Down Expand Up @@ -1005,6 +1041,9 @@ func (s *executableSuite) newTestExecutable(opts ...option) queues.Executable {
dlqEnabled: func() bool {
return false
},
dlqInternalErrors: func() bool {
return false
},
priorityAssigner: queues.NewNoopPriorityAssigner(),
maxUnexpectedErrorAttempts: func() int {
return math.MaxInt
Expand Down Expand Up @@ -1040,6 +1079,7 @@ func (s *executableSuite) newTestExecutable(opts ...option) queues.Executable {
params.DLQEnabled = p.dlqEnabled
params.DLQWriter = p.dlqWriter
params.MaxUnexpectedErrorAttempts = p.maxUnexpectedErrorAttempts
params.DLQInternalErrors = p.dlqInternalErrors
params.DLQErrorPattern = p.dlqErrorPattern
},
)
Expand Down
3 changes: 3 additions & 0 deletions service/history/queues/queue_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,9 @@ func (s *queueBaseSuite) newQueueBase(
func() int {
return math.MaxInt
},
func() bool {
return false
},
func() string {
return ""
},
Expand Down
3 changes: 3 additions & 0 deletions service/history/queues/queue_scheduled_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ func (s *scheduledQueueSuite) SetupTest() {
func() int {
return math.MaxInt
},
func() bool {
return false
},
func() string {
return ""
},
Expand Down
1 change: 1 addition & 0 deletions service/history/timer_queue_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ func (f *timerQueueFactory) CreateQueue(
f.DLQWriter,
f.Config.TaskDLQEnabled,
f.Config.TaskDLQUnexpectedErrorAttempts,
f.Config.TaskDLQInternalErrors,
f.Config.TaskDLQErrorPattern,
)
return queues.NewScheduledQueue(
Expand Down
1 change: 1 addition & 0 deletions service/history/transfer_queue_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func (f *transferQueueFactory) CreateQueue(
f.DLQWriter,
f.Config.TaskDLQEnabled,
f.Config.TaskDLQUnexpectedErrorAttempts,
f.Config.TaskDLQInternalErrors,
f.Config.TaskDLQErrorPattern,
)
return queues.NewImmediateQueue(
Expand Down
1 change: 1 addition & 0 deletions service/history/visibility_queue_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ func (f *visibilityQueueFactory) CreateQueue(
f.DLQWriter,
f.Config.TaskDLQEnabled,
f.Config.TaskDLQUnexpectedErrorAttempts,
f.Config.TaskDLQInternalErrors,
f.Config.TaskDLQErrorPattern,
)
return queues.NewImmediateQueue(
Expand Down

0 comments on commit 571b0ba

Please sign in to comment.