diff --git a/internal/storage/append_batch.go b/internal/storage/append_batch.go new file mode 100644 index 000000000..9ec25d05b --- /dev/null +++ b/internal/storage/append_batch.go @@ -0,0 +1,71 @@ +package storage + +import ( + "sync" + + "github.com/cockroachdb/pebble" + + "github.com/kakao/varlog/pkg/types" +) + +var appendBatchPool = sync.Pool{ + New: func() interface{} { + return &AppendBatch{ + dk: make([]byte, dataKeyLength), + ck: make([]byte, commitKeyLength), + cc: make([]byte, commitContextLength), + } + }, +} + +// AppendBatch is a batch to put one or more log entries. +type AppendBatch struct { + batch *pebble.Batch + writeOpts *pebble.WriteOptions + dk []byte + ck []byte + cc []byte +} + +func newAppendBatch(batch *pebble.Batch, writeOpts *pebble.WriteOptions) *AppendBatch { + ab := appendBatchPool.Get().(*AppendBatch) + ab.batch = batch + ab.writeOpts = writeOpts + return ab +} + +func (ab *AppendBatch) release() { + ab.batch = nil + ab.writeOpts = nil + appendBatchPool.Put(ab) +} + +// SetLogEntry inserts a log entry. +func (ab *AppendBatch) SetLogEntry(llsn types.LLSN, glsn types.GLSN, data []byte) error { + dk := encodeDataKeyInternal(llsn, ab.dk) + ck := encodeCommitKeyInternal(glsn, ab.ck) + if err := ab.batch.Set(dk, data, nil); err != nil { + return err + } + if err := ab.batch.Set(ck, dk, nil); err != nil { + return err + } + return nil +} + +// SetCommitContext inserts a commit context. +func (ab *AppendBatch) SetCommitContext(cc CommitContext) error { + return ab.batch.Set(commitContextKey, encodeCommitContext(cc, ab.cc), nil) +} + +// Apply saves a batch of appended log entries to the storage. +func (ab *AppendBatch) Apply() error { + return ab.batch.Commit(ab.writeOpts) +} + +// Close releases an AppendBatch. +func (ab *AppendBatch) Close() error { + err := ab.batch.Close() + ab.release() + return err +} diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 744a11731..3d843fbeb 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -101,6 +101,12 @@ func (s *Storage) NewCommitBatch(cc CommitContext) (*CommitBatch, error) { return cb, nil } +// NewAppendBatch creates a batch for appending log entries. It does not put +// commit context. +func (s *Storage) NewAppendBatch() *AppendBatch { + return newAppendBatch(s.db.NewBatch(), s.writeOpts) +} + // NewScanner creates a scanner for the given key range. func (s *Storage) NewScanner(opts ...ScanOption) *Scanner { scanner := newScanner() diff --git a/internal/storage/testing.go b/internal/storage/testing.go index a76555934..33bc22f0e 100644 --- a/internal/storage/testing.go +++ b/internal/storage/testing.go @@ -31,25 +31,16 @@ func TestGetUnderlyingDB(tb testing.TB, stg *Storage) *pebble.DB { // TestAppendLogEntryWithoutCommitContext stores log entries without commit // context. func TestAppendLogEntryWithoutCommitContext(tb testing.TB, stg *Storage, llsn types.LLSN, glsn types.GLSN, data []byte) { - db := TestGetUnderlyingDB(tb, stg) - - ck := make([]byte, commitKeyLength) - ck = encodeCommitKeyInternal(glsn, ck) - - dk := make([]byte, dataKeyLength) - dk = encodeDataKeyInternal(llsn, dk) - - batch := db.NewBatch() - require.NoError(tb, batch.Set(dk, data, nil)) - require.NoError(tb, batch.Set(ck, dk, nil)) - require.NoError(tb, batch.Commit(pebble.Sync)) + batch := stg.NewAppendBatch() + require.NoError(tb, batch.SetLogEntry(llsn, glsn, data)) + require.NoError(tb, batch.Apply()) require.NoError(tb, batch.Close()) } // TestSetCommitContext stores only commit context. func TestSetCommitContext(tb testing.TB, stg *Storage, cc CommitContext) { - db := TestGetUnderlyingDB(tb, stg) - value := make([]byte, commitContextLength) - value = encodeCommitContext(cc, value) - require.NoError(tb, db.Set(commitContextKey, value, pebble.Sync)) + batch := stg.NewAppendBatch() + require.NoError(tb, batch.SetCommitContext(cc)) + require.NoError(tb, batch.Apply()) + require.NoError(tb, batch.Close()) }