[DNM] db: add DB.ApplyNoSyncWait for asynchronous apply
ApplyNoSyncWait must only be used when WriteOptions.Sync is true. It enqueues
the Batch to the WAL, adds it to the memtable, waits until the batch is
visible in the memtable, and then returns to the caller. The caller is
responsible for calling Batch.SyncWait to wait until the write to the WAL
is fsynced.

This change required splitting the WaitGroup in the Batch into two
WaitGroups, so that waiting for visibility can happen separately from
waiting for the WAL write. Additionally, the channel used as a semaphore
for reserving space in the two lock-free queues is split into two channels,
since dequeueing from these queues can happen in arbitrary order.
There may be some performance overhead from pushing to and popping from
two channels instead of one.
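
For illustration, a minimal sketch of the intended calling pattern
(hypothetical caller code, not part of this commit; assumes the
cockroachdb/pebble package imported as pebble):

func writeAsync(db *pebble.DB) error {
	b := db.NewBatch()
	if err := b.Set([]byte("key"), []byte("value"), nil); err != nil {
		return err
	}
	// Returns once the mutation is applied to the memtable and visible;
	// the WAL fsync may still be in flight.
	if err := db.ApplyNoSyncWait(b, &pebble.WriteOptions{Sync: true}); err != nil {
		return err
	}
	// ... overlap other work with the WAL fsync ...
	if err := b.SyncWait(); err != nil { // wait for WAL durability
		return err
	}
	return b.Close()
}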

Informs cockroachdb/cockroach#17500

See discussion thread cockroachdb/cockroach#87050 (review)
sumeerbhola committed Nov 9, 2022
1 parent 7b30bd8 commit df9ce5f
Showing 8 changed files with 201 additions and 30 deletions.
16 changes: 16 additions & 0 deletions batch.go
@@ -255,7 +255,14 @@ type Batch struct {
// memtable.
flushable *flushableBatch

// Synchronous Apply uses the commit WaitGroup for both publishing the
// seqnum and waiting for the WAL fsync (if needed). Asynchronous
// ApplyNoSyncWait, which implies WriteOptions.Sync is true, uses the commit
// WaitGroup for publishing the seqnum and the fsyncWait WaitGroup for
// waiting for the WAL fsync.
commit sync.WaitGroup
fsyncWait sync.WaitGroup

commitErr error
applied uint32 // updated atomically
}
@@ -1205,6 +1212,15 @@ func batchDecodeStr(data []byte) (odata []byte, s []byte, ok bool) {
return data[v:], data[:v], true
}

// SyncWait is to be used in conjunction with DB.ApplyNoSyncWait.
func (b *Batch) SyncWait() error {
b.fsyncWait.Wait()
if b.commitErr != nil {
b.db = nil // prevent batch reuse on error
}
return b.commitErr
}

// BatchReader iterates over the entries contained in a batch.
type BatchReader []byte

25 changes: 25 additions & 0 deletions batch_test.go
@@ -223,6 +223,31 @@ func TestBatchEmpty(t *testing.T) {
require.NoError(t, iter2.Close())
}

func TestBatchApplyNoSyncWait(t *testing.T) {
db, err := Open("", &Options{
FS: vfs.NewMem(),
})
require.NoError(t, err)
defer db.Close()
var batches []*Batch
options := &WriteOptions{Sync: true}
for i := 0; i < 10000; i++ {
b := db.NewBatch()
str := fmt.Sprintf("a%d", i)
require.NoError(t, b.Set([]byte(str), []byte(str), nil))
require.NoError(t, db.ApplyNoSyncWait(b, options))
val, closer, err := db.Get([]byte(str))
require.NoError(t, err)
require.Equal(t, str, string(val))
closer.Close()
batches = append(batches, b)
}
for _, b := range batches {
require.NoError(t, b.SyncWait())
b.Close()
}
}

func TestBatchReset(t *testing.T) {
db, err := Open("", &Options{
FS: vfs.NewMem(),
77 changes: 58 additions & 19 deletions commit.go
@@ -60,7 +60,7 @@ func (q *commitQueue) enqueue(b *Batch) {
ptrs := atomic.LoadUint64(&q.headTail)
head, tail := q.unpack(ptrs)
if (tail+uint32(len(q.slots)))&(1<<dequeueBits-1) == head {
-// Queue is full. This should never be reached because commitPipeline.sem
+// Queue is full. This should never be reached because commitPipeline.commitQueueSem
// limits the number of concurrent operations.
panic("pebble: not reached")
}
@@ -217,7 +217,27 @@ type commitPipeline struct {
// Queue of pending batches to commit.
pending commitQueue
env commitEnv
-sem chan struct{}
+// The commit path has two queues:
+// - commitPipeline.pending contains batches whose seqnums have not yet been
+// published. It is a lock-free single producer multi consumer queue.
+// - LogWriter.flusher.syncQ contains state for batches that have asked for
+// a sync. It is a lock-free single producer single consumer queue.
+// These lock-free queues have a fixed capacity. And since they are
+// lock-free, we cannot do blocking waits when pushing onto these queues,
+// in case they are full. Additionally, adding to these queues happens
+// while holding commitPipeline.mu, and we don't want to block while holding
+// that mutex since it is also needed by other code.
+//
+// Popping from these queues is independent and for a particular batch can
+// occur in either order, though it is more common that popping from the
+// commitPipeline.pending will happen first.
+//
+// Due to these constraints, we reserve a unit of space in each queue before
+// acquiring commitPipeline.mu, which also ensures that the push operation
+// is guaranteed to have space in the queue. The commitQueueSem and
+// logSyncQSem are used for this reservation.
+commitQueueSem chan struct{}
+logSyncQSem chan struct{}
// The mutex to use for synchronizing access to logSeqNum and serializing
// calls to commitEnv.write().
mu sync.Mutex
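
As an aside, a simplified in-package sketch of the reservation pattern the
comment above describes (hypothetical names; a mutex-protected slice stands
in for the lock-free queue):

const capacity = 8

var (
	sem   = make(chan struct{}, capacity) // one token per free queue slot
	mu    sync.Mutex
	queue []*Batch // stand-in for the fixed-capacity lock-free queue
)

func push(b *Batch) {
	sem <- struct{}{} // reserve a slot; may block, but not while holding mu
	mu.Lock()
	queue = append(queue, b) // push is now guaranteed to have space
	mu.Unlock()
}

func pop() *Batch {
	mu.Lock()
	defer mu.Unlock()
	if len(queue) == 0 {
		return nil
	}
	b := queue[0]
	queue = queue[1:]
	<-sem // release the reservation only after the slot is freed
	return b
}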
@@ -226,33 +246,49 @@
func newCommitPipeline(env commitEnv) *commitPipeline {
p := &commitPipeline{
env: env,
+// TODO(sumeer): stale comment. Now that we have two semaphores, the
+// capacity of these queues can be different. Say half of the batches
+// asked to be synced, but syncing took 5x the latency of adding to the
+// memtable and publishing. Then the LogWriter.flusher.syncQ could be
+// sized as 0.5*5 of the commitPipeline.pending queue. So we could size
+// these based on typical experimental results.
+//
// NB: the commit concurrency is one less than SyncConcurrency because we
// have to allow one "slot" for a concurrent WAL rotation which will close
// and sync the WAL.
-sem: make(chan struct{}, record.SyncConcurrency-1),
+commitQueueSem: make(chan struct{}, record.SyncConcurrency-1),
+logSyncQSem: make(chan struct{}, record.SyncConcurrency-1),
}
return p
}

// Commit the specified batch, writing it to the WAL, optionally syncing the
// WAL, and applying the batch to the memtable. Upon successful return the
// batch's mutations will be visible for reading.
-func (p *commitPipeline) Commit(b *Batch, syncWAL bool) error {
+// REQUIRES: noSyncWait => syncWAL
+func (p *commitPipeline) Commit(b *Batch, syncWAL bool, noSyncWait bool) error {
if b.Empty() {
return nil
}

-p.sem <- struct{}{}
+// TODO(sumeer): will block here if SyncConcurrency is insufficient. Monitor
+// and increase if needed.
+p.commitQueueSem <- struct{}{}
+if syncWAL {
+p.logSyncQSem <- struct{}{}
+}

// Prepare the batch for committing: enqueuing the batch in the pending
// queue, determining the batch sequence number and writing the data to the
// WAL.
//
// NB: We set Batch.commitErr on error so that the batch won't be a candidate
// for reuse. See Batch.release().
-mem, err := p.prepare(b, syncWAL)
+mem, err := p.prepare(b, syncWAL, noSyncWait)
if err != nil {
b.db = nil // prevent batch reuse on error
+// NB: we are not doing <-p.commitQueueSem since the batch is still
+// sitting in the pending queue. Should we fix this?
return err
}

@@ -265,7 +301,7 @@ func (p *commitPipeline) Commit(b *Batch, syncWAL bool) error {
// Publish the batch sequence number.
p.publish(b)

-<-p.sem
+<-p.commitQueueSem

if b.commitErr != nil {
b.db = nil // prevent batch reuse on error
@@ -294,7 +330,7 @@ func (p *commitPipeline) AllocateSeqNum(count int, prepare func(), apply func(se
b.setCount(uint32(count))
b.commit.Add(1)

-p.sem <- struct{}{}
+p.commitQueueSem <- struct{}{}

p.mu.Lock()

@@ -341,27 +377,30 @@ func (p *commitPipeline) AllocateSeqNum(count int, prepare func(), apply func(se
// Publish the sequence number.
p.publish(b)

-<-p.sem
+<-p.commitQueueSem
}

-func (p *commitPipeline) prepare(b *Batch, syncWAL bool) (*memTable, error) {
+func (p *commitPipeline) prepare(b *Batch, syncWAL bool, noSyncWait bool) (*memTable, error) {
n := uint64(b.Count())
if n == invalidBatchCount {
return nil, ErrInvalidBatch
}
-count := 1
-if syncWAL {
-count++
-}
-// count represents the waiting needed for publish, and optionally the
-// waiting needed for the WAL sync.
-b.commit.Add(count)

var syncWG *sync.WaitGroup
var syncErr *error
+commitCount := 1
if syncWAL {
-syncWG, syncErr = &b.commit, &b.commitErr
+syncErr = &b.commitErr
+if noSyncWait {
+syncWG = &b.fsyncWait
+b.fsyncWait.Add(1)
+} else {
+syncWG = &b.commit
+commitCount++
+}
}
+// commitCount represents the waiting needed for publish, and optionally the
+// waiting needed for the WAL sync.
+b.commit.Add(commitCount)

p.mu.Lock()
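
To summarize the accounting above (a note derived from the code, not part of
the commit): with Sync=false, prepare does b.commit.Add(1) for publish only;
with Sync=true via Apply, b.commit.Add(2) covers publish plus the WAL fsync;
with Sync=true via ApplyNoSyncWait, b.commit.Add(1) covers publish while
b.fsyncWait.Add(1) tracks the fsync, which the caller waits on separately
through Batch.SyncWait.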

47 changes: 42 additions & 5 deletions commit_test.go
@@ -31,6 +31,7 @@ type testCommitEnv struct {
sync.Mutex
buf []uint64
}
queueSemChan chan struct{}
}

func (e *testCommitEnv) env() commitEnv {
@@ -49,10 +50,14 @@ func (e *testCommitEnv) apply(b *Batch, mem *memTable) error {
return nil
}

-func (e *testCommitEnv) write(b *Batch, _ *sync.WaitGroup, _ *error) (*memTable, error) {
+func (e *testCommitEnv) write(b *Batch, wg *sync.WaitGroup, _ *error) (*memTable, error) {
n := int64(len(b.data))
atomic.AddInt64(&e.writePos, n)
atomic.AddUint64(&e.writeCount, 1)
+if wg != nil {
+wg.Done()
+<-e.queueSemChan
+}
return nil, nil
}

@@ -100,7 +105,7 @@ func TestCommitPipeline(t *testing.T) {
defer wg.Done()
var b Batch
_ = b.Set([]byte(fmt.Sprint(i)), nil, nil)
-_ = p.Commit(&b, false)
+_ = p.Commit(&b, false, false)
}(i)
}
wg.Wait()
@@ -120,6 +125,37 @@
}
}

func TestCommitPipelineSync(t *testing.T) {
var e testCommitEnv
p := newCommitPipeline(e.env())
e.queueSemChan = p.logSyncQSem

n := 10000
if invariants.RaceEnabled {
// Under race builds we have to limit the concurrency or we hit the
// following error:
//
// race: limit on 8128 simultaneously alive goroutines is exceeded, dying
n = 1000
}

for _, noSyncWait := range []bool{false, true} {
t.Run(fmt.Sprintf("no-sync-wait=%t", noSyncWait), func(t *testing.T) {
var wg sync.WaitGroup
wg.Add(n)
for i := 0; i < n; i++ {
go func(i int) {
defer wg.Done()
var b Batch
_ = b.Set([]byte(fmt.Sprint(i)), nil, nil)
_ = p.Commit(&b, true, noSyncWait)
}(i)
}
wg.Wait()
})
}
}

func TestCommitPipelineAllocateSeqNum(t *testing.T) {
var e testCommitEnv
p := newCommitPipeline(e.env())
@@ -203,11 +239,12 @@ func TestCommitPipelineWALClose(t *testing.T) {
},
}
p := newCommitPipeline(testEnv)
+wal.QueueSemChan = p.logSyncQSem

// Launch N (commitConcurrency) goroutines which each create a batch and
// commit it with sync==true. Because of the syncDelayFile, none of these
// operations can complete until syncDelayFile.done is closed.
-errCh := make(chan error, cap(p.sem))
+errCh := make(chan error, cap(p.commitQueueSem))
walDone.Add(cap(errCh))
for i := 0; i < cap(errCh); i++ {
go func(i int) {
Expand All @@ -216,7 +253,7 @@ func TestCommitPipelineWALClose(t *testing.T) {
errCh <- err
return
}
-errCh <- p.Commit(b, true /* sync */)
+errCh <- p.Commit(b, true /* sync */, false)
}(i)
}

@@ -284,7 +321,7 @@ func BenchmarkCommitPipeline(b *testing.B) {
batch := newBatch(nil)
binary.BigEndian.PutUint64(buf, rng.Uint64())
batch.Set(buf, buf, nil)
-if err := p.Commit(batch, true /* sync */); err != nil {
+if err := p.Commit(batch, true /* sync */, false); err != nil {
b.Fatal(err)
}
batch.release()
24 changes: 23 additions & 1 deletion db.go
@@ -724,6 +724,27 @@ func (d *DB) RangeKeyDelete(start, end []byte, opts *WriteOptions) error {
//
// It is safe to modify the contents of the arguments after Apply returns.
func (d *DB) Apply(batch *Batch, opts *WriteOptions) error {
return d.applyInternal(batch, opts, false)
}

// ApplyNoSyncWait must only be used when opts.Sync is true and the caller
// does not want to wait for the WAL fsync to happen. The method will return
// once the mutation is applied to the memtable and is visible (note that a
// mutation is visible before the WAL sync even in the wait case, so we have
// not weakened the durability semantics). The caller must call Batch.SyncWait
// to wait for the WAL fsync. The caller must not Close the batch without
// first calling Batch.SyncWait.
// RECOMMENDATION: Prefer using Apply unless you really understand why you
// need ApplyNoSyncWait.
func (d *DB) ApplyNoSyncWait(batch *Batch, opts *WriteOptions) error {
if !opts.Sync {
return errors.Errorf("cannot request asynchonous apply when WriteOptions.Sync is false")
}
return d.applyInternal(batch, opts, true)
}

// REQUIRES: noSyncWait => opts.Sync
func (d *DB) applyInternal(batch *Batch, opts *WriteOptions, noSyncWait bool) error {
if err := d.closed.Load(); err != nil {
panic(err)
}
@@ -762,7 +783,7 @@ func (d *DB) Apply(batch *Batch, opts *WriteOptions) error {
if int(batch.memTableSize) >= d.largeBatchThreshold {
batch.flushable = newFlushableBatch(batch, d.opts.Comparer)
}
-if err := d.commit.Commit(batch, sync); err != nil {
+if err := d.commit.Commit(batch, sync, noSyncWait); err != nil {
// There isn't much we can do on an error here. The commit pipeline will be
// horked at this point.
d.opts.Logger.Fatalf("%v", err)
@@ -1986,6 +2007,7 @@ func (d *DB) makeRoomForWrite(b *Batch) error {
WALFsyncLatency: d.mu.log.metrics.fsyncLatency,
WALMinSyncInterval: d.opts.WALMinSyncInterval,
})
d.mu.log.LogWriter.QueueSemChan = d.commit.logSyncQSem
}

immMem := d.mu.mem.mutable
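
(Note: both this WAL-rotation path and Open below hand the new LogWriter the
logSyncQSem via QueueSemChan, so the LogWriter can release the sync-queue
reservation when it dequeues a sync request; this is the release that
testCommitEnv.write() simulates with <-e.queueSemChan above.)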
1 change: 1 addition & 0 deletions open.go
@@ -428,6 +428,7 @@ func Open(dirname string, opts *Options) (db *DB, _ error) {
WALFsyncLatency: d.mu.log.metrics.fsyncLatency,
}
d.mu.log.LogWriter = record.NewLogWriter(logFile, newLogNum, logWriterConfig)
d.mu.log.LogWriter.QueueSemChan = d.commit.logSyncQSem
d.mu.versions.metrics.WAL.Files++
}
d.updateReadStateLocked(d.opts.DebugCheck)