Commit 7438bac

Merge pull request #71 from libp2p/fix/cpl-refresh-tracking

fix: correctly track CPLs of never refreshed buckets

Stebalien authored Apr 6, 2020
2 parents ee7b926 + 844420e
Showing 4 changed files with 103 additions and 27 deletions.
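In short: GetTrackedCplsForRefresh used to return only the CPLs present in the cplRefreshedAt map, so a bucket that had never been refreshed had no entry and was invisible to the refresh logic. After this change it returns a dense []time.Time indexed by CPL, covering every CPL up to the deepest one the table currently holds a peer for, with the zero time.Time standing in for "never refreshed". A minimal sketch of how a caller might consume the new shape (refreshCpl and interval are hypothetical stand-ins; the actual refresh loop lives outside this package):

	for cpl, lastRefresh := range rt.GetTrackedCplsForRefresh() {
		// The zero time.Time marks a never-refreshed CPL, which
		// IsZero naturally treats as maximally stale.
		if lastRefresh.IsZero() || time.Since(lastRefresh) > interval {
			refreshCpl(uint(cpl)) // hypothetical helper
		}
	}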
13 changes: 13 additions & 0 deletions bucket.go
@@ -118,3 +118,16 @@ func (b *bucket) split(cpl int, target ID) *bucket {
 	}
 	return newbuck
 }
+
+// maxCommonPrefix returns the maximum common prefix length between any peer in
+// the bucket and the target ID.
+func (b *bucket) maxCommonPrefix(target ID) uint {
+	maxCpl := uint(0)
+	for e := b.list.Front(); e != nil; e = e.Next() {
+		cpl := uint(CommonPrefixLen(e.Value.(*PeerInfo).dhtId, target))
+		if cpl > maxCpl {
+			maxCpl = cpl
+		}
+	}
+	return maxCpl
+}
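For intuition, CommonPrefixLen counts the leading bits two XOR-space IDs share; the new helper simply takes the maximum of that value over the bucket's peers. A self-contained sketch of the same idea (a simplified byte-slice re-implementation, purely for illustration; the real helper operates on 256-bit dhtId values):

	package main

	import (
		"fmt"
		"math/bits"
	)

	// commonPrefixLen counts the leading bits that a and b share
	// (simplified illustration; assumes len(a) == len(b)).
	func commonPrefixLen(a, b []byte) int {
		for i := range a {
			if x := a[i] ^ b[i]; x != 0 {
				return i*8 + bits.LeadingZeros8(x)
			}
		}
		return len(a) * 8
	}

	func main() {
		// 0b10100000 and 0b10111111 agree on their first three bits.
		fmt.Println(commonPrefixLen([]byte{0xA0}, []byte{0xBF})) // prints 3
	}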
14 changes: 14 additions & 0 deletions table.go
@@ -365,3 +365,17 @@ func (rt *RoutingTable) bucketIdForPeer(p peer.ID) int {
 	}
 	return bucketID
 }
+
+// maxCommonPrefix returns the maximum common prefix length between any peer in
+// the table and the current peer.
+func (rt *RoutingTable) maxCommonPrefix() uint {
+	rt.tabLock.RLock()
+	defer rt.tabLock.RUnlock()
+
+	for i := len(rt.buckets) - 1; i >= 0; i-- {
+		if rt.buckets[i].len() > 0 {
+			return rt.buckets[i].maxCommonPrefix(rt.local)
+		}
+	}
+	return 0
+}
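The backwards scan is enough because buckets are ordered by CPL: bucket i holds peers whose CPL with the local ID is exactly i, and the final bucket also absorbs everything deeper until it splits. The first non-empty bucket found when walking from the end therefore contains the peer with the table-wide maximum CPL, so only a single bucket ever needs to be searched. The same pattern reduced to a toy (hypothetical bucketLens data, for illustration only):

	// highestNonEmpty mirrors the backwards scan in maxCommonPrefix:
	// it returns the index of the last non-empty bucket, or -1 if none.
	func highestNonEmpty(bucketLens []int) int {
		for i := len(bucketLens) - 1; i >= 0; i-- {
			if bucketLens[i] > 0 {
				return i
			}
		}
		return -1
	}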
23 changes: 10 additions & 13 deletions table_refresh.go
@@ -15,25 +15,22 @@ import (
 // This limit exists because we can only generate 'maxCplForRefresh' bit prefixes for now.
 const maxCplForRefresh uint = 15
 
-// CplRefresh contains a CPL(common prefix length) with the host & the last time
-// we refreshed that cpl/searched for an ID which has that cpl with the host.
-type CplRefresh struct {
-	Cpl           uint
-	LastRefreshAt time.Time
-}
-
 // GetTrackedCplsForRefresh returns the Cpl's we are tracking for refresh.
 // Caller is free to modify the returned slice as it is a defensive copy.
-func (rt *RoutingTable) GetTrackedCplsForRefresh() []CplRefresh {
+func (rt *RoutingTable) GetTrackedCplsForRefresh() []time.Time {
+	maxCommonPrefix := rt.maxCommonPrefix()
+	if maxCommonPrefix > maxCplForRefresh {
+		maxCommonPrefix = maxCplForRefresh
+	}
+
 	rt.cplRefreshLk.RLock()
 	defer rt.cplRefreshLk.RUnlock()
 
-	cpls := make([]CplRefresh, 0, len(rt.cplRefreshedAt))
-
-	for c, t := range rt.cplRefreshedAt {
-		cpls = append(cpls, CplRefresh{c, t})
+	cpls := make([]time.Time, maxCommonPrefix+1)
+	for i := uint(0); i <= maxCommonPrefix; i++ {
+		// defaults to the zero value if we haven't refreshed it yet.
+		cpls[i] = rt.cplRefreshedAt[i]
 	}
 
 	return cpls
 }
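Note the returned slice is never empty: an empty table yields maxCommonPrefix() == 0, so CPL 0 is still tracked, and the cap at maxCplForRefresh bounds the slice at 16 entries. Both lengths are exercised by the test below; a quick sketch of the expectations (assuming the test's rt, local, and imports):

	fmt.Println(len(rt.GetTrackedCplsForRefresh())) // 1: an empty table still tracks CPL 0

	// Adding the local peer itself maxes out the CPL, which is capped:
	rt.TryAddPeer(local, true)
	fmt.Println(len(rt.GetTrackedCplsForRefresh())) // 16: maxCplForRefresh + 1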

80 changes: 66 additions & 14 deletions table_refresh_test.go
@@ -4,6 +4,7 @@ import (
"testing"
"time"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/test"

pstore "github.com/libp2p/go-libp2p-peerstore"
@@ -35,28 +36,79 @@ func TestGenRandPeerID(t *testing.T) {

 func TestRefreshAndGetTrackedCpls(t *testing.T) {
 	t.Parallel()
 
+	const (
+		minCpl  = 8
+		testCpl = 10
+		maxCpl  = 12
+	)
+
 	local := test.RandPeerIDFatal(t)
 	m := pstore.NewMetrics()
-	rt, err := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m, NoOpThreshold)
+	rt, err := NewRoutingTable(2, ConvertPeerID(local), time.Hour, m, NoOpThreshold)
 	require.NoError(t, err)
 
-	// push cpl's for tracking
-	for cpl := uint(0); cpl < maxCplForRefresh; cpl++ {
-		peerID, err := rt.GenRandPeerID(cpl)
+	// fetch cpl's
+	trackedCpls := rt.GetTrackedCplsForRefresh()
+	// should have nothing.
+	require.Len(t, trackedCpls, 1)
+
+	var peerIDs []peer.ID
+	for i := minCpl; i <= maxCpl; i++ {
+		id, err := rt.GenRandPeerID(uint(i))
 		require.NoError(t, err)
-		rt.ResetCplRefreshedAtForID(ConvertPeerID(peerID), time.Now())
+		peerIDs = append(peerIDs, id)
 	}
 
-	// fetch cpl's
-	trackedCpls := rt.GetTrackedCplsForRefresh()
-	require.Len(t, trackedCpls, int(maxCplForRefresh))
-	actualCpls := make(map[uint]struct{})
-	for i := 0; i < len(trackedCpls); i++ {
-		actualCpls[trackedCpls[i].Cpl] = struct{}{}
+	// add peer IDs.
+	for i, id := range peerIDs {
+		added, err := rt.TryAddPeer(id, true)
+		require.NoError(t, err)
+		require.True(t, added)
+		require.Len(t, rt.GetTrackedCplsForRefresh(), minCpl+i+1)
 	}
 
-	for i := uint(0); i < maxCplForRefresh; i++ {
-		_, ok := actualCpls[i]
-		require.True(t, ok, "tracked cpl's should have cpl %d", i)
+	// and remove down to the test CPL
+	for i := maxCpl; i > testCpl; i-- {
+		rt.RemovePeer(peerIDs[i-minCpl])
+		require.Len(t, rt.GetTrackedCplsForRefresh(), i)
 	}
+
+	// should be tracking testCpl
+	trackedCpls = rt.GetTrackedCplsForRefresh()
+	require.Len(t, trackedCpls, testCpl+1)
+	// they should all be zero
+	for _, refresh := range trackedCpls {
+		require.True(t, refresh.IsZero(), "tracked cpl's should be zero")
+	}
+
+	// add our peer ID to max out the table
+	added, err := rt.TryAddPeer(local, true)
+	require.NoError(t, err)
+	require.True(t, added)
+
+	// should be tracking the max
+	trackedCpls = rt.GetTrackedCplsForRefresh()
+	require.Len(t, trackedCpls, int(maxCplForRefresh)+1)
+
+	// and not refreshed
+	for _, refresh := range trackedCpls {
+		require.True(t, refresh.IsZero(), "tracked cpl's should be zero")
+	}
+
+	now := time.Now()
+	// reset the test peer ID.
+	rt.ResetCplRefreshedAtForID(ConvertPeerID(peerIDs[testCpl-minCpl]), now)
+
+	// should still be tracking all buckets
+	trackedCpls = rt.GetTrackedCplsForRefresh()
+	require.Len(t, trackedCpls, int(maxCplForRefresh)+1)
+
+	for i, refresh := range trackedCpls {
+		if i == testCpl {
+			require.True(t, now.Equal(refresh), "test cpl should have the correct refresh time")
+		} else {
+			require.True(t, refresh.IsZero(), "other cpl's should be 0")
+		}
+	}
 }
