Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - sync2: multipeer: fix edge cases #6447

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ func OpenInMemory(opts ...Opt) (*sqliteDatabase, error) {
opts = append(opts, withForceFresh())
// Unique uri is needed to avoid sharing the same in-memory database,
// while allowing multiple connections to the same database.
uri := fmt.Sprintf("file:mem-%d?mode=memory&cache=shared", rand.Uint64())
uri := fmt.Sprintf("file:mem-%d-%d?mode=memory&cache=shared",
rand.Uint64(), rand.Uint64())
return Open(uri, opts...)
}

Expand Down
30 changes: 26 additions & 4 deletions sync2/multipeer/multipeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
"errors"
"fmt"
"math"
"math/rand/v2"
"sync/atomic"
"time"

"github.com/jonboulle/clockwork"
Expand Down Expand Up @@ -69,6 +71,9 @@
MinCompleteFraction float64 `mapstructure:"min-complete-fraction"`
// Interval between syncs.
SyncInterval time.Duration `mapstructure:"sync-interval"`
// Interval spread factor for split sync.
// The actual interval will be SyncInterval * (1 + (random[0..2]*SplitSyncIntervalSpread-1)).
SyncIntervalSpread float64 `mapstructure:"sync-interval-spread"`
// Interval between retries after a failed sync.
RetryInterval time.Duration `mapstructure:"retry-interval"`
// Interval between rechecking for peers after no synchronization peers were
Expand Down Expand Up @@ -96,6 +101,7 @@
MaxFullDiff: 10000,
MaxSyncDiff: 100,
SyncInterval: 5 * time.Minute,
SyncIntervalSpread: 0.5,
RetryInterval: 1 * time.Minute,
NoPeersRecheckInterval: 30 * time.Second,
SplitSyncGracePeriod: time.Minute,
Expand Down Expand Up @@ -264,20 +270,27 @@
}

func (mpr *MultiPeerReconciler) fullSync(ctx context.Context, syncPeers []p2p.Peer) error {
if len(syncPeers) == 0 {
return errors.New("no peers to sync against")
}

Check warning on line 275 in sync2/multipeer/multipeer.go

View check run for this annotation

Codecov / codecov/patch

sync2/multipeer/multipeer.go#L274-L275

Added lines #L274 - L275 were not covered by tests
var eg errgroup.Group
var someSucceeded atomic.Bool
for _, p := range syncPeers {
eg.Go(func() error {
if err := mpr.syncBase.WithPeerSyncer(ctx, p, func(ps PeerSyncer) error {
err := ps.Sync(ctx, nil, nil)
switch {
case err == nil:
someSucceeded.Store(true)
mpr.sl.NoteSync()
case errors.Is(err, context.Canceled):
return err
default:
// failing to sync against a particular peer is not considered
// a fatal sync failure, so we just log the error
mpr.logger.Error("error syncing peer", zap.Stringer("peer", p), zap.Error(err))
mpr.logger.Error("error syncing peer",
zap.Stringer("peer", p),
zap.Error(err))
}
return nil
}); err != nil {
Expand All @@ -286,7 +299,13 @@
return nil
})
}
return eg.Wait()
if err := eg.Wait(); err != nil {
return err
}

Check warning on line 304 in sync2/multipeer/multipeer.go

View check run for this annotation

Codecov / codecov/patch

sync2/multipeer/multipeer.go#L303-L304

Added lines #L303 - L304 were not covered by tests
if !someSucceeded.Load() {
return errors.New("all syncs failed")
}
return nil
}

func (mpr *MultiPeerReconciler) syncOnce(ctx context.Context, lastWasSplit bool) (full bool, err error) {
Expand Down Expand Up @@ -346,7 +365,7 @@
}

// Run runs the MultiPeerReconciler.
func (mpr *MultiPeerReconciler) Run(ctx context.Context) error {
func (mpr *MultiPeerReconciler) Run(ctx context.Context, kickCh chan struct{}) error {
// The point of using split sync, which syncs different key ranges against
// different peers, vs full sync which syncs the full key range against different
// peers, is:
Expand Down Expand Up @@ -384,7 +403,9 @@
lastWasSplit := false
LOOP:
for {
interval := mpr.cfg.SyncInterval
interval := time.Duration(
float64(mpr.cfg.SyncInterval) *
(1 + mpr.cfg.SyncIntervalSpread*(rand.Float64()*2-1)))
Comment on lines +406 to +408
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit confused by this interval calculation. Would it be simpler if SyncIntervalSpread would be defined as time.Duration and gave the maximum deviation from the interval?

interval := mpr.cfg.SyncInterval + rand.N(mpr.cfg.SyncIntervalSpread)

This will uniformly generate a duration between [SyncInterval, SyncInterval+SyncIntervalSprad) while the current definition is [SyncInterval, SyncInterval+2*SyncIntervalSpread) which is a bit odd to me?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea was for SyncIntervalSpread to be a floating point number 0..1 and to have intervals between [SyncInterval - SyncInterval*SyncIntervalSpread, SyncInterval + SyncInterval*SyncIntervalSpread]
We could of course also use MinSyncInterval and MaxSyncInterval, but I'm not sure which is more convenient.
My idea was that if I e.g. want the actual sync interval to be uniformly spread across SyncInterval +/- 25% I just set SyncIntervalSpread to 0.25

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I should reflect this simpler explanation in the comments, incl. godoc comments for the config struct

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer the Min- and Max- config options, but if you think a fractional spread is easier then go for that. But please add some explanation - to the config and/or here - what the values mean 🙂

full, err = mpr.syncOnce(ctx, lastWasSplit)
if err != nil {
if errors.Is(err, context.Canceled) {
Expand All @@ -407,6 +428,7 @@
err = ctx.Err()
break LOOP
case <-mpr.clock.After(interval):
case <-kickCh:
}
}
// The loop is only exited upon context cancellation.
Expand Down
64 changes: 61 additions & 3 deletions sync2/multipeer/multipeer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type multiPeerSyncTester struct {
reconciler *multipeer.MultiPeerReconciler
cancel context.CancelFunc
eg errgroup.Group
kickCh chan struct{}
// EXPECT() calls should not be done concurrently
// https://github.com/golang/mock/issues/533#issuecomment-821537840
mtx sync.Mutex
Expand All @@ -81,10 +82,13 @@ func newMultiPeerSyncTester(t *testing.T, addPeers int) *multiPeerSyncTester {
syncRunner: NewMocksyncRunner(ctrl),
peers: peers.New(),
clock: clockwork.NewFakeClock().(fakeClock),
kickCh: make(chan struct{}, 1),
}
cfg := multipeer.DefaultConfig()
cfg.SyncInterval = time.Minute
cfg.SyncInterval = 40 * time.Second
cfg.SyncIntervalSpread = 0.1
cfg.SyncPeerCount = numSyncPeers
cfg.RetryInterval = 5 * time.Second
cfg.MinSplitSyncPeers = 2
cfg.MinSplitSyncCount = 90
cfg.MaxFullDiff = 20
Expand All @@ -111,7 +115,7 @@ func (mt *multiPeerSyncTester) addPeers(n int) []p2p.Peer {
func (mt *multiPeerSyncTester) start() context.Context {
var ctx context.Context
ctx, mt.cancel = context.WithTimeout(context.Background(), 10*time.Second)
mt.eg.Go(func() error { return mt.reconciler.Run(ctx) })
mt.eg.Go(func() error { return mt.reconciler.Run(ctx, mt.kickCh) })
mt.Cleanup(func() {
mt.cancel()
if err := mt.eg.Wait(); err != nil {
Expand All @@ -121,6 +125,10 @@ func (mt *multiPeerSyncTester) start() context.Context {
return ctx
}

func (mt *multiPeerSyncTester) kick() {
mt.kickCh <- struct{}{}
}

func (mt *multiPeerSyncTester) expectProbe(times int, pr rangesync.ProbeResult) *peerList {
var pl peerList
mt.syncBase.EXPECT().Probe(gomock.Any(), gomock.Any()).DoAndReturn(
Expand Down Expand Up @@ -183,7 +191,7 @@ func TestMultiPeerSync(t *testing.T) {
mt := newMultiPeerSyncTester(t, 0)
ctx := mt.start()
mt.clock.BlockUntilContext(ctx, 1)
// Advance by sync interval. No peers yet
// Advance by sync interval (incl. spread). No peers yet
mt.clock.Advance(time.Minute)
mt.clock.BlockUntilContext(ctx, 1)
// It is safe to do EXPECT() calls while the MultiPeerReconciler is blocked
Expand Down Expand Up @@ -249,6 +257,34 @@ func TestMultiPeerSync(t *testing.T) {
mt.syncBase.EXPECT().Wait()
})

t.Run("sync after kick", func(t *testing.T) {
mt := newMultiPeerSyncTester(t, 10)
mt.syncBase.EXPECT().Count().Return(100, nil).AnyTimes()
require.False(t, mt.reconciler.Synced())
expect := func() {
pl := mt.expectProbe(numSyncPeers, rangesync.ProbeResult{
FP: "foo",
Count: 100,
Sim: 0.99, // high enough for full sync
})
mt.expectFullSync(pl, numSyncPeers, 0)
mt.syncBase.EXPECT().Wait()
}
expect()
// first full sync happens immediately
ctx := mt.start()
mt.clock.BlockUntilContext(ctx, 1)
mt.satisfy()
for i := 0; i < numSyncs; i++ {
expect()
mt.kick()
mt.clock.BlockUntilContext(ctx, 1)
mt.satisfy()
}
require.True(t, mt.reconciler.Synced())
mt.syncBase.EXPECT().Wait()
})

t.Run("full sync, peers with low count ignored", func(t *testing.T) {
mt := newMultiPeerSyncTester(t, 0)
addedPeers := mt.addPeers(numSyncPeers)
Expand Down Expand Up @@ -349,6 +385,28 @@ func TestMultiPeerSync(t *testing.T) {
mt.syncBase.EXPECT().Wait()
})

t.Run("all peers failed during full sync", func(t *testing.T) {
mt := newMultiPeerSyncTester(t, 10)
mt.syncBase.EXPECT().Count().Return(100, nil).AnyTimes()

pl := mt.expectProbe(numSyncPeers, rangesync.ProbeResult{FP: "foo", Count: 100, Sim: 0.99})
mt.expectFullSync(pl, numSyncPeers, numSyncPeers)
mt.syncBase.EXPECT().Wait().AnyTimes()

ctx := mt.start()
mt.clock.BlockUntilContext(ctx, 1)
mt.satisfy()

pl = mt.expectProbe(numSyncPeers, rangesync.ProbeResult{FP: "foo", Count: 100, Sim: 0.99})
mt.expectFullSync(pl, numSyncPeers, 0)
// Retry should happen after mere 5 seconds as no peers have succeeded, no
// need to wait full sync interval.
mt.clock.Advance(5 * time.Second)
mt.satisfy()

require.True(t, mt.reconciler.Synced())
})

t.Run("failed synced key handling during full sync", func(t *testing.T) {
mt := newMultiPeerSyncTester(t, 10)
mt.syncBase.EXPECT().Count().Return(100, nil).AnyTimes()
Expand Down
30 changes: 27 additions & 3 deletions sync2/multipeer/split_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/fetch/peers"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/p2p"
)

Expand Down Expand Up @@ -150,6 +151,8 @@
s.syncPeers = append(s.syncPeers, r.ps.Peer())
s.numRemaining--
s.logger.Debug("peer synced successfully",
log.ZShortStringer("x", sr.X),
log.ZShortStringer("y", sr.Y),
zap.Stringer("peer", r.ps.Peer()),
zap.Int("numPeers", s.numPeers),
zap.Int("numRemaining", s.numRemaining),
Expand Down Expand Up @@ -199,8 +202,18 @@
}
select {
case sr = <-s.slowRangeCh:
// push this syncRange to the back of the queue
s.sq.Update(sr, s.clock.Now())
// Push this syncRange to the back of the queue.
// There's some chance that the peer managed to complete
// the sync while the range was still sitting in the
// channel, so we double-check if it's done.
if !sr.Done {
s.logger.Debug("slow peer, reassigning the range",
log.ZShortStringer("x", sr.X), log.ZShortStringer("y", sr.Y))
s.sq.Update(sr, s.clock.Now())
} else {
s.logger.Debug("slow peer, NOT reassigning the range: DONE",
log.ZShortStringer("x", sr.X), log.ZShortStringer("y", sr.Y))
}

Check warning on line 216 in sync2/multipeer/split_sync.go

View check run for this annotation

Codecov / codecov/patch

sync2/multipeer/split_sync.go#L214-L216

Added lines #L214 - L216 were not covered by tests
case <-syncCtx.Done():
return syncCtx.Err()
case r := <-s.resCh:
Expand All @@ -210,5 +223,16 @@
}
}
}
return s.eg.Wait()
// Stop late peers that didn't manage to sync their ranges in time.
// The ranges were already reassigned to other peers and successfully
// synced by this point.
cancel()
err := s.eg.Wait()
if s.numRemaining == 0 {
// If all the ranges are synced, the split sync is considered successful
// even if some peers failed to sync their ranges, so that these ranges
// got synced by other peers.
return nil
}
return err

Check warning on line 237 in sync2/multipeer/split_sync.go

View check run for this annotation

Codecov / codecov/patch

sync2/multipeer/split_sync.go#L237

Added line #L237 was not covered by tests
}
Loading
Loading