fix: correctly track CPLs of never refreshed buckets #71

Merged · 2 commits · Apr 6, 2020
13 changes: 13 additions & 0 deletions bucket.go
@@ -118,3 +118,16 @@ func (b *bucket) split(cpl int, target ID) *bucket {
}
return newbuck
}

// maxCommonPrefix returns the maximum common prefix length between any peer in
// the bucket and the target ID.
func (b *bucket) maxCommonPrefix(target ID) uint {

Contributor:
This is interesting... it means that if we remove the maxCpl (because we fix the RPCs in the DHT to allow for querying random KadIDs), then if there are 2^10 peers in the network and bucket number 10 has someone really close to us (e.g. 20 shared bits), I'm now going to be querying 20 buckets. Not sure if that's really what we want, is it?

Member Author:
We should probably query until we fail to fill a bucket.

Contributor:
You mean do a query, wait for it to finish and stop if the percentage of the bucket that is full after the query drops below x%? That seems reasonable.

Member Author:
Yeah, something like that. But I'm fine cutting an RC without that.

Contributor:
No problem at all since we max out at 15 anyway.

Contributor:
I'll file an issue to track it.

maxCpl := uint(0)
for e := b.list.Front(); e != nil; e = e.Next() {
cpl := uint(CommonPrefixLen(e.Value.(*PeerInfo).dhtId, target))
if cpl > maxCpl {
maxCpl = cpl
}
}
return maxCpl
}
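
The thread above floats a stopping rule for refreshes: run the refresh query for each CPL in order and stop once a bucket stays mostly empty afterwards. A minimal sketch of that idea, assuming hypothetical refreshCpl and bucketFill callbacks and a fillThreshold parameter; none of these are part of this PR or the current API.

// refreshUntilUnfilled is a hypothetical driver for the stopping rule discussed
// above: refresh CPLs in increasing order and stop once a refreshed bucket stays
// below the fill threshold, since deeper (longer-prefix) buckets are then
// unlikely to fill either.
// refreshCpl runs the refresh query for one CPL and blocks until it finishes;
// bucketFill reports how full the bucket for that CPL is afterwards (0..1).
func refreshUntilUnfilled(refreshCpl func(cpl uint), bucketFill func(cpl uint) float64, fillThreshold float64) {
	for cpl := uint(0); cpl <= maxCplForRefresh; cpl++ {
		refreshCpl(cpl)
		if bucketFill(cpl) < fillThreshold {
			// the bucket stayed below the threshold after its refresh query; stop here.
			return
		}
	}
}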
14 changes: 14 additions & 0 deletions table.go
@@ -365,3 +365,17 @@ func (rt *RoutingTable) bucketIdForPeer(p peer.ID) int {
}
return bucketID
}

// maxCommonPrefix returns the maximum common prefix length between any peer in
// the table and the current peer.
func (rt *RoutingTable) maxCommonPrefix() uint {
rt.tabLock.RLock()
defer rt.tabLock.RUnlock()

for i := len(rt.buckets) - 1; i >= 0; i-- {
if rt.buckets[i].len() > 0 {

Member Author:
#72

Contributor:
While #72 is a valid point, are we sure we actually want to return a smaller number of tracked buckets and ramp back up? Upsides: it scales nicely as the network grows without us touching anything. Downsides: maybe we want some minimal number of buckets to ramp up our scale.

Feel free to ignore this comment or just say "meh, this is probably fine".

Member Author:
The number of buckets shouldn't matter in practice, right?

Contributor:
The number of buckets we have shouldn't matter. We might care about the number of refreshes, but as long as it's ~15-20 and not ~100 it's probably fine.

I'm not sure if there's any tweaking to be done over how we do initial bootstrapping so that we fill our buckets fairly quickly. I suspect this will be much less problematic in the real network than in the test network, though.

Member Author:
My point is that we:

  1. Start with one big bucket.
  2. Split the big bucket as we add peers.
  3. Never shrink back down to 1 bucket.

In terms of refreshing the routing table, the logic in this PR will refresh every CPL, regardless of how many buckets we actually have. In practice, the number of buckets we have shouldn't affect anything but memory usage, unless we're doing something wrong.

return rt.buckets[i].maxCommonPrefix(rt.local)
}
}
return 0
}
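
The early return above leans on the bucket layout used here: bucket i holds peers whose CPL with the local peer is exactly i, while the last bucket also holds everything deeper, so the first non-empty bucket from the end contains the table-wide maximum. A naive equivalent that scans every bucket, shown only to illustrate that invariant; maxCommonPrefixNaive is not part of this PR.

// maxCommonPrefixNaive computes the same value by scanning every bucket and
// keeping the largest CPL seen, without relying on the bucket ordering.
func (rt *RoutingTable) maxCommonPrefixNaive() uint {
	rt.tabLock.RLock()
	defer rt.tabLock.RUnlock()

	maxCpl := uint(0)
	for _, b := range rt.buckets {
		if cpl := b.maxCommonPrefix(rt.local); cpl > maxCpl {
			maxCpl = cpl
		}
	}
	return maxCpl
}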
23 changes: 10 additions & 13 deletions table_refresh.go
@@ -15,25 +15,22 @@ import (
// This limit exists because we can only generate 'maxCplForRefresh' bit prefixes for now.
const maxCplForRefresh uint = 15

// CplRefresh contains a CPL(common prefix length) with the host & the last time
// we refreshed that cpl/searched for an ID which has that cpl with the host.
type CplRefresh struct {
Cpl uint
LastRefreshAt time.Time
}

// GetTrackedCplsForRefresh returns the Cpl's we are tracking for refresh.
// Caller is free to modify the returned slice as it is a defensive copy.
func (rt *RoutingTable) GetTrackedCplsForRefresh() []CplRefresh {
func (rt *RoutingTable) GetTrackedCplsForRefresh() []time.Time {
maxCommonPrefix := rt.maxCommonPrefix()
if maxCommonPrefix > maxCplForRefresh {
maxCommonPrefix = maxCplForRefresh
}

rt.cplRefreshLk.RLock()
defer rt.cplRefreshLk.RUnlock()

cpls := make([]CplRefresh, 0, len(rt.cplRefreshedAt))

for c, t := range rt.cplRefreshedAt {
cpls = append(cpls, CplRefresh{c, t})
cpls := make([]time.Time, maxCommonPrefix+1)
for i := uint(0); i <= maxCommonPrefix; i++ {
// defaults to the zero value if we haven't refreshed it yet.
cpls[i] = rt.cplRefreshedAt[i]
}

return cpls
}
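
With this change, a zero time.Time in the returned slice means that CPL has never been refreshed. A rough sketch of how a caller might consume it, assuming a hypothetical refreshCpl function and refresh interval; the real refresh logic lives in the DHT, outside this package.

// refreshStaleCpls refreshes every tracked CPL whose last refresh is unknown
// (the zero value) or older than refreshInterval. Illustrative only.
func refreshStaleCpls(rt *RoutingTable, refreshInterval time.Duration, refreshCpl func(cpl uint)) {
	for cpl, lastRefreshedAt := range rt.GetTrackedCplsForRefresh() {
		if lastRefreshedAt.IsZero() || time.Since(lastRefreshedAt) > refreshInterval {
			refreshCpl(uint(cpl))
		}
	}
}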

80 changes: 66 additions & 14 deletions table_refresh_test.go
@@ -4,6 +4,7 @@ import (
"testing"
"time"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/test"

pstore "github.com/libp2p/go-libp2p-peerstore"
@@ -35,28 +36,79 @@ func TestGenRandPeerID(t *testing.T) {

func TestRefreshAndGetTrackedCpls(t *testing.T) {
t.Parallel()

const (
minCpl = 8
testCpl = 10
maxCpl = 12
)

local := test.RandPeerIDFatal(t)
m := pstore.NewMetrics()
rt, err := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m, NoOpThreshold)
rt, err := NewRoutingTable(2, ConvertPeerID(local), time.Hour, m, NoOpThreshold)
require.NoError(t, err)

// push cpl's for tracking
for cpl := uint(0); cpl < maxCplForRefresh; cpl++ {
peerID, err := rt.GenRandPeerID(cpl)
// fetch cpl's
trackedCpls := rt.GetTrackedCplsForRefresh()
// should have nothing.
require.Len(t, trackedCpls, 1)

var peerIDs []peer.ID
for i := minCpl; i <= maxCpl; i++ {
id, err := rt.GenRandPeerID(uint(i))
require.NoError(t, err)
rt.ResetCplRefreshedAtForID(ConvertPeerID(peerID), time.Now())
peerIDs = append(peerIDs, id)
}

// fetch cpl's
trackedCpls := rt.GetTrackedCplsForRefresh()
require.Len(t, trackedCpls, int(maxCplForRefresh))
actualCpls := make(map[uint]struct{})
for i := 0; i < len(trackedCpls); i++ {
actualCpls[trackedCpls[i].Cpl] = struct{}{}
// add peer IDs.
for i, id := range peerIDs {
added, err := rt.TryAddPeer(id, true)
require.NoError(t, err)
require.True(t, added)
require.Len(t, rt.GetTrackedCplsForRefresh(), minCpl+i+1)
}

for i := uint(0); i < maxCplForRefresh; i++ {
_, ok := actualCpls[i]
require.True(t, ok, "tracked cpl's should have cpl %d", i)
// and remove down to the test CPL
for i := maxCpl; i > testCpl; i-- {
rt.RemovePeer(peerIDs[i-minCpl])
require.Len(t, rt.GetTrackedCplsForRefresh(), i)
}

// should be tracking testCpl
trackedCpls = rt.GetTrackedCplsForRefresh()
require.Len(t, trackedCpls, testCpl+1)
// they should all be zero
for _, refresh := range trackedCpls {
require.True(t, refresh.IsZero(), "tracked cpl's should be zero")
}

// add our peer ID to max out the table
added, err := rt.TryAddPeer(local, true)
require.NoError(t, err)
require.True(t, added)

// should be tracking the max
trackedCpls = rt.GetTrackedCplsForRefresh()
require.Len(t, trackedCpls, int(maxCplForRefresh)+1)

// and not refreshed
for _, refresh := range trackedCpls {
require.True(t, refresh.IsZero(), "tracked cpl's should be zero")
}

now := time.Now()
// reset the test peer ID.
rt.ResetCplRefreshedAtForID(ConvertPeerID(peerIDs[testCpl-minCpl]), now)

// should still be tracking all buckets
trackedCpls = rt.GetTrackedCplsForRefresh()
require.Len(t, trackedCpls, int(maxCplForRefresh)+1)

for i, refresh := range trackedCpls {
if i == testCpl {
require.True(t, now.Equal(refresh), "test cpl should have the correct refresh time")
} else {
require.True(t, refresh.IsZero(), "other cpl's should be 0")
}
}
}