Stop retrying event tracker on context cancellation and timeout (#1705)
sergerad authored Jul 13, 2023
1 parent f17b6ee commit affd397
Showing 3 changed files with 35 additions and 19 deletions.
18 changes: 14 additions & 4 deletions helper/common/common.go
@@ -32,17 +32,27 @@ var (
 	errInvalidDuration = errors.New("invalid duration")
 )
 
-// RetryForever will execute a function until it completes without error
+// RetryForever will execute a function until it completes without error or
+// the context is cancelled or expired.
 func RetryForever(ctx context.Context, interval time.Duration, fn func(context.Context) error) {
 	_ = retry.Do(ctx, retry.NewConstant(interval), func(context.Context) error {
-		if err := fn(ctx); err != nil {
-			return retry.RetryableError(err)
+		// Execute function and end retries if no error or context done
+		err := fn(ctx)
+		if err == nil || IsContextDone(err) {
+			return nil
 		}
 
-		return nil
+		// Retry on all other errors
+		return retry.RetryableError(err)
 	})
 }
 
+// IsContextDone returns true if the error is due to the context being cancelled
+// or expired. This is useful for determining if a function should retry.
+func IsContextDone(err error) bool {
+	return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
+}
+
 // Min returns the strictly lower number
 func Min(a, b uint64) uint64 {
 	if a < b {
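
For readers skimming the diff, here is a minimal caller-side sketch of the new behaviour. It is illustrative only: the import path is assumed from the helper/common layout shown above, and the retry dependency is assumed to be github.com/sethvargo/go-retry based on the retry.Do, retry.NewConstant, and retry.RetryableError calls. A callback that returns the context's own error now ends the retry loop instead of being wrapped as retryable.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/0xPolygon/polygon-edge/helper/common" // assumed import path
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// Cancel shortly after starting, as a caller shutting down would.
	go func() {
		time.Sleep(200 * time.Millisecond)
		cancel()
	}()

	attempts := 0

	common.RetryForever(ctx, 50*time.Millisecond, func(ctx context.Context) error {
		attempts++

		// Once the context is done, surface its error; IsContextDone now
		// treats it as terminal instead of scheduling another retry.
		if err := ctx.Err(); err != nil {
			return err
		}

		return errors.New("transient failure, retry")
	})

	fmt.Printf("stopped after %d attempts: %v\n", attempts, ctx.Err())
}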
14 changes: 3 additions & 11 deletions helper/common/common_test.go
@@ -116,19 +116,11 @@ func TestRetryForever_AlwaysReturnError_ShouldNeverEnd(t *testing.T) {
 	require.False(t, ended)
 }
 
-func TestRetryForever_ReturnNilAfterFirstRun_ShouldEnd(t *testing.T) {
+func TestRetryForever_CancelContext_ShouldEnd(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	cancel()
 	RetryForever(ctx, time.Millisecond*100, func(ctx context.Context) error {
-		select {
-		case <-ctx.Done():
-
-			return nil
-		default:
-			cancel()
-
-			return errors.New("")
-		}
+		return errors.New("")
 	})
 	<-ctx.Done()
 	require.True(t, errors.Is(ctx.Err(), context.Canceled))
 }
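
A hypothetical companion test, not part of this commit, would cover the deadline case in the same style: a callback that propagates the context's deadline error should also end the retries. It assumes the same package and imports as the existing test file.

func TestRetryForever_DeadlineExceeded_ShouldEnd(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
	defer cancel()

	RetryForever(ctx, time.Millisecond*10, func(ctx context.Context) error {
		// Propagate the context error once the deadline passes;
		// IsContextDone treats it as terminal rather than retryable.
		if err := ctx.Err(); err != nil {
			return err
		}

		return errors.New("")
	})

	require.True(t, errors.Is(ctx.Err(), context.DeadlineExceeded))
}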
22 changes: 18 additions & 4 deletions tracker/event_tracker.go
@@ -80,17 +80,21 @@ func (e *EventTracker) Start(ctx context.Context) error {
 
 	// Init and start block tracker concurrently, retrying indefinitely
 	go common.RetryForever(ctx, time.Second, func(context.Context) error {
 		// Init
 		start := time.Now().UTC()
 
 		if err := blockTracker.Init(); err != nil {
 			e.logger.Error("failed to init blocktracker", "error", err)
 
 			return err
 		}
-		elapsed := time.Now().UTC().Sub(start)
 
+		elapsed := time.Now().UTC().Sub(start) // Calculate the elapsed time
+
 		// Start
 		if err := blockTracker.Start(); err != nil {
+			if common.IsContextDone(err) {
+				return nil
+			}
+
 			e.logger.Error("failed to start blocktracker", "error", err)
 
 			return err
@@ -117,8 +121,18 @@ func (e *EventTracker) Start(ctx context.Context) error {
 		return err
 	}
 	// Sync concurrently, retrying indefinitely
-	go common.RetryForever(ctx, time.Second, func(context.Context) error {
+	go common.RetryForever(ctx, time.Second, func(ctx context.Context) error {
+		// Some errors from sync can cause this channel to be closed.
+		// We need to ensure that it is not closed before we retry,
+		// otherwise we will get a panic.
+		tt.ReadyCh = make(chan struct{})
+
+		// Run the sync
 		if err := tt.Sync(ctx); err != nil {
+			if common.IsContextDone(err) {
+				return nil
+			}
+
 			e.logger.Error("failed to sync", "error", err)
 
 			return err
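
The ReadyCh comment in the hunk above describes a close-of-closed-channel hazard: a failed Sync can apparently leave the channel closed, and a retry that closes it again would panic. The standalone sketch below is illustrative only, uses simplified stand-in names, and assumes readiness is signalled by closing the channel.

package main

import "fmt"

// runSync stands in for tt.Sync: it signals readiness by closing ready,
// then fails, which is what forces RetryForever to run it again.
func runSync(ready chan struct{}) error {
	close(ready)

	return fmt.Errorf("sync failed after signalling readiness")
}

func main() {
	ready := make(chan struct{})

	_ = runSync(ready) // first attempt closes the channel and then fails

	// Retrying with the same channel would panic with "close of closed channel".
	// Re-creating it before the retry, as the commit does with tt.ReadyCh,
	// gives the next attempt a fresh, unclosed channel.
	ready = make(chan struct{})

	_ = runSync(ready) // safe: this attempt closes a new channel

	fmt.Println("retried without panicking")
}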
