Skip to content

Commit

Permalink
feat(storagenode): restore the status of a log stream replica by usin…
Browse files Browse the repository at this point in the history
…g the latest commit context

This patch changes how the status of a log stream replica is restored after #162. Previously, the log
stream replica read the sequence of all commit contexts to decide whether it could work correctly after
being restored. However, since the replica now maintains only the latest commit context, we should change
the previous approach to reloading a replica.

The log stream replica reads the last commit context and the boundary of log entries and checks whether it
can recover its status. The replica runs if the recovery completes. However, if something is
wrong, its reportCommitBase will be invalid. In that case, the metadata repository will ignore its
report, and the log entries will be cloned from the other replica.

Updates #125
  • Loading branch information
ijsong committed Oct 6, 2022
1 parent 0b64855 commit 9e042d2
Show file tree
Hide file tree
Showing 90 changed files with 1,720 additions and 148 deletions.
10 changes: 10 additions & 0 deletions internal/storage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type config struct {
maxConcurrentCompaction int
verbose bool
logger *zap.Logger

readOnly bool
}

func newConfig(opts []Option) (config, error) {
Expand Down Expand Up @@ -157,6 +159,14 @@ func WithLogger(logger *zap.Logger) Option {
})
}

// ReadOnly returns an Option that marks the storage configuration as
// read-only. It exists mainly for tests; regular users normally have no
// reason to call it.
func ReadOnly() Option {
	markReadOnly := func(c *config) {
		c.readOnly = true
	}
	return newFuncOption(markReadOnly)
}

/*
func errInvalidLevelOptions(kv string, err error) error {
return fmt.Errorf("storage: level options: invalid option %s: %w", kv, err)
Expand Down
74 changes: 17 additions & 57 deletions internal/storage/recovery_points.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package storage

import (
"fmt"

"github.com/cockroachdb/pebble"

"github.com/kakao/varlog/proto/varlogpb"
Expand All @@ -14,35 +12,31 @@ type RecoveryPoints struct {
First *varlogpb.LogEntryMeta
Last *varlogpb.LogEntryMeta
}
//UncommittedLLSN struct {
// First types.LLSN
// Last types.LLSN
//}
}

// ReadRecoveryPoints reads data necessary to restore the status of a log
// stream replica - the first and last log entries and commit context.
// An inconsistency between the boundary of log entries and the commit context
// is okay; thus, it returns nil as err in that case.
// However, if there is a fatal error, such as missing data in a log entry, it
// returns an error.
func (s *Storage) ReadRecoveryPoints() (rp RecoveryPoints, err error) {
rp.LastCommitContext = s.readLastCommitContext()
rp.CommittedLogEntry.First, rp.CommittedLogEntry.Last, err = s.readLogEntryBoundaries()
if err != nil {
return
}
var lastNonempty *CommitContext
rp.LastCommitContext, lastNonempty = s.readLastCommitContext()

// TODO: Find valid commit context and log entries rather than returning an error.
if lastNonempty != nil && (rp.CommittedLogEntry.Last == nil || lastNonempty.CommittedGLSNEnd-1 != rp.CommittedLogEntry.Last.GLSN) {
err = fmt.Errorf("storage: mismatched commit context and log entries")
return
}
if lastNonempty == nil && rp.CommittedLogEntry.First != nil {
err = fmt.Errorf("storage: mismatched commit context and log entries")
return
}
return rp, nil
}

// readLastCommitContext returns the last commit context and the last non-empty commit context.
// readLastCommitContext returns the last commit context.
// It returns nil if no commit context exists.
func (s *Storage) readLastCommitContext() (last, lastNonempty *CommitContext) {
func (s *Storage) readLastCommitContext() *CommitContext {
cc, err := s.ReadCommitContext()
if err == nil {
return &cc
}

it := s.db.NewIter(&pebble.IterOptions{
LowerBound: []byte{commitContextKeyPrefix},
UpperBound: []byte{commitContextKeySentinelPrefix},
Expand All @@ -52,24 +46,10 @@ func (s *Storage) readLastCommitContext() (last, lastNonempty *CommitContext) {
}()

if !it.Last() {
return nil, nil
}
cc := decodeCommitContextKey(it.Key())
last = &cc
if !cc.Empty() {
lastNonempty = &cc
return last, lastNonempty
return nil
}
it.Prev()
for it.Valid() {
cc := decodeCommitContextKey(it.Key())
if !cc.Empty() {
lastNonempty = &cc
break
}
it.Prev()
}
return last, lastNonempty
cc = decodeCommitContextKey(it.Key())
return &cc
}

func (s *Storage) readLogEntryBoundaries() (first, last *varlogpb.LogEntryMeta, err error) {
Expand Down Expand Up @@ -99,23 +79,3 @@ func (s *Storage) readLogEntryBoundaries() (first, last *varlogpb.LogEntryMeta,
}
return first, &lastLE.LogEntryMeta, nil
}

//func (s *Storage) readUncommittedLogEntryBoundaries(lastCommitted *varlogpb.LogEntryMeta) (first, last types.LLSN) {
// dk := make([]byte, dataKeyLength)
// dk = encodeDataKeyInternal(lastCommitted.LLSN+1, dk)
// it := s.db.NewIter(&pebble.IterOptions{
// LowerBound: dk,
// UpperBound: []byte{dataKeySentinelPrefix},
// })
// defer func() {
// _ = it.Close()
// }()
//
// if !it.First() {
// return types.InvalidLLSN, types.InvalidLLSN
// }
// first = decodeDataKey(it.Key())
// _ = it.Last()
// last = decodeDataKey(it.Key())
// return first, last
//}
11 changes: 9 additions & 2 deletions internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func New(opts ...Option) (*Storage, error) {
pebbleOpts.EventListener.WALDeleted = nil
}

if cfg.readOnly {
pebbleOpts.ReadOnly = true
}

db, err := pebble.Open(cfg.path, pebbleOpts)
if err != nil {
return nil, err
Expand Down Expand Up @@ -280,6 +284,9 @@ func (s *Storage) DiskUsage() int64 {
}

// Close closes the storage.
func (s *Storage) Close() error {
return multierr.Append(s.db.Flush(), s.db.Close())
func (s *Storage) Close() (err error) {
if !s.readOnly {
err = s.db.Flush()
}
return multierr.Append(err, s.db.Close())
}
25 changes: 6 additions & 19 deletions internal/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,9 +403,8 @@ func TestStorageRead(t *testing.T) {
func TestStorageReadLastCommitContext(t *testing.T) {
testStorage(t, func(t testing.TB, stg *Storage) {
// no cc
lastCC, lastNonemptyCC := stg.readLastCommitContext()
lastCC := stg.readLastCommitContext()
assert.Nil(t, lastCC)
assert.Nil(t, lastNonemptyCC)

// empty cc
expectedLastCC := CommitContext{
Expand All @@ -420,13 +419,11 @@ func TestStorageReadLastCommitContext(t *testing.T) {
assert.NoError(t, cb.Apply())
assert.NoError(t, cb.Close())

lastCC, lastNonemptyCC = stg.readLastCommitContext()
lastCC = stg.readLastCommitContext()
// lastCC
assert.NotNil(t, lastCC)
assert.True(t, lastCC.Empty())
assert.Equal(t, expectedLastCC, *lastCC)
// lastNonemptyCC
assert.Nil(t, lastNonemptyCC)

// empty cc
expectedLastCC = CommitContext{
Expand All @@ -441,13 +438,11 @@ func TestStorageReadLastCommitContext(t *testing.T) {
assert.NoError(t, cb.Apply())
assert.NoError(t, cb.Close())

lastCC, lastNonemptyCC = stg.readLastCommitContext()
lastCC = stg.readLastCommitContext()
// lastCC
assert.NotNil(t, lastCC)
assert.True(t, lastCC.Empty())
assert.Equal(t, expectedLastCC, *lastCC)
// lastNonemptyCC
assert.Nil(t, lastNonemptyCC)

// nonempty cc
expectedLastCC = CommitContext{
Expand All @@ -457,21 +452,16 @@ func TestStorageReadLastCommitContext(t *testing.T) {
CommittedGLSNEnd: 2,
CommittedLLSNBegin: 1,
}
expectedLastNonempyCC := expectedLastCC
cb, err = stg.NewCommitBatch(expectedLastCC)
assert.NoError(t, err)
assert.NoError(t, cb.Apply())
assert.NoError(t, cb.Close())

lastCC, lastNonemptyCC = stg.readLastCommitContext()
lastCC = stg.readLastCommitContext()
// lastCC
assert.NotNil(t, lastCC)
assert.False(t, lastCC.Empty())
assert.Equal(t, expectedLastCC, *lastCC)
// lastNonemptyCC
assert.NotNil(t, lastNonemptyCC)
assert.False(t, lastNonemptyCC.Empty())
assert.Equal(t, expectedLastNonempyCC, *lastNonemptyCC)

// empty cc
expectedLastCC = CommitContext{
Expand All @@ -486,15 +476,11 @@ func TestStorageReadLastCommitContext(t *testing.T) {
assert.NoError(t, cb.Apply())
assert.NoError(t, cb.Close())

lastCC, lastNonemptyCC = stg.readLastCommitContext()
lastCC = stg.readLastCommitContext()
// lastCC
assert.NotNil(t, lastCC)
assert.True(t, lastCC.Empty())
assert.Equal(t, expectedLastCC, *lastCC)
// lastNonemptyCC
assert.NotNil(t, lastNonemptyCC)
assert.False(t, lastNonemptyCC.Empty())
assert.Equal(t, expectedLastNonempyCC, *lastNonemptyCC)
})
}

Expand Down Expand Up @@ -631,6 +617,7 @@ func TestStorageReadRecoveryPoints(t *testing.T) {
}

func TestStorageReadRecoveryPoints_InconsistentWriteCommit(t *testing.T) {
t.Skip("Storage will not consider the consistency of committed logs.")
testStorage(t, func(t testing.TB, stg *Storage) {
ck := make([]byte, commitKeyLength)
dk := make([]byte, dataKeyLength)
Expand Down
38 changes: 38 additions & 0 deletions internal/storage/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ package storage
import (
"testing"

"github.com/cockroachdb/pebble"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/kakao/varlog/pkg/types"
)

func TestNewStorage(tb testing.TB, opts ...Option) *Storage {
Expand All @@ -15,3 +19,37 @@ func TestNewStorage(tb testing.TB, opts ...Option) *Storage {
assert.NoError(tb, err)
return s
}

// TestGetUnderlyingDB returns the pebble database that the storage uses
// internally. It fails the test immediately if either stg or its database is
// nil.
func TestGetUnderlyingDB(tb testing.TB, stg *Storage) *pebble.DB {
	// Mark this function as a test helper so that a failure is reported at the
	// caller's line rather than inside this function.
	tb.Helper()

	require.NotNil(tb, stg)
	require.NotNil(tb, stg.db)
	return stg.db
}

// TestAppendLogEntryWithoutCommitContext stores a log entry without writing
// any commit context. It writes directly to the underlying database: the data
// key encoded from llsn is set to data, and the commit key encoded from glsn
// is set to that data key. Both writes go through a single batch committed
// with pebble.Sync. Any failure aborts the test.
func TestAppendLogEntryWithoutCommitContext(tb testing.TB, stg *Storage, llsn types.LLSN, glsn types.GLSN, data []byte) {
	// Mark this function as a test helper so that a failure is reported at the
	// caller's line rather than inside this function.
	tb.Helper()

	db := TestGetUnderlyingDB(tb, stg)

	ck := encodeCommitKeyInternal(glsn, make([]byte, commitKeyLength))
	dk := encodeDataKeyInternal(llsn, make([]byte, dataKeyLength))

	batch := db.NewBatch()
	// The data key maps to the payload; the commit key maps to the data key.
	require.NoError(tb, batch.Set(dk, data, nil))
	require.NoError(tb, batch.Set(ck, dk, nil))
	require.NoError(tb, batch.Commit(pebble.Sync))
	require.NoError(tb, batch.Close())
}

// TestSetCommitContext stores only the commit context, without touching any
// log entry. It encodes cc and writes it under commitContextKey in the
// underlying database using pebble.Sync. Any failure aborts the test.
func TestSetCommitContext(tb testing.TB, stg *Storage, cc CommitContext) {
	// Mark this function as a test helper so that a failure is reported at the
	// caller's line rather than inside this function.
	tb.Helper()

	db := TestGetUnderlyingDB(tb, stg)
	value := encodeCommitContext(cc, make([]byte, commitContextLength))
	require.NoError(tb, db.Set(commitContextKey, value, pebble.Sync))
}
23 changes: 19 additions & 4 deletions internal/storagenode/logstream/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,18 +139,32 @@ func (cm *committer) commitLoopInternal(ctx context.Context, ct *commitTask) {
ct.release()
atomic.AddInt64(&cm.inflightCommit, -1)
}()
commitVersion, _, _ := cm.lse.lsc.reportCommitBase()

// TODO: Move these condition expressions to `internal/storagenode/logstream.(*committer).commit` method.
commitVersion, _, _, invalid := cm.lse.lsc.reportCommitBase()
if ct.stale(commitVersion) {
cm.logger.Debug("discard a stale commit message",
zap.Any("replica", commitVersion),
zap.Any("commit", ct.version),
)
return
}
if invalid {
// Synchronization should fix this invalid replica status
// caused by the inconsistency between the commit context and
// the last log entry.
cm.logger.Debug("discard a commit message due to invalid replica status")
return
}

if err := cm.commit(ctx, ct); err != nil {
cm.logger.Error("could not commit", zap.Error(err))
cm.lse.esm.compareAndSwap(executorStateAppendable, executorStateSealing)
}
}

func (cm *committer) commit(_ context.Context, ct *commitTask) error {
_, _, uncommittedLLSNBegin := cm.lse.lsc.reportCommitBase()
_, _, uncommittedLLSNBegin, _ := cm.lse.lsc.reportCommitBase()
if uncommittedLLSNBegin != ct.committedLLSNBegin {
// skip this commit
// See #VARLOG-453
Expand Down Expand Up @@ -197,7 +211,8 @@ func (cm *committer) commit(_ context.Context, ct *commitTask) error {
}

func (cm *committer) commitInternal(cc storage.CommitContext, requireCommitWaitTasks bool) (err error) {
_, _, uncommittedLLSNBegin := cm.lse.lsc.reportCommitBase()
_, _, uncommittedLLSNBegin, _ := cm.lse.lsc.reportCommitBase()

numCommits := int(cc.CommittedGLSNEnd - cc.CommittedGLSNBegin)

// NOTE: It seems to be similar to the above condition. The actual purpose of this
Expand Down Expand Up @@ -293,7 +308,7 @@ func (cm *committer) commitInternal(cc storage.CommitContext, requireCommitWaitT
uncommittedLLSNBegin += types.LLSN(numCommits)

cm.lse.decider.change(func() {
cm.lse.lsc.storeReportCommitBase(cc.Version, cc.HighWatermark, uncommittedLLSNBegin)
cm.lse.lsc.storeReportCommitBase(cc.Version, cc.HighWatermark, uncommittedLLSNBegin, false)
})

for _, cwt := range committedTasks {
Expand Down
Loading

0 comments on commit 9e042d2

Please sign in to comment.