Skip to content

Commit

Permalink
Merge pull request kakao#162 from ijsong/change-commit-context
Browse files Browse the repository at this point in the history
feat(storagenode): store only the latest commit context for every commit
  • Loading branch information
ijsong authored Sep 27, 2022
2 parents c5f2412 + ecf3a12 commit afa6fa4
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 0 deletions.
2 changes: 2 additions & 0 deletions internal/storage/commit_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ var commitBatchPool = sync.Pool{
New: func() interface{} {
return &CommitBatch{
cck: make([]byte, commitContextKeyLength),
cc: make([]byte, commitContextLength),
ck: make([]byte, commitKeyLength),
dk: make([]byte, dataKeyLength),
}
Expand All @@ -22,6 +23,7 @@ type CommitBatch struct {
batch *pebble.Batch
writeOpts *pebble.WriteOptions
cck []byte
cc []byte
ck []byte
dk []byte
}
Expand Down
73 changes: 73 additions & 0 deletions internal/storage/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package storage

import (
"encoding/binary"
"unsafe"

"github.com/kakao/varlog/pkg/types"
)
Expand All @@ -15,11 +16,17 @@ const (
commitKeySentinelPrefix = byte('d')
commitKeyLength = 9 // prefix(1) + GLSN(8)

// Deprecated: Log stream replica will not store a sequence of commit contexts.
commitContextKeyPrefix = byte('x')
commitContextKeySentinelPrefix = byte('y')
commitContextKeyLength = 41 // prefix(1) + CommitContext(40)

commitContextKeyMarker = byte('b')
commitContextLength = 40
)

var commitContextKey = []byte{commitContextKeyMarker}

func encodeDataKeyInternal(llsn types.LLSN, key []byte) []byte {
key[0] = dataKeyPrefix
binary.BigEndian.PutUint64(key[1:], uint64(llsn))
Expand Down Expand Up @@ -92,3 +99,69 @@ func decodeCommitContextKey(k []byte) (cc CommitContext) {

return cc
}

// encodeCommitContext serializes commit context into byte slice.
func encodeCommitContext(cc CommitContext, key []byte) []byte {
sz := types.GLSNLen
offset := 0
binary.BigEndian.PutUint64(key[offset:offset+sz], uint64(cc.HighWatermark))

offset += sz
binary.BigEndian.PutUint64(key[offset:offset+sz], uint64(cc.CommittedGLSNBegin))

offset += sz
binary.BigEndian.PutUint64(key[offset:offset+sz], uint64(cc.CommittedGLSNEnd))

offset += sz
binary.BigEndian.PutUint64(key[offset:offset+sz], uint64(cc.CommittedLLSNBegin))

offset += sz
sz = types.VersionLen
binary.BigEndian.PutUint64(key[offset:offset+sz], uint64(cc.Version))

return key
}

// decodeCommitContext deserializes a commit context from a byte slice.
func decodeCommitContext(k []byte) (cc CommitContext) {
if len(k) != commitContextLength {
panic("storage: invalid key type")
}
sz := types.GLSNLen
offset := 0
cc.HighWatermark = types.GLSN(binary.BigEndian.Uint64(k[offset : offset+sz]))

offset += sz
cc.CommittedGLSNBegin = types.GLSN(binary.BigEndian.Uint64(k[offset : offset+sz]))

offset += sz
cc.CommittedGLSNEnd = types.GLSN(binary.BigEndian.Uint64(k[offset : offset+sz]))

offset += sz
cc.CommittedLLSNBegin = types.LLSN(binary.BigEndian.Uint64(k[offset : offset+sz]))

offset += sz
sz = types.VersionLen
cc.Version = types.Version(binary.BigEndian.Uint64(k[offset : offset+sz]))

return cc
}

// encodeCommitContextUnsafe is similar to encodeCommitContext except that it
// shares same memory with the argument cc.
//
// Experimental: It casts struct CommitContext to byte slice; hence its byte
// representation differs across architectures. A user should call
// decodeCommitContextUnsafe to decode bytes encoded by this function.
func encodeCommitContextUnsafe(cc *CommitContext) []byte {
return (*(*[commitContextLength]byte)(unsafe.Pointer(cc)))[:]
}

// decodeCommitContextUnsafe is similar to decodeCommitContext.
//
// Experimental: A user has to use this function to decode a byte slice encoded
// by encodeCommitContextUnsafe.
func decodeCommitContextUnsafe(buf []byte) (cc CommitContext) {
cc = *(*CommitContext)(unsafe.Pointer(&buf[0]))
return cc
}
100 changes: 100 additions & 0 deletions internal/storage/encode_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package storage

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestEncodeCommitContextUnsafe(t *testing.T) {
expected := CommitContext{
Version: 1,
HighWatermark: 2,
CommittedGLSNBegin: 3,
CommittedGLSNEnd: 4,
CommittedLLSNBegin: 5,
}
buf := encodeCommitContextUnsafe(&expected)
actual := decodeCommitContextUnsafe(buf)
require.Equal(t, expected, actual)
}

func BenchmarkCommitContext_Encode(b *testing.B) {
tcs := []struct {
name string
benchf func(*testing.B, CommitContext)
}{
{
name: "Safe",
benchf: func(b *testing.B, cc CommitContext) {
key := make([]byte, commitContextKeyLength)
for i := 0; i < b.N; i++ {
_ = encodeCommitContext(cc, key)
}
},
},
{
name: "Unsafe",
benchf: func(b *testing.B, cc CommitContext) {
var key []byte
for i := 0; i < b.N; i++ {
key = encodeCommitContextUnsafe(&cc)
_ = key
}
},
},
}

for _, tc := range tcs {
b.Run(tc.name, func(b *testing.B) {
cc := CommitContext{
Version: 1,
HighWatermark: 2,
CommittedGLSNBegin: 3,
CommittedGLSNEnd: 4,
CommittedLLSNBegin: 5,
}
tc.benchf(b, cc)
})
}
}

func BenchmarkCommitContext_Decode(b *testing.B) {
tcs := []struct {
name string
benchf func(*testing.B, CommitContext)
}{
{
name: "Safe",
benchf: func(b *testing.B, cc CommitContext) {
buf := make([]byte, commitContextLength)
buf = encodeCommitContext(cc, buf)
for i := 0; i < b.N; i++ {
_ = decodeCommitContext(buf)
}
},
},
{
name: "Unsafe",
benchf: func(b *testing.B, cc CommitContext) {
buf := encodeCommitContextUnsafe(&cc)
for i := 0; i < b.N; i++ {
_ = decodeCommitContextUnsafe(buf)
}
},
},
}

for _, tc := range tcs {
b.Run(tc.name, func(b *testing.B) {
cc := CommitContext{
Version: 1,
HighWatermark: 2,
CommittedGLSNBegin: 3,
CommittedGLSNEnd: 4,
CommittedLLSNBegin: 5,
}
tc.benchf(b, cc)
})
}
}
15 changes: 15 additions & 0 deletions internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func (s *Storage) NewWriteBatch() *WriteBatch {
// NewCommitBatch creates a batch for commit operations.
func (s *Storage) NewCommitBatch(cc CommitContext) (*CommitBatch, error) {
cb := newCommitBatch(s.db.NewBatch(), s.writeOpts)
if err := cb.batch.Set(commitContextKey, encodeCommitContext(cc, cb.cc), nil); err != nil {
_ = cb.Close()
return nil, err
}
if err := cb.batch.Set(encodeCommitContextKeyInternal(cc, cb.cck), nil, nil); err != nil {
_ = cb.Close()
return nil, err
Expand Down Expand Up @@ -160,6 +164,17 @@ func (s *Storage) readLLSN(llsn types.LLSN) (le varlogpb.LogEntry, err error) {
return le, ErrNoLogEntry
}

func (s *Storage) ReadCommitContext() (cc CommitContext, err error) {
buf, closer, err := s.db.Get(commitContextKey)
if err != nil {
return
}
defer func() {
_ = closer.Close()
}()
return decodeCommitContext(buf), nil
}

// CommitContextOf looks up a commit context that contains the log entry for the argument glsn.
func (s *Storage) CommitContextOf(glsn types.GLSN) (cc CommitContext, err error) {
cck := make([]byte, commitContextKeyLength)
Expand Down
37 changes: 37 additions & 0 deletions internal/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/cockroachdb/pebble"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"go.uber.org/zap"

Expand Down Expand Up @@ -112,6 +113,42 @@ func TestStorage(t *testing.T) {
})
}

func TestStorage_CommitContext(t *testing.T) {
testStorage(t, func(t testing.TB, stg *Storage) {
expected := CommitContext{
Version: 1,
HighWatermark: 1,
CommittedGLSNBegin: 1,
CommittedGLSNEnd: 2,
CommittedLLSNBegin: 1,
}
cb, err := stg.NewCommitBatch(expected)
require.NoError(t, err)
require.NoError(t, cb.Apply())
require.NoError(t, cb.Close())

actual, err := stg.ReadCommitContext()
require.NoError(t, err)
require.Equal(t, expected, actual)

expected = CommitContext{
Version: 2,
HighWatermark: 3,
CommittedGLSNBegin: 2,
CommittedGLSNEnd: 4,
CommittedLLSNBegin: 2,
}
cb, err = stg.NewCommitBatch(expected)
require.NoError(t, err)
require.NoError(t, cb.Apply())
require.NoError(t, cb.Close())

actual, err = stg.ReadCommitContext()
require.NoError(t, err)
require.Equal(t, expected, actual)
})
}

func TestStorage_WriteBatch(t *testing.T) {
testStorage(t, func(t testing.TB, stg *Storage) {
wb := stg.NewWriteBatch()
Expand Down

0 comments on commit afa6fa4

Please sign in to comment.