From df9ce5fa155a0bb3097be3d126c4f87d2c2ea608 Mon Sep 17 00:00:00 2001
From: sumeerbhola
Date: Wed, 9 Nov 2022 14:45:01 -0500
Subject: [PATCH] [DNM] db: add DB.ApplyNoSyncWait for asynchronous apply

ApplyNoSyncWait must only be used when WriteOptions.Sync is true. It
enqueues the Batch to the WAL, adds it to the memtable, waits until the
batch is visible in the memtable, and then returns to the caller. The
caller is responsible for calling Batch.SyncWait to wait until the
write to the WAL is fsynced.

This change required splitting the WaitGroup in the Batch into two
WaitGroups, so that waiting for visibility can happen separately from
waiting for the WAL write. Additionally, the channel used as a
semaphore for reserving space in the two lock-free queues is split
into two channels, since dequeueing from these queues can happen in
arbitrary order. There may be some performance overhead of pushing
and popping from two channels instead of one.

Informs https://github.com/cockroachdb/cockroach/issues/17500

See discussion thread
https://github.com/cockroachdb/cockroach/pull/87050#pullrequestreview-1119047188
---
 batch.go                  | 16 ++++++++
 batch_test.go             | 25 +++++++++++++
 commit.go                 | 77 +++++++++++++++++++++++++++++----------
 commit_test.go            | 47 +++++++++++++++++++++---
 db.go                     | 24 +++++++++++-
 open.go                   |  1 +
 record/log_writer.go      | 16 ++++++--
 record/log_writer_test.go | 25 ++++++++++++-
 8 files changed, 201 insertions(+), 30 deletions(-)

diff --git a/batch.go b/batch.go
index 6544dbe472..8cb060304c 100644
--- a/batch.go
+++ b/batch.go
@@ -255,7 +255,14 @@ type Batch struct {
 	// memtable.
 	flushable *flushableBatch
 
+	// Synchronous Apply uses the commit WaitGroup for both publishing the
+	// seqnum and waiting for the WAL fsync (if needed). Asynchronous
+	// ApplyNoSyncWait, which implies WriteOptions.Sync is true, uses the commit
+	// WaitGroup for publishing the seqnum and the fsyncWait WaitGroup for
+	// waiting for the WAL fsync.
 	commit    sync.WaitGroup
+	fsyncWait sync.WaitGroup
+
 	commitErr error
 	applied   uint32 // updated atomically
 }
@@ -1205,6 +1212,15 @@ func batchDecodeStr(data []byte) (odata []byte, s []byte, ok bool) {
 	return data[v:], data[:v], true
 }
 
+// SyncWait is to be used in conjunction with DB.ApplyNoSyncWait.
+func (b *Batch) SyncWait() error {
+	b.fsyncWait.Wait()
+	if b.commitErr != nil {
+		b.db = nil // prevent batch reuse on error
+	}
+	return b.commitErr
+}
+
 // BatchReader iterates over the entries contained in a batch.
 type BatchReader []byte
 
diff --git a/batch_test.go b/batch_test.go
index a2b8121a34..d05ed05221 100644
--- a/batch_test.go
+++ b/batch_test.go
@@ -223,6 +223,31 @@ func TestBatchEmpty(t *testing.T) {
 	require.NoError(t, iter2.Close())
 }
 
+func TestBatchApplyNoSyncWait(t *testing.T) {
+	db, err := Open("", &Options{
+		FS: vfs.NewMem(),
+	})
+	require.NoError(t, err)
+	defer db.Close()
+	var batches []*Batch
+	options := &WriteOptions{Sync: true}
+	for i := 0; i < 10000; i++ {
+		b := db.NewBatch()
+		str := fmt.Sprintf("a%d", i)
+		require.NoError(t, b.Set([]byte(str), []byte(str), nil))
+		require.NoError(t, db.ApplyNoSyncWait(b, options))
+		val, closer, err := db.Get([]byte(str))
+		require.NoError(t, err)
+		require.Equal(t, str, string(val))
+		closer.Close()
+		batches = append(batches, b)
+	}
+	for _, b := range batches {
+		require.NoError(t, b.SyncWait())
+		b.Close()
+	}
+}
+
 func TestBatchReset(t *testing.T) {
 	db, err := Open("", &Options{
 		FS: vfs.NewMem(),
diff --git a/commit.go b/commit.go
index 83d1a5067f..488cf3f908 100644
--- a/commit.go
+++ b/commit.go
@@ -60,7 +60,7 @@ func (q *commitQueue) enqueue(b *Batch) {
 	ptrs := atomic.LoadUint64(&q.headTail)
 	head, tail := q.unpack(ptrs)
 	if (tail+uint32(len(q.slots)))&(1<<dequeueBits-1) == head {
-		// Queue is full. This should never be reached because commitPipeline.sem
+		// Queue is full. This should never be reached because commitPipeline.commitQueueSem
 		// limits the number of concurrent operations.
 		panic("pebble: not reached")
 	}

[Several hunks here -- the commitEnv/commitPipeline declarations, where the
single sem channel is replaced by commitQueueSem and logSyncQSem -- were
garbled beyond recovery in this copy of the patch.]

-func (p *commitPipeline) Commit(b *Batch, syncWAL bool) error {
+// REQUIRES: noSyncWait => syncWAL
+func (p *commitPipeline) Commit(b *Batch, syncWAL bool, noSyncWait bool) error {
 	if b.Empty() {
 		return nil
 	}
 
-	p.sem <- struct{}{}
+	// TODO(sumeer): will block here if SyncConcurrency is insufficient. Monitor
+	// and increase if needed.
+	p.commitQueueSem <- struct{}{}
+	if syncWAL {
+		p.logSyncQSem <- struct{}{}
+	}
 
 	// Prepare the batch for committing: enqueuing the batch in the pending
 	// queue, determining the batch sequence number and writing the data to the
@@ -250,9 +284,11 @@ func (p *commitPipeline) Commit(b *Batch, syncWAL bool) error {
 	//
 	// NB: We set Batch.commitErr on error so that the batch won't be a candidate
 	// for reuse. See Batch.release().
-	mem, err := p.prepare(b, syncWAL)
+	mem, err := p.prepare(b, syncWAL, noSyncWait)
 	if err != nil {
 		b.db = nil // prevent batch reuse on error
+		// NB: we are not doing <-p.commitQueueSem since the batch is still
+		// sitting in the pending queue. Should we fix this?
 		return err
 	}
@@ -265,7 +301,7 @@ func (p *commitPipeline) Commit(b *Batch, syncWAL bool) error {
 	// Publish the batch sequence number.
 	p.publish(b)
 
-	<-p.sem
+	<-p.commitQueueSem
 
 	if b.commitErr != nil {
 		b.db = nil // prevent batch reuse on error
@@ -294,7 +330,7 @@ func (p *commitPipeline) AllocateSeqNum(count int, prepare func(), apply func(se
 	b.setCount(uint32(count))
 	b.commit.Add(1)
 
-	p.sem <- struct{}{}
+	p.commitQueueSem <- struct{}{}
 
 	p.mu.Lock()
@@ -341,27 +377,30 @@ func (p *commitPipeline) AllocateSeqNum(count int, prepare func(), apply func(se
 	// Publish the sequence number.
 	p.publish(b)
 
-	<-p.sem
+	<-p.commitQueueSem
 }
 
-func (p *commitPipeline) prepare(b *Batch, syncWAL bool) (*memTable, error) {
+func (p *commitPipeline) prepare(b *Batch, syncWAL bool, noSyncWait bool) (*memTable, error) {
 	n := uint64(b.Count())
 	if n == invalidBatchCount {
 		return nil, ErrInvalidBatch
 	}
 
-	count := 1
-	if syncWAL {
-		count++
-	}
-	// count represents the waiting needed for publish, and optionally the
-	// waiting needed for the WAL sync.
-	b.commit.Add(count)
-
 	var syncWG *sync.WaitGroup
 	var syncErr *error
+	commitCount := 1
 	if syncWAL {
-		syncWG, syncErr = &b.commit, &b.commitErr
+		syncErr = &b.commitErr
+		if noSyncWait {
+			syncWG = &b.fsyncWait
+			b.fsyncWait.Add(1)
+		} else {
+			syncWG = &b.commit
+			commitCount++
+		}
 	}
+	// commitCount represents the waiting needed for publish, and optionally the
+	// waiting needed for the WAL sync.
+	b.commit.Add(commitCount)
 
 	p.mu.Lock()
diff --git a/commit_test.go b/commit_test.go
index c680ef6527..fcffcc6018 100644
--- a/commit_test.go
+++ b/commit_test.go
@@ -31,6 +31,7 @@ type testCommitEnv struct {
 		sync.Mutex
 		buf []uint64
 	}
+	queueSemChan chan struct{}
 }
 
 func (e *testCommitEnv) env() commitEnv {
@@ -49,10 +50,14 @@ func (e *testCommitEnv) apply(b *Batch, mem *memTable) error {
 	return nil
 }
 
-func (e *testCommitEnv) write(b *Batch, _ *sync.WaitGroup, _ *error) (*memTable, error) {
+func (e *testCommitEnv) write(b *Batch, wg *sync.WaitGroup, _ *error) (*memTable, error) {
 	n := int64(len(b.data))
 	atomic.AddInt64(&e.writePos, n)
 	atomic.AddUint64(&e.writeCount, 1)
+	if wg != nil {
+		wg.Done()
+		<-e.queueSemChan
+	}
 	return nil, nil
 }
 
@@ -100,7 +105,7 @@ func TestCommitPipeline(t *testing.T) {
 			defer wg.Done()
 			var b Batch
 			_ = b.Set([]byte(fmt.Sprint(i)), nil, nil)
-			_ = p.Commit(&b, false)
+			_ = p.Commit(&b, false, false)
 		}(i)
 	}
 	wg.Wait()
@@ -120,6 +125,37 @@ func TestCommitPipeline(t *testing.T) {
 	}
 }
 
+func TestCommitPipelineSync(t *testing.T) {
+	var e testCommitEnv
+	p := newCommitPipeline(e.env())
+	e.queueSemChan = p.logSyncQSem
+
+	n := 10000
+	if invariants.RaceEnabled {
+		// Under race builds we have to limit the concurrency or we hit the
+		// following error:
+		//
+		// race: limit on 8128 simultaneously alive goroutines is exceeded, dying
+		n = 1000
+	}
+
+	for _, noSyncWait := range []bool{false, true} {
+		t.Run(fmt.Sprintf("no-sync-wait=%t", noSyncWait), func(t *testing.T) {
+			var wg sync.WaitGroup
+			wg.Add(n)
+			for i := 0; i < n; i++ {
+				go func(i int) {
+					defer wg.Done()
+					var b Batch
+					_ = b.Set([]byte(fmt.Sprint(i)), nil, nil)
+					_ = p.Commit(&b, true, noSyncWait)
+				}(i)
+			}
+			wg.Wait()
+		})
+	}
+}
+
 func TestCommitPipelineAllocateSeqNum(t *testing.T) {
 	var e testCommitEnv
 	p := newCommitPipeline(e.env())
@@ -203,11 +239,12 @@ func TestCommitPipelineWALClose(t *testing.T) {
 		},
 	}
 	p := newCommitPipeline(testEnv)
+	wal.QueueSemChan = p.logSyncQSem
 
 	// Launch N (commitConcurrency) goroutines which each create a batch and
 	// commit it with sync==true. Because of the syncDelayFile, none of these
 	// operations can complete until syncDelayFile.done is closed.
-	errCh := make(chan error, cap(p.sem))
+	errCh := make(chan error, cap(p.commitQueueSem))
 	walDone.Add(cap(errCh))
 	for i := 0; i < cap(errCh); i++ {
 		go func(i int) {
@@ -216,7 +253,7 @@ func TestCommitPipelineWALClose(t *testing.T) {
 				errCh <- err
 				return
 			}
-			errCh <- p.Commit(b, true /* sync */)
+			errCh <- p.Commit(b, true /* sync */, false)
 		}(i)
 	}
 
@@ -284,7 +321,7 @@ func BenchmarkCommitPipeline(b *testing.B) {
 			batch := newBatch(nil)
 			binary.BigEndian.PutUint64(buf, rng.Uint64())
 			batch.Set(buf, buf, nil)
-			if err := p.Commit(batch, true /* sync */); err != nil {
+			if err := p.Commit(batch, true /* sync */, false); err != nil {
 				b.Fatal(err)
 			}
 			batch.release()
diff --git a/db.go b/db.go
index 2e2e019123..a268c82268 100644
--- a/db.go
+++ b/db.go
@@ -724,6 +724,27 @@ func (d *DB) RangeKeyDelete(start, end []byte, opts *WriteOptions) error {
 //
 // It is safe to modify the contents of the arguments after Apply returns.
 func (d *DB) Apply(batch *Batch, opts *WriteOptions) error {
+	return d.applyInternal(batch, opts, false)
+}
+
+// ApplyNoSyncWait must only be used when opts.Sync is true and the caller
+// does not want to wait for the WAL fsync to happen. The method will return
+// once the mutation is applied to the memtable and is visible (note that a
+// mutation is visible before the WAL sync even in the wait case, so we have
+// not weakened the durability semantics). The caller must call Batch.SyncWait
+// to wait for the WAL fsync. The caller must not Close the batch without
+// first calling Batch.SyncWait.
+// RECOMMENDATION: Prefer using Apply unless you really understand why you
+// need ApplyNoSyncWait.
+func (d *DB) ApplyNoSyncWait(batch *Batch, opts *WriteOptions) error {
+	if !opts.Sync {
+		return errors.Errorf("cannot request asynchronous apply when WriteOptions.Sync is false")
+	}
+	return d.applyInternal(batch, opts, true)
+}
+
+// REQUIRES: noSyncWait => opts.Sync
+func (d *DB) applyInternal(batch *Batch, opts *WriteOptions, noSyncWait bool) error {
 	if err := d.closed.Load(); err != nil {
 		panic(err)
 	}
@@ -762,7 +783,7 @@ func (d *DB) Apply(batch *Batch, opts *WriteOptions) error {
 	if int(batch.memTableSize) >= d.largeBatchThreshold {
 		batch.flushable = newFlushableBatch(batch, d.opts.Comparer)
 	}
-	if err := d.commit.Commit(batch, sync); err != nil {
+	if err := d.commit.Commit(batch, sync, noSyncWait); err != nil {
 		// There isn't much we can do on an error here. The commit pipeline will be
 		// horked at this point.
 		d.opts.Logger.Fatalf("%v", err)
@@ -1986,6 +2007,7 @@ func (d *DB) makeRoomForWrite(b *Batch) error {
 			WALFsyncLatency:    d.mu.log.metrics.fsyncLatency,
 			WALMinSyncInterval: d.opts.WALMinSyncInterval,
 		})
+		d.mu.log.LogWriter.QueueSemChan = d.commit.logSyncQSem
 	}
 
 	immMem := d.mu.mem.mutable
diff --git a/open.go b/open.go
index 4d0cf96990..f4cb9f8133 100644
--- a/open.go
+++ b/open.go
@@ -428,6 +428,7 @@ func Open(dirname string, opts *Options) (db *DB, _ error) {
 			WALFsyncLatency: d.mu.log.metrics.fsyncLatency,
 		}
 		d.mu.log.LogWriter = record.NewLogWriter(logFile, newLogNum, logWriterConfig)
+		d.mu.log.LogWriter.QueueSemChan = d.commit.logSyncQSem
 		d.mu.versions.metrics.WAL.Files++
 	}
 	d.updateReadStateLocked(d.opts.DebugCheck)
diff --git a/record/log_writer.go b/record/log_writer.go
index 6098d19260..b080e0e733 100644
--- a/record/log_writer.go
+++ b/record/log_writer.go
@@ -37,6 +37,8 @@ type syncer interface {
 	Sync() error
 }
 
+// TODO(sumeer): increase this if the ApplyNoSyncWait API results in this
+// becoming the bottleneck.
 const (
 	syncConcurrencyBits = 9
 
@@ -133,7 +135,7 @@ func (q *syncQueue) load() (head, tail, realLength uint32) {
 	return head, tail, realLength
 }
 
-func (q *syncQueue) pop(head, tail uint32, err error) error {
+func (q *syncQueue) pop(head, tail uint32, err error, queueSemChan chan struct{}) error {
 	if tail == head {
 		// Queue is empty.
 		return nil
@@ -153,6 +155,10 @@ func (q *syncQueue) pop(head, tail uint32, err error) error {
 		// will try to enqueue before we've "freed" space in the queue.
 		atomic.AddUint64(&q.headTail, 1)
 		wg.Done()
+		// Is always non-nil in production.
+		if queueSemChan != nil {
+			<-queueSemChan
+		}
 	}
 
 	return nil
@@ -291,6 +297,10 @@ type LogWriter struct {
 	// used for min-sync-interval. In normal operation this points to
 	// time.AfterFunc.
 	afterFunc func(d time.Duration, f func()) syncTimer
+
+	// QueueSemChan is an optional channel to signal when popping from syncQueue.
+	// TODO(sumeer): pass this as a parameter to NewLogWriter.
+	QueueSemChan chan struct{}
 }
 
 // LogWriterConfig is a struct used for configuring new LogWriters
@@ -441,7 +451,7 @@ func (w *LogWriter) flushLoop(context.Context) {
 		// If flusher has an error, we propagate it to waiters. Note in spite of
 		// error we consume the pending list above to free blocks for writers.
 		if f.err != nil {
-			f.syncQ.pop(head, tail, f.err)
+			f.syncQ.pop(head, tail, f.err, w.QueueSemChan)
 			// Update the idleStartTime if work could not be done, so that we don't
 			// include the duration we tried to do work as idle. We don't bother
 			// with the rest of the accounting, which means we will undercount.
@@ -518,7 +528,7 @@ func (w *LogWriter) flushPending(
 			syncLatency, err = w.syncWithLatency()
 		}
 		f := &w.flusher
-		if popErr := f.syncQ.pop(head, tail, err); popErr != nil {
+		if popErr := f.syncQ.pop(head, tail, err, w.QueueSemChan); popErr != nil {
 			return synced, syncLatency, bytesWritten, popErr
 		}
 	}
diff --git a/record/log_writer_test.go b/record/log_writer_test.go
index 31574797e4..d79f8a485b 100644
--- a/record/log_writer_test.go
+++ b/record/log_writer_test.go
@@ -42,7 +42,7 @@ func TestSyncQueue(t *testing.T) {
 				return
 			}
 			head, tail, _ := q.load()
-			q.pop(head, tail, nil)
+			q.pop(head, tail, nil, nil)
 		}
 	}()
 
@@ -98,7 +98,7 @@ func TestFlusherCond(t *testing.T) {
 			}
 
 			head, tail, _ := q.load()
-			q.pop(head, tail, nil)
+			q.pop(head, tail, nil, nil)
 		}
 	}()
 
@@ -199,6 +199,27 @@ func TestSyncRecord(t *testing.T) {
 	}
 }
 
+func TestSyncRecordWithSignalChan(t *testing.T) {
+	f := &syncFile{}
+	w := NewLogWriter(f, 0, LogWriterConfig{WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{})})
+	semChan := make(chan struct{}, 5)
+	for i := 0; i < cap(semChan); i++ {
+		semChan <- struct{}{}
+	}
+	w.QueueSemChan = semChan
+	require.Equal(t, cap(semChan), len(semChan))
+	var syncErr error
+	for i := 0; i < 5; i++ {
+		var syncWG sync.WaitGroup
+		syncWG.Add(1)
+		_, err := w.SyncRecord([]byte("hello"), &syncWG, &syncErr)
+		require.NoError(t, err)
+		syncWG.Wait()
+		require.NoError(t, syncErr)
+		require.Equal(t, cap(semChan)-(i+1), len(semChan))
+	}
+}
+
 type fakeTimer struct {
 	f func()
 }
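
A note on intended usage: reconstructed from TestBatchApplyNoSyncWait above, a
client of the pebble package (with this patch applied) would drive the API
roughly as follows. This is a sketch, not part of the patch; the key, value,
and error handling are illustrative only.

    package main

    import (
    	"log"

    	"github.com/cockroachdb/pebble"
    	"github.com/cockroachdb/pebble/vfs"
    )

    func main() {
    	db, err := pebble.Open("", &pebble.Options{FS: vfs.NewMem()})
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer db.Close()

    	b := db.NewBatch()
    	if err := b.Set([]byte("key"), []byte("value"), nil); err != nil {
    		log.Fatal(err)
    	}
    	// Returns once the mutation is applied to the memtable and visible;
    	// the WAL fsync is still in flight.
    	if err := db.ApplyNoSyncWait(b, pebble.Sync); err != nil {
    		log.Fatal(err)
    	}
    	// ... overlap other work with the WAL fsync ...
    	if err := b.SyncWait(); err != nil { // block until the WAL write is fsynced
    		log.Fatal(err)
    	}
    	if err := b.Close(); err != nil {
    		log.Fatal(err)
    	}
    }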
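
The WaitGroup split described in the commit message can also be illustrated in
isolation. The following is a minimal standalone sketch with illustrative
types and sleeps standing in for the real pipeline stages; it is not pebble's
implementation. The point is that visibility (commit) and durability
(fsyncWait) complete independently, so the caller can return at visibility and
wait for durability later.

    package main

    import (
    	"sync"
    	"time"
    )

    // batch is an illustrative stand-in for pebble's Batch: commit gates
    // seqnum publication (visibility), fsyncWait gates WAL durability.
    type batch struct {
    	commit    sync.WaitGroup
    	fsyncWait sync.WaitGroup
    }

    func applyNoSyncWait(b *batch) {
    	b.commit.Add(1)    // released once the batch is visible
    	b.fsyncWait.Add(1) // released once the WAL write is fsynced
    	go func() {
    		time.Sleep(time.Millisecond) // stand-in for memtable apply + publish
    		b.commit.Done()
    	}()
    	go func() {
    		time.Sleep(10 * time.Millisecond) // stand-in for the WAL fsync
    		b.fsyncWait.Done()
    	}()
    	b.commit.Wait() // return to the caller at visibility, not durability
    }

    func main() {
    	var b batch
    	applyNoSyncWait(&b)
    	// ... the caller overlaps other work with the WAL fsync here ...
    	b.fsyncWait.Wait() // the Batch.SyncWait equivalent: wait for durability
    }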
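
Similarly, the semaphore channels (commitQueueSem, logSyncQSem) follow the
standard Go channel-as-counting-semaphore pattern: the channel's capacity
bounds the number of reserved slots in a bounded lock-free queue, a producer
acquires (sends) before enqueueing, and whichever goroutine dequeues the item
releases (receives). Because the release side runs on different goroutines and
the two queues drain in arbitrary orders relative to each other, each queue
needs its own channel. A minimal sketch with illustrative names:

    package main

    import "sync"

    func main() {
    	const queueCapacity = 4
    	sem := make(chan struct{}, queueCapacity) // counting semaphore
    	work := make(chan int, queueCapacity)     // stand-in for the bounded queue

    	var wg sync.WaitGroup
    	wg.Add(1)
    	go func() { // consumer: dequeues and releases the reservation
    		defer wg.Done()
    		for item := range work {
    			_ = item // ... process ...
    			<-sem    // release: frees a slot for a blocked producer
    		}
    	}()

    	for i := 0; i < 16; i++ {
    		sem <- struct{}{} // acquire: blocks while queueCapacity items are in flight
    		work <- i
    	}
    	close(work)
    	wg.Wait()
    }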