From 1bc52c434b5e220dde2168f9981fc3a04b1d4718 Mon Sep 17 00:00:00 2001 From: Injun Song Date: Tue, 27 Sep 2022 16:58:44 +0900 Subject: [PATCH] feat(storage): change the trim not to remove the commit context This patch changes the method `internal/storage.(*Storage).Trim` not to remove the commit context. According to the RFC (i.e., [20220915_commit_context.md]((https://github.com/kakao/varlog/blob/main/docs/RFCs/20220915_commit_context.md)), the commit context is a pointer of the last commit message to help recover the log stream. Thus, the trim that removes the log entries' prefix does not have to remove the commit context. Moreover, removing a commit context while appending log entries is not trivial since Commit RPC can change the commit context continuously. Updates #125 --- internal/storage/storage.go | 43 ++---- internal/storage/storage_test.go | 244 +++++++++++++++++++------------ 2 files changed, 169 insertions(+), 118 deletions(-) diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 94ac0efc1..6f8573254 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -212,8 +212,10 @@ func (s *Storage) NextCommitContextOf(cc CommitContext) (next CommitContext, err return decodeCommitContextKey(it.Key()), nil } -// Trim deletes log entries whose GLSNs are less than or equal to the argument glsn. -// It returns the ErrNoCommitContext if there are no logs to delete. +// Trim deletes log entries whose GLSNs are less than or equal to the argument +// glsn. Internally, it removes records for both data and commits but does not +// remove the commit context. +// It returns the ErrNoLogEntry if there are no logs to delete. func (s *Storage) Trim(glsn types.GLSN) error { lem, err := s.findLTE(glsn) if err != nil { @@ -227,50 +229,35 @@ func (s *Storage) Trim(glsn types.GLSN) error { _ = batch.Close() }() - // commit context - cc, err := s.CommitContextOf(trimGLSN) - if err != nil { - return err - } - cckBegin := make([]byte, commitContextKeyLength) - cckBegin = encodeCommitContextKeyInternal(CommitContext{}, cckBegin) - cckEnd := make([]byte, commitContextKeyLength) - cckEnd = encodeCommitContextKeyInternal(cc, cckEnd) - if cc.CommittedGLSNEnd-1 == trimGLSN { - // cc is deletable. - cc.Version++ - cckEnd = encodeCommitContextKeyInternal(cc, cckEnd) - } - if err := batch.DeleteRange(cckBegin, cckEnd, nil); err != nil { - return err - } - // commit ckBegin := make([]byte, commitKeyLength) ckBegin = encodeCommitKeyInternal(types.MinGLSN, ckBegin) ckEnd := make([]byte, commitKeyLength) ckEnd = encodeCommitKeyInternal(trimGLSN+1, ckEnd) - if err := batch.DeleteRange(ckBegin, ckEnd, nil); err != nil { - return err - } + _ = batch.DeleteRange(ckBegin, ckEnd, nil) // data dkBegin := make([]byte, dataKeyLength) dkBegin = encodeDataKeyInternal(types.MinLLSN, dkBegin) dkEnd := make([]byte, dataKeyLength) dkEnd = encodeDataKeyInternal(trimLLSN+1, dkEnd) - if err := batch.DeleteRange(dkBegin, dkEnd, nil); err != nil { - return err - } + _ = batch.DeleteRange(dkBegin, dkEnd, nil) return batch.Commit(s.writeOpts) } func (s *Storage) findLTE(glsn types.GLSN) (lem varlogpb.LogEntryMeta, err error) { - ck := make([]byte, commitKeyLength) + var upper []byte + if glsn < types.MaxGLSN { + upper = make([]byte, commitKeyLength) + upper = encodeCommitKeyInternal(glsn+1, upper) + } else { + upper = []byte{commitKeySentinelPrefix} + } + it := s.db.NewIter(&pebble.IterOptions{ LowerBound: []byte{commitKeyPrefix}, - UpperBound: encodeCommitKeyInternal(glsn+1, ck), + UpperBound: upper, }) defer func() { _ = it.Close() diff --git a/internal/storage/storage_test.go b/internal/storage/storage_test.go index 142ce4024..0e1aa17ac 100644 --- a/internal/storage/storage_test.go +++ b/internal/storage/storage_test.go @@ -718,99 +718,163 @@ func TestStorage_CommitContextOf(t *testing.T) { }) } -func TestStorage_Trim(t *testing.T) { - testStorage(t, func(t testing.TB, stg *Storage) { - err := stg.Trim(1) - assert.ErrorIs(t, err, ErrNoLogEntry) - - // CC : +-1-+ - // LLSN: 1 2 3 - // GLSN: 1 2 3 - wb := stg.NewWriteBatch() - assert.NoError(t, wb.Set(1, nil)) - assert.NoError(t, wb.Set(2, nil)) - assert.NoError(t, wb.Set(3, nil)) - assert.NoError(t, wb.Apply()) - assert.NoError(t, wb.Close()) +func TestStorage_TrimWhenNoLogEntry(t *testing.T) { + tcs := []struct { + name string + testf func(t testing.TB, stg *Storage) + }{ + { + name: "MinGLSN", + testf: func(t testing.TB, stg *Storage) { + err := stg.Trim(types.MinGLSN) + assert.ErrorIs(t, err, ErrNoLogEntry) + }, + }, + { + name: "MaxGLSN", + testf: func(t testing.TB, stg *Storage) { + err := stg.Trim(types.MaxGLSN) + assert.ErrorIs(t, err, ErrNoLogEntry) + }, + }, + { + name: "InvalidGLSN", + testf: func(t testing.TB, stg *Storage) { + err := stg.Trim(types.InvalidGLSN) + assert.ErrorIs(t, err, ErrNoLogEntry) + }, + }, + } - cb, err := stg.NewCommitBatch(CommitContext{ - Version: 1, - HighWatermark: 5, - CommittedGLSNBegin: 1, - CommittedGLSNEnd: 4, - CommittedLLSNBegin: 1, + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + testStorage(t, func(t testing.TB, stg *Storage) { + tc.testf(t, stg) + }) }) - assert.NoError(t, err) - assert.NoError(t, cb.Set(1, 1)) - assert.NoError(t, cb.Set(2, 2)) - assert.NoError(t, cb.Set(3, 3)) - assert.NoError(t, cb.Apply()) - assert.NoError(t, cb.Close()) + } +} - // CC : +-1-+ - // LLSN: _ 2 3 - // GLSN: _ 2 3 - err = stg.Trim(1) - assert.NoError(t, err) - it := stg.db.NewIter(&pebble.IterOptions{ - LowerBound: []byte{dataKeyPrefix}, - UpperBound: []byte{dataKeySentinelPrefix}, - }) - assert.True(t, it.First()) - assert.Equal(t, types.LLSN(2), decodeDataKey(it.Key())) - assert.True(t, it.Next()) - assert.Equal(t, types.LLSN(3), decodeDataKey(it.Key())) - assert.NoError(t, it.Close()) - it = stg.db.NewIter(&pebble.IterOptions{ - LowerBound: []byte{commitKeyPrefix}, - UpperBound: []byte{commitKeySentinelPrefix}, - }) - assert.True(t, it.First()) - assert.Equal(t, types.GLSN(2), decodeCommitKey(it.Key())) - assert.True(t, it.Next()) - assert.Equal(t, types.GLSN(3), decodeCommitKey(it.Key())) - assert.NoError(t, it.Close()) - it = stg.db.NewIter(&pebble.IterOptions{ - LowerBound: []byte{commitContextKeyPrefix}, - UpperBound: []byte{commitContextKeySentinelPrefix}, - }) - assert.True(t, it.First()) - assert.Equal(t, CommitContext{ - Version: 1, - HighWatermark: 5, - CommittedGLSNBegin: 1, - CommittedGLSNEnd: 4, - CommittedLLSNBegin: 1, - }, decodeCommitContextKey(it.Key())) - assert.NoError(t, it.Close()) +func TestStorage_Trim(t *testing.T) { + expectedCC := CommitContext{ + Version: 1, + HighWatermark: 3, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 4, + CommittedLLSNBegin: 1, + } - // CC : +-_-+ - // LLSN: _ _ _ - // GLSN: _ _ _ - err = stg.Trim(3) - assert.NoError(t, err) - it = stg.db.NewIter(&pebble.IterOptions{ - LowerBound: []byte{dataKeyPrefix}, - UpperBound: []byte{dataKeySentinelPrefix}, - }) - assert.False(t, it.First()) - assert.NoError(t, it.Close()) - it = stg.db.NewIter(&pebble.IterOptions{ - LowerBound: []byte{commitKeyPrefix}, - UpperBound: []byte{commitKeySentinelPrefix}, - }) - assert.False(t, it.First()) - assert.NoError(t, it.Close()) - it = stg.db.NewIter(&pebble.IterOptions{ - LowerBound: []byte{commitContextKeyPrefix}, - UpperBound: []byte{commitContextKeySentinelPrefix}, - }) - assert.False(t, it.First()) - assert.NoError(t, it.Close()) + tcs := []struct { + name string + testf func(t testing.TB, stg *Storage) + }{ + { + name: "MinGLSN", + testf: func(t testing.TB, stg *Storage) { + err := stg.Trim(types.MinGLSN) + assert.NoError(t, err) + + it := stg.db.NewIter(&pebble.IterOptions{ + LowerBound: []byte{dataKeyPrefix}, + UpperBound: []byte{dataKeySentinelPrefix}, + }) + assert.True(t, it.First()) + assert.Equal(t, types.LLSN(2), decodeDataKey(it.Key())) + assert.True(t, it.Next()) + assert.Equal(t, types.LLSN(3), decodeDataKey(it.Key())) + assert.NoError(t, it.Close()) + + it = stg.db.NewIter(&pebble.IterOptions{ + LowerBound: []byte{commitKeyPrefix}, + UpperBound: []byte{commitKeySentinelPrefix}, + }) + assert.True(t, it.First()) + assert.Equal(t, types.GLSN(2), decodeCommitKey(it.Key())) + assert.True(t, it.Next()) + assert.Equal(t, types.GLSN(3), decodeCommitKey(it.Key())) + assert.NoError(t, it.Close()) + + cc, err := stg.ReadCommitContext() + assert.NoError(t, err) + assert.Equal(t, expectedCC, cc) + }, + }, + { + name: "LastCommittedGLSN", + testf: func(t testing.TB, stg *Storage) { + err := stg.Trim(3) + assert.NoError(t, err) + assert.NoError(t, err) + + it := stg.db.NewIter(&pebble.IterOptions{ + LowerBound: []byte{dataKeyPrefix}, + UpperBound: []byte{dataKeySentinelPrefix}, + }) + assert.False(t, it.First()) + assert.NoError(t, it.Close()) + + it = stg.db.NewIter(&pebble.IterOptions{ + LowerBound: []byte{commitKeyPrefix}, + UpperBound: []byte{commitKeySentinelPrefix}, + }) + assert.False(t, it.First()) + assert.NoError(t, it.Close()) + + cc, err := stg.ReadCommitContext() + assert.NoError(t, err) + assert.Equal(t, expectedCC, cc) + }, + }, + { + name: "MaxGLSN", + testf: func(t testing.TB, stg *Storage) { + err := stg.Trim(types.MaxGLSN) + assert.NoError(t, err) + + it := stg.db.NewIter(&pebble.IterOptions{ + LowerBound: []byte{dataKeyPrefix}, + UpperBound: []byte{dataKeySentinelPrefix}, + }) + assert.False(t, it.First()) + assert.NoError(t, it.Close()) + + it = stg.db.NewIter(&pebble.IterOptions{ + LowerBound: []byte{commitKeyPrefix}, + UpperBound: []byte{commitKeySentinelPrefix}, + }) + assert.False(t, it.First()) + assert.NoError(t, it.Close()) + + cc, err := stg.ReadCommitContext() + assert.NoError(t, err) + assert.Equal(t, expectedCC, cc) + }, + }, + } - for glsn := types.MinGLSN; glsn < types.GLSN(5); glsn++ { - err = stg.Trim(glsn) - assert.ErrorIs(t, err, ErrNoLogEntry) - } - }) + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + testStorage(t, func(t testing.TB, stg *Storage) { + // CC : +---+ + // LLSN: 1 2 3 + // GLSN: 1 2 3 + wb := stg.NewWriteBatch() + assert.NoError(t, wb.Set(1, nil)) + assert.NoError(t, wb.Set(2, nil)) + assert.NoError(t, wb.Set(3, nil)) + assert.NoError(t, wb.Apply()) + assert.NoError(t, wb.Close()) + + cb, err := stg.NewCommitBatch(expectedCC) + assert.NoError(t, err) + assert.NoError(t, cb.Set(1, 1)) + assert.NoError(t, cb.Set(2, 2)) + assert.NoError(t, cb.Set(3, 3)) + assert.NoError(t, cb.Apply()) + assert.NoError(t, cb.Close()) + + tc.testf(t, stg) + }) + }) + } }