diff --git a/blocksync/pool.go b/blocksync/pool.go
index 9a95fe56214..8d2149d68c8 100644
--- a/blocksync/pool.go
+++ b/blocksync/pool.go
@@ -4,7 +4,8 @@ import (
 	"errors"
 	"fmt"
 	"math"
-	"sync/atomic"
+	"math/rand"
+	"sync"
 	"time"
 
 	flow "github.com/cometbft/cometbft/libs/flowrate"
@@ -28,11 +29,13 @@ eg, L = latency = 0.1s
 */
 
 const (
-	requestIntervalMS         = 2
-	maxTotalRequesters        = 600
+	requestIntervalMS  = 2
+	maxTotalRequesters = 10
+	//maxTotalRequesters = 1200
 	maxPendingRequests        = maxTotalRequesters
 	maxPendingRequestsPerPeer = 20
-	requestRetrySeconds       = 30
+	//maxPendingRequestsPerPeer = 50
+	requestRetrySeconds = 5
 
 	// Minimum recv rate to ensure we're receiving blocks from a peer fast
 	// enough. If a peer is not sending us data at at least that rate, we
@@ -42,11 +45,11 @@ const (
 	// sending data across atlantic ~ 7.5 KB/s.
 	minRecvRate = 7680
 
-	// Maximum difference between current and new block's height.
-	maxDiffBetweenCurrentAndReceivedBlockHeight = 100
+	// // Maximum difference between current and new block's height.
+	// maxDiffBetweenCurrentAndReceivedBlockHeight = 4
 )
 
-var peerTimeout = 15 * time.Second // not const so we can override with tests
+var peerTimeout = 10 * time.Second // not const so we can override with tests
 
 /*
 	Peers self report their heights when we join the block pool.
@@ -68,13 +71,11 @@ type BlockPool struct {
 	// block requests
 	requesters map[int64]*bpRequester
 	height     int64 // the lowest key in requesters.
+	heightRecv map[int64]bool
 	// peers
 	peers         map[p2p.ID]*bpPeer
 	maxPeerHeight int64 // the biggest reported height
 
-	// atomic
-	numPending int32 // number of requests pending assignment or block response
-
 	requestsCh chan<- BlockRequest
 	errorsCh   chan<- peerError
 }
@@ -87,7 +88,7 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p
 
 		requesters: make(map[int64]*bpRequester),
 		height:     start,
-		numPending: 0,
+		heightRecv: make(map[int64]bool),
 
 		requestsCh: requestsCh,
 		errorsCh:   errorsCh,
@@ -108,23 +109,15 @@ func (pool *BlockPool) OnStart() error {
 func (pool *BlockPool) makeRequestersRoutine() {
 	for {
 		if !pool.IsRunning() {
-			break
+			return
 		}
 
-		_, numPending, lenRequesters := pool.GetStatus()
+		_, lenRequesters := pool.GetStatus()
 		switch {
-		case numPending >= maxPendingRequests:
-			// sleep for a bit.
-			time.Sleep(requestIntervalMS * time.Millisecond)
-			// check for timed out peers
-			pool.removeTimedoutPeers()
 		case lenRequesters >= maxTotalRequesters:
-			// sleep for a bit.
 			time.Sleep(requestIntervalMS * time.Millisecond)
-			// check for timed out peers
 			pool.removeTimedoutPeers()
 		default:
-			// request for more blocks.
 			pool.makeNextRequester()
 		}
 	}
@@ -154,13 +147,12 @@ func (pool *BlockPool) removeTimedoutPeers() {
 	}
 }
 
-// GetStatus returns pool's height, numPending requests and the number of
-// requesters.
-func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequesters int) {
+// GetStatus returns pool's height and the number of requesters.
+func (pool *BlockPool) GetStatus() (height int64, lenRequesters int) {
 	pool.mtx.Lock()
 	defer pool.mtx.Unlock()
 
-	return pool.height, atomic.LoadInt32(&pool.numPending), len(pool.requesters)
+	return pool.height, len(pool.requesters)
 }
 
 // IsCaughtUp returns true if this node is caught up, false - otherwise.
@@ -219,6 +211,7 @@ func (pool *BlockPool) PopRequest() {
 			pool.Logger.Error("Error stopping requester", "err", err)
 		}
 		delete(pool.requesters, pool.height)
+		// This is where we increase the pool's height
 		pool.height++
 	} else {
 		panic(fmt.Sprintf("Expected requester to pop, got nothing at height %v", pool.height))
@@ -261,14 +254,16 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int
 		if diff < 0 {
 			diff *= -1
 		}
-		if diff > maxDiffBetweenCurrentAndReceivedBlockHeight {
-			pool.sendError(errors.New("peer sent us a block we didn't expect with a height too far ahead/behind"), peerID)
-		}
+		// const maxDiff = 50 // maximum difference between current and received block height
+		// if diff > maxDiff {
+		// 	pool.sendError(errors.New("peer sent us a block we didn't expect with a height too far ahead/behind"), peerID)
+		// }
 		return
 	}
 
+	fmt.Println("in add block", block.Height)
+
 	if requester.setBlock(block, peerID) {
-		atomic.AddInt32(&pool.numPending, -1)
 		peer := pool.peers[peerID]
 		if peer != nil {
 			peer.decrPending(blockSize)
@@ -349,13 +344,54 @@ func (pool *BlockPool) updateMaxPeerHeight() {
 	pool.maxPeerHeight = max
 }
 
-// Pick an available peer with the given height available.
-// If no peers are available, returns nil.
-func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer {
+// // Pick an available peer with the given height available.
+// // If no peers are available, returns nil.
+// func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer {
+// 	pool.mtx.Lock()
+// 	defer pool.mtx.Unlock()
+
+// 	for _, peer := range pool.peers {
+// 		if peer.didTimeout {
+// 			pool.removePeer(peer.id)
+// 			continue
+// 		}
+// 		if peer.numPending >= maxPendingRequestsPerPeer {
+// 			continue
+// 		}
+// 		if height < peer.base || height > peer.height {
+// 			continue
+// 		}
+// 		peer.incrPending()
+// 		return peer
+// 	}
+// 	return nil
+// }
+
+func (pool *BlockPool) pickIncrAvailablePeers(height int64) []*bpPeer {
 	pool.mtx.Lock()
 	defer pool.mtx.Unlock()
 
+	// Convert the map to a slice for shuffling
+	peerSlice := make([]*bpPeer, 0, len(pool.peers))
 	for _, peer := range pool.peers {
+		peerSlice = append(peerSlice, peer)
+	}
+
+	// Shuffle the slice
+	rand.Seed(time.Now().UnixNano())
+	rand.Shuffle(len(peerSlice), func(i, j int) {
+		peerSlice[i], peerSlice[j] = peerSlice[j], peerSlice[i]
+	})
+
+	var peers []*bpPeer
+	peerLimit := 3
+	// if height > pool.height+10 {
+	// 	peerLimit = 1
+	// }
+	for _, peer := range peerSlice {
+		if len(peers) >= peerLimit {
+			break
+		}
 		if peer.didTimeout {
 			pool.removePeer(peer.id)
 			continue
@@ -367,9 +403,15 @@ func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer {
 			continue
 		}
 		peer.incrPending()
-		return peer
+		peers = append(peers, peer)
 	}
-	return nil
+
+	// var peerIDs []p2p.ID
+	// for _, peer := range peers {
+	// 	peerIDs = append(peerIDs, peer.id)
+	// }
+	//fmt.Println("len of picking peers: ", len(peers), "for height ", height, "peers", peerIDs)
+	return peers
 }
 
 func (pool *BlockPool) makeNextRequester() {
@@ -380,11 +422,11 @@ func (pool *BlockPool) makeNextRequester() {
 	if nextHeight > pool.maxPeerHeight {
 		return
 	}
+	//fmt.Println("nextHeight", nextHeight)
 
 	request := newBPRequester(pool, nextHeight)
 
 	pool.requesters[nextHeight] = request
-	atomic.AddInt32(&pool.numPending, 1)
 
 	err := request.Start()
 	if err != nil {
@@ -396,11 +438,11 @@ func (pool *BlockPool) requestersLen() int64 {
 	return int64(len(pool.requesters))
 }
 
-func (pool *BlockPool) sendRequest(height int64, peerID p2p.ID) {
+func (pool *BlockPool) sendRequest(height int64, peerIDs p2p.ID) {
 	if !pool.IsRunning() {
 		return
 	}
-	pool.requestsCh <- BlockRequest{height, peerID}
+	pool.requestsCh <- BlockRequest{height, peerIDs}
 }
 
 func (pool *BlockPool) sendError(err error, peerID p2p.ID) {
@@ -410,25 +452,25 @@ func (pool *BlockPool) sendError(err error, peerID p2p.ID) {
 	pool.errorsCh <- peerError{err, peerID}
 }
 
-// for debugging purposes
-//
-//nolint:unused
-func (pool *BlockPool) debug() string {
-	pool.mtx.Lock()
-	defer pool.mtx.Unlock()
-
-	str := ""
-	nextHeight := pool.height + pool.requestersLen()
-	for h := pool.height; h < nextHeight; h++ {
-		if pool.requesters[h] == nil {
-			str += fmt.Sprintf("H(%v):X ", h)
-		} else {
-			str += fmt.Sprintf("H(%v):", h)
-			str += fmt.Sprintf("B?(%v) ", pool.requesters[h].block != nil)
-		}
-	}
-	return str
-}
+// // for debugging purposes
+// //
+// //nolint:unused
+// func (pool *BlockPool) debug() string {
+// 	pool.mtx.Lock()
+// 	defer pool.mtx.Unlock()
+
+// 	str := ""
+// 	nextHeight := pool.height + pool.requestersLen()
+// 	for h := pool.height; h < nextHeight; h++ {
+// 		if pool.requesters[h] == nil {
+// 			str += fmt.Sprintf("H(%v):X ", h)
+// 		} else {
+// 			str += fmt.Sprintf("H(%v):", h)
+// 			str += fmt.Sprintf("B?(%v) ", pool.requesters[h].block != nil)
+// 		}
+// 	}
+// 	return str
+// }
 
 //-------------------------------------
 
@@ -540,7 +582,9 @@ func (bpr *bpRequester) OnStart() error {
 // Returns true if the peer matches and block doesn't already exist.
 func (bpr *bpRequester) setBlock(block *types.Block, peerID p2p.ID) bool {
 	bpr.mtx.Lock()
-	if bpr.block != nil || bpr.peerID != peerID {
+	// temp remove
+	// || bpr.peerID != peerID
+	if bpr.block != nil {
 		bpr.mtx.Unlock()
 		return false
 	}
@@ -571,9 +615,9 @@ func (bpr *bpRequester) reset() {
 	bpr.mtx.Lock()
 	defer bpr.mtx.Unlock()
 
-	if bpr.block != nil {
-		atomic.AddInt32(&bpr.pool.numPending, 1)
-	}
+	// if bpr.block != nil {
+	// 	atomic.AddInt32(&bpr.pool.numPending, 1)
+	// }
 
 	bpr.peerID = ""
 	bpr.block = nil
@@ -589,64 +633,169 @@ func (bpr *bpRequester) redo(peerID p2p.ID) {
 	}
 }
 
-// Responsible for making more requests as necessary
-// Returns only when a block is found (e.g. AddBlock() is called)
+// // Responsible for making more requests as necessary
+// // Returns only when a block is found (e.g. AddBlock() is called)
+// func (bpr *bpRequester) requestRoutine() {
+// OUTER_LOOP:
+// 	for {
+// 		// Pick a peer to send request to.
+// 		var peer *bpPeer
+// 	PICK_PEER_LOOP:
+// 		for {
+// 			if !bpr.IsRunning() || !bpr.pool.IsRunning() {
+// 				return
+// 			}
+// 			peer = bpr.pool.pickIncrAvailablePeer(bpr.height)
+// 			if peer == nil {
+// 				bpr.Logger.Debug("No peers currently available; will retry shortly", "height", bpr.height)
+// 				time.Sleep(requestIntervalMS * time.Millisecond)
+// 				continue PICK_PEER_LOOP
+// 			}
+// 			break PICK_PEER_LOOP
+// 		}
+// 		bpr.mtx.Lock()
+// 		bpr.peerID = peer.id
+// 		bpr.mtx.Unlock()
+
+// 		to := time.NewTimer(requestRetrySeconds * time.Second)
+// 		// Send request and wait.
+// //fmt.Println("sending request for ", bpr.height) +// fmt.Println("sending request for height ", bpr.height, "ID ", peer.id) +// bpr.pool.sendRequest(bpr.height, peer.id) +// WAIT_LOOP: +// for { +// select { +// case <-bpr.pool.Quit(): +// if err := bpr.Stop(); err != nil { +// bpr.Logger.Error("Error stopped requester", "err", err) +// } +// return +// case <-bpr.Quit(): +// return +// case <-to.C: +// bpr.Logger.Debug("Retrying block request after timeout", "height", bpr.height, "peer", bpr.peerID) +// // Simulate a redo +// bpr.reset() +// continue OUTER_LOOP +// case peerID := <-bpr.redoCh: +// if peerID == bpr.peerID { +// bpr.reset() +// continue OUTER_LOOP +// } else { +// continue WAIT_LOOP +// } +// case <-bpr.gotBlockCh: +// // We got a block! +// // Continue the for-loop and wait til Quit. +// continue WAIT_LOOP +// } +// } +// } +// } + func (bpr *bpRequester) requestRoutine() { -OUTER_LOOP: for { - // Pick a peer to send request to. - var peer *bpPeer + // Pick peers to send request to. + var peers []*bpPeer PICK_PEER_LOOP: for { if !bpr.IsRunning() || !bpr.pool.IsRunning() { return } - peer = bpr.pool.pickIncrAvailablePeer(bpr.height) - if peer == nil { + peers = bpr.pool.pickIncrAvailablePeers(bpr.height) // function to get all available peers + if len(peers) < 3 { bpr.Logger.Debug("No peers currently available; will retry shortly", "height", bpr.height) time.Sleep(requestIntervalMS * time.Millisecond) continue PICK_PEER_LOOP } break PICK_PEER_LOOP } - bpr.mtx.Lock() - bpr.peerID = peer.id - bpr.mtx.Unlock() + // bpr.mtx.Lock() + // for _, peer := range peers { + // bpr.peerIDs = append(bpr.peerIDs, peer.id) + // // Send request and wait. + // // if i == 0 { + // // fmt.Println("sending request for height ", bpr.height) + // // } + // //fmt.Println("sending request for height ", bpr.height, "ID ", bpr.peerID) + // } + // //bpr.pool.sendRequest(bpr.height, bpr.peerIDs) + // bpr.mtx.Unlock() + + //to := time.NewTimer(requestRetrySeconds * time.Second) + + // Create a channel to signal the arrival of a block. + blockArrived := make(chan struct{}, 1) + var wg sync.WaitGroup + + fmt.Println("length peers ", len(peers)) + for _, peer := range peers { + wg.Add(1) + go func(peer *bpPeer) { + defer wg.Done() + fmt.Println("go routine send req ", bpr.height) + bpr.pool.sendRequest(bpr.height, peer.id) + select { + case <-blockArrived: + // If a block has arrived from another peer, stop this goroutine. + return + case <-time.After(requestRetrySeconds * time.Second): + // Timeout for this peer; consider sending a new request or logging an error. + bpr.Logger.Debug("Request to peer timed out", "peer", peer.id, "height", bpr.height) + } + }(peer) + } - to := time.NewTimer(requestRetrySeconds * time.Second) - // Send request and wait. - bpr.pool.sendRequest(bpr.height, peer.id) - WAIT_LOOP: for { select { case <-bpr.pool.Quit(): - if err := bpr.Stop(); err != nil { - bpr.Logger.Error("Error stopped requester", "err", err) - } + close(blockArrived) + wg.Wait() // Wait for all request goroutines to finish. return case <-bpr.Quit(): + close(blockArrived) + wg.Wait() // Wait for all request goroutines to finish. 
return - case <-to.C: - bpr.Logger.Debug("Retrying block request after timeout", "height", bpr.height, "peer", bpr.peerID) - // Simulate a redo - bpr.reset() - continue OUTER_LOOP - case peerID := <-bpr.redoCh: - if peerID == bpr.peerID { - bpr.reset() - continue OUTER_LOOP - } else { - continue WAIT_LOOP - } + // case <-to.C: + // fmt.Println("GOT TIMEOUT REQUEST ", bpr.height) + // if bpr.pool.heightRecv[bpr.height] { + // continue WAIT_LOOP + // } + // bpr.Logger.Debug("Retrying block request after timeout", "height", bpr.height, "peer", bpr.peerIDs) + // // Simulate a redo + // bpr.reset() + // continue OUTER_LOOP + // case peerID := <-bpr.redoCh: + // fmt.Println("GOT REDO REQUEST ", bpr.height) + // if bpr.pool.heightRecv[bpr.height] { + // continue WAIT_LOOP + // } + // if idExists(peerID, bpr.peerIDs) { + // bpr.reset() + // continue OUTER_LOOP + // } else { + // continue WAIT_LOOP + // } case <-bpr.gotBlockCh: - // We got a block! - // Continue the for-loop and wait til Quit. - continue WAIT_LOOP + // We got a block! Signal all goroutines to stop. + fmt.Println("got block ch ", bpr.height) + close(blockArrived) + wg.Wait() // Wait for all request goroutines to finish. + return } } } } +func idExists(id p2p.ID, ids []p2p.ID) bool { + for _, existingID := range ids { + if existingID == id { + return true + } + } + return false +} + // BlockRequest stores a block request identified by the block Height and the PeerID responsible for // delivering the block type BlockRequest struct { diff --git a/blocksync/reactor.go b/blocksync/reactor.go index d6c01c81e44..041f607e6d5 100644 --- a/blocksync/reactor.go +++ b/blocksync/reactor.go @@ -229,6 +229,7 @@ func (bcR *Reactor) ReceiveEnvelope(e p2p.Envelope) { bcR.respondToPeer(msg, e.Src) case *bcproto.BlockResponse: bi, err := types.BlockFromProto(msg.Block) + fmt.Println("got a block resp ", bi.Height) if err != nil { bcR.Logger.Error("Peer sent us invalid block", "peer", e.Src, "msg", e.Message, "err", err) bcR.Switch.StopPeerForError(e.Src, err) @@ -285,10 +286,12 @@ func (bcR *Reactor) poolRoutine(stateSynced bool) { case <-bcR.pool.Quit(): return case request := <-bcR.requestsCh: + //fmt.Println("len of peers ", len(request.PeerIDs), "IDs", request.PeerIDs) peer := bcR.Switch.Peers().Get(request.PeerID) if peer == nil { continue } + fmt.Println("try send env ", request.Height) queued := peer.TrySendEnvelope(p2p.Envelope{ ChannelID: BlocksyncChannel, Message: &bcproto.BlockRequest{Height: request.Height}, @@ -314,9 +317,9 @@ FOR_LOOP: for { select { case <-switchToConsensusTicker.C: - height, numPending, lenRequesters := bcR.pool.GetStatus() + height, lenRequesters := bcR.pool.GetStatus() outbound, inbound, _ := bcR.Switch.NumPeers() - bcR.Logger.Debug("Consensus ticker", "numPending", numPending, "total", lenRequesters, + bcR.Logger.Debug("Consensus ticker", "total", lenRequesters, "outbound", outbound, "inbound", inbound) if bcR.pool.IsCaughtUp() { bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)