From bb67bab8e996b28a331436b9a1b0c90e01c533dd Mon Sep 17 00:00:00 2001
From: Jackson Owens
Date: Thu, 20 Jul 2023 17:24:29 -0400
Subject: [PATCH] record: allow unbounded queued blocks in LogWriter

Previously, a LogWriter would allocate up to 16 blocks of 32 KiB each for
buffering WAL writes. If all 16 blocks had been allocated and no free blocks
were available, a batch writing to the WAL would queue until the flushing
goroutine freed blocks.

In testing of write-heavy workloads, especially those with larger value
sizes, we've seen queueing at the LogWriter. This queueing blocks the commit
pipeline, preventing any batches from committing regardless of their priority
or whether they need to wait for fsync.

This commit modifies LogWriter to allow queueing an unbounded number of
blocks. In practice, for the current WAL, the memtable size serves as an
upper bound. With a 64 MiB memtable, at most 64 MiB / 32 KiB = 2,048 blocks
may queue. This is not an unreasonable amount of additional memory overhead
for a write-heavy workload.

Beyond improving throughput for write-heavy workloads, removing this hard
bound improves tolerance of momentary disk stalls.

Informs cockroachdb/cockroach#88699.
---
 batch.go                  |  6 ++-
 batch_test.go             | 32 --------------
 commit_test.go            |  4 +-
 db.go                     |  4 +-
 record/log_writer.go      | 81 ++++++++++------------------------
 record/log_writer_test.go | 91 +++++++++++++++++++++++++++++++++++----
 vfs/vfstest/vfstest.go    | 32 ++++++++++++++
 7 files changed, 147 insertions(+), 103 deletions(-)
 create mode 100644 vfs/vfstest/vfstest.go

diff --git a/batch.go b/batch.go
index 46510412ab..ac991b8115 100644
--- a/batch.go
+++ b/batch.go
@@ -319,7 +319,11 @@ type BatchCommitStats struct {
 	// commitPipeline.Commit.
 	SemaphoreWaitDuration time.Duration
 	// WALQueueWaitDuration is the wait time for allocating memory blocks in the
-	// LogWriter (due to the LogWriter not writing fast enough).
+	// LogWriter (due to the LogWriter not writing fast enough). At the moment
+	// this duration is always zero because a single WAL allows allocating
+	// memory blocks up to the entire memtable size. In the future, we may
+	// pipeline WALs and bound the WAL queued blocks separately, so this field
+	// is preserved for that possibility.
 	WALQueueWaitDuration time.Duration
 	// MemTableWriteStallDuration is the wait caused by a write stall due to too
 	// many memtables (due to not flushing fast enough).
diff --git a/batch_test.go b/batch_test.go
index 5a0d73dc7d..c0b1fe0030 100644
--- a/batch_test.go
+++ b/batch_test.go
@@ -23,7 +23,6 @@ import (
 	"github.com/cockroachdb/pebble/internal/batchskl"
 	"github.com/cockroachdb/pebble/internal/keyspan"
 	"github.com/cockroachdb/pebble/internal/testkeys"
-	"github.com/cockroachdb/pebble/record"
 	"github.com/cockroachdb/pebble/vfs"
 	"github.com/stretchr/testify/require"
 )
@@ -1233,28 +1232,6 @@ func TestBatchCommitStats(t *testing.T) {
 		}
 	}
 
-	// WAL queue stall funcs.
-	//
-	// The LogWriter gets changed when stalling/unstalling the memtable, so we
-	// need to use a hook to tell us about the latest LogWriter.
-	var unstallWALQueue func()
-	stallWALQueue := func() {
-		var unstallLatestWALQueue func()
-		db.mu.Lock()
-		defer db.mu.Unlock()
-		db.mu.log.registerLogWriterForTesting = func(w *record.LogWriter) {
-			// db.mu will be held when this is called.
- unstallLatestWALQueue = w.ReserveAllFreeBlocksForTesting() - } - db.mu.log.registerLogWriterForTesting(db.mu.log.LogWriter) - unstallWALQueue = func() { - db.mu.Lock() - defer db.mu.Unlock() - db.mu.log.registerLogWriterForTesting = nil - unstallLatestWALQueue() - } - } - // Commit wait stall funcs. var unstallCommitWait func() stallCommitWait := func() { @@ -1268,12 +1245,9 @@ func TestBatchCommitStats(t *testing.T) { stallCommitSemaphore() stallMemtable() stallL0ReadAmp() - stallWALQueue() stallCommitWait() // Exceed initialMemTableSize -- this is needed to make stallMemtable work. - // It also exceeds record.blockSize, requiring a new block to be allocated, - // which is what we need for stallWALQueue to work. require.NoError(t, b.Set(make([]byte, initialMemTableSize), nil, nil)) var commitWG sync.WaitGroup @@ -1291,8 +1265,6 @@ func TestBatchCommitStats(t *testing.T) { time.Sleep(sleepDuration) unstallL0ReadAmp() time.Sleep(sleepDuration) - unstallWALQueue() - time.Sleep(sleepDuration) unstallCommitWait() // Wait for Apply to return. @@ -1303,10 +1275,6 @@ func TestBatchCommitStats(t *testing.T) { return errors.Errorf("SemaphoreWaitDuration %s is too low", stats.SemaphoreWaitDuration.String()) } - if expectedDuration > stats.WALQueueWaitDuration { - return errors.Errorf("WALQueueWaitDuration %s is too low", - stats.WALQueueWaitDuration.String()) - } if expectedDuration > stats.MemTableWriteStallDuration { return errors.Errorf("MemTableWriteStallDuration %s is too low", stats.MemTableWriteStallDuration.String()) diff --git a/commit_test.go b/commit_test.go index 0e1ad4ac3c..ee627a113f 100644 --- a/commit_test.go +++ b/commit_test.go @@ -245,7 +245,7 @@ func TestCommitPipelineWALClose(t *testing.T) { return nil }, write: func(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*memTable, error) { - _, _, err := wal.SyncRecord(b.data, syncWG, syncErr) + _, err := wal.SyncRecord(b.data, syncWG, syncErr) return nil, err }, } @@ -316,7 +316,7 @@ func BenchmarkCommitPipeline(b *testing.B) { break } - _, _, err := wal.SyncRecord(b.data, syncWG, syncErr) + _, err := wal.SyncRecord(b.data, syncWG, syncErr) return mem, err }, } diff --git a/db.go b/db.go index be6730f65b..95526f0389 100644 --- a/db.go +++ b/db.go @@ -909,7 +909,7 @@ func (d *DB) commitWrite(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*mem b.flushable.setSeqNum(b.SeqNum()) if !d.opts.DisableWAL { var err error - size, b.commitStats.WALQueueWaitDuration, err = d.mu.log.SyncRecord(repr, syncWG, syncErr) + size, err = d.mu.log.SyncRecord(repr, syncWG, syncErr) if err != nil { panic(err) } @@ -947,7 +947,7 @@ func (d *DB) commitWrite(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*mem } if b.flushable == nil { - size, b.commitStats.WALQueueWaitDuration, err = d.mu.log.SyncRecord(repr, syncWG, syncErr) + size, err = d.mu.log.SyncRecord(repr, syncWG, syncErr) if err != nil { panic(err) } diff --git a/record/log_writer.go b/record/log_writer.go index 90f8c2ef34..7cc7a83063 100644 --- a/record/log_writer.go +++ b/record/log_writer.go @@ -266,10 +266,7 @@ type LogWriter struct { block *block free struct { sync.Mutex - // Condition variable used to signal a block is freed. - cond sync.Cond - blocks []*block - allocated int + blocks []*block } flusher struct { @@ -313,9 +310,10 @@ type LogWriterConfig struct { QueueSemChan chan struct{} } -// CapAllocatedBlocks is the maximum number of blocks allocated by the -// LogWriter. 
-const CapAllocatedBlocks = 16 +// initialAllocatedBlocksCap is the initial capacity of the various slices +// intended to hold LogWriter blocks. The LogWriter may allocate more blocks +// than this threshold allows. +const initialAllocatedBlocksCap = 32 // NewLogWriter returns a new LogWriter. func NewLogWriter(w io.Writer, logNum base.FileNum, logWriterConfig LogWriterConfig) *LogWriter { @@ -335,9 +333,7 @@ func NewLogWriter(w io.Writer, logNum base.FileNum, logWriterConfig LogWriterCon }, queueSemChan: logWriterConfig.QueueSemChan, } - r.free.cond.L = &r.free.Mutex - r.free.blocks = make([]*block, 0, CapAllocatedBlocks) - r.free.allocated = 1 + r.free.blocks = make([]*block, 0, initialAllocatedBlocksCap) r.block = &block{} r.flusher.ready.init(&r.flusher.Mutex, &r.flusher.syncQ) r.flusher.closed = make(chan struct{}) @@ -405,8 +401,12 @@ func (w *LogWriter) flushLoop(context.Context) { // the flush work (w.block.written.Load()). // The list of full blocks that need to be written. This is copied from - // f.pending on every loop iteration, though the number of elements is small - // (usually 1, max 16). + // f.pending on every loop iteration, though the number of elements is + // usually small (most frequently 1). In the case of the WAL LogWriter, the + // number of blocks is bounded by the size of the WAL's corresponding + // memtable (MemtableSize/BlockSize). With the default 64 MiB memtables, + // this works out to at most 2048 elements if the entirety of the memtable's + // contents are queued. pending := make([]*block, 0, cap(f.pending)) for { for { @@ -432,8 +432,7 @@ func (w *LogWriter) flushLoop(context.Context) { // Found work to do, so no longer idle. workStartTime := time.Now() idleDuration := workStartTime.Sub(idleStartTime) - pending = pending[:len(f.pending)] - copy(pending, f.pending) + pending = append(pending[:0], f.pending...) f.pending = f.pending[:0] f.metrics.PendingBufferLen.AddSample(int64(len(pending))) @@ -556,28 +555,18 @@ func (w *LogWriter) flushBlock(b *block) error { b.flushed = 0 w.free.Lock() w.free.blocks = append(w.free.blocks, b) - w.free.cond.Signal() w.free.Unlock() return nil } // queueBlock queues the current block for writing to the underlying writer, // allocates a new block and reserves space for the next header. -func (w *LogWriter) queueBlock() (waitDuration time.Duration) { +func (w *LogWriter) queueBlock() { // Allocate a new block, blocking until one is available. We do this first // because w.block is protected by w.flusher.Mutex. w.free.Lock() if len(w.free.blocks) == 0 { - if w.free.allocated < cap(w.free.blocks) { - w.free.allocated++ - w.free.blocks = append(w.free.blocks, &block{}) - } else { - now := time.Now() - for len(w.free.blocks) == 0 { - w.free.cond.Wait() - } - waitDuration = time.Since(now) - } + w.free.blocks = append(w.free.blocks, &block{}) } nextBlock := w.free.blocks[len(w.free.blocks)-1] w.free.blocks = w.free.blocks[:len(w.free.blocks)-1] @@ -592,28 +581,6 @@ func (w *LogWriter) queueBlock() (waitDuration time.Duration) { f.Unlock() w.blockNum++ - return waitDuration -} - -// ReserveAllFreeBlocksForTesting is used to only for testing. -func (w *LogWriter) ReserveAllFreeBlocksForTesting() (releaseFunc func()) { - w.free.Lock() - defer w.free.Unlock() - free := w.free.blocks - w.free.blocks = nil - return func() { - w.free.Lock() - defer w.free.Unlock() - // It is possible that someone has pushed a free block and w.free.blocks - // is no longer nil. That is harmless. 
Also, the waiter loops on the - // condition len(w.free.blocks) == 0, so to actually unblock it we need to - // give it a free block. - if len(free) == 0 { - free = append(free, &block{}) - } - w.free.blocks = free - w.free.cond.Broadcast() - } } // Close flushes and syncs any unwritten data and closes the writer. @@ -665,7 +632,7 @@ func (w *LogWriter) Close() error { // of the record. // External synchronisation provided by commitPipeline.mu. func (w *LogWriter) WriteRecord(p []byte) (int64, error) { - logSize, _, err := w.SyncRecord(p, nil, nil) + logSize, err := w.SyncRecord(p, nil, nil) return logSize, err } @@ -676,9 +643,9 @@ func (w *LogWriter) WriteRecord(p []byte) (int64, error) { // External synchronisation provided by commitPipeline.mu. func (w *LogWriter) SyncRecord( p []byte, wg *sync.WaitGroup, err *error, -) (logSize int64, waitDuration time.Duration, err2 error) { +) (logSize int64, err2 error) { if w.err != nil { - return -1, 0, w.err + return -1, w.err } // The `i == 0` condition ensures we handle empty records. Such records can @@ -686,9 +653,7 @@ func (w *LogWriter) SyncRecord( // MANIFEST is currently written using Writer, it is good to support the same // semantics with LogWriter. for i := 0; i == 0 || len(p) > 0; i++ { - var wd time.Duration - p, wd = w.emitFragment(i, p) - waitDuration += wd + p = w.emitFragment(i, p) } if wg != nil { @@ -707,7 +672,7 @@ func (w *LogWriter) SyncRecord( // race with our read. That's ok because the only error we could be seeing is // one to syncing for which the caller can receive notification of by passing // in a non-nil err argument. - return offset, waitDuration, nil + return offset, nil } // Size returns the current size of the file. @@ -728,7 +693,7 @@ func (w *LogWriter) emitEOFTrailer() { b.written.Store(i + int32(recyclableHeaderSize)) } -func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte, waitDuration time.Duration) { +func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte) { b := w.block i := b.written.Load() first := n == 0 @@ -762,9 +727,9 @@ func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte, waitDurati for i := b.written.Load(); i < blockSize; i++ { b.buf[i] = 0 } - waitDuration = w.queueBlock() + w.queueBlock() } - return p[r:], waitDuration + return p[r:] } // Metrics must be called after Close. 
The callee will no longer modify the diff --git a/record/log_writer_test.go b/record/log_writer_test.go index e17577b74e..9a70a624b3 100644 --- a/record/log_writer_test.go +++ b/record/log_writer_test.go @@ -6,6 +6,7 @@ package record import ( "bytes" + "fmt" "math" "sort" "sync" @@ -14,7 +15,10 @@ import ( "time" "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/internal/humanize" "github.com/cockroachdb/pebble/vfs" + "github.com/cockroachdb/pebble/vfs/errorfs" + "github.com/cockroachdb/pebble/vfs/vfstest" "github.com/prometheus/client_golang/prometheus" prometheusgo "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" @@ -148,7 +152,7 @@ func TestSyncError(t *testing.T) { var syncErr error var syncWG sync.WaitGroup syncWG.Add(1) - _, _, err = w.SyncRecord([]byte("hello"), &syncWG, &syncErr) + _, err = w.SyncRecord([]byte("hello"), &syncWG, &syncErr) require.NoError(t, err) syncWG.Wait() if injectedErr != syncErr { @@ -186,7 +190,7 @@ func TestSyncRecord(t *testing.T) { for i := 0; i < 100000; i++ { var syncWG sync.WaitGroup syncWG.Add(1) - offset, _, err := w.SyncRecord([]byte("hello"), &syncWG, &syncErr) + offset, err := w.SyncRecord([]byte("hello"), &syncWG, &syncErr) require.NoError(t, err) syncWG.Wait() require.NoError(t, syncErr) @@ -214,7 +218,7 @@ func TestSyncRecordWithSignalChan(t *testing.T) { for i := 0; i < 5; i++ { var syncWG sync.WaitGroup syncWG.Add(1) - _, _, err := w.SyncRecord([]byte("hello"), &syncWG, &syncErr) + _, err := w.SyncRecord([]byte("hello"), &syncWG, &syncErr) require.NoError(t, err) syncWG.Wait() require.NoError(t, syncErr) @@ -273,7 +277,7 @@ func TestMinSyncInterval(t *testing.T) { syncRecord := func(n int) *sync.WaitGroup { wg := &sync.WaitGroup{} wg.Add(1) - _, _, err := w.SyncRecord(bytes.Repeat([]byte{'a'}, n), wg, new(error)) + _, err := w.SyncRecord(bytes.Repeat([]byte{'a'}, n), wg, new(error)) require.NoError(t, err) return wg } @@ -344,7 +348,7 @@ func TestMinSyncIntervalClose(t *testing.T) { syncRecord := func(n int) *sync.WaitGroup { wg := &sync.WaitGroup{} wg.Add(1) - _, _, err := w.SyncRecord(bytes.Repeat([]byte{'a'}, n), wg, new(error)) + _, err := w.SyncRecord(bytes.Repeat([]byte{'a'}, n), wg, new(error)) require.NoError(t, err) return wg } @@ -379,7 +383,7 @@ func TestMetricsWithoutSync(t *testing.T) { f := &syncFileWithWait{} f.writeWG.Add(1) w := NewLogWriter(f, 0, LogWriterConfig{WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{})}) - offset, _, err := w.SyncRecord([]byte("hello"), nil, nil) + offset, err := w.SyncRecord([]byte("hello"), nil, nil) require.NoError(t, err) const recordSize = 16 require.EqualValues(t, recordSize, offset) @@ -388,7 +392,7 @@ func TestMetricsWithoutSync(t *testing.T) { // constitutes ~14 blocks (each 32KB). const numRecords = 28 << 10 for i := 0; i < numRecords; i++ { - _, _, err = w.SyncRecord([]byte("hello"), nil, nil) + _, err = w.SyncRecord([]byte("hello"), nil, nil) require.NoError(t, err) } // Unblock the flush loop. It will run once or twice to write these blocks, @@ -430,7 +434,7 @@ func TestMetricsWithSync(t *testing.T) { wg.Add(100) for i := 0; i < 100; i++ { var syncErr error - _, _, err := w.SyncRecord([]byte("hello"), &wg, &syncErr) + _, err := w.SyncRecord([]byte("hello"), &wg, &syncErr) require.NoError(t, err) } // Unblock the flush loop. 
It may have run once or twice for these writes,
@@ -506,3 +510,74 @@ func valueAtQuantileWindowed(histogram *prometheusgo.Histogram, q float64) float
 	return val
 }
+
+// TestQueueWALBlocks tests queueing many un-flushed WAL blocks when syncing is
+// blocked.
+func TestQueueWALBlocks(t *testing.T) {
+	blockWriteCh := make(chan struct{}, 1)
+	f := errorfs.WrapFile(vfstest.DiscardFile, errorfs.InjectorFunc(func(op errorfs.Op, path string) error {
+		if op == errorfs.OpFileWrite {
+			<-blockWriteCh
+		}
+		return nil
+	}))
+	w := NewLogWriter(f, 0, LogWriterConfig{
+		WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
+	})
+	const numBlocks = 1024
+	var b [blockSize]byte
+	var logSize int64
+	for i := 0; i < numBlocks; i++ {
+		var err error
+		logSize, err = w.SyncRecord(b[:], nil, nil)
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+	close(blockWriteCh)
+	require.NoError(t, w.Close())
+
+	m := w.Metrics()
+	t.Logf("LogSize is %s", humanize.Bytes.Int64(logSize))
+	t.Logf("Mean pending buffer len is %.2f", m.PendingBufferLen.Mean())
+	require.GreaterOrEqual(t, logSize, int64(numBlocks*blockSize))
+}
+
+// BenchmarkQueueWALBlocks exercises queueing within the LogWriter. It can be
+// useful to measure allocations involved when flushing is slow enough to
+// accumulate a large backlog of queued blocks.
+func BenchmarkQueueWALBlocks(b *testing.B) {
+	const dataVolume = 64 << 20 /* 64 MB */
+	for _, writeSize := range []int64{64, 512, 1024, 2048, 32768} {
+		b.Run(fmt.Sprintf("record-size=%s", humanize.Bytes.Int64(writeSize)), func(b *testing.B) {
+			record := make([]byte, writeSize)
+			numRecords := int(dataVolume / writeSize)
+
+			for j := 0; j < b.N; j++ {
+				b.StopTimer()
+				blockWriteCh := make(chan struct{}, 1)
+				f := errorfs.WrapFile(vfstest.DiscardFile, errorfs.InjectorFunc(func(op errorfs.Op, path string) error {
+					if op == errorfs.OpFileWrite {
+						<-blockWriteCh
+					}
+					return nil
+				}))
+				w := NewLogWriter(f, 0, LogWriterConfig{
+					WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
+				})
+
+				b.StartTimer()
+				for n := numRecords; n > 0; n-- {
+					if _, err := w.SyncRecord(record[:], nil, nil); err != nil {
+						b.Fatal(err)
+					}
+				}
+				b.StopTimer()
+
+				b.SetBytes(dataVolume)
+				close(blockWriteCh)
+				require.NoError(b, w.Close())
+			}
+		})
+	}
+}
diff --git a/vfs/vfstest/vfstest.go b/vfs/vfstest/vfstest.go
new file mode 100644
index 0000000000..7ea3207df9
--- /dev/null
+++ b/vfs/vfstest/vfstest.go
@@ -0,0 +1,32 @@
+// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
+// of this source code is governed by a BSD-style license that can be found in
+// the LICENSE file.
+
+// Package vfstest provides facilities for interacting with or faking
+// filesystems during tests and benchmarks.
+package vfstest
+
+import (
+	"os"
+
+	"github.com/cockroachdb/pebble/vfs"
+)
+
+// DiscardFile implements vfs.File but discards all written data and reads
+// without mutating input buffers.
+var DiscardFile vfs.File = (*discardFile)(nil) + +type discardFile struct{} + +func (*discardFile) Close() error { return nil } +func (*discardFile) Read(p []byte) (int, error) { return len(p), nil } +func (*discardFile) ReadAt(p []byte, off int64) (int, error) { return len(p), nil } +func (*discardFile) Write(p []byte) (int, error) { return len(p), nil } +func (*discardFile) WriteAt(p []byte, ofs int64) (int, error) { return len(p), nil } +func (*discardFile) Preallocate(offset, length int64) error { return nil } +func (*discardFile) Stat() (os.FileInfo, error) { return nil, nil } +func (*discardFile) Sync() error { return nil } +func (*discardFile) SyncTo(length int64) (fullSync bool, err error) { return false, nil } +func (*discardFile) SyncData() error { return nil } +func (*discardFile) Prefetch(offset int64, length int64) error { return nil } +func (*discardFile) Fd() uintptr { return 0 }