From f8366fc4290e7ab5109c8943dfadf9d75c6ca2f0 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 11 Mar 2024 20:48:38 +0800 Subject: [PATCH] feat(blocksync): sort peers by download rate & multiple requests for closer blocks (#2475) It can be reviewed commit by commit. 1. Request block N from peer B immediately after getting `NoBlockResponse` from peer A https://github.com/cometbft/cometbft/pull/2475/commits/a718fb47913d58322e0af27167c76f5a2978be20 2. Sort peers by download rate (the fastest peer is picked first) https://github.com/cometbft/cometbft/pull/2475/commits/cf19851c7b51510e3b406c151139d40ad2e02a66 3. Request a block from peer B if we are approaching the pool's height (less than 50 blocks away) and the current peer A is slow in sending us the block https://github.com/cometbft/cometbft/pull/2475/commits/e06ebfe60bc162b8def8d0958a098b9d45a860f7 https://github.com/cometbft/cometbft/pull/2475/commits/e97007fd691fde41b2f362703ebe03607025fbc8 Closes #2379 ### Benchmarks Osmosis: without this PR (baseline): 11.45m; with this PR: 7.29m (~36% decrease) --- #### PR checklist - [x] Tests written/updated - [x] Changelog entry added in `.changelog` (we use [unclog](https://github.com/informalsystems/unclog) to manage our changelog) - [x] Updated relevant documentation (`docs/` or `spec/`) and code comments - [x] Title follows the [Conventional Commits](https://www.conventionalcommits.org/en/v1.0.0/) spec --- .../2475-blocksync-2nd-request.md | 3 + .../2475-blocksync-no-block-response.md | 3 + .../improvements/2475-blocksync-sort-peers.md | 2 + internal/blocksync/pool.go | 374 +++++++++++++----- internal/blocksync/reactor.go | 7 +- 5 files changed, 292 insertions(+), 97 deletions(-) create mode 100644 .changelog/unreleased/improvements/2475-blocksync-2nd-request.md create mode 100644 .changelog/unreleased/improvements/2475-blocksync-no-block-response.md create mode 100644 .changelog/unreleased/improvements/2475-blocksync-sort-peers.md diff --git a/.changelog/unreleased/improvements/2475-blocksync-2nd-request.md b/.changelog/unreleased/improvements/2475-blocksync-2nd-request.md new file mode 100644 index 00000000000..67614a8e35f --- /dev/null +++ b/.changelog/unreleased/improvements/2475-blocksync-2nd-request.md @@ -0,0 +1,3 @@ +- `[blocksync]` Request a block from peer B if we are approaching the pool's height + (less than 50 blocks away) and the current peer A is slow in sending us the + block [\#2475](https://github.com/cometbft/cometbft/pull/2475) diff --git a/.changelog/unreleased/improvements/2475-blocksync-no-block-response.md b/.changelog/unreleased/improvements/2475-blocksync-no-block-response.md new file mode 100644 index 00000000000..d01b3679866 --- /dev/null +++ b/.changelog/unreleased/improvements/2475-blocksync-no-block-response.md @@ -0,0 +1,3 @@ +- `[blocksync]` Request block N from peer B immediately after getting + `NoBlockResponse` from peer A + [\#2475](https://github.com/cometbft/cometbft/pull/2475) diff --git a/.changelog/unreleased/improvements/2475-blocksync-sort-peers.md b/.changelog/unreleased/improvements/2475-blocksync-sort-peers.md new file mode 100644 index 00000000000..5c544401ba6 --- /dev/null +++ b/.changelog/unreleased/improvements/2475-blocksync-sort-peers.md @@ -0,0 +1,2 @@ +- `[blocksync]` Sort peers by download rate (the fastest peer is picked first) + [\#2475](https://github.com/cometbft/cometbft/pull/2475) diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go index 779162385e8..7ca6a396407 100644 --- a/internal/blocksync/pool.go +++ 
b/internal/blocksync/pool.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "math" + "sort" "time" flow "github.com/cometbft/cometbft/internal/flowrate" @@ -34,9 +35,20 @@ const ( // enough. If a peer is not sending us data at least that rate, we consider // them to have timed out, and we disconnect. // - // Assuming a DSL connection (not a good choice) 128 Kbps (upload) ~ 15 KB/s, - // sending data across atlantic ~ 7.5 KB/s. - minRecvRate = 7680 + // Based on experiments with [Osmosis](https://osmosis.zone/), the + // minimum rate could be as high as 500 KB/s. However, we're setting it to + // 128 KB/s for now to be conservative. + minRecvRate = 128 * 1024 // 128 KB/s + + // peerConnWait is the time that must have elapsed since the pool routine + // was created before we start making requests. This is to give the peer + // routine time to connect to peers. + peerConnWait = 3 * time.Second + + // If we're within minBlocksForSingleRequest blocks of the pool's height, we + // send 2 parallel requests to 2 peers for the same block. If we're further + // away, we send a single request. + minBlocksForSingleRequest = 50 ) var ( @@ -58,7 +70,8 @@ var ( // BlockPool keeps track of the block sync peers, block requests and block responses. type BlockPool struct { service.BaseService - startTime time.Time + startTime time.Time + startHeight int64 mtx cmtsync.Mutex // block requests @@ -66,7 +79,8 @@ type BlockPool struct { height int64 // the lowest key in requesters. // peers peers map[p2p.ID]*bpPeer - maxPeerHeight int64 // the biggest reported height + sortedPeers []*bpPeer // sorted by curRate, highest first + maxPeerHeight int64 // the biggest reported height requestsCh chan<- BlockRequest errorsCh chan<- peerError @@ -78,8 +92,9 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p bp := &BlockPool{ peers: make(map[p2p.ID]*bpPeer), - requesters: make(map[int64]*bpRequester), - height: start, + requesters: make(map[int64]*bpRequester), + height: start, + startHeight: start, requestsCh: requestsCh, errorsCh: errorsCh, @@ -91,8 +106,8 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p // OnStart implements service.Service by spawning requesters routine and recording // pool's start time. func (pool *BlockPool) OnStart() error { - go pool.makeRequestersRoutine() pool.startTime = time.Now() + go pool.makeRequestersRoutine() return nil } @@ -102,6 +117,14 @@ func (pool *BlockPool) makeRequestersRoutine() { return } + // Wait if peerConnWait has not yet elapsed since the pool started. + // This gives us some time to connect to peers before sending out a wave of requests. + if time.Since(pool.startTime) < peerConnWait { + // Sleep until peerConnWait has passed since pool.startTime. + sleepDuration := peerConnWait - time.Since(pool.startTime) + time.Sleep(sleepDuration) + } + pool.mtx.Lock() var ( maxRequestersCreated = len(pool.requesters) >= len(pool.peers)*maxPendingRequestsPerPeer @@ -119,6 +142,8 @@ func (pool *BlockPool) makeRequestersRoutine() { time.Sleep(requestInterval) default: pool.makeNextRequester(nextHeight) + // Sleep for a bit to make the requests more ordered. 
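+ // This also spaces out requester creation, so newly connected peers are not hit with a burst of requests all at once.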
+ time.Sleep(requestInterval) } } } @@ -140,11 +165,16 @@ func (pool *BlockPool) removeTimedoutPeers() { "minRate", fmt.Sprintf("%d KB/s", minRecvRate/1024)) peer.didTimeout = true } + + peer.curRate = curRate } + if peer.didTimeout { pool.removePeer(peer.id) } } + + pool.sortPeers() } // IsCaughtUp returns true if this node is caught up, false - otherwise. @@ -191,9 +221,7 @@ func (pool *BlockPool) PeekTwoBlocks() (first, second *types.Block, firstExtComm return } -// PopRequest pops the first block at pool.height. -// It must have been validated by the second Commit from PeekTwoBlocks. -// TODO(thane): (?) and its corresponding ExtendedCommit. +// PopRequest removes the requester at pool.height and increments pool.height. func (pool *BlockPool) PopRequest() { pool.mtx.Lock() defer pool.mtx.Unlock() @@ -208,24 +236,46 @@ func (pool *BlockPool) PopRequest() { } delete(pool.requesters, pool.height) pool.height++ + + // Notify the next minBlocksForSingleRequest requesters about new height, so + // they can potentially request a block from the second peer. + for i := int64(0); i < minBlocksForSingleRequest && i < int64(len(pool.requesters)); i++ { + pool.requesters[pool.height+i].newHeight(pool.height) + } } -// RedoRequest invalidates the block at pool.height, -// Remove the peer and redo request from others. +// RemovePeerAndRedoAllPeerRequests retries the request at the given height and +// all the requests made to the same peer. The peer is removed from the pool. // Returns the ID of the removed peer. -func (pool *BlockPool) RedoRequest(height int64) p2p.ID { +func (pool *BlockPool) RemovePeerAndRedoAllPeerRequests(height int64) p2p.ID { pool.mtx.Lock() defer pool.mtx.Unlock() request := pool.requesters[height] - peerID := request.getPeerID() - if peerID != p2p.ID("") { - // RemovePeer will redo all requesters associated with this peer. - pool.removePeer(peerID) - } + peerID := request.gotBlockFromPeerID() + // RemovePeer will redo all requesters associated with this peer. + pool.removePeer(peerID) return peerID } +// RedoRequestFrom retries the request at the given height. It does not remove the +// peer. +func (pool *BlockPool) RedoRequestFrom(height int64, peerID p2p.ID) { + pool.mtx.Lock() + defer pool.mtx.Unlock() + + if requester, ok := pool.requesters[height]; ok { // If we requested this block + if requester.didRequestFrom(peerID) { // From this specific peer + requester.redo(peerID) + } + } +} + +// Deprecated: use RemovePeerAndRedoAllPeerRequests instead. +func (pool *BlockPool) RedoRequest(height int64) p2p.ID { + return pool.RemovePeerAndRedoAllPeerRequests(height) +} + // AddBlock validates that the block comes from the peer it was expected from // and calls the requester to store it. // @@ -240,34 +290,32 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, extCommit *ty defer pool.mtx.Unlock() if extCommit != nil && block.Height != extCommit.Height { - return fmt.Errorf("heights don't match, not adding block (block height: %d, commit height: %d)", block.Height, extCommit.Height) + err := fmt.Errorf("block height %d != extCommit height %d", block.Height, extCommit.Height) + // Peer sent us an invalid block => remove it. 
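+ // sendError notifies the reactor via errorsCh, which is expected to stop and disconnect the peer.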
+ pool.sendError(err, peerID) + return err } requester := pool.requesters[block.Height] if requester == nil { - pool.Logger.Info( - "peer sent us a block we didn't expect", - "peer", - peerID, - "curHeight", - pool.height, - "blockHeight", - block.Height) - diff := pool.height - block.Height - if diff < 0 { - diff *= -1 - } - const maxDiff = 100 // maximum difference between current and received block height - if diff > maxDiff { - pool.sendError(errors.New("peer sent us a block we didn't expect with a height too far ahead/behind"), peerID) + // Because we're issuing 2nd requests for closer blocks, it's possible to + // receive a block we've already processed from a second peer. Hence, we + // can't punish it. But if the peer sent us a block we clearly didn't + // request, we disconnect. + if block.Height > pool.height || block.Height < pool.startHeight { + err := fmt.Errorf("peer sent us block #%d we didn't expect (current height: %d, start height: %d)", + block.Height, pool.height, pool.startHeight) + pool.sendError(err, peerID) + return err } - return fmt.Errorf("peer sent us a block we didn't expect (peer: %s, current height: %d, block height: %d)", peerID, pool.height, block.Height) + + return fmt.Errorf("got an already committed block #%d (possibly from the slow peer %s)", block.Height, peerID) } if !requester.setBlock(block, extCommit, peerID) { - err := errors.New("requester is different or block already exists") + err := fmt.Errorf("requested block #%d from %v, not %s", block.Height, requester.requestedFrom(), peerID) pool.sendError(err, peerID) - return fmt.Errorf("%w (peer: %s, requester: %s, block height: %d)", err, peerID, requester.getPeerID(), block.Height) + return err } peer := pool.peers[peerID] @@ -278,6 +326,13 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, extCommit *ty return nil } +// Height returns the pool's height. +func (pool *BlockPool) Height() int64 { + pool.mtx.Lock() + defer pool.mtx.Unlock() + return pool.height +} + // MaxPeerHeight returns the highest reported height. func (pool *BlockPool) MaxPeerHeight() int64 { pool.mtx.Lock() @@ -298,6 +353,9 @@ func (pool *BlockPool) SetPeerRange(peerID p2p.ID, base int64, height int64) { peer = newBPPeer(pool, peerID, base, height) peer.setLogger(pool.Logger.With("peer", peerID)) pool.peers[peerID] = peer + // no need to sort because curRate is 0 at start. + // just add to the beginning so it's picked first by pickIncrAvailablePeer. + pool.sortedPeers = append([]*bpPeer{peer}, pool.sortedPeers...) } if height > pool.maxPeerHeight { @@ -316,7 +374,7 @@ func (pool *BlockPool) RemovePeer(peerID p2p.ID) { func (pool *BlockPool) removePeer(peerID p2p.ID) { for _, requester := range pool.requesters { - if requester.getPeerID() == peerID { + if requester.didRequestFrom(peerID) { requester.redo(peerID) } } @@ -328,6 +386,12 @@ func (pool *BlockPool) removePeer(peerID p2p.ID) { } delete(pool.peers, peerID) + for i, p := range pool.sortedPeers { + if p.id == peerID { + pool.sortedPeers = append(pool.sortedPeers[:i], pool.sortedPeers[i+1:]...) + break + } + } // Find a new peer with the biggest height and update maxPeerHeight if the // peer's height was the biggest. @@ -350,11 +414,14 @@ func (pool *BlockPool) updateMaxPeerHeight() { // Pick an available peer with the given height available. // If no peers are available, returns nil. 
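+// Peers are tried in sortedPeers order, i.e. the peer with the highest measured download rate first.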
-func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer { +func (pool *BlockPool) pickIncrAvailablePeer(height int64, excludePeerID p2p.ID) *bpPeer { pool.mtx.Lock() defer pool.mtx.Unlock() - for _, peer := range pool.peers { + for _, peer := range pool.sortedPeers { + if peer.id == excludePeerID { + continue + } if peer.didTimeout { pool.removePeer(peer.id) continue } @@ -368,9 +435,19 @@ func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer { peer.incrPending() return peer } + return nil } +// Sort peers by curRate, highest first. +// +// CONTRACT: pool.mtx must be locked. +func (pool *BlockPool) sortPeers() { + sort.Slice(pool.sortedPeers, func(i, j int) bool { + return pool.sortedPeers[i].curRate > pool.sortedPeers[j].curRate + }) +} + func (pool *BlockPool) makeNextRequester(nextHeight int64) { pool.mtx.Lock() request := newBPRequester(pool, nextHeight) @@ -421,6 +498,7 @@ func (pool *BlockPool) debug() string { type bpPeer struct { didTimeout bool + curRate int64 numPending int32 height int64 base int64 @@ -493,28 +571,42 @@ func (peer *bpPeer) onTimeout() { //------------------------------------- +// bpRequester requests a block from a peer. +// +// If the height is within minBlocksForSingleRequest blocks of the pool's +// height, it will send an additional request to another peer. This is to avoid +// a situation where blocksync is stuck because of a single slow peer. Note +// that it's okay to send a single request when the requested height is far +// from the pool's height. If the peer is slow, it will time out and be replaced +// with another peer. type bpRequester struct { service.BaseService - pool *BlockPool - height int64 - gotBlockCh chan struct{} - redoCh chan p2p.ID // redo may send multitime, add peerId to identify repeat - mtx cmtsync.Mutex - peerID p2p.ID - block *types.Block - extCommit *types.ExtendedCommit + pool *BlockPool + height int64 + gotBlockCh chan struct{} + redoCh chan p2p.ID // redo may get multiple messages; add peerID to identify repeats + newHeightCh chan int64 + + mtx cmtsync.Mutex + peerID p2p.ID + secondPeerID p2p.ID // alternative peer to request from (if close to pool's height) + gotBlockFrom p2p.ID + block *types.Block + extCommit *types.ExtendedCommit } func newBPRequester(pool *BlockPool, height int64) *bpRequester { bpr := &bpRequester{ - pool: pool, - height: height, - gotBlockCh: make(chan struct{}, 1), - redoCh: make(chan p2p.ID, 1), + pool: pool, + height: height, + gotBlockCh: make(chan struct{}, 1), + redoCh: make(chan p2p.ID, 1), + newHeightCh: make(chan int64, 1), - peerID: "", - block: nil, + peerID: "", + secondPeerID: "", + block: nil, } bpr.BaseService = *service.NewBaseService(nil, "bpRequester", bpr) return bpr @@ -525,15 +617,21 @@ func (bpr *bpRequester) OnStart() error { return nil } -// Returns true if the peer matches and block doesn't already exist. +// Returns true if the peer(s) match and block doesn't already exist. 
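+// If the block is already set (e.g. received from the other peer), the duplicate is ignored and true is returned.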
func (bpr *bpRequester) setBlock(block *types.Block, extCommit *types.ExtendedCommit, peerID p2p.ID) bool { bpr.mtx.Lock() - if bpr.block != nil || bpr.peerID != peerID { + if bpr.peerID != peerID && bpr.secondPeerID != peerID { bpr.mtx.Unlock() return false } + if bpr.block != nil { + bpr.mtx.Unlock() + return true // getting a block from both peers is not an error + } + bpr.block = block bpr.extCommit = extCommit + bpr.gotBlockFrom = peerID bpr.mtx.Unlock() select { @@ -555,20 +653,54 @@ func (bpr *bpRequester) getExtendedCommit() *types.ExtendedCommit { return bpr.extCommit } -func (bpr *bpRequester) getPeerID() p2p.ID { +// Returns the IDs of peers we've requested a block from. +func (bpr *bpRequester) requestedFrom() []p2p.ID { bpr.mtx.Lock() defer bpr.mtx.Unlock() - return bpr.peerID + peerIDs := make([]p2p.ID, 0, 2) + if bpr.peerID != "" { + peerIDs = append(peerIDs, bpr.peerID) + } + if bpr.secondPeerID != "" { + peerIDs = append(peerIDs, bpr.secondPeerID) + } + return peerIDs } -// This is called from the requestRoutine, upon redo(). -func (bpr *bpRequester) reset() { +// Returns true if we've requested a block from the given peer. +func (bpr *bpRequester) didRequestFrom(peerID p2p.ID) bool { bpr.mtx.Lock() defer bpr.mtx.Unlock() + return bpr.peerID == peerID || bpr.secondPeerID == peerID +} - bpr.peerID = "" - bpr.block = nil - bpr.extCommit = nil +// Returns the ID of the peer who sent us the block. +func (bpr *bpRequester) gotBlockFromPeerID() p2p.ID { + bpr.mtx.Lock() + defer bpr.mtx.Unlock() + return bpr.gotBlockFrom +} + +// Removes the block (IF we got it from the given peer) and resets the peer. +func (bpr *bpRequester) reset(peerID p2p.ID) (removedBlock bool) { + bpr.mtx.Lock() + defer bpr.mtx.Unlock() + + // Only remove the block if we got it from that peer. + if bpr.gotBlockFrom == peerID { + bpr.block = nil + bpr.extCommit = nil + bpr.gotBlockFrom = "" + removedBlock = true + } + + if bpr.peerID == peerID { + bpr.peerID = "" + } else { + bpr.secondPeerID = "" + } + + return removedBlock } // Tells bpRequester to pick another peer and try again. @@ -581,34 +713,75 @@ func (bpr *bpRequester) redo(peerID p2p.ID) { } } +func (bpr *bpRequester) pickPeerAndSendRequest() { + bpr.mtx.Lock() + secondPeerID := bpr.secondPeerID + bpr.mtx.Unlock() + + var peer *bpPeer +PICK_PEER_LOOP: + for { + if !bpr.IsRunning() || !bpr.pool.IsRunning() { + return + } + peer = bpr.pool.pickIncrAvailablePeer(bpr.height, secondPeerID) + if peer == nil { + bpr.Logger.Debug("No peers currently available; will retry shortly", "height", bpr.height) + time.Sleep(requestInterval) + continue PICK_PEER_LOOP + } + break PICK_PEER_LOOP + } + bpr.mtx.Lock() + bpr.peerID = peer.id + bpr.mtx.Unlock() + + bpr.pool.sendRequest(bpr.height, peer.id) +} + +// Picks a second peer and sends a request to it. If the second peer is already +// set, does nothing. +func (bpr *bpRequester) pickSecondPeerAndSendRequest() { + bpr.mtx.Lock() + if bpr.secondPeerID != "" { + bpr.mtx.Unlock() + return + } + peerID := bpr.peerID + bpr.mtx.Unlock() + + secondPeer := bpr.pool.pickIncrAvailablePeer(bpr.height, peerID) + if secondPeer != nil { + bpr.mtx.Lock() + bpr.secondPeerID = secondPeer.id + bpr.mtx.Unlock() + + bpr.pool.sendRequest(bpr.height, secondPeer.id) + } +} + +// Informs the requester of a new pool's height. +func (bpr *bpRequester) newHeight(height int64) { + select { + case bpr.newHeightCh <- height: + default: + } +} + // Responsible for making more requests as necessary // Returns only when a block is found (e.g. 
AddBlock() is called). func (bpr *bpRequester) requestRoutine() { + gotBlock := false + OUTER_LOOP: for { - // Pick a peer to send request to. - var peer *bpPeer - PICK_PEER_LOOP: - for { - if !bpr.IsRunning() || !bpr.pool.IsRunning() { - return - } - peer = bpr.pool.pickIncrAvailablePeer(bpr.height) - if peer == nil { - bpr.Logger.Debug("No peers currently available; will retry shortly", "height", bpr.height) - time.Sleep(requestInterval) - continue PICK_PEER_LOOP - } - break PICK_PEER_LOOP + bpr.pickPeerAndSendRequest() + + poolHeight := bpr.pool.Height() + if bpr.height-poolHeight < minBlocksForSingleRequest { + bpr.pickSecondPeerAndSendRequest() } - bpr.mtx.Lock() - bpr.peerID = peer.id - bpr.mtx.Unlock() - to := time.NewTimer(requestRetrySeconds * time.Second) - // Send request and wait. - bpr.pool.sendRequest(bpr.height, peer.id) - WAIT_LOOP: for { select { case <-bpr.pool.Quit(): @@ -618,21 +791,34 @@ OUTER_LOOP: return case <-bpr.Quit(): return - case <-to.C: - bpr.Logger.Debug("Retrying block request after timeout", "height", bpr.height, "peer", bpr.peerID) - // Simulate a redo - bpr.reset() - continue OUTER_LOOP + case <-time.After(requestRetrySeconds * time.Second): + if !gotBlock { + bpr.Logger.Debug("Retrying block request(s) after timeout", "height", bpr.height, "peer", bpr.peerID, "secondPeerID", bpr.secondPeerID) + bpr.reset(bpr.peerID) + bpr.reset(bpr.secondPeerID) + continue OUTER_LOOP + } case peerID := <-bpr.redoCh: - if peerID == bpr.peerID { - bpr.reset() + if bpr.didRequestFrom(peerID) { + removedBlock := bpr.reset(peerID) + if removedBlock { + gotBlock = false + } + } + // If both peers returned NoBlockResponse or bad block, reschedule both + // requests. If not, wait for the other peer. + if len(bpr.requestedFrom()) == 0 { continue OUTER_LOOP } - continue WAIT_LOOP + case newHeight := <-bpr.newHeightCh: + if !gotBlock && bpr.height-newHeight < minBlocksForSingleRequest { + // The operation is a noop if the second peer is already set. The cost is checking a mutex. + bpr.pickSecondPeerAndSendRequest() + } case <-bpr.gotBlockCh: + gotBlock = true // We got a block! // Continue the for-loop and wait til Quit. - continue WAIT_LOOP } } } diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index 977b970cd54..abeac526bd8 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -276,7 +276,7 @@ func (bcR *Reactor) Receive(e p2p.Envelope) { } if err := bcR.pool.AddBlock(e.Src.ID(), bi, extCommit, msg.Block.Size()); err != nil { - bcR.Logger.Error("failed to add block", "err", err) + bcR.Logger.Error("failed to add block", "peer", e.Src, "err", err) } case *bcproto.StatusRequest: // Send peer our state. @@ -292,6 +292,7 @@ func (bcR *Reactor) Receive(e p2p.Envelope) { bcR.pool.SetPeerRange(e.Src.ID(), msg.Base, msg.Height) case *bcproto.NoBlockResponse: bcR.Logger.Debug("Peer does not have requested block", "peer", e.Src, "height", msg.Height) + bcR.pool.RedoRequestFrom(msg.Height, e.Src.ID()) default: bcR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) } @@ -496,14 +497,14 @@ FOR_LOOP: } if err != nil { bcR.Logger.Error("Error in validation", "err", err) - peerID := bcR.pool.RedoRequest(first.Height) + peerID := bcR.pool.RemovePeerAndRedoAllPeerRequests(first.Height) peer := bcR.Switch.Peers().Get(peerID) if peer != nil { // NOTE: we've already removed the peer's request, but we // still need to clean up the rest. 
bcR.Switch.StopPeerForError(peer, ErrReactorValidation{Err: err}) } - peerID2 := bcR.pool.RedoRequest(second.Height) + peerID2 := bcR.pool.RemovePeerAndRedoAllPeerRequests(second.Height) peer2 := bcR.Switch.Peers().Get(peerID2) if peer2 != nil && peer2 != peer { // NOTE: we've already removed the peer's request, but we