Skip to content

Commit

Permalink
fix(storagenode): restore uncommitted logs
Browse files Browse the repository at this point in the history
Currently, if all replicas of a log stream are restarted simultaneously, none of them can commit
the logs that were written before the restart. The replicas still hold the uncommitted logs in
their storage, but they cannot process the Commit RPCs sent from the metadata repository.

This PR fixes the above issue. While recovering the log stream context after restarting the storage
nodes, it restores uncommitted logs.

Resolves #490
  • Loading branch information
ijsong committed Jun 22, 2023
1 parent 11968c6 commit 0f85da0
Show file tree
Hide file tree
Showing 7 changed files with 455 additions and 24 deletions.
42 changes: 42 additions & 0 deletions internal/storage/recovery_points.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package storage

import (
"errors"
"fmt"

"github.com/cockroachdb/pebble"

"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/proto/varlogpb"
)

Expand All @@ -14,6 +16,10 @@ type RecoveryPoints struct {
First *varlogpb.LogEntryMeta
Last *varlogpb.LogEntryMeta
}
UncommittedLLSN struct {
Begin types.LLSN
End types.LLSN
}
}

// ReadRecoveryPoints reads data necessary to restore the status of a log
Expand All @@ -31,6 +37,16 @@ func (s *Storage) ReadRecoveryPoints() (rp RecoveryPoints, err error) {
if err != nil {
return
}

uncommittedBegin := types.MinLLSN
if cc := rp.LastCommitContext; cc != nil {
uncommittedBegin = cc.CommittedLLSNBegin + types.LLSN(cc.CommittedGLSNEnd-cc.CommittedGLSNBegin)
}
rp.UncommittedLLSN.Begin, rp.UncommittedLLSN.End, err = s.readUncommittedLogEntryBoundaries(uncommittedBegin)
if err != nil {
return
}

return rp, nil
}

Expand Down Expand Up @@ -74,3 +90,29 @@ func (s *Storage) readLogEntryBoundaries() (first, last *varlogpb.LogEntryMeta,
}
return first, &lastLE.LogEntryMeta, nil
}

// readUncommittedLogEntryBoundaries scans the data keys at or after
// uncommittedBegin and returns the half-open LLSN range [begin, end) that they
// cover: begin is the LLSN decoded from the first key and end is the LLSN
// decoded from the last key plus one.
//
// If there is no data key at or after uncommittedBegin, it returns
// (types.InvalidLLSN, types.InvalidLLSN, nil). If the first key found does not
// decode to uncommittedBegin, or if the iterator reports an error while
// positioning, it returns invalid LLSNs together with a non-nil error.
func (s *Storage) readUncommittedLogEntryBoundaries(uncommittedBegin types.LLSN) (begin, end types.LLSN, err error) {
	dk := make([]byte, dataKeyLength)
	dk = encodeDataKeyInternal(uncommittedBegin, dk)
	it := s.dataDB.NewIter(&pebble.IterOptions{
		LowerBound: dk,
		UpperBound: []byte{dataKeySentinelPrefix},
	})
	defer func() {
		// Positioning errors are checked explicitly through it.Error() below,
		// so the error returned by Close is intentionally dropped.
		_ = it.Close()
	}()

	if !it.First() {
		// First returns false both when the range is empty and when the
		// iterator failed; only the former means "nothing to restore".
		if err = it.Error(); err != nil {
			return types.InvalidLLSN, types.InvalidLLSN, err
		}
		return types.InvalidLLSN, types.InvalidLLSN, nil
	}

	begin = decodeDataKey(it.Key())
	if begin != uncommittedBegin {
		err = fmt.Errorf("unexpected uncommitted begin, expected %v but got %v", uncommittedBegin, begin)
		return types.InvalidLLSN, types.InvalidLLSN, err
	}

	if !it.Last() {
		// The range was non-empty a moment ago, so failing to position at the
		// last key means the iterator is broken; do not decode an invalid key.
		if err = it.Error(); err == nil {
			err = errors.New("could not seek to the last uncommitted log entry")
		}
		return types.InvalidLLSN, types.InvalidLLSN, err
	}
	end = decodeDataKey(it.Key()) + 1

	return begin, end, nil
}
9 changes: 9 additions & 0 deletions internal/storage/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ func TestGetUnderlyingDB(tb testing.TB, stg *Storage) (dataDB, commitDB *pebble.
return stg.dataDB, stg.commitDB
}

// TestWriteLogEntry stores data located by the llsn. The data is not committed
// because it does not store commits.
//
// It writes the entry through a write batch obtained from stg and fails the
// test immediately if setting, applying, or closing the batch returns an
// error. The batch is closed even when an earlier step fails.
func TestWriteLogEntry(tb testing.TB, stg *Storage, llsn types.LLSN, data []byte) {
	// Report failures at the caller's line rather than inside this helper.
	tb.Helper()

	batch := stg.NewWriteBatch()
	// require aborts the test on failure, so Close must be deferred to make
	// sure the batch is not left open when Set or Apply fails.
	defer func() {
		require.NoError(tb, batch.Close())
	}()

	require.NoError(tb, batch.Set(llsn, data))
	require.NoError(tb, batch.Apply())
}

// TestAppendLogEntryWithoutCommitContext stores log entries without commit
// context.
func TestAppendLogEntryWithoutCommitContext(tb testing.TB, stg *Storage, llsn types.LLSN, glsn types.GLSN, data []byte) {
Expand Down
8 changes: 6 additions & 2 deletions internal/storagenode/logstream/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func newCommitter(cfg committerConfig) (*committer, error) {
// The commit wait task is pushed into commitWaitQ in the committer.
// The writer calls this method internally to push commit wait tasks to the committer.
// If the input list of commit wait tasks is nil or empty, it panics.
func (cm *committer) sendCommitWaitTask(_ context.Context, cwts *listQueue) (err error) {
func (cm *committer) sendCommitWaitTask(_ context.Context, cwts *listQueue, ignoreSealing bool) (err error) {
if cwts == nil {
panic("log stream: committer: commit wait task list is nil")
}
Expand All @@ -73,7 +73,11 @@ func (cm *committer) sendCommitWaitTask(_ context.Context, cwts *listQueue) (err
}()

switch cm.lse.esm.load() {
case executorStateSealing, executorStateSealed, executorStateLearning:
case executorStateSealing:
if !ignoreSealing {
err = verrors.ErrSealed
}
case executorStateSealed, executorStateLearning:
err = verrors.ErrSealed
case executorStateClosed:
err = verrors.ErrClosed
Expand Down
12 changes: 6 additions & 6 deletions internal/storagenode/logstream/committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,25 +55,25 @@ func TestCommitter_ShouldNotAcceptTasksWhileNotAppendable(t *testing.T) {

lse.esm.store(executorStateAppendable)
assert.Panics(t, func() {
_ = cm.sendCommitWaitTask(context.Background(), cwts)
_ = cm.sendCommitWaitTask(context.Background(), cwts, false /*ignoreSealing*/)
})

assert.Panics(t, func() {
_ = cm.sendCommitWaitTask(context.Background(), nil)
_ = cm.sendCommitWaitTask(context.Background(), nil, false /*ignoreSealing*/)
})

cwts.PushFront(&commitWaitTask{})

lse.esm.store(executorStateSealing)
err := cm.sendCommitWaitTask(context.Background(), cwts)
err := cm.sendCommitWaitTask(context.Background(), cwts, false /*ignoreSealing*/)
assert.Error(t, err)

lse.esm.store(executorStateSealed)
err = cm.sendCommitWaitTask(context.Background(), cwts)
err = cm.sendCommitWaitTask(context.Background(), cwts, false /*ignoreSealing*/)
assert.Error(t, err)

lse.esm.store(executorStateClosed)
err = cm.sendCommitWaitTask(context.Background(), cwts)
err = cm.sendCommitWaitTask(context.Background(), cwts, false /*ignoreSealing*/)
assert.Error(t, err)

// sendCommitTask
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestCommitter_DrainCommitWaitQ(t *testing.T) {

cwts := newListQueue()
cwts.PushFront(newCommitWaitTask(newAppendWaitGroup(newWriteWaitGroup())))
err := cm.sendCommitWaitTask(context.Background(), cwts)
err := cm.sendCommitWaitTask(context.Background(), cwts, false /*ignoreSealing*/)
assert.NoError(t, err)

assert.EqualValues(t, 1, cm.inflightCommitWait.Load())
Expand Down
89 changes: 82 additions & 7 deletions internal/storagenode/logstream/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ func NewExecutor(opts ...ExecutorOption) (lse *Executor, err error) {
lse: lse,
logger: lse.logger.Named("backup writer"),
})
if err != nil {
return
}

err = lse.restoreCommitWaitTasks(rp)
if err != nil {
return
}

return lse, err
}
Expand Down Expand Up @@ -201,7 +209,7 @@ func (lse *Executor) Replicate(ctx context.Context, llsnList []types.LLSN, dataL
return err
}

if err := lse.cm.sendCommitWaitTask(ctx, cwts); err != nil {
if err := lse.cm.sendCommitWaitTask(ctx, cwts, false /*ignoreSealing*/); err != nil {
lse.logger.Error("could not send commit wait task list", zap.Error(err))
cwtListNode := cwts.Back()
for cwtListNode != nil {
Expand Down Expand Up @@ -437,6 +445,7 @@ func (lse *Executor) Commit(ctx context.Context, commitResult snpb.LogStreamComm
}

if types.Version(atomic.LoadUint64(&lse.prevCommitVersion)) != commitResult.Version {
lse.logger.Info("commit", zap.String("commit_result", commitResult.String()))
if ce := lse.logger.Check(zap.DebugLevel, "commit"); ce != nil {
ce.Write(zap.String("commit_result", commitResult.String()))
}
Expand Down Expand Up @@ -633,6 +642,8 @@ func (lse *Executor) isPrimary() bool {
}

func (lse *Executor) restoreLogStreamContext(rp storage.RecoveryPoints) *logStreamContext {
lse.logger.Info("recovery points", zap.Any("rp", rp))

cc := rp.LastCommitContext
first := rp.CommittedLogEntry.First
last := rp.CommittedLogEntry.Last
Expand All @@ -643,23 +654,41 @@ func (lse *Executor) restoreLogStreamContext(rp storage.RecoveryPoints) *logStre
lse.logger.Info("restore log stream context", zap.String("mode", restoreMode))
}()

if cc == nil && last == nil { // new log stream replica
// Log stream replica that has not committed any logs yet. For example, a
// new log stream replica.
if cc == nil && last == nil {
uncommittedLLSNEnd := lsc.uncommittedLLSNEnd.Load()
if !rp.UncommittedLLSN.End.Invalid() {
uncommittedLLSNEnd = rp.UncommittedLLSN.End
}
lsc.uncommittedLLSNEnd.Store(uncommittedLLSNEnd)

return lsc
}

if cc != nil && last == nil { // maybe trimmed
// Log stream replica with a commit context but no log entry. For instance,
// the log stream replica that trimmed all log entries.
if cc != nil && last == nil {
restoreMode = "recovered"
uncommittedLLSNBegin := cc.CommittedLLSNBegin + types.LLSN(cc.CommittedGLSNEnd-cc.CommittedGLSNBegin)
uncommittedBegin := varlogpb.LogSequenceNumber{
LLSN: uncommittedLLSNBegin,
GLSN: cc.CommittedGLSNEnd,
}
lsc.storeReportCommitBase(cc.Version, cc.HighWatermark, uncommittedBegin, false /*invalid*/)
lsc.uncommittedLLSNEnd.Store(uncommittedLLSNBegin)

uncommittedLLSNEnd := uncommittedLLSNBegin
if !rp.UncommittedLLSN.End.Invalid() {
uncommittedLLSNEnd = rp.UncommittedLLSN.End
}
lsc.uncommittedLLSNEnd.Store(uncommittedLLSNEnd)

return lsc
}

if cc != nil && last != nil { // recovery
// Log stream replica that has a commit context and log entries. The commit
// context specifies the last log entry exactly.
if cc != nil && last != nil {
restoreMode = "recovered"
uncommittedLLSNBegin := cc.CommittedLLSNBegin + types.LLSN(cc.CommittedGLSNEnd-cc.CommittedGLSNBegin)
if uncommittedLLSNBegin-1 == last.LLSN {
Expand All @@ -668,7 +697,13 @@ func (lse *Executor) restoreLogStreamContext(rp storage.RecoveryPoints) *logStre
GLSN: last.GLSN + 1,
}
lsc.storeReportCommitBase(cc.Version, cc.HighWatermark, uncommittedBegin, false /*invalid*/)
lsc.uncommittedLLSNEnd.Store(uncommittedLLSNBegin)

uncommittedLLSNEnd := uncommittedLLSNBegin
if !rp.UncommittedLLSN.End.Invalid() {
uncommittedLLSNEnd = rp.UncommittedLLSN.End
}
lsc.uncommittedLLSNEnd.Store(uncommittedLLSNEnd)

lsc.setLocalLowWatermark(varlogpb.LogSequenceNumber{
LLSN: first.LLSN,
GLSN: first.GLSN,
Expand All @@ -677,7 +712,17 @@ func (lse *Executor) restoreLogStreamContext(rp storage.RecoveryPoints) *logStre
}
}

// something wrong
// Log stream replica is invalid:
//
// - It has committed log entries but no commit context.
// - It has a commit context and committed log entries, but the commit
// context doesn't specify the last log entry.
//
// An invalid log stream replica should be resolved by synchronization from
// the sealed source replica. The log stream context should be set carefully
// so that it can receive copies of log entries through synchronization. To
// restore a log stream replica that was restarted during the synchronization
// phase, the log stream context is initialized from the committed log entries.
restoreMode = "invalid"
lsc.storeReportCommitBase(types.InvalidVersion, types.InvalidGLSN, varlogpb.LogSequenceNumber{}, true /*invalid*/)
if last != nil {
Expand All @@ -694,3 +739,33 @@ func (lse *Executor) restoreLogStreamContext(rp storage.RecoveryPoints) *logStre
}
return lsc
}

// restoreCommitWaitTasks hands the committer one commit wait task for every
// LLSN in [rp.UncommittedLLSN.Begin, rp.UncommittedLLSN.End).
//
// It is a no-op when the log stream context is flagged invalid or when
// rp.UncommittedLLSN.Begin is invalid. The tasks are sent with ignoreSealing
// enabled. If sending fails, the error is logged, every queued task is
// released, and the error is returned.
func (lse *Executor) restoreCommitWaitTasks(rp storage.RecoveryPoints) error {
	// An invalid log stream replica receives no commit messages from the
	// metadata repository; only synchronization can resolve it.
	if _, _, _, invalid := lse.lsc.reportCommitBase(); invalid {
		return nil
	}

	// Without uncommitted logs there is nothing to wait on.
	uncommitted := rp.UncommittedLLSN
	if uncommitted.Begin.Invalid() {
		return nil
	}

	// Each task is created with a nil argument (presumably because no
	// appender is waiting on restored entries; confirm against
	// newCommitWaitTask).
	tasks := newListQueue()
	for llsn := uncommitted.Begin; llsn < uncommitted.End; llsn++ {
		tasks.PushFront(newCommitWaitTask(nil))
	}

	err := lse.cm.sendCommitWaitTask(context.Background(), tasks, true /*ignoreSealing*/)
	if err == nil {
		return nil
	}

	lse.logger.Error("could not send commit wait task list", zap.Error(err))
	// The committer rejected the list, so release every task, walking from
	// the back of the queue to the front.
	for node := tasks.Back(); node != nil; node = node.Prev() {
		node.value.(*commitWaitTask).release()
	}
	return err
}
Loading

0 comments on commit 0f85da0

Please sign in to comment.