neutrino+query+rescan: improve rescan speed #236
Conversation
Use the work dispatcher interface from the query package to make getdata requests instead of using the old queryPeers function.
Use the work dispatcher query interface instead of the old queryPeers method for making getcfilter requests. This ensures that the queries are made to the most responsive peers. With this PR we can also remove the queryPeers function.
Let the GetCFilter function return, and hence unlock the mutex, as soon as it is done writing filters to the cache, and then let the writing to the DB happen in a separate goroutine. This greatly improves the speed at which filters can be downloaded, since the bottleneck in this operation is writing the filters to the DB.
Let the rescan function wait until the filter headers have either caught up to the backend chain or until they have caught up to the specified rescan end block. This lets the rescan operation take advantage of batch filter fetching during rescan, making the operation a lot faster, since filters can be fetched in batches of 1000 instead of one at a time.
case <-s.quit:
	return
}
One worry with this is that we (in the worst case, syncing from genesis) spin up something like 700 goroutines (on mainnet). So another option to rate limit this a bit is to have a single outer goroutine that listens on a channel with a large buffer. Then we rate limit ourselves with that buffered channel.
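Something along these lines maybe (just a sketch, `persistJob`, `writeFilterToDB` and the buffer size are made-up placeholders, not actual neutrino code):

```go
// Sketch only: persistJob stands in for whatever needs to hit the DB
// (here the block hash and its filter).
type persistJob struct {
	blockHash chainhash.Hash
	filter    *gcs.Filter
}

// A single long-lived goroutine drains a large buffered channel, so the
// goroutine count stays constant no matter how many filters get fetched,
// and senders are naturally rate limited once the buffer fills up.
persistChan := make(chan *persistJob, 1000)

go func() {
	for {
		select {
		case job := <-persistChan:
			// Stand-in for the actual DB write.
			if err := writeFilterToDB(job); err != nil {
				log.Warnf("Unable to persist filter for %v: %v",
					job.blockHash, err)
			}

		case <-s.quit:
			return
		}
	}
}()
```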
also gonna look into doing batch writes to the db instead of one by one
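A rough idea of what batching could look like, purely as a sketch — `filterEntry` and `putFilterBatch` don't exist in neutrino, they stand in for whatever the real batch write on the filter DB would be:

```go
// Sketch: collect incoming filters and flush them to the DB in one go
// once the batch is full, instead of writing them one by one.
const batchSize = 100

batch := make([]*filterEntry, 0, batchSize)

flush := func() {
	if len(batch) == 0 {
		return
	}
	if err := putFilterBatch(batch); err != nil {
		log.Warnf("Unable to batch write %d filters: %v",
			len(batch), err)
	}
	batch = batch[:0]
}

for {
	select {
	case entry := <-persistChan:
		batch = append(batch, entry)
		if len(batch) >= batchSize {
			flush()
		}

	case <-s.quit:
		// Best-effort flush of anything still pending before exit.
		flush()
		return
	}
}
```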
Batch writes would be great, and shouldn't be too difficult to add in.
Re the amount of goroutines: we can instead make a basic worker pool here. Or we only allow so many of them to be active at one time via a semaphore.
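The semaphore variant is essentially a buffered channel used as a token bucket; a quick sketch with made-up names (`maxPersistWorkers`, `writeFilterToDB`, `jobs`):

```go
// At most maxPersistWorkers persist goroutines can be in flight at once;
// acquiring a slot blocks once the semaphore is full.
const maxPersistWorkers = 10

sem := make(chan struct{}, maxPersistWorkers)

for _, job := range jobs {
	sem <- struct{}{}

	go func(job *persistJob) {
		// Release the slot when this write is done.
		defer func() { <-sem }()

		if err := writeFilterToDB(job); err != nil {
			log.Warnf("Unable to persist filter for %v: %v",
				job.blockHash, err)
		}
	}(job)
}
```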
return hash == ro.endBlock.Hash
}); err != nil {
	return err
}
could maybe change this a bit so that if we are lagging by at least 1000, we start preemptively fetching filters (similarly to what is done for filter header syncing)
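Roughly what I have in mind (illustrative only, `prefetchBatchSize` and `fetchFilterBatch` aren't real neutrino names):

```go
// If the rescan height lags the filter header tip by at least a full
// batch, kick off fetching the next batch ahead of time rather than
// requesting filters one at a time as we reach them.
const prefetchBatchSize = 1000

if filterHeaderTip-curHeight >= prefetchBatchSize {
	go fetchFilterBatch(curHeight+1, curHeight+prefetchBatchSize)
}
```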
First time diving into the neutrino repository, so I may be missing something. Better to have an extra couple of eyes check this, but everything made sense to me. A couple of nits:
- Maybe some of the functions defined inside functions can be put outside or made struct methods so our routines do not get that long.
- Maybe we can only create the `persistChan` and write into it if `s.persistToDisk`.
For example, on local regtest: before this commit it would take 7 seconds to sync 3000 blocks, but with this commit it takes 900ms
That's a big improvement 🔥 🔥 🔥 great work!
log.Warnf("Invalid block for %s "+ | ||
"received from %s -- ", | ||
blockHash, peer) | ||
fmt.Println(err) |
Maybe it's worth including the error in the `log.Warnf` call.
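Something like this, reusing the variables from the hunk above (just a suggestion for the log line, not tested):

```go
log.Warnf("Invalid block for %s received from %s: %v",
	blockHash, peer, err)
```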
oh haha yeah that was left over from debugging 🙈 will fix. thanks!
Thanks for taking a look @positiveblue!!! Will address your comments soon 👍
@ellemouton, remember to re-request review from reviewers for your latest update
!lightninglabs-deploy mute
Gave it a test on mainnet and there were no more single cfilter requests, just big chunks of blocks, which is great. Not always 1000, but never just 1.
awesome! Thanks for testing @chappjc!
hey guys, is anyone at lightninglabs looking into this?
log.Warnf("Invalid block for %s "+ | ||
"received from %s -- ", | ||
blockHash, peer) | ||
fmt.Println(err) |
Prior logic would disconnect here, but now we'll continue...
Seems worthy of a future spin-off to propagate a "ban worthy" error back up to the main scheduler.
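If that spin-off happens, one possible shape for it (purely illustrative, none of these names exist in neutrino today) is a sentinel error the scheduler can check for:

```go
// ErrBanWorthy is a hypothetical sentinel error used to signal that the
// peer's response was invalid enough to warrant a disconnect/ban.
var ErrBanWorthy = errors.New("peer sent ban worthy response")

// In the response handler, wrap the underlying error:
//   return fmt.Errorf("%w: invalid block %v from %v", ErrBanWorthy, blockHash, peer)
//
// In the scheduler, check for it and disconnect or ban the offending peer:
//   if errors.Is(err, ErrBanWorthy) { /* disconnect or ban peer */ }
```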
@@ -575,11 +575,6 @@ type ChainService struct { // nolint:maligned
	FilterCache *lru.Cache
	BlockCache *lru.Cache

	// queryPeers will be called to send messages to one or more peers,
	// expecting a response.
	queryPeers func(wire.Message, func(*ServerPeer, wire.Message,
🙌
	}
}

// If the request filter type doesn't match the type we were expecting,
A few lines here and below look to run over the line length limit.
// At this point the filter matches what we know about it, and we
// declare it sane. We send it into a channel to be processed elsewhere.
q.filterChan <- &filterResponse{
Should select on quit both here and below.
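i.e. assuming the response is built into a `resp` variable first, something like:

```go
// Select on quit while sending so we don't block forever on shutdown if
// the reader of filterChan has already exited.
select {
case q.filterChan <- resp:
case <-s.quit:
	return
}
```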
return
for {
	select {
	case resp, ok = <-q.filterChan:
Hmm, I think it's better not to have the select assignment override the variable above, especially given that `ok` isn't used in the scope below. So we can use an intermediate variable.
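Something in this direction (sketch, `handleFilterResponse` is just a placeholder for whatever the body does with the response):

```go
for {
	select {
	// Declare a fresh variable here instead of assigning to the outer
	// resp, and consume ok immediately to detect channel close.
	case filterResp, ok := <-q.filterChan:
		if !ok {
			return
		}
		handleFilterResponse(filterResp)

	case <-s.quit:
		return
	}
}
```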
for {
	select {
	case resp, ok = <-persistChan:
I think it would be cleaner to send the filter to persist over the channel, instead of relying on a chan close above as a signal to check a local closure variable.
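Roughly what I mean, as a sketch (`writeFilterToDB` and `filtersToPersist` are placeholders):

```go
// Consumer: range over the channel and exit cleanly once it's closed,
// so no extra closure variable is needed to signal completion.
go func() {
	for filter := range persistChan {
		if err := writeFilterToDB(filter); err != nil {
			log.Warnf("Unable to persist filter: %v", err)
		}
	}
}()

// Producer: send each filter that needs to hit disk, then close the
// channel once the query is done.
for _, filter := range filtersToPersist {
	select {
	case persistChan <- filter:
	case <-s.quit:
		return
	}
}
close(persistChan)
```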
}
// waitFor is a helper closure that can be used to wait on block
// notifications until the given predicate returns true.
waitFor := func(predicate func(hash chainhash.Hash,
Perhaps move this into a new function to cut down on the vertical distance a bit here?
This PR should probably be split into 3 PRs. Doing it in 1 for now just to get some initial feedback and to better show the progression. The three sections are as follows:

1. Changes the `GetBlock` and `GetCFilter` functions to use the work dispatcher for their queries instead of using the old `queryPeers` function. That function is now removed, bringing us one step closer to removing all query logic from the main package.
2. The bottleneck in the `GetCFilter` function is persisting filters to the DB. In this commit, this operation is spun off into a goroutine, thus allowing the `GetCFilter` function to return as soon as all the filters are written to the cache.
3. Makes sure that `rescan` can make use of batch filter fetching by waiting until the header chain is either current or until it is ahead of the specified end height. Before this commit, if rescan is started before the chain is current, then the filters are fetched one by one instead, which is what makes things super slow. For example, on local regtest: before this commit it would take 7 seconds to sync 3000 blocks, but with this commit it takes 900ms. (A full testnet rescan from local testnet bitcoind took around 5 mins.)