Skip to content

Commit

Permalink
[ETCM-739] Refactor BlockFetcher
Browse files Browse the repository at this point in the history
Change BlockFetcher to typed actor

Split fetcher message handling among child actors

Abstract the fetch trait

Scalafmt

Refactor blockFetcherState

Fix it tests
  • Loading branch information
AnastasiiaL committed May 3, 2021
1 parent c38c2c2 commit 791117e
Show file tree
Hide file tree
Showing 15 changed files with 602 additions and 780 deletions.
2 changes: 1 addition & 1 deletion nix/overlay.nix
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ rev: final: prev: {

mantis = final.callPackage ./mantis.nix {
src = ../.;
depsSha256 = "sha256-0AeemKFcIU3eVGse8QQGauJeRsF7IgCLo5Yqu2FZsMs=";
depsSha256 = "sha256-US4L/xh2otnEfOa05bazb14bgYhQZpF4GfFY30sDkNY=";
};

mantis-hash = final.mantis.override {
Expand Down
2 changes: 2 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ object Dependencies {
Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"com.typesafe.akka" %% "akka-actor-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-testkit" % akkaVersion,
"com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.miguno.akka" %% "akka-mock-scheduler" % "0.5.5" % "it,test"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ class BlockImporterItSpec
eventually {

val msg = fetcherProbe.fishForMessage(Timeouts.longTimeout) {
case BlockFetcher.FetchStateNode(_) => true
case BlockFetcher.FetchStateNode(_, _) => true
case _ => false
}.asInstanceOf[BlockFetcher.FetchStateNode]

Expand Down
18 changes: 0 additions & 18 deletions src/it/scala/io/iohk/ethereum/sync/RegularSyncItSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,6 @@ class RegularSyncItSpec extends FreeSpecBase with Matchers with BeforeAndAfterAl
testScheduler.awaitTermination(120.second)
}

"a peer should reorganise when receives a checkpoint older than the current best from a peer" in customTestCaseResourceM(
FakePeer.start2FakePeersRes()
) { case (peer1, peer2) =>
for {
_ <- peer1.importBlocksUntil(20)(IdentityUpdate)
_ <- peer2.importBlocksUntil(30)(IdentityUpdate)
_ <- peer1.startRegularSync()
_ <- peer2.startRegularSync()
_ <- peer1.addCheckpointedBlock(peer1.bl.getBestBlock().get)
_ <- peer1.waitForRegularSyncLoadLastBlock(21)
_ <- peer2.getCheckpointFromPeer(peer1.bl.getBestBlock().get, PeerId("Peer1"))
_ <- peer2.waitForRegularSyncLoadLastBlock(21)
} yield {
assert(peer1.bl.getBestBlock().get.hash == peer2.bl.getBestBlock().get.hash)
assert(peer1.bl.getLatestCheckpointBlockNumber() == peer2.bl.getLatestCheckpointBlockNumber())
}
}

"peer 2 should sync to the top of peer1 blockchain" - {
"given a previously imported blockchain" in customTestCaseResourceM(FakePeer.start2FakePeersRes()) {
case (peer1, peer2) =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
package io.iohk.ethereum.sync.util

import akka.actor.ActorRef
import akka.actor.{ActorRef, typed}
import akka.util.ByteString
import cats.effect.Resource
import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcast.BlockToBroadcast
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor.BroadcastBlock
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.Start
import io.iohk.ethereum.blockchain.sync.regular.{
BlockBroadcast,
BlockBroadcasterActor,
BlockFetcher,
BlockImporter,
RegularSync
}
import io.iohk.ethereum.blockchain.sync.regular.{BlockBroadcast, BlockBroadcasterActor, BlockFetcher, BlockImporter, RegularSync}
import io.iohk.ethereum.blockchain.sync.regular.RegularSync.NewCheckpoint
import io.iohk.ethereum.blockchain.sync.{PeersClient, SyncProtocol}
import io.iohk.ethereum.checkpointing.CheckpointingTestHelpers
Expand All @@ -36,6 +30,8 @@ import io.iohk.ethereum.utils._
import io.iohk.ethereum.vm.EvmConfig
import monix.eval.Task
import monix.execution.Scheduler
import akka.actor.typed.scaladsl.adapter._
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.AdaptedMessageFromEventBus

import scala.concurrent.duration._
object RegularSyncItSpecUtils {
Expand Down Expand Up @@ -86,15 +82,15 @@ object RegularSyncItSpecUtils {
"block-broadcaster"
)

val fetcher: ActorRef =
system.actorOf(
BlockFetcher.props(peersClient, peerEventBus, regularSync, syncConfig, validators.blockValidator),
val fetcher: typed.ActorRef[BlockFetcher.FetchCommand] =
system.spawn(
BlockFetcher(peersClient, peerEventBus, regularSync, syncConfig, validators.blockValidator),
"block-fetcher"
)

lazy val blockImporter = system.actorOf(
BlockImporter.props(
fetcher,
fetcher.toClassic,
ledger,
bl,
syncConfig,
Expand Down Expand Up @@ -181,7 +177,7 @@ object RegularSyncItSpecUtils {

def getCheckpointFromPeer(checkpoint: Block, peerId: PeerId): Task[Unit] = Task {
blockImporter ! Start
fetcher ! MessageFromPeer(NewBlock(checkpoint, checkpoint.header.difficulty), peerId)
fetcher ! AdaptedMessageFromEventBus(NewBlock(checkpoint, checkpoint.header.difficulty), peerId)
}

private def getMptForBlock(block: Block) = {
Expand Down
Loading

0 comments on commit 791117e

Please sign in to comment.