diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala
index 86eb0b28db..db1770a800 100644
--- a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala
+++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala
@@ -131,7 +131,6 @@ class FastSync(
   private var currentSkeletonState: Option[HeaderSkeleton] = None
   private var skeletonHandler: Option[ActorRef] = None
   private var batchFailuresCount = 0
-  private var blockHeadersQueue: Seq[HeaderRange] = Nil

   private var requestedBlockBodies: Map[ActorRef, Seq[ByteString]] = Map.empty
   private var requestedReceipts: Map[ActorRef, Seq[ByteString]] = Map.empty
@@ -260,9 +259,13 @@ class FastSync(
             updatedSkeleton.batchStartingHeaderNumbers.mkString(", ")
           )
           currentSkeletonState = Some(updatedSkeleton)
-          blockHeadersQueue ++= updatedSkeleton.batchStartingHeaderNumbers.map(from =>
-            HeaderRange(from, updatedSkeleton.batchSize)
-          )
+
+          val blockHeadersToRequest =
+            updatedSkeleton.batchStartingHeaderNumbers.map { from =>
+              HeaderRange(from, updatedSkeleton.batchSize)
+            }
+
+          syncState = syncState.enqueueHeaderRanges(blockHeadersToRequest)
         }
       }
     }
@@ -333,7 +336,7 @@ class FastSync(
       def handleMasterPeerFailure(header: BlockHeader): Unit = {
         batchFailuresCount += 1
         if (batchFailuresCount > fastSyncMaxBatchRetries) {
-          log.info("Max skeleton batch failures reached. Master peer must be wrong.")
+          log.info("Max number of allowed failures reached. Switching branch and master peer.")
           handleRewind(header, masterPeer.get, fastSyncBlockValidationN, blacklistDuration)

           // Start branch resolution and wait for response from the FastSyncBranchResolver actor.
@@ -343,14 +346,14 @@ class FastSync(
         }
       }

-      blockHeadersQueue :+= request
+      syncState = syncState.enqueueHeaderRange(request)
       error match {
         // These are the reasons that make the master peer suspicious
         case InvalidPenultimateHeader(_, header) => handleMasterPeerFailure(header)
         case InvalidBatchHash(_, header) => handleMasterPeerFailure(header)
         // Otherwise probably it's just this peer's fault
         case _ =>
-          log.info(error.msg)
+          log.warning(error.msg)
           blockHeadersError(peer, reason)
       }
     }
@@ -677,17 +680,16 @@ class FastSync(
   private def handleRequestFailure(peer: Peer, handler: ActorRef, reason: BlacklistReason): Unit = {
     removeRequestHandler(handler)

+    requestedHeaders.get(peer).foreach(requested => syncState = syncState.enqueueHeaderRange(requested))
     syncState = syncState
       .enqueueBlockBodies(requestedBlockBodies.getOrElse(handler, Nil))
       .enqueueReceipts(requestedReceipts.getOrElse(handler, Nil))

+    requestedHeaders -= peer
     requestedBlockBodies = requestedBlockBodies - handler
     requestedReceipts = requestedReceipts - handler
-    requestedHeaders -= peer

-    if (handshakedPeers.contains(peer.id)) {
-      blacklist.add(peer.id, blacklistDuration, reason)
-    }
+    blacklistIfHandshaked(peer.id, blacklistDuration, reason)
   }

   /**
@@ -872,7 +874,7 @@ class FastSync(
       requestReceipts(peer)
     } else if (syncState.blockBodiesQueue.nonEmpty) {
       requestBlockBodies(peer)
-    } else if (blockHeadersQueue.nonEmpty) {
+    } else if (syncState.blockHeadersQueue.nonEmpty) {
       requestBlockHeaders(peer)
     } else if (shouldRequestNewSkeleton(peerInfo)) {
       requestSkeletonHeaders(peer)
@@ -934,31 +936,36 @@ class FastSync(
   }

   def requestBlockHeaders(peer: Peer): Unit = {
-    val (request, remaining) = (blockHeadersQueue.head, blockHeadersQueue.tail)
+    val (toRequest, remaining) = (syncState.blockHeadersQueue.headOption, syncState.blockHeadersQueue.drop(1))
+
+    toRequest match {
+      case Some(request) =>
+        log.debug(
+          "Requesting [{}] block headers starting at block header [{}] from peer [{}]",
+          request.limit,
+          request.from,
+          peer.id.value
+        )

-    log.debug(
-      "Requesting [{}] block headers starting at block header [{}] from peer [{}]",
-      request.limit,
-      request.from,
-      peer.id.value
-    )
+        val handler = context.actorOf(
+          PeerRequestHandler.props[GetBlockHeaders, BlockHeaders](
+            peer,
+            peerResponseTimeout,
+            etcPeerManager,
+            peerEventBus,
+            requestMsg = GetBlockHeaders(Left(request.from), request.limit, skip = 0, reverse = false),
+            responseMsgCode = Codes.BlockHeadersCode
+          )
+        )

-    val handler = context.actorOf(
-      PeerRequestHandler.props[GetBlockHeaders, BlockHeaders](
-        peer,
-        peerResponseTimeout,
-        etcPeerManager,
-        peerEventBus,
-        requestMsg = GetBlockHeaders(Left(request.from), request.limit, skip = 0, reverse = false),
-        responseMsgCode = Codes.BlockHeadersCode
-      )
-    )
+        context watch handler
+        assignedHandlers += (handler -> peer)
+        requestedHeaders += (peer -> request)
+        syncState = syncState.copy(blockHeadersQueue = remaining)
+        peerRequestsTime += (peer -> Instant.now())
+      case None => log.warning("Tried to request more block headers but work queue was empty.")
+    }

-    context watch handler
-    assignedHandlers += (handler -> peer)
-    requestedHeaders += (peer -> request)
-    blockHeadersQueue = remaining
-    peerRequestsTime += (peer -> Instant.now())
   }

   def requestSkeletonHeaders(peer: Peer): Unit = {
@@ -1071,10 +1078,11 @@ object FastSync {
   private case object PersistSyncState
   private case object PrintStatus

-  case class SyncState(
+  final case class SyncState(
       pivotBlock: BlockHeader,
       lastFullBlockNumber: BigInt = 0,
       safeDownloadTarget: BigInt = 0,
+      blockHeadersQueue: Seq[HeaderRange] = Nil,
       blockBodiesQueue: Seq[ByteString] = Nil,
       receiptsQueue: Seq[ByteString] = Nil,
       downloadedNodesCount: Long = 0,
@@ -1086,13 +1094,20 @@ object FastSync {
       stateSyncFinished: Boolean = false
   ) {

+    def enqueueHeaderRange(headerRange: HeaderRange): SyncState =
+      copy(blockHeadersQueue = blockHeadersQueue :+ headerRange)
+
+    def enqueueHeaderRanges(headerRanges: Seq[HeaderRange]): SyncState =
+      copy(blockHeadersQueue = blockHeadersQueue ++ headerRanges)
+
     def enqueueBlockBodies(blockBodies: Seq[ByteString]): SyncState =
       copy(blockBodiesQueue = blockBodiesQueue ++ blockBodies)

     def enqueueReceipts(receipts: Seq[ByteString]): SyncState =
       copy(receiptsQueue = receiptsQueue ++ receipts)

-    def blockChainWorkQueued: Boolean = blockBodiesQueue.nonEmpty || receiptsQueue.nonEmpty
+    def blockChainWorkQueued: Boolean =
+      blockHeadersQueue.nonEmpty || blockBodiesQueue.nonEmpty || receiptsQueue.nonEmpty

     def updateNextBlockToValidate(header: BlockHeader, K: Int, X: Int): SyncState = copy(
       nextBlockToFullyValidate =
@@ -1149,5 +1164,5 @@ object FastSync {
   case object LastBlockValidationFailed extends PivotBlockUpdateReason
   case object SyncRestart extends PivotBlockUpdateReason

-  private final case class HeaderRange(from: BigInt, limit: BigInt)
+  private[fast] final case class HeaderRange(from: BigInt, limit: BigInt)
 }
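
The net effect of the patch is that the header work queue moves from a mutable actor field into the immutable SyncState value, so re-queuing a failed batch and checking for pending work become pure operations on that value, and the queue travels with whatever SyncState is persisted via PersistSyncState (assuming the state serializer is extended to cover the new field). Below is a minimal, self-contained sketch of those queue semantics; it is not code from the PR. HeaderRange mirrors the case class in the diff, while SimpleSyncState, dequeueHeaderRange, and the string-typed body/receipt queues are hypothetical simplifications of FastSync.SyncState introduced only for illustration.

// Simplified model of the queue-in-SyncState design sketched by the diff.
object HeaderQueueSketch extends App {
  final case class HeaderRange(from: BigInt, limit: BigInt)

  final case class SimpleSyncState(
      blockHeadersQueue: Seq[HeaderRange] = Nil,
      blockBodiesQueue: Seq[String] = Nil,
      receiptsQueue: Seq[String] = Nil
  ) {
    // Re-queue a single failed batch at the back, as handleRequestFailure does.
    def enqueueHeaderRange(headerRange: HeaderRange): SimpleSyncState =
      copy(blockHeadersQueue = blockHeadersQueue :+ headerRange)

    // Enqueue all batches derived from a skeleton response.
    def enqueueHeaderRanges(headerRanges: Seq[HeaderRange]): SimpleSyncState =
      copy(blockHeadersQueue = blockHeadersQueue ++ headerRanges)

    // Widened as in the diff: pending header work now counts as queued work.
    def blockChainWorkQueued: Boolean =
      blockHeadersQueue.nonEmpty || blockBodiesQueue.nonEmpty || receiptsQueue.nonEmpty

    // Safe dequeue: headOption/drop(1) never throw on an empty queue, unlike head/tail.
    def dequeueHeaderRange: (Option[HeaderRange], SimpleSyncState) =
      (blockHeadersQueue.headOption, copy(blockHeadersQueue = blockHeadersQueue.drop(1)))
  }

  // Two skeleton batches of 10 headers each, starting at blocks 1 and 11.
  val initial = SimpleSyncState().enqueueHeaderRanges(Seq(HeaderRange(1, 10), HeaderRange(11, 10)))
  assert(initial.blockChainWorkQueued)

  // A peer takes the first batch...
  val (taken, afterTake) = initial.dequeueHeaderRange
  assert(taken.contains(HeaderRange(1, 10)) && afterTake.blockHeadersQueue.size == 1)

  // ...and on failure that batch goes back to the end of the queue.
  val requeued = taken.fold(afterTake)(afterTake.enqueueHeaderRange)
  println(requeued.blockHeadersQueue) // List(HeaderRange(11,10), HeaderRange(1,10))
}

The headOption/drop(1) pairing is what lets requestBlockHeaders degrade to a log.warning on an empty queue instead of crashing the actor, which is why the sketch uses drop(1) rather than tail.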