Skip to content

Commit

Permalink
feat(storage): introduce append batch
Browse files Browse the repository at this point in the history
There were already two kinds of batches in storage:

- The write batch saves only data keyed by the LLSN.
- The commit batch saves only the commit context and GLSN, and the commit context must exist.

This patch introduces a new batch type - append batch. The append b catch saves both log entries and
a commit context. However, it is not necessary to keep the commit context. During synchronization,
the append batch must store log entries from a destination replica.

Updates kakao#125
  • Loading branch information
ijsong committed Oct 13, 2022
1 parent 7b67ba2 commit d7d5667
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 16 deletions.
71 changes: 71 additions & 0 deletions internal/storage/append_batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package storage

import (
"sync"

"github.com/cockroachdb/pebble"

"github.com/kakao/varlog/pkg/types"
)

var appendBatchPool = sync.Pool{
New: func() interface{} {
return &AppendBatch{
dk: make([]byte, dataKeyLength),
ck: make([]byte, commitKeyLength),
cc: make([]byte, commitContextLength),
}
},
}

// AppendBatch is a batch to put one or more log entries.
type AppendBatch struct {
batch *pebble.Batch
writeOpts *pebble.WriteOptions
dk []byte
ck []byte
cc []byte
}

func newAppendBatch(batch *pebble.Batch, writeOpts *pebble.WriteOptions) *AppendBatch {
ab := appendBatchPool.Get().(*AppendBatch)
ab.batch = batch
ab.writeOpts = writeOpts
return ab
}

func (ab *AppendBatch) release() {
ab.batch = nil
ab.writeOpts = nil
appendBatchPool.Put(ab)
}

// SetLogEntry inserts a log entry.
func (ab *AppendBatch) SetLogEntry(llsn types.LLSN, glsn types.GLSN, data []byte) error {
dk := encodeDataKeyInternal(llsn, ab.dk)
ck := encodeCommitKeyInternal(glsn, ab.ck)
if err := ab.batch.Set(dk, data, nil); err != nil {
return err
}
if err := ab.batch.Set(ck, dk, nil); err != nil {
return err
}
return nil
}

// SetCommitContext inserts a commit context.
func (ab *AppendBatch) SetCommitContext(cc CommitContext) error {
return ab.batch.Set(commitContextKey, encodeCommitContext(cc, ab.cc), nil)
}

// Apply saves a batch of appended log entries to the storage.
func (ab *AppendBatch) Apply() error {
return ab.batch.Commit(ab.writeOpts)
}

// Close releases an AppendBatch.
func (ab *AppendBatch) Close() error {
err := ab.batch.Close()
ab.release()
return err
}
6 changes: 6 additions & 0 deletions internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ func (s *Storage) NewCommitBatch(cc CommitContext) (*CommitBatch, error) {
return cb, nil
}

// NewAppendBatch creates a batch for appending log entries. It does not put
// commit context.
func (s *Storage) NewAppendBatch() *AppendBatch {
return newAppendBatch(s.db.NewBatch(), s.writeOpts)
}

// NewScanner creates a scanner for the given key range.
func (s *Storage) NewScanner(opts ...ScanOption) *Scanner {
scanner := newScanner()
Expand Down
23 changes: 7 additions & 16 deletions internal/storage/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,16 @@ func TestGetUnderlyingDB(tb testing.TB, stg *Storage) *pebble.DB {
// TestAppendLogEntryWithoutCommitContext stores log entries without commit
// context.
func TestAppendLogEntryWithoutCommitContext(tb testing.TB, stg *Storage, llsn types.LLSN, glsn types.GLSN, data []byte) {
db := TestGetUnderlyingDB(tb, stg)

ck := make([]byte, commitKeyLength)
ck = encodeCommitKeyInternal(glsn, ck)

dk := make([]byte, dataKeyLength)
dk = encodeDataKeyInternal(llsn, dk)

batch := db.NewBatch()
require.NoError(tb, batch.Set(dk, data, nil))
require.NoError(tb, batch.Set(ck, dk, nil))
require.NoError(tb, batch.Commit(pebble.Sync))
batch := stg.NewAppendBatch()
require.NoError(tb, batch.SetLogEntry(llsn, glsn, data))
require.NoError(tb, batch.Apply())
require.NoError(tb, batch.Close())
}

// TestSetCommitContext stores only commit context.
func TestSetCommitContext(tb testing.TB, stg *Storage, cc CommitContext) {
db := TestGetUnderlyingDB(tb, stg)
value := make([]byte, commitContextLength)
value = encodeCommitContext(cc, value)
require.NoError(tb, db.Set(commitContextKey, value, pebble.Sync))
batch := stg.NewAppendBatch()
require.NoError(tb, batch.SetCommitContext(cc))
require.NoError(tb, batch.Apply())
require.NoError(tb, batch.Close())
}

0 comments on commit d7d5667

Please sign in to comment.