Skip to content

Commit

Permalink
feat(storage): change the trim not to remove the commit context
Browse files Browse the repository at this point in the history
This patch changes the method `internal/storage.(*Storage).Trim` not to remove the commit context.
According to the RFC (i.e., [20220915_commit_context.md]((https://github.com/kakao/varlog/blob/main/docs/RFCs/20220915_commit_context.md)), the commit context is a pointer of the last commit message to help recover the log stream.
Thus, the trim that removes the log entries' prefix does not have to remove the commit context.
Moreover, removing a commit context while appending log entries is not trivial since Commit RPC can
change the commit context continuously.

Updates kakao#125
  • Loading branch information
ijsong committed Sep 29, 2022
1 parent afa6fa4 commit 1bc52c4
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 118 deletions.
43 changes: 15 additions & 28 deletions internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,10 @@ func (s *Storage) NextCommitContextOf(cc CommitContext) (next CommitContext, err
return decodeCommitContextKey(it.Key()), nil
}

// Trim deletes log entries whose GLSNs are less than or equal to the argument glsn.
// It returns the ErrNoCommitContext if there are no logs to delete.
// Trim deletes log entries whose GLSNs are less than or equal to the argument
// glsn. Internally, it removes records for both data and commits but does not
// remove the commit context.
// It returns the ErrNoLogEntry if there are no logs to delete.
func (s *Storage) Trim(glsn types.GLSN) error {
lem, err := s.findLTE(glsn)
if err != nil {
Expand All @@ -227,50 +229,35 @@ func (s *Storage) Trim(glsn types.GLSN) error {
_ = batch.Close()
}()

// commit context
cc, err := s.CommitContextOf(trimGLSN)
if err != nil {
return err
}
cckBegin := make([]byte, commitContextKeyLength)
cckBegin = encodeCommitContextKeyInternal(CommitContext{}, cckBegin)
cckEnd := make([]byte, commitContextKeyLength)
cckEnd = encodeCommitContextKeyInternal(cc, cckEnd)
if cc.CommittedGLSNEnd-1 == trimGLSN {
// cc is deletable.
cc.Version++
cckEnd = encodeCommitContextKeyInternal(cc, cckEnd)
}
if err := batch.DeleteRange(cckBegin, cckEnd, nil); err != nil {
return err
}

// commit
ckBegin := make([]byte, commitKeyLength)
ckBegin = encodeCommitKeyInternal(types.MinGLSN, ckBegin)
ckEnd := make([]byte, commitKeyLength)
ckEnd = encodeCommitKeyInternal(trimGLSN+1, ckEnd)
if err := batch.DeleteRange(ckBegin, ckEnd, nil); err != nil {
return err
}
_ = batch.DeleteRange(ckBegin, ckEnd, nil)

// data
dkBegin := make([]byte, dataKeyLength)
dkBegin = encodeDataKeyInternal(types.MinLLSN, dkBegin)
dkEnd := make([]byte, dataKeyLength)
dkEnd = encodeDataKeyInternal(trimLLSN+1, dkEnd)
if err := batch.DeleteRange(dkBegin, dkEnd, nil); err != nil {
return err
}
_ = batch.DeleteRange(dkBegin, dkEnd, nil)

return batch.Commit(s.writeOpts)
}

func (s *Storage) findLTE(glsn types.GLSN) (lem varlogpb.LogEntryMeta, err error) {
ck := make([]byte, commitKeyLength)
var upper []byte
if glsn < types.MaxGLSN {
upper = make([]byte, commitKeyLength)
upper = encodeCommitKeyInternal(glsn+1, upper)
} else {
upper = []byte{commitKeySentinelPrefix}
}

it := s.db.NewIter(&pebble.IterOptions{
LowerBound: []byte{commitKeyPrefix},
UpperBound: encodeCommitKeyInternal(glsn+1, ck),
UpperBound: upper,
})
defer func() {
_ = it.Close()
Expand Down
244 changes: 154 additions & 90 deletions internal/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,99 +718,163 @@ func TestStorage_CommitContextOf(t *testing.T) {
})
}

func TestStorage_Trim(t *testing.T) {
testStorage(t, func(t testing.TB, stg *Storage) {
err := stg.Trim(1)
assert.ErrorIs(t, err, ErrNoLogEntry)

// CC : +-1-+
// LLSN: 1 2 3
// GLSN: 1 2 3
wb := stg.NewWriteBatch()
assert.NoError(t, wb.Set(1, nil))
assert.NoError(t, wb.Set(2, nil))
assert.NoError(t, wb.Set(3, nil))
assert.NoError(t, wb.Apply())
assert.NoError(t, wb.Close())
func TestStorage_TrimWhenNoLogEntry(t *testing.T) {
tcs := []struct {
name string
testf func(t testing.TB, stg *Storage)
}{
{
name: "MinGLSN",
testf: func(t testing.TB, stg *Storage) {
err := stg.Trim(types.MinGLSN)
assert.ErrorIs(t, err, ErrNoLogEntry)
},
},
{
name: "MaxGLSN",
testf: func(t testing.TB, stg *Storage) {
err := stg.Trim(types.MaxGLSN)
assert.ErrorIs(t, err, ErrNoLogEntry)
},
},
{
name: "InvalidGLSN",
testf: func(t testing.TB, stg *Storage) {
err := stg.Trim(types.InvalidGLSN)
assert.ErrorIs(t, err, ErrNoLogEntry)
},
},
}

cb, err := stg.NewCommitBatch(CommitContext{
Version: 1,
HighWatermark: 5,
CommittedGLSNBegin: 1,
CommittedGLSNEnd: 4,
CommittedLLSNBegin: 1,
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
testStorage(t, func(t testing.TB, stg *Storage) {
tc.testf(t, stg)
})
})
assert.NoError(t, err)
assert.NoError(t, cb.Set(1, 1))
assert.NoError(t, cb.Set(2, 2))
assert.NoError(t, cb.Set(3, 3))
assert.NoError(t, cb.Apply())
assert.NoError(t, cb.Close())
}
}

// CC : +-1-+
// LLSN: _ 2 3
// GLSN: _ 2 3
err = stg.Trim(1)
assert.NoError(t, err)
it := stg.db.NewIter(&pebble.IterOptions{
LowerBound: []byte{dataKeyPrefix},
UpperBound: []byte{dataKeySentinelPrefix},
})
assert.True(t, it.First())
assert.Equal(t, types.LLSN(2), decodeDataKey(it.Key()))
assert.True(t, it.Next())
assert.Equal(t, types.LLSN(3), decodeDataKey(it.Key()))
assert.NoError(t, it.Close())
it = stg.db.NewIter(&pebble.IterOptions{
LowerBound: []byte{commitKeyPrefix},
UpperBound: []byte{commitKeySentinelPrefix},
})
assert.True(t, it.First())
assert.Equal(t, types.GLSN(2), decodeCommitKey(it.Key()))
assert.True(t, it.Next())
assert.Equal(t, types.GLSN(3), decodeCommitKey(it.Key()))
assert.NoError(t, it.Close())
it = stg.db.NewIter(&pebble.IterOptions{
LowerBound: []byte{commitContextKeyPrefix},
UpperBound: []byte{commitContextKeySentinelPrefix},
})
assert.True(t, it.First())
assert.Equal(t, CommitContext{
Version: 1,
HighWatermark: 5,
CommittedGLSNBegin: 1,
CommittedGLSNEnd: 4,
CommittedLLSNBegin: 1,
}, decodeCommitContextKey(it.Key()))
assert.NoError(t, it.Close())
func TestStorage_Trim(t *testing.T) {
expectedCC := CommitContext{
Version: 1,
HighWatermark: 3,
CommittedGLSNBegin: 1,
CommittedGLSNEnd: 4,
CommittedLLSNBegin: 1,
}

// CC : +-_-+
// LLSN: _ _ _
// GLSN: _ _ _
err = stg.Trim(3)
assert.NoError(t, err)
it = stg.db.NewIter(&pebble.IterOptions{
LowerBound: []byte{dataKeyPrefix},
UpperBound: []byte{dataKeySentinelPrefix},
})
assert.False(t, it.First())
assert.NoError(t, it.Close())
it = stg.db.NewIter(&pebble.IterOptions{
LowerBound: []byte{commitKeyPrefix},
UpperBound: []byte{commitKeySentinelPrefix},
})
assert.False(t, it.First())
assert.NoError(t, it.Close())
it = stg.db.NewIter(&pebble.IterOptions{
LowerBound: []byte{commitContextKeyPrefix},
UpperBound: []byte{commitContextKeySentinelPrefix},
})
assert.False(t, it.First())
assert.NoError(t, it.Close())
tcs := []struct {
name string
testf func(t testing.TB, stg *Storage)
}{
{
name: "MinGLSN",
testf: func(t testing.TB, stg *Storage) {
err := stg.Trim(types.MinGLSN)
assert.NoError(t, err)

it := stg.db.NewIter(&pebble.IterOptions{
LowerBound: []byte{dataKeyPrefix},
UpperBound: []byte{dataKeySentinelPrefix},
})
assert.True(t, it.First())
assert.Equal(t, types.LLSN(2), decodeDataKey(it.Key()))
assert.True(t, it.Next())
assert.Equal(t, types.LLSN(3), decodeDataKey(it.Key()))
assert.NoError(t, it.Close())

it = stg.db.NewIter(&pebble.IterOptions{
LowerBound: []byte{commitKeyPrefix},
UpperBound: []byte{commitKeySentinelPrefix},
})
assert.True(t, it.First())
assert.Equal(t, types.GLSN(2), decodeCommitKey(it.Key()))
assert.True(t, it.Next())
assert.Equal(t, types.GLSN(3), decodeCommitKey(it.Key()))
assert.NoError(t, it.Close())

cc, err := stg.ReadCommitContext()
assert.NoError(t, err)
assert.Equal(t, expectedCC, cc)
},
},
{
name: "LastCommittedGLSN",
testf: func(t testing.TB, stg *Storage) {
err := stg.Trim(3)
assert.NoError(t, err)
assert.NoError(t, err)

it := stg.db.NewIter(&pebble.IterOptions{
LowerBound: []byte{dataKeyPrefix},
UpperBound: []byte{dataKeySentinelPrefix},
})
assert.False(t, it.First())
assert.NoError(t, it.Close())

it = stg.db.NewIter(&pebble.IterOptions{
LowerBound: []byte{commitKeyPrefix},
UpperBound: []byte{commitKeySentinelPrefix},
})
assert.False(t, it.First())
assert.NoError(t, it.Close())

cc, err := stg.ReadCommitContext()
assert.NoError(t, err)
assert.Equal(t, expectedCC, cc)
},
},
{
name: "MaxGLSN",
testf: func(t testing.TB, stg *Storage) {
err := stg.Trim(types.MaxGLSN)
assert.NoError(t, err)

it := stg.db.NewIter(&pebble.IterOptions{
LowerBound: []byte{dataKeyPrefix},
UpperBound: []byte{dataKeySentinelPrefix},
})
assert.False(t, it.First())
assert.NoError(t, it.Close())

it = stg.db.NewIter(&pebble.IterOptions{
LowerBound: []byte{commitKeyPrefix},
UpperBound: []byte{commitKeySentinelPrefix},
})
assert.False(t, it.First())
assert.NoError(t, it.Close())

cc, err := stg.ReadCommitContext()
assert.NoError(t, err)
assert.Equal(t, expectedCC, cc)
},
},
}

for glsn := types.MinGLSN; glsn < types.GLSN(5); glsn++ {
err = stg.Trim(glsn)
assert.ErrorIs(t, err, ErrNoLogEntry)
}
})
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
testStorage(t, func(t testing.TB, stg *Storage) {
// CC : +---+
// LLSN: 1 2 3
// GLSN: 1 2 3
wb := stg.NewWriteBatch()
assert.NoError(t, wb.Set(1, nil))
assert.NoError(t, wb.Set(2, nil))
assert.NoError(t, wb.Set(3, nil))
assert.NoError(t, wb.Apply())
assert.NoError(t, wb.Close())

cb, err := stg.NewCommitBatch(expectedCC)
assert.NoError(t, err)
assert.NoError(t, cb.Set(1, 1))
assert.NoError(t, cb.Set(2, 2))
assert.NoError(t, cb.Set(3, 3))
assert.NoError(t, cb.Apply())
assert.NoError(t, cb.Close())

tc.testf(t, stg)
})
})
}
}

0 comments on commit 1bc52c4

Please sign in to comment.