NPeersForCpl and collapse empty buckets #77

Merged: 7 commits, merged May 11, 2020.
Changes from 4 commits.
table.go (73 changes: 72 additions & 1 deletion)

@@ -85,6 +85,58 @@ func (rt *RoutingTable) Close() error {
return nil
}

// NPeersForCpl returns the number of peers we have for a given Cpl
func (rt *RoutingTable) NPeersForCpl(cpl uint) int {

Contributor:

can you get away without this function, and just call
len(GetPeersForCpl) instead?
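
For reference, what the reviewer describes would look something like this at a call site (a hypothetical sketch, not code from this PR):

// Instead of a dedicated counting method, callers could count the slice
// returned by GetPeersForCpl, at the cost of allocating that slice.
count := len(rt.GetPeersForCpl(cpl))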

Contributor Author:

Have removed GetPeersForCpl as we don't need it anymore.


Contributor:

I guess this is a tiny bit more efficient than just len(GetPeersForCpl), and we can always add GetPeersForCpl back in if we need it. Your call 😄

aschmahmann marked this conversation as resolved.
rt.tabLock.RLock()
defer rt.tabLock.RUnlock()

// it's in the last bucket
if int(cpl) >= len(rt.buckets)-1 {
count := 0
b := rt.buckets[len(rt.buckets)-1]
for _, p := range b.peerIds() {
if CommonPrefixLen(rt.local, ConvertPeerID(p)) == int(cpl) {
Contributor:

Should be using b.peers and p.kadID instead of b.peerIds and ConvertPeerID(p). Gotta cache those hashes 😄
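
As a rough sketch, the suggested loop might read as follows, assuming b.peers() returns entries carrying a cached kadID field as the comment above implies (names not verified against the merged code):

for _, p := range b.peers() {
	// p.kadID caches the Kademlia key, so we avoid re-hashing via ConvertPeerID.
	if CommonPrefixLen(rt.local, p.kadID) == int(cpl) {
		count++
	}
}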

Contributor Author:

This is done.

count++
}
}
return count
} else {
return rt.buckets[cpl].len()
}
}

// GetPeersForCpl returns the peers in the Routing Table with this cpl.
func (rt *RoutingTable) GetPeersForCpl(cpl uint) []peer.ID {
rt.tabLock.RLock()
defer rt.tabLock.RUnlock()

var peers []peer.ID

// it's in the last bucket
if int(cpl) >= len(rt.buckets)-1 {
b := rt.buckets[len(rt.buckets)-1]
for _, p := range b.peerIds() {
if CommonPrefixLen(rt.local, ConvertPeerID(p)) == int(cpl) {
Contributor:

Should be using b.peers and p.kadID instead of b.peerIds and ConvertPeerID(p). Gotta cache those hashes 😄

Contributor Author:

We've removed this function.

peers = append(peers, p)
}
}
} else {
for _, p := range rt.buckets[cpl].peerIds() {
peers = append(peers, p)
}

Contributor:

Suggested change:
-	for _, p := range rt.buckets[cpl].peerIds() {
-		peers = append(peers, p)
-	}
+	return rt.buckets[cpl].peerIds()

Contributor Author:

We don't need this function anymore.

}

return peers
}

// IsBucketFull returns true if the Logical bucket for a given Cpl is full
func (rt *RoutingTable) IsBucketFull(cpl uint) bool {
Contributor:

Is this function used for anything, or is it just here for convenience/to be more efficient than len(GetPeersForCpl) == bucketSize? The one place it's used in libp2p/go-libp2p-kad-dht#601 seems like an oversight to me.

Contributor Author:

We've removed this function now.

rt.tabLock.RLock()
defer rt.tabLock.RUnlock()

return rt.NPeersForCpl(cpl) == rt.bucketsize
}

// TryAddPeer tries to add a peer to the Routing table. If the peer ALREADY exists in the Routing Table, this call is a no-op.
// If the peer is a queryPeer i.e. we queried it or it queried us, we set the LastSuccessfulOutboundQuery to the current time.
// If the peer is just a peer that we connect to/it connected to us without any DHT query, we consider it as having
@@ -117,6 +169,11 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {

// peer already exists in the Routing Table.
if peer := bucket.getPeer(p); peer != nil {
// if we're querying the peer for the first time after adding it, let's give it a
// usefulness bump. This will ONLY happen once.
if peer.LastUsefulAt.IsZero() && queryPeer {
peer.LastUsefulAt = lastUsefulAt
}
Comment on lines +146 to +150
Contributor:

Is this fix related to the rest of the PR, or just a "while I'm already here" kind of thing?

Contributor Author:

Hey,

This change is not related to the rest of the PR.
I think I wasn't being mindful when I put this change in, as I didn't realize I hadn't changed the branch. Apologies.

Contributor:

No problem. While you're here and changing this, do you mind adding more information to the TryAddPeer comment about when we set LastUsefulAt? There is currently only information about when we set LastSuccessfulOutboundQuery.
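
One way the expanded comment could read, given the usefulness-bump branch above (hypothetical wording, not the text that was eventually merged):

// TryAddPeer tries to add a peer to the Routing table. If the peer ALREADY exists
// in the Routing Table, this call is a no-op, except that if the existing peer has
// never been marked useful (its LastUsefulAt is zero) and queryPeer is true, we set
// its LastUsefulAt to the current time to give it its one-time usefulness bump.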

return false, nil
}

@@ -230,7 +287,21 @@ func (rt *RoutingTable) removePeer(p peer.ID) {
if bucket.remove(p) {
// peer removed callback
rt.PeerRemoved(p)

// remove this bucket if it was the last bucket and it's now empty
// provided it isn't the ONLY bucket we have.
if len(rt.buckets) > 1 && bucketID == len(rt.buckets)-1 && len(bucket.peers()) == 0 {
rt.buckets[bucketID] = nil
Contributor:

Might be just blanking here, but why are we doing this? Does it help the garbage collector out in some way?

Contributor Author:

It helps the GC garbage collect the struct pointed to by the slice element, as I understand from here:

https://github.com/golang/go/wiki/SliceTricks (look at the section on Delete). However, to truly grok the mechanics and reasons behind this, I've posted a question on one of our Go forums and will post the answer here when I get one.
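
The pattern in question is the standard "nil out, then truncate" delete from the SliceTricks wiki. A minimal, self-contained illustration with hypothetical names:

package main

import "fmt"

type bucket struct{ peers []string }

// dropLast removes the final element of a slice of pointers. Setting the
// slot to nil before truncating lets the GC reclaim the pointed-to bucket;
// otherwise the backing array keeps a live reference to it.
func dropLast(buckets []*bucket) []*bucket {
	buckets[len(buckets)-1] = nil
	return buckets[:len(buckets)-1]
}

func main() {
	bs := []*bucket{{peers: []string{"a"}}, {}}
	bs = dropLast(bs)
	fmt.Println(len(bs)) // prints 1; the dropped *bucket is now collectable
}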

rt.buckets = rt.buckets[:bucketID]
return
}

// if the second last bucket just became empty, remove and replace it with the last bucket.
if bucketID == len(rt.buckets)-2 && len(bucket.peers()) == 0 {
Contributor:

I don't think this is good enough, as shown in my test comment. What happens if we have buckets 1, 2, 3, 4 and remove the peers from buckets 2, then 3, then 4? Here we'll collapse bucket 4 into bucket 3 and bucket 3 into bucket 2, leaving us with two total buckets instead of just one.

Perhaps doing this in a for loop would be better, so we can collapse all the trailing empty buckets at the end.
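
A sketch of the loop-based collapse the reviewer suggests, assuming the rt.buckets slice and the bucket len() helper seen elsewhere in this diff:

// After a removal, drop every trailing empty bucket, but never the
// last remaining one.
for len(rt.buckets) > 1 && rt.buckets[len(rt.buckets)-1].len() == 0 {
	rt.buckets[len(rt.buckets)-1] = nil // help the GC reclaim the bucket
	rt.buckets = rt.buckets[:len(rt.buckets)-1]
}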

Contributor Author:

I agree and this has been fixed.

rt.buckets[bucketID] = rt.buckets[bucketID+1]
rt.buckets[bucketID+1] = nil
rt.buckets = rt.buckets[:bucketID+1]
}
}
}

table_test.go (156 changes: 156 additions & 0 deletions)

@@ -80,6 +80,162 @@ func TestBucket(t *testing.T) {
}
}

func TestNPeersForCpl(t *testing.T) {
t.Parallel()
local := test.RandPeerIDFatal(t)
m := pstore.NewMetrics()
rt, err := NewRoutingTable(2, ConvertPeerID(local), time.Hour, m, NoOpThreshold)
require.NoError(t, err)

require.Equal(t, 0, rt.NPeersForCpl(0))
require.Equal(t, 0, rt.NPeersForCpl(1))

// one peer with cpl 1
p, _ := rt.GenRandPeerID(1)
rt.TryAddPeer(p, true)
require.Equal(t, 0, rt.NPeersForCpl(0))
require.Equal(t, 1, rt.NPeersForCpl(1))
require.Equal(t, 0, rt.NPeersForCpl(2))

// one peer with cpl 0
p, _ = rt.GenRandPeerID(0)
rt.TryAddPeer(p, true)
require.Equal(t, 1, rt.NPeersForCpl(0))
require.Equal(t, 1, rt.NPeersForCpl(1))
require.Equal(t, 0, rt.NPeersForCpl(2))

// split the bucket with a peer with cpl 1
p, _ = rt.GenRandPeerID(1)
rt.TryAddPeer(p, true)
require.Equal(t, 1, rt.NPeersForCpl(0))
require.Equal(t, 2, rt.NPeersForCpl(1))
require.Equal(t, 0, rt.NPeersForCpl(2))

p, _ = rt.GenRandPeerID(0)
rt.TryAddPeer(p, true)
require.Equal(t, 2, rt.NPeersForCpl(0))
}

func TestGetPeersForCpl(t *testing.T) {
t.Parallel()
local := test.RandPeerIDFatal(t)
m := pstore.NewMetrics()
rt, err := NewRoutingTable(2, ConvertPeerID(local), time.Hour, m, NoOpThreshold)
require.NoError(t, err)

require.Empty(t, rt.GetPeersForCpl(0))
require.Empty(t, rt.GetPeersForCpl(1))

// one peer with cpl 1
p1, _ := rt.GenRandPeerID(1)
rt.TryAddPeer(p1, true)
require.Empty(t, rt.GetPeersForCpl(0))
require.Len(t, rt.GetPeersForCpl(1), 1)
require.Contains(t, rt.GetPeersForCpl(1), p1)
require.Empty(t, rt.GetPeersForCpl(2))

// one peer with cpl 0
p2, _ := rt.GenRandPeerID(0)
rt.TryAddPeer(p2, true)
require.Len(t, rt.GetPeersForCpl(0), 1)
require.Contains(t, rt.GetPeersForCpl(0), p2)
require.Len(t, rt.GetPeersForCpl(1), 1)
require.Contains(t, rt.GetPeersForCpl(1), p1)
require.Empty(t, rt.GetPeersForCpl(2))

// split the bucket with a peer with cpl 1
p3, _ := rt.GenRandPeerID(1)
rt.TryAddPeer(p3, true)
require.Len(t, rt.GetPeersForCpl(0), 1)
require.Contains(t, rt.GetPeersForCpl(0), p2)

require.Len(t, rt.GetPeersForCpl(1), 2)
require.Contains(t, rt.GetPeersForCpl(1), p1)
require.Contains(t, rt.GetPeersForCpl(1), p3)
require.Empty(t, rt.GetPeersForCpl(2))

p0, _ := rt.GenRandPeerID(0)
rt.TryAddPeer(p0, true)
require.Len(t, rt.GetPeersForCpl(0), 2)
}

func TestEmptyBucketCollapse(t *testing.T) {
t.Parallel()
local := test.RandPeerIDFatal(t)

m := pstore.NewMetrics()
rt, err := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m, NoOpThreshold)
require.NoError(t, err)

// generate peers with cpl 0,1,2 & 3
p1, _ := rt.GenRandPeerID(0)
p2, _ := rt.GenRandPeerID(1)
p3, _ := rt.GenRandPeerID(2)
p4, _ := rt.GenRandPeerID(3)

// removing a peer from an empty bucket should not panic.
rt.RemovePeer(p1)

// add a peer with cpl 0 and remove it; the bucket should still exist, as it's the ONLY bucket we have
b, err := rt.TryAddPeer(p1, true)
require.True(t, b)
require.NoError(t, err)
rt.RemovePeer(p1)
rt.tabLock.Lock()
require.Len(t, rt.buckets, 1)
rt.tabLock.Unlock()
require.Empty(t, rt.ListPeers())

// add peers with cpl 0 and cpl 1 and verify we have two buckets.
b, err = rt.TryAddPeer(p1, true)
require.True(t, b)
b, err = rt.TryAddPeer(p2, true)
require.True(t, b)
rt.tabLock.Lock()
require.Len(t, rt.buckets, 2)
rt.tabLock.Unlock()

// removing a peer from the last bucket collapses it.
rt.RemovePeer(p2)
rt.tabLock.Lock()
require.Len(t, rt.buckets, 1)
rt.tabLock.Unlock()
require.Len(t, rt.ListPeers(), 1)
require.Contains(t, rt.ListPeers(), p1)

// add p2 again
b, err = rt.TryAddPeer(p2, true)
require.True(t, b)
rt.tabLock.Lock()
require.Len(t, rt.buckets, 2)
rt.tabLock.Unlock()

// now remove a peer from the second-last (i.e. the first) bucket and ensure it collapses
rt.RemovePeer(p1)
rt.tabLock.Lock()
require.Len(t, rt.buckets, 1)
rt.tabLock.Unlock()
require.Len(t, rt.ListPeers(), 1)
require.Contains(t, rt.ListPeers(), p2)

// let's have a total of 4 buckets now
rt.TryAddPeer(p1, true)
rt.TryAddPeer(p2, true)
rt.TryAddPeer(p3, true)
rt.TryAddPeer(p4, true)

rt.tabLock.Lock()
require.Len(t, rt.buckets, 4)
rt.tabLock.Unlock()

// removing a peer from the middle bucket does not collapse any bucket
rt.RemovePeer(p2)
rt.tabLock.Lock()
require.Len(t, rt.buckets, 4)
rt.tabLock.Unlock()
require.NotContains(t, rt.ListPeers(), p2)
}
aschmahmann marked this conversation as resolved.

func TestRemovePeer(t *testing.T) {
t.Parallel()
local := test.RandPeerIDFatal(t)