Incremental backup and point in time recovery for XtraBackup #13156

Merged: 32 commits, merged Jun 6, 2023. The diff below shows changes from 3 of the 32 commits.

Commits:
6212772 incremental backup is always using 'builtin' engine (shlomi-noach, May 24, 2023)
bc9c029 restore: use 'builtin' for incremental restore (shlomi-noach, May 25, 2023)
db506ca test all backup types (shlomi-noach, May 25, 2023)
1ae6041 format code (shlomi-noach, May 25, 2023)
1b79d88 Populate PurgedPosition (shlomi-noach, May 25, 2023)
b17db00 cleanup backups at the end of each test case (shlomi-noach, May 25, 2023)
7f1a577 improved cleanup (shlomi-noach, May 25, 2023)
c40090f rename variable (shlomi-noach, May 25, 2023)
a042eb6 record all backups (shlomi-noach, May 25, 2023)
33cb6cd no need to cleanup backups in between test cases, since each new case… (shlomi-noach, May 25, 2023)
7b7bd41 install xtrabackup on backup_pitr tests (shlomi-noach, May 25, 2023)
a8e6817 use pgzip for xtrabackup (shlomi-noach, May 28, 2023)
a1af0fe more debug info (shlomi-noach, May 28, 2023)
6db1a8d builtin engine: store gtid_purged in manifest (shlomi-noach, May 29, 2023)
b5641cf use backupfrom-GTID as incremental-from-GTID if first binary log has … (shlomi-noach, May 29, 2023)
57877e6 more unit tests (shlomi-noach, May 29, 2023)
9b9016d improve error message (shlomi-noach, May 29, 2023)
a2dc603 capturing MySQL's stderr and reading and logging if not empty (shlomi-noach, May 29, 2023)
fa66d2c At the end of Xtrabackup restore, validate that @@gtid_purged (and th… (shlomi-noach, May 30, 2023)
136126b add comperssion details into test case. Fix GTID validation of manife… (shlomi-noach, May 30, 2023)
c28a9ab check manifest (shlomi-noach, May 31, 2023)
f40db18 Refactor into function (shlomi-noach, May 31, 2023)
741cfeb check manifest.Position.GTIDSet (shlomi-noach, May 31, 2023)
5fbe3d1 fix wrangler tests (shlomi-noach, May 31, 2023)
abf6672 typo (shlomi-noach, Jun 5, 2023)
4f5fd46 Update go/vt/mysqlctl/backup.go (shlomi-noach, Jun 5, 2023)
289645e Update go/vt/mysqlctl/backup.go (shlomi-noach, Jun 5, 2023)
027f33d Update go/vt/mysqlctl/backup.go (shlomi-noach, Jun 5, 2023)
2b02ab7 typo (shlomi-noach, Jun 5, 2023)
fe2aa08 Update go/vt/mysqlctl/mysqld.go (shlomi-noach, Jun 5, 2023)
63890aa Update go/vt/mysqlctl/mysqld.go (shlomi-noach, Jun 5, 2023)
a67e497 Update go/vt/mysqlctl/mysqld.go (shlomi-noach, Jun 5, 2023)
298 changes: 159 additions & 139 deletions go/test/endtoend/backup/pitr/backup_mysqlctld_pitr_test.go
@@ -52,161 +52,181 @@ func waitForReplica(t *testing.T) {
}

// TestIncrementalBackupMysqlctld - tests incremental backups using mysqlctld
func TestIncrementalBackupMysqlctld(t *testing.T) {
shlomi-noach (Contributor, Author) commented:

Most of the content here is the same, just indented, and executes for any one of the three backup types we support. Best reviewed with spaces ignored.

func TestIncrementalBackup(t *testing.T) {
defer cluster.PanicHandler(t)
// setup cluster for the testing
code, err := backup.LaunchCluster(backup.Mysqlctld, "xbstream", 0, nil)
require.NoError(t, err, "setup failed with status code %d", code)
defer backup.TearDownCluster()

backup.InitTestTable(t)

rowsPerPosition := map[string]int{}
backupPositions := []string{}

recordRowsPerPosition := func(t *testing.T) {
pos := backup.GetReplicaPosition(t)
msgs := backup.ReadRowsFromReplica(t)
if _, ok := rowsPerPosition[pos]; !ok {
backupPositions = append(backupPositions, pos)
rowsPerPosition[pos] = len(msgs)
}
}

var fullBackupPos mysql.Position
t.Run("full backup", func(t *testing.T) {
backup.InsertRowOnPrimary(t, "before-full-backup")
waitForReplica(t)
manifest, _ := backup.TestReplicaFullBackup(t)
fullBackupPos = manifest.Position
require.False(t, fullBackupPos.IsZero())
//
msgs := backup.ReadRowsFromReplica(t)
pos := mysql.EncodePosition(fullBackupPos)
backupPositions = append(backupPositions, pos)
rowsPerPosition[pos] = len(msgs)
})

lastBackupPos := fullBackupPos
backup.InsertRowOnPrimary(t, "before-incremental-backups")

tt := []struct {
name string
writeBeforeBackup bool
fromFullPosition bool
autoPosition bool
expectError string
tcases := []struct {
name string
setupType int
}{
{
name: "first incremental backup",
"XtraBackup", backup.XtraBackup,
},
{
name: "make writes, succeed",
writeBeforeBackup: true,
"BuiltinBackup", backup.BuiltinBackup,
},
{
name: "fail, no binary logs to backup",
expectError: "no binary logs to backup",
},
{
name: "make writes again, succeed",
writeBeforeBackup: true,
},
{
name: "auto position, succeed",
writeBeforeBackup: true,
autoPosition: true,
},
{
name: "fail auto position, no binary logs to backup",
autoPosition: true,
expectError: "no binary logs to backup",
},
{
name: "auto position, make writes again, succeed",
writeBeforeBackup: true,
autoPosition: true,
},
{
name: "from full backup position",
fromFullPosition: true,
"Mysqlctld", backup.Mysqlctld,
},
}
var fromFullPositionBackups []string
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
if tc.writeBeforeBackup {
backup.InsertRowOnPrimary(t, "")
}
// we wait for 1 second because backups are written to a directory named after the current timestamp,
// in 1 second resolution. We want to avoid two backups that have the same pathname. Realistically this
// is only ever a problem in this end-to-end test, not in production.
// Also, we give the replica a chance to catch up.
time.Sleep(1100 * time.Millisecond)
waitForReplica(t)
recordRowsPerPosition(t)
// configure --incremental-from-pos to either:
// - auto
// - explicit last backup pos
// - back in history to the original full backup
var incrementalFromPos mysql.Position
if !tc.autoPosition {
incrementalFromPos = lastBackupPos
if tc.fromFullPosition {
incrementalFromPos = fullBackupPos
for _, tcase := range tcases {
t.Run(tcase.name, func(t *testing.T) {

// setup cluster for the testing
code, err := backup.LaunchCluster(tcase.setupType, "xbstream", 0, nil)
require.NoError(t, err, "setup failed with status code %d", code)
defer backup.TearDownCluster()

backup.InitTestTable(t)

rowsPerPosition := map[string]int{}
backupPositions := []string{}

recordRowsPerPosition := func(t *testing.T) {
pos := backup.GetReplicaPosition(t)
msgs := backup.ReadRowsFromReplica(t)
if _, ok := rowsPerPosition[pos]; !ok {
backupPositions = append(backupPositions, pos)
rowsPerPosition[pos] = len(msgs)
}
}
manifest, backupName := backup.TestReplicaIncrementalBackup(t, incrementalFromPos, tc.expectError)
if tc.expectError != "" {
return
}
defer func() {
lastBackupPos = manifest.Position
}()
if tc.fromFullPosition {
fromFullPositionBackups = append(fromFullPositionBackups, backupName)
}
require.False(t, manifest.FromPosition.IsZero())
require.NotEqual(t, manifest.Position, manifest.FromPosition)
require.True(t, manifest.Position.GTIDSet.Contains(manifest.FromPosition.GTIDSet))

gtidPurgedPos, err := mysql.ParsePosition(mysql.Mysql56FlavorID, backup.GetReplicaGtidPurged(t))
require.NoError(t, err)
fromPositionIncludingPurged := manifest.FromPosition.GTIDSet.Union(gtidPurgedPos.GTIDSet)
var fullBackupPos mysql.Position
t.Run("full backup", func(t *testing.T) {
backup.InsertRowOnPrimary(t, "before-full-backup")
waitForReplica(t)
manifest, _ := backup.TestReplicaFullBackup(t)
fullBackupPos = manifest.Position
require.False(t, fullBackupPos.IsZero())
//
msgs := backup.ReadRowsFromReplica(t)
pos := mysql.EncodePosition(fullBackupPos)
backupPositions = append(backupPositions, pos)
rowsPerPosition[pos] = len(msgs)
})

expectFromPosition := lastBackupPos.GTIDSet.Union(gtidPurgedPos.GTIDSet)
if !incrementalFromPos.IsZero() {
expectFromPosition = incrementalFromPos.GTIDSet.Union(gtidPurgedPos.GTIDSet)
lastBackupPos := fullBackupPos
backup.InsertRowOnPrimary(t, "before-incremental-backups")

tt := []struct {
name string
writeBeforeBackup bool
fromFullPosition bool
autoPosition bool
expectError string
}{
{
name: "first incremental backup",
},
{
name: "make writes, succeed",
writeBeforeBackup: true,
},
{
name: "fail, no binary logs to backup",
expectError: "no binary logs to backup",
},
{
name: "make writes again, succeed",
writeBeforeBackup: true,
},
{
name: "auto position, succeed",
writeBeforeBackup: true,
autoPosition: true,
},
{
name: "fail auto position, no binary logs to backup",
autoPosition: true,
expectError: "no binary logs to backup",
},
{
name: "auto position, make writes again, succeed",
writeBeforeBackup: true,
autoPosition: true,
},
{
name: "from full backup position",
fromFullPosition: true,
},
}
var fromFullPositionBackups []string
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
if tc.writeBeforeBackup {
backup.InsertRowOnPrimary(t, "")
}
// we wait for 1 second because backups are written to a directory named after the current timestamp,
// in 1 second resolution. We want to avoid two backups that have the same pathname. Realistically this
// is only ever a problem in this end-to-end test, not in production.
// Also, we give the replica a chance to catch up.
time.Sleep(1100 * time.Millisecond)
waitForReplica(t)
recordRowsPerPosition(t)
// configure --incremental-from-pos to either:
// - auto
// - explicit last backup pos
// - back in history to the original full backup
var incrementalFromPos mysql.Position
if !tc.autoPosition {
incrementalFromPos = lastBackupPos
if tc.fromFullPosition {
incrementalFromPos = fullBackupPos
}
}
manifest, backupName := backup.TestReplicaIncrementalBackup(t, incrementalFromPos, tc.expectError)
if tc.expectError != "" {
return
}
defer func() {
lastBackupPos = manifest.Position
}()
if tc.fromFullPosition {
fromFullPositionBackups = append(fromFullPositionBackups, backupName)
}
require.False(t, manifest.FromPosition.IsZero())
require.NotEqual(t, manifest.Position, manifest.FromPosition)
require.True(t, manifest.Position.GTIDSet.Contains(manifest.FromPosition.GTIDSet))

gtidPurgedPos, err := mysql.ParsePosition(mysql.Mysql56FlavorID, backup.GetReplicaGtidPurged(t))
require.NoError(t, err)
fromPositionIncludingPurged := manifest.FromPosition.GTIDSet.Union(gtidPurgedPos.GTIDSet)

expectFromPosition := lastBackupPos.GTIDSet.Union(gtidPurgedPos.GTIDSet)
if !incrementalFromPos.IsZero() {
expectFromPosition = incrementalFromPos.GTIDSet.Union(gtidPurgedPos.GTIDSet)
}
require.Equalf(t, expectFromPosition, fromPositionIncludingPurged, "expected: %v, found: %v", expectFromPosition, fromPositionIncludingPurged)
})
}
require.Equalf(t, expectFromPosition, fromPositionIncludingPurged, "expected: %v, found: %v", expectFromPosition, fromPositionIncludingPurged)
})
}

testRestores := func(t *testing.T) {
for _, r := range rand.Perm(len(backupPositions)) {
pos := backupPositions[r]
testName := fmt.Sprintf("%s, %d records", pos, rowsPerPosition[pos])
t.Run(testName, func(t *testing.T) {
restoreToPos, err := mysql.DecodePosition(pos)
require.NoError(t, err)
backup.TestReplicaRestoreToPos(t, restoreToPos, "")
msgs := backup.ReadRowsFromReplica(t)
count, ok := rowsPerPosition[pos]
require.True(t, ok)
assert.Equalf(t, count, len(msgs), "messages: %v", msgs)
testRestores := func(t *testing.T) {
for _, r := range rand.Perm(len(backupPositions)) {
pos := backupPositions[r]
testName := fmt.Sprintf("%s, %d records", pos, rowsPerPosition[pos])
t.Run(testName, func(t *testing.T) {
restoreToPos, err := mysql.DecodePosition(pos)
require.NoError(t, err)
backup.TestReplicaRestoreToPos(t, restoreToPos, "")
msgs := backup.ReadRowsFromReplica(t)
count, ok := rowsPerPosition[pos]
require.True(t, ok)
assert.Equalf(t, count, len(msgs), "messages: %v", msgs)
})
}
}
t.Run("PITR", func(t *testing.T) {
testRestores(t)
})
}
t.Run("remove full position backups", func(t *testing.T) {
// Delete the fromFullPosition backup(s), which leaves us with less restore options. Try again.
for _, backupName := range fromFullPositionBackups {
backup.RemoveBackup(t, backupName)
}
})
t.Run("PITR-2", func(t *testing.T) {
testRestores(t)
})
})
}
t.Run("PITR", func(t *testing.T) {
testRestores(t)
})
t.Run("remove full position backups", func(t *testing.T) {
// Delete the fromFullPosition backup(s), which leaves us with less restore options. Try again.
for _, backupName := range fromFullPositionBackups {
backup.RemoveBackup(t, backupName)
}
})
t.Run("PITR-2", func(t *testing.T) {
testRestores(t)
})
}
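The diff above interleaves the old single-engine test with its replacement, which can be hard to follow. Here is a distilled sketch of the new layout, assuming the helpers and engine constants match the backup test-framework package referenced in the diff (subtest bodies elided):

```go
// Sketch of the refactored test: the previous single-engine body now runs
// once per backup engine, each run on a freshly launched cluster.
func TestIncrementalBackup(t *testing.T) {
	tcases := []struct {
		name      string
		setupType int
	}{
		{"XtraBackup", backup.XtraBackup},
		{"BuiltinBackup", backup.BuiltinBackup},
		{"Mysqlctld", backup.Mysqlctld},
	}
	for _, tcase := range tcases {
		t.Run(tcase.name, func(t *testing.T) {
			// Fresh cluster per engine, torn down when the subtest ends.
			code, err := backup.LaunchCluster(tcase.setupType, "xbstream", 0, nil)
			require.NoError(t, err, "setup failed with status code %d", code)
			defer backup.TearDownCluster()

			// ...full backup, incremental backups, and the PITR/PITR-2
			// restore passes run here, as in the pre-refactor test body.
		})
	}
}
```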
19 changes: 15 additions & 4 deletions go/vt/mysqlctl/backup.go
@@ -135,16 +135,24 @@ func Backup(ctx context.Context, params BackupParams) error {
return vterrors.Wrap(err, "StartBackup failed")
}

be, err := GetBackupEngine()
if err != nil {
return vterrors.Wrap(err, "failed to find backup engine")
}
// Scope stats to selected backup engine.
beParams := params.Copy()
beParams.Stats = params.Stats.Scope(
stats.Component(stats.BackupEngine),
stats.Implementation(titleCase(backupEngineImplementation)),
)
var be BackupEngine
if isIncrementalBackup(beParams) {
// Incremental backups are always done via 'builtin' engine, which copies
// appropriate binlog files.
be = BackupRestoreEngineMap[builtinBackupEngineName]
} else {
be, err = GetBackupEngine()
if err != nil {
return vterrors.Wrap(err, "failed to find backup engine")
}
}

// Take the backup, and either AbortBackup or EndBackup.
usable, err := be.ExecuteBackup(ctx, beParams, bh)
logger := params.Logger
@@ -406,6 +414,9 @@ func Restore(ctx context.Context, params RestoreParams) (*BackupManifest, error)

if handles := restorePath.IncrementalBackupHandles(); len(handles) > 0 {
params.Logger.Infof("Restore: applying %v incremental backups", len(handles))
// Incremental restores are always done via 'builtin' engine, which copies
// appropriate binlog files.
re := BackupRestoreEngineMap[builtinBackupEngineName]
for _, bh := range handles {
manifest, err := re.ExecuteRestore(ctx, params, bh)
if err != nil {
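Taken together, the two hunks above pin all incremental work to the builtin engine: a full backup may be taken by any engine, but the binlog-copying incremental step is builtin-only. A minimal sketch of that decision, where chooseBackupEngine is a hypothetical helper (the PR inlines this logic directly in Backup):

```go
// chooseBackupEngine is a hypothetical helper, sketching the selection the
// PR inlines into Backup(): incremental backups always use 'builtin', while
// full backups honor the --backup_engine_implementation flag.
func chooseBackupEngine(params BackupParams) (BackupEngine, error) {
	if isIncrementalBackup(params) {
		// Incremental backups copy binary log files, which only the
		// builtin engine knows how to do.
		return BackupRestoreEngineMap[builtinBackupEngineName], nil
	}
	// Full backups: builtin or xtrabackup, per configuration.
	return GetBackupEngine()
}
```

The Restore hunk applies the same rule unconditionally to the incremental backup handles, since their manifests are always produced by the builtin engine regardless of which engine took the base full backup.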
5 changes: 5 additions & 0 deletions go/vt/mysqlctl/backupengine.go
@@ -167,6 +167,11 @@ func init() {
}
}

// isIncrementalBackup is a convenience function to check whether the params indicate an incremental backup request
func isIncrementalBackup(params BackupParams) bool {
return params.IncrementalFromPos != ""
}

func registerBackupEngineFlags(fs *pflag.FlagSet) {
fs.StringVar(&backupEngineImplementation, "backup_engine_implementation", backupEngineImplementation, "Specifies which implementation to use for creating new backups (builtin or xtrabackup). Restores will always be done with whichever engine created a given backup.")
}
7 changes: 1 addition & 6 deletions go/vt/mysqlctl/builtinbackupengine.go
@@ -153,11 +153,6 @@ func registerBuiltinBackupEngineFlags(fs *pflag.FlagSet) {
fs.UintVar(&builtinBackupFileWriteBufferSize, "builtinbackup-file-write-buffer-size", builtinBackupFileWriteBufferSize, "write files using an IO buffer of this many bytes. Golang defaults are used when set to 0.")
}

// isIncrementalBackup is a convenience function to check whether the params indicate an incremental backup request
func isIncrementalBackup(params BackupParams) bool {
return params.IncrementalFromPos != ""
}

// fullPath returns the full path of the entry, based on its type
func (fe *FileEntry) fullPath(cnf *Mycnf) (string, error) {
// find the root to use
@@ -1126,5 +1121,5 @@ func getPrimaryPosition(ctx context.Context, tmc tmclient.TabletManagerClient, t
}

func init() {
BackupRestoreEngineMap["builtin"] = &BuiltinBackupEngine{}
BackupRestoreEngineMap[builtinBackupEngineName] = &BuiltinBackupEngine{}
}
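This last one-line change swaps the "builtin" string literal for the builtinBackupEngineName constant, so the registration key provably matches the lookups introduced in backup.go. A self-contained sketch of the registry pattern, with stand-in types assumed for illustration:

```go
package mysqlctl

// Stand-ins for illustration only; the real interfaces in go/vt/mysqlctl
// declare ExecuteBackup/ExecuteRestore and more.
type BackupRestoreEngine interface{}

type BuiltinBackupEngine struct{}

const builtinBackupEngineName = "builtin"

var BackupRestoreEngineMap = map[string]BackupRestoreEngine{}

func init() {
	// Registering under the shared constant keeps this key and the
	// BackupRestoreEngineMap[builtinBackupEngineName] lookups in
	// backup.go from drifting apart.
	BackupRestoreEngineMap[builtinBackupEngineName] = &BuiltinBackupEngine{}
}
```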