diff --git a/client/http/client.go b/client/http/client.go index f16a3abed89..1235266a271 100644 --- a/client/http/client.go +++ b/client/http/client.go @@ -21,7 +21,6 @@ import ( "encoding/json" "io" "net/http" - "os" "time" "github.com/pingcap/errors" @@ -270,21 +269,6 @@ func WithMetrics( } } -// WithLoggerRedirection configures the client with the given logger redirection. -func WithLoggerRedirection(logLevel, fileName string) ClientOption { - cfg := &log.Config{} - cfg.Level = logLevel - if fileName != "" { - f, _ := os.CreateTemp(".", fileName) - fname := f.Name() - f.Close() - cfg.File.Filename = fname - } - lg, p, _ := log.InitLogger(cfg) - log.ReplaceGlobals(lg, p) - return func(c *client) {} -} - // NewClientWithServiceDiscovery creates a PD HTTP client with the given PD service discovery. func NewClientWithServiceDiscovery( source string, @@ -314,6 +298,10 @@ func NewClient( opt(c) } sd := pd.NewDefaultPDServiceDiscovery(ctx, cancel, pdAddrs, c.inner.tlsConf) + if err := sd.Init(); err != nil { + log.Error("[pd] init service discovery failed", zap.String("source", source), zap.Strings("pd-addrs", pdAddrs), zap.Error(err)) + return nil + } c.inner.init(sd) return c } @@ -371,6 +359,7 @@ func (c *client) request(ctx context.Context, reqInfo *requestInfo, headerOpts . headerOpts...) } +/* The following functions are only for test */ // requestChecker is used to check the HTTP request sent by the client. type requestChecker func(req *http.Request) error @@ -385,3 +374,21 @@ func NewHTTPClientWithRequestChecker(checker requestChecker) *http.Client { Transport: checker, } } + +// newClientWithoutInitServiceDiscovery creates a PD HTTP client +// with the given PD addresses and TLS config without init service discovery. +func newClientWithoutInitServiceDiscovery( + source string, + pdAddrs []string, + opts ...ClientOption, +) Client { + ctx, cancel := context.WithCancel(context.Background()) + c := &client{inner: newClientInner(ctx, cancel, source), callerID: defaultCallerID} + // Apply the options first. + for _, opt := range opts { + opt(c) + } + sd := pd.NewDefaultPDServiceDiscovery(ctx, cancel, pdAddrs, c.inner.tlsConf) + c.inner.init(sd) + return c +} diff --git a/client/http/client_test.go b/client/http/client_test.go index b9fcb5a75e0..02fce93838e 100644 --- a/client/http/client_test.go +++ b/client/http/client_test.go @@ -40,7 +40,7 @@ func TestPDAllowFollowerHandleHeader(t *testing.T) { } return nil }) - c := NewClient("test-header", []string{"http://127.0.0.1"}, WithHTTPClient(httpClient)) + c := newClientWithoutInitServiceDiscovery("test-header", []string{"http://127.0.0.1"}, WithHTTPClient(httpClient)) c.GetRegions(context.Background()) c.GetHistoryHotRegions(context.Background(), &HistoryHotRegionsRequest{}) c.Close() @@ -58,7 +58,7 @@ func TestCallerID(t *testing.T) { } return nil }) - c := NewClient("test-caller-id", []string{"http://127.0.0.1"}, WithHTTPClient(httpClient)) + c := newClientWithoutInitServiceDiscovery("test-caller-id", []string{"http://127.0.0.1"}, WithHTTPClient(httpClient)) c.GetRegions(context.Background()) expectedVal.Store("test") c.WithCallerID(expectedVal.Load()).GetRegions(context.Background()) @@ -69,7 +69,7 @@ func TestWithBackoffer(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c := NewClient("test-with-backoffer", []string{"http://127.0.0.1"}) + c := newClientWithoutInitServiceDiscovery("test-with-backoffer", []string{"http://127.0.0.1"}) base := 100 * time.Millisecond max := 500 * time.Millisecond diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go index 28bb8bbd661..15ea9cadb46 100644 --- a/client/pd_service_discovery.go +++ b/client/pd_service_discovery.go @@ -911,7 +911,9 @@ func (c *pdServiceDiscovery) checkServiceModeChanged() error { if clusterInfo == nil || len(clusterInfo.ServiceModes) == 0 { return errors.WithStack(errNoServiceModeReturned) } - c.serviceModeUpdateCb(clusterInfo.ServiceModes[0]) + if c.serviceModeUpdateCb != nil { + c.serviceModeUpdateCb(clusterInfo.ServiceModes[0]) + } return nil } diff --git a/tests/integrations/client/http_client_test.go b/tests/integrations/client/http_client_test.go index 5cfd8fc25f2..5ff5fef0222 100644 --- a/tests/integrations/client/http_client_test.go +++ b/tests/integrations/client/http_client_test.go @@ -16,7 +16,6 @@ package client_test import ( "context" - "errors" "math" "net/http" "sort" @@ -24,11 +23,11 @@ import ( "testing" "time" + "github.com/pingcap/errors" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - pdCli "github.com/tikv/pd/client" pd "github.com/tikv/pd/client/http" "github.com/tikv/pd/client/retry" "github.com/tikv/pd/pkg/core" @@ -41,13 +40,27 @@ import ( "github.com/tikv/pd/tests" ) +type mode int + +// We have two ways to create HTTP client. +// 1. using `NewClient` which created `DefaultPDServiceDiscovery` +// 2. using `NewClientWithServiceDiscovery` which pass a `PDServiceDiscovery` as parameter +// test cases should be run in both modes. +const ( + defaultServiceDiscovery mode = iota + specificServiceDiscovery +) + type httpClientTestSuite struct { suite.Suite + env map[mode]*httpClientTestEnv +} + +type httpClientTestEnv struct { ctx context.Context cancelFunc context.CancelFunc cluster *tests.TestCluster - client pd.Client - sd pdCli.ServiceDiscovery + endpoints []string } func TestHTTPClientTestSuite(t *testing.T) { @@ -55,116 +68,150 @@ func TestHTTPClientTestSuite(t *testing.T) { } func (suite *httpClientTestSuite) SetupSuite() { + suite.env = make(map[mode]*httpClientTestEnv) re := suite.Require() - var err error - suite.ctx, suite.cancelFunc = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestCluster(suite.ctx, 2) - re.NoError(err) - err = suite.cluster.RunInitialServers() - re.NoError(err) - leader := suite.cluster.WaitLeader() - re.NotEmpty(leader) - leaderServer := suite.cluster.GetLeaderServer() - err = leaderServer.BootstrapCluster() - re.NoError(err) - for _, region := range []*core.RegionInfo{ - core.NewTestRegionInfo(10, 1, []byte("a1"), []byte("a2")), - core.NewTestRegionInfo(11, 1, []byte("a2"), []byte("a3")), - } { - err := leaderServer.GetRaftCluster().HandleRegionHeartbeat(region) + + for _, mode := range []mode{defaultServiceDiscovery, specificServiceDiscovery} { + env := &httpClientTestEnv{} + env.ctx, env.cancelFunc = context.WithCancel(context.Background()) + + cluster, err := tests.NewTestCluster(env.ctx, 2) re.NoError(err) + + err = cluster.RunInitialServers() + re.NoError(err) + leader := cluster.WaitLeader() + re.NotEmpty(leader) + leaderServer := cluster.GetLeaderServer() + err = leaderServer.BootstrapCluster() + re.NoError(err) + for _, region := range []*core.RegionInfo{ + core.NewTestRegionInfo(10, 1, []byte("a1"), []byte("a2")), + core.NewTestRegionInfo(11, 1, []byte("a2"), []byte("a3")), + } { + err := leaderServer.GetRaftCluster().HandleRegionHeartbeat(region) + re.NoError(err) + } + var ( + testServers = cluster.GetServers() + endpoints = make([]string, 0, len(testServers)) + ) + for _, s := range testServers { + endpoints = append(endpoints, s.GetConfig().AdvertiseClientUrls) + } + env.endpoints = endpoints + env.cluster = cluster + + suite.env[mode] = env } - var ( - testServers = suite.cluster.GetServers() - endpoints = make([]string, 0, len(testServers)) - ) - for _, s := range testServers { - endpoints = append(endpoints, s.GetConfig().AdvertiseClientUrls) - } - cli := setupCli(re, suite.ctx, endpoints) - suite.sd = cli.GetServiceDiscovery() - suite.client = pd.NewClientWithServiceDiscovery("pd-http-client-it", suite.sd) } func (suite *httpClientTestSuite) TearDownSuite() { - suite.cancelFunc() - suite.client.Close() - suite.cluster.Destroy() + for _, env := range suite.env { + env.cancelFunc() + env.cluster.Destroy() + } +} + +// RunTestInTwoModes is to run test in two modes. +func (suite *httpClientTestSuite) RunTestInTwoModes(test func(mode mode, client pd.Client)) { + // Run test with specific service discovery. + cli := setupCli(suite.Require(), suite.env[specificServiceDiscovery].ctx, suite.env[specificServiceDiscovery].endpoints) + sd := cli.GetServiceDiscovery() + client := pd.NewClientWithServiceDiscovery("pd-http-client-it-grpc", sd) + test(specificServiceDiscovery, client) + client.Close() + + // Run test with default service discovery. + client = pd.NewClient("pd-http-client-it-http", suite.env[defaultServiceDiscovery].endpoints) + test(defaultServiceDiscovery, client) + client.Close() } func (suite *httpClientTestSuite) TestMeta() { + suite.RunTestInTwoModes(suite.checkMeta) +} + +func (suite *httpClientTestSuite) checkMeta(mode mode, client pd.Client) { re := suite.Require() - replicateConfig, err := suite.client.GetReplicateConfig(suite.ctx) + env := suite.env[mode] + replicateConfig, err := client.GetReplicateConfig(env.ctx) re.NoError(err) re.Equal(3.0, replicateConfig["max-replicas"]) - region, err := suite.client.GetRegionByID(suite.ctx, 10) + region, err := client.GetRegionByID(env.ctx, 10) re.NoError(err) re.Equal(int64(10), region.ID) re.Equal(core.HexRegionKeyStr([]byte("a1")), region.StartKey) re.Equal(core.HexRegionKeyStr([]byte("a2")), region.EndKey) - region, err = suite.client.GetRegionByKey(suite.ctx, []byte("a2")) + region, err = client.GetRegionByKey(env.ctx, []byte("a2")) re.NoError(err) re.Equal(int64(11), region.ID) re.Equal(core.HexRegionKeyStr([]byte("a2")), region.StartKey) re.Equal(core.HexRegionKeyStr([]byte("a3")), region.EndKey) - regions, err := suite.client.GetRegions(suite.ctx) + regions, err := client.GetRegions(env.ctx) re.NoError(err) re.Equal(int64(2), regions.Count) re.Len(regions.Regions, 2) - regions, err = suite.client.GetRegionsByKeyRange(suite.ctx, pd.NewKeyRange([]byte("a1"), []byte("a3")), -1) + regions, err = client.GetRegionsByKeyRange(env.ctx, pd.NewKeyRange([]byte("a1"), []byte("a3")), -1) re.NoError(err) re.Equal(int64(2), regions.Count) re.Len(regions.Regions, 2) - regions, err = suite.client.GetRegionsByStoreID(suite.ctx, 1) + regions, err = client.GetRegionsByStoreID(env.ctx, 1) re.NoError(err) re.Equal(int64(2), regions.Count) re.Len(regions.Regions, 2) - regions, err = suite.client.GetEmptyRegions(suite.ctx) + regions, err = client.GetEmptyRegions(env.ctx) re.NoError(err) re.Equal(int64(2), regions.Count) re.Len(regions.Regions, 2) - state, err := suite.client.GetRegionsReplicatedStateByKeyRange(suite.ctx, pd.NewKeyRange([]byte("a1"), []byte("a3"))) + state, err := client.GetRegionsReplicatedStateByKeyRange(env.ctx, pd.NewKeyRange([]byte("a1"), []byte("a3"))) re.NoError(err) re.Equal("INPROGRESS", state) - regionStats, err := suite.client.GetRegionStatusByKeyRange(suite.ctx, pd.NewKeyRange([]byte("a1"), []byte("a3")), false) + regionStats, err := client.GetRegionStatusByKeyRange(env.ctx, pd.NewKeyRange([]byte("a1"), []byte("a3")), false) re.NoError(err) re.Greater(regionStats.Count, 0) re.NotEmpty(regionStats.StoreLeaderCount) - regionStats, err = suite.client.GetRegionStatusByKeyRange(suite.ctx, pd.NewKeyRange([]byte("a1"), []byte("a3")), true) + regionStats, err = client.GetRegionStatusByKeyRange(env.ctx, pd.NewKeyRange([]byte("a1"), []byte("a3")), true) re.NoError(err) re.Greater(regionStats.Count, 0) re.Empty(regionStats.StoreLeaderCount) - hotReadRegions, err := suite.client.GetHotReadRegions(suite.ctx) + hotReadRegions, err := client.GetHotReadRegions(env.ctx) re.NoError(err) re.Len(hotReadRegions.AsPeer, 1) re.Len(hotReadRegions.AsLeader, 1) - hotWriteRegions, err := suite.client.GetHotWriteRegions(suite.ctx) + hotWriteRegions, err := client.GetHotWriteRegions(env.ctx) re.NoError(err) re.Len(hotWriteRegions.AsPeer, 1) re.Len(hotWriteRegions.AsLeader, 1) - historyHorRegions, err := suite.client.GetHistoryHotRegions(suite.ctx, &pd.HistoryHotRegionsRequest{ + historyHorRegions, err := client.GetHistoryHotRegions(env.ctx, &pd.HistoryHotRegionsRequest{ StartTime: 0, EndTime: time.Now().AddDate(0, 0, 1).UnixNano() / int64(time.Millisecond), }) re.NoError(err) re.Empty(historyHorRegions.HistoryHotRegion) - store, err := suite.client.GetStores(suite.ctx) + store, err := client.GetStores(env.ctx) re.NoError(err) re.Equal(1, store.Count) re.Len(store.Stores, 1) storeID := uint64(store.Stores[0].Store.ID) // TODO: why type is different? - store2, err := suite.client.GetStore(suite.ctx, storeID) + store2, err := client.GetStore(env.ctx, storeID) re.NoError(err) re.EqualValues(storeID, store2.Store.ID) - version, err := suite.client.GetClusterVersion(suite.ctx) + version, err := client.GetClusterVersion(env.ctx) re.NoError(err) re.Equal("0.0.0", version) } func (suite *httpClientTestSuite) TestGetMinResolvedTSByStoresIDs() { + suite.RunTestInTwoModes(suite.checkGetMinResolvedTSByStoresIDs) +} + +func (suite *httpClientTestSuite) checkGetMinResolvedTSByStoresIDs(mode mode, client pd.Client) { re := suite.Require() + env := suite.env[mode] + testMinResolvedTS := tsoutil.TimeToTS(time.Now()) - raftCluster := suite.cluster.GetLeaderServer().GetRaftCluster() + raftCluster := env.cluster.GetLeaderServer().GetRaftCluster() err := raftCluster.SetMinResolvedTS(1, testMinResolvedTS) re.NoError(err) // Make sure the min resolved TS is updated. @@ -173,18 +220,18 @@ func (suite *httpClientTestSuite) TestGetMinResolvedTSByStoresIDs() { return minResolvedTS == testMinResolvedTS }) // Wait for the cluster-level min resolved TS to be initialized. - minResolvedTS, storeMinResolvedTSMap, err := suite.client.GetMinResolvedTSByStoresIDs(suite.ctx, nil) + minResolvedTS, storeMinResolvedTSMap, err := client.GetMinResolvedTSByStoresIDs(env.ctx, nil) re.NoError(err) re.Equal(testMinResolvedTS, minResolvedTS) re.Empty(storeMinResolvedTSMap) // Get the store-level min resolved TS. - minResolvedTS, storeMinResolvedTSMap, err = suite.client.GetMinResolvedTSByStoresIDs(suite.ctx, []uint64{1}) + minResolvedTS, storeMinResolvedTSMap, err = client.GetMinResolvedTSByStoresIDs(env.ctx, []uint64{1}) re.NoError(err) re.Equal(testMinResolvedTS, minResolvedTS) re.Len(storeMinResolvedTSMap, 1) re.Equal(minResolvedTS, storeMinResolvedTSMap[1]) // Get the store-level min resolved TS with an invalid store ID. - minResolvedTS, storeMinResolvedTSMap, err = suite.client.GetMinResolvedTSByStoresIDs(suite.ctx, []uint64{1, 2}) + minResolvedTS, storeMinResolvedTSMap, err = client.GetMinResolvedTSByStoresIDs(env.ctx, []uint64{1, 2}) re.NoError(err) re.Equal(testMinResolvedTS, minResolvedTS) re.Len(storeMinResolvedTSMap, 2) @@ -193,16 +240,22 @@ func (suite *httpClientTestSuite) TestGetMinResolvedTSByStoresIDs() { } func (suite *httpClientTestSuite) TestRule() { + suite.RunTestInTwoModes(suite.checkRule) +} + +func (suite *httpClientTestSuite) checkRule(mode mode, client pd.Client) { re := suite.Require() - bundles, err := suite.client.GetAllPlacementRuleBundles(suite.ctx) + env := suite.env[mode] + + bundles, err := client.GetAllPlacementRuleBundles(env.ctx) re.NoError(err) re.Len(bundles, 1) re.Equal(placement.DefaultGroupID, bundles[0].ID) - bundle, err := suite.client.GetPlacementRuleBundleByGroup(suite.ctx, placement.DefaultGroupID) + bundle, err := client.GetPlacementRuleBundleByGroup(env.ctx, placement.DefaultGroupID) re.NoError(err) re.Equal(bundles[0], bundle) // Check if we have the default rule. - suite.checkRule(re, &pd.Rule{ + suite.checkRuleResult(re, env, client, &pd.Rule{ GroupID: placement.DefaultGroupID, ID: placement.DefaultRuleID, Role: pd.Voter, @@ -211,7 +264,7 @@ func (suite *httpClientTestSuite) TestRule() { EndKey: []byte{}, }, 1, true) // Should be the same as the rules in the bundle. - suite.checkRule(re, bundle.Rules[0], 1, true) + suite.checkRuleResult(re, env, client, bundle.Rules[0], 1, true) testRule := &pd.Rule{ GroupID: placement.DefaultGroupID, ID: "test", @@ -220,39 +273,39 @@ func (suite *httpClientTestSuite) TestRule() { StartKey: []byte{}, EndKey: []byte{}, } - err = suite.client.SetPlacementRule(suite.ctx, testRule) + err = client.SetPlacementRule(env.ctx, testRule) re.NoError(err) - suite.checkRule(re, testRule, 2, true) - err = suite.client.DeletePlacementRule(suite.ctx, placement.DefaultGroupID, "test") + suite.checkRuleResult(re, env, client, testRule, 2, true) + err = client.DeletePlacementRule(env.ctx, placement.DefaultGroupID, "test") re.NoError(err) - suite.checkRule(re, testRule, 1, false) + suite.checkRuleResult(re, env, client, testRule, 1, false) testRuleOp := &pd.RuleOp{ Rule: testRule, Action: pd.RuleOpAdd, } - err = suite.client.SetPlacementRuleInBatch(suite.ctx, []*pd.RuleOp{testRuleOp}) + err = client.SetPlacementRuleInBatch(env.ctx, []*pd.RuleOp{testRuleOp}) re.NoError(err) - suite.checkRule(re, testRule, 2, true) + suite.checkRuleResult(re, env, client, testRule, 2, true) testRuleOp = &pd.RuleOp{ Rule: testRule, Action: pd.RuleOpDel, } - err = suite.client.SetPlacementRuleInBatch(suite.ctx, []*pd.RuleOp{testRuleOp}) + err = client.SetPlacementRuleInBatch(env.ctx, []*pd.RuleOp{testRuleOp}) re.NoError(err) - suite.checkRule(re, testRule, 1, false) - err = suite.client.SetPlacementRuleBundles(suite.ctx, []*pd.GroupBundle{ + suite.checkRuleResult(re, env, client, testRule, 1, false) + err = client.SetPlacementRuleBundles(env.ctx, []*pd.GroupBundle{ { ID: placement.DefaultGroupID, Rules: []*pd.Rule{testRule}, }, }, true) re.NoError(err) - suite.checkRule(re, testRule, 1, true) - ruleGroups, err := suite.client.GetAllPlacementRuleGroups(suite.ctx) + suite.checkRuleResult(re, env, client, testRule, 1, true) + ruleGroups, err := client.GetAllPlacementRuleGroups(env.ctx) re.NoError(err) re.Len(ruleGroups, 1) re.Equal(placement.DefaultGroupID, ruleGroups[0].ID) - ruleGroup, err := suite.client.GetPlacementRuleGroupByID(suite.ctx, placement.DefaultGroupID) + ruleGroup, err := client.GetPlacementRuleGroupByID(env.ctx, placement.DefaultGroupID) re.NoError(err) re.Equal(ruleGroups[0], ruleGroup) testRuleGroup := &pd.RuleGroup{ @@ -260,14 +313,14 @@ func (suite *httpClientTestSuite) TestRule() { Index: 1, Override: true, } - err = suite.client.SetPlacementRuleGroup(suite.ctx, testRuleGroup) + err = client.SetPlacementRuleGroup(env.ctx, testRuleGroup) re.NoError(err) - ruleGroup, err = suite.client.GetPlacementRuleGroupByID(suite.ctx, testRuleGroup.ID) + ruleGroup, err = client.GetPlacementRuleGroupByID(env.ctx, testRuleGroup.ID) re.NoError(err) re.Equal(testRuleGroup, ruleGroup) - err = suite.client.DeletePlacementRuleGroupByID(suite.ctx, testRuleGroup.ID) + err = client.DeletePlacementRuleGroupByID(env.ctx, testRuleGroup.ID) re.NoError(err) - ruleGroup, err = suite.client.GetPlacementRuleGroupByID(suite.ctx, testRuleGroup.ID) + ruleGroup, err = client.GetPlacementRuleGroupByID(env.ctx, testRuleGroup.ID) re.ErrorContains(err, http.StatusText(http.StatusNotFound)) re.Empty(ruleGroup) // Test the start key and end key. @@ -279,32 +332,34 @@ func (suite *httpClientTestSuite) TestRule() { StartKey: []byte("a1"), EndKey: []byte(""), } - err = suite.client.SetPlacementRule(suite.ctx, testRule) + err = client.SetPlacementRule(env.ctx, testRule) re.NoError(err) - suite.checkRule(re, testRule, 1, true) + suite.checkRuleResult(re, env, client, testRule, 1, true) } -func (suite *httpClientTestSuite) checkRule( +func (suite *httpClientTestSuite) checkRuleResult( re *require.Assertions, + env *httpClientTestEnv, + client pd.Client, rule *pd.Rule, totalRuleCount int, exist bool, ) { if exist { - got, err := suite.client.GetPlacementRule(suite.ctx, rule.GroupID, rule.ID) + got, err := client.GetPlacementRule(env.ctx, rule.GroupID, rule.ID) re.NoError(err) // skip comparison of the generated field got.StartKeyHex = rule.StartKeyHex got.EndKeyHex = rule.EndKeyHex re.Equal(rule, got) } else { - _, err := suite.client.GetPlacementRule(suite.ctx, rule.GroupID, rule.ID) + _, err := client.GetPlacementRule(env.ctx, rule.GroupID, rule.ID) re.ErrorContains(err, http.StatusText(http.StatusNotFound)) } // Check through the `GetPlacementRulesByGroup` API. - rules, err := suite.client.GetPlacementRulesByGroup(suite.ctx, rule.GroupID) + rules, err := client.GetPlacementRulesByGroup(env.ctx, rule.GroupID) re.NoError(err) checkRuleFunc(re, rules, rule, totalRuleCount, exist) // Check through the `GetPlacementRuleBundleByGroup` API. - bundle, err := suite.client.GetPlacementRuleBundleByGroup(suite.ctx, rule.GroupID) + bundle, err := client.GetPlacementRuleBundleByGroup(env.ctx, rule.GroupID) re.NoError(err) checkRuleFunc(re, bundle.Rules, rule, totalRuleCount, exist) } @@ -332,8 +387,14 @@ func checkRuleFunc( } func (suite *httpClientTestSuite) TestRegionLabel() { + suite.RunTestInTwoModes(suite.checkRegionLabel) +} + +func (suite *httpClientTestSuite) checkRegionLabel(mode mode, client pd.Client) { re := suite.Require() - labelRules, err := suite.client.GetAllRegionLabelRules(suite.ctx) + env := suite.env[mode] + + labelRules, err := client.GetAllRegionLabelRules(env.ctx) re.NoError(err) re.Len(labelRules, 1) re.Equal("keyspaces/0", labelRules[0].ID) @@ -344,9 +405,9 @@ func (suite *httpClientTestSuite) TestRegionLabel() { RuleType: "key-range", Data: labeler.MakeKeyRanges("1234", "5678"), } - err = suite.client.SetRegionLabelRule(suite.ctx, labelRule) + err = client.SetRegionLabelRule(env.ctx, labelRule) re.NoError(err) - labelRules, err = suite.client.GetAllRegionLabelRules(suite.ctx) + labelRules, err = client.GetAllRegionLabelRules(env.ctx) re.NoError(err) re.Len(labelRules, 2) sort.Slice(labelRules, func(i, j int) bool { @@ -366,9 +427,9 @@ func (suite *httpClientTestSuite) TestRegionLabel() { SetRules: []*pd.LabelRule{labelRule}, DeleteRules: []string{"rule1"}, } - err = suite.client.PatchRegionLabelRules(suite.ctx, patch) + err = client.PatchRegionLabelRules(env.ctx, patch) re.NoError(err) - allLabelRules, err := suite.client.GetAllRegionLabelRules(suite.ctx) + allLabelRules, err := client.GetAllRegionLabelRules(env.ctx) re.NoError(err) re.Len(labelRules, 2) sort.Slice(allLabelRules, func(i, j int) bool { @@ -377,7 +438,7 @@ func (suite *httpClientTestSuite) TestRegionLabel() { re.Equal(labelRule.ID, allLabelRules[1].ID) re.Equal(labelRule.Labels, allLabelRules[1].Labels) re.Equal(labelRule.RuleType, allLabelRules[1].RuleType) - labelRules, err = suite.client.GetRegionLabelRulesByIDs(suite.ctx, []string{"keyspaces/0", "rule2"}) + labelRules, err = client.GetRegionLabelRulesByIDs(env.ctx, []string{"keyspaces/0", "rule2"}) re.NoError(err) sort.Slice(labelRules, func(i, j int) bool { return labelRules[i].ID < labelRules[j].ID @@ -386,18 +447,24 @@ func (suite *httpClientTestSuite) TestRegionLabel() { } func (suite *httpClientTestSuite) TestAccelerateSchedule() { + suite.RunTestInTwoModes(suite.checkAccelerateSchedule) +} + +func (suite *httpClientTestSuite) checkAccelerateSchedule(mode mode, client pd.Client) { re := suite.Require() - raftCluster := suite.cluster.GetLeaderServer().GetRaftCluster() + env := suite.env[mode] + + raftCluster := env.cluster.GetLeaderServer().GetRaftCluster() suspectRegions := raftCluster.GetSuspectRegions() re.Empty(suspectRegions) - err := suite.client.AccelerateSchedule(suite.ctx, pd.NewKeyRange([]byte("a1"), []byte("a2"))) + err := client.AccelerateSchedule(env.ctx, pd.NewKeyRange([]byte("a1"), []byte("a2"))) re.NoError(err) suspectRegions = raftCluster.GetSuspectRegions() re.Len(suspectRegions, 1) raftCluster.ClearSuspectRegions() suspectRegions = raftCluster.GetSuspectRegions() re.Empty(suspectRegions) - err = suite.client.AccelerateScheduleInBatch(suite.ctx, []*pd.KeyRange{ + err = client.AccelerateScheduleInBatch(env.ctx, []*pd.KeyRange{ pd.NewKeyRange([]byte("a1"), []byte("a2")), pd.NewKeyRange([]byte("a2"), []byte("a3")), }) @@ -407,18 +474,24 @@ func (suite *httpClientTestSuite) TestAccelerateSchedule() { } func (suite *httpClientTestSuite) TestConfig() { + suite.RunTestInTwoModes(suite.checkConfig) +} + +func (suite *httpClientTestSuite) checkConfig(mode mode, client pd.Client) { re := suite.Require() - config, err := suite.client.GetConfig(suite.ctx) + env := suite.env[mode] + + config, err := client.GetConfig(env.ctx) re.NoError(err) re.Equal(float64(4), config["schedule"].(map[string]interface{})["leader-schedule-limit"]) newConfig := map[string]interface{}{ "schedule.leader-schedule-limit": float64(8), } - err = suite.client.SetConfig(suite.ctx, newConfig) + err = client.SetConfig(env.ctx, newConfig) re.NoError(err) - config, err = suite.client.GetConfig(suite.ctx) + config, err = client.GetConfig(env.ctx) re.NoError(err) re.Equal(float64(8), config["schedule"].(map[string]interface{})["leader-schedule-limit"]) @@ -426,58 +499,76 @@ func (suite *httpClientTestSuite) TestConfig() { newConfig = map[string]interface{}{ "schedule.leader-schedule-limit": float64(16), } - err = suite.client.SetConfig(suite.ctx, newConfig, 5) + err = client.SetConfig(env.ctx, newConfig, 5) re.NoError(err) - resp, err := suite.cluster.GetEtcdClient().Get(suite.ctx, sc.TTLConfigPrefix+"/schedule.leader-schedule-limit") + resp, err := env.cluster.GetEtcdClient().Get(env.ctx, sc.TTLConfigPrefix+"/schedule.leader-schedule-limit") re.NoError(err) re.Equal([]byte("16"), resp.Kvs[0].Value) } func (suite *httpClientTestSuite) TestScheduleConfig() { + suite.RunTestInTwoModes(suite.checkScheduleConfig) +} + +func (suite *httpClientTestSuite) checkScheduleConfig(mode mode, client pd.Client) { re := suite.Require() - config, err := suite.client.GetScheduleConfig(suite.ctx) + env := suite.env[mode] + + config, err := client.GetScheduleConfig(env.ctx) re.NoError(err) re.Equal(float64(4), config["leader-schedule-limit"]) re.Equal(float64(2048), config["region-schedule-limit"]) config["leader-schedule-limit"] = float64(8) - err = suite.client.SetScheduleConfig(suite.ctx, config) + err = client.SetScheduleConfig(env.ctx, config) re.NoError(err) - config, err = suite.client.GetScheduleConfig(suite.ctx) + config, err = client.GetScheduleConfig(env.ctx) re.NoError(err) re.Equal(float64(8), config["leader-schedule-limit"]) re.Equal(float64(2048), config["region-schedule-limit"]) } func (suite *httpClientTestSuite) TestSchedulers() { + suite.RunTestInTwoModes(suite.checkSchedulers) +} + +func (suite *httpClientTestSuite) checkSchedulers(mode mode, client pd.Client) { re := suite.Require() - schedulers, err := suite.client.GetSchedulers(suite.ctx) + env := suite.env[mode] + + schedulers, err := client.GetSchedulers(env.ctx) re.NoError(err) re.Empty(schedulers) - err = suite.client.CreateScheduler(suite.ctx, "evict-leader-scheduler", 1) + err = client.CreateScheduler(env.ctx, "evict-leader-scheduler", 1) re.NoError(err) - schedulers, err = suite.client.GetSchedulers(suite.ctx) + schedulers, err = client.GetSchedulers(env.ctx) re.NoError(err) re.Len(schedulers, 1) - err = suite.client.SetSchedulerDelay(suite.ctx, "evict-leader-scheduler", 100) + err = client.SetSchedulerDelay(env.ctx, "evict-leader-scheduler", 100) re.NoError(err) - err = suite.client.SetSchedulerDelay(suite.ctx, "not-exist", 100) + err = client.SetSchedulerDelay(env.ctx, "not-exist", 100) re.ErrorContains(err, "500 Internal Server Error") // TODO: should return friendly error message } func (suite *httpClientTestSuite) TestSetStoreLabels() { + suite.RunTestInTwoModes(suite.checkSetStoreLabels) +} + +func (suite *httpClientTestSuite) checkSetStoreLabels(mode mode, client pd.Client) { re := suite.Require() - resp, err := suite.client.GetStores(suite.ctx) + env := suite.env[mode] + + resp, err := client.GetStores(env.ctx) re.NoError(err) setStore := resp.Stores[0] re.Empty(setStore.Store.Labels, nil) storeLabels := map[string]string{ "zone": "zone1", } - err = suite.client.SetStoreLabels(suite.ctx, 1, storeLabels) + err = client.SetStoreLabels(env.ctx, 1, storeLabels) re.NoError(err) - resp, err = suite.client.GetStores(suite.ctx) + resp, err = client.GetStores(env.ctx) re.NoError(err) for _, store := range resp.Stores { if store.Store.ID == setStore.Store.ID { @@ -489,74 +580,103 @@ func (suite *httpClientTestSuite) TestSetStoreLabels() { } func (suite *httpClientTestSuite) TestTransferLeader() { + suite.RunTestInTwoModes(suite.checkTransferLeader) +} + +func (suite *httpClientTestSuite) checkTransferLeader(mode mode, client pd.Client) { re := suite.Require() - members, err := suite.client.GetMembers(suite.ctx) + env := suite.env[mode] + + members, err := client.GetMembers(env.ctx) re.NoError(err) re.Len(members.Members, 2) - leader, err := suite.client.GetLeader(suite.ctx) + leader, err := client.GetLeader(env.ctx) re.NoError(err) // Transfer leader to another pd for _, member := range members.Members { if member.GetName() != leader.GetName() { - err = suite.client.TransferLeader(suite.ctx, member.GetName()) + err = client.TransferLeader(env.ctx, member.GetName()) re.NoError(err) break } } - newLeader := suite.cluster.WaitLeader() + newLeader := env.cluster.WaitLeader() re.NotEmpty(newLeader) re.NoError(err) re.NotEqual(leader.GetName(), newLeader) // Force to update the members info. testutil.Eventually(re, func() bool { - leader, err = suite.client.GetLeader(suite.ctx) + leader, err = client.GetLeader(env.ctx) re.NoError(err) return newLeader == leader.GetName() }) - members, err = suite.client.GetMembers(suite.ctx) + members, err = client.GetMembers(env.ctx) re.NoError(err) re.Len(members.Members, 2) re.Equal(leader.GetName(), members.Leader.GetName()) } func (suite *httpClientTestSuite) TestVersion() { + suite.RunTestInTwoModes(suite.checkVersion) +} + +func (suite *httpClientTestSuite) checkVersion(mode mode, client pd.Client) { re := suite.Require() - ver, err := suite.client.GetPDVersion(suite.ctx) + env := suite.env[mode] + + ver, err := client.GetPDVersion(env.ctx) re.NoError(err) re.Equal(versioninfo.PDReleaseVersion, ver) } func (suite *httpClientTestSuite) TestAdmin() { + suite.RunTestInTwoModes(suite.checkAdmin) +} + +func (suite *httpClientTestSuite) checkAdmin(mode mode, client pd.Client) { re := suite.Require() - err := suite.client.SetSnapshotRecoveringMark(suite.ctx) + env := suite.env[mode] + + err := client.SetSnapshotRecoveringMark(env.ctx) re.NoError(err) - err = suite.client.ResetTS(suite.ctx, 123, true) + err = client.ResetTS(env.ctx, 123, true) re.NoError(err) - err = suite.client.ResetBaseAllocID(suite.ctx, 456) + err = client.ResetBaseAllocID(env.ctx, 456) re.NoError(err) - err = suite.client.DeleteSnapshotRecoveringMark(suite.ctx) + err = client.DeleteSnapshotRecoveringMark(env.ctx) re.NoError(err) } func (suite *httpClientTestSuite) TestWithBackoffer() { + suite.RunTestInTwoModes(suite.checkWithBackoffer) +} + +func (suite *httpClientTestSuite) checkWithBackoffer(mode mode, client pd.Client) { re := suite.Require() + env := suite.env[mode] + // Should return with 404 error without backoffer. - rule, err := suite.client.GetPlacementRule(suite.ctx, "non-exist-group", "non-exist-rule") + rule, err := client.GetPlacementRule(env.ctx, "non-exist-group", "non-exist-rule") re.ErrorContains(err, http.StatusText(http.StatusNotFound)) re.Nil(rule) // Should return with 404 error even with an infinite backoffer. - rule, err = suite.client. + rule, err = client. WithBackoffer(retry.InitialBackoffer(100*time.Millisecond, time.Second, 0)). - GetPlacementRule(suite.ctx, "non-exist-group", "non-exist-rule") + GetPlacementRule(env.ctx, "non-exist-group", "non-exist-rule") re.ErrorContains(err, http.StatusText(http.StatusNotFound)) re.Nil(rule) } func (suite *httpClientTestSuite) TestRedirectWithMetrics() { re := suite.Require() + env := suite.env[defaultServiceDiscovery] + + cli := setupCli(suite.Require(), env.ctx, env.endpoints) + sd := cli.GetServiceDiscovery() + metricCnt := prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "check", @@ -568,7 +688,7 @@ func (suite *httpClientTestSuite) TestRedirectWithMetrics() { } return nil }) - c := pd.NewClientWithServiceDiscovery("pd-http-client-it", suite.sd, pd.WithHTTPClient(httpClient), pd.WithMetrics(metricCnt, nil)) + c := pd.NewClientWithServiceDiscovery("pd-http-client-it", sd, pd.WithHTTPClient(httpClient), pd.WithMetrics(metricCnt, nil)) c.CreateScheduler(context.Background(), "test", 0) var out dto.Metric failureCnt, err := metricCnt.GetMetricWithLabelValues([]string{"CreateScheduler", "network error"}...) @@ -577,7 +697,7 @@ func (suite *httpClientTestSuite) TestRedirectWithMetrics() { re.Equal(float64(2), out.Counter.GetValue()) c.Close() - leader := suite.sd.GetServingAddr() + leader := sd.GetServingAddr() httpClient = pd.NewHTTPClientWithRequestChecker(func(req *http.Request) error { // mock leader success. if !strings.Contains(leader, req.Host) { @@ -585,7 +705,7 @@ func (suite *httpClientTestSuite) TestRedirectWithMetrics() { } return nil }) - c = pd.NewClientWithServiceDiscovery("pd-http-client-it", suite.sd, pd.WithHTTPClient(httpClient), pd.WithMetrics(metricCnt, nil)) + c = pd.NewClientWithServiceDiscovery("pd-http-client-it", sd, pd.WithHTTPClient(httpClient), pd.WithMetrics(metricCnt, nil)) c.CreateScheduler(context.Background(), "test", 0) successCnt, err := metricCnt.GetMetricWithLabelValues([]string{"CreateScheduler", ""}...) re.NoError(err) @@ -600,7 +720,7 @@ func (suite *httpClientTestSuite) TestRedirectWithMetrics() { } return nil }) - c = pd.NewClientWithServiceDiscovery("pd-http-client-it", suite.sd, pd.WithHTTPClient(httpClient), pd.WithMetrics(metricCnt, nil)) + c = pd.NewClientWithServiceDiscovery("pd-http-client-it", sd, pd.WithHTTPClient(httpClient), pd.WithMetrics(metricCnt, nil)) c.CreateScheduler(context.Background(), "test", 0) successCnt, err = metricCnt.GetMetricWithLabelValues([]string{"CreateScheduler", ""}...) re.NoError(err) diff --git a/tests/integrations/go.mod b/tests/integrations/go.mod index e1eeeeded5e..31d43cb86f6 100644 --- a/tests/integrations/go.mod +++ b/tests/integrations/go.mod @@ -15,6 +15,7 @@ require ( github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/docker/go-units v0.5.0 github.com/go-sql-driver/mysql v1.7.0 + github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c github.com/pingcap/kvproto v0.0.0-20231226064240-4f28b82c7860 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 @@ -126,7 +127,6 @@ require ( github.com/petermattis/goid v0.0.0-20211229010228-4d14c490ee36 // indirect github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d // indirect github.com/pingcap/errcode v0.3.0 // indirect - github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect github.com/pingcap/tidb-dashboard v0.0.0-20240111062855-41f7c8011953 // indirect github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect diff --git a/tests/integrations/realcluster/Makefile b/tests/integrations/realcluster/Makefile index e03007a4c31..4817b94b5da 100644 --- a/tests/integrations/realcluster/Makefile +++ b/tests/integrations/realcluster/Makefile @@ -48,7 +48,7 @@ kill_cluster: echo $$pid; \ kill $$pid; \ echo "waiting for cluster to exit..."; \ - sleep 10; \ + sleep 30; \ fi test: diff --git a/tests/integrations/realcluster/util.go b/tests/integrations/realcluster/util.go index 412df51894b..f6c8295b6ef 100644 --- a/tests/integrations/realcluster/util.go +++ b/tests/integrations/realcluster/util.go @@ -23,7 +23,7 @@ import ( const physicalShiftBits = 18 var ( - pdAddrs = []string{"127.0.0.1:2379"} + pdAddrs = []string{"http://127.0.0.1:2379"} pdHTTPCli = http.NewClient("pd-real-cluster-test", pdAddrs) ) diff --git a/tools/pd-ctl/main.go b/tools/pd-ctl/main.go index 1478a13fcda..0d052d95680 100644 --- a/tools/pd-ctl/main.go +++ b/tools/pd-ctl/main.go @@ -20,8 +20,10 @@ import ( "os/signal" "syscall" + "github.com/pingcap/log" "github.com/tikv/pd/tools/pd-ctl/pdctl" "github.com/tikv/pd/tools/pd-ctl/pdctl/command" + "go.uber.org/zap/zapcore" ) func main() { @@ -51,6 +53,7 @@ func main() { } }() + log.SetLevel(zapcore.FatalLevel) var inputs []string stat, _ := os.Stdin.Stat() if (stat.Mode() & os.ModeCharDevice) == 0 { diff --git a/tools/pd-ctl/pdctl/command/global.go b/tools/pd-ctl/pdctl/command/global.go index 67d70773d27..806ad4ecc53 100644 --- a/tools/pd-ctl/pdctl/command/global.go +++ b/tools/pd-ctl/pdctl/command/global.go @@ -43,8 +43,7 @@ func SetNewPDClient(addrs []string, opts ...pd.ClientOption) { if PDCli != nil { PDCli.Close() } - withOpts := append(opts, pd.WithLoggerRedirection("fatal", "")) - PDCli = pd.NewClient(pdControlCallerID, addrs, withOpts...) + PDCli = pd.NewClient(pdControlCallerID, addrs, opts...) } // TODO: replace dialClient with PDCli