Merge branch 'release-8.1' into cherry-pick-57178-to-release-8.1
3pointer committed Dec 13, 2024
2 parents fe65c81 + 2a6bd46 commit 8134411
Showing 114 changed files with 2,238 additions and 710 deletions.
60 changes: 30 additions & 30 deletions DEPS.bzl
@@ -5338,26 +5338,26 @@ def go_deps():
name = "com_github_onsi_ginkgo_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/onsi/ginkgo/v2",
sha256 = "f41e92baa52ec53d482603e4585c0906ca0c02e05004dca78a62bf1de88833ad",
strip_prefix = "github.com/onsi/ginkgo/v2@v2.9.4",
sha256 = "4865aab6c56b0d29a93cfe56206b586f1c9f36fde5a66e85650576344861b7cc",
strip_prefix = "github.com/onsi/ginkgo/v2@v2.13.0",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/onsi/ginkgo/v2/com_github_onsi_ginkgo_v2-v2.9.4.zip",
"http://ats.apps.svc/gomod/github.com/onsi/ginkgo/v2/com_github_onsi_ginkgo_v2-v2.9.4.zip",
"https://cache.hawkingrei.com/gomod/github.com/onsi/ginkgo/v2/com_github_onsi_ginkgo_v2-v2.9.4.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/onsi/ginkgo/v2/com_github_onsi_ginkgo_v2-v2.9.4.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/onsi/ginkgo/v2/com_github_onsi_ginkgo_v2-v2.13.0.zip",
"http://ats.apps.svc/gomod/github.com/onsi/ginkgo/v2/com_github_onsi_ginkgo_v2-v2.13.0.zip",
"https://cache.hawkingrei.com/gomod/github.com/onsi/ginkgo/v2/com_github_onsi_ginkgo_v2-v2.13.0.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/onsi/ginkgo/v2/com_github_onsi_ginkgo_v2-v2.13.0.zip",
],
)
go_repository(
name = "com_github_onsi_gomega",
build_file_proto_mode = "disable_global",
importpath = "github.com/onsi/gomega",
sha256 = "ea2b22782cc15569645dfdfc066a651e1335626677ad92d7ba4358a0885bf369",
strip_prefix = "github.com/onsi/gomega@v1.20.1",
sha256 = "923e8d0a1f95b3989f31c45142dee0b80a0aaa00cfa210bbd4d059f7046d12a8",
strip_prefix = "github.com/onsi/gomega@v1.29.0",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/onsi/gomega/com_github_onsi_gomega-v1.20.1.zip",
"http://ats.apps.svc/gomod/github.com/onsi/gomega/com_github_onsi_gomega-v1.20.1.zip",
"https://cache.hawkingrei.com/gomod/github.com/onsi/gomega/com_github_onsi_gomega-v1.20.1.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/onsi/gomega/com_github_onsi_gomega-v1.20.1.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/onsi/gomega/com_github_onsi_gomega-v1.29.0.zip",
"http://ats.apps.svc/gomod/github.com/onsi/gomega/com_github_onsi_gomega-v1.29.0.zip",
"https://cache.hawkingrei.com/gomod/github.com/onsi/gomega/com_github_onsi_gomega-v1.29.0.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/onsi/gomega/com_github_onsi_gomega-v1.29.0.zip",
],
)
go_repository(
@@ -6807,13 +6807,13 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "6701afd9ef373b22010ff1c3aeb91fca8a6165341c6a38dd31a00ed10d24f314",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20240913090512-3777c384feb1",
sha256 = "1fd861359a159d20435b7e4ed81f520b7235941aa2c5e059e05f338f1f781664",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20241125064226-08d0b3b826b3",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240913090512-3777c384feb1.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240913090512-3777c384feb1.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240913090512-3777c384feb1.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240913090512-3777c384feb1.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241125064226-08d0b3b826b3.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241125064226-08d0b3b826b3.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241125064226-08d0b3b826b3.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241125064226-08d0b3b826b3.zip",
],
)
go_repository(
@@ -9268,26 +9268,26 @@ def go_deps():
name = "io_k8s_api",
build_file_proto_mode = "disable_global",
importpath = "k8s.io/api",
sha256 = "2255428d2347df0b3a9cf6ac2791f5be6653b3c642359736e46733584d917335",
strip_prefix = "k8s.io/api@v0.28.6",
sha256 = "ae7b519f36431bc55fa56c47a51c1c37cf9e0df7e9d23741b3e839426d2627ff",
strip_prefix = "k8s.io/api@v0.29.11",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/k8s.io/api/io_k8s_api-v0.28.6.zip",
"http://ats.apps.svc/gomod/k8s.io/api/io_k8s_api-v0.28.6.zip",
"https://cache.hawkingrei.com/gomod/k8s.io/api/io_k8s_api-v0.28.6.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/k8s.io/api/io_k8s_api-v0.28.6.zip",
"http://bazel-cache.pingcap.net:8080/gomod/k8s.io/api/io_k8s_api-v0.29.11.zip",
"http://ats.apps.svc/gomod/k8s.io/api/io_k8s_api-v0.29.11.zip",
"https://cache.hawkingrei.com/gomod/k8s.io/api/io_k8s_api-v0.29.11.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/k8s.io/api/io_k8s_api-v0.29.11.zip",
],
)
go_repository(
name = "io_k8s_apimachinery",
build_file_proto_mode = "disable_global",
importpath = "k8s.io/apimachinery",
sha256 = "efc7e38cb4662d0b6c5648772e1ae92040a4d03af0a3a7731aedf17f8eab7359",
strip_prefix = "k8s.io/apimachinery@v0.28.6",
sha256 = "8dd5f53bf72f7bd6323bcc8f9f45823b30fc350daee4ab2d9e27cf1960d06b25",
strip_prefix = "k8s.io/apimachinery@v0.29.11",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/k8s.io/apimachinery/io_k8s_apimachinery-v0.28.6.zip",
"http://ats.apps.svc/gomod/k8s.io/apimachinery/io_k8s_apimachinery-v0.28.6.zip",
"https://cache.hawkingrei.com/gomod/k8s.io/apimachinery/io_k8s_apimachinery-v0.28.6.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/k8s.io/apimachinery/io_k8s_apimachinery-v0.28.6.zip",
"http://bazel-cache.pingcap.net:8080/gomod/k8s.io/apimachinery/io_k8s_apimachinery-v0.29.11.zip",
"http://ats.apps.svc/gomod/k8s.io/apimachinery/io_k8s_apimachinery-v0.29.11.zip",
"https://cache.hawkingrei.com/gomod/k8s.io/apimachinery/io_k8s_apimachinery-v0.29.11.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/k8s.io/apimachinery/io_k8s_apimachinery-v0.29.11.zip",
],
)
go_repository(
47 changes: 43 additions & 4 deletions br/pkg/backup/client.go
@@ -75,12 +75,26 @@ type Checksum struct {
// ProgressUnit represents the unit of progress.
type ProgressUnit string

type StoreBasedErr struct {
storeID uint64
err error
}

func (e *StoreBasedErr) Error() string {
return fmt.Sprintf("Store ID '%d': %v", e.storeID, e.err.Error())
}

func (e *StoreBasedErr) Unwrap() error {
return e.err
}

const (
// backupFineGrainedMaxBackoff is 1 hour.
// given it begins the fine-grained backup, there must be some problems in the cluster.
// We need to be more patient.
backupFineGrainedMaxBackoff = 3600000
backupRetryTimes = 5
disconnectRetryTimeout = 20000
// RangeUnit represents the progress updated counter when a range finished.
RangeUnit ProgressUnit = "range"
// RegionUnit represents the progress updated counter when a region finished.
@@ -1121,6 +1135,7 @@ func (bc *Client) fineGrainedBackup(
})

bo := utils.AdaptTiKVBackoffer(ctx, backupFineGrainedMaxBackoff, berrors.ErrUnknown)
maxDisconnect := make(map[uint64]uint)
for {
// Step1, check whether there is any incomplete range
incomplete := pr.Res.GetIncompleteRange(req.StartKey, req.EndKey)
@@ -1168,8 +1183,19 @@
for {
select {
case err := <-errCh:
// TODO: should we handle err here?
return errors.Trace(err)
if !berrors.Is(err, berrors.ErrFailedToConnect) {
return errors.Trace(err)
}
storeErr, ok := err.(*StoreBasedErr)
if !ok {
return errors.Trace(err)
}

storeID := storeErr.storeID
maxDisconnect[storeID]++
if maxDisconnect[storeID] > backupRetryTimes {
return errors.Annotatef(err, "Failed to connect to store %d more than %d times", storeID, backupRetryTimes)
}
case resp, ok := <-respCh:
if !ok {
// Finished.
@@ -1270,12 +1296,22 @@ func (bc *Client) handleFineGrained(
storeID := targetPeer.GetStoreId()
lockResolver := bc.mgr.GetLockResolver()
client, err := bc.mgr.GetBackupClient(ctx, storeID)

// inject a disconnect failpoint
failpoint.Inject("disconnect", func(_ failpoint.Value) {
logutil.CL(ctx).Warn("This is a injected disconnection error")
err = berrors.ErrFailedToConnect
})

if err != nil {
if berrors.Is(err, berrors.ErrFailedToConnect) {
// When the leader store is died,
// 20s for the default max duration before the raft election timer fires.
logutil.CL(ctx).Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID))
return 20000, nil
return disconnectRetryTimeout, &StoreBasedErr{
storeID: storeID,
err: err,
}
}

logutil.CL(ctx).Error("fail to connect store", zap.Uint64("StoreID", storeID))
@@ -1314,7 +1350,10 @@ func (bc *Client) handleFineGrained(
// When the leader store is died,
// 20s for the default max duration before the raft election timer fires.
logutil.CL(ctx).Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID))
return 20000, nil
return disconnectRetryTimeout, &StoreBasedErr{
storeID: storeID,
err: err,
}
}
logutil.CL(ctx).Error("failed to send fine-grained backup", zap.Uint64("storeID", storeID), logutil.ShortError(err))
return 0, errors.Annotatef(err, "failed to send fine-grained backup [%s, %s)",
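Taken together, the new StoreBasedErr type and the maxDisconnect map give the fine-grained backup loop a per-store retry budget: a store that fails to connect is retried after disconnectRetryTimeout milliseconds, and the backup only aborts once the same store has disconnected more than backupRetryTimes times. The standalone sketch below illustrates the pattern; aside from the Unwrap/errors.Is semantics, the names are illustrative and not BR's actual API.

package main

import (
	"errors"
	"fmt"
)

// errFailedToConnect stands in for berrors.ErrFailedToConnect.
var errFailedToConnect = errors.New("failed to connect to store")

// storeBasedErr mirrors the new StoreBasedErr: it records which store
// produced the error and unwraps to the underlying cause, so that
// errors.Is(err, errFailedToConnect) still matches after wrapping.
type storeBasedErr struct {
	storeID uint64
	err     error
}

func (e *storeBasedErr) Error() string { return fmt.Sprintf("store %d: %v", e.storeID, e.err) }
func (e *storeBasedErr) Unwrap() error { return e.err }

const retryBudget = 5 // plays the role of backupRetryTimes

func main() {
	maxDisconnect := make(map[uint64]uint)
	errCh := make(chan error, 8)

	// Simulate store 42 disconnecting repeatedly.
	for i := 0; i < 7; i++ {
		errCh <- &storeBasedErr{storeID: 42, err: errFailedToConnect}
	}
	close(errCh)

	for err := range errCh {
		if !errors.Is(err, errFailedToConnect) {
			fmt.Println("fatal:", err) // any other error aborts immediately
			return
		}
		var se *storeBasedErr
		if !errors.As(err, &se) {
			fmt.Println("fatal:", err)
			return
		}
		maxDisconnect[se.storeID]++
		if maxDisconnect[se.storeID] > retryBudget {
			fmt.Printf("giving up: store %d disconnected more than %d times\n", se.storeID, retryBudget)
			return
		}
		fmt.Printf("store %d disconnected (%d/%d), keep retrying\n", se.storeID, maxDisconnect[se.storeID], retryBudget)
	}
}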
2 changes: 1 addition & 1 deletion br/pkg/checkpoint/checkpoint.go
@@ -43,7 +43,7 @@ import (
"golang.org/x/sync/errgroup"
)

const CheckpointDir = "/checkpoints"
const CheckpointDir = "checkpoints"

type flushPosition struct {
CheckpointDataDir string
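The one-character change above (dropping the leading slash from CheckpointDir) matters when the constant is joined onto an external-storage prefix: a leading slash turns the joined key into an absolute path, or produces a double slash under plain concatenation. This rationale is inferred from the diff rather than stated in the commit, and the joining code below is a generic illustration, not BR's storage layer.

package main

import (
	"fmt"
	"path"
)

func main() {
	prefix := "backup-prefix" // e.g. the subdirectory of an S3/GCS backend

	// With the old value, concatenation produces a double slash,
	// and path-style joining treats the directory as absolute.
	fmt.Println(prefix + "/" + "/checkpoints")     // backup-prefix//checkpoints
	fmt.Println(path.Join("/checkpoints", "meta")) // /checkpoints/meta (absolute)

	// With the new relative value the joins behave as expected.
	fmt.Println(prefix + "/" + "checkpoints")     // backup-prefix/checkpoints
	fmt.Println(path.Join("checkpoints", "meta")) // checkpoints/meta
}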
8 changes: 6 additions & 2 deletions br/pkg/task/common.go
@@ -732,16 +732,20 @@ func ReadBackupMeta(
// flagToZapField checks whether this flag can be logged,
// if need to log, return its zap field. Or return a field with hidden value.
func flagToZapField(f *pflag.Flag) zap.Field {
if f.Name == flagStorage {
switch f.Name {
case flagStorage, FlagStreamFullBackupStorage:
hiddenQuery, err := url.Parse(f.Value.String())
if err != nil {
return zap.String(f.Name, "<invalid URI>")
}
// hide all query here.
hiddenQuery.RawQuery = ""
return zap.Stringer(f.Name, hiddenQuery)
case flagCipherKey, "azblob.encryption-key":
return zap.String(f.Name, "<redacted>")
default:
return zap.Stringer(f.Name, f.Value)
}
return zap.Stringer(f.Name, f.Value)
}

// LogArguments prints origin command arguments.
55 changes: 49 additions & 6 deletions br/pkg/task/common_test.go
@@ -33,13 +33,56 @@ func (f fakeValue) Type() string {
}

func TestUrlNoQuery(t *testing.T) {
flag := &pflag.Flag{
Name: flagStorage,
Value: fakeValue("s3://some/what?secret=a123456789&key=987654321"),
testCases := []struct {
inputName string
expectedName string
inputValue string
expectedValue string
}{
{
inputName: flagSendCreds,
expectedName: "send-credentials-to-tikv",
inputValue: "true",
expectedValue: "true",
},
{
inputName: flagStorage,
expectedName: "storage",
inputValue: "s3://some/what?secret=a123456789&key=987654321",
expectedValue: "s3://some/what",
},
{
inputName: FlagStreamFullBackupStorage,
expectedName: "full-backup-storage",
inputValue: "s3://bucket/prefix/?access-key=1&secret-key=2",
expectedValue: "s3://bucket/prefix/",
},
{
inputName: flagCipherKey,
expectedName: "crypter.key",
inputValue: "537570657253656372657456616C7565",
expectedValue: "<redacted>",
},
{
inputName: "azblob.encryption-key",
expectedName: "azblob.encryption-key",
inputValue: "SUPERSECRET_AZURE_ENCRYPTION_KEY",
expectedValue: "<redacted>",
},
}

for _, tc := range testCases {
flag := pflag.Flag{
Name: tc.inputName,
Value: fakeValue(tc.inputValue),
}
field := flagToZapField(&flag)
require.Equal(t, tc.expectedName, field.Key, `test-case [%s="%s"]`, tc.expectedName, tc.expectedValue)
if stringer, ok := field.Interface.(fmt.Stringer); ok {
field.String = stringer.String()
}
require.Equal(t, tc.expectedValue, field.String, `test-case [%s="%s"]`, tc.expectedName, tc.expectedValue)
}
field := flagToZapField(flag)
require.Equal(t, flagStorage, field.Key)
require.Equal(t, "s3://some/what", field.Interface.(fmt.Stringer).String())
}

func TestTiDBConfigUnchanged(t *testing.T) {
34 changes: 34 additions & 0 deletions br/pkg/task/stream.go
@@ -74,6 +74,12 @@ const (
"You may check the metadata and continue by wait other task finish or manually delete the lock file " + truncateLockPath + " at the external storage."
)

const (
waitInfoSchemaReloadCheckInterval = 1 * time.Second
// a million tables should take a few minutes to load all DDL change, making 15 to make sure we don't exit early
waitInfoSchemaReloadTimeout = 15 * time.Minute
)

var (
StreamStart = "log start"
StreamStop = "log stop"
@@ -1445,6 +1451,21 @@ func restoreStream(
return errors.Annotate(err, "failed to restore kv files")
}

// failpoint to stop for a while after restoring kvs
// this is to mimic the scenario that restore takes long time and the lease in schemaInfo has expired and needs refresh
failpoint.Inject("post-restore-kv-pending", func(val failpoint.Value) {
if val.(bool) {
// not ideal to use sleep but not sure what's the better way right now
log.Info("sleep after restoring kv")
time.Sleep(2 * time.Second)
}
})

// make sure schema reload finishes before proceeding
if err = waitUntilSchemaReload(ctx, client); err != nil {
return errors.Trace(err)
}

if err = client.CleanUpKVFiles(ctx); err != nil {
return errors.Annotate(err, "failed to clean up")
}
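The post-restore-kv-pending failpoint above only fires when a test enables it. A sketch of how that typically looks with the pingcap/failpoint package is shown below; the full failpoint path is an assumption derived from the package containing restoreStream, and the test body is a placeholder.

package task_test

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

// The full failpoint name is "<package import path>/<failpoint name>"; the
// path below is assumed from the location of restoreStream and may differ.
const postRestorePendingFP = "github.com/pingcap/tidb/br/pkg/task/post-restore-kv-pending"

func TestRestoreWaitsForSchemaReload(t *testing.T) {
	// `return(true)` makes failpoint.Inject pass val == true to the callback,
	// which triggers the artificial sleep after restoring KV files.
	require.NoError(t, failpoint.Enable(postRestorePendingFP, "return(true)"))
	defer func() {
		require.NoError(t, failpoint.Disable(postRestorePendingFP))
	}()

	// ... run the PiTR restore under test here and assert that it only
	// proceeds to CleanUpKVFiles after the schema reload has finished.
}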
@@ -1869,3 +1890,16 @@ func checkPiTRTaskInfo(

return curTaskInfo, doFullRestore, nil
}

func waitUntilSchemaReload(ctx context.Context, client *restore.Client) error {
log.Info("waiting for schema info finishes reloading")
reloadStart := time.Now()
conditionFunc := func() bool {
return !client.GetDomain().IsLeaseExpired()
}
if err := utils.WaitUntil(ctx, conditionFunc, waitInfoSchemaReloadCheckInterval, waitInfoSchemaReloadTimeout); err != nil {
return errors.Annotate(err, "failed to wait until schema reload")
}
log.Info("reloading schema finished", zap.Duration("timeTaken", time.Since(reloadStart)))
return nil
}
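waitUntilSchemaReload polls through utils.WaitUntil, which lives in the new br/pkg/utils/wait.go registered in the BUILD.bazel change that follows; the file's body is not part of this excerpt. Below is a plausible minimal implementation of such a polling helper, with the signature inferred from the call site (the actual wait.go may differ).

package utils

import (
	"context"
	"time"

	"github.com/pingcap/errors"
)

// WaitUntil polls condition every checkInterval until it returns true, the
// timeout elapses, or ctx is cancelled. The signature matches the call in
// waitUntilSchemaReload; the body is a sketch, not the committed wait.go.
func WaitUntil(ctx context.Context, condition func() bool, checkInterval, timeout time.Duration) error {
	timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	ticker := time.NewTicker(checkInterval)
	defer ticker.Stop()

	for {
		if condition() {
			return nil
		}
		select {
		case <-timeoutCtx.Done():
			return errors.Annotate(timeoutCtx.Err(), "wait until condition timed out")
		case <-ticker.C:
		}
	}
}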
1 change: 1 addition & 0 deletions br/pkg/utils/BUILD.bazel
@@ -19,6 +19,7 @@ go_library(
"safe_point.go",
"schema.go",
"store_manager.go",
"wait.go",
"worker.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/utils",
2 changes: 1 addition & 1 deletion br/pkg/utils/safe_point.go
@@ -85,7 +85,7 @@ func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, sp BRServic
log.Debug("update PD safePoint limit with TTL", zap.Object("safePoint", sp))

lastSafePoint, err := pdClient.UpdateServiceGCSafePoint(ctx, sp.ID, sp.TTL, sp.BackupTS-1)
if lastSafePoint > sp.BackupTS-1 {
if lastSafePoint > sp.BackupTS-1 && sp.TTL > 0 {
log.Warn("service GC safe point lost, we may fail to back up if GC lifetime isn't long enough",
zap.Uint64("lastSafePoint", lastSafePoint),
zap.Object("safePoint", sp),
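The added sp.TTL > 0 condition suppresses the "service GC safe point lost" warning when BR is removing its service safe point: PD treats a non-positive TTL as deletion of the entry, so the returned minimum safe point being ahead of BackupTS-1 is expected in that case and the warning would be noise. That reading of the TTL semantics is the likely intent rather than something stated in the diff; the self-contained sketch below mimics the guard with a fake PD client.

package main

import (
	"context"
	"fmt"
)

// safePointUpdater captures the one PD client method involved; the real
// interface is pd.Client from the TiKV PD client package.
type safePointUpdater interface {
	UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error)
}

// fakePD always reports a minimum service safe point ahead of the request,
// which is exactly the situation the warning is about.
type fakePD struct{ min uint64 }

func (f fakePD) UpdateServiceGCSafePoint(_ context.Context, _ string, _ int64, _ uint64) (uint64, error) {
	return f.min, nil
}

func update(ctx context.Context, cli safePointUpdater, id string, ttl int64, backupTS uint64) {
	last, err := cli.UpdateServiceGCSafePoint(ctx, id, ttl, backupTS-1)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// The new guard: only warn when registering (ttl > 0); a non-positive
	// TTL asks PD to drop the entry, so the comparison is not meaningful.
	if last > backupTS-1 && ttl > 0 {
		fmt.Printf("warn: service GC safe point lost (last=%d > %d)\n", last, backupTS-1)
		return
	}
	fmt.Println("no warning needed")
}

func main() {
	ctx := context.Background()
	cli := fakePD{min: 450_000}
	update(ctx, cli, "br-demo", 300, 400_000) // registering: warning fires
	update(ctx, cli, "br-demo", 0, 400_000)   // removing: warning suppressed
}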