diff --git a/internal/storage/recovery_points.go b/internal/storage/recovery_points.go index 8164cf6bb..157b389ce 100644 --- a/internal/storage/recovery_points.go +++ b/internal/storage/recovery_points.go @@ -2,9 +2,11 @@ package storage import ( "errors" + "fmt" "github.com/cockroachdb/pebble" + "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/proto/varlogpb" ) @@ -14,6 +16,10 @@ type RecoveryPoints struct { First *varlogpb.LogEntryMeta Last *varlogpb.LogEntryMeta } + UncommittedLLSN struct { + Begin types.LLSN + End types.LLSN + } } // ReadRecoveryPoints reads data necessary to restore the status of a log @@ -31,6 +37,16 @@ func (s *Storage) ReadRecoveryPoints() (rp RecoveryPoints, err error) { if err != nil { return } + + uncommittedBegin := types.MinLLSN + if cc := rp.LastCommitContext; cc != nil { + uncommittedBegin = cc.CommittedLLSNBegin + types.LLSN(cc.CommittedGLSNEnd-cc.CommittedGLSNBegin) + } + rp.UncommittedLLSN.Begin, rp.UncommittedLLSN.End, err = s.readUncommittedLogEntryBoundaries(uncommittedBegin) + if err != nil { + return + } + return rp, nil } @@ -74,3 +90,29 @@ func (s *Storage) readLogEntryBoundaries() (first, last *varlogpb.LogEntryMeta, } return first, &lastLE.LogEntryMeta, nil } + +func (s *Storage) readUncommittedLogEntryBoundaries(uncommittedBegin types.LLSN) (begin, end types.LLSN, err error) { + dk := make([]byte, dataKeyLength) + dk = encodeDataKeyInternal(uncommittedBegin, dk) + it := s.dataDB.NewIter(&pebble.IterOptions{ + LowerBound: dk, + UpperBound: []byte{dataKeySentinelPrefix}, + }) + defer func() { + _ = it.Close() + }() + + if !it.First() { + return types.InvalidLLSN, types.InvalidLLSN, nil + } + + begin = decodeDataKey(it.Key()) + if begin != uncommittedBegin { + err = fmt.Errorf("unexpected uncommitted begin, expected %v but got %v", uncommittedBegin, begin) + return types.InvalidLLSN, types.InvalidLLSN, err + } + _ = it.Last() + end = decodeDataKey(it.Key()) + 1 + + return begin, end, nil +} diff --git a/internal/storage/testing.go b/internal/storage/testing.go index 6dc74fa6c..adb5cb05d 100644 --- a/internal/storage/testing.go +++ b/internal/storage/testing.go @@ -31,6 +31,15 @@ func TestGetUnderlyingDB(tb testing.TB, stg *Storage) (dataDB, commitDB *pebble. return stg.dataDB, stg.commitDB } +// TestWriteLogEntry stores data located by the llsn. The data is not committed +// because it does not store commits. +func TestWriteLogEntry(tb testing.TB, stg *Storage, llsn types.LLSN, data []byte) { + batch := stg.NewWriteBatch() + require.NoError(tb, batch.Set(llsn, data)) + require.NoError(tb, batch.Apply()) + require.NoError(tb, batch.Close()) +} + // TestAppendLogEntryWithoutCommitContext stores log entries without commit // context. func TestAppendLogEntryWithoutCommitContext(tb testing.TB, stg *Storage, llsn types.LLSN, glsn types.GLSN, data []byte) { diff --git a/internal/storagenode/logstream/committer.go b/internal/storagenode/logstream/committer.go index 8817c3421..a072997ef 100644 --- a/internal/storagenode/logstream/committer.go +++ b/internal/storagenode/logstream/committer.go @@ -50,7 +50,7 @@ func newCommitter(cfg committerConfig) (*committer, error) { // The commit wait task is pushed into commitWaitQ in the committer. // The writer calls this method internally to push commit wait tasks to the committer. // If the input list of commit wait tasks are nil or empty, it panics. -func (cm *committer) sendCommitWaitTask(_ context.Context, cwts *listQueue) (err error) { +func (cm *committer) sendCommitWaitTask(_ context.Context, cwts *listQueue, ignoreSealing bool) (err error) { if cwts == nil { panic("log stream: committer: commit wait task list is nil") } @@ -73,7 +73,11 @@ func (cm *committer) sendCommitWaitTask(_ context.Context, cwts *listQueue) (err }() switch cm.lse.esm.load() { - case executorStateSealing, executorStateSealed, executorStateLearning: + case executorStateSealing: + if !ignoreSealing { + err = verrors.ErrSealed + } + case executorStateSealed, executorStateLearning: err = verrors.ErrSealed case executorStateClosed: err = verrors.ErrClosed diff --git a/internal/storagenode/logstream/committer_test.go b/internal/storagenode/logstream/committer_test.go index dbd063810..1f2a36ab1 100644 --- a/internal/storagenode/logstream/committer_test.go +++ b/internal/storagenode/logstream/committer_test.go @@ -55,25 +55,25 @@ func TestCommitter_ShouldNotAcceptTasksWhileNotAppendable(t *testing.T) { lse.esm.store(executorStateAppendable) assert.Panics(t, func() { - _ = cm.sendCommitWaitTask(context.Background(), cwts) + _ = cm.sendCommitWaitTask(context.Background(), cwts, false /*ignoreSealing*/) }) assert.Panics(t, func() { - _ = cm.sendCommitWaitTask(context.Background(), nil) + _ = cm.sendCommitWaitTask(context.Background(), nil, false /*ignoreSealing*/) }) cwts.PushFront(&commitWaitTask{}) lse.esm.store(executorStateSealing) - err := cm.sendCommitWaitTask(context.Background(), cwts) + err := cm.sendCommitWaitTask(context.Background(), cwts, false /*ignoreSealing*/) assert.Error(t, err) lse.esm.store(executorStateSealed) - err = cm.sendCommitWaitTask(context.Background(), cwts) + err = cm.sendCommitWaitTask(context.Background(), cwts, false /*ignoreSealing*/) assert.Error(t, err) lse.esm.store(executorStateClosed) - err = cm.sendCommitWaitTask(context.Background(), cwts) + err = cm.sendCommitWaitTask(context.Background(), cwts, false /*ignoreSealing*/) assert.Error(t, err) // sendCommitTask @@ -143,7 +143,7 @@ func TestCommitter_DrainCommitWaitQ(t *testing.T) { cwts := newListQueue() cwts.PushFront(newCommitWaitTask(newAppendWaitGroup(newWriteWaitGroup()))) - err := cm.sendCommitWaitTask(context.Background(), cwts) + err := cm.sendCommitWaitTask(context.Background(), cwts, false /*ignoreSealing*/) assert.NoError(t, err) assert.EqualValues(t, 1, cm.inflightCommitWait.Load()) diff --git a/internal/storagenode/logstream/executor.go b/internal/storagenode/logstream/executor.go index 3fd90ade6..b29dd7a12 100644 --- a/internal/storagenode/logstream/executor.go +++ b/internal/storagenode/logstream/executor.go @@ -148,6 +148,14 @@ func NewExecutor(opts ...ExecutorOption) (lse *Executor, err error) { lse: lse, logger: lse.logger.Named("backup writer"), }) + if err != nil { + return + } + + err = lse.restoreCommitWaitTasks(rp) + if err != nil { + return + } return lse, err } @@ -201,7 +209,7 @@ func (lse *Executor) Replicate(ctx context.Context, llsnList []types.LLSN, dataL return err } - if err := lse.cm.sendCommitWaitTask(ctx, cwts); err != nil { + if err := lse.cm.sendCommitWaitTask(ctx, cwts, false /*ignoreSealing*/); err != nil { lse.logger.Error("could not send commit wait task list", zap.Error(err)) cwtListNode := cwts.Back() for cwtListNode != nil { @@ -437,6 +445,7 @@ func (lse *Executor) Commit(ctx context.Context, commitResult snpb.LogStreamComm } if types.Version(atomic.LoadUint64(&lse.prevCommitVersion)) != commitResult.Version { + lse.logger.Info("commit", zap.String("commit_result", commitResult.String())) if ce := lse.logger.Check(zap.DebugLevel, "commit"); ce != nil { ce.Write(zap.String("commit_result", commitResult.String())) } @@ -633,6 +642,8 @@ func (lse *Executor) isPrimary() bool { } func (lse *Executor) restoreLogStreamContext(rp storage.RecoveryPoints) *logStreamContext { + lse.logger.Info("recovery points", zap.Any("rp", rp)) + cc := rp.LastCommitContext first := rp.CommittedLogEntry.First last := rp.CommittedLogEntry.Last @@ -643,11 +654,21 @@ func (lse *Executor) restoreLogStreamContext(rp storage.RecoveryPoints) *logStre lse.logger.Info("restore log stream context", zap.String("mode", restoreMode)) }() - if cc == nil && last == nil { // new log stream replica + // Log stream replica that has not committed any logs yet. For example, a + // new log stream replica. + if cc == nil && last == nil { + uncommittedLLSNEnd := lsc.uncommittedLLSNEnd.Load() + if !rp.UncommittedLLSN.End.Invalid() { + uncommittedLLSNEnd = rp.UncommittedLLSN.End + } + lsc.uncommittedLLSNEnd.Store(uncommittedLLSNEnd) + return lsc } - if cc != nil && last == nil { // maybe trimmed + // Log stream replica with a commit context but no log entry. For instance, + // the log stream replica that trimmed all log entries. + if cc != nil && last == nil { restoreMode = "recovered" uncommittedLLSNBegin := cc.CommittedLLSNBegin + types.LLSN(cc.CommittedGLSNEnd-cc.CommittedGLSNBegin) uncommittedBegin := varlogpb.LogSequenceNumber{ @@ -655,11 +676,19 @@ func (lse *Executor) restoreLogStreamContext(rp storage.RecoveryPoints) *logStre GLSN: cc.CommittedGLSNEnd, } lsc.storeReportCommitBase(cc.Version, cc.HighWatermark, uncommittedBegin, false /*invalid*/) - lsc.uncommittedLLSNEnd.Store(uncommittedLLSNBegin) + + uncommittedLLSNEnd := uncommittedLLSNBegin + if !rp.UncommittedLLSN.End.Invalid() { + uncommittedLLSNEnd = rp.UncommittedLLSN.End + } + lsc.uncommittedLLSNEnd.Store(uncommittedLLSNEnd) + return lsc } - if cc != nil && last != nil { // recovery + // Log stream replica that has a commit context and log entries. The commit + // context specifies the last log entry exactly. + if cc != nil && last != nil { restoreMode = "recovered" uncommittedLLSNBegin := cc.CommittedLLSNBegin + types.LLSN(cc.CommittedGLSNEnd-cc.CommittedGLSNBegin) if uncommittedLLSNBegin-1 == last.LLSN { @@ -668,7 +697,13 @@ func (lse *Executor) restoreLogStreamContext(rp storage.RecoveryPoints) *logStre GLSN: last.GLSN + 1, } lsc.storeReportCommitBase(cc.Version, cc.HighWatermark, uncommittedBegin, false /*invalid*/) - lsc.uncommittedLLSNEnd.Store(uncommittedLLSNBegin) + + uncommittedLLSNEnd := uncommittedLLSNBegin + if !rp.UncommittedLLSN.End.Invalid() { + uncommittedLLSNEnd = rp.UncommittedLLSN.End + } + lsc.uncommittedLLSNEnd.Store(uncommittedLLSNEnd) + lsc.setLocalLowWatermark(varlogpb.LogSequenceNumber{ LLSN: first.LLSN, GLSN: first.GLSN, @@ -677,7 +712,17 @@ func (lse *Executor) restoreLogStreamContext(rp storage.RecoveryPoints) *logStre } } - // something wrong + // Log stream replica is invalid: + // + // - It has committed log entries but no commit context. + // - It has a commit context and committed log entries, but the commit + // context doesn't specify the last log entry. + // + // Invalid log stream replica should be resolved by synchronization from + // the sealed source replica. Log stream context should be set carefully to + // receive copies of log entries through synchronization. To restore the + // log stream replica restarted during the synchronization phase, the log + // stream context is initiated by the committed log entries. restoreMode = "invalid" lsc.storeReportCommitBase(types.InvalidVersion, types.InvalidGLSN, varlogpb.LogSequenceNumber{}, true /*invalid*/) if last != nil { @@ -694,3 +739,33 @@ func (lse *Executor) restoreLogStreamContext(rp storage.RecoveryPoints) *logStre } return lsc } + +func (lse *Executor) restoreCommitWaitTasks(rp storage.RecoveryPoints) error { + // Invalid log stream replica does not receive commit messages from the metadata repository since synchronization can resolve it only. + _, _, _, invalid := lse.lsc.reportCommitBase() + if invalid { + return nil + } + + // No uncommitted logs, so no CommitWaitTasks. + if rp.UncommittedLLSN.Begin.Invalid() { + return nil + } + + cwts := newListQueue() + for i := rp.UncommittedLLSN.Begin; i < rp.UncommittedLLSN.End; i++ { + cwts.PushFront(newCommitWaitTask(nil)) + } + err := lse.cm.sendCommitWaitTask(context.Background(), cwts, true /*ignoreSealing*/) + if err != nil { + lse.logger.Error("could not send commit wait task list", zap.Error(err)) + cwtListNode := cwts.Back() + for cwtListNode != nil { + cwt := cwtListNode.value.(*commitWaitTask) + cwt.release() + cwtListNode = cwtListNode.Prev() + } + return err + } + return nil +} diff --git a/internal/storagenode/logstream/executor_test.go b/internal/storagenode/logstream/executor_test.go index 3971e849b..3fec0130d 100644 --- a/internal/storagenode/logstream/executor_test.go +++ b/internal/storagenode/logstream/executor_test.go @@ -3739,7 +3739,7 @@ func TestExecutorRestore(t *testing.T) { // data : none // commit: none // cmtctx: none - name: "NoLogEntry", + name: "ValidLogStreamReplica_NoCommittedLog_NoCommitContext", pref: func(t *testing.T, lse *Executor) { }, testf: func(t *testing.T, lse *Executor) { @@ -3776,11 +3776,86 @@ func TestExecutorRestore(t *testing.T) { require.True(t, localHWM.GLSN.Invalid()) }, }, + { + // data : 1 2 3 4 5 6 7 8 9 10 + // commit: none + // cmtctx: none + name: "ValidLogStreamReplica_NoCommittedLog_NoCommitContext_UncommittedLogs", + pref: func(t *testing.T, lse *Executor) { + for i := 1; i <= 10; i++ { + storage.TestWriteLogEntry(t, lse.stg, types.LLSN(i), []byte{}) + } + }, + testf: func(t *testing.T, lse *Executor) { + rpt, err := lse.Report(context.Background()) + require.NoError(t, err) + require.Equal(t, snpb.LogStreamUncommitReport{ + LogStreamID: lse.lsid, + UncommittedLLSNOffset: 1, + UncommittedLLSNLength: 10, + Version: 0, + HighWatermark: 0, + }, rpt) + + lsrmd, err := lse.Metadata() + require.NoError(t, err) + require.Equal(t, varlogpb.LogStreamStatusSealing, lsrmd.Status) + require.Equal(t, types.Version(0), lsrmd.Version) + require.Equal(t, types.GLSN(0), lsrmd.GlobalHighWatermark) + require.Equal(t, varlogpb.LogSequenceNumber{LLSN: 0, GLSN: 0}, lsrmd.LocalLowWatermark) + require.Equal(t, varlogpb.LogSequenceNumber{LLSN: 0, GLSN: 0}, lsrmd.LocalHighWatermark) + + ver, hwm, uncommittedBegin, invalid := lse.lsc.reportCommitBase() + require.Equal(t, types.InvalidVersion, ver) + require.Equal(t, types.InvalidGLSN, hwm) + require.Equal(t, varlogpb.LogSequenceNumber{ + LLSN: types.MinLLSN, GLSN: types.MinGLSN, + }, uncommittedBegin) + require.False(t, invalid) + require.Equal(t, types.LLSN(11), lse.lsc.uncommittedLLSNEnd.Load()) + localLWM, localHWM, _ := lse.lsc.localWatermarks() + require.True(t, localLWM.LLSN.Invalid()) + require.True(t, localLWM.GLSN.Invalid()) + require.True(t, localHWM.LLSN.Invalid()) + require.True(t, localHWM.GLSN.Invalid()) + + // Commit uncommitted logs after restarting. + require.EventuallyWithT(t, func(c *assert.CollectT) { + _ = lse.Commit(context.Background(), snpb.LogStreamCommitResult{ + TopicID: lse.tpid, + LogStreamID: lse.lsid, + CommittedLLSNOffset: 1, + CommittedGLSNOffset: 1, + CommittedGLSNLength: 5, + Version: 1, + HighWatermark: 5, + }) + rpt, err := lse.Report(context.Background()) + assert.NoError(c, err) + assert.EqualValues(c, 1, rpt.Version) + }, time.Second, 10*time.Millisecond) + + lss, lhwm, err := lse.Seal(context.Background(), 5) + require.NoError(t, err) + require.Equal(t, varlogpb.LogStreamStatusSealed, lss) + require.EqualValues(t, 5, lhwm) + + rpt, err = lse.Report(context.Background()) + require.NoError(t, err) + require.Equal(t, snpb.LogStreamUncommitReport{ + LogStreamID: lse.lsid, + UncommittedLLSNOffset: 6, + UncommittedLLSNLength: 0, + Version: 1, + HighWatermark: 5, + }, rpt) + }, + }, { // data : 1, 2, 3, ... , 10 // commit: 1, 2, 3, ...., 10 // cmtctx: 10 - name: "TenLogEntries", + name: "ValidLogStreamReplica_CommittedLogs_CommitContext", pref: func(t *testing.T, lse *Executor) { for i := 0; i < 10; i++ { llsn, glsn := types.LLSN(i+1), types.GLSN(i+1) @@ -3829,11 +3904,98 @@ func TestExecutorRestore(t *testing.T) { require.Equal(t, types.GLSN(10), localHWM.GLSN) }, }, + { + // data : 1, 2, 3, ... , 10, 11, 12, 13, ...., 20 + // commit: 1, 2, 3, ...., 10 + // cmtctx: 10 + name: "ValidLogStreamReplica_CommittedLogs_CommitContext", + pref: func(t *testing.T, lse *Executor) { + for i := 0; i < 10; i++ { + llsn, glsn := types.LLSN(i+1), types.GLSN(i+1) + storage.TestAppendLogEntryWithoutCommitContext(t, lse.stg, llsn, glsn, []byte{}) + } + storage.TestSetCommitContext(t, lse.stg, storage.CommitContext{ + Version: 10, + HighWatermark: 10, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 11, + CommittedLLSNBegin: 1, + }) + for i := 11; i <= 20; i++ { + storage.TestWriteLogEntry(t, lse.stg, types.LLSN(i), []byte{}) + } + }, + testf: func(t *testing.T, lse *Executor) { + rpt, err := lse.Report(context.Background()) + require.NoError(t, err) + require.Equal(t, snpb.LogStreamUncommitReport{ + LogStreamID: lse.lsid, + UncommittedLLSNOffset: 11, + UncommittedLLSNLength: 10, + Version: 10, + HighWatermark: 10, + }, rpt) + + lsrmd, err := lse.Metadata() + require.NoError(t, err) + require.Equal(t, varlogpb.LogStreamStatusSealing, lsrmd.Status) + require.Equal(t, types.Version(10), lsrmd.Version) + require.Equal(t, types.GLSN(10), lsrmd.GlobalHighWatermark) + require.Equal(t, varlogpb.LogSequenceNumber{LLSN: 1, GLSN: 1}, lsrmd.LocalLowWatermark) + require.Equal(t, varlogpb.LogSequenceNumber{LLSN: 10, GLSN: 10}, lsrmd.LocalHighWatermark) + + ver, hwm, uncommittedBegin, invalid := lse.lsc.reportCommitBase() + require.Equal(t, types.Version(10), ver) + require.Equal(t, types.GLSN(10), hwm) + require.Equal(t, varlogpb.LogSequenceNumber{ + LLSN: types.LLSN(11), + GLSN: types.GLSN(11), + }, uncommittedBegin) + require.False(t, invalid) + require.Equal(t, types.LLSN(21), lse.lsc.uncommittedLLSNEnd.Load()) + localLWM, localHWM, _ := lse.lsc.localWatermarks() + require.Equal(t, types.LLSN(1), localLWM.LLSN) + require.Equal(t, types.GLSN(1), localLWM.GLSN) + require.Equal(t, types.LLSN(10), localHWM.LLSN) + require.Equal(t, types.GLSN(10), localHWM.GLSN) + + // Commit uncommitted logs after restarting. + require.EventuallyWithT(t, func(c *assert.CollectT) { + _ = lse.Commit(context.Background(), snpb.LogStreamCommitResult{ + TopicID: lse.tpid, + LogStreamID: lse.lsid, + CommittedLLSNOffset: 11, + CommittedGLSNOffset: 11, + CommittedGLSNLength: 5, + Version: 11, + HighWatermark: 15, + }) + rpt, err := lse.Report(context.Background()) + assert.NoError(c, err) + assert.EqualValues(c, 11, rpt.Version) + }, time.Second, 10*time.Millisecond) + + lss, lhwm, err := lse.Seal(context.Background(), 15) + require.NoError(t, err) + require.Equal(t, varlogpb.LogStreamStatusSealed, lss) + require.EqualValues(t, 15, lhwm) + + rpt, err = lse.Report(context.Background()) + require.NoError(t, err) + require.Equal(t, snpb.LogStreamUncommitReport{ + LogStreamID: lse.lsid, + UncommittedLLSNOffset: 16, + UncommittedLLSNLength: 0, + Version: 11, + HighWatermark: 15, + }, rpt) + }, + }, { // data : 1, 2, 3, ... , 10 // commit: 1, 2, 3, ...., 10 // cmtctx: 10 - name: "TenLogEntriesFollowedByEmptyCommitContext", + name: "ValidLogStreamReplica_CommittedLogs_EmptyCommitContext", pref: func(t *testing.T, lse *Executor) { for i := 0; i < 10; i++ { llsn, glsn := types.LLSN(i+1), types.GLSN(i+1) @@ -3886,7 +4048,7 @@ func TestExecutorRestore(t *testing.T) { // data : 1, 2, 3, ... , 10 // commit: 1, 2, 3, ...., 10 // cmtctx: none - name: "TenLogEntriesWithoutCommitContext", + name: "InvalidLogStreamReplica_CommittedLogs_NoCommitContext", pref: func(t *testing.T, lse *Executor) { for i := 0; i < 10; i++ { llsn, glsn := types.LLSN(i+1), types.GLSN(i+1) @@ -3928,11 +4090,65 @@ func TestExecutorRestore(t *testing.T) { require.Equal(t, types.GLSN(10), localHWM.GLSN) }, }, + { + // data : 1, 2, 3, ... , 10, 11, 12, 13, ..., 20 + // commit: 1, 2, 3, ...., 10 + // cmtctx: none + name: "InvalidLogStreamReplica_CommittedLogs_NoCommitContext_UncommittedLogs", + pref: func(t *testing.T, lse *Executor) { + for i := 0; i < 10; i++ { + llsn, glsn := types.LLSN(i+1), types.GLSN(i+1) + storage.TestAppendLogEntryWithoutCommitContext(t, lse.stg, llsn, glsn, []byte{}) + } + for i := 11; i <= 20; i++ { + storage.TestWriteLogEntry(t, lse.stg, types.LLSN(i), []byte{}) + } + }, + testf: func(t *testing.T, lse *Executor) { + rpt, err := lse.Report(context.Background()) + require.NoError(t, err) + require.Equal(t, snpb.LogStreamUncommitReport{ + LogStreamID: lse.lsid, + UncommittedLLSNOffset: 0, + UncommittedLLSNLength: 0, + Version: 0, + HighWatermark: 0, + }, rpt) + + lsrmd, err := lse.Metadata() + require.NoError(t, err) + require.Equal(t, varlogpb.LogStreamStatusSealing, lsrmd.Status) + require.Equal(t, types.InvalidVersion, lsrmd.Version) + require.Equal(t, types.InvalidGLSN, lsrmd.GlobalHighWatermark) + require.Equal(t, varlogpb.LogSequenceNumber{LLSN: 1, GLSN: 1}, lsrmd.LocalLowWatermark) + require.Equal(t, varlogpb.LogSequenceNumber{LLSN: 10, GLSN: 10}, lsrmd.LocalHighWatermark) + + ver, hwm, uncommittedBegin, invalid := lse.lsc.reportCommitBase() + require.Equal(t, types.InvalidVersion, ver) + require.Equal(t, types.InvalidGLSN, hwm) + require.Equal(t, varlogpb.LogSequenceNumber{ + LLSN: types.LLSN(11), + GLSN: types.GLSN(11), + }, uncommittedBegin) + require.True(t, invalid) + require.Equal(t, types.LLSN(11), lse.lsc.uncommittedLLSNEnd.Load()) + localLWM, localHWM, _ := lse.lsc.localWatermarks() + require.Equal(t, types.LLSN(1), localLWM.LLSN) + require.Equal(t, types.GLSN(1), localLWM.GLSN) + require.Equal(t, types.LLSN(10), localHWM.LLSN) + require.Equal(t, types.GLSN(10), localHWM.GLSN) + + lss, lhwm, err := lse.Seal(context.Background(), 10) + require.NoError(t, err) + require.Equal(t, varlogpb.LogStreamStatusSealing, lss) + require.EqualValues(t, 10, lhwm) + }, + }, { // data : 1, 2, 3, ... , 10 // commit: 1, 2, 3, ...., 10 // cmtctx: 20 - name: "TenLogEntriesWithFutureCommitContext", + name: "InvalidLogStreamReplica_CommittedLogs_FutureCommitContext", pref: func(t *testing.T, lse *Executor) { for i := 0; i < 10; i++ { llsn, glsn := types.LLSN(i+1), types.GLSN(i+1) @@ -3985,7 +4201,7 @@ func TestExecutorRestore(t *testing.T) { // data : 1, 2, 3, ... , 10 // commit: 1, 2, 3, ...., 10 // cmtctx: 5 - name: "TenLogEntriesWithPastCommitContext", + name: "InvalidLogStreamReplica_CommittedLogs_PastCommitContext", pref: func(t *testing.T, lse *Executor) { for i := 0; i < 10; i++ { llsn, glsn := types.LLSN(i+1), types.GLSN(i+1) @@ -4038,7 +4254,7 @@ func TestExecutorRestore(t *testing.T) { // data : 5, ... , 10 // commit: 5, ...., 10 // cmtctx: 10 - name: "TrimmedPrefix", + name: "ValidLogStreamReplica_PrefixTrimmed_CommittedLogs_CommitContext", pref: func(t *testing.T, lse *Executor) { for i := 5; i <= 10; i++ { llsn, glsn := types.LLSN(i), types.GLSN(i) @@ -4091,7 +4307,7 @@ func TestExecutorRestore(t *testing.T) { // data : none // commit: none // cmtctx: 10 - name: "TrimmedAll", + name: "ValidLogStreamReplica_AllTrimmed_NoCommittedLogs_CommitContext", pref: func(t *testing.T, lse *Executor) { storage.TestSetCommitContext(t, lse.stg, storage.CommitContext{ Version: 10, @@ -4136,6 +4352,89 @@ func TestExecutorRestore(t *testing.T) { require.Equal(t, types.GLSN(0), localHWM.GLSN) }, }, + { + // data : 11, 12, 13, ..., 20 + // commit: none + // cmtctx: 10 + name: "ValidLogStreamReplica_AllTrimmed_NoCommittedLogs_CommitContext_UncommittedLogs", + pref: func(t *testing.T, lse *Executor) { + storage.TestSetCommitContext(t, lse.stg, storage.CommitContext{ + Version: 10, + HighWatermark: 10, + CommittedGLSNBegin: 1, + CommittedGLSNEnd: 11, + CommittedLLSNBegin: 1, + }) + for i := 11; i <= 20; i++ { + storage.TestWriteLogEntry(t, lse.stg, types.LLSN(i), []byte{}) + } + }, + testf: func(t *testing.T, lse *Executor) { + rpt, err := lse.Report(context.Background()) + require.NoError(t, err) + require.Equal(t, snpb.LogStreamUncommitReport{ + LogStreamID: lse.lsid, + UncommittedLLSNOffset: 11, + UncommittedLLSNLength: 10, + Version: 10, + HighWatermark: 10, + }, rpt) + + lsrmd, err := lse.Metadata() + require.NoError(t, err) + require.Equal(t, varlogpb.LogStreamStatusSealing, lsrmd.Status) + require.Equal(t, types.Version(10), lsrmd.Version) + require.Equal(t, types.GLSN(10), lsrmd.GlobalHighWatermark) + require.Equal(t, varlogpb.LogSequenceNumber{LLSN: 0, GLSN: 0}, lsrmd.LocalLowWatermark) + require.Equal(t, varlogpb.LogSequenceNumber{LLSN: 0, GLSN: 0}, lsrmd.LocalHighWatermark) + + ver, hwm, uncommittedBegin, invalid := lse.lsc.reportCommitBase() + require.Equal(t, types.Version(10), ver) + require.Equal(t, types.GLSN(10), hwm) + require.Equal(t, varlogpb.LogSequenceNumber{ + LLSN: types.LLSN(11), + GLSN: types.GLSN(11), + }, uncommittedBegin) + require.False(t, invalid) + require.Equal(t, types.LLSN(21), lse.lsc.uncommittedLLSNEnd.Load()) + localLWM, localHWM, _ := lse.lsc.localWatermarks() + require.Equal(t, types.LLSN(0), localLWM.LLSN) + require.Equal(t, types.GLSN(0), localLWM.GLSN) + require.Equal(t, types.LLSN(0), localHWM.LLSN) + require.Equal(t, types.GLSN(0), localHWM.GLSN) + + // Commit uncommitted logs after restarting. + require.EventuallyWithT(t, func(c *assert.CollectT) { + _ = lse.Commit(context.Background(), snpb.LogStreamCommitResult{ + TopicID: lse.tpid, + LogStreamID: lse.lsid, + CommittedLLSNOffset: 11, + CommittedGLSNOffset: 11, + CommittedGLSNLength: 5, + Version: 11, + HighWatermark: 15, + }) + rpt, err := lse.Report(context.Background()) + assert.NoError(c, err) + assert.EqualValues(c, 11, rpt.Version) + }, time.Second, 10*time.Millisecond) + + lss, lhwm, err := lse.Seal(context.Background(), 15) + require.NoError(t, err) + require.Equal(t, varlogpb.LogStreamStatusSealed, lss) + require.EqualValues(t, 15, lhwm) + + rpt, err = lse.Report(context.Background()) + require.NoError(t, err) + require.Equal(t, snpb.LogStreamUncommitReport{ + LogStreamID: lse.lsid, + UncommittedLLSNOffset: 16, + UncommittedLLSNLength: 0, + Version: 11, + HighWatermark: 15, + }, rpt) + }, + }, } for _, tc := range tcs { @@ -4146,6 +4445,7 @@ func TestExecutorRestore(t *testing.T) { func() { stg := storage.TestNewStorage(t, storage.WithPath(storagePath)) lse, err := NewExecutor(WithStorage(stg)) + require.NoError(t, err) tc.pref(t, lse) defer func() { err = lse.Close() @@ -4156,6 +4456,7 @@ func TestExecutorRestore(t *testing.T) { func() { stg := storage.TestNewStorage(t, storage.WithPath(storagePath)) lse, err := NewExecutor(WithStorage(stg)) + require.NoError(t, err) tc.testf(t, lse) defer func() { err = lse.Close() diff --git a/internal/storagenode/logstream/sequencer.go b/internal/storagenode/logstream/sequencer.go index 4b811ed4c..9c2cb6bf4 100644 --- a/internal/storagenode/logstream/sequencer.go +++ b/internal/storagenode/logstream/sequencer.go @@ -139,7 +139,7 @@ func (sq *sequencer) sequenceLoopInternal(ctx context.Context, st *sequenceTask) // before sending tasks to writer. It prevents the above subtle case. // // send to committer - if err := sq.lse.cm.sendCommitWaitTask(ctx, cwts); err != nil { + if err := sq.lse.cm.sendCommitWaitTask(ctx, cwts, false /*ignoreSealing*/); err != nil { sq.logger.Error("could not send to committer", zap.Error(err)) sq.lse.esm.compareAndSwap(executorStateAppendable, executorStateSealing) st.wwg.done(err)