Skip to content

Commit

Permalink
Merge pull request #163 from kakao/ignore-invalid-report
Browse files Browse the repository at this point in the history
feat(metarepos): ignore invalid report
  • Loading branch information
hungryjang authored Sep 26, 2022
2 parents 0a14ab5 + 2bcf5fc commit 18b34c6
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 65 deletions.
8 changes: 8 additions & 0 deletions internal/metarepos/dummy_storagenode_client_factory_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,14 @@ func (r *DummyStorageNodeClient) getKnownVersion(idx int) types.Version {
return r.knownVersion[idx]
}

// makeInvalid zeroes both the known version and the uncommitted LLSN offset
// tracked for the log stream at index idx, so that the dummy client's
// subsequent reports carry invalid (zero) values. Guarded by r.mu.
func (r *DummyStorageNodeClient) makeInvalid(idx int) {
	r.mu.Lock()
	defer r.mu.Unlock()

	r.uncommittedLLSNOffset[idx] = 0
	r.knownVersion[idx] = 0
}

func (fac *DummyStorageNodeClientFactory) crashRPC(snID types.StorageNodeID) {
f, ok := fac.m.Load(snID)
if !ok {
Expand Down
4 changes: 4 additions & 0 deletions internal/metarepos/raft_metadata_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,10 @@ func (mr *RaftMetadataRepository) applyReport(reports *mrpb.Reports) error {
snID := r.StorageNodeID
LS:
for _, u := range r.UncommitReport {
if u.Invalid() {
continue LS
}

s, ok := mr.storage.LookupUncommitReport(u.LogStreamID, snID)
if !ok {
continue LS
Expand Down
114 changes: 85 additions & 29 deletions internal/metarepos/raft_metadata_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (clus *metadataRepoCluster) initDummyStorageNode(nrSN, nrTopic int) error {
}

for i := 0; i < nrSN; i++ {
snID := types.StorageNodeID(i)
snID := types.MinStorageNodeID + types.StorageNodeID(i)

sn := &varlogpb.StorageNodeDescriptor{
StorageNode: varlogpb.StorageNode{
Expand Down Expand Up @@ -415,7 +415,7 @@ func TestMRApplyReport(t *testing.T) {

snIDs := make([]types.StorageNodeID, rep)
for i := range snIDs {
snIDs[i] = types.StorageNodeID(i)
snIDs[i] = types.MinStorageNodeID + types.StorageNodeID(i)

sn := &varlogpb.StorageNodeDescriptor{
StorageNode: varlogpb.StorageNode{
Expand All @@ -426,8 +426,8 @@ func TestMRApplyReport(t *testing.T) {
err := mr.storage.registerStorageNode(sn)
So(err, ShouldBeNil)
}
lsID := types.LogStreamID(0)
notExistSnID := types.StorageNodeID(rep)
lsID := types.MinLogStreamID
notExistSnID := types.MinStorageNodeID + types.StorageNodeID(rep)

report := makeUncommitReport(snIDs[0], types.InvalidVersion, types.InvalidGLSN, lsID, types.MinLLSN, 2)
mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) //nolint:errcheck,revive // TODO:: Handle an error returned.
Expand Down Expand Up @@ -489,6 +489,62 @@ func TestMRApplyReport(t *testing.T) {
})
}

// TestMRApplyInvalidReport checks that applying an uncommit report with an
// invalid LLSN does not replace a previously stored valid report.
func TestMRApplyInvalidReport(t *testing.T) {
	Convey("Given LogStream", t, func(ctx C) {
		const rep = 2
		clus := newMetadataRepoCluster(1, rep, false, false)
		Reset(func() {
			clus.closeNoErrors(t)
		})
		mr := clus.nodes[0]

		// Register a running topic for the log stream to belong to.
		So(mr.storage.registerTopic(&varlogpb.TopicDescriptor{
			TopicID: types.TopicID(1),
			Status:  varlogpb.TopicStatusRunning,
		}), ShouldBeNil)

		// Register rep storage nodes, remembering their IDs.
		snIDs := make([]types.StorageNodeID, 0, rep)
		for i := 0; i < rep; i++ {
			snID := types.MinStorageNodeID + types.StorageNodeID(i)
			snIDs = append(snIDs, snID)

			So(mr.storage.registerStorageNode(&varlogpb.StorageNodeDescriptor{
				StorageNode: varlogpb.StorageNode{
					StorageNodeID: snID,
				},
			}), ShouldBeNil)
		}

		lsID := types.MinLogStreamID
		So(mr.storage.registerLogStream(makeLogStream(types.TopicID(1), lsID, snIDs)), ShouldBeNil)

		// Seed every storage node with a valid uncommit report and make
		// sure the stored report is valid.
		for _, snID := range snIDs {
			report := makeUncommitReport(snID, types.InvalidVersion, types.InvalidGLSN, lsID, types.MinLLSN, 1)
			mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) //nolint:errcheck,revive // TODO:: Handle an error returned.

			rpt, ok := mr.storage.LookupUncommitReport(lsID, snID)
			So(ok, ShouldBeTrue)
			So(rpt.Invalid(), ShouldBeFalse)
		}

		Convey("When Some LogStream reports invalid report", func(ctx C) {
			report := makeUncommitReport(snIDs[0], types.InvalidVersion, types.InvalidGLSN, lsID, types.InvalidLLSN, 2)
			mr.applyReport(&mrpb.Reports{Reports: []*mrpb.Report{report}}) //nolint:errcheck,revive // TODO:: Handle an error returned.

			// The previously stored valid report must survive.
			rpt, ok := mr.storage.LookupUncommitReport(lsID, snIDs[0])
			So(ok, ShouldBeTrue)
			So(rpt.Invalid(), ShouldBeFalse)
		})
	})
}

func TestMRCalculateCommit(t *testing.T) {
Convey("Calculate commit", t, func(ctx C) {
clus := newMetadataRepoCluster(1, 2, false, false)
Expand All @@ -499,7 +555,7 @@ func TestMRCalculateCommit(t *testing.T) {

snIDs := make([]types.StorageNodeID, 2)
for i := range snIDs {
snIDs[i] = types.StorageNodeID(i)
snIDs[i] = types.MinStorageNodeID + types.StorageNodeID(i)
sn := &varlogpb.StorageNodeDescriptor{
StorageNode: varlogpb.StorageNode{
StorageNodeID: snIDs[i],
Expand All @@ -513,7 +569,7 @@ func TestMRCalculateCommit(t *testing.T) {
err := mr.storage.registerTopic(&varlogpb.TopicDescriptor{TopicID: types.TopicID(1)})
So(err, ShouldBeNil)

lsID := types.LogStreamID(0)
lsID := types.MinLogStreamID
ls := makeLogStream(types.TopicID(1), lsID, snIDs)
err = mr.storage.registerLogStream(ls)
So(err, ShouldBeNil)
Expand Down Expand Up @@ -573,7 +629,7 @@ func TestMRGlobalCommit(t *testing.T) {
for i := range snIDs {
snIDs[i] = make([]types.StorageNodeID, rep)
for j := range snIDs[i] {
snIDs[i][j] = types.StorageNodeID(i*2 + j)
snIDs[i][j] = types.MinStorageNodeID + types.StorageNodeID(i*2+j)

sn := &varlogpb.StorageNodeDescriptor{
StorageNode: varlogpb.StorageNode{
Expand All @@ -591,7 +647,7 @@ func TestMRGlobalCommit(t *testing.T) {

lsIds := make([]types.LogStreamID, 2)
for i := range lsIds {
lsIds[i] = types.LogStreamID(i)
lsIds[i] = types.MinLogStreamID + types.LogStreamID(i)
}

for i, lsID := range lsIds {
Expand Down Expand Up @@ -685,7 +741,7 @@ func TestMRGlobalCommitConsistency(t *testing.T) {

snIDs := make([]types.StorageNodeID, rep)
for i := range snIDs {
snIDs[i] = types.StorageNodeID(i)
snIDs[i] = types.MinStorageNodeID + types.StorageNodeID(i)

sn := &varlogpb.StorageNodeDescriptor{
StorageNode: varlogpb.StorageNode{
Expand All @@ -706,7 +762,7 @@ func TestMRGlobalCommitConsistency(t *testing.T) {

lsIDs := make([]types.LogStreamID, nrLS)
for i := range lsIDs {
lsIDs[i] = types.LogStreamID(i)
lsIDs[i] = types.MinLogStreamID + types.LogStreamID(i)
}

for _, lsID := range lsIDs {
Expand Down Expand Up @@ -762,8 +818,8 @@ func TestMRSimpleReportNCommit(t *testing.T) {
return clus.healthCheckAll()
}), ShouldBeTrue)

snID := types.StorageNodeID(0)
snIDs := make([]types.StorageNodeID, 1)
snID := types.MinStorageNodeID
snIDs := make([]types.StorageNodeID, 0, 1)
snIDs = append(snIDs, snID)

lsID := types.LogStreamID(snID)
Expand Down Expand Up @@ -955,7 +1011,7 @@ func TestMRGetLastCommitted(t *testing.T) {
for i := range snIDs {
snIDs[i] = make([]types.StorageNodeID, rep)
for j := range snIDs[i] {
snIDs[i][j] = types.StorageNodeID(i*2 + j)
snIDs[i][j] = types.MinStorageNodeID + types.StorageNodeID(i*2+j)

sn := &varlogpb.StorageNodeDescriptor{
StorageNode: varlogpb.StorageNode{
Expand All @@ -970,7 +1026,7 @@ func TestMRGetLastCommitted(t *testing.T) {

lsIds := make([]types.LogStreamID, 2)
for i := range lsIds {
lsIds[i] = types.LogStreamID(i)
lsIds[i] = types.MinLogStreamID + types.LogStreamID(i)
}

err := mr.storage.registerTopic(&varlogpb.TopicDescriptor{TopicID: types.TopicID(1)})
Expand Down Expand Up @@ -1110,7 +1166,7 @@ func TestMRSeal(t *testing.T) {
for i := range snIDs {
snIDs[i] = make([]types.StorageNodeID, rep)
for j := range snIDs[i] {
snIDs[i][j] = types.StorageNodeID(i*2 + j)
snIDs[i][j] = types.MinStorageNodeID + types.StorageNodeID(i*2+j)

sn := &varlogpb.StorageNodeDescriptor{
StorageNode: varlogpb.StorageNode{
Expand All @@ -1125,7 +1181,7 @@ func TestMRSeal(t *testing.T) {

lsIDs := make([]types.LogStreamID, 2)
for i := range lsIDs {
lsIDs[i] = types.LogStreamID(i)
lsIDs[i] = types.MinLogStreamID + types.LogStreamID(i)
}

err := mr.storage.registerTopic(&varlogpb.TopicDescriptor{TopicID: types.TopicID(1)})
Expand Down Expand Up @@ -1198,7 +1254,7 @@ func TestMRUnseal(t *testing.T) {
for i := range snIDs {
snIDs[i] = make([]types.StorageNodeID, rep)
for j := range snIDs[i] {
snIDs[i][j] = types.StorageNodeID(i*2 + j)
snIDs[i][j] = types.MinStorageNodeID + types.StorageNodeID(i*2+j)

sn := &varlogpb.StorageNodeDescriptor{
StorageNode: varlogpb.StorageNode{
Expand All @@ -1213,7 +1269,7 @@ func TestMRUnseal(t *testing.T) {

lsIDs := make([]types.LogStreamID, 2)
for i := range lsIDs {
lsIDs[i] = types.LogStreamID(i)
lsIDs[i] = types.MinLogStreamID + types.LogStreamID(i)
}

err := mr.storage.registerTopic(&varlogpb.TopicDescriptor{TopicID: types.TopicID(1)})
Expand Down Expand Up @@ -1334,7 +1390,7 @@ func TestMRUpdateLogStream(t *testing.T) {

snIDs := make([]types.StorageNodeID, nrStorageNode)
for i := range snIDs {
snIDs[i] = types.StorageNodeID(i)
snIDs[i] = types.MinStorageNodeID + types.StorageNodeID(i)

sn := &varlogpb.StorageNodeDescriptor{
StorageNode: varlogpb.StorageNode{
Expand All @@ -1349,7 +1405,7 @@ func TestMRUpdateLogStream(t *testing.T) {
err := mr.RegisterTopic(context.TODO(), types.TopicID(1))
So(err, ShouldBeNil)

lsID := types.LogStreamID(0)
lsID := types.MinLogStreamID
ls := makeLogStream(types.TopicID(1), lsID, snIDs[0:1])
err = mr.RegisterLogStream(context.TODO(), ls)
So(err, ShouldBeNil)
Expand Down Expand Up @@ -1407,7 +1463,7 @@ func TestMRFailoverLeaderElection(t *testing.T) {

snIDs := make([]types.StorageNodeID, nrRep)
for i := range snIDs {
snIDs[i] = types.StorageNodeID(i)
snIDs[i] = types.MinStorageNodeID + types.StorageNodeID(i)

sn := &varlogpb.StorageNodeDescriptor{
StorageNode: varlogpb.StorageNode{
Expand All @@ -1425,7 +1481,7 @@ func TestMRFailoverLeaderElection(t *testing.T) {
err := clus.nodes[0].RegisterTopic(context.TODO(), types.TopicID(1))
So(err, ShouldBeNil)

lsID := types.LogStreamID(0)
lsID := types.MinLogStreamID
ls := makeLogStream(types.TopicID(1), lsID, snIDs)

rctx, cancel := context.WithTimeout(context.Background(), vtesting.TimeoutUnitTimesFactor(50))
Expand Down Expand Up @@ -1476,7 +1532,7 @@ func TestMRFailoverJoinNewNode(t *testing.T) {

snIDs := make([]types.StorageNodeID, nrRep)
for i := range snIDs {
snIDs[i] = types.StorageNodeID(i)
snIDs[i] = types.MinStorageNodeID + types.StorageNodeID(i)

sn := &varlogpb.StorageNodeDescriptor{
StorageNode: varlogpb.StorageNode{
Expand All @@ -1494,7 +1550,7 @@ func TestMRFailoverJoinNewNode(t *testing.T) {
err := clus.nodes[0].RegisterTopic(context.TODO(), types.TopicID(1))
So(err, ShouldBeNil)

lsID := types.LogStreamID(0)
lsID := types.MinLogStreamID
ls := makeLogStream(types.TopicID(1), lsID, snIDs)

rctx, cancel := context.WithTimeout(context.Background(), vtesting.TimeoutUnitTimesFactor(200))
Expand Down Expand Up @@ -2135,7 +2191,7 @@ func TestMRFailoverRecoverReportCollector(t *testing.T) {
for i := range snIDs {
snIDs[i] = make([]types.StorageNodeID, nrRep)
for j := range snIDs[i] {
snIDs[i][j] = types.StorageNodeID(i*nrStorageNode + j)
snIDs[i][j] = types.MinStorageNodeID + types.StorageNodeID(i*nrStorageNode+j)

sn := &varlogpb.StorageNodeDescriptor{
StorageNode: varlogpb.StorageNode{
Expand All @@ -2159,7 +2215,7 @@ func TestMRFailoverRecoverReportCollector(t *testing.T) {
So(err, ShouldBeNil)

for i := 0; i < nrLogStream; i++ {
lsID := types.LogStreamID(i)
lsID := types.MinLogStreamID + types.LogStreamID(i)
ls := makeLogStream(types.TopicID(1), lsID, snIDs[i%nrStorageNode])

rctx, cancel := context.WithTimeout(context.Background(), vtesting.TimeoutUnitTimesFactor(50))
Expand Down Expand Up @@ -2367,7 +2423,7 @@ func TestMRUnregisterTopic(t *testing.T) {

lsIDs := make([]types.LogStreamID, nrLS)
for i := range lsIDs {
lsIDs[i] = types.LogStreamID(i)
lsIDs[i] = types.MinLogStreamID + types.LogStreamID(i)
}

for _, lsID := range lsIDs {
Expand Down Expand Up @@ -2397,7 +2453,7 @@ func TestMRTopicLastHighWatermark(t *testing.T) {

snIDs := make([]types.StorageNodeID, rep)
for i := range snIDs {
snIDs[i] = types.StorageNodeID(i)
snIDs[i] = types.MinStorageNodeID + types.StorageNodeID(i)

sn := &varlogpb.StorageNodeDescriptor{
StorageNode: varlogpb.StorageNode{
Expand All @@ -2411,7 +2467,7 @@ func TestMRTopicLastHighWatermark(t *testing.T) {

topicLogStreamID := make(map[types.TopicID][]types.LogStreamID)
topicID := types.TopicID(1)
lsID := types.LogStreamID(1)
lsID := types.MinLogStreamID
for i := 0; i < nrTopics; i++ {
err := mr.storage.registerTopic(&varlogpb.TopicDescriptor{TopicID: topicID})
So(err, ShouldBeNil)
Expand Down
15 changes: 10 additions & 5 deletions internal/metarepos/report_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,11 +698,13 @@ func (rce *reportCollectExecutor) processReport(response *snpb.GetReportResponse
cur := report.UncommitReports[i]
prev := prevReport.UncommitReports[j]

if cur.LogStreamID < prev.LogStreamID {
diff.UncommitReports = append(diff.UncommitReports, cur)
if cur.Invalid() {
i++
} else if prev.LogStreamID < cur.LogStreamID {
} else if prev.Invalid() || prev.LogStreamID < cur.LogStreamID {
j++
} else if cur.LogStreamID < prev.LogStreamID {
diff.UncommitReports = append(diff.UncommitReports, cur)
i++
} else {
if cur.Version < prev.Version {
fmt.Printf("invalid report prev:%v, cur:%v\n",
Expand All @@ -721,8 +723,11 @@ func (rce *reportCollectExecutor) processReport(response *snpb.GetReportResponse
}
}

if i < report.Len() {
diff.UncommitReports = append(diff.UncommitReports, report.UncommitReports[i:]...)
for ; i < report.Len(); i++ {
cur := report.UncommitReports[i]
if !cur.Invalid() {
diff.UncommitReports = append(diff.UncommitReports, cur)
}
}

for _, r := range diff.UncommitReports {
Expand Down
Loading

0 comments on commit 18b34c6

Please sign in to comment.