diff --git a/internal/storage/commit_batch.go b/internal/storage/commit_batch.go index 2d1d63e94..00aea697f 100644 --- a/internal/storage/commit_batch.go +++ b/internal/storage/commit_batch.go @@ -12,6 +12,7 @@ var commitBatchPool = sync.Pool{ New: func() interface{} { return &CommitBatch{ cck: make([]byte, commitContextKeyLength), + cc: make([]byte, commitContextLength), ck: make([]byte, commitKeyLength), dk: make([]byte, dataKeyLength), } @@ -22,6 +23,7 @@ type CommitBatch struct { batch *pebble.Batch writeOpts *pebble.WriteOptions cck []byte + cc []byte ck []byte dk []byte } diff --git a/internal/storage/encode.go b/internal/storage/encode.go index 1204e53ed..5ba759b07 100644 --- a/internal/storage/encode.go +++ b/internal/storage/encode.go @@ -2,6 +2,7 @@ package storage import ( "encoding/binary" + "unsafe" "github.com/kakao/varlog/pkg/types" ) @@ -15,11 +16,17 @@ const ( commitKeySentinelPrefix = byte('d') commitKeyLength = 9 // prefix(1) + GLSN(8) + // Deprecated: Log stream replica will not store a sequence of commit contexts. commitContextKeyPrefix = byte('x') commitContextKeySentinelPrefix = byte('y') commitContextKeyLength = 41 // prefix(1) + CommitContext(40) + + commitContextKeyMarker = byte('b') + commitContextLength = 40 ) +var commitContextKey = []byte{commitContextKeyMarker} + func encodeDataKeyInternal(llsn types.LLSN, key []byte) []byte { key[0] = dataKeyPrefix binary.BigEndian.PutUint64(key[1:], uint64(llsn)) @@ -92,3 +99,69 @@ func decodeCommitContextKey(k []byte) (cc CommitContext) { return cc } + +// encodeCommitContext serializes commit context into byte slice. +func encodeCommitContext(cc CommitContext, key []byte) []byte { + sz := types.GLSNLen + offset := 0 + binary.BigEndian.PutUint64(key[offset:offset+sz], uint64(cc.HighWatermark)) + + offset += sz + binary.BigEndian.PutUint64(key[offset:offset+sz], uint64(cc.CommittedGLSNBegin)) + + offset += sz + binary.BigEndian.PutUint64(key[offset:offset+sz], uint64(cc.CommittedGLSNEnd)) + + offset += sz + binary.BigEndian.PutUint64(key[offset:offset+sz], uint64(cc.CommittedLLSNBegin)) + + offset += sz + sz = types.VersionLen + binary.BigEndian.PutUint64(key[offset:offset+sz], uint64(cc.Version)) + + return key +} + +// decodeCommitContext deserializes a commit context from a byte slice. +func decodeCommitContext(k []byte) (cc CommitContext) { + if len(k) != commitContextLength { + panic("storage: invalid key type") + } + sz := types.GLSNLen + offset := 0 + cc.HighWatermark = types.GLSN(binary.BigEndian.Uint64(k[offset : offset+sz])) + + offset += sz + cc.CommittedGLSNBegin = types.GLSN(binary.BigEndian.Uint64(k[offset : offset+sz])) + + offset += sz + cc.CommittedGLSNEnd = types.GLSN(binary.BigEndian.Uint64(k[offset : offset+sz])) + + offset += sz + cc.CommittedLLSNBegin = types.LLSN(binary.BigEndian.Uint64(k[offset : offset+sz])) + + offset += sz + sz = types.VersionLen + cc.Version = types.Version(binary.BigEndian.Uint64(k[offset : offset+sz])) + + return cc +} + +// encodeCommitContextUnsafe is similar to encodeCommitContext except that it +// shares same memory with the argument cc. +// +// Experimental: It casts struct CommitContext to byte slice; hence its byte +// representation differs across architectures. A user should call +// decodeCommitContextUnsafe to decode bytes encoded by this function. +func encodeCommitContextUnsafe(cc *CommitContext) []byte { + return (*(*[commitContextLength]byte)(unsafe.Pointer(cc)))[:] +} + +// decodeCommitContextUnsafe is similar to decodeCommitContext. +// +// Experimental: A user has to use this function to decode a byte slice encoded +// by encodeCommitContextUnsafe. +func decodeCommitContextUnsafe(buf []byte) (cc CommitContext) { + cc = *(*CommitContext)(unsafe.Pointer(&buf[0])) + return cc +} diff --git a/internal/storage/encode_test.go b/internal/storage/encode_test.go new file mode 100644 index 000000000..b2e8f39e1 --- /dev/null +++ b/internal/storage/encode_test.go @@ -0,0 +1,100 @@ +package storage + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestEncodeCommitContextUnsafe(t *testing.T) { + expected := CommitContext{ + Version: 1, + HighWatermark: 2, + CommittedGLSNBegin: 3, + CommittedGLSNEnd: 4, + CommittedLLSNBegin: 5, + } + buf := encodeCommitContextUnsafe(&expected) + actual := decodeCommitContextUnsafe(buf) + require.Equal(t, expected, actual) +} + +func BenchmarkCommitContext_Encode(b *testing.B) { + tcs := []struct { + name string + benchf func(*testing.B, CommitContext) + }{ + { + name: "Safe", + benchf: func(b *testing.B, cc CommitContext) { + key := make([]byte, commitContextKeyLength) + for i := 0; i < b.N; i++ { + _ = encodeCommitContext(cc, key) + } + }, + }, + { + name: "Unsafe", + benchf: func(b *testing.B, cc CommitContext) { + var key []byte + for i := 0; i < b.N; i++ { + key = encodeCommitContextUnsafe(&cc) + _ = key + } + }, + }, + } + + for _, tc := range tcs { + b.Run(tc.name, func(b *testing.B) { + cc := CommitContext{ + Version: 1, + HighWatermark: 2, + CommittedGLSNBegin: 3, + CommittedGLSNEnd: 4, + CommittedLLSNBegin: 5, + } + tc.benchf(b, cc) + }) + } +} + +func BenchmarkCommitContext_Decode(b *testing.B) { + tcs := []struct { + name string + benchf func(*testing.B, CommitContext) + }{ + { + name: "Safe", + benchf: func(b *testing.B, cc CommitContext) { + buf := make([]byte, commitContextLength) + buf = encodeCommitContext(cc, buf) + for i := 0; i < b.N; i++ { + _ = decodeCommitContext(buf) + } + }, + }, + { + name: "Unsafe", + benchf: func(b *testing.B, cc CommitContext) { + buf := encodeCommitContextUnsafe(&cc) + for i := 0; i < b.N; i++ { + _ = decodeCommitContextUnsafe(buf) + } + }, + }, + } + + for _, tc := range tcs { + b.Run(tc.name, func(b *testing.B) { + cc := CommitContext{ + Version: 1, + HighWatermark: 2, + CommittedGLSNBegin: 3, + CommittedGLSNEnd: 4, + CommittedLLSNBegin: 5, + } + tc.benchf(b, cc) + }) + } +} diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 456aa2e25..94ac0efc1 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -86,6 +86,10 @@ func (s *Storage) NewWriteBatch() *WriteBatch { // NewCommitBatch creates a batch for commit operations. func (s *Storage) NewCommitBatch(cc CommitContext) (*CommitBatch, error) { cb := newCommitBatch(s.db.NewBatch(), s.writeOpts) + if err := cb.batch.Set(commitContextKey, encodeCommitContext(cc, cb.cc), nil); err != nil { + _ = cb.Close() + return nil, err + } if err := cb.batch.Set(encodeCommitContextKeyInternal(cc, cb.cck), nil, nil); err != nil { _ = cb.Close() return nil, err @@ -160,6 +164,17 @@ func (s *Storage) readLLSN(llsn types.LLSN) (le varlogpb.LogEntry, err error) { return le, ErrNoLogEntry } +func (s *Storage) ReadCommitContext() (cc CommitContext, err error) { + buf, closer, err := s.db.Get(commitContextKey) + if err != nil { + return + } + defer func() { + _ = closer.Close() + }() + return decodeCommitContext(buf), nil +} + // CommitContextOf looks up a commit context that contains the log entry for the argument glsn. func (s *Storage) CommitContextOf(glsn types.GLSN) (cc CommitContext, err error) { cck := make([]byte, commitContextKeyLength) diff --git a/internal/storage/storage_test.go b/internal/storage/storage_test.go index 398a42253..142ce4024 100644 --- a/internal/storage/storage_test.go +++ b/internal/storage/storage_test.go @@ -6,6 +6,7 @@ import ( "github.com/cockroachdb/pebble" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/goleak" "go.uber.org/zap" @@ -112,6 +113,42 @@ func TestStorage(t *testing.T) { }) } +func TestStorage_CommitContext(t *testing.T) { + testStorage(t, func(t testing.TB, stg *Storage) { + expected := CommitContext{ + Version: 1, + HighWatermark: 1, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 2, + CommittedLLSNBegin: 1, + } + cb, err := stg.NewCommitBatch(expected) + require.NoError(t, err) + require.NoError(t, cb.Apply()) + require.NoError(t, cb.Close()) + + actual, err := stg.ReadCommitContext() + require.NoError(t, err) + require.Equal(t, expected, actual) + + expected = CommitContext{ + Version: 2, + HighWatermark: 3, + CommittedGLSNBegin: 2, + CommittedGLSNEnd: 4, + CommittedLLSNBegin: 2, + } + cb, err = stg.NewCommitBatch(expected) + require.NoError(t, err) + require.NoError(t, cb.Apply()) + require.NoError(t, cb.Close()) + + actual, err = stg.ReadCommitContext() + require.NoError(t, err) + require.Equal(t, expected, actual) + }) +} + func TestStorage_WriteBatch(t *testing.T) { testStorage(t, func(t testing.TB, stg *Storage) { wb := stg.NewWriteBatch()