diff --git a/cdc/scheduler/internal/v3/keyspan/splitter_write.go b/cdc/scheduler/internal/v3/keyspan/splitter_write.go index 9773597b582..653328ba618 100644 --- a/cdc/scheduler/internal/v3/keyspan/splitter_write.go +++ b/cdc/scheduler/internal/v3/keyspan/splitter_write.go @@ -170,11 +170,11 @@ func (m *writeSplitter) splitRegionsByWrittenKeysV1( restRegions := len(regions) - i regionCount++ spanWriteWeight += regions[i].WrittenKeys - // If the restSpans count is one, and the restWeight is less than writeLimitPerSpan, + // If the restSpans count is one, and the restWeight is less than or equal to writeLimitPerSpan, // we will use the rest regions as the last span. If the restWeight is larger than writeLimitPerSpan, - // then we need to add more restSpans (restWeight / writeLimitPerSpan) to split the rest regions. + // then we need to add more restSpans (restWeight / writeLimitPerSpan) + 1 to split the rest regions. if restSpans == 1 { - if restWeight < int64(writeLimitPerSpan) { + if restWeight <= int64(writeLimitPerSpan) { spans = append(spans, tablepb.Span{ TableID: tableID, StartKey: tablepb.Key(decodeKey(regions[spanStartIndex].StartKey)), @@ -191,11 +191,12 @@ func (m *writeSplitter) splitRegionsByWrittenKeysV1( regionCounts = append(regionCounts, lastSpanRegionCount) weights = append(weights, lastSpanWriteWeight) writeKeys = append(writeKeys, lastSpanWriteKey) + spanStartIndex = len(regions) break } // If the restWeight is larger than writeLimitPerSpan, // then we need to update the restSpans. 
- restSpans = int(restWeight) / int(writeLimitPerSpan) + restSpans = int(restWeight)/int(writeLimitPerSpan) + 1 } // If the restRegions is less than equal to restSpans, @@ -237,6 +238,32 @@ func (m *writeSplitter) splitRegionsByWrittenKeysV1( spanStartIndex = i + 1 } } + // All regions should be processed and appended to spans + if spanStartIndex != len(regions) { + spans = append(spans, tablepb.Span{ + TableID: tableID, + StartKey: tablepb.Key(decodeKey(regions[spanStartIndex].StartKey)), + EndKey: tablepb.Key(decodeKey(regions[len(regions)-1].EndKey)), + }) + lastSpanRegionCount := len(regions) - spanStartIndex + lastSpanWriteWeight := uint64(0) + lastSpanWriteKey := uint64(0) + for j := spanStartIndex; j < len(regions); j++ { + lastSpanWriteKey += regions[j].WrittenKeys + lastSpanWriteWeight += regions[j].WrittenKeys + } + regionCounts = append(regionCounts, lastSpanRegionCount) + weights = append(weights, lastSpanWriteWeight) + writeKeys = append(writeKeys, lastSpanWriteKey) + log.Warn("some regions are added to the last span, it should not appear", + zap.Int("spanStartIndex", spanStartIndex), + zap.Int("regionsLength", len(regions)), + zap.Int("restSpans", restSpans), + zap.Int64("restWeight", restWeight), + zap.Any("prevSpan", spans[len(spans)-2]), + zap.Any("lastSpan", spans[len(spans)-1]), + ) + } return &splitRegionsInfo{ RegionCounts: regionCounts, Weights: weights, diff --git a/cdc/scheduler/internal/v3/keyspan/splitter_write_test.go b/cdc/scheduler/internal/v3/keyspan/splitter_write_test.go index 2987857c776..6c69b7f20b1 100644 --- a/cdc/scheduler/internal/v3/keyspan/splitter_write_test.go +++ b/cdc/scheduler/internal/v3/keyspan/splitter_write_test.go @@ -227,13 +227,44 @@ func TestSplitRegionEven(t *testing.T) { info := splitter.splitRegionsByWrittenKeysV1(tblID, regions, 5) require.Len(t, info.RegionCounts, 5) require.Len(t, info.Weights, 5) + count := 0 for i, w := range info.Weights { if i == 4 { require.Equal(t, uint64(9576), w, i) } else { 
require.Equal(t, uint64(9591), w, i) } + count += info.RegionCounts[i] } + require.Equal(t, count, regionCount) +} + +func TestSplitLargeRegion(t *testing.T) { + tblID := model.TableID(1) + regionCount := spanRegionLimit*5 + 1000 + regions := make([]pdutil.RegionInfo, regionCount) + for i := 0; i < regionCount; i++ { + regions[i] = pdutil.RegionInfo{ + ID: uint64(i), + StartKey: "" + strconv.Itoa(i), + EndKey: "" + strconv.Itoa(i), + WrittenKeys: 2, + } + } + splitter := newWriteSplitter(model.ChangeFeedID4Test("test", "test"), nil, 4) + info := splitter.splitRegionsByWrittenKeysV1(tblID, regions, 5) + require.Len(t, info.RegionCounts, 6) + require.Len(t, info.Weights, 6) + count := 0 + for i, c := range info.RegionCounts { + if i == 5 { + require.Equal(t, 1000, c, i) + } else { + require.Equal(t, spanRegionLimit, c, i) + } + count += c + } + require.Equal(t, count, regionCount) } func TestSpanRegionLimitBase(t *testing.T) { @@ -247,9 +278,12 @@ func TestSpanRegionLimitBase(t *testing.T) { spanNum := getSpansNumber(len(regions), captureNum) info := splitter.splitRegionsByWrittenKeysV1(0, cloneRegions(regions), spanNum) require.Len(t, info.RegionCounts, spanNum) + count := 0 for _, c := range info.RegionCounts { require.LessOrEqual(t, c, int(spanRegionLimit*1.1)) + count += c } + require.Equal(t, count, len(regions)) } func TestSpanRegionLimit(t *testing.T) { @@ -273,7 +307,7 @@ func TestSpanRegionLimit(t *testing.T) { } // total region number - totalRegionNumbers := spanRegionLimit * 10 + totalRegionNumbers := spanRegionLimit*10 + 100 // writtenKeys over 20000 percentage percentOver20000 := 1 @@ -318,7 +352,12 @@ func TestSpanRegionLimit(t *testing.T) { spanNum := getSpansNumber(len(regions), captureNum) info := splitter.splitRegionsByWrittenKeysV1(0, cloneRegions(regions), spanNum) require.LessOrEqual(t, spanNum, len(info.RegionCounts)) - for _, c := range info.RegionCounts { - require.LessOrEqual(t, c, int(spanRegionLimit*1.1)) + count := 0 + for i, c := range 
info.RegionCounts { + if i != len(info.RegionCounts)-1 { + require.LessOrEqual(t, c, int(spanRegionLimit*1.1)) + } + count += c } + require.Equal(t, count, totalRegionNumbers) }