From e9f1de69b7c4b8d3abd154c153004cec19848800 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Sun, 13 Aug 2023 23:43:39 -0400 Subject: [PATCH 01/11] wrangler,workflow/workflow: materialize from intersecting source shards based on primary vindexes Signed-off-by: Max Englander --- go/vt/vtctl/workflow/materializer.go | 134 +++++++++++++++++++++------ go/vt/wrangler/materializer.go | 131 +++++++++++++++++++++----- go/vt/wrangler/materializer_test.go | 128 +++++++++++++++++++------ 3 files changed, 314 insertions(+), 79 deletions(-) diff --git a/go/vt/vtctl/workflow/materializer.go b/go/vt/vtctl/workflow/materializer.go index 15da7a20107..017b3e3daa3 100644 --- a/go/vt/vtctl/workflow/materializer.go +++ b/go/vt/vtctl/workflow/materializer.go @@ -54,11 +54,12 @@ type materializer struct { sourceTs *topo.Server tmc tmclient.TabletManagerClient - ms *vtctldatapb.MaterializeSettings - targetVSchema *vindexes.KeyspaceSchema - sourceShards []*topo.ShardInfo - targetShards []*topo.ShardInfo - isPartial bool + ms *vtctldatapb.MaterializeSettings + targetVSchema *vindexes.KeyspaceSchema + sourceShards []*topo.ShardInfo + targetShards []*topo.ShardInfo + isPartial bool + samePrimaryVindexes bool } func (mz *materializer) prepareMaterializerStreams(req *vtctldatapb.MoveTablesCreateRequest) error { @@ -86,7 +87,9 @@ func (mz *materializer) prepareMaterializerStreams(req *vtctldatapb.MoveTablesCr if err != nil { return vterrors.Wrapf(err, "GetTablet(%v) failed", target.PrimaryAlias) } - blses, err := mz.generateBinlogSources(mz.ctx, target) + + sourceShards := mz.filterSourceShards(target) + blses, err := mz.generateBinlogSources(mz.ctx, target, sourceShards) if err != nil { return err } @@ -124,7 +127,8 @@ func (mz *materializer) createMaterializerStreams() error { } insertMap := make(map[string]string, len(mz.targetShards)) for _, targetShard := range mz.targetShards { - inserts, err := mz.generateInserts(mz.ctx, targetShard) + sourceShards := 
mz.filterSourceShards(targetShard) + inserts, err := mz.generateInserts(mz.ctx, sourceShards) if err != nil { return err } @@ -136,17 +140,10 @@ func (mz *materializer) createMaterializerStreams() error { return nil } -func (mz *materializer) generateInserts(ctx context.Context, targetShard *topo.ShardInfo) (string, error) { +func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*topo.ShardInfo) (string, error) { ig := vreplication.NewInsertGenerator(binlogdatapb.VReplicationWorkflowState_Stopped, "{{.dbname}}") - for _, sourceShard := range mz.sourceShards { - // Don't create streams from sources which won't contain data for the target shard. - // We only do it for MoveTables for now since this doesn't hold for materialize flows - // where the target's sharding key might differ from that of the source - if mz.ms.MaterializationIntent == vtctldatapb.MaterializationIntent_MOVETABLES && - !key.KeyRangeIntersect(sourceShard.KeyRange, targetShard.KeyRange) { - continue - } + for _, sourceShard := range sourceShards { bls := &binlogdatapb.BinlogSource{ Keyspace: mz.ms.SourceKeyspace, Shard: sourceShard.ShardName(), @@ -243,16 +240,13 @@ func (mz *materializer) generateInserts(ctx context.Context, targetShard *topo.S return ig.String(), nil } -func (mz *materializer) generateBinlogSources(ctx context.Context, targetShard *topo.ShardInfo) ([]*binlogdatapb.BinlogSource, error) { +func (mz *materializer) generateBinlogSources( + ctx context.Context, + targetShard *topo.ShardInfo, + sourceShards []*topo.ShardInfo, +) ([]*binlogdatapb.BinlogSource, error) { blses := make([]*binlogdatapb.BinlogSource, 0, len(mz.sourceShards)) - for _, sourceShard := range mz.sourceShards { - // Don't create streams from sources which won't contain data for the target shard. 
- // We only do it for MoveTables for now since this doesn't hold for materialize flows - // where the target's sharding key might differ from that of the source - if mz.ms.MaterializationIntent == vtctldatapb.MaterializationIntent_MOVETABLES && - !key.KeyRangeIntersect(sourceShard.KeyRange, targetShard.KeyRange) { - continue - } + for _, sourceShard := range sourceShards { bls := &binlogdatapb.BinlogSource{ Keyspace: mz.ms.SourceKeyspace, Shard: sourceShard.ShardName(), @@ -443,11 +437,19 @@ func (mz *materializer) deploySchema() error { func (mz *materializer) buildMaterializer() error { ctx := mz.ctx ms := mz.ms - vschema, err := mz.ts.GetVSchema(ctx, ms.TargetKeyspace) + sourceVSchemaPB, err := mz.ts.GetVSchema(ctx, ms.SourceKeyspace) + if err != nil { + return err + } + sourceVSchema, err := vindexes.BuildKeyspaceSchema(sourceVSchemaPB, ms.SourceKeyspace) if err != nil { return err } - targetVSchema, err := vindexes.BuildKeyspaceSchema(vschema, ms.TargetKeyspace) + targetVSchemaPB, err := mz.ts.GetVSchema(ctx, ms.TargetKeyspace) + if err != nil { + return err + } + targetVSchema, err := vindexes.BuildKeyspaceSchema(targetVSchemaPB, ms.TargetKeyspace) if err != nil { return err } @@ -503,6 +505,7 @@ func (mz *materializer) buildMaterializer() error { mz.sourceShards = sourceShards mz.targetShards = targetShards mz.isPartial = isPartial + mz.samePrimaryVindexes = samePrimaryVindexes(ms, sourceVSchema, targetVSchema) return nil } @@ -605,3 +608,80 @@ func (mz *materializer) checkTZConversion(ctx context.Context, tz string) error }) return err } + +func (mz *materializer) filterSourceShards(targetShard *topo.ShardInfo) []*topo.ShardInfo { + // Don't create streams from sources which won't contain data for the + // target shard. Only do it for MoveTables where the source and target + // keyspaces are both sharded and have the same primary vindexes. 
+ if mz.samePrimaryVindexes && mz.ms.MaterializationIntent == vtctldatapb.MaterializationIntent_MOVETABLES { + return useIntersectingSourceShards(targetShard, mz.sourceShards) + } + return useAllSourceShards(targetShard, mz.sourceShards) +} + +// samePrimaryVindexes returns true if, for all tables defined in the provided +// materialize settings, the provided source and target vschemas have the same +// primary vindexes. +func samePrimaryVindexes(ms *vtctldatapb.MaterializeSettings, source, target *vindexes.KeyspaceSchema) bool { + // For keyspaces that are not unsharded, there are no primary vindexes to + // compare. + if !source.Keyspace.Sharded || !target.Keyspace.Sharded { + return false + } + + // For source and target keyspaces that are sharded, we can optimize source + // shard selection if source and target tables' primary vindexes are equal. + // + // To determine this, iterate over all target tables, looking for primary + // vindexes that differ from the corresponding source table. + for _, ts := range ms.TableSettings { + st, sok := source.Tables[ts.TargetTable] + tt, tok := target.Tables[ts.TargetTable] + if !sok || !tok { + // Expected vschema to have a table definition, but did not. + return false + } + if st.Type != "" || tt.Type != "" { + // Expected tables to be type "" (regular, sharded), but were not. + return false + } + if len(st.ColumnVindexes) == 0 || len(tt.ColumnVindexes) == 0 { + // Expected column to have a primary column vindex, but none + // was found. + return false + } + spv, sok := source.Vindexes[st.ColumnVindexes[0].Name] + tpv, tok := target.Vindexes[st.ColumnVindexes[0].Name] + if !sok || !tok { + // Could not find keyspace vindex defined in column primary + // vindex. + return false + } + // Compare source and primary vindex. For explanation of pointer + // comparison, see note on vindexes.Vindex.String. + // + // > String returns the name of the Vindex instance. + // > It's used for testing and diagnostics. 
Use pointer + // > comparison to see if two objects refer to the same + // > Vindex. + if spv != tpv { + return false + } + } + return true +} + +func useAllSourceShards(_ *topo.ShardInfo, sourceShards []*topo.ShardInfo) []*topo.ShardInfo { + return sourceShards +} + +func useIntersectingSourceShards(targetShard *topo.ShardInfo, sourceShards []*topo.ShardInfo) []*topo.ShardInfo { + var filteredSourceShards []*topo.ShardInfo + for _, sourceShard := range sourceShards { + if !key.KeyRangeIntersect(sourceShard.KeyRange, targetShard.KeyRange) { + continue + } + filteredSourceShards = append(filteredSourceShards, sourceShard) + } + return filteredSourceShards +} diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 2edc06aa607..f5ef1d89ae7 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -57,12 +57,13 @@ import ( ) type materializer struct { - wr *Wrangler - ms *vtctldatapb.MaterializeSettings - targetVSchema *vindexes.KeyspaceSchema - sourceShards []*topo.ShardInfo - targetShards []*topo.ShardInfo - isPartial bool + wr *Wrangler + ms *vtctldatapb.MaterializeSettings + targetVSchema *vindexes.KeyspaceSchema + sourceShards []*topo.ShardInfo + targetShards []*topo.ShardInfo + isPartial bool + samePrimaryVindexes bool } const ( @@ -971,7 +972,8 @@ func (wr *Wrangler) prepareMaterializerStreams(ctx context.Context, ms *vtctldat } insertMap := make(map[string]string, len(mz.targetShards)) for _, targetShard := range mz.targetShards { - inserts, err := mz.generateInserts(ctx, targetShard) + sourceShards := mz.filterSourceShards(targetShard) + inserts, err := mz.generateInserts(ctx, sourceShards) if err != nil { return nil, err } @@ -993,11 +995,19 @@ func (wr *Wrangler) Materialize(ctx context.Context, ms *vtctldatapb.Materialize } func (wr *Wrangler) buildMaterializer(ctx context.Context, ms *vtctldatapb.MaterializeSettings) (*materializer, error) { - vschema, err := wr.ts.GetVSchema(ctx, ms.TargetKeyspace) + 
sourceVSchemaPB, err := wr.ts.GetVSchema(ctx, ms.SourceKeyspace) if err != nil { return nil, err } - targetVSchema, err := vindexes.BuildKeyspaceSchema(vschema, ms.TargetKeyspace) + sourceVSchema, err := vindexes.BuildKeyspaceSchema(sourceVSchemaPB, ms.SourceKeyspace) + if err != nil { + return nil, err + } + targetVSchemaPB, err := wr.ts.GetVSchema(ctx, ms.TargetKeyspace) + if err != nil { + return nil, err + } + targetVSchema, err := vindexes.BuildKeyspaceSchema(targetVSchemaPB, ms.TargetKeyspace) if err != nil { return nil, err } @@ -1049,14 +1059,15 @@ func (wr *Wrangler) buildMaterializer(ctx context.Context, ms *vtctldatapb.Mater if len(targetShards) == 0 { return nil, fmt.Errorf("no target shards specified for workflow %s ", ms.Workflow) } - + samePrimaryVindexes := samePrimaryVindexes(ms, sourceVSchema, targetVSchema) return &materializer{ - wr: wr, - ms: ms, - targetVSchema: targetVSchema, - sourceShards: sourceShards, - targetShards: targetShards, - isPartial: isPartial, + wr: wr, + ms: ms, + targetVSchema: targetVSchema, + sourceShards: sourceShards, + targetShards: targetShards, + isPartial: isPartial, + samePrimaryVindexes: samePrimaryVindexes, }, nil } @@ -1244,17 +1255,10 @@ func stripTableConstraints(ddl string) (string, error) { return newDDL, nil } -func (mz *materializer) generateInserts(ctx context.Context, targetShard *topo.ShardInfo) (string, error) { +func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*topo.ShardInfo) (string, error) { ig := vreplication.NewInsertGenerator(binlogdatapb.VReplicationWorkflowState_Stopped, "{{.dbname}}") - for _, sourceShard := range mz.sourceShards { - // Don't create streams from sources which won't contain data for the target shard. 
- // We only do it for MoveTables for now since this doesn't hold for materialize flows - // where the target's sharding key might differ from that of the source - if mz.ms.MaterializationIntent == vtctldatapb.MaterializationIntent_MOVETABLES && - !key.KeyRangeIntersect(sourceShard.KeyRange, targetShard.KeyRange) { - continue - } + for _, sourceShard := range sourceShards { bls := &binlogdatapb.BinlogSource{ Keyspace: mz.ms.SourceKeyspace, Shard: sourceShard.ShardName(), @@ -1470,3 +1474,80 @@ func (mz *materializer) checkTZConversion(ctx context.Context, tz string) error }) return err } + +func (mz *materializer) filterSourceShards(targetShard *topo.ShardInfo) []*topo.ShardInfo { + // Don't create streams from sources which won't contain data for the + // target shard. Only do it for MoveTables where the source and target + // keyspaces are both sharded and have the same primary vindexes. + if mz.samePrimaryVindexes && mz.ms.MaterializationIntent == vtctldatapb.MaterializationIntent_MOVETABLES { + return useIntersectingSourceShards(targetShard, mz.sourceShards) + } + return useAllSourceShards(targetShard, mz.sourceShards) +} + +// samePrimaryVindexes returns true if, for all tables defined in the provided +// materialize settings, the provided source and target vschemas have the same +// primary vindexes. +func samePrimaryVindexes(ms *vtctldatapb.MaterializeSettings, source, target *vindexes.KeyspaceSchema) bool { + // For keyspaces that are not unsharded, there are no primary vindexes to + // compare. + if !source.Keyspace.Sharded || !target.Keyspace.Sharded { + return false + } + + // For source and target keyspaces that are sharded, we can optimize source + // shard selection if source and target tables' primary vindexes are equal. + // + // To determine this, iterate over all target tables, looking for primary + // vindexes that differ from the corresponding source table. 
+ for _, ts := range ms.TableSettings { + st, sok := source.Tables[ts.TargetTable] + tt, tok := target.Tables[ts.TargetTable] + if !sok || !tok { + // Expected vschema to have a table definition, but did not. + return false + } + if st.Type != "" || tt.Type != "" { + // Expected tables to be type "" (regular, sharded), but were not. + return false + } + if len(st.ColumnVindexes) == 0 || len(tt.ColumnVindexes) == 0 { + // Expected column to have a primary column vindex, but none + // was found. + return false + } + spv, sok := source.Vindexes[st.ColumnVindexes[0].Name] + tpv, tok := target.Vindexes[st.ColumnVindexes[0].Name] + if !sok || !tok { + // Could not find keyspace vindex defined in column primary + // vindex. + return false + } + // Compare source and primary vindex. For explanation of pointer + // comparison, see note on vindexes.Vindex.String. + // + // > String returns the name of the Vindex instance. + // > It's used for testing and diagnostics. Use pointer + // > comparison to see if two objects refer to the same + // > Vindex. 
+ if spv != tpv { + return false + } + } + return true +} + +func useAllSourceShards(_ *topo.ShardInfo, sourceShards []*topo.ShardInfo) []*topo.ShardInfo { + return sourceShards +} + +func useIntersectingSourceShards(targetShard *topo.ShardInfo, sourceShards []*topo.ShardInfo) []*topo.ShardInfo { + var filteredSourceShards []*topo.ShardInfo + for _, sourceShard := range sourceShards { + if !key.KeyRangeIntersect(sourceShard.KeyRange, targetShard.KeyRange) { + continue + } + filteredSourceShards = append(filteredSourceShards, sourceShard) + } + return filteredSourceShards +} diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go index 0001058a7b3..928bfabad93 100644 --- a/go/vt/wrangler/materializer_test.go +++ b/go/vt/wrangler/materializer_test.go @@ -2707,7 +2707,7 @@ func TestStripConstraints(t *testing.T) { } } -func TestMaterializerManyToManySomeUnreachable(t *testing.T) { +func TestMaterializerSourceShardSelection(t *testing.T) { ms := &vtctldatapb.MaterializeSettings{ Workflow: "workflow", SourceKeyspace: "sourceks", @@ -2719,7 +2719,11 @@ func TestMaterializerManyToManySomeUnreachable(t *testing.T) { }}, } - vs := &vschemapb.Keyspace{ + getStreamInsert := func(sourceShard, targetShard string) string { + return fmt.Sprintf(`.*shard:\\"%s\\" filter:{rules:{match:\\"t1\\" filter:\\"select.*t1 where in_keyrange\(c1.*targetks\.hash.*%s.*`, sourceShard, targetShard) + } + + targetVSchema := &vschemapb.Keyspace{ Sharded: true, Vindexes: map[string]*vschemapb.Vindex{ "hash": { @@ -2735,53 +2739,123 @@ func TestMaterializerManyToManySomeUnreachable(t *testing.T) { }, }, } + type testcase struct { - targetShards, sourceShards []string - insertMap map[string][]string + targetShards, sourceShards []string + insertMap map[string][]string + targetVSchema, sourceVSchema *vschemapb.Keyspace + getStreamInsert func(sourceShard, targetShard string) string } testcases := []testcase{ { - targetShards: []string{"-40", "40-80", "80-c0", "c0-"}, - 
sourceShards: []string{"-80", "80-"}, - insertMap: map[string][]string{"-40": {"-80"}, "40-80": {"-80"}, "80-c0": {"80-"}, "c0-": {"80-"}}, + targetShards: []string{"-40", "40-80", "80-c0", "c0-"}, + sourceShards: []string{"-80", "80-"}, + insertMap: map[string][]string{"-40": {"-80"}, "40-80": {"-80"}, "80-c0": {"80-"}, "c0-": {"80-"}}, + targetVSchema: targetVSchema, + getStreamInsert: getStreamInsert, }, { - targetShards: []string{"-20", "20-40", "40-a0", "a0-f0", "f0-"}, - sourceShards: []string{"-40", "40-80", "80-c0", "c0-"}, - insertMap: map[string][]string{"-20": {"-40"}, "20-40": {"-40"}, "40-a0": {"40-80", "80-c0"}, "a0-f0": {"80-c0", "c0-"}, "f0-": {"c0-"}}, + targetShards: []string{"-20", "20-40", "40-a0", "a0-f0", "f0-"}, + sourceShards: []string{"-40", "40-80", "80-c0", "c0-"}, + insertMap: map[string][]string{"-20": {"-40"}, "20-40": {"-40"}, "40-a0": {"40-80", "80-c0"}, "a0-f0": {"80-c0", "c0-"}, "f0-": {"c0-"}}, + targetVSchema: targetVSchema, + getStreamInsert: getStreamInsert, }, { - targetShards: []string{"-40", "40-80", "80-"}, - sourceShards: []string{"-80", "80-"}, - insertMap: map[string][]string{"-40": {"-80"}, "40-80": {"-80"}, "80-": {"80-"}}, + targetShards: []string{"-40", "40-80", "80-"}, + sourceShards: []string{"-80", "80-"}, + insertMap: map[string][]string{"-40": {"-80"}, "40-80": {"-80"}, "80-": {"80-"}}, + targetVSchema: targetVSchema, + getStreamInsert: getStreamInsert, }, { - targetShards: []string{"-80", "80-"}, - sourceShards: []string{"-40", "40-80", "80-c0", "c0-"}, - insertMap: map[string][]string{"-80": {"-40", "40-80"}, "80-": {"80-c0", "c0-"}}, + targetShards: []string{"-80", "80-"}, + sourceShards: []string{"-40", "40-80", "80-c0", "c0-"}, + insertMap: map[string][]string{"-80": {"-40", "40-80"}, "80-": {"80-c0", "c0-"}}, + targetVSchema: targetVSchema, + getStreamInsert: getStreamInsert, + }, + { + targetShards: []string{"0"}, + sourceShards: []string{"-80", "80-"}, + insertMap: map[string][]string{"0": {"-80", 
"80-"}}, + targetVSchema: targetVSchema, + getStreamInsert: getStreamInsert, }, { - targetShards: []string{"0"}, - sourceShards: []string{"-80", "80-"}, - insertMap: map[string][]string{"0": {"-80", "80-"}}, + targetShards: []string{"-80", "80-"}, + sourceShards: []string{"0"}, + insertMap: map[string][]string{"-80": {"0"}, "80-": {"0"}}, + targetVSchema: targetVSchema, + getStreamInsert: getStreamInsert, + }, + { + targetShards: []string{"-80", "80-"}, + sourceShards: []string{"-40", "40-80", "80-c0", "c0-"}, + insertMap: map[string][]string{ + "-80": {"-40", "40-80", "80-c0", "c0-"}, + "80-": {"-40", "40-80", "80-c0", "c0-"}, + }, + targetVSchema: &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "xxhash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "c1", + Name: "hash", + }}, + }, + }, + }, + getStreamInsert: func(sourceShard, targetShard string) string { + return fmt.Sprintf(`.*shard:\\"%s\\" filter:{rules:{match:\\"t1\\" filter:\\"select.*t1 where in_keyrange\(c1.*targetks\.hash.*%s.*`, sourceShard, targetShard) + }, }, { targetShards: []string{"-80", "80-"}, - sourceShards: []string{"0"}, - insertMap: map[string][]string{"-80": {"0"}, "80-": {"0"}}, + sourceShards: []string{"-40", "40-80", "80-c0", "c0-"}, + insertMap: map[string][]string{ + "-80": {"-40", "40-80", "80-c0", "c0-"}, + "80-": {"-40", "40-80", "80-c0", "c0-"}, + }, + targetVSchema: &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "xxhash": { + Type: "xxhash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "c1", + Name: "xxhash", + }}, + }, + }, + }, + getStreamInsert: func(sourceShard, targetShard string) string { + return fmt.Sprintf(`.*shard:\\"%s\\" filter:{rules:{match:\\"t1\\" filter:\\"select.*t1 where in_keyrange\(c1.*targetks\.xxhash.*%s.*`, sourceShard, targetShard) + 
}, }, } - getStreamInsert := func(sourceShard, targetShard string) string { - return fmt.Sprintf(`.*shard:\\"%s\\" filter:{rules:{match:\\"t1\\" filter:\\"select.*t1 where in_keyrange\(c1.*targetks\.hash.*%s.*`, sourceShard, targetShard) - } - for _, tcase := range testcases { t.Run("", func(t *testing.T) { env := newTestMaterializerEnv(t, ms, tcase.sourceShards, tcase.targetShards) - if err := env.topoServ.SaveVSchema(context.Background(), "targetks", vs); err != nil { + if err := env.topoServ.SaveVSchema(context.Background(), "targetks", tcase.targetVSchema); err != nil { t.Fatal(err) } + if tcase.sourceVSchema != nil { + if err := env.topoServ.SaveVSchema(context.Background(), "sourceks", tcase.sourceVSchema); err != nil { + t.Fatal(err) + } + } defer env.close() for i, targetShard := range tcase.targetShards { tabletID := 200 + i*10 @@ -2790,7 +2864,7 @@ func TestMaterializerManyToManySomeUnreachable(t *testing.T) { streamsInsert := "" sourceShards := tcase.insertMap[targetShard] for _, sourceShard := range sourceShards { - streamsInsert += getStreamInsert(sourceShard, targetShard) + streamsInsert += tcase.getStreamInsert(sourceShard, targetShard) } env.tmc.expectVRQuery( tabletID, From d62993bda8ef8c0f15e77140607f248761619f3c Mon Sep 17 00:00:00 2001 From: Max Englander Date: Tue, 15 Aug 2023 02:52:18 -0400 Subject: [PATCH 02/11] fix e2e test, fix primary vindex comparison logic Signed-off-by: Max Englander --- go/vt/vtctl/workflow/materializer.go | 33 ++++++++++--------------- go/vt/wrangler/materializer.go | 36 +++++++++++----------------- 2 files changed, 26 insertions(+), 43 deletions(-) diff --git a/go/vt/vtctl/workflow/materializer.go b/go/vt/vtctl/workflow/materializer.go index 017b3e3daa3..636c92e285f 100644 --- a/go/vt/vtctl/workflow/materializer.go +++ b/go/vt/vtctl/workflow/materializer.go @@ -39,6 +39,7 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + 
"vitess.io/vitess/go/vt/proto/vschema" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" ) @@ -437,19 +438,11 @@ func (mz *materializer) deploySchema() error { func (mz *materializer) buildMaterializer() error { ctx := mz.ctx ms := mz.ms - sourceVSchemaPB, err := mz.ts.GetVSchema(ctx, ms.SourceKeyspace) + vschema, err := mz.ts.GetVSchema(ctx, ms.TargetKeyspace) if err != nil { return err } - sourceVSchema, err := vindexes.BuildKeyspaceSchema(sourceVSchemaPB, ms.SourceKeyspace) - if err != nil { - return err - } - targetVSchemaPB, err := mz.ts.GetVSchema(ctx, ms.TargetKeyspace) - if err != nil { - return err - } - targetVSchema, err := vindexes.BuildKeyspaceSchema(targetVSchemaPB, ms.TargetKeyspace) + targetVSchema, err := vindexes.BuildKeyspaceSchema(vschema, ms.TargetKeyspace) if err != nil { return err } @@ -501,11 +494,15 @@ func (mz *materializer) buildMaterializer() error { if len(targetShards) == 0 { return fmt.Errorf("no target shards specified for workflow %s ", ms.Workflow) } + samePVs := false + if sourceVSchema, err := mz.ts.GetVSchema(ctx, ms.SourceKeyspace); err == nil { + samePVs = samePrimaryVindexes(ms, sourceVSchema, vschema) + } mz.targetVSchema = targetVSchema mz.sourceShards = sourceShards mz.targetShards = targetShards mz.isPartial = isPartial - mz.samePrimaryVindexes = samePrimaryVindexes(ms, sourceVSchema, targetVSchema) + mz.samePrimaryVindexes = samePVs return nil } @@ -622,10 +619,10 @@ func (mz *materializer) filterSourceShards(targetShard *topo.ShardInfo) []*topo. // samePrimaryVindexes returns true if, for all tables defined in the provided // materialize settings, the provided source and target vschemas have the same // primary vindexes. -func samePrimaryVindexes(ms *vtctldatapb.MaterializeSettings, source, target *vindexes.KeyspaceSchema) bool { +func samePrimaryVindexes(ms *vtctldatapb.MaterializeSettings, source, target *vschema.Keyspace) bool { // For keyspaces that are not unsharded, there are no primary vindexes to // compare. 
- if !source.Keyspace.Sharded || !target.Keyspace.Sharded { + if !source.Sharded || !target.Sharded { return false } @@ -657,14 +654,8 @@ func samePrimaryVindexes(ms *vtctldatapb.MaterializeSettings, source, target *vi // vindex. return false } - // Compare source and primary vindex. For explanation of pointer - // comparison, see note on vindexes.Vindex.String. - // - // > String returns the name of the Vindex instance. - // > It's used for testing and diagnostics. Use pointer - // > comparison to see if two objects refer to the same - // > Vindex. - if spv != tpv { + // Compare source and primary vindex types. + if !strings.EqualFold(spv.Type, tpv.Type) { return false } } diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index f5ef1d89ae7..abd56254207 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -52,6 +52,7 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + "vitess.io/vitess/go/vt/proto/vschema" vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" ) @@ -995,22 +996,15 @@ func (wr *Wrangler) Materialize(ctx context.Context, ms *vtctldatapb.Materialize } func (wr *Wrangler) buildMaterializer(ctx context.Context, ms *vtctldatapb.MaterializeSettings) (*materializer, error) { - sourceVSchemaPB, err := wr.ts.GetVSchema(ctx, ms.SourceKeyspace) + vschema, err := wr.ts.GetVSchema(ctx, ms.TargetKeyspace) if err != nil { return nil, err } - sourceVSchema, err := vindexes.BuildKeyspaceSchema(sourceVSchemaPB, ms.SourceKeyspace) - if err != nil { - return nil, err - } - targetVSchemaPB, err := wr.ts.GetVSchema(ctx, ms.TargetKeyspace) - if err != nil { - return nil, err - } - targetVSchema, err := vindexes.BuildKeyspaceSchema(targetVSchemaPB, ms.TargetKeyspace) + targetVSchema, err := vindexes.BuildKeyspaceSchema(vschema, 
ms.TargetKeyspace) if err != nil { return nil, err } + if targetVSchema.Keyspace.Sharded { for _, ts := range ms.TableSettings { if targetVSchema.Tables[ts.TargetTable] == nil { @@ -1059,7 +1053,11 @@ func (wr *Wrangler) buildMaterializer(ctx context.Context, ms *vtctldatapb.Mater if len(targetShards) == 0 { return nil, fmt.Errorf("no target shards specified for workflow %s ", ms.Workflow) } - samePrimaryVindexes := samePrimaryVindexes(ms, sourceVSchema, targetVSchema) + samePVs := false + if sourceVSchema, err := wr.ts.GetVSchema(ctx, ms.SourceKeyspace); err == nil { + samePVs = samePrimaryVindexes(ms, sourceVSchema, vschema) + } + return &materializer{ wr: wr, ms: ms, @@ -1067,7 +1065,7 @@ func (wr *Wrangler) buildMaterializer(ctx context.Context, ms *vtctldatapb.Mater sourceShards: sourceShards, targetShards: targetShards, isPartial: isPartial, - samePrimaryVindexes: samePrimaryVindexes, + samePrimaryVindexes: samePVs, }, nil } @@ -1488,10 +1486,10 @@ func (mz *materializer) filterSourceShards(targetShard *topo.ShardInfo) []*topo. // samePrimaryVindexes returns true if, for all tables defined in the provided // materialize settings, the provided source and target vschemas have the same // primary vindexes. -func samePrimaryVindexes(ms *vtctldatapb.MaterializeSettings, source, target *vindexes.KeyspaceSchema) bool { +func samePrimaryVindexes(ms *vtctldatapb.MaterializeSettings, source, target *vschema.Keyspace) bool { // For keyspaces that are not unsharded, there are no primary vindexes to // compare. - if !source.Keyspace.Sharded || !target.Keyspace.Sharded { + if !source.Sharded || !target.Sharded { return false } @@ -1523,14 +1521,8 @@ func samePrimaryVindexes(ms *vtctldatapb.MaterializeSettings, source, target *vi // vindex. return false } - // Compare source and primary vindex. For explanation of pointer - // comparison, see note on vindexes.Vindex.String. - // - // > String returns the name of the Vindex instance. 
- // > It's used for testing and diagnostics. Use pointer - // > comparison to see if two objects refer to the same - // > Vindex. - if spv != tpv { + // Compare source and primary vindex types. + if !strings.EqualFold(spv.Type, tpv.Type) { return false } } From 674ef140b221dc1c05bcaa52c82495bd5e23ea53 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Tue, 15 Aug 2023 13:33:24 -0400 Subject: [PATCH 03/11] add rpc_vreplication_test for source shard selection Signed-off-by: Max Englander --- go/vt/binlog/binlogplayer/mock_dbclient.go | 26 ++- .../vttablet/tabletmanager/framework_test.go | 4 +- .../tabletmanager/rpc_vreplication_test.go | 215 ++++++++++++++++++ 3 files changed, 240 insertions(+), 5 deletions(-) diff --git a/go/vt/binlog/binlogplayer/mock_dbclient.go b/go/vt/binlog/binlogplayer/mock_dbclient.go index f8c97fff879..d64c4d40146 100644 --- a/go/vt/binlog/binlogplayer/mock_dbclient.go +++ b/go/vt/binlog/binlogplayer/mock_dbclient.go @@ -17,6 +17,7 @@ limitations under the License. 
package binlogplayer import ( + "fmt" "regexp" "strings" "sync" @@ -39,6 +40,7 @@ type MockDBClient struct { currentResult int done chan struct{} invariants map[string]*sqltypes.Result + Tag string } type mockExpect struct { @@ -177,7 +179,11 @@ func (dc *MockDBClient) Close() { // ExecuteFetch is part of the DBClient interface func (dc *MockDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error) { dc.t.Helper() - dc.t.Logf("DBClient query: %v", query) + msg := "DBClient query: %v" + if dc.Tag != "" { + msg = fmt.Sprintf("[%s] %s", dc.Tag, msg) + } + dc.t.Logf(msg, query) for q, result := range dc.invariants { if strings.Contains(strings.ToLower(query), strings.ToLower(q)) { @@ -188,16 +194,28 @@ func (dc *MockDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Re dc.expectMu.Lock() defer dc.expectMu.Unlock() if dc.currentResult >= len(dc.expect) { - dc.t.Fatalf("DBClientMock: query: %s, no more requests are expected", query) + msg := "DBClientMock: query: %s, no more requests are expected" + if dc.Tag != "" { + msg = fmt.Sprintf("[%s] %s", dc.Tag, msg) + } + dc.t.Fatalf(msg, query) } result := dc.expect[dc.currentResult] if result.re == nil { if query != result.query { - dc.t.Fatalf("DBClientMock: query: %s, want %s", query, result.query) + msg := "DBClientMock: query: %s, want %s" + if dc.Tag != "" { + msg = fmt.Sprintf("[%s] %s", dc.Tag, msg) + } + dc.t.Fatalf(msg, query, result.query) } } else { if !result.re.MatchString(query) { - dc.t.Fatalf("DBClientMock: query: %s, must match %s", query, result.query) + msg := "DBClientMock: query: %s, must match %s" + if dc.Tag != "" { + msg = fmt.Sprintf("[%s] %s", dc.Tag, msg) + } + dc.t.Fatalf(msg, query, result.query) } } dc.currentResult++ diff --git a/go/vt/vttablet/tabletmanager/framework_test.go b/go/vt/vttablet/tabletmanager/framework_test.go index 32d1c7019c2..08f3d00cd79 100644 --- a/go/vt/vttablet/tabletmanager/framework_test.go +++ 
b/go/vt/vttablet/tabletmanager/framework_test.go @@ -156,9 +156,11 @@ func (tenv *testEnv) addTablet(t *testing.T, id int, keyspace, shard string) *fa panic(err) } + vrdbClient := binlogplayer.NewMockDBClient(t) + vrdbClient.Tag = fmt.Sprintf("tablet:%d", id) tenv.tmc.tablets[id] = &fakeTabletConn{ tablet: tablet, - vrdbClient: binlogplayer.NewMockDBClient(t), + vrdbClient: vrdbClient, } dbClientFactory := func() binlogplayer.DBClient { diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index a7fbb59c9d8..4fd34c0fb0f 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -568,3 +568,218 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { }) } } + +// TestSourceShardSelection tests the RPC calls made by VtctldServer to tablet +// managers include the correct set of BLS settings. +func TestSourceShardSelection(t *testing.T) { + ctx := context.Background() + + sourceKs := "sourceks" + sourceShard0 := "-55" + sourceShard1 := "55-aa" + sourceShard2 := "aa-" + sourceTabletUID0 := 200 + sourceTabletUID1 := 201 + sourceTabletUID2 := 202 + + targetKs := "targetks" + targetShard0 := "-80" + targetShard1 := "80-" + targetTabletUID0 := 300 + targetTabletUID1 := 301 + + wf := "testwf" + + tenv := newTestEnv(t, sourceKs, []string{sourceShard0, sourceShard1, sourceShard2}) + defer tenv.close() + + sourceTablets := map[int]*fakeTabletConn{ + sourceTabletUID0: tenv.addTablet(t, sourceTabletUID0, sourceKs, sourceShard0), + sourceTabletUID1: tenv.addTablet(t, sourceTabletUID1, sourceKs, sourceShard1), + sourceTabletUID2: tenv.addTablet(t, sourceTabletUID2, sourceKs, sourceShard2), + } + for _, st := range sourceTablets { + defer tenv.deleteTablet(st.tablet) + } + + targetTablets := map[int]*fakeTabletConn{ + targetTabletUID0: tenv.addTablet(t, targetTabletUID0, targetKs, targetShard0), + targetTabletUID1: tenv.addTablet(t, 
targetTabletUID1, targetKs, targetShard1), + } + for _, tt := range targetTablets { + defer tenv.deleteTablet(tt.tablet) + } + + ws := workflow.NewServer(tenv.ts, tenv.tmc) + + tenv.ts.SaveVSchema(ctx, sourceKs, &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "id", + Name: "hash", + }}, + }, + }, + }) + tenv.ts.SaveVSchema(ctx, targetKs, &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "id", + Name: "hash", + }}, + }, + }, + }) + + tests := []struct { + name string + req *vtctldatapb.MoveTablesCreateRequest + schema *tabletmanagerdatapb.SchemaDefinition + vschema *vschemapb.Keyspace + streams map[int][]string + }{ + { + name: "same primary vindexes, use intersecting source shards", + req: &vtctldatapb.MoveTablesCreateRequest{ + SourceKeyspace: sourceKs, + TargetKeyspace: targetKs, + Workflow: wf, + Cells: tenv.cells, + AllTables: true, + AutoStart: false, + }, + streams: map[int][]string{ + targetTabletUID0: { + sourceShard0, + sourceShard1, + }, + targetTabletUID1: { + sourceShard1, + sourceShard2, + }, + }, + }, + { + name: "different primary vindexes, use all source shards", + req: &vtctldatapb.MoveTablesCreateRequest{ + SourceKeyspace: sourceKs, + TargetKeyspace: targetKs, + Workflow: wf, + Cells: tenv.cells, + AllTables: true, + AutoStart: false, + }, + vschema: &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "xxhash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "id", + Name: "hash", + }}, + }, + }, + }, + streams: map[int][]string{ + targetTabletUID0: { + sourceShard0, + sourceShard1, + 
sourceShard2, + }, + targetTabletUID1: { + sourceShard0, + sourceShard1, + sourceShard2, + }, + }, + }, + } + + for _, tt := range targetTablets { + tenv.tmc.setVReplicationExecResults(tt.tablet, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", + targetKs, wf), &sqltypes.Result{}) + tenv.tmc.setVReplicationExecResults(tt.tablet, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and message='FROZEN' and workflow_sub_type != 1", + targetKs), &sqltypes.Result{}) + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // This is needed because MockDBClient uses t.Fatal() + // which doesn't play well with subtests. + defer func() { + if err := recover(); err != nil { + t.Errorf("Recovered from panic: %v; Stack: %s", err, string(debug.Stack())) + } + }() + + require.NotNil(t, tt.req, "No MoveTablesCreate request provided") + require.NotEmpty(t, tt.streams, "No expected streams provided") + + if tt.schema == nil { + tt.schema = defaultSchema + } + tenv.tmc.SetSchema(tt.schema) + + if tt.vschema != nil { + tenv.ts.SaveVSchema(ctx, targetKs, tt.vschema) + } + + for uid, streams := range tt.streams { + tt := targetTablets[uid] + for i, sourceShard := range streams { + tt.vrdbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) + var err error + if i == len(streams)-1 { + err = errShortCircuit + } + tt.vrdbClient.ExpectRequest( + fmt.Sprintf(`%s values ('%s', 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"t1\" filter:\"select * from t1 where in_keyrange(id, \'%s.hash\', \'%s\')\"}}', '', 0, 0, '%s', '', now(), 0, 'Stopped', '%s', 1, 0, 0)`, + insertVReplicationPrefix, wf, sourceKs, sourceShard, targetKs, tt.tablet.Shard, tenv.cells[0], tenv.dbName), + &sqltypes.Result{InsertID: uint64(i + 1)}, + err, + ) + if err != nil { + continue + } + tt.vrdbClient.ExpectRequest(getAutoIncrementStep, &sqltypes.Result{}, nil) + tt.vrdbClient.ExpectRequest( + fmt.Sprintf("select * from _vt.vreplication where 
id = %d", uint64(i+1)), + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "id|source|state", + "int64|varchar|varchar", + ), + fmt.Sprintf("%d|%s|Stopped", uint64(i+1), fmt.Sprintf(`keyspace:"%s" shard:"%s" filter:{rules:{match:"t1" filter:"select * from t1 where in_keyrange(id, '%s.hash', '%s')"}}`, sourceKs, sourceShard, targetKs, tt.tablet.Shard)), + ), + nil, + ) + } + } + + _, err := ws.MoveTablesCreate(ctx, tt.req) + for _, tt := range targetTablets { + tt.vrdbClient.Wait() + } + require.ErrorContains(t, err, fmt.Sprintf("%s\n%s", errShortCircuit.Error(), errShortCircuit.Error())) + }) + } +} From cfe753153c2b9cbff7938a8ce4b63d34af39a450 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Tue, 29 Aug 2023 17:47:39 -0400 Subject: [PATCH 04/11] Apply suggestions from code review Co-authored-by: Matt Lord Signed-off-by: Max Englander --- go/vt/vtctl/workflow/materializer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/vtctl/workflow/materializer.go b/go/vt/vtctl/workflow/materializer.go index 636c92e285f..2bbcce4d8ae 100644 --- a/go/vt/vtctl/workflow/materializer.go +++ b/go/vt/vtctl/workflow/materializer.go @@ -39,7 +39,7 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" - "vitess.io/vitess/go/vt/proto/vschema" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" ) @@ -620,7 +620,7 @@ func (mz *materializer) filterSourceShards(targetShard *topo.ShardInfo) []*topo. // materialize settings, the provided source and target vschemas have the same // primary vindexes. func samePrimaryVindexes(ms *vtctldatapb.MaterializeSettings, source, target *vschema.Keyspace) bool { - // For keyspaces that are not unsharded, there are no primary vindexes to + // For keyspaces that are not sharded, there are no primary vindexes to // compare. 
if !source.Sharded || !target.Sharded { return false From da1802553aeb1a04562d0c3f276cf41df2cd5ea9 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Tue, 29 Aug 2023 19:59:44 -0400 Subject: [PATCH 05/11] address pr feedback Signed-off-by: Max Englander --- go/vt/vtctl/workflow/materializer.go | 135 ++++++++++++++++---------- go/vt/wrangler/materializer.go | 136 +++++++++++++++++---------- go/vt/wrangler/materializer_test.go | 128 ++++++++++++++++++++++++- 3 files changed, 295 insertions(+), 104 deletions(-) diff --git a/go/vt/vtctl/workflow/materializer.go b/go/vt/vtctl/workflow/materializer.go index 2bbcce4d8ae..b7097d075d2 100644 --- a/go/vt/vtctl/workflow/materializer.go +++ b/go/vt/vtctl/workflow/materializer.go @@ -55,12 +55,12 @@ type materializer struct { sourceTs *topo.Server tmc tmclient.TabletManagerClient - ms *vtctldatapb.MaterializeSettings - targetVSchema *vindexes.KeyspaceSchema - sourceShards []*topo.ShardInfo - targetShards []*topo.ShardInfo - isPartial bool - samePrimaryVindexes bool + ms *vtctldatapb.MaterializeSettings + targetVSchema *vindexes.KeyspaceSchema + sourceShards []*topo.ShardInfo + targetShards []*topo.ShardInfo + isPartial bool + primaryVindexesDiffer bool } func (mz *materializer) prepareMaterializerStreams(req *vtctldatapb.MoveTablesCreateRequest) error { @@ -241,11 +241,7 @@ func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*top return ig.String(), nil } -func (mz *materializer) generateBinlogSources( - ctx context.Context, - targetShard *topo.ShardInfo, - sourceShards []*topo.ShardInfo, -) ([]*binlogdatapb.BinlogSource, error) { +func (mz *materializer) generateBinlogSources(ctx context.Context, targetShard *topo.ShardInfo, sourceShards []*topo.ShardInfo) ([]*binlogdatapb.BinlogSource, error) { blses := make([]*binlogdatapb.BinlogSource, 0, len(mz.sourceShards)) for _, sourceShard := range sourceShards { bls := &binlogdatapb.BinlogSource{ @@ -494,15 +490,15 @@ func (mz *materializer) 
buildMaterializer() error { if len(targetShards) == 0 { return fmt.Errorf("no target shards specified for workflow %s ", ms.Workflow) } - samePVs := false + differentPVs := false if sourceVSchema, err := mz.ts.GetVSchema(ctx, ms.SourceKeyspace); err == nil { - samePVs = samePrimaryVindexes(ms, sourceVSchema, vschema) + differentPVs = primaryVindexesDiffer(ms, sourceVSchema, vschema) } mz.targetVSchema = targetVSchema mz.sourceShards = sourceShards mz.targetShards = targetShards mz.isPartial = isPartial - mz.samePrimaryVindexes = samePVs + mz.primaryVindexesDiffer = differentPVs return nil } @@ -606,23 +602,27 @@ func (mz *materializer) checkTZConversion(ctx context.Context, tz string) error return err } +// filterSourceShards filters out source shards that do not overlap with the +// provided target shard. This is an optimization to avoid copying unnecessary +// data between the shards. This optimization is only applied for MoveTables +// when the source and target shard have the same primary vindexes. func (mz *materializer) filterSourceShards(targetShard *topo.ShardInfo) []*topo.ShardInfo { - // Don't create streams from sources which won't contain data for the - // target shard. Only do it for MoveTables where the source and target - // keyspaces are both sharded and have the same primary vindexes. - if mz.samePrimaryVindexes && mz.ms.MaterializationIntent == vtctldatapb.MaterializationIntent_MOVETABLES { - return useIntersectingSourceShards(targetShard, mz.sourceShards) + if mz.primaryVindexesDiffer || mz.ms.MaterializationIntent != vtctldatapb.MaterializationIntent_MOVETABLES { + return useAllSourceShards(targetShard, mz.sourceShards) } - return useAllSourceShards(targetShard, mz.sourceShards) + return useIntersectingSourceShards(targetShard, mz.sourceShards) } -// samePrimaryVindexes returns true if, for all tables defined in the provided -// materialize settings, the provided source and target vschemas have the same -// primary vindexes. 
-func samePrimaryVindexes(ms *vtctldatapb.MaterializeSettings, source, target *vschema.Keyspace) bool { - // For keyspaces that are not sharded, there are no primary vindexes to - // compare. - if !source.Sharded || !target.Sharded { +// primaryVindexesDiffer returns true if, for any tables defined in the provided +// materialize settings, the source and target vschema definitions for those +// tables have different primary vindexes. +// +// The result of this function is used to determine whether to apply a source +// shard selection optimization in MoveTables. +func primaryVindexesDiffer(ms *vtctldatapb.MaterializeSettings, source, target *vschemapb.Keyspace) bool { + // Unless both keyspaces are sharded, treat the answer to the question as + // trivially false. + if source.Sharded != target.Sharded { return false } @@ -632,34 +632,65 @@ func samePrimaryVindexes(ms *vtctldatapb.MaterializeSettings, source, target *vs // To determine this, iterate over all target tables, looking for primary // vindexes that differ from the corresponding source table. for _, ts := range ms.TableSettings { - st, sok := source.Tables[ts.TargetTable] - tt, tok := target.Tables[ts.TargetTable] - if !sok || !tok { - // Expected vschema to have a table definition, but did not. - return false - } - if st.Type != "" || tt.Type != "" { - // Expected tables to be type "" (regular, sharded), but were not. - return false - } - if len(st.ColumnVindexes) == 0 || len(tt.ColumnVindexes) == 0 { - // Expected column to have a primary column vindex, but none - // was found. - return false - } - spv, sok := source.Vindexes[st.ColumnVindexes[0].Name] - tpv, tok := target.Vindexes[st.ColumnVindexes[0].Name] - if !sok || !tok { - // Could not find keyspace vindex defined in column primary - // vindex. - return false - } - // Compare source and primary vindex types. 
+ sColumnVindexes := []*vschemapb.ColumnVindex{} + tColumnVindexes := []*vschemapb.ColumnVindex{} + if tt, ok := source.Tables[ts.TargetTable]; ok { + sColumnVindexes = tt.ColumnVindexes + } + if tt, ok := target.Tables[ts.TargetTable]; ok { + tColumnVindexes = tt.ColumnVindexes + } + + // If source does not have a primary vindex, but the target does, then + // the primary vindexes differ. + if len(sColumnVindexes) == 0 && len(tColumnVindexes) > 0 { + return true + } + // If source has a primary vindex, but the target does not, then the + // primary vindexes differ. + if len(sColumnVindexes) > 0 && len(tColumnVindexes) == 0 { + return true + } + // If neither source nor target have any vindexes, treat the answer to the question as trivially false. + if len(sColumnVindexes) == 0 && len(tColumnVindexes) == 0 { + return true + } + + sPrimaryVindex := sColumnVindexes[0] + tPrimaryVindex := tColumnVindexes[0] + + // Compare source and target primary vindex columns. + var sColumns, tColumns []string + if sPrimaryVindex.Column != "" { + sColumns = []string{sPrimaryVindex.Column} + } else { + sColumns = sPrimaryVindex.Columns + } + if tPrimaryVindex.Column != "" { + tColumns = []string{tPrimaryVindex.Column} + } else { + tColumns = tPrimaryVindex.Columns + } + if len(sColumns) != len(tColumns) { + return true + } + for i := 0; i < len(sColumns); i++ { + if !strings.EqualFold(sColumns[i], tColumns[i]) { + return true + } + } + + // Assume the source and target keyspaces specify the vindex referenced + // in column vindex definitions. + spv := source.Vindexes[sColumnVindexes[0].Name] + tpv := target.Vindexes[tColumnVindexes[0].Name] + + // Compare source and target vindex type. 
if !strings.EqualFold(spv.Type, tpv.Type) { - return false + return true } } - return true + return false } func useAllSourceShards(_ *topo.ShardInfo, sourceShards []*topo.ShardInfo) []*topo.ShardInfo { diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index abd56254207..ab62aba2e6d 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -52,19 +52,18 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" - "vitess.io/vitess/go/vt/proto/vschema" vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" ) type materializer struct { - wr *Wrangler - ms *vtctldatapb.MaterializeSettings - targetVSchema *vindexes.KeyspaceSchema - sourceShards []*topo.ShardInfo - targetShards []*topo.ShardInfo - isPartial bool - samePrimaryVindexes bool + wr *Wrangler + ms *vtctldatapb.MaterializeSettings + targetVSchema *vindexes.KeyspaceSchema + sourceShards []*topo.ShardInfo + targetShards []*topo.ShardInfo + isPartial bool + primaryVindexesDiffer bool } const ( @@ -1053,19 +1052,19 @@ func (wr *Wrangler) buildMaterializer(ctx context.Context, ms *vtctldatapb.Mater if len(targetShards) == 0 { return nil, fmt.Errorf("no target shards specified for workflow %s ", ms.Workflow) } - samePVs := false + differentPVs := false if sourceVSchema, err := wr.ts.GetVSchema(ctx, ms.SourceKeyspace); err == nil { - samePVs = samePrimaryVindexes(ms, sourceVSchema, vschema) + differentPVs = primaryVindexesDiffer(ms, sourceVSchema, vschema) } return &materializer{ - wr: wr, - ms: ms, - targetVSchema: targetVSchema, - sourceShards: sourceShards, - targetShards: targetShards, - isPartial: isPartial, - samePrimaryVindexes: samePVs, + wr: wr, + ms: ms, + targetVSchema: targetVSchema, + sourceShards: sourceShards, + targetShards: targetShards, + isPartial: isPartial, + 
primaryVindexesDiffer: differentPVs, }, nil } @@ -1473,23 +1472,27 @@ func (mz *materializer) checkTZConversion(ctx context.Context, tz string) error return err } +// filterSourceShards filters out source shards that do not overlap with the +// provided target shard. This is an optimization to avoid copying unnecessary +// data between the shards. This optimization is only applied for MoveTables +// when the source and target shard have the same primary vindexes. func (mz *materializer) filterSourceShards(targetShard *topo.ShardInfo) []*topo.ShardInfo { - // Don't create streams from sources which won't contain data for the - // target shard. Only do it for MoveTables where the source and target - // keyspaces are both sharded and have the same primary vindexes. - if mz.samePrimaryVindexes && mz.ms.MaterializationIntent == vtctldatapb.MaterializationIntent_MOVETABLES { - return useIntersectingSourceShards(targetShard, mz.sourceShards) + if mz.primaryVindexesDiffer || mz.ms.MaterializationIntent != vtctldatapb.MaterializationIntent_MOVETABLES { + return useAllSourceShards(targetShard, mz.sourceShards) } - return useAllSourceShards(targetShard, mz.sourceShards) + return useIntersectingSourceShards(targetShard, mz.sourceShards) } -// samePrimaryVindexes returns true if, for all tables defined in the provided -// materialize settings, the provided source and target vschemas have the same -// primary vindexes. -func samePrimaryVindexes(ms *vtctldatapb.MaterializeSettings, source, target *vschema.Keyspace) bool { - // For keyspaces that are not unsharded, there are no primary vindexes to - // compare. - if !source.Sharded || !target.Sharded { +// primaryVindexesDiffer returns true if, for any tables defined in the provided +// materialize settings, the source and target vschema definitions for those +// tables have different primary vindexes. +// +// The result of this function is used to determine whether to apply a source +// shard selection optimization in MoveTables. 
+func primaryVindexesDiffer(ms *vtctldatapb.MaterializeSettings, source, target *vschemapb.Keyspace) bool { + // Unless both keyspaces are sharded, treat the answer to the question as + // trivially false. + if source.Sharded != target.Sharded { return false } @@ -1499,34 +1502,65 @@ func samePrimaryVindexes(ms *vtctldatapb.MaterializeSettings, source, target *vs // To determine this, iterate over all target tables, looking for primary // vindexes that differ from the corresponding source table. for _, ts := range ms.TableSettings { - st, sok := source.Tables[ts.TargetTable] - tt, tok := target.Tables[ts.TargetTable] - if !sok || !tok { - // Expected vschema to have a table definition, but did not. - return false + sColumnVindexes := []*vschemapb.ColumnVindex{} + tColumnVindexes := []*vschemapb.ColumnVindex{} + if tt, ok := source.Tables[ts.TargetTable]; ok { + sColumnVindexes = tt.ColumnVindexes } - if st.Type != "" || tt.Type != "" { - // Expected tables to be type "" (regular, sharded), but were not. - return false + if tt, ok := target.Tables[ts.TargetTable]; ok { + tColumnVindexes = tt.ColumnVindexes } - if len(st.ColumnVindexes) == 0 || len(tt.ColumnVindexes) == 0 { - // Expected column to have a primary column vindex, but none - // was found. - return false + + // If source does not have a primary vindex, but the target does, then + // the primary vindexes differ. + if len(sColumnVindexes) == 0 && len(tColumnVindexes) > 0 { + return true } - spv, sok := source.Vindexes[st.ColumnVindexes[0].Name] - tpv, tok := target.Vindexes[st.ColumnVindexes[0].Name] - if !sok || !tok { - // Could not find keyspace vindex defined in column primary - // vindex. - return false + // If source has a primary vindex, but the target does not, then the + // primary vindexes differ. + if len(sColumnVindexes) > 0 && len(tColumnVindexes) == 0 { + return true + } + // If neither source nor target have any vindexes, treat the answer to the question as trivially false. 
+ if len(sColumnVindexes) == 0 && len(tColumnVindexes) == 0 { + return true + } + + sPrimaryVindex := sColumnVindexes[0] + tPrimaryVindex := tColumnVindexes[0] + + // Compare source and target primary vindex columns. + var sColumns, tColumns []string + if sPrimaryVindex.Column != "" { + sColumns = []string{sPrimaryVindex.Column} + } else { + sColumns = sPrimaryVindex.Columns } - // Compare source and primary vindex types. + if tPrimaryVindex.Column != "" { + tColumns = []string{tPrimaryVindex.Column} + } else { + tColumns = tPrimaryVindex.Columns + } + if len(sColumns) != len(tColumns) { + return true + } + for i := 0; i < len(sColumns); i++ { + if !strings.EqualFold(sColumns[i], tColumns[i]) { + return true + } + } + + // Assume the source and target keyspaces specify the vindex referenced + // in column vindex definitions. + spv := source.Vindexes[sColumnVindexes[0].Name] + tpv := target.Vindexes[tColumnVindexes[0].Name] + + // Compare source and target vindex type. if !strings.EqualFold(spv.Type, tpv.Type) { - return false + return true } } - return true + return false } func useAllSourceShards(_ *topo.ShardInfo, sourceShards []*topo.ShardInfo) []*topo.ShardInfo { diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go index 928bfabad93..777b840b0ac 100644 --- a/go/vt/wrangler/materializer_test.go +++ b/go/vt/wrangler/materializer_test.go @@ -2741,6 +2741,7 @@ func TestMaterializerSourceShardSelection(t *testing.T) { } type testcase struct { + name string targetShards, sourceShards []string insertMap map[string][]string targetVSchema, sourceVSchema *vschemapb.Keyspace @@ -2790,6 +2791,7 @@ func TestMaterializerSourceShardSelection(t *testing.T) { getStreamInsert: getStreamInsert, }, { + name: "different primary vindex type, use all source shards", targetShards: []string{"-80", "80-"}, sourceShards: []string{"-40", "40-80", "80-c0", "c0-"}, insertMap: map[string][]string{ @@ -2817,6 +2819,7 @@ func 
TestMaterializerSourceShardSelection(t *testing.T) { }, }, { + name: "different vindex type and name, use all source shards", targetShards: []string{"-80", "80-"}, sourceShards: []string{"-40", "40-80", "80-c0", "c0-"}, insertMap: map[string][]string{ @@ -2843,10 +2846,133 @@ func TestMaterializerSourceShardSelection(t *testing.T) { return fmt.Sprintf(`.*shard:\\"%s\\" filter:{rules:{match:\\"t1\\" filter:\\"select.*t1 where in_keyrange\(c1.*targetks\.xxhash.*%s.*`, sourceShard, targetShard) }, }, + { + name: "same vindex type but different name, use intersecting source shards", + targetShards: []string{"-80", "80-"}, + sourceShards: []string{"-40", "40-80", "80-c0", "c0-"}, + insertMap: map[string][]string{"-80": {"-40", "40-80"}, "80-": {"80-c0", "c0-"}}, + targetVSchema: &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash_vdx": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "c1", + Name: "hash_vdx", + }}, + }, + }, + }, + getStreamInsert: getStreamInsert, + }, + { + name: "unsharded source, sharded target, use all source shards", + targetShards: []string{"-80", "80-"}, + sourceShards: []string{"-"}, + insertMap: map[string][]string{ + "-80": {"-"}, + "80-": {"-"}, + }, + sourceVSchema: &vschemapb.Keyspace{ + Sharded: false, + }, + targetVSchema: targetVSchema, + getStreamInsert: getStreamInsert, + }, + { + name: "sharded source, unsharded target, use all source shards", + targetShards: []string{"-"}, + sourceShards: []string{"-80", "80-"}, + insertMap: map[string][]string{ + "-": {"-80", "80-"}, + }, + targetVSchema: &vschemapb.Keyspace{ + Sharded: false, + }, + getStreamInsert: func(sourceShard, targetShard string) string { + return fmt.Sprintf(`.*shard:\\"%s\\" filter:{rules:{match:\\"t1\\" filter:\\"select.*t1`, sourceShard) + }, + }, + { + name: "target secondary vindexes, use intersecting source shards", + targetShards: []string{"-80", 
"80-"}, + sourceShards: []string{"-40", "40-80", "80-c0", "c0-"}, + insertMap: map[string][]string{"-80": {"-40", "40-80"}, "80-": {"80-c0", "c0-"}}, + targetVSchema: &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + "lookup_vdx": { + Type: "lookup", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{ + { + Column: "c1", + Name: "hash", + }, + { + Column: "c2", + Name: "lookup_vdx", + }, + }, + }, + }, + }, + getStreamInsert: getStreamInsert, + }, + { + name: "same vindex type but different cols, use all source shards", + targetShards: []string{"-80", "80-"}, + sourceShards: []string{"-40", "40-80", "80-c0", "c0-"}, + sourceVSchema: &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "c1", + Name: "hash", + }}, + }, + }, + }, + targetVSchema: &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "c2", + Name: "hash", + }}, + }, + }, + }, + getStreamInsert: func(sourceShard, targetShard string) string { + return fmt.Sprintf(`.*shard:\\"%s\\" filter:{rules:{match:\\"t1\\" filter:\\"select.*t1`, sourceShard) + }, + }, } for _, tcase := range testcases { - t.Run("", func(t *testing.T) { + t.Run(tcase.name, func(t *testing.T) { env := newTestMaterializerEnv(t, ms, tcase.sourceShards, tcase.targetShards) if err := env.topoServ.SaveVSchema(context.Background(), "targetks", tcase.targetVSchema); err != nil { t.Fatal(err) From 7242db2b6c0f1886a45d61f55ec7e716ed7e077c Mon Sep 17 00:00:00 2001 From: Max Englander Date: Tue, 29 Aug 2023 21:02:08 -0400 Subject: [PATCH 06/11] gofmt Signed-off-by: Max Englander --- 
go/vt/vttablet/tabletmanager/rpc_vreplication_test.go | 7 ++++--- go/vt/wrangler/materializer_test.go | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index 8c442b97e5a..b41ed9128af 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -579,7 +579,8 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { // TestSourceShardSelection tests the RPC calls made by VtctldServer to tablet // managers include the correct set of BLS settings. func TestSourceShardSelection(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() sourceKs := "sourceks" sourceShard0 := "-55" @@ -597,7 +598,7 @@ func TestSourceShardSelection(t *testing.T) { wf := "testwf" - tenv := newTestEnv(t, sourceKs, []string{sourceShard0, sourceShard1, sourceShard2}) + tenv := newTestEnv(t, ctx, sourceKs, []string{sourceShard0, sourceShard1, sourceShard2}) defer tenv.close() sourceTablets := map[int]*fakeTabletConn{ @@ -970,4 +971,4 @@ func TestFailedMoveTablesCreateCleanup(t *testing.T) { vs2, err := tenv.ts.GetVSchema(ctx, targetKs) require.NoError(t, err, "failed to get target vschema") require.Equal(t, vs, vs2, "expected vschema to be unchanged") -} \ No newline at end of file +} diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go index 0e6c499b633..e6b6b039f2a 100644 --- a/go/vt/wrangler/materializer_test.go +++ b/go/vt/wrangler/materializer_test.go @@ -3339,4 +3339,4 @@ func TestAddTablesToVSchema(t *testing.T) { require.Equal(t, tt.wantTargetVSchema, tt.inTargetVSchema) }) } -} \ No newline at end of file +} From fddfe49353ea9d9b78590c5dfefbd32a8a3bef3e Mon Sep 17 00:00:00 2001 From: Max Englander Date: Tue, 12 Sep 2023 11:09:05 +0100 Subject: [PATCH 07/11] Update 
go/vt/vttablet/tabletmanager/rpc_vreplication_test.go Co-authored-by: Matt Lord Signed-off-by: Max Englander --- go/vt/vttablet/tabletmanager/rpc_vreplication_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index b41ed9128af..eabccd41094 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -765,8 +765,8 @@ func TestSourceShardSelection(t *testing.T) { &sqltypes.Result{InsertID: uint64(i + 1)}, err, ) - if err != nil { - continue + if errors.Is(err, errShortCircuit) { + break } tt.vrdbClient.ExpectRequest(getAutoIncrementStep, &sqltypes.Result{}, nil) tt.vrdbClient.ExpectRequest( From 37799200b0706efdd0149602384339abcc48ebdd Mon Sep 17 00:00:00 2001 From: Max Englander Date: Tue, 12 Sep 2023 10:37:48 +0100 Subject: [PATCH 08/11] address pr feedback Signed-off-by: Max Englander --- go/vt/vtctl/workflow/materializer.go | 57 +++++++++++++++++----------- go/vt/wrangler/materializer.go | 56 ++++++++++++++++----------- 2 files changed, 67 insertions(+), 46 deletions(-) diff --git a/go/vt/vtctl/workflow/materializer.go b/go/vt/vtctl/workflow/materializer.go index 66108efb80e..14990b0be7d 100644 --- a/go/vt/vtctl/workflow/materializer.go +++ b/go/vt/vtctl/workflow/materializer.go @@ -486,9 +486,12 @@ func (mz *materializer) buildMaterializer() error { return fmt.Errorf("no target shards specified for workflow %s ", ms.Workflow) } differentPVs := false - if sourceVSchema, err := mz.ts.GetVSchema(ctx, ms.SourceKeyspace); err == nil { - differentPVs = primaryVindexesDiffer(ms, sourceVSchema, vschema) + sourceVSchema, err := mz.ts.GetVSchema(ctx, ms.SourceKeyspace) + if err != nil { + return fmt.Errorf("failed to get source keyspace vschema: %v", err) } + differentPVs = primaryVindexesDiffer(ms, sourceVSchema, vschema) + mz.targetVSchema = targetVSchema 
mz.sourceShards = sourceShards mz.targetShards = targetShards @@ -603,9 +606,18 @@ func (mz *materializer) checkTZConversion(ctx context.Context, tz string) error // when the source and target shard have the same primary vindexes. func (mz *materializer) filterSourceShards(targetShard *topo.ShardInfo) []*topo.ShardInfo { if mz.primaryVindexesDiffer || mz.ms.MaterializationIntent != vtctldatapb.MaterializationIntent_MOVETABLES { - return useAllSourceShards(targetShard, mz.sourceShards) + // Use all source shards. + return mz.sourceShards + } + // Use intersecting source shards. + var filteredSourceShards []*topo.ShardInfo + for _, sourceShard := range mz.sourceShards { + if !key.KeyRangeIntersect(sourceShard.KeyRange, targetShard.KeyRange) { + continue + } + filteredSourceShards = append(filteredSourceShards, sourceShard) } - return useIntersectingSourceShards(targetShard, mz.sourceShards) + return filteredSourceShards } // primaryVindexesDiffer returns true if, for any tables defined in the provided @@ -646,7 +658,8 @@ func primaryVindexesDiffer(ms *vtctldatapb.MaterializeSettings, source, target * if len(sColumnVindexes) > 0 && len(tColumnVindexes) == 0 { return true } - // If neither source nor target have any vindexes, treat the answer to the question as trivially false. + // If neither source nor target have any vindexes, report the primary + // vindexes as differing (the check below returns true). if len(sColumnVindexes) == 0 && len(tColumnVindexes) == 0 { return true } @@ -675,11 +688,24 @@ func primaryVindexesDiffer(ms *vtctldatapb.MaterializeSettings, source, target * } } - // Assume the source and target keyspaces specify the vindex referenced - // in column vindex definitions. + // Get source and target vindex definitions. spv := source.Vindexes[sColumnVindexes[0].Name] tpv := target.Vindexes[tColumnVindexes[0].Name] - + // If the source has vindex definition, but target does not, then the + // target vschema is invalid. Assume the primary vindexes differ. 
+ if spv != nil && tpv == nil { + return true + } + // If the target has vindex definition, but source does not, then the + // source vschema is invalid. Assume the primary vindexes differ. + if spv == nil && tpv != nil { + return true + } + // If both target and source are missing vindex definitions, then both + // are equally invalid. + if spv == nil && tpv == nil { + continue + } // Compare source and target vindex type. if !strings.EqualFold(spv.Type, tpv.Type) { return true @@ -687,18 +713,3 @@ func primaryVindexesDiffer(ms *vtctldatapb.MaterializeSettings, source, target * } return false } - -func useAllSourceShards(_ *topo.ShardInfo, sourceShards []*topo.ShardInfo) []*topo.ShardInfo { - return sourceShards -} - -func useIntersectingSourceShards(targetShard *topo.ShardInfo, sourceShards []*topo.ShardInfo) []*topo.ShardInfo { - var filteredSourceShards []*topo.ShardInfo - for _, sourceShard := range sourceShards { - if !key.KeyRangeIntersect(sourceShard.KeyRange, targetShard.KeyRange) { - continue - } - filteredSourceShards = append(filteredSourceShards, sourceShard) - } - return filteredSourceShards -} diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 85dac55dd02..66185c1d5e4 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -1080,9 +1080,11 @@ func (wr *Wrangler) buildMaterializer(ctx context.Context, ms *vtctldatapb.Mater return nil, fmt.Errorf("no target shards specified for workflow %s ", ms.Workflow) } differentPVs := false - if sourceVSchema, err := wr.ts.GetVSchema(ctx, ms.SourceKeyspace); err == nil { - differentPVs = primaryVindexesDiffer(ms, sourceVSchema, vschema) + sourceVSchema, err := wr.ts.GetVSchema(ctx, ms.SourceKeyspace) + if err != nil { + return nil, fmt.Errorf("failed to get source keyspace vschema: %v", err) } + differentPVs = primaryVindexesDiffer(ms, sourceVSchema, vschema) return &materializer{ wr: wr, @@ -1505,9 +1507,18 @@ func (mz *materializer) 
checkTZConversion(ctx context.Context, tz string) error // when the source and target shard have the same primary vindexes. func (mz *materializer) filterSourceShards(targetShard *topo.ShardInfo) []*topo.ShardInfo { if mz.primaryVindexesDiffer || mz.ms.MaterializationIntent != vtctldatapb.MaterializationIntent_MOVETABLES { - return useAllSourceShards(targetShard, mz.sourceShards) + // Use all source shards. + return mz.sourceShards + } + // Use intersecting source shards. + var filteredSourceShards []*topo.ShardInfo + for _, sourceShard := range mz.sourceShards { + if !key.KeyRangeIntersect(sourceShard.KeyRange, targetShard.KeyRange) { + continue + } + filteredSourceShards = append(filteredSourceShards, sourceShard) } - return useIntersectingSourceShards(targetShard, mz.sourceShards) + return filteredSourceShards } // primaryVindexesDiffer returns true if, for any tables defined in the provided @@ -1548,7 +1559,8 @@ func primaryVindexesDiffer(ms *vtctldatapb.MaterializeSettings, source, target * if len(sColumnVindexes) > 0 && len(tColumnVindexes) == 0 { return true } - // If neither source nor target have any vindexes, treat the answer to the question as trivially false. + // If neither source nor target have any vindexes, report the primary + // vindexes as differing (the check below returns true). if len(sColumnVindexes) == 0 && len(tColumnVindexes) == 0 { return true } @@ -1577,11 +1589,24 @@ func primaryVindexesDiffer(ms *vtctldatapb.MaterializeSettings, source, target * } } - // Assume the source and target keyspaces specify the vindex referenced - // in column vindex definitions. + // Get source and target vindex definitions. spv := source.Vindexes[sColumnVindexes[0].Name] tpv := target.Vindexes[tColumnVindexes[0].Name] - + // If the source has vindex definition, but target does not, then the + // target vschema is invalid. Assume the primary vindexes differ. 
+ if spv != nil && tpv == nil { + return true + } + // If the target has vindex definition, but source does not, then the + // source vschema is invalid. Assume the primary vindexes differ. + if spv == nil && tpv != nil { + return true + } + // If both target and source are missing vindex definitions, then both + // are equally invalid. + if spv == nil && tpv == nil { + continue + } // Compare source and target vindex type. if !strings.EqualFold(spv.Type, tpv.Type) { return true @@ -1589,18 +1614,3 @@ func primaryVindexesDiffer(ms *vtctldatapb.MaterializeSettings, source, target * } return false } - -func useAllSourceShards(_ *topo.ShardInfo, sourceShards []*topo.ShardInfo) []*topo.ShardInfo { - return sourceShards -} - -func useIntersectingSourceShards(targetShard *topo.ShardInfo, sourceShards []*topo.ShardInfo) []*topo.ShardInfo { - var filteredSourceShards []*topo.ShardInfo - for _, sourceShard := range sourceShards { - if !key.KeyRangeIntersect(sourceShard.KeyRange, targetShard.KeyRange) { - continue - } - filteredSourceShards = append(filteredSourceShards, sourceShard) - } - return filteredSourceShards -} From 52dac0312859cca34c7732116d7b552e9dbb59af Mon Sep 17 00:00:00 2001 From: Max Englander Date: Tue, 12 Sep 2023 13:33:43 +0100 Subject: [PATCH 09/11] address pr feedback Signed-off-by: Max Englander --- .../tabletmanager/rpc_vreplication_test.go | 11 +++++ go/vt/wrangler/materializer_test.go | 49 +++++++++++++------ 2 files changed, 46 insertions(+), 14 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index eabccd41094..5efb200a5aa 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -18,6 +18,7 @@ package tabletmanager import ( "context" + "errors" "fmt" "math" "runtime/debug" @@ -578,6 +579,10 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { // TestSourceShardSelection tests the RPC calls 
made by VtctldServer to tablet // managers include the correct set of BLS settings. +// +// errShortCircuit is intentionally injected into the MoveTables workflow to +// short-circuit the workflow after we've validated everything we wanted to in +// the test. func TestSourceShardSelection(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -757,6 +762,9 @@ func TestSourceShardSelection(t *testing.T) { tt.vrdbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) var err error if i == len(streams)-1 { + // errShortCircuit is intentionally injected into the MoveTables + // workflow to short-circuit the workflow after we've validated + // everything we wanted to in the test. err = errShortCircuit } tt.vrdbClient.ExpectRequest( @@ -787,6 +795,9 @@ func TestSourceShardSelection(t *testing.T) { for _, tt := range targetTablets { tt.vrdbClient.Wait() } + // errShortCircuit is intentionally injected into the MoveTables + // workflow to short-circuit the workflow after we've validated + // everything we wanted to in the test. 
require.ErrorContains(t, err, fmt.Sprintf("%s\n%s", errShortCircuit.Error(), errShortCircuit.Error())) }) } diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go index e6b6b039f2a..c1cf3be6405 100644 --- a/go/vt/wrangler/materializer_test.go +++ b/go/vt/wrangler/materializer_test.go @@ -2815,8 +2815,8 @@ func TestMaterializerSourceShardSelection(t *testing.T) { }}, } - getStreamInsert := func(sourceShard, targetShard string) string { - return fmt.Sprintf(`.*shard:\\"%s\\" filter:{rules:{match:\\"t1\\" filter:\\"select.*t1 where in_keyrange\(c1.*targetks\.hash.*%s.*`, sourceShard, targetShard) + getStreamInsert := func(sourceShard, sourceColumn, targetVindex, targetShard string) string { + return fmt.Sprintf(`.*shard:\\"%s\\" filter:{rules:{match:\\"t1\\" filter:\\"select.*t1 where in_keyrange\(%s.*targetks\.%s.*%s.*`, sourceShard, sourceColumn, targetVindex, targetShard) } targetVSchema := &vschemapb.Keyspace{ @@ -2839,14 +2839,18 @@ func TestMaterializerSourceShardSelection(t *testing.T) { type testcase struct { name string targetShards, sourceShards []string + sourceColumn string + targetVindex string insertMap map[string][]string targetVSchema, sourceVSchema *vschemapb.Keyspace - getStreamInsert func(sourceShard, targetShard string) string + getStreamInsert func(sourceShard, sourceColumn, targetVindexName, targetShard string) string } testcases := []testcase{ { targetShards: []string{"-40", "40-80", "80-c0", "c0-"}, sourceShards: []string{"-80", "80-"}, + sourceColumn: "c1", + targetVindex: "hash", insertMap: map[string][]string{"-40": {"-80"}, "40-80": {"-80"}, "80-c0": {"80-"}, "c0-": {"80-"}}, targetVSchema: targetVSchema, getStreamInsert: getStreamInsert, @@ -2854,6 +2858,8 @@ func TestMaterializerSourceShardSelection(t *testing.T) { { targetShards: []string{"-20", "20-40", "40-a0", "a0-f0", "f0-"}, sourceShards: []string{"-40", "40-80", "80-c0", "c0-"}, + sourceColumn: "c1", + targetVindex: "hash", insertMap: 
map[string][]string{"-20": {"-40"}, "20-40": {"-40"}, "40-a0": {"40-80", "80-c0"}, "a0-f0": {"80-c0", "c0-"}, "f0-": {"c0-"}}, targetVSchema: targetVSchema, getStreamInsert: getStreamInsert, @@ -2861,6 +2867,8 @@ func TestMaterializerSourceShardSelection(t *testing.T) { { targetShards: []string{"-40", "40-80", "80-"}, sourceShards: []string{"-80", "80-"}, + sourceColumn: "c1", + targetVindex: "hash", insertMap: map[string][]string{"-40": {"-80"}, "40-80": {"-80"}, "80-": {"80-"}}, targetVSchema: targetVSchema, getStreamInsert: getStreamInsert, @@ -2868,6 +2876,8 @@ func TestMaterializerSourceShardSelection(t *testing.T) { { targetShards: []string{"-80", "80-"}, sourceShards: []string{"-40", "40-80", "80-c0", "c0-"}, + sourceColumn: "c1", + targetVindex: "hash", insertMap: map[string][]string{"-80": {"-40", "40-80"}, "80-": {"80-c0", "c0-"}}, targetVSchema: targetVSchema, getStreamInsert: getStreamInsert, @@ -2875,6 +2885,8 @@ func TestMaterializerSourceShardSelection(t *testing.T) { { targetShards: []string{"0"}, sourceShards: []string{"-80", "80-"}, + sourceColumn: "c1", + targetVindex: "hash", insertMap: map[string][]string{"0": {"-80", "80-"}}, targetVSchema: targetVSchema, getStreamInsert: getStreamInsert, @@ -2882,6 +2894,8 @@ func TestMaterializerSourceShardSelection(t *testing.T) { { targetShards: []string{"-80", "80-"}, sourceShards: []string{"0"}, + sourceColumn: "c1", + targetVindex: "hash", insertMap: map[string][]string{"-80": {"0"}, "80-": {"0"}}, targetVSchema: targetVSchema, getStreamInsert: getStreamInsert, @@ -2890,6 +2904,8 @@ func TestMaterializerSourceShardSelection(t *testing.T) { name: "different primary vindex type, use all source shards", targetShards: []string{"-80", "80-"}, sourceShards: []string{"-40", "40-80", "80-c0", "c0-"}, + sourceColumn: "c1", + targetVindex: "hash", insertMap: map[string][]string{ "-80": {"-40", "40-80", "80-c0", "c0-"}, "80-": {"-40", "40-80", "80-c0", "c0-"}, @@ -2910,14 +2926,14 @@ func 
TestMaterializerSourceShardSelection(t *testing.T) { }, }, }, - getStreamInsert: func(sourceShard, targetShard string) string { - return fmt.Sprintf(`.*shard:\\"%s\\" filter:{rules:{match:\\"t1\\" filter:\\"select.*t1 where in_keyrange\(c1.*targetks\.hash.*%s.*`, sourceShard, targetShard) - }, + getStreamInsert: getStreamInsert, }, { name: "different vindex type and name, use all source shards", targetShards: []string{"-80", "80-"}, sourceShards: []string{"-40", "40-80", "80-c0", "c0-"}, + sourceColumn: "c1", + targetVindex: "xxhash", insertMap: map[string][]string{ "-80": {"-40", "40-80", "80-c0", "c0-"}, "80-": {"-40", "40-80", "80-c0", "c0-"}, @@ -2938,14 +2954,14 @@ func TestMaterializerSourceShardSelection(t *testing.T) { }, }, }, - getStreamInsert: func(sourceShard, targetShard string) string { - return fmt.Sprintf(`.*shard:\\"%s\\" filter:{rules:{match:\\"t1\\" filter:\\"select.*t1 where in_keyrange\(c1.*targetks\.xxhash.*%s.*`, sourceShard, targetShard) - }, + getStreamInsert: getStreamInsert, }, { name: "same vindex type but different name, use intersecting source shards", targetShards: []string{"-80", "80-"}, sourceShards: []string{"-40", "40-80", "80-c0", "c0-"}, + sourceColumn: "c1", + targetVindex: "hash", insertMap: map[string][]string{"-80": {"-40", "40-80"}, "80-": {"80-c0", "c0-"}}, targetVSchema: &vschemapb.Keyspace{ Sharded: true, @@ -2969,6 +2985,7 @@ func TestMaterializerSourceShardSelection(t *testing.T) { name: "unsharded source, sharded target, use all source shards", targetShards: []string{"-80", "80-"}, sourceShards: []string{"-"}, + targetVindex: "hash", insertMap: map[string][]string{ "-80": {"-"}, "80-": {"-"}, @@ -2989,7 +3006,9 @@ func TestMaterializerSourceShardSelection(t *testing.T) { targetVSchema: &vschemapb.Keyspace{ Sharded: false, }, - getStreamInsert: func(sourceShard, targetShard string) string { + // The single target shard streams all data from each source shard + // without any keyrange filtering. 
+ getStreamInsert: func(sourceShard, _, _, targetShard string) string { return fmt.Sprintf(`.*shard:\\"%s\\" filter:{rules:{match:\\"t1\\" filter:\\"select.*t1`, sourceShard) }, }, @@ -2997,6 +3016,8 @@ func TestMaterializerSourceShardSelection(t *testing.T) { name: "target secondary vindexes, use intersecting source shards", targetShards: []string{"-80", "80-"}, sourceShards: []string{"-40", "40-80", "80-c0", "c0-"}, + sourceColumn: "c1", + targetVindex: "hash", insertMap: map[string][]string{"-80": {"-40", "40-80"}, "80-": {"80-c0", "c0-"}}, targetVSchema: &vschemapb.Keyspace{ Sharded: true, @@ -3029,6 +3050,8 @@ func TestMaterializerSourceShardSelection(t *testing.T) { name: "same vindex type but different cols, use all source shards", targetShards: []string{"-80", "80-"}, sourceShards: []string{"-40", "40-80", "80-c0", "c0-"}, + sourceColumn: "c2", + targetVindex: "hash", sourceVSchema: &vschemapb.Keyspace{ Sharded: true, Vindexes: map[string]*vschemapb.Vindex{ @@ -3061,9 +3084,7 @@ func TestMaterializerSourceShardSelection(t *testing.T) { }, }, }, - getStreamInsert: func(sourceShard, targetShard string) string { - return fmt.Sprintf(`.*shard:\\"%s\\" filter:{rules:{match:\\"t1\\" filter:\\"select.*t1`, sourceShard) - }, + getStreamInsert: getStreamInsert, }, } @@ -3088,7 +3109,7 @@ func TestMaterializerSourceShardSelection(t *testing.T) { streamsInsert := "" sourceShards := tcase.insertMap[targetShard] for _, sourceShard := range sourceShards { - streamsInsert += tcase.getStreamInsert(sourceShard, targetShard) + streamsInsert += tcase.getStreamInsert(sourceShard, tcase.sourceColumn, tcase.targetVindex, targetShard) } env.tmc.expectVRQuery( tabletID, From b8197af6ea56cf1a2b35369e4f41668dd6c84af3 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Tue, 12 Sep 2023 16:07:24 +0100 Subject: [PATCH 10/11] post merge main test fix Signed-off-by: Max Englander --- go/vt/wrangler/materializer_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) 
diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go index 8653fd70434..9126d8540c1 100644 --- a/go/vt/wrangler/materializer_test.go +++ b/go/vt/wrangler/materializer_test.go @@ -2999,7 +2999,7 @@ func TestMaterializerSourceShardSelection(t *testing.T) { targetShards: []string{"-40", "40-80", "80-c0", "c0-"}, sourceShards: []string{"-80", "80-"}, sourceColumn: "c1", - targetVindex: "hash", + targetVindex: "xxhash", insertMap: map[string][]string{"-40": {"-80"}, "40-80": {"-80"}, "80-c0": {"80-"}, "c0-": {"80-"}}, targetVSchema: targetVSchema, getStreamInsert: getStreamInsert, @@ -3008,7 +3008,7 @@ func TestMaterializerSourceShardSelection(t *testing.T) { targetShards: []string{"-20", "20-40", "40-a0", "a0-f0", "f0-"}, sourceShards: []string{"-40", "40-80", "80-c0", "c0-"}, sourceColumn: "c1", - targetVindex: "hash", + targetVindex: "xxhash", insertMap: map[string][]string{"-20": {"-40"}, "20-40": {"-40"}, "40-a0": {"40-80", "80-c0"}, "a0-f0": {"80-c0", "c0-"}, "f0-": {"c0-"}}, targetVSchema: targetVSchema, getStreamInsert: getStreamInsert, @@ -3017,7 +3017,7 @@ func TestMaterializerSourceShardSelection(t *testing.T) { targetShards: []string{"-40", "40-80", "80-"}, sourceShards: []string{"-80", "80-"}, sourceColumn: "c1", - targetVindex: "hash", + targetVindex: "xxhash", insertMap: map[string][]string{"-40": {"-80"}, "40-80": {"-80"}, "80-": {"80-"}}, targetVSchema: targetVSchema, getStreamInsert: getStreamInsert, @@ -3026,7 +3026,7 @@ func TestMaterializerSourceShardSelection(t *testing.T) { targetShards: []string{"-80", "80-"}, sourceShards: []string{"-40", "40-80", "80-c0", "c0-"}, sourceColumn: "c1", - targetVindex: "hash", + targetVindex: "xxhash", insertMap: map[string][]string{"-80": {"-40", "40-80"}, "80-": {"80-c0", "c0-"}}, targetVSchema: targetVSchema, getStreamInsert: getStreamInsert, @@ -3035,7 +3035,7 @@ func TestMaterializerSourceShardSelection(t *testing.T) { targetShards: []string{"0"}, sourceShards: []string{"-80", 
"80-"}, sourceColumn: "c1", - targetVindex: "hash", + targetVindex: "xxhash", insertMap: map[string][]string{"0": {"-80", "80-"}}, targetVSchema: targetVSchema, getStreamInsert: getStreamInsert, @@ -3044,7 +3044,7 @@ func TestMaterializerSourceShardSelection(t *testing.T) { targetShards: []string{"-80", "80-"}, sourceShards: []string{"0"}, sourceColumn: "c1", - targetVindex: "hash", + targetVindex: "xxhash", insertMap: map[string][]string{"-80": {"0"}, "80-": {"0"}}, targetVSchema: targetVSchema, getStreamInsert: getStreamInsert, @@ -3134,7 +3134,7 @@ func TestMaterializerSourceShardSelection(t *testing.T) { name: "unsharded source, sharded target, use all source shards", targetShards: []string{"-80", "80-"}, sourceShards: []string{"-"}, - targetVindex: "hash", + targetVindex: "xxhash", insertMap: map[string][]string{ "-80": {"-"}, "80-": {"-"}, From a0555d3a673e934a416d17d9812ec6d317beb1e6 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Wed, 13 Sep 2023 09:53:04 +0100 Subject: [PATCH 11/11] load source vschema from external topo Signed-off-by: Max Englander --- go/vt/vtctl/workflow/materializer.go | 11 ++++++++++- go/vt/wrangler/materializer.go | 11 ++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/go/vt/vtctl/workflow/materializer.go b/go/vt/vtctl/workflow/materializer.go index b7d07d4f540..1aa8137612d 100644 --- a/go/vt/vtctl/workflow/materializer.go +++ b/go/vt/vtctl/workflow/materializer.go @@ -502,8 +502,17 @@ func (mz *materializer) buildMaterializer() error { if len(targetShards) == 0 { return fmt.Errorf("no target shards specified for workflow %s ", ms.Workflow) } + + sourceTs := mz.ts + if ms.ExternalCluster != "" { // when the source is an external mysql cluster mounted using the Mount command + externalTopo, err := mz.ts.OpenExternalVitessClusterServer(ctx, ms.ExternalCluster) + if err != nil { + return fmt.Errorf("failed to open external topo: %v", err) + } + sourceTs = externalTopo + } differentPVs := false - sourceVSchema, 
err := mz.ts.GetVSchema(ctx, ms.SourceKeyspace) + sourceVSchema, err := sourceTs.GetVSchema(ctx, ms.SourceKeyspace) if err != nil { return fmt.Errorf("failed to get source keyspace vschema: %v", err) } diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index a71278dfd48..0fba424eacd 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -1108,8 +1108,17 @@ func (wr *Wrangler) buildMaterializer(ctx context.Context, ms *vtctldatapb.Mater if len(targetShards) == 0 { return nil, fmt.Errorf("no target shards specified for workflow %s ", ms.Workflow) } + + sourceTs := wr.ts + if ms.ExternalCluster != "" { // when the source is an external mysql cluster mounted using the Mount command + externalTopo, err := wr.ts.OpenExternalVitessClusterServer(ctx, ms.ExternalCluster) + if err != nil { + return nil, fmt.Errorf("failed to open external topo: %v", err) + } + sourceTs = externalTopo + } differentPVs := false - sourceVSchema, err := wr.ts.GetVSchema(ctx, ms.SourceKeyspace) + sourceVSchema, err := sourceTs.GetVSchema(ctx, ms.SourceKeyspace) if err != nil { return nil, fmt.Errorf("failed to get source keyspace vschema: %v", err) }