diff --git a/internal/metarepos/dummy_storagenode_client_factory_impl.go b/internal/metarepos/dummy_storagenode_client_factory_impl.go index aacf985b2..94b16f546 100644 --- a/internal/metarepos/dummy_storagenode_client_factory_impl.go +++ b/internal/metarepos/dummy_storagenode_client_factory_impl.go @@ -351,6 +351,14 @@ func (r *DummyStorageNodeClient) getKnownVersion(idx int) types.Version { return r.knownVersion[idx] } +func (r *DummyStorageNodeClient) makeInvalid(idx int) { + r.mu.Lock() + defer r.mu.Unlock() + + r.knownVersion[idx] = 0 + r.uncommittedLLSNOffset[idx] = 0 +} + func (fac *DummyStorageNodeClientFactory) crashRPC(snID types.StorageNodeID) { f, ok := fac.m.Load(snID) if !ok { diff --git a/internal/metarepos/raft_metadata_repository.go b/internal/metarepos/raft_metadata_repository.go index a229371a5..f0614173b 100644 --- a/internal/metarepos/raft_metadata_repository.go +++ b/internal/metarepos/raft_metadata_repository.go @@ -798,6 +798,10 @@ func (mr *RaftMetadataRepository) applyReport(reports *mrpb.Reports) error { snID := r.StorageNodeID LS: for _, u := range r.UncommitReport { + if u.Invalid() { + continue LS + } + s, ok := mr.storage.LookupUncommitReport(u.LogStreamID, snID) if !ok { continue LS diff --git a/internal/metarepos/raft_metadata_repository_test.go b/internal/metarepos/raft_metadata_repository_test.go index 4ab4c4f78..6c0ef6fdc 100644 --- a/internal/metarepos/raft_metadata_repository_test.go +++ b/internal/metarepos/raft_metadata_repository_test.go @@ -316,7 +316,7 @@ func (clus *metadataRepoCluster) initDummyStorageNode(nrSN, nrTopic int) error { } for i := 0; i < nrSN; i++ { - snID := types.StorageNodeID(i) + snID := types.MinStorageNodeID + types.StorageNodeID(i) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ @@ -415,7 +415,7 @@ func TestMRApplyReport(t *testing.T) { snIDs := make([]types.StorageNodeID, rep) for i := range snIDs { - snIDs[i] = types.StorageNodeID(i) + snIDs[i] = types.MinStorageNodeID + 
types.StorageNodeID(i) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ @@ -426,8 +426,8 @@ func TestMRApplyReport(t *testing.T) { err := mr.storage.registerStorageNode(sn) So(err, ShouldBeNil) } - lsID := types.LogStreamID(0) - notExistSnID := types.StorageNodeID(rep) + lsID := types.MinLogStreamID + notExistSnID := types.MinStorageNodeID + types.StorageNodeID(rep) report := makeUncommitReport(snIDs[0], types.InvalidVersion, types.InvalidGLSN, lsID, types.MinLLSN, 2) mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) //nolint:errcheck,revive // TODO:: Handle an error returned. @@ -489,6 +489,62 @@ func TestMRApplyReport(t *testing.T) { }) } +func TestMRApplyInvalidReport(t *testing.T) { + Convey("Given LogStream", t, func(ctx C) { + rep := 2 + clus := newMetadataRepoCluster(1, rep, false, false) + Reset(func() { + clus.closeNoErrors(t) + }) + mr := clus.nodes[0] + + tn := &varlogpb.TopicDescriptor{ + TopicID: types.TopicID(1), + Status: varlogpb.TopicStatusRunning, + } + + err := mr.storage.registerTopic(tn) + So(err, ShouldBeNil) + + snIDs := make([]types.StorageNodeID, rep) + for i := range snIDs { + snIDs[i] = types.MinStorageNodeID + types.StorageNodeID(i) + + sn := &varlogpb.StorageNodeDescriptor{ + StorageNode: varlogpb.StorageNode{ + StorageNodeID: snIDs[i], + }, + } + + err := mr.storage.registerStorageNode(sn) + So(err, ShouldBeNil) + } + + lsID := types.MinLogStreamID + ls := makeLogStream(types.TopicID(1), lsID, snIDs) + err = mr.storage.registerLogStream(ls) + So(err, ShouldBeNil) + + for _, snID := range snIDs { + report := makeUncommitReport(snID, types.InvalidVersion, types.InvalidGLSN, lsID, types.MinLLSN, 1) + mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) //nolint:errcheck,revive // TODO:: Handle an error returned. 
+ + r, ok := mr.storage.LookupUncommitReport(lsID, snID) + So(ok, ShouldBeTrue) + So(r.Invalid(), ShouldBeFalse) + } + + Convey("When Some LogStream reports invalid report", func(ctx C) { + report := makeUncommitReport(snIDs[0], types.InvalidVersion, types.InvalidGLSN, lsID, types.InvalidLLSN, 2) + mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) //nolint:errcheck,revive // TODO:: Handle an error returned. + + r, ok := mr.storage.LookupUncommitReport(lsID, snIDs[0]) + So(ok, ShouldBeTrue) + So(r.Invalid(), ShouldBeFalse) + }) + }) +} + func TestMRCalculateCommit(t *testing.T) { Convey("Calculate commit", t, func(ctx C) { clus := newMetadataRepoCluster(1, 2, false, false) @@ -499,7 +555,7 @@ func TestMRCalculateCommit(t *testing.T) { snIDs := make([]types.StorageNodeID, 2) for i := range snIDs { - snIDs[i] = types.StorageNodeID(i) + snIDs[i] = types.MinStorageNodeID + types.StorageNodeID(i) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ StorageNodeID: snIDs[i], @@ -513,7 +569,7 @@ func TestMRCalculateCommit(t *testing.T) { err := mr.storage.registerTopic(&varlogpb.TopicDescriptor{TopicID: types.TopicID(1)}) So(err, ShouldBeNil) - lsID := types.LogStreamID(0) + lsID := types.MinLogStreamID ls := makeLogStream(types.TopicID(1), lsID, snIDs) err = mr.storage.registerLogStream(ls) So(err, ShouldBeNil) @@ -573,7 +629,7 @@ func TestMRGlobalCommit(t *testing.T) { for i := range snIDs { snIDs[i] = make([]types.StorageNodeID, rep) for j := range snIDs[i] { - snIDs[i][j] = types.StorageNodeID(i*2 + j) + snIDs[i][j] = types.MinStorageNodeID + types.StorageNodeID(i*2+j) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ @@ -591,7 +647,7 @@ func TestMRGlobalCommit(t *testing.T) { lsIds := make([]types.LogStreamID, 2) for i := range lsIds { - lsIds[i] = types.LogStreamID(i) + lsIds[i] = types.MinLogStreamID + types.LogStreamID(i) } for i, lsID := range lsIds { @@ -685,7 +741,7 @@ func TestMRGlobalCommitConsistency(t 
*testing.T) { snIDs := make([]types.StorageNodeID, rep) for i := range snIDs { - snIDs[i] = types.StorageNodeID(i) + snIDs[i] = types.MinStorageNodeID + types.StorageNodeID(i) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ @@ -706,7 +762,7 @@ func TestMRGlobalCommitConsistency(t *testing.T) { lsIDs := make([]types.LogStreamID, nrLS) for i := range lsIDs { - lsIDs[i] = types.LogStreamID(i) + lsIDs[i] = types.MinLogStreamID + types.LogStreamID(i) } for _, lsID := range lsIDs { @@ -762,8 +818,8 @@ func TestMRSimpleReportNCommit(t *testing.T) { return clus.healthCheckAll() }), ShouldBeTrue) - snID := types.StorageNodeID(0) - snIDs := make([]types.StorageNodeID, 1) + snID := types.MinStorageNodeID + snIDs := make([]types.StorageNodeID, 0, 1) snIDs = append(snIDs, snID) lsID := types.LogStreamID(snID) @@ -955,7 +1011,7 @@ func TestMRGetLastCommitted(t *testing.T) { for i := range snIDs { snIDs[i] = make([]types.StorageNodeID, rep) for j := range snIDs[i] { - snIDs[i][j] = types.StorageNodeID(i*2 + j) + snIDs[i][j] = types.MinStorageNodeID + types.StorageNodeID(i*2+j) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ @@ -970,7 +1026,7 @@ func TestMRGetLastCommitted(t *testing.T) { lsIds := make([]types.LogStreamID, 2) for i := range lsIds { - lsIds[i] = types.LogStreamID(i) + lsIds[i] = types.MinLogStreamID + types.LogStreamID(i) } err := mr.storage.registerTopic(&varlogpb.TopicDescriptor{TopicID: types.TopicID(1)}) @@ -1110,7 +1166,7 @@ func TestMRSeal(t *testing.T) { for i := range snIDs { snIDs[i] = make([]types.StorageNodeID, rep) for j := range snIDs[i] { - snIDs[i][j] = types.StorageNodeID(i*2 + j) + snIDs[i][j] = types.MinStorageNodeID + types.StorageNodeID(i*2+j) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ @@ -1125,7 +1181,7 @@ func TestMRSeal(t *testing.T) { lsIDs := make([]types.LogStreamID, 2) for i := range lsIDs { - lsIDs[i] = types.LogStreamID(i) + lsIDs[i] = 
types.MinLogStreamID + types.LogStreamID(i) } err := mr.storage.registerTopic(&varlogpb.TopicDescriptor{TopicID: types.TopicID(1)}) @@ -1198,7 +1254,7 @@ func TestMRUnseal(t *testing.T) { for i := range snIDs { snIDs[i] = make([]types.StorageNodeID, rep) for j := range snIDs[i] { - snIDs[i][j] = types.StorageNodeID(i*2 + j) + snIDs[i][j] = types.MinStorageNodeID + types.StorageNodeID(i*2+j) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ @@ -1213,7 +1269,7 @@ func TestMRUnseal(t *testing.T) { lsIDs := make([]types.LogStreamID, 2) for i := range lsIDs { - lsIDs[i] = types.LogStreamID(i) + lsIDs[i] = types.MinLogStreamID + types.LogStreamID(i) } err := mr.storage.registerTopic(&varlogpb.TopicDescriptor{TopicID: types.TopicID(1)}) @@ -1334,7 +1390,7 @@ func TestMRUpdateLogStream(t *testing.T) { snIDs := make([]types.StorageNodeID, nrStorageNode) for i := range snIDs { - snIDs[i] = types.StorageNodeID(i) + snIDs[i] = types.MinStorageNodeID + types.StorageNodeID(i) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ @@ -1349,7 +1405,7 @@ func TestMRUpdateLogStream(t *testing.T) { err := mr.RegisterTopic(context.TODO(), types.TopicID(1)) So(err, ShouldBeNil) - lsID := types.LogStreamID(0) + lsID := types.MinLogStreamID ls := makeLogStream(types.TopicID(1), lsID, snIDs[0:1]) err = mr.RegisterLogStream(context.TODO(), ls) So(err, ShouldBeNil) @@ -1407,7 +1463,7 @@ func TestMRFailoverLeaderElection(t *testing.T) { snIDs := make([]types.StorageNodeID, nrRep) for i := range snIDs { - snIDs[i] = types.StorageNodeID(i) + snIDs[i] = types.MinStorageNodeID + types.StorageNodeID(i) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ @@ -1425,7 +1481,7 @@ func TestMRFailoverLeaderElection(t *testing.T) { err := clus.nodes[0].RegisterTopic(context.TODO(), types.TopicID(1)) So(err, ShouldBeNil) - lsID := types.LogStreamID(0) + lsID := types.MinLogStreamID ls := makeLogStream(types.TopicID(1), lsID, snIDs) rctx, 
cancel := context.WithTimeout(context.Background(), vtesting.TimeoutUnitTimesFactor(50)) @@ -1476,7 +1532,7 @@ func TestMRFailoverJoinNewNode(t *testing.T) { snIDs := make([]types.StorageNodeID, nrRep) for i := range snIDs { - snIDs[i] = types.StorageNodeID(i) + snIDs[i] = types.MinStorageNodeID + types.StorageNodeID(i) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ @@ -1494,7 +1550,7 @@ func TestMRFailoverJoinNewNode(t *testing.T) { err := clus.nodes[0].RegisterTopic(context.TODO(), types.TopicID(1)) So(err, ShouldBeNil) - lsID := types.LogStreamID(0) + lsID := types.MinLogStreamID ls := makeLogStream(types.TopicID(1), lsID, snIDs) rctx, cancel := context.WithTimeout(context.Background(), vtesting.TimeoutUnitTimesFactor(200)) @@ -2135,7 +2191,7 @@ func TestMRFailoverRecoverReportCollector(t *testing.T) { for i := range snIDs { snIDs[i] = make([]types.StorageNodeID, nrRep) for j := range snIDs[i] { - snIDs[i][j] = types.StorageNodeID(i*nrStorageNode + j) + snIDs[i][j] = types.MinStorageNodeID + types.StorageNodeID(i*nrStorageNode+j) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ @@ -2159,7 +2215,7 @@ func TestMRFailoverRecoverReportCollector(t *testing.T) { So(err, ShouldBeNil) for i := 0; i < nrLogStream; i++ { - lsID := types.LogStreamID(i) + lsID := types.MinLogStreamID + types.LogStreamID(i) ls := makeLogStream(types.TopicID(1), lsID, snIDs[i%nrStorageNode]) rctx, cancel := context.WithTimeout(context.Background(), vtesting.TimeoutUnitTimesFactor(50)) @@ -2367,7 +2423,7 @@ func TestMRUnregisterTopic(t *testing.T) { lsIDs := make([]types.LogStreamID, nrLS) for i := range lsIDs { - lsIDs[i] = types.LogStreamID(i) + lsIDs[i] = types.MinLogStreamID + types.LogStreamID(i) } for _, lsID := range lsIDs { @@ -2397,7 +2453,7 @@ func TestMRTopicLastHighWatermark(t *testing.T) { snIDs := make([]types.StorageNodeID, rep) for i := range snIDs { - snIDs[i] = types.StorageNodeID(i) + snIDs[i] = types.MinStorageNodeID + 
types.StorageNodeID(i) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ @@ -2411,7 +2467,7 @@ func TestMRTopicLastHighWatermark(t *testing.T) { topicLogStreamID := make(map[types.TopicID][]types.LogStreamID) topicID := types.TopicID(1) - lsID := types.LogStreamID(1) + lsID := types.MinLogStreamID for i := 0; i < nrTopics; i++ { err := mr.storage.registerTopic(&varlogpb.TopicDescriptor{TopicID: topicID}) So(err, ShouldBeNil) diff --git a/internal/metarepos/report_collector.go b/internal/metarepos/report_collector.go index 07ff51d18..50c8e4626 100644 --- a/internal/metarepos/report_collector.go +++ b/internal/metarepos/report_collector.go @@ -698,11 +698,13 @@ func (rce *reportCollectExecutor) processReport(response *snpb.GetReportResponse cur := report.UncommitReports[i] prev := prevReport.UncommitReports[j] - if cur.LogStreamID < prev.LogStreamID { - diff.UncommitReports = append(diff.UncommitReports, cur) + if cur.Invalid() { i++ - } else if prev.LogStreamID < cur.LogStreamID { + } else if prev.Invalid() || prev.LogStreamID < cur.LogStreamID { j++ + } else if cur.LogStreamID < prev.LogStreamID { + diff.UncommitReports = append(diff.UncommitReports, cur) + i++ } else { if cur.Version < prev.Version { fmt.Printf("invalid report prev:%v, cur:%v\n", @@ -721,8 +723,11 @@ func (rce *reportCollectExecutor) processReport(response *snpb.GetReportResponse } } - if i < report.Len() { - diff.UncommitReports = append(diff.UncommitReports, report.UncommitReports[i:]...) 
+ for ; i < report.Len(); i++ { + cur := report.UncommitReports[i] + if !cur.Invalid() { + diff.UncommitReports = append(diff.UncommitReports, cur) + } } for _, r := range diff.UncommitReports { diff --git a/internal/metarepos/report_collector_test.go b/internal/metarepos/report_collector_test.go index 16d4c67dd..9d2ac45dd 100644 --- a/internal/metarepos/report_collector_test.go +++ b/internal/metarepos/report_collector_test.go @@ -163,9 +163,9 @@ func TestRegisterLogStream(t *testing.T) { reportCollector.Run() //nolint:errcheck,revive // TODO:: Handle an error returned. defer reportCollector.Close() - snID := types.StorageNodeID(0) - lsID := types.LogStreamID(0) - topicID := types.TopicID(0) + snID := types.MinStorageNodeID + lsID := types.MinLogStreamID + topicID := types.MinTopicID Convey("registeration LogStream with not existing storageNodeID should be failed", func() { err := reportCollector.RegisterLogStream(topicID, snID, lsID, types.InvalidVersion, varlogpb.LogStreamStatusRunning) @@ -206,8 +206,8 @@ func TestUnregisterStorageNode(t *testing.T) { defer reportCollector.Close() snID := types.StorageNodeID(time.Now().UnixNano()) - lsID := types.LogStreamID(0) - topicID := types.TopicID(0) + lsID := types.MinLogStreamID + topicID := types.MinTopicID sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ @@ -261,9 +261,9 @@ func TestUnregisterLogStream(t *testing.T) { reportCollector.Run() //nolint:errcheck,revive // TODO:: Handle an error returned. 
defer reportCollector.Close() - snID := types.StorageNodeID(0) - lsID := types.LogStreamID(0) - topicID := types.TopicID(0) + snID := types.MinStorageNodeID + lsID := types.MinLogStreamID + topicID := types.MinTopicID Convey("unregisteration LogStream with not existing storageNodeID should be failed", func() { err := reportCollector.UnregisterLogStream(snID, lsID) @@ -478,9 +478,10 @@ func TestReport(t *testing.T) { }(nrStorage) for i := 0; i < nrStorage; i++ { + snID := types.MinStorageNodeID + types.StorageNodeID(i) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ - StorageNodeID: types.StorageNodeID(i), + StorageNodeID: snID, }, } @@ -494,6 +495,54 @@ func TestReport(t *testing.T) { }) } +func TestReportIgnore(t *testing.T) { + Convey("ReportCollector should collect report from registered storage node", t, func() { + a := NewDummyStorageNodeClientFactory(1, true) + mr := NewDummyMetadataRepository(a) + + logger, _ := zap.NewDevelopment() + reportCollector := NewReportCollector(mr, DefaultRPCTimeout, newNopTelmetryStub(), logger) + reportCollector.Run() //nolint:errcheck,revive // TODO:: Handle an error returned. 
+ defer reportCollector.Close() + + snID := types.MinStorageNodeID + sn := &varlogpb.StorageNodeDescriptor{ + StorageNode: varlogpb.StorageNode{ + StorageNodeID: snID, + }, + } + + err := reportCollector.RegisterStorageNode(sn) + So(err, ShouldBeNil) + + <-mr.reportC + + reporterClient := a.lookupClient(sn.StorageNodeID) + reporterClient.makeInvalid(0) + + r := <-mr.reportC + for _, ur := range r.UncommitReports { + So(ur.Invalid(), ShouldBeTrue) + } + + after := time.After(DefaultReportRefreshTime / 2) + + Loop: + for { + select { + case <-after: + break Loop + case r := <-mr.reportC: + for _, ur := range r.UncommitReports { + So(ur.LogStreamID.Invalid(), ShouldBeFalse) + So(ur.Invalid(), ShouldBeFalse) + } + } + } + + }) +} + func TestReportDedup(t *testing.T) { Convey("Given ReportCollector", t, func() { a := NewDummyStorageNodeClientFactory(3, true) @@ -506,7 +555,7 @@ func TestReportDedup(t *testing.T) { sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ - StorageNodeID: types.StorageNodeID(0), + StorageNodeID: types.MinStorageNodeID, }, } @@ -520,26 +569,26 @@ func TestReportDedup(t *testing.T) { reporterClient := a.lookupClient(sn.StorageNodeID) reporterClient.increaseUncommitted(0) - Convey("Then report should include logStream[0]", func() { + Convey("Then report should include logStream[1]", func() { r = <-mr.reportC So(r.Len(), ShouldEqual, 1) - So(r.UncommitReports[0].LogStreamID, ShouldEqual, types.LogStreamID(0)) + So(r.UncommitReports[0].LogStreamID, ShouldEqual, types.MinLogStreamID) - Convey("When logStream[1] increase uncommitted", func() { + Convey("When logStream[2] increase uncommitted", func() { reporterClient.increaseUncommitted(1) - Convey("Then report should include logStream[1]", func() { + Convey("Then report should include logStream[2]", func() { r = <-mr.reportC So(r.Len(), ShouldEqual, 1) - So(r.UncommitReports[0].LogStreamID, ShouldEqual, types.LogStreamID(1)) + So(r.UncommitReports[0].LogStreamID, ShouldEqual, 
types.MinLogStreamID+types.LogStreamID(1)) - Convey("When logStream[2] increase uncommitted", func() { + Convey("When logStream[3] increase uncommitted", func() { reporterClient.increaseUncommitted(2) - Convey("Then report should include logStream[2]", func() { + Convey("Then report should include logStream[3]", func() { r = <-mr.reportC So(r.Len(), ShouldEqual, 1) - So(r.UncommitReports[0].LogStreamID, ShouldEqual, types.LogStreamID(2)) + So(r.UncommitReports[0].LogStreamID, ShouldEqual, types.MinLogStreamID+types.LogStreamID(2)) Convey("After reportAll interval, report should include all", func() { r = <-mr.reportC @@ -574,9 +623,10 @@ func TestReportCollectorSeal(t *testing.T) { }) for i := 0; i < nrStorage; i++ { + snID := types.MinStorageNodeID + types.StorageNodeID(i) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ - StorageNodeID: types.StorageNodeID(i), + StorageNodeID: snID, }, } @@ -593,12 +643,14 @@ func TestReportCollectorSeal(t *testing.T) { var sealedLSID types.LogStreamID for i := 0; i < nrLogStream; i++ { - err := reportCollector.RegisterLogStream(topicID, types.StorageNodeID(i%nrStorage), types.LogStreamID(i), types.InvalidVersion, varlogpb.LogStreamStatusRunning) + snID := types.MinStorageNodeID + types.StorageNodeID(i%nrStorage) + lsID := types.MinLogStreamID + types.LogStreamID(i) + err := reportCollector.RegisterLogStream(topicID, snID, lsID, types.InvalidVersion, varlogpb.LogStreamStatusRunning) if err != nil { t.Fatal(err) } - sealedLSID = types.LogStreamID(i) + sealedLSID = lsID } gls := cc.newDummyCommitResults(knownVer+1, glsn, nrStorage) @@ -717,12 +769,13 @@ func (cc *dummyCommitContext) newDummyCommitResults(ver types.Version, baseGLSN glsn := baseGLSN for i := 0; i < nrLogStream; i++ { numUncommitLen := 0 - if !cc.sealed(types.LogStreamID(i)) { + lsID := types.MinLogStreamID + types.LogStreamID(i) + if !cc.sealed(lsID) { numUncommitLen = 1 } r := snpb.LogStreamCommitResult{ - LogStreamID: types.LogStreamID(i), 
+ LogStreamID: lsID, CommittedGLSNOffset: glsn, CommittedLLSNOffset: cc.committedLLSNBeginOffset[i], CommittedGLSNLength: uint64(numUncommitLen), @@ -756,9 +809,10 @@ func TestCommit(t *testing.T) { }) for i := 0; i < nrStorage; i++ { + snID := types.MinStorageNodeID + types.StorageNodeID(i) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ - StorageNodeID: types.StorageNodeID(i), + StorageNodeID: snID, }, } @@ -773,7 +827,9 @@ func TestCommit(t *testing.T) { } for i := 0; i < nrLogStream; i++ { - err := reportCollector.RegisterLogStream(topicID, types.StorageNodeID(i%nrStorage), types.LogStreamID(i), types.InvalidVersion, varlogpb.LogStreamStatusRunning) + snID := types.MinStorageNodeID + types.StorageNodeID(i%nrStorage) + lsID := types.MinLogStreamID + types.LogStreamID(i) + err := reportCollector.RegisterLogStream(topicID, snID, lsID, types.InvalidVersion, varlogpb.LogStreamStatusRunning) if err != nil { t.Fatal(err) } @@ -838,9 +894,11 @@ func TestCommit(t *testing.T) { logger.Debug("trimGLS", zap.Any("knowVer", knownVer), zap.Any("trimVer", trimVer), zap.Any("result", len(mr.m))) Convey("ReportCollector should send proper commit against new StorageNode", func() { + snID := types.MinStorageNodeID + types.StorageNodeID(nrStorage) + lsID := types.MinLogStreamID + types.LogStreamID(nrLogStream) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ - StorageNodeID: types.StorageNodeID(nrStorage), + StorageNodeID: snID, }, } @@ -849,7 +907,7 @@ func TestCommit(t *testing.T) { nrStorage += 1 - err = reportCollector.RegisterLogStream(topicID, sn.StorageNodeID, types.LogStreamID(nrLogStream), knownVer, varlogpb.LogStreamStatusRunning) + err = reportCollector.RegisterLogStream(topicID, sn.StorageNodeID, lsID, knownVer, varlogpb.LogStreamStatusRunning) So(err, ShouldBeNil) nrLogStream += 1 @@ -899,7 +957,7 @@ func TestCommitWithDelay(t *testing.T) { sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ - 
StorageNodeID: types.StorageNodeID(0), + StorageNodeID: types.MinStorageNodeID, }, } @@ -912,7 +970,7 @@ func TestCommitWithDelay(t *testing.T) { return a.lookupClient(sn.StorageNodeID) != nil }), ShouldBeTrue) - err = reportCollector.RegisterLogStream(topicID, types.StorageNodeID(0), types.LogStreamID(0), types.InvalidVersion, varlogpb.LogStreamStatusRunning) + err = reportCollector.RegisterLogStream(topicID, types.MinStorageNodeID, types.MinLogStreamID, types.InvalidVersion, varlogpb.LogStreamStatusRunning) if err != nil { t.Fatal(err) } @@ -1012,7 +1070,7 @@ func TestRPCFail(t *testing.T) { sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ - StorageNodeID: types.StorageNodeID(0), + StorageNodeID: types.MinStorageNodeID, }, } @@ -1073,7 +1131,7 @@ func TestReporterClientReconnect(t *testing.T) { sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ - StorageNodeID: types.StorageNodeID(0), + StorageNodeID: types.MinStorageNodeID, }, } diff --git a/proto/snpb/log_stream_reporter.go b/proto/snpb/log_stream_reporter.go index 99dd86cbf..c4d633d29 100644 --- a/proto/snpb/log_stream_reporter.go +++ b/proto/snpb/log_stream_reporter.go @@ -8,6 +8,13 @@ import ( var InvalidLogStreamUncommitReport = LogStreamUncommitReport{} var InvalidLogStreamCommitResult = LogStreamCommitResult{} +// Invalid reports whether the LogStreamUncommitReport is not acceptable. +// LogStreamUncommitReport with invalid logStream or invalid uncommittedLLSNOffset +// is not acceptable. MetadataRepository ignores these reports. +func (m *LogStreamUncommitReport) Invalid() bool { + return m.GetLogStreamID().Invalid() || m.GetUncommittedLLSNOffset().Invalid() +} + func (m *LogStreamUncommitReport) UncommittedLLSNEnd() types.LLSN { if m == nil { return types.InvalidLLSN