From de83a0869b5f43bf84b27e0b6f0df7fc81a77c76 Mon Sep 17 00:00:00 2001 From: "pharrell.jang" Date: Fri, 23 Sep 2022 23:59:32 +0900 Subject: [PATCH 1/2] feat(metarepos): ignore invalid report --- .../dummy_storagenode_client_factory_impl.go | 8 ++ .../metarepos/raft_metadata_repository.go | 4 + .../raft_metadata_repository_test.go | 116 ++++++++++++----- internal/metarepos/report_collector.go | 15 ++- internal/metarepos/report_collector_test.go | 121 +++++++++++++----- proto/snpb/log_stream_reporter.go | 6 + 6 files changed, 205 insertions(+), 65 deletions(-) diff --git a/internal/metarepos/dummy_storagenode_client_factory_impl.go b/internal/metarepos/dummy_storagenode_client_factory_impl.go index aacf985b2..94b16f546 100644 --- a/internal/metarepos/dummy_storagenode_client_factory_impl.go +++ b/internal/metarepos/dummy_storagenode_client_factory_impl.go @@ -351,6 +351,14 @@ func (r *DummyStorageNodeClient) getKnownVersion(idx int) types.Version { return r.knownVersion[idx] } +func (r *DummyStorageNodeClient) makeInvalid(idx int) { + r.mu.Lock() + defer r.mu.Unlock() + + r.knownVersion[idx] = 0 + r.uncommittedLLSNOffset[idx] = 0 +} + func (fac *DummyStorageNodeClientFactory) crashRPC(snID types.StorageNodeID) { f, ok := fac.m.Load(snID) if !ok { diff --git a/internal/metarepos/raft_metadata_repository.go b/internal/metarepos/raft_metadata_repository.go index a229371a5..f0614173b 100644 --- a/internal/metarepos/raft_metadata_repository.go +++ b/internal/metarepos/raft_metadata_repository.go @@ -798,6 +798,10 @@ func (mr *RaftMetadataRepository) applyReport(reports *mrpb.Reports) error { snID := r.StorageNodeID LS: for _, u := range r.UncommitReport { + if u.Invalid() { + continue LS + } + s, ok := mr.storage.LookupUncommitReport(u.LogStreamID, snID) if !ok { continue LS diff --git a/internal/metarepos/raft_metadata_repository_test.go b/internal/metarepos/raft_metadata_repository_test.go index 4ab4c4f78..806a59724 100644 --- a/internal/metarepos/raft_metadata_repository_test.go +++ b/internal/metarepos/raft_metadata_repository_test.go @@ -316,7 +316,7 @@ func (clus *metadataRepoCluster) initDummyStorageNode(nrSN, nrTopic int) error { } for i := 0; i < nrSN; i++ { - snID := types.StorageNodeID(i) + snID := types.MinStorageNodeID + types.StorageNodeID(i) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ @@ -415,7 +415,7 @@ func TestMRApplyReport(t *testing.T) { snIDs := make([]types.StorageNodeID, rep) for i := range snIDs { - snIDs[i] = types.StorageNodeID(i) + snIDs[i] = types.MinStorageNodeID + types.StorageNodeID(i) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ @@ -426,8 +426,8 @@ func TestMRApplyReport(t *testing.T) { err := mr.storage.registerStorageNode(sn) So(err, ShouldBeNil) } - lsID := types.LogStreamID(0) - notExistSnID := types.StorageNodeID(rep) + lsID := types.MinLogStreamID + notExistSnID := types.MinStorageNodeID + types.StorageNodeID(rep) report := makeUncommitReport(snIDs[0], types.InvalidVersion, types.InvalidGLSN, lsID, types.MinLLSN, 2) mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) //nolint:errcheck,revive // TODO:: Handle an error returned. 
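The core of this patch is the new guard in applyReport above: an uncommit report whose log stream ID or uncommitted LLSN offset is invalid is skipped before it can touch repository state, and the dummy client gains makeInvalid so tests can produce such reports. The pervasive test changes from types.StorageNodeID(i) and types.LogStreamID(i) to types.MinStorageNodeID + ... and types.MinLogStreamID + ... follow from the same idea: identifiers that start at zero would now be rejected as invalid. The following is a minimal, self-contained sketch of that filtering step; the types are simplified stand-ins for the real snpb/mrpb definitions, and the zero-value checks only approximate the Invalid() predicate defined later in this patch.

package main

import "fmt"

// Simplified stand-ins for the real proto/snpb and pkg/types definitions.
type LogStreamID int32
type LLSN uint64

type UncommitReport struct {
	LogStreamID           LogStreamID
	UncommittedLLSNOffset LLSN
}

// Invalid approximates the predicate added by this patch: a report is not
// acceptable when its log stream ID or its uncommitted LLSN offset is the
// zero (invalid) value.
func (r UncommitReport) Invalid() bool {
	return r.LogStreamID == 0 || r.UncommittedLLSNOffset == 0
}

// applyReports sketches the guard in applyReport: invalid entries are skipped
// instead of being written into the repository state.
func applyReports(reports []UncommitReport, apply func(UncommitReport)) {
	for _, r := range reports {
		if r.Invalid() {
			continue // ignore invalid report
		}
		apply(r)
	}
}

func main() {
	reports := []UncommitReport{
		{LogStreamID: 1, UncommittedLLSNOffset: 1}, // valid: applied
		{LogStreamID: 1, UncommittedLLSNOffset: 0}, // invalid offset: ignored
		{LogStreamID: 0, UncommittedLLSNOffset: 1}, // invalid log stream ID: ignored
	}
	applyReports(reports, func(r UncommitReport) {
		fmt.Printf("applied report for log stream %d\n", r.LogStreamID)
	})
}
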
@@ -489,6 +489,64 @@ func TestMRApplyReport(t *testing.T) { }) } +func TestMRApplyInvalidReport(t *testing.T) { + Convey("Given LogStream", t, func(ctx C) { + rep := 2 + clus := newMetadataRepoCluster(1, rep, false, false) + Reset(func() { + clus.closeNoErrors(t) + }) + mr := clus.nodes[0] + + tn := &varlogpb.TopicDescriptor{ + TopicID: types.TopicID(1), + Status: varlogpb.TopicStatusRunning, + } + + err := mr.storage.registerTopic(tn) + So(err, ShouldBeNil) + + snIDs := make([]types.StorageNodeID, rep) + for i := range snIDs { + snIDs[i] = types.MinStorageNodeID + types.StorageNodeID(i) + + sn := &varlogpb.StorageNodeDescriptor{ + StorageNode: varlogpb.StorageNode{ + StorageNodeID: snIDs[i], + }, + } + + err := mr.storage.registerStorageNode(sn) + So(err, ShouldBeNil) + } + err = mr.storage.registerTopic(&varlogpb.TopicDescriptor{TopicID: types.TopicID(1)}) + So(err, ShouldBeNil) + + lsID := types.MinLogStreamID + ls := makeLogStream(types.TopicID(1), lsID, snIDs) + err = mr.storage.registerLogStream(ls) + So(err, ShouldBeNil) + + for _, snID := range snIDs { + report := makeUncommitReport(snID, types.InvalidVersion, types.InvalidGLSN, lsID, types.MinLLSN, 1) + mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) //nolint:errcheck,revive // TODO:: Handle an error returned. + + r, ok := mr.storage.LookupUncommitReport(lsID, snID) + So(ok, ShouldBeTrue) + So(r.Invalid(), ShouldBeFalse) + } + + Convey("When Some LogStream reports invalid report", func(ctx C) { + report := makeUncommitReport(snIDs[0], types.InvalidVersion, types.InvalidGLSN, lsID, types.InvalidLLSN, 2) + mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) //nolint:errcheck,revive // TODO:: Handle an error returned. + + r, ok := mr.storage.LookupUncommitReport(lsID, snIDs[0]) + So(ok, ShouldBeTrue) + So(r.Invalid(), ShouldBeFalse) + }) + }) +} + func TestMRCalculateCommit(t *testing.T) { Convey("Calculate commit", t, func(ctx C) { clus := newMetadataRepoCluster(1, 2, false, false) @@ -499,7 +557,7 @@ func TestMRCalculateCommit(t *testing.T) { snIDs := make([]types.StorageNodeID, 2) for i := range snIDs { - snIDs[i] = types.StorageNodeID(i) + snIDs[i] = types.MinStorageNodeID + types.StorageNodeID(i) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ StorageNodeID: snIDs[i], @@ -513,7 +571,7 @@ func TestMRCalculateCommit(t *testing.T) { err := mr.storage.registerTopic(&varlogpb.TopicDescriptor{TopicID: types.TopicID(1)}) So(err, ShouldBeNil) - lsID := types.LogStreamID(0) + lsID := types.MinLogStreamID ls := makeLogStream(types.TopicID(1), lsID, snIDs) err = mr.storage.registerLogStream(ls) So(err, ShouldBeNil) @@ -573,7 +631,7 @@ func TestMRGlobalCommit(t *testing.T) { for i := range snIDs { snIDs[i] = make([]types.StorageNodeID, rep) for j := range snIDs[i] { - snIDs[i][j] = types.StorageNodeID(i*2 + j) + snIDs[i][j] = types.MinStorageNodeID + types.StorageNodeID(i*2+j) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ @@ -591,7 +649,7 @@ func TestMRGlobalCommit(t *testing.T) { lsIds := make([]types.LogStreamID, 2) for i := range lsIds { - lsIds[i] = types.LogStreamID(i) + lsIds[i] = types.MinLogStreamID + types.LogStreamID(i) } for i, lsID := range lsIds { @@ -685,7 +743,7 @@ func TestMRGlobalCommitConsistency(t *testing.T) { snIDs := make([]types.StorageNodeID, rep) for i := range snIDs { - snIDs[i] = types.StorageNodeID(i) + snIDs[i] = types.MinStorageNodeID + types.StorageNodeID(i) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ 
@@ -706,7 +764,7 @@ func TestMRGlobalCommitConsistency(t *testing.T) { lsIDs := make([]types.LogStreamID, nrLS) for i := range lsIDs { - lsIDs[i] = types.LogStreamID(i) + lsIDs[i] = types.MinLogStreamID + types.LogStreamID(i) } for _, lsID := range lsIDs { @@ -762,8 +820,8 @@ func TestMRSimpleReportNCommit(t *testing.T) { return clus.healthCheckAll() }), ShouldBeTrue) - snID := types.StorageNodeID(0) - snIDs := make([]types.StorageNodeID, 1) + snID := types.MinStorageNodeID + snIDs := make([]types.StorageNodeID, 0, 1) snIDs = append(snIDs, snID) lsID := types.LogStreamID(snID) @@ -955,7 +1013,7 @@ func TestMRGetLastCommitted(t *testing.T) { for i := range snIDs { snIDs[i] = make([]types.StorageNodeID, rep) for j := range snIDs[i] { - snIDs[i][j] = types.StorageNodeID(i*2 + j) + snIDs[i][j] = types.MinStorageNodeID + types.StorageNodeID(i*2+j) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ @@ -970,7 +1028,7 @@ func TestMRGetLastCommitted(t *testing.T) { lsIds := make([]types.LogStreamID, 2) for i := range lsIds { - lsIds[i] = types.LogStreamID(i) + lsIds[i] = types.MinLogStreamID + types.LogStreamID(i) } err := mr.storage.registerTopic(&varlogpb.TopicDescriptor{TopicID: types.TopicID(1)}) @@ -1110,7 +1168,7 @@ func TestMRSeal(t *testing.T) { for i := range snIDs { snIDs[i] = make([]types.StorageNodeID, rep) for j := range snIDs[i] { - snIDs[i][j] = types.StorageNodeID(i*2 + j) + snIDs[i][j] = types.MinStorageNodeID + types.StorageNodeID(i*2+j) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ @@ -1125,7 +1183,7 @@ func TestMRSeal(t *testing.T) { lsIDs := make([]types.LogStreamID, 2) for i := range lsIDs { - lsIDs[i] = types.LogStreamID(i) + lsIDs[i] = types.MinLogStreamID + types.LogStreamID(i) } err := mr.storage.registerTopic(&varlogpb.TopicDescriptor{TopicID: types.TopicID(1)}) @@ -1198,7 +1256,7 @@ func TestMRUnseal(t *testing.T) { for i := range snIDs { snIDs[i] = make([]types.StorageNodeID, rep) for j := range snIDs[i] { - snIDs[i][j] = types.StorageNodeID(i*2 + j) + snIDs[i][j] = types.MinStorageNodeID + types.StorageNodeID(i*2+j) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ @@ -1213,7 +1271,7 @@ func TestMRUnseal(t *testing.T) { lsIDs := make([]types.LogStreamID, 2) for i := range lsIDs { - lsIDs[i] = types.LogStreamID(i) + lsIDs[i] = types.MinLogStreamID + types.LogStreamID(i) } err := mr.storage.registerTopic(&varlogpb.TopicDescriptor{TopicID: types.TopicID(1)}) @@ -1334,7 +1392,7 @@ func TestMRUpdateLogStream(t *testing.T) { snIDs := make([]types.StorageNodeID, nrStorageNode) for i := range snIDs { - snIDs[i] = types.StorageNodeID(i) + snIDs[i] = types.MinStorageNodeID + types.StorageNodeID(i) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ @@ -1349,7 +1407,7 @@ func TestMRUpdateLogStream(t *testing.T) { err := mr.RegisterTopic(context.TODO(), types.TopicID(1)) So(err, ShouldBeNil) - lsID := types.LogStreamID(0) + lsID := types.MinLogStreamID ls := makeLogStream(types.TopicID(1), lsID, snIDs[0:1]) err = mr.RegisterLogStream(context.TODO(), ls) So(err, ShouldBeNil) @@ -1407,7 +1465,7 @@ func TestMRFailoverLeaderElection(t *testing.T) { snIDs := make([]types.StorageNodeID, nrRep) for i := range snIDs { - snIDs[i] = types.StorageNodeID(i) + snIDs[i] = types.MinStorageNodeID + types.StorageNodeID(i) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ @@ -1425,7 +1483,7 @@ func TestMRFailoverLeaderElection(t *testing.T) { err := 
clus.nodes[0].RegisterTopic(context.TODO(), types.TopicID(1)) So(err, ShouldBeNil) - lsID := types.LogStreamID(0) + lsID := types.MinLogStreamID ls := makeLogStream(types.TopicID(1), lsID, snIDs) rctx, cancel := context.WithTimeout(context.Background(), vtesting.TimeoutUnitTimesFactor(50)) @@ -1476,7 +1534,7 @@ func TestMRFailoverJoinNewNode(t *testing.T) { snIDs := make([]types.StorageNodeID, nrRep) for i := range snIDs { - snIDs[i] = types.StorageNodeID(i) + snIDs[i] = types.MinStorageNodeID + types.StorageNodeID(i) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ @@ -1494,7 +1552,7 @@ func TestMRFailoverJoinNewNode(t *testing.T) { err := clus.nodes[0].RegisterTopic(context.TODO(), types.TopicID(1)) So(err, ShouldBeNil) - lsID := types.LogStreamID(0) + lsID := types.MinLogStreamID ls := makeLogStream(types.TopicID(1), lsID, snIDs) rctx, cancel := context.WithTimeout(context.Background(), vtesting.TimeoutUnitTimesFactor(200)) @@ -2135,7 +2193,7 @@ func TestMRFailoverRecoverReportCollector(t *testing.T) { for i := range snIDs { snIDs[i] = make([]types.StorageNodeID, nrRep) for j := range snIDs[i] { - snIDs[i][j] = types.StorageNodeID(i*nrStorageNode + j) + snIDs[i][j] = types.MinStorageNodeID + types.StorageNodeID(i*nrStorageNode+j) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ @@ -2159,7 +2217,7 @@ func TestMRFailoverRecoverReportCollector(t *testing.T) { So(err, ShouldBeNil) for i := 0; i < nrLogStream; i++ { - lsID := types.LogStreamID(i) + lsID := types.MinLogStreamID + types.LogStreamID(i) ls := makeLogStream(types.TopicID(1), lsID, snIDs[i%nrStorageNode]) rctx, cancel := context.WithTimeout(context.Background(), vtesting.TimeoutUnitTimesFactor(50)) @@ -2367,7 +2425,7 @@ func TestMRUnregisterTopic(t *testing.T) { lsIDs := make([]types.LogStreamID, nrLS) for i := range lsIDs { - lsIDs[i] = types.LogStreamID(i) + lsIDs[i] = types.MinLogStreamID + types.LogStreamID(i) } for _, lsID := range lsIDs { @@ -2397,7 +2455,7 @@ func TestMRTopicLastHighWatermark(t *testing.T) { snIDs := make([]types.StorageNodeID, rep) for i := range snIDs { - snIDs[i] = types.StorageNodeID(i) + snIDs[i] = types.MinStorageNodeID + types.StorageNodeID(i) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ @@ -2411,7 +2469,7 @@ func TestMRTopicLastHighWatermark(t *testing.T) { topicLogStreamID := make(map[types.TopicID][]types.LogStreamID) topicID := types.TopicID(1) - lsID := types.LogStreamID(1) + lsID := types.MinLogStreamID for i := 0; i < nrTopics; i++ { err := mr.storage.registerTopic(&varlogpb.TopicDescriptor{TopicID: topicID}) So(err, ShouldBeNil) diff --git a/internal/metarepos/report_collector.go b/internal/metarepos/report_collector.go index 07ff51d18..50c8e4626 100644 --- a/internal/metarepos/report_collector.go +++ b/internal/metarepos/report_collector.go @@ -698,11 +698,13 @@ func (rce *reportCollectExecutor) processReport(response *snpb.GetReportResponse cur := report.UncommitReports[i] prev := prevReport.UncommitReports[j] - if cur.LogStreamID < prev.LogStreamID { - diff.UncommitReports = append(diff.UncommitReports, cur) + if cur.Invalid() { i++ - } else if prev.LogStreamID < cur.LogStreamID { + } else if prev.Invalid() || prev.LogStreamID < cur.LogStreamID { j++ + } else if cur.LogStreamID < prev.LogStreamID { + diff.UncommitReports = append(diff.UncommitReports, cur) + i++ } else { if cur.Version < prev.Version { fmt.Printf("invalid report prev:%v, cur:%v\n", @@ -721,8 +723,11 @@ func (rce *reportCollectExecutor) 
processReport(response *snpb.GetReportResponse } } - if i < report.Len() { - diff.UncommitReports = append(diff.UncommitReports, report.UncommitReports[i:]...) + for ; i < report.Len(); i++ { + cur := report.UncommitReports[i] + if !cur.Invalid() { + diff.UncommitReports = append(diff.UncommitReports, cur) + } } for _, r := range diff.UncommitReports { diff --git a/internal/metarepos/report_collector_test.go b/internal/metarepos/report_collector_test.go index 16d4c67dd..7201ab38d 100644 --- a/internal/metarepos/report_collector_test.go +++ b/internal/metarepos/report_collector_test.go @@ -163,9 +163,9 @@ func TestRegisterLogStream(t *testing.T) { reportCollector.Run() //nolint:errcheck,revive // TODO:: Handle an error returned. defer reportCollector.Close() - snID := types.StorageNodeID(0) - lsID := types.LogStreamID(0) - topicID := types.TopicID(0) + snID := types.MinStorageNodeID + lsID := types.MinLogStreamID + topicID := types.MinTopicID Convey("registeration LogStream with not existing storageNodeID should be failed", func() { err := reportCollector.RegisterLogStream(topicID, snID, lsID, types.InvalidVersion, varlogpb.LogStreamStatusRunning) @@ -206,8 +206,8 @@ func TestUnregisterStorageNode(t *testing.T) { defer reportCollector.Close() snID := types.StorageNodeID(time.Now().UnixNano()) - lsID := types.LogStreamID(0) - topicID := types.TopicID(0) + lsID := types.MinLogStreamID + topicID := types.MinTopicID sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ @@ -261,9 +261,9 @@ func TestUnregisterLogStream(t *testing.T) { reportCollector.Run() //nolint:errcheck,revive // TODO:: Handle an error returned. defer reportCollector.Close() - snID := types.StorageNodeID(0) - lsID := types.LogStreamID(0) - topicID := types.TopicID(0) + snID := types.MinStorageNodeID + lsID := types.MinLogStreamID + topicID := types.MinTopicID Convey("unregisteration LogStream with not existing storageNodeID should be failed", func() { err := reportCollector.UnregisterLogStream(snID, lsID) @@ -478,9 +478,10 @@ func TestReport(t *testing.T) { }(nrStorage) for i := 0; i < nrStorage; i++ { + snID := types.MinStorageNodeID + types.StorageNodeID(i) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ - StorageNodeID: types.StorageNodeID(i), + StorageNodeID: snID, }, } @@ -494,6 +495,55 @@ func TestReport(t *testing.T) { }) } +func TestReportIgnore(t *testing.T) { + Convey("ReportCollector should collect report from registered storage node", t, func() { + a := NewDummyStorageNodeClientFactory(1, true) + mr := NewDummyMetadataRepository(a) + + logger, _ := zap.NewDevelopment() + reportCollector := NewReportCollector(mr, DefaultRPCTimeout, newNopTelmetryStub(), logger) + reportCollector.Run() //nolint:errcheck,revive // TODO:: Handle an error returned. 
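The processReport hunks above rework the deduplication loop in the report collector: the current and previous reports are walked with two cursors over entries sorted by LogStreamID, invalid current entries are skipped, previous entries without a valid counterpart are passed over, and the trailing copy at the end now filters invalid entries as well. The sketch below shows the shape of that merge under simplified stand-in types; the real comparison of versions and uncommitted offsets is more involved than the single condition used here.

package main

import "fmt"

// Illustrative stand-ins; the real report types live in proto/snpb.
type LogStreamID int32

type UncommitReport struct {
	LogStreamID           LogStreamID
	Version               uint64
	UncommittedLLSNOffset uint64
}

func (r UncommitReport) Invalid() bool {
	return r.LogStreamID == 0 || r.UncommittedLLSNOffset == 0
}

// diffReports sketches the two-cursor merge in processReport: both slices are
// sorted by LogStreamID, invalid current entries are dropped, and only entries
// that are new or have advanced relative to prev end up in the diff.
func diffReports(prev, cur []UncommitReport) []UncommitReport {
	var diff []UncommitReport
	i, j := 0, 0
	for i < len(cur) && j < len(prev) {
		c, p := cur[i], prev[j]
		switch {
		case c.Invalid():
			i++ // skip invalid current entry
		case p.Invalid() || p.LogStreamID < c.LogStreamID:
			j++ // previous entry has no valid counterpart; move on
		case c.LogStreamID < p.LogStreamID:
			diff = append(diff, c) // newly reported log stream
			i++
		default: // same log stream: keep it only if the report advanced
			if c.Version > p.Version || c.UncommittedLLSNOffset > p.UncommittedLLSNOffset {
				diff = append(diff, c)
			}
			i++
			j++
		}
	}
	// Trailing current entries are copied only when they are valid, matching
	// the new tail loop in processReport.
	for ; i < len(cur); i++ {
		if !cur[i].Invalid() {
			diff = append(diff, cur[i])
		}
	}
	return diff
}

func main() {
	prev := []UncommitReport{
		{LogStreamID: 1, Version: 3, UncommittedLLSNOffset: 4},
		{LogStreamID: 2, Version: 3, UncommittedLLSNOffset: 7},
	}
	cur := []UncommitReport{
		{LogStreamID: 1, Version: 3, UncommittedLLSNOffset: 0}, // turned invalid: dropped
		{LogStreamID: 2, Version: 4, UncommittedLLSNOffset: 8}, // advanced: kept
	}
	fmt.Println(diffReports(prev, cur)) // only the advanced entry for log stream 2 remains
}
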
+ defer reportCollector.Close() + + // Register Invalid StorageNodeID to set InvalidLogStreamID + snID := types.MinStorageNodeID + sn := &varlogpb.StorageNodeDescriptor{ + StorageNode: varlogpb.StorageNode{ + StorageNodeID: snID, + }, + } + + err := reportCollector.RegisterStorageNode(sn) + So(err, ShouldBeNil) + + <-mr.reportC + + reporterClient := a.lookupClient(sn.StorageNodeID) + reporterClient.makeInvalid(0) + + r := <-mr.reportC + for _, ur := range r.UncommitReports { + So(ur.Invalid(), ShouldBeTrue) + } + + after := time.After(DefaultReportRefreshTime / 2) + + Loop: + for { + select { + case <-after: + break Loop + case r := <-mr.reportC: + for _, ur := range r.UncommitReports { + So(ur.LogStreamID.Invalid(), ShouldBeFalse) + So(ur.Invalid(), ShouldBeFalse) + } + } + } + + }) +} + func TestReportDedup(t *testing.T) { Convey("Given ReportCollector", t, func() { a := NewDummyStorageNodeClientFactory(3, true) @@ -506,7 +556,7 @@ func TestReportDedup(t *testing.T) { sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ - StorageNodeID: types.StorageNodeID(0), + StorageNodeID: types.MinStorageNodeID, }, } @@ -520,26 +570,26 @@ func TestReportDedup(t *testing.T) { reporterClient := a.lookupClient(sn.StorageNodeID) reporterClient.increaseUncommitted(0) - Convey("Then report should include logStream[0]", func() { + Convey("Then report should include logStream[1]", func() { r = <-mr.reportC So(r.Len(), ShouldEqual, 1) - So(r.UncommitReports[0].LogStreamID, ShouldEqual, types.LogStreamID(0)) + So(r.UncommitReports[0].LogStreamID, ShouldEqual, types.MinLogStreamID) - Convey("When logStream[1] increase uncommitted", func() { + Convey("When logStream[2] increase uncommitted", func() { reporterClient.increaseUncommitted(1) - Convey("Then report should include logStream[1]", func() { + Convey("Then report should include logStream[2]", func() { r = <-mr.reportC So(r.Len(), ShouldEqual, 1) - So(r.UncommitReports[0].LogStreamID, ShouldEqual, types.LogStreamID(1)) + So(r.UncommitReports[0].LogStreamID, ShouldEqual, types.MinLogStreamID+types.LogStreamID(1)) - Convey("When logStream[2] increase uncommitted", func() { + Convey("When logStream[3] increase uncommitted", func() { reporterClient.increaseUncommitted(2) - Convey("Then report should include logStream[2]", func() { + Convey("Then report should include logStream[3]", func() { r = <-mr.reportC So(r.Len(), ShouldEqual, 1) - So(r.UncommitReports[0].LogStreamID, ShouldEqual, types.LogStreamID(2)) + So(r.UncommitReports[0].LogStreamID, ShouldEqual, types.MinLogStreamID+types.LogStreamID(2)) Convey("After reportAll interval, report should include all", func() { r = <-mr.reportC @@ -574,9 +624,10 @@ func TestReportCollectorSeal(t *testing.T) { }) for i := 0; i < nrStorage; i++ { + snID := types.MinStorageNodeID + types.StorageNodeID(i) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ - StorageNodeID: types.StorageNodeID(i), + StorageNodeID: snID, }, } @@ -593,12 +644,14 @@ func TestReportCollectorSeal(t *testing.T) { var sealedLSID types.LogStreamID for i := 0; i < nrLogStream; i++ { - err := reportCollector.RegisterLogStream(topicID, types.StorageNodeID(i%nrStorage), types.LogStreamID(i), types.InvalidVersion, varlogpb.LogStreamStatusRunning) + snID := types.MinStorageNodeID + types.StorageNodeID(i%nrStorage) + lsID := types.MinLogStreamID + types.LogStreamID(i) + err := reportCollector.RegisterLogStream(topicID, snID, lsID, types.InvalidVersion, varlogpb.LogStreamStatusRunning) if err != nil { 
t.Fatal(err) } - sealedLSID = types.LogStreamID(i) + sealedLSID = lsID } gls := cc.newDummyCommitResults(knownVer+1, glsn, nrStorage) @@ -717,12 +770,13 @@ func (cc *dummyCommitContext) newDummyCommitResults(ver types.Version, baseGLSN glsn := baseGLSN for i := 0; i < nrLogStream; i++ { numUncommitLen := 0 - if !cc.sealed(types.LogStreamID(i)) { + lsID := types.MinLogStreamID + types.LogStreamID(i) + if !cc.sealed(lsID) { numUncommitLen = 1 } r := snpb.LogStreamCommitResult{ - LogStreamID: types.LogStreamID(i), + LogStreamID: lsID, CommittedGLSNOffset: glsn, CommittedLLSNOffset: cc.committedLLSNBeginOffset[i], CommittedGLSNLength: uint64(numUncommitLen), @@ -756,9 +810,10 @@ func TestCommit(t *testing.T) { }) for i := 0; i < nrStorage; i++ { + snID := types.MinStorageNodeID + types.StorageNodeID(i) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ - StorageNodeID: types.StorageNodeID(i), + StorageNodeID: snID, }, } @@ -773,7 +828,9 @@ func TestCommit(t *testing.T) { } for i := 0; i < nrLogStream; i++ { - err := reportCollector.RegisterLogStream(topicID, types.StorageNodeID(i%nrStorage), types.LogStreamID(i), types.InvalidVersion, varlogpb.LogStreamStatusRunning) + snID := types.MinStorageNodeID + types.StorageNodeID(i%nrStorage) + lsID := types.MinLogStreamID + types.LogStreamID(i) + err := reportCollector.RegisterLogStream(topicID, snID, lsID, types.InvalidVersion, varlogpb.LogStreamStatusRunning) if err != nil { t.Fatal(err) } @@ -838,9 +895,11 @@ func TestCommit(t *testing.T) { logger.Debug("trimGLS", zap.Any("knowVer", knownVer), zap.Any("trimVer", trimVer), zap.Any("result", len(mr.m))) Convey("ReportCollector should send proper commit against new StorageNode", func() { + snID := types.MinStorageNodeID + types.StorageNodeID(nrStorage) + lsID := types.MinLogStreamID + types.LogStreamID(nrLogStream) sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ - StorageNodeID: types.StorageNodeID(nrStorage), + StorageNodeID: snID, }, } @@ -849,7 +908,7 @@ func TestCommit(t *testing.T) { nrStorage += 1 - err = reportCollector.RegisterLogStream(topicID, sn.StorageNodeID, types.LogStreamID(nrLogStream), knownVer, varlogpb.LogStreamStatusRunning) + err = reportCollector.RegisterLogStream(topicID, sn.StorageNodeID, lsID, knownVer, varlogpb.LogStreamStatusRunning) So(err, ShouldBeNil) nrLogStream += 1 @@ -899,7 +958,7 @@ func TestCommitWithDelay(t *testing.T) { sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ - StorageNodeID: types.StorageNodeID(0), + StorageNodeID: types.MinStorageNodeID, }, } @@ -912,7 +971,7 @@ func TestCommitWithDelay(t *testing.T) { return a.lookupClient(sn.StorageNodeID) != nil }), ShouldBeTrue) - err = reportCollector.RegisterLogStream(topicID, types.StorageNodeID(0), types.LogStreamID(0), types.InvalidVersion, varlogpb.LogStreamStatusRunning) + err = reportCollector.RegisterLogStream(topicID, types.MinStorageNodeID, types.MinLogStreamID, types.InvalidVersion, varlogpb.LogStreamStatusRunning) if err != nil { t.Fatal(err) } @@ -1012,7 +1071,7 @@ func TestRPCFail(t *testing.T) { sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ - StorageNodeID: types.StorageNodeID(0), + StorageNodeID: types.MinStorageNodeID, }, } @@ -1073,7 +1132,7 @@ func TestReporterClientReconnect(t *testing.T) { sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ - StorageNodeID: types.StorageNodeID(0), + StorageNodeID: types.MinStorageNodeID, }, } diff --git a/proto/snpb/log_stream_reporter.go 
b/proto/snpb/log_stream_reporter.go index 99dd86cbf..c6140b5f8 100644 --- a/proto/snpb/log_stream_reporter.go +++ b/proto/snpb/log_stream_reporter.go @@ -8,6 +8,12 @@ import ( var InvalidLogStreamUncommitReport = LogStreamUncommitReport{} var InvalidLogStreamCommitResult = LogStreamCommitResult{} +func (m *LogStreamUncommitReport) Invalid() bool { + // uncommitReport with invalid logStream or invalid uncommittedLLSNOffset + // is not acceptable. MetadataRepository ignores these reports. + return m.GetLogStreamID().Invalid() || m.GetUncommittedLLSNOffset().Invalid() +} + func (m *LogStreamUncommitReport) UncommittedLLSNEnd() types.LLSN { if m == nil { return types.InvalidLLSN From 2bcf5fc1df71fc4a39b3876321ad4878b6a56935 Mon Sep 17 00:00:00 2001 From: "pharrell.jang" Date: Mon, 26 Sep 2022 09:30:03 +0900 Subject: [PATCH 2/2] refactor(mr): fix comment --- internal/metarepos/raft_metadata_repository_test.go | 2 -- internal/metarepos/report_collector_test.go | 1 - proto/snpb/log_stream_reporter.go | 5 +++-- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/internal/metarepos/raft_metadata_repository_test.go b/internal/metarepos/raft_metadata_repository_test.go index 806a59724..6c0ef6fdc 100644 --- a/internal/metarepos/raft_metadata_repository_test.go +++ b/internal/metarepos/raft_metadata_repository_test.go @@ -519,8 +519,6 @@ func TestMRApplyInvalidReport(t *testing.T) { err := mr.storage.registerStorageNode(sn) So(err, ShouldBeNil) } - err = mr.storage.registerTopic(&varlogpb.TopicDescriptor{TopicID: types.TopicID(1)}) - So(err, ShouldBeNil) lsID := types.MinLogStreamID ls := makeLogStream(types.TopicID(1), lsID, snIDs) diff --git a/internal/metarepos/report_collector_test.go b/internal/metarepos/report_collector_test.go index 7201ab38d..9d2ac45dd 100644 --- a/internal/metarepos/report_collector_test.go +++ b/internal/metarepos/report_collector_test.go @@ -505,7 +505,6 @@ func TestReportIgnore(t *testing.T) { reportCollector.Run() //nolint:errcheck,revive // TODO:: Handle an error returned. defer reportCollector.Close() - // Register Invalid StorageNodeID to set InvalidLogStreamID snID := types.MinStorageNodeID sn := &varlogpb.StorageNodeDescriptor{ StorageNode: varlogpb.StorageNode{ diff --git a/proto/snpb/log_stream_reporter.go b/proto/snpb/log_stream_reporter.go index c6140b5f8..c4d633d29 100644 --- a/proto/snpb/log_stream_reporter.go +++ b/proto/snpb/log_stream_reporter.go @@ -8,9 +8,10 @@ import ( var InvalidLogStreamUncommitReport = LogStreamUncommitReport{} var InvalidLogStreamCommitResult = LogStreamCommitResult{} +// Invalid returns whether the LogStreamUncommitReport is acceptable. +// LogStreamUncommitReport with invalid logStream or invalid uncommittedLLSNOffset +// is not acceptable. MetadataRepository ignores these reports. func (m *LogStreamUncommitReport) Invalid() bool { - // uncommitReport with invalid logStream or invalid uncommittedLLSNOffset - // is not acceptable. MetadataRepository ignores these reports. return m.GetLogStreamID().Invalid() || m.GetUncommittedLLSNOffset().Invalid() }
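For reference, the predicate introduced here combines two checks: a report is only acceptable when it carries a valid log stream ID and a valid uncommitted LLSN offset, and the second patch merely rewords its doc comment. The table-driven check below is a hypothetical illustration of those semantics; the unexported stand-in type and zero-value comparisons stand in for snpb.LogStreamUncommitReport and the getter-based definition in the patch.

package snpb_test

import "testing"

// Stand-in for snpb.LogStreamUncommitReport; the real type uses typed IDs and
// getter methods, so the zero-value comparisons below are an approximation.
type logStreamUncommitReport struct {
	LogStreamID           int32
	UncommittedLLSNOffset uint64
}

func (m logStreamUncommitReport) Invalid() bool {
	return m.LogStreamID == 0 || m.UncommittedLLSNOffset == 0
}

func TestUncommitReportInvalid(t *testing.T) {
	tests := []struct {
		name   string
		report logStreamUncommitReport
		want   bool
	}{
		{"valid report", logStreamUncommitReport{LogStreamID: 1, UncommittedLLSNOffset: 1}, false},
		{"invalid log stream ID", logStreamUncommitReport{LogStreamID: 0, UncommittedLLSNOffset: 1}, true},
		{"invalid LLSN offset", logStreamUncommitReport{LogStreamID: 1, UncommittedLLSNOffset: 0}, true},
	}
	for _, tc := range tests {
		if got := tc.report.Invalid(); got != tc.want {
			t.Errorf("%s: Invalid() = %v, want %v", tc.name, got, tc.want)
		}
	}
}

Both the metadata repository and the report collector rely on this predicate, so a storage node that momentarily reports zero values (as TestReportIgnore simulates via makeInvalid) is simply skipped until it reports valid offsets again.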