Skip to content

Commit

Permalink
Add common.RetryForever() and use for concurrent sync operations (#1503)
Browse files Browse the repository at this point in the history
* Add common.RetryForever() and use for concurrent block/event synchronisation operations that can take a long time and fail at any point.
  • Loading branch information
sergerad committed May 22, 2023
1 parent 97e07b2 commit 60b21b4
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 29 deletions.
12 changes: 8 additions & 4 deletions consensus/polybft/polybft.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package polybft

import (
"context"
"encoding/json"
"fmt"
"path/filepath"
Expand Down Expand Up @@ -353,18 +354,21 @@ func (p *Polybft) Start() error {
return fmt.Errorf("failed to start syncer. Error: %w", err)
}

// start syncing
go func() {
// sync concurrently, retrying indefinitely
go common.RetryForever(context.Background(), time.Second, func(context.Context) error {
blockHandler := func(b *types.FullBlock) bool {
p.runtime.OnBlockInserted(b)

return false
}

if err := p.syncer.Sync(blockHandler); err != nil {
p.logger.Error("blocks synchronization failed", "error", err)

return err
}
}()

return nil
})

// start consensus runtime
if err := p.startRuntime(); err != nil {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ require (
github.com/dave/jennifer v1.6.1
github.com/quasilyte/go-ruleguard v0.3.19
github.com/quasilyte/go-ruleguard/dsl v0.3.22
github.com/sethvargo/go-retry v0.2.4
golang.org/x/sync v0.2.0
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1
gopkg.in/DataDog/dd-trace-go.v1 v1.50.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,8 @@ github.com/secure-systems-lab/go-securesystemslib v0.3.1/go.mod h1:o8hhjkbNl2gOa
github.com/secure-systems-lab/go-securesystemslib v0.5.0 h1:oTiNu0QnulMQgN/hLK124wJD/r2f9ZhIUuKIeBsCBT8=
github.com/secure-systems-lab/go-securesystemslib v0.5.0/go.mod h1:uoCqUC0Ap7jrBSEanxT+SdACYJTVplRXWLkGMuDjXqk=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/sethvargo/go-retry v0.2.4 h1:T+jHEQy/zKJf5s95UkguisicE0zuF9y7+/vgz08Ocec=
github.com/sethvargo/go-retry v0.2.4/go.mod h1:1afjQuvh7s4gflMObvjLPaWgluLLyhA1wmVZ6KLpICw=
github.com/shurcooL/component v0.0.0-20170202220835-f88ec8f54cc4/go.mod h1:XhFIlyj5a1fBNx5aJTbKoIq0mNaPvOagO+HjB3EtxrY=
github.com/shurcooL/events v0.0.0-20181021180414-410e4ca65f48/go.mod h1:5u70Mqkb5O5cxEA8nxTsgrgLehJeAw6Oc4Ab1c/P1HM=
github.com/shurcooL/github_flavored_markdown v0.0.0-20181002035957-2122de532470/go.mod h1:2dOwnU2uBioM+SGy2aZoq1f/Sd1l9OkAeAUvjSyvgU0=
Expand Down
13 changes: 13 additions & 0 deletions helper/common/common.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package common

import (
"context"
"encoding/binary"
"encoding/json"
"errors"
Expand All @@ -18,6 +19,7 @@ import (
"time"

"github.com/0xPolygon/polygon-edge/helper/hex"
"github.com/sethvargo/go-retry"
)

var (
Expand All @@ -30,6 +32,17 @@ var (
errInvalidDuration = errors.New("invalid duration")
)

// RetryForever invokes fn repeatedly, waiting a constant interval between
// attempts, and returns as soon as fn reports success (a nil error) or ctx
// is cancelled.
//
// Every non-nil error returned by fn is treated as retryable. Nothing is
// reported back to the caller, so failures are only visible if fn records
// them itself.
//
// NOTE(review): interval is handed straight to retry.NewConstant; confirm
// that callers never pass a zero or negative duration.
func RetryForever(ctx context.Context, interval time.Duration, fn func(context.Context) error) {
	backoff := retry.NewConstant(interval)

	attempt := func(context.Context) error {
		err := fn(ctx)
		if err == nil {
			return nil
		}

		// mark the failure as retryable so that retry.Do schedules another attempt
		return retry.RetryableError(err)
	}

	// the result is deliberately discarded: the function has no return value
	_ = retry.Do(ctx, backoff, attempt)
}

// Min returns the strictly lower number
func Min(a, b uint64) uint64 {
if a < b {
Expand Down
34 changes: 34 additions & 0 deletions helper/common/common_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package common

import (
"context"
"encoding/json"
"errors"
"math/big"
"testing"
"time"
Expand Down Expand Up @@ -98,3 +100,35 @@ func Test_Duration_Marshal_UnmarshalJSON(t *testing.T) {
require.Equal(t, origTimer, otherTimer)
})
}

// TestRetryForever_AlwaysReturnError_ShouldNeverEnd verifies that RetryForever
// keeps retrying (i.e. does not return) for as long as fn fails and the
// context stays alive.
func TestRetryForever_AlwaysReturnError_ShouldNeverEnd(t *testing.T) {
	interval := time.Millisecond * 10

	// a cancellable context lets the retry goroutine be stopped when the test
	// finishes instead of spinning for the rest of the test binary's lifetime
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// closed once RetryForever returns; a channel avoids sharing a plain bool
	// between goroutines without synchronisation
	ended := make(chan struct{})

	go func() {
		defer close(ended)

		RetryForever(ctx, interval, func(context.Context) error {
			return errors.New("")
		})
	}()

	select {
	case <-ended:
		t.Fatal("RetryForever returned even though fn never succeeded")
	case <-time.After(interval * 10):
		// still retrying after ten intervals, as expected
	}
}

// TestRetryForever_ReturnNilAfterFirstRun_ShouldEnd verifies that RetryForever
// returns once the context it was given has been cancelled.
//
// The callback cancels the context on its first invocation and fails; any
// later invocation observes the cancelled context and succeeds.
func TestRetryForever_ReturnNilAfterFirstRun_ShouldEnd(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)

	attempt := func(ctx context.Context) error {
		// context already cancelled: report success
		if ctx.Err() != nil {
			return nil
		}

		// first run: cancel the context and fail so that a retry is requested
		cancel()

		return errors.New("")
	}

	RetryForever(ctx, time.Millisecond*100, attempt)

	<-ctx.Done()
	// the context must have ended through cancel(), not through its one second timeout
	require.True(t, errors.Is(ctx.Err(), context.Canceled))
}
11 changes: 7 additions & 4 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,7 @@ func (s *Server) setupGRPC() error {
return err
}

// Start serving gRPC requests in a separate goroutine (a Serve failure is only logged, not retried)
go func() {
if err := s.grpcServer.Serve(lis); err != nil {
s.logger.Error(err.Error())
Expand Down Expand Up @@ -996,11 +997,13 @@ func (s *Server) startPrometheusServer(listenAddr *net.TCPAddr) *http.Server {
ReadHeaderTimeout: 60 * time.Second,
}

go func() {
s.logger.Info("Prometheus server started", "addr=", listenAddr.String())
s.logger.Info("Prometheus server started", "addr=", listenAddr.String())

if err := srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
s.logger.Error("Prometheus HTTP server ListenAndServe", "err", err)
go func() {
if err := srv.ListenAndServe(); err != nil {
if !errors.Is(err, http.ErrServerClosed) {
s.logger.Error("Prometheus HTTP server ListenAndServe", "err", err)
}
}
}()

Expand Down
53 changes: 32 additions & 21 deletions tracker/event_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package tracker

import (
"context"
"time"

"github.com/0xPolygon/polygon-edge/helper/common"
hcf "github.com/hashicorp/go-hclog"
"github.com/umbracle/ethgo"
"github.com/umbracle/ethgo/blocktracker"
Expand Down Expand Up @@ -64,6 +66,29 @@ func (e *EventTracker) Start(ctx context.Context) error {
blockMaxBacklog := e.numBlockConfirmations*2 + 1
blockTracker := blocktracker.NewBlockTracker(provider.Eth(), blocktracker.WithBlockMaxBacklog(blockMaxBacklog))

go func() {
<-ctx.Done()
blockTracker.Close()
store.Close()
}()

// Init and start block tracker concurrently, retrying indefinitely
go common.RetryForever(ctx, time.Second, func(context.Context) error {
if err := blockTracker.Init(); err != nil {
e.logger.Error("failed to init blocktracker", "error", err)

return err
}

if err := blockTracker.Start(); err != nil {
e.logger.Error("failed to start blocktracker", "error", err)

return err
}

return nil
})

tt, err := tracker.NewTracker(provider.Eth(),
tracker.WithBatchSize(10),
tracker.WithBlockTracker(blockTracker),
Expand All @@ -79,30 +104,16 @@ func (e *EventTracker) Start(ctx context.Context) error {
if err != nil {
return err
}

go func() {
if err := blockTracker.Init(); err != nil {
e.logger.Error("failed to init blocktracker", "error", err)

return
}

if err := blockTracker.Start(); err != nil {
e.logger.Error("failed to start blocktracker", "error", err)
}
}()

go func() {
<-ctx.Done()
blockTracker.Close()
store.Close()
}()

go func() {
// Sync concurrently, retrying indefinitely
go common.RetryForever(ctx, time.Second, func(context.Context) error {
if err := tt.Sync(ctx); err != nil {
e.logger.Error("failed to sync", "error", err)

return err
}
}()

return nil
})

return nil
}

0 comments on commit 60b21b4

Please sign in to comment.