-
Notifications
You must be signed in to change notification settings - Fork 75
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ETCM-213] Reload bloom after restart #742
Conversation
After margin [ETCM-141] scalafmt . This PR can be updated by git merge |
Just fyi: Iterating over 10milion keys and values saved in db, on my machine (ssd drive) take around 8s. Now there is around 150M mpt nodes to save. So taking worst case into account i.e restart neat the finish and ~10s for 10M values, the worst time user will wait to re-lad bloom filter will be 150s which imo it is reasonable time to wait. Take in mind that in case of loading failure (i.e not loading all nodes from db) will not make state sync unusable but will just degrade its performance due to possible large number of false positives i.e large number of unnecessary db calls. That why error handling is done is in best effort manner. We report the loading failure and error, but we proceed to start state sync without any loading restarts and recovers. We could in theory save the last loaded key before failure, and try to restart iterator from this key but imo for it is not worth it. and bloom filter of 150m keys, with 3% false positive setting should weigh less 200mb |
a1de42a
to
e9833d1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice job. One minor comment only
private def moveIterator(it: RocksIterator): Observable[Either[IterationError, (Array[Byte], Array[Byte])]] = { | ||
Observable | ||
.fromTask(Task(it.seekToFirst())) | ||
.flatMap(_ => Observable.fromTask(Task(it.isValid))) | ||
.flatMap { valid => | ||
if (!valid) { | ||
Observable.empty | ||
} else { | ||
Observable.fromTask(Task(Right(it.key(), it.value()))) ++ Observable | ||
.repeatEvalF { | ||
Task { | ||
it.next() | ||
}.flatMap { _ => | ||
if (it.isValid) { | ||
Task(Right(it.key(), it.value())) | ||
} else { | ||
Task.raiseError(IterationFinished) | ||
} | ||
} | ||
} | ||
.onErrorHandleWith { | ||
case IterationFinished => Observable.empty | ||
case ex => Observable(Left(IterationError(ex))) | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe sth like that?
private def moveIterator(it: RocksIterator): Observable[Either[IterationError, (Array[Byte], Array[Byte])]] = {
Observable
.fromTask(Task(it.seekToFirst()))
.flatMap(_ => Observable.fromTask(Task(it.isValid)))
.filter(identity)
.flatMap { _ =>
Observable.fromTask(Task(Right(it.key(), it.value()))) ++ Observable
.repeatEvalF {
Task {
it.next()
}.flatMap { _ =>
Task(it.isValid).flatMap {
case true => Task(Right(it.key(), it.value()))
case false => Task.raiseError(IterationFinished)
}
}
}
.onErrorHandleWith {
case IterationFinished => Observable.empty
case ex => Observable(Left(IterationError(ex)))
}
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, second it.isValid
shold probably be in Task. I will refactor whole bit in repeatEvalF
to be in for-comp
@@ -35,6 +35,8 @@ class FastSyncStateStorage(val dataSource: DataSource) | |||
|
|||
override def keySerializer: String => IndexedSeq[Byte] = _.getBytes | |||
|
|||
override def keyDeserializer: IndexedSeq[Byte] => String = b => new String(b.toArray) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as in appstate storage - I'd add charset here
@@ -13,6 +13,7 @@ class KnownNodesStorage(val dataSource: DataSource) extends TransactionalKeyValu | |||
|
|||
val namespace: IndexedSeq[Byte] = Namespaces.KnownNodesNamespace | |||
def keySerializer: String => IndexedSeq[Byte] = _.getBytes | |||
def keyDeserializer: IndexedSeq[Byte] => String = k => new String(k.toArray) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
src/test/scala/io/iohk/ethereum/blockchain/sync/LoadableBloomFilterSpec.scala
Outdated
Show resolved
Hide resolved
"LoadableBloomFilter" should "load all correct elements " in testCaseM[Task] { | ||
for { | ||
source <- Task(Observable.fromIterable(Seq(Right(1L), Right(2L), Right(3L)))) | ||
filter <- Task.now(LoadableBloomFilter[Long](1000, source)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not filter = LoadableBloomFilter(...)
then? (and below too)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to be honest constant changes between =
and <-
just makes code unreadable for me, but sure i will change it 👍
override def buildBlockChain(): BlockchainImpl = { | ||
val storages = getNewStorages | ||
//iterating 1M key and values should force scheduler actor o enqueue last received command i.e StartSyncing | ||
(0 until 1000000).foreach { i => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about overriding storages in a way that will make source for bloom filter return observable which doesn't emit elements until some signal?
Refactor source iterator Add docs to iterate methods Add explicit charset to storages
import java.nio.charset.Charset | ||
|
||
object StorageStringCharset { | ||
val UTF8Charset = Charset.forName("UTF-8") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: there is java.nio.StandardCharsets.UTF8
too
stateStorage = storages.stateStorage | ||
) { | ||
override def mptStateSavedKeys(): Observable[Either[IterationError, ByteString]] = { | ||
Observable.repeatEvalF(Task(Right(ByteString(1)))).takeWhile(_ => !loadingFinished) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: Observable.repeat
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
Description
Reload fast sync bloom filter after node restart
Proposed Solution
Expose rockdb iterator over the node storage column and code column and consume it by putting all known keys in bloom filter
Important Changes Introduced
LoadabeBloomFilter
class which can load data from provided source