Skip to content

Commit

Permalink
fix(p2p/session): return peer to the queue in case of ErrNotFound (#159)
Browse files Browse the repository at this point in the history
  • Loading branch information
vgonkivs authored Feb 26, 2024
1 parent aecd7cf commit e2197d9
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 47 deletions.
31 changes: 0 additions & 31 deletions p2p/peer_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package p2p

import (
"context"
"sort"
"sync"
"time"

Expand All @@ -18,8 +17,6 @@ const (
defaultScore float32 = 1
// maxPeerTrackerSize specifies the max amount of peers that can be added to the peerTracker.
maxPeerTrackerSize = 100
// minPeerTrackerSizeBeforeGC specifies the minimum amount of tracked peers before the peerTracker starts removing peers with lower peer scores.
minPeerTrackerSizeBeforeGC = 10
)

var (
Expand Down Expand Up @@ -244,7 +241,6 @@ func (p *peerTracker) gc() {
return
case <-ticker.C:
p.cleanUpDisconnectedPeers()
p.cleanUpTrackedPeers()
p.dumpPeers(p.ctx)
}
}
Expand All @@ -265,33 +261,6 @@ func (p *peerTracker) cleanUpDisconnectedPeers() {
p.metrics.peersDisconnected(-deletedDisconnectedNum)
}

func (p *peerTracker) cleanUpTrackedPeers() {
p.peerLk.Lock()
defer p.peerLk.Unlock()

if len(p.trackedPeers) <= minPeerTrackerSizeBeforeGC {
return
}

var deletedTrackedNum int
orderedPeers := make([]*peerStat, 0, len(p.trackedPeers))
for _, peer := range p.trackedPeers {
orderedPeers = append(orderedPeers, peer)
}
sort.Slice(orderedPeers, func(i, j int) bool {
return orderedPeers[i].peerScore < orderedPeers[j].peerScore
})

for _, peer := range orderedPeers[:len(orderedPeers)-minPeerTrackerSizeBeforeGC] {
if peer.peerScore > defaultScore {
break
}
delete(p.trackedPeers, peer.peerID)
deletedTrackedNum++
}
p.metrics.peersTracked(-deletedTrackedNum)
}

// dumpPeers stores peers to the peerTracker's PeerIDStore if
// present.
func (p *peerTracker) dumpPeers(ctx context.Context) {
Expand Down
26 changes: 10 additions & 16 deletions p2p/peer_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,17 @@ func TestPeerTracker_GC(t *testing.T) {

maxAwaitingTime = time.Millisecond

peerlist := generateRandomPeerlist(t, minPeerTrackerSizeBeforeGC)
for i := 0; i < minPeerTrackerSizeBeforeGC; i++ {
peerlist := generateRandomPeerlist(t, 10)
for i := 0; i < 10; i++ {
p.trackedPeers[peerlist[i]] = &peerStat{peerID: peerlist[i], peerScore: 0.5}
}

// add peers to trackedPeers to make total number of peers > maxPeerTrackerSize
peerlist = generateRandomPeerlist(t, 4)
pid1 := peerlist[0]
pid2 := peerlist[1]
pid3 := peerlist[2]
pid4 := peerlist[3]

p.trackedPeers[pid1] = &peerStat{peerID: pid1, peerScore: 0.5}
p.trackedPeers[pid2] = &peerStat{peerID: pid2, peerScore: 10}
p.disconnectedPeers[pid3] = &peerStat{peerID: pid3, pruneDeadline: time.Now()}
p.disconnectedPeers[pid4] = &peerStat{peerID: pid4, pruneDeadline: time.Now().Add(time.Minute * 10)}
pid1 := peerlist[2]
pid2 := peerlist[3]

p.disconnectedPeers[pid1] = &peerStat{peerID: pid1, pruneDeadline: time.Now()}
p.disconnectedPeers[pid2] = &peerStat{peerID: pid2, pruneDeadline: time.Now().Add(time.Minute * 10)}
assert.True(t, len(p.trackedPeers) > 0)
assert.True(t, len(p.disconnectedPeers) > 0)

Expand All @@ -60,14 +55,13 @@ func TestPeerTracker_GC(t *testing.T) {
err = p.stop(ctx)
require.NoError(t, err)

// ensure amount of peers in trackedPeers is equal to minPeerTrackerSizeBeforeGC
require.Len(t, p.trackedPeers, minPeerTrackerSizeBeforeGC)
require.Nil(t, p.disconnectedPeers[pid3])
require.Len(t, p.trackedPeers, 10)
require.Nil(t, p.disconnectedPeers[pid1])

// ensure good peers were dumped to store
peers, err := pidstore.Load(ctx)
require.NoError(t, err)
require.Equal(t, minPeerTrackerSizeBeforeGC, len(peers))
require.Equal(t, 10, len(peers))
}

func TestPeerTracker_BlockPeer(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions p2p/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,12 @@ func (s *session[H]) doRequest(
"err", err,
"peer", stat.peerID,
)
// ErrNotFound is not a critical error here. It means
// that peer hasn't synced the requested range yet.
// Returning peer to the queue, so it could serve another ranges.
if errors.Is(err, header.ErrNotFound) {
s.queue.push(stat)
}
return
}

Expand Down

0 comments on commit e2197d9

Please sign in to comment.