Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ETCM-739] Refactor BlockFetcher #976

Merged
merged 1 commit into from
May 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion nix/overlay.nix
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ rev: final: prev: {

mantis = final.callPackage ./mantis.nix {
src = ../.;
depsSha256 = "sha256-0AeemKFcIU3eVGse8QQGauJeRsF7IgCLo5Yqu2FZsMs=";
depsSha256 = "sha256-US4L/xh2otnEfOa05bazb14bgYhQZpF4GfFY30sDkNY=";
};

mantis-hash = final.mantis.override {
Expand Down
2 changes: 2 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ object Dependencies {
Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"com.typesafe.akka" %% "akka-actor-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-testkit" % akkaVersion,
"com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.miguno.akka" %% "akka-mock-scheduler" % "0.5.5" % "it,test"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ class BlockImporterItSpec

val msg = fetcherProbe
.fishForMessage(Timeouts.longTimeout) {
case BlockFetcher.FetchStateNode(_) => true
case BlockFetcher.FetchStateNode(_, _) => true
case _ => false
}
.asInstanceOf[BlockFetcher.FetchStateNode]
Expand Down
18 changes: 0 additions & 18 deletions src/it/scala/io/iohk/ethereum/sync/RegularSyncItSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,6 @@ class RegularSyncItSpec extends FreeSpecBase with Matchers with BeforeAndAfterAl
testScheduler.awaitTermination(120.second)
}

"a peer should reorganise when receives a checkpoint older than the current best from a peer" in customTestCaseResourceM(
dzajkowski marked this conversation as resolved.
Show resolved Hide resolved
FakePeer.start2FakePeersRes()
) { case (peer1, peer2) =>
for {
_ <- peer1.importBlocksUntil(20)(IdentityUpdate)
_ <- peer2.importBlocksUntil(30)(IdentityUpdate)
_ <- peer1.startRegularSync()
_ <- peer2.startRegularSync()
_ <- peer1.addCheckpointedBlock(peer1.bl.getBestBlock().get)
_ <- peer1.waitForRegularSyncLoadLastBlock(21)
_ <- peer2.getCheckpointFromPeer(peer1.bl.getBestBlock().get, PeerId("Peer1"))
_ <- peer2.waitForRegularSyncLoadLastBlock(21)
} yield {
assert(peer1.bl.getBestBlock().get.hash == peer2.bl.getBestBlock().get.hash)
assert(peer1.bl.getLatestCheckpointBlockNumber() == peer2.bl.getLatestCheckpointBlockNumber())
}
}

"peer 2 should sync to the top of peer1 blockchain" - {
"given a previously imported blockchain" in customTestCaseResourceM(FakePeer.start2FakePeersRes()) {
case (peer1, peer2) =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
package io.iohk.ethereum.sync.util

import akka.actor.ActorRef
import akka.actor.{ActorRef, typed}
import akka.util.ByteString
import cats.effect.Resource
import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcast.BlockToBroadcast
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor.BroadcastBlock
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.Start
import io.iohk.ethereum.blockchain.sync.regular.{
BlockBroadcast,
BlockBroadcasterActor,
BlockFetcher,
BlockImporter,
RegularSync
}
import io.iohk.ethereum.blockchain.sync.regular.{BlockBroadcast, BlockBroadcasterActor, BlockFetcher, BlockImporter, RegularSync}
import io.iohk.ethereum.blockchain.sync.regular.RegularSync.NewCheckpoint
import io.iohk.ethereum.blockchain.sync.{PeersClient, SyncProtocol}
import io.iohk.ethereum.checkpointing.CheckpointingTestHelpers
Expand All @@ -36,6 +30,8 @@ import io.iohk.ethereum.utils._
import io.iohk.ethereum.vm.EvmConfig
import monix.eval.Task
import monix.execution.Scheduler
import akka.actor.typed.scaladsl.adapter._
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcher.AdaptedMessageFromEventBus

import scala.concurrent.duration._
object RegularSyncItSpecUtils {
Expand Down Expand Up @@ -86,15 +82,15 @@ object RegularSyncItSpecUtils {
"block-broadcaster"
)

val fetcher: ActorRef =
system.actorOf(
BlockFetcher.props(peersClient, peerEventBus, regularSync, syncConfig, validators.blockValidator),
val fetcher: typed.ActorRef[BlockFetcher.FetchCommand] =
system.spawn(
BlockFetcher(peersClient, peerEventBus, regularSync, syncConfig, validators.blockValidator),
"block-fetcher"
)

lazy val blockImporter = system.actorOf(
BlockImporter.props(
fetcher,
fetcher.toClassic,
ledger,
bl,
syncConfig,
Expand Down Expand Up @@ -181,7 +177,7 @@ object RegularSyncItSpecUtils {

def getCheckpointFromPeer(checkpoint: Block, peerId: PeerId): Task[Unit] = Task {
blockImporter ! Start
fetcher ! MessageFromPeer(NewBlock(checkpoint, checkpoint.header.difficulty), peerId)
fetcher ! AdaptedMessageFromEventBus(NewBlock(checkpoint, checkpoint.header.difficulty), peerId)
}

private def getMptForBlock(block: Block) = {
Expand Down
Loading