Skip to content

Commit

Permalink
Merge pull request #9107 from planetscale/online-ddl-idempotent-conte…
Browse files Browse the repository at this point in the history
…xt-sql

Online DDL: migration with same context and SQL now considered duplicate and silently marked as complete
  • Loading branch information
shlomi-noach authored Dec 5, 2021
2 parents ea40f2d + 36d82d4 commit c124930
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 11 deletions.
8 changes: 6 additions & 2 deletions go/test/endtoend/cluster/vtctlclient_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ type VtctlClientProcess struct {

// VtctlClientParams encapsulated params to provide if non-default
type VtctlClientParams struct {
DDLStrategy string
SkipPreflight bool
DDLStrategy string
RequestContext string
SkipPreflight bool
}

// InitShardPrimary executes vtctlclient command to make specified tablet the primary for the shard.
Expand All @@ -61,6 +62,9 @@ func (vtctlclient *VtctlClientProcess) ApplySchemaWithOutput(Keyspace string, SQ
"ApplySchema",
"-sql", SQL,
}
if params.RequestContext != "" {
args = append(args, "-request_context", params.RequestContext)
}
if params.DDLStrategy != "" {
args = append(args, "-ddl_strategy", params.DDLStrategy)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ var (
cell = "zone1"
schemaChangeDirectory = ""
tableName = `stress_test`
requestContext = "1111-2222-3333"
createStatement1 = `
CREATE TABLE stress_test (
id bigint(20) not null,
Expand Down Expand Up @@ -147,6 +148,9 @@ var (
alterStatement = `
ALTER TABLE stress_test modify hint_col varchar(64) not null default 'this-should-fail'
`
trivialAlterStatement = `
ALTER TABLE stress_test ENGINE=InnoDB
`
insertRowStatement = `
INSERT IGNORE INTO stress_test (id, rand_val) VALUES (%d, left(md5(rand()), 8))
`
Expand Down Expand Up @@ -450,6 +454,39 @@ func TestSchemaChange(t *testing.T) {
checkTable(t, tableName, true)
testSelectTableMetrics(t)
})

// ### The following tests are not strictly 'declarative' but are best served under this endtoend test

// Test duplicate context/SQL
t.Run("Trivial statement with request context is successful", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, trivialAlterStatement, "online", "vtctl", "", "")
uuids = append(uuids, uuid)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
// the table existed, so we expect no changes in this non-declarative DDL
checkTable(t, tableName, true)

rs := onlineddl.ReadMigrations(t, &vtParams, uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
message := row["message"].ToString()
require.NotContains(t, message, "duplicate DDL")
}
})
t.Run("Duplicate trivial statement with request context is successful", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, trivialAlterStatement, "online", "vtctl", "", "")
uuids = append(uuids, uuid)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
// the table existed, so we expect no changes in this non-declarative DDL
checkTable(t, tableName, true)

rs := onlineddl.ReadMigrations(t, &vtParams, uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
message := row["message"].ToString()
// Message suggests that the migration was identified as duplicate
require.Contains(t, message, "duplicate DDL")
}
})
// Piggyride this test suite, let's also test --allow-zero-in-date for 'direct' strategy
t.Run("drop non_online", func(t *testing.T) {
_ = testOnlineDDLStatement(t, dropZeroDateStatement, "direct", "vtctl", "", "")
Expand All @@ -474,7 +511,7 @@ func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy str
uuid = row.AsString("uuid", "")
}
} else {
output, err := clusterInstance.VtctlclientProcess.ApplySchemaWithOutput(keyspaceName, alterStatement, cluster.VtctlClientParams{DDLStrategy: ddlStrategy, SkipPreflight: true})
output, err := clusterInstance.VtctlclientProcess.ApplySchemaWithOutput(keyspaceName, alterStatement, cluster.VtctlClientParams{DDLStrategy: ddlStrategy, RequestContext: requestContext, SkipPreflight: true})
if expectError == "" {
assert.NoError(t, err)
uuid = output
Expand Down
57 changes: 49 additions & 8 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1946,6 +1946,27 @@ func (e *Executor) evaluateDeclarativeDiff(ctx context.Context, onlineDDL *schem
return alterClause, nil
}

// getCompletedMigrationByContextAndSQL chceks if there exists a completed migration with exact same
// context and SQL as given migration. If so, it returns its UUID.
func (e *Executor) getCompletedMigrationByContextAndSQL(ctx context.Context, onlineDDL *schema.OnlineDDL) (completedUUID string, err error) {
query, err := sqlparser.ParseAndBind(sqlSelectCompleteMigrationsByContextAndSQL,
sqltypes.StringBindVariable(e.keyspace),
sqltypes.StringBindVariable(onlineDDL.RequestContext),
sqltypes.StringBindVariable(onlineDDL.SQL),
)
if err != nil {
return "", err
}
r, err := e.execQuery(ctx, query)
if err != nil {
return "", err
}
for _, row := range r.Named().Rows {
completedUUID = row["migration_uuid"].ToString()
}
return completedUUID, nil
}

// failMigration marks a migration as failed
func (e *Executor) failMigration(ctx context.Context, onlineDDL *schema.OnlineDDL, err error) error {
_ = e.updateMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusFailed)
Expand All @@ -1972,6 +1993,23 @@ func (e *Executor) executeMigration(ctx context.Context, onlineDDL *schema.Onlin
return failMigration(err)
}

// See if this is a duplicate submission. A submission is considered duplicate if it has the exact same
// migration context and DDL as a previous one. We are only interested in our scenario in a duplicate
// whose predecessor is "complete". If this is the case, then we can mark our own migration as
// implicitly "complete", too.
if onlineDDL.RequestContext != "" {
completedUUID, err := e.getCompletedMigrationByContextAndSQL(ctx, onlineDDL)
if err != nil {
return err
}
if completedUUID != "" {
// Yep. We mark this migration as implicitly complete, and we're done with it!
_ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull, etaSecondsNow, rowsCopiedUnknown)
_ = e.updateMigrationMessage(ctx, onlineDDL.UUID, fmt.Sprintf("duplicate DDL as %s for migration context %s", completedUUID, onlineDDL.RequestContext))
return nil
}
}

if onlineDDL.StrategySetting().IsDeclarative() {
switch ddlAction {
case sqlparser.RevertDDLAction:
Expand Down Expand Up @@ -2194,14 +2232,17 @@ func (e *Executor) runNextMigration(ctx context.Context) error {
named := r.Named()
for i, row := range named.Rows {
onlineDDL := &schema.OnlineDDL{
Keyspace: row["keyspace"].ToString(),
Table: row["mysql_table"].ToString(),
Schema: row["mysql_schema"].ToString(),
SQL: row["migration_statement"].ToString(),
UUID: row["migration_uuid"].ToString(),
Strategy: schema.DDLStrategy(row["strategy"].ToString()),
Options: row["options"].ToString(),
Status: schema.OnlineDDLStatus(row["migration_status"].ToString()),
Keyspace: row["keyspace"].ToString(),
Table: row["mysql_table"].ToString(),
Schema: row["mysql_schema"].ToString(),
SQL: row["migration_statement"].ToString(),
UUID: row["migration_uuid"].ToString(),
Strategy: schema.DDLStrategy(row["strategy"].ToString()),
Options: row["options"].ToString(),
Status: schema.OnlineDDLStatus(row["migration_status"].ToString()),
Retries: row.AsInt64("retries", 0),
TabletAlias: row["tablet"].ToString(),
RequestContext: row["migration_context"].ToString(),
}
{
// We strip out any VT query comments because our simplified parser doesn't work well with comments
Expand Down
13 changes: 13 additions & 0 deletions go/vt/vttablet/onlineddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ const (
alterSchemaMigrationsTableLogFile = "ALTER TABLE _vt.schema_migrations add column log_file varchar(1024) NOT NULL DEFAULT ''"
alterSchemaMigrationsTableRetainArtifacts = "ALTER TABLE _vt.schema_migrations add column retain_artifacts_seconds bigint NOT NULL DEFAULT 0"
alterSchemaMigrationsTablePostponeCompletion = "ALTER TABLE _vt.schema_migrations add column postpone_completion tinyint unsigned NOT NULL DEFAULT 0"
alterSchemaMigrationsTableContextIndex = "ALTER TABLE _vt.schema_migrations add KEY migration_context_idx (migration_context(64))"

sqlInsertMigration = `INSERT IGNORE INTO _vt.schema_migrations (
migration_uuid,
Expand Down Expand Up @@ -268,6 +269,17 @@ const (
completed_timestamp DESC
LIMIT 1
`
sqlSelectCompleteMigrationsByContextAndSQL = `SELECT
migration_uuid,
strategy
FROM _vt.schema_migrations
WHERE
migration_status='complete'
AND keyspace=%a
AND migration_context=%a
AND migration_statement=%a
LIMIT 1
`
sqlSelectCountReadyMigrations = `SELECT
count(*) as count_ready
FROM _vt.schema_migrations
Expand Down Expand Up @@ -559,4 +571,5 @@ var ApplyDDL = []string{
alterSchemaMigrationsTableLogFile,
alterSchemaMigrationsTableRetainArtifacts,
alterSchemaMigrationsTablePostponeCompletion,
alterSchemaMigrationsTableContextIndex,
}

0 comments on commit c124930

Please sign in to comment.