diff --git a/go.mod b/go.mod index da6e73842bb..fc6cca5d702 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,6 @@ require ( github.com/gorilla/mux v1.7.3 github.com/gorilla/websocket v1.2.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 - github.com/grpc-ecosystem/grpc-gateway v1.12.1 github.com/json-iterator/go v1.1.9 // indirect github.com/juju/ratelimit v1.0.1 github.com/kevinburke/go-bindata v3.18.0+incompatible diff --git a/pkg/component/manager.go b/pkg/component/manager.go new file mode 100644 index 00000000000..965d8f008d1 --- /dev/null +++ b/pkg/component/manager.go @@ -0,0 +1,150 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package component + +import ( + "fmt" + "net/url" + "sync" + + "github.com/pingcap/log" + "go.uber.org/zap" +) + +// Manager is used to manage components. +type Manager struct { + sync.RWMutex + // component -> addresses + Addresses map[string][]string +} + +// NewManager creates a new component manager. +func NewManager() *Manager { + return &Manager{ + Addresses: make(map[string][]string), + } +} + +// GetComponentAddrs returns component addresses for a given component. +func (c *Manager) GetComponentAddrs(component string) []string { + c.RLock() + defer c.RUnlock() + addresses := []string{} + if ca, ok := c.Addresses[component]; ok { + addresses = append(addresses, ca...) + } + return addresses +} + +// GetAllComponentAddrs returns all components' addresses. +func (c *Manager) GetAllComponentAddrs() map[string][]string { + c.RLock() + defer c.RUnlock() + n := make(map[string][]string) + for k, v := range c.Addresses { + b := make([]string, len(v)) + copy(b, v) + n[k] = b + } + return n +} + +// GetComponent returns the component from a given component ID. +func (c *Manager) GetComponent(addr string) string { + c.RLock() + defer c.RUnlock() + + addr, err := validateAddr(addr) + if err != nil { + return "" + } + for component, ca := range c.Addresses { + if exist, _ := contains(ca, addr); exist { + return component + } + } + return "" +} + +// Register is used for registering a component with an address to PD. +func (c *Manager) Register(component, addr string) error { + c.Lock() + defer c.Unlock() + + addr, err := validateAddr(addr) + if err != nil { + return err + } + ca, ok := c.Addresses[component] + if exist, _ := contains(ca, addr); ok && exist { + log.Info("address has already been registered", zap.String("component", component), zap.String("address", addr)) + return fmt.Errorf("component %s address %s has already been registered", component, addr) + } + + ca = append(ca, addr) + c.Addresses[component] = ca + log.Info("address registers successfully", zap.String("component", component), zap.String("address", addr)) + return nil +} + +// UnRegister is used for unregistering a component with an address from PD. +func (c *Manager) UnRegister(component, addr string) error { + c.Lock() + defer c.Unlock() + + addr, err := validateAddr(addr) + if err != nil { + return err + } + ca, ok := c.Addresses[component] + if !ok { + return fmt.Errorf("component %s not found", component) + } + + if exist, idx := contains(ca, addr); exist { + ca = append(ca[:idx], ca[idx+1:]...) + log.Info("address has successfully been unregistered", zap.String("component", component), zap.String("address", addr)) + if len(ca) == 0 { + delete(c.Addresses, component) + return nil + } + + c.Addresses[component] = ca + return nil + } + + return fmt.Errorf("address %s not found", addr) +} + +func contains(slice []string, item string) (bool, int) { + for i, s := range slice { + if s == item { + return true, i + } + } + + return false, 0 +} + +func validateAddr(addr string) (string, error) { + u, err := url.Parse(addr) + if err != nil || u.Host == "" { + u1, err1 := url.Parse("http://" + addr) + if err1 != nil { + return "", fmt.Errorf("address %s is not valid", addr) + } + return u1.Host, nil + } + return u.Host, nil +} diff --git a/pkg/component/manager_test.go b/pkg/component/manager_test.go new file mode 100644 index 00000000000..8c2e883de65 --- /dev/null +++ b/pkg/component/manager_test.go @@ -0,0 +1,66 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package component + +import ( + "strings" + "testing" + + . "github.com/pingcap/check" +) + +func Test(t *testing.T) { + TestingT(t) +} + +var _ = Suite(&testManagerSuite{}) + +type testManagerSuite struct{} + +func (s *testManagerSuite) TestManager(c *C) { + m := NewManager() + // register legal address + c.Assert(m.Register("c1", "127.0.0.1:1"), IsNil) + c.Assert(m.Register("c1", "127.0.0.1:2"), IsNil) + // register repeatedly + c.Assert(strings.Contains(m.Register("c1", "127.0.0.1:2").Error(), "already"), IsTrue) + c.Assert(m.Register("c2", "127.0.0.1:3"), IsNil) + + // register illegal address + c.Assert(m.Register("c1", " 127.0.0.1:4"), NotNil) + + // get all addresses + all := map[string][]string{ + "c1": {"127.0.0.1:1", "127.0.0.1:2"}, + "c2": {"127.0.0.1:3"}, + } + c.Assert(m.GetAllComponentAddrs(), DeepEquals, all) + + // get the specific component addresses + c.Assert(m.GetComponentAddrs("c1"), DeepEquals, all["c1"]) + c.Assert(m.GetComponentAddrs("c2"), DeepEquals, all["c2"]) + + // get the component from the address + c.Assert(m.GetComponent("127.0.0.1:1"), Equals, "c1") + c.Assert(m.GetComponent("127.0.0.1:2"), Equals, "c1") + c.Assert(m.GetComponent("127.0.0.1:3"), Equals, "c2") + + // unregister address + c.Assert(m.UnRegister("c1", "127.0.0.1:1"), IsNil) + c.Assert(m.GetComponentAddrs("c1"), DeepEquals, []string{"127.0.0.1:2"}) + c.Assert(m.UnRegister("c1", "127.0.0.1:2"), IsNil) + c.Assert(m.GetComponentAddrs("c1"), DeepEquals, []string{}) + all = map[string][]string{"c2": {"127.0.0.1:3"}} + c.Assert(m.GetAllComponentAddrs(), DeepEquals, all) +} diff --git a/pkg/dashboard/adapter/manager.go b/pkg/dashboard/adapter/manager.go index 86fe9700515..3d3e1b5b51d 100644 --- a/pkg/dashboard/adapter/manager.go +++ b/pkg/dashboard/adapter/manager.go @@ -45,8 +45,6 @@ type Manager struct { service *apiserver.Service redirector *Redirector - enableDynamic bool - isLeader bool members []*pdpb.Member } @@ -55,12 +53,11 @@ type Manager struct { func NewManager(srv *server.Server, s *apiserver.Service, redirector *Redirector) *Manager { ctx, cancel := context.WithCancel(srv.Context()) return &Manager{ - ctx: ctx, - cancel: cancel, - srv: srv, - service: s, - redirector: redirector, - enableDynamic: srv.GetConfig().EnableDynamicConfig, + ctx: ctx, + cancel: cancel, + srv: srv, + service: s, + redirector: redirector, } } @@ -101,9 +98,7 @@ func (m *Manager) updateInfo() { if !m.srv.GetMember().IsLeader() { m.isLeader = false m.members = nil - if !m.enableDynamic { - m.srv.GetPersistOptions().Reload(m.srv.GetStorage()) - } + m.srv.GetPersistOptions().Reload(m.srv.GetStorage()) return } @@ -196,12 +191,6 @@ func (m *Manager) setNewAddress() { } } // set new dashboard address - if m.enableDynamic { - if err := m.srv.UpdateConfigManager("pd-server.dashboard-address", addr); err != nil { - log.Error("failed to update the dashboard address in config manager", zap.Error(err)) - } - return - } cfg := m.srv.GetPersistOptions().GetPDServerConfig().Clone() cfg.DashboardAddress = addr m.srv.SetPDServerConfig(*cfg) diff --git a/server/api/component.go b/server/api/component.go new file mode 100644 index 00000000000..c599ace64e2 --- /dev/null +++ b/server/api/component.go @@ -0,0 +1,120 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "net/http" + + "github.com/gorilla/mux" + "github.com/pingcap/errcode" + "github.com/pingcap/pd/v4/pkg/apiutil" + "github.com/pingcap/pd/v4/server" + "github.com/pkg/errors" + "github.com/unrolled/render" +) + +// Addresses is mapping from component to addresses. +type Addresses map[string][]string + +type componentHandler struct { + svr *server.Server + rd *render.Render +} + +func newComponentHandler(svr *server.Server, rd *render.Render) *componentHandler { + return &componentHandler{ + svr: svr, + rd: rd, + } +} + +// @Tags component +// @Summary Register component address. +// @Produce json +// @Success 200 {string} string +// @Failure 400 {string} string "The input is invalid." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /component [post] +func (h *componentHandler) Register(w http.ResponseWriter, r *http.Request) { + input := make(map[string]string) + if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil { + return + } + component, ok := input["component"] + if !ok { + apiutil.ErrorResp(h.rd, w, errcode.NewInvalidInputErr(errors.New("not set component"))) + return + } + addr, ok := input["addr"] + if !ok { + apiutil.ErrorResp(h.rd, w, errcode.NewInvalidInputErr(errors.New("not set addr"))) + return + } + m := h.svr.GetComponentManager() + err := m.Register(component, addr) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + h.rd.JSON(w, http.StatusOK, nil) +} + +// @Tags component +// @Summary Unregister component address. +// @Produce json +// @Success 200 {string} string +// @Failure 400 {string} string "The input is invalid." +// @Router /component [delete] +func (h *componentHandler) UnRegister(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + component := vars["component"] + addr := vars["addr"] + m := h.svr.GetComponentManager() + err := m.UnRegister(component, addr) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + h.rd.JSON(w, http.StatusOK, nil) +} + +// @Tags component +// @Summary List all component addresses +// @Produce json +// @Success 200 {object} Addresses +// @Router /component [get] +func (h *componentHandler) GetAllAddress(w http.ResponseWriter, r *http.Request) { + m := h.svr.GetComponentManager() + addrs := m.GetAllComponentAddrs() + h.rd.JSON(w, http.StatusOK, addrs) +} + +// @Tags component +// @Summary List component addresses +// @Produce json +// @Success 200 {array} string +// @Failure 404 {string} string "The component does not exist." +// @Router /component/{type} [get] +func (h *componentHandler) GetAddress(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + component := vars["type"] + m := h.svr.GetComponentManager() + addrs := m.GetComponentAddrs(component) + + if len(addrs) == 0 { + h.rd.JSON(w, http.StatusNotFound, "component not found") + return + } + h.rd.JSON(w, http.StatusOK, addrs) +} diff --git a/server/api/component_test.go b/server/api/component_test.go new file mode 100644 index 00000000000..697e1486b15 --- /dev/null +++ b/server/api/component_test.go @@ -0,0 +1,127 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "encoding/json" + "fmt" + "strings" + + . "github.com/pingcap/check" + "github.com/pingcap/pd/v4/server" +) + +var _ = Suite(&testComponentSuite{}) + +type testComponentSuite struct { + svr *server.Server + cleanup cleanUpFunc + urlPrefix string +} + +func (s *testComponentSuite) SetUpSuite(c *C) { + s.svr, s.cleanup = mustNewServer(c) + mustWaitLeader(c, []*server.Server{s.svr}) + + addr := s.svr.GetAddr() + s.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix) +} + +func (s *testComponentSuite) TearDownSuite(c *C) { + s.cleanup() +} + +func (s *testComponentSuite) TestComponent(c *C) { + // register not happen + addr := fmt.Sprintf("%s/component", s.urlPrefix) + output := make(map[string][]string) + err := readJSON(addr, &output) + c.Assert(err, IsNil) + c.Assert(len(output), Equals, 0) + + addr1 := fmt.Sprintf("%s/component/c1", s.urlPrefix) + var output1 []string + err = readJSON(addr1, &output) + c.Assert(strings.Contains(err.Error(), "404"), IsTrue) + c.Assert(len(output1), Equals, 0) + + // register 2 c1 and 1 c2 + reqs := []map[string]string{ + {"component": "c1", "addr": "127.0.0.1:1"}, + {"component": "c1", "addr": "127.0.0.1:2"}, + {"component": "c2", "addr": "127.0.0.1:3"}, + {"component": "c3", "addr": "example.com"}, + } + for _, req := range reqs { + postData, err := json.Marshal(req) + c.Assert(err, IsNil) + err = postJSON(addr, postData) + c.Assert(err, IsNil) + } + + // get all addresses + expected := map[string][]string{ + "c1": {"127.0.0.1:1", "127.0.0.1:2"}, + "c2": {"127.0.0.1:3"}, + "c3": {"example.com"}, + } + + output = make(map[string][]string) + err = readJSON(addr, &output) + c.Assert(err, IsNil) + c.Assert(output, DeepEquals, expected) + + // get the specific component addresses + expected1 := []string{"127.0.0.1:1", "127.0.0.1:2"} + var output2 []string + err = readJSON(addr1, &output2) + c.Assert(err, IsNil) + c.Assert(output2, DeepEquals, expected1) + + addr2 := fmt.Sprintf("%s/component/c2", s.urlPrefix) + expected2 := []string{"127.0.0.1:3"} + var output3 []string + err = readJSON(addr2, &output3) + c.Assert(err, IsNil) + c.Assert(output3, DeepEquals, expected2) + + // unregister address + addr3 := fmt.Sprintf("%s/component/c1/127.0.0.1:1", s.urlPrefix) + res, err := doDelete(addr3) + c.Assert(err, IsNil) + c.Assert(res.StatusCode, Equals, 200) + + expected3 := map[string][]string{ + "c1": {"127.0.0.1:2"}, + "c2": {"127.0.0.1:3"}, + "c3": {"example.com"}, + } + output = make(map[string][]string) + err = readJSON(addr, &output) + c.Assert(err, IsNil) + c.Assert(output, DeepEquals, expected3) + + addr4 := fmt.Sprintf("%s/component/c1/127.0.0.1:2", s.urlPrefix) + res, err = doDelete(addr4) + c.Assert(err, IsNil) + c.Assert(res.StatusCode, Equals, 200) + expected4 := map[string][]string{ + "c2": {"127.0.0.1:3"}, + "c3": {"example.com"}, + } + output = make(map[string][]string) + err = readJSON(addr, &output) + c.Assert(err, IsNil) + c.Assert(output, DeepEquals, expected4) +} diff --git a/server/api/config.go b/server/api/config.go index 888318a52f7..5af59c36d56 100644 --- a/server/api/config.go +++ b/server/api/config.go @@ -21,16 +21,12 @@ import ( "reflect" "strings" - "github.com/BurntSushi/toml" "github.com/pingcap/errcode" - "github.com/pingcap/kvproto/pkg/configpb" - "github.com/pingcap/log" "github.com/pingcap/pd/v4/pkg/apiutil" "github.com/pingcap/pd/v4/server" "github.com/pingcap/pd/v4/server/config" "github.com/pkg/errors" "github.com/unrolled/render" - "go.uber.org/zap" ) type confHandler struct { @@ -82,28 +78,6 @@ func (h *confHandler) GetDefault(w http.ResponseWriter, r *http.Request) { // @Failure 503 {string} string "PD server has no leader." // @Router /config [post] func (h *confHandler) Post(w http.ResponseWriter, r *http.Request) { - if h.svr.GetConfig().EnableDynamicConfig { - cm := h.svr.GetConfigManager() - m := make(map[string]interface{}) - json.NewDecoder(r.Body).Decode(&m) - entries, err := transToEntries(m) - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - client := h.svr.GetConfigClient() - if client == nil { - h.rd.JSON(w, http.StatusServiceUnavailable, "no leader") - return - } - err = redirectUpdateReq(h.svr.Context(), client, cm, entries) - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - h.rd.JSON(w, http.StatusOK, nil) - return - } config := h.svr.GetConfig() data, err := ioutil.ReadAll(r.Body) r.Body.Close() @@ -212,28 +186,6 @@ func (h *confHandler) GetSchedule(w http.ResponseWriter, r *http.Request) { // @Failure 503 {string} string "PD server has no leader." // @Router /config/schedule [post] func (h *confHandler) SetSchedule(w http.ResponseWriter, r *http.Request) { - if h.svr.GetConfig().EnableDynamicConfig { - cm := h.svr.GetConfigManager() - m := make(map[string]interface{}) - json.NewDecoder(r.Body).Decode(&m) - entries, err := transToEntries(m) - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - client := h.svr.GetConfigClient() - if client == nil { - h.rd.JSON(w, http.StatusServiceUnavailable, "no leader") - return - } - err = redirectUpdateReq(h.svr.Context(), client, cm, entries) - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - h.rd.JSON(w, http.StatusOK, nil) - return - } config := h.svr.GetScheduleConfig() if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &config); err != nil { return @@ -266,29 +218,6 @@ func (h *confHandler) GetReplication(w http.ResponseWriter, r *http.Request) { // @Failure 503 {string} string "PD server has no leader." // @Router /config/replicate [post] func (h *confHandler) SetReplication(w http.ResponseWriter, r *http.Request) { - if h.svr.GetConfig().EnableDynamicConfig { - cm := h.svr.GetConfigManager() - m := make(map[string]interface{}) - json.NewDecoder(r.Body).Decode(&m) - entries, err := transToEntries(m) - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - client := h.svr.GetConfigClient() - if client == nil { - h.rd.JSON(w, http.StatusServiceUnavailable, "no leader") - return - } - - err = redirectUpdateReq(h.svr.Context(), client, cm, entries) - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - h.rd.JSON(w, http.StatusOK, nil) - return - } config := h.svr.GetReplicationConfig() if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &config); err != nil { return @@ -326,56 +255,6 @@ func (h *confHandler) SetLabelProperty(w http.ResponseWriter, r *http.Request) { return } - if h.svr.GetConfig().EnableDynamicConfig { - cm := h.svr.GetConfigManager() - typ := input["type"] - labelKey, labelValue := input["label-key"], input["label-value"] - cfg := h.svr.GetPersistOptions().LoadLabelPropertyConfig().Clone() - switch input["action"] { - case "set": - for _, l := range cfg[typ] { - if l.Key == labelKey && l.Value == labelValue { - return - } - } - cfg[typ] = append(cfg[typ], config.StoreLabel{Key: labelKey, Value: labelValue}) - case "delete": - oldLabels := cfg[typ] - cfg[typ] = []config.StoreLabel{} - for _, l := range oldLabels { - if l.Key == labelKey && l.Value == labelValue { - continue - } - cfg[typ] = append(cfg[typ], l) - } - if len(cfg[typ]) == 0 { - delete(cfg, typ) - } - default: - err := errors.Errorf("unknown action %v", input["action"]) - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - var buf bytes.Buffer - if err := toml.NewEncoder(&buf).Encode(cfg); err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - entries := []*entry{{key: "label-property", value: buf.String()}} - client := h.svr.GetConfigClient() - if client == nil { - h.rd.JSON(w, http.StatusServiceUnavailable, "no leader") - return - } - err := redirectUpdateReq(h.svr.Context(), client, cm, entries) - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - h.rd.JSON(w, http.StatusOK, nil) - return - } - var err error switch input["action"] { case "set": @@ -421,23 +300,6 @@ func (h *confHandler) SetClusterVersion(w http.ResponseWriter, r *http.Request) return } - if h.svr.GetConfig().EnableDynamicConfig { - kind := &configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: server.Component}}} - cm := h.svr.GetConfigManager() - v := &configpb.Version{Global: cm.GetGlobalVersion(cm.GetGlobalConfigs(server.Component))} - entry := &configpb.ConfigEntry{Name: "cluster-version", Value: version} - client := h.svr.GetConfigClient() - if client == nil { - h.rd.JSON(w, http.StatusServiceUnavailable, "no leader") - } - _, _, err := h.svr.GetConfigClient().Update(h.svr.Context(), v, kind, []*configpb.ConfigEntry{entry}) - if err != nil { - log.Error("update cluster version meet error", zap.Error(err)) - } - h.rd.JSON(w, http.StatusOK, nil) - return - } - err := h.svr.SetClusterVersion(version) if err != nil { apiutil.ErrorResp(h.rd, w, errcode.NewInternalErr(err)) diff --git a/server/api/config_test.go b/server/api/config_test.go index bd381ef0b0e..7957869439a 100644 --- a/server/api/config_test.go +++ b/server/api/config_test.go @@ -33,14 +33,11 @@ type testConfigSuite struct { } func (s *testConfigSuite) SetUpSuite(c *C) { - server.ConfigCheckInterval = 10 * time.Millisecond - s.svr, s.cleanup = mustNewServer(c, func(cfg *config.Config) { cfg.EnableDynamicConfig = true }) + s.svr, s.cleanup = mustNewServer(c) mustWaitLeader(c, []*server.Server{s.svr}) addr := s.svr.GetAddr() s.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix) - // make sure the config client is initialized - time.Sleep(20 * time.Millisecond) } func (s *testConfigSuite) TearDownSuite(c *C) { @@ -75,7 +72,6 @@ func (s *testConfigSuite) TestConfigAll(c *C) { err = postJSON(addr, postData) c.Assert(err, IsNil) - time.Sleep(20 * time.Millisecond) newCfg := &config.Config{} err = readJSON(addr, newCfg) c.Assert(err, IsNil) @@ -97,7 +93,6 @@ func (s *testConfigSuite) TestConfigSchedule(c *C) { err = postJSON(addr, postData) c.Assert(err, IsNil) - time.Sleep(20 * time.Millisecond) sc1 := &config.ScheduleConfig{} c.Assert(readJSON(addr, sc1), IsNil) c.Assert(*sc, DeepEquals, *sc1) @@ -124,7 +119,6 @@ func (s *testConfigSuite) TestConfigReplication(c *C) { err = postJSON(addr, postData) c.Assert(err, IsNil) - time.Sleep(20 * time.Millisecond) rc3 := &config.ReplicationConfig{} err = readJSON(addr, rc3) c.Assert(err, IsNil) @@ -153,7 +147,6 @@ func (s *testConfigSuite) TestConfigLabelProperty(c *C) { for _, cmd := range cmds { err := postJSON(addr, []byte(cmd)) c.Assert(err, IsNil) - time.Sleep(20 * time.Millisecond) } cfg = loadProperties() @@ -171,7 +164,6 @@ func (s *testConfigSuite) TestConfigLabelProperty(c *C) { for _, cmd := range cmds { err := postJSON(addr, []byte(cmd)) c.Assert(err, IsNil) - time.Sleep(20 * time.Millisecond) } cfg = loadProperties() @@ -204,7 +196,6 @@ func (s *testConfigSuite) TestConfigDefault(c *C) { err = postJSON(addr, postData) c.Assert(err, IsNil) - time.Sleep(20 * time.Millisecond) addr = fmt.Sprintf("%s/config/default", s.urlPrefix) defaultCfg := &config.Config{} err = readJSON(addr, defaultCfg) diff --git a/server/api/label_test.go b/server/api/label_test.go index 2e37a658e1d..d75c86c7900 100644 --- a/server/api/label_test.go +++ b/server/api/label_test.go @@ -17,7 +17,6 @@ import ( "context" "fmt" "strings" - "time" . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" @@ -108,10 +107,8 @@ func (s *testLabelsStoreSuite) SetUpSuite(c *C) { }, } - server.ConfigCheckInterval = 10 * time.Millisecond s.svr, s.cleanup = mustNewServer(c, func(cfg *config.Config) { cfg.Replication.StrictlyMatchLabel = false - cfg.EnableDynamicConfig = true }) mustWaitLeader(c, []*server.Server{s.svr}) @@ -122,8 +119,6 @@ func (s *testLabelsStoreSuite) SetUpSuite(c *C) { for _, store := range s.stores { mustPutStore(c, s.svr, store.Id, store.State, store.Labels) } - // make sure the config client is initialized - time.Sleep(20 * time.Millisecond) } func (s *testLabelsStoreSuite) TearDownSuite(c *C) { @@ -138,7 +133,6 @@ func (s *testLabelsStoreSuite) TestLabelsGet(c *C) { } func (s *testLabelsStoreSuite) TestStoresLabelFilter(c *C) { - var table = []struct { name, value string want []*metapb.Store @@ -190,11 +184,9 @@ type testStrictlyLabelsStoreSuite struct { } func (s *testStrictlyLabelsStoreSuite) SetUpSuite(c *C) { - server.ConfigCheckInterval = 10 * time.Millisecond s.svr, s.cleanup = mustNewServer(c, func(cfg *config.Config) { cfg.Replication.LocationLabels = []string{"zone", "disk"} cfg.Replication.StrictlyMatchLabel = true - cfg.EnableDynamicConfig = true }) mustWaitLeader(c, []*server.Server{s.svr}) @@ -202,8 +194,6 @@ func (s *testStrictlyLabelsStoreSuite) SetUpSuite(c *C) { s.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix) mustBootstrapCluster(c, s.svr) - // make sure the config client is initialized - time.Sleep(20 * time.Millisecond) } func (s *testStrictlyLabelsStoreSuite) TestStoreMatch(c *C) { @@ -288,7 +278,6 @@ func (s *testStrictlyLabelsStoreSuite) TestStoreMatch(c *C) { // enable placement rules. Report no error any more. c.Assert(postJSON(fmt.Sprintf("%s/config", s.urlPrefix), []byte(`{"enable-placement-rules":"true"}`)), IsNil) - time.Sleep(20 * time.Millisecond) for _, t := range cases { _, err := s.svr.PutStore(context.Background(), &pdpb.PutStoreRequest{ Header: &pdpb.RequestHeader{ClusterId: s.svr.ClusterID()}, diff --git a/server/api/log.go b/server/api/log.go index 97d95bf0c7d..c9cd0edf830 100644 --- a/server/api/log.go +++ b/server/api/log.go @@ -15,7 +15,6 @@ package api import ( "encoding/json" - "fmt" "io/ioutil" "net/http" @@ -48,24 +47,6 @@ func newlogHandler(svr *server.Server, rd *render.Render) *logHandler { // @Failure 503 {string} string "PD server has no leader." // @Router /admin/log [post] func (h *logHandler) Handle(w http.ResponseWriter, r *http.Request) { - if h.svr.GetConfig().EnableDynamicConfig { - cm := h.svr.GetConfigManager() - var str string - json.NewDecoder(r.Body).Decode(&str) - entries := []*entry{{key: "log.level", value: fmt.Sprintf("level = \"%v\"", str)}} - client := h.svr.GetConfigClient() - if client == nil { - h.rd.JSON(w, http.StatusServiceUnavailable, "no leader") - return - } - err := redirectUpdateReq(h.svr.Context(), client, cm, entries) - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - h.rd.JSON(w, http.StatusOK, nil) - return - } var level string data, err := ioutil.ReadAll(r.Body) r.Body.Close() diff --git a/server/api/middleware.go b/server/api/middleware.go index 396216da0f3..0d47750fb9d 100644 --- a/server/api/middleware.go +++ b/server/api/middleware.go @@ -14,31 +14,13 @@ package api import ( - "bytes" - "encoding/json" - "fmt" - "io/ioutil" "net/http" - "net/url" - "reflect" - "strings" - "github.com/BurntSushi/toml" - "github.com/gogo/protobuf/jsonpb" - "github.com/gorilla/mux" - "github.com/pingcap/kvproto/pkg/configpb" "github.com/pingcap/pd/v4/server" "github.com/pingcap/pd/v4/server/cluster" - "github.com/pingcap/pd/v4/server/config" - "github.com/pkg/errors" "github.com/unrolled/render" ) -const ( - localKind = "local" - globalKind = "global" -) - type clusterMiddleware struct { s *server.Server rd *render.Render @@ -62,227 +44,3 @@ func (m clusterMiddleware) Middleware(h http.Handler) http.Handler { h.ServeHTTP(w, r.WithContext(ctx)) }) } - -type entry struct { - key string - value string -} - -type componentMiddleware struct { - s *server.Server - rd *render.Render -} - -func newComponentMiddleware(s *server.Server) componentMiddleware { - return componentMiddleware{ - s: s, - rd: render.New(render.Options{IndentJSON: true}), - } -} - -func (m componentMiddleware) Middleware(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - var statusCode int - var err error - switch r.Method { - case "POST": - r, statusCode, err = handleComponentPost(m.s, r) - if err != nil { - m.rd.JSON(w, statusCode, err.Error()) - return - } - case "GET": - r, statusCode, err = handleComponentGet(m.s, r) - if err != nil { - m.rd.JSON(w, statusCode, err.Error()) - return - } - case "DELETE": - r, statusCode, err = handleComponentDelete(m.s, r) - if err != nil { - m.rd.JSON(w, statusCode, err.Error()) - return - } - } - next.ServeHTTP(w, r) - }) -} - -func getComponentInfo(req map[string]interface{}) string { - var componentInfo string - if c, ok := req["componentInfo"]; ok { - componentInfo = c.(string) - } else { - componentInfo = "" - } - delete(req, "componentInfo") - return componentInfo -} - -func transToEntries(req map[string]interface{}) ([]*entry, error) { - mapKeys := reflect.ValueOf(req).MapKeys() - var entries []*entry - for _, k := range mapKeys { - if config.IsDeprecated(k.String()) { - return nil, errors.New("config item has already been deprecated") - } - itemMap := make(map[string]interface{}) - itemMap[k.String()] = req[k.String()] - var buf bytes.Buffer - if err := toml.NewEncoder(&buf).Encode(itemMap); err != nil { - return nil, err - } - value := buf.String() - key := findTag(reflect.TypeOf(&config.Config{}).Elem(), k.String()) - if key == "" { - return nil, errors.New("config item not found") - } - entries = append(entries, &entry{key, value}) - } - return entries, nil -} - -func findTag(t reflect.Type, tag string) string { - for i := 0; i < t.NumField(); i++ { - field := t.Field(i) - - column := field.Tag.Get("json") - c := strings.Split(column, ",") - if c[0] == tag { - return c[0] - } - - if field.Type.Kind() == reflect.Struct { - path := findTag(field.Type, tag) - if path == "" { - continue - } - return field.Tag.Get("json") + "." + path - } - } - return "" -} - -func updateBody(s *server.Server, component, componentID string, kind string, entries []*entry) (string, error) { - clusterID := s.ClusterID() - var configEntries []*configpb.ConfigEntry - for _, e := range entries { - configEntry := &configpb.ConfigEntry{Name: e.key, Value: e.value} - configEntries = append(configEntries, configEntry) - } - var version *configpb.Version - var k *configpb.ConfigKind - cm := s.GetConfigManager() - switch kind { - case localKind: - version = cm.GetLocalVersion(cm.GetLocalConfig(component, componentID)) - k = &configpb.ConfigKind{Kind: &configpb.ConfigKind_Local{Local: &configpb.Local{ComponentId: componentID}}} - case globalKind: - version = &configpb.Version{Global: cm.GetGlobalVersion(cm.GetGlobalConfigs(component))} - k = &configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: component}}} - default: - return "", errors.New("no valid kind") - } - - req := &configpb.UpdateRequest{ - Header: &configpb.Header{ - ClusterId: clusterID, - }, - Version: version, - Kind: k, - Entries: configEntries, - } - - m := jsonpb.Marshaler{} - return m.MarshalToString(req) -} - -func handleComponentPost(s *server.Server, r *http.Request) (*http.Request, int, error) { - var component, componentID, kind string - req := make(map[string]interface{}) - json.NewDecoder(r.Body).Decode(&req) - cm := s.GetConfigManager() - componentInfo := getComponentInfo(req) - component = cm.GetComponent(componentInfo) - if component == "" { - if strings.Contains(componentInfo, ":") { - return nil, http.StatusBadRequest, errors.Errorf("invalid component id: %v", componentInfo) - } - component = componentInfo - kind = globalKind - } else { - componentID = componentInfo - kind = localKind - } - mapKeys := reflect.ValueOf(req).MapKeys() - var entries []*entry - for _, k := range mapKeys { - var value string - switch req[k.String()].(type) { - case float64, float32: - value = fmt.Sprintf("%f", req[k.String()]) - default: - value = fmt.Sprintf("%v", req[k.String()]) - } - entries = append(entries, &entry{k.String(), value}) - } - str, err := updateBody(s, component, componentID, kind, entries) - if err != nil { - return nil, http.StatusInternalServerError, err - } - u, err := url.ParseRequestURI("/component") - if err != nil { - return nil, http.StatusBadRequest, err - } - r.URL = u - r.Body = ioutil.NopCloser(strings.NewReader(str)) - return r, http.StatusOK, nil -} - -func handleComponentGet(s *server.Server, r *http.Request) (*http.Request, int, error) { - var component string - vars := mux.Vars(r) - varName := "component_id" - componentID, ok := vars[varName] - if !ok { - return nil, http.StatusBadRequest, errors.Errorf("field %s is not present", varName) - } - cm := s.GetConfigManager() - component = cm.GetComponent(componentID) - version := cm.GetLatestVersion(component, componentID) - if component == "" { - return nil, http.StatusBadRequest, errors.Errorf("cannot find component with component ID: %s", componentID) - } - clusterID := s.ClusterID() - getURI := fmt.Sprintf("/component?header.cluster_id=%d&component=%s&component_id=%s&version.local=%d&version.global=%d", - clusterID, component, componentID, version.GetLocal(), version.GetGlobal()) - u, err := url.ParseRequestURI(getURI) - if err != nil { - return nil, http.StatusBadRequest, err - } - r.URL = u - r.Header.Set("Accept", "application/toml") - return r, http.StatusOK, nil -} - -func handleComponentDelete(s *server.Server, r *http.Request) (*http.Request, int, error) { - var component string - vars := mux.Vars(r) - varName := "component_id" - componentID, ok := vars[varName] - if !ok { - return nil, http.StatusBadRequest, errors.Errorf("field %s is not present", varName) - } - cm := s.GetConfigManager() - component = cm.GetComponent(componentID) - version := cm.GetLatestVersion(component, componentID) - clusterID := s.ClusterID() - getURI := fmt.Sprintf("/component?header.cluster_id=%d&kind.local.component_id=%s&version.local=%d&version.global=%d", - clusterID, componentID, version.GetLocal(), version.GetGlobal()) - u, err := url.ParseRequestURI(getURI) - if err != nil { - return nil, http.StatusBadRequest, err - } - r.URL = u - return r, http.StatusOK, nil -} diff --git a/server/api/router.go b/server/api/router.go index 8c313d52d3f..9c08fe4f262 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -15,22 +15,12 @@ package api import ( "context" - "errors" - "io" "net/http" "net/http/pprof" - "net/url" - "github.com/golang/protobuf/proto" "github.com/gorilla/mux" - "github.com/grpc-ecosystem/grpc-gateway/runtime" - "github.com/pingcap/kvproto/pkg/configpb" - "github.com/pingcap/log" "github.com/pingcap/pd/v4/server" "github.com/unrolled/render" - "go.uber.org/zap" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" ) func createStreamingRender() *render.Render { @@ -55,7 +45,7 @@ func createIndentRender() *render.Render { // @license.name Apache 2.0 // @license.url http://www.apache.org/licenses/LICENSE-2.0.html // @BasePath /pd/api/v1 -func createRouter(ctx context.Context, prefix string, svr *server.Server) (*mux.Router, func()) { +func createRouter(ctx context.Context, prefix string, svr *server.Server) *mux.Router { rd := createIndentRender() rootRouter := mux.NewRouter().PathPrefix(prefix).Subrouter() @@ -187,6 +177,12 @@ func createRouter(ctx context.Context, prefix string, svr *server.Server) (*mux. replicationModeHandler := newReplicationModeHandler(svr, rd) clusterRouter.HandleFunc("/replication_mode/status", replicationModeHandler.GetStatus) + componentHandler := newComponentHandler(svr, rd) + apiRouter.HandleFunc("/component", componentHandler.Register).Methods("POST") + apiRouter.HandleFunc("/component/{component}/{addr}", componentHandler.UnRegister).Methods("DELETE") + apiRouter.HandleFunc("/component", componentHandler.GetAllAddress).Methods("GET") + apiRouter.HandleFunc("/component/{type}", componentHandler.GetAddress).Methods("GET") + pluginHandler := newPluginHandler(handler, rd) apiRouter.HandleFunc("/plugin", pluginHandler.LoadPlugin).Methods("POST") apiRouter.HandleFunc("/plugin", pluginHandler.UnloadPlugin).Methods("DELETE") @@ -213,89 +209,5 @@ func createRouter(ctx context.Context, prefix string, svr *server.Server) (*mux. // Deprecated rootRouter.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {}).Methods("GET") - if svr.GetConfig().EnableDynamicConfig { - apiRouter.HandleFunc("/component/ids/{component}", func(w http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) - varName := vars["component"] - if varName == "all" { - rd.JSON(w, http.StatusOK, svr.GetConfigManager().GetAllComponentIDs()) - } else { - rd.JSON(w, http.StatusOK, svr.GetConfigManager().GetComponentIDs(varName)) - } - }).Methods("GET") - return rootRouter, func() { lazyComponentRouter(ctx, svr, apiRouter) } - } - return rootRouter, nil -} - -func lazyComponentRouter(ctx context.Context, svr *server.Server, apiRouter *mux.Router) { - componentRouter := apiRouter.PathPrefix("/component").Methods("POST", "GET", "DELETE").Subrouter() - CustomForwardResponseOption := func(ctx context.Context, w http.ResponseWriter, pm proto.Message) error { - if _, ok := pm.(*configpb.GetResponse); ok { - str := pm.(*configpb.GetResponse).GetConfig() - w.Write([]byte(str)) - } - return nil - } - gwmux := runtime.NewServeMux( - runtime.WithForwardResponseOption(CustomForwardResponseOption), - runtime.WithMarshalerOption("application/json", &runtime.JSONPb{OrigName: true}), - runtime.WithMarshalerOption("application/toml", &nopMarshaler{}), - ) - UnaryClientInterceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - invoker(ctx, method, req, reply, cc, opts...) - var errMsg string - switch method { - case "/configpb.Config/Update": - errMsg = reply.(*configpb.UpdateResponse).GetStatus().GetMessage() - case "/configpb.Config/Get": - errMsg = reply.(*configpb.GetResponse).GetStatus().GetMessage() - case "/configpb.Config/Delete": - errMsg = reply.(*configpb.DeleteResponse).GetStatus().GetMessage() - } - if errMsg != "" { - return errors.New(errMsg) - } - return nil - } - tlsCfg, err := svr.GetSecurityConfig().ToTLSConfig() - if err != nil { - log.Error("fail to use TLS, use insecure instead", zap.Error(err)) - } - opt := grpc.WithInsecure() - if tlsCfg != nil { - creds := credentials.NewTLS(tlsCfg) - opt = grpc.WithTransportCredentials(creds) - } - opts := []grpc.DialOption{opt, grpc.WithUnaryInterceptor(UnaryClientInterceptor)} - addr := svr.GetAddr() - u, err := url.Parse(addr) - if err != nil { - log.Error("failed to parse url", zap.Error(err)) - return - } - err = configpb.RegisterConfigHandlerFromEndpoint(ctx, gwmux, u.Host+u.Path, opts) - if err != nil { - log.Error("fail to register grpc gateway", zap.Error(err)) - return - } - - componentRouter.Handle("", gwmux).Methods("POST") - componentRouter.Handle("/{component_id}", gwmux).Methods("GET") - componentRouter.Handle("/{component_id}", gwmux).Methods("DELETE") - componentRouter.Use(newComponentMiddleware(svr).Middleware) -} - -type nopMarshaler struct{} - -func (c *nopMarshaler) Marshal(v interface{}) ([]byte, error) { - return nil, nil + return rootRouter } - -func (c *nopMarshaler) Unmarshal(data []byte, v interface{}) error { return nil } - -func (c *nopMarshaler) NewDecoder(r io.Reader) runtime.Decoder { return nil } - -func (c *nopMarshaler) NewEncoder(w io.Writer) runtime.Encoder { return nil } - -func (c *nopMarshaler) ContentType() string { return "application/toml" } diff --git a/server/api/server.go b/server/api/server.go index 67b08c7529e..544b46c7e01 100644 --- a/server/api/server.go +++ b/server/api/server.go @@ -32,15 +32,12 @@ func NewHandler(ctx context.Context, svr *server.Server) (http.Handler, server.S IsCore: true, } router := mux.NewRouter() - r, f := createRouter(ctx, apiPrefix, svr) + r := createRouter(ctx, apiPrefix, svr) router.PathPrefix(apiPrefix).Handler(negroni.New( serverapi.NewRuntimeServiceValidator(svr, group), serverapi.NewRedirector(svr), negroni.Wrap(r)), ) - if f != nil { - svr.AddStartCallback(f) - } return router, group, nil } diff --git a/server/api/store_test.go b/server/api/store_test.go index d038a399d01..91873a7c0cb 100644 --- a/server/api/store_test.go +++ b/server/api/store_test.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/pd/v4/server" - "github.com/pingcap/pd/v4/server/config" "github.com/pingcap/pd/v4/server/core" ) @@ -84,8 +83,7 @@ func (s *testStoreSuite) SetUpSuite(c *C) { Version: "2.0.0", }, } - server.ConfigCheckInterval = 10 * time.Millisecond - s.svr, s.cleanup = mustNewServer(c, func(cfg *config.Config) { cfg.EnableDynamicConfig = true }) + s.svr, s.cleanup = mustNewServer(c) mustWaitLeader(c, []*server.Server{s.svr}) addr := s.svr.GetAddr() @@ -96,8 +94,6 @@ func (s *testStoreSuite) SetUpSuite(c *C) { for _, store := range s.stores { mustPutStore(c, s.svr, store.Id, store.State, nil) } - // make sure the config client is initialized - time.Sleep(20 * time.Millisecond) } func (s *testStoreSuite) TearDownSuite(c *C) { @@ -179,7 +175,6 @@ func (s *testStoreSuite) TestStoreLabel(c *C) { lc, _ := json.Marshal(labelCheck) err = postJSON(s.urlPrefix+"/config", lc) c.Assert(err, IsNil) - time.Sleep(20 * time.Millisecond) // Test set. labels := map[string]string{"zone": "cn", "host": "local"} b, err := json.Marshal(labels) @@ -190,7 +185,6 @@ func (s *testStoreSuite) TestStoreLabel(c *C) { ll, _ := json.Marshal(locationLabels) err = postJSON(s.urlPrefix+"/config", ll) c.Assert(err, IsNil) - time.Sleep(20 * time.Millisecond) err = postJSON(url+"/label", b) c.Assert(err, IsNil) @@ -208,7 +202,6 @@ func (s *testStoreSuite) TestStoreLabel(c *C) { err = postJSON(s.urlPrefix+"/config", lc) c.Assert(err, IsNil) - time.Sleep(20 * time.Millisecond) labels = map[string]string{"zack": "zack1", "Host": "host1"} b, err = json.Marshal(labels) c.Assert(err, IsNil) diff --git a/server/api/util.go b/server/api/util.go index 9b742cee95c..5c5fd6df8b2 100644 --- a/server/api/util.go +++ b/server/api/util.go @@ -15,16 +15,11 @@ package api import ( "bytes" - "context" "encoding/json" "io/ioutil" "net/http" "net/url" - "github.com/pingcap/kvproto/pkg/configpb" - pd "github.com/pingcap/pd/v4/client" - "github.com/pingcap/pd/v4/server" - configmanager "github.com/pingcap/pd/v4/server/config_manager" "github.com/pkg/errors" ) @@ -120,21 +115,3 @@ func doDelete(url string) (*http.Response, error) { res.Body.Close() return res, nil } - -func redirectUpdateReq(ctx context.Context, client pd.ConfigClient, cm *configmanager.ConfigManager, entries []*entry) error { - var configEntries []*configpb.ConfigEntry - for _, e := range entries { - configEntry := &configpb.ConfigEntry{Name: e.key, Value: e.value} - configEntries = append(configEntries, configEntry) - } - version := &configpb.Version{Global: cm.GetGlobalVersion(cm.GetGlobalConfigs(server.Component))} - kind := &configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: server.Component}}} - status, _, err := client.Update(ctx, version, kind, configEntries) - if err != nil { - return err - } - if status.GetCode() != configpb.StatusCode_OK { - return errors.New(status.GetMessage()) - } - return nil -} diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 08f3d037d9c..6889676d9d9 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -63,7 +63,6 @@ type Server interface { GetHBStreams() opt.HeartbeatStreams GetRaftCluster() *RaftCluster GetBasicCluster() *core.BasicCluster - GetSchedulersCallback() func() } // RaftCluster is used for cluster config management. @@ -108,9 +107,6 @@ type RaftCluster struct { client *clientv3.Client replicationMode *replication.ModeManager - - schedulersCallback func() - configCheck bool } // Status saves some state information. @@ -180,7 +176,7 @@ func (c *RaftCluster) loadBootstrapTime() (time.Time, error) { } // InitCluster initializes the raft cluster. -func (c *RaftCluster) InitCluster(id id.Allocator, opt *config.PersistOptions, storage *core.Storage, basicCluster *core.BasicCluster, cb func()) { +func (c *RaftCluster) InitCluster(id id.Allocator, opt *config.PersistOptions, storage *core.Storage, basicCluster *core.BasicCluster) { c.core = basicCluster c.opt = opt c.storage = storage @@ -190,7 +186,6 @@ func (c *RaftCluster) InitCluster(id id.Allocator, opt *config.PersistOptions, s c.prepareChecker = newPrepareChecker() c.changedRegions = make(chan *core.RegionInfo, defaultChangedRegionsLimit) c.hotSpotCache = statistics.NewHotCache() - c.schedulersCallback = cb } // Start starts a cluster. @@ -203,7 +198,7 @@ func (c *RaftCluster) Start(s Server) error { return nil } - c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetStorage(), s.GetBasicCluster(), s.GetSchedulersCallback()) + c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetStorage(), s.GetBasicCluster()) cluster, err := c.LoadClusterInfo() if err != nil { return err @@ -335,7 +330,6 @@ func (c *RaftCluster) Stop() { } c.running = false - c.configCheck = false close(c.quit) c.coordinator.stop() c.Unlock() @@ -621,6 +615,7 @@ func (c *RaftCluster) updateStoreStatusLocked(id uint64) { c.core.UpdateStoreStatus(id, leaderCount, regionCount, pendingPeerCount, leaderRegionSize, regionSize) } +//nolint:unused func (c *RaftCluster) getClusterID() uint64 { c.RLock() defer c.RUnlock() @@ -996,20 +991,6 @@ func (c *RaftCluster) AttachAvailableFunc(storeID uint64, limitType storelimit.T c.core.AttachAvailableFunc(storeID, limitType, f) } -// SetConfigCheck sets a flag for preventing outdated config. -func (c *RaftCluster) SetConfigCheck() { - c.Lock() - defer c.Unlock() - c.configCheck = true -} - -// GetConfigCheck returns a configCheck flag. -func (c *RaftCluster) GetConfigCheck() bool { - c.Lock() - defer c.Unlock() - return c.configCheck -} - // SetStoreState sets up a store's state. func (c *RaftCluster) SetStoreState(storeID uint64, state metapb.StoreState) error { c.Lock() @@ -1233,7 +1214,7 @@ func (c *RaftCluster) AllocID() (uint64, error) { } // OnStoreVersionChange changes the version of the cluster when needed. -func (c *RaftCluster) OnStoreVersionChange() *semver.Version { +func (c *RaftCluster) OnStoreVersionChange() { c.RLock() defer c.RUnlock() var ( @@ -1270,9 +1251,7 @@ func (c *RaftCluster) OnStoreVersionChange() *semver.Version { log.Info("cluster version changed", zap.Stringer("old-cluster-version", clusterVersion), zap.Stringer("new-cluster-version", minVersion)) - return minVersion } - return nil } func (c *RaftCluster) changedRegionNotifier() <-chan *core.RegionInfo { @@ -1486,7 +1465,7 @@ func (c *RaftCluster) CheckLabelProperty(typ string, labels []*metapb.StoreLabel func (c *RaftCluster) isPrepared() bool { c.RLock() defer c.RUnlock() - return c.prepareChecker.check(c) && c.configCheck + return c.prepareChecker.check(c) } // GetStoresBytesWriteStat returns the bytes write stat of all StoreInfo. @@ -1541,6 +1520,7 @@ func (c *RaftCluster) CheckReadStatus(region *core.RegionInfo) []*statistics.Hot // TODO: remove me. // only used in test. +//nolint:unused func (c *RaftCluster) putRegion(region *core.RegionInfo) error { c.Lock() defer c.Unlock() diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 5021654850d..aa15a12b852 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -671,7 +671,7 @@ func newTestCluster(opt *config.PersistOptions) *testCluster { func newTestRaftCluster(id id.Allocator, opt *config.PersistOptions, storage *core.Storage, basicCluster *core.BasicCluster) *RaftCluster { rc := &RaftCluster{} - rc.InitCluster(id, opt, storage, basicCluster, func() {}) + rc.InitCluster(id, opt, storage, basicCluster) return rc } diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go index a11c14e0f41..15de8bdf219 100644 --- a/server/cluster/coordinator.go +++ b/server/cluster/coordinator.go @@ -472,8 +472,6 @@ func (c *coordinator) addScheduler(scheduler schedule.Scheduler, args ...string) go c.runScheduler(s) c.schedulers[s.GetName()] = s c.cluster.opt.AddSchedulerCfg(s.GetType(), args) - c.cluster.schedulersCallback() - return nil } @@ -504,8 +502,6 @@ func (c *coordinator) removeScheduler(name string) error { log.Error("can not remove the scheduler config", zap.Error(err)) } } - - c.cluster.schedulersCallback() return err } diff --git a/server/cluster/coordinator_test.go b/server/cluster/coordinator_test.go index 192ee667db9..28833d8675a 100644 --- a/server/cluster/coordinator_test.go +++ b/server/cluster/coordinator_test.go @@ -289,7 +289,6 @@ func prepare(setCfg func(*config.ScheduleConfig), setTc func(*testCluster), run if setTc != nil { setTc(tc) } - tc.RaftCluster.configCheck = true co := newCoordinator(ctx, tc.RaftCluster, hbStreams) if run != nil { run(co) @@ -339,7 +338,6 @@ func (s *testCoordinatorSuite) TestCheckRegion(c *C) { co.wg.Wait() tc = newTestCluster(opt) - tc.configCheck = true co = newCoordinator(s.ctx, tc.RaftCluster, hbStreams) co.run() diff --git a/server/config/config.go b/server/config/config.go index 109426d4a94..2f673415fcc 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -133,8 +133,6 @@ type Config struct { logger *zap.Logger logProps *log.ZapProperties - EnableDynamicConfig bool `toml:"enable-dynamic-config" json:"enable-dynamic-config"` - Dashboard DashboardConfig `toml:"dashboard" json:"dashboard"` ReplicationMode ReplicationModeConfig `toml:"replication-mode" json:"replication-mode"` @@ -208,8 +206,7 @@ const ( defaultEnableGRPCGateway = true defaultDisableErrorVerbose = true - defaultEnableDynamicConfig = false - defaultDashboardAddress = "auto" + defaultDashboardAddress = "auto" defaultDRWaitStoreTimeout = time.Minute defaultDRWaitSyncTimeout = time.Minute @@ -457,10 +454,6 @@ func (c *Config) Adjust(meta *toml.MetaData) error { c.EnableGRPCGateway = defaultEnableGRPCGateway } - if !configMetaData.IsDefined("enable-dynamic-config") { - c.EnableDynamicConfig = defaultEnableDynamicConfig - } - c.ReplicationMode.adjust(configMetaData.Child("replication-mode")) return nil diff --git a/server/config_manager/config_manager.go b/server/config_manager/config_manager.go deleted file mode 100644 index 74473e5c113..00000000000 --- a/server/config_manager/config_manager.go +++ /dev/null @@ -1,766 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package configmanager - -import ( - "bytes" - "context" - "fmt" - "reflect" - "strconv" - "strings" - "sync" - - "github.com/BurntSushi/toml" - "github.com/pingcap/kvproto/pkg/configpb" - "github.com/pingcap/pd/v4/server/cluster" - "github.com/pingcap/pd/v4/server/config" - "github.com/pingcap/pd/v4/server/core" - "github.com/pingcap/pd/v4/server/member" - "github.com/pkg/errors" -) - -var ( - // errUnknownKind is error info for the kind. - errUnknownKind = func(k *configpb.ConfigKind) string { - return fmt.Sprintf("unknown kind: %v", k.String()) - } - // errEncode is error info for the encode process. - errEncode = func(e error) string { - return fmt.Sprintf("encode error: %v", e) - } - // errDecode is error info for the decode process. - errDecode = func(e error) string { - return fmt.Sprintf("decode error: %v", e) - } - errNotSupported = "not supported" -) - -// Server is the interface for configuration manager. -type Server interface { - IsClosed() bool - ClusterID() uint64 - GetConfig() *config.Config - GetRaftCluster() *cluster.RaftCluster - GetStorage() *core.Storage - GetMember() *member.Member -} - -// ConfigManager is used to manage all components' config. -type ConfigManager struct { - sync.RWMutex - svr Server - // component -> GlobalConfig - GlobalCfgs map[string]*GlobalConfig - // component -> componentID -> LocalConfig - LocalCfgs map[string]map[string]*LocalConfig -} - -// NewConfigManager creates a new ConfigManager. -func NewConfigManager(svr Server) *ConfigManager { - return &ConfigManager{ - svr: svr, - GlobalCfgs: make(map[string]*GlobalConfig), - LocalCfgs: make(map[string]map[string]*LocalConfig), - } -} - -// GetGlobalConfigs returns the global config for a given component. -func (c *ConfigManager) GetGlobalConfigs(component string) *GlobalConfig { - c.RLock() - defer c.RUnlock() - if gc, ok := c.GlobalCfgs[component]; ok { - return gc - } - return nil -} - -// GetLocalConfig returns the local config for a given component and component ID. -func (c *ConfigManager) GetLocalConfig(component, componentID string) *LocalConfig { - c.RLock() - defer c.RUnlock() - if lcs, ok := c.LocalCfgs[component]; ok { - if lc, ok := lcs[componentID]; ok { - return lc - } - } - return nil -} - -// GetComponentIDs returns component IDs for a given component. -func (c *ConfigManager) GetComponentIDs(component string) []string { - c.RLock() - defer c.RUnlock() - var addresses []string - if _, ok := c.LocalCfgs[component]; ok { - for address := range c.LocalCfgs[component] { - addresses = append(addresses, address) - } - } - return addresses -} - -// GetAllComponentIDs returns all components. -func (c *ConfigManager) GetAllComponentIDs() map[string][]string { - c.RLock() - defer c.RUnlock() - var addresses = make(map[string][]string) - for component := range c.LocalCfgs { - for address := range c.LocalCfgs[component] { - addresses[component] = append(addresses[component], address) - } - } - return addresses -} - -// Persist saves the configuration to the storage. -func (c *ConfigManager) Persist(storage *core.Storage) error { - c.Lock() - defer c.Unlock() - return storage.SaveComponentsConfig(c) -} - -// Reload reloads the configuration from the storage. -func (c *ConfigManager) Reload(storage *core.Storage) error { - c.Lock() - defer c.Unlock() - _, err := storage.LoadComponentsConfig(c) - return err -} - -// GetComponent returns the component from a given component ID. -func (c *ConfigManager) GetComponent(id string) string { - c.RLock() - defer c.RUnlock() - return c.getComponentLocked(id) -} - -func (c *ConfigManager) getComponentLocked(id string) string { - for component, cfgs := range c.LocalCfgs { - if _, ok := cfgs[id]; ok { - return component - } - } - return "" -} - -// GetGlobalVersion returns the global version. -func (c *ConfigManager) GetGlobalVersion(gc *GlobalConfig) uint64 { - c.RLock() - defer c.RUnlock() - return gc.getVersionLocked() -} - -// GetLocalVersion returns the local version. -func (c *ConfigManager) GetLocalVersion(lc *LocalConfig) *configpb.Version { - c.RLock() - defer c.RUnlock() - return lc.getVersionLocked() -} - -// GetAllConfig returns all configs in the config manager. -func (c *ConfigManager) GetAllConfig(ctx context.Context) ([]*configpb.LocalConfig, *configpb.Status) { - c.RLock() - defer c.RUnlock() - localConfigs := make([]*configpb.LocalConfig, 0, 8) - for component, localCfg := range c.LocalCfgs { - for componentID, cfg := range localCfg { - config, err := encodeConfigs(cfg.getConfigs()) - if err != nil { - return nil, &configpb.Status{ - Code: configpb.StatusCode_UNKNOWN, - Message: errEncode(err), - } - } - localConfigs = append(localConfigs, &configpb.LocalConfig{ - Version: cfg.getVersionLocked(), - Component: component, - ComponentId: componentID, - Config: config, - }) - } - } - - return localConfigs, &configpb.Status{Code: configpb.StatusCode_OK} -} - -// GetConfig returns config and the latest version. -func (c *ConfigManager) GetConfig(version *configpb.Version, component, componentID string) (*configpb.Version, string, *configpb.Status) { - c.RLock() - defer c.RUnlock() - var config string - var err error - var status *configpb.Status - var localCfgs map[string]*LocalConfig - var cfg *LocalConfig - var ok bool - - if localCfgs, ok = c.LocalCfgs[component]; !ok { - return c.getLatestVersionLocked(component, componentID), config, &configpb.Status{Code: configpb.StatusCode_COMPONENT_NOT_FOUND} - } - - if cfg, ok = localCfgs[componentID]; !ok { - return c.getLatestVersionLocked(component, componentID), config, &configpb.Status{Code: configpb.StatusCode_COMPONENT_ID_NOT_FOUND} - } - - config, err = c.getComponentCfg(component, componentID) - if err != nil { - return version, "", &configpb.Status{ - Code: configpb.StatusCode_UNKNOWN, - Message: errEncode(err), - } - } - if versionEqual(cfg.getVersionLocked(), version) { - status = &configpb.Status{Code: configpb.StatusCode_OK} - } else { - status = &configpb.Status{Code: configpb.StatusCode_WRONG_VERSION} - } - - return c.getLatestVersionLocked(component, componentID), config, status -} - -// CreateConfig is used for registering a component to PD. -func (c *ConfigManager) CreateConfig(version *configpb.Version, component, componentID, cfg string) (*configpb.Version, string, *configpb.Status) { - c.Lock() - defer c.Unlock() - - var status *configpb.Status - latestVersion := c.getLatestVersionLocked(component, componentID) - initVersion := &configpb.Version{Local: 0, Global: 0} - if localCfgs, ok := c.LocalCfgs[component]; ok { - if local, ok := localCfgs[componentID]; ok { - // restart a component - local.updateLocalConfig(cfg) - if versionEqual(initVersion, version) { - status = &configpb.Status{Code: configpb.StatusCode_OK} - } else { - status = &configpb.Status{Code: configpb.StatusCode_WRONG_VERSION} - } - } else { - // add a new component - lc, err := NewLocalConfig(cfg, initVersion) - if err != nil { - status = &configpb.Status{Code: configpb.StatusCode_UNKNOWN, Message: errDecode(err)} - } else { - localCfgs[componentID] = lc - status = &configpb.Status{Code: configpb.StatusCode_OK} - } - } - } else { - c.LocalCfgs[component] = make(map[string]*LocalConfig) - // start the first component - lc, err := NewLocalConfig(cfg, initVersion) - if err != nil { - status = &configpb.Status{Code: configpb.StatusCode_UNKNOWN, Message: errDecode(err)} - } else { - c.LocalCfgs[component][componentID] = lc - status = &configpb.Status{Code: configpb.StatusCode_OK} - } - } - - // Apply global config to new component - globalCfg := c.GlobalCfgs[component] - if globalCfg != nil { - entries := globalCfg.getConfigEntries() - if err := c.applyGlobalConifg(globalCfg, component, globalCfg.getVersionLocked(), entries); err != nil { - return latestVersion, "", &configpb.Status{Code: configpb.StatusCode_UNKNOWN, Message: err.Error()} - } - } - - config, err := c.getComponentCfg(component, componentID) - if err != nil { - status = &configpb.Status{Code: configpb.StatusCode_UNKNOWN, Message: errEncode(err)} - return latestVersion, "", status - } - - return latestVersion, config, status -} - -// GetLatestVersion returns the latest version of config for a given a component ID. -func (c *ConfigManager) GetLatestVersion(component, componentID string) *configpb.Version { - c.RLock() - defer c.RUnlock() - return c.getLatestVersionLocked(component, componentID) -} - -func (c *ConfigManager) getLatestVersionLocked(component, componentID string) *configpb.Version { - v := &configpb.Version{ - Global: c.GlobalCfgs[component].getVersionLocked(), - Local: c.LocalCfgs[component][componentID].getVersionLocked().GetLocal(), - } - return v -} - -func (c *ConfigManager) getComponentCfg(component, componentID string) (string, error) { - config := c.LocalCfgs[component][componentID].getConfigs() - return encodeConfigs(config) -} - -// UpdateConfig is used to update a config with a given config type. -func (c *ConfigManager) UpdateConfig(kind *configpb.ConfigKind, version *configpb.Version, entries []*configpb.ConfigEntry) (*configpb.Version, *configpb.Status) { - c.Lock() - defer c.Unlock() - - global := kind.GetGlobal() - if global != nil { - return c.updateGlobalLocked(global.GetComponent(), version, entries) - } - - local := kind.GetLocal() - if local != nil { - return c.updateLocal(local.GetComponentId(), version, entries) - } - return &configpb.Version{Global: 0, Local: 0}, &configpb.Status{Code: configpb.StatusCode_UNKNOWN, Message: errUnknownKind(kind)} -} - -// applyGlobalConifg applies the global change to each local component. -func (c *ConfigManager) applyGlobalConifg(globalCfg *GlobalConfig, component string, newGlobalVersion uint64, entries []*configpb.ConfigEntry) error { - // get the global config - updateEntries := make(map[string]*EntryValue) - for _, entry := range entries { - globalCfg.updateEntry(entry, &configpb.Version{Global: newGlobalVersion, Local: 0}) - } - - globalUpdateEntries := c.GlobalCfgs[component].getUpdateEntries() - for k, v := range globalUpdateEntries { - updateEntries[k] = v - } - // update all local config - // merge the global config with each local config and update it - for _, LocalCfg := range c.LocalCfgs[component] { - if wrongEntry, err := mergeAndUpdateConfig(LocalCfg, updateEntries); err != nil { - c.deleteEntry(component, wrongEntry) - return err - } - LocalCfg.Version = &configpb.Version{Global: newGlobalVersion, Local: 0} - } - - // update the global version - globalCfg.Version = newGlobalVersion - return nil -} - -// UpdateGlobal is used to update the global config. -func (c *ConfigManager) UpdateGlobal(component string, version *configpb.Version, entries []*configpb.ConfigEntry) (*configpb.Version, *configpb.Status) { - c.Lock() - defer c.Unlock() - return c.updateGlobalLocked(component, version, entries) -} - -func (c *ConfigManager) updateGlobalLocked(component string, version *configpb.Version, entries []*configpb.ConfigEntry) (*configpb.Version, *configpb.Status) { - // if the global config of the component is existed. - if globalCfg, ok := c.GlobalCfgs[component]; ok { - globalLatestVersion := globalCfg.getVersionLocked() - if globalLatestVersion != version.GetGlobal() { - return &configpb.Version{Global: globalLatestVersion, Local: version.GetLocal()}, - &configpb.Status{Code: configpb.StatusCode_WRONG_VERSION} - } - if err := c.applyGlobalConifg(globalCfg, component, version.GetGlobal()+1, entries); err != nil { - return &configpb.Version{Global: globalLatestVersion, Local: version.GetLocal()}, - &configpb.Status{Code: configpb.StatusCode_UNKNOWN, Message: err.Error()} - } - } else { - // The global version of first global update should be 0. - if version.GetGlobal() != 0 { - return &configpb.Version{Global: 0, Local: 0}, - &configpb.Status{Code: configpb.StatusCode_WRONG_VERSION} - } - - globalCfg := NewGlobalConfig(entries, &configpb.Version{Global: 0, Local: 0}) - c.GlobalCfgs[component] = globalCfg - - if err := c.applyGlobalConifg(globalCfg, component, 1, entries); err != nil { - return &configpb.Version{Global: 0, Local: version.GetLocal()}, - &configpb.Status{Code: configpb.StatusCode_UNKNOWN, Message: err.Error()} - } - } - return &configpb.Version{Global: c.GlobalCfgs[component].getVersionLocked(), Local: 0}, &configpb.Status{Code: configpb.StatusCode_OK} -} - -func mergeAndUpdateConfig(localCfg *LocalConfig, updateEntries map[string]*EntryValue) (string, error) { - config := localCfg.getConfigs() - newUpdateEntries := make(map[string]*EntryValue) - for k, v := range updateEntries { - newUpdateEntries[k] = v - } - - // apply the local change to updateEntries - for k1, v1 := range localCfg.getUpdateEntries() { - if v, ok := newUpdateEntries[k1]; ok { - // apply conflict - if v1.Version.GetGlobal() == v.Version.GetGlobal() { - newUpdateEntries[k1] = v1 - } - } else { - newUpdateEntries[k1] = v1 - } - } - - for k, v := range newUpdateEntries { - configName := strings.Split(k, ".") - if err := update(config, configName, v.Value); err != nil { - return k, err - } - } - return "", nil -} - -func (c *ConfigManager) updateLocal(componentID string, version *configpb.Version, entries []*configpb.ConfigEntry) (*configpb.Version, *configpb.Status) { - component := c.getComponentLocked(componentID) - if component == "" { - return &configpb.Version{Global: 0, Local: 0}, &configpb.Status{Code: configpb.StatusCode_COMPONENT_NOT_FOUND} - } - updateEntries := make(map[string]*EntryValue) - if _, ok := c.GlobalCfgs[component]; ok { - globalUpdateEntries := c.GlobalCfgs[component].getUpdateEntries() - for k, v := range globalUpdateEntries { - updateEntries[k] = v - } - } - if localCfg, ok := c.LocalCfgs[component][componentID]; ok { - localLatestVersion := localCfg.getVersionLocked() - if !versionEqual(localLatestVersion, version) { - return localLatestVersion, &configpb.Status{Code: configpb.StatusCode_WRONG_VERSION} - } - for _, entry := range entries { - localCfg.updateEntry(entry, version) - } - if wrongEntry, err := mergeAndUpdateConfig(localCfg, updateEntries); err != nil { - c.deleteEntry(component, wrongEntry) - return localLatestVersion, &configpb.Status{Code: configpb.StatusCode_UNKNOWN, Message: err.Error()} - } - localCfg.Version = &configpb.Version{Global: version.GetGlobal(), Local: version.GetLocal() + 1} - } else { - return version, &configpb.Status{Code: configpb.StatusCode_COMPONENT_ID_NOT_FOUND} - } - return c.LocalCfgs[component][componentID].getVersionLocked(), &configpb.Status{Code: configpb.StatusCode_OK} -} - -func (c *ConfigManager) deleteEntry(component, e string) { - if globalCfg, ok := c.GlobalCfgs[component]; ok { - delete(globalCfg.UpdateEntries, e) - } - for _, localCfg := range c.LocalCfgs[component] { - delete(localCfg.UpdateEntries, e) - } -} - -// DeleteConfig removes a component from the config manager. -func (c *ConfigManager) DeleteConfig(kind *configpb.ConfigKind, version *configpb.Version) *configpb.Status { - c.Lock() - defer c.Unlock() - - global := kind.GetGlobal() - if global != nil { - return c.deleteGlobal(global.GetComponent(), version) - } - - local := kind.GetLocal() - if local != nil { - return c.deleteLocal(local.GetComponentId(), version) - } - - return &configpb.Status{Code: configpb.StatusCode_UNKNOWN, Message: errUnknownKind(kind)} -} - -func (c *ConfigManager) deleteGlobal(component string, version *configpb.Version) *configpb.Status { - // TODO: Add delete global - return &configpb.Status{Code: configpb.StatusCode_UNKNOWN, Message: errNotSupported} -} - -func (c *ConfigManager) deleteLocal(componentID string, version *configpb.Version) *configpb.Status { - component := c.getComponentLocked(componentID) - if component == "" { - return &configpb.Status{Code: configpb.StatusCode_COMPONENT_NOT_FOUND} - } - if localCfg, ok := c.LocalCfgs[component][componentID]; ok { - localLatestVersion := localCfg.getVersionLocked() - if !versionEqual(localLatestVersion, version) { - return &configpb.Status{Code: configpb.StatusCode_WRONG_VERSION} - } - delete(c.LocalCfgs[component], componentID) - } else { - return &configpb.Status{Code: configpb.StatusCode_COMPONENT_ID_NOT_FOUND} - } - return &configpb.Status{Code: configpb.StatusCode_OK} -} - -// EntryValue is composed by version and value. -type EntryValue struct { - Version *configpb.Version - Value string -} - -// NewEntryValue creates a new EntryValue. -func NewEntryValue(e *configpb.ConfigEntry, version *configpb.Version) *EntryValue { - return &EntryValue{ - Version: version, - Value: e.GetValue(), - } -} - -// GlobalConfig is used to manage the global config of components. -type GlobalConfig struct { - Version uint64 - UpdateEntries map[string]*EntryValue -} - -// NewGlobalConfig create a new GlobalConfig. -func NewGlobalConfig(entries []*configpb.ConfigEntry, version *configpb.Version) *GlobalConfig { - updateEntries := make(map[string]*EntryValue) - for _, entry := range entries { - updateEntries[entry.GetName()] = NewEntryValue(entry, version) - } - return &GlobalConfig{ - Version: version.GetGlobal(), - UpdateEntries: updateEntries, - } -} - -func (gc *GlobalConfig) updateEntry(entry *configpb.ConfigEntry, version *configpb.Version) { - entries := gc.getUpdateEntries() - entries[entry.GetName()] = NewEntryValue(entry, version) -} - -func (gc *GlobalConfig) getVersionLocked() uint64 { - if gc == nil { - return 0 - } - return gc.Version -} - -// GetUpdateEntries returns a map of global entries which needs to be update. -func (gc *GlobalConfig) getUpdateEntries() map[string]*EntryValue { - return gc.UpdateEntries -} - -// GetConfigEntries returns config entries. -func (gc *GlobalConfig) getConfigEntries() []*configpb.ConfigEntry { - var entries []*configpb.ConfigEntry - for k, v := range gc.UpdateEntries { - entries = append(entries, &configpb.ConfigEntry{Name: k, Value: v.Value}) - } - return entries -} - -// LocalConfig is used to manage the local config of a component. -type LocalConfig struct { - Version *configpb.Version - UpdateEntries map[string]*EntryValue - Configs map[string]interface{} -} - -// NewLocalConfig create a new LocalConfig. -func NewLocalConfig(cfg string, version *configpb.Version) (*LocalConfig, error) { - configs := make(map[string]interface{}) - if err := decodeConfigs(cfg, configs); err != nil { - return nil, err - } - updateEntries := make(map[string]*EntryValue) - return &LocalConfig{ - Version: version, - UpdateEntries: updateEntries, - Configs: configs, - }, nil -} - -// GetUpdateEntries returns a map of local entries which needs to be update. -func (lc *LocalConfig) getUpdateEntries() map[string]*EntryValue { - return lc.UpdateEntries -} - -func (lc *LocalConfig) updateEntry(entry *configpb.ConfigEntry, version *configpb.Version) { - entries := lc.getUpdateEntries() - entries[entry.GetName()] = NewEntryValue(entry, version) -} - -// updateLocalConfig updates a LocalConfig when there is a new config item. -func (lc *LocalConfig) updateLocalConfig(cfg string) error { - new := make(map[string]interface{}) - if err := decodeConfigs(cfg, new); err != nil { - return err - } - old := lc.getConfigs() - updateItem(new, old) - return nil -} - -func updateItem(new, old map[string]interface{}) { - for key := range new { - if sub, ok := old[key]; ok { - oldSub, ok := sub.(map[string]interface{}) - newSub, ok1 := new[key].(map[string]interface{}) - if ok && ok1 { - updateItem(newSub, oldSub) - } - } else { - old[key] = new[key] - } - } -} - -func (lc *LocalConfig) getVersionLocked() *configpb.Version { - if lc == nil { - return nil - } - return lc.Version -} - -func (lc *LocalConfig) getConfigs() map[string]interface{} { - return lc.Configs -} - -func update(config map[string]interface{}, configName []string, value string) error { - if len(configName) > 1 { - sub, ok := config[configName[0]] - if !ok { - return errors.Errorf("cannot find the config item: %v", configName[0]) - } - s, ok := sub.(map[string]interface{}) - if ok { - return update(s, configName[1:], value) - } - } - - _, ok := config[configName[0]] - if !ok { - // TODO: remove it - if configName[0] != "schedulers-v2" { - return errors.Errorf("cannot find the config item: %v", configName[0]) - } - } - - container := make(map[string]interface{}) - - // TODO: remove it - if configName[0] == "cluster-version" { - cv, err := cluster.ParseVersion(value) - if err != nil { - return errors.Errorf("failed to parse version: %v", err.Error()) - } - container[configName[0]] = cv - } else if configName[0] == "schedulers" { - var tmp map[string]interface{} - _, err := toml.Decode(value, &tmp) - if err != nil { - return errors.Errorf("failed to decode schedulers: %v", err.Error()) - } - config[configName[0]] = tmp["schedulers"] - return nil - } else if _, err := toml.Decode(value, &container); err != nil { - if !strings.Contains(err.Error(), "bare keys") { - return errors.Errorf("failed to decode value: %v", err.Error()) - } - container[configName[0]] = value - } else if configName[0] == "label-property" { - config[configName[0]] = container - return nil - } - - v, err := getUpdateValue(config[configName[0]], container[configName[0]]) - if err != nil { - return err - } - config[configName[0]] = v - return nil -} - -func getUpdateValue(item, updateItem interface{}) (interface{}, error) { - var err error - var v interface{} - var tmp float64 - t := reflect.TypeOf(item) - // It is used to handle "schedulers-v2". - if t == nil { - return v, nil - } - switch t.Kind() { - case reflect.Bool: - switch t1 := updateItem.(type) { - case string: - v, err = strconv.ParseBool(updateItem.(string)) - case bool: - v = updateItem - default: - return nil, errors.Errorf("unexpected type: %T\n", t1) - } - case reflect.Int64: - switch t1 := updateItem.(type) { - case string: - tmp, err = strconv.ParseFloat(updateItem.(string), 64) - v = int64(tmp) - case float64: - v = int64(updateItem.(float64)) - case int64: - v = updateItem - default: - return nil, errors.Errorf("unexpected type: %T\n", t1) - } - case reflect.Slice: - if item, ok := updateItem.(string); ok { - strSlice := strings.Split(item, ",") - var slice []interface{} - for _, str := range strSlice { - slice = append(slice, str) - } - v = slice - } else { - return nil, errors.Errorf("%v cannot cast to string", updateItem) - } - case reflect.Float64: - switch t1 := updateItem.(type) { - case string: - v, err = strconv.ParseFloat(updateItem.(string), 64) - case float64: - v = updateItem - default: - return nil, errors.Errorf("unexpected type: %T\n", t1) - } - case reflect.String: - switch t1 := updateItem.(type) { - case string: - v = updateItem - default: - return nil, errors.Errorf("unexpected type: %T\n", t1) - } - case reflect.Map, reflect.Struct, reflect.Ptr: - v = updateItem - default: - return nil, errors.Errorf("unsupported type: %T\n", t.Kind()) - } - - if err != nil { - return nil, err - } - return v, nil -} - -func encodeConfigs(configs map[string]interface{}) (string, error) { - buf := new(bytes.Buffer) - if err := toml.NewEncoder(buf).Encode(configs); err != nil { - return "", err - } - return buf.String(), nil -} - -func decodeConfigs(cfg string, configs map[string]interface{}) error { - if _, err := toml.Decode(cfg, &configs); err != nil { - return err - } - return nil -} - -func versionEqual(a, b *configpb.Version) bool { - return a.GetGlobal() == b.GetGlobal() && a.GetLocal() == b.GetLocal() -} diff --git a/server/config_manager/config_manager_test.go b/server/config_manager/config_manager_test.go deleted file mode 100644 index 3504c26467b..00000000000 --- a/server/config_manager/config_manager_test.go +++ /dev/null @@ -1,621 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package configmanager - -import ( - "context" - "strings" - "testing" - - . "github.com/pingcap/check" - "github.com/pingcap/kvproto/pkg/configpb" - "github.com/pingcap/pd/v4/server/core" - "github.com/pingcap/pd/v4/server/kv" -) - -func Test(t *testing.T) { - TestingT(t) -} - -var _ = Suite(&testComponentsConfigSuite{}) - -type testComponentsConfigSuite struct{} - -func (s *testComponentsConfigSuite) TestDecodeAndEncode(c *C) { - cfgData := ` -log-level = "debug" -panic-when-unexpected-key-or-data = true - -[pd] -endpoints = [ - "example.com:443", -] - -[coprocessor] -split-region-on-table = true -batch-split-limit = 1 -region-max-size = "12MB" - -[rocksdb] -wal-recovery-mode = 1 -wal-dir = "/var" -create-if-missing = false - -[rocksdb.titan] -enabled = true -dirname = "bar" -max-background-gc = 9 - -[rocksdb.defaultcf] -block-size = "12KB" -disable-block-cache = false -bloom-filter-bits-per-key = 123 -compression-per-level = [ - "no", - "lz4", -] - -[rocksdb.defaultcf.titan] -min-blob-size = "2018B" -discardable-ratio = 0.00156 - -[rocksdb.writecf] -block-size = "12KB" -disable-block-cache = false -bloom-filter-bits-per-key = 123 -compression-per-level = [ - "no", - "zstd", -] -` - cfg := make(map[string]interface{}) - err := decodeConfigs(cfgData, cfg) - c.Assert(err, IsNil) - decoded := make(map[string]interface{}) - decoded["log-level"] = "debug" - decoded["panic-when-unexpected-key-or-data"] = true - pdMap := map[string]interface{}{"endpoints": []interface{}{"example.com:443"}} - decoded["pd"] = pdMap - copMap := map[string]interface{}{ - "split-region-on-table": true, - "batch-split-limit": int64(1), - "region-max-size": "12MB", - } - decoded["coprocessor"] = copMap - titanMap := map[string]interface{}{ - "enabled": true, - "dirname": "bar", - "max-background-gc": int64(9), - } - defaultcfTitanMap := map[string]interface{}{ - "min-blob-size": "2018B", - "discardable-ratio": 0.00156, - } - defaultcfMap := map[string]interface{}{ - "block-size": "12KB", - "disable-block-cache": false, - "bloom-filter-bits-per-key": int64(123), - "compression-per-level": []interface{}{"no", "lz4"}, - "titan": defaultcfTitanMap, - } - writecfMap := map[string]interface{}{ - "block-size": "12KB", - "disable-block-cache": false, - "bloom-filter-bits-per-key": int64(123), - "compression-per-level": []interface{}{"no", "zstd"}, - } - rocksdbMap := map[string]interface{}{ - "wal-recovery-mode": int64(1), - "wal-dir": "/var", - "create-if-missing": false, - "titan": titanMap, - "defaultcf": defaultcfMap, - "writecf": writecfMap, - } - decoded["rocksdb"] = rocksdbMap - c.Assert(cfg, DeepEquals, decoded) - - str, err := encodeConfigs(decoded) - c.Assert(err, IsNil) - encodedStr := `log-level = "debug" -panic-when-unexpected-key-or-data = true - -[coprocessor] - batch-split-limit = 1 - region-max-size = "12MB" - split-region-on-table = true - -[pd] - endpoints = ["example.com:443"] - -[rocksdb] - create-if-missing = false - wal-dir = "/var" - wal-recovery-mode = 1 - [rocksdb.defaultcf] - block-size = "12KB" - bloom-filter-bits-per-key = 123 - compression-per-level = ["no", "lz4"] - disable-block-cache = false - [rocksdb.defaultcf.titan] - discardable-ratio = 0.00156 - min-blob-size = "2018B" - [rocksdb.titan] - dirname = "bar" - enabled = true - max-background-gc = 9 - [rocksdb.writecf] - block-size = "12KB" - bloom-filter-bits-per-key = 123 - compression-per-level = ["no", "zstd"] - disable-block-cache = false -` - c.Assert(str, Equals, encodedStr) -} - -func (s *testComponentsConfigSuite) TestUpdateConfig(c *C) { - cfg := make(map[string]interface{}) - defaultcfTitanMap := map[string]interface{}{ - "discardable-ratio": 0.00156, - } - defaultcfMap := map[string]interface{}{ - "block-size": "12KB", - "compression-per-level": []interface{}{"no", "lz4"}, - "titan": defaultcfTitanMap, - } - rocksdbMap := map[string]interface{}{ - "wal-recovery-mode": int64(1), - "defaultcf": defaultcfMap, - } - cfg["rocksdb"] = rocksdbMap - err := update(cfg, strings.Split("rocksdb.defaultcf.titan.discardable-ratio", "."), "0.002") - c.Assert(err, IsNil) - c.Assert(defaultcfTitanMap["discardable-ratio"], Equals, 0.002) -} - -func (s *testComponentsConfigSuite) TestReloadConfig(c *C) { - cfgData := ` -[rocksdb] -wal-recovery-mode = 1 - -[rocksdb.defaultcf] -block-size = "12KB" -disable-block-cache = false -compression-per-level = [ - "no", - "lz4", -] - -[rocksdb.defaultcf.titan] -discardable-ratio = 0.00156 -` - cfg := NewConfigManager(nil) - lc, err := NewLocalConfig(cfgData, &configpb.Version{Global: 0, Local: 1}) - c.Assert(err, IsNil) - gc := NewGlobalConfig( - []*configpb.ConfigEntry{{ - Name: "rocksdb.defaultcf.disable-block-cache", - Value: "true"}}, - &configpb.Version{Global: 1, Local: 0}) - cfg.GlobalCfgs["tikv"] = gc - cfg.LocalCfgs["tikv"] = make(map[string]*LocalConfig) - cfg.LocalCfgs["tikv"]["tikv1"] = lc - - storage := core.NewStorage(kv.NewMemoryKV()) - err = cfg.Persist(storage) - c.Assert(err, IsNil) - - cfg1 := NewConfigManager(nil) - err = cfg1.Reload(storage) - c.Assert(err, IsNil) - c.Assert(cfg1.LocalCfgs["tikv"]["tikv1"], DeepEquals, lc) - c.Assert(cfg1.GlobalCfgs["tikv"], DeepEquals, gc) - - // test cover config - cfgData1 := ` -[rocksdb] -wal-recovery-mode = 1 - -[rocksdb.defaultcf] -block-size = "20KB" -disable-block-cache = true -compression-per-level = [ - "zstd", - "zstd", -] -` - lc1, err := NewLocalConfig(cfgData1, &configpb.Version{Global: 0, Local: 1}) - c.Assert(err, IsNil) - gc1 := NewGlobalConfig( - []*configpb.ConfigEntry{{ - Name: "rocksdb.defaultcf.disable-block-cache", - Value: "true"}}, - &configpb.Version{Global: 1, Local: 0}) - cfg.GlobalCfgs["tikv"] = gc1 - cfg.LocalCfgs["tikv"]["tikv1"] = lc1 - err = cfg.Persist(storage) - c.Assert(err, IsNil) - err = cfg1.Reload(storage) - c.Assert(err, IsNil) - c.Assert(cfg1.LocalCfgs["tikv"]["tikv1"], DeepEquals, lc1) - c.Assert(cfg1.GlobalCfgs["tikv"], DeepEquals, gc1) -} - -func (s *testComponentsConfigSuite) TestGetConfig(c *C) { - cfgData := ` -[rocksdb] -wal-recovery-mode = 1 - -[rocksdb.defaultcf] -block-size = "12KB" -disable-block-cache = false -compression-per-level = [ - "no", - "lz4", -] - -[rocksdb.defaultcf.titan] -discardable-ratio = 0.00156 -` - cfg := NewConfigManager(nil) - lc, err := NewLocalConfig(cfgData, &configpb.Version{Global: 0, Local: 0}) - c.Assert(err, IsNil) - entry := []*configpb.ConfigEntry{{ - Name: "rocksdb.defaultcf.disable-block-cache", - Value: "true"}} - gc := NewGlobalConfig( - []*configpb.ConfigEntry{{ - Name: "rocksdb.defaultcf.disable-block-cache", - Value: "true"}}, - &configpb.Version{Global: 1, Local: 0}) - cfg.GlobalCfgs["tikv"] = gc - cfg.LocalCfgs["tikv"] = make(map[string]*LocalConfig) - cfg.LocalCfgs["tikv"]["tikv1"] = lc - err = cfg.applyGlobalConifg(cfg.GlobalCfgs["tikv"], "tikv", 1, entry) - c.Assert(err, IsNil) - str, err := cfg.getComponentCfg("tikv", "tikv1") - c.Assert(err, IsNil) - expect := `[rocksdb] - wal-recovery-mode = 1 - [rocksdb.defaultcf] - block-size = "12KB" - compression-per-level = ["no", "lz4"] - disable-block-cache = true - [rocksdb.defaultcf.titan] - discardable-ratio = 0.00156 -` - c.Assert(str, Equals, expect) - cfg.updateLocal("tikv1", &configpb.Version{Global: 1, Local: 0}, []*configpb.ConfigEntry{{ - Name: "rocksdb.defaultcf.disable-block-cache", - Value: "false"}}) - str, err = cfg.getComponentCfg("tikv", "tikv1") - c.Assert(err, IsNil) - expect = `[rocksdb] - wal-recovery-mode = 1 - [rocksdb.defaultcf] - block-size = "12KB" - compression-per-level = ["no", "lz4"] - disable-block-cache = false - [rocksdb.defaultcf.titan] - discardable-ratio = 0.00156 -` - c.Assert(str, Equals, expect) -} - -func (s *testComponentsConfigSuite) TestCreate(c *C) { - cfgData := ` -log-level = "debug" -` - cfg := NewConfigManager(nil) - v, config, status := cfg.CreateConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData) - c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - expect := `log-level = "debug" -` - c.Assert(config, Equals, expect) - v, config, status = cfg.CreateConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData) - c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - c.Assert(config, Equals, expect) - v, status = cfg.UpdateConfig( - &configpb.ConfigKind{Kind: &configpb.ConfigKind_Local{Local: &configpb.Local{ComponentId: "tikv1"}}}, - &configpb.Version{Global: 0, Local: 0}, - []*configpb.ConfigEntry{{Name: "log-level", Value: "info"}}, - ) - c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - v, config, status = cfg.CreateConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData) - c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - expect1 := `log-level = "info" -` - c.Assert(config, Equals, expect1) - v, config, status = cfg.CreateConfig(&configpb.Version{Global: 10, Local: 10}, "tikv", "tikv1", cfgData) - c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION) - c.Assert(config, Equals, expect1) -} - -func (s *testComponentsConfigSuite) TestUpdate(c *C) { - cfgData := ` -log-level = "debug" -` - cfg := NewConfigManager(nil) - v, config, status := cfg.CreateConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData) - c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - expect := `log-level = "debug" -` - expect1 := `log-level = "info" -` - c.Assert(config, Equals, expect) - v, status = cfg.UpdateConfig( - &configpb.ConfigKind{Kind: &configpb.ConfigKind_Local{Local: &configpb.Local{ComponentId: "tikv1"}}}, - &configpb.Version{Global: 0, Local: 0}, - []*configpb.ConfigEntry{{Name: "log-level", Value: "info"}}, - ) - c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - result, err := cfg.getComponentCfg("tikv", "tikv1") - c.Assert(err, IsNil) - c.Assert(result, Equals, expect1) - - // stale update request - v, status = cfg.UpdateConfig( - &configpb.ConfigKind{Kind: &configpb.ConfigKind_Local{Local: &configpb.Local{ComponentId: "tikv1"}}}, - &configpb.Version{Global: 0, Local: 0}, - []*configpb.ConfigEntry{{Name: "log-level", Value: "info"}}, - ) - c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION) - v, status = cfg.UpdateConfig( - &configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: "tikv"}}}, - &configpb.Version{Global: 10, Local: 0}, - []*configpb.ConfigEntry{{Name: "log-level", Value: "debug"}}, - ) - c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION) - - v, status = cfg.UpdateConfig( - &configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: "tikv"}}}, - &configpb.Version{Global: 0, Local: 0}, - []*configpb.ConfigEntry{{Name: "log-level", Value: "debug"}}, - ) - c.Assert(v, DeepEquals, &configpb.Version{Global: 1, Local: 0}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - result, err = cfg.getComponentCfg("tikv", "tikv1") - c.Assert(err, IsNil) - c.Assert(result, Equals, expect) - - v, status = cfg.UpdateConfig( - &configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: "tikv"}}}, - &configpb.Version{Global: 1, Local: 0}, - []*configpb.ConfigEntry{{Name: "log-level", Value: "info"}}, - ) - c.Assert(v, DeepEquals, &configpb.Version{Global: 2, Local: 0}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - result, err = cfg.getComponentCfg("tikv", "tikv1") - c.Assert(err, IsNil) - c.Assert(result, Equals, expect1) - v, status = cfg.UpdateConfig( - &configpb.ConfigKind{Kind: &configpb.ConfigKind_Local{Local: &configpb.Local{ComponentId: "tikv1"}}}, - &configpb.Version{Global: 2, Local: 0}, - []*configpb.ConfigEntry{{Name: "log-level", Value: "debug"}}, - ) - c.Assert(v, DeepEquals, &configpb.Version{Global: 2, Local: 1}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - result, err = cfg.getComponentCfg("tikv", "tikv1") - c.Assert(err, IsNil) - c.Assert(result, Equals, expect) - - // stale update request - v, status = cfg.UpdateConfig( - &configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: "tikv"}}}, - &configpb.Version{Global: 0, Local: 0}, - []*configpb.ConfigEntry{{Name: "log-level", Value: "info"}}, - ) - c.Assert(v, DeepEquals, &configpb.Version{Global: 2, Local: 0}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION) - - // nil case - v, status = cfg.UpdateConfig(nil, nil, nil) - c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_UNKNOWN) -} - -func (s *testComponentsConfigSuite) TestGetAll(c *C) { - cfgData := ` -log-level = "debug" -` - expect := `log-level = "debug" -` - cfg := NewConfigManager(nil) - v, config, status := cfg.CreateConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData) - c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - c.Assert(config, Equals, expect) - v, config, status = cfg.CreateConfig(&configpb.Version{Global: 0, Local: 0}, "tidb", "tidb1", cfgData) - c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - c.Assert(config, Equals, expect) - - local, status := cfg.GetAllConfig(context.Background()) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - c.Assert(len(local), Equals, 2) - for _, conf := range local { - if conf.Component == "tikv" { - c.Assert(conf.ComponentId, Equals, "tikv1") - c.Assert(conf.Config, Equals, expect) - } else { - c.Assert(conf.ComponentId, Equals, "tidb1") - c.Assert(conf.Config, Equals, expect) - } - } -} - -func (s *testComponentsConfigSuite) TestGet(c *C) { - cfgData := ` -log-level = "debug" -` - cfg := NewConfigManager(nil) - v, config, status := cfg.CreateConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData) - c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - expect := `log-level = "debug" -` - c.Assert(config, Equals, expect) - v, status = cfg.UpdateConfig( - &configpb.ConfigKind{Kind: &configpb.ConfigKind_Local{Local: &configpb.Local{ComponentId: "tikv1"}}}, - &configpb.Version{Global: 0, Local: 0}, - []*configpb.ConfigEntry{{Name: "log-level", Value: "info"}}, - ) - c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - v, config, status = cfg.GetConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1") - c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION) - expect1 := `log-level = "info" -` - c.Assert(config, Equals, expect1) - v, config, status = cfg.GetConfig(&configpb.Version{Global: 10, Local: 0}, "tikv", "tikv1") - c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION) - c.Assert(config, Equals, expect1) - v, config, status = cfg.GetConfig(&configpb.Version{Global: 10, Local: 1}, "tikv", "tikv1") - c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION) - c.Assert(config, Equals, expect1) -} - -func (s *testComponentsConfigSuite) TestDeleteLocal(c *C) { - cfgData := ` -log-level = "debug" -` - cfg := NewConfigManager(nil) - - v, config, status := cfg.CreateConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData) - c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - expect := `log-level = "debug" -` - c.Assert(config, Equals, expect) - v, status = cfg.UpdateConfig( - &configpb.ConfigKind{Kind: &configpb.ConfigKind_Local{Local: &configpb.Local{ComponentId: "tikv1"}}}, - &configpb.Version{Global: 0, Local: 0}, - []*configpb.ConfigEntry{{Name: "log-level", Value: "info"}}, - ) - c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - v, config, status = cfg.GetConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1") - c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION) - expect1 := `log-level = "info" -` - c.Assert(config, Equals, expect1) - - status = cfg.DeleteConfig( - &configpb.ConfigKind{Kind: &configpb.ConfigKind_Local{Local: &configpb.Local{ComponentId: "tikv1"}}}, - &configpb.Version{Global: 0, Local: 0}, - ) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION) - v, config, status = cfg.GetConfig(&configpb.Version{Global: 0, Local: 1}, "tikv", "tikv1") - c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - c.Assert(config, Equals, expect1) - - status = cfg.DeleteConfig( - &configpb.ConfigKind{Kind: &configpb.ConfigKind_Local{Local: &configpb.Local{ComponentId: "tikv1"}}}, - &configpb.Version{Global: 1, Local: 1}, - ) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION) - v, config, status = cfg.GetConfig(&configpb.Version{Global: 0, Local: 1}, "tikv", "tikv1") - c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - c.Assert(config, Equals, expect1) - - status = cfg.DeleteConfig( - &configpb.ConfigKind{Kind: &configpb.ConfigKind_Local{Local: &configpb.Local{ComponentId: "tikv1"}}}, - &configpb.Version{Global: 0, Local: 1}, - ) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - v, config, status = cfg.GetConfig(&configpb.Version{Global: 0, Local: 1}, "tikv", "tikv1") - c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_COMPONENT_ID_NOT_FOUND) - c.Assert(config, Equals, "") -} - -func (s *testComponentsConfigSuite) TestCreateNewItem(c *C) { - cfgData := ` -log-level = "debug" -` - cfg := NewConfigManager(nil) - - v, config, status := cfg.CreateConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData) - c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - expect := `log-level = "debug" -` - c.Assert(config, Equals, expect) - cfgData1 := ` -log-level = "info" -[rocksdb] -wal-recovery-mode = 1 - -[rocksdb.defaultcf] -block-size = "12KB" -disable-block-cache = false -compression-per-level = [ - "no", - "lz4", -] - -[rocksdb.defaultcf.titan] -discardable-ratio = 0.00156 -` - v, config, status = cfg.CreateConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData1) - c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - expect1 := `log-level = "debug" - -[rocksdb] - wal-recovery-mode = 1 - [rocksdb.defaultcf] - block-size = "12KB" - compression-per-level = ["no", "lz4"] - disable-block-cache = false - [rocksdb.defaultcf.titan] - discardable-ratio = 0.00156 -` - c.Assert(config, Equals, expect1) - cfgData2 := ` -log-level = "info" -[rocksdb] -wal-recovery-mode = 2 - -[rocksdb.defaultcf] -block-size = "10KB" -disable-block-cache = true -compression-per-level = [ - "lz4", - "no", -] - -[rocksdb.defaultcf.titan] -discardable-ratio = 0.00211 -` - v, config, status = cfg.CreateConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData2) - c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0}) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - c.Assert(config, Equals, expect1) -} diff --git a/server/config_manager/grpc_service.go b/server/config_manager/grpc_service.go deleted file mode 100644 index 339ae0183f7..00000000000 --- a/server/config_manager/grpc_service.go +++ /dev/null @@ -1,179 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package configmanager - -import ( - "context" - - "github.com/pingcap/kvproto/pkg/configpb" - "github.com/pingcap/log" - "github.com/pkg/errors" - "go.uber.org/zap" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -var notLeaderError = status.Errorf(codes.Unavailable, "not leader") - -// Create implements gRPC PDServer. -func (c *ConfigManager) Create(ctx context.Context, request *configpb.CreateRequest) (*configpb.CreateResponse, error) { - if err := c.validateComponentRequest(request.GetHeader()); err != nil { - return nil, err - } - - if !c.svr.GetConfig().EnableDynamicConfig { - component, componentID := request.Component, request.ComponentId - lc, err := NewLocalConfig(request.Config, request.Version) - if err != nil { - log.Error("failed to update component config", zap.String("component", component), zap.String("component-id", componentID)) - } - c.Lock() - if localCfgs, ok := c.LocalCfgs[component]; ok { - localCfgs[componentID] = lc - } else { - c.LocalCfgs[component] = make(map[string]*LocalConfig) - c.LocalCfgs[component][componentID] = lc - } - c.Unlock() - c.Persist(c.svr.GetStorage()) - return &configpb.CreateResponse{ - Header: c.componentHeader(), - Status: &configpb.Status{Code: configpb.StatusCode_OK}, - Version: request.Version, - Config: request.Config, - }, nil - } - - version, config, status := c.CreateConfig(request.GetVersion(), request.GetComponent(), request.GetComponentId(), request.GetConfig()) - if status.GetCode() == configpb.StatusCode_OK { - log.Info("component has registered", zap.String("component", request.GetComponent()), zap.String("component-id", request.GetComponentId())) - c.Persist(c.svr.GetStorage()) - } - - return &configpb.CreateResponse{ - Header: c.componentHeader(), - Status: status, - Version: version, - Config: config, - }, nil -} - -// GetAll implements gRPC PDServer. -func (c *ConfigManager) GetAll(ctx context.Context, request *configpb.GetAllRequest) (*configpb.GetAllResponse, error) { - if err := c.validateComponentRequest(request.GetHeader()); err != nil { - return nil, err - } - - if !c.svr.GetConfig().EnableDynamicConfig { - return &configpb.GetAllResponse{ - Header: c.componentHeader(), - Status: &configpb.Status{Code: configpb.StatusCode_OK}, - }, nil - } - - localConfigs, status := c.GetAllConfig(ctx) - return &configpb.GetAllResponse{ - Header: c.componentHeader(), - Status: status, - LocalConfigs: localConfigs, - }, nil -} - -// Get implements gRPC PDServer. -func (c *ConfigManager) Get(ctx context.Context, request *configpb.GetRequest) (*configpb.GetResponse, error) { - if err := c.validateComponentRequest(request.GetHeader()); err != nil { - return nil, err - } - - if !c.svr.GetConfig().EnableDynamicConfig { - return &configpb.GetResponse{ - Header: c.componentHeader(), - Status: &configpb.Status{Code: configpb.StatusCode_OK}, - }, nil - } - - version, config, status := c.GetConfig(request.GetVersion(), request.GetComponent(), request.GetComponentId()) - - return &configpb.GetResponse{ - Header: c.componentHeader(), - Status: status, - Version: version, - Config: config, - }, nil -} - -// Update implements gRPC PDServer. -func (c *ConfigManager) Update(ctx context.Context, request *configpb.UpdateRequest) (*configpb.UpdateResponse, error) { - if err := c.validateComponentRequest(request.GetHeader()); err != nil { - return nil, err - } - - if !c.svr.GetConfig().EnableDynamicConfig { - return &configpb.UpdateResponse{ - Header: c.componentHeader(), - Status: &configpb.Status{Code: configpb.StatusCode_OK}, - }, nil - } - - version, status := c.UpdateConfig(request.GetKind(), request.GetVersion(), request.GetEntries()) - if status.GetCode() == configpb.StatusCode_OK { - log.Info("config has updated in config manager", zap.Reflect("entries", request.GetEntries())) - c.Persist(c.svr.GetStorage()) - } - - return &configpb.UpdateResponse{ - Header: c.componentHeader(), - Status: status, - Version: version, - }, nil -} - -// Delete implements gRPC PDServer. -func (c *ConfigManager) Delete(ctx context.Context, request *configpb.DeleteRequest) (*configpb.DeleteResponse, error) { - if err := c.validateComponentRequest(request.GetHeader()); err != nil { - return nil, err - } - - if !c.svr.GetConfig().EnableDynamicConfig { - return &configpb.DeleteResponse{ - Header: c.componentHeader(), - Status: &configpb.Status{Code: configpb.StatusCode_OK}, - }, nil - } - - status := c.DeleteConfig(request.GetKind(), request.GetVersion()) - if status.GetCode() == configpb.StatusCode_OK { - c.Persist(c.svr.GetStorage()) - } - - return &configpb.DeleteResponse{ - Header: c.componentHeader(), - Status: status, - }, nil -} - -func (c *ConfigManager) componentHeader() *configpb.Header { - return &configpb.Header{ClusterId: c.svr.ClusterID()} -} - -func (c *ConfigManager) validateComponentRequest(header *configpb.Header) error { - if c.svr.IsClosed() || !c.svr.GetMember().IsLeader() { - return errors.WithStack(notLeaderError) - } - clusterID := c.svr.ClusterID() - if header.GetClusterId() != clusterID { - return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", clusterID, header.GetClusterId()) - } - return nil -} diff --git a/server/grpc_service.go b/server/grpc_service.go index fd9310f6ab4..884767a3346 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -232,13 +232,7 @@ func (s *Server) PutStore(ctx context.Context, request *pdpb.PutStoreRequest) (* } log.Info("put store ok", zap.Stringer("store", store)) - v := rc.OnStoreVersionChange() - if s.GetConfig().EnableDynamicConfig && v != nil { - err := s.UpdateConfigManager("cluster-version", v.String()) - if err != nil { - log.Error("failed to update the cluster version in config manager", zap.Error(err)) - } - } + rc.OnStoreVersionChange() CheckPDVersion(s.persistOptions) return &pdpb.PutStoreResponse{ diff --git a/server/server.go b/server/server.go index 09548974375..381917bc4f3 100644 --- a/server/server.go +++ b/server/server.go @@ -14,39 +14,33 @@ package server import ( - "bytes" "context" "fmt" "math/rand" "net/http" - "net/url" "path" "path/filepath" - "reflect" "strconv" "strings" "sync" "sync/atomic" "time" - "github.com/BurntSushi/toml" "github.com/coreos/go-semver/semver" "github.com/golang/protobuf/proto" "github.com/gorilla/mux" "github.com/pingcap/failpoint" - "github.com/pingcap/kvproto/pkg/configpb" "github.com/pingcap/kvproto/pkg/diagnosticspb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" - pd "github.com/pingcap/pd/v4/client" + "github.com/pingcap/pd/v4/pkg/component" "github.com/pingcap/pd/v4/pkg/etcdutil" "github.com/pingcap/pd/v4/pkg/grpcutil" "github.com/pingcap/pd/v4/pkg/logutil" "github.com/pingcap/pd/v4/pkg/typeutil" "github.com/pingcap/pd/v4/server/cluster" "github.com/pingcap/pd/v4/server/config" - configmanager "github.com/pingcap/pd/v4/server/config_manager" "github.com/pingcap/pd/v4/server/core" "github.com/pingcap/pd/v4/server/id" "github.com/pingcap/pd/v4/server/kv" @@ -81,12 +75,6 @@ var ( EtcdStartTimeout = time.Minute * 5 ) -// Component is used to represent the kind of component in config manager. -const Component = "pd" - -// ConfigCheckInterval represents the time interval of running config check. -var ConfigCheckInterval = 1 * time.Second - // Server is the pd server. type Server struct { diagnosticspb.DiagnosticsServer @@ -132,11 +120,8 @@ type Server struct { lg *zap.Logger logProps *log.ZapProperties - // components' configuration management - cfgManager *configmanager.ConfigManager - // component config - configVersion *configpb.Version - configClient pd.ConfigClient + // It's used to manage components. + componentManager *component.Manager // Add callback functions at different stages startCallbacks []func() @@ -229,7 +214,6 @@ func CreateServer(ctx context.Context, cfg *config.Config, serviceBuilders ...Ha DiagnosticsServer: sysutil.NewDiagnosticsServer(cfg.Log.File.Filename), } - s.cfgManager = configmanager.NewConfigManager(s) s.handler = newHandler(s) // Adjust etcd config. @@ -247,7 +231,6 @@ func CreateServer(ctx context.Context, cfg *config.Config, serviceBuilders ...Ha etcdCfg.ServiceRegister = func(gs *grpc.Server) { pdpb.RegisterPDServer(gs, s) diagnosticspb.RegisterDiagnosticsServer(gs, s) - configpb.RegisterConfigServer(gs, s.cfgManager) } s.etcdCfg = etcdCfg if EnableZap { @@ -365,10 +348,8 @@ func (s *Server) startServer(ctx context.Context) error { s.storage = core.NewStorage(kvBase).SetRegionStorage(regionStorage) s.basicCluster = core.NewBasicCluster() s.cluster = cluster.NewRaftCluster(ctx, s.GetClusterRootPath(), s.clusterID, syncer.NewRegionSyncer(s), s.client) - if !s.cfg.EnableDynamicConfig { - s.cluster.SetConfigCheck() - } s.hbStreams = newHeartbeatStreams(ctx, s.clusterID, s.cluster) + s.componentManager = component.NewManager() // Run callbacks for _, cb := range s.startCallbacks { @@ -475,10 +456,6 @@ func (s *Server) startServerLoop(ctx context.Context) { go s.leaderLoop() go s.etcdLeaderLoop() go s.serverMetricsLoop() - if s.cfg.EnableDynamicConfig { - s.serverLoopWg.Add(1) - go s.configCheckLoop() - } } func (s *Server) stopServerLoop() { @@ -627,14 +604,9 @@ func (s *Server) GetClient() *clientv3.Client { return s.client } -// GetConfigManager returns the config manager of server. -func (s *Server) GetConfigManager() *configmanager.ConfigManager { - return s.cfgManager -} - -// GetConfigClient returns the config client of server. -func (s *Server) GetConfigClient() pd.ConfigClient { - return s.configClient +// GetComponentManager returns the component manager of server. +func (s *Server) GetComponentManager() *component.Manager { + return s.componentManager } // GetLeader returns leader of etcd. @@ -678,26 +650,6 @@ func (s *Server) GetAllocator() *id.AllocatorImpl { return s.idAllocator } -// GetSchedulersCallback returns a callback function to update config manager. -func (s *Server) GetSchedulersCallback() func() { - return func() { - if s.GetConfig().EnableDynamicConfig { - value := s.GetScheduleConfig().Schedulers - tmp := map[string]interface{}{ - "schedulers": value, - } - var buf bytes.Buffer - if err := toml.NewEncoder(&buf).Encode(tmp); err != nil { - log.Error("failed to encode config", zap.Error(err)) - } - - if err := s.UpdateConfigManager("schedule.schedulers", buf.String()); err != nil { - log.Error("failed to update the schedulers in config manager", zap.Error(err)) - } - } - } -} - // Name returns the unique etcd Name for this server in etcd cluster. func (s *Server) Name() string { return s.cfg.Name @@ -916,20 +868,6 @@ func (s *Server) DeleteLabelProperty(typ, labelKey, labelValue string) error { return nil } -// UpdateConfigManager is used to update config manager directly. -func (s *Server) UpdateConfigManager(name, value string) error { - cm := s.GetConfigManager() - globalVersion := cm.GetGlobalVersion(cm.GetGlobalConfigs(Component)) - version := &configpb.Version{Global: globalVersion} - entries := []*configpb.ConfigEntry{{Name: name, Value: value}} - _, status := cm.UpdateGlobal(Component, version, entries) - if status.GetCode() != configpb.StatusCode_OK { - return errors.New(status.GetMessage()) - } - - return cm.Persist(s.GetStorage()) -} - // GetLabelProperty returns the whole label property config. func (s *Server) GetLabelProperty() config.LabelPropertyConfig { return s.persistOptions.LoadLabelPropertyConfig().Clone() @@ -1025,18 +963,6 @@ func (s *Server) SetLogLevel(level string) { log.Warn("log level changed", zap.String("level", log.GetLevel().String())) } -// GetConfigVersion returns the config version. -func (s *Server) GetConfigVersion() *configpb.Version { - if s.configVersion == nil { - return &configpb.Version{Local: 0, Global: 0} - } - return s.configVersion -} - -func (s *Server) setConfigVersion(version *configpb.Version) { - s.configVersion = version -} - func (s *Server) leaderLoop() { defer logutil.LogPanic() defer s.serverLoopWg.Done() @@ -1172,177 +1098,6 @@ func (s *Server) etcdLeaderLoop() { } } -func (s *Server) configCheckLoop() { - defer logutil.LogPanic() - defer s.serverLoopWg.Done() - - ctx, cancel := context.WithCancel(s.serverLoopCtx) - defer cancel() - - ticker := time.NewTicker(ConfigCheckInterval) - defer ticker.Stop() - for { - // wait leader - leader := s.GetLeader() - if leader != nil { - var err error - securityConfig := s.GetSecurityConfig() - s.configClient, err = pd.NewConfigClientWithContext(ctx, s.GetEndpoints(), pd.SecurityOption{ - CAPath: securityConfig.CAPath, - CertPath: securityConfig.CertPath, - KeyPath: securityConfig.KeyPath, - }) - if err != nil { - log.Error("failed to create config client", zap.Error(err)) - return - } - break - } - select { - case <-ticker.C: - case <-ctx.Done(): - log.Info("config check stop running") - return - } - } - - addr := s.GetAddr() - u, err := url.Parse(addr) - if err != nil { - log.Error("failed to parse url", zap.Error(err)) - return - } - compoenntID := u.Host + u.Path - version := s.GetConfigVersion() - var config bytes.Buffer - if err := toml.NewEncoder(&config).Encode(*s.GetConfig()); err != nil { - log.Error("failed to encode config", zap.Error(err)) - return - } - if err := s.createComponentConfig(ctx, version, compoenntID, config.String()); err != nil { - log.Error("failed to create config", zap.Error(err)) - return - } - - for { - select { - case <-ctx.Done(): - log.Info("config check has been stopped") - return - case <-ticker.C: - if err := s.updateConfig(ctx, compoenntID); err != nil { - log.Error("failed to update config", zap.Error(err)) - } - - rc := s.GetRaftCluster() - if s.GetMember().IsLeader() && rc != nil { - if !rc.GetConfigCheck() { - rc.SetConfigCheck() - } - } - } - } -} - -func (s *Server) updateConfig(ctx context.Context, compoenntID string) error { - version := s.GetConfigVersion() - config, err := s.getComponentConfig(ctx, version, compoenntID) - if err != nil { - return err - } - if config == "" { - return nil - } - return s.updateComponentConfig(config) -} - -func (s *Server) createComponentConfig(ctx context.Context, version *configpb.Version, componentID, config string) error { - status, v, config, err := s.configClient.Create(ctx, version, Component, componentID, config) - if err != nil { - return err - } - switch status.GetCode() { - case configpb.StatusCode_OK, configpb.StatusCode_WRONG_VERSION: - s.setConfigVersion(v) - s.updateComponentConfig(config) - case configpb.StatusCode_UNKNOWN: - return errors.Errorf("unknown error: %v", status.GetMessage()) - } - return nil -} - -func (s *Server) getComponentConfig(ctx context.Context, version *configpb.Version, componentID string) (string, error) { - status, v, cfg, err := s.configClient.Get(ctx, version, Component, componentID) - if err != nil { - return "", err - } - var config string - switch status.GetCode() { - case configpb.StatusCode_OK: - case configpb.StatusCode_WRONG_VERSION: - config = cfg - s.setConfigVersion(v) - case configpb.StatusCode_COMPONENT_ID_NOT_FOUND: - return "", errors.Errorf("component ID not found: %v", componentID) - case configpb.StatusCode_COMPONENT_NOT_FOUND: - return "", errors.Errorf("component not found: %v", Component) - case configpb.StatusCode_UNKNOWN: - return "", errors.Errorf("unknown error: %v", status.GetMessage()) - } - return config, nil -} - -// TODO: support more config item -func (s *Server) updateComponentConfig(cfg string) error { - new := &config.Config{} - var saveFile bool - if _, err := toml.Decode(cfg, &new); err != nil { - return err - } - var err error - old := *s.GetConfig() - // SchedulersPayload doesn't need to be updated. - new.Schedule.SchedulersPayload = nil - old.Schedule.SchedulersPayload = nil - if !reflect.DeepEqual(old.Schedule, new.Schedule) { - err = s.SetScheduleConfig(new.Schedule) - saveFile = true - } - - if !reflect.DeepEqual(old.Replication, new.Replication) { - err = s.SetReplicationConfig(new.Replication) - saveFile = true - } - - if !reflect.DeepEqual(old.PDServerCfg, new.PDServerCfg) { - err = s.SetPDServerConfig(new.PDServerCfg) - saveFile = true - } - - if !reflect.DeepEqual(old.ClusterVersion, new.ClusterVersion) { - err = s.SetClusterVersion(new.ClusterVersion.String()) - saveFile = true - } - - if !reflect.DeepEqual(old.LabelProperty, new.LabelProperty) { - err = s.SetLabelPropertyConfig(new.LabelProperty) - saveFile = true - } - - if !reflect.DeepEqual(old.Log, new.Log) { - err = s.SetLogConfig(new.Log) - saveFile = true - } - if err != nil { - return err - } - - if saveFile { - return s.cfg.RewriteFile(new) - } - return nil -} - func (s *Server) reloadConfigFromKV() error { err := s.persistOptions.Reload(s.storage) if err != nil { @@ -1355,13 +1110,5 @@ func (s *Server) reloadConfigFromKV() error { s.storage.SwitchToDefaultStorage() log.Info("server disable region storage") } - - // The request only valid when there is a leader. - // And before the a PD becomes a leader it will firstly reload the config. - if s.cfg.EnableDynamicConfig { - err = s.cfgManager.Reload(s.storage) - return err - } - return nil } diff --git a/tests/client/config_client_test.go b/tests/client/config_client_test.go deleted file mode 100644 index 3e16c8ec462..00000000000 --- a/tests/client/config_client_test.go +++ /dev/null @@ -1,291 +0,0 @@ -// Copyright 2018 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package client_test - -import ( - "context" - "path/filepath" - "sort" - "strconv" - "strings" - "time" - - . "github.com/pingcap/check" - "github.com/pingcap/kvproto/pkg/configpb" - pd "github.com/pingcap/pd/v4/client" - "github.com/pingcap/pd/v4/pkg/testutil" - "github.com/pingcap/pd/v4/server" - "github.com/pingcap/pd/v4/server/config" - "github.com/pingcap/pd/v4/tests" - "go.etcd.io/etcd/clientv3" -) - -var _ = Suite(&configClientTestSuite{}) - -type configClientTestSuite struct { - ctx context.Context - cancel context.CancelFunc -} - -func (s *configClientTestSuite) SetUpSuite(c *C) { - s.ctx, s.cancel = context.WithCancel(context.Background()) - server.EnableZap = true -} - -func (s *configClientTestSuite) TearDownSuite(c *C) { - s.cancel() -} - -func (s *configClientTestSuite) TestUpdateWrongEntry(c *C) { - cluster, err := tests.NewTestCluster(s.ctx, 1, func(cfg *config.Config) { cfg.EnableDynamicConfig = true }) - c.Assert(err, IsNil) - defer cluster.Destroy() - - err = cluster.RunInitialServers() - c.Assert(err, IsNil) - cluster.WaitLeader() - leaderServer := cluster.GetServer(cluster.GetLeader()) - c.Assert(leaderServer.BootstrapCluster(), IsNil) - - var endpoints []string - for _, s := range cluster.GetServers() { - endpoints = append(endpoints, s.GetConfig().AdvertiseClientUrls) - } - cli, err := pd.NewConfigClientWithContext(s.ctx, endpoints, pd.SecurityOption{}) - c.Assert(err, IsNil) - - cfgData := `[aaa] - xxx-yyy-zzz = 1 - [aaa.bbb] - xxx-yyy = "1KB" - xxx-zzz = false - yyy-zzz = ["aa", "bb"] - [aaa.bbb.ccc] - yyy-xxx = 0.00005 -` - - // create config - status, version, config, err := cli.Create(s.ctx, &configpb.Version{Global: 0, Local: 0}, "component", "component1", cfgData) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - c.Assert(config, Equals, cfgData) - c.Assert(version, DeepEquals, &configpb.Version{Global: 0, Local: 0}) - c.Assert(err, IsNil) - - // update wrong config - status, version, err = cli.Update(s.ctx, - &configpb.Version{Global: 0, Local: 0}, - &configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: "component"}}}, - []*configpb.ConfigEntry{{Name: "aaa.xxx-xxx", Value: "2"}}, - ) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_UNKNOWN) - c.Assert(strings.Contains(status.GetMessage(), "cannot find the config item"), IsTrue) - c.Assert(version, DeepEquals, &configpb.Version{Global: 0, Local: 0}) - c.Assert(err, IsNil) - - // update right config - status, version, err = cli.Update(s.ctx, - &configpb.Version{Global: 0, Local: 0}, - &configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: "component"}}}, - []*configpb.ConfigEntry{{Name: "aaa.xxx-yyy-zzz", Value: "2"}}, - ) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - c.Assert(version, DeepEquals, &configpb.Version{Global: 1, Local: 0}) - c.Assert(err, IsNil) -} - -func (s *configClientTestSuite) TestClientLeaderChange(c *C) { - cluster, err := tests.NewTestCluster(s.ctx, 3, func(cfg *config.Config) { cfg.EnableDynamicConfig = true }) - c.Assert(err, IsNil) - defer cluster.Destroy() - - err = cluster.RunInitialServers() - c.Assert(err, IsNil) - cluster.WaitLeader() - leaderServer := cluster.GetServer(cluster.GetLeader()) - c.Assert(leaderServer.BootstrapCluster(), IsNil) - - var endpoints []string - for _, s := range cluster.GetServers() { - endpoints = append(endpoints, s.GetConfig().AdvertiseClientUrls) - } - cli, err := pd.NewConfigClientWithContext(s.ctx, endpoints, pd.SecurityOption{}) - c.Assert(err, IsNil) - - cfgData := `[aaa] - xxx-yyy-zzz = 1 - [aaa.bbb] - xxx-yyy = "1KB" - xxx-zzz = false - yyy-zzz = ["aa", "bb"] - [aaa.bbb.ccc] - yyy-xxx = 0.00005 -` - - // create config - status, version, config, err := cli.Create(s.ctx, &configpb.Version{Global: 0, Local: 0}, "component", "component1", cfgData) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - c.Assert(config, Equals, cfgData) - c.Assert(version, DeepEquals, &configpb.Version{Global: 0, Local: 0}) - c.Assert(err, IsNil) - - // get config - status, version, config, err = cli.Get(s.ctx, &configpb.Version{Global: 0, Local: 0}, "component", "component1") - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - c.Assert(config, Equals, cfgData) - c.Assert(version, DeepEquals, &configpb.Version{Global: 0, Local: 0}) - c.Assert(err, IsNil) - - // update config - status, version, err = cli.Update(s.ctx, - &configpb.Version{Global: 0, Local: 0}, - &configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: "component"}}}, - []*configpb.ConfigEntry{{Name: "aaa.xxx-yyy-zzz", Value: "2"}}, - ) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - c.Assert(version, DeepEquals, &configpb.Version{Global: 1, Local: 0}) - c.Assert(err, IsNil) - cfgData1 := `[aaa] - xxx-yyy-zzz = 2 - [aaa.bbb] - xxx-yyy = "1KB" - xxx-zzz = false - yyy-zzz = ["aa", "bb"] - [aaa.bbb.ccc] - yyy-xxx = 0.00005 -` - // get config - status, version, config, err = cli.Get(s.ctx, &configpb.Version{Global: 1, Local: 0}, "component", "component1") - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - c.Assert(config, Equals, cfgData1) - c.Assert(version, DeepEquals, &configpb.Version{Global: 1, Local: 0}) - c.Assert(err, IsNil) - - leader := cluster.GetLeader() - s.waitLeader(c, cli.(client), cluster.GetServer(leader).GetConfig().ClientUrls) - - err = cluster.GetServer(leader).Stop() - c.Assert(err, IsNil) - leader = cluster.WaitLeader() - c.Assert(leader, Not(Equals), "") - s.waitLeader(c, cli.(client), cluster.GetServer(leader).GetConfig().ClientUrls) - - // get config - status, version, config, err = cli.Get(s.ctx, &configpb.Version{Global: 1, Local: 0}, "component", "component1") - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - c.Assert(config, Equals, cfgData1) - c.Assert(version, DeepEquals, &configpb.Version{Global: 1, Local: 0}) - c.Assert(err, IsNil) - - // Check URL list. - cli.Close() - urls := cli.(client).GetURLs() - sort.Strings(urls) - sort.Strings(endpoints) - c.Assert(urls, DeepEquals, endpoints) -} - -func (s *configClientTestSuite) TestLeaderTransfer(c *C) { - cluster, err := tests.NewTestCluster(s.ctx, 2, func(cfg *config.Config) { cfg.EnableDynamicConfig = true }) - c.Assert(err, IsNil) - defer cluster.Destroy() - - err = cluster.RunInitialServers() - c.Assert(err, IsNil) - cluster.WaitLeader() - leaderServer := cluster.GetServer(cluster.GetLeader()) - c.Assert(leaderServer.BootstrapCluster(), IsNil) - - var endpoints []string - for _, s := range cluster.GetServers() { - endpoints = append(endpoints, s.GetConfig().AdvertiseClientUrls) - } - cli, err := pd.NewConfigClientWithContext(s.ctx, endpoints, pd.SecurityOption{}) - c.Assert(err, IsNil) - cfgData := `[aaa] - xxx-yyy-zzz = 1 - [aaa.bbb] - xxx-yyy = "1KB" - xxx-zzz = false - yyy-zzz = ["aa", "bb"] - [aaa.bbb.ccc] - yyy-xxx = 0.00005 -` - // create config - status, version, config, err := cli.Create(s.ctx, &configpb.Version{Global: 0, Local: 0}, "component", "component1", cfgData) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - c.Assert(config, Equals, cfgData) - c.Assert(version, DeepEquals, &configpb.Version{Global: 0, Local: 0}) - c.Assert(err, IsNil) - - // get config - status, version, config, err = cli.Get(s.ctx, &configpb.Version{Global: 0, Local: 0}, "component", "component1") - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - c.Assert(config, Equals, cfgData) - c.Assert(version, DeepEquals, &configpb.Version{Global: 0, Local: 0}) - c.Assert(err, IsNil) - - // update config - status, version, err = cli.Update(s.ctx, - &configpb.Version{Global: 0, Local: 0}, - &configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: "component"}}}, - []*configpb.ConfigEntry{{Name: "aaa.bbb.xxx-yyy", Value: "2KB"}}, - ) - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - c.Assert(version, DeepEquals, &configpb.Version{Global: 1, Local: 0}) - c.Assert(err, IsNil) - cfgData1 := `[aaa] - xxx-yyy-zzz = 1 - [aaa.bbb] - xxx-yyy = "2KB" - xxx-zzz = false - yyy-zzz = ["aa", "bb"] - [aaa.bbb.ccc] - yyy-xxx = 0.00005 -` - // get config - status, version, config, err = cli.Get(s.ctx, &configpb.Version{Global: 1, Local: 0}, "component", "component1") - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - c.Assert(config, Equals, cfgData1) - c.Assert(version, DeepEquals, &configpb.Version{Global: 1, Local: 0}) - c.Assert(err, IsNil) - - // Transfer leader. - etcdCli, err := clientv3.New(clientv3.Config{ - Endpoints: endpoints, - DialTimeout: time.Second, - }) - c.Assert(err, IsNil) - leaderPath := filepath.Join("/pd", strconv.FormatUint(cli.GetClusterID(context.Background()), 10), "leader") - for i := 0; i < 10; i++ { - cluster.WaitLeader() - _, err = etcdCli.Delete(context.TODO(), leaderPath) - c.Assert(err, IsNil) - // Sleep to make sure all servers are notified and starts campaign. - time.Sleep(time.Second) - } - - // get config - status, version, config, err = cli.Get(s.ctx, &configpb.Version{Global: 1, Local: 0}, "component", "component1") - c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - c.Assert(config, Equals, cfgData1) - c.Assert(version, DeepEquals, &configpb.Version{Global: 1, Local: 0}) - c.Assert(err, IsNil) -} - -func (s *configClientTestSuite) waitLeader(c *C, cli client, leader string) { - testutil.WaitUntil(c, func(c *C) bool { - cli.ScheduleCheckLeader() - return cli.GetLeaderAddr() == leader - }) -} diff --git a/tests/dashboard/service_test.go b/tests/dashboard/service_test.go index 76a290ed2d8..9eab1c02539 100644 --- a/tests/dashboard/service_test.go +++ b/tests/dashboard/service_test.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/pd/v4/pkg/dashboard" "github.com/pingcap/pd/v4/pkg/testutil" "github.com/pingcap/pd/v4/server" - "github.com/pingcap/pd/v4/server/config" "github.com/pingcap/pd/v4/tests" "github.com/pingcap/pd/v4/tests/pdctl" @@ -53,7 +52,6 @@ type serverTestSuite struct { func (s *serverTestSuite) SetUpSuite(c *C) { server.EnableZap = true - server.ConfigCheckInterval = 10 * time.Millisecond dashboard.SetCheckInterval(10 * time.Millisecond) s.ctx, s.cancel = context.WithCancel(context.Background()) s.httpClient = &http.Client{ @@ -115,19 +113,8 @@ func (s *serverTestSuite) checkServiceIsStopped(c *C, servers map[string]*tests. } } -func (s *serverTestSuite) checkServiceIsChanging(c *C, servers map[string]*tests.TestServer) { - s.waitForConfigSync() - for _, srv := range servers { - addr := srv.GetAddr() - s.checkRespCode(c, fmt.Sprintf("%s/dashboard/", addr), http.StatusTemporaryRedirect) - s.checkRespCode(c, fmt.Sprintf("%s/dashboard/api/keyvisual/heatmaps", addr), http.StatusLoopDetected) - } -} - func (s *serverTestSuite) TestDashboard(c *C) { - cluster, err := tests.NewTestCluster(s.ctx, 3, func(conf *config.Config) { - conf.EnableDynamicConfig = true - }) + cluster, err := tests.NewTestCluster(s.ctx, 3) c.Assert(err, IsNil) defer cluster.Destroy() err = cluster.RunInitialServers() @@ -157,21 +144,6 @@ func (s *serverTestSuite) TestDashboard(c *C) { s.checkServiceIsStarted(c, servers, leader) c.Assert(leader.GetServer().GetPersistOptions().GetDashboardAddress(), Equals, dashboardAddress2) - // Changing dashboard address - for _, srv := range servers { - addr := srv.GetAddr() - var changingAddr string - if addr == dashboardAddress1 { - changingAddr = dashboardAddress2 - } else { - changingAddr = dashboardAddress1 - } - args = []string{"-u", leaderAddr, "component", "set", addr[7:], "pd-server.dashboard-address", changingAddr} - _, _, err = pdctl.ExecuteCommandC(cmd, args...) - c.Assert(err, IsNil) - } - s.checkServiceIsChanging(c, servers) - // pd-ctl set stop args = []string{"-u", leaderAddr, "config", "set", "dashboard-address", "none"} _, _, err = pdctl.ExecuteCommandC(cmd, args...) diff --git a/tests/pdctl/component/component_test.go b/tests/pdctl/component/component_test.go deleted file mode 100644 index a551fa3c393..00000000000 --- a/tests/pdctl/component/component_test.go +++ /dev/null @@ -1,126 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package componenttest - -import ( - "context" - "strings" - "testing" - "time" - - . "github.com/pingcap/check" - "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/pd/v4/server" - "github.com/pingcap/pd/v4/server/config" - "github.com/pingcap/pd/v4/tests" - "github.com/pingcap/pd/v4/tests/pdctl" -) - -func Test(t *testing.T) { - TestingT(t) -} - -var _ = Suite(&componentTestSuite{}) - -type componentTestSuite struct{} - -func (s *componentTestSuite) SetUpSuite(c *C) { - server.EnableZap = true - server.ConfigCheckInterval = 10 * time.Millisecond -} - -func (s *componentTestSuite) TestComponent(c *C) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - cluster, err := tests.NewTestCluster(ctx, 2, func(cfg *config.Config) { cfg.EnableDynamicConfig = true }) - c.Assert(err, IsNil) - err = cluster.RunInitialServers() - c.Assert(err, IsNil) - cluster.WaitLeader() - pdAddrs := cluster.GetConfig().GetClientURLs() - cmd := pdctl.InitCommand() - - store := metapb.Store{ - Id: 1, - State: metapb.StoreState_Up, - } - leaderServer := cluster.GetServer(cluster.GetLeader()) - c.Assert(leaderServer.BootstrapCluster(), IsNil) - svr := leaderServer.GetServer() - pdctl.MustPutStore(c, svr, store.Id, store.State, store.Labels) - defer cluster.Destroy() - - // component ids - args := []string{"-u", pdAddrs[0], "component", "ids", "pd"} - _, output, err := pdctl.ExecuteCommandC(cmd, args...) - c.Assert(err, IsNil) - obtain := string(output) - c.Assert(strings.Contains(obtain, pdAddrs[0][7:]), IsTrue) - c.Assert(strings.Contains(obtain, pdAddrs[1][7:]), IsTrue) - // component ids no parameter - args = []string{"-u", pdAddrs[0], "component", "ids"} - _, output, err = pdctl.ExecuteCommandC(cmd, args...) - c.Assert(err, IsNil) - obtain = string(output) - c.Assert(strings.Contains(obtain, pdAddrs[0][7:]), IsTrue) - c.Assert(strings.Contains(obtain, pdAddrs[1][7:]), IsTrue) - - // component show - for i := 0; i < len(pdAddrs); i++ { - args = []string{"-u", pdAddrs[0], "component", "show", pdAddrs[i][7:]} - _, output, err = pdctl.ExecuteCommandC(cmd, args...) - c.Assert(err, IsNil) - obtain := string(output) - c.Assert(strings.Contains(obtain, "region-schedule-limit = 2048"), IsTrue) - c.Assert(strings.Contains(obtain, "location-labels = []"), IsTrue) - c.Assert(strings.Contains(obtain, `level = ""`), IsTrue) - } - - // component set - args = []string{"-u", pdAddrs[0], "component", "set", "pd", "schedule.region-schedule-limit", "1"} - _, _, err = pdctl.ExecuteCommandC(cmd, args...) - c.Assert(err, IsNil) - args = []string{"-u", pdAddrs[0], "component", "set", "pd", "replication.location-labels", "zone,rack"} - _, _, err = pdctl.ExecuteCommandC(cmd, args...) - c.Assert(err, IsNil) - args = []string{"-u", pdAddrs[0], "component", "set", "pd", "log.level", "warn"} - _, _, err = pdctl.ExecuteCommandC(cmd, args...) - c.Assert(err, IsNil) - time.Sleep(20 * time.Millisecond) - - // component show - for i := 0; i < len(pdAddrs); i++ { - args = []string{"-u", pdAddrs[0], "component", "show", pdAddrs[i][7:]} - _, output, err = pdctl.ExecuteCommandC(cmd, args...) - c.Assert(err, IsNil) - obtain := string(output) - c.Assert(strings.Contains(obtain, "region-schedule-limit = 1"), IsTrue) - c.Assert(strings.Contains(obtain, `location-labels = ["zone", "rack"]`), IsTrue) - c.Assert(strings.Contains(obtain, `level = "warn"`), IsTrue) - } - - // change multi config at one time - args = []string{"-u", pdAddrs[0], "component", "set", "pd", "schedule.high-space-ratio", "0.3", "schedule.low-space-ratio", "0.4"} - _, _, err = pdctl.ExecuteCommandC(cmd, args...) - c.Assert(err, IsNil) - time.Sleep(20 * time.Millisecond) - for i := 0; i < len(pdAddrs); i++ { - args = []string{"-u", pdAddrs[0], "component", "show", pdAddrs[i][7:]} - _, output, err = pdctl.ExecuteCommandC(cmd, args...) - c.Assert(err, IsNil) - obtain := string(output) - c.Assert(strings.Contains(obtain, "high-space-ratio = 0.3"), IsTrue) - c.Assert(strings.Contains(obtain, "low-space-ratio = 0.4"), IsTrue) - } -} diff --git a/tests/pdctl/config/config_test.go b/tests/pdctl/config/config_test.go index 890ed964168..323ddcd8e83 100644 --- a/tests/pdctl/config/config_test.go +++ b/tests/pdctl/config/config_test.go @@ -20,7 +20,6 @@ import ( "reflect" "strings" "testing" - "time" "github.com/coreos/go-semver/semver" . "github.com/pingcap/check" @@ -42,7 +41,6 @@ type configTestSuite struct{} func (s *configTestSuite) SetUpSuite(c *C) { server.EnableZap = true - server.ConfigCheckInterval = 10 * time.Millisecond } type testItem struct { @@ -120,7 +118,6 @@ func (s *configTestSuite) TestConfig(c *C) { args2 := []string{"-u", pdAddr, "config", "set", "cluster-version", "2.1.0-rc.5"} _, _, err = pdctl.ExecuteCommandC(cmd, args2...) c.Assert(err, IsNil) - time.Sleep(20 * time.Millisecond) c.Assert(clusterVersion, Not(DeepEquals), svr.GetClusterVersion()) _, output, err = pdctl.ExecuteCommandC(cmd, args1...) c.Assert(err, IsNil) @@ -140,7 +137,6 @@ func (s *configTestSuite) TestConfig(c *C) { args2 = []string{"-u", pdAddr, "config", "set", "label-property", "reject-leader", "zone", "cn"} _, _, err = pdctl.ExecuteCommandC(cmd, args2...) c.Assert(err, IsNil) - time.Sleep(20 * time.Millisecond) c.Assert(labelPropertyCfg, Not(DeepEquals), svr.GetLabelProperty()) _, output, err = pdctl.ExecuteCommandC(cmd, args1...) c.Assert(err, IsNil) @@ -152,7 +148,6 @@ func (s *configTestSuite) TestConfig(c *C) { args3 := []string{"-u", pdAddr, "config", "delete", "label-property", "reject-leader", "zone", "cn"} _, _, err = pdctl.ExecuteCommandC(cmd, args3...) c.Assert(err, IsNil) - time.Sleep(20 * time.Millisecond) c.Assert(labelPropertyCfg, Not(DeepEquals), svr.GetLabelProperty()) _, output, err = pdctl.ExecuteCommandC(cmd, args1...) c.Assert(err, IsNil) @@ -238,9 +233,6 @@ func (s *configTestSuite) TestPlacementRules(c *C) { c.Assert(err, IsNil) c.Assert(strings.Contains(string(output), "Success!"), IsTrue) - // wait config manager reload - time.Sleep(time.Second) - // test show var rules []placement.Rule _, output, err = pdctl.ExecuteCommandC(cmd, "-u", pdAddr, "config", "placement-rules", "show") diff --git a/tests/pdctl/hot/hot_test.go b/tests/pdctl/hot/hot_test.go index 48e5e1d11bc..5e5342cf782 100644 --- a/tests/pdctl/hot/hot_test.go +++ b/tests/pdctl/hot/hot_test.go @@ -41,7 +41,6 @@ type hotTestSuite struct{} func (s *hotTestSuite) SetUpSuite(c *C) { server.EnableZap = true - server.ConfigCheckInterval = 10 * time.Millisecond } func (s *hotTestSuite) TestHot(c *C) { diff --git a/tests/pdctl/label/label_test.go b/tests/pdctl/label/label_test.go index d62243ea08f..6c3902b99e2 100644 --- a/tests/pdctl/label/label_test.go +++ b/tests/pdctl/label/label_test.go @@ -18,7 +18,6 @@ import ( "encoding/json" "strings" "testing" - "time" . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" @@ -39,7 +38,6 @@ type labelTestSuite struct{} func (s *labelTestSuite) SetUpSuite(c *C) { server.EnableZap = true - server.ConfigCheckInterval = 10 * time.Millisecond } func (s *labelTestSuite) TestLabel(c *C) { diff --git a/tests/pdctl/log/log_test.go b/tests/pdctl/log/log_test.go index 7c0c091c47c..5b37a9d3b89 100644 --- a/tests/pdctl/log/log_test.go +++ b/tests/pdctl/log/log_test.go @@ -16,7 +16,6 @@ package log_test import ( "context" "testing" - "time" . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" @@ -35,7 +34,6 @@ type logTestSuite struct{} func (s *logTestSuite) SetUpSuite(c *C) { server.EnableZap = true - server.ConfigCheckInterval = 10 * time.Millisecond } func (s *logTestSuite) TestLog(c *C) { @@ -89,7 +87,6 @@ func (s *logTestSuite) TestLog(c *C) { for _, testCase := range testCases { _, _, err = pdctl.ExecuteCommandC(cmd, testCase.cmd...) c.Assert(err, IsNil) - time.Sleep(20 * time.Millisecond) c.Assert(svr.GetConfig().Log.Level, Equals, testCase.expect) } } diff --git a/tests/pdctl/operator/operator_test.go b/tests/pdctl/operator/operator_test.go index 8ec210e2252..6560c0053ed 100644 --- a/tests/pdctl/operator/operator_test.go +++ b/tests/pdctl/operator/operator_test.go @@ -38,7 +38,6 @@ type operatorTestSuite struct{} func (s *operatorTestSuite) SetUpSuite(c *C) { server.EnableZap = true - server.ConfigCheckInterval = 10 * time.Millisecond } func (s *operatorTestSuite) TestOperator(c *C) { @@ -210,7 +209,6 @@ func (s *operatorTestSuite) TestOperator(c *C) { _, _, err = pdctl.ExecuteCommandC(cmd, "config", "set", "enable-placement-rules", "true") c.Assert(err, IsNil) - time.Sleep(20 * time.Millisecond) _, output, err = pdctl.ExecuteCommandC(cmd, "operator", "add", "transfer-region", "1", "2", "3") c.Assert(err, IsNil) c.Assert(strings.Contains(string(output), "not supported"), IsTrue) diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 95d2f15c4ed..6470a39b58d 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -37,7 +37,6 @@ import ( syncer "github.com/pingcap/pd/v4/server/region_syncer" "github.com/pingcap/pd/v4/server/schedule/storelimit" "github.com/pingcap/pd/v4/tests" - "github.com/pingcap/pd/v4/tests/pdctl" "github.com/pkg/errors" ) @@ -60,7 +59,6 @@ type clusterTestSuite struct { func (s *clusterTestSuite) SetUpSuite(c *C) { s.ctx, s.cancel = context.WithCancel(context.Background()) server.EnableZap = true - server.ConfigCheckInterval = 1 * time.Second // to prevent GetStorage dashboard.SetCheckInterval(30 * time.Minute) } @@ -165,60 +163,6 @@ func (s *clusterTestSuite) TestGetPutConfig(c *C) { c.Assert(meta.GetMaxPeerCount(), Equals, uint32(5)) } -func (s *clusterTestSuite) TestReloadConfig(c *C) { - tc, err := tests.NewTestCluster(s.ctx, 3, func(conf *config.Config) { - conf.PDServerCfg.UseRegionStorage = true - conf.EnableDynamicConfig = true - }) - defer tc.Destroy() - c.Assert(err, IsNil) - - err = tc.RunInitialServers() - c.Assert(err, IsNil) - tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) - c.Assert(leaderServer.BootstrapCluster(), IsNil) - rc := leaderServer.GetServer().GetRaftCluster() - c.Assert(rc, NotNil) - - // wait for creating config client - time.Sleep(2 * time.Second) - cmd := pdctl.InitCommand() - pdAddr := leaderServer.GetAddr() - args := []string{"-u", pdAddr, "config", "set", "enable-placement-rules", "true"} - _, _, err = pdctl.ExecuteCommandC(cmd, args...) - c.Assert(err, IsNil) - - // transfer leader - tc.ResignLeader() - tc.WaitLeader() - leaderServer = tc.GetServer(tc.GetLeader()) - c.Assert(leaderServer.GetServer().GetPersistOptions().GetReplication().IsPlacementRulesEnabled(), IsFalse) - rc = leaderServer.GetServer().GetRaftCluster() - r := &metapb.Region{ - Id: 3, - RegionEpoch: &metapb.RegionEpoch{ - ConfVer: 1, - Version: 1, - }, - StartKey: []byte{byte(1)}, - EndKey: []byte{byte(2)}, - Peers: []*metapb.Peer{{Id: 4, StoreId: uint64(1), IsLearner: true}}, - } - region := core.NewRegionInfo(r, r.Peers[0]) - c.Assert(rc.HandleRegionHeartbeat(region), IsNil) - - // wait for checking region - time.Sleep(300 * time.Millisecond) - c.Assert(leaderServer.GetServer().GetPersistOptions().GetReplication().IsPlacementRulesEnabled(), IsFalse) - c.Assert(rc.GetOperatorController().GetOperator(3), IsNil) - - // wait for configuration valid - time.Sleep(1 * time.Second) - c.Assert(leaderServer.GetServer().GetPersistOptions().GetReplication().IsPlacementRulesEnabled(), IsTrue) - c.Assert(rc.GetOperatorController().GetOperator(3), IsNil) -} - func testPutStore(c *C, clusterID uint64, rc *cluster.RaftCluster, grpcPDClient pdpb.PDClient, store *metapb.Store) { // Update store. _, err := putStore(c, grpcPDClient, clusterID, store) @@ -595,8 +539,7 @@ func (s *clusterTestSuite) TestConcurrentHandleRegion(c *C) { } func (s *clusterTestSuite) TestSetScheduleOpt(c *C) { - // Here needs to disable dynamic config to prevent GetStorage, otherwise, it may have a data race problem. - tc, err := tests.NewTestCluster(s.ctx, 1, func(cfg *config.Config) { cfg.EnableDynamicConfig = false }) + tc, err := tests.NewTestCluster(s.ctx, 1) defer tc.Destroy() c.Assert(err, IsNil) @@ -687,7 +630,7 @@ func (s *clusterTestSuite) TestLoadClusterInfo(c *C) { rc := cluster.NewRaftCluster(s.ctx, svr.GetClusterRootPath(), svr.ClusterID(), syncer.NewRegionSyncer(svr), svr.GetClient()) // Cluster is not bootstrapped. - rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetStorage(), svr.GetBasicCluster(), func() {}) + rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetStorage(), svr.GetBasicCluster()) raftCluster, err := rc.LoadClusterInfo() c.Assert(err, IsNil) c.Assert(raftCluster, IsNil) @@ -726,7 +669,7 @@ func (s *clusterTestSuite) TestLoadClusterInfo(c *C) { c.Assert(storage.Flush(), IsNil) raftCluster = &cluster.RaftCluster{} - raftCluster.InitCluster(mockid.NewIDAllocator(), opt, storage, basicCluster, func() {}) + raftCluster.InitCluster(mockid.NewIDAllocator(), opt, storage, basicCluster) raftCluster, err = raftCluster.LoadClusterInfo() c.Assert(err, IsNil) c.Assert(raftCluster, NotNil)