diff --git a/.github/workflows/check.yaml b/.github/workflows/check.yaml index e2bf99c026f..3ccb2635e9f 100644 --- a/.github/workflows/check.yaml +++ b/.github/workflows/check.yaml @@ -8,21 +8,11 @@ jobs: runs-on: ubuntu-latest timeout-minutes: 20 steps: - - uses: actions/setup-go@v3 - with: - go-version: '1.21' - name: Checkout code - uses: actions/checkout@v3 - - name: Restore cache - uses: actions/cache@v3 + uses: actions/checkout@v4 + - uses: actions/setup-go@v5 with: - path: | - ~/go/pkg/mod - ~/.cache/go-build - **/.dashboard_download_cache - key: ${{ runner.os }}-golang-${{ hashFiles('**/go.sum') }} - restore-keys: | - ${{ runner.os }}-golang + go-version: '1.21' - name: Make Check run: | SWAGGER=1 make build diff --git a/.github/workflows/label.yaml b/.github/workflows/label.yaml index 5ff2b895528..00438d26b63 100644 --- a/.github/workflows/label.yaml +++ b/.github/workflows/label.yaml @@ -7,7 +7,7 @@ jobs: add_labels: runs-on: ubuntu-latest steps: - - uses: actions/github-script@v4 + - uses: actions/github-script@v7 name: Add labels with: script: | diff --git a/.github/workflows/pd-docker-image.yaml b/.github/workflows/pd-docker-image.yaml index 2a04c030016..5beaa66c156 100644 --- a/.github/workflows/pd-docker-image.yaml +++ b/.github/workflows/pd-docker-image.yaml @@ -15,10 +15,10 @@ jobs: strategy: fail-fast: true steps: - - uses: actions/setup-go@v3 + - name: Checkout code + uses: actions/checkout@v4 + - uses: actions/setup-go@v5 with: go-version: '1.21' - - name: Checkout code - uses: actions/checkout@v3 - name: Make run: make docker-image diff --git a/.github/workflows/pd-tests.yaml b/.github/workflows/pd-tests.yaml index 1508c1a1457..9084c7545a8 100644 --- a/.github/workflows/pd-tests.yaml +++ b/.github/workflows/pd-tests.yaml @@ -29,20 +29,11 @@ jobs: outputs: job-total: 13 steps: - - uses: actions/setup-go@v3 - with: - go-version: '1.21' - name: Checkout code - uses: actions/checkout@v3 - - name: Restore cache - uses: actions/cache@v3 + uses: actions/checkout@v4 + - uses: actions/setup-go@v5 with: - path: | - ~/go/pkg/mod - ~/.cache/go-build - **/.tools - **/.dashboard_download_cache - key: ${{ runner.os }}-go-${{ matrix.worker_id }}-${{ hashFiles('**/go.sum') }} + go-version: '1.21' - name: Make Test env: WORKER_ID: ${{ matrix.worker_id }} @@ -53,20 +44,21 @@ jobs: mv covprofile covprofile_$WORKER_ID sed -i "/failpoint_binding/d" covprofile_$WORKER_ID - name: Upload coverage result ${{ matrix.worker_id }} - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 with: - name: cover-reports + name: cover-reports-${{ matrix.worker_id }} path: covprofile_${{ matrix.worker_id }} report-coverage: needs: chunks runs-on: ubuntu-latest steps: - name: Checkout code - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Download chunk report - uses: actions/download-artifact@v2 + uses: actions/download-artifact@v4 with: - name: cover-reports + pattern: cover-reports-* + merge-multiple: true - name: Merge env: TOTAL_JOBS: ${{needs.chunks.outputs.job-total}} diff --git a/.github/workflows/tso-consistency-test.yaml b/.github/workflows/tso-consistency-test.yaml index 570cbbc5da8..3cb24898a10 100644 --- a/.github/workflows/tso-consistency-test.yaml +++ b/.github/workflows/tso-consistency-test.yaml @@ -8,10 +8,10 @@ jobs: tso-consistency-test: runs-on: ubuntu-latest steps: - - uses: actions/setup-go@v3 + - name: Checkout code + uses: actions/checkout@v4 + - uses: actions/setup-go@v5 with: go-version: '1.21' - - name: Checkout code - uses: actions/checkout@v3 - name: Make TSO Consistency Test run: make test-tso-consistency diff --git a/.github/workflows/tso-function-test.yaml b/.github/workflows/tso-function-test.yaml index d7780425d30..13fd6fe7df6 100644 --- a/.github/workflows/tso-function-test.yaml +++ b/.github/workflows/tso-function-test.yaml @@ -21,10 +21,10 @@ jobs: tso-function-test: runs-on: ubuntu-latest steps: - - uses: actions/setup-go@v3 + - name: Checkout code + uses: actions/checkout@v4 + - uses: actions/setup-go@v5 with: go-version: '1.21' - - name: Checkout code - uses: actions/checkout@v3 - name: Make TSO Function Test run: make test-tso-function diff --git a/client/client.go b/client/client.go index 1852b77e4c6..eaebef7e10c 100644 --- a/client/client.go +++ b/client/client.go @@ -798,23 +798,7 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur defer span.Finish() } - req := c.getTSORequest(ctx, dcLocation) - if err := c.dispatchTSORequestWithRetry(req); err != nil { - req.tryDone(err) - } - return req -} - -func (c *client) getTSORequest(ctx context.Context, dcLocation string) *tsoRequest { - req := tsoReqPool.Get().(*tsoRequest) - // Set needed fields in the request before using it. - req.start = time.Now() - req.clientCtx = c.ctx - req.requestCtx = ctx - req.physical = 0 - req.logical = 0 - req.dcLocation = dcLocation - return req + return c.dispatchTSORequestWithRetry(ctx, dcLocation) } const ( @@ -822,10 +806,11 @@ const ( dispatchRetryCount = 2 ) -func (c *client) dispatchTSORequestWithRetry(req *tsoRequest) error { +func (c *client) dispatchTSORequestWithRetry(ctx context.Context, dcLocation string) TSFuture { var ( retryable bool err error + req *tsoRequest ) for i := 0; i < dispatchRetryCount; i++ { // Do not delay for the first time. @@ -838,12 +823,22 @@ func (c *client) dispatchTSORequestWithRetry(req *tsoRequest) error { err = errs.ErrClientGetTSO.FastGenByArgs("tso client is nil") continue } + // Get a new request from the pool if it's nil or not from the current pool. + if req == nil || req.pool != tsoClient.tsoReqPool { + req = tsoClient.getTSORequest(ctx, dcLocation) + } retryable, err = tsoClient.dispatchRequest(req) if !retryable { break } } - return err + if err != nil { + if req == nil { + return newTSORequestFastFail(err) + } + req.tryDone(err) + } + return req } func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err error) { diff --git a/client/tso_client.go b/client/tso_client.go index 5f8b12df36f..8185b99d1d0 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -43,33 +43,6 @@ type TSOClient interface { GetMinTS(ctx context.Context) (int64, int64, error) } -type tsoRequest struct { - start time.Time - clientCtx context.Context - requestCtx context.Context - done chan error - physical int64 - logical int64 - dcLocation string -} - -var tsoReqPool = sync.Pool{ - New: func() any { - return &tsoRequest{ - done: make(chan error, 1), - physical: 0, - logical: 0, - } - }, -} - -func (req *tsoRequest) tryDone(err error) { - select { - case req.done <- err: - default: - } -} - type tsoClient struct { ctx context.Context cancel context.CancelFunc @@ -84,6 +57,8 @@ type tsoClient struct { // tso allocator leader is switched. tsoAllocServingURLSwitchedCallback []func() + // tsoReqPool is the pool to recycle `*tsoRequest`. + tsoReqPool *sync.Pool // tsoDispatcher is used to dispatch different TSO requests to // the corresponding dc-location TSO channel. tsoDispatcher sync.Map // Same as map[string]*tsoDispatcher @@ -104,11 +79,20 @@ func newTSOClient( ) *tsoClient { ctx, cancel := context.WithCancel(ctx) c := &tsoClient{ - ctx: ctx, - cancel: cancel, - option: option, - svcDiscovery: svcDiscovery, - tsoStreamBuilderFactory: factory, + ctx: ctx, + cancel: cancel, + option: option, + svcDiscovery: svcDiscovery, + tsoStreamBuilderFactory: factory, + tsoReqPool: &sync.Pool{ + New: func() any { + return &tsoRequest{ + done: make(chan error, 1), + physical: 0, + logical: 0, + } + }, + }, checkTSDeadlineCh: make(chan struct{}), checkTSODispatcherCh: make(chan struct{}, 1), updateTSOConnectionCtxsCh: make(chan struct{}, 1), @@ -155,6 +139,19 @@ func (c *tsoClient) Close() { log.Info("tso client is closed") } +func (c *tsoClient) getTSORequest(ctx context.Context, dcLocation string) *tsoRequest { + req := c.tsoReqPool.Get().(*tsoRequest) + // Set needed fields in the request before using it. + req.start = time.Now() + req.pool = c.tsoReqPool + req.requestCtx = ctx + req.clientCtx = c.ctx + req.physical = 0 + req.logical = 0 + req.dcLocation = dcLocation + return req +} + // GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map func (c *tsoClient) GetTSOAllocators() *sync.Map { return &c.tsoAllocators diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index ad3aa1c5d74..d02fdd52af8 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -115,38 +115,6 @@ func (c *tsoClient) dispatchRequest(request *tsoRequest) (bool, error) { return false, nil } -// TSFuture is a future which promises to return a TSO. -type TSFuture interface { - // Wait gets the physical and logical time, it would block caller if data is not available yet. - Wait() (int64, int64, error) -} - -func (req *tsoRequest) Wait() (physical int64, logical int64, err error) { - // If tso command duration is observed very high, the reason could be it - // takes too long for Wait() be called. - start := time.Now() - cmdDurationTSOAsyncWait.Observe(start.Sub(req.start).Seconds()) - select { - case err = <-req.done: - defer trace.StartRegion(req.requestCtx, "pdclient.tsoReqDone").End() - err = errors.WithStack(err) - defer tsoReqPool.Put(req) - if err != nil { - cmdFailDurationTSO.Observe(time.Since(req.start).Seconds()) - return 0, 0, err - } - physical, logical = req.physical, req.logical - now := time.Now() - cmdDurationWait.Observe(now.Sub(start).Seconds()) - cmdDurationTSO.Observe(now.Sub(req.start).Seconds()) - return - case <-req.requestCtx.Done(): - return 0, 0, errors.WithStack(req.requestCtx.Err()) - case <-req.clientCtx.Done(): - return 0, 0, errors.WithStack(req.clientCtx.Err()) - } -} - func (c *tsoClient) updateTSODispatcher() { // Set up the new TSO dispatcher and batch controller. c.GetTSOAllocators().Range(func(dcLocationKey, _ any) bool { diff --git a/client/tso_request.go b/client/tso_request.go new file mode 100644 index 00000000000..f30ceb5268a --- /dev/null +++ b/client/tso_request.go @@ -0,0 +1,96 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pd + +import ( + "context" + "runtime/trace" + "sync" + "time" + + "github.com/pingcap/errors" +) + +// TSFuture is a future which promises to return a TSO. +type TSFuture interface { + // Wait gets the physical and logical time, it would block caller if data is not available yet. + Wait() (int64, int64, error) +} + +var ( + _ TSFuture = (*tsoRequest)(nil) + _ TSFuture = (*tsoRequestFastFail)(nil) +) + +type tsoRequest struct { + requestCtx context.Context + clientCtx context.Context + done chan error + physical int64 + logical int64 + dcLocation string + + // Runtime fields. + start time.Time + pool *sync.Pool +} + +// tryDone tries to send the result to the channel, it will not block. +func (req *tsoRequest) tryDone(err error) { + select { + case req.done <- err: + default: + } +} + +// Wait will block until the TSO result is ready. +func (req *tsoRequest) Wait() (physical int64, logical int64, err error) { + // If tso command duration is observed very high, the reason could be it + // takes too long for Wait() be called. + start := time.Now() + cmdDurationTSOAsyncWait.Observe(start.Sub(req.start).Seconds()) + select { + case err = <-req.done: + defer trace.StartRegion(req.requestCtx, "pdclient.tsoReqDone").End() + defer req.pool.Put(req) + err = errors.WithStack(err) + if err != nil { + cmdFailDurationTSO.Observe(time.Since(req.start).Seconds()) + return 0, 0, err + } + physical, logical = req.physical, req.logical + now := time.Now() + cmdDurationWait.Observe(now.Sub(start).Seconds()) + cmdDurationTSO.Observe(now.Sub(req.start).Seconds()) + return + case <-req.requestCtx.Done(): + return 0, 0, errors.WithStack(req.requestCtx.Err()) + case <-req.clientCtx.Done(): + return 0, 0, errors.WithStack(req.clientCtx.Err()) + } +} + +type tsoRequestFastFail struct { + err error +} + +func newTSORequestFastFail(err error) *tsoRequestFastFail { + return &tsoRequestFastFail{err} +} + +// Wait returns the error directly. +func (req *tsoRequestFastFail) Wait() (physical int64, logical int64, err error) { + return 0, 0, req.err +} diff --git a/pkg/schedule/checker/checker_controller.go b/pkg/schedule/checker/checker_controller.go index 355226cd2d8..cdc826a1dda 100644 --- a/pkg/schedule/checker/checker_controller.go +++ b/pkg/schedule/checker/checker_controller.go @@ -31,7 +31,7 @@ import ( ) // DefaultCacheSize is the default length of waiting list. -const DefaultCacheSize = 1000 +const DefaultCacheSize = 100000 var denyCheckersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues("checkers", "deny") diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index cfc5196909f..e5b722a488d 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -43,6 +43,9 @@ func init() { // TODO: remove this global variable in the future. // And use a function to create hot schduler for test. schedulePeerPr = 1.0 + // disable denoising in test. + statistics.Denoising = false + statisticsInterval = 0 RegisterScheduler(utils.Write.String(), func(opController *operator.Controller, _ endpoint.ConfigStorage, _ ConfigDecoder, _ ...func(string) error) (Scheduler, error) { cfg := initHotRegionScheduleConfig() return newHotWriteScheduler(opController, cfg), nil @@ -200,10 +203,8 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) { func TestSplitIfRegionTooHot(t *testing.T) { re := require.New(t) - statistics.Denoising = false cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - tc.SetHotRegionCacheHitsThreshold(1) hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) b := &metapb.Buckets{ @@ -274,9 +275,7 @@ func TestSplitIfRegionTooHot(t *testing.T) { func TestSplitBucketsBySize(t *testing.T) { re := require.New(t) - statistics.Denoising = false cancel, _, tc, oc := prepareSchedulersTest() - tc.SetHotRegionCacheHitsThreshold(1) tc.SetRegionBucketEnabled(true) defer cancel() hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) @@ -327,9 +326,7 @@ func TestSplitBucketsBySize(t *testing.T) { func TestSplitBucketsByLoad(t *testing.T) { re := require.New(t) - statistics.Denoising = false cancel, _, tc, oc := prepareSchedulersTest() - tc.SetHotRegionCacheHitsThreshold(1) tc.SetRegionBucketEnabled(true) defer cancel() hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) @@ -388,8 +385,6 @@ func TestSplitBucketsByLoad(t *testing.T) { func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) { re := require.New(t) - statistics.Denoising = false - statisticsInterval = 0 checkHotWriteRegionScheduleByteRateOnly(re, false /* disable placement rules */) checkHotWriteRegionScheduleByteRateOnly(re, true /* enable placement rules */) checkHotWriteRegionPlacement(re, true) @@ -406,7 +401,6 @@ func checkHotWriteRegionPlacement(re *require.Assertions, enablePlacementRules b hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb.(*hotScheduler).conf.SetHistorySampleDuration(0) - tc.SetHotRegionCacheHitsThreshold(0) tc.AddLabelsStore(1, 2, map[string]string{"zone": "z1", "host": "h1"}) tc.AddLabelsStore(2, 2, map[string]string{"zone": "z1", "host": "h2"}) @@ -633,12 +627,9 @@ func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlace func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) { re := require.New(t) - statistics.Denoising = false - statisticsInterval = 0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.ConfChangeV2)) - tc.SetHotRegionCacheHitsThreshold(0) re.NoError(tc.RuleManager.SetRules([]*placement.Rule{ { GroupID: placement.DefaultGroupID, @@ -853,8 +844,6 @@ func TestHotWriteRegionScheduleWithQuery(t *testing.T) { }() cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - statistics.Denoising = false - statisticsInterval = 0 hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) @@ -863,7 +852,6 @@ func TestHotWriteRegionScheduleWithQuery(t *testing.T) { hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.QueryPriority, utils.BytePriority} hb.(*hotScheduler).conf.SetHistorySampleDuration(0) - tc.SetHotRegionCacheHitsThreshold(0) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) tc.AddRegionStore(3, 20) @@ -888,8 +876,6 @@ func TestHotWriteRegionScheduleWithQuery(t *testing.T) { func TestHotWriteRegionScheduleWithKeyRate(t *testing.T) { re := require.New(t) - statistics.Denoising = false - statisticsInterval = 0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() @@ -901,7 +887,6 @@ func TestHotWriteRegionScheduleWithKeyRate(t *testing.T) { hb.(*hotScheduler).conf.RankFormulaVersion = "v1" hb.(*hotScheduler).conf.SetHistorySampleDuration(0) - tc.SetHotRegionCacheHitsThreshold(0) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) @@ -948,8 +933,6 @@ func TestHotWriteRegionScheduleWithKeyRate(t *testing.T) { func TestHotWriteRegionScheduleUnhealthyStore(t *testing.T) { re := require.New(t) - statistics.Denoising = false - statisticsInterval = 0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() @@ -958,7 +941,6 @@ func TestHotWriteRegionScheduleUnhealthyStore(t *testing.T) { hb.(*hotScheduler).conf.SetDstToleranceRatio(1) hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) - tc.SetHotRegionCacheHitsThreshold(0) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) @@ -996,8 +978,6 @@ func TestHotWriteRegionScheduleUnhealthyStore(t *testing.T) { func TestHotWriteRegionScheduleCheckHot(t *testing.T) { re := require.New(t) - statistics.Denoising = false - statisticsInterval = 0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() @@ -1006,7 +986,6 @@ func TestHotWriteRegionScheduleCheckHot(t *testing.T) { hb.(*hotScheduler).conf.SetDstToleranceRatio(1) hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) - tc.SetHotRegionCacheHitsThreshold(0) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) @@ -1031,8 +1010,6 @@ func TestHotWriteRegionScheduleCheckHot(t *testing.T) { func TestHotWriteRegionScheduleWithLeader(t *testing.T) { re := require.New(t) - statistics.Denoising = false - statisticsInterval = 0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() @@ -1041,7 +1018,6 @@ func TestHotWriteRegionScheduleWithLeader(t *testing.T) { hb.(*hotScheduler).conf.SetHistorySampleDuration(0) re.NoError(err) - tc.SetHotRegionCacheHitsThreshold(0) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) tc.AddRegionStore(3, 20) @@ -1094,8 +1070,6 @@ func TestHotWriteRegionScheduleWithLeader(t *testing.T) { func TestHotWriteRegionScheduleWithPendingInfluence(t *testing.T) { re := require.New(t) - statistics.Denoising = false - statisticsInterval = 0 checkHotWriteRegionScheduleWithPendingInfluence(re, 0) // 0: byte rate checkHotWriteRegionScheduleWithPendingInfluence(re, 1) // 1: key rate } @@ -1114,7 +1088,6 @@ func checkHotWriteRegionScheduleWithPendingInfluence(re *require.Assertions, dim pendingAmpFactor = old }() - tc.SetHotRegionCacheHitsThreshold(0) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) @@ -1189,8 +1162,6 @@ func checkHotWriteRegionScheduleWithPendingInfluence(re *require.Assertions, dim func TestHotWriteRegionScheduleWithRuleEnabled(t *testing.T) { re := require.New(t) - statistics.Denoising = false - statisticsInterval = 0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() tc.SetEnablePlacementRules(true) @@ -1199,7 +1170,6 @@ func TestHotWriteRegionScheduleWithRuleEnabled(t *testing.T) { hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.KeyPriority, utils.BytePriority} hb.(*hotScheduler).conf.SetHistorySampleDuration(0) - tc.SetHotRegionCacheHitsThreshold(0) key, err := hex.DecodeString("") re.NoError(err) // skip stddev check @@ -1282,7 +1252,6 @@ func TestHotReadRegionScheduleByteRateOnly(t *testing.T) { hb := scheduler.(*hotScheduler) hb.conf.ReadPriorities = []string{utils.BytePriority, utils.KeyPriority} hb.conf.SetHistorySampleDuration(0) - tc.SetHotRegionCacheHitsThreshold(0) // Add stores 1, 2, 3, 4, 5 with region counts 3, 2, 2, 2, 0. tc.AddRegionStore(1, 3) @@ -1396,8 +1365,6 @@ func TestHotReadRegionScheduleByteRateOnly(t *testing.T) { func TestHotReadRegionScheduleWithQuery(t *testing.T) { re := require.New(t) - statistics.Denoising = false - statisticsInterval = 0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() @@ -1408,7 +1375,6 @@ func TestHotReadRegionScheduleWithQuery(t *testing.T) { hb.(*hotScheduler).conf.RankFormulaVersion = "v1" hb.(*hotScheduler).conf.SetHistorySampleDuration(0) - tc.SetHotRegionCacheHitsThreshold(0) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) tc.AddRegionStore(3, 20) @@ -1432,8 +1398,6 @@ func TestHotReadRegionScheduleWithQuery(t *testing.T) { func TestHotReadRegionScheduleWithKeyRate(t *testing.T) { re := require.New(t) - statistics.Denoising = false - statisticsInterval = 0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() @@ -1445,7 +1409,6 @@ func TestHotReadRegionScheduleWithKeyRate(t *testing.T) { hb.(*hotScheduler).conf.ReadPriorities = []string{utils.BytePriority, utils.KeyPriority} hb.(*hotScheduler).conf.SetHistorySampleDuration(0) - tc.SetHotRegionCacheHitsThreshold(0) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) tc.AddRegionStore(3, 20) @@ -1491,8 +1454,6 @@ func TestHotReadRegionScheduleWithKeyRate(t *testing.T) { func TestHotReadRegionScheduleWithPendingInfluence(t *testing.T) { re := require.New(t) - statistics.Denoising = false - statisticsInterval = 0 checkHotReadRegionScheduleWithPendingInfluence(re, 0) // 0: byte rate checkHotReadRegionScheduleWithPendingInfluence(re, 1) // 1: key rate } @@ -1515,7 +1476,6 @@ func checkHotReadRegionScheduleWithPendingInfluence(re *require.Assertions, dim pendingAmpFactor = old }() - tc.SetHotRegionCacheHitsThreshold(0) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) @@ -1614,8 +1574,6 @@ func checkHotReadRegionScheduleWithPendingInfluence(re *require.Assertions, dim func TestHotReadWithEvictLeaderScheduler(t *testing.T) { re := require.New(t) - statistics.Denoising = false - statisticsInterval = 0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() @@ -1625,7 +1583,6 @@ func TestHotReadWithEvictLeaderScheduler(t *testing.T) { hb.(*hotScheduler).conf.SetDstToleranceRatio(1) hb.(*hotScheduler).conf.SetStrictPickingStore(false) hb.(*hotScheduler).conf.ReadPriorities = []string{utils.BytePriority, utils.KeyPriority} - tc.SetHotRegionCacheHitsThreshold(0) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) @@ -1657,7 +1614,6 @@ func TestHotCacheUpdateCache(t *testing.T) { re := require.New(t) cancel, _, tc, _ := prepareSchedulersTest() defer cancel() - tc.SetHotRegionCacheHitsThreshold(0) // For read flow addRegionInfo(tc, utils.Read, []testRegionInfo{ @@ -1724,7 +1680,6 @@ func TestHotCacheKeyThresholds(t *testing.T) { { // only a few regions cancel, _, tc, _ := prepareSchedulersTest() defer cancel() - tc.SetHotRegionCacheHitsThreshold(0) addRegionInfo(tc, utils.Read, []testRegionInfo{ {1, []uint64{1, 2, 3}, 0, 1, 0}, {2, []uint64{1, 2, 3}, 0, 1 * units.KiB, 0}, @@ -1796,7 +1751,6 @@ func TestHotCacheByteAndKey(t *testing.T) { re := require.New(t) cancel, _, tc, _ := prepareSchedulersTest() defer cancel() - tc.SetHotRegionCacheHitsThreshold(0) statistics.ThresholdsUpdateInterval = 0 defer func() { statistics.ThresholdsUpdateInterval = 8 * time.Second @@ -2090,8 +2044,6 @@ func TestInfluenceByRWType(t *testing.T) { defer func() { schedulePeerPr = originValue }() - statistics.Denoising = false - statisticsInterval = 0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() @@ -2100,7 +2052,6 @@ func TestInfluenceByRWType(t *testing.T) { hb.(*hotScheduler).conf.SetDstToleranceRatio(1) hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) hb.(*hotScheduler).conf.SetHistorySampleDuration(0) - tc.SetHotRegionCacheHitsThreshold(0) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) @@ -2214,8 +2165,6 @@ func checkHotReadPeerSchedule(re *require.Assertions, enablePlacementRules bool) func TestHotScheduleWithPriority(t *testing.T) { re := require.New(t) - statistics.Denoising = false - statisticsInterval = 0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() @@ -2231,7 +2180,6 @@ func TestHotScheduleWithPriority(t *testing.T) { stddevThreshold = origin }() - tc.SetHotRegionCacheHitsThreshold(0) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) @@ -2323,9 +2271,6 @@ func TestHotScheduleWithPriority(t *testing.T) { func TestHotScheduleWithStddev(t *testing.T) { re := require.New(t) - statistics.Denoising = false - statisticsInterval = 0 - cancel, _, tc, oc := prepareSchedulersTest() defer cancel() hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) @@ -2334,7 +2279,6 @@ func TestHotScheduleWithStddev(t *testing.T) { hb.(*hotScheduler).conf.SetSrcToleranceRatio(1.0) hb.(*hotScheduler).conf.RankFormulaVersion = "v1" tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) - tc.SetHotRegionCacheHitsThreshold(0) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) tc.AddRegionStore(3, 20) @@ -2384,9 +2328,6 @@ func TestHotScheduleWithStddev(t *testing.T) { func TestHotWriteLeaderScheduleWithPriority(t *testing.T) { re := require.New(t) - statistics.Denoising = false - statisticsInterval = 0 - cancel, _, tc, oc := prepareSchedulersTest() defer cancel() hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) @@ -2396,7 +2337,6 @@ func TestHotWriteLeaderScheduleWithPriority(t *testing.T) { hb.(*hotScheduler).conf.SetHistorySampleDuration(0) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) - tc.SetHotRegionCacheHitsThreshold(0) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) tc.AddRegionStore(3, 20) @@ -2428,9 +2368,6 @@ func TestHotWriteLeaderScheduleWithPriority(t *testing.T) { func TestCompatibility(t *testing.T) { re := require.New(t) - statistics.Denoising = false - statisticsInterval = 0 - cancel, _, tc, oc := prepareSchedulersTest() defer cancel() hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) diff --git a/pkg/schedule/schedulers/hot_region_v2_test.go b/pkg/schedule/schedulers/hot_region_v2_test.go index 78a30cebaca..25d6d94f7b1 100644 --- a/pkg/schedule/schedulers/hot_region_v2_test.go +++ b/pkg/schedule/schedulers/hot_region_v2_test.go @@ -21,7 +21,6 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/mock/mockcluster" "github.com/tikv/pd/pkg/schedule/operator" - "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/utils/operatorutil" @@ -33,8 +32,6 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimSecond(t *testing.T) { re := require.New(t) cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - statistics.Denoising = false - statisticsInterval = 0 sche, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil) re.NoError(err) hb := sche.(*hotScheduler) @@ -43,7 +40,6 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimSecond(t *testing.T) { hb.conf.SetRankFormulaVersion("v1") hb.conf.SetHistorySampleDuration(0) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) - tc.SetHotRegionCacheHitsThreshold(0) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) tc.AddRegionStore(3, 20) @@ -96,8 +92,6 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimFirst(t *testing.T) { re := require.New(t) cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - statistics.Denoising = false - sche, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil) re.NoError(err) hb := sche.(*hotScheduler) @@ -106,7 +100,6 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimFirst(t *testing.T) { hb.conf.SetRankFormulaVersion("v1") hb.conf.SetHistorySampleDuration(0) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) - tc.SetHotRegionCacheHitsThreshold(0) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) tc.AddRegionStore(3, 20) @@ -148,9 +141,6 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimFirst(t *testing.T) { func TestHotWriteRegionScheduleWithRevertRegionsDimFirstOnly(t *testing.T) { // This is a test that searchRevertRegions finds a solution of rank -2. re := require.New(t) - statistics.Denoising = false - statisticsInterval = 0 - cancel, _, tc, oc := prepareSchedulersTest() defer cancel() sche, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil) @@ -161,7 +151,6 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimFirstOnly(t *testing.T) { hb.conf.SetRankFormulaVersion("v1") hb.conf.SetHistorySampleDuration(0) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) - tc.SetHotRegionCacheHitsThreshold(0) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) tc.AddRegionStore(3, 20) @@ -212,9 +201,6 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimFirstOnly(t *testing.T) { func TestHotReadRegionScheduleWithRevertRegionsDimSecond(t *testing.T) { // This is a test that searchRevertRegions finds a solution of rank -1. re := require.New(t) - statistics.Denoising = false - statisticsInterval = 0 - cancel, _, tc, oc := prepareSchedulersTest() defer cancel() sche, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil) @@ -225,7 +211,6 @@ func TestHotReadRegionScheduleWithRevertRegionsDimSecond(t *testing.T) { hb.conf.SetRankFormulaVersion("v1") hb.conf.SetHistorySampleDuration(0) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) - tc.SetHotRegionCacheHitsThreshold(0) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) tc.AddRegionStore(3, 20) @@ -275,9 +260,6 @@ func TestHotReadRegionScheduleWithRevertRegionsDimSecond(t *testing.T) { func TestSkipUniformStore(t *testing.T) { re := require.New(t) - statistics.Denoising = false - statisticsInterval = 0 - cancel, _, tc, oc := prepareSchedulersTest() defer cancel() hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil) @@ -287,7 +269,6 @@ func TestSkipUniformStore(t *testing.T) { hb.(*hotScheduler).conf.SetRankFormulaVersion("v2") hb.(*hotScheduler).conf.ReadPriorities = []string{utils.BytePriority, utils.KeyPriority} hb.(*hotScheduler).conf.SetHistorySampleDuration(0) - tc.SetHotRegionCacheHitsThreshold(0) tc.AddRegionStore(1, 20) tc.AddRegionStore(2, 20) tc.AddRegionStore(3, 20) @@ -439,7 +420,6 @@ func checkHotReadRegionScheduleWithSmallHotRegion(re *require.Assertions, highLo addOtherRegions func(*mockcluster.Cluster, *hotScheduler)) []*operator.Operator { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - statistics.Denoising = false sche, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil) re.NoError(err) hb := sche.(*hotScheduler) @@ -447,7 +427,6 @@ func checkHotReadRegionScheduleWithSmallHotRegion(re *require.Assertions, highLo hb.conf.SetDstToleranceRatio(1) hb.conf.SetRankFormulaVersion("v2") hb.conf.ReadPriorities = []string{utils.QueryPriority, utils.BytePriority} - tc.SetHotRegionCacheHitsThreshold(0) tc.AddRegionStore(1, 40) tc.AddRegionStore(2, 10) tc.AddRegionStore(3, 10) @@ -470,7 +449,6 @@ func checkHotReadRegionScheduleWithSmallHotRegion(re *require.Assertions, highLo } } addRegionInfo(tc, utils.Read, regions) - tc.SetHotRegionCacheHitsThreshold(1) addOtherRegions(tc, hb) ops, _ := hb.Schedule(tc, false) return ops diff --git a/pkg/schedule/schedulers/scheduler_test.go b/pkg/schedule/schedulers/scheduler_test.go index 77c190ad943..d30ef3ad0aa 100644 --- a/pkg/schedule/schedulers/scheduler_test.go +++ b/pkg/schedule/schedulers/scheduler_test.go @@ -46,6 +46,7 @@ func prepareSchedulersTest(needToRunStream ...bool) (context.CancelFunc, config. stream = hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, needToRunStream[0]) } oc := operator.NewController(ctx, tc.GetBasicCluster(), tc.GetSchedulerConfig(), stream) + tc.SetHotRegionCacheHitsThreshold(1) return cancel, opt, tc, oc } @@ -183,7 +184,6 @@ func checkBalance(re *require.Assertions, enablePlacementRules bool) { tc.AddLeaderRegionWithWriteInfo(1, 1, 512*units.KiB*utils.RegionHeartBeatReportInterval, 0, 0, utils.RegionHeartBeatReportInterval, []uint64{2, 3}) tc.AddLeaderRegionWithWriteInfo(2, 1, 512*units.KiB*utils.RegionHeartBeatReportInterval, 0, 0, utils.RegionHeartBeatReportInterval, []uint64{3, 4}) tc.AddLeaderRegionWithWriteInfo(3, 1, 512*units.KiB*utils.RegionHeartBeatReportInterval, 0, 0, utils.RegionHeartBeatReportInterval, []uint64{2, 4}) - tc.SetHotRegionCacheHitsThreshold(0) // try to get an operator var ops []*operator.Operator @@ -218,7 +218,6 @@ func TestHotRegionScheduleAbnormalReplica(t *testing.T) { tc.AddRegionWithReadInfo(1, 1, 512*units.KiB*utils.StoreHeartBeatReportInterval, 0, 0, utils.StoreHeartBeatReportInterval, []uint64{2}) tc.AddRegionWithReadInfo(2, 2, 512*units.KiB*utils.StoreHeartBeatReportInterval, 0, 0, utils.StoreHeartBeatReportInterval, []uint64{1, 3}) tc.AddRegionWithReadInfo(3, 1, 512*units.KiB*utils.StoreHeartBeatReportInterval, 0, 0, utils.StoreHeartBeatReportInterval, []uint64{2, 3}) - tc.SetHotRegionCacheHitsThreshold(0) re.True(tc.IsRegionHot(tc.GetRegion(1))) re.False(hb.IsScheduleAllowed(tc)) } @@ -318,7 +317,6 @@ func TestSpecialUseHotRegion(t *testing.T) { hs, err := CreateScheduler(utils.Write.String(), oc, storage, cd) re.NoError(err) - tc.SetHotRegionCacheHitsThreshold(0) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.AddRegionStore(1, 10) tc.AddRegionStore(2, 4) @@ -368,7 +366,6 @@ func TestSpecialUseReserved(t *testing.T) { bs, err := CreateScheduler(BalanceRegionType, oc, storage, cd) re.NoError(err) - tc.SetHotRegionCacheHitsThreshold(0) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.AddRegionStore(1, 10) tc.AddRegionStore(2, 4) diff --git a/pkg/statistics/region_collection.go b/pkg/statistics/region_collection.go index 488763142e1..cb0de6f601b 100644 --- a/pkg/statistics/region_collection.go +++ b/pkg/statistics/region_collection.go @@ -22,6 +22,7 @@ import ( sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/utils/syncutil" + "go.uber.org/zap" ) // RegionInfoProvider is an interface to provide the region information. @@ -250,6 +251,7 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store regionDownPeerDuration.Observe(float64(time.Now().Unix() - info.startDownPeerTS)) } else { info.startDownPeerTS = time.Now().Unix() + logDownPeerWithNoDisconnectedStore(region, stores) } } else if typ == MissPeer && len(region.GetVoters()) < desiredVoters { if info.startMissVoterPeerTS != 0 { @@ -440,3 +442,24 @@ func notIsolatedStoresWithLabel(stores []*core.StoreInfo, label string) [][]*cor } return res } + +// logDownPeerWithNoDisconnectedStore logs down peers on connected stores. +// It won't log down peer when any store of the replica is disconnected which is +// used to avoid too many logs when a store is disconnected. +// TODO: it's not a good way to log down peer during process region heartbeat, we should handle it in another way. +// region: the region which has down peer +// stores: all stores that the region has peer on them +func logDownPeerWithNoDisconnectedStore(region *core.RegionInfo, stores []*core.StoreInfo) { + for _, store := range stores { + if store.IsDisconnected() { + return + } + } + for _, p := range region.GetDownPeers() { + log.Warn("region has down peer on connected store", + zap.Uint64("region-id", region.GetID()), + zap.Uint64("down-peer", p.GetPeer().GetId()), + zap.Uint64("down-seconds", p.GetDownSeconds()), + zap.Uint64("store-id", p.GetPeer().GetStoreId())) + } +} diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index 6ddeafe4573..c402081fa2f 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -172,6 +172,7 @@ func TestEtcdClientSync(t *testing.T) { servers, client1, clean := NewTestEtcdCluster(t, 1) defer clean() etcd1, cfg1 := servers[0], servers[0].Config() + defer etcd1.Close() // Add a new member. etcd2 := MustAddEtcdMember(t, &cfg1, client1) @@ -180,10 +181,22 @@ func TestEtcdClientSync(t *testing.T) { // wait for etcd client sync endpoints checkEtcdEndpointNum(re, client1, 2) - // Remove the first member and close the etcd1. - _, err := RemoveEtcdMember(client1, uint64(etcd1.Server.ID())) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + // remove one member that is not the one we connected to. + resp, err := ListEtcdMembers(ctx, client1) + re.NoError(err) + + var memIDToRemove uint64 + for _, m := range resp.Members { + if m.ID != resp.Header.MemberId { + memIDToRemove = m.ID + break + } + } + + _, err = RemoveEtcdMember(client1, memIDToRemove) re.NoError(err) - etcd1.Close() // Check the client can get the new member with the new endpoints. checkEtcdEndpointNum(re, client1, 1) @@ -300,11 +313,10 @@ func checkEtcdWithHangLeader(t *testing.T) error { etcd2 := MustAddEtcdMember(t, &cfg1, client1) defer etcd2.Close() checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2}) - time.Sleep(1 * time.Second) // wait for etcd client sync endpoints // Hang the etcd1 and wait for the client to connect to etcd2. enableDiscard.Store(true) - time.Sleep(time.Second) + time.Sleep(3 * time.Second) _, err = EtcdKVGet(client1, "test/key1") return err } @@ -366,7 +378,8 @@ func ioCopy(ctx context.Context, dst io.Writer, src io.Reader, enableDiscard *at return nil default: if enableDiscard.Load() { - io.Copy(io.Discard, src) + _, err := io.Copy(io.Discard, src) + return err } readNum, errRead := src.Read(buffer) if readNum > 0 { diff --git a/pkg/utils/etcdutil/testutil.go b/pkg/utils/etcdutil/testutil.go index 3ea4d057645..d9464eeceeb 100644 --- a/pkg/utils/etcdutil/testutil.go +++ b/pkg/utils/etcdutil/testutil.go @@ -122,18 +122,27 @@ func MustAddEtcdMember(t *testing.T, cfg1 *embed.Config, client *clientv3.Client func checkMembers(re *require.Assertions, client *clientv3.Client, etcds []*embed.Etcd) { // Check the client can get the new member. - listResp, err := ListEtcdMembers(client.Ctx(), client) - re.NoError(err) - re.Len(listResp.Members, len(etcds)) - inList := func(m *etcdserverpb.Member) bool { - for _, etcd := range etcds { - if m.ID == uint64(etcd.Server.ID()) { - return true + testutil.Eventually(re, func() bool { + listResp, err := ListEtcdMembers(client.Ctx(), client) + if err != nil { + return false + } + if len(etcds) != len(listResp.Members) { + return false + } + inList := func(m *etcdserverpb.Member) bool { + for _, etcd := range etcds { + if m.ID == uint64(etcd.Server.ID()) { + return true + } } + return false } - return false - } - for _, m := range listResp.Members { - re.True(inList(m)) - } + for _, m := range listResp.Members { + if !inList(m) { + return false + } + } + return true + }) } diff --git a/server/api/admin_test.go b/server/api/admin_test.go index 3a628a1de61..f3b3dd64bd3 100644 --- a/server/api/admin_test.go +++ b/server/api/admin_test.go @@ -181,7 +181,7 @@ func (suite *adminTestSuite) TestPersistFile() { func makeTS(offset time.Duration) uint64 { physical := time.Now().Add(offset).UnixNano() / int64(time.Millisecond) - return uint64(physical << 18) + return uint64(physical) << 18 } func (suite *adminTestSuite) TestResetTS() { diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index d01446ba143..ecd579d8881 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -3480,10 +3480,11 @@ func TestStoreOverloadedWithReplace(t *testing.T) { re.False(oc.AddOperator(op3)) ops, _ := lb.Schedule(tc, false /* dryRun */) re.Empty(ops) - // sleep 2 seconds to make sure that token is filled up - time.Sleep(2 * time.Second) - ops, _ = lb.Schedule(tc, false /* dryRun */) - re.NotEmpty(ops) + // make sure that token is filled up + testutil.Eventually(re, func() bool { + ops, _ = lb.Schedule(tc, false /* dryRun */) + return len(ops) != 0 + }) } func TestDownStoreLimit(t *testing.T) { diff --git a/tests/integrations/mcs/scheduling/api_test.go b/tests/integrations/mcs/scheduling/api_test.go index be51123532d..e9033e5016a 100644 --- a/tests/integrations/mcs/scheduling/api_test.go +++ b/tests/integrations/mcs/scheduling/api_test.go @@ -647,11 +647,14 @@ func (suite *apiTestSuite) checkStores(cluster *tests.TestCluster) { Version: "2.0.0", }, } + // prevent the offline store from changing to tombstone + tests.MustPutRegion(re, cluster, 3, 6, []byte("a"), []byte("b")) for _, store := range stores { tests.MustPutStore(re, cluster, store) + if store.GetId() == 6 { + cluster.GetLeaderServer().GetRaftCluster().GetBasicCluster().UpdateStoreStatus(6) + } } - // prevent the offline store from changing to tombstone - tests.MustPutRegion(re, cluster, 3, 6, []byte("a"), []byte("b")) // Test /stores apiServerAddr := cluster.GetLeaderServer().GetAddr() urlPrefix := fmt.Sprintf("%s/pd/api/v1/stores", apiServerAddr) diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 9194811cd37..909972f0315 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -781,13 +781,12 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspaceGroupMergeIntoDefault Keyspaces: []uint32{uint32(i)}, }) keyspaces = append(keyspaces, uint32(i)) - if len(keyspaceGroups) < etcdutil.MaxEtcdTxnOps/2 && i != keyspaceGroupNum { + if i != keyspaceGroupNum { continue } handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ KeyspaceGroups: keyspaceGroups, }) - keyspaceGroups = keyspaceGroups[:0] } // Check if all the keyspace groups are created. groups := handlersutil.MustLoadKeyspaceGroups(re, suite.pdLeaderServer, "0", "0") diff --git a/tests/integrations/tso/server_test.go b/tests/integrations/tso/server_test.go index ac3d914aa80..5590ba68d37 100644 --- a/tests/integrations/tso/server_test.go +++ b/tests/integrations/tso/server_test.go @@ -152,12 +152,12 @@ func (suite *tsoServerTestSuite) TestConcurrentlyReset() { for i := 0; i < 2; i++ { go func() { defer wg.Done() - for j := 0; j <= 100; j++ { + for j := 0; j <= 50; j++ { // Get a copy of now then call base.add, because now is shared by all goroutines // and now.add() will add to itself which isn't atomic and multi-goroutine safe. base := now - physical := base.Add(time.Duration(2*j)*time.Minute).UnixNano() / int64(time.Millisecond) - ts := uint64(physical << 18) + physical := base.Add(time.Duration(j)*time.Minute).UnixNano() / int64(time.Millisecond) + ts := uint64(physical) << 18 suite.resetTS(ts, false, false) } }() diff --git a/tests/server/apiv2/handlers/testutil.go b/tests/server/apiv2/handlers/testutil.go index d26ce732714..c5682aafbce 100644 --- a/tests/server/apiv2/handlers/testutil.go +++ b/tests/server/apiv2/handlers/testutil.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server/apiv2/handlers" "github.com/tikv/pd/tests" ) @@ -168,8 +169,14 @@ func tryCreateKeyspaceGroup(re *require.Assertions, server *tests.TestServer, re // MustLoadKeyspaceGroupByID loads the keyspace group by ID with HTTP API. func MustLoadKeyspaceGroupByID(re *require.Assertions, server *tests.TestServer, id uint32) *endpoint.KeyspaceGroup { - kg, code := TryLoadKeyspaceGroupByID(re, server, id) - re.Equal(http.StatusOK, code) + var ( + kg *endpoint.KeyspaceGroup + code int + ) + testutil.Eventually(re, func() bool { + kg, code = TryLoadKeyspaceGroupByID(re, server, id) + return code == http.StatusOK + }) return kg } @@ -232,15 +239,28 @@ func MustSplitKeyspaceGroup(re *require.Assertions, server *tests.TestServer, id // MustFinishSplitKeyspaceGroup finishes a keyspace group split with HTTP API. func MustFinishSplitKeyspaceGroup(re *require.Assertions, server *tests.TestServer, id uint32) { - httpReq, err := http.NewRequest(http.MethodDelete, server.GetAddr()+keyspaceGroupsPrefix+fmt.Sprintf("/%d/split", id), http.NoBody) - re.NoError(err) - // Send request. - resp, err := dialClient.Do(httpReq) - re.NoError(err) - defer resp.Body.Close() - data, err := io.ReadAll(resp.Body) - re.NoError(err) - re.Equal(http.StatusOK, resp.StatusCode, string(data)) + testutil.Eventually(re, func() bool { + httpReq, err := http.NewRequest(http.MethodDelete, server.GetAddr()+keyspaceGroupsPrefix+fmt.Sprintf("/%d/split", id), http.NoBody) + if err != nil { + return false + } + // Send request. + resp, err := dialClient.Do(httpReq) + if err != nil { + return false + } + defer resp.Body.Close() + data, err := io.ReadAll(resp.Body) + if err != nil { + return false + } + if resp.StatusCode == http.StatusServiceUnavailable || + resp.StatusCode == http.StatusInternalServerError { + return false + } + re.Equal(http.StatusOK, resp.StatusCode, string(data)) + return true + }) } // MustMergeKeyspaceGroup merges keyspace groups with HTTP API. diff --git a/tests/server/config/config_test.go b/tests/server/config/config_test.go index b6fcecbd47b..57e4272f7ea 100644 --- a/tests/server/config/config_test.go +++ b/tests/server/config/config_test.go @@ -32,7 +32,6 @@ import ( tu "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/pkg/versioninfo" - "github.com/tikv/pd/server" "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" ) @@ -74,11 +73,7 @@ func TestRateLimitConfigReload(t *testing.T) { oldLeaderName := leader.GetServer().Name() leader.GetServer().GetMember().ResignEtcdLeader(leader.GetServer().Context(), oldLeaderName, "") - var servers []*server.Server - for _, s := range cluster.GetServers() { - servers = append(servers, s.GetServer()) - } - server.MustWaitLeader(re, servers) + re.NotEmpty(cluster.WaitLeader()) leader = cluster.GetLeaderServer() re.NotNil(leader) re.True(leader.GetServer().GetServiceMiddlewarePersistOptions().IsRateLimitEnabled()) diff --git a/tests/server/tso/global_tso_test.go b/tests/server/tso/global_tso_test.go index f705bdf12b5..8dd98b1d628 100644 --- a/tests/server/tso/global_tso_test.go +++ b/tests/server/tso/global_tso_test.go @@ -165,7 +165,7 @@ func TestLogicalOverflow(t *testing.T) { re.NoError(err) if i == 1 { // the 2nd request may (but not must) overflow, as max logical interval is 262144 - re.Less(time.Since(begin), updateInterval+20*time.Millisecond) // additional 20ms for gRPC latency + re.Less(time.Since(begin), updateInterval+50*time.Millisecond) // additional 50ms for gRPC latency } } // the 3rd request must overflow diff --git a/tools/pd-ut/go-compile-without-link.sh b/tools/pd-ut/go-compile-without-link.sh new file mode 100755 index 00000000000..88e6282b076 --- /dev/null +++ b/tools/pd-ut/go-compile-without-link.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +# See https://gist.github.com/howardjohn/c0f5d0bc293ef7d7fada533a2c9ffaf4 +# Usage: go test -exec=true -toolexec=go-compile-without-link -vet=off ./... +# Preferably as an alias like `alias go-test-compile='go test -exec=true -toolexec=go-compile-without-link -vet=off'` +# This will compile all tests, but not link them (which is the least cacheable part) + +if [[ "${2}" == "-V=full" ]]; then + "$@" + exit 0 +fi +case "$(basename ${1})" in + link) + # Output a dummy file + touch "${3}" + ;; + # We could skip vet as well, but it can be done with -vet=off if desired + *) + "$@" +esac diff --git a/tools/pd-ut/ut.go b/tools/pd-ut/ut.go index 69a83f007b6..7fc96ee11cf 100644 --- a/tools/pd-ut/ut.go +++ b/tools/pd-ut/ut.go @@ -589,8 +589,28 @@ func skipDIR(pkg string) bool { return false } +func generateBuildCache() error { + // cd cmd/pd-server && go test -tags=tso_function_test,deadlock -exec-=true -vet=off -toolexec=go-compile-without-link + cmd := exec.Command("go", "test", "-exec=true", "-vet", "off", "--tags=tso_function_test,deadlock") + goCompileWithoutLink := fmt.Sprintf("-toolexec=%s/tools/pd-ut/go-compile-without-link.sh", workDir) + cmd.Args = append(cmd.Args, goCompileWithoutLink) + cmd.Dir = fmt.Sprintf("%s/cmd/pd-server", workDir) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + if err := cmd.Run(); err != nil { + return withTrace(err) + } + return nil +} + // buildTestBinaryMulti is much faster than build the test packages one by one. func buildTestBinaryMulti(pkgs []string) error { + // staged build, generate the build cache for all the tests first, then generate the test binary. + // This way is faster than generating test binaries directly, because the cache can be used. + if err := generateBuildCache(); err != nil { + return withTrace(err) + } + // go test --exec=xprog --tags=tso_function_test,deadlock -vet=off --count=0 $(pkgs) xprogPath := path.Join(workDir, "bin/xprog") packages := make([]string, 0, len(pkgs))