diff --git a/batch.go b/batch.go
index 37fabc1270..1afb976cae 100644
--- a/batch.go
+++ b/batch.go
@@ -255,7 +255,22 @@ type Batch struct {
 	// memtable.
 	flushable *flushableBatch
 
+	// Synchronous Apply uses the commit WaitGroup for both publishing the
+	// seqnum and waiting for the WAL fsync (if needed). Asynchronous
+	// ApplyNoSyncWait, which implies WriteOptions.Sync is true, uses the commit
+	// WaitGroup for publishing the seqnum and the fsyncWait WaitGroup for
+	// waiting for the WAL fsync.
+	//
+	// TODO(sumeer): if we find that ApplyNoSyncWait in conjunction with
+	// SyncWait is causing higher memory usage because of the time duration
+	// between when the sync is already done, and a goroutine calls SyncWait
+	// (followed by Batch.Close), we could separate out {fsyncWait, commitErr}
+	// into a separate struct that is allocated separately (using another
+	// sync.Pool), and only that struct needs to outlive Batch.Close (which
+	// could then be called immediately after ApplyNoSyncWait).
 	commit    sync.WaitGroup
+	fsyncWait sync.WaitGroup
+
 	commitErr error
 	applied   uint32 // updated atomically
 }
@@ -1089,6 +1104,7 @@ func (b *Batch) Reset() {
 	b.rangeKeysSeqNum = 0
 	b.flushable = nil
 	b.commit = sync.WaitGroup{}
+	b.fsyncWait = sync.WaitGroup{}
 	b.commitErr = nil
 	atomic.StoreUint32(&b.applied, 0)
 	if b.data != nil {
@@ -1205,6 +1221,15 @@ func batchDecodeStr(data []byte) (odata []byte, s []byte, ok bool) {
 	return data[v:], data[:v], true
 }
 
+// SyncWait is to be used in conjunction with DB.ApplyNoSyncWait.
+func (b *Batch) SyncWait() error {
+	b.fsyncWait.Wait()
+	if b.commitErr != nil {
+		b.db = nil // prevent batch reuse on error
+	}
+	return b.commitErr
+}
+
 // BatchReader iterates over the entries contained in a batch.
 type BatchReader []byte
 
diff --git a/batch_test.go b/batch_test.go
index 5c9902222f..b56168c230 100644
--- a/batch_test.go
+++ b/batch_test.go
@@ -223,6 +223,32 @@ func TestBatchEmpty(t *testing.T) {
 	require.NoError(t, iter2.Close())
 }
 
+func TestBatchApplyNoSyncWait(t *testing.T) {
+	db, err := Open("", &Options{
+		FS: vfs.NewMem(),
+	})
+	require.NoError(t, err)
+	defer db.Close()
+	var batches []*Batch
+	options := &WriteOptions{Sync: true}
+	for i := 0; i < 10000; i++ {
+		b := db.NewBatch()
+		str := fmt.Sprintf("a%d", i)
+		require.NoError(t, b.Set([]byte(str), []byte(str), nil))
+		require.NoError(t, db.ApplyNoSyncWait(b, options))
+		// k-v pair is visible even if not yet synced.
+		val, closer, err := db.Get([]byte(str))
+		require.NoError(t, err)
+		require.Equal(t, str, string(val))
+		closer.Close()
+		batches = append(batches, b)
+	}
+	for _, b := range batches {
+		require.NoError(t, b.SyncWait())
+		b.Close()
+	}
+}
+
 func TestBatchReset(t *testing.T) {
 	db, err := Open("", &Options{
 		FS: vfs.NewMem(),
@@ -244,6 +270,7 @@ func TestBatchReset(t *testing.T) {
 	b.applied = 1
 	b.commitErr = errors.New("test-error")
 	b.commit.Add(1)
+	b.fsyncWait.Add(1)
 	require.Equal(t, uint32(3), b.Count())
 	require.Equal(t, uint64(1), b.countRangeDels)
 	require.Equal(t, uint64(1), b.countRangeKeys)
diff --git a/commit.go b/commit.go
index 9bec350d5f..d93824c2d6 100644
--- a/commit.go
+++ b/commit.go
@@ -60,7 +60,7 @@ func (q *commitQueue) enqueue(b *Batch) {
 	ptrs := atomic.LoadUint64(&q.headTail)
 	head, tail := q.unpack(ptrs)
 	if (tail+uint32(len(q.slots)))&(1<<dequeueBits-1) == head {
+// REQUIRES: noSyncWait => syncWAL
+func (p *commitPipeline) Commit(b *Batch, syncWAL bool, noSyncWait bool) error {
 	if b.Empty() {
 		return nil
 	}
 
-	p.sem <- struct{}{}
+	// Acquire semaphores.
+	p.commitQueueSem <- struct{}{}
+	if syncWAL {
+		p.logSyncQSem <- struct{}{}
+	}
 
 	// Prepare the batch for committing: enqueuing the batch in the pending
 	// queue, determining the batch sequence number and writing the data to the
@@ -250,27 +287,41 @@ func (p *commitPipeline) Commit(b *Batch, syncWAL bool) error {
 	//
 	// NB: We set Batch.commitErr on error so that the batch won't be a candidate
 	// for reuse. See Batch.release().
-	mem, err := p.prepare(b, syncWAL)
+	mem, err := p.prepare(b, syncWAL, noSyncWait)
 	if err != nil {
 		b.db = nil // prevent batch reuse on error
+		// NB: we are not doing <-p.commitQueueSem since the batch is still
+		// sitting in the pending queue. We should consider fixing this by also
+		// removing the batch from the pending queue.
 		return err
 	}
 
 	// Apply the batch to the memtable.
 	if err := p.env.apply(b, mem); err != nil {
 		b.db = nil // prevent batch reuse on error
+		// NB: we are not doing <-p.commitQueueSem since the batch is still
+		// sitting in the pending queue. We should consider fixing this by also
+		// removing the batch from the pending queue.
 		return err
 	}
 
 	// Publish the batch sequence number.
 	p.publish(b)
 
-	<-p.sem
+	<-p.commitQueueSem
 
-	if b.commitErr != nil {
-		b.db = nil // prevent batch reuse on error
+	if !noSyncWait {
+		// Already waited for commit, so look at the error.
+		if b.commitErr != nil {
+			b.db = nil // prevent batch reuse on error
+			err = b.commitErr
+		}
 	}
-	return b.commitErr
+	// Else noSyncWait. The LogWriter can be concurrently writing to
+	// b.commitErr. We will read b.commitErr in Batch.SyncWait after the
+	// LogWriter is done writing.
+
+	return err
 }
 
 // AllocateSeqNum allocates count sequence numbers, invokes the prepare
@@ -294,7 +345,7 @@ func (p *commitPipeline) AllocateSeqNum(count int, prepare func(), apply func(se
 	b.setCount(uint32(count))
 	b.commit.Add(1)
 
-	p.sem <- struct{}{}
+	p.commitQueueSem <- struct{}{}
 
 	p.mu.Lock()
 
@@ -341,27 +392,30 @@ func (p *commitPipeline) AllocateSeqNum(count int, prepare func(), apply func(se
 	// Publish the sequence number.
 	p.publish(b)
 
-	<-p.sem
+	<-p.commitQueueSem
 }
 
-func (p *commitPipeline) prepare(b *Batch, syncWAL bool) (*memTable, error) {
+func (p *commitPipeline) prepare(b *Batch, syncWAL bool, noSyncWait bool) (*memTable, error) {
 	n := uint64(b.Count())
 	if n == invalidBatchCount {
 		return nil, ErrInvalidBatch
 	}
 
-	count := 1
-	if syncWAL {
-		count++
-	}
-	// count represents the waiting needed for publish, and optionally the
-	// waiting needed for the WAL sync.
-	b.commit.Add(count)
-
 	var syncWG *sync.WaitGroup
 	var syncErr *error
+	commitCount := 1
 	if syncWAL {
-		syncWG, syncErr = &b.commit, &b.commitErr
+		syncErr = &b.commitErr
+		if noSyncWait {
+			syncWG = &b.fsyncWait
+			b.fsyncWait.Add(1)
+		} else {
+			syncWG = &b.commit
+			commitCount++
+		}
 	}
+	// commitCount represents the waiting needed for publish, and optionally the
+	// waiting needed for the WAL sync.
+	b.commit.Add(commitCount)
 
 	p.mu.Lock()
diff --git a/commit_test.go b/commit_test.go
index c680ef6527..d4ae4c4daf 100644
--- a/commit_test.go
+++ b/commit_test.go
@@ -31,6 +31,7 @@ type testCommitEnv struct {
 		sync.Mutex
 		buf []uint64
 	}
+	queueSemChan chan struct{}
 }
 
 func (e *testCommitEnv) env() commitEnv {
@@ -49,10 +50,14 @@ func (e *testCommitEnv) apply(b *Batch, mem *memTable) error {
 	return nil
 }
 
-func (e *testCommitEnv) write(b *Batch, _ *sync.WaitGroup, _ *error) (*memTable, error) {
+func (e *testCommitEnv) write(b *Batch, wg *sync.WaitGroup, _ *error) (*memTable, error) {
 	n := int64(len(b.data))
 	atomic.AddInt64(&e.writePos, n)
 	atomic.AddUint64(&e.writeCount, 1)
+	if wg != nil {
+		wg.Done()
+		<-e.queueSemChan
+	}
 	return nil, nil
 }
 
@@ -100,7 +105,7 @@ func TestCommitPipeline(t *testing.T) {
 			defer wg.Done()
 			var b Batch
 			_ = b.Set([]byte(fmt.Sprint(i)), nil, nil)
-			_ = p.Commit(&b, false)
+			_ = p.Commit(&b, false, false)
 		}(i)
 	}
 	wg.Wait()
@@ -120,6 +125,53 @@ func TestCommitPipeline(t *testing.T) {
 	}
 }
 
+func TestCommitPipelineSync(t *testing.T) {
+	n := 10000
+	if invariants.RaceEnabled {
+		// Under race builds we have to limit the concurrency or we hit the
+		// following error:
+		//
+		// race: limit on 8128 simultaneously alive goroutines is exceeded, dying
+		n = 1000
+	}
+
+	for _, noSyncWait := range []bool{false, true} {
+		t.Run(fmt.Sprintf("no-sync-wait=%t", noSyncWait), func(t *testing.T) {
+			var e testCommitEnv
+			p := newCommitPipeline(e.env())
+			e.queueSemChan = p.logSyncQSem
+
+			var wg sync.WaitGroup
+			wg.Add(n)
+			for i := 0; i < n; i++ {
+				go func(i int) {
+					defer wg.Done()
+					var b Batch
+					require.NoError(t, b.Set([]byte(fmt.Sprint(i)), nil, nil))
+					require.NoError(t, p.Commit(&b, true, noSyncWait))
+					if noSyncWait {
+						require.NoError(t, b.SyncWait())
+					}
+				}(i)
+			}
+			wg.Wait()
+			if s := atomic.LoadUint64(&e.writeCount); uint64(n) != s {
+				t.Fatalf("expected %d written batches, but found %d", n, s)
+			}
+			if n != len(e.applyBuf.buf) {
+				t.Fatalf("expected %d written batches, but found %d",
+					n, len(e.applyBuf.buf))
+			}
+			if s := atomic.LoadUint64(&e.logSeqNum); uint64(n) != s {
+				t.Fatalf("expected %d, but found %d", n, s)
+			}
+			if s := atomic.LoadUint64(&e.visibleSeqNum); uint64(n) != s {
+				t.Fatalf("expected %d, but found %d", n, s)
+			}
+		})
+	}
+}
+
 func TestCommitPipelineAllocateSeqNum(t *testing.T) {
 	var e testCommitEnv
 	p := newCommitPipeline(e.env())
@@ -185,9 +237,7 @@ func TestCommitPipelineWALClose(t *testing.T) {
 	}
 
 	// A basic commitEnv which writes to a WAL.
-	wal := record.NewLogWriter(sf, 0 /* logNum */, record.LogWriterConfig{
-		WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
-	})
+	var wal *record.LogWriter
 	var walDone sync.WaitGroup
 	testEnv := commitEnv{
 		logSeqNum: new(uint64),
@@ -203,11 +253,15 @@ func TestCommitPipelineWALClose(t *testing.T) {
 		},
 	}
 	p := newCommitPipeline(testEnv)
+	wal = record.NewLogWriter(sf, 0 /* logNum */, record.LogWriterConfig{
+		WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
+		QueueSemChan:    p.logSyncQSem,
+	})
 
 	// Launch N (commitConcurrency) goroutines which each create a batch and
 	// commit it with sync==true. Because of the syncDelayFile, none of these
 	// operations can complete until syncDelayFile.done is closed.
-	errCh := make(chan error, cap(p.sem))
+	errCh := make(chan error, cap(p.commitQueueSem))
 	walDone.Add(cap(errCh))
 	for i := 0; i < cap(errCh); i++ {
 		go func(i int) {
@@ -216,7 +270,7 @@ func TestCommitPipelineWALClose(t *testing.T) {
 				errCh <- err
 				return
 			}
-			errCh <- p.Commit(b, true /* sync */)
+			errCh <- p.Commit(b, true /* sync */, false)
 		}(i)
 	}
 
@@ -234,62 +288,71 @@ func TestCommitPipelineWALClose(t *testing.T) {
 }
 
 func BenchmarkCommitPipeline(b *testing.B) {
-	for _, parallelism := range []int{1, 2, 4, 8, 16, 32, 64, 128} {
-		b.Run(fmt.Sprintf("parallel=%d", parallelism), func(b *testing.B) {
-			b.SetParallelism(parallelism)
-			mem := newMemTable(memTableOptions{})
-			wal := record.NewLogWriter(io.Discard, 0 /* logNum */, record.LogWriterConfig{
-				WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
-			})
+	for _, noSyncWait := range []bool{false, true} {
+		for _, parallelism := range []int{1, 2, 4, 8, 16, 32, 64, 128} {
+			b.Run(fmt.Sprintf("no-sync-wait=%t/parallel=%d", noSyncWait, parallelism),
+				func(b *testing.B) {
+					b.SetParallelism(parallelism)
+					mem := newMemTable(memTableOptions{})
+					var wal *record.LogWriter
+					nullCommitEnv := commitEnv{
+						logSeqNum:     new(uint64),
+						visibleSeqNum: new(uint64),
+						apply: func(b *Batch, mem *memTable) error {
+							err := mem.apply(b, b.SeqNum())
+							if err != nil {
+								return err
+							}
+							mem.writerUnref()
+							return nil
+						},
+						write: func(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*memTable, error) {
+							for {
+								err := mem.prepare(b)
+								if err == arenaskl.ErrArenaFull {
+									mem = newMemTable(memTableOptions{})
+									continue
+								}
+								if err != nil {
+									return nil, err
+								}
+								break
+							}
-
-			nullCommitEnv := commitEnv{
-				logSeqNum:     new(uint64),
-				visibleSeqNum: new(uint64),
-				apply: func(b *Batch, mem *memTable) error {
-					err := mem.apply(b, b.SeqNum())
-					if err != nil {
-						return err
-					}
-					mem.writerUnref()
-					return nil
-				},
-				write: func(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*memTable, error) {
-					for {
-						err := mem.prepare(b)
-						if err == arenaskl.ErrArenaFull {
-							mem = newMemTable(memTableOptions{})
-							continue
-						}
-						if err != nil {
-							return nil, err
-						}
-						break
+							_, err := wal.SyncRecord(b.data, syncWG, syncErr)
+							return mem, err
+						},
 					}
+					p := newCommitPipeline(nullCommitEnv)
+					wal = record.NewLogWriter(io.Discard, 0, /* logNum */
+						record.LogWriterConfig{
+							WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
+							QueueSemChan:    p.logSyncQSem,
+						})
+					const keySize = 8
+					b.SetBytes(2 * keySize)
+					b.ResetTimer()
-
-					_, err := wal.SyncRecord(b.data, syncWG, syncErr)
-					return mem, err
-				},
-			}
-			p := newCommitPipeline(nullCommitEnv)
-
-			const keySize = 8
-			b.SetBytes(2 * keySize)
-			b.ResetTimer()
+					b.RunParallel(func(pb *testing.PB) {
+						rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano())))
+						buf := make([]byte, keySize)
-			b.RunParallel(func(pb *testing.PB) {
-				rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano())))
-				buf := make([]byte, keySize)
-
-				for pb.Next() {
-					batch := newBatch(nil)
-					binary.BigEndian.PutUint64(buf, rng.Uint64())
-					batch.Set(buf, buf, nil)
-					if err := p.Commit(batch, true /* sync */); err != nil {
-						b.Fatal(err)
-					}
-					batch.release()
-				}
-			})
-		})
+						for pb.Next() {
+							batch := newBatch(nil)
+							binary.BigEndian.PutUint64(buf, rng.Uint64())
+							batch.Set(buf, buf, nil)
+							if err := p.Commit(batch, true /* sync */, noSyncWait); err != nil {
+								b.Fatal(err)
+							}
+							if noSyncWait {
+								if err := batch.SyncWait(); err != nil {
+									b.Fatal(err)
+								}
+							}
+							batch.release()
+						}
+					})
+				})
+		}
 	}
 }
diff --git a/db.go b/db.go
index d5b294b507..34573f20a9 100644
--- a/db.go
+++ b/db.go
@@ -724,6 +724,30 @@ func (d *DB) RangeKeyDelete(start, end []byte, opts *WriteOptions) error {
 //
 // It is safe to modify the contents of the arguments after Apply returns.
 func (d *DB) Apply(batch *Batch, opts *WriteOptions) error {
+	return d.applyInternal(batch, opts, false)
+}
+
+// ApplyNoSyncWait must only be used when opts.Sync is true and the caller
+// does not want to wait for the WAL fsync to happen. The method will return
+// once the mutation is applied to the memtable and is visible (note that a
+// mutation is visible before the WAL sync even in the wait case, so we have
+// not weakened the durability semantics). The caller must call Batch.SyncWait
+// to wait for the WAL fsync. The caller must not Close the batch without
+// first calling Batch.SyncWait.
+//
+// RECOMMENDATION: Prefer using Apply unless you really understand why you
+// need ApplyNoSyncWait.
+// EXPERIMENTAL: API/feature subject to change. Do not yet use outside
+// CockroachDB.
+func (d *DB) ApplyNoSyncWait(batch *Batch, opts *WriteOptions) error {
+	if !opts.Sync {
+		return errors.Errorf("cannot request asynchronous apply when WriteOptions.Sync is false")
+	}
+	return d.applyInternal(batch, opts, true)
+}
+
+// REQUIRES: noSyncWait => opts.Sync
+func (d *DB) applyInternal(batch *Batch, opts *WriteOptions, noSyncWait bool) error {
 	if err := d.closed.Load(); err != nil {
 		panic(err)
 	}
@@ -762,7 +786,7 @@ func (d *DB) Apply(batch *Batch, opts *WriteOptions) error {
 	if int(batch.memTableSize) >= d.largeBatchThreshold {
 		batch.flushable = newFlushableBatch(batch, d.opts.Comparer)
 	}
-	if err := d.commit.Commit(batch, sync); err != nil {
+	if err := d.commit.Commit(batch, sync, noSyncWait); err != nil {
 		// There isn't much we can do on an error here. The commit pipeline will be
 		// horked at this point.
 		d.opts.Logger.Fatalf("%v", err)
@@ -1988,6 +2012,7 @@ func (d *DB) makeRoomForWrite(b *Batch) error {
 		d.mu.log.LogWriter = record.NewLogWriter(newLogFile, newLogNum, record.LogWriterConfig{
 			WALFsyncLatency:    d.mu.log.metrics.fsyncLatency,
 			WALMinSyncInterval: d.opts.WALMinSyncInterval,
+			QueueSemChan:       d.commit.logSyncQSem,
 		})
 	}
 
diff --git a/open.go b/open.go
index 4d0cf96990..5969cf490f 100644
--- a/open.go
+++ b/open.go
@@ -426,6 +426,7 @@ func Open(dirname string, opts *Options) (db *DB, _ error) {
 		logWriterConfig := record.LogWriterConfig{
 			WALMinSyncInterval: d.opts.WALMinSyncInterval,
 			WALFsyncLatency:    d.mu.log.metrics.fsyncLatency,
+			QueueSemChan:       d.commit.logSyncQSem,
 		}
 		d.mu.log.LogWriter = record.NewLogWriter(logFile, newLogNum, logWriterConfig)
 		d.mu.versions.metrics.WAL.Files++
diff --git a/record/log_writer.go b/record/log_writer.go
index 6098d19260..5658508259 100644
--- a/record/log_writer.go
+++ b/record/log_writer.go
@@ -133,7 +133,8 @@ func (q *syncQueue) load() (head, tail, realLength uint32) {
 	return head, tail, realLength
 }
 
-func (q *syncQueue) pop(head, tail uint32, err error) error {
+// REQUIRES: queueSemChan is non-nil.
+func (q *syncQueue) pop(head, tail uint32, err error, queueSemChan chan struct{}) error {
 	if tail == head {
 		// Queue is empty.
 		return nil
@@ -153,6 +154,10 @@ func (q *syncQueue) pop(head, tail uint32, err error) error {
 		// will try to enqueue before we've "freed" space in the queue.
 		atomic.AddUint64(&q.headTail, 1)
 		wg.Done()
+		// Is always non-nil in production.
+		if queueSemChan != nil {
+			<-queueSemChan
+		}
 	}
 
 	return nil
@@ -291,12 +296,20 @@ type LogWriter struct {
 	// used for min-sync-interval. In normal operation this points to
 	// time.AfterFunc.
 	afterFunc func(d time.Duration, f func()) syncTimer
+
+	// See the comment for LogWriterConfig.QueueSemChan.
+	queueSemChan chan struct{}
 }
 
 // LogWriterConfig is a struct used for configuring new LogWriters
 type LogWriterConfig struct {
 	WALMinSyncInterval durationFunc
 	WALFsyncLatency    prometheus.Histogram
+	// QueueSemChan is an optional channel to pop from when popping from
+	// LogWriter.flusher.syncQueue. It functions as a semaphore that prevents
+	// the syncQueue from overflowing (which will cause a panic). All production
+	// code ensures this is non-nil.
+	QueueSemChan chan struct{}
 }
 
 // CapAllocatedBlocks is the maximum number of blocks allocated by the
@@ -319,6 +332,7 @@ func NewLogWriter(w io.Writer, logNum base.FileNum, logWriterConfig LogWriterCon
 		afterFunc: func(d time.Duration, f func()) syncTimer {
 			return time.AfterFunc(d, f)
 		},
+		queueSemChan: logWriterConfig.QueueSemChan,
 	}
 	r.free.cond.L = &r.free.Mutex
 	r.free.blocks = make([]*block, 0, CapAllocatedBlocks)
@@ -441,7 +455,7 @@ func (w *LogWriter) flushLoop(context.Context) {
 	// If flusher has an error, we propagate it to waiters. Note in spite of
 	// error we consume the pending list above to free blocks for writers.
 	if f.err != nil {
-		f.syncQ.pop(head, tail, f.err)
+		f.syncQ.pop(head, tail, f.err, w.queueSemChan)
 		// Update the idleStartTime if work could not be done, so that we don't
 		// include the duration we tried to do work as idle. We don't bother
 		// with the rest of the accounting, which means we will undercount.
@@ -518,7 +532,7 @@ func (w *LogWriter) flushPending(
 			syncLatency, err = w.syncWithLatency()
 		}
 		f := &w.flusher
-		if popErr := f.syncQ.pop(head, tail, err); popErr != nil {
+		if popErr := f.syncQ.pop(head, tail, err, w.queueSemChan); popErr != nil {
 			return synced, syncLatency, bytesWritten, popErr
 		}
 	}
diff --git a/record/log_writer_test.go b/record/log_writer_test.go
index 31574797e4..33011e8b91 100644
--- a/record/log_writer_test.go
+++ b/record/log_writer_test.go
@@ -42,7 +42,7 @@ func TestSyncQueue(t *testing.T) {
 				return
 			}
 			head, tail, _ := q.load()
-			q.pop(head, tail, nil)
+			q.pop(head, tail, nil, nil)
 		}
 	}()
 
@@ -98,7 +98,7 @@ func TestFlusherCond(t *testing.T) {
 			}
 
 			head, tail, _ := q.load()
-			q.pop(head, tail, nil)
+			q.pop(head, tail, nil, nil)
 		}
 	}()
 
@@ -199,6 +199,29 @@ func TestSyncRecord(t *testing.T) {
 	}
 }
 
+func TestSyncRecordWithSignalChan(t *testing.T) {
+	f := &syncFile{}
+	semChan := make(chan struct{}, 5)
+	for i := 0; i < cap(semChan); i++ {
+		semChan <- struct{}{}
+	}
+	w := NewLogWriter(f, 0, LogWriterConfig{
+		WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
+		QueueSemChan:    semChan,
+	})
+	require.Equal(t, cap(semChan), len(semChan))
+	var syncErr error
+	for i := 0; i < 5; i++ {
+		var syncWG sync.WaitGroup
+		syncWG.Add(1)
+		_, err := w.SyncRecord([]byte("hello"), &syncWG, &syncErr)
+		require.NoError(t, err)
+		syncWG.Wait()
+		require.NoError(t, syncErr)
+		require.Equal(t, cap(semChan)-(i+1), len(semChan))
+	}
+}
+
 type fakeTimer struct {
 	f func()
 }
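For reviewers, a minimal usage sketch of the API added above. It mirrors TestBatchApplyNoSyncWait and is not part of the patch; the in-memory FS, key names, and log.Fatal error handling are illustrative assumptions.

package main

import (
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	db, err := pebble.Open("", &pebble.Options{FS: vfs.NewMem()})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	var pending []*pebble.Batch
	for i := 0; i < 3; i++ {
		b := db.NewBatch()
		if err := b.Set([]byte{byte('a' + i)}, []byte("v"), nil); err != nil {
			log.Fatal(err)
		}
		// Returns once the batch is applied to the memtable and visible;
		// the WAL fsync is still in flight. Requires WriteOptions.Sync == true.
		if err := db.ApplyNoSyncWait(b, pebble.Sync); err != nil {
			log.Fatal(err)
		}
		pending = append(pending, b)
	}
	// Wait for durability before acknowledging the writes, then Close each batch.
	for _, b := range pending {
		if err := b.SyncWait(); err != nil {
			log.Fatal(err)
		}
		if err := b.Close(); err != nil {
			log.Fatal(err)
		}
	}
}

The point of the split is that ApplyNoSyncWait returns as soon as the mutation is visible, so a caller can overlap other work (or further commits) with the WAL fsync and only block in SyncWait when it needs the durability guarantee.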