diff --git a/sql/database.go b/sql/database.go index 0f593ab9c9..90931ff611 100644 --- a/sql/database.go +++ b/sql/database.go @@ -227,7 +227,8 @@ func OpenInMemory(opts ...Opt) (*sqliteDatabase, error) { opts = append(opts, withForceFresh()) // Unique uri is needed to avoid sharing the same in-memory database, // while allowing multiple connections to the same database. - uri := fmt.Sprintf("file:mem-%d?mode=memory&cache=shared", rand.Uint64()) + uri := fmt.Sprintf("file:mem-%d-%d?mode=memory&cache=shared", + rand.Uint64(), rand.Uint64()) return Open(uri, opts...) } diff --git a/sync2/multipeer/multipeer.go b/sync2/multipeer/multipeer.go index 3b88bcf6ec..00832edc38 100644 --- a/sync2/multipeer/multipeer.go +++ b/sync2/multipeer/multipeer.go @@ -5,6 +5,8 @@ import ( "errors" "fmt" "math" + "math/rand/v2" + "sync/atomic" "time" "github.com/jonboulle/clockwork" @@ -69,6 +71,9 @@ type MultiPeerReconcilerConfig struct { MinCompleteFraction float64 `mapstructure:"min-complete-fraction"` // Interval between syncs. SyncInterval time.Duration `mapstructure:"sync-interval"` + // Interval spread factor for split sync. + // The actual interval will be SyncInterval * (1 + (random[0..2]*SplitSyncIntervalSpread-1)). + SyncIntervalSpread float64 `mapstructure:"sync-interval-spread"` // Interval between retries after a failed sync. RetryInterval time.Duration `mapstructure:"retry-interval"` // Interval between rechecking for peers after no synchronization peers were @@ -96,6 +101,7 @@ func DefaultConfig() MultiPeerReconcilerConfig { MaxFullDiff: 10000, MaxSyncDiff: 100, SyncInterval: 5 * time.Minute, + SyncIntervalSpread: 0.5, RetryInterval: 1 * time.Minute, NoPeersRecheckInterval: 30 * time.Second, SplitSyncGracePeriod: time.Minute, @@ -264,20 +270,27 @@ func (mpr *MultiPeerReconciler) needSplitSync(s syncability) bool { } func (mpr *MultiPeerReconciler) fullSync(ctx context.Context, syncPeers []p2p.Peer) error { + if len(syncPeers) == 0 { + return errors.New("no peers to sync against") + } var eg errgroup.Group + var someSucceeded atomic.Bool for _, p := range syncPeers { eg.Go(func() error { if err := mpr.syncBase.WithPeerSyncer(ctx, p, func(ps PeerSyncer) error { err := ps.Sync(ctx, nil, nil) switch { case err == nil: + someSucceeded.Store(true) mpr.sl.NoteSync() case errors.Is(err, context.Canceled): return err default: // failing to sync against a particular peer is not considered // a fatal sync failure, so we just log the error - mpr.logger.Error("error syncing peer", zap.Stringer("peer", p), zap.Error(err)) + mpr.logger.Error("error syncing peer", + zap.Stringer("peer", p), + zap.Error(err)) } return nil }); err != nil { @@ -286,7 +299,13 @@ func (mpr *MultiPeerReconciler) fullSync(ctx context.Context, syncPeers []p2p.Pe return nil }) } - return eg.Wait() + if err := eg.Wait(); err != nil { + return err + } + if !someSucceeded.Load() { + return errors.New("all syncs failed") + } + return nil } func (mpr *MultiPeerReconciler) syncOnce(ctx context.Context, lastWasSplit bool) (full bool, err error) { @@ -346,7 +365,7 @@ func (mpr *MultiPeerReconciler) syncOnce(ctx context.Context, lastWasSplit bool) } // Run runs the MultiPeerReconciler. -func (mpr *MultiPeerReconciler) Run(ctx context.Context) error { +func (mpr *MultiPeerReconciler) Run(ctx context.Context, kickCh chan struct{}) error { // The point of using split sync, which syncs different key ranges against // different peers, vs full sync which syncs the full key range against different // peers, is: @@ -384,7 +403,9 @@ func (mpr *MultiPeerReconciler) Run(ctx context.Context) error { lastWasSplit := false LOOP: for { - interval := mpr.cfg.SyncInterval + interval := time.Duration( + float64(mpr.cfg.SyncInterval) * + (1 + mpr.cfg.SyncIntervalSpread*(rand.Float64()*2-1))) full, err = mpr.syncOnce(ctx, lastWasSplit) if err != nil { if errors.Is(err, context.Canceled) { @@ -407,6 +428,7 @@ LOOP: err = ctx.Err() break LOOP case <-mpr.clock.After(interval): + case <-kickCh: } } // The loop is only exited upon context cancellation. diff --git a/sync2/multipeer/multipeer_test.go b/sync2/multipeer/multipeer_test.go index dc3487424d..4bd05227fe 100644 --- a/sync2/multipeer/multipeer_test.go +++ b/sync2/multipeer/multipeer_test.go @@ -67,6 +67,7 @@ type multiPeerSyncTester struct { reconciler *multipeer.MultiPeerReconciler cancel context.CancelFunc eg errgroup.Group + kickCh chan struct{} // EXPECT() calls should not be done concurrently // https://github.com/golang/mock/issues/533#issuecomment-821537840 mtx sync.Mutex @@ -81,10 +82,13 @@ func newMultiPeerSyncTester(t *testing.T, addPeers int) *multiPeerSyncTester { syncRunner: NewMocksyncRunner(ctrl), peers: peers.New(), clock: clockwork.NewFakeClock().(fakeClock), + kickCh: make(chan struct{}, 1), } cfg := multipeer.DefaultConfig() - cfg.SyncInterval = time.Minute + cfg.SyncInterval = 40 * time.Second + cfg.SyncIntervalSpread = 0.1 cfg.SyncPeerCount = numSyncPeers + cfg.RetryInterval = 5 * time.Second cfg.MinSplitSyncPeers = 2 cfg.MinSplitSyncCount = 90 cfg.MaxFullDiff = 20 @@ -111,7 +115,7 @@ func (mt *multiPeerSyncTester) addPeers(n int) []p2p.Peer { func (mt *multiPeerSyncTester) start() context.Context { var ctx context.Context ctx, mt.cancel = context.WithTimeout(context.Background(), 10*time.Second) - mt.eg.Go(func() error { return mt.reconciler.Run(ctx) }) + mt.eg.Go(func() error { return mt.reconciler.Run(ctx, mt.kickCh) }) mt.Cleanup(func() { mt.cancel() if err := mt.eg.Wait(); err != nil { @@ -121,6 +125,10 @@ func (mt *multiPeerSyncTester) start() context.Context { return ctx } +func (mt *multiPeerSyncTester) kick() { + mt.kickCh <- struct{}{} +} + func (mt *multiPeerSyncTester) expectProbe(times int, pr rangesync.ProbeResult) *peerList { var pl peerList mt.syncBase.EXPECT().Probe(gomock.Any(), gomock.Any()).DoAndReturn( @@ -183,7 +191,7 @@ func TestMultiPeerSync(t *testing.T) { mt := newMultiPeerSyncTester(t, 0) ctx := mt.start() mt.clock.BlockUntilContext(ctx, 1) - // Advance by sync interval. No peers yet + // Advance by sync interval (incl. spread). No peers yet mt.clock.Advance(time.Minute) mt.clock.BlockUntilContext(ctx, 1) // It is safe to do EXPECT() calls while the MultiPeerReconciler is blocked @@ -249,6 +257,34 @@ func TestMultiPeerSync(t *testing.T) { mt.syncBase.EXPECT().Wait() }) + t.Run("sync after kick", func(t *testing.T) { + mt := newMultiPeerSyncTester(t, 10) + mt.syncBase.EXPECT().Count().Return(100, nil).AnyTimes() + require.False(t, mt.reconciler.Synced()) + expect := func() { + pl := mt.expectProbe(numSyncPeers, rangesync.ProbeResult{ + FP: "foo", + Count: 100, + Sim: 0.99, // high enough for full sync + }) + mt.expectFullSync(pl, numSyncPeers, 0) + mt.syncBase.EXPECT().Wait() + } + expect() + // first full sync happens immediately + ctx := mt.start() + mt.clock.BlockUntilContext(ctx, 1) + mt.satisfy() + for i := 0; i < numSyncs; i++ { + expect() + mt.kick() + mt.clock.BlockUntilContext(ctx, 1) + mt.satisfy() + } + require.True(t, mt.reconciler.Synced()) + mt.syncBase.EXPECT().Wait() + }) + t.Run("full sync, peers with low count ignored", func(t *testing.T) { mt := newMultiPeerSyncTester(t, 0) addedPeers := mt.addPeers(numSyncPeers) @@ -349,6 +385,28 @@ func TestMultiPeerSync(t *testing.T) { mt.syncBase.EXPECT().Wait() }) + t.Run("all peers failed during full sync", func(t *testing.T) { + mt := newMultiPeerSyncTester(t, 10) + mt.syncBase.EXPECT().Count().Return(100, nil).AnyTimes() + + pl := mt.expectProbe(numSyncPeers, rangesync.ProbeResult{FP: "foo", Count: 100, Sim: 0.99}) + mt.expectFullSync(pl, numSyncPeers, numSyncPeers) + mt.syncBase.EXPECT().Wait().AnyTimes() + + ctx := mt.start() + mt.clock.BlockUntilContext(ctx, 1) + mt.satisfy() + + pl = mt.expectProbe(numSyncPeers, rangesync.ProbeResult{FP: "foo", Count: 100, Sim: 0.99}) + mt.expectFullSync(pl, numSyncPeers, 0) + // Retry should happen after mere 5 seconds as no peers have succeeded, no + // need to wait full sync interval. + mt.clock.Advance(5 * time.Second) + mt.satisfy() + + require.True(t, mt.reconciler.Synced()) + }) + t.Run("failed synced key handling during full sync", func(t *testing.T) { mt := newMultiPeerSyncTester(t, 10) mt.syncBase.EXPECT().Count().Return(100, nil).AnyTimes() diff --git a/sync2/multipeer/split_sync.go b/sync2/multipeer/split_sync.go index 6d69293451..9ea1bd437f 100644 --- a/sync2/multipeer/split_sync.go +++ b/sync2/multipeer/split_sync.go @@ -12,6 +12,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/spacemeshos/go-spacemesh/fetch/peers" + "github.com/spacemeshos/go-spacemesh/log" "github.com/spacemeshos/go-spacemesh/p2p" ) @@ -150,6 +151,8 @@ func (s *splitSync) handleSyncResult(r syncResult) error { s.syncPeers = append(s.syncPeers, r.ps.Peer()) s.numRemaining-- s.logger.Debug("peer synced successfully", + log.ZShortStringer("x", sr.X), + log.ZShortStringer("y", sr.Y), zap.Stringer("peer", r.ps.Peer()), zap.Int("numPeers", s.numPeers), zap.Int("numRemaining", s.numRemaining), @@ -199,8 +202,18 @@ func (s *splitSync) Sync(ctx context.Context) error { } select { case sr = <-s.slowRangeCh: - // push this syncRange to the back of the queue - s.sq.Update(sr, s.clock.Now()) + // Push this syncRange to the back of the queue. + // There's some chance that the peer managed to complete + // the sync while the range was still sitting in the + // channel, so we double-check if it's done. + if !sr.Done { + s.logger.Debug("slow peer, reassigning the range", + log.ZShortStringer("x", sr.X), log.ZShortStringer("y", sr.Y)) + s.sq.Update(sr, s.clock.Now()) + } else { + s.logger.Debug("slow peer, NOT reassigning the range: DONE", + log.ZShortStringer("x", sr.X), log.ZShortStringer("y", sr.Y)) + } case <-syncCtx.Done(): return syncCtx.Err() case r := <-s.resCh: @@ -210,5 +223,16 @@ func (s *splitSync) Sync(ctx context.Context) error { } } } - return s.eg.Wait() + // Stop late peers that didn't manage to sync their ranges in time. + // The ranges were already reassigned to other peers and successfully + // synced by this point. + cancel() + err := s.eg.Wait() + if s.numRemaining == 0 { + // If all the ranges are synced, the split sync is considered successful + // even if some peers failed to sync their ranges, so that these ranges + // got synced by other peers. + return nil + } + return err } diff --git a/sync2/multipeer/split_sync_test.go b/sync2/multipeer/split_sync_test.go index d81e64c424..4dbfdb8980 100644 --- a/sync2/multipeer/split_sync_test.go +++ b/sync2/multipeer/split_sync_test.go @@ -3,6 +3,7 @@ package multipeer_test import ( "context" "errors" + "fmt" "sync" "testing" "time" @@ -14,7 +15,6 @@ import ( "go.uber.org/zap/zaptest" "golang.org/x/sync/errgroup" - "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/fetch/peers" "github.com/spacemeshos/go-spacemesh/p2p" "github.com/spacemeshos/go-spacemesh/sync2/multipeer" @@ -23,9 +23,9 @@ import ( type splitSyncTester struct { testing.TB - + ctrl *gomock.Controller syncPeers []p2p.Peer - clock clockwork.Clock + clock clockwork.FakeClock mtx sync.Mutex fail map[hexRange]bool expPeerRanges map[hexRange]int @@ -57,6 +57,8 @@ var tstRanges = []hexRange{ func newTestSplitSync(t testing.TB) *splitSyncTester { ctrl := gomock.NewController(t) tst := &splitSyncTester{ + TB: t, + ctrl: ctrl, syncPeers: make([]p2p.Peer, 4), clock: clockwork.NewFakeClock(), fail: make(map[hexRange]bool), @@ -71,45 +73,7 @@ func newTestSplitSync(t testing.TB) *splitSyncTester { peers: peers.New(), } for n := range tst.syncPeers { - tst.syncPeers[n] = p2p.Peer(types.RandomBytes(20)) - } - for index, p := range tst.syncPeers { - tst.syncBase.EXPECT(). - WithPeerSyncer(gomock.Any(), p, gomock.Any()). - DoAndReturn(func( - _ context.Context, - peer p2p.Peer, - toCall func(multipeer.PeerSyncer) error, - ) error { - s := NewMockPeerSyncer(ctrl) - s.EXPECT().Peer().Return(p).AnyTimes() - // TODO: do better job at tracking Release() calls - s.EXPECT(). - Sync(gomock.Any(), gomock.Any(), gomock.Any()). - DoAndReturn(func(_ context.Context, x, y rangesync.KeyBytes) error { - tst.mtx.Lock() - defer tst.mtx.Unlock() - require.NotNil(t, x) - require.NotNil(t, y) - k := hexRange{x.String(), y.String()} - tst.peerRanges[k] = append(tst.peerRanges[k], peer) - count, found := tst.expPeerRanges[k] - require.True(t, found, "peer range not found: x %s y %s", x, y) - if tst.fail[k] { - t.Logf("ERR: peer %d x %s y %s", - index, x.String(), y.String()) - tst.fail[k] = false - return errors.New("injected fault") - } else { - t.Logf("OK: peer %d x %s y %s", - index, x.String(), y.String()) - tst.expPeerRanges[k] = count + 1 - } - return nil - }) - return toCall(s) - }). - AnyTimes() + tst.syncPeers[n] = p2p.Peer(fmt.Sprintf("peer%d", n)) } for _, p := range tst.syncPeers { tst.peers.Add(p, func() []protocol.ID { return []protocol.ID{multipeer.Protocol} }) @@ -126,8 +90,48 @@ func newTestSplitSync(t testing.TB) *splitSyncTester { return tst } +func (tst *splitSyncTester) expectPeerSync(p p2p.Peer) { + tst.syncBase.EXPECT(). + WithPeerSyncer(gomock.Any(), p, gomock.Any()). + DoAndReturn(func( + _ context.Context, + peer p2p.Peer, + toCall func(multipeer.PeerSyncer) error, + ) error { + s := NewMockPeerSyncer(tst.ctrl) + s.EXPECT().Peer().Return(p).AnyTimes() + s.EXPECT(). + Sync(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(_ context.Context, x, y rangesync.KeyBytes) error { + tst.mtx.Lock() + defer tst.mtx.Unlock() + require.NotNil(tst, x) + require.NotNil(tst, y) + k := hexRange{x.String(), y.String()} + tst.peerRanges[k] = append(tst.peerRanges[k], peer) + count, found := tst.expPeerRanges[k] + require.True(tst, found, "peer range not found: x %s y %s", x, y) + if tst.fail[k] { + tst.Logf("ERR: peer %s x %s y %s", + string(p), x.String(), y.String()) + tst.fail[k] = false + return errors.New("injected fault") + } else { + tst.Logf("OK: peer %s x %s y %s", + string(p), x.String(), y.String()) + tst.expPeerRanges[k] = count + 1 + } + return nil + }) + return toCall(s) + }).AnyTimes() +} + func TestSplitSync(t *testing.T) { tst := newTestSplitSync(t) + for _, p := range tst.syncPeers { + tst.expectPeerSync(p) + } var eg errgroup.Group eg.Go(func() error { return tst.splitSync.Sync(context.Background()) @@ -138,8 +142,11 @@ func TestSplitSync(t *testing.T) { } } -func TestSplitSyncRetry(t *testing.T) { +func TestSplitSync_Retry(t *testing.T) { tst := newTestSplitSync(t) + for _, p := range tst.syncPeers { + tst.expectPeerSync(p) + } tst.fail[tstRanges[1]] = true tst.fail[tstRanges[2]] = true var eg errgroup.Group @@ -152,3 +159,49 @@ func TestSplitSyncRetry(t *testing.T) { require.Equal(t, 1, count, "peer range not synced: x %s y %s", pr[0], pr[1]) } } + +func TestSplitSync_SlowPeers(t *testing.T) { + tst := newTestSplitSync(t) + + for _, p := range tst.syncPeers[:2] { + tst.expectPeerSync(p) + } + + for _, p := range tst.syncPeers[2:] { + tst.syncBase.EXPECT(). + WithPeerSyncer(gomock.Any(), p, gomock.Any()). + DoAndReturn(func( + _ context.Context, + peer p2p.Peer, + toCall func(multipeer.PeerSyncer) error, + ) error { + s := NewMockPeerSyncer(tst.ctrl) + s.EXPECT().Peer().Return(p).AnyTimes() + s.EXPECT(). + Sync(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(ctx context.Context, x, y rangesync.KeyBytes) error { + <-ctx.Done() + return nil + }) + return toCall(s) + }) + } + + var eg errgroup.Group + eg.Go(func() error { + return tst.splitSync.Sync(context.Background()) + }) + + require.Eventually(t, func() bool { + tst.mtx.Lock() + defer tst.mtx.Unlock() + return len(tst.peerRanges) == 2 + }, 10*time.Millisecond, time.Millisecond) + // Make sure all 4 grace period timers are started. + tst.clock.BlockUntil(4) + tst.clock.Advance(time.Minute) + require.NoError(t, eg.Wait()) + for pr, count := range tst.expPeerRanges { + require.Equal(t, 1, count, "bad sync count: x %s y %s", pr[0], pr[1]) + } +} diff --git a/sync2/p2p.go b/sync2/p2p.go index ba3d098eef..e43155155b 100644 --- a/sync2/p2p.go +++ b/sync2/p2p.go @@ -22,9 +22,13 @@ import ( type Config struct { rangesync.RangeSetReconcilerConfig `mapstructure:",squash"` multipeer.MultiPeerReconcilerConfig `mapstructure:",squash"` - EnableActiveSync bool `mapstructure:"enable-active-sync"` - TrafficLimit int `mapstructure:"traffic-limit"` - MessageLimit int `mapstructure:"message-limit"` + TrafficLimit int `mapstructure:"traffic-limit"` + MessageLimit int `mapstructure:"message-limit"` + MaxDepth int `mapstructure:"max-depth"` + BatchSize int `mapstructure:"batch-size"` + MaxAttempts int `mapstructure:"max-attempts"` + MaxBatchRetries int `mapstructure:"max-batch-retries"` + FailedBatchDelay time.Duration `mapstructure:"failed-batch-delay"` } // DefaultConfig returns the default configuration for the P2PHashSync. @@ -34,20 +38,27 @@ func DefaultConfig() Config { MultiPeerReconcilerConfig: multipeer.DefaultConfig(), TrafficLimit: 200_000_000, MessageLimit: 20_000_000, + MaxDepth: 24, + BatchSize: 1000, + MaxAttempts: 3, + MaxBatchRetries: 3, + FailedBatchDelay: 10 * time.Second, } } // P2PHashSync is handles the synchronization of a local OrderedSet against other peers. type P2PHashSync struct { - logger *zap.Logger - cfg Config - os rangesync.OrderedSet - syncBase multipeer.SyncBase - reconciler *multipeer.MultiPeerReconciler - cancel context.CancelFunc - eg errgroup.Group - start sync.Once - running atomic.Bool + logger *zap.Logger + cfg Config + enableActiveSync bool + os rangesync.OrderedSet + syncBase multipeer.SyncBase + reconciler *multipeer.MultiPeerReconciler + cancel context.CancelFunc + eg errgroup.Group + startOnce sync.Once + running atomic.Bool + kickCh chan struct{} } // NewP2PHashSync creates a new P2PHashSync. @@ -56,23 +67,24 @@ func NewP2PHashSync( d *rangesync.Dispatcher, name string, os rangesync.OrderedSet, - keyLen, maxDepth int, + keyLen int, peers *peers.Peers, handler multipeer.SyncKeyHandler, cfg Config, - requester rangesync.Requester, + enableActiveSync bool, ) *P2PHashSync { s := &P2PHashSync{ - logger: logger, - os: os, - cfg: cfg, + logger: logger, + os: os, + cfg: cfg, + kickCh: make(chan struct{}, 1), + enableActiveSync: enableActiveSync, } - // var ps multipeer.PairwiseSyncer - ps := rangesync.NewPairwiseSetSyncer(logger, requester, name, cfg.RangeSetReconcilerConfig) + ps := rangesync.NewPairwiseSetSyncer(logger, d, name, cfg.RangeSetReconcilerConfig) s.syncBase = multipeer.NewSetSyncBase(logger, ps, s.os, handler) s.reconciler = multipeer.NewMultiPeerReconciler( logger, cfg.MultiPeerReconcilerConfig, - s.syncBase, peers, keyLen, maxDepth) + s.syncBase, peers, keyLen, cfg.MaxDepth) d.Register(name, s.serve) return s } @@ -92,6 +104,9 @@ func (s *P2PHashSync) Set() rangesync.OrderedSet { // Load loads the OrderedSet from the underlying storage. func (s *P2PHashSync) Load() error { + if s.os.Loaded() { + return nil + } s.logger.Info("loading the set") start := time.Now() // We pre-load the set to avoid waiting for it to load during a @@ -106,30 +121,51 @@ func (s *P2PHashSync) Load() error { s.logger.Info("done loading the set", zap.Duration("elapsed", time.Since(start)), zap.Int("count", info.Count), - zap.Stringer("fingerprint", info.Fingerprint)) + zap.Stringer("fingerprint", info.Fingerprint), + zap.Int("maxDepth", s.cfg.MaxDepth)) return nil } -// Start starts the multi-peer reconciler. -func (s *P2PHashSync) Start() { - if !s.cfg.EnableActiveSync { - s.logger.Info("active sync is disabled") - return - } +func (s *P2PHashSync) start() (isWaiting bool) { s.running.Store(true) - s.start.Do(func() { - s.eg.Go(func() error { - defer s.running.Store(false) - var ctx context.Context - ctx, s.cancel = context.WithCancel(context.Background()) - return s.reconciler.Run(ctx) - }) + isWaiting = true + s.startOnce.Do(func() { + isWaiting = false + if s.enableActiveSync { + s.eg.Go(func() error { + defer s.running.Store(false) + var ctx context.Context + ctx, s.cancel = context.WithCancel(context.Background()) + return s.reconciler.Run(ctx, s.kickCh) + }) + return + } else { + s.logger.Info("active syncv2 is disabled") + return + } }) + return isWaiting +} + +// Start starts the multi-peer reconciler if it is not already running. +func (s *P2PHashSync) Start() { + s.start() +} + +// StartAndSync starts the multi-peer reconciler if it is not already running, and waits +// until the local OrderedSet is in sync with the peers. +func (s *P2PHashSync) StartAndSync(ctx context.Context) error { + if s.start() { + // If the multipeer reconciler is waiting for sync, we kick it to start + // the sync so as not to wait for the next scheduled sync interval. + s.kickCh <- struct{}{} + } + return s.WaitForSync(ctx) } // Stop stops the multi-peer reconciler. func (s *P2PHashSync) Stop() { - if !s.cfg.EnableActiveSync || !s.running.Load() { + if !s.enableActiveSync || !s.running.Load() { return } if s.cancel != nil { diff --git a/sync2/p2p_test.go b/sync2/p2p_test.go index 7d9eb222eb..dd2d1ba7d2 100644 --- a/sync2/p2p_test.go +++ b/sync2/p2p_test.go @@ -96,8 +96,8 @@ func TestP2P(t *testing.T) { } } cfg := sync2.DefaultConfig() - cfg.EnableActiveSync = true cfg.SyncInterval = 100 * time.Millisecond + cfg.MaxDepth = maxDepth host := mesh.Hosts()[n] handlers[n] = &fakeHandler{ mtx: &mtx, @@ -113,7 +113,7 @@ func TestP2P(t *testing.T) { eg.Go(func() error { return srv.Run(ctx) }) hs[n] = sync2.NewP2PHashSync( logger.Named(fmt.Sprintf("node%d", n)), - d, "test", &os, keyLen, maxDepth, ps, handlers[n], cfg, srv) + d, "test", &os, keyLen, ps, handlers[n], cfg, true) require.NoError(t, hs[n].Load()) is := hs[n].Set().(*rangesync.DumbSet) is.SetAllowMultiReceive(true)