Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Online DDL: migration with same context and SQL now considered duplicate and silently marked as complete #9107

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions go/test/endtoend/cluster/vtctlclient_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ type VtctlClientProcess struct {

// VtctlClientParams encapsulated params to provide if non-default
type VtctlClientParams struct {
DDLStrategy string
SkipPreflight bool
DDLStrategy string
RequestContext string
SkipPreflight bool
}

// InitShardPrimary executes vtctlclient command to make specified tablet the primary for the shard.
Expand All @@ -61,6 +62,9 @@ func (vtctlclient *VtctlClientProcess) ApplySchemaWithOutput(Keyspace string, SQ
"ApplySchema",
"-sql", SQL,
}
if params.RequestContext != "" {
args = append(args, "-request_context", params.RequestContext)
}
if params.DDLStrategy != "" {
args = append(args, "-ddl_strategy", params.DDLStrategy)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ var (
cell = "zone1"
schemaChangeDirectory = ""
tableName = `stress_test`
requestContext = "1111-2222-3333"
createStatement1 = `
CREATE TABLE stress_test (
id bigint(20) not null,
Expand Down Expand Up @@ -147,6 +148,9 @@ var (
alterStatement = `
ALTER TABLE stress_test modify hint_col varchar(64) not null default 'this-should-fail'
`
trivialAlterStatement = `
ALTER TABLE stress_test ENGINE=InnoDB
`
insertRowStatement = `
INSERT IGNORE INTO stress_test (id, rand_val) VALUES (%d, left(md5(rand()), 8))
`
Expand Down Expand Up @@ -450,6 +454,39 @@ func TestSchemaChange(t *testing.T) {
checkTable(t, tableName, true)
testSelectTableMetrics(t)
})

// ### The following tests are not strictly 'declarative' but are best served under this endtoend test

// Test duplicate context/SQL
t.Run("Trivial statement with request context is successful", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, trivialAlterStatement, "online", "vtctl", "", "")
uuids = append(uuids, uuid)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
// the table existed, so we expect no changes in this non-declarative DDL
checkTable(t, tableName, true)

rs := onlineddl.ReadMigrations(t, &vtParams, uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
message := row["message"].ToString()
require.NotContains(t, message, "duplicate DDL")
}
})
t.Run("Duplicate trivial statement with request context is successful", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, trivialAlterStatement, "online", "vtctl", "", "")
uuids = append(uuids, uuid)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
// the table existed, so we expect no changes in this non-declarative DDL
checkTable(t, tableName, true)

rs := onlineddl.ReadMigrations(t, &vtParams, uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
message := row["message"].ToString()
// Message suggests that the migration was identified as duplicate
require.Contains(t, message, "duplicate DDL")
}
})
// Piggyride this test suite, let's also test --allow-zero-in-date for 'direct' strategy
t.Run("drop non_online", func(t *testing.T) {
_ = testOnlineDDLStatement(t, dropZeroDateStatement, "direct", "vtctl", "", "")
Expand All @@ -474,7 +511,7 @@ func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy str
uuid = row.AsString("uuid", "")
}
} else {
output, err := clusterInstance.VtctlclientProcess.ApplySchemaWithOutput(keyspaceName, alterStatement, cluster.VtctlClientParams{DDLStrategy: ddlStrategy, SkipPreflight: true})
output, err := clusterInstance.VtctlclientProcess.ApplySchemaWithOutput(keyspaceName, alterStatement, cluster.VtctlClientParams{DDLStrategy: ddlStrategy, RequestContext: requestContext, SkipPreflight: true})
if expectError == "" {
assert.NoError(t, err)
uuid = output
Expand Down
57 changes: 49 additions & 8 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1946,6 +1946,27 @@ func (e *Executor) evaluateDeclarativeDiff(ctx context.Context, onlineDDL *schem
return alterClause, nil
}

// getCompletedMigrationByContextAndSQL chceks if there exists a completed migration with exact same
// context and SQL as given migration. If so, it returns its UUID.
func (e *Executor) getCompletedMigrationByContextAndSQL(ctx context.Context, onlineDDL *schema.OnlineDDL) (completedUUID string, err error) {
query, err := sqlparser.ParseAndBind(sqlSelectCompleteMigrationsByContextAndSQL,
sqltypes.StringBindVariable(e.keyspace),
sqltypes.StringBindVariable(onlineDDL.RequestContext),
sqltypes.StringBindVariable(onlineDDL.SQL),
)
if err != nil {
return "", err
}
r, err := e.execQuery(ctx, query)
if err != nil {
return "", err
}
for _, row := range r.Named().Rows {
completedUUID = row["migration_uuid"].ToString()
}
return completedUUID, nil
}

// failMigration marks a migration as failed
func (e *Executor) failMigration(ctx context.Context, onlineDDL *schema.OnlineDDL, err error) error {
_ = e.updateMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusFailed)
Expand All @@ -1972,6 +1993,23 @@ func (e *Executor) executeMigration(ctx context.Context, onlineDDL *schema.Onlin
return failMigration(err)
}

// See if this is a duplicate submission. A submission is considered duplicate if it has the exact same
// migration context and DDL as a previous one. We are only interested in our scenario in a duplicate
// whose predecessor is "complete". If this is the case, then we can mark our own migration as
// implicitly "complete", too.
if onlineDDL.RequestContext != "" {
completedUUID, err := e.getCompletedMigrationByContextAndSQL(ctx, onlineDDL)
if err != nil {
return err
}
if completedUUID != "" {
// Yep. We mark this migration as implicitly complete, and we're done with it!
_ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull, etaSecondsNow, rowsCopiedUnknown)
_ = e.updateMigrationMessage(ctx, onlineDDL.UUID, fmt.Sprintf("duplicate DDL as %s for migration context %s", completedUUID, onlineDDL.RequestContext))
return nil
}
}

if onlineDDL.StrategySetting().IsDeclarative() {
switch ddlAction {
case sqlparser.RevertDDLAction:
Expand Down Expand Up @@ -2194,14 +2232,17 @@ func (e *Executor) runNextMigration(ctx context.Context) error {
named := r.Named()
for i, row := range named.Rows {
onlineDDL := &schema.OnlineDDL{
Keyspace: row["keyspace"].ToString(),
Table: row["mysql_table"].ToString(),
Schema: row["mysql_schema"].ToString(),
SQL: row["migration_statement"].ToString(),
UUID: row["migration_uuid"].ToString(),
Strategy: schema.DDLStrategy(row["strategy"].ToString()),
Options: row["options"].ToString(),
Status: schema.OnlineDDLStatus(row["migration_status"].ToString()),
Keyspace: row["keyspace"].ToString(),
Table: row["mysql_table"].ToString(),
Schema: row["mysql_schema"].ToString(),
SQL: row["migration_statement"].ToString(),
UUID: row["migration_uuid"].ToString(),
Strategy: schema.DDLStrategy(row["strategy"].ToString()),
Options: row["options"].ToString(),
Status: schema.OnlineDDLStatus(row["migration_status"].ToString()),
Retries: row.AsInt64("retries", 0),
TabletAlias: row["tablet"].ToString(),
RequestContext: row["migration_context"].ToString(),
}
{
// We strip out any VT query comments because our simplified parser doesn't work well with comments
Expand Down
13 changes: 13 additions & 0 deletions go/vt/vttablet/onlineddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ const (
alterSchemaMigrationsTableLogFile = "ALTER TABLE _vt.schema_migrations add column log_file varchar(1024) NOT NULL DEFAULT ''"
alterSchemaMigrationsTableRetainArtifacts = "ALTER TABLE _vt.schema_migrations add column retain_artifacts_seconds bigint NOT NULL DEFAULT 0"
alterSchemaMigrationsTablePostponeCompletion = "ALTER TABLE _vt.schema_migrations add column postpone_completion tinyint unsigned NOT NULL DEFAULT 0"
alterSchemaMigrationsTableContextIndex = "ALTER TABLE _vt.schema_migrations add KEY migration_context_idx (migration_context(64))"

sqlInsertMigration = `INSERT IGNORE INTO _vt.schema_migrations (
migration_uuid,
Expand Down Expand Up @@ -268,6 +269,17 @@ const (
completed_timestamp DESC
LIMIT 1
`
sqlSelectCompleteMigrationsByContextAndSQL = `SELECT
migration_uuid,
strategy
FROM _vt.schema_migrations
WHERE
migration_status='complete'
AND keyspace=%a
AND migration_context=%a
AND migration_statement=%a
LIMIT 1
`
sqlSelectCountReadyMigrations = `SELECT
count(*) as count_ready
FROM _vt.schema_migrations
Expand Down Expand Up @@ -558,4 +570,5 @@ var ApplyDDL = []string{
alterSchemaMigrationsTableLogFile,
alterSchemaMigrationsTableRetainArtifacts,
alterSchemaMigrationsTablePostponeCompletion,
alterSchemaMigrationsTableContextIndex,
}