From 7c961a02b2fa2e9abdfe2753d57925c70b429774 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Sun, 5 Apr 2020 16:15:10 -0700 Subject: [PATCH 1/2] fix: correctly track CPLs of never refreshed buckets To determine how many buckets we should refresh, we: 1. Look at the max bucket. 2. Look at the peer with the greatest common prefix in the max bucket. 3. Return the "last refresh" times for all buckets between 0 and `min(maxPeer, 15)` BREAKING: this returns a slice of times instead of CplRefresh objects because we want to refresh _all_ buckets between 0 and the max CPL. --- bucket.go | 13 ++++++++ table.go | 14 ++++++++ table_refresh.go | 23 ++++++------- table_refresh_test.go | 78 +++++++++++++++++++++++++++++++++++-------- 4 files changed, 102 insertions(+), 26 deletions(-) diff --git a/bucket.go b/bucket.go index 2dbb307..aa3f046 100644 --- a/bucket.go +++ b/bucket.go @@ -118,3 +118,16 @@ func (b *bucket) split(cpl int, target ID) *bucket { } return newbuck } + +// maxCommonPrefix returns the maximum common prefix length between any peer in +// the bucket with the target ID. +func (b *bucket) maxCommonPrefix(target ID) uint { + maxCpl := uint(0) + for e := b.list.Front(); e != nil; e = e.Next() { + cpl := uint(CommonPrefixLen(e.Value.(*PeerInfo).dhtId, target)) + if cpl > maxCpl { + maxCpl = cpl + } + } + return maxCpl +} diff --git a/table.go b/table.go index b760aab..c69e8a6 100644 --- a/table.go +++ b/table.go @@ -365,3 +365,17 @@ func (rt *RoutingTable) bucketIdForPeer(p peer.ID) int { } return bucketID } + +// maxCommonPrefix returns the maximum common prefix length between any peer in +// the table and the current peer. +func (rt *RoutingTable) maxCommonPrefix() uint { + rt.tabLock.RLock() + defer rt.tabLock.RUnlock() + + for i := len(rt.buckets) - 1; i >= 0; i-- { + if rt.buckets[i].len() > 0 { + return rt.buckets[i].maxCommonPrefix(rt.local) + } + } + return 0 +} diff --git a/table_refresh.go b/table_refresh.go index 927f0e7..a7bfa09 100644 --- a/table_refresh.go +++ b/table_refresh.go @@ -15,25 +15,22 @@ import ( // This limit exists because we can only generate 'maxCplForRefresh' bit prefixes for now. const maxCplForRefresh uint = 15 -// CplRefresh contains a CPL(common prefix length) with the host & the last time -// we refreshed that cpl/searched for an ID which has that cpl with the host. -type CplRefresh struct { - Cpl uint - LastRefreshAt time.Time -} - // GetTrackedCplsForRefresh returns the Cpl's we are tracking for refresh. // Caller is free to modify the returned slice as it is a defensive copy. -func (rt *RoutingTable) GetTrackedCplsForRefresh() []CplRefresh { +func (rt *RoutingTable) GetTrackedCplsForRefresh() []time.Time { + maxCommonPrefix := rt.maxCommonPrefix() + if maxCommonPrefix > maxCplForRefresh { + maxCommonPrefix = maxCplForRefresh + } + rt.cplRefreshLk.RLock() defer rt.cplRefreshLk.RUnlock() - cpls := make([]CplRefresh, 0, len(rt.cplRefreshedAt)) - - for c, t := range rt.cplRefreshedAt { - cpls = append(cpls, CplRefresh{c, t}) + cpls := make([]time.Time, maxCommonPrefix) + for i := uint(0); i < maxCommonPrefix; i++ { + // defaults to the zero value if we haven't refreshed it yet. + cpls[i] = rt.cplRefreshedAt[i] } - return cpls } diff --git a/table_refresh_test.go b/table_refresh_test.go index 4548c4c..fcfbaf5 100644 --- a/table_refresh_test.go +++ b/table_refresh_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/test" pstore "github.com/libp2p/go-libp2p-peerstore" @@ -35,28 +36,79 @@ func TestGenRandPeerID(t *testing.T) { func TestRefreshAndGetTrackedCpls(t *testing.T) { t.Parallel() + + const ( + minCpl = 8 + testCpl = 10 + maxCpl = 12 + ) + local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt, err := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m, NoOpThreshold) + rt, err := NewRoutingTable(2, ConvertPeerID(local), time.Hour, m, NoOpThreshold) require.NoError(t, err) - // push cpl's for tracking - for cpl := uint(0); cpl < maxCplForRefresh; cpl++ { - peerID, err := rt.GenRandPeerID(cpl) + // fetch cpl's + trackedCpls := rt.GetTrackedCplsForRefresh() + // should have nothing. + require.Len(t, trackedCpls, 0) + + var peerIDs []peer.ID + for i := minCpl; i <= maxCpl; i++ { + id, err := rt.GenRandPeerID(uint(i)) require.NoError(t, err) - rt.ResetCplRefreshedAtForID(ConvertPeerID(peerID), time.Now()) + peerIDs = append(peerIDs, id) } - // fetch cpl's - trackedCpls := rt.GetTrackedCplsForRefresh() + // add peer IDs. + for i, id := range peerIDs { + added, err := rt.TryAddPeer(id, true) + require.NoError(t, err) + require.True(t, added) + require.Len(t, rt.GetTrackedCplsForRefresh(), minCpl+i) + } + + // and remove down to the test CPL + for i := maxCpl; i > testCpl; i-- { + rt.RemovePeer(peerIDs[i-minCpl]) + require.Len(t, rt.GetTrackedCplsForRefresh(), i-1) + } + + // should be tracking testCpl + trackedCpls = rt.GetTrackedCplsForRefresh() + require.Len(t, trackedCpls, testCpl) + // they should all be zero + for _, refresh := range trackedCpls { + require.True(t, refresh.IsZero(), "tracked cpl's should be zero") + } + + // add our peer ID to max out the table + added, err := rt.TryAddPeer(local, true) + require.NoError(t, err) + require.True(t, added) + + // should be tracking the max + trackedCpls = rt.GetTrackedCplsForRefresh() require.Len(t, trackedCpls, int(maxCplForRefresh)) - actualCpls := make(map[uint]struct{}) - for i := 0; i < len(trackedCpls); i++ { - actualCpls[trackedCpls[i].Cpl] = struct{}{} + + // and not refreshed + for _, refresh := range trackedCpls { + require.True(t, refresh.IsZero(), "tracked cpl's should be zero") } - for i := uint(0); i < maxCplForRefresh; i++ { - _, ok := actualCpls[i] - require.True(t, ok, "tracked cpl's should have cpl %d", i) + now := time.Now() + // reset the test peer ID. + rt.ResetCplRefreshedAtForID(ConvertPeerID(peerIDs[testCpl-minCpl]), now) + + // should still be tracking all buckets + trackedCpls = rt.GetTrackedCplsForRefresh() + require.Len(t, trackedCpls, int(maxCplForRefresh)) + + for i, refresh := range trackedCpls { + if i == testCpl { + require.True(t, now.Equal(refresh), "test cpl should have the correct refresh time") + } else { + require.True(t, refresh.IsZero(), "other cpl's should be 0") + } } } From 844420e876b475d9be7f48c0747ca472e17b6538 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Sun, 5 Apr 2020 19:25:15 -0700 Subject: [PATCH 2/2] address comments --- table_refresh.go | 4 ++-- table_refresh_test.go | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/table_refresh.go b/table_refresh.go index a7bfa09..dcc7b8b 100644 --- a/table_refresh.go +++ b/table_refresh.go @@ -26,8 +26,8 @@ func (rt *RoutingTable) GetTrackedCplsForRefresh() []time.Time { rt.cplRefreshLk.RLock() defer rt.cplRefreshLk.RUnlock() - cpls := make([]time.Time, maxCommonPrefix) - for i := uint(0); i < maxCommonPrefix; i++ { + cpls := make([]time.Time, maxCommonPrefix+1) + for i := uint(0); i <= maxCommonPrefix; i++ { // defaults to the zero value if we haven't refreshed it yet. cpls[i] = rt.cplRefreshedAt[i] } diff --git a/table_refresh_test.go b/table_refresh_test.go index fcfbaf5..c26e09a 100644 --- a/table_refresh_test.go +++ b/table_refresh_test.go @@ -51,7 +51,7 @@ func TestRefreshAndGetTrackedCpls(t *testing.T) { // fetch cpl's trackedCpls := rt.GetTrackedCplsForRefresh() // should have nothing. - require.Len(t, trackedCpls, 0) + require.Len(t, trackedCpls, 1) var peerIDs []peer.ID for i := minCpl; i <= maxCpl; i++ { @@ -65,18 +65,18 @@ func TestRefreshAndGetTrackedCpls(t *testing.T) { added, err := rt.TryAddPeer(id, true) require.NoError(t, err) require.True(t, added) - require.Len(t, rt.GetTrackedCplsForRefresh(), minCpl+i) + require.Len(t, rt.GetTrackedCplsForRefresh(), minCpl+i+1) } // and remove down to the test CPL for i := maxCpl; i > testCpl; i-- { rt.RemovePeer(peerIDs[i-minCpl]) - require.Len(t, rt.GetTrackedCplsForRefresh(), i-1) + require.Len(t, rt.GetTrackedCplsForRefresh(), i) } // should be tracking testCpl trackedCpls = rt.GetTrackedCplsForRefresh() - require.Len(t, trackedCpls, testCpl) + require.Len(t, trackedCpls, testCpl+1) // they should all be zero for _, refresh := range trackedCpls { require.True(t, refresh.IsZero(), "tracked cpl's should be zero") @@ -89,7 +89,7 @@ func TestRefreshAndGetTrackedCpls(t *testing.T) { // should be tracking the max trackedCpls = rt.GetTrackedCplsForRefresh() - require.Len(t, trackedCpls, int(maxCplForRefresh)) + require.Len(t, trackedCpls, int(maxCplForRefresh)+1) // and not refreshed for _, refresh := range trackedCpls { @@ -102,7 +102,7 @@ func TestRefreshAndGetTrackedCpls(t *testing.T) { // should still be tracking all buckets trackedCpls = rt.GetTrackedCplsForRefresh() - require.Len(t, trackedCpls, int(maxCplForRefresh)) + require.Len(t, trackedCpls, int(maxCplForRefresh)+1) for i, refresh := range trackedCpls { if i == testCpl {