Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Point in time recovery: fix cross-tablet GTID evaluation #13555

Merged
merged 16 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions go/test/endtoend/backup/pitr/backup_pitr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,13 @@ func TestIncrementalBackupAndRestoreToTimestamp(t *testing.T) {
}
backup.ExecTestIncrementalBackupAndRestoreToTimestamp(t, tcase)
}

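// TestIncrementalBackupOnTwoTablets runs the two-tablet incremental backup scenario (backup.ExecTestIncrementalBackupOnTwoTablets) with the BuiltinBackup setup.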
func TestIncrementalBackupOnTwoTablets(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We should have a proper comment on top of the test function

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

tcase := &backup.PITRTestCase{
Name: "BuiltinBackup",
SetupType: backup.BuiltinBackup,
ComprssDetails: nil,
}
backup.ExecTestIncrementalBackupOnTwoTablets(t, tcase)
}
153 changes: 109 additions & 44 deletions go/test/endtoend/backup/vtctlbackup/backup_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,12 @@ var (
}

vtInsertTest = `
create table vt_insert_test (
id bigint auto_increment,
msg varchar(64),
primary key (id)
) Engine=InnoDB`
create table vt_insert_test (
id bigint auto_increment,
msg varchar(64),
primary key (id)
) Engine=InnoDB
`
)

type CompressionDetails struct {
Expand Down Expand Up @@ -163,11 +164,13 @@ func LaunchCluster(setupType int, streamMode string, stripes int, cDetails *Comp
commonTabletArg = append(commonTabletArg, getCompressorArgs(cDetails)...)

var mysqlProcs []*exec.Cmd
tabletTypes := map[int]string{
0: "primary",
1: "replica",
2: "rdonly",
}
for i := 0; i < 3; i++ {
tabletType := "replica"
if i == 0 {
tabletType = "primary"
}
tabletType := tabletTypes[i]
tablet := localCluster.NewVttabletInstance(tabletType, 0, cell)
tablet.VttabletProcess = localCluster.VtprocessInstanceFromVttablet(tablet, shard.Name, keyspaceName)
tablet.VttabletProcess.DbPassword = dbPassword
Expand Down Expand Up @@ -220,13 +223,16 @@ func LaunchCluster(setupType int, streamMode string, stripes int, cDetails *Comp
if err := localCluster.VtctlclientProcess.InitTablet(replica1, cell, keyspaceName, hostname, shard.Name); err != nil {
return 1, err
}
if err := localCluster.VtctlclientProcess.InitTablet(replica2, cell, keyspaceName, hostname, shard.Name); err != nil {
return 1, err
}
vtctldClientProcess := cluster.VtctldClientProcessInstance("localhost", localCluster.VtctldProcess.GrpcPort, localCluster.TmpDirectory)
_, err = vtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync")
if err != nil {
return 1, err
}

for _, tablet := range []cluster.Vttablet{*primary, *replica1} {
for _, tablet := range []*cluster.Vttablet{primary, replica1, replica2} {
if err := tablet.VttabletProcess.Setup(); err != nil {
return 1, err
}
Expand Down Expand Up @@ -1069,40 +1075,30 @@ func terminateRestore(t *testing.T) {
assert.True(t, found, "Restore message not found")
}

func vtctlBackupReplicaNoDestroyNoWrites(t *testing.T, tabletType string) (backups []string, destroy func(t *testing.T)) {
restoreWaitForBackup(t, tabletType, nil, true)
verifyInitialReplication(t)

err := localCluster.VtctldClientProcess.ExecuteCommand("Backup", replica1.Alias)
require.Nil(t, err)

backups = localCluster.VerifyBackupCount(t, shardKsName, 1)

verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars())
func vtctlBackupReplicaNoDestroyNoWrites(t *testing.T, replicaIndex int) (backups []string) {
replica := getReplica(t, replicaIndex)

err = replica2.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, 25*time.Second)
err := localCluster.VtctldClientProcess.ExecuteCommand("Backup", replica.Alias)
require.Nil(t, err)

err = replica2.VttabletProcess.TearDown()
backups, err = localCluster.ListBackups(shardKsName)
require.Nil(t, err)

err = localCluster.VtctlclientProcess.ExecuteCommand("DeleteTablet", replica2.Alias)
require.Nil(t, err)
verifyTabletBackupStats(t, replica.VttabletProcess.GetVars())

destroy = func(t *testing.T) {
verifyAfterRemovingBackupNoBackupShouldBePresent(t, backups)
}
return backups, destroy
return backups
}

func GetReplicaPosition(t *testing.T) string {
pos, _ := cluster.GetPrimaryPosition(t, *replica1, hostname)
func GetReplicaPosition(t *testing.T, replicaIndex int) string {
replica := getReplica(t, replicaIndex)
pos, _ := cluster.GetPrimaryPosition(t, *replica, hostname)
return pos
}

func GetReplicaGtidPurged(t *testing.T) string {
func GetReplicaGtidPurged(t *testing.T, replicaIndex int) string {
replica := getReplica(t, replicaIndex)
query := "select @@global.gtid_purged as gtid_purged"
rs, err := replica1.VttabletProcess.QueryTablet(query, keyspaceName, true)
rs, err := replica.VttabletProcess.QueryTablet(query, keyspaceName, true)
require.NoError(t, err)
row := rs.Named().Row()
require.NotNil(t, row)
Expand Down Expand Up @@ -1135,13 +1131,62 @@ func ReadRowsFromPrimary(t *testing.T) (msgs []string) {
return ReadRowsFromTablet(t, primary)
}

func ReadRowsFromReplica(t *testing.T) (msgs []string) {
return ReadRowsFromTablet(t, replica1)
// getReplica maps a zero-based replica index onto the package-level replica tablets:
// index 0 selects replica1 and index 1 selects replica2.
// Any other index marks the test as failed and stops it immediately.
func getReplica(t *testing.T, replicaIndex int) *cluster.Vttablet {
	t.Helper()
	switch replicaIndex {
	case 0:
		return replica1
	case 1:
		return replica2
	default:
		assert.Failf(t, "invalid replica index", "index=%d", replicaIndex)
		// Stop the test right here: callers dereference the returned tablet
		// unconditionally, so carrying on with nil would panic instead of
		// reporting the invalid index cleanly.
		t.FailNow()
		return nil // unreachable; satisfies the compiler's return analysis
	}
}

func ReadRowsFromReplica(t *testing.T, replicaIndex int) (msgs []string) {
return ReadRowsFromTablet(t, getReplica(t, replicaIndex))
}

// FlushBinaryLogsOnReplica runs `flush binary logs` on the replica selected by
// replicaIndex, repeating the statement <count> times. The test is failed as soon
// as one of the queries returns an error.
func FlushBinaryLogsOnReplica(t *testing.T, replicaIndex int, count int) {
	const flushQuery = "flush binary logs"
	tablet := getReplica(t, replicaIndex)
	for remaining := count; remaining > 0; remaining-- {
		_, flushErr := tablet.VttabletProcess.QueryTablet(flushQuery, keyspaceName, true)
		require.NoError(t, flushErr)
	}
}

// FlushAndPurgeBinaryLogsOnReplica deliberately throws away the existing binary logs of the
// replica selected by replicaIndex. It first flushes once so that a new binary log is started,
// then purges every log that precedes the last one listed by `show binary logs`.
// Tests use this to lose binlog information on purpose.
// The name of that last binary log is returned.
func FlushAndPurgeBinaryLogsOnReplica(t *testing.T, replicaIndex int) (lastBinlog string) {
	FlushBinaryLogsOnReplica(t, replicaIndex, 1)
	tablet := getReplica(t, replicaIndex)

	// The last listed row is the one we keep; the binlog file name is its first column.
	binlogs, err := tablet.VttabletProcess.QueryTablet("show binary logs", keyspaceName, true)
	require.NoError(t, err)
	require.NotEmpty(t, binlogs.Rows)
	lastBinlog = binlogs.Rows[len(binlogs.Rows)-1][0].ToString()

	// Purge everything up to (not including) that last binlog.
	purgeQuery, err := sqlparser.ParseAndBind("purge binary logs to %a", sqltypes.StringBindVariable(lastBinlog))
	require.NoError(t, err)
	_, err = tablet.VttabletProcess.QueryTablet(purgeQuery, keyspaceName, true)
	require.NoError(t, err)

	return lastBinlog
}

func readManifestFile(t *testing.T, backupLocation string) (manifest *mysqlctl.BackupManifest) {
// reading manifest
data, err := os.ReadFile(backupLocation + "/MANIFEST")
fullPath := backupLocation + "/MANIFEST"
data, err := os.ReadFile(fullPath)
require.NoErrorf(t, err, "error while reading MANIFEST %v", err)

// parsing manifest
Expand All @@ -1151,19 +1196,19 @@ func readManifestFile(t *testing.T, backupLocation string) (manifest *mysqlctl.B
return manifest
}

func TestReplicaFullBackup(t *testing.T) (manifest *mysqlctl.BackupManifest, destroy func(t *testing.T)) {
backups, destroy := vtctlBackupReplicaNoDestroyNoWrites(t, "replica")
func TestReplicaFullBackup(t *testing.T, replicaIndex int) (manifest *mysqlctl.BackupManifest) {
backups := vtctlBackupReplicaNoDestroyNoWrites(t, replicaIndex)

backupLocation := localCluster.CurrentVTDATAROOT + "/backups/" + shardKsName + "/" + backups[len(backups)-1]
return readManifestFile(t, backupLocation), destroy
return readManifestFile(t, backupLocation)
}

func TestReplicaIncrementalBackup(t *testing.T, incrementalFromPos replication.Position, expectError string) (manifest *mysqlctl.BackupManifest, backupName string) {
func testReplicaIncrementalBackup(t *testing.T, replica *cluster.Vttablet, incrementalFromPos replication.Position, expectError string) (manifest *mysqlctl.BackupManifest, backupName string) {
incrementalFromPosArg := "auto"
if !incrementalFromPos.IsZero() {
incrementalFromPosArg = replication.EncodePosition(incrementalFromPos)
}
output, err := localCluster.VtctldClientProcess.ExecuteCommandWithOutput("Backup", "--incremental-from-pos", incrementalFromPosArg, replica1.Alias)
output, err := localCluster.VtctldClientProcess.ExecuteCommandWithOutput("Backup", "--incremental-from-pos", incrementalFromPosArg, replica.Alias)
if expectError != "" {
require.Errorf(t, err, "expected: %v", expectError)
require.Contains(t, output, expectError)
Expand All @@ -1173,23 +1218,43 @@ func TestReplicaIncrementalBackup(t *testing.T, incrementalFromPos replication.P

backups, err := localCluster.ListBackups(shardKsName)
require.NoError(t, err)
verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars())
verifyTabletBackupStats(t, replica.VttabletProcess.GetVars())
backupName = backups[len(backups)-1]
backupLocation := localCluster.CurrentVTDATAROOT + "/backups/" + shardKsName + "/" + backupName
return readManifestFile(t, backupLocation), backupName
}

func TestReplicaRestoreToPos(t *testing.T, restoreToPos replication.Position, expectError string) {
// TestReplicaIncrementalBackup resolves replicaIndex to a replica tablet and delegates to
// testReplicaIncrementalBackup to run an incremental backup on it, passing through
// incrementalFromPos and expectError and returning that helper's manifest and backup name.
func TestReplicaIncrementalBackup(t *testing.T, replicaIndex int, incrementalFromPos replication.Position, expectError string) (manifest *mysqlctl.BackupManifest, backupName string) {
	return testReplicaIncrementalBackup(t, getReplica(t, replicaIndex), incrementalFromPos, expectError)
}

// TestReplicaFullRestore issues `RestoreFromBackup` through the vtctlclient process against the
// replica selected by replicaIndex.
// When expectError is non-empty the command is required to fail and its output must contain
// expectError. Otherwise the command is required to succeed, after which the tablet's restore
// stats are verified.
func TestReplicaFullRestore(t *testing.T, replicaIndex int, expectError string) {
	tablet := getReplica(t, replicaIndex)
	out, restoreErr := localCluster.VtctlclientProcess.ExecuteCommandWithOutput("RestoreFromBackup", tablet.Alias)

	if expectError == "" {
		// Happy path: the restore must go through and be reflected in the tablet's vars.
		require.NoErrorf(t, restoreErr, "output: %v", out)
		verifyTabletRestoreStats(t, tablet.VttabletProcess.GetVars())
		return
	}

	// Failure path: an error is mandatory and must mention the expected text.
	require.Errorf(t, restoreErr, "expected: %v", expectError)
	require.Contains(t, out, expectError)
}

func TestReplicaRestoreToPos(t *testing.T, replicaIndex int, restoreToPos replication.Position, expectError string) {
replica := getReplica(t, replicaIndex)

require.False(t, restoreToPos.IsZero())
restoreToPosArg := replication.EncodePosition(restoreToPos)
output, err := localCluster.VtctldClientProcess.ExecuteCommandWithOutput("RestoreFromBackup", "--restore-to-pos", restoreToPosArg, replica1.Alias)
output, err := localCluster.VtctlclientProcess.ExecuteCommandWithOutput("RestoreFromBackup", "--", "--restore_to_pos", restoreToPosArg, replica.Alias)
if expectError != "" {
require.Errorf(t, err, "expected: %v", expectError)
require.Contains(t, output, expectError)
return
}
require.NoErrorf(t, err, "output: %v", output)
verifyTabletRestoreStats(t, replica1.VttabletProcess.GetVars())
verifyTabletRestoreStats(t, replica.VttabletProcess.GetVars())
}

func TestReplicaRestoreToTimestamp(t *testing.T, restoreToTimestamp time.Time, expectError string) {
Expand Down
Loading