diff --git a/batch.go b/batch.go index 20dd566003..f08e3ce659 100644 --- a/batch.go +++ b/batch.go @@ -318,9 +318,6 @@ type BatchCommitStats struct { // SemaphoreWaitDuration is the wait time for semaphores in // commitPipeline.Commit. SemaphoreWaitDuration time.Duration - // WALQueueWaitDuration is the wait time for allocating memory blocks in the - // LogWriter (due to the LogWriter not writing fast enough). - WALQueueWaitDuration time.Duration // MemTableWriteStallDuration is the wait caused by a write stall due to too // many memtables (due to not flushing fast enough). MemTableWriteStallDuration time.Duration diff --git a/batch_test.go b/batch_test.go index 5a0d73dc7d..c0b1fe0030 100644 --- a/batch_test.go +++ b/batch_test.go @@ -23,7 +23,6 @@ import ( "github.com/cockroachdb/pebble/internal/batchskl" "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/testkeys" - "github.com/cockroachdb/pebble/record" "github.com/cockroachdb/pebble/vfs" "github.com/stretchr/testify/require" ) @@ -1233,28 +1232,6 @@ func TestBatchCommitStats(t *testing.T) { } } - // WAL queue stall funcs. - // - // The LogWriter gets changed when stalling/unstalling the memtable, so we - // need to use a hook to tell us about the latest LogWriter. - var unstallWALQueue func() - stallWALQueue := func() { - var unstallLatestWALQueue func() - db.mu.Lock() - defer db.mu.Unlock() - db.mu.log.registerLogWriterForTesting = func(w *record.LogWriter) { - // db.mu will be held when this is called. - unstallLatestWALQueue = w.ReserveAllFreeBlocksForTesting() - } - db.mu.log.registerLogWriterForTesting(db.mu.log.LogWriter) - unstallWALQueue = func() { - db.mu.Lock() - defer db.mu.Unlock() - db.mu.log.registerLogWriterForTesting = nil - unstallLatestWALQueue() - } - } - // Commit wait stall funcs. var unstallCommitWait func() stallCommitWait := func() { @@ -1268,12 +1245,9 @@ func TestBatchCommitStats(t *testing.T) { stallCommitSemaphore() stallMemtable() stallL0ReadAmp() - stallWALQueue() stallCommitWait() // Exceed initialMemTableSize -- this is needed to make stallMemtable work. - // It also exceeds record.blockSize, requiring a new block to be allocated, - // which is what we need for stallWALQueue to work. require.NoError(t, b.Set(make([]byte, initialMemTableSize), nil, nil)) var commitWG sync.WaitGroup @@ -1291,8 +1265,6 @@ func TestBatchCommitStats(t *testing.T) { time.Sleep(sleepDuration) unstallL0ReadAmp() time.Sleep(sleepDuration) - unstallWALQueue() - time.Sleep(sleepDuration) unstallCommitWait() // Wait for Apply to return. @@ -1303,10 +1275,6 @@ func TestBatchCommitStats(t *testing.T) { return errors.Errorf("SemaphoreWaitDuration %s is too low", stats.SemaphoreWaitDuration.String()) } - if expectedDuration > stats.WALQueueWaitDuration { - return errors.Errorf("WALQueueWaitDuration %s is too low", - stats.WALQueueWaitDuration.String()) - } if expectedDuration > stats.MemTableWriteStallDuration { return errors.Errorf("MemTableWriteStallDuration %s is too low", stats.MemTableWriteStallDuration.String()) diff --git a/commit_test.go b/commit_test.go index 0e1ad4ac3c..ee627a113f 100644 --- a/commit_test.go +++ b/commit_test.go @@ -245,7 +245,7 @@ func TestCommitPipelineWALClose(t *testing.T) { return nil }, write: func(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*memTable, error) { - _, _, err := wal.SyncRecord(b.data, syncWG, syncErr) + _, err := wal.SyncRecord(b.data, syncWG, syncErr) return nil, err }, } @@ -316,7 +316,7 @@ func BenchmarkCommitPipeline(b *testing.B) { break } - _, _, err := wal.SyncRecord(b.data, syncWG, syncErr) + _, err := wal.SyncRecord(b.data, syncWG, syncErr) return mem, err }, } diff --git a/db.go b/db.go index 4b4a6aa3c4..dd9c4ed8fa 100644 --- a/db.go +++ b/db.go @@ -909,7 +909,7 @@ func (d *DB) commitWrite(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*mem b.flushable.setSeqNum(b.SeqNum()) if !d.opts.DisableWAL { var err error - size, b.commitStats.WALQueueWaitDuration, err = d.mu.log.SyncRecord(repr, syncWG, syncErr) + size, err = d.mu.log.SyncRecord(repr, syncWG, syncErr) if err != nil { panic(err) } @@ -947,7 +947,7 @@ func (d *DB) commitWrite(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*mem } if b.flushable == nil { - size, b.commitStats.WALQueueWaitDuration, err = d.mu.log.SyncRecord(repr, syncWG, syncErr) + size, err = d.mu.log.SyncRecord(repr, syncWG, syncErr) if err != nil { panic(err) } diff --git a/record/log_writer.go b/record/log_writer.go index 90f8c2ef34..7cc7a83063 100644 --- a/record/log_writer.go +++ b/record/log_writer.go @@ -266,10 +266,7 @@ type LogWriter struct { block *block free struct { sync.Mutex - // Condition variable used to signal a block is freed. - cond sync.Cond - blocks []*block - allocated int + blocks []*block } flusher struct { @@ -313,9 +310,10 @@ type LogWriterConfig struct { QueueSemChan chan struct{} } -// CapAllocatedBlocks is the maximum number of blocks allocated by the -// LogWriter. -const CapAllocatedBlocks = 16 +// initialAllocatedBlocksCap is the initial capacity of the various slices +// intended to hold LogWriter blocks. The LogWriter may allocate more blocks +// than this threshold allows. +const initialAllocatedBlocksCap = 32 // NewLogWriter returns a new LogWriter. func NewLogWriter(w io.Writer, logNum base.FileNum, logWriterConfig LogWriterConfig) *LogWriter { @@ -335,9 +333,7 @@ func NewLogWriter(w io.Writer, logNum base.FileNum, logWriterConfig LogWriterCon }, queueSemChan: logWriterConfig.QueueSemChan, } - r.free.cond.L = &r.free.Mutex - r.free.blocks = make([]*block, 0, CapAllocatedBlocks) - r.free.allocated = 1 + r.free.blocks = make([]*block, 0, initialAllocatedBlocksCap) r.block = &block{} r.flusher.ready.init(&r.flusher.Mutex, &r.flusher.syncQ) r.flusher.closed = make(chan struct{}) @@ -405,8 +401,12 @@ func (w *LogWriter) flushLoop(context.Context) { // the flush work (w.block.written.Load()). // The list of full blocks that need to be written. This is copied from - // f.pending on every loop iteration, though the number of elements is small - // (usually 1, max 16). + // f.pending on every loop iteration, though the number of elements is + // usually small (most frequently 1). In the case of the WAL LogWriter, the + // number of blocks is bounded by the size of the WAL's corresponding + // memtable (MemtableSize/BlockSize). With the default 64 MiB memtables, + // this works out to at most 2048 elements if the entirety of the memtable's + // contents are queued. pending := make([]*block, 0, cap(f.pending)) for { for { @@ -432,8 +432,7 @@ func (w *LogWriter) flushLoop(context.Context) { // Found work to do, so no longer idle. workStartTime := time.Now() idleDuration := workStartTime.Sub(idleStartTime) - pending = pending[:len(f.pending)] - copy(pending, f.pending) + pending = append(pending[:0], f.pending...) f.pending = f.pending[:0] f.metrics.PendingBufferLen.AddSample(int64(len(pending))) @@ -556,28 +555,18 @@ func (w *LogWriter) flushBlock(b *block) error { b.flushed = 0 w.free.Lock() w.free.blocks = append(w.free.blocks, b) - w.free.cond.Signal() w.free.Unlock() return nil } // queueBlock queues the current block for writing to the underlying writer, // allocates a new block and reserves space for the next header. -func (w *LogWriter) queueBlock() (waitDuration time.Duration) { +func (w *LogWriter) queueBlock() { // Allocate a new block, blocking until one is available. We do this first // because w.block is protected by w.flusher.Mutex. w.free.Lock() if len(w.free.blocks) == 0 { - if w.free.allocated < cap(w.free.blocks) { - w.free.allocated++ - w.free.blocks = append(w.free.blocks, &block{}) - } else { - now := time.Now() - for len(w.free.blocks) == 0 { - w.free.cond.Wait() - } - waitDuration = time.Since(now) - } + w.free.blocks = append(w.free.blocks, &block{}) } nextBlock := w.free.blocks[len(w.free.blocks)-1] w.free.blocks = w.free.blocks[:len(w.free.blocks)-1] @@ -592,28 +581,6 @@ func (w *LogWriter) queueBlock() (waitDuration time.Duration) { f.Unlock() w.blockNum++ - return waitDuration -} - -// ReserveAllFreeBlocksForTesting is used to only for testing. -func (w *LogWriter) ReserveAllFreeBlocksForTesting() (releaseFunc func()) { - w.free.Lock() - defer w.free.Unlock() - free := w.free.blocks - w.free.blocks = nil - return func() { - w.free.Lock() - defer w.free.Unlock() - // It is possible that someone has pushed a free block and w.free.blocks - // is no longer nil. That is harmless. Also, the waiter loops on the - // condition len(w.free.blocks) == 0, so to actually unblock it we need to - // give it a free block. - if len(free) == 0 { - free = append(free, &block{}) - } - w.free.blocks = free - w.free.cond.Broadcast() - } } // Close flushes and syncs any unwritten data and closes the writer. @@ -665,7 +632,7 @@ func (w *LogWriter) Close() error { // of the record. // External synchronisation provided by commitPipeline.mu. func (w *LogWriter) WriteRecord(p []byte) (int64, error) { - logSize, _, err := w.SyncRecord(p, nil, nil) + logSize, err := w.SyncRecord(p, nil, nil) return logSize, err } @@ -676,9 +643,9 @@ func (w *LogWriter) WriteRecord(p []byte) (int64, error) { // External synchronisation provided by commitPipeline.mu. func (w *LogWriter) SyncRecord( p []byte, wg *sync.WaitGroup, err *error, -) (logSize int64, waitDuration time.Duration, err2 error) { +) (logSize int64, err2 error) { if w.err != nil { - return -1, 0, w.err + return -1, w.err } // The `i == 0` condition ensures we handle empty records. Such records can @@ -686,9 +653,7 @@ func (w *LogWriter) SyncRecord( // MANIFEST is currently written using Writer, it is good to support the same // semantics with LogWriter. for i := 0; i == 0 || len(p) > 0; i++ { - var wd time.Duration - p, wd = w.emitFragment(i, p) - waitDuration += wd + p = w.emitFragment(i, p) } if wg != nil { @@ -707,7 +672,7 @@ func (w *LogWriter) SyncRecord( // race with our read. That's ok because the only error we could be seeing is // one to syncing for which the caller can receive notification of by passing // in a non-nil err argument. - return offset, waitDuration, nil + return offset, nil } // Size returns the current size of the file. @@ -728,7 +693,7 @@ func (w *LogWriter) emitEOFTrailer() { b.written.Store(i + int32(recyclableHeaderSize)) } -func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte, waitDuration time.Duration) { +func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte) { b := w.block i := b.written.Load() first := n == 0 @@ -762,9 +727,9 @@ func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte, waitDurati for i := b.written.Load(); i < blockSize; i++ { b.buf[i] = 0 } - waitDuration = w.queueBlock() + w.queueBlock() } - return p[r:], waitDuration + return p[r:] } // Metrics must be called after Close. The callee will no longer modify the diff --git a/record/log_writer_test.go b/record/log_writer_test.go index e17577b74e..09a9c03349 100644 --- a/record/log_writer_test.go +++ b/record/log_writer_test.go @@ -148,7 +148,7 @@ func TestSyncError(t *testing.T) { var syncErr error var syncWG sync.WaitGroup syncWG.Add(1) - _, _, err = w.SyncRecord([]byte("hello"), &syncWG, &syncErr) + _, err = w.SyncRecord([]byte("hello"), &syncWG, &syncErr) require.NoError(t, err) syncWG.Wait() if injectedErr != syncErr { @@ -186,7 +186,7 @@ func TestSyncRecord(t *testing.T) { for i := 0; i < 100000; i++ { var syncWG sync.WaitGroup syncWG.Add(1) - offset, _, err := w.SyncRecord([]byte("hello"), &syncWG, &syncErr) + offset, err := w.SyncRecord([]byte("hello"), &syncWG, &syncErr) require.NoError(t, err) syncWG.Wait() require.NoError(t, syncErr) @@ -214,7 +214,7 @@ func TestSyncRecordWithSignalChan(t *testing.T) { for i := 0; i < 5; i++ { var syncWG sync.WaitGroup syncWG.Add(1) - _, _, err := w.SyncRecord([]byte("hello"), &syncWG, &syncErr) + _, err := w.SyncRecord([]byte("hello"), &syncWG, &syncErr) require.NoError(t, err) syncWG.Wait() require.NoError(t, syncErr) @@ -273,7 +273,7 @@ func TestMinSyncInterval(t *testing.T) { syncRecord := func(n int) *sync.WaitGroup { wg := &sync.WaitGroup{} wg.Add(1) - _, _, err := w.SyncRecord(bytes.Repeat([]byte{'a'}, n), wg, new(error)) + _, err := w.SyncRecord(bytes.Repeat([]byte{'a'}, n), wg, new(error)) require.NoError(t, err) return wg } @@ -344,7 +344,7 @@ func TestMinSyncIntervalClose(t *testing.T) { syncRecord := func(n int) *sync.WaitGroup { wg := &sync.WaitGroup{} wg.Add(1) - _, _, err := w.SyncRecord(bytes.Repeat([]byte{'a'}, n), wg, new(error)) + _, err := w.SyncRecord(bytes.Repeat([]byte{'a'}, n), wg, new(error)) require.NoError(t, err) return wg } @@ -379,7 +379,7 @@ func TestMetricsWithoutSync(t *testing.T) { f := &syncFileWithWait{} f.writeWG.Add(1) w := NewLogWriter(f, 0, LogWriterConfig{WALFsyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{})}) - offset, _, err := w.SyncRecord([]byte("hello"), nil, nil) + offset, err := w.SyncRecord([]byte("hello"), nil, nil) require.NoError(t, err) const recordSize = 16 require.EqualValues(t, recordSize, offset) @@ -388,7 +388,7 @@ func TestMetricsWithoutSync(t *testing.T) { // constitutes ~14 blocks (each 32KB). const numRecords = 28 << 10 for i := 0; i < numRecords; i++ { - _, _, err = w.SyncRecord([]byte("hello"), nil, nil) + _, err = w.SyncRecord([]byte("hello"), nil, nil) require.NoError(t, err) } // Unblock the flush loop. It will run once or twice to write these blocks, @@ -430,7 +430,7 @@ func TestMetricsWithSync(t *testing.T) { wg.Add(100) for i := 0; i < 100; i++ { var syncErr error - _, _, err := w.SyncRecord([]byte("hello"), &wg, &syncErr) + _, err := w.SyncRecord([]byte("hello"), &wg, &syncErr) require.NoError(t, err) } // Unblock the flush loop. It may have run once or twice for these writes,