Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce Routing Table churn #90

Merged
merged 3 commits into from
Jun 4, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type PeerInfo struct {

// Id of the peer in the DHT XOR keyspace
dhtId ID

// if a bucket is full, this peer can be replaced to make space for a new peer.
replaceable bool
}

// bucket holds a list of peers.
Expand Down Expand Up @@ -54,26 +57,26 @@ func (b *bucket) peers() []PeerInfo {
return ps
}

// returns the "minimum" peer in the bucket based on the `lessThan` comparator passed to it.
// It is NOT safe for the comparator to mutate the given `PeerInfo`
// as we pass in a pointer to it.
// findFirst returns the first peer in the bucket that satisfies the given predicate.
// It is NOT safe for the predicate to mutate the given `PeerInfo` as we pass in a pointer to it.
// It is NOT safe to modify the returned value.
func (b *bucket) min(lessThan func(p1 *PeerInfo, p2 *PeerInfo) bool) *PeerInfo {
if b.list.Len() == 0 {
return nil
func (b *bucket) findFirst(p func(p *PeerInfo) bool) *PeerInfo {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd still use a comparison function like min that determines which peer is more "evictable" than another peer. Then we go through the bucket and find the most evictable peer and check if it is actually evictable.

Previously: Find oldest time, then check if time is too old
This PR: Find first replaceable
Suggestion: comparison function if p1.replaceable return p1; if p2.replaceable return p2; return p1. Evaluation function return p.replaceable. It's really easy to plug in the older code here if required.

Note: if we wanted to enable stable sorting we would have to compare peerIDs if both results were equal (e.g. added at the same time or both replaceable), but since we just need one of them it shouldn't really matter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've made this change. Let me know what you think. And I agree, we don't need a stable sort for now.

for e := b.list.Front(); e != nil; e = e.Next() {
val := e.Value.(*PeerInfo)
if p(val) {
return val
}
}

minVal := b.list.Front().Value.(*PeerInfo)
return nil
}

for e := b.list.Front().Next(); e != nil; e = e.Next() {
// updateAllWith updates all the peers in the bucket by applying the given update function.
func (b *bucket) updateAllWith(updateFnc func(p *PeerInfo)) {
for e := b.list.Front(); e != nil; e = e.Next() {
val := e.Value.(*PeerInfo)

if lessThan(val, minVal) {
minVal = val
}
updateFnc(val)
}

return minVal
}

// return the Ids of all the peers in the bucket.
Expand Down
74 changes: 56 additions & 18 deletions bucket_test.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,77 @@
package kbucket

import (
"testing"
"time"

"github.com/libp2p/go-libp2p-core/test"

"github.com/stretchr/testify/require"
"testing"
)

func TestBucketMinimum(t *testing.T) {
func TestBucketFindFirst(t *testing.T) {
t.Parallel()

b := newBucket()
require.Nil(t, b.min(func(p1 *PeerInfo, p2 *PeerInfo) bool { return true }))
require.Nil(t, b.findFirst(func(p1 *PeerInfo) bool { return true }))

pid1 := test.RandPeerIDFatal(t)
pid2 := test.RandPeerIDFatal(t)
pid3 := test.RandPeerIDFatal(t)

// first is min
b.pushFront(&PeerInfo{Id: pid1, LastUsefulAt: time.Now()})
require.Equal(t, pid1, b.min(func(first *PeerInfo, second *PeerInfo) bool {
return first.LastUsefulAt.Before(second.LastUsefulAt)
// first is replacable
b.pushFront(&PeerInfo{Id: pid1, replaceable: true})
require.Equal(t, pid1, b.findFirst(func(p *PeerInfo) bool {
return p.replaceable
}).Id)

// first is till min
b.pushFront(&PeerInfo{Id: pid2, LastUsefulAt: time.Now().AddDate(1, 0, 0)})
require.Equal(t, pid1, b.min(func(first *PeerInfo, second *PeerInfo) bool {
return first.LastUsefulAt.Before(second.LastUsefulAt)
// above peer is stll the replacable one
b.pushFront(&PeerInfo{Id: pid2, replaceable: false})
require.Equal(t, pid1, b.findFirst(func(p *PeerInfo) bool {
return p.replaceable
}).Id)

// second is the min
b.pushFront(&PeerInfo{Id: pid3, LastUsefulAt: time.Now().AddDate(-1, 0, 0)})
require.Equal(t, pid3, b.min(func(first *PeerInfo, second *PeerInfo) bool {
return first.LastUsefulAt.Before(second.LastUsefulAt)
// new peer is replacable.
b.pushFront(&PeerInfo{Id: pid3, replaceable: true})
require.Equal(t, pid3, b.findFirst(func(p *PeerInfo) bool {
return p.replaceable
}).Id)
}

func TestUpdateAllWith(t *testing.T) {
t.Parallel()

b := newBucket()
// dont crash
b.updateAllWith(func(p *PeerInfo) {})

pid1 := test.RandPeerIDFatal(t)
pid2 := test.RandPeerIDFatal(t)
pid3 := test.RandPeerIDFatal(t)

// peer1
b.pushFront(&PeerInfo{Id: pid1, replaceable: false})
b.updateAllWith(func(p *PeerInfo) {
p.replaceable = true
})
require.True(t, b.getPeer(pid1).replaceable)

// peer2
b.pushFront(&PeerInfo{Id: pid2, replaceable: false})
b.updateAllWith(func(p *PeerInfo) {
if p.Id == pid1 {
p.replaceable = false
} else {
p.replaceable = true
}
})
require.True(t, b.getPeer(pid2).replaceable)
require.False(t, b.getPeer(pid1).replaceable)

// peer3
b.pushFront(&PeerInfo{Id: pid3, replaceable: false})
require.False(t, b.getPeer(pid3).replaceable)
b.updateAllWith(func(p *PeerInfo) {
p.replaceable = true
})
require.True(t, b.getPeer(pid1).replaceable)
require.True(t, b.getPeer(pid2).replaceable)
require.True(t, b.getPeer(pid3).replaceable)
}
34 changes: 26 additions & 8 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,15 @@ func (rt *RoutingTable) NPeersForCpl(cpl uint) int {
// the boolean value will ALWAYS be false i.e. the peer wont be added to the Routing Table it it's not already there.
//
// A return value of false with error=nil indicates that the peer ALREADY exists in the Routing Table.
func (rt *RoutingTable) TryAddPeer(p peer.ID, queryPeer bool) (bool, error) {
func (rt *RoutingTable) TryAddPeer(p peer.ID, queryPeer bool, isReplaceable bool) (bool, error) {
rt.tabLock.Lock()
defer rt.tabLock.Unlock()

return rt.addPeer(p, queryPeer)
return rt.addPeer(p, queryPeer, isReplaceable)
}

// locking is the responsibility of the caller
func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {
func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool, isReplaceable bool) (bool, error) {
bucketID := rt.bucketIdForPeer(p)
bucket := rt.buckets[bucketID]

Expand Down Expand Up @@ -183,6 +183,7 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {
LastSuccessfulOutboundQueryAt: now,
AddedAt: now,
dhtId: ConvertPeerID(p),
replaceable: isReplaceable,
})
rt.PeerAdded(p)
return true, nil
Expand All @@ -203,27 +204,29 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {
LastSuccessfulOutboundQueryAt: now,
AddedAt: now,
dhtId: ConvertPeerID(p),
replaceable: isReplaceable,
})
rt.PeerAdded(p)
return true, nil
}
}

// the bucket to which the peer belongs is full. Let's try to find a peer
// in that bucket with a LastSuccessfulOutboundQuery value above the maximum threshold and replace it.
minLast := bucket.min(func(first *PeerInfo, second *PeerInfo) bool {
return first.LastUsefulAt.Before(second.LastUsefulAt)
// in that bucket which is replaceable.
replaceablePeer := bucket.findFirst(func(p *PeerInfo) bool {
return p.replaceable
})

if time.Since(minLast.LastUsefulAt) > rt.usefulnessGracePeriod {
if replaceablePeer != nil {
// let's evict it and add the new peer
if rt.removePeer(minLast.Id) {
if rt.removePeer(replaceablePeer.Id) {
bucket.pushFront(&PeerInfo{
Id: p,
LastUsefulAt: lastUsefulAt,
LastSuccessfulOutboundQueryAt: now,
AddedAt: now,
dhtId: ConvertPeerID(p),
replaceable: isReplaceable,
})
rt.PeerAdded(p)
return true, nil
Expand All @@ -237,6 +240,21 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {
return false, ErrPeerRejectedNoCapacity
}

// MarkAllPeersIrreplaceable marks all peers in the routing table as irreplaceable
// This means that we will never replace an existing peer in the table to make space for a new peer.
// However, they can still be removed by calling the `RemovePeer` API.
func (rt *RoutingTable) MarkAllPeersIrreplaceable() {
Comment on lines +245 to +248
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like a function just waiting to get deprecated, as opposed to basically exposing the updateAllWith function. However, if we're going to refactor this plus the kad-dht repo in the near future to stop these updates from being such a pain I don't really mind.

Comment on lines +245 to +248
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to see how this is used in the DHT PR to ensure we don't run into race conditions

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR is up and I think we can keep this for now. Let me know what you think.

rt.tabLock.Lock()
defer rt.tabLock.Unlock()

for i := range rt.buckets {
b := rt.buckets[i]
b.updateAllWith(func(p *PeerInfo) {
p.replaceable = false
})
}
}

// GetPeerInfos returns the peer information that we've stored in the buckets
func (rt *RoutingTable) GetPeerInfos() []PeerInfo {
rt.tabLock.RLock()
Expand Down
4 changes: 2 additions & 2 deletions table_refresh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestRefreshAndGetTrackedCpls(t *testing.T) {

// add peer IDs.
for i, id := range peerIDs {
added, err := rt.TryAddPeer(id, true)
added, err := rt.TryAddPeer(id, true, false)
require.NoError(t, err)
require.True(t, added)
require.Len(t, rt.GetTrackedCplsForRefresh(), minCpl+i+1)
Expand All @@ -83,7 +83,7 @@ func TestRefreshAndGetTrackedCpls(t *testing.T) {
}

// add our peer ID to max out the table
added, err := rt.TryAddPeer(local, true)
added, err := rt.TryAddPeer(local, true, false)
require.NoError(t, err)
require.True(t, added)

Expand Down
Loading