Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CI: testing self referencing tables in foreign key stress tests #16216

Merged
merged 3 commits into from
Jun 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 153 additions & 46 deletions go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ import (
// +- stress_child
// +- stress_grandchild
// +- stress_child2
// stress_self <- (self-referencing)
// stress_nofk (no foreign key; control group)
// - Create these tables. Then, on the MySQL replica, remove the foreign key constraints.
// - Static test:
// - Randomly populate all tables via high-contention INSERT/UPDATE/DELETE statements
Expand Down Expand Up @@ -151,8 +153,9 @@ var (
childTableName = "stress_child"
child2TableName = "stress_child2"
grandchildTableName = "stress_grandchild"
selfTableName = "stress_self"
nofkTableName = "stress_nofk"
tableNames = []string{parentTableName, childTableName, child2TableName, grandchildTableName, nofkTableName}
tableNames = []string{parentTableName, childTableName, child2TableName, grandchildTableName, selfTableName, nofkTableName}
reverseTableNames []string

seedOnce sync.Once
Expand Down Expand Up @@ -193,6 +196,21 @@ var (
) ENGINE=InnoDB
`,
`
CREATE TABLE stress_self (
id bigint not null,
parent_id bigint,
rand_val varchar(32) null default '',
hint_col varchar(64) not null default '',
created_timestamp timestamp not null default current_timestamp,
updates int unsigned not null default 0,
PRIMARY KEY (id),
key parent_id_idx(parent_id),
key created_idx(created_timestamp),
key updates_idx(updates),
CONSTRAINT self_fk FOREIGN KEY (parent_id) REFERENCES stress_self (id) ON DELETE %s ON UPDATE %s
) ENGINE=InnoDB
`,
`
CREATE TABLE stress_child (
id bigint not null,
parent_id bigint,
Expand Down Expand Up @@ -248,6 +266,7 @@ var (
`ALTER TABLE stress_child DROP CONSTRAINT child_parent_fk`,
`ALTER TABLE stress_child2 DROP CONSTRAINT child2_parent_fk`,
`ALTER TABLE stress_grandchild DROP CONSTRAINT grandchild_child_fk`,
`ALTER TABLE stress_self DROP CONSTRAINT self_fk`,
}
alterHintStatement = `
ALTER TABLE %s modify hint_col varchar(64) not null default '%s'
Expand Down Expand Up @@ -277,6 +296,9 @@ var (
selectMatchingRowsGrandchild = `
select stress_grandchild.id from stress_grandchild join stress_child on (stress_child.id = stress_grandchild.parent_id)
`
selectMatchingRowsSelf = `
select stress_self_child.id from stress_self as stress_self_child join stress_self as stress_self_parent on (stress_self_parent.id = stress_self_child.parent_id)
`
selectOrphanedRowsChild = `
select stress_child.id from stress_child left join stress_parent on (stress_parent.id = stress_child.parent_id) where stress_parent.id is null
`
Expand All @@ -286,6 +308,10 @@ var (
selectOrphanedRowsGrandchild = `
select stress_grandchild.id from stress_grandchild left join stress_child on (stress_child.id = stress_grandchild.parent_id) where stress_child.id is null
`
selectOrphanedRowsSelf = `
select stress_self_child.id from stress_self as stress_self_child left join stress_self as stress_self_parent on (stress_self_parent.id = stress_self_child.parent_id) where stress_self_parent.id is null
`

selectOrphanedRowsNoFK = `
select stress_nofk.id from stress_nofk left join stress_parent on (stress_parent.id = stress_nofk.parent_id) where stress_parent.id is null
`
Expand Down Expand Up @@ -571,7 +597,7 @@ func ExecuteFKTest(t *testing.T, tcase *testCase) {
wg.Add(1)
go func() {
defer wg.Done()
runSingleConnection(ctx, t, tableName, sleepInterval)
runSingleConnection(ctx, t, tableName, tcase, sleepInterval)
}()
}

Expand Down Expand Up @@ -733,7 +759,7 @@ func TestStressFK(t *testing.T) {

func validateTableDefinitions(t *testing.T, afterOnlineDDL bool) {
t.Run("validate definitions", func(t *testing.T) {
for _, tableName := range []string{childTableName, child2TableName, grandchildTableName} {
for _, tableName := range []string{childTableName, child2TableName, grandchildTableName, selfTableName} {
t.Run(tableName, func(t *testing.T) {
childFKFollowedParentRenameMsg := "found traces of internal vitess table name, suggesting Online DDL on parent table caused this child table to follow the renames parent. 'rename_table_preserve_foreign_key' should have prevented this"
var primaryStmt string
Expand Down Expand Up @@ -787,7 +813,9 @@ func createInitialSchema(t *testing.T, tcase *testCase) {
})
t.Run("waiting for vschema deletions to apply", func(t *testing.T) {
for _, tableName := range tableNames {
utils.WaitForTableDeletions(t, clusterInstance.VtgateProcess, keyspaceName, tableName)
t.Run(tableName, func(t *testing.T) {
utils.WaitForTableDeletions(t, clusterInstance.VtgateProcess, keyspaceName, tableName)
})
}
})
t.Run("creating tables", func(t *testing.T) {
Expand All @@ -797,17 +825,35 @@ func createInitialSchema(t *testing.T, tcase *testCase) {
switch i {
case 0:
// parent table, no foreign keys
require.Contains(t, sql, "CREATE TABLE stress_parent")
b.WriteString(sql)
case 1:
// stress_nofk, no foreign keys
require.Contains(t, sql, "CREATE TABLE stress_nofk")
b.WriteString(sql)
case 2:
// stress_self, self-referencing table
require.Contains(t, sql, "CREATE TABLE stress_self")
onDeleteAction := tcase.onDeleteAction
if onDeleteAction == sqlparser.Cascade {
// We don't test self-referencing tables with ON DELETE CASCADE as some queries/scenarios
// are unsupported and can be rejected.
onDeleteAction = sqlparser.NoAction
}
onUpdateAction := tcase.onUpdateAction
if onUpdateAction == sqlparser.Cascade {
// We don't test self-referencing tables with ON UPDATE CASCADE as some queries/scenarios
// are unsupported and can be rejected.
onUpdateAction = sqlparser.NoAction
}
b.WriteString(fmt.Sprintf(sql, referenceActionMap[onDeleteAction], referenceActionMap[onUpdateAction]))
default:
b.WriteString(fmt.Sprintf(sql, referenceActionMap[tcase.onDeleteAction], referenceActionMap[tcase.onUpdateAction]))
}
b.WriteString(";")
}
err := clusterInstance.VtctldClientProcess.ApplySchema(keyspaceName, b.String())
require.NoError(t, err)
require.NoError(t, err, b.String())
})
if tcase.preStatement != "" {
t.Run("pre-statement", func(t *testing.T) {
Expand All @@ -824,6 +870,7 @@ func createInitialSchema(t *testing.T, tcase *testCase) {
checkTable(t, childTableName, "hint_col")
checkTable(t, child2TableName, "hint_col")
checkTable(t, grandchildTableName, "hint_col")
checkTable(t, selfTableName, "hint_col")
checkTable(t, nofkTableName, "hint_col")
})
t.Run("validating tables: vtgate", func(t *testing.T) {
Expand All @@ -832,12 +879,15 @@ func createInitialSchema(t *testing.T, tcase *testCase) {
waitForTable(t, childTableName, conn)
waitForTable(t, child2TableName, conn)
waitForTable(t, grandchildTableName, conn)
waitForTable(t, selfTableName, conn)
waitForTable(t, nofkTableName, conn)
})
t.Run("waiting for vschema definition to apply", func(t *testing.T) {
for _, tableName := range tableNames {
err := utils.WaitForColumn(t, clusterInstance.VtgateProcess, keyspaceName, tableName, "id")
require.NoError(t, err)
t.Run(tableName, func(t *testing.T) {
err := utils.WaitForColumn(t, clusterInstance.VtgateProcess, keyspaceName, tableName, "id")
require.NoError(t, err)
})
}
})

Expand Down Expand Up @@ -990,6 +1040,8 @@ func isFKError(err error) bool {
return false // bummer, but deadlocks can happen, it's a legit error.
case sqlerror.ERLockNowait:
return false // For some queries we use NOWAIT. Bummer, but this can happen, it's a legit error.
case sqlerror.ERQueryTimeout:
return false // query timed out, not a FK error
case sqlerror.ERNoReferencedRow,
sqlerror.ERRowIsReferenced,
sqlerror.ERRowIsReferenced2,
Expand All @@ -1007,6 +1059,19 @@ func isFKError(err error) bool {
func generateInsert(t *testing.T, tableName string, conn *mysql.Conn) error {
id := rand.Int32N(int32(maxTableRows))
parentId := rand.Int32N(int32(maxTableRows))
if tableName == selfTableName {
// for the self referencing table we really need to help it out in initial population
if rand.IntN(2) == 0 {
parentId = id
}
// Also, we want to avoid loops. We ensure parentId <= id
if parentId > id {
parentId = id - 1
}
if parentId < 0 {
parentId = 0
}
}
query := fmt.Sprintf(insertRowStatement, tableName, id, parentId)
qr, err := conn.ExecuteFetch(query, 1000, true)

Expand Down Expand Up @@ -1097,7 +1162,7 @@ func generateDelete(t *testing.T, tableName string, conn *mysql.Conn) error {
return err
}

func runSingleConnection(ctx context.Context, t *testing.T, tableName string, sleepInterval time.Duration) {
func runSingleConnection(ctx context.Context, t *testing.T, tableName string, tcase *testCase, sleepInterval time.Duration) {
log.Infof("Running single connection on %s", tableName)
conn, err := mysql.Connect(ctx, &vtParams)
require.Nil(t, err)
Expand All @@ -1108,6 +1173,8 @@ func runSingleConnection(ctx context.Context, t *testing.T, tableName string, sl
_, err = conn.ExecuteFetch("set transaction isolation level read committed", 1000, true)
require.Nil(t, err)

ticker := time.NewTicker(sleepInterval)
defer ticker.Stop()
for {
switch rand.Int32N(3) {
case 0:
Expand All @@ -1121,15 +1188,11 @@ func runSingleConnection(ctx context.Context, t *testing.T, tableName string, sl
case <-ctx.Done():
log.Infof("Terminating single connection")
return
case <-time.After(sleepInterval):
case <-ticker.C:
}
}
}

// wrapWithNoFKChecks returns the given SQL text surrounded by statements that
// set foreign_key_checks to 0 before it and back to 1 after it.
func wrapWithNoFKChecks(sql string) string {
	const noFKChecksTemplate = "set foreign_key_checks=0; %s; set foreign_key_checks=1;"
	wrapped := fmt.Sprintf(noFKChecksTemplate, sql)
	return wrapped
}

// populateTables randomly populates all test tables. This is done sequentially.
func populateTables(t *testing.T, tcase *testCase) {
log.Infof("initTable begin")
Expand All @@ -1140,13 +1203,26 @@ func populateTables(t *testing.T, tcase *testCase) {
require.Nil(t, err)
defer conn.Close()

t.Logf("===== clearing tables")
for _, tableName := range reverseTableNames {
writeMetrics[tableName].Clear()
deleteQuery := fmt.Sprintf(deleteAllStatement, tableName)
_, err = conn.ExecuteFetch(deleteQuery, 1000, true)
require.Nil(t, err)
}
t.Run("clearing", func(t *testing.T) {
for _, tableName := range reverseTableNames {
t.Run(tableName, func(t *testing.T) {
writeMetrics[tableName].Clear()
t.Run("deleting", func(t *testing.T) {
deleteQuery := fmt.Sprintf(deleteAllStatement, tableName)
_, err = conn.ExecuteFetch(deleteQuery, 1000, true)
require.Nil(t, err)
})
t.Run("counting after delete", func(t *testing.T) {
rs, err := conn.ExecuteFetch(fmt.Sprintf(selectCountRowsStatement, tableName), 1000, true)
require.Nil(t, err)
row := rs.Named().Row()
require.NotNil(t, row)
numRows := row.AsInt64("num_rows", -1)
require.Zero(t, numRows)
})
})
}
})
// In an ideal world we would randomly re-seed the tables in each and every instance of the test.
// In reality, that takes a lot of time, and while the seeding is important, it's not the heart of
// the test. To that effect, the seeding works as follows:
Expand Down Expand Up @@ -1198,36 +1274,59 @@ func populateTables(t *testing.T, tcase *testCase) {
if !tablesSeeded {
t.Run("reseeding", func(t *testing.T) {
for _, tableName := range tableNames {
ignoreModifier := ""
if tcase.reseedInsertIgnore {
ignoreModifier = "ignore"
}
seedQuery := fmt.Sprintf("insert %s into %s select * from %s_seed", ignoreModifier, tableName, tableName)
_, err := conn.ExecuteFetch(seedQuery, 1000, true)
require.NoError(t, err)
t.Run(tableName, func(t *testing.T) {
ignoreModifier := ""
if tcase.reseedInsertIgnore {
ignoreModifier = "ignore"
}
if tableName == selfTableName {
ignoreModifier = "ignore"
}

for {
// In MySQL, for self-referencing tables, you can't just INSERT INTO ... SELECT; some rows will fail.
// So we use "IGNORE" and repeatedly retry until all rows are inserted.
// At least one row is expected to succeed on each attempt.
seedQuery := fmt.Sprintf("insert %s into %s select * from %s_seed", ignoreModifier, tableName, tableName)
rs, err := conn.ExecuteFetch(seedQuery, 1000, true)
if tableName != selfTableName {
require.NoError(t, err)
return
}
if err == nil {
// All done
return
}
require.ErrorContains(t, err, "foreign key constraint fails")
require.NotNil(t, rs)
require.NotZero(t, rs.RowsAffected) // This ensures we make a progress of at least one row at each iteration
}
})
}
})
}

t.Run("validating table rows", func(t *testing.T) {
for _, tableName := range tableNames {
validationQuery := fmt.Sprintf(selectCountRowsStatement, tableName)
rs, err := conn.ExecuteFetch(validationQuery, 1000, true)
require.NoError(t, err)
row := rs.Named().Row()
require.NotNil(t, row)
numRows := row.AsInt64("num_rows", 0)
sumUpdates := row.AsInt64("sum_updates", 0)
require.NotZero(t, numRows)
if !tablesSeeded {
// We cloned the data from *_seed tables. This means we didn't populate writeMetrics. Now,
// this function only takes care of the base seed. We will later on run a stress workload on
// these tables, at the end of which we will examine the writeMetrics. We thus have to have those
// metrics consistent with the cloned data. It's a bit ugly, but we inject fake writeMetrics.
writeMetrics[tableName].deletes = 1
writeMetrics[tableName].inserts = numRows + writeMetrics[tableName].deletes
writeMetrics[tableName].updates = sumUpdates + writeMetrics[tableName].deletes
}
t.Run(tableName, func(t *testing.T) {
validationQuery := fmt.Sprintf(selectCountRowsStatement, tableName)
rs, err := conn.ExecuteFetch(validationQuery, 1000, true)
require.NoError(t, err)
row := rs.Named().Row()
require.NotNil(t, row)
numRows := row.AsInt64("num_rows", 0)
sumUpdates := row.AsInt64("sum_updates", 0)
require.NotZero(t, numRows)
if !tablesSeeded {
// We cloned the data from *_seed tables. This means we didn't populate writeMetrics. Now,
// this function only takes care of the base seed. We will later on run a stress workload on
// these tables, at the end of which we will examine the writeMetrics. We thus have to have those
// metrics consistent with the cloned data. It's a bit ugly, but we inject fake writeMetrics.
writeMetrics[tableName].deletes = 1
writeMetrics[tableName].inserts = numRows + writeMetrics[tableName].deletes
writeMetrics[tableName].updates = sumUpdates + writeMetrics[tableName].deletes
}
})
}
})
}
Expand Down Expand Up @@ -1283,10 +1382,10 @@ func testSelectTableFKErrors(
writeMetrics[tableName].mu.Lock()
defer writeMetrics[tableName].mu.Unlock()

if tcase.onDeleteAction == sqlparser.Cascade {
if tcase.onDeleteAction == sqlparser.Cascade && tableName != selfTableName {
assert.Zerof(t, writeMetrics[tableName].deletesFKErrors, "unexpected foreign key errors for DELETEs in ON DELETE CASCADE. Sample error: %v", writeMetrics[tableName].sampleDeleteFKError)
}
if tcase.onUpdateAction == sqlparser.Cascade {
if tcase.onUpdateAction == sqlparser.Cascade && tableName != selfTableName {
assert.Zerof(t, writeMetrics[tableName].updatesFKErrors, "unexpected foreign key errors for UPDATEs in ON UPDATE CASCADE. Sample error: %v", writeMetrics[tableName].sampleUpdateFKError)
}
}
Expand Down Expand Up @@ -1319,6 +1418,10 @@ func testFKIntegrity(
rs := queryTablet(t, tablet, selectMatchingRowsGrandchild, "")
assert.NotZero(t, len(rs.Rows))
})
t.Run("matching self rows", func(t *testing.T) {
rs := queryTablet(t, tablet, selectMatchingRowsSelf, "")
assert.NotZero(t, len(rs.Rows))
})
if tcase.onDeleteAction != sqlparser.SetNull && tcase.onUpdateAction != sqlparser.SetNull {
// Because with SET NULL there _are_ orphaned rows
t.Run("parent-child orphaned rows", func(t *testing.T) {
Expand All @@ -1333,6 +1436,10 @@ func testFKIntegrity(
rs := queryTablet(t, tablet, selectOrphanedRowsGrandchild, "")
assert.Zero(t, len(rs.Rows))
})
t.Run("self orphaned rows", func(t *testing.T) {
rs := queryTablet(t, tablet, selectOrphanedRowsSelf, "")
assert.Zero(t, len(rs.Rows))
})
if !tcase.skipNofkOrphanedRows {
t.Run("parent-nofk orphaned rows", func(t *testing.T) {
rs := queryTablet(t, tablet, selectOrphanedRowsNoFK, "")
Expand Down
Loading