From 587aedc20ea3805a9e3cc421b0bd829b685c6862 Mon Sep 17 00:00:00 2001 From: luancheng Date: Fri, 28 Feb 2020 12:47:09 +0800 Subject: [PATCH 1/4] support backupts --- pkg/backup/client.go | 44 +++++++++++++++++++++++---------------- pkg/backup/client_test.go | 17 ++++++++++----- pkg/task/backup.go | 44 +++++++++++++++++++++++++++++++++++++-- pkg/task/backup_test.go | 34 ++++++++++++++++++++++++++++++ pkg/task/common.go | 2 +- 5 files changed, 115 insertions(+), 26 deletions(-) create mode 100644 pkg/task/backup_test.go diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 07a8fb5f1..fdc51157e 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -73,25 +73,33 @@ func NewBackupClient(ctx context.Context, mgr ClientMgr) (*Client, error) { } // GetTS returns the latest timestamp. -func (bc *Client) GetTS(ctx context.Context, duration time.Duration) (uint64, error) { - p, l, err := bc.mgr.GetPDClient().GetTS(ctx) - if err != nil { - return 0, errors.Trace(err) - } - backupTS := oracle.ComposeTS(p, l) - - switch { - case duration < 0: - return 0, errors.New("negative timeago is not allowed") - case duration > 0: - log.Info("backup time ago", zap.Duration("timeago", duration)) - - backupTime := oracle.GetTimeFromTS(backupTS) - backupAgo := backupTime.Add(-duration) - if backupTS < oracle.ComposeTS(oracle.GetPhysical(backupAgo), l) { - return 0, errors.New("backup ts overflow please choose a smaller timeago") +func (bc *Client) GetTS(ctx context.Context, duration time.Duration, ts uint64) (uint64, error) { + var ( + backupTS uint64 + err error + ) + if ts > 0 { + backupTS = ts + } else { + p, l, err := bc.mgr.GetPDClient().GetTS(ctx) + if err != nil { + return 0, errors.Trace(err) + } + backupTS = oracle.ComposeTS(p, l) + + switch { + case duration < 0: + return 0, errors.New("negative timeago is not allowed") + case duration > 0: + log.Info("backup time ago", zap.Duration("timeago", duration)) + + backupTime := oracle.GetTimeFromTS(backupTS) + backupAgo := backupTime.Add(-duration) + if backupTS < oracle.ComposeTS(oracle.GetPhysical(backupAgo), l) { + return 0, errors.New("backup ts overflow please choose a smaller timeago") + } + backupTS = oracle.ComposeTS(oracle.GetPhysical(backupAgo), l) } - backupTS = oracle.ComposeTS(oracle.GetPhysical(backupAgo), l) } // check backup time do not exceed GCSafePoint diff --git a/pkg/backup/client_test.go b/pkg/backup/client_test.go index bf2700cd4..63f3d5d5f 100644 --- a/pkg/backup/client_test.go +++ b/pkg/backup/client_test.go @@ -55,7 +55,7 @@ func (r *testBackup) TestGetTS(c *C) { // timeago not work expectedDuration := 0 currentTs := time.Now().UnixNano() / int64(time.Millisecond) - ts, err := r.backupClient.GetTS(r.ctx, 0) + ts, err := r.backupClient.GetTS(r.ctx, 0, 0) c.Assert(err, IsNil) pdTs := oracle.ExtractPhysical(ts) duration := int(currentTs - pdTs) @@ -65,7 +65,7 @@ func (r *testBackup) TestGetTS(c *C) { // timeago = "1.5m" expectedDuration = 90000 currentTs = time.Now().UnixNano() / int64(time.Millisecond) - ts, err = r.backupClient.GetTS(r.ctx, 90*time.Second) + ts, err = r.backupClient.GetTS(r.ctx, 90*time.Second, 0) c.Assert(err, IsNil) pdTs = oracle.ExtractPhysical(ts) duration = int(currentTs - pdTs) @@ -73,11 +73,11 @@ func (r *testBackup) TestGetTS(c *C) { c.Assert(duration, Less, expectedDuration+deviation) // timeago = "-1m" - _, err = r.backupClient.GetTS(r.ctx, -time.Minute) + _, err = r.backupClient.GetTS(r.ctx, -time.Minute, 0) c.Assert(err, ErrorMatches, "negative timeago is not allowed") // timeago = "1000000h" overflows - _, err = r.backupClient.GetTS(r.ctx, 1000000*time.Hour) + _, err = r.backupClient.GetTS(r.ctx, 1000000*time.Hour, 0) c.Assert(err, ErrorMatches, "backup ts overflow.*") // timeago = "10h" exceed GCSafePoint @@ -86,8 +86,15 @@ func (r *testBackup) TestGetTS(c *C) { now := oracle.ComposeTS(p, l) _, err = r.backupClient.mgr.GetPDClient().UpdateGCSafePoint(r.ctx, now) c.Assert(err, IsNil) - _, err = r.backupClient.GetTS(r.ctx, 10*time.Hour) + _, err = r.backupClient.GetTS(r.ctx, 10*time.Hour, 0) c.Assert(err, ErrorMatches, "GC safepoint [0-9]+ exceed TS [0-9]+") + + // timeago and backupts both exists, use backupts + backupts := oracle.ComposeTS(p+10, l) + ts, err = r.backupClient.GetTS(r.ctx, time.Minute, backupts) + c.Assert(err, IsNil) + c.Assert(ts, Equals, backupts) + } func (r *testBackup) TestBuildTableRange(c *C) { diff --git a/pkg/task/backup.go b/pkg/task/backup.go index bee2102f5..1ba615190 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -4,13 +4,18 @@ package task import ( "context" + "strconv" "time" "github.com/pingcap/errors" kvproto "github.com/pingcap/kvproto/pkg/backup" "github.com/pingcap/log" "github.com/pingcap/parser/model" + "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb-tools/pkg/filter" + "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/types" "github.com/spf13/pflag" "go.uber.org/zap" @@ -23,6 +28,7 @@ import ( const ( flagBackupTimeago = "timeago" + flagBackupTS = "backupts" flagLastBackupTS = "lastbackupts" defaultBackupConcurrency = 4 @@ -33,6 +39,7 @@ type BackupConfig struct { Config TimeAgo time.Duration `json:"time-ago" toml:"time-ago"` + BackupTS uint64 `json:"backup-ts" toml:"backup-ts"` LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"` } @@ -43,7 +50,9 @@ func DefineBackupFlags(flags *pflag.FlagSet) { "The history version of the backup task, e.g. 1m, 1h. Do not exceed GCSafePoint") // TODO: remove experimental tag if it's stable - flags.Uint64(flagLastBackupTS, 0, "(experimental) the last time backup ts") + flags.Uint64(flagLastBackupTS, 0, "(experimental) the last time backup ts, use for incremental backup, support TSO only") + flags.String(flagBackupTS, "", "the backup ts support TSO or datetime,"+ + " e.g. '400036290571534337', '2018-05-11 01:42:23' ") } // ParseFromFlags parses the backup-related flags from the flag set. @@ -60,6 +69,15 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error { if err != nil { return errors.Trace(err) } + backupTS, err := flags.GetString(flagBackupTS) + if err != nil { + return errors.Trace(err) + } + cfg.BackupTS, err = parseTSString(backupTS) + if err != nil { + return errors.Trace(err) + } + if err = cfg.Config.ParseFromFlags(flags); err != nil { return errors.Trace(err) } @@ -96,7 +114,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig return err } - backupTS, err := client.GetTS(ctx, cfg.TimeAgo) + backupTS, err := client.GetTS(ctx, cfg.TimeAgo, cfg.BackupTS) if err != nil { return err } @@ -198,3 +216,25 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig } return nil } + +// parseTSString port from tidb setSnapshotTS +func parseTSString(ts string) (uint64, error) { + if len(ts) == 0 { + return 0, nil + } + if tso, err := strconv.ParseUint(ts, 10, 64); err == nil { + return tso, nil + } + sc := &stmtctx.StatementContext{ + TimeZone: time.Local, + } + t, err := types.ParseTime(sc, ts, mysql.TypeTimestamp, types.MaxFsp) + if err != nil { + return 0, errors.Trace(err) + } + t1, err := t.GoTime(time.Local) + if err != nil { + return 0, errors.Trace(err) + } + return variable.GoTimeToTS(t1), nil +} diff --git a/pkg/task/backup_test.go b/pkg/task/backup_test.go new file mode 100644 index 000000000..da61b8827 --- /dev/null +++ b/pkg/task/backup_test.go @@ -0,0 +1,34 @@ +package task + +import ( + "testing" + + . "github.com/pingcap/check" +) + +var _ = Suite(&testBackupSuite{}) + +func TestT(t *testing.T) { + TestingT(t) +} + +type testBackupSuite struct{} + +func (s *testBackupSuite) TestParseTSString(c *C) { + var ( + ts uint64 + err error + ) + + ts, err = parseTSString("") + c.Assert(err, IsNil) + c.Assert(int(ts), Equals, 0) + + ts, err = parseTSString("400036290571534337") + c.Assert(err, IsNil) + c.Assert(int(ts), Equals, 400036290571534337) + + ts, err = parseTSString("2018-05-11 01:42:23") + c.Assert(err, IsNil) + c.Assert(int(ts), Equals, 400024965742592000) +} diff --git a/pkg/task/common.go b/pkg/task/common.go index 57134c60a..94e3d8c87 100644 --- a/pkg/task/common.go +++ b/pkg/task/common.go @@ -95,7 +95,7 @@ type Config struct { // DefineCommonFlags defines the flags common to all BRIE commands. func DefineCommonFlags(flags *pflag.FlagSet) { flags.BoolP(flagSendCreds, "c", true, "Whether send credentials to tikv") - flags.StringP(flagStorage, "s", "", `specify the url where backup storage, eg, "local:///path/to/save"`) + flags.StringP(flagStorage, "s", "", `specify the url where backup storage, eg, "s3:///path/to/save"`) flags.StringSliceP(flagPD, "u", []string{"127.0.0.1:2379"}, "PD address") flags.String(flagCA, "", "CA certificate path for TLS connection") flags.String(flagCert, "", "Certificate path for TLS connection") From c31b2024aa51c1870efb7c10c6f3cfa788fd64d5 Mon Sep 17 00:00:00 2001 From: luancheng Date: Thu, 12 Mar 2020 12:31:12 +0800 Subject: [PATCH 2/4] address comment --- pkg/task/backup.go | 3 ++- pkg/task/backup_test.go | 5 ++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/task/backup.go b/pkg/task/backup.go index 1ba615190..206705504 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -50,7 +50,8 @@ func DefineBackupFlags(flags *pflag.FlagSet) { "The history version of the backup task, e.g. 1m, 1h. Do not exceed GCSafePoint") // TODO: remove experimental tag if it's stable - flags.Uint64(flagLastBackupTS, 0, "(experimental) the last time backup ts, use for incremental backup, support TSO only") + flags.Uint64(flagLastBackupTS, 0, "(experimental) the last time backup ts,"+ + " use for incremental backup, support TSO only") flags.String(flagBackupTS, "", "the backup ts support TSO or datetime,"+ " e.g. '400036290571534337', '2018-05-11 01:42:23' ") } diff --git a/pkg/task/backup_test.go b/pkg/task/backup_test.go index da61b8827..4a9e64646 100644 --- a/pkg/task/backup_test.go +++ b/pkg/task/backup_test.go @@ -2,6 +2,7 @@ package task import ( "testing" + "time" . "github.com/pingcap/check" ) @@ -28,7 +29,9 @@ func (s *testBackupSuite) TestParseTSString(c *C) { c.Assert(err, IsNil) c.Assert(int(ts), Equals, 400036290571534337) + utcTS := 400024965742563200 + _, offset := time.Now().Local().Zone() ts, err = parseTSString("2018-05-11 01:42:23") c.Assert(err, IsNil) - c.Assert(int(ts), Equals, 400024965742592000) + c.Assert(int(ts), Equals, utcTS+offset) } From 2f7b476bef0498bb0f489a8591cb131f2e9449c5 Mon Sep 17 00:00:00 2001 From: luancheng Date: Thu, 12 Mar 2020 14:33:07 +0800 Subject: [PATCH 3/4] address comment --- pkg/task/backup.go | 6 ++++-- pkg/task/backup_test.go | 3 +-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/task/backup.go b/pkg/task/backup.go index 206705504..1d3c048cb 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -226,14 +226,16 @@ func parseTSString(ts string) (uint64, error) { if tso, err := strconv.ParseUint(ts, 10, 64); err == nil { return tso, nil } + + loc := time.Local sc := &stmtctx.StatementContext{ - TimeZone: time.Local, + TimeZone: loc, } t, err := types.ParseTime(sc, ts, mysql.TypeTimestamp, types.MaxFsp) if err != nil { return 0, errors.Trace(err) } - t1, err := t.GoTime(time.Local) + t1, err := t.GoTime(loc) if err != nil { return 0, errors.Trace(err) } diff --git a/pkg/task/backup_test.go b/pkg/task/backup_test.go index 4a9e64646..50288f307 100644 --- a/pkg/task/backup_test.go +++ b/pkg/task/backup_test.go @@ -29,9 +29,8 @@ func (s *testBackupSuite) TestParseTSString(c *C) { c.Assert(err, IsNil) c.Assert(int(ts), Equals, 400036290571534337) - utcTS := 400024965742563200 _, offset := time.Now().Local().Zone() ts, err = parseTSString("2018-05-11 01:42:23") c.Assert(err, IsNil) - c.Assert(int(ts), Equals, utcTS+offset) + c.Assert(int(ts), Equals, 400032515489792000 - (offset*1000) << 18) } From 1460b25f7564bb24c344a2338216fdd9ba501624 Mon Sep 17 00:00:00 2001 From: luancheng Date: Thu, 12 Mar 2020 14:40:02 +0800 Subject: [PATCH 4/4] fix space --- pkg/task/backup.go | 2 +- pkg/task/backup_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/task/backup.go b/pkg/task/backup.go index 1d3c048cb..ab22c9039 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -53,7 +53,7 @@ func DefineBackupFlags(flags *pflag.FlagSet) { flags.Uint64(flagLastBackupTS, 0, "(experimental) the last time backup ts,"+ " use for incremental backup, support TSO only") flags.String(flagBackupTS, "", "the backup ts support TSO or datetime,"+ - " e.g. '400036290571534337', '2018-05-11 01:42:23' ") + " e.g. '400036290571534337', '2018-05-11 01:42:23'") } // ParseFromFlags parses the backup-related flags from the flag set. diff --git a/pkg/task/backup_test.go b/pkg/task/backup_test.go index 50288f307..6bd60515b 100644 --- a/pkg/task/backup_test.go +++ b/pkg/task/backup_test.go @@ -32,5 +32,5 @@ func (s *testBackupSuite) TestParseTSString(c *C) { _, offset := time.Now().Local().Zone() ts, err = parseTSString("2018-05-11 01:42:23") c.Assert(err, IsNil) - c.Assert(int(ts), Equals, 400032515489792000 - (offset*1000) << 18) + c.Assert(int(ts), Equals, 400032515489792000-(offset*1000)<<18) }