[ETCM-716] Readd block headers to work queue in case of errors #943

Merged 1 commit on Mar 19, 2021
89 changes: 52 additions & 37 deletions src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala
@@ -131,7 +131,6 @@ class FastSync(
private var currentSkeletonState: Option[HeaderSkeleton] = None
private var skeletonHandler: Option[ActorRef] = None
private var batchFailuresCount = 0
-  private var blockHeadersQueue: Seq[HeaderRange] = Nil

private var requestedBlockBodies: Map[ActorRef, Seq[ByteString]] = Map.empty
private var requestedReceipts: Map[ActorRef, Seq[ByteString]] = Map.empty
@@ -260,9 +259,13 @@ class FastSync(
updatedSkeleton.batchStartingHeaderNumbers.mkString(", ")
)
currentSkeletonState = Some(updatedSkeleton)
-      blockHeadersQueue ++= updatedSkeleton.batchStartingHeaderNumbers.map(from =>
-        HeaderRange(from, updatedSkeleton.batchSize)
-      )

+      val blockHeadersToRequest =
+        updatedSkeleton.batchStartingHeaderNumbers.map { from =>
+          HeaderRange(from, updatedSkeleton.batchSize)
+        }
+
+      syncState = syncState.enqueueHeaderRanges(blockHeadersToRequest)
}
}
}
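
To make the skeleton step concrete: each batch starting number becomes one HeaderRange covering batchSize headers, and all the ranges are appended to the queue at once. A minimal, self-contained sketch of that mapping (the numbers and the trimmed-down HeaderRange here are assumptions for illustration, not production values):

// Hypothetical worked example of the skeleton-to-ranges mapping above (Scala script).
final case class HeaderRange(from: BigInt, limit: BigInt)

val batchSize = BigInt(192) // assumed batch size, for illustration only
val batchStartingHeaderNumbers = Seq(BigInt(1000), BigInt(1192), BigInt(1384))

val blockHeadersToRequest: Seq[HeaderRange] =
  batchStartingHeaderNumbers.map(from => HeaderRange(from, batchSize))
// => HeaderRange(1000,192), HeaderRange(1192,192), HeaderRange(1384,192)
// Each range is then appended to syncState.blockHeadersQueue via enqueueHeaderRanges.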
@@ -333,7 +336,7 @@
def handleMasterPeerFailure(header: BlockHeader): Unit = {
batchFailuresCount += 1
if (batchFailuresCount > fastSyncMaxBatchRetries) {
log.info("Max skeleton batch failures reached. Master peer must be wrong.")
log.info("Max number of allowed failures reached. Switching branch and master peer.")
handleRewind(header, masterPeer.get, fastSyncBlockValidationN, blacklistDuration)

// Start branch resolution and wait for response from the FastSyncBranchResolver actor.
@@ -343,14 +346,14 @@
}
}

-    blockHeadersQueue :+= request
+    syncState = syncState.enqueueHeaderRange(request)
error match {
// These are the reasons that make the master peer suspicious
case InvalidPenultimateHeader(_, header) => handleMasterPeerFailure(header)
case InvalidBatchHash(_, header) => handleMasterPeerFailure(header)
// Otherwise probably it's just this peer's fault
case _ =>
-        log.info(error.msg)
+        log.warning(error.msg)
blockHeadersError(peer, reason)
}
}
@@ -677,17 +680,16 @@ class FastSync(
private def handleRequestFailure(peer: Peer, handler: ActorRef, reason: BlacklistReason): Unit = {
removeRequestHandler(handler)

+    requestedHeaders.get(peer).foreach(requested => syncState = syncState.enqueueHeaderRange(requested))
[Review comment from the author] This is the relevant line for fixing the bug.
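
Expanding on that comment, a minimal sketch of why re-enqueueing matters (trimmed types and hypothetical names; the real bookkeeping is in the surrounding diff): before this change, a failed header request silently dropped the in-flight range, so those headers were never re-requested; now the range goes back on the queue for another peer to pick up.

// Hypothetical reduction of handleRequestFailure to its queue bookkeeping (Scala script).
final case class HeaderRange(from: BigInt, limit: BigInt)
final case class SyncState(blockHeadersQueue: Seq[HeaderRange] = Nil) {
  def enqueueHeaderRange(r: HeaderRange): SyncState =
    copy(blockHeadersQueue = blockHeadersQueue :+ r)
}

var syncState = SyncState()
var requestedHeaders = Map.empty[String, HeaderRange] // peer id -> in-flight range

def handleRequestFailure(peerId: String): Unit = {
  // The fix: put whatever this peer was asked for back on the work queue.
  requestedHeaders.get(peerId).foreach(r => syncState = syncState.enqueueHeaderRange(r))
  requestedHeaders -= peerId
}

requestedHeaders += ("peer-1" -> HeaderRange(1000, 192))
handleRequestFailure("peer-1")
assert(syncState.blockHeadersQueue == Seq(HeaderRange(1000, 192))) // range not lost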

syncState = syncState
.enqueueBlockBodies(requestedBlockBodies.getOrElse(handler, Nil))
.enqueueReceipts(requestedReceipts.getOrElse(handler, Nil))

+    requestedHeaders -= peer
requestedBlockBodies = requestedBlockBodies - handler
requestedReceipts = requestedReceipts - handler

-    requestedHeaders -= peer
-    if (handshakedPeers.contains(peer.id)) {
-      blacklist.add(peer.id, blacklistDuration, reason)
-    }
+    blacklistIfHandshaked(peer.id, blacklistDuration, reason)
}

/**
@@ -872,7 +874,7 @@
requestReceipts(peer)
} else if (syncState.blockBodiesQueue.nonEmpty) {
requestBlockBodies(peer)
-    } else if (blockHeadersQueue.nonEmpty) {
+    } else if (syncState.blockHeadersQueue.nonEmpty) {
requestBlockHeaders(peer)
} else if (shouldRequestNewSkeleton(peerInfo)) {
requestSkeletonHeaders(peer)
@@ -934,31 +936,36 @@
}

def requestBlockHeaders(peer: Peer): Unit = {
-    val (request, remaining) = (blockHeadersQueue.head, blockHeadersQueue.tail)
+    val (toRequest, remaining) = (syncState.blockHeadersQueue.headOption, syncState.blockHeadersQueue.tail)

+    toRequest match {
+      case Some(request) =>
+        log.debug(
+          "Requesting [{}] block headers starting at block header [{}] from peer [{}]",
+          request.limit,
+          request.from,
+          peer.id.value
+        )

-    log.debug(
-      "Requesting [{}] block headers starting at block header [{}] from peer [{}]",
-      request.limit,
-      request.from,
-      peer.id.value
-    )
+        val handler = context.actorOf(
+          PeerRequestHandler.props[GetBlockHeaders, BlockHeaders](
+            peer,
+            peerResponseTimeout,
+            etcPeerManager,
+            peerEventBus,
+            requestMsg = GetBlockHeaders(Left(request.from), request.limit, skip = 0, reverse = false),
+            responseMsgCode = Codes.BlockHeadersCode
+          )
+        )

-    val handler = context.actorOf(
-      PeerRequestHandler.props[GetBlockHeaders, BlockHeaders](
-        peer,
-        peerResponseTimeout,
-        etcPeerManager,
-        peerEventBus,
-        requestMsg = GetBlockHeaders(Left(request.from), request.limit, skip = 0, reverse = false),
-        responseMsgCode = Codes.BlockHeadersCode
-      )
-    )
+        context watch handler
+        assignedHandlers += (handler -> peer)
+        requestedHeaders += (peer -> request)
+        syncState = syncState.copy(blockHeadersQueue = remaining)
+        peerRequestsTime += (peer -> Instant.now())
+      case None => log.warning("Tried to request more block headers but work queue was empty.")
+    }

-    context watch handler
-    assignedHandlers += (handler -> peer)
-    requestedHeaders += (peer -> request)
-    blockHeadersQueue = remaining
-    peerRequestsTime += (peer -> Instant.now())
}
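
A note on the rewrite above: the old code took blockHeadersQueue.head unguarded, which throws on an empty queue, whereas matching on headOption makes the empty case explicit and merely logs a warning. A tiny stand-alone sketch of that dequeue shape (a hypothetical helper, not from the codebase):

// Hypothetical safe-dequeue helper mirroring the headOption pattern above.
final case class HeaderRange(from: BigInt, limit: BigInt)

def dequeue(queue: Seq[HeaderRange]): (Option[HeaderRange], Seq[HeaderRange]) =
  queue match {
    case head +: tail => (Some(head), tail) // hand out the next range, keep the rest
    case _            => (None, Nil)        // empty queue: nothing to request
  }

assert(dequeue(Nil) == ((None, Nil))) // no exception, unlike Nil.head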

def requestSkeletonHeaders(peer: Peer): Unit = {
@@ -1071,10 +1078,11 @@ object FastSync {
private case object PersistSyncState
private case object PrintStatus

-  case class SyncState(
+  final case class SyncState(
pivotBlock: BlockHeader,
lastFullBlockNumber: BigInt = 0,
safeDownloadTarget: BigInt = 0,
+      blockHeadersQueue: Seq[HeaderRange] = Nil,
blockBodiesQueue: Seq[ByteString] = Nil,
receiptsQueue: Seq[ByteString] = Nil,
downloadedNodesCount: Long = 0,
@@ -1086,13 +1094,20 @@ object FastSync {
stateSyncFinished: Boolean = false
) {

+    def enqueueHeaderRange(headerRange: HeaderRange): SyncState =
+      copy(blockHeadersQueue = blockHeadersQueue :+ headerRange)
+
+    def enqueueHeaderRanges(headerRanges: Seq[HeaderRange]): SyncState =
+      copy(blockHeadersQueue = blockHeadersQueue ++ headerRanges)

def enqueueBlockBodies(blockBodies: Seq[ByteString]): SyncState =
copy(blockBodiesQueue = blockBodiesQueue ++ blockBodies)

def enqueueReceipts(receipts: Seq[ByteString]): SyncState =
copy(receiptsQueue = receiptsQueue ++ receipts)

-    def blockChainWorkQueued: Boolean = blockBodiesQueue.nonEmpty || receiptsQueue.nonEmpty
+    def blockChainWorkQueued: Boolean =
+      blockHeadersQueue.nonEmpty || blockBodiesQueue.nonEmpty || receiptsQueue.nonEmpty

def updateNextBlockToValidate(header: BlockHeader, K: Int, X: Int): SyncState = copy(
nextBlockToFullyValidate =
@@ -1149,5 +1164,5 @@ object FastSync {
case object LastBlockValidationFailed extends PivotBlockUpdateReason
case object SyncRestart extends PivotBlockUpdateReason

-  private final case class HeaderRange(from: BigInt, limit: BigInt)
+  private[fast] final case class HeaderRange(from: BigInt, limit: BigInt)
}
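
As a closing illustration, a small usage sketch of the queue helpers this diff adds to SyncState (heavily trimmed: the real SyncState also requires a pivot BlockHeader, and the body and receipt queues hold ByteString hashes):

// Hypothetical round-trip over the new queue API, with a cut-down SyncState.
final case class HeaderRange(from: BigInt, limit: BigInt)
final case class SyncState(
    blockHeadersQueue: Seq[HeaderRange] = Nil,
    blockBodiesQueue: Seq[String] = Nil, // ByteString in the real code
    receiptsQueue: Seq[String] = Nil
) {
  def enqueueHeaderRange(headerRange: HeaderRange): SyncState =
    copy(blockHeadersQueue = blockHeadersQueue :+ headerRange)
  def enqueueHeaderRanges(headerRanges: Seq[HeaderRange]): SyncState =
    copy(blockHeadersQueue = blockHeadersQueue ++ headerRanges)
  def blockChainWorkQueued: Boolean =
    blockHeadersQueue.nonEmpty || blockBodiesQueue.nonEmpty || receiptsQueue.nonEmpty
}

val s0 = SyncState()
val s1 = s0.enqueueHeaderRanges(Seq(HeaderRange(1000, 192), HeaderRange(1192, 192)))
assert(s1.blockChainWorkQueued) // header work now counts as queued blockchain work
// Because the queue now lives in SyncState rather than in a private var,
// it is saved by PersistSyncState and survives an actor restart.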