diff --git a/batch.go b/batch.go
index 6544dbe472..8cb060304c 100644
--- a/batch.go
+++ b/batch.go
@@ -255,7 +255,14 @@ type Batch struct {
 	// memtable.
 	flushable *flushableBatch
 
+	// Synchronous Apply uses the commit WaitGroup for both publishing the
+	// seqnum and waiting for the WAL fsync (if needed). Asynchronous
+	// ApplyNoSyncWait, which implies WriteOptions.Sync is true, uses the commit
+	// WaitGroup for publishing the seqnum and the fsyncWait WaitGroup for
+	// waiting for the WAL fsync.
 	commit    sync.WaitGroup
+	fsyncWait sync.WaitGroup
+
 	commitErr error
 	applied   uint32 // updated atomically
 }
@@ -1205,6 +1212,15 @@ func batchDecodeStr(data []byte) (odata []byte, s []byte, ok bool) {
 	return data[v:], data[:v], true
 }
 
+// SyncWait is to be used in conjunction with DB.ApplyNoSyncWait.
+func (b *Batch) SyncWait() error {
+	b.fsyncWait.Wait()
+	if b.commitErr != nil {
+		b.db = nil // prevent batch reuse on error
+	}
+	return b.commitErr
+}
+
 // BatchReader iterates over the entries contained in a batch.
 type BatchReader []byte
 
diff --git a/batch_test.go b/batch_test.go
index a2b8121a34..d05ed05221 100644
--- a/batch_test.go
+++ b/batch_test.go
@@ -223,6 +223,31 @@ func TestBatchEmpty(t *testing.T) {
 	require.NoError(t, iter2.Close())
 }
 
+func TestBatchApplyNoSyncWait(t *testing.T) {
+	db, err := Open("", &Options{
+		FS: vfs.NewMem(),
+	})
+	require.NoError(t, err)
+	defer db.Close()
+	var batches []*Batch
+	options := &WriteOptions{Sync: true}
+	for i := 0; i < 10000; i++ {
+		b := db.NewBatch()
+		str := fmt.Sprintf("a%d", i)
+		require.NoError(t, b.Set([]byte(str), []byte(str), nil))
+		require.NoError(t, db.ApplyNoSyncWait(b, options))
+		val, closer, err := db.Get([]byte(str))
+		require.NoError(t, err)
+		require.Equal(t, str, string(val))
+		closer.Close()
+		batches = append(batches, b)
+	}
+	for _, b := range batches {
+		require.NoError(t, b.SyncWait())
+		b.Close()
+	}
+}
+
 func TestBatchReset(t *testing.T) {
 	db, err := Open("", &Options{
 		FS: vfs.NewMem(),
diff --git a/commit.go b/commit.go
index 83d1a5067f..488cf3f908 100644
--- a/commit.go
+++ b/commit.go
@@ -60,7 +60,7 @@ func (q *commitQueue) enqueue(b *Batch) {
 	ptrs := atomic.LoadUint64(&q.headTail)
 	head, tail := q.unpack(ptrs)
 	if (tail+uint32(len(q.slots)))&(1<<dequeueBits-1) == head&(1<<dequeueBits-1) {
-func (p *commitPipeline) Commit(b *Batch, syncWAL bool) error {
+// REQUIRES: noSyncWait => syncWAL
+func (p *commitPipeline) Commit(b *Batch, syncWAL bool, noSyncWait bool) error {
 	if b.Empty() {
 		return nil
 	}
 
-	p.sem <- struct{}{}
+	// TODO(sumeer): will block here if SyncConcurrency is insufficient. Monitor
+	// and increase if needed.
+	p.commitQueueSem <- struct{}{}
+	if syncWAL {
+		p.logSyncQSem <- struct{}{}
+	}
 
 	// Prepare the batch for committing: enqueuing the batch in the pending
 	// queue, determining the batch sequence number and writing the data to the
@@ -250,9 +284,11 @@ func (p *commitPipeline) Commit(b *Batch, syncWAL bool) error {
 	//
 	// NB: We set Batch.commitErr on error so that the batch won't be a candidate
 	// for reuse. See Batch.release().
-	mem, err := p.prepare(b, syncWAL)
+	mem, err := p.prepare(b, syncWAL, noSyncWait)
 	if err != nil {
 		b.db = nil // prevent batch reuse on error
+		// NB: we are not doing <-p.commitQueueSem since the batch is still
+		// sitting in the pending queue. Should we fix this?
 		return err
 	}
 
@@ -265,7 +301,7 @@ func (p *commitPipeline) Commit(b *Batch, syncWAL bool) error {
 	// Publish the batch sequence number.
 	p.publish(b)
 
-	<-p.sem
+	<-p.commitQueueSem
 
 	if b.commitErr != nil {
 		b.db = nil // prevent batch reuse on error
@@ -294,7 +330,7 @@ func (p *commitPipeline) AllocateSeqNum(count int, prepare func(), apply func(se
 	b.setCount(uint32(count))
 	b.commit.Add(1)
 
-	p.sem <- struct{}{}
+	p.commitQueueSem <- struct{}{}
 
 	p.mu.Lock()
 
@@ -341,27 +377,30 @@ func (p *commitPipeline) AllocateSeqNum(count int, prepare func(), apply func(se
 	// Publish the sequence number.
 	p.publish(b)
 
-	<-p.sem
+	<-p.commitQueueSem
 }
 
-func (p *commitPipeline) prepare(b *Batch, syncWAL bool) (*memTable, error) {
+func (p *commitPipeline) prepare(b *Batch, syncWAL bool, noSyncWait bool) (*memTable, error) {
 	n := uint64(b.Count())
 	if n == invalidBatchCount {
 		return nil, ErrInvalidBatch
 	}
-	count := 1
-	if syncWAL {
-		count++
-	}
-	// count represents the waiting needed for publish, and optionally the
-	// waiting needed for the WAL sync.
-	b.commit.Add(count)
-
 	var syncWG *sync.WaitGroup
 	var syncErr *error
+	commitCount := 1
 	if syncWAL {
-		syncWG, syncErr = &b.commit, &b.commitErr
+		syncErr = &b.commitErr
+		if noSyncWait {
+			syncWG = &b.fsyncWait
+			b.fsyncWait.Add(1)
+		} else {
+			syncWG = &b.commit
+			commitCount++
+		}
 	}
+	// commitCount represents the waiting needed for publish, and optionally the
+	// waiting needed for the WAL sync.
+	b.commit.Add(commitCount)
 
 	p.mu.Lock()
 
diff --git a/commit_test.go b/commit_test.go
index c680ef6527..fcffcc6018 100644
--- a/commit_test.go
+++ b/commit_test.go
@@ -31,6 +31,7 @@ type testCommitEnv struct {
 		sync.Mutex
 		buf []uint64
 	}
+	queueSemChan chan struct{}
 }
 
 func (e *testCommitEnv) env() commitEnv {
@@ -49,10 +50,14 @@ func (e *testCommitEnv) apply(b *Batch, mem *memTable) error {
 	return nil
 }
 
-func (e *testCommitEnv) write(b *Batch, _ *sync.WaitGroup, _ *error) (*memTable, error) {
+func (e *testCommitEnv) write(b *Batch, wg *sync.WaitGroup, _ *error) (*memTable, error) {
 	n := int64(len(b.data))
 	atomic.AddInt64(&e.writePos, n)
 	atomic.AddUint64(&e.writeCount, 1)
+	if wg != nil {
+		wg.Done()
+		<-e.queueSemChan
+	}
 	return nil, nil
 }
 
@@ -100,7 +105,7 @@ func TestCommitPipeline(t *testing.T) {
 			defer wg.Done()
 			var b Batch
 			_ = b.Set([]byte(fmt.Sprint(i)), nil, nil)
-			_ = p.Commit(&b, false)
+			_ = p.Commit(&b, false, false)
 		}(i)
 	}
 	wg.Wait()
@@ -120,6 +125,37 @@ func TestCommitPipeline(t *testing.T) {
 	}
 }
 
+func TestCommitPipelineSync(t *testing.T) {
+	var e testCommitEnv
+	p := newCommitPipeline(e.env())
+	e.queueSemChan = p.logSyncQSem
+
+	n := 10000
+	if invariants.RaceEnabled {
+		// Under race builds we have to limit the concurrency or we hit the
+		// following error:
+		//
+		// race: limit on 8128 simultaneously alive goroutines is exceeded, dying
+		n = 1000
+	}
+
+	for _, noSyncWait := range []bool{false, true} {
+		t.Run(fmt.Sprintf("no-sync-wait=%t", noSyncWait), func(t *testing.T) {
+			var wg sync.WaitGroup
+			wg.Add(n)
+			for i := 0; i < n; i++ {
+				go func(i int) {
+					defer wg.Done()
+					var b Batch
+					_ = b.Set([]byte(fmt.Sprint(i)), nil, nil)
+					_ = p.Commit(&b, true, noSyncWait)
+				}(i)
+			}
+			wg.Wait()
+		})
+	}
+}
+
 func TestCommitPipelineAllocateSeqNum(t *testing.T) {
 	var e testCommitEnv
 	p := newCommitPipeline(e.env())
@@ -203,11 +239,12 @@ func TestCommitPipelineWALClose(t *testing.T) {
 		},
 	}
 	p := newCommitPipeline(testEnv)
+	wal.QueueSemChan = p.logSyncQSem
 
 	// Launch N (commitConcurrency) goroutines which each create a batch and
 	// commit it with sync==true. Because of the syncDelayFile, none of these
 	// operations can complete until syncDelayFile.done is closed.
-	errCh := make(chan error, cap(p.sem))
+	errCh := make(chan error, cap(p.commitQueueSem))
 	walDone.Add(cap(errCh))
 	for i := 0; i < cap(errCh); i++ {
 		go func(i int) {
@@ -216,7 +253,7 @@
 				errCh <- err
 				return
 			}
-			errCh <- p.Commit(b, true /* sync */)
+			errCh <- p.Commit(b, true /* sync */, false)
 		}(i)
 	}
 
@@ -284,7 +321,7 @@ func BenchmarkCommitPipeline(b *testing.B) {
 			batch := newBatch(nil)
 			binary.BigEndian.PutUint64(buf, rng.Uint64())
 			batch.Set(buf, buf, nil)
-			if err := p.Commit(batch, true /* sync */); err != nil {
+			if err := p.Commit(batch, true /* sync */, false); err != nil {
 				b.Fatal(err)
 			}
 			batch.release()
diff --git a/db.go b/db.go
index 2e2e019123..a268c82268 100644
--- a/db.go
+++ b/db.go
@@ -724,6 +724,27 @@ func (d *DB) RangeKeyDelete(start, end []byte, opts *WriteOptions) error {
 //
 // It is safe to modify the contents of the arguments after Apply returns.
 func (d *DB) Apply(batch *Batch, opts *WriteOptions) error {
+	return d.applyInternal(batch, opts, false)
+}
+
+// ApplyNoSyncWait must only be used when opts.Sync is true and the caller
+// does not want to wait for the WAL fsync to happen. The method will return
+// once the mutation is applied to the memtable and is visible (note that a
+// mutation is visible before the WAL sync even in the wait case, so we have
+// not weakened the durability semantics). The caller must call Batch.SyncWait
+// to wait for the WAL fsync. The caller must not Close the batch without
+// first calling Batch.SyncWait.
+// RECOMMENDATION: Prefer using Apply unless you really understand why you
+// need ApplyNoSyncWait.
+func (d *DB) ApplyNoSyncWait(batch *Batch, opts *WriteOptions) error {
+	if !opts.Sync {
+		return errors.Errorf("cannot request asynchronous apply when WriteOptions.Sync is false")
+	}
+	return d.applyInternal(batch, opts, true)
+}
+
+// REQUIRES: noSyncWait => opts.Sync
+func (d *DB) applyInternal(batch *Batch, opts *WriteOptions, noSyncWait bool) error {
 	if err := d.closed.Load(); err != nil {
 		panic(err)
 	}
@@ -762,7 +783,7 @@ func (d *DB) Apply(batch *Batch, opts *WriteOptions) error {
 	if int(batch.memTableSize) >= d.largeBatchThreshold {
 		batch.flushable = newFlushableBatch(batch, d.opts.Comparer)
 	}
-	if err := d.commit.Commit(batch, sync); err != nil {
+	if err := d.commit.Commit(batch, sync, noSyncWait); err != nil {
 		// There isn't much we can do on an error here. The commit pipeline will be
 		// horked at this point.
 		d.opts.Logger.Fatalf("%v", err)
@@ -1986,6 +2007,7 @@ func (d *DB) makeRoomForWrite(b *Batch) error {
 			WALFsyncLatency:    d.mu.log.metrics.fsyncLatency,
 			WALMinSyncInterval: d.opts.WALMinSyncInterval,
 		})
+		d.mu.log.LogWriter.QueueSemChan = d.commit.logSyncQSem
 	}
 
 	immMem := d.mu.mem.mutable
diff --git a/open.go b/open.go
index 4d0cf96990..f4cb9f8133 100644
--- a/open.go
+++ b/open.go
@@ -428,6 +428,7 @@ func Open(dirname string, opts *Options) (db *DB, _ error) {
 			WALFsyncLatency: d.mu.log.metrics.fsyncLatency,
 		}
 		d.mu.log.LogWriter = record.NewLogWriter(logFile, newLogNum, logWriterConfig)
+		d.mu.log.LogWriter.QueueSemChan = d.commit.logSyncQSem
 		d.mu.versions.metrics.WAL.Files++
 	}
 	d.updateReadStateLocked(d.opts.DebugCheck)
diff --git a/record/log_writer.go b/record/log_writer.go
index 6098d19260..b080e0e733 100644
--- a/record/log_writer.go
+++ b/record/log_writer.go
@@ -37,6 +37,8 @@ type syncer interface {
 	Sync() error
 }
 
+// TODO(sumeer): increase this if the ApplyNoSyncWait API results in this
+// becoming the bottleneck.
 const (
 	syncConcurrencyBits = 9
 
@@ -133,7 +135,7 @@ func (q *syncQueue) load() (head, tail, realLength uint32) {
 	return head, tail, realLength
 }
 
-func (q *syncQueue) pop(head, tail uint32, err error) error {
+func (q *syncQueue) pop(head, tail uint32, err error, queueSemChan chan struct{}) error {
 	if tail == head {
 		// Queue is empty.
 		return nil
@@ -153,6 +155,10 @@ func (q *syncQueue) pop(head, tail uint32, err error) error {
 		// will try to enqueue before we've "freed" space in the queue.
 		atomic.AddUint64(&q.headTail, 1)
 		wg.Done()
+		// Is always non-nil in production.
+		if queueSemChan != nil {
+			<-queueSemChan
+		}
 	}
 
 	return nil
@@ -291,6 +297,10 @@ type LogWriter struct {
 	// used for min-sync-interval. In normal operation this points to
 	// time.AfterFunc.
 	afterFunc func(d time.Duration, f func()) syncTimer
+
+	// QueueSemChan is an optional channel to signal when popping from syncQueue.
+	// TODO(sumeer): pass this as a parameter to NewLogWriter.
+	QueueSemChan chan struct{}
 }
 
 // LogWriterConfig is a struct used for configuring new LogWriters
@@ -441,7 +451,7 @@ func (w *LogWriter) flushLoop(context.Context) {
 		// If flusher has an error, we propagate it to waiters. Note in spite of
 		// error we consume the pending list above to free blocks for writers.
 		if f.err != nil {
-			f.syncQ.pop(head, tail, f.err)
+			f.syncQ.pop(head, tail, f.err, w.QueueSemChan)
 			// Update the idleStartTime if work could not be done, so that we don't
 			// include the duration we tried to do work as idle. We don't bother
 			// with the rest of the accounting, which means we will undercount.
@@ -518,7 +528,7 @@ func (w *LogWriter) flushPending(
 		syncLatency, err = w.syncWithLatency()
 	}
 	f := &w.flusher
-	if popErr := f.syncQ.pop(head, tail, err); popErr != nil {
+	if popErr := f.syncQ.pop(head, tail, err, w.QueueSemChan); popErr != nil {
 		return synced, syncLatency, bytesWritten, popErr
 	}
 }
diff --git a/record/log_writer_test.go b/record/log_writer_test.go
index 31574797e4..d79f8a485b 100644
--- a/record/log_writer_test.go
+++ b/record/log_writer_test.go
@@ -42,7 +42,7 @@ func TestSyncQueue(t *testing.T) {
 				return
 			}
 			head, tail, _ := q.load()
-			q.pop(head, tail, nil)
+			q.pop(head, tail, nil, nil)
 		}
 	}()
 
@@ -98,7 +98,7 @@ func TestFlusherCond(t *testing.T) {
 			}
 
 			head, tail, _ := q.load()
-			q.pop(head, tail, nil)
+			q.pop(head, tail, nil, nil)
 		}
 	}()
 
@@ -199,6 +199,27 @@ func TestSyncRecord(t *testing.T) {
 	}
 }
 
+func TestSyncRecordWithSignalChan(t *testing.T) {
+	f := &syncFile{}
+	w := NewLogWriter(f, 0, LogWriterConfig{WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{})})
+	semChan := make(chan struct{}, 5)
+	for i := 0; i < cap(semChan); i++ {
+		semChan <- struct{}{}
+	}
+	w.QueueSemChan = semChan
+	require.Equal(t, cap(semChan), len(semChan))
+	var syncErr error
+	for i := 0; i < 5; i++ {
+		var syncWG sync.WaitGroup
+		syncWG.Add(1)
+		_, err := w.SyncRecord([]byte("hello"), &syncWG, &syncErr)
+		require.NoError(t, err)
+		syncWG.Wait()
+		require.NoError(t, syncErr)
+		require.Equal(t, cap(semChan)-(i+1), len(semChan))
+	}
+}
+
 type fakeTimer struct {
 	f func()
 }
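
For illustration only, outside the diff: a minimal caller-side sketch of how the new API is intended to be used, mirroring TestBatchApplyNoSyncWait above. It assumes the usual github.com/cockroachdb/pebble and github.com/cockroachdb/pebble/vfs import paths, and collapses error handling to log.Fatal for brevity.

package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	// Open an in-memory store, as in the tests above.
	db, err := pebble.Open("", &pebble.Options{FS: vfs.NewMem()})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// ApplyNoSyncWait requires Sync: true; only the wait for the fsync is deferred.
	opts := &pebble.WriteOptions{Sync: true}
	var pending []*pebble.Batch
	for i := 0; i < 4; i++ {
		b := db.NewBatch()
		key := []byte(fmt.Sprintf("k%d", i))
		if err := b.Set(key, key, nil); err != nil {
			log.Fatal(err)
		}
		// Returns once the batch is applied to the memtable and visible;
		// the WAL fsync completes asynchronously.
		if err := db.ApplyNoSyncWait(b, opts); err != nil {
			log.Fatal(err)
		}
		pending = append(pending, b)
	}
	// Before acknowledging durability (and before closing any batch),
	// wait for each batch's WAL fsync.
	for _, b := range pending {
		if err := b.SyncWait(); err != nil {
			log.Fatal(err)
		}
		b.Close()
	}
}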