Skip to content

Commit

Permalink
perf: flush import batches in parallel (backport #793) (#820)
Browse files Browse the repository at this point in the history
Co-authored-by: Elias Naur <103319121+elias-orijtech@users.noreply.github.com>
  • Loading branch information
mergify[bot] and elias-orijtech committed Aug 23, 2023
1 parent 4637357 commit 8ae3597
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 4 deletions.
22 changes: 20 additions & 2 deletions import.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ type Importer struct {
batchSize uint32
stack []*Node
nonces []uint32

// inflightCommit tracks a batch commit, if any.
inflightCommit <-chan error
}

// newImporter creates a new Importer for an empty MutableTree.
Expand Down Expand Up @@ -78,10 +81,21 @@ func (i *Importer) writeNode(node *Node) error {

i.batchSize++
if i.batchSize >= maxBatchSize {
if err := i.batch.Write(); err != nil {
// Wait for previous batch.
var err error
if i.inflightCommit != nil {
err = <-i.inflightCommit
i.inflightCommit = nil
}
if err != nil {
return err
}
i.batch.Close()
result := make(chan error)
i.inflightCommit = result
go func(batch db.Batch) {
defer batch.Close()
result <- batch.Write()
}(i.batch)
i.batch = i.tree.ndb.db.NewBatch()
i.batchSize = 0
}
Expand All @@ -92,6 +106,10 @@ func (i *Importer) writeNode(node *Node) error {
// Close frees all resources. It is safe to call multiple times. Uncommitted nodes may already have
// been flushed to the database, but will not be visible.
func (i *Importer) Close() {
if i.inflightCommit != nil {
<-i.inflightCommit
i.inflightCommit = nil
}
if i.batch != nil {
i.batch.Close()
}
Expand Down
12 changes: 10 additions & 2 deletions import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,17 @@ func TestImporter_Commit_Empty(t *testing.T) {
}

func BenchmarkImport(b *testing.B) {
benchmarkImport(b, 4096)
}

func BenchmarkImportBatch(b *testing.B) {
benchmarkImport(b, maxBatchSize*10)
}

func benchmarkImport(b *testing.B, nodes int) {
b.StopTimer()
tree := setupExportTreeSized(b, 4096)
exported := make([]*ExportNode, 0, 4096)
tree := setupExportTreeSized(b, nodes)
exported := make([]*ExportNode, 0, nodes)
exporter, err := tree.Export()
require.NoError(b, err)
for {
Expand Down

0 comments on commit 8ae3597

Please sign in to comment.