Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(storagenode): restore uncommitted logs #492

Merged
merged 1 commit into from
Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions internal/storage/recovery_points.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package storage

import (
"errors"
"fmt"

"github.com/cockroachdb/pebble"

"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/proto/varlogpb"
)

Expand All @@ -14,6 +16,10 @@ type RecoveryPoints struct {
First *varlogpb.LogEntryMeta
Last *varlogpb.LogEntryMeta
}
UncommittedLLSN struct {
Begin types.LLSN
End types.LLSN
}
}

// ReadRecoveryPoints reads data necessary to restore the status of a log
Expand All @@ -31,6 +37,16 @@ func (s *Storage) ReadRecoveryPoints() (rp RecoveryPoints, err error) {
if err != nil {
return
}

uncommittedBegin := types.MinLLSN
if cc := rp.LastCommitContext; cc != nil {
uncommittedBegin = cc.CommittedLLSNBegin + types.LLSN(cc.CommittedGLSNEnd-cc.CommittedGLSNBegin)
}
rp.UncommittedLLSN.Begin, rp.UncommittedLLSN.End, err = s.readUncommittedLogEntryBoundaries(uncommittedBegin)
if err != nil {
return
}

return rp, nil
}

Expand Down Expand Up @@ -74,3 +90,29 @@ func (s *Storage) readLogEntryBoundaries() (first, last *varlogpb.LogEntryMeta,
}
return first, &lastLE.LogEntryMeta, nil
}

func (s *Storage) readUncommittedLogEntryBoundaries(uncommittedBegin types.LLSN) (begin, end types.LLSN, err error) {
dk := make([]byte, dataKeyLength)
dk = encodeDataKeyInternal(uncommittedBegin, dk)
it := s.dataDB.NewIter(&pebble.IterOptions{
LowerBound: dk,
UpperBound: []byte{dataKeySentinelPrefix},
})
defer func() {
_ = it.Close()
}()

if !it.First() {
return types.InvalidLLSN, types.InvalidLLSN, nil
}

begin = decodeDataKey(it.Key())
if begin != uncommittedBegin {
err = fmt.Errorf("unexpected uncommitted begin, expected %v but got %v", uncommittedBegin, begin)
return types.InvalidLLSN, types.InvalidLLSN, err
}
_ = it.Last()
end = decodeDataKey(it.Key()) + 1

return begin, end, nil
}
9 changes: 9 additions & 0 deletions internal/storage/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ func TestGetUnderlyingDB(tb testing.TB, stg *Storage) (dataDB, commitDB *pebble.
return stg.dataDB, stg.commitDB
}

// TestWriteLogEntry stores data located by the llsn. The data is not committed
// because it does not store commits.
func TestWriteLogEntry(tb testing.TB, stg *Storage, llsn types.LLSN, data []byte) {
batch := stg.NewWriteBatch()
require.NoError(tb, batch.Set(llsn, data))
require.NoError(tb, batch.Apply())
require.NoError(tb, batch.Close())
}

// TestAppendLogEntryWithoutCommitContext stores log entries without commit
// context.
func TestAppendLogEntryWithoutCommitContext(tb testing.TB, stg *Storage, llsn types.LLSN, glsn types.GLSN, data []byte) {
Expand Down
8 changes: 6 additions & 2 deletions internal/storagenode/logstream/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func newCommitter(cfg committerConfig) (*committer, error) {
// The commit wait task is pushed into commitWaitQ in the committer.
// The writer calls this method internally to push commit wait tasks to the committer.
// If the input list of commit wait tasks are nil or empty, it panics.
func (cm *committer) sendCommitWaitTask(_ context.Context, cwts *listQueue) (err error) {
func (cm *committer) sendCommitWaitTask(_ context.Context, cwts *listQueue, ignoreSealing bool) (err error) {
if cwts == nil {
panic("log stream: committer: commit wait task list is nil")
}
Expand All @@ -73,7 +73,11 @@ func (cm *committer) sendCommitWaitTask(_ context.Context, cwts *listQueue) (err
}()

switch cm.lse.esm.load() {
case executorStateSealing, executorStateSealed, executorStateLearning:
case executorStateSealing:
if !ignoreSealing {
err = verrors.ErrSealed
}
case executorStateSealed, executorStateLearning:
err = verrors.ErrSealed
case executorStateClosed:
err = verrors.ErrClosed
Expand Down
12 changes: 6 additions & 6 deletions internal/storagenode/logstream/committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,25 +55,25 @@ func TestCommitter_ShouldNotAcceptTasksWhileNotAppendable(t *testing.T) {

lse.esm.store(executorStateAppendable)
assert.Panics(t, func() {
_ = cm.sendCommitWaitTask(context.Background(), cwts)
_ = cm.sendCommitWaitTask(context.Background(), cwts, false /*ignoreSealing*/)
})

assert.Panics(t, func() {
_ = cm.sendCommitWaitTask(context.Background(), nil)
_ = cm.sendCommitWaitTask(context.Background(), nil, false /*ignoreSealing*/)
})

cwts.PushFront(&commitWaitTask{})

lse.esm.store(executorStateSealing)
err := cm.sendCommitWaitTask(context.Background(), cwts)
err := cm.sendCommitWaitTask(context.Background(), cwts, false /*ignoreSealing*/)
assert.Error(t, err)

lse.esm.store(executorStateSealed)
err = cm.sendCommitWaitTask(context.Background(), cwts)
err = cm.sendCommitWaitTask(context.Background(), cwts, false /*ignoreSealing*/)
assert.Error(t, err)

lse.esm.store(executorStateClosed)
err = cm.sendCommitWaitTask(context.Background(), cwts)
err = cm.sendCommitWaitTask(context.Background(), cwts, false /*ignoreSealing*/)
assert.Error(t, err)

// sendCommitTask
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestCommitter_DrainCommitWaitQ(t *testing.T) {

cwts := newListQueue()
cwts.PushFront(newCommitWaitTask(newAppendWaitGroup(newWriteWaitGroup())))
err := cm.sendCommitWaitTask(context.Background(), cwts)
err := cm.sendCommitWaitTask(context.Background(), cwts, false /*ignoreSealing*/)
assert.NoError(t, err)

assert.EqualValues(t, 1, cm.inflightCommitWait.Load())
Expand Down
86 changes: 79 additions & 7 deletions internal/storagenode/logstream/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ func NewExecutor(opts ...ExecutorOption) (lse *Executor, err error) {
lse: lse,
logger: lse.logger.Named("backup writer"),
})
if err != nil {
return
}

err = lse.restoreCommitWaitTasks(rp)
if err != nil {
return
}

return lse, err
}
Expand Down Expand Up @@ -201,7 +209,7 @@ func (lse *Executor) Replicate(ctx context.Context, llsnList []types.LLSN, dataL
return err
}

if err := lse.cm.sendCommitWaitTask(ctx, cwts); err != nil {
if err := lse.cm.sendCommitWaitTask(ctx, cwts, false /*ignoreSealing*/); err != nil {
lse.logger.Error("could not send commit wait task list", zap.Error(err))
cwtListNode := cwts.Back()
for cwtListNode != nil {
Expand Down Expand Up @@ -643,23 +651,41 @@ func (lse *Executor) restoreLogStreamContext(rp storage.RecoveryPoints) *logStre
lse.logger.Info("restore log stream context", zap.String("mode", restoreMode))
}()

if cc == nil && last == nil { // new log stream replica
// Log stream replica that has not committed any logs yet. For example, a
// new log stream replica.
if cc == nil && last == nil {
uncommittedLLSNEnd := lsc.uncommittedLLSNEnd.Load()
if !rp.UncommittedLLSN.End.Invalid() {
uncommittedLLSNEnd = rp.UncommittedLLSN.End
}
lsc.uncommittedLLSNEnd.Store(uncommittedLLSNEnd)

return lsc
}

if cc != nil && last == nil { // maybe trimmed
// Log stream replica with a commit context but no log entry. For instance,
// the log stream replica that trimmed all log entries.
if cc != nil && last == nil {
restoreMode = "recovered"
uncommittedLLSNBegin := cc.CommittedLLSNBegin + types.LLSN(cc.CommittedGLSNEnd-cc.CommittedGLSNBegin)
uncommittedBegin := varlogpb.LogSequenceNumber{
LLSN: uncommittedLLSNBegin,
GLSN: cc.CommittedGLSNEnd,
}
lsc.storeReportCommitBase(cc.Version, cc.HighWatermark, uncommittedBegin, false /*invalid*/)
lsc.uncommittedLLSNEnd.Store(uncommittedLLSNBegin)

uncommittedLLSNEnd := uncommittedLLSNBegin
if !rp.UncommittedLLSN.End.Invalid() {
uncommittedLLSNEnd = rp.UncommittedLLSN.End
}
lsc.uncommittedLLSNEnd.Store(uncommittedLLSNEnd)

return lsc
}

if cc != nil && last != nil { // recovery
// Log stream replica that has a commit context and log entries. The commit
// context specifies the last log entry exactly.
if cc != nil && last != nil {
restoreMode = "recovered"
uncommittedLLSNBegin := cc.CommittedLLSNBegin + types.LLSN(cc.CommittedGLSNEnd-cc.CommittedGLSNBegin)
if uncommittedLLSNBegin-1 == last.LLSN {
Expand All @@ -668,7 +694,13 @@ func (lse *Executor) restoreLogStreamContext(rp storage.RecoveryPoints) *logStre
GLSN: last.GLSN + 1,
}
lsc.storeReportCommitBase(cc.Version, cc.HighWatermark, uncommittedBegin, false /*invalid*/)
lsc.uncommittedLLSNEnd.Store(uncommittedLLSNBegin)

uncommittedLLSNEnd := uncommittedLLSNBegin
if !rp.UncommittedLLSN.End.Invalid() {
uncommittedLLSNEnd = rp.UncommittedLLSN.End
}
lsc.uncommittedLLSNEnd.Store(uncommittedLLSNEnd)

lsc.setLocalLowWatermark(varlogpb.LogSequenceNumber{
LLSN: first.LLSN,
GLSN: first.GLSN,
Expand All @@ -677,7 +709,17 @@ func (lse *Executor) restoreLogStreamContext(rp storage.RecoveryPoints) *logStre
}
}

// something wrong
// Log stream replica is invalid:
//
// - It has committed log entries but no commit context.
// - It has a commit context and committed log entries, but the commit
// context doesn't specify the last log entry.
//
// Invalid log stream replica should be resolved by synchronization from
// the sealed source replica. Log stream context should be set carefully to
// receive copies of log entries through synchronization. To restore the
// log stream replica restarted during the synchronization phase, the log
// stream context is initiated by the committed log entries.
restoreMode = "invalid"
lsc.storeReportCommitBase(types.InvalidVersion, types.InvalidGLSN, varlogpb.LogSequenceNumber{}, true /*invalid*/)
if last != nil {
Expand All @@ -694,3 +736,33 @@ func (lse *Executor) restoreLogStreamContext(rp storage.RecoveryPoints) *logStre
}
return lsc
}

func (lse *Executor) restoreCommitWaitTasks(rp storage.RecoveryPoints) error {
// Invalid log stream replica does not receive commit messages from the metadata repository since synchronization can resolve it only.
_, _, _, invalid := lse.lsc.reportCommitBase()
if invalid {
return nil
}

// No uncommitted logs, so no CommitWaitTasks.
if rp.UncommittedLLSN.Begin.Invalid() {
return nil
}

cwts := newListQueue()
for i := rp.UncommittedLLSN.Begin; i < rp.UncommittedLLSN.End; i++ {
cwts.PushFront(newCommitWaitTask(nil))
}
err := lse.cm.sendCommitWaitTask(context.Background(), cwts, true /*ignoreSealing*/)
if err != nil {
lse.logger.Error("could not send commit wait task list", zap.Error(err))
cwtListNode := cwts.Back()
for cwtListNode != nil {
cwt := cwtListNode.value.(*commitWaitTask)
cwt.release()
cwtListNode = cwtListNode.Prev()
}
return err
}
return nil
}
Loading