From 3c767bf7d6c07cae727c032dbe3a2af23ceef012 Mon Sep 17 00:00:00 2001 From: Injun Song Date: Thu, 22 Jun 2023 10:43:45 +0900 Subject: [PATCH] fix(storagenode): restore uncommitted logs Currently, all log stream replicas belonging to the storage nodes that are just restarted can't commit logs written before restarting if all log stream replicas in a log stream were restarted simultaneously. They have logs uncommitted in their storages, but they can't process Commit RPC sent from the metadata repository. This PR fixes the above issue. While recovering the log stream context after restarting the storage nodes, it restores uncommitted logs. Resolves #490 --- internal/storage/recovery_points.go | 42 +++ internal/storage/testing.go | 9 + internal/storagenode/logstream/committer.go | 8 +- .../storagenode/logstream/committer_test.go | 12 +- internal/storagenode/logstream/executor.go | 86 ++++- .../storagenode/logstream/executor_test.go | 317 +++++++++++++++++- internal/storagenode/logstream/sequencer.go | 2 +- 7 files changed, 452 insertions(+), 24 deletions(-) diff --git a/internal/storage/recovery_points.go b/internal/storage/recovery_points.go index 8164cf6bb..157b389ce 100644 --- a/internal/storage/recovery_points.go +++ b/internal/storage/recovery_points.go @@ -2,9 +2,11 @@ package storage import ( "errors" + "fmt" "github.com/cockroachdb/pebble" + "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/proto/varlogpb" ) @@ -14,6 +16,10 @@ type RecoveryPoints struct { First *varlogpb.LogEntryMeta Last *varlogpb.LogEntryMeta } + UncommittedLLSN struct { + Begin types.LLSN + End types.LLSN + } } // ReadRecoveryPoints reads data necessary to restore the status of a log @@ -31,6 +37,16 @@ func (s *Storage) ReadRecoveryPoints() (rp RecoveryPoints, err error) { if err != nil { return } + + uncommittedBegin := types.MinLLSN + if cc := rp.LastCommitContext; cc != nil { + uncommittedBegin = cc.CommittedLLSNBegin + types.LLSN(cc.CommittedGLSNEnd-cc.CommittedGLSNBegin) + } + rp.UncommittedLLSN.Begin, rp.UncommittedLLSN.End, err = s.readUncommittedLogEntryBoundaries(uncommittedBegin) + if err != nil { + return + } + return rp, nil } @@ -74,3 +90,29 @@ func (s *Storage) readLogEntryBoundaries() (first, last *varlogpb.LogEntryMeta, } return first, &lastLE.LogEntryMeta, nil } + +func (s *Storage) readUncommittedLogEntryBoundaries(uncommittedBegin types.LLSN) (begin, end types.LLSN, err error) { + dk := make([]byte, dataKeyLength) + dk = encodeDataKeyInternal(uncommittedBegin, dk) + it := s.dataDB.NewIter(&pebble.IterOptions{ + LowerBound: dk, + UpperBound: []byte{dataKeySentinelPrefix}, + }) + defer func() { + _ = it.Close() + }() + + if !it.First() { + return types.InvalidLLSN, types.InvalidLLSN, nil + } + + begin = decodeDataKey(it.Key()) + if begin != uncommittedBegin { + err = fmt.Errorf("unexpected uncommitted begin, expected %v but got %v", uncommittedBegin, begin) + return types.InvalidLLSN, types.InvalidLLSN, err + } + _ = it.Last() + end = decodeDataKey(it.Key()) + 1 + + return begin, end, nil +} diff --git a/internal/storage/testing.go b/internal/storage/testing.go index 6dc74fa6c..adb5cb05d 100644 --- a/internal/storage/testing.go +++ b/internal/storage/testing.go @@ -31,6 +31,15 @@ func TestGetUnderlyingDB(tb testing.TB, stg *Storage) (dataDB, commitDB *pebble. return stg.dataDB, stg.commitDB } +// TestWriteLogEntry stores data located by the llsn. The data is not committed +// because it does not store commits. +func TestWriteLogEntry(tb testing.TB, stg *Storage, llsn types.LLSN, data []byte) { + batch := stg.NewWriteBatch() + require.NoError(tb, batch.Set(llsn, data)) + require.NoError(tb, batch.Apply()) + require.NoError(tb, batch.Close()) +} + // TestAppendLogEntryWithoutCommitContext stores log entries without commit // context. func TestAppendLogEntryWithoutCommitContext(tb testing.TB, stg *Storage, llsn types.LLSN, glsn types.GLSN, data []byte) { diff --git a/internal/storagenode/logstream/committer.go b/internal/storagenode/logstream/committer.go index 8817c3421..a072997ef 100644 --- a/internal/storagenode/logstream/committer.go +++ b/internal/storagenode/logstream/committer.go @@ -50,7 +50,7 @@ func newCommitter(cfg committerConfig) (*committer, error) { // The commit wait task is pushed into commitWaitQ in the committer. // The writer calls this method internally to push commit wait tasks to the committer. // If the input list of commit wait tasks are nil or empty, it panics. -func (cm *committer) sendCommitWaitTask(_ context.Context, cwts *listQueue) (err error) { +func (cm *committer) sendCommitWaitTask(_ context.Context, cwts *listQueue, ignoreSealing bool) (err error) { if cwts == nil { panic("log stream: committer: commit wait task list is nil") } @@ -73,7 +73,11 @@ func (cm *committer) sendCommitWaitTask(_ context.Context, cwts *listQueue) (err }() switch cm.lse.esm.load() { - case executorStateSealing, executorStateSealed, executorStateLearning: + case executorStateSealing: + if !ignoreSealing { + err = verrors.ErrSealed + } + case executorStateSealed, executorStateLearning: err = verrors.ErrSealed case executorStateClosed: err = verrors.ErrClosed diff --git a/internal/storagenode/logstream/committer_test.go b/internal/storagenode/logstream/committer_test.go index dbd063810..1f2a36ab1 100644 --- a/internal/storagenode/logstream/committer_test.go +++ b/internal/storagenode/logstream/committer_test.go @@ -55,25 +55,25 @@ func TestCommitter_ShouldNotAcceptTasksWhileNotAppendable(t *testing.T) { lse.esm.store(executorStateAppendable) assert.Panics(t, func() { - _ = cm.sendCommitWaitTask(context.Background(), cwts) + _ = cm.sendCommitWaitTask(context.Background(), cwts, false /*ignoreSealing*/) }) assert.Panics(t, func() { - _ = cm.sendCommitWaitTask(context.Background(), nil) + _ = cm.sendCommitWaitTask(context.Background(), nil, false /*ignoreSealing*/) }) cwts.PushFront(&commitWaitTask{}) lse.esm.store(executorStateSealing) - err := cm.sendCommitWaitTask(context.Background(), cwts) + err := cm.sendCommitWaitTask(context.Background(), cwts, false /*ignoreSealing*/) assert.Error(t, err) lse.esm.store(executorStateSealed) - err = cm.sendCommitWaitTask(context.Background(), cwts) + err = cm.sendCommitWaitTask(context.Background(), cwts, false /*ignoreSealing*/) assert.Error(t, err) lse.esm.store(executorStateClosed) - err = cm.sendCommitWaitTask(context.Background(), cwts) + err = cm.sendCommitWaitTask(context.Background(), cwts, false /*ignoreSealing*/) assert.Error(t, err) // sendCommitTask @@ -143,7 +143,7 @@ func TestCommitter_DrainCommitWaitQ(t *testing.T) { cwts := newListQueue() cwts.PushFront(newCommitWaitTask(newAppendWaitGroup(newWriteWaitGroup()))) - err := cm.sendCommitWaitTask(context.Background(), cwts) + err := cm.sendCommitWaitTask(context.Background(), cwts, false /*ignoreSealing*/) assert.NoError(t, err) assert.EqualValues(t, 1, cm.inflightCommitWait.Load()) diff --git a/internal/storagenode/logstream/executor.go b/internal/storagenode/logstream/executor.go index 3fd90ade6..4c57b1023 100644 --- a/internal/storagenode/logstream/executor.go +++ b/internal/storagenode/logstream/executor.go @@ -148,6 +148,14 @@ func NewExecutor(opts ...ExecutorOption) (lse *Executor, err error) { lse: lse, logger: lse.logger.Named("backup writer"), }) + if err != nil { + return + } + + err = lse.restoreCommitWaitTasks(rp) + if err != nil { + return + } return lse, err } @@ -201,7 +209,7 @@ func (lse *Executor) Replicate(ctx context.Context, llsnList []types.LLSN, dataL return err } - if err := lse.cm.sendCommitWaitTask(ctx, cwts); err != nil { + if err := lse.cm.sendCommitWaitTask(ctx, cwts, false /*ignoreSealing*/); err != nil { lse.logger.Error("could not send commit wait task list", zap.Error(err)) cwtListNode := cwts.Back() for cwtListNode != nil { @@ -643,11 +651,21 @@ func (lse *Executor) restoreLogStreamContext(rp storage.RecoveryPoints) *logStre lse.logger.Info("restore log stream context", zap.String("mode", restoreMode)) }() - if cc == nil && last == nil { // new log stream replica + // Log stream replica that has not committed any logs yet. For example, a + // new log stream replica. + if cc == nil && last == nil { + uncommittedLLSNEnd := lsc.uncommittedLLSNEnd.Load() + if !rp.UncommittedLLSN.End.Invalid() { + uncommittedLLSNEnd = rp.UncommittedLLSN.End + } + lsc.uncommittedLLSNEnd.Store(uncommittedLLSNEnd) + return lsc } - if cc != nil && last == nil { // maybe trimmed + // Log stream replica with a commit context but no log entry. For instance, + // the log stream replica that trimmed all log entries. + if cc != nil && last == nil { restoreMode = "recovered" uncommittedLLSNBegin := cc.CommittedLLSNBegin + types.LLSN(cc.CommittedGLSNEnd-cc.CommittedGLSNBegin) uncommittedBegin := varlogpb.LogSequenceNumber{ @@ -655,11 +673,19 @@ func (lse *Executor) restoreLogStreamContext(rp storage.RecoveryPoints) *logStre GLSN: cc.CommittedGLSNEnd, } lsc.storeReportCommitBase(cc.Version, cc.HighWatermark, uncommittedBegin, false /*invalid*/) - lsc.uncommittedLLSNEnd.Store(uncommittedLLSNBegin) + + uncommittedLLSNEnd := uncommittedLLSNBegin + if !rp.UncommittedLLSN.End.Invalid() { + uncommittedLLSNEnd = rp.UncommittedLLSN.End + } + lsc.uncommittedLLSNEnd.Store(uncommittedLLSNEnd) + return lsc } - if cc != nil && last != nil { // recovery + // Log stream replica that has a commit context and log entries. The commit + // context specifies the last log entry exactly. + if cc != nil && last != nil { restoreMode = "recovered" uncommittedLLSNBegin := cc.CommittedLLSNBegin + types.LLSN(cc.CommittedGLSNEnd-cc.CommittedGLSNBegin) if uncommittedLLSNBegin-1 == last.LLSN { @@ -668,7 +694,13 @@ func (lse *Executor) restoreLogStreamContext(rp storage.RecoveryPoints) *logStre GLSN: last.GLSN + 1, } lsc.storeReportCommitBase(cc.Version, cc.HighWatermark, uncommittedBegin, false /*invalid*/) - lsc.uncommittedLLSNEnd.Store(uncommittedLLSNBegin) + + uncommittedLLSNEnd := uncommittedLLSNBegin + if !rp.UncommittedLLSN.End.Invalid() { + uncommittedLLSNEnd = rp.UncommittedLLSN.End + } + lsc.uncommittedLLSNEnd.Store(uncommittedLLSNEnd) + lsc.setLocalLowWatermark(varlogpb.LogSequenceNumber{ LLSN: first.LLSN, GLSN: first.GLSN, @@ -677,7 +709,17 @@ func (lse *Executor) restoreLogStreamContext(rp storage.RecoveryPoints) *logStre } } - // something wrong + // Log stream replica is invalid: + // + // - It has committed log entries but no commit context. + // - It has a commit context and committed log entries, but the commit + // context doesn't specify the last log entry. + // + // Invalid log stream replica should be resolved by synchronization from + // the sealed source replica. Log stream context should be set carefully to + // receive copies of log entries through synchronization. To restore the + // log stream replica restarted during the synchronization phase, the log + // stream context is initiated by the committed log entries. restoreMode = "invalid" lsc.storeReportCommitBase(types.InvalidVersion, types.InvalidGLSN, varlogpb.LogSequenceNumber{}, true /*invalid*/) if last != nil { @@ -694,3 +736,33 @@ func (lse *Executor) restoreLogStreamContext(rp storage.RecoveryPoints) *logStre } return lsc } + +func (lse *Executor) restoreCommitWaitTasks(rp storage.RecoveryPoints) error { + // Invalid log stream replica does not receive commit messages from the metadata repository since synchronization can resolve it only. + _, _, _, invalid := lse.lsc.reportCommitBase() + if invalid { + return nil + } + + // No uncommitted logs, so no CommitWaitTasks. + if rp.UncommittedLLSN.Begin.Invalid() { + return nil + } + + cwts := newListQueue() + for i := rp.UncommittedLLSN.Begin; i < rp.UncommittedLLSN.End; i++ { + cwts.PushFront(newCommitWaitTask(nil)) + } + err := lse.cm.sendCommitWaitTask(context.Background(), cwts, true /*ignoreSealing*/) + if err != nil { + lse.logger.Error("could not send commit wait task list", zap.Error(err)) + cwtListNode := cwts.Back() + for cwtListNode != nil { + cwt := cwtListNode.value.(*commitWaitTask) + cwt.release() + cwtListNode = cwtListNode.Prev() + } + return err + } + return nil +} diff --git a/internal/storagenode/logstream/executor_test.go b/internal/storagenode/logstream/executor_test.go index 3971e849b..3fec0130d 100644 --- a/internal/storagenode/logstream/executor_test.go +++ b/internal/storagenode/logstream/executor_test.go @@ -3739,7 +3739,7 @@ func TestExecutorRestore(t *testing.T) { // data : none // commit: none // cmtctx: none - name: "NoLogEntry", + name: "ValidLogStreamReplica_NoCommittedLog_NoCommitContext", pref: func(t *testing.T, lse *Executor) { }, testf: func(t *testing.T, lse *Executor) { @@ -3776,11 +3776,86 @@ func TestExecutorRestore(t *testing.T) { require.True(t, localHWM.GLSN.Invalid()) }, }, + { + // data : 1 2 3 4 5 6 7 8 9 10 + // commit: none + // cmtctx: none + name: "ValidLogStreamReplica_NoCommittedLog_NoCommitContext_UncommittedLogs", + pref: func(t *testing.T, lse *Executor) { + for i := 1; i <= 10; i++ { + storage.TestWriteLogEntry(t, lse.stg, types.LLSN(i), []byte{}) + } + }, + testf: func(t *testing.T, lse *Executor) { + rpt, err := lse.Report(context.Background()) + require.NoError(t, err) + require.Equal(t, snpb.LogStreamUncommitReport{ + LogStreamID: lse.lsid, + UncommittedLLSNOffset: 1, + UncommittedLLSNLength: 10, + Version: 0, + HighWatermark: 0, + }, rpt) + + lsrmd, err := lse.Metadata() + require.NoError(t, err) + require.Equal(t, varlogpb.LogStreamStatusSealing, lsrmd.Status) + require.Equal(t, types.Version(0), lsrmd.Version) + require.Equal(t, types.GLSN(0), lsrmd.GlobalHighWatermark) + require.Equal(t, varlogpb.LogSequenceNumber{LLSN: 0, GLSN: 0}, lsrmd.LocalLowWatermark) + require.Equal(t, varlogpb.LogSequenceNumber{LLSN: 0, GLSN: 0}, lsrmd.LocalHighWatermark) + + ver, hwm, uncommittedBegin, invalid := lse.lsc.reportCommitBase() + require.Equal(t, types.InvalidVersion, ver) + require.Equal(t, types.InvalidGLSN, hwm) + require.Equal(t, varlogpb.LogSequenceNumber{ + LLSN: types.MinLLSN, GLSN: types.MinGLSN, + }, uncommittedBegin) + require.False(t, invalid) + require.Equal(t, types.LLSN(11), lse.lsc.uncommittedLLSNEnd.Load()) + localLWM, localHWM, _ := lse.lsc.localWatermarks() + require.True(t, localLWM.LLSN.Invalid()) + require.True(t, localLWM.GLSN.Invalid()) + require.True(t, localHWM.LLSN.Invalid()) + require.True(t, localHWM.GLSN.Invalid()) + + // Commit uncommitted logs after restarting. + require.EventuallyWithT(t, func(c *assert.CollectT) { + _ = lse.Commit(context.Background(), snpb.LogStreamCommitResult{ + TopicID: lse.tpid, + LogStreamID: lse.lsid, + CommittedLLSNOffset: 1, + CommittedGLSNOffset: 1, + CommittedGLSNLength: 5, + Version: 1, + HighWatermark: 5, + }) + rpt, err := lse.Report(context.Background()) + assert.NoError(c, err) + assert.EqualValues(c, 1, rpt.Version) + }, time.Second, 10*time.Millisecond) + + lss, lhwm, err := lse.Seal(context.Background(), 5) + require.NoError(t, err) + require.Equal(t, varlogpb.LogStreamStatusSealed, lss) + require.EqualValues(t, 5, lhwm) + + rpt, err = lse.Report(context.Background()) + require.NoError(t, err) + require.Equal(t, snpb.LogStreamUncommitReport{ + LogStreamID: lse.lsid, + UncommittedLLSNOffset: 6, + UncommittedLLSNLength: 0, + Version: 1, + HighWatermark: 5, + }, rpt) + }, + }, { // data : 1, 2, 3, ... , 10 // commit: 1, 2, 3, ...., 10 // cmtctx: 10 - name: "TenLogEntries", + name: "ValidLogStreamReplica_CommittedLogs_CommitContext", pref: func(t *testing.T, lse *Executor) { for i := 0; i < 10; i++ { llsn, glsn := types.LLSN(i+1), types.GLSN(i+1) @@ -3829,11 +3904,98 @@ func TestExecutorRestore(t *testing.T) { require.Equal(t, types.GLSN(10), localHWM.GLSN) }, }, + { + // data : 1, 2, 3, ... , 10, 11, 12, 13, ...., 20 + // commit: 1, 2, 3, ...., 10 + // cmtctx: 10 + name: "ValidLogStreamReplica_CommittedLogs_CommitContext", + pref: func(t *testing.T, lse *Executor) { + for i := 0; i < 10; i++ { + llsn, glsn := types.LLSN(i+1), types.GLSN(i+1) + storage.TestAppendLogEntryWithoutCommitContext(t, lse.stg, llsn, glsn, []byte{}) + } + storage.TestSetCommitContext(t, lse.stg, storage.CommitContext{ + Version: 10, + HighWatermark: 10, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 11, + CommittedLLSNBegin: 1, + }) + for i := 11; i <= 20; i++ { + storage.TestWriteLogEntry(t, lse.stg, types.LLSN(i), []byte{}) + } + }, + testf: func(t *testing.T, lse *Executor) { + rpt, err := lse.Report(context.Background()) + require.NoError(t, err) + require.Equal(t, snpb.LogStreamUncommitReport{ + LogStreamID: lse.lsid, + UncommittedLLSNOffset: 11, + UncommittedLLSNLength: 10, + Version: 10, + HighWatermark: 10, + }, rpt) + + lsrmd, err := lse.Metadata() + require.NoError(t, err) + require.Equal(t, varlogpb.LogStreamStatusSealing, lsrmd.Status) + require.Equal(t, types.Version(10), lsrmd.Version) + require.Equal(t, types.GLSN(10), lsrmd.GlobalHighWatermark) + require.Equal(t, varlogpb.LogSequenceNumber{LLSN: 1, GLSN: 1}, lsrmd.LocalLowWatermark) + require.Equal(t, varlogpb.LogSequenceNumber{LLSN: 10, GLSN: 10}, lsrmd.LocalHighWatermark) + + ver, hwm, uncommittedBegin, invalid := lse.lsc.reportCommitBase() + require.Equal(t, types.Version(10), ver) + require.Equal(t, types.GLSN(10), hwm) + require.Equal(t, varlogpb.LogSequenceNumber{ + LLSN: types.LLSN(11), + GLSN: types.GLSN(11), + }, uncommittedBegin) + require.False(t, invalid) + require.Equal(t, types.LLSN(21), lse.lsc.uncommittedLLSNEnd.Load()) + localLWM, localHWM, _ := lse.lsc.localWatermarks() + require.Equal(t, types.LLSN(1), localLWM.LLSN) + require.Equal(t, types.GLSN(1), localLWM.GLSN) + require.Equal(t, types.LLSN(10), localHWM.LLSN) + require.Equal(t, types.GLSN(10), localHWM.GLSN) + + // Commit uncommitted logs after restarting. + require.EventuallyWithT(t, func(c *assert.CollectT) { + _ = lse.Commit(context.Background(), snpb.LogStreamCommitResult{ + TopicID: lse.tpid, + LogStreamID: lse.lsid, + CommittedLLSNOffset: 11, + CommittedGLSNOffset: 11, + CommittedGLSNLength: 5, + Version: 11, + HighWatermark: 15, + }) + rpt, err := lse.Report(context.Background()) + assert.NoError(c, err) + assert.EqualValues(c, 11, rpt.Version) + }, time.Second, 10*time.Millisecond) + + lss, lhwm, err := lse.Seal(context.Background(), 15) + require.NoError(t, err) + require.Equal(t, varlogpb.LogStreamStatusSealed, lss) + require.EqualValues(t, 15, lhwm) + + rpt, err = lse.Report(context.Background()) + require.NoError(t, err) + require.Equal(t, snpb.LogStreamUncommitReport{ + LogStreamID: lse.lsid, + UncommittedLLSNOffset: 16, + UncommittedLLSNLength: 0, + Version: 11, + HighWatermark: 15, + }, rpt) + }, + }, { // data : 1, 2, 3, ... , 10 // commit: 1, 2, 3, ...., 10 // cmtctx: 10 - name: "TenLogEntriesFollowedByEmptyCommitContext", + name: "ValidLogStreamReplica_CommittedLogs_EmptyCommitContext", pref: func(t *testing.T, lse *Executor) { for i := 0; i < 10; i++ { llsn, glsn := types.LLSN(i+1), types.GLSN(i+1) @@ -3886,7 +4048,7 @@ func TestExecutorRestore(t *testing.T) { // data : 1, 2, 3, ... , 10 // commit: 1, 2, 3, ...., 10 // cmtctx: none - name: "TenLogEntriesWithoutCommitContext", + name: "InvalidLogStreamReplica_CommittedLogs_NoCommitContext", pref: func(t *testing.T, lse *Executor) { for i := 0; i < 10; i++ { llsn, glsn := types.LLSN(i+1), types.GLSN(i+1) @@ -3928,11 +4090,65 @@ func TestExecutorRestore(t *testing.T) { require.Equal(t, types.GLSN(10), localHWM.GLSN) }, }, + { + // data : 1, 2, 3, ... , 10, 11, 12, 13, ..., 20 + // commit: 1, 2, 3, ...., 10 + // cmtctx: none + name: "InvalidLogStreamReplica_CommittedLogs_NoCommitContext_UncommittedLogs", + pref: func(t *testing.T, lse *Executor) { + for i := 0; i < 10; i++ { + llsn, glsn := types.LLSN(i+1), types.GLSN(i+1) + storage.TestAppendLogEntryWithoutCommitContext(t, lse.stg, llsn, glsn, []byte{}) + } + for i := 11; i <= 20; i++ { + storage.TestWriteLogEntry(t, lse.stg, types.LLSN(i), []byte{}) + } + }, + testf: func(t *testing.T, lse *Executor) { + rpt, err := lse.Report(context.Background()) + require.NoError(t, err) + require.Equal(t, snpb.LogStreamUncommitReport{ + LogStreamID: lse.lsid, + UncommittedLLSNOffset: 0, + UncommittedLLSNLength: 0, + Version: 0, + HighWatermark: 0, + }, rpt) + + lsrmd, err := lse.Metadata() + require.NoError(t, err) + require.Equal(t, varlogpb.LogStreamStatusSealing, lsrmd.Status) + require.Equal(t, types.InvalidVersion, lsrmd.Version) + require.Equal(t, types.InvalidGLSN, lsrmd.GlobalHighWatermark) + require.Equal(t, varlogpb.LogSequenceNumber{LLSN: 1, GLSN: 1}, lsrmd.LocalLowWatermark) + require.Equal(t, varlogpb.LogSequenceNumber{LLSN: 10, GLSN: 10}, lsrmd.LocalHighWatermark) + + ver, hwm, uncommittedBegin, invalid := lse.lsc.reportCommitBase() + require.Equal(t, types.InvalidVersion, ver) + require.Equal(t, types.InvalidGLSN, hwm) + require.Equal(t, varlogpb.LogSequenceNumber{ + LLSN: types.LLSN(11), + GLSN: types.GLSN(11), + }, uncommittedBegin) + require.True(t, invalid) + require.Equal(t, types.LLSN(11), lse.lsc.uncommittedLLSNEnd.Load()) + localLWM, localHWM, _ := lse.lsc.localWatermarks() + require.Equal(t, types.LLSN(1), localLWM.LLSN) + require.Equal(t, types.GLSN(1), localLWM.GLSN) + require.Equal(t, types.LLSN(10), localHWM.LLSN) + require.Equal(t, types.GLSN(10), localHWM.GLSN) + + lss, lhwm, err := lse.Seal(context.Background(), 10) + require.NoError(t, err) + require.Equal(t, varlogpb.LogStreamStatusSealing, lss) + require.EqualValues(t, 10, lhwm) + }, + }, { // data : 1, 2, 3, ... , 10 // commit: 1, 2, 3, ...., 10 // cmtctx: 20 - name: "TenLogEntriesWithFutureCommitContext", + name: "InvalidLogStreamReplica_CommittedLogs_FutureCommitContext", pref: func(t *testing.T, lse *Executor) { for i := 0; i < 10; i++ { llsn, glsn := types.LLSN(i+1), types.GLSN(i+1) @@ -3985,7 +4201,7 @@ func TestExecutorRestore(t *testing.T) { // data : 1, 2, 3, ... , 10 // commit: 1, 2, 3, ...., 10 // cmtctx: 5 - name: "TenLogEntriesWithPastCommitContext", + name: "InvalidLogStreamReplica_CommittedLogs_PastCommitContext", pref: func(t *testing.T, lse *Executor) { for i := 0; i < 10; i++ { llsn, glsn := types.LLSN(i+1), types.GLSN(i+1) @@ -4038,7 +4254,7 @@ func TestExecutorRestore(t *testing.T) { // data : 5, ... , 10 // commit: 5, ...., 10 // cmtctx: 10 - name: "TrimmedPrefix", + name: "ValidLogStreamReplica_PrefixTrimmed_CommittedLogs_CommitContext", pref: func(t *testing.T, lse *Executor) { for i := 5; i <= 10; i++ { llsn, glsn := types.LLSN(i), types.GLSN(i) @@ -4091,7 +4307,7 @@ func TestExecutorRestore(t *testing.T) { // data : none // commit: none // cmtctx: 10 - name: "TrimmedAll", + name: "ValidLogStreamReplica_AllTrimmed_NoCommittedLogs_CommitContext", pref: func(t *testing.T, lse *Executor) { storage.TestSetCommitContext(t, lse.stg, storage.CommitContext{ Version: 10, @@ -4136,6 +4352,89 @@ func TestExecutorRestore(t *testing.T) { require.Equal(t, types.GLSN(0), localHWM.GLSN) }, }, + { + // data : 11, 12, 13, ..., 20 + // commit: none + // cmtctx: 10 + name: "ValidLogStreamReplica_AllTrimmed_NoCommittedLogs_CommitContext_UncommittedLogs", + pref: func(t *testing.T, lse *Executor) { + storage.TestSetCommitContext(t, lse.stg, storage.CommitContext{ + Version: 10, + HighWatermark: 10, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 11, + CommittedLLSNBegin: 1, + }) + for i := 11; i <= 20; i++ { + storage.TestWriteLogEntry(t, lse.stg, types.LLSN(i), []byte{}) + } + }, + testf: func(t *testing.T, lse *Executor) { + rpt, err := lse.Report(context.Background()) + require.NoError(t, err) + require.Equal(t, snpb.LogStreamUncommitReport{ + LogStreamID: lse.lsid, + UncommittedLLSNOffset: 11, + UncommittedLLSNLength: 10, + Version: 10, + HighWatermark: 10, + }, rpt) + + lsrmd, err := lse.Metadata() + require.NoError(t, err) + require.Equal(t, varlogpb.LogStreamStatusSealing, lsrmd.Status) + require.Equal(t, types.Version(10), lsrmd.Version) + require.Equal(t, types.GLSN(10), lsrmd.GlobalHighWatermark) + require.Equal(t, varlogpb.LogSequenceNumber{LLSN: 0, GLSN: 0}, lsrmd.LocalLowWatermark) + require.Equal(t, varlogpb.LogSequenceNumber{LLSN: 0, GLSN: 0}, lsrmd.LocalHighWatermark) + + ver, hwm, uncommittedBegin, invalid := lse.lsc.reportCommitBase() + require.Equal(t, types.Version(10), ver) + require.Equal(t, types.GLSN(10), hwm) + require.Equal(t, varlogpb.LogSequenceNumber{ + LLSN: types.LLSN(11), + GLSN: types.GLSN(11), + }, uncommittedBegin) + require.False(t, invalid) + require.Equal(t, types.LLSN(21), lse.lsc.uncommittedLLSNEnd.Load()) + localLWM, localHWM, _ := lse.lsc.localWatermarks() + require.Equal(t, types.LLSN(0), localLWM.LLSN) + require.Equal(t, types.GLSN(0), localLWM.GLSN) + require.Equal(t, types.LLSN(0), localHWM.LLSN) + require.Equal(t, types.GLSN(0), localHWM.GLSN) + + // Commit uncommitted logs after restarting. + require.EventuallyWithT(t, func(c *assert.CollectT) { + _ = lse.Commit(context.Background(), snpb.LogStreamCommitResult{ + TopicID: lse.tpid, + LogStreamID: lse.lsid, + CommittedLLSNOffset: 11, + CommittedGLSNOffset: 11, + CommittedGLSNLength: 5, + Version: 11, + HighWatermark: 15, + }) + rpt, err := lse.Report(context.Background()) + assert.NoError(c, err) + assert.EqualValues(c, 11, rpt.Version) + }, time.Second, 10*time.Millisecond) + + lss, lhwm, err := lse.Seal(context.Background(), 15) + require.NoError(t, err) + require.Equal(t, varlogpb.LogStreamStatusSealed, lss) + require.EqualValues(t, 15, lhwm) + + rpt, err = lse.Report(context.Background()) + require.NoError(t, err) + require.Equal(t, snpb.LogStreamUncommitReport{ + LogStreamID: lse.lsid, + UncommittedLLSNOffset: 16, + UncommittedLLSNLength: 0, + Version: 11, + HighWatermark: 15, + }, rpt) + }, + }, } for _, tc := range tcs { @@ -4146,6 +4445,7 @@ func TestExecutorRestore(t *testing.T) { func() { stg := storage.TestNewStorage(t, storage.WithPath(storagePath)) lse, err := NewExecutor(WithStorage(stg)) + require.NoError(t, err) tc.pref(t, lse) defer func() { err = lse.Close() @@ -4156,6 +4456,7 @@ func TestExecutorRestore(t *testing.T) { func() { stg := storage.TestNewStorage(t, storage.WithPath(storagePath)) lse, err := NewExecutor(WithStorage(stg)) + require.NoError(t, err) tc.testf(t, lse) defer func() { err = lse.Close() diff --git a/internal/storagenode/logstream/sequencer.go b/internal/storagenode/logstream/sequencer.go index 4b811ed4c..9c2cb6bf4 100644 --- a/internal/storagenode/logstream/sequencer.go +++ b/internal/storagenode/logstream/sequencer.go @@ -139,7 +139,7 @@ func (sq *sequencer) sequenceLoopInternal(ctx context.Context, st *sequenceTask) // before sending tasks to writer. It prevents the above subtle case. // // send to committer - if err := sq.lse.cm.sendCommitWaitTask(ctx, cwts); err != nil { + if err := sq.lse.cm.sendCommitWaitTask(ctx, cwts, false /*ignoreSealing*/); err != nil { sq.logger.Error("could not send to committer", zap.Error(err)) sq.lse.esm.compareAndSwap(executorStateAppendable, executorStateSealing) st.wwg.done(err)