diff --git a/CHANGELOG.md b/CHANGELOG.md index 87b1ac497c6..77ac2758d9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # PD Change Log +## Unreleased + ++ Fix the issue about the limit of the hot region [#1552](https://github.com/pingcap/pd/pull/1552) ++ Add a option about grpc gateway [#1596](https://github.com/pingcap/pd/pull/1596) ++ Add the missing schedule config items [#1601](https://github.com/pingcap/pd/pull/1601) + ## v3.0.0 + Support re-creating a cluster from a single node diff --git a/server/api/etcd_api_test.go b/server/api/etcd_api_test.go new file mode 100644 index 00000000000..593ea904e2a --- /dev/null +++ b/server/api/etcd_api_test.go @@ -0,0 +1,51 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "encoding/json" + "net/http" + "strings" + + . "github.com/pingcap/check" +) + +var _ = Suite(&testEtcdAPISuite{}) + +type testEtcdAPISuite struct { + hc *http.Client +} + +func (s *testEtcdAPISuite) SetUpSuite(c *C) { + s.hc = newHTTPClient() +} + +func (s *testEtcdAPISuite) TestGRPCGateway(c *C) { + svr, clean := mustNewServer(c) + defer clean() + + addr := svr.GetConfig().ClientUrls + "/v3/kv/put" + putKey := map[string]string{"key": "Zm9v", "value": "YmFy"} + v, _ := json.Marshal(putKey) + err := postJSON(addr, v) + c.Assert(err, IsNil) + addr = svr.GetConfig().ClientUrls + "/v3/kv/range" + getKey := map[string]string{"key": "Zm9v"} + v, _ = json.Marshal(getKey) + err = postJSON(addr, v, func(res []byte) bool { + c.Assert(strings.Contains(string(res), "Zm9v"), IsTrue) + return true + }) + c.Assert(err, IsNil) +} diff --git a/server/api/util.go b/server/api/util.go index bed7abf8ad1..4dcae2ab382 100644 --- a/server/api/util.go +++ b/server/api/util.go @@ -86,19 +86,26 @@ func readJSON(r io.ReadCloser, data interface{}) error { return nil } -func postJSON(url string, data []byte) error { +func postJSON(url string, data []byte, checkOpts ...func(res []byte) bool) error { resp, err := dialClient.Post(url, "application/json", bytes.NewBuffer(data)) if err != nil { return errors.WithStack(err) } res, err := ioutil.ReadAll(resp.Body) - resp.Body.Close() + defer resp.Body.Close() + if err != nil { return err } + if resp.StatusCode != http.StatusOK { return errors.New(string(res)) } + for _, opt := range checkOpts { + if !opt(res) { + return errors.New("check failed") + } + } return nil } diff --git a/server/config.go b/server/config.go index 33a460f0828..2308c7b2f42 100644 --- a/server/config.go +++ b/server/config.go @@ -49,9 +49,10 @@ type Config struct { AdvertiseClientUrls string `toml:"advertise-client-urls" json:"advertise-client-urls"` AdvertisePeerUrls string `toml:"advertise-peer-urls" json:"advertise-peer-urls"` - Name string `toml:"name" json:"name"` - DataDir string `toml:"data-dir" json:"data-dir"` - ForceNewCluster bool `json:"force-new-cluster"` + Name string `toml:"name" json:"name"` + DataDir string `toml:"data-dir" json:"data-dir"` + ForceNewCluster bool `json:"force-new-cluster"` + EnableGRPCGateway bool `json:"enable-grpc-gateway"` InitialCluster string `toml:"initial-cluster" json:"initial-cluster"` InitialClusterState string `toml:"initial-cluster-state" json:"initial-cluster-state"` @@ -199,6 +200,7 @@ const ( defaultUseRegionStorage = true defaultStrictlyMatchLabel = false + defaultEnableGRPCGateway = true ) func adjustString(v *string, defValue string) { @@ -427,6 +429,9 @@ func (c *Config) Adjust(meta *toml.MetaData) error { if !configMetaData.IsDefined("enable-prevote") { c.PreVote = true } + if !configMetaData.IsDefined("enable-grpc-gateway") { + c.EnableGRPCGateway = defaultEnableGRPCGateway + } return nil } @@ -872,6 +877,7 @@ func (c *Config) genEmbedEtcdConfig() (*embed.Config, error) { cfg.PeerTLSInfo.KeyFile = c.Security.KeyPath cfg.ForceNewCluster = c.ForceNewCluster cfg.ZapLoggerBuilder = embed.NewZapCoreLoggerBuilder(c.logger, c.logger.Core(), c.logProps.Syncer) + cfg.EnableGRPCGateway = c.EnableGRPCGateway cfg.Logger = "zap" var err error diff --git a/server/schedulers/balance_test.go b/server/schedulers/balance_test.go index 893a75734a5..6447643d28c 100644 --- a/server/schedulers/balance_test.go +++ b/server/schedulers/balance_test.go @@ -909,6 +909,9 @@ func (s *testBalanceHotWriteRegionSchedulerSuite) TestBalance(c *C) { opt.HotRegionScheduleLimit = mockoption.NewScheduleOptions().HotRegionScheduleLimit opt.RegionScheduleLimit = 0 c.Assert(hb.Schedule(tc), HasLen, 1) + // Always produce operator + c.Assert(hb.Schedule(tc), HasLen, 1) + c.Assert(hb.Schedule(tc), HasLen, 1) //| region_id | leader_store | follower_store | follower_store | written_bytes | //|-----------|--------------|----------------|----------------|---------------| diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index 44b57c4eb0b..2ad7002f8ff 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -128,20 +128,13 @@ func (h *balanceHotRegionsScheduler) IsScheduleAllowed(cluster schedule.Cluster) return h.allowBalanceLeader(cluster) || h.allowBalanceRegion(cluster) } -func min(a, b uint64) uint64 { - if a < b { - return a - } - return b -} - func (h *balanceHotRegionsScheduler) allowBalanceLeader(cluster schedule.Cluster) bool { - return h.opController.OperatorCount(schedule.OpHotRegion) < min(h.leaderLimit, cluster.GetHotRegionScheduleLimit()) && + return h.opController.OperatorCount(schedule.OpHotRegion) < minUint64(h.leaderLimit, cluster.GetHotRegionScheduleLimit()) && h.opController.OperatorCount(schedule.OpLeader) < cluster.GetLeaderScheduleLimit() } func (h *balanceHotRegionsScheduler) allowBalanceRegion(cluster schedule.Cluster) bool { - return h.opController.OperatorCount(schedule.OpHotRegion) < min(h.peerLimit, cluster.GetHotRegionScheduleLimit()) + return h.opController.OperatorCount(schedule.OpHotRegion) < minUint64(h.peerLimit, cluster.GetHotRegionScheduleLimit()) } func (h *balanceHotRegionsScheduler) Schedule(cluster schedule.Cluster) []*schedule.Operator { @@ -443,7 +436,8 @@ func (h *balanceHotRegionsScheduler) adjustBalanceLimit(storeID uint64, storesSt avgRegionCount := hotRegionTotalCount / float64(len(storesStat)) // Multiplied by hotRegionLimitFactor to avoid transfer back and forth - return uint64((float64(srcStoreStatistics.RegionsStat.Len()) - avgRegionCount) * hotRegionLimitFactor) + limit := uint64((float64(srcStoreStatistics.RegionsStat.Len()) - avgRegionCount) * hotRegionLimitFactor) + return maxUint64(limit, 1) } func (h *balanceHotRegionsScheduler) GetHotReadStatus() *statistics.StoreHotRegionInfos { diff --git a/server/statistics/store_collection.go b/server/statistics/store_collection.go index 25384d8db7c..768a564ca59 100644 --- a/server/statistics/store_collection.go +++ b/server/statistics/store_collection.go @@ -29,22 +29,35 @@ const ( ) // ScheduleOptions is an interface to access configurations. +// TODO: merge the Options to schedule.Options type ScheduleOptions interface { GetLocationLabels() []string - GetMaxStoreDownTime() time.Duration + GetLowSpaceRatio() float64 GetHighSpaceRatio() float64 GetTolerantSizeRatio() float64 + GetStoreBalanceRate() float64 + + GetSchedulerMaxWaitingOperator() uint64 GetLeaderScheduleLimit(name string) uint64 GetRegionScheduleLimit(name string) uint64 GetReplicaScheduleLimit(name string) uint64 GetMergeScheduleLimit(name string) uint64 + GetHotRegionScheduleLimit(name string) uint64 GetMaxReplicas(name string) int + GetHotRegionCacheHitsThreshold() int + GetMaxSnapshotCount() uint64 + GetMaxPendingPeerCount() uint64 + GetMaxMergeRegionSize() uint64 + GetMaxMergeRegionKeys() uint64 + IsRaftLearnerEnabled() bool IsMakeUpReplicaEnabled() bool IsRemoveExtraReplicaEnabled() bool IsRemoveDownReplicaEnabled() bool IsReplaceOfflineReplicaEnabled() bool + + GetMaxStoreDownTime() time.Duration } type storeStatistics struct { @@ -160,6 +173,13 @@ func (s *storeStatistics) Collect() { configs["high_space_ratio"] = float64(s.opt.GetHighSpaceRatio()) configs["low_space_ratio"] = float64(s.opt.GetLowSpaceRatio()) configs["tolerant_size_ratio"] = float64(s.opt.GetTolerantSizeRatio()) + configs["store-balance-rate"] = float64(s.opt.GetStoreBalanceRate()) + configs["hot-region-schedule-limit"] = float64(s.opt.GetHotRegionScheduleLimit(s.namespace)) + configs["hot-region-cache-hits-threshold"] = float64(s.opt.GetHotRegionCacheHitsThreshold()) + configs["max-pending-peer-count"] = float64(s.opt.GetMaxPendingPeerCount()) + configs["max-snapshot-count"] = float64(s.opt.GetMaxSnapshotCount()) + configs["max-merge-region-size"] = float64(s.opt.GetMaxMergeRegionSize()) + configs["max-merge-region-keys"] = float64(s.opt.GetMaxMergeRegionKeys()) var disableMakeUpReplica, disableLearner, disableRemoveDownReplica, disableRemoveExtraReplica, disableReplaceOfflineReplica float64 if !s.opt.IsMakeUpReplicaEnabled() {