diff --git a/go/vt/binlog/binlogplayer/mock_dbclient.go b/go/vt/binlog/binlogplayer/mock_dbclient.go index f8c97fff879..d64c4d40146 100644 --- a/go/vt/binlog/binlogplayer/mock_dbclient.go +++ b/go/vt/binlog/binlogplayer/mock_dbclient.go @@ -17,6 +17,7 @@ limitations under the License. package binlogplayer import ( + "fmt" "regexp" "strings" "sync" @@ -39,6 +40,7 @@ type MockDBClient struct { currentResult int done chan struct{} invariants map[string]*sqltypes.Result + Tag string } type mockExpect struct { @@ -177,7 +179,11 @@ func (dc *MockDBClient) Close() { // ExecuteFetch is part of the DBClient interface func (dc *MockDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error) { dc.t.Helper() - dc.t.Logf("DBClient query: %v", query) + msg := "DBClient query: %v" + if dc.Tag != "" { + msg = fmt.Sprintf("[%s] %s", dc.Tag, msg) + } + dc.t.Logf(msg, query) for q, result := range dc.invariants { if strings.Contains(strings.ToLower(query), strings.ToLower(q)) { @@ -188,16 +194,28 @@ func (dc *MockDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Re dc.expectMu.Lock() defer dc.expectMu.Unlock() if dc.currentResult >= len(dc.expect) { - dc.t.Fatalf("DBClientMock: query: %s, no more requests are expected", query) + msg := "DBClientMock: query: %s, no more requests are expected" + if dc.Tag != "" { + msg = fmt.Sprintf("[%s] %s", dc.Tag, msg) + } + dc.t.Fatalf(msg, query) } result := dc.expect[dc.currentResult] if result.re == nil { if query != result.query { - dc.t.Fatalf("DBClientMock: query: %s, want %s", query, result.query) + msg := "DBClientMock: query: %s, want %s" + if dc.Tag != "" { + msg = fmt.Sprintf("[%s] %s", dc.Tag, msg) + } + dc.t.Fatalf(msg, query, result.query) } } else { if !result.re.MatchString(query) { - dc.t.Fatalf("DBClientMock: query: %s, must match %s", query, result.query) + msg := "DBClientMock: query: %s, must match %s" + if dc.Tag != "" { + msg = fmt.Sprintf("[%s] %s", dc.Tag, msg) + } + dc.t.Fatalf(msg, query, result.query) } } dc.currentResult++ diff --git a/go/vt/vtctl/workflow/materializer.go b/go/vt/vtctl/workflow/materializer.go index 2e48da05ed8..1aa8137612d 100644 --- a/go/vt/vtctl/workflow/materializer.go +++ b/go/vt/vtctl/workflow/materializer.go @@ -39,6 +39,7 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" ) @@ -54,11 +55,12 @@ type materializer struct { sourceTs *topo.Server tmc tmclient.TabletManagerClient - ms *vtctldatapb.MaterializeSettings - targetVSchema *vindexes.KeyspaceSchema - sourceShards []*topo.ShardInfo - targetShards []*topo.ShardInfo - isPartial bool + ms *vtctldatapb.MaterializeSettings + targetVSchema *vindexes.KeyspaceSchema + sourceShards []*topo.ShardInfo + targetShards []*topo.ShardInfo + isPartial bool + primaryVindexesDiffer bool } func (mz *materializer) getWorkflowSubType() (binlogdatapb.VReplicationWorkflowSubType, error) { @@ -98,7 +100,9 @@ func (mz *materializer) prepareMaterializerStreams(req *vtctldatapb.MoveTablesCr if err != nil { return vterrors.Wrapf(err, "GetTablet(%v) failed", target.PrimaryAlias) } - blses, err := mz.generateBinlogSources(mz.ctx, target) + + sourceShards := mz.filterSourceShards(target) + blses, err := mz.generateBinlogSources(mz.ctx, target, sourceShards) if err != nil { return err } @@ -136,7 +140,8 @@ func (mz *materializer) createMaterializerStreams() error { } insertMap 
:= make(map[string]string, len(mz.targetShards)) for _, targetShard := range mz.targetShards { - inserts, err := mz.generateInserts(mz.ctx, targetShard) + sourceShards := mz.filterSourceShards(targetShard) + inserts, err := mz.generateInserts(mz.ctx, sourceShards) if err != nil { return err } @@ -148,17 +153,10 @@ func (mz *materializer) createMaterializerStreams() error { return nil } -func (mz *materializer) generateInserts(ctx context.Context, targetShard *topo.ShardInfo) (string, error) { +func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*topo.ShardInfo) (string, error) { ig := vreplication.NewInsertGenerator(binlogdatapb.VReplicationWorkflowState_Stopped, "{{.dbname}}") - for _, sourceShard := range mz.sourceShards { - // Don't create streams from sources which won't contain data for the target shard. - // We only do it for MoveTables for now since this doesn't hold for materialize flows - // where the target's sharding key might differ from that of the source - if mz.ms.MaterializationIntent == vtctldatapb.MaterializationIntent_MOVETABLES && - !key.KeyRangeIntersect(sourceShard.KeyRange, targetShard.KeyRange) { - continue - } + for _, sourceShard := range sourceShards { bls := &binlogdatapb.BinlogSource{ Keyspace: mz.ms.SourceKeyspace, Shard: sourceShard.ShardName(), @@ -255,16 +253,9 @@ func (mz *materializer) generateInserts(ctx context.Context, targetShard *topo.S return ig.String(), nil } -func (mz *materializer) generateBinlogSources(ctx context.Context, targetShard *topo.ShardInfo) ([]*binlogdatapb.BinlogSource, error) { +func (mz *materializer) generateBinlogSources(ctx context.Context, targetShard *topo.ShardInfo, sourceShards []*topo.ShardInfo) ([]*binlogdatapb.BinlogSource, error) { blses := make([]*binlogdatapb.BinlogSource, 0, len(mz.sourceShards)) - for _, sourceShard := range mz.sourceShards { - // Don't create streams from sources which won't contain data for the target shard. 
-		// We only do it for MoveTables for now since this doesn't hold for materialize flows
-		// where the target's sharding key might differ from that of the source
-		if mz.ms.MaterializationIntent == vtctldatapb.MaterializationIntent_MOVETABLES &&
-			!key.KeyRangeIntersect(sourceShard.KeyRange, targetShard.KeyRange) {
-			continue
-		}
+	for _, sourceShard := range sourceShards {
 		bls := &binlogdatapb.BinlogSource{
 			Keyspace: mz.ms.SourceKeyspace,
 			Shard:    sourceShard.ShardName(),
@@ -511,10 +502,27 @@ func (mz *materializer) buildMaterializer() error {
 	if len(targetShards) == 0 {
 		return fmt.Errorf("no target shards specified for workflow %s ", ms.Workflow)
 	}
+
+	sourceTs := mz.ts
+	if ms.ExternalCluster != "" { // when the source is an external mysql cluster mounted using the Mount command
+		externalTopo, err := mz.ts.OpenExternalVitessClusterServer(ctx, ms.ExternalCluster)
+		if err != nil {
+			return fmt.Errorf("failed to open external topo: %v", err)
+		}
+		sourceTs = externalTopo
+	}
+	differentPVs := false
+	sourceVSchema, err := sourceTs.GetVSchema(ctx, ms.SourceKeyspace)
+	if err != nil {
+		return fmt.Errorf("failed to get source keyspace vschema: %v", err)
+	}
+	differentPVs = primaryVindexesDiffer(ms, sourceVSchema, vschema)
+
 	mz.targetVSchema = targetVSchema
 	mz.sourceShards = sourceShards
 	mz.targetShards = targetShards
 	mz.isPartial = isPartial
+	mz.primaryVindexesDiffer = differentPVs
 	return nil
 }
@@ -617,3 +625,117 @@ func (mz *materializer) checkTZConversion(ctx context.Context, tz string) error
 	})
 	return err
 }
+
+// filterSourceShards filters out source shards that do not overlap with the
+// provided target shard. This is an optimization to avoid copying unnecessary
+// data between shards. The optimization is only applied for MoveTables, and
+// only when the source and target tables have the same primary vindexes.
+func (mz *materializer) filterSourceShards(targetShard *topo.ShardInfo) []*topo.ShardInfo {
+	if mz.primaryVindexesDiffer || mz.ms.MaterializationIntent != vtctldatapb.MaterializationIntent_MOVETABLES {
+		// Use all source shards.
+		return mz.sourceShards
+	}
+	// Use intersecting source shards.
+	var filteredSourceShards []*topo.ShardInfo
+	for _, sourceShard := range mz.sourceShards {
+		if !key.KeyRangeIntersect(sourceShard.KeyRange, targetShard.KeyRange) {
+			continue
+		}
+		filteredSourceShards = append(filteredSourceShards, sourceShard)
+	}
+	return filteredSourceShards
+}
+
+// primaryVindexesDiffer returns true if, for any table defined in the provided
+// materialize settings, the source and target vschema definitions for that
+// table have different primary vindexes.
+//
+// The result of this function is used to determine whether to apply a source
+// shard selection optimization in MoveTables.
+func primaryVindexesDiffer(ms *vtctldatapb.MaterializeSettings, source, target *vschemapb.Keyspace) bool {
+	// If the source and target keyspaces differ in shardedness, treat the
+	// answer to the question as trivially false.
+	if source.Sharded != target.Sharded {
+		return false
+	}
+
+	// For source and target keyspaces that are sharded, we can optimize source
+	// shard selection if source and target tables' primary vindexes are equal.
+	//
+	// To determine this, iterate over all target tables, looking for primary
+	// vindexes that differ from the corresponding source table.
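+	//
+	// For example, if source t1 uses vindex type "hash" on column "id" and
+	// target t1 uses type "xxhash" on "id", this function returns true and
+	// the shard-selection optimization is skipped. Note that only columns
+	// and vindex types are compared below; the vindex names may differ.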
+	for _, ts := range ms.TableSettings {
+		sColumnVindexes := []*vschemapb.ColumnVindex{}
+		tColumnVindexes := []*vschemapb.ColumnVindex{}
+		if tt, ok := source.Tables[ts.TargetTable]; ok {
+			sColumnVindexes = tt.ColumnVindexes
+		}
+		if tt, ok := target.Tables[ts.TargetTable]; ok {
+			tColumnVindexes = tt.ColumnVindexes
+		}
+
+		// If source does not have a primary vindex, but the target does, then
+		// the primary vindexes differ.
+		if len(sColumnVindexes) == 0 && len(tColumnVindexes) > 0 {
+			return true
+		}
+		// If source has a primary vindex, but the target does not, then the
+		// primary vindexes differ.
+		if len(sColumnVindexes) > 0 && len(tColumnVindexes) == 0 {
+			return true
+		}
+		// If neither source nor target has any vindexes for this table, there
+		// is nothing to compare; conservatively assume the primary vindexes
+		// differ so the optimization is not applied.
+		if len(sColumnVindexes) == 0 && len(tColumnVindexes) == 0 {
+			return true
+		}
+
+		sPrimaryVindex := sColumnVindexes[0]
+		tPrimaryVindex := tColumnVindexes[0]
+
+		// Compare source and target primary vindex columns. A ColumnVindex
+		// specifies its column(s) via either the singular Column field or the
+		// Columns list; normalize both forms to a slice before comparing.
+		var sColumns, tColumns []string
+		if sPrimaryVindex.Column != "" {
+			sColumns = []string{sPrimaryVindex.Column}
+		} else {
+			sColumns = sPrimaryVindex.Columns
+		}
+		if tPrimaryVindex.Column != "" {
+			tColumns = []string{tPrimaryVindex.Column}
+		} else {
+			tColumns = tPrimaryVindex.Columns
+		}
+		if len(sColumns) != len(tColumns) {
+			return true
+		}
+		for i := 0; i < len(sColumns); i++ {
+			if !strings.EqualFold(sColumns[i], tColumns[i]) {
+				return true
+			}
+		}
+
+		// Get source and target vindex definitions.
+		spv := source.Vindexes[sColumnVindexes[0].Name]
+		tpv := target.Vindexes[tColumnVindexes[0].Name]
+		// If the source has a vindex definition, but the target does not, then
+		// the target vschema is invalid. Assume the primary vindexes differ.
+		if spv != nil && tpv == nil {
+			return true
+		}
+		// If the target has a vindex definition, but the source does not, then
+		// the source vschema is invalid. Assume the primary vindexes differ.
+		if spv == nil && tpv != nil {
+			return true
+		}
+		// If both target and source are missing vindex definitions, then both
+		// are equally invalid and there is nothing to compare for this table.
+		if spv == nil && tpv == nil {
+			continue
+		}
+		// Compare source and target vindex type.
+		if !strings.EqualFold(spv.Type, tpv.Type) {
+			return true
+		}
+	}
+	return false
+}
diff --git a/go/vt/vttablet/tabletmanager/framework_test.go b/go/vt/vttablet/tabletmanager/framework_test.go
index 0fbba42f31c..7d112add1d3 100644
--- a/go/vt/vttablet/tabletmanager/framework_test.go
+++ b/go/vt/vttablet/tabletmanager/framework_test.go
@@ -157,9 +157,11 @@ func (tenv *testEnv) addTablet(t *testing.T, id int, keyspace, shard string) *fa
 		panic(err)
 	}
 
+	vrdbClient := binlogplayer.NewMockDBClient(t)
+	vrdbClient.Tag = fmt.Sprintf("tablet:%d", id)
 	tenv.tmc.tablets[id] = &fakeTabletConn{
 		tablet:     tablet,
-		vrdbClient: binlogplayer.NewMockDBClient(t),
+		vrdbClient: vrdbClient,
 	}
 
 	dbClientFactory := func() binlogplayer.DBClient {
diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
index bc4efedd85a..5efb200a5aa 100644
--- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
+++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
@@ -18,6 +18,7 @@ package tabletmanager
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"math"
 	"runtime/debug"
@@ -576,6 +577,232 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
 	}
 }
 
+// TestSourceShardSelection tests that the RPC calls made by VtctldServer to
+// tablet managers include the correct set of BLS settings.
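+// (BLS is shorthand for BinlogSource: the per-stream source definition that
+// is written to the _vt.vreplication table for each stream.)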
+// +// errShortCircuit is intentionally injected into the MoveTables workflow to +// short-circuit the workflow after we've validated everything we wanted to in +// the test. +func TestSourceShardSelection(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sourceKs := "sourceks" + sourceShard0 := "-55" + sourceShard1 := "55-aa" + sourceShard2 := "aa-" + sourceTabletUID0 := 200 + sourceTabletUID1 := 201 + sourceTabletUID2 := 202 + + targetKs := "targetks" + targetShard0 := "-80" + targetShard1 := "80-" + targetTabletUID0 := 300 + targetTabletUID1 := 301 + + wf := "testwf" + + tenv := newTestEnv(t, ctx, sourceKs, []string{sourceShard0, sourceShard1, sourceShard2}) + defer tenv.close() + + sourceTablets := map[int]*fakeTabletConn{ + sourceTabletUID0: tenv.addTablet(t, sourceTabletUID0, sourceKs, sourceShard0), + sourceTabletUID1: tenv.addTablet(t, sourceTabletUID1, sourceKs, sourceShard1), + sourceTabletUID2: tenv.addTablet(t, sourceTabletUID2, sourceKs, sourceShard2), + } + for _, st := range sourceTablets { + defer tenv.deleteTablet(st.tablet) + } + + targetTablets := map[int]*fakeTabletConn{ + targetTabletUID0: tenv.addTablet(t, targetTabletUID0, targetKs, targetShard0), + targetTabletUID1: tenv.addTablet(t, targetTabletUID1, targetKs, targetShard1), + } + for _, tt := range targetTablets { + defer tenv.deleteTablet(tt.tablet) + } + + ws := workflow.NewServer(tenv.ts, tenv.tmc) + + tenv.ts.SaveVSchema(ctx, sourceKs, &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "id", + Name: "hash", + }}, + }, + }, + }) + tenv.ts.SaveVSchema(ctx, targetKs, &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "id", + Name: "hash", + }}, + }, + }, + }) + + tests := []struct { + name string + req *vtctldatapb.MoveTablesCreateRequest + schema *tabletmanagerdatapb.SchemaDefinition + vschema *vschemapb.Keyspace + streams map[int][]string + }{ + { + name: "same primary vindexes, use intersecting source shards", + req: &vtctldatapb.MoveTablesCreateRequest{ + SourceKeyspace: sourceKs, + TargetKeyspace: targetKs, + Workflow: wf, + Cells: tenv.cells, + AllTables: true, + AutoStart: false, + }, + streams: map[int][]string{ + targetTabletUID0: { + sourceShard0, + sourceShard1, + }, + targetTabletUID1: { + sourceShard1, + sourceShard2, + }, + }, + }, + { + name: "different primary vindexes, use all source shards", + req: &vtctldatapb.MoveTablesCreateRequest{ + SourceKeyspace: sourceKs, + TargetKeyspace: targetKs, + Workflow: wf, + Cells: tenv.cells, + AllTables: true, + AutoStart: false, + }, + vschema: &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "xxhash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "id", + Name: "hash", + }}, + }, + }, + }, + streams: map[int][]string{ + targetTabletUID0: { + sourceShard0, + sourceShard1, + sourceShard2, + }, + targetTabletUID1: { + sourceShard0, + sourceShard1, + sourceShard2, + }, + }, + }, + } + + for _, tt := range targetTablets { + tenv.tmc.setVReplicationExecResults(tt.tablet, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", + 
targetKs, wf), &sqltypes.Result{}) + tenv.tmc.setVReplicationExecResults(tt.tablet, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and message='FROZEN' and workflow_sub_type != 1", + targetKs), &sqltypes.Result{}) + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // This is needed because MockDBClient uses t.Fatal() + // which doesn't play well with subtests. + defer func() { + if err := recover(); err != nil { + t.Errorf("Recovered from panic: %v; Stack: %s", err, string(debug.Stack())) + } + }() + + require.NotNil(t, tt.req, "No MoveTablesCreate request provided") + require.NotEmpty(t, tt.streams, "No expected streams provided") + + if tt.schema == nil { + tt.schema = defaultSchema + } + tenv.tmc.SetSchema(tt.schema) + + if tt.vschema != nil { + tenv.ts.SaveVSchema(ctx, targetKs, tt.vschema) + } + + for uid, streams := range tt.streams { + tt := targetTablets[uid] + for i, sourceShard := range streams { + tt.vrdbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) + var err error + if i == len(streams)-1 { + // errShortCircuit is intentionally injected into the MoveTables + // workflow to short-circuit the workflow after we've validated + // everything we wanted to in the test. + err = errShortCircuit + } + tt.vrdbClient.ExpectRequest( + fmt.Sprintf(`%s values ('%s', 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"t1\" filter:\"select * from t1 where in_keyrange(id, \'%s.hash\', \'%s\')\"}}', '', 0, 0, '%s', '', now(), 0, 'Stopped', '%s', 1, 0, 0)`, + insertVReplicationPrefix, wf, sourceKs, sourceShard, targetKs, tt.tablet.Shard, tenv.cells[0], tenv.dbName), + &sqltypes.Result{InsertID: uint64(i + 1)}, + err, + ) + if errors.Is(err, errShortCircuit) { + break + } + tt.vrdbClient.ExpectRequest(getAutoIncrementStep, &sqltypes.Result{}, nil) + tt.vrdbClient.ExpectRequest( + fmt.Sprintf("select * from _vt.vreplication where id = %d", uint64(i+1)), + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "id|source|state", + "int64|varchar|varchar", + ), + fmt.Sprintf("%d|%s|Stopped", uint64(i+1), fmt.Sprintf(`keyspace:"%s" shard:"%s" filter:{rules:{match:"t1" filter:"select * from t1 where in_keyrange(id, '%s.hash', '%s')"}}`, sourceKs, sourceShard, targetKs, tt.tablet.Shard)), + ), + nil, + ) + } + } + + _, err := ws.MoveTablesCreate(ctx, tt.req) + for _, tt := range targetTablets { + tt.vrdbClient.Wait() + } + // errShortCircuit is intentionally injected into the MoveTables + // workflow to short-circuit the workflow after we've validated + // everything we wanted to in the test. 
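+			// Both target primaries fail their final expected request with
+			// errShortCircuit, and the aggregated MoveTablesCreate error is
+			// expected to contain both messages, joined by a newline.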
+ require.ErrorContains(t, err, fmt.Sprintf("%s\n%s", errShortCircuit.Error(), errShortCircuit.Error())) + }) + } +} + // TestFailedMoveTablesCreateCleanup tests that the workflow // and its artifacts are cleaned up when the workflow creation // fails -- specifically after the point where we have created diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index b24aea00136..0fba424eacd 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -58,12 +58,13 @@ import ( ) type materializer struct { - wr *Wrangler - ms *vtctldatapb.MaterializeSettings - targetVSchema *vindexes.KeyspaceSchema - sourceShards []*topo.ShardInfo - targetShards []*topo.ShardInfo - isPartial bool + wr *Wrangler + ms *vtctldatapb.MaterializeSettings + targetVSchema *vindexes.KeyspaceSchema + sourceShards []*topo.ShardInfo + targetShards []*topo.ShardInfo + isPartial bool + primaryVindexesDiffer bool } const ( @@ -1027,7 +1028,8 @@ func (wr *Wrangler) prepareMaterializerStreams(ctx context.Context, ms *vtctldat } insertMap := make(map[string]string, len(mz.targetShards)) for _, targetShard := range mz.targetShards { - inserts, err := mz.generateInserts(ctx, targetShard) + sourceShards := mz.filterSourceShards(targetShard) + inserts, err := mz.generateInserts(ctx, sourceShards) if err != nil { return nil, err } @@ -1057,6 +1059,7 @@ func (wr *Wrangler) buildMaterializer(ctx context.Context, ms *vtctldatapb.Mater if err != nil { return nil, err } + if targetVSchema.Keyspace.Sharded { for _, ts := range ms.TableSettings { if targetVSchema.Tables[ts.TargetTable] == nil { @@ -1106,13 +1109,29 @@ func (wr *Wrangler) buildMaterializer(ctx context.Context, ms *vtctldatapb.Mater return nil, fmt.Errorf("no target shards specified for workflow %s ", ms.Workflow) } + sourceTs := wr.ts + if ms.ExternalCluster != "" { // when the source is an external mysql cluster mounted using the Mount command + externalTopo, err := wr.ts.OpenExternalVitessClusterServer(ctx, ms.ExternalCluster) + if err != nil { + return nil, fmt.Errorf("failed to open external topo: %v", err) + } + sourceTs = externalTopo + } + differentPVs := false + sourceVSchema, err := sourceTs.GetVSchema(ctx, ms.SourceKeyspace) + if err != nil { + return nil, fmt.Errorf("failed to get source keyspace vschema: %v", err) + } + differentPVs = primaryVindexesDiffer(ms, sourceVSchema, vschema) + return &materializer{ - wr: wr, - ms: ms, - targetVSchema: targetVSchema, - sourceShards: sourceShards, - targetShards: targetShards, - isPartial: isPartial, + wr: wr, + ms: ms, + targetVSchema: targetVSchema, + sourceShards: sourceShards, + targetShards: targetShards, + isPartial: isPartial, + primaryVindexesDiffer: differentPVs, }, nil } @@ -1300,17 +1319,10 @@ func stripTableConstraints(ddl string) (string, error) { return newDDL, nil } -func (mz *materializer) generateInserts(ctx context.Context, targetShard *topo.ShardInfo) (string, error) { +func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*topo.ShardInfo) (string, error) { ig := vreplication.NewInsertGenerator(binlogdatapb.VReplicationWorkflowState_Stopped, "{{.dbname}}") - for _, sourceShard := range mz.sourceShards { - // Don't create streams from sources which won't contain data for the target shard. 
-		// We only do it for MoveTables for now since this doesn't hold for materialize flows
-		// where the target's sharding key might differ from that of the source
-		if mz.ms.MaterializationIntent == vtctldatapb.MaterializationIntent_MOVETABLES &&
-			!key.KeyRangeIntersect(sourceShard.KeyRange, targetShard.KeyRange) {
-			continue
-		}
+	for _, sourceShard := range sourceShards {
 		bls := &binlogdatapb.BinlogSource{
 			Keyspace: mz.ms.SourceKeyspace,
 			Shard:    sourceShard.ShardName(),
@@ -1548,3 +1560,117 @@ func (mz *materializer) checkTZConversion(ctx context.Context, tz string) error
 	})
 	return err
 }
+
+// filterSourceShards filters out source shards that do not overlap with the
+// provided target shard. This is an optimization to avoid copying unnecessary
+// data between shards. The optimization is only applied for MoveTables, and
+// only when the source and target tables have the same primary vindexes.
+func (mz *materializer) filterSourceShards(targetShard *topo.ShardInfo) []*topo.ShardInfo {
+	if mz.primaryVindexesDiffer || mz.ms.MaterializationIntent != vtctldatapb.MaterializationIntent_MOVETABLES {
+		// Use all source shards.
+		return mz.sourceShards
+	}
+	// Use intersecting source shards.
+	var filteredSourceShards []*topo.ShardInfo
+	for _, sourceShard := range mz.sourceShards {
+		if !key.KeyRangeIntersect(sourceShard.KeyRange, targetShard.KeyRange) {
+			continue
+		}
+		filteredSourceShards = append(filteredSourceShards, sourceShard)
+	}
+	return filteredSourceShards
+}
+
+// primaryVindexesDiffer returns true if, for any table defined in the provided
+// materialize settings, the source and target vschema definitions for that
+// table have different primary vindexes.
+//
+// The result of this function is used to determine whether to apply a source
+// shard selection optimization in MoveTables.
+func primaryVindexesDiffer(ms *vtctldatapb.MaterializeSettings, source, target *vschemapb.Keyspace) bool {
+	// If the source and target keyspaces differ in shardedness, treat the
+	// answer to the question as trivially false.
+	if source.Sharded != target.Sharded {
+		return false
+	}
+
+	// For source and target keyspaces that are sharded, we can optimize source
+	// shard selection if source and target tables' primary vindexes are equal.
+	//
+	// To determine this, iterate over all target tables, looking for primary
+	// vindexes that differ from the corresponding source table.
+	for _, ts := range ms.TableSettings {
+		sColumnVindexes := []*vschemapb.ColumnVindex{}
+		tColumnVindexes := []*vschemapb.ColumnVindex{}
+		if tt, ok := source.Tables[ts.TargetTable]; ok {
+			sColumnVindexes = tt.ColumnVindexes
+		}
+		if tt, ok := target.Tables[ts.TargetTable]; ok {
+			tColumnVindexes = tt.ColumnVindexes
+		}
+
+		// If source does not have a primary vindex, but the target does, then
+		// the primary vindexes differ.
+		if len(sColumnVindexes) == 0 && len(tColumnVindexes) > 0 {
+			return true
+		}
+		// If source has a primary vindex, but the target does not, then the
+		// primary vindexes differ.
+		if len(sColumnVindexes) > 0 && len(tColumnVindexes) == 0 {
+			return true
+		}
+		// If neither source nor target has any vindexes for this table, there
+		// is nothing to compare; conservatively assume the primary vindexes
+		// differ so the optimization is not applied.
+		if len(sColumnVindexes) == 0 && len(tColumnVindexes) == 0 {
+			return true
+		}
+
+		sPrimaryVindex := sColumnVindexes[0]
+		tPrimaryVindex := tColumnVindexes[0]
+
+		// Compare source and target primary vindex columns.
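+		// A ColumnVindex specifies its column(s) via either the singular
+		// Column field or the Columns list; normalize both forms to a slice
+		// so single- and multi-column vindexes are compared uniformly.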
+ var sColumns, tColumns []string + if sPrimaryVindex.Column != "" { + sColumns = []string{sPrimaryVindex.Column} + } else { + sColumns = sPrimaryVindex.Columns + } + if tPrimaryVindex.Column != "" { + tColumns = []string{tPrimaryVindex.Column} + } else { + tColumns = tPrimaryVindex.Columns + } + if len(sColumns) != len(tColumns) { + return true + } + for i := 0; i < len(sColumns); i++ { + if !strings.EqualFold(sColumns[i], tColumns[i]) { + return true + } + } + + // Get source and target vindex definitions. + spv := source.Vindexes[sColumnVindexes[0].Name] + tpv := target.Vindexes[tColumnVindexes[0].Name] + // If the source has vindex definition, but target does not, then the + // target vschema is invalid. Assume the primary vindexes differ. + if spv != nil && tpv == nil { + return true + } + // If the target has vindex definition, but source does not, then the + // source vschema is invalid. Assume the primary vindexes differ. + if spv == nil && tpv != nil { + return true + } + // If both target and source are missing vindex definitions, then both + // are equally invalid. + if spv == nil && tpv == nil { + continue + } + // Compare source and target vindex type. + if !strings.EqualFold(spv.Type, tpv.Type) { + return true + } + } + return false +} diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go index 87a4a9896ad..9126d8540c1 100644 --- a/go/vt/wrangler/materializer_test.go +++ b/go/vt/wrangler/materializer_test.go @@ -2952,7 +2952,7 @@ func TestStripConstraints(t *testing.T) { } } -func TestMaterializerManyToManySomeUnreachable(t *testing.T) { +func TestMaterializerSourceShardSelection(t *testing.T) { ms := &vtctldatapb.MaterializeSettings{ Workflow: "workflow", SourceKeyspace: "sourceks", @@ -2964,7 +2964,11 @@ func TestMaterializerManyToManySomeUnreachable(t *testing.T) { }}, } - vs := &vschemapb.Keyspace{ + getStreamInsert := func(sourceShard, sourceColumn, targetVindex, targetShard string) string { + return fmt.Sprintf(`.*shard:\\"%s\\" filter:{rules:{match:\\"t1\\" filter:\\"select.*t1 where in_keyrange\(%s.*targetks\.%s.*%s.*`, sourceShard, sourceColumn, targetVindex, targetShard) + } + + targetVSchema := &vschemapb.Keyspace{ Sharded: true, Vindexes: map[string]*vschemapb.Vindex{ "xxhash": { @@ -2980,55 +2984,272 @@ func TestMaterializerManyToManySomeUnreachable(t *testing.T) { }, }, } + type testcase struct { - targetShards, sourceShards []string - insertMap map[string][]string + name string + targetShards, sourceShards []string + sourceColumn string + targetVindex string + insertMap map[string][]string + targetVSchema, sourceVSchema *vschemapb.Keyspace + getStreamInsert func(sourceShard, sourceColumn, targetVindexName, targetShard string) string } testcases := []testcase{ { - targetShards: []string{"-40", "40-80", "80-c0", "c0-"}, - sourceShards: []string{"-80", "80-"}, - insertMap: map[string][]string{"-40": {"-80"}, "40-80": {"-80"}, "80-c0": {"80-"}, "c0-": {"80-"}}, + targetShards: []string{"-40", "40-80", "80-c0", "c0-"}, + sourceShards: []string{"-80", "80-"}, + sourceColumn: "c1", + targetVindex: "xxhash", + insertMap: map[string][]string{"-40": {"-80"}, "40-80": {"-80"}, "80-c0": {"80-"}, "c0-": {"80-"}}, + targetVSchema: targetVSchema, + getStreamInsert: getStreamInsert, + }, + { + targetShards: []string{"-20", "20-40", "40-a0", "a0-f0", "f0-"}, + sourceShards: []string{"-40", "40-80", "80-c0", "c0-"}, + sourceColumn: "c1", + targetVindex: "xxhash", + insertMap: map[string][]string{"-20": {"-40"}, "20-40": {"-40"}, "40-a0": 
{"40-80", "80-c0"}, "a0-f0": {"80-c0", "c0-"}, "f0-": {"c0-"}}, + targetVSchema: targetVSchema, + getStreamInsert: getStreamInsert, + }, + { + targetShards: []string{"-40", "40-80", "80-"}, + sourceShards: []string{"-80", "80-"}, + sourceColumn: "c1", + targetVindex: "xxhash", + insertMap: map[string][]string{"-40": {"-80"}, "40-80": {"-80"}, "80-": {"80-"}}, + targetVSchema: targetVSchema, + getStreamInsert: getStreamInsert, + }, + { + targetShards: []string{"-80", "80-"}, + sourceShards: []string{"-40", "40-80", "80-c0", "c0-"}, + sourceColumn: "c1", + targetVindex: "xxhash", + insertMap: map[string][]string{"-80": {"-40", "40-80"}, "80-": {"80-c0", "c0-"}}, + targetVSchema: targetVSchema, + getStreamInsert: getStreamInsert, + }, + { + targetShards: []string{"0"}, + sourceShards: []string{"-80", "80-"}, + sourceColumn: "c1", + targetVindex: "xxhash", + insertMap: map[string][]string{"0": {"-80", "80-"}}, + targetVSchema: targetVSchema, + getStreamInsert: getStreamInsert, }, { - targetShards: []string{"-20", "20-40", "40-a0", "a0-f0", "f0-"}, + targetShards: []string{"-80", "80-"}, + sourceShards: []string{"0"}, + sourceColumn: "c1", + targetVindex: "xxhash", + insertMap: map[string][]string{"-80": {"0"}, "80-": {"0"}}, + targetVSchema: targetVSchema, + getStreamInsert: getStreamInsert, + }, + { + name: "different primary vindex type, use all source shards", + targetShards: []string{"-80", "80-"}, sourceShards: []string{"-40", "40-80", "80-c0", "c0-"}, - insertMap: map[string][]string{"-20": {"-40"}, "20-40": {"-40"}, "40-a0": {"40-80", "80-c0"}, "a0-f0": {"80-c0", "c0-"}, "f0-": {"c0-"}}, + sourceColumn: "c1", + targetVindex: "hash", + insertMap: map[string][]string{ + "-80": {"-40", "40-80", "80-c0", "c0-"}, + "80-": {"-40", "40-80", "80-c0", "c0-"}, + }, + targetVSchema: &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "xxhash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "c1", + Name: "hash", + }}, + }, + }, + }, + getStreamInsert: getStreamInsert, }, { - targetShards: []string{"-40", "40-80", "80-"}, - sourceShards: []string{"-80", "80-"}, - insertMap: map[string][]string{"-40": {"-80"}, "40-80": {"-80"}, "80-": {"80-"}}, + name: "different vindex type and name, use all source shards", + targetShards: []string{"-80", "80-"}, + sourceShards: []string{"-40", "40-80", "80-c0", "c0-"}, + sourceColumn: "c1", + targetVindex: "xxhash", + insertMap: map[string][]string{ + "-80": {"-40", "40-80", "80-c0", "c0-"}, + "80-": {"-40", "40-80", "80-c0", "c0-"}, + }, + targetVSchema: &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "xxhash": { + Type: "xxhash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "c1", + Name: "xxhash", + }}, + }, + }, + }, + getStreamInsert: getStreamInsert, }, { + name: "same vindex type but different name, use intersecting source shards", targetShards: []string{"-80", "80-"}, sourceShards: []string{"-40", "40-80", "80-c0", "c0-"}, + sourceColumn: "c1", + targetVindex: "hash", insertMap: map[string][]string{"-80": {"-40", "40-80"}, "80-": {"80-c0", "c0-"}}, + targetVSchema: &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash_vdx": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "c1", + Name: "hash_vdx", + }}, + }, + }, + }, + 
getStreamInsert: getStreamInsert,
+		},
+		{
+			name:         "unsharded source, sharded target, use all source shards",
+			targetShards: []string{"-80", "80-"},
+			sourceShards: []string{"-"},
+			targetVindex: "xxhash",
+			insertMap: map[string][]string{
+				"-80": {"-"},
+				"80-": {"-"},
+			},
+			sourceVSchema: &vschemapb.Keyspace{
+				Sharded: false,
+			},
+			targetVSchema:   targetVSchema,
+			getStreamInsert: getStreamInsert,
+		},
+		{
+			name:         "sharded source, unsharded target, use all source shards",
+			targetShards: []string{"-"},
+			sourceShards: []string{"-80", "80-"},
+			insertMap: map[string][]string{
+				"-": {"-80", "80-"},
+			},
+			targetVSchema: &vschemapb.Keyspace{
+				Sharded: false,
+			},
+			// The single target shard streams all data from each source shard
+			// without any keyrange filtering.
+			getStreamInsert: func(sourceShard, _, _, targetShard string) string {
+				return fmt.Sprintf(`.*shard:\\"%s\\" filter:{rules:{match:\\"t1\\" filter:\\"select.*t1`, sourceShard)
+			},
+		},
+		{
+			name:         "target secondary vindexes, use intersecting source shards",
+			targetShards: []string{"-80", "80-"},
+			sourceShards: []string{"-40", "40-80", "80-c0", "c0-"},
+			sourceColumn: "c1",
+			targetVindex: "hash",
+			insertMap:    map[string][]string{"-80": {"-40", "40-80"}, "80-": {"80-c0", "c0-"}},
+			targetVSchema: &vschemapb.Keyspace{
+				Sharded: true,
+				Vindexes: map[string]*vschemapb.Vindex{
+					"hash": {
+						Type: "hash",
+					},
+					"lookup_vdx": {
+						Type: "lookup",
+					},
+				},
+				Tables: map[string]*vschemapb.Table{
+					"t1": {
+						ColumnVindexes: []*vschemapb.ColumnVindex{
+							{
+								Column: "c1",
+								Name:   "hash",
+							},
+							{
+								Column: "c2",
+								Name:   "lookup_vdx",
+							},
+						},
+					},
+				},
+			},
+			getStreamInsert: getStreamInsert,
+		},
+		{
+			name:         "same vindex type but different cols, use all source shards",
+			targetShards: []string{"-80", "80-"},
+			sourceShards: []string{"-40", "40-80", "80-c0", "c0-"},
+			sourceColumn: "c2",
+			targetVindex: "hash",
+			// The primary vindex columns differ, so each target shard is
+			// expected to stream from every source shard.
+			insertMap: map[string][]string{
+				"-80": {"-40", "40-80", "80-c0", "c0-"},
+				"80-": {"-40", "40-80", "80-c0", "c0-"},
+			},
+			sourceVSchema: &vschemapb.Keyspace{
+				Sharded: true,
+				Vindexes: map[string]*vschemapb.Vindex{
+					"hash": {
+						Type: "hash",
+					},
+				},
+				Tables: map[string]*vschemapb.Table{
+					"t1": {
+						ColumnVindexes: []*vschemapb.ColumnVindex{{
+							Column: "c1",
+							Name:   "hash",
+						}},
+					},
+				},
+			},
+			targetVSchema: &vschemapb.Keyspace{
+				Sharded: true,
+				Vindexes: map[string]*vschemapb.Vindex{
+					"hash": {
+						Type: "hash",
+					},
+				},
+				Tables: map[string]*vschemapb.Table{
+					"t1": {
+						ColumnVindexes: []*vschemapb.ColumnVindex{{
+							Column: "c2",
+							Name:   "hash",
+						}},
+					},
+				},
+			},
+			getStreamInsert: getStreamInsert,
+		},
-	}
-
-	getStreamInsert := func(sourceShard, targetShard string) string {
-		return fmt.Sprintf(`.*shard:\\"%s\\" filter:{rules:{match:\\"t1\\" filter:\\"select.*t1 where in_keyrange\(c1.*targetks\.xxhash.*%s.*`, sourceShard, targetShard)
 	}
 
 	for _, tcase := range testcases {
-		t.Run("", func(t *testing.T) {
+		t.Run(tcase.name, func(t *testing.T) {
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
 			env := newTestMaterializerEnv(t, ctx, ms, tcase.sourceShards, tcase.targetShards)
-			if err := env.topoServ.SaveVSchema(context.Background(), "targetks", vs); err != nil {
+			if err := env.topoServ.SaveVSchema(ctx, "targetks", tcase.targetVSchema); err != nil {
 				t.Fatal(err)
 			}
+			if tcase.sourceVSchema != nil {
+				if err := env.topoServ.SaveVSchema(ctx, "sourceks", tcase.sourceVSchema); err != nil {
+					t.Fatal(err)
+				}
+			}
 			defer env.close()
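+			// The expected stream-creation query for each target shard is
+			// assembled below from insertMap: one stream per source shard
+			// listed for that target shard.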
for i, targetShard := range tcase.targetShards { tabletID := 200 + i*10 @@ -3037,7 +3258,7 @@ func TestMaterializerManyToManySomeUnreachable(t *testing.T) { streamsInsert := "" sourceShards := tcase.insertMap[targetShard] for _, sourceShard := range sourceShards { - streamsInsert += getStreamInsert(sourceShard, targetShard) + streamsInsert += tcase.getStreamInsert(sourceShard, tcase.sourceColumn, tcase.targetVindex, targetShard) } env.tmc.expectVRQuery( tabletID,
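The insertMap expectations above, like filterSourceShards itself, reduce to a half-open keyrange overlap test. A self-contained sketch of that rule (the helper names here are illustrative; the real check is key.KeyRangeIntersect, which treats an empty bound as the keyspace minimum or maximum):

```go
package main

import (
	"bytes"
	"encoding/hex"
	"fmt"
	"strings"
)

// bounds parses a shard range like "-80" or "40-c0" into half-open
// [start, end) byte bounds; an empty bound means -inf / +inf.
// (Assumes the "start-end" form; single-shard names like "0" are not handled.)
func bounds(shard string) (start, end []byte) {
	parts := strings.SplitN(shard, "-", 2)
	start, _ = hex.DecodeString(parts[0])
	end, _ = hex.DecodeString(parts[1])
	return start, end
}

// intersects mirrors the overlap rule: two half-open ranges overlap iff
// each one starts before the other ends.
func intersects(a, b string) bool {
	as, ae := bounds(a)
	bs, be := bounds(b)
	return (len(ae) == 0 || bytes.Compare(bs, ae) < 0) &&
		(len(be) == 0 || bytes.Compare(as, be) < 0)
}

func main() {
	sources := []string{"-40", "40-80", "80-c0", "c0-"}
	for _, target := range []string{"-80", "80-"} {
		var overlapping []string
		for _, s := range sources {
			if intersects(s, target) {
				overlapping = append(overlapping, s)
			}
		}
		fmt.Printf("%s: %v\n", target, overlapping)
	}
	// Output:
	// -80: [-40 40-80]
	// 80-: [80-c0 c0-]
}
```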
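And for the MockDBClient.Tag field introduced in the first hunk, a minimal hypothetical usage sketch; it relies only on the MockDBClient API visible in this diff (NewMockDBClient, Tag, ExpectRequest, ExecuteFetch, Wait):

```go
package binlogplayer_test

import (
	"fmt"
	"testing"

	"vitess.io/vitess/go/sqltypes"
	"vitess.io/vitess/go/vt/binlog/binlogplayer"
)

// With several mock clients in one test, Tag disambiguates interleaved log
// and failure output, e.g. "[tablet:300] DBClient query: use _vt".
func TestTaggedMockDBClients(t *testing.T) {
	for _, uid := range []int{300, 301} {
		dbc := binlogplayer.NewMockDBClient(t)
		dbc.Tag = fmt.Sprintf("tablet:%d", uid)
		dbc.ExpectRequest("use _vt", &sqltypes.Result{}, nil)
		if _, err := dbc.ExecuteFetch("use _vt", 1); err != nil {
			t.Fatal(err)
		}
		dbc.Wait() // fails the test if expectations remain unmet
	}
}
```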