Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce Routing Table churn #90

Merged
merged 3 commits into from
Jun 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type PeerInfo struct {

// Id of the peer in the DHT XOR keyspace
dhtId ID

// if a bucket is full, this peer can be replaced to make space for a new peer.
replaceable bool
}

// bucket holds a list of peers.
Expand Down Expand Up @@ -76,6 +79,14 @@ func (b *bucket) min(lessThan func(p1 *PeerInfo, p2 *PeerInfo) bool) *PeerInfo {
return minVal
}

// updateAllWith updates all the peers in the bucket by applying the given update function.
func (b *bucket) updateAllWith(updateFnc func(p *PeerInfo)) {
for e := b.list.Front(); e != nil; e = e.Next() {
val := e.Value.(*PeerInfo)
updateFnc(val)
}
}

// return the Ids of all the peers in the bucket.
func (b *bucket) peerIds() []peer.ID {
ps := make([]peer.ID, 0, b.list.Len())
Expand Down
43 changes: 42 additions & 1 deletion bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestBucketMinimum(t *testing.T) {
return first.LastUsefulAt.Before(second.LastUsefulAt)
}).Id)

// first is till min
// first is still min
b.pushFront(&PeerInfo{Id: pid2, LastUsefulAt: time.Now().AddDate(1, 0, 0)})
require.Equal(t, pid1, b.min(func(first *PeerInfo, second *PeerInfo) bool {
return first.LastUsefulAt.Before(second.LastUsefulAt)
Expand All @@ -37,3 +37,44 @@ func TestBucketMinimum(t *testing.T) {
return first.LastUsefulAt.Before(second.LastUsefulAt)
}).Id)
}

func TestUpdateAllWith(t *testing.T) {
t.Parallel()

b := newBucket()
// dont crash
b.updateAllWith(func(p *PeerInfo) {})

pid1 := test.RandPeerIDFatal(t)
pid2 := test.RandPeerIDFatal(t)
pid3 := test.RandPeerIDFatal(t)

// peer1
b.pushFront(&PeerInfo{Id: pid1, replaceable: false})
b.updateAllWith(func(p *PeerInfo) {
p.replaceable = true
})
require.True(t, b.getPeer(pid1).replaceable)

// peer2
b.pushFront(&PeerInfo{Id: pid2, replaceable: false})
b.updateAllWith(func(p *PeerInfo) {
if p.Id == pid1 {
p.replaceable = false
} else {
p.replaceable = true
}
})
require.True(t, b.getPeer(pid2).replaceable)
require.False(t, b.getPeer(pid1).replaceable)

// peer3
b.pushFront(&PeerInfo{Id: pid3, replaceable: false})
require.False(t, b.getPeer(pid3).replaceable)
b.updateAllWith(func(p *PeerInfo) {
p.replaceable = true
})
require.True(t, b.getPeer(pid1).replaceable)
require.True(t, b.getPeer(pid2).replaceable)
require.True(t, b.getPeer(pid3).replaceable)
}
36 changes: 28 additions & 8 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,15 @@ func (rt *RoutingTable) NPeersForCpl(cpl uint) int {
// the boolean value will ALWAYS be false i.e. the peer wont be added to the Routing Table it it's not already there.
//
// A return value of false with error=nil indicates that the peer ALREADY exists in the Routing Table.
func (rt *RoutingTable) TryAddPeer(p peer.ID, queryPeer bool) (bool, error) {
func (rt *RoutingTable) TryAddPeer(p peer.ID, queryPeer bool, isReplaceable bool) (bool, error) {
rt.tabLock.Lock()
defer rt.tabLock.Unlock()

return rt.addPeer(p, queryPeer)
return rt.addPeer(p, queryPeer, isReplaceable)
}

// locking is the responsibility of the caller
func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {
func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool, isReplaceable bool) (bool, error) {
bucketID := rt.bucketIdForPeer(p)
bucket := rt.buckets[bucketID]

Expand Down Expand Up @@ -183,6 +183,7 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {
LastSuccessfulOutboundQueryAt: now,
AddedAt: now,
dhtId: ConvertPeerID(p),
replaceable: isReplaceable,
})
rt.PeerAdded(p)
return true, nil
Expand All @@ -203,27 +204,31 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {
LastSuccessfulOutboundQueryAt: now,
AddedAt: now,
dhtId: ConvertPeerID(p),
replaceable: isReplaceable,
})
rt.PeerAdded(p)
return true, nil
}
}

// the bucket to which the peer belongs is full. Let's try to find a peer
// in that bucket with a LastSuccessfulOutboundQuery value above the maximum threshold and replace it.
minLast := bucket.min(func(first *PeerInfo, second *PeerInfo) bool {
return first.LastUsefulAt.Before(second.LastUsefulAt)
// in that bucket which is replaceable.
// we don't really need a stable sort here as it dosen't matter which peer we evict
// as long as it's a replaceable peer.
replaceablePeer := bucket.min(func(p1 *PeerInfo, p2 *PeerInfo) bool {
return p1.replaceable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add a comment here about us not needing a stable sort here just to call it out

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

})

if time.Since(minLast.LastUsefulAt) > rt.usefulnessGracePeriod {
if replaceablePeer != nil && replaceablePeer.replaceable {
// let's evict it and add the new peer
if rt.removePeer(minLast.Id) {
if rt.removePeer(replaceablePeer.Id) {
bucket.pushFront(&PeerInfo{
Id: p,
LastUsefulAt: lastUsefulAt,
LastSuccessfulOutboundQueryAt: now,
AddedAt: now,
dhtId: ConvertPeerID(p),
replaceable: isReplaceable,
})
rt.PeerAdded(p)
return true, nil
Expand All @@ -237,6 +242,21 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {
return false, ErrPeerRejectedNoCapacity
}

// MarkAllPeersIrreplaceable marks all peers in the routing table as irreplaceable
// This means that we will never replace an existing peer in the table to make space for a new peer.
// However, they can still be removed by calling the `RemovePeer` API.
func (rt *RoutingTable) MarkAllPeersIrreplaceable() {
Comment on lines +245 to +248
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like a function just waiting to get deprecated, as opposed to basically exposing the updateAllWith function. However, if we're going to refactor this plus the kad-dht repo in the near future to stop these updates from being such a pain I don't really mind.

Comment on lines +245 to +248
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to see how this is used in the DHT PR to ensure we don't run into race conditions

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR is up and I think we can keep this for now. Let me know what you think.

rt.tabLock.Lock()
defer rt.tabLock.Unlock()

for i := range rt.buckets {
b := rt.buckets[i]
b.updateAllWith(func(p *PeerInfo) {
p.replaceable = false
})
}
}

// GetPeerInfos returns the peer information that we've stored in the buckets
func (rt *RoutingTable) GetPeerInfos() []PeerInfo {
rt.tabLock.RLock()
Expand Down
4 changes: 2 additions & 2 deletions table_refresh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestRefreshAndGetTrackedCpls(t *testing.T) {

// add peer IDs.
for i, id := range peerIDs {
added, err := rt.TryAddPeer(id, true)
added, err := rt.TryAddPeer(id, true, false)
require.NoError(t, err)
require.True(t, added)
require.Len(t, rt.GetTrackedCplsForRefresh(), minCpl+i+1)
Expand All @@ -83,7 +83,7 @@ func TestRefreshAndGetTrackedCpls(t *testing.T) {
}

// add our peer ID to max out the table
added, err := rt.TryAddPeer(local, true)
added, err := rt.TryAddPeer(local, true, false)
require.NoError(t, err)
require.True(t, added)

Expand Down
Loading