From 647d197219c1ff7acd0aa9a73d4cf8c32377da57 Mon Sep 17 00:00:00 2001 From: Renan Rangel Date: Tue, 16 Jul 2024 20:28:37 +0100 Subject: [PATCH] fixing issue with xtrabackup and long gtids (#16304) Signed-off-by: Renan Rangel --- go/vt/mysqlctl/xtrabackupengine.go | 44 ++++++++++++++---------- go/vt/mysqlctl/xtrabackupengine_test.go | 45 +++++++++++++++++++------ 2 files changed, 62 insertions(+), 27 deletions(-) diff --git a/go/vt/mysqlctl/xtrabackupengine.go b/go/vt/mysqlctl/xtrabackupengine.go index 38f29cee699..e895ca4d96b 100644 --- a/go/vt/mysqlctl/xtrabackupengine.go +++ b/go/vt/mysqlctl/xtrabackupengine.go @@ -68,6 +68,7 @@ const ( streamModeTar = "tar" xtrabackupBinaryName = "xtrabackup" xtrabackupEngineName = "xtrabackup" + xtrabackupInfoFile = "xtrabackup_info" xbstream = "xbstream" // closeTimeout is the timeout for closing backup files after writing. @@ -238,8 +239,14 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, params BackupPara return true, nil } -func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle, backupFileName string, numStripes int, flavor string) (replicationPosition mysql.Position, finalErr error) { - +func (be *XtrabackupEngine) backupFiles( + ctx context.Context, + params BackupParams, + bh backupstorage.BackupHandle, + backupFileName string, + numStripes int, + flavor string, +) (replicationPosition mysql.Position, finalErr error) { backupProgram := path.Join(xtrabackupEnginePath, xtrabackupBinaryName) flagsToExec := []string{"--defaults-file=" + params.Cnf.Path, "--backup", @@ -247,6 +254,7 @@ func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams "--slave-info", "--user=" + xtrabackupUser, "--target-dir=" + params.Cnf.TmpDir, + "--extra-lsndir=" + params.Cnf.TmpDir, } if xtrabackupStreamMode != "" { flagsToExec = append(flagsToExec, "--stream="+xtrabackupStreamMode) @@ -345,27 +353,14 @@ func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams // the replication position. Note that if we don't read stderr as we go, the // xtrabackup process gets blocked when the write buffer fills up. stderrBuilder := &strings.Builder{} - posBuilder := &strings.Builder{} stderrDone := make(chan struct{}) go func() { defer close(stderrDone) scanner := bufio.NewScanner(backupErr) - capture := false for scanner.Scan() { line := scanner.Text() params.Logger.Infof("xtrabackup stderr: %s", line) - - // Wait until we see the first line of the binlog position. - // Then capture all subsequent lines. We need multiple lines since - // the value we're looking for has newlines in it. - if !capture { - if !strings.Contains(line, "MySQL binlog position") { - continue - } - capture = true - } - fmt.Fprintln(posBuilder, line) } if err := scanner.Err(); err != nil { params.Logger.Errorf("error reading from xtrabackup stderr: %v", err) @@ -409,8 +404,7 @@ func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams return replicationPosition, vterrors.Wrap(err, fmt.Sprintf("xtrabackup failed with error. Output=%s", sterrOutput)) } - posOutput := posBuilder.String() - replicationPosition, rerr := findReplicationPosition(posOutput, flavor, params.Logger) + replicationPosition, rerr := findReplicationPositionFromXtrabackupInfo(params.Cnf.TmpDir, flavor, params.Logger) if rerr != nil { return replicationPosition, vterrors.Wrap(rerr, "backup failed trying to find replication position") } @@ -694,6 +688,22 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log return nil } +func findReplicationPositionFromXtrabackupInfo(directory, flavor string, logger logutil.Logger) (mysql.Position, error) { + f, err := os.Open(path.Join(directory, xtrabackupInfoFile)) + if err != nil { + return mysql.Position{}, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, + "couldn't open %q to read GTID position", path.Join(directory, xtrabackupInfoFile)) + } + defer f.Close() + + contents, err := io.ReadAll(f) + if err != nil { + return mysql.Position{}, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "couldn't read GTID position from %q", f.Name()) + } + + return findReplicationPosition(string(contents), flavor, logger) +} + var xtrabackupReplicationPositionRegexp = regexp.MustCompile(`GTID of the last change '([^']*)'`) func findReplicationPosition(input, flavor string, logger logutil.Logger) (mysql.Position, error) { diff --git a/go/vt/mysqlctl/xtrabackupengine_test.go b/go/vt/mysqlctl/xtrabackupengine_test.go index 26e53c6c949..ea47b0d7d8c 100644 --- a/go/vt/mysqlctl/xtrabackupengine_test.go +++ b/go/vt/mysqlctl/xtrabackupengine_test.go @@ -20,8 +20,12 @@ import ( "bytes" "io" "math/rand" + + "os" + "path" "testing" + "github.com/stretchr/testify/assert" "vitess.io/vitess/go/vt/logutil" ) @@ -50,27 +54,48 @@ func TestFindReplicationPosition(t *testing.T) { t.Errorf("findReplicationPosition() = %v; want %v", got, want) } } +func TestFindReplicationPositionFromXtrabackupInfo(t *testing.T) { + input := `tool_version = 8.0.35-30 + binlog_pos = filename 'vt-0476396352-bin.000005', position '310088991', GTID of the last change '145e508e-ae54-11e9-8ce6-46824dd1815e:1-3, + 1e51f8be-ae54-11e9-a7c6-4280a041109b:1-3, + 47b59de1-b368-11e9-b48b-624401d35560:1-152981, + 557def0a-b368-11e9-84ed-f6fffd91cc57:1-3, + 599ef589-ae55-11e9-9688-ca1f44501925:1-14857169, + b9ce485d-b36b-11e9-9b17-2a6e0a6011f4:1-371262' + format = xbstream + ` + want := "145e508e-ae54-11e9-8ce6-46824dd1815e:1-3,1e51f8be-ae54-11e9-a7c6-4280a041109b:1-3,47b59de1-b368-11e9-b48b-624401d35560:1-152981,557def0a-b368-11e9-84ed-f6fffd91cc57:1-3,599ef589-ae55-11e9-9688-ca1f44501925:1-14857169,b9ce485d-b36b-11e9-9b17-2a6e0a6011f4:1-371262" -func TestFindReplicationPositionNoMatch(t *testing.T) { + tmp, err := os.MkdirTemp(t.TempDir(), "test") + assert.NoError(t, err) + + f, err := os.Create(path.Join(tmp, xtrabackupInfoFile)) + assert.NoError(t, err) + _, err = f.WriteString(input) + assert.NoError(t, err) + assert.NoError(t, f.Close()) + + pos, err := findReplicationPositionFromXtrabackupInfo(tmp, "MySQL56", logutil.NewConsoleLogger()) + assert.NoError(t, err) + assert.Equal(t, want, pos.String()) +} + +func TestFindReplicationPositionNoMatchFromXtrabackupInfo(t *testing.T) { // Make sure failure to find a match triggers an error. input := `nothing` - _, err := findReplicationPosition(input, "MySQL56", logutil.NewConsoleLogger()) - if err == nil { - t.Fatalf("expected error from findReplicationPosition but got nil") - } + _, err := findReplicationPositionFromXtrabackupInfo(input, "MySQL56", logutil.NewConsoleLogger()) + assert.Error(t, err) } -func TestFindReplicationPositionEmptyMatch(t *testing.T) { +func TestFindReplicationPositionEmptyMatchFromXtrabackupInfo(t *testing.T) { // Make sure failure to find a match triggers an error. input := `GTID of the last change ' '` - _, err := findReplicationPosition(input, "MySQL56", logutil.NewConsoleLogger()) - if err == nil { - t.Fatalf("expected error from findReplicationPosition but got nil") - } + _, err := findReplicationPositionFromXtrabackupInfo(input, "MySQL56", logutil.NewConsoleLogger()) + assert.Error(t, err) } func TestStripeRoundTrip(t *testing.T) {