[ETCM-213] Reload bloom after restart #742

Merged
merged 17 commits into from
Oct 26, 2020
Changes from 8 commits
Commits
17 commits
a4217e5
[ETCM-213] Expose db iterator over key-vals
KonradStaniec Oct 15, 2020
a6258da
[ETCM-213] Update SyncStateActor to load bloom filter at start
KonradStaniec Oct 16, 2020
1bdc6b9
Merge remote-tracking branch 'origin/develop' into etcm-213/relad-blo…
KonradStaniec Oct 16, 2020
116782f
[ETCM-213] Properly handle restart while loading bloom filter
KonradStaniec Oct 19, 2020
524a463
[ETCM-213] Extract bloom loading to sparate class. More tests.
KonradStaniec Oct 19, 2020
519f4d6
Merge remote-tracking branch 'origin/develop' into etcm-213/relad-blo…
KonradStaniec Oct 20, 2020
54e75c1
[ETCM-213] Fix scalafmt
KonradStaniec Oct 20, 2020
e9833d1
Merge remote-tracking branch 'origin/develop' into etcm-213/relad-blo…
KonradStaniec Oct 20, 2020
e4029e3
[ETCM-213] Refactor iterator impl
KonradStaniec Oct 21, 2020
cc0cd99
Merge remote-tracking branch 'origin/develop' into etcm-213/relad-blo…
KonradStaniec Oct 21, 2020
6642568
[ETCM-213] Pr comments
KonradStaniec Oct 22, 2020
5d37321
Merge remote-tracking branch 'origin/develop' into etcm-213/relad-blo…
KonradStaniec Oct 22, 2020
6e7bb45
Merge remote-tracking branch 'origin/develop' into etcm-213/relad-blo…
KonradStaniec Oct 23, 2020
29ca388
[ETCM-213] Fix merge conflicts
KonradStaniec Oct 23, 2020
ce8a83f
[ETCM-213] Add one todo regarding async processing
KonradStaniec Oct 23, 2020
f46fd50
Merge remote-tracking branch 'origin/develop' into etcm-213/relad-blo…
KonradStaniec Oct 23, 2020
1667f33
Merge remote-tracking branch 'origin/develop' into etcm-213/relad-blo…
KonradStaniec Oct 26, 2020
156 changes: 156 additions & 0 deletions src/it/scala/io/iohk/ethereum/db/RockDbIteratorSpec.scala
@@ -0,0 +1,156 @@
package io.iohk.ethereum.db

import java.nio.file.Files

import akka.util.ByteString
import cats.effect.Resource
import cats.effect.concurrent.{Deferred, Ref}
import io.iohk.ethereum.db.dataSource.{DataSourceUpdateOptimized, RocksDbConfig, RocksDbDataSource}
import io.iohk.ethereum.db.storage.{EvmCodeStorage, Namespaces, NodeStorage}
import io.iohk.ethereum.{FlatSpecBase, ResourceFixtures}
import monix.eval.Task
import monix.reactive.{Consumer, Observable}
import org.scalatest.matchers.should.Matchers

import scala.util.Random

class RockDbIteratorSpec extends FlatSpecBase with ResourceFixtures with Matchers {
type Fixture = RocksDbDataSource

override def fixtureResource: Resource[Task, RocksDbDataSource] = RockDbIteratorSpec.buildRockDbResource()

def genRandomArray(): Array[Byte] = {
val arr = new Array[Byte](32)
Random.nextBytes(arr)
arr
}

def genRandomByteString(): ByteString = {
ByteString.fromArrayUnsafe(genRandomArray())
}

def writeNValuesToDb(n: Int, db: RocksDbDataSource, namespace: IndexedSeq[Byte]): Task[Unit] = {
val iterable = (0 until n)
Observable.fromIterable(iterable).foreachL { _ =>
db.update(Seq(DataSourceUpdateOptimized(namespace, Seq(), Seq((genRandomArray(), genRandomArray())))))
}
}

it should "cancel ongoing iteration" in testCaseT { db =>
val largeNum = 1000000
val finishMark = 20000
for {
counter <- Ref.of[Task, Int](0)
cancelMark <- Deferred[Task, Unit]
_ <- writeNValuesToDb(largeNum, db, Namespaces.NodeNamespace)
fib <- db
.iterate(Namespaces.NodeNamespace)
.map(_.right.get)
.consumeWith(Consumer.foreachEval[Task, (Array[Byte], Array[Byte])] { _ =>
for {
_ <- counter.update(i => i + 1)
cur <- counter.get
_ <- if (cur == finishMark) cancelMark.complete(()) else Task.now(())
} yield ()
})
.start
_ <- cancelMark.get
// Keep in mind that this test also checks that all underlying RocksDB resources have been cleaned up: if cancel
// did not close the underlying DbIterator, the whole test would kill the JVM with a RocksDB error at the native
// level, because iterators need to be closed before closing the db.
_ <- fib.cancel
finalCounter <- counter.get
} yield {
assert(finalCounter < largeNum)
}
}

it should "read all key values in db" in testCaseT { db =>
val largeNum = 100000
for {
counter <- Ref.of[Task, Int](0)
_ <- writeNValuesToDb(largeNum, db, Namespaces.NodeNamespace)
_ <- db
.iterate(Namespaces.NodeNamespace)
.map(_.right.get)
.consumeWith(Consumer.foreachEval[Task, (Array[Byte], Array[Byte])] { _ =>
counter.update(current => current + 1)
})
finalCounter <- counter.get
} yield {
assert(finalCounter == largeNum)
}
}

it should "iterate over keys and values from different namespaces" in testCaseT { db =>
val codeStorage = new EvmCodeStorage(db)
val codeKeyValues = (1 to 10).map(i => (ByteString(i.toByte), ByteString(i.toByte))).toList

val nodeStorage = new NodeStorage(db)
val nodeKeyValues = (20 to 30).map(i => (ByteString(i.toByte), ByteString(i.toByte).toArray)).toList

for {
_ <- Task(codeStorage.update(Seq(), codeKeyValues).commit())
_ <- Task(nodeStorage.update(Seq(), nodeKeyValues))
result <- Task.parZip2(
codeStorage.storageContent.map(_.right.get).map(_._1).toListL,
nodeStorage.storageContent.map(_.right.get).map(_._1).toListL
)
(codeResult, nodeResult) = result
} yield {
codeResult shouldEqual codeKeyValues.map(_._1)
nodeResult shouldEqual nodeKeyValues.map(_._1)
}
}

it should "iterate over keys and values " in testCaseT { db =>
val keyValues = (1 to 100).map(i => (ByteString(i.toByte), ByteString(i.toByte))).toList
for {
_ <- Task(
db.update(
Seq(
DataSourceUpdateOptimized(Namespaces.NodeNamespace, Seq(), keyValues.map(e => (e._1.toArray, e._2.toArray)))
)
)
)
elems <- db.iterate(Namespaces.NodeNamespace).map(_.right.get).toListL
} yield {
val deserialized = elems.map { case (bytes, bytes1) => (ByteString(bytes), ByteString(bytes1)) }
assert(elems.size == keyValues.size)
assert(keyValues == deserialized)
}
}

it should "return empty list when iterating empty db" in testCaseT { db =>
for {
elems <- db.iterate().toListL
} yield {
assert(elems.isEmpty)
}
}
}

object RockDbIteratorSpec {
def getRockDbTestConfig(dbPath: String) = {
new RocksDbConfig {
override val createIfMissing: Boolean = true
override val paranoidChecks: Boolean = false
override val path: String = dbPath
override val maxThreads: Int = 1
override val maxOpenFiles: Int = 32
override val verifyChecksums: Boolean = false
override val levelCompaction: Boolean = true
override val blockSize: Long = 16384
override val blockCacheSize: Long = 33554432
}
}

def buildRockDbResource(): Resource[Task, RocksDbDataSource] = {
Resource.make {
Task {
val tempDir = Files.createTempDirectory("temp-iter-dir")
RocksDbDataSource(getRockDbTestConfig(tempDir.toAbsolutePath.toString), Namespaces.nsSeq)
}
}(db => Task(db.destroy()))
}
}
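The cancellation test above depends on the stream closing its native iterator whenever consumption stops, since RocksDB requires iterators to be closed before the database handle. A rough sketch of one way to get that guarantee by tying the iterator to a Resource; names and structure are illustrative and do not reproduce the PR's RocksDbDataSource implementation.

import cats.effect.Resource
import monix.eval.Task
import monix.reactive.Observable
import org.rocksdb.{ReadOptions, RocksDB, RocksIterator}

// Wrap a native RocksIterator in a Resource so it is closed on completion, error or cancellation,
// and therefore always before the owning RocksDB handle is closed.
def iterateAll(db: RocksDB): Observable[(Array[Byte], Array[Byte])] = {
  val acquire: Task[(ReadOptions, RocksIterator)] = Task {
    val readOptions = new ReadOptions()
    val iterator = db.newIterator(readOptions)
    iterator.seekToFirst()
    (readOptions, iterator)
  }
  val release: ((ReadOptions, RocksIterator)) => Task[Unit] = { case (readOptions, iterator) =>
    Task {
      iterator.close()
      readOptions.close()
    }
  }
  Observable
    .fromResource(Resource.make(acquire)(release))
    .flatMap { case (_, iterator) =>
      Observable
        .repeatEvalF(Task {
          if (iterator.isValid) {
            val entry = (iterator.key(), iterator.value())
            iterator.next()
            Some(entry)
          } else None
        })
        .takeWhile(_.isDefined)
        .collect { case Some(entry) => entry }
    }
}

Because the finalizer in Resource.make runs on completion, error and cancellation alike, cancelling the consuming fiber (as the test does) still closes the native iterator before the database is destroyed.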
31 changes: 27 additions & 4 deletions src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala
@@ -129,6 +129,21 @@ class FastSyncItSpec extends FlatSpecBase with Matchers with BeforeAndAfter {
}
}

it should "sync state to peer from partially synced state" in customTestCaseResourceM(
FakePeer.start2FakePeersRes()
) { case (peer1, peer2) =>
for {
_ <- peer2.importBlocksUntil(2000)(updateStateAtBlock(1500))
_ <- peer2.importBlocksUntil(3000)(updateStateAtBlock(2500, 1000, 2000))
_ <- peer1.importBlocksUntil(2000)(updateStateAtBlock(1500))
_ <- peer1.startWithState()
_ <- peer1.connectToPeers(Set(peer2.node))
_ <- peer1.startFastSync().delayExecution(50.milliseconds)
_ <- peer1.waitForFastSyncFinish()
} yield {
assert(peer1.bl.getBestBlockNumber() == peer2.bl.getBestBlockNumber() - peer2.testSyncConfig.pivotBlockOffset)
}
}
}

object FastSyncItSpec {
@@ -145,8 +160,12 @@ object FastSyncItSpec {

val IdentityUpdate: (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy = (_, world) => world

def updateWorldWithNAccounts(n: Int, world: InMemoryWorldStateProxy): InMemoryWorldStateProxy = {
val resultWorld = (0 until n).foldLeft(world) { (world, num) =>
def updateWorldWithAccounts(
startAccount: Int,
endAccount: Int,
world: InMemoryWorldStateProxy
): InMemoryWorldStateProxy = {
val resultWorld = (startAccount until endAccount).foldLeft(world) { (world, num) =>
val randomBalance = num
val randomAddress = Address(num)
val codeBytes = BigInt(num).toByteArray
@@ -160,10 +179,14 @@
InMemoryWorldStateProxy.persistState(resultWorld)
}

def updateStateAtBlock(blockWithUpdate: BigInt): (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy = {
def updateStateAtBlock(
blockWithUpdate: BigInt,
startAccount: Int = 0,
endAccount: Int = 1000
): (BigInt, InMemoryWorldStateProxy) => InMemoryWorldStateProxy = {
(blockNr: BigInt, world: InMemoryWorldStateProxy) =>
if (blockNr == blockWithUpdate) {
updateWorldWithNAccounts(1000, world)
updateWorldWithAccounts(startAccount, endAccount, world)
} else {
IdentityUpdate(blockNr, world)
}
9 changes: 9 additions & 0 deletions src/it/scala/io/iohk/ethereum/sync/FastSyncItSpecUtils.scala
@@ -9,6 +9,7 @@ import akka.testkit.TestProbe
import akka.util.{ByteString, Timeout}
import cats.effect.Resource
import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed
import io.iohk.ethereum.blockchain.sync.FastSync.SyncState
import io.iohk.ethereum.{Fixtures, Timeouts}
import io.iohk.ethereum.blockchain.sync.{BlockBroadcast, BlockchainHostActor, FastSync, TestSyncConfig}
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor
@@ -363,6 +364,14 @@ object FastSyncItSpecUtils {
}
}

def startWithState(): Task[Unit] = {
val currentBest = bl.getBestBlock().header
val safeTarget = currentBest.number + syncConfig.fastSyncBlockValidationX
val nextToValidate = currentBest.number + 1
val syncState = SyncState(currentBest, safeTarget, Seq(), Seq(), 0, 0, currentBest.number, nextToValidate)
Task(storagesInstance.storages.fastSyncStateStorage.putSyncState(syncState)).map(_ => ())
}

def startFastSync(): Task[Unit] = Task {
fastSync ! FastSync.Start
}