From cc5620509c301da73a43478430a0ecb94e3a5f69 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Tue, 21 Apr 2020 15:54:57 +0530 Subject: [PATCH 1/7] nPeerForCpl and collapse empty buckets --- table.go | 36 +++++++++++++++- table_test.go | 113 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 148 insertions(+), 1 deletion(-) diff --git a/table.go b/table.go index 93d526a..9cf6991 100644 --- a/table.go +++ b/table.go @@ -85,6 +85,26 @@ func (rt *RoutingTable) Close() error { return nil } +// NPeersForCPL returns the number of peers we have for a given Cpl +func (rt *RoutingTable) NPeersForCpl(cpl uint) int { + rt.tabLock.RLock() + defer rt.tabLock.RUnlock() + + // it's in the last bucket + if int(cpl) >= len(rt.buckets)-1 { + count := 0 + b := rt.buckets[len(rt.buckets)-1] + for _, p := range b.peerIds() { + if CommonPrefixLen(rt.local, ConvertPeerID(p)) == int(cpl) { + count++ + } + } + return count + } else { + return rt.buckets[cpl].len() + } +} + // TryAddPeer tries to add a peer to the Routing table. If the peer ALREADY exists in the Routing Table, this call is a no-op. // If the peer is a queryPeer i.e. we queried it or it queried us, we set the LastSuccessfulOutboundQuery to the current time. // If the peer is just a peer that we connect to/it connected to us without any DHT query, we consider it as having @@ -230,7 +250,21 @@ func (rt *RoutingTable) removePeer(p peer.ID) { if bucket.remove(p) { // peer removed callback rt.PeerRemoved(p) - return + + // remove this bucket if it was the last bucket and it's now empty + // provided it isn't the ONLY bucket we have. + if len(rt.buckets) > 1 && bucketID == len(rt.buckets)-1 && len(bucket.peers()) == 0 { + rt.buckets[bucketID] = nil + rt.buckets = rt.buckets[:bucketID] + return + } + + // if the second last bucket just became empty, remove and replace it with the last bucket. 
+ if bucketID == len(rt.buckets)-2 && len(bucket.peers()) == 0 { + rt.buckets[bucketID] = rt.buckets[bucketID+1] + rt.buckets[bucketID+1] = nil + rt.buckets = rt.buckets[:bucketID+1] + } } } diff --git a/table_test.go b/table_test.go index 3e6f8b4..f487c08 100644 --- a/table_test.go +++ b/table_test.go @@ -80,6 +80,119 @@ func TestBucket(t *testing.T) { } } +func TestNPeersForCpl(t *testing.T) { + t.Parallel() + local := test.RandPeerIDFatal(t) + m := pstore.NewMetrics() + rt, err := NewRoutingTable(2, ConvertPeerID(local), time.Hour, m, NoOpThreshold) + require.NoError(t, err) + + require.Equal(t, 0, rt.NPeersForCpl(0)) + require.Equal(t, 0, rt.NPeersForCpl(1)) + + // one peer with cpl 1 + p, _ := rt.GenRandPeerID(1) + rt.TryAddPeer(p, true) + require.Equal(t, 0, rt.NPeersForCpl(0)) + require.Equal(t, 1, rt.NPeersForCpl(1)) + require.Equal(t, 0, rt.NPeersForCpl(2)) + + // one peer with cpl 0 + p, _ = rt.GenRandPeerID(0) + rt.TryAddPeer(p, true) + require.Equal(t, 1, rt.NPeersForCpl(0)) + require.Equal(t, 1, rt.NPeersForCpl(1)) + require.Equal(t, 0, rt.NPeersForCpl(2)) + + // split the bucket with a peer with cpl 1 + p, _ = rt.GenRandPeerID(1) + rt.TryAddPeer(p, true) + require.Equal(t, 1, rt.NPeersForCpl(0)) + require.Equal(t, 2, rt.NPeersForCpl(1)) + require.Equal(t, 0, rt.NPeersForCpl(2)) + + p, _ = rt.GenRandPeerID(0) + rt.TryAddPeer(p, true) + require.Equal(t, 2, rt.NPeersForCpl(0)) +} + +func TestEmptyBucketCollapse(t *testing.T) { + t.Parallel() + local := test.RandPeerIDFatal(t) + + m := pstore.NewMetrics() + rt, err := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m, NoOpThreshold) + require.NoError(t, err) + + // generate peers with cpl 0,1,2 & 3 + p1, _ := rt.GenRandPeerID(0) + p2, _ := rt.GenRandPeerID(1) + p3, _ := rt.GenRandPeerID(2) + p4, _ := rt.GenRandPeerID(3) + + // remove peer on an empty bucket should not panic. 
+ rt.RemovePeer(p1) + + // add peer with cpl 0 and remove it..bucket should still exist as it's the ONLY bucket we have + b, err := rt.TryAddPeer(p1, true) + require.True(t, b) + require.NoError(t, err) + rt.RemovePeer(p1) + rt.tabLock.Lock() + require.Len(t, rt.buckets, 1) + rt.tabLock.Unlock() + require.Empty(t, rt.ListPeers()) + + // add peer with cpl 0 and cpl 1 and verify we have two buckets. + b, err = rt.TryAddPeer(p1, true) + require.True(t, b) + b, err = rt.TryAddPeer(p2, true) + require.True(t, b) + rt.tabLock.Lock() + require.Len(t, rt.buckets, 2) + rt.tabLock.Unlock() + + // removing a peer from the last bucket collapses it. + rt.RemovePeer(p2) + rt.tabLock.Lock() + require.Len(t, rt.buckets, 1) + rt.tabLock.Unlock() + require.Len(t, rt.ListPeers(), 1) + require.Contains(t, rt.ListPeers(), p1) + + // add p2 again + b, err = rt.TryAddPeer(p2, true) + require.True(t, b) + rt.tabLock.Lock() + require.Len(t, rt.buckets, 2) + rt.tabLock.Unlock() + + // now remove a peer from the second-last i.e. 
first bucket and ensure it collapses + rt.RemovePeer(p1) + rt.tabLock.Lock() + require.Len(t, rt.buckets, 1) + rt.tabLock.Unlock() + require.Len(t, rt.ListPeers(), 1) + require.Contains(t, rt.ListPeers(), p2) + + // let's have a total of 4 buckets now + rt.TryAddPeer(p1, true) + rt.TryAddPeer(p2, true) + rt.TryAddPeer(p3, true) + rt.TryAddPeer(p4, true) + + rt.tabLock.Lock() + require.Len(t, rt.buckets, 4) + rt.tabLock.Unlock() + + // removing a peer from the middle bucket does not collapse any bucket + rt.RemovePeer(p2) + rt.tabLock.Lock() + require.Len(t, rt.buckets, 4) + rt.tabLock.Unlock() + require.NotContains(t, rt.ListPeers(), p2) +} + func TestRemovePeer(t *testing.T) { t.Parallel() local := test.RandPeerIDFatal(t) From c52371e282946b1f66670b3ca3ef9f1063fc1777 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Tue, 21 Apr 2020 16:38:52 +0530 Subject: [PATCH 2/7] is bucket full --- table.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/table.go b/table.go index 9cf6991..c793a5d 100644 --- a/table.go +++ b/table.go @@ -105,6 +105,14 @@ func (rt *RoutingTable) NPeersForCpl(cpl uint) int { } } +// IsBucketFull returns true if the Logical bucket for a given Cpl is full +func (rt *RoutingTable) IsBucketFull(cpl uint) bool { + rt.tabLock.RLock() + defer rt.tabLock.RUnlock() + + return rt.NPeersForCpl(cpl) == rt.bucketsize +} + // TryAddPeer tries to add a peer to the Routing table. If the peer ALREADY exists in the Routing Table, this call is a no-op. // If the peer is a queryPeer i.e. we queried it or it queried us, we set the LastSuccessfulOutboundQuery to the current time. 
// If the peer is just a peer that we connect to/it connected to us without any DHT query, we consider it as having From 40bab452084973f66f3225dcca730b8ce4e949c3 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Tue, 21 Apr 2020 17:19:20 +0530 Subject: [PATCH 3/7] get peers for a cpl --- table.go | 24 ++++++++++++++++++++++++ table_test.go | 43 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+) diff --git a/table.go b/table.go index c793a5d..d301ace 100644 --- a/table.go +++ b/table.go @@ -105,6 +105,30 @@ func (rt *RoutingTable) NPeersForCpl(cpl uint) int { } } +// GetPeersForCpl returns the peers in the Routing Table with this cpl. +func (rt *RoutingTable) GetPeersForCpl(cpl uint) []peer.ID { + rt.tabLock.RLock() + defer rt.tabLock.RUnlock() + + var peers []peer.ID + + // it's in the last bucket + if int(cpl) >= len(rt.buckets)-1 { + b := rt.buckets[len(rt.buckets)-1] + for _, p := range b.peerIds() { + if CommonPrefixLen(rt.local, ConvertPeerID(p)) == int(cpl) { + peers = append(peers, p) + } + } + } else { + for _, p := range rt.buckets[cpl].peerIds() { + peers = append(peers, p) + } + } + + return peers +} + // IsBucketFull returns true if the Logical bucket for a given Cpl is full func (rt *RoutingTable) IsBucketFull(cpl uint) bool { rt.tabLock.RLock() diff --git a/table_test.go b/table_test.go index f487c08..1f8c1ec 100644 --- a/table_test.go +++ b/table_test.go @@ -116,6 +116,49 @@ func TestNPeersForCpl(t *testing.T) { require.Equal(t, 2, rt.NPeersForCpl(0)) } +func TestGetPeersForCpl(t *testing.T) { + t.Parallel() + local := test.RandPeerIDFatal(t) + m := pstore.NewMetrics() + rt, err := NewRoutingTable(2, ConvertPeerID(local), time.Hour, m, NoOpThreshold) + require.NoError(t, err) + + require.Empty(t, rt.GetPeersForCpl(0)) + require.Empty(t, rt.GetPeersForCpl(1)) + + // one peer with cpl 1 + p1, _ := rt.GenRandPeerID(1) + rt.TryAddPeer(p1, true) + require.Empty(t, rt.GetPeersForCpl(0)) + require.Len(t, rt.GetPeersForCpl(1), 1) + 
require.Contains(t, rt.GetPeersForCpl(1), p1) + require.Empty(t, rt.GetPeersForCpl(2)) + + // one peer with cpl 0 + p2, _ := rt.GenRandPeerID(0) + rt.TryAddPeer(p2, true) + require.Len(t, rt.GetPeersForCpl(0), 1) + require.Contains(t, rt.GetPeersForCpl(0), p2) + require.Len(t, rt.GetPeersForCpl(1), 1) + require.Contains(t, rt.GetPeersForCpl(1), p1) + require.Empty(t, rt.GetPeersForCpl(2)) + + // split the bucket with a peer with cpl 1 + p3, _ := rt.GenRandPeerID(1) + rt.TryAddPeer(p3, true) + require.Len(t, rt.GetPeersForCpl(0), 1) + require.Contains(t, rt.GetPeersForCpl(0), p2) + + require.Len(t, rt.GetPeersForCpl(1), 2) + require.Contains(t, rt.GetPeersForCpl(1), p1) + require.Contains(t, rt.GetPeersForCpl(1), p3) + require.Empty(t, rt.GetPeersForCpl(2)) + + p0, _ := rt.GenRandPeerID(0) + rt.TryAddPeer(p0, true) + require.Len(t, rt.GetPeersForCpl(0), 2) +} + func TestEmptyBucketCollapse(t *testing.T) { t.Parallel() local := test.RandPeerIDFatal(t) From 9ee07aec44d41f9e4b399568289c00e4354d482b Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Tue, 21 Apr 2020 19:24:17 +0530 Subject: [PATCH 4/7] first peer query after connection --- table.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/table.go b/table.go index d301ace..319f2f7 100644 --- a/table.go +++ b/table.go @@ -169,6 +169,11 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) { // peer already exists in the Routing Table. if peer := bucket.getPeer(p); peer != nil { + // if we're querying the peer first time after adding it, let's give it a + // usefulness bump. This will ONLY happen once. 
+ if peer.LastUsefulAt.IsZero() && queryPeer { + peer.LastUsefulAt = lastUsefulAt + } return false, nil } From ee0822c537f5a7d903fa2a1e120a7e5d06516936 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Thu, 23 Apr 2020 23:21:59 +0530 Subject: [PATCH 5/7] changes as per review --- table.go | 68 ++++++++++++++------------------------------------- table_test.go | 51 ++++---------------------------------- 2 files changed, 24 insertions(+), 95 deletions(-) diff --git a/table.go b/table.go index 319f2f7..fad8ea2 100644 --- a/table.go +++ b/table.go @@ -94,8 +94,8 @@ func (rt *RoutingTable) NPeersForCpl(cpl uint) int { if int(cpl) >= len(rt.buckets)-1 { count := 0 b := rt.buckets[len(rt.buckets)-1] - for _, p := range b.peerIds() { - if CommonPrefixLen(rt.local, ConvertPeerID(p)) == int(cpl) { + for _, p := range b.peers() { + if CommonPrefixLen(rt.local, p.dhtId) == int(cpl) { count++ } } @@ -105,38 +105,6 @@ func (rt *RoutingTable) NPeersForCpl(cpl uint) int { } } -// GetPeersForCpl returns the peers in the Routing Table with this cpl. -func (rt *RoutingTable) GetPeersForCpl(cpl uint) []peer.ID { - rt.tabLock.RLock() - defer rt.tabLock.RUnlock() - - var peers []peer.ID - - // it's in the last bucket - if int(cpl) >= len(rt.buckets)-1 { - b := rt.buckets[len(rt.buckets)-1] - for _, p := range b.peerIds() { - if CommonPrefixLen(rt.local, ConvertPeerID(p)) == int(cpl) { - peers = append(peers, p) - } - } - } else { - for _, p := range rt.buckets[cpl].peerIds() { - peers = append(peers, p) - } - } - - return peers -} - -// IsBucketFull returns true if the Logical bucket for a given Cpl is full -func (rt *RoutingTable) IsBucketFull(cpl uint) bool { - rt.tabLock.RLock() - defer rt.tabLock.RUnlock() - - return rt.NPeersForCpl(cpl) == rt.bucketsize -} - // TryAddPeer tries to add a peer to the Routing table. If the peer ALREADY exists in the Routing Table, this call is a no-op. // If the peer is a queryPeer i.e. 
we queried it or it queried us, we set the LastSuccessfulOutboundQuery to the current time. // If the peer is just a peer that we connect to/it connected to us without any DHT query, we consider it as having @@ -285,23 +253,25 @@ func (rt *RoutingTable) removePeer(p peer.ID) { bucketID := rt.bucketIdForPeer(p) bucket := rt.buckets[bucketID] if bucket.remove(p) { - // peer removed callback - rt.PeerRemoved(p) - - // remove this bucket if it was the last bucket and it's now empty - // provided it isn't the ONLY bucket we have. - if len(rt.buckets) > 1 && bucketID == len(rt.buckets)-1 && len(bucket.peers()) == 0 { - rt.buckets[bucketID] = nil - rt.buckets = rt.buckets[:bucketID] - return + for { + lastBucketIndex := len(rt.buckets) - 1 + + // remove the last bucket if it's empty and it isn't the only bucket we have + if len(rt.buckets) > 1 && rt.buckets[lastBucketIndex].len() == 0 { + rt.buckets[lastBucketIndex] = nil + rt.buckets = rt.buckets[:lastBucketIndex] + } else if len(rt.buckets) >= 2 && rt.buckets[lastBucketIndex-1].len() == 0 { + // if the second last bucket just became empty, remove and replace it with the last bucket. + rt.buckets[lastBucketIndex-1] = rt.buckets[lastBucketIndex] + rt.buckets[lastBucketIndex] = nil + rt.buckets = rt.buckets[:lastBucketIndex] + } else { + break + } } - // if the second last bucket just became empty, remove and replace it with the last bucket. 
- if bucketID == len(rt.buckets)-2 && len(bucket.peers()) == 0 { - rt.buckets[bucketID] = rt.buckets[bucketID+1] - rt.buckets[bucketID+1] = nil - rt.buckets = rt.buckets[:bucketID+1] - } + // peer removed callback + rt.PeerRemoved(p) } } diff --git a/table_test.go b/table_test.go index 1f8c1ec..88cfe22 100644 --- a/table_test.go +++ b/table_test.go @@ -116,49 +116,6 @@ func TestNPeersForCpl(t *testing.T) { require.Equal(t, 2, rt.NPeersForCpl(0)) } -func TestGetPeersForCpl(t *testing.T) { - t.Parallel() - local := test.RandPeerIDFatal(t) - m := pstore.NewMetrics() - rt, err := NewRoutingTable(2, ConvertPeerID(local), time.Hour, m, NoOpThreshold) - require.NoError(t, err) - - require.Empty(t, rt.GetPeersForCpl(0)) - require.Empty(t, rt.GetPeersForCpl(1)) - - // one peer with cpl 1 - p1, _ := rt.GenRandPeerID(1) - rt.TryAddPeer(p1, true) - require.Empty(t, rt.GetPeersForCpl(0)) - require.Len(t, rt.GetPeersForCpl(1), 1) - require.Contains(t, rt.GetPeersForCpl(1), p1) - require.Empty(t, rt.GetPeersForCpl(2)) - - // one peer with cpl 0 - p2, _ := rt.GenRandPeerID(0) - rt.TryAddPeer(p2, true) - require.Len(t, rt.GetPeersForCpl(0), 1) - require.Contains(t, rt.GetPeersForCpl(0), p2) - require.Len(t, rt.GetPeersForCpl(1), 1) - require.Contains(t, rt.GetPeersForCpl(1), p1) - require.Empty(t, rt.GetPeersForCpl(2)) - - // split the bucket with a peer with cpl 1 - p3, _ := rt.GenRandPeerID(1) - rt.TryAddPeer(p3, true) - require.Len(t, rt.GetPeersForCpl(0), 1) - require.Contains(t, rt.GetPeersForCpl(0), p2) - - require.Len(t, rt.GetPeersForCpl(1), 2) - require.Contains(t, rt.GetPeersForCpl(1), p1) - require.Contains(t, rt.GetPeersForCpl(1), p3) - require.Empty(t, rt.GetPeersForCpl(2)) - - p0, _ := rt.GenRandPeerID(0) - rt.TryAddPeer(p0, true) - require.Len(t, rt.GetPeersForCpl(0), 2) -} - func TestEmptyBucketCollapse(t *testing.T) { t.Parallel() local := test.RandPeerIDFatal(t) @@ -206,6 +163,7 @@ func TestEmptyBucketCollapse(t *testing.T) { // add p2 again b, err = 
rt.TryAddPeer(p2, true) require.True(t, b) + require.NoError(t, err) rt.tabLock.Lock() require.Len(t, rt.buckets, 2) rt.tabLock.Unlock() @@ -228,12 +186,13 @@ func TestEmptyBucketCollapse(t *testing.T) { require.Len(t, rt.buckets, 4) rt.tabLock.Unlock() - // removing a peer from the middle bucket does not collapse any bucket + // removing from 2,3 and then 4 leaves us with ONLY one bucket rt.RemovePeer(p2) + rt.RemovePeer(p3) + rt.RemovePeer(p4) rt.tabLock.Lock() - require.Len(t, rt.buckets, 4) + require.Len(t, rt.buckets, 1) rt.tabLock.Unlock() - require.NotContains(t, rt.ListPeers(), p2) } func TestRemovePeer(t *testing.T) { From f3c8e922dcf49a8c23cdf98811ce128892901f4f Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Thu, 23 Apr 2020 23:23:01 +0530 Subject: [PATCH 6/7] changes as per review --- table_test.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/table_test.go b/table_test.go index 88cfe22..ab5352a 100644 --- a/table_test.go +++ b/table_test.go @@ -193,6 +193,22 @@ func TestEmptyBucketCollapse(t *testing.T) { rt.tabLock.Lock() require.Len(t, rt.buckets, 1) rt.tabLock.Unlock() + + // an empty bucket in the middle DOES NOT collapse buckets + rt.TryAddPeer(p1, true) + rt.TryAddPeer(p2, true) + rt.TryAddPeer(p3, true) + rt.TryAddPeer(p4, true) + + rt.tabLock.Lock() + require.Len(t, rt.buckets, 4) + rt.tabLock.Unlock() + + rt.RemovePeer(p2) + rt.tabLock.Lock() + require.Len(t, rt.buckets, 4) + rt.tabLock.Unlock() + require.NotContains(t, rt.ListPeers(), p2) } func TestRemovePeer(t *testing.T) { From 2e988e5cf5490ce57206c38b38159ab6f16b22b4 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Mon, 11 May 2020 12:26:44 +0530 Subject: [PATCH 7/7] changes as per review --- table.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/table.go b/table.go index fad8ea2..0405d28 100644 --- a/table.go +++ b/table.go @@ -105,11 +105,17 @@ func (rt *RoutingTable) NPeersForCpl(cpl uint) int { } } -// TryAddPeer tries to add a peer to 
the Routing table. If the peer ALREADY exists in the Routing Table, this call is a no-op. +// TryAddPeer tries to add a peer to the Routing table. +// If the peer ALREADY exists in the Routing Table and has been queried before, this call is a no-op. +// If the peer ALREADY exists in the Routing Table but hasn't been queried before, we set its LastUsefulAt value to +// the current time. This needs to be done because we don't mark peers as "Useful" (by setting the LastUsefulAt value) +// when we first connect to them. +// // If the peer is a queryPeer i.e. we queried it or it queried us, we set the LastSuccessfulOutboundQuery to the current time. // If the peer is just a peer that we connect to/it connected to us without any DHT query, we consider it as having // no LastSuccessfulOutboundQuery. // + +// If the logical bucket to which the peer belongs is full and it's not the last bucket, we try to replace an existing peer // whose LastSuccessfulOutboundQuery is above the maximum allowed threshold in that bucket with the new peer. // If no such peer exists in that bucket, we do NOT add the peer to the Routing Table and return error "ErrPeerRejectedNoCapacity".