Skip to content

Commit

Permalink
db: add DB.ApplyNoSyncWait for asynchronous apply
Browse files Browse the repository at this point in the history
ApplyNoSyncWait must only be used when WriteOptions.Sync is true. It enqueues
the Batch to the WAL, adds to the memtable, and waits until the batch is
visible in the memtable, and then returns to the caller. The caller is
responsible for calling Batch.SyncWait to wait until the write to the
WAL is fsynced.

This change required splitting the WaitGroup in the Batch into two
WaitGroups, so waiting for the visibility can happen separately from
waiting for the WAL write. Additionally, the channel used as a semaphore
for reserving space in the two lock-free queues is split into two channels,
since dequeueing from these queues can happen in arbitrary order.

Benchmarks indicate that the overhead of pushing and popping on an extra
channel is tolerable. Benchmarks were run on a macbook pro -- note these are
not doing an actual sync since they use io.Discard, and are only benchmarking
the commit pipeline.

Sync wait on master (old) vs this branch (new):

name                                               old time/op    new time/op    delta
CommitPipeline/no-sync-wait=false/parallel=1-10      1.09µs ± 6%    1.15µs ± 9%    ~     (p=0.310 n=5+5)
CommitPipeline/no-sync-wait=false/parallel=2-10      1.53µs ± 4%    1.54µs ± 2%    ~     (p=0.841 n=5+5)
CommitPipeline/no-sync-wait=false/parallel=4-10      1.54µs ± 1%    1.59µs ± 1%  +2.87%  (p=0.008 n=5+5)
CommitPipeline/no-sync-wait=false/parallel=8-10      1.52µs ± 1%    1.55µs ± 1%  +2.43%  (p=0.008 n=5+5)

name                                               old speed      new speed      delta
CommitPipeline/no-sync-wait=false/parallel=1-10    14.7MB/s ± 5%  13.9MB/s ±10%    ~     (p=0.310 n=5+5)
CommitPipeline/no-sync-wait=false/parallel=2-10    10.5MB/s ± 4%  10.4MB/s ± 2%    ~     (p=0.841 n=5+5)
CommitPipeline/no-sync-wait=false/parallel=4-10    10.4MB/s ± 1%  10.1MB/s ± 1%  -2.78%  (p=0.008 n=5+5)
CommitPipeline/no-sync-wait=false/parallel=8-10    10.5MB/s ± 1%  10.3MB/s ± 1%  -2.35%  (p=0.008 n=5+5)

name                                               old alloc/op   new alloc/op   delta
CommitPipeline/no-sync-wait=false/parallel=1-10      1.37kB ± 0%    1.40kB ± 0%  +2.31%  (p=0.008 n=5+5)
CommitPipeline/no-sync-wait=false/parallel=2-10      1.37kB ± 0%    1.40kB ± 0%  +2.31%  (p=0.008 n=5+5)
CommitPipeline/no-sync-wait=false/parallel=4-10      1.37kB ± 0%    1.40kB ± 0%  +2.15%  (p=0.008 n=5+5)
CommitPipeline/no-sync-wait=false/parallel=8-10      1.37kB ± 0%    1.40kB ± 0%  +2.34%  (p=0.008 n=5+5)

name                                               old allocs/op  new allocs/op  delta
CommitPipeline/no-sync-wait=false/parallel=1-10        2.00 ± 0%      2.00 ± 0%    ~     (all equal)
CommitPipeline/no-sync-wait=false/parallel=2-10        2.00 ± 0%      2.00 ± 0%    ~     (all equal)
CommitPipeline/no-sync-wait=false/parallel=4-10        2.00 ± 0%      2.00 ± 0%    ~     (all equal)
CommitPipeline/no-sync-wait=false/parallel=8-10        2.00 ± 0%      2.00 ± 0%    ~     (all equal)

Sync wait on this branch (old) vs async wait on this branch (new):

name                            old time/op    new time/op    delta
CommitPipeline/parallel=1-10      1.15µs ± 9%    1.20µs ± 7%     ~     (p=0.421 n=5+5)
CommitPipeline/parallel=2-10      1.54µs ± 2%    1.59µs ± 2%   +3.50%  (p=0.008 n=5+5)
CommitPipeline/parallel=4-10      1.59µs ± 1%    1.58µs ± 1%     ~     (p=0.802 n=5+5)
CommitPipeline/parallel=8-10      1.55µs ± 1%    1.56µs ± 1%     ~     (p=0.452 n=5+5)

name                            old speed      new speed      delta
CommitPipeline/parallel=1-10    13.9MB/s ±10%  13.3MB/s ± 7%     ~     (p=0.421 n=5+5)
CommitPipeline/parallel=2-10    10.4MB/s ± 2%  10.1MB/s ± 2%   -3.36%  (p=0.008 n=5+5)
CommitPipeline/parallel=4-10    10.1MB/s ± 1%  10.1MB/s ± 1%     ~     (p=0.786 n=5+5)
CommitPipeline/parallel=8-10    10.3MB/s ± 1%  10.3MB/s ± 1%     ~     (p=0.452 n=5+5)

name                            old alloc/op   new alloc/op   delta
CommitPipeline/parallel=1-10      1.40kB ± 0%    1.40kB ± 0%     ~     (p=0.651 n=5+5)
CommitPipeline/parallel=2-10      1.40kB ± 0%    1.39kB ± 0%   -0.21%  (p=0.008 n=5+5)
CommitPipeline/parallel=4-10      1.40kB ± 0%    1.40kB ± 0%     ~     (p=0.706 n=5+5)
CommitPipeline/parallel=8-10      1.40kB ± 0%    1.40kB ± 0%     ~     (p=0.587 n=5+5)

name                            old allocs/op  new allocs/op  delta
CommitPipeline/parallel=1-10        2.00 ± 0%      2.00 ± 0%     ~     (all equal)
CommitPipeline/parallel=2-10        2.00 ± 0%      2.00 ± 0%     ~     (all equal)
CommitPipeline/parallel=4-10        2.00 ± 0%      2.00 ± 0%     ~     (all equal)
CommitPipeline/parallel=8-10        2.00 ± 0%      2.00 ± 0%     ~     (all equal)

Informs cockroachdb/cockroach#17500

See discussion thread cockroachdb/cockroach#87050 (review)
  • Loading branch information
sumeerbhola committed Jan 9, 2023
1 parent 65ff304 commit 6d20eb4
Show file tree
Hide file tree
Showing 8 changed files with 320 additions and 88 deletions.
25 changes: 25 additions & 0 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,22 @@ type Batch struct {
// memtable.
flushable *flushableBatch

// Synchronous Apply uses the commit WaitGroup for both publishing the
// seqnum and waiting for the WAL fsync (if needed). Asynchronous
// ApplyNoSyncWait, which implies WriteOptions.Sync is true, uses the commit
// WaitGroup for publishing the seqnum and the fsyncWait WaitGroup for
// waiting for the WAL fsync.
//
// TODO(sumeer): if we find that ApplyNoSyncWait in conjunction with
// SyncWait is causing higher memory usage because of the time duration
// between when the sync is already done, and a goroutine calls SyncWait
// (followed by Batch.Close), we could separate out {fsyncWait, commitErr}
// into a separate struct that is allocated separately (using another
// sync.Pool), and only that struct needs to outlive Batch.Close (which
// could then be called immediately after ApplyNoSncWait).
commit sync.WaitGroup
fsyncWait sync.WaitGroup

commitErr error
applied uint32 // updated atomically
}
Expand Down Expand Up @@ -1089,6 +1104,7 @@ func (b *Batch) Reset() {
b.rangeKeysSeqNum = 0
b.flushable = nil
b.commit = sync.WaitGroup{}
b.fsyncWait = sync.WaitGroup{}
b.commitErr = nil
atomic.StoreUint32(&b.applied, 0)
if b.data != nil {
Expand Down Expand Up @@ -1205,6 +1221,15 @@ func batchDecodeStr(data []byte) (odata []byte, s []byte, ok bool) {
return data[v:], data[:v], true
}

// SyncWait is to be used in conjunction with DB.ApplyNoSyncWait.
func (b *Batch) SyncWait() error {
b.fsyncWait.Wait()
if b.commitErr != nil {
b.db = nil // prevent batch reuse on error
}
return b.commitErr
}

// BatchReader iterates over the entries contained in a batch.
type BatchReader []byte

Expand Down
27 changes: 27 additions & 0 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,32 @@ func TestBatchEmpty(t *testing.T) {
require.NoError(t, iter2.Close())
}

func TestBatchApplyNoSyncWait(t *testing.T) {
db, err := Open("", &Options{
FS: vfs.NewMem(),
})
require.NoError(t, err)
defer db.Close()
var batches []*Batch
options := &WriteOptions{Sync: true}
for i := 0; i < 10000; i++ {
b := db.NewBatch()
str := fmt.Sprintf("a%d", i)
require.NoError(t, b.Set([]byte(str), []byte(str), nil))
require.NoError(t, db.ApplyNoSyncWait(b, options))
// k-v pair is visible even if not yet synced.
val, closer, err := db.Get([]byte(str))
require.NoError(t, err)
require.Equal(t, str, string(val))
closer.Close()
batches = append(batches, b)
}
for _, b := range batches {
require.NoError(t, b.SyncWait())
b.Close()
}
}

func TestBatchReset(t *testing.T) {
db, err := Open("", &Options{
FS: vfs.NewMem(),
Expand All @@ -244,6 +270,7 @@ func TestBatchReset(t *testing.T) {
b.applied = 1
b.commitErr = errors.New("test-error")
b.commit.Add(1)
b.fsyncWait.Add(1)
require.Equal(t, uint32(3), b.Count())
require.Equal(t, uint64(1), b.countRangeDels)
require.Equal(t, uint64(1), b.countRangeKeys)
Expand Down
98 changes: 76 additions & 22 deletions commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (q *commitQueue) enqueue(b *Batch) {
ptrs := atomic.LoadUint64(&q.headTail)
head, tail := q.unpack(ptrs)
if (tail+uint32(len(q.slots)))&(1<<dequeueBits-1) == head {
// Queue is full. This should never be reached because commitPipeline.sem
// Queue is full. This should never be reached because commitPipeline.commitQueueSem
// limits the number of concurrent operations.
panic("pebble: not reached")
}
Expand Down Expand Up @@ -217,7 +217,27 @@ type commitPipeline struct {
// Queue of pending batches to commit.
pending commitQueue
env commitEnv
sem chan struct{}
// The commit path has two queues:
// - commitPipeline.pending contains batches whose seqnums have not yet been
// published. It is a lock-free single producer multi consumer queue.
// - LogWriter.flusher.syncQ contains state for batches that have asked for
// a sync. It is a lock-free single producer single consumer queue.
// These lock-free queues have a fixed capacity. And since they are
// lock-free, we cannot do blocking waits when pushing onto these queues, in
// case they are full. Additionally, adding to these queues happens while
// holding commitPipeline.mu, and we don't want to block while holding that
// mutex since it is also needed by other code.
//
// Popping from these queues is independent and for a particular batch can
// occur in either order, though it is more common that popping from the
// commitPipeline.pending will happen first.
//
// Due to these constraints, we reserve a unit of space in each queue before
// acquiring commitPipeline.mu, which also ensures that the push operation
// is guaranteed to have space in the queue. The commitQueueSem and
// logSyncQSem are used for this reservation.
commitQueueSem chan struct{}
logSyncQSem chan struct{}
// The mutex to use for synchronizing access to logSeqNum and serializing
// calls to commitEnv.write().
mu sync.Mutex
Expand All @@ -226,51 +246,82 @@ type commitPipeline struct {
func newCommitPipeline(env commitEnv) *commitPipeline {
p := &commitPipeline{
env: env,
// The capacity of both commitQueue.slots and syncQueue.slots is set to
// record.SyncConcurrency, which also determines the value of these
// semaphores. We used to have a single semaphore, which required that the
// capacity of these queues be the same. Now that we have two semaphores,
// the capacity of these queues could be changed to be different. Say half
// of the batches asked to be synced, but syncing took 5x the latency of
// adding to the memtable and publishing. Then syncQueue.slots could be
// sized as 0.5*5 of the commitQueue.slots. We can explore this if we find
// that LogWriterMetrics.SyncQueueLen has high utilization under some
// workloads.
//
// NB: the commit concurrency is one less than SyncConcurrency because we
// have to allow one "slot" for a concurrent WAL rotation which will close
// and sync the WAL.
sem: make(chan struct{}, record.SyncConcurrency-1),
commitQueueSem: make(chan struct{}, record.SyncConcurrency-1),
logSyncQSem: make(chan struct{}, record.SyncConcurrency-1),
}
return p
}

// Commit the specified batch, writing it to the WAL, optionally syncing the
// WAL, and applying the batch to the memtable. Upon successful return the
// batch's mutations will be visible for reading.
func (p *commitPipeline) Commit(b *Batch, syncWAL bool) error {
// REQUIRES: noSyncWait => syncWAL
func (p *commitPipeline) Commit(b *Batch, syncWAL bool, noSyncWait bool) error {
if b.Empty() {
return nil
}

p.sem <- struct{}{}
// Acquire semaphores.
p.commitQueueSem <- struct{}{}
if syncWAL {
p.logSyncQSem <- struct{}{}
}

// Prepare the batch for committing: enqueuing the batch in the pending
// queue, determining the batch sequence number and writing the data to the
// WAL.
//
// NB: We set Batch.commitErr on error so that the batch won't be a candidate
// for reuse. See Batch.release().
mem, err := p.prepare(b, syncWAL)
mem, err := p.prepare(b, syncWAL, noSyncWait)
if err != nil {
b.db = nil // prevent batch reuse on error
// NB: we are not doing <-p.commitQueueSem since the batch is still
// sitting in the pending queue. We should consider fixing this by also
// removing the batch from the pending queue.
return err
}

// Apply the batch to the memtable.
if err := p.env.apply(b, mem); err != nil {
b.db = nil // prevent batch reuse on error
// NB: we are not doing <-p.commitQueueSem since the batch is still
// sitting in the pending queue. We should consider fixing this by also
// removing the batch from the pending queue.
return err
}

// Publish the batch sequence number.
p.publish(b)

<-p.sem
<-p.commitQueueSem

if b.commitErr != nil {
b.db = nil // prevent batch reuse on error
if !noSyncWait {
// Already waited for commit, so look at the error.
if b.commitErr != nil {
b.db = nil // prevent batch reuse on error
err = b.commitErr
}
}
return b.commitErr
// Else noSyncWait. The LogWriter can be concurrently writing to
// b.commitErr. We will read b.commitErr in Batch.SyncWait after the
// LogWriter is done writing.

return err
}

// AllocateSeqNum allocates count sequence numbers, invokes the prepare
Expand All @@ -294,7 +345,7 @@ func (p *commitPipeline) AllocateSeqNum(count int, prepare func(), apply func(se
b.setCount(uint32(count))
b.commit.Add(1)

p.sem <- struct{}{}
p.commitQueueSem <- struct{}{}

p.mu.Lock()

Expand Down Expand Up @@ -341,27 +392,30 @@ func (p *commitPipeline) AllocateSeqNum(count int, prepare func(), apply func(se
// Publish the sequence number.
p.publish(b)

<-p.sem
<-p.commitQueueSem
}

func (p *commitPipeline) prepare(b *Batch, syncWAL bool) (*memTable, error) {
func (p *commitPipeline) prepare(b *Batch, syncWAL bool, noSyncWait bool) (*memTable, error) {
n := uint64(b.Count())
if n == invalidBatchCount {
return nil, ErrInvalidBatch
}
count := 1
if syncWAL {
count++
}
// count represents the waiting needed for publish, and optionally the
// waiting needed for the WAL sync.
b.commit.Add(count)

var syncWG *sync.WaitGroup
var syncErr *error
commitCount := 1
if syncWAL {
syncWG, syncErr = &b.commit, &b.commitErr
syncErr = &b.commitErr
if noSyncWait {
syncWG = &b.fsyncWait
b.fsyncWait.Add(1)
} else {
syncWG = &b.commit
commitCount++
}
}
// commitCount represents the waiting needed for publish, and optionally the
// waiting needed for the WAL sync.
b.commit.Add(commitCount)

p.mu.Lock()

Expand Down
Loading

0 comments on commit 6d20eb4

Please sign in to comment.