Backup/restore: provision and restore a tablet with point-in-time recovery flags #13964

Merged
merged 16 commits into from
Sep 28, 2023
Changes from 6 commits
2 changes: 2 additions & 0 deletions go/flags/endtoend/vttablet.txt
@@ -256,6 +256,8 @@ Usage of vttablet:
--restore_concurrency int (init restore parameter) how many concurrent files to restore at once (default 4)
--restore_from_backup (init restore parameter) will check BackupStorage for a recent backup at startup and start there
--restore_from_backup_ts string (init restore parameter) if set, restore the latest backup taken at or before this timestamp. Example: '2021-04-29.133050'
--restore_to_pos string (init incremental restore parameter) if set, run a point in time recovery that ends with the given position. This will attempt to use one full backup followed by zero or more incremental backups
--restore_to_timestamp 2006-01-02T15:04:05Z07:00 (init incremental restore parameter) if set, run a point in time recovery that restores up to the given timestamp, if possible. Given timestamp in RFC3339 format (2006-01-02T15:04:05Z07:00)
Member

Why is there a date next to the flag name? The type should be timestamp (or something like that).
Also in a later PR, we should explore whether to combine this flag with --restore_from_backup_ts string if possible.

Contributor Author

The date clarifies what RFC3339 format looks like, and is the golang default template for that time format. I can remove it of course.

Yeah, combining with --restore_from_backup_ts is an option, but might be confusing because the flag already has a different meaning and so we'd have to introduce a new flag to say how we'd want to interpret --restore_from_backup_ts...

Contributor Author

Oh, actually no idea why --restore_to_timestamp 2006-01-02T15:04:05Z07:00!

Contributor Author

Ah, yes. It's because the comment includes the text

(`2006-01-02T15:04:05Z07:00`)

which is interpreted as the default value/type for the variable. Editing.
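
The behavior described above can be reproduced with Go's standard `flag` package, which documents the same back-quote convention that `pflag` follows: a back-quoted word in the usage string replaces the type name printed next to the flag. This is a minimal sketch rather than the Vitess code; `placeholderFor` is a hypothetical helper and the usage strings are shortened.

```go
package main

import (
	"flag"
	"fmt"
)

// placeholderFor registers a string flag with the given usage text on a
// throwaway FlagSet and returns the value name that help output would print
// next to the flag name.
func placeholderFor(usage string) string {
	fs := flag.NewFlagSet("demo", flag.ContinueOnError)
	fs.String("restore_to_timestamp", "", usage)
	// UnquoteUsage extracts a back-quoted name from the usage string, or
	// falls back to a name derived from the flag's type.
	name, _ := flag.UnquoteUsage(fs.Lookup("restore_to_timestamp"))
	return name
}

func main() {
	quoted := "timestamp in RFC3339 format (`2006-01-02T15:04:05Z07:00`)"
	plain := "timestamp in RFC3339 format (2006-01-02T15:04:05Z07:00)"
	fmt.Println(placeholderFor(quoted)) // the back-quoted layout string
	fmt.Println(placeholderFor(plain))  // the type name, "string"
}
```

Dropping the back quotes from the usage text is enough to get the type name back in the help output.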

Contributor Author

Fixed.

Member

> combining with --restore_from_backup_ts is an option, but might be confusing because the flag already has a different meaning

Yes, that's why I've deferred the discussion on it 😅

--retain_online_ddl_tables duration How long should vttablet keep an old migrated table before purging it (default 24h0m0s)
--s2a_enable_appengine_dialer If true, opportunistically use AppEngine-specific dialer to call S2A.
--s2a_timeout duration Timeout enforced on the connection to the S2A service for handshake. (default 3s)
20 changes: 17 additions & 3 deletions go/test/endtoend/backup/pitr/backup_pitr_test.go
@@ -22,7 +22,21 @@ import (
backup "vitess.io/vitess/go/test/endtoend/backup/vtctlbackup"
)

// TestIncrementalBackupAndRestoreToPos
// TestIncrementalBackupAndRestoreToPos - tests incremental backups and restores.
// The general outline of the test:
// - Generate some schema with data
// - Take a full backup
// - Proceed to take a series of incremental backups. In between, inject data (insert rows), and keep record
// of which data (number of rows) is present in each backup, and at which position.
// - Expect backups success/failure per scenario
// - Next up, we start testing restores. Randomly pick recorded positions and restore to those points in time.
// - In each restore, expect to find the data (number of rows) recorded for said position
// - Some restores should fail because the position exceeds the last binlog
// - Do so for all recorded positions.
// - Then, a 2nd round where some backups are purged -- this tests to see that we're still able to find a restore path
// (of course we only delete backups that still leave us with valid restore paths).
// - Last, create a new tablet with --restore_from_backup --restore_to_pos and see that it bootstraps with restored data
// and that it ends up in DRAINED type
func TestIncrementalBackupAndRestoreToPos(t *testing.T) {
tcase := &backup.PITRTestCase{
Name: "BuiltinBackup",
@@ -45,8 +59,8 @@ func TestIncrementalBackupAndRestoreToPos(t *testing.T) {
// - Do so for all recorded timestamps.
// - Then, a 2nd round where some backups are purged -- this tests to see that we're still able to find a restore path
// (of course we only delete backups that still leave us with valid restore paths).
//
// All of the above is done for BuiltinBackup, XtraBackup, Mysqlctld (which is technically builtin)
// - Last, create a new tablet with --restore_from_backup --restore_to_timestamp and see that it bootstraps with restored data
// and that it ends up in DRAINED type
func TestIncrementalBackupAndRestoreToTimestamp(t *testing.T) {
tcase := &backup.PITRTestCase{
Name: "BuiltinBackup",
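
The bookkeeping that the test outline describes (record the row count at each backup, then restore to the recorded positions in random order and compare) can be sketched in isolation. This is a toy model under stated assumptions, not the Vitess test framework: the `backup` type, the `pos-N` labels, and all three functions are hypothetical, and a "restore" is simulated by replaying row counts.

```go
package main

import (
	"fmt"
	"math/rand"
)

// backup is a hypothetical record of one backup in the chain.
type backup struct {
	pos   string // label for the position at which the backup ended
	added int    // rows inserted since the previous backup
}

// takeBackups simulates one full backup followed by incrementals and records
// how many rows exist at each position, as the test does while injecting data.
func takeBackups(added []int) ([]backup, map[string]int, []string) {
	var chain []backup
	var positions []string
	rowsPerPosition := map[string]int{}
	total := 0
	for i, n := range added {
		total += n
		pos := fmt.Sprintf("pos-%d", i)
		chain = append(chain, backup{pos: pos, added: n})
		rowsPerPosition[pos] = total
		positions = append(positions, pos)
	}
	return chain, rowsPerPosition, positions
}

// restoreRows replays the chain (full backup first, then incrementals) up to
// and including target and returns the number of rows visible afterwards.
func restoreRows(chain []backup, target string) (int, error) {
	rows := 0
	for _, b := range chain {
		rows += b.added
		if b.pos == target {
			return rows, nil
		}
	}
	return 0, fmt.Errorf("position %q is not covered by any backup", target)
}

// verifyRestores restores to every recorded position in random order and
// checks the row count. It returns the first position that restored
// successfully so a later step can reuse it.
func verifyRestores(chain []backup, rowsPerPosition map[string]int, positions []string, seed int64) (string, error) {
	sample := ""
	rng := rand.New(rand.NewSource(seed))
	for _, i := range rng.Perm(len(positions)) {
		pos := positions[i]
		got, err := restoreRows(chain, pos)
		if err != nil {
			return "", err
		}
		if want := rowsPerPosition[pos]; got != want {
			return "", fmt.Errorf("restore to %s: got %d rows, want %d", pos, got, want)
		}
		if sample == "" {
			sample = pos
		}
	}
	return sample, nil
}

func main() {
	chain, rowsPerPosition, positions := takeBackups([]int{10, 5, 7})
	sample, err := verifyRestores(chain, rowsPerPosition, positions, 1)
	fmt.Println(sample, err)
}
```

Returning the first successfully restored position mirrors how the test picks a sample to feed into the final "init tablet" step.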
42 changes: 34 additions & 8 deletions go/test/endtoend/backup/vtctlbackup/backup_utils.go
@@ -61,6 +61,7 @@ var (
primary *cluster.Vttablet
replica1 *cluster.Vttablet
replica2 *cluster.Vttablet
replica3 *cluster.Vttablet
localCluster *cluster.LocalProcessCluster
newInitDBFile string
useXtrabackup bool
@@ -90,6 +91,7 @@ var (
primary key (id)
) Engine=InnoDB
`
SetupReplica3Tablet func(extraArgs []string) (*cluster.Vttablet, error)
)

type CompressionDetails struct {
@@ -170,9 +172,10 @@ func LaunchCluster(setupType int, streamMode string, stripes int, cDetails *Comp
0: "primary",
1: "replica",
2: "rdonly",
3: "spare",
}
for i := 0; i < 3; i++ {
tabletType := tabletTypes[i]

createTablet := func(tabletType string) error {
tablet := localCluster.NewVttabletInstance(tabletType, 0, cell)
tablet.VttabletProcess = localCluster.VtprocessInstanceFromVttablet(tablet, shard.Name, keyspaceName)
tablet.VttabletProcess.DbPassword = dbPassword
@@ -182,33 +185,40 @@ func LaunchCluster(setupType int, streamMode string, stripes int, cDetails *Comp
if setupType == Mysqlctld {
mysqlctldProcess, err := cluster.MysqlCtldProcessInstance(tablet.TabletUID, tablet.MySQLPort, localCluster.TmpDirectory)
if err != nil {
return 1, err
return err
}
tablet.MysqlctldProcess = *mysqlctldProcess
tablet.MysqlctldProcess.InitDBFile = newInitDBFile
tablet.MysqlctldProcess.ExtraArgs = extraArgs
tablet.MysqlctldProcess.Password = tablet.VttabletProcess.DbPassword
if err := tablet.MysqlctldProcess.Start(); err != nil {
return 1, err
return err
}
shard.Vttablets = append(shard.Vttablets, tablet)
continue
return nil
}

mysqlctlProcess, err := cluster.MysqlCtlProcessInstance(tablet.TabletUID, tablet.MySQLPort, localCluster.TmpDirectory)
if err != nil {
return 1, err
return err
}
tablet.MysqlctlProcess = *mysqlctlProcess
tablet.MysqlctlProcess.InitDBFile = newInitDBFile
tablet.MysqlctlProcess.ExtraArgs = extraArgs
proc, err := tablet.MysqlctlProcess.StartProcess()
if err != nil {
return 1, err
return err
}
mysqlProcs = append(mysqlProcs, proc)

shard.Vttablets = append(shard.Vttablets, tablet)
return nil
}
for i := 0; i < 4; i++ {
tabletType := tabletTypes[i]
if err := createTablet(tabletType); err != nil {
return 1, err
}
}
for _, proc := range mysqlProcs {
if err := proc.Wait(); err != nil {
@@ -218,6 +228,7 @@ func LaunchCluster(setupType int, streamMode string, stripes int, cDetails *Comp
primary = shard.Vttablets[0]
replica1 = shard.Vttablets[1]
replica2 = shard.Vttablets[2]
replica3 = shard.Vttablets[3]

if err := localCluster.VtctlclientProcess.InitTablet(primary, cell, keyspaceName, hostname, shard.Name); err != nil {
return 1, err
@@ -234,12 +245,23 @@ func LaunchCluster(setupType int, streamMode string, stripes int, cDetails *Comp
return 1, err
}

for _, tablet := range []*cluster.Vttablet{primary, replica1, replica2} {
for _, tablet := range []*cluster.Vttablet{primary, replica1, replica2} { // we don't start replica3 yet
if err := tablet.VttabletProcess.Setup(); err != nil {
return 1, err
}
}

SetupReplica3Tablet = func(extraArgs []string) (*cluster.Vttablet, error) {
replica3.VttabletProcess.ExtraArgs = append(replica3.VttabletProcess.ExtraArgs, extraArgs...)
if err := localCluster.VtctlclientProcess.InitTablet(replica3, cell, keyspaceName, hostname, shard.Name); err != nil {
Member

Should not be necessary.

return replica3, err
}
if err := replica3.VttabletProcess.Setup(); err != nil {
return replica3, err
}
return replica3, nil
}

if err := localCluster.VtctlclientProcess.InitShardPrimary(keyspaceName, shard.Name, cell, primary.TabletUID); err != nil {
return 1, err
}
@@ -1140,6 +1162,8 @@ func getReplica(t *testing.T, replicaIndex int) *cluster.Vttablet {
return replica1
case 1:
return replica2
case 2:
return replica3
default:
assert.Failf(t, "invalid replica index", "index=%d", replicaIndex)
return nil
@@ -1290,6 +1314,7 @@ func TestReplicaRestoreToPos(t *testing.T, replicaIndex int, restoreToPos replic
}
require.NoErrorf(t, err, "output: %v", output)
verifyTabletRestoreStats(t, replica.VttabletProcess.GetVars())
checkTabletType(t, replica.Alias, topodata.TabletType_DRAINED)
}

func TestReplicaRestoreToTimestamp(t *testing.T, restoreToTimestamp time.Time, expectError string) {
@@ -1303,6 +1328,7 @@ func TestReplicaRestoreToTimestamp(t *testing.T, restoreToTimestamp time.Time, e
}
require.NoErrorf(t, err, "output: %v", output)
verifyTabletRestoreStats(t, replica1.VttabletProcess.GetVars())
checkTabletType(t, replica1.Alias, topodata.TabletType_DRAINED)
}

func verifyTabletBackupStats(t *testing.T, vars map[string]any) {
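
The change to `LaunchCluster` prepares a fourth tablet but leaves it stopped, then exposes a function that starts it later with extra flags. That pattern can be sketched on its own as follows. This is an illustration only: the `tablet` and `testCluster` types, `launch`, and `setupSpare` are hypothetical stand-ins, and "starting" a tablet just flips a boolean.

```go
package main

import (
	"errors"
	"fmt"
)

// tablet is a hypothetical stand-in for a cluster tablet.
type tablet struct {
	tabletType string
	extraArgs  []string
	running    bool
}

// testCluster holds the prepared tablets plus a hook for the held-back one.
type testCluster struct {
	tablets []*tablet
	// setupSpare starts the held-back tablet with additional flags.
	setupSpare func(extraArgs []string) (*tablet, error)
}

// launch prepares one tablet per type, starts all but the last one, and
// installs a closure that starts the last one on demand.
func launch(tabletTypes []string) (*testCluster, error) {
	if len(tabletTypes) < 2 {
		return nil, errors.New("need at least two tablet types")
	}
	c := &testCluster{}
	for _, tt := range tabletTypes {
		c.tablets = append(c.tablets, &tablet{tabletType: tt})
	}
	spare := c.tablets[len(c.tablets)-1]
	for _, t := range c.tablets[:len(c.tablets)-1] {
		t.running = true // the spare is deliberately not started yet
	}
	c.setupSpare = func(extraArgs []string) (*tablet, error) {
		if spare.running {
			return spare, errors.New("spare tablet is already running")
		}
		spare.extraArgs = append(spare.extraArgs, extraArgs...)
		spare.running = true
		return spare, nil
	}
	return c, nil
}

func main() {
	c, err := launch([]string{"primary", "replica", "rdonly", "spare"})
	if err != nil {
		panic(err)
	}
	t, err := c.setupSpare([]string{"--restore_to_pos", "example-position"})
	fmt.Println(t.tabletType, t.running, t.extraArgs, err)
}
```

Deferring the start lets the test decide the restore flags only after it knows which backup position or timestamp restored successfully.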
53 changes: 53 additions & 0 deletions go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go
@@ -34,6 +34,7 @@ import (

var (
gracefulPostBackupDuration = 10 * time.Millisecond
backupTimeoutDuration = 3 * time.Minute
)

const (
@@ -225,6 +226,7 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
})
}

sampleTestedBackupPos := ""
testRestores := func(t *testing.T) {
for _, r := range rand.Perm(len(backupPositions)) {
pos := backupPositions[r]
@@ -237,6 +239,9 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
count, ok := rowsPerPosition[pos]
require.True(t, ok)
assert.Equalf(t, count, len(msgs), "messages: %v", msgs)
if sampleTestedBackupPos == "" {
sampleTestedBackupPos = pos
}
})
}
}
@@ -252,6 +257,27 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
t.Run("PITR-2", func(t *testing.T) {
testRestores(t)
})
// Test that we can create a new tablet with --restore_from_backup --restore_to_pos and that it bootstraps
// via PITR and ends up in DRAINED type.
t.Run("init tablet PITR", func(t *testing.T) {
require.NotEmpty(t, sampleTestedBackupPos)

var tablet *cluster.Vttablet

t.Run(fmt.Sprintf("init from backup pos %s", sampleTestedBackupPos), func(t *testing.T) {
tablet, err = SetupReplica3Tablet([]string{"--restore_to_pos", sampleTestedBackupPos})
assert.NoError(t, err)
})
t.Run("wait for drained", func(t *testing.T) {
err = tablet.VttabletProcess.WaitForTabletTypesForTimeout([]string{"drained"}, backupTimeoutDuration)
assert.NoError(t, err)
})
t.Run(fmt.Sprintf("validate %d rows", rowsPerPosition[sampleTestedBackupPos]), func(t *testing.T) {
require.NotZero(t, rowsPerPosition[sampleTestedBackupPos])
msgs := ReadRowsFromReplica(t, 2)
assert.Equal(t, rowsPerPosition[sampleTestedBackupPos], len(msgs))
})
})
})
}

@@ -415,6 +441,7 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes
})
}

sampleTestedBackupIndex := -1
testRestores := func(t *testing.T) {
numFailedRestores := 0
numSuccessfulRestores := 0
@@ -433,6 +460,9 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes
msgs := ReadRowsFromReplica(t, 0)
assert.Equalf(t, testedBackup.rows, len(msgs), "messages: %v", msgs)
numSuccessfulRestores++
if sampleTestedBackupIndex < 0 {
sampleTestedBackupIndex = backupIndex
}
} else {
numFailedRestores++
}
@@ -454,6 +484,29 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes
t.Run("PITR-2", func(t *testing.T) {
testRestores(t)
})
// Test that we can create a new tablet with --restore_from_backup --restore_to_timestamp and that it bootstraps
// via PITR and ends up in DRAINED type.
t.Run("init tablet PITR", func(t *testing.T) {
require.GreaterOrEqual(t, sampleTestedBackupIndex, 0)
sampleTestedBackup := testedBackups[sampleTestedBackupIndex]
restoreToTimestampArg := mysqlctl.FormatRFC3339(sampleTestedBackup.postTimestamp)

var tablet *cluster.Vttablet

t.Run(fmt.Sprintf("init from backup num %d", sampleTestedBackupIndex), func(t *testing.T) {
tablet, err = SetupReplica3Tablet([]string{"--restore_to_timestamp", restoreToTimestampArg})
assert.NoError(t, err)
})
t.Run("wait for drained", func(t *testing.T) {
err = tablet.VttabletProcess.WaitForTabletTypesForTimeout([]string{"drained"}, backupTimeoutDuration)
assert.NoError(t, err)
})
t.Run(fmt.Sprintf("validate %d rows", sampleTestedBackup.rows), func(t *testing.T) {
require.NotZero(t, sampleTestedBackup.rows)
msgs := ReadRowsFromReplica(t, 2)
assert.Equal(t, sampleTestedBackup.rows, len(msgs))
})
})
})
}

34 changes: 29 additions & 5 deletions go/vt/vttablet/tabletmanager/restore.go
@@ -71,7 +71,18 @@ func registerRestoreFlags(fs *pflag.FlagSet) {
}

var (
// Flags for PITR
// Flags for incremental restore (PITR) - new iteration
restoreToTimestampStr string
restoreToPos string
)

func registerIncrementalRestoreFlags(fs *pflag.FlagSet) {
fs.StringVar(&restoreToTimestampStr, "restore_to_timestamp", restoreToTimestampStr, "(init incremental restore parameter) if set, run a point in time recovery that restores up to the given timestamp, if possible. Given timestamp in RFC3339 format (`2006-01-02T15:04:05Z07:00`)")
fs.StringVar(&restoreToPos, "restore_to_pos", restoreToPos, "(init incremental restore parameter) if set, run a point in time recovery that ends with the given position. This will attempt to use one full backup followed by zero or more incremental backups")
}

var (
// Flags for PITR - old iteration
binlogHost string
binlogPort int
binlogUser string
@@ -99,6 +110,9 @@ func init() {
servenv.OnParseFor("vtcombo", registerRestoreFlags)
servenv.OnParseFor("vttablet", registerRestoreFlags)

servenv.OnParseFor("vtcombo", registerIncrementalRestoreFlags)
servenv.OnParseFor("vttablet", registerIncrementalRestoreFlags)

servenv.OnParseFor("vtcombo", registerPointInTimeRestoreFlags)
servenv.OnParseFor("vttablet", registerPointInTimeRestoreFlags)

@@ -110,7 +124,14 @@ func init() {
// It will either work, fail gracefully, or return
// an error in case of a non-recoverable error.
// It takes the action lock so no RPC interferes.
func (tm *TabletManager) RestoreData(ctx context.Context, logger logutil.Logger, waitForBackupInterval time.Duration, deleteBeforeRestore bool, backupTime time.Time) error {
func (tm *TabletManager) RestoreData(
ctx context.Context,
logger logutil.Logger,
waitForBackupInterval time.Duration,
deleteBeforeRestore bool,
backupTime time.Time,
restoreToTimetamp time.Time,
restoreToPos string) error {
if err := tm.lock(ctx); err != nil {
return err
}
@@ -155,7 +176,9 @@ func (tm *TabletManager) RestoreData(ctx context.Context, logger logutil.Logger,
startTime = time.Now()

req := &tabletmanagerdatapb.RestoreFromBackupRequest{
BackupTime: protoutil.TimeToProto(backupTime),
BackupTime: protoutil.TimeToProto(backupTime),
RestoreToPos: restoreToPos,
RestoreToTimestamp: protoutil.TimeToProto(restoreToTimetamp),
}
err = tm.restoreDataLocked(ctx, logger, waitForBackupInterval, deleteBeforeRestore, req)
if err != nil {
@@ -207,7 +230,8 @@ func (tm *TabletManager) restoreDataLocked(ctx context.Context, logger logutil.L
DryRun: request.DryRun,
Stats: backupstats.RestoreStats(),
}
if request.RestoreToPos != "" && !protoutil.TimeFromProto(request.RestoreToTimestamp).UTC().IsZero() {
restoreToTimestamp := protoutil.TimeFromProto(request.RestoreToTimestamp).UTC()
if request.RestoreToPos != "" && !restoreToTimestamp.IsZero() {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--restore_to_pos and --restore_to_timestamp are mutually exclusive")
}
if request.RestoreToPos != "" {
@@ -217,7 +241,7 @@ func (tm *TabletManager) restoreDataLocked(ctx context.Context, logger logutil.L
}
params.RestoreToPos = pos
}
if restoreToTimestamp := protoutil.TimeFromProto(request.RestoreToTimestamp).UTC(); !restoreToTimestamp.IsZero() {
if !restoreToTimestamp.IsZero() {
// Restore to given timestamp
params.RestoreToTimestamp = restoreToTimestamp
}
14 changes: 12 additions & 2 deletions go/vt/vttablet/tabletmanager/tm_init.go
@@ -769,6 +769,9 @@ func (tm *TabletManager) handleRestore(ctx context.Context) (bool, error) {
if tm.Cnf == nil && restoreFromBackup {
return false, fmt.Errorf("you cannot enable --restore_from_backup without a my.cnf file")
}
if restoreToTimestampStr != "" && restoreToPos != "" {
return false, fmt.Errorf("--restore_to_timestamp and --restore_to_pos are mutually exclusive")
}

// Restore in the background
if restoreFromBackup {
@@ -778,7 +781,6 @@ func (tm *TabletManager) handleRestore(ctx context.Context) (bool, error) {

// Zero date will cause us to use the latest, which is the default
backupTime := time.Time{}

// Or if a backup timestamp was specified then we use the last backup taken at or before that time
if restoreFromBackupTsStr != "" {
var err error
@@ -788,9 +790,17 @@ func (tm *TabletManager) handleRestore(ctx context.Context) (bool, error) {
}
}

restoreToTimestamp := time.Time{}
if restoreToTimestampStr != "" {
var err error
restoreToTimestamp, err = mysqlctl.ParseRFC3339(restoreToTimestampStr)
if err != nil {
log.Exitf("RestoreFromBackup failed: unable to parse the --restore_to_timestamp value provided of '%s'. Error: %v", restoreToTimestampStr, err)
}
}
// restoreFromBackup will just be a regular action
// (same as if it was triggered remotely)
if err := tm.RestoreData(ctx, logutil.NewConsoleLogger(), waitForBackupInterval, false /* deleteBeforeRestore */, backupTime); err != nil {
if err := tm.RestoreData(ctx, logutil.NewConsoleLogger(), waitForBackupInterval, false /* deleteBeforeRestore */, backupTime, restoreToTimestamp, restoreToPos); err != nil {
log.Exitf("RestoreFromBackup failed: %v", err)
}
}()
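
The startup validation in this file (reject both flags together, then parse the timestamp) can be sketched as a standalone function. This is a simplified illustration under stated assumptions: `parseRestoreTarget` and `errMutuallyExclusive` are hypothetical names, and the standard library's `time.Parse` with `time.RFC3339` stands in for `mysqlctl.ParseRFC3339`, whose exact behavior is not shown in this diff.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errMutuallyExclusive mirrors the error text used in the diff.
var errMutuallyExclusive = errors.New("--restore_to_timestamp and --restore_to_pos are mutually exclusive")

// parseRestoreTarget validates the two point-in-time flags. At most one may
// be set. A zero time and an empty position mean "no point-in-time target".
func parseRestoreTarget(timestampStr, pos string) (time.Time, string, error) {
	if timestampStr != "" && pos != "" {
		return time.Time{}, "", errMutuallyExclusive
	}
	if timestampStr == "" {
		return time.Time{}, pos, nil
	}
	ts, err := time.Parse(time.RFC3339, timestampStr)
	if err != nil {
		return time.Time{}, "", fmt.Errorf("unable to parse --restore_to_timestamp value %q: %w", timestampStr, err)
	}
	return ts.UTC(), "", nil
}

func main() {
	ts, pos, err := parseRestoreTarget("2023-09-28T10:00:00Z", "")
	fmt.Println(ts, pos, err)

	_, _, err = parseRestoreTarget("2023-09-28T10:00:00Z", "example-position")
	fmt.Println(err)
}
```

Keeping the zero `time.Time` as the "not set" value matches how the diff later tests `restoreToTimestamp.IsZero()` before treating the timestamp as a restore target.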