neutrino+query+rescan: improve rescan speed #236

Closed
34 changes: 31 additions & 3 deletions query.go
@@ -714,7 +714,9 @@ func (s *ChainService) GetCFilter(blockHash chainhash.Hash,
log.Debugf("Fetching filters for heights=[%v, %v], stophash=%v",
q.startHeight, q.stopHeight, q.stopHash)

persistChan := make(chan *filterResponse, len(q.headerIndex))
go func() {
defer close(persistChan)
defer s.mtxCFilter.Unlock()

// Hand the query to the work manager, and consume the verified
@@ -782,7 +784,33 @@ func (s *ChainService) GetCFilter(blockHash chainhash.Hash,
numFilters)
}

if s.persistToDisk {
persistChan <- resp
}
}()

if s.persistToDisk {
// Persisting to disk is the bottleneck for fetching filters.
// So we run the persisting logic in a separate goroutine so
// that we can unlock the mtxCFilter mutex as soon as we are
// done with caching the filters in order to allow more
// GetCFilter calls from the caller sooner.
go func() {
var (
resp *filterResponse
ok bool
)

for {
select {
case resp, ok = <-persistChan:
Member: I think it would be cleaner to send the filter to persist, instead of relying on a chan close above as a signal to check a local closure variable.
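A minimal sketch of one way to read this suggestion (not the code in this change), reusing the identifiers from the diff (persistChan, s.quit, s.FilterDB, dbFilterType): ranging over the channel keeps resp scoped to the loop instead of living as a closure variable, and the loop still ends when the producer closes the channel.

```go
go func() {
	for resp := range persistChan {
		// Stop persisting promptly on shutdown instead of
		// draining the remaining responses.
		select {
		case <-s.quit:
			return
		default:
		}

		err := s.FilterDB.PutFilter(
			resp.blockHash, resp.filter, dbFilterType,
		)
		if err != nil {
			log.Errorf("Unable to write filter to the db: %v",
				err)
			return
		}
	}
}()
```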

if !ok {
return
}

case <-s.quit:
return
}

Member Author: One worry with this is that we (in the worst case, a sync from genesis) spin up something like 700 goroutines on mainnet. So another option to rate limit this a bit is to have a single outer goroutine that listens on a channel with a large buffer; then we rate limit ourselves with that buffered channel.
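A rough sketch of that single-outer-goroutine idea (again, not the code in this PR): one long-lived persister drains a large buffered channel, so rescans queue work instead of spawning a goroutine per call. The queue name and buffer size here are made up, and the filter type would have to travel with each response once the loop lives outside GetCFilter.

```go
// Hypothetical queue; in a real version this would live on ChainService
// and be created once at startup.
persistQueue := make(chan *filterResponse, 1000)

go func() {
	for {
		select {
		case resp := <-persistQueue:
			// The filter type would need to be carried in the
			// response (or fixed) outside GetCFilter's scope.
			err := s.FilterDB.PutFilter(
				resp.blockHash, resp.filter, dbFilterType,
			)
			if err != nil {
				log.Errorf("Unable to write filter to the "+
					"db: %v", err)
			}

		case <-s.quit:
			return
		}
	}
}()
```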

Member Author: Also going to look into doing batch writes to the db instead of writing filters one by one.

Member: Batch writes would be great, and shouldn't be too difficult to add in.

Re the number of goroutines: we could instead make a basic worker pool here, or only allow so many of them to be active at one time via a semaphore.
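A small sketch of the semaphore variant (the name persistSem and the limit are made up for illustration): a buffered channel caps how many persist goroutines can run at once, so a sync from genesis cannot fan out into hundreds of concurrent writers.

```go
// Hypothetical ChainService-level semaphore, created once at startup.
persistSem := make(chan struct{}, 8) // at most 8 concurrent writers

// Inside GetCFilter, instead of an unbounded go func():
persistSem <- struct{}{} // acquire a slot; blocks while 8 writes are in flight
go func(resp *filterResponse) {
	defer func() { <-persistSem }() // release the slot when done

	err := s.FilterDB.PutFilter(
		resp.blockHash, resp.filter, dbFilterType,
	)
	if err != nil {
		log.Errorf("Unable to write filter to the db: %v", err)
	}
}(resp)
```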

err = s.FilterDB.PutFilter(
resp.blockHash, resp.filter,
dbFilterType,
@@ -795,8 +823,8 @@ func (s *ChainService) GetCFilter(blockHash chainhash.Hash,
log.Tracef("Wrote filter for block %s, type %d",
resp.blockHash, dbFilterType)
}
}
}()
}()
}

var ok bool
var resultFilter *gcs.Filter