diff --git a/batch.go b/batch.go index dc879d5..2849103 100644 --- a/batch.go +++ b/batch.go @@ -8,10 +8,10 @@ import ( cid "github.com/ipfs/go-cid" ) -// ParallelBatchCommits is the number of batch commits that can be in-flight before blocking. +// parallelCommits is the number of batch commits that can be in-flight before blocking. // TODO(ipfs/go-ipfs#4299): Experiment with multiple datastores, storage // devices, and CPUs to find the right value/formula. -var ParallelBatchCommits = runtime.NumCPU() * 2 +var parallelCommits = runtime.NumCPU() // ErrNotCommited is returned when closing a batch that hasn't been successfully // committed. @@ -32,11 +32,15 @@ func NewBatch(ctx context.Context, na NodeAdder, opts ...BatchOption) *Batch { for _, o := range opts { o(&bopts) } + + // Commit numCPU batches at once, but split the maximum buffer size over all commits in flight. + bopts.maxSize /= parallelCommits + bopts.maxNodes /= parallelCommits return &Batch{ na: na, ctx: ctx, cancel: cancel, - commitResults: make(chan error, ParallelBatchCommits), + commitResults: make(chan error, parallelCommits), opts: bopts, } } @@ -78,7 +82,7 @@ func (t *Batch) asyncCommit() { if numBlocks == 0 { return } - if t.activeCommits >= ParallelBatchCommits { + if t.activeCommits >= parallelCommits { select { case err := <-t.commitResults: t.activeCommits-- @@ -206,14 +210,16 @@ var defaultBatchOptions = batchOptions{ maxNodes: 128, } -// MaxSizeBatchOption sets the maximum size of a Batch. +// MaxSizeBatchOption sets the maximum amount of buffered data before writing +// blocks. func MaxSizeBatchOption(size int) BatchOption { return func(o *batchOptions) { o.maxSize = size } } -// MaxNodesBatchOption sets the maximum number of nodes in a Batch. +// MaxNodesBatchOption sets the maximum number of buffered nodes before writing +// blocks. 
func MaxNodesBatchOption(num int) BatchOption { return func(o *batchOptions) { o.maxNodes = num diff --git a/batch_test.go b/batch_test.go index c509ec4..28db71d 100644 --- a/batch_test.go +++ b/batch_test.go @@ -139,10 +139,10 @@ func TestBatchOptions(t *testing.T) { wantMaxNodes := 500 d := newTestDag() b := NewBatch(ctx, d, MaxSizeBatchOption(wantMaxSize), MaxNodesBatchOption(wantMaxNodes)) - if b.opts.maxSize != wantMaxSize { + if b.opts.maxSize != wantMaxSize/parallelCommits { t.Fatalf("maxSize incorrect, want: %d, got: %d", wantMaxSize, b.opts.maxSize) } - if b.opts.maxNodes != wantMaxNodes { + if b.opts.maxNodes != wantMaxNodes/parallelCommits { t.Fatalf("maxNodes incorrect, want: %d, got: %d", wantMaxNodes, b.opts.maxNodes) } } diff --git a/go.mod b/go.mod index 2111202..0aab53b 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,10 @@ module github.com/ipfs/go-ipld-format -go 1.14 - require ( github.com/ipfs/go-block-format v0.0.2 github.com/ipfs/go-cid v0.0.2 + github.com/libp2p/go-buffer-pool v0.0.2 // indirect github.com/multiformats/go-multihash v0.0.1 ) + +go 1.13 diff --git a/go.sum b/go.sum index 79b1fea..f900301 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,8 @@ github.com/ipfs/go-cid v0.0.2 h1:tuuKaZPU1M6HcejsO3AcYWW8sZ8MTvyxfc4uqB4eFE8= github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-ipfs-util v0.0.1 h1:Wz9bL2wB2YBJqggkA4dD7oSmqB4cAnpNbGrlHJulv50= github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc= +github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs= +github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM= github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g= github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ= github.com/minio/sha256-simd 
v0.0.0-20190131020904-2d45a736cd16 h1:5W7KhL8HVF3XCFOweFD3BNESdnO8ewyYTFT2R+/b8FQ=