From 6efcd7b3966987d80f5aca43a8445b555d0326c4 Mon Sep 17 00:00:00 2001 From: Vilius Okockis Date: Mon, 22 Apr 2024 11:08:46 +0300 Subject: [PATCH] partial backport VDiff tablet selection: pick non-serving tablets in Reshard workflows https://github.com/vitessio/vitess/pull/14413 Signed-off-by: Vilius Okockis --- go/vt/discovery/tablet_picker.go | 81 +++++++++++++++------------ go/vt/discovery/tablet_picker_test.go | 69 +++++++++++++++++++++++ go/vt/wrangler/vdiff.go | 15 ++++- 3 files changed, 128 insertions(+), 37 deletions(-) diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index 2c4d9819155..820c2700045 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -93,8 +93,9 @@ func SetTabletPickerRetryDelay(delay time.Duration) { } type TabletPickerOptions struct { - CellPreference string - TabletOrder string + CellPreference string + TabletOrder string + IncludeNonServingTablets bool } func parseTabletPickerCellPreferenceString(str string) (TabletPickerCellPreference, error) { @@ -140,6 +141,7 @@ type TabletPicker struct { localCellInfo localCellInfo // This map is keyed on the results of TabletAlias.String(). ignoreTablets map[string]struct{} + options TabletPickerOptions } // NewTabletPicker returns a TabletPicker. 
@@ -232,6 +234,7 @@ func NewTabletPicker( inOrder: inOrder, cellPref: cellPref, ignoreTablets: make(map[string]struct{}, len(ignoreTablets)), + options: options, } for _, ignoreTablet := range ignoreTablets { @@ -291,11 +294,45 @@ func (tp *TabletPicker) orderByTabletType(candidates []*topo.TabletInfo) []*topo return candidates } +func (tp *TabletPicker) sortCandidates(ctx context.Context, candidates []*topo.TabletInfo) []*topo.TabletInfo { + rand.Seed(time.Now().UnixNano()) + if tp.cellPref == TabletPickerCellPreference_PreferLocalWithAlias { + sameCellCandidates, sameAliasCandidates, allOtherCandidates := tp.prioritizeTablets(candidates) + + if tp.inOrder { + sameCellCandidates = tp.orderByTabletType(sameCellCandidates) + sameAliasCandidates = tp.orderByTabletType(sameAliasCandidates) + allOtherCandidates = tp.orderByTabletType(allOtherCandidates) + } else { + // Randomize candidates + rand.Shuffle(len(sameCellCandidates), func(i, j int) { + sameCellCandidates[i], sameCellCandidates[j] = sameCellCandidates[j], sameCellCandidates[i] + }) + rand.Shuffle(len(sameAliasCandidates), func(i, j int) { + sameAliasCandidates[i], sameAliasCandidates[j] = sameAliasCandidates[j], sameAliasCandidates[i] + }) + rand.Shuffle(len(allOtherCandidates), func(i, j int) { + allOtherCandidates[i], allOtherCandidates[j] = allOtherCandidates[j], allOtherCandidates[i] + }) + } + + candidates = append(sameCellCandidates, sameAliasCandidates...) + candidates = append(candidates, allOtherCandidates...) + } else if tp.inOrder { + candidates = tp.orderByTabletType(candidates) + } else { + // Randomize candidates. + rand.Shuffle(len(candidates), func(i, j int) { + candidates[i], candidates[j] = candidates[j], candidates[i] + }) + } + return candidates +} + // PickForStreaming picks a tablet that is healthy and serving. // Selection is based on CellPreference. // See prioritizeTablets for prioritization logic. 
func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Tablet, error) { - rand.Seed(time.Now().UnixNano()) // Keep trying at intervals (tabletPickerRetryDelay) until a healthy // serving tablet is found or the context is cancelled. for { @@ -305,36 +342,7 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table default: } candidates := tp.GetMatchingTablets(ctx) - if tp.cellPref == TabletPickerCellPreference_PreferLocalWithAlias { - sameCellCandidates, sameAliasCandidates, allOtherCandidates := tp.prioritizeTablets(candidates) - - if tp.inOrder { - sameCellCandidates = tp.orderByTabletType(sameCellCandidates) - sameAliasCandidates = tp.orderByTabletType(sameAliasCandidates) - allOtherCandidates = tp.orderByTabletType(allOtherCandidates) - } else { - // Randomize candidates - rand.Shuffle(len(sameCellCandidates), func(i, j int) { - sameCellCandidates[i], sameCellCandidates[j] = sameCellCandidates[j], sameCellCandidates[i] - }) - rand.Shuffle(len(sameAliasCandidates), func(i, j int) { - sameAliasCandidates[i], sameAliasCandidates[j] = sameAliasCandidates[j], sameAliasCandidates[i] - }) - rand.Shuffle(len(allOtherCandidates), func(i, j int) { - allOtherCandidates[i], allOtherCandidates[j] = allOtherCandidates[j], allOtherCandidates[i] - }) - } - - candidates = append(sameCellCandidates, sameAliasCandidates...) - candidates = append(candidates, allOtherCandidates...) - } else if tp.inOrder { - candidates = tp.orderByTabletType(candidates) - } else { - // Randomize candidates. - rand.Shuffle(len(candidates), func(i, j int) { - candidates[i], candidates[j] = candidates[j], candidates[i] - }) - } + candidates = tp.sortCandidates(ctx, candidates) if len(candidates) == 0 { // If no viable candidates were found, sleep and try again. 
tp.incNoTabletFoundStat() @@ -349,7 +357,7 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table } continue } - log.Infof("Tablet picker found a healthy serving tablet for streaming: %s", candidates[0].Tablet.String()) + log.Infof("Tablet picker found a healthy tablet for streaming: %s", candidates[0].Tablet.String()) return candidates[0].Tablet, nil } } @@ -442,7 +450,10 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn shortCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout) defer cancel() if err := conn.StreamHealth(shortCtx, func(shr *querypb.StreamHealthResponse) error { - if shr != nil && shr.Serving && shr.RealtimeStats != nil && shr.RealtimeStats.HealthError == "" { + if shr != nil && + (shr.Serving || tp.options.IncludeNonServingTablets) && + shr.RealtimeStats != nil && + shr.RealtimeStats.HealthError == "" { return io.EOF // End the stream } return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving") diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index 8fa85b44902..504d694a484 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -28,6 +28,11 @@ import ( "vitess.io/vitess/go/vt/topo/memorytopo" ) +const ( + contextTimeout = 5 * time.Second + numTestIterations = 50 +) + func TestPickPrimary(t *testing.T) { te := newPickerTestEnv(t, []string{"cell", "otherCell"}) want := addTablet(te, 100, topodatapb.TabletType_MASTER, "cell", true, true) @@ -541,6 +546,70 @@ func TestPickFallbackType(t *testing.T) { assert.True(t, proto.Equal(primaryTablet, tablet), "Pick: %v, want %v", tablet, primaryTablet) } +// TestPickNonServingTablets validates that non-serving tablets are included when the +// IncludeNonServingTablets option is set. Unhealthy tablets should not be picked, irrespective of this option. 
+func TestPickNonServingTablets(t *testing.T) { + cells := []string{"cell1", "cell2"} + localCell := cells[0] + tabletTypes := "replica,master" + options := TabletPickerOptions{} + te := newPickerTestEnv(t, cells) + + // Tablet should be selected as it is healthy and serving. + primaryTablet := addTablet(te, 100, topodatapb.TabletType_MASTER, localCell, true, true) + defer deleteTablet(t, te, primaryTablet) + + // Tablet should not be selected as it is unhealthy. + replicaTablet := addTablet(te, 200, topodatapb.TabletType_REPLICA, localCell, false, false) + defer deleteTablet(t, te, replicaTablet) + + // Tablet should be selected because the IncludeNonServingTablets option is set and it is healthy. + replicaTablet2 := addTablet(te, 300, topodatapb.TabletType_REPLICA, localCell, false, true) + defer deleteTablet(t, te, replicaTablet2) + + var cancel context.CancelFunc + ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) + defer cancel() + _, err := te.topoServ.UpdateShardFields(ctx, te.keyspace, te.shard, func(si *topo.ShardInfo) error { + si.MasterAlias = primaryTablet.Alias + return nil + }) + require.NoError(t, err) + + tp, err := NewTabletPicker(ctx, te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options) + require.NoError(t, err) + ctx2, cancel2 := context.WithTimeout(ctx, contextTimeout) + defer cancel2() + tablet, err := tp.PickForStreaming(ctx2) + require.NoError(t, err) + // IncludeNonServingTablets is false: only the healthy serving tablet should be picked. 
+ assert.True(t, proto.Equal(primaryTablet, tablet), "Pick: %v, want %v", tablet, primaryTablet) + + options.IncludeNonServingTablets = true + tp, err = NewTabletPicker(ctx, te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options) + require.NoError(t, err) + ctx3, cancel3 := context.WithTimeout(ctx, contextTimeout) + defer cancel3() + var picked1, picked2, picked3 bool + // IncludeNonServingTablets is true: both the healthy tablets should be picked even though one is not serving. + for i := 0; i < numTestIterations; i++ { + tablet, err := tp.PickForStreaming(ctx3) + require.NoError(t, err) + if proto.Equal(tablet, primaryTablet) { + picked1 = true + } + if proto.Equal(tablet, replicaTablet) { + picked2 = true + } + if proto.Equal(tablet, replicaTablet2) { + picked3 = true + } + } + assert.True(t, picked1) + assert.False(t, picked2) + assert.True(t, picked3) +} + type pickerTestEnv struct { t *testing.T keyspace string diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index f1bcd225d06..9879fd472f5 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -572,7 +572,8 @@ func (df *vdiff) selectTablets(ctx context.Context, ts *trafficSwitcher) error { if ts.ExternalTopo() != nil { sourceTopo = ts.ExternalTopo() } - tp, err := discovery.NewTabletPicker(ctx, sourceTopo, []string{df.sourceCell}, df.sourceCell, df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}) + tp, err := discovery.NewTabletPicker(ctx, sourceTopo, []string{df.sourceCell}, df.sourceCell, + df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}) if err != nil { return err } @@ -589,8 +590,18 @@ func (df *vdiff) selectTablets(ctx context.Context, ts *trafficSwitcher) error { wg.Add(1) go func() { defer wg.Done() + includeNonServingTablets := false + if df.ts.workflowType == binlogdatapb.VReplicationWorkflowType_Reshard { + // For resharding, the target shards could be non-serving if traffic has 
already been switched once. + // When shards are created their IsPrimaryServing attribute is set to true. However, when the traffic is switched + // it is set to false for the shards we are switching from. We don't have a way to know if we have + // switched or not, so we just include non-serving tablets for all reshards. + includeNonServingTablets = true + } err2 = df.forAll(df.targets, func(shard string, target *shardStreamer) error { - tp, err := discovery.NewTabletPicker(ctx, df.ts.TopoServer(), []string{df.targetCell}, df.targetCell, df.ts.TargetKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}) + tp, err := discovery.NewTabletPicker(ctx, df.ts.TopoServer(), []string{df.targetCell}, df.targetCell, + df.ts.TargetKeyspaceName(), shard, df.tabletTypesStr, + discovery.TabletPickerOptions{IncludeNonServingTablets: includeNonServingTablets}) if err != nil { return err }