Skip to content

Commit

Permalink
Don't create vreplication streams for a workflow where there is no in…
Browse files Browse the repository at this point in the history
…tersection between the target and source shards

Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Aug 4, 2021
1 parent 1901057 commit 74c83bb
Show file tree
Hide file tree
Showing 6 changed files with 1,109 additions and 882 deletions.
1,792 changes: 934 additions & 858 deletions go/vt/proto/vtctldata/vtctldata.pb.go

Large diffs are not rendered by default.

27 changes: 27 additions & 0 deletions go/vt/proto/vtctldata/vtctldata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

52 changes: 33 additions & 19 deletions go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,14 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta
return err
}
ms := &vtctldatapb.MaterializeSettings{
Workflow: workflow,
SourceKeyspace: sourceKeyspace,
TargetKeyspace: targetKeyspace,
Cell: cell,
TabletTypes: tabletTypes,
StopAfterCopy: stopAfterCopy,
ExternalCluster: externalCluster,
Workflow: workflow,
MaterializationIntent: vtctldatapb.MaterializationIntent_MOVETABLES,
SourceKeyspace: sourceKeyspace,
TargetKeyspace: targetKeyspace,
Cell: cell,
TabletTypes: tabletTypes,
StopAfterCopy: stopAfterCopy,
ExternalCluster: externalCluster,
}
for _, table := range tables {
buf := sqlparser.NewTrackedBuffer(nil)
Expand Down Expand Up @@ -614,10 +615,11 @@ func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, sp
}

ms = &vtctldatapb.MaterializeSettings{
Workflow: targetTableName + "_vdx",
SourceKeyspace: keyspace,
TargetKeyspace: targetKeyspace,
StopAfterCopy: vindex.Owner != "" && !continueAfterCopyWithOwner,
Workflow: targetTableName + "_vdx",
MaterializationIntent: vtctldatapb.MaterializationIntent_CREATELOOKUPINDEX,
SourceKeyspace: keyspace,
TargetKeyspace: targetKeyspace,
StopAfterCopy: vindex.Owner != "",
TableSettings: []*vtctldatapb.TableMaterializeSettings{{
TargetTable: targetTableName,
SourceExpression: materializeQuery,
Expand Down Expand Up @@ -807,11 +809,15 @@ func (wr *Wrangler) prepareMaterializerStreams(ctx context.Context, ms *vtctldat
if err := mz.deploySchema(ctx); err != nil {
return nil, err
}
inserts, err := mz.generateInserts(ctx)
if err != nil {
return nil, err
insertMap := make(map[string]string, len(mz.targetShards))
for _, targetShard := range mz.targetShards {
inserts, err := mz.generateInserts(ctx, targetShard)
if err != nil {
return nil, err
}
insertMap[targetShard.ShardName()] = inserts
}
if err := mz.createStreams(ctx, inserts); err != nil {
if err := mz.createStreams(ctx, insertMap); err != nil {
return nil, err
}
return mz, nil
Expand Down Expand Up @@ -1002,13 +1008,20 @@ func stripTableConstraints(ddl string) (string, error) {
return newDDL, nil
}

func (mz *materializer) generateInserts(ctx context.Context) (string, error) {
func (mz *materializer) generateInserts(ctx context.Context, targetShard *topo.ShardInfo) (string, error) {
ig := vreplication.NewInsertGenerator(binlogplayer.BlpStopped, "{{.dbname}}")

for _, source := range mz.sourceShards {
for _, sourceShard := range mz.sourceShards {
// Don't create streams from sources which won't contain data for the target shard.
// We only do it for MoveTables for now since this doesn't hold for materialize flows
// where the target's sharding key might differ from that of the source
if mz.ms.MaterializationIntent == vtctldatapb.MaterializationIntent_MOVETABLES &&
!key.KeyRangesIntersect(sourceShard.KeyRange, targetShard.KeyRange) {
continue
}
bls := &binlogdatapb.BinlogSource{
Keyspace: mz.ms.SourceKeyspace,
Shard: source.ShardName(),
Shard: sourceShard.ShardName(),
Filter: &binlogdatapb.Filter{},
StopAfterCopy: mz.ms.StopAfterCopy,
ExternalCluster: mz.ms.ExternalCluster,
Expand Down Expand Up @@ -1113,8 +1126,9 @@ func matchColInSelect(col sqlparser.ColIdent, sel *sqlparser.Select) (*sqlparser
return nil, fmt.Errorf("could not find vindex column %v", sqlparser.String(col))
}

func (mz *materializer) createStreams(ctx context.Context, inserts string) error {
func (mz *materializer) createStreams(ctx context.Context, insertsMap map[string]string) error {
return mz.forAllTargets(func(target *topo.ShardInfo) error {
inserts := insertsMap[target.ShardName()]
targetMaster, err := mz.wr.ts.GetTablet(ctx, target.MasterAlias)
if err != nil {
return vterrors.Wrapf(err, "GetTablet(%v) failed", target.MasterAlias)
Expand Down
5 changes: 2 additions & 3 deletions go/vt/wrangler/materializer_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@ limitations under the License.
package wrangler

import (
"context"
"fmt"
"regexp"
"strconv"
"strings"
"sync"
"testing"

"context"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
Expand Down Expand Up @@ -245,7 +244,7 @@ func (tmc *testMaterializerTMClient) VReplicationExec(ctx context.Context, table
matched = query == qrs[0].query
}
if !matched {
return nil, fmt.Errorf("tablet %v: unexpected query %s, want: %s", tablet, query, qrs[0].query)
return nil, fmt.Errorf("tablet %v:\nunexpected query\n%s\nwant:\n%s", tablet, query, qrs[0].query)
}
tmc.vrQueries[int(tablet.Alias.Uid)] = qrs[1:]
return qrs[0].result, nil
Expand Down
100 changes: 99 additions & 1 deletion go/vt/wrangler/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1824,7 +1824,6 @@ func TestMaterializerManyToMany(t *testing.T) {
)
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})
env.tmc.expectVRQuery(210, mzUpdateQuery, &sqltypes.Result{})

err := env.wr.Materialize(context.Background(), ms)
require.NoError(t, err)
env.tmc.verifyQueries(t)
Expand Down Expand Up @@ -2511,3 +2510,102 @@ func TestStripConstraints(t *testing.T) {
}
}
}

func TestMaterializerManyToManySomeUnreachable(t *testing.T) {
ms := &vtctldatapb.MaterializeSettings{
Workflow: "workflow",
SourceKeyspace: "sourceks",
TargetKeyspace: "targetks",
TableSettings: []*vtctldatapb.TableMaterializeSettings{{
TargetTable: "t1",
SourceExpression: "select * from t1",
CreateDdl: "t1ddl",
}},
}

vs := &vschemapb.Keyspace{
Sharded: true,
Vindexes: map[string]*vschemapb.Vindex{
"hash": {
Type: "hash",
},
},
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "c1",
Name: "hash",
}},
},
},
}
type testcase struct {
targetShards, sourceShards []string
insertMap map[string][]string
}
testcases := []testcase{
{
targetShards: []string{"-40", "40-80", "80-c0", "c0-"},
sourceShards: []string{"-80", "80-"},
insertMap: map[string][]string{"-40": {"-80"}, "40-80": {"-80"}, "80-c0": {"80-"}, "c0-": {"80-"}},
},
{
targetShards: []string{"-20", "20-40", "40-a0", "a0-f0", "f0-"},
sourceShards: []string{"-40", "40-80", "80-c0", "c0-"},
insertMap: map[string][]string{"-20": {"-40"}, "20-40": {"-40"}, "40-a0": {"40-80", "80-c0"}, "a0-f0": {"80-c0", "c0-"}, "f0-": {"c0-"}},
},
{
targetShards: []string{"-40", "40-80", "80-"},
sourceShards: []string{"-80", "80-"},
insertMap: map[string][]string{"-40": {"-80"}, "40-80": {"-80"}, "80-": {"80-"}},
},
{
targetShards: []string{"-80", "80-"},
sourceShards: []string{"-40", "40-80", "80-c0", "c0-"},
insertMap: map[string][]string{"-80": {"-40", "40-80"}, "80-": {"80-c0", "c0-"}},
},
{
targetShards: []string{"0"},
sourceShards: []string{"-80", "80-"},
insertMap: map[string][]string{"0": {"-80", "80-"}},
},
{
targetShards: []string{"-80", "80-"},
sourceShards: []string{"0"},
insertMap: map[string][]string{"-80": {"0"}, "80-": {"0"}},
},
}

getStreamInsert := func(sourceShard, targetShard string) string {
return fmt.Sprintf(`.*shard:\\"%s\\" filter:{rules:{match:\\"t1\\" filter:\\"select.*t1 where in_keyrange\(c1.*targetks\.hash.*%s.*`, sourceShard, targetShard)
}

for _, tcase := range testcases {
t.Run("", func(t *testing.T) {
env := newTestMaterializerEnv(t, ms, tcase.sourceShards, tcase.targetShards)
if err := env.topoServ.SaveVSchema(context.Background(), "targetks", vs); err != nil {
t.Fatal(err)
}
defer env.close()
for i, targetShard := range tcase.targetShards {
tabletID := 200 + i*10
env.tmc.expectVRQuery(tabletID, mzSelectFrozenQuery, &sqltypes.Result{})

streamsInsert := ""
sourceShards := tcase.insertMap[targetShard]
for _, sourceShard := range sourceShards {
streamsInsert += getStreamInsert(sourceShard, targetShard)
}
env.tmc.expectVRQuery(
tabletID,
insertPrefix+streamsInsert,
&sqltypes.Result{},
)
env.tmc.expectVRQuery(tabletID, mzUpdateQuery, &sqltypes.Result{})
}
err := env.wr.Materialize(context.Background(), ms)
require.NoError(t, err)
env.tmc.verifyQueries(t)
})
}
}
15 changes: 14 additions & 1 deletion proto/vtctldata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ message ExecuteVtctlCommandResponse {
logutil.Event event = 1;
}

// MaterializationIntent describes the reason for creating the Materialize flow
enum MaterializationIntent {
// CUSTOM is the default value
CUSTOM = 0;

// MOVETABLES is when we are creating a MoveTables flow
MOVETABLES = 1;

// CREATELOOKUPINDEX is when we are creating a CreateLookupIndex flow
CREATELOOKUPINDEX = 2;
}

// TableMaterializeSttings contains the settings for one table.
message TableMaterializeSettings {
string target_table = 1;
Expand All @@ -69,7 +81,8 @@ message MaterializeSettings {
// ExternalCluster is the name of the mounted cluster which has the source keyspace/db for this workflow
// it is of the type <cluster_type.cluster_name>
string external_cluster = 8;

// MaterializationIntent is used to identify the reason behind the materialization workflow: eg. MoveTables, CreateLookupVindex
MaterializationIntent materialization_intent = 9;
}

/* Data types for VtctldServer */
Expand Down

0 comments on commit 74c83bb

Please sign in to comment.