Skip to content

Commit

Permalink
[ETCM-739] Refactor BlockFetcher
Browse files Browse the repository at this point in the history
Change BlockFetcher to typed actor

Split fetcher message handling among child actors

Abstract the fetch trait

Scalafmt

Refactor blockFetcherState
  • Loading branch information
AnastasiiaL committed Apr 29, 2021
1 parent c38c2c2 commit 9876ed2
Show file tree
Hide file tree
Showing 12 changed files with 590 additions and 748 deletions.
2 changes: 1 addition & 1 deletion nix/overlay.nix
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ rev: final: prev: {

mantis = final.callPackage ./mantis.nix {
src = ../.;
depsSha256 = "sha256-0AeemKFcIU3eVGse8QQGauJeRsF7IgCLo5Yqu2FZsMs=";
depsSha256 = "sha256-US4L/xh2otnEfOa05bazb14bgYhQZpF4GfFY30sDkNY=";
};

mantis-hash = final.mantis.override {
Expand Down
2 changes: 2 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ object Dependencies {
Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"com.typesafe.akka" %% "akka-actor-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-testkit" % akkaVersion,
"com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.miguno.akka" %% "akka-mock-scheduler" % "0.5.5" % "it,test"
)
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import scala.collection.immutable.Queue
* - haven't fetched any yet
* - are awaiting a response
* - are awaiting a response but it should be ignored due to blocks being invalidated
* @param stateNodeFetcher
* @param lastBlock
* @param knownTop
* @param blockProviders
Expand All @@ -41,17 +40,13 @@ case class BlockFetcherState(
waitingHeaders: Queue[BlockHeader],
fetchingHeadersState: FetchingHeadersState,
fetchingBodiesState: FetchingBodiesState,
pausedFetching: Boolean = false,
stateNodeFetcher: Option[StateNodeFetcher],
lastBlock: BigInt,
knownTop: BigInt,
blockProviders: Map[BigInt, PeerId]
) {

def isFetching: Boolean = isFetchingHeaders || isFetchingBodies

def isFetchingStateNode: Boolean = stateNodeFetcher.isDefined

private def hasEmptyBuffer: Boolean = readyBlocks.isEmpty && waitingHeaders.isEmpty

def hasFetchedTopHeader: Boolean = nextBlockToFetch == knownTop + 1
Expand Down Expand Up @@ -88,31 +83,6 @@ case class BlockFetcherState(
)
})

def tryInsertBlock(block: Block, peerId: PeerId): Either[String, BlockFetcherState] = {
val blockHash = block.hash
if (isExist(blockHash)) {
Right(this)
} else if (isExistInReadyBlocks(block.header.parentHash)) {
val newState = clearQueues()
.copy(
readyBlocks = readyBlocks.takeWhile(_.number < block.number).enqueue(block)
)
.withPeerForBlocks(peerId, Seq(block.number))
.withKnownTopAt(block.number)
Right(newState)
} else if (isExistInWaitingHeaders(block.header.parentHash)) {
// ignore already requested bodies
val newFetchingBodiesState =
if (fetchingBodiesState == AwaitingBodies) AwaitingBodiesToBeIgnored else fetchingBodiesState
val newState = copy(
waitingHeaders = waitingHeaders.takeWhile(_.number < block.number).enqueue(block.header),
fetchingBodiesState = newFetchingBodiesState
)
.withKnownTopAt(block.number)
Right(newState)
} else Left(s"Cannot insert block [${ByteStringUtils.hash2string(blockHash)}] into the queues")
}

/**
* Validates received headers consistency and their compatibility with the state
* TODO ETCM-370: This needs to be more fine-grained and detailed so blacklisting can be re-enabled
Expand Down Expand Up @@ -235,7 +205,7 @@ case class BlockFetcherState(
.filter(_.headOption.exists(block => block.number <= lower))
.filter(_.lastOption.exists(block => block.number >= upper))
.filter(_.nonEmpty)
.map(blocks => (NonEmptyList(blocks.head, blocks.tail.toList), copy(readyBlocks = Queue())))
.map(blocks => (NonEmptyList(blocks.head, blocks.tail.toList), copy(readyBlocks = Queue(), lastBlock = blocks.last.number)))
}

def clearQueues(): BlockFetcherState = {
Expand Down Expand Up @@ -267,11 +237,11 @@ case class BlockFetcherState(
)
}

def isExist(hash: ByteString): Boolean = isExistInReadyBlocks(hash) || isExistInWaitingHeaders(hash)
def exists(hash: ByteString): Boolean = existsInReadyBlocks(hash) || existsInWaitingHeaders(hash)

def isExistInWaitingHeaders(hash: ByteString): Boolean = waitingHeaders.exists(_.hash == hash)
def existsInWaitingHeaders(hash: ByteString): Boolean = waitingHeaders.exists(_.hash == hash)

def isExistInReadyBlocks(hash: ByteString): Boolean = readyBlocks.exists(_.hash == hash)
def existsInReadyBlocks(hash: ByteString): Boolean = readyBlocks.exists(_.hash == hash)

def withLastBlock(nr: BigInt): BlockFetcherState = copy(lastBlock = nr)

Expand All @@ -296,14 +266,6 @@ case class BlockFetcherState(
def withNewBodiesFetch: BlockFetcherState = copy(fetchingBodiesState = AwaitingBodies)
def withBodiesFetchReceived: BlockFetcherState = copy(fetchingBodiesState = NotFetchingBodies)

def withPausedFetching: BlockFetcherState = copy(pausedFetching = true)
def withResumedFetching: BlockFetcherState = copy(pausedFetching = false)

def fetchingStateNode(hash: ByteString, requestor: ActorRef): BlockFetcherState =
copy(stateNodeFetcher = Some(StateNodeFetcher(hash, requestor)))

def notFetchingStateNode(): BlockFetcherState = copy(stateNodeFetcher = None)

def status: Map[String, Any] = Map(
"ready blocks" -> readyBlocks.size,
"known top" -> knownTop,
Expand All @@ -314,7 +276,6 @@ case class BlockFetcherState(
"fetched headers" -> waitingHeaders.size,
"fetching headers" -> isFetchingHeaders,
"fetching bodies" -> isFetchingBodies,
"fetching state node" -> isFetchingStateNode,
"fetched top header" -> hasFetchedTopHeader,
"first header" -> waitingHeaders.headOption.map(_.number),
"first block" -> readyBlocks.headOption.map(_.number),
Expand All @@ -333,7 +294,6 @@ object BlockFetcherState {
waitingHeaders = Queue(),
fetchingHeadersState = NotFetchingHeaders,
fetchingBodiesState = NotFetchingBodies,
stateNodeFetcher = None,
lastBlock = lastBlock,
knownTop = lastBlock + 1,
blockProviders = Map()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ class BlockImporter(

private def pickBlocks(state: ImporterState): Unit = {
val msg =
state.resolvingBranchFrom.fold[BlockFetcher.FetchMsg](BlockFetcher.PickBlocks(syncConfig.blocksBatchSize))(from =>
BlockFetcher.StrictPickBlocks(from, startingBlockNumber)
)
state.resolvingBranchFrom.fold[BlockFetcher.FetchCommand](
BlockFetcher.PickBlocks(syncConfig.blocksBatchSize, self)
)(from => BlockFetcher.StrictPickBlocks(from, startingBlockNumber, self))

fetcher ! msg
}
Expand Down Expand Up @@ -172,7 +172,7 @@ class BlockImporter(

err match {
case e: MissingNodeException =>
fetcher ! BlockFetcher.FetchStateNode(e.hash)
fetcher ! BlockFetcher.FetchStateNode(e.hash, self)
ResolvingMissingNode(NonEmptyList(notImportedBlocks.head, notImportedBlocks.tail))
case _ =>
val invalidBlockNr = notImportedBlocks.head.number
Expand All @@ -189,7 +189,7 @@ class BlockImporter(
if (blocks.isEmpty) {
importedBlocks.headOption match {
case Some(block) =>
supervisor ! ProgressProtocol.ImportedBlock(block.number, block.hasCheckpoint, internally = false)
supervisor ! ProgressProtocol.ImportedBlock(block.number, internally = false)
case None => ()
}

Expand Down Expand Up @@ -243,7 +243,7 @@ class BlockImporter(
val (blocks, weights) = importedBlocksData.map(data => (data.block, data.weight)).unzip
broadcastBlocks(blocks, weights)
updateTxPool(importedBlocksData.map(_.block), Seq.empty)
supervisor ! ProgressProtocol.ImportedBlock(block.number, block.hasCheckpoint, internally)
supervisor ! ProgressProtocol.ImportedBlock(block.number, internally)
case BlockEnqueued => ()
case DuplicateBlock => ()
case UnknownParent => () // This is normal when receiving broadcast blocks
Expand All @@ -252,7 +252,7 @@ class BlockImporter(
broadcastBlocks(newBranch, weights)
newBranch.lastOption match {
case Some(newBlock) =>
supervisor ! ProgressProtocol.ImportedBlock(newBlock.number, block.hasCheckpoint, internally)
supervisor ! ProgressProtocol.ImportedBlock(newBlock.number, internally)
case None => ()
}
case BlockImportFailedDueToMissingNode(missingNodeException) if syncConfig.redownloadMissingStateNodes =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package io.iohk.ethereum.blockchain.sync.regular

import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors}
import akka.actor.{ActorRef => ClassicActorRef}
import akka.util.ByteString
import io.iohk.ethereum.blockchain.sync.PeersClient.{BestPeer, Request}
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.FetchCommand
import io.iohk.ethereum.blockchain.sync.regular.BodiesFetcher.BodiesFetcherCommand
import io.iohk.ethereum.network.Peer
import io.iohk.ethereum.network.p2p.Message
import io.iohk.ethereum.network.p2p.messages.PV62.{BlockBodies, GetBlockBodies}
import io.iohk.ethereum.utils.Config.SyncConfig
import monix.execution.Scheduler

import scala.util.{Failure, Success}

class BodiesFetcher(
val peersClient: ClassicActorRef,
val syncConfig: SyncConfig,
val supervisor: ActorRef[FetchCommand],
context: ActorContext[BodiesFetcher.BodiesFetcherCommand]
) extends AbstractBehavior[BodiesFetcher.BodiesFetcherCommand](context)
with FetchRequest[BodiesFetcherCommand] {

val log = context.log
implicit val ec: Scheduler = Scheduler(context.executionContext)

import BodiesFetcher._

override def makeAdaptedMessage[T <: Message](peer: Peer, msg: T): BodiesFetcherCommand = AdaptedMessage(peer, msg)

override def onMessage(message: BodiesFetcherCommand): Behavior[BodiesFetcherCommand] = {
message match {
case FetchBodies(hashes) =>
log.debug("Start fetching bodies")
requestBodies(hashes)
Behaviors.same
case AdaptedMessage(peer, BlockBodies(bodies)) =>
log.debug(s"Received ${bodies.size} block bodies")
supervisor ! BlockFetcher.ReceivedBodies(peer, bodies)
Behaviors.same
case BodiesFetcher.RetryBodiesRequest =>
supervisor ! BlockFetcher.RetryBodiesRequest
Behaviors.same
case _ => Behaviors.unhandled
}
}

private def requestBodies(hashes: Seq[ByteString]): Unit = {
val resp = makeRequest(Request.create(GetBlockBodies(hashes), BestPeer), BodiesFetcher.RetryBodiesRequest)
context.pipeToSelf(resp.runToFuture) {
case Success(res) => res
case Failure(_) => BodiesFetcher.RetryBodiesRequest
}
}
}

object BodiesFetcher {

def apply(
peersClient: ClassicActorRef,
syncConfig: SyncConfig,
supervisor: ActorRef[FetchCommand]
): Behavior[BodiesFetcherCommand] =
Behaviors.setup(context => new BodiesFetcher(peersClient, syncConfig, supervisor, context))

sealed trait BodiesFetcherCommand
final case class FetchBodies(hashes: Seq[ByteString]) extends BodiesFetcherCommand
final case object RetryBodiesRequest extends BodiesFetcherCommand
private final case class AdaptedMessage[T <: Message](peer: Peer, msg: T) extends BodiesFetcherCommand
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.iohk.ethereum.blockchain.sync.regular

import akka.actor.ActorRef
import io.iohk.ethereum.blockchain.sync.PeersClient
import io.iohk.ethereum.blockchain.sync.PeersClient.{BlacklistPeer, NoSuitablePeer, Request, RequestFailed}
import io.iohk.ethereum.network.Peer
import io.iohk.ethereum.network.p2p.Message
import io.iohk.ethereum.utils.Config.SyncConfig
import monix.eval.Task
import org.slf4j.Logger
import akka.pattern.ask
import akka.util.Timeout
import io.iohk.ethereum.utils.FunctorOps._

import scala.concurrent.duration._
import scala.util.Failure

trait FetchRequest[A] {
val peersClient: ActorRef
val syncConfig: SyncConfig
val log: Logger

def makeAdaptedMessage[T <: Message](peer: Peer, msg: T): A

implicit val timeout: Timeout = syncConfig.peerResponseTimeout + 2.second // some margin for actor communication

def makeRequest(request: Request[_], responseFallback: A): Task[A] =
Task
.deferFuture(peersClient ? request)
.tap(blacklistPeerOnFailedRequest)
.flatMap(handleRequestResult(responseFallback))
.onErrorHandle { error =>
log.error("Unexpected error while doing a request", error)
responseFallback
}

def blacklistPeerOnFailedRequest(msg: Any): Unit = msg match {
case RequestFailed(peer, reason) => peersClient ! BlacklistPeer(peer.id, reason)
case _ => ()
}

def handleRequestResult(fallback: A)(msg: Any): Task[A] = {
msg match {
case failed: RequestFailed =>
log.debug("Request failed due to {}", failed)
Task.now(fallback)
case NoSuitablePeer =>
Task.now(fallback).delayExecution(syncConfig.syncRetryInterval)
case Failure(cause) =>
log.error("Unexpected error on the request result", cause)
Task.now(fallback)
case PeersClient.Response(peer, msg) =>
Task.now(makeAdaptedMessage(peer, msg))
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package io.iohk.ethereum.blockchain.sync.regular
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors}
import akka.actor.{ActorRef => ClassicActorRef}
import io.iohk.ethereum.blockchain.sync.PeersClient.{BestPeer, Request}
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.FetchCommand
import io.iohk.ethereum.blockchain.sync.regular.HeadersFetcher.HeadersFetcherCommand
import io.iohk.ethereum.network.Peer
import io.iohk.ethereum.network.p2p.Message
import io.iohk.ethereum.network.p2p.messages.PV62.{BlockHeaders, GetBlockHeaders}
import io.iohk.ethereum.utils.Config.SyncConfig
import monix.eval.Task
import monix.execution.Scheduler
import org.slf4j.Logger

import scala.util.{Failure, Success}

class HeadersFetcher(
val peersClient: ClassicActorRef,
val syncConfig: SyncConfig,
val supervisor: ActorRef[FetchCommand],
context: ActorContext[HeadersFetcher.HeadersFetcherCommand]
) extends AbstractBehavior[HeadersFetcher.HeadersFetcherCommand](context)
with FetchRequest[HeadersFetcherCommand] {

override val log: Logger = context.log
implicit val ec: Scheduler = Scheduler(context.executionContext)

import HeadersFetcher._

override def makeAdaptedMessage[T <: Message](peer: Peer, msg: T): HeadersFetcherCommand = AdaptedMessage(peer, msg)

override def onMessage(message: HeadersFetcherCommand): Behavior[HeadersFetcherCommand] =
message match {
case FetchHeaders(blockNumber: BigInt, amount: BigInt) =>
log.debug("Start fetching headers from block {}", blockNumber)
requestHeaders(blockNumber, amount)
Behaviors.same
case AdaptedMessage(_, BlockHeaders(headers)) =>
log.debug("Fetched {} headers starting from block {}", headers.size, headers.headOption.map(_.number))
supervisor ! BlockFetcher.ReceivedHeaders(headers)
Behaviors.same
case HeadersFetcher.RetryHeadersRequest =>
supervisor ! BlockFetcher.RetryHeadersRequest
Behaviors.same
case _ => Behaviors.unhandled
}

private def requestHeaders(blockNr: BigInt, amount: BigInt): Unit = {
log.debug("Fetching headers from block {}", blockNr)
val msg = GetBlockHeaders(Left(blockNr), amount, skip = 0, reverse = false)

val resp = makeRequest(Request.create(msg, BestPeer), HeadersFetcher.RetryHeadersRequest)
.flatMap {
case AdaptedMessage(_, BlockHeaders(headers)) if headers.isEmpty =>
log.debug("Empty BlockHeaders response. Retry in {}", syncConfig.syncRetryInterval)
Task.now(HeadersFetcher.RetryHeadersRequest).delayResult(syncConfig.syncRetryInterval)
case res => Task.now(res)
}

context.pipeToSelf(resp.runToFuture) {
case Success(res) => res
case Failure(_) => HeadersFetcher.RetryHeadersRequest
}
}
}

object HeadersFetcher {

def apply(
peersClient: ClassicActorRef,
syncConfig: SyncConfig,
supervisor: ActorRef[FetchCommand]
): Behavior[HeadersFetcherCommand] =
Behaviors.setup(context => new HeadersFetcher(peersClient, syncConfig, supervisor, context))

sealed trait HeadersFetcherCommand
final case class FetchHeaders(blockNumber: BigInt, amount: BigInt) extends HeadersFetcherCommand
final case object RetryHeadersRequest extends HeadersFetcherCommand
private final case class AdaptedMessage[T <: Message](peer: Peer, msg: T) extends HeadersFetcherCommand
}
Loading

0 comments on commit 9876ed2

Please sign in to comment.