Skip to content

Commit

Permalink
More intelligently manage vschema table entries on unsharded targets.
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Jun 2, 2023
1 parent 4616a9b commit e50817c
Showing 1 changed file with 21 additions and 14 deletions.
35 changes: 21 additions & 14 deletions go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,30 +71,37 @@ const (
createDDLAsCopyDropForeignKeys = "copy:drop_foreign_keys"
)

// addTablesToVSchema adds tables to an (unsharded) vschema. Depending on copyAttributes It will also add any sequence info
// that is associated with a table by copying it from the vschema of the source keyspace.
// For a migrate workflow we do not copy attributes since the source keyspace is just a proxy to import data into Vitess
// Todo: For now we only copy sequence but later we may also want to copy other attributes like authoritative column flag and list of columns
func (wr *Wrangler) addTablesToVSchema(ctx context.Context, sourceKeyspace string, targetVSchema *vschemapb.Keyspace, tables []string, copyAttributes bool) error {
// addTablesToVSchema adds tables to an (unsharded) vschema if they are not already defined.
// If copyVschema is true then we copy over the vschema table definitions from the source,
// otherwise we create empty ones.
// For a migrate workflow we do not copy the vschema since the source keyspace is just a
// proxy to import data into Vitess.
func (wr *Wrangler) addTablesToVSchema(ctx context.Context, sourceKeyspace string, targetVSchema *vschemapb.Keyspace, tables []string, copyVSchema bool) error {
if targetVSchema.Tables == nil {
targetVSchema.Tables = make(map[string]*vschemapb.Table)
}
for _, table := range tables {
targetVSchema.Tables[table] = &vschemapb.Table{}
}

if copyAttributes { // if source keyspace is provided, copy over the sequence info.
if copyVSchema { // If source keyspace is provided, copy over the vschema table definitions.
srcVSchema, err := wr.ts.GetVSchema(ctx, sourceKeyspace)
if err != nil {
return err
}
for _, table := range tables {
srcTable, ok := srcVSchema.Tables[table]
if ok {
targetVSchema.Tables[table].AutoIncrement = srcTable.AutoIncrement
srcTable, sok := srcVSchema.Tables[table]
if _, tok := targetVSchema.Tables[table]; sok && !tok {
targetVSchema.Tables[table] = srcTable
// If going from sharded to unsharded, then we need to remove the
// column vindexes as they are not valid for unsharded tables.
if srcVSchema.Sharded {
targetVSchema.Tables[table].ColumnVindexes = nil
}
}
}
} else { // Create empty table definitions.
for _, table := range tables {
if _, tok := targetVSchema.Tables[table]; !tok {
targetVSchema.Tables[table] = &vschemapb.Table{}
}
}

}
return nil
}
Expand Down

0 comments on commit e50817c

Please sign in to comment.