diff --git a/bucket.go b/bucket.go
index 9e18994..a9a781d 100644
--- a/bucket.go
+++ b/bucket.go
@@ -26,6 +26,9 @@ type PeerInfo struct {
 	// Id of the peer in the DHT XOR keyspace
 	dhtId ID
+
+	// if a bucket is full, this peer can be replaced to make space for a new peer.
+	replaceable bool
 }
 
 // bucket holds a list of peers.
@@ -76,6 +79,14 @@ func (b *bucket) min(lessThan func(p1 *PeerInfo, p2 *PeerInfo) bool) *PeerInfo {
 	return minVal
 }
 
+// updateAllWith updates all the peers in the bucket by applying the given update function.
+func (b *bucket) updateAllWith(updateFnc func(p *PeerInfo)) {
+	for e := b.list.Front(); e != nil; e = e.Next() {
+		val := e.Value.(*PeerInfo)
+		updateFnc(val)
+	}
+}
+
 // return the Ids of all the peers in the bucket.
 func (b *bucket) peerIds() []peer.ID {
 	ps := make([]peer.ID, 0, b.list.Len())
diff --git a/bucket_test.go b/bucket_test.go
index e0f6208..f240a90 100644
--- a/bucket_test.go
+++ b/bucket_test.go
@@ -25,7 +25,7 @@ func TestBucketMinimum(t *testing.T) {
 		return first.LastUsefulAt.Before(second.LastUsefulAt)
 	}).Id)
 
-	// first is till min
+	// first is still min
 	b.pushFront(&PeerInfo{Id: pid2, LastUsefulAt: time.Now().AddDate(1, 0, 0)})
 	require.Equal(t, pid1, b.min(func(first *PeerInfo, second *PeerInfo) bool {
 		return first.LastUsefulAt.Before(second.LastUsefulAt)
@@ -37,3 +37,44 @@ func TestBucketMinimum(t *testing.T) {
 		return first.LastUsefulAt.Before(second.LastUsefulAt)
 	}).Id)
 }
+
+func TestUpdateAllWith(t *testing.T) {
+	t.Parallel()
+
+	b := newBucket()
+	// don't crash on an empty bucket
+	b.updateAllWith(func(p *PeerInfo) {})
+
+	pid1 := test.RandPeerIDFatal(t)
+	pid2 := test.RandPeerIDFatal(t)
+	pid3 := test.RandPeerIDFatal(t)
+
+	// peer1
+	b.pushFront(&PeerInfo{Id: pid1, replaceable: false})
+	b.updateAllWith(func(p *PeerInfo) {
+		p.replaceable = true
+	})
+	require.True(t, b.getPeer(pid1).replaceable)
+
+	// peer2
+	b.pushFront(&PeerInfo{Id: pid2, replaceable: false})
+	b.updateAllWith(func(p *PeerInfo) {
+		if p.Id == pid1 {
+			p.replaceable = false
+		} else {
+			p.replaceable = true
+		}
+	})
+	require.True(t, b.getPeer(pid2).replaceable)
+	require.False(t, b.getPeer(pid1).replaceable)
+
+	// peer3
+	b.pushFront(&PeerInfo{Id: pid3, replaceable: false})
+	require.False(t, b.getPeer(pid3).replaceable)
+	b.updateAllWith(func(p *PeerInfo) {
+		p.replaceable = true
+	})
+	require.True(t, b.getPeer(pid1).replaceable)
+	require.True(t, b.getPeer(pid2).replaceable)
+	require.True(t, b.getPeer(pid3).replaceable)
+}
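Context for the hunks above: `bucket` wraps a `container/list.List` whose elements are always `*PeerInfo`, so `updateAllWith` is a plain linear walk that mutates each entry in place (no copy, no reordering). A self-contained sketch of the same pattern — the `peerInfo` type and the `main` harness are illustrative stand-ins, not the package's real code:

```go
package main

import (
	"container/list"
	"fmt"
)

// peerInfo stands in for the package's *PeerInfo entries.
type peerInfo struct {
	id          string
	replaceable bool
}

// updateAllWith mirrors bucket.updateAllWith: apply fn to every element.
func updateAllWith(l *list.List, fn func(p *peerInfo)) {
	for e := l.Front(); e != nil; e = e.Next() {
		fn(e.Value.(*peerInfo)) // elements are always stored as *peerInfo
	}
}

func main() {
	l := list.New()
	l.PushFront(&peerInfo{id: "a"})
	l.PushFront(&peerInfo{id: "b"})

	// flip every entry's flag, the same shape of update
	// that MarkAllPeersIrreplaceable performs (in reverse)
	updateAllWith(l, func(p *peerInfo) { p.replaceable = true })

	for e := l.Front(); e != nil; e = e.Next() {
		p := e.Value.(*peerInfo)
		fmt.Println(p.id, p.replaceable)
	}
}
```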
diff --git a/table.go b/table.go
index 09626e1..f5a7741 100644
--- a/table.go
+++ b/table.go
@@ -132,15 +132,15 @@ func (rt *RoutingTable) NPeersForCpl(cpl uint) int {
 // the boolean value will ALWAYS be false i.e. the peer won't be added to the Routing Table if it's not already there.
 //
 // A return value of false with error=nil indicates that the peer ALREADY exists in the Routing Table.
-func (rt *RoutingTable) TryAddPeer(p peer.ID, queryPeer bool) (bool, error) {
+func (rt *RoutingTable) TryAddPeer(p peer.ID, queryPeer bool, isReplaceable bool) (bool, error) {
 	rt.tabLock.Lock()
 	defer rt.tabLock.Unlock()
 
-	return rt.addPeer(p, queryPeer)
+	return rt.addPeer(p, queryPeer, isReplaceable)
 }
 
 // locking is the responsibility of the caller
-func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {
+func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool, isReplaceable bool) (bool, error) {
 	bucketID := rt.bucketIdForPeer(p)
 	bucket := rt.buckets[bucketID]
 
@@ -183,6 +183,7 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {
 			LastSuccessfulOutboundQueryAt: now,
 			AddedAt:                       now,
 			dhtId:                         ConvertPeerID(p),
+			replaceable:                   isReplaceable,
 		})
 		rt.PeerAdded(p)
 		return true, nil
@@ -203,6 +204,7 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {
 			LastSuccessfulOutboundQueryAt: now,
 			AddedAt:                       now,
 			dhtId:                         ConvertPeerID(p),
+			replaceable:                   isReplaceable,
 		})
 		rt.PeerAdded(p)
 		return true, nil
@@ -210,20 +212,23 @@
 	}
 
 	// the bucket to which the peer belongs is full. Let's try to find a peer
-	// in that bucket with a LastSuccessfulOutboundQuery value above the maximum threshold and replace it.
-	minLast := bucket.min(func(first *PeerInfo, second *PeerInfo) bool {
-		return first.LastUsefulAt.Before(second.LastUsefulAt)
+	// in that bucket which is replaceable.
+	// we don't need any particular ordering here as it doesn't matter which peer we evict,
+	// as long as it's a replaceable peer.
+	replaceablePeer := bucket.min(func(p1 *PeerInfo, p2 *PeerInfo) bool {
+		return p1.replaceable
 	})
 
-	if time.Since(minLast.LastUsefulAt) > rt.usefulnessGracePeriod {
+	if replaceablePeer != nil && replaceablePeer.replaceable {
 		// let's evict it and add the new peer
-		if rt.removePeer(minLast.Id) {
+		if rt.removePeer(replaceablePeer.Id) {
 			bucket.pushFront(&PeerInfo{
 				Id:                            p,
 				LastUsefulAt:                  lastUsefulAt,
 				LastSuccessfulOutboundQueryAt: now,
 				AddedAt:                       now,
 				dhtId:                         ConvertPeerID(p),
+				replaceable:                   isReplaceable,
 			})
 			rt.PeerAdded(p)
 			return true, nil
@@ -237,6 +242,21 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {
 	return false, ErrPeerRejectedNoCapacity
 }
 
+// MarkAllPeersIrreplaceable marks all peers in the routing table as irreplaceable.
+// This means that we will never replace an existing peer in the table to make space for a new peer.
+// However, they can still be removed by calling the `RemovePeer` API.
+func (rt *RoutingTable) MarkAllPeersIrreplaceable() {
+	rt.tabLock.Lock()
+	defer rt.tabLock.Unlock()
+
+	for i := range rt.buckets {
+		b := rt.buckets[i]
+		b.updateAllWith(func(p *PeerInfo) {
+			p.replaceable = false
+		})
+	}
+}
+
 // GetPeerInfos returns the peer information that we've stored in the buckets
 func (rt *RoutingTable) GetPeerInfos() []PeerInfo {
 	rt.tabLock.RLock()
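One subtlety in the eviction hunk above: `bucket.min` is reused as a "find me a replaceable peer" scan. Because the `lessThan` callback just returns `p1.replaceable`, the scan settles on the last replaceable peer it encounters, and falls back to the bucket's first peer when none is replaceable — hence the `replaceablePeer != nil && replaceablePeer.replaceable` re-check before evicting. A minimal slice-based sketch of the equivalent logic (types are illustrative, not the package's):

```go
type peerInfo struct {
	id          string
	replaceable bool
}

// findEvictionCandidate mimics
//   bucket.min(func(p1, p2 *PeerInfo) bool { return p1.replaceable })
// it returns the last replaceable peer seen, or the first peer if none is
// replaceable — so the caller must still check .replaceable on the result.
func findEvictionCandidate(peers []*peerInfo) *peerInfo {
	if len(peers) == 0 {
		return nil // mirrors min returning nil for an empty bucket
	}
	candidate := peers[0]
	for _, p := range peers[1:] {
		if p.replaceable { // lessThan(p, candidate) == p.replaceable
			candidate = p
		}
	}
	return candidate
}
```

This is also why the eviction order among several replaceable peers is deliberately unspecified: any of them is an acceptable victim.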
diff --git a/table_refresh_test.go b/table_refresh_test.go
index 23943ec..f244290 100644
--- a/table_refresh_test.go
+++ b/table_refresh_test.go
@@ -62,7 +62,7 @@ func TestRefreshAndGetTrackedCpls(t *testing.T) {
 	// add peer IDs.
 	for i, id := range peerIDs {
-		added, err := rt.TryAddPeer(id, true)
+		added, err := rt.TryAddPeer(id, true, false)
 		require.NoError(t, err)
 		require.True(t, added)
 		require.Len(t, rt.GetTrackedCplsForRefresh(), minCpl+i+1)
@@ -83,7 +83,7 @@ func TestRefreshAndGetTrackedCpls(t *testing.T) {
 	}
 
 	// add our peer ID to max out the table
-	added, err := rt.TryAddPeer(local, true)
+	added, err := rt.TryAddPeer(local, true, false)
 	require.NoError(t, err)
 	require.True(t, added)
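Worth calling out for downstream consumers: the exported `TryAddPeer` signature changes, so every call site must now decide replaceability explicitly (the tests above and below just pass `false`). A hypothetical migration at a call site, with `rt` and `p` assumed to be in scope:

```go
// before this change:
//   ok, err := rt.TryAddPeer(p, true)
// after: the new third argument marks whether p may later be evicted to
// make room for another peer once its bucket is full.
if ok, err := rt.TryAddPeer(p, true, false); err == ErrPeerRejectedNoCapacity {
	// the bucket is full and holds no replaceable peers; p was not added
} else if ok {
	// p is now in the routing table
}
```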
diff --git a/table_test.go b/table_test.go
index 13fe73f..501b2f2 100644
--- a/table_test.go
+++ b/table_test.go
@@ -102,27 +102,27 @@ func TestNPeersForCpl(t *testing.T) {
 	// one peer with cpl 1
 	p, _ := rt.GenRandPeerID(1)
-	rt.TryAddPeer(p, true)
+	rt.TryAddPeer(p, true, false)
 	require.Equal(t, 0, rt.NPeersForCpl(0))
 	require.Equal(t, 1, rt.NPeersForCpl(1))
 	require.Equal(t, 0, rt.NPeersForCpl(2))
 
 	// one peer with cpl 0
 	p, _ = rt.GenRandPeerID(0)
-	rt.TryAddPeer(p, true)
+	rt.TryAddPeer(p, true, false)
 	require.Equal(t, 1, rt.NPeersForCpl(0))
 	require.Equal(t, 1, rt.NPeersForCpl(1))
 	require.Equal(t, 0, rt.NPeersForCpl(2))
 
 	// split the bucket with a peer with cpl 1
 	p, _ = rt.GenRandPeerID(1)
-	rt.TryAddPeer(p, true)
+	rt.TryAddPeer(p, true, false)
 	require.Equal(t, 1, rt.NPeersForCpl(0))
 	require.Equal(t, 2, rt.NPeersForCpl(1))
 	require.Equal(t, 0, rt.NPeersForCpl(2))
 
 	p, _ = rt.GenRandPeerID(0)
-	rt.TryAddPeer(p, true)
+	rt.TryAddPeer(p, true, false)
 	require.Equal(t, 2, rt.NPeersForCpl(0))
 }
 
@@ -144,7 +144,7 @@ func TestEmptyBucketCollapse(t *testing.T) {
 	rt.RemovePeer(p1)
 
 	// add peer with cpl 0 and remove it. The bucket should still exist as it's the ONLY bucket we have
-	b, err := rt.TryAddPeer(p1, true)
+	b, err := rt.TryAddPeer(p1, true, false)
 	require.True(t, b)
 	require.NoError(t, err)
 	rt.RemovePeer(p1)
@@ -154,9 +154,9 @@ func TestEmptyBucketCollapse(t *testing.T) {
 	require.Empty(t, rt.ListPeers())
 
 	// add peers with cpl 0 and cpl 1 and verify we have two buckets.
-	b, err = rt.TryAddPeer(p1, true)
+	b, err = rt.TryAddPeer(p1, true, false)
 	require.True(t, b)
-	b, err = rt.TryAddPeer(p2, true)
+	b, err = rt.TryAddPeer(p2, true, false)
 	require.True(t, b)
 	rt.tabLock.Lock()
 	require.Len(t, rt.buckets, 2)
@@ -171,7 +171,7 @@ func TestEmptyBucketCollapse(t *testing.T) {
 	require.Contains(t, rt.ListPeers(), p1)
 
 	// add p2 again
-	b, err = rt.TryAddPeer(p2, true)
+	b, err = rt.TryAddPeer(p2, true, false)
 	require.True(t, b)
 	require.NoError(t, err)
 	rt.tabLock.Lock()
@@ -187,10 +187,10 @@ func TestEmptyBucketCollapse(t *testing.T) {
 	require.Contains(t, rt.ListPeers(), p2)
 
 	// let's have a total of 4 buckets now
-	rt.TryAddPeer(p1, true)
-	rt.TryAddPeer(p2, true)
-	rt.TryAddPeer(p3, true)
-	rt.TryAddPeer(p4, true)
+	rt.TryAddPeer(p1, true, false)
+	rt.TryAddPeer(p2, true, false)
+	rt.TryAddPeer(p3, true, false)
+	rt.TryAddPeer(p4, true, false)
 
 	rt.tabLock.Lock()
 	require.Len(t, rt.buckets, 4)
@@ -205,10 +205,10 @@ func TestEmptyBucketCollapse(t *testing.T) {
 	rt.tabLock.Unlock()
 
 	// an empty bucket in the middle DOES NOT collapse buckets
-	rt.TryAddPeer(p1, true)
-	rt.TryAddPeer(p2, true)
-	rt.TryAddPeer(p3, true)
-	rt.TryAddPeer(p4, true)
+	rt.TryAddPeer(p1, true, false)
+	rt.TryAddPeer(p2, true, false)
+	rt.TryAddPeer(p3, true, false)
+	rt.TryAddPeer(p4, true, false)
 
 	rt.tabLock.Lock()
 	require.Len(t, rt.buckets, 4)
@@ -231,10 +231,10 @@ func TestRemovePeer(t *testing.T) {
 	p1, _ := rt.GenRandPeerID(0)
 	p2, _ := rt.GenRandPeerID(0)
 
-	b, err := rt.TryAddPeer(p1, true)
+	b, err := rt.TryAddPeer(p1, true, false)
 	require.True(t, b)
 	require.NoError(t, err)
-	b, err = rt.TryAddPeer(p2, true)
+	b, err = rt.TryAddPeer(p2, true, false)
 	require.True(t, b)
 	require.NoError(t, err)
 
@@ -271,7 +271,7 @@ func TestTableCallbacks(t *testing.T) {
 		delete(pset, p)
 	}
 
-	rt.TryAddPeer(peers[0], true)
+	rt.TryAddPeer(peers[0], true, false)
 	if _, ok := pset[peers[0]]; !ok {
 		t.Fatal("should have this peer")
 	}
@@ -282,7 +282,7 @@ func TestTableCallbacks(t *testing.T) {
 	}
 
 	for _, p := range peers {
-		rt.TryAddPeer(p, true)
+		rt.TryAddPeer(p, true, false)
 	}
 
 	out := rt.ListPeers()
@@ -313,7 +313,7 @@ func TestTryAddPeerLoad(t *testing.T) {
 	}
 
 	for i := 0; i < 10000; i++ {
-		rt.TryAddPeer(peers[rand.Intn(len(peers))], true)
+		rt.TryAddPeer(peers[rand.Intn(len(peers))], true, false)
 	}
 
 	for i := 0; i < 100; i++ {
@@ -336,7 +336,7 @@ func TestTableFind(t *testing.T) {
 	peers := make([]peer.ID, 100)
 	for i := 0; i < 5; i++ {
 		peers[i] = test.RandPeerIDFatal(t)
-		rt.TryAddPeer(peers[i], true)
+		rt.TryAddPeer(peers[i], true, false)
 	}
 
 	t.Logf("Searching for peer: '%s'", peers[2])
@@ -353,7 +353,7 @@ func TestUpdateLastSuccessfulOutboundQueryAt(t *testing.T) {
 	require.NoError(t, err)
 
 	p := test.RandPeerIDFatal(t)
-	b, err := rt.TryAddPeer(p, true)
+	b, err := rt.TryAddPeer(p, true, false)
 	require.True(t, b)
 	require.NoError(t, err)
 
@@ -374,7 +374,7 @@ func TestUpdateLastUsefulAt(t *testing.T) {
 	require.NoError(t, err)
 
 	p := test.RandPeerIDFatal(t)
-	b, err := rt.TryAddPeer(p, true)
+	b, err := rt.TryAddPeer(p, true, false)
 	require.True(t, b)
 	require.NoError(t, err)
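The rewritten `TestTryAddPeer` that follows pins down the new contract: with a bucket size of 2, a full cpl=0 bucket accepts a new peer only by evicting a replaceable one, and rejects further peers with `ErrPeerRejectedNoCapacity` once only irreplaceable peers remain; the old `LastUsefulAt`-threshold eviction path is gone entirely. Condensed (abbreviating the test's own assertions):

```go
rt.TryAddPeer(p1, true, false) // added; irreplaceable
rt.TryAddPeer(p2, true, true)  // added; replaceable — the cpl=0 bucket is now full
rt.TryAddPeer(p3, true, false) // added; p2 is evicted to make room
rt.TryAddPeer(p5, true, false) // rejected with ErrPeerRejectedNoCapacity: nothing left to evict
```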
@@ -389,63 +389,52 @@ func TestUpdateLastUsefulAt(t *testing.T) {
 }
 
 func TestTryAddPeer(t *testing.T) {
-	minThreshold := 24 * 1 * time.Hour
 	t.Parallel()
 
 	local := test.RandPeerIDFatal(t)
 	m := pstore.NewMetrics()
-	rt, err := NewRoutingTable(2, ConvertPeerID(local), time.Hour, m, minThreshold, nil)
+	rt, err := NewRoutingTable(2, ConvertPeerID(local), time.Hour, m, NoOpThreshold, nil)
 	require.NoError(t, err)
 
 	// generate 2 peers to saturate the first bucket for cpl=0
 	p1, _ := rt.GenRandPeerID(0)
-	b, err := rt.TryAddPeer(p1, true)
+	b, err := rt.TryAddPeer(p1, true, false)
 	require.NoError(t, err)
 	require.True(t, b)
 	p2, _ := rt.GenRandPeerID(0)
-	b, err = rt.TryAddPeer(p2, true)
+	b, err = rt.TryAddPeer(p2, true, true)
 	require.NoError(t, err)
 	require.True(t, b)
 	require.Equal(t, p1, rt.Find(p1))
 	require.Equal(t, p2, rt.Find(p2))
 
-	// trying to add a peer with cpl=0 fails
+	// trying to add a peer with cpl=0 works as p2 is replaceable
 	p3, _ := rt.GenRandPeerID(0)
-	b, err = rt.TryAddPeer(p3, true)
-	require.Equal(t, ErrPeerRejectedNoCapacity, err)
-	require.False(t, b)
-	require.Empty(t, rt.Find(p3))
-
-	// however, trying to add peer with cpl=1 works
-	p4, _ := rt.GenRandPeerID(1)
-	b, err = rt.TryAddPeer(p4, true)
-	require.NoError(t, err)
-	require.True(t, b)
-	require.Equal(t, p4, rt.Find(p4))
-
-	// adding a peer with cpl 0 works if an existing peer has LastUsefulAt above the max threshold
-	// because that existing peer will get replaced
-	require.True(t, rt.UpdateLastUsefulAt(p2, time.Now().AddDate(0, 0, -2)))
-	b, err = rt.TryAddPeer(p3, true)
+	b, err = rt.TryAddPeer(p3, true, false)
 	require.NoError(t, err)
 	require.True(t, b)
 	require.Equal(t, p3, rt.Find(p3))
-	require.Equal(t, p1, rt.Find(p1))
 	// p2 has been removed
 	require.Empty(t, rt.Find(p2))
 
-	// however adding peer fails if below threshold
+	// however, adding a peer now fails as there are no more replaceable peers.
 	p5, err := rt.GenRandPeerID(0)
 	require.NoError(t, err)
-	require.True(t, rt.UpdateLastUsefulAt(p1, time.Now()))
-	b, err = rt.TryAddPeer(p5, true)
+	b, err = rt.TryAddPeer(p5, true, false)
 	require.Error(t, err)
 	require.False(t, b)
 
+	// however, trying to add a peer with cpl=1 works
+	p4, _ := rt.GenRandPeerID(1)
+	b, err = rt.TryAddPeer(p4, true, false)
+	require.NoError(t, err)
+	require.True(t, b)
+	require.Equal(t, p4, rt.Find(p4))
+
 	// adding non query peer
 	p6, err := rt.GenRandPeerID(3)
 	require.NoError(t, err)
-	b, err = rt.TryAddPeer(p6, false)
+	b, err = rt.TryAddPeer(p6, false, false)
 	require.NoError(t, err)
 	require.True(t, b)
 	rt.tabLock.Lock()
@@ -456,6 +445,34 @@ func TestTryAddPeer(t *testing.T) {
 
 }
 
+func TestMarkAllPeersIrreplaceable(t *testing.T) {
+	t.Parallel()
+
+	local := test.RandPeerIDFatal(t)
+	m := pstore.NewMetrics()
+	rt, err := NewRoutingTable(2, ConvertPeerID(local), time.Hour, m, NoOpThreshold, nil)
+	require.NoError(t, err)
+
+	// generate 2 peers
+	p1, _ := rt.GenRandPeerID(0)
+	b, err := rt.TryAddPeer(p1, true, true)
+	require.NoError(t, err)
+	require.True(t, b)
+	p2, _ := rt.GenRandPeerID(0)
+	b, err = rt.TryAddPeer(p2, true, true)
+	require.NoError(t, err)
+	require.True(t, b)
+	require.Equal(t, p1, rt.Find(p1))
+	require.Equal(t, p2, rt.Find(p2))
+
+	rt.MarkAllPeersIrreplaceable()
+	ps := rt.GetPeerInfos()
+	for i := range ps {
+		require.False(t, ps[i].replaceable)
+	}
+}
+
 func TestTableFindMultiple(t *testing.T) {
 	t.Parallel()
 
@@ -467,7 +484,7 @@ func TestTableFindMultiple(t *testing.T) {
 	peers := make([]peer.ID, 100)
 	for i := 0; i < 18; i++ {
 		peers[i] = test.RandPeerIDFatal(t)
-		rt.TryAddPeer(peers[i], true)
+		rt.TryAddPeer(peers[i], true, false)
 	}
 
 	t.Logf("Searching for peer: '%s'", peers[2])
@@ -489,7 +506,7 @@ func TestTableFindMultipleBuckets(t *testing.T) {
 	peers := make([]peer.ID, 100)
 	for i := 0; i < 100; i++ {
 		peers[i] = test.RandPeerIDFatal(t)
-		rt.TryAddPeer(peers[i], true)
+		rt.TryAddPeer(peers[i], true, false)
 	}
 
 	closest := SortClosestPeers(rt.ListPeers(), ConvertPeerID(peers[2]))
@@ -540,7 +557,7 @@ func TestTableMultithreaded(t *testing.T) {
 	go func() {
 		for i := 0; i < 1000; i++ {
 			n := rand.Intn(len(peers))
-			tab.TryAddPeer(peers[n], true)
+			tab.TryAddPeer(peers[n], true, false)
 		}
 		done <- struct{}{}
 	}()
@@ -548,7 +565,7 @@ func TestTableMultithreaded(t *testing.T) {
 	go func() {
 		for i := 0; i < 1000; i++ {
 			n := rand.Intn(len(peers))
-			tab.TryAddPeer(peers[n], true)
+			tab.TryAddPeer(peers[n], true, false)
 		}
 		done <- struct{}{}
 	}()
@@ -620,17 +637,17 @@ func TestDiversityFiltering(t *testing.T) {
 	rt, err := NewRoutingTable(10, ConvertPeerID(local), time.Hour, pstore.NewMetrics(), NoOpThreshold, df)
 	require.NoError(t, err)
 	p, _ := rt.GenRandPeerID(2)
-	b, err := rt.TryAddPeer(p, true)
+	b, err := rt.TryAddPeer(p, true, false)
 	require.NoError(t, err)
 	require.True(t, b)
 
 	p2, _ := rt.GenRandPeerID(2)
-	b, err = rt.TryAddPeer(p2, true)
+	b, err = rt.TryAddPeer(p2, true, false)
 	require.Error(t, err)
 	require.False(t, b)
 
 	rt.RemovePeer(p)
-	b, err = rt.TryAddPeer(p2, true)
+	b, err = rt.TryAddPeer(p2, true, false)
 	require.NoError(t, err)
 	require.True(t, b)
 }
@@ -646,10 +663,10 @@ func TestGetPeerInfos(t *testing.T) {
 
 	p1 := test.RandPeerIDFatal(t)
 	p2 := test.RandPeerIDFatal(t)
-	b, err := rt.TryAddPeer(p1, false)
+	b, err := rt.TryAddPeer(p1, false, false)
 	require.True(t, b)
 	require.NoError(t, err)
-	b, err = rt.TryAddPeer(p2, true)
+	b, err = rt.TryAddPeer(p2, true, false)
 	require.True(t, b)
 	require.NoError(t, err)
 
@@ -671,7 +688,7 @@ func TestPeerRemovedNotificationWhenPeerIsEvicted(t *testing.T) {
 	local := test.RandPeerIDFatal(t)
 	m := pstore.NewMetrics()
-	rt, err := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m, 1*time.Hour, nil)
+	rt, err := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m, NoOpThreshold, nil)
 	require.NoError(t, err)
 	pset := make(map[peer.ID]struct{})
 	rt.PeerAdded = func(p peer.ID) {
@@ -685,12 +702,12 @@ func TestPeerRemovedNotificationWhenPeerIsEvicted(t *testing.T) {
 	p2, _ := rt.GenRandPeerID(0)
 
 	// first peer works
-	b, err := rt.TryAddPeer(p1, true)
+	b, err := rt.TryAddPeer(p1, true, false)
 	require.NoError(t, err)
 	require.True(t, b)
 
 	// second is rejected because of capacity
-	b, err = rt.TryAddPeer(p2, true)
+	b, err = rt.TryAddPeer(p2, true, false)
 	require.False(t, b)
 	require.Error(t, err)
 
@@ -698,10 +715,14 @@ func TestPeerRemovedNotificationWhenPeerIsEvicted(t *testing.T) {
 	require.Contains(t, pset, p1)
 	require.NotContains(t, pset, p2)
 
-	// update last useful at so it's ripe for eviction.
-	require.True(t, rt.UpdateLastUsefulAt(p1, time.Now().AddDate(-1, 0, 0)))
+	// mark the peer as replaceable so we can evict it.
+	i := rt.bucketIdForPeer(p1)
+	rt.tabLock.Lock()
+	bucket := rt.buckets[i]
+	rt.tabLock.Unlock()
+	bucket.getPeer(p1).replaceable = true
 
-	b, err = rt.TryAddPeer(p2, true)
+	b, err = rt.TryAddPeer(p2, true, false)
 	require.NoError(t, err)
 	require.True(t, b)
 	require.Contains(t, pset, p2)
@@ -722,7 +743,7 @@ func BenchmarkAddPeer(b *testing.B) {
 	b.StartTimer()
 	for i := 0; i < b.N; i++ {
-		tab.TryAddPeer(peers[i], true)
+		tab.TryAddPeer(peers[i], true, false)
 	}
 }
 
@@ -736,7 +757,7 @@ func BenchmarkFinds(b *testing.B) {
 	var peers []peer.ID
 	for i := 0; i < b.N; i++ {
 		peers = append(peers, test.RandPeerIDFatal(b))
-		tab.TryAddPeer(peers[i], true)
+		tab.TryAddPeer(peers[i], true, false)
 	}
 
 	b.StartTimer()
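Taken together, a consumer drives the new knob roughly like this (a sketch in the style of the tests above; `local`, `m`, and `candidate` are assumed to be in scope, and the `NewRoutingTable` arguments mirror the test setup):

```go
rt, err := NewRoutingTable(20, ConvertPeerID(local), time.Hour, m, NoOpThreshold, nil)
if err != nil {
	return err
}

// Unvetted peers go in as replaceable: they may be silently evicted when
// their bucket fills up.
if ok, err := rt.TryAddPeer(candidate, true, true); err == ErrPeerRejectedNoCapacity {
	// bucket full and nothing replaceable — candidate was not added
} else if ok {
	// candidate is in the table, subject to future eviction
}

// Once we want to keep what we have, pin the whole table: peers can then
// still be removed explicitly via RemovePeer, but never evicted to make room.
rt.MarkAllPeersIrreplaceable()
```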