Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wrangler,workflow/workflow: materialize from intersecting source shards based on primary vindexes #13782

Merged
merged 13 commits into from
Sep 13, 2023
Merged
26 changes: 22 additions & 4 deletions go/vt/binlog/binlogplayer/mock_dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package binlogplayer

import (
"fmt"
"regexp"
"strings"
"sync"
Expand All @@ -39,6 +40,7 @@ type MockDBClient struct {
currentResult int
done chan struct{}
invariants map[string]*sqltypes.Result
Tag string
}

type mockExpect struct {
Expand Down Expand Up @@ -177,7 +179,11 @@ func (dc *MockDBClient) Close() {
// ExecuteFetch is part of the DBClient interface
func (dc *MockDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error) {
dc.t.Helper()
dc.t.Logf("DBClient query: %v", query)
msg := "DBClient query: %v"
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None of the changes in here are necessary but I found them helpful during debugging so left them in.

if dc.Tag != "" {
msg = fmt.Sprintf("[%s] %s", dc.Tag, msg)
}
dc.t.Logf(msg, query)

for q, result := range dc.invariants {
if strings.Contains(strings.ToLower(query), strings.ToLower(q)) {
Expand All @@ -188,16 +194,28 @@ func (dc *MockDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Re
dc.expectMu.Lock()
defer dc.expectMu.Unlock()
if dc.currentResult >= len(dc.expect) {
dc.t.Fatalf("DBClientMock: query: %s, no more requests are expected", query)
msg := "DBClientMock: query: %s, no more requests are expected"
if dc.Tag != "" {
msg = fmt.Sprintf("[%s] %s", dc.Tag, msg)
}
dc.t.Fatalf(msg, query)
}
result := dc.expect[dc.currentResult]
if result.re == nil {
if query != result.query {
dc.t.Fatalf("DBClientMock: query: %s, want %s", query, result.query)
msg := "DBClientMock: query: %s, want %s"
if dc.Tag != "" {
msg = fmt.Sprintf("[%s] %s", dc.Tag, msg)
}
dc.t.Fatalf(msg, query, result.query)
}
} else {
if !result.re.MatchString(query) {
dc.t.Fatalf("DBClientMock: query: %s, must match %s", query, result.query)
msg := "DBClientMock: query: %s, must match %s"
if dc.Tag != "" {
msg = fmt.Sprintf("[%s] %s", dc.Tag, msg)
}
dc.t.Fatalf(msg, query, result.query)
}
}
dc.currentResult++
Expand Down
172 changes: 147 additions & 25 deletions go/vt/vtctl/workflow/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

Expand All @@ -54,11 +55,12 @@ type materializer struct {
sourceTs *topo.Server
tmc tmclient.TabletManagerClient

ms *vtctldatapb.MaterializeSettings
targetVSchema *vindexes.KeyspaceSchema
sourceShards []*topo.ShardInfo
targetShards []*topo.ShardInfo
isPartial bool
ms *vtctldatapb.MaterializeSettings
targetVSchema *vindexes.KeyspaceSchema
sourceShards []*topo.ShardInfo
targetShards []*topo.ShardInfo
isPartial bool
primaryVindexesDiffer bool
}

func (mz *materializer) getWorkflowSubType() (binlogdatapb.VReplicationWorkflowSubType, error) {
Expand Down Expand Up @@ -98,7 +100,9 @@ func (mz *materializer) prepareMaterializerStreams(req *vtctldatapb.MoveTablesCr
if err != nil {
return vterrors.Wrapf(err, "GetTablet(%v) failed", target.PrimaryAlias)
}
blses, err := mz.generateBinlogSources(mz.ctx, target)

sourceShards := mz.filterSourceShards(target)
blses, err := mz.generateBinlogSources(mz.ctx, target, sourceShards)
if err != nil {
return err
}
Expand Down Expand Up @@ -136,7 +140,8 @@ func (mz *materializer) createMaterializerStreams() error {
}
insertMap := make(map[string]string, len(mz.targetShards))
for _, targetShard := range mz.targetShards {
inserts, err := mz.generateInserts(mz.ctx, targetShard)
sourceShards := mz.filterSourceShards(targetShard)
inserts, err := mz.generateInserts(mz.ctx, sourceShards)
if err != nil {
return err
}
Expand All @@ -148,17 +153,10 @@ func (mz *materializer) createMaterializerStreams() error {
return nil
}

func (mz *materializer) generateInserts(ctx context.Context, targetShard *topo.ShardInfo) (string, error) {
func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*topo.ShardInfo) (string, error) {
ig := vreplication.NewInsertGenerator(binlogdatapb.VReplicationWorkflowState_Stopped, "{{.dbname}}")

for _, sourceShard := range mz.sourceShards {
// Don't create streams from sources which won't contain data for the target shard.
// We only do it for MoveTables for now since this doesn't hold for materialize flows
// where the target's sharding key might differ from that of the source
if mz.ms.MaterializationIntent == vtctldatapb.MaterializationIntent_MOVETABLES &&
!key.KeyRangeIntersect(sourceShard.KeyRange, targetShard.KeyRange) {
continue
}
for _, sourceShard := range sourceShards {
mattlord marked this conversation as resolved.
Show resolved Hide resolved
bls := &binlogdatapb.BinlogSource{
Keyspace: mz.ms.SourceKeyspace,
Shard: sourceShard.ShardName(),
Expand Down Expand Up @@ -255,16 +253,9 @@ func (mz *materializer) generateInserts(ctx context.Context, targetShard *topo.S
return ig.String(), nil
}

func (mz *materializer) generateBinlogSources(ctx context.Context, targetShard *topo.ShardInfo) ([]*binlogdatapb.BinlogSource, error) {
func (mz *materializer) generateBinlogSources(ctx context.Context, targetShard *topo.ShardInfo, sourceShards []*topo.ShardInfo) ([]*binlogdatapb.BinlogSource, error) {
blses := make([]*binlogdatapb.BinlogSource, 0, len(mz.sourceShards))
for _, sourceShard := range mz.sourceShards {
// Don't create streams from sources which won't contain data for the target shard.
// We only do it for MoveTables for now since this doesn't hold for materialize flows
// where the target's sharding key might differ from that of the source
if mz.ms.MaterializationIntent == vtctldatapb.MaterializationIntent_MOVETABLES &&
!key.KeyRangeIntersect(sourceShard.KeyRange, targetShard.KeyRange) {
continue
}
for _, sourceShard := range sourceShards {
bls := &binlogdatapb.BinlogSource{
Keyspace: mz.ms.SourceKeyspace,
Shard: sourceShard.ShardName(),
Expand Down Expand Up @@ -511,10 +502,27 @@ func (mz *materializer) buildMaterializer() error {
if len(targetShards) == 0 {
return fmt.Errorf("no target shards specified for workflow %s ", ms.Workflow)
}

sourceTs := mz.ts
if ms.ExternalCluster != "" { // when the source is an external mysql cluster mounted using the Mount command
externalTopo, err := mz.ts.OpenExternalVitessClusterServer(ctx, ms.ExternalCluster)
if err != nil {
return fmt.Errorf("failed to open external topo: %v", err)
}
sourceTs = externalTopo
}
differentPVs := false
sourceVSchema, err := sourceTs.GetVSchema(ctx, ms.SourceKeyspace)
if err != nil {
return fmt.Errorf("failed to get source keyspace vschema: %v", err)
}
differentPVs = primaryVindexesDiffer(ms, sourceVSchema, vschema)

mz.targetVSchema = targetVSchema
mz.sourceShards = sourceShards
mz.targetShards = targetShards
mz.isPartial = isPartial
mz.primaryVindexesDiffer = differentPVs
return nil
}

Expand Down Expand Up @@ -617,3 +625,117 @@ func (mz *materializer) checkTZConversion(ctx context.Context, tz string) error
})
return err
}

// filterSourceShards returns the subset of source shards whose key ranges
// overlap the given target shard, so we avoid creating streams from sources
// that could never contain data for that target. The narrowing is only safe
// for MoveTables workflows where the source and target primary vindexes
// match; in every other case all source shards are returned unchanged.
func (mz *materializer) filterSourceShards(targetShard *topo.ShardInfo) []*topo.ShardInfo {
	if mz.primaryVindexesDiffer || mz.ms.MaterializationIntent != vtctldatapb.MaterializationIntent_MOVETABLES {
		// Optimization does not apply; stream from every source shard.
		return mz.sourceShards
	}
	// Keep only the source shards whose key range intersects the target's.
	var intersecting []*topo.ShardInfo
	for _, candidate := range mz.sourceShards {
		if key.KeyRangeIntersect(candidate.KeyRange, targetShard.KeyRange) {
			intersecting = append(intersecting, candidate)
		}
	}
	return intersecting
}

// primaryVindexesDiffer returns true if, for any tables defined in the provided
// materialize settings, the source and target vschema definitions for those
// tables have different primary vindexes.
//
// The result of this function is used to determine whether to apply a source
// shard selection optimization in MoveTables: the optimization is only applied
// when this returns false.
func primaryVindexesDiffer(ms *vtctldatapb.MaterializeSettings, source, target *vschemapb.Keyspace) bool {
	// If the source and target keyspaces' shardedness differs, there is no
	// basis for comparing primary vindexes; treat the answer to the question
	// as trivially false. (When both keyspaces are unsharded we fall through,
	// and tables with no column vindexes are reported as differing below.)
	if source.Sharded != target.Sharded {
		return false
	}

	// For source and target keyspaces that are sharded, we can optimize source
	// shard selection if source and target tables' primary vindexes are equal.
	//
	// To determine this, iterate over all target tables, looking for primary
	// vindexes that differ from the corresponding source table.
	for _, ts := range ms.TableSettings {
		sColumnVindexes := []*vschemapb.ColumnVindex{}
		tColumnVindexes := []*vschemapb.ColumnVindex{}
		if tt, ok := source.Tables[ts.TargetTable]; ok {
			sColumnVindexes = tt.ColumnVindexes
		}
		if tt, ok := target.Tables[ts.TargetTable]; ok {
			tColumnVindexes = tt.ColumnVindexes
		}

		// If source does not have a primary vindex, but the target does, then
		// the primary vindexes differ.
		if len(sColumnVindexes) == 0 && len(tColumnVindexes) > 0 {
			return true
		}
		// If source has a primary vindex, but the target does not, then the
		// primary vindexes differ.
		if len(sColumnVindexes) > 0 && len(tColumnVindexes) == 0 {
			return true
		}
		// If neither source nor target have any vindexes, conservatively
		// report the primary vindexes as differing, which disables the shard
		// selection optimization.
		// NOTE(review): a prior comment here said "trivially false", which
		// contradicted the `return true` below; the conservative behavior
		// (treat as differing) is kept and the comment corrected.
		if len(sColumnVindexes) == 0 && len(tColumnVindexes) == 0 {
			return true
		}

		// The first column vindex of a table is treated as its primary vindex.
		sPrimaryVindex := sColumnVindexes[0]
		tPrimaryVindex := tColumnVindexes[0]

		// Compare source and target primary vindex columns. A column vindex
		// may declare either a single Column or a Columns list; normalize
		// both sides to a slice before comparing.
		var sColumns, tColumns []string
		if sPrimaryVindex.Column != "" {
			sColumns = []string{sPrimaryVindex.Column}
		} else {
			sColumns = sPrimaryVindex.Columns
		}
		if tPrimaryVindex.Column != "" {
			tColumns = []string{tPrimaryVindex.Column}
		} else {
			tColumns = tPrimaryVindex.Columns
		}
		if len(sColumns) != len(tColumns) {
			return true
		}
		// Column names are compared case-insensitively, position by position.
		for i := 0; i < len(sColumns); i++ {
			if !strings.EqualFold(sColumns[i], tColumns[i]) {
				return true
			}
		}

		// Get source and target vindex definitions.
		spv := source.Vindexes[sColumnVindexes[0].Name]
		tpv := target.Vindexes[tColumnVindexes[0].Name]
		// If the source has a vindex definition, but target does not, then the
		// target vschema is invalid. Assume the primary vindexes differ.
		if spv != nil && tpv == nil {
			return true
		}
		// If the target has a vindex definition, but source does not, then the
		// source vschema is invalid. Assume the primary vindexes differ.
		if spv == nil && tpv != nil {
			return true
		}
		// If both target and source are missing vindex definitions, then both
		// are equally invalid; move on to the next table.
		if spv == nil && tpv == nil {
			continue
		}
		// Compare source and target vindex type (case-insensitive).
		if !strings.EqualFold(spv.Type, tpv.Type) {
			return true
		}
	}
	return false
}
4 changes: 3 additions & 1 deletion go/vt/vttablet/tabletmanager/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,11 @@ func (tenv *testEnv) addTablet(t *testing.T, id int, keyspace, shard string) *fa
panic(err)
}

vrdbClient := binlogplayer.NewMockDBClient(t)
vrdbClient.Tag = fmt.Sprintf("tablet:%d", id)
tenv.tmc.tablets[id] = &fakeTabletConn{
tablet: tablet,
vrdbClient: binlogplayer.NewMockDBClient(t),
vrdbClient: vrdbClient,
}

dbClientFactory := func() binlogplayer.DBClient {
Expand Down
Loading