Skip to content

Commit

Permalink
refactor config api set method
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Apr 20, 2020
1 parent b7c1d76 commit 33368e9
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 145 deletions.
2 changes: 1 addition & 1 deletion pkg/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func IsServiceAllowed(s *server.Server, group server.ServiceGroup) bool {
}

opt := s.GetServerOption()
cfg := opt.LoadPDServerConfig()
cfg := opt.GetPDServerConfig()
if cfg != nil {
for _, allow := range cfg.RuntimeServices {
if group.Name == allow {
Expand Down
237 changes: 127 additions & 110 deletions server/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,9 @@ func (h *confHandler) GetDefault(w http.ResponseWriter, r *http.Request) {
// @Success 200 {string} string "The config is updated."
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Failure 503 {string} string "PD server has no leader."
// @Router /config [post]
func (h *confHandler) Post(w http.ResponseWriter, r *http.Request) {
config := h.svr.GetConfig()
cfg := h.svr.GetConfig()
data, err := ioutil.ReadAll(r.Body)
r.Body.Close()
if err != nil {
Expand All @@ -96,171 +95,189 @@ func (h *confHandler) Post(w http.ResponseWriter, r *http.Request) {
}

for k, v := range conf {
if kp := strings.Split(k, "."); len(kp) == 2 {
if !isPrefixLegal(kp[0]) {
h.rd.JSON(w, http.StatusBadRequest, fmt.Sprintf("prefix %s not found", kp[0]))
if s := strings.Split(k, "."); len(s) > 1 {
if err := h.updateConfig(cfg, k, v); err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
delete(conf, k)
conf[kp[1]] = v
continue
}
key := findTag(reflect.TypeOf(&config.Config{}).Elem(), k)
if key == "" {
h.rd.JSON(w, http.StatusBadRequest, fmt.Sprintf("config item %s not found", k))
return
}
if err := h.updateConfig(cfg, key, v); err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
}
data, err = json.Marshal(conf)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.updateWithoutPrefix(w, config, data)

h.rd.JSON(w, http.StatusOK, nil)
}

func isPrefixLegal(prefix string) bool {
switch prefix {
case "schedule", "replication", "pd-server", "log":
return true
default:
return false
func (h *confHandler) updateConfig(cfg *config.Config, key string, value interface{}) error {
kp := strings.Split(key, ".")
switch kp[0] {
case "schedule":
return h.updateSchedule(cfg, kp[len(kp)-1], value)
case "replication":
return h.updateReplication(cfg, kp[len(kp)-1], value)
case "replication-mode":
return h.updateReplicationModeConfig(cfg, kp[1:], value)
case "pd-server":
return h.updatePDServerConfig(cfg, kp[len(kp)-1], value)
case "log":
return h.updateLogLevel(kp, value)
case "cluster-version":
return h.updateClusterVersion(value)
case "label-property": // TODO: support changing label-property
}
return errors.Errorf("config prefix %s not found", kp[0])
}

func findTag(t reflect.Type, tag string) string {
for i := 0; i < t.NumField(); i++ {
field := t.Field(i)

column := field.Tag.Get("json")
c := strings.Split(column, ",")
if c[0] == tag {
return c[0]
}

if field.Type.Kind() == reflect.Struct {
path := findTag(field.Type, tag)
if path == "" {
continue
}
return field.Tag.Get("json") + "." + path
}
}
return ""
}

func (h *confHandler) updateWithoutPrefix(w http.ResponseWriter, config *config.Config, data []byte) {
var err error
func (h *confHandler) updateSchedule(config *config.Config, key string, value interface{}) error {
data, err := json.Marshal(map[string]interface{}{key: value})
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
return err
}
found1, err := h.updateSchedule(data, config)

updated, found, err := h.mergeConfig(&config.Schedule, data)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
return err
}
found2, err := h.updateReplication(data, config)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return

if !found {
return errors.Errorf("config item %s not found", key)
}
found3, err := h.updatePDServerConfig(data, config)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return

if updated {
err = h.svr.SetScheduleConfig(config.Schedule)
}
found4, err := h.updateLogLevel(data, config)
return err
}

func (h *confHandler) updateReplication(config *config.Config, key string, value interface{}) error {
data, err := json.Marshal(map[string]interface{}{key: value})
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
return err
}
found5, err := h.updateClusterVersion(data, config)

updated, found, err := h.mergeConfig(&config.Replication, data)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
return err
}
found6, err := h.updateLabelProperty(data, config)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return

if !found {
return errors.Errorf("config item %s not found", key)
}
if !found1 && !found2 && !found3 && !found4 && !found5 && !found6 {
h.rd.JSON(w, http.StatusBadRequest, "config item not found")
return

if updated {
err = h.svr.SetReplicationConfig(config.Replication)
}
return err
}

func (h *confHandler) updateSchedule(data []byte, config *config.Config) (bool, error) {
updated, found, err := h.mergeConfig(&config.Schedule, data)
func (h *confHandler) updateReplicationModeConfig(config *config.Config, key []string, value interface{}) error {
cfg := make(map[string]interface{})
cfg = getConfigMap(cfg, key, value)
data, err := json.Marshal(cfg)
if err != nil {
return false, err
return err
}
if updated {
err = h.svr.SetScheduleConfig(config.Schedule)
}
return found, err
}

func (h *confHandler) updateReplication(data []byte, config *config.Config) (bool, error) {
updated, found, err := h.mergeConfig(&config.Replication, data)
updated, found, err := h.mergeConfig(&config.ReplicationMode, data)
if err != nil {
return false, err
return err
}

if !found {
return errors.Errorf("config item %s not found", key)
}

if updated {
err = h.svr.SetReplicationConfig(config.Replication)
err = h.svr.SetReplicationModeConfig(config.ReplicationMode)
}
return found, err
return err
}

func (h *confHandler) updatePDServerConfig(data []byte, config *config.Config) (bool, error) {
func (h *confHandler) updatePDServerConfig(config *config.Config, key string, value interface{}) error {
data, err := json.Marshal(map[string]interface{}{key: value})
if err != nil {
return err
}

updated, found, err := h.mergeConfig(&config.PDServerCfg, data)
if err != nil {
return false, err
return err
}

if !found {
return errors.Errorf("config item %s not found", key)
}

if updated {
err = h.svr.SetPDServerConfig(config.PDServerCfg)
}
return found, err
return err
}

func (h *confHandler) updateLogLevel(data []byte, config *config.Config) (bool, error) {
cfg := make(map[string]interface{})
err := json.Unmarshal(data, &cfg)
if err != nil {
return false, err
func (h *confHandler) updateLogLevel(kp []string, value interface{}) error {
if len(kp) != 2 || kp[1] != "level" {
return errors.Errorf("only support changing log level")
}

if level, ok := cfg["level"].(string); ok {
err = h.svr.SetLogLevel(level)
if level, ok := value.(string); ok {
err := h.svr.SetLogLevel(level)
if err != nil {
return true, err
return err
}
log.SetLevel(logutil.StringToZapLogLevel(level))
return true, nil
return nil
}
return false, err
return errors.Errorf("input value %v is illegal", value)
}

func (h *confHandler) updateClusterVersion(data []byte, config *config.Config) (bool, error) {
cfg := make(map[string]interface{})
err := json.Unmarshal(data, &cfg)
if err != nil {
return false, err
}

if version, ok := cfg["cluster-version"].(string); ok {
func (h *confHandler) updateClusterVersion(value interface{}) error {
if version, ok := value.(string); ok {
err := h.svr.SetClusterVersion(version)
if err != nil {
return true, err
return err
}
return true, nil
return nil
}
return false, err
return errors.Errorf("input value %v is illegal", value)
}

func (h *confHandler) updateLabelProperty(data []byte, config *config.Config) (bool, error) {
cfg := make(map[string]interface{})
err := json.Unmarshal(data, &cfg)
if err != nil {
return false, err
func getConfigMap(cfg map[string]interface{}, key []string, value interface{}) map[string]interface{} {
if len(key) == 1 {
cfg[key[0]] = value
return cfg
}

if lp, ok := cfg["label-property"].(string); ok {
input := make(map[string]string)
err = json.Unmarshal([]byte(lp), &input)
if err != nil {
return true, err
}
switch input["action"] {
case "set":
err = h.svr.SetLabelProperty(input["type"], input["label-key"], input["label-value"])
case "delete":
err = h.svr.DeleteLabelProperty(input["type"], input["label-key"], input["label-value"])
default:
err = errors.Errorf("unknown action %v", input["action"])
}
if err != nil {
return true, err
}
return true, nil
}
return false, err
subConfig := make(map[string]interface{})
cfg[key[0]] = getConfigMap(subConfig, key[1:], value)
return cfg
}

func (h *confHandler) mergeConfig(v interface{}, data []byte) (updated bool, found bool, err error) {
Expand Down
25 changes: 11 additions & 14 deletions server/api/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,13 @@ func (s *testConfigSuite) TestConfigAll(c *C) {

// the new way
l = map[string]interface{}{
"schedule.tolerant-size-ratio": 2.5,
"replication.location-labels": "idc,host",
"pd-server.metric-storage": "http://127.0.0.1:1234",
"log.level": "warn",
"cluster-version": "v4.0.0-beta",
"label-property": `{"type": "foo", "action": "set", "label-key": "zone", "label-value": "cn1"}`,
"schedule.tolerant-size-ratio": 2.5,
"replication.location-labels": "idc,host",
"pd-server.metric-storage": "http://127.0.0.1:1234",
"log.level": "warn",
"cluster-version": "v4.0.0-beta",
"replication-mode.replication-mode": "dr_auto_sync",
"replication-mode.dr-auto-sync.label-key": "foobar",
}
postData, err = json.Marshal(l)
c.Assert(err, IsNil)
Expand All @@ -104,17 +105,13 @@ func (s *testConfigSuite) TestConfigAll(c *C) {
cfg.Replication.LocationLabels = []string{"idc", "host"}
cfg.PDServerCfg.MetricStorage = "http://127.0.0.1:1234"
cfg.Log.Level = "warn"
cfg.ReplicationMode.DRAutoSync.LabelKey = "foobar"
cfg.ReplicationMode.ReplicationMode = "dr_auto_sync"
v, err := cluster.ParseVersion("v4.0.0-beta")
c.Assert(err, IsNil)
cfg.ClusterVersion = *v
cfg.LabelProperty = map[string][]config.StoreLabel{
"foo": {{Key: "zone", Value: "cn1"}},
}
c.Assert(newCfg1, DeepEquals, cfg)

l = map[string]interface{}{
"label-property": `{"type": "foo", "action": "delete", "label-key": "zone", "label-value": "cn1"}`,
}
postData, err = json.Marshal(l)
c.Assert(err, IsNil)
err = postJSON(addr, postData)
Expand All @@ -127,7 +124,7 @@ func (s *testConfigSuite) TestConfigAll(c *C) {
postData, err = json.Marshal(l)
c.Assert(err, IsNil)
err = postJSON(addr, postData)
c.Assert(strings.Contains(err.Error(), "replicate"), IsTrue)
c.Assert(strings.Contains(err.Error(), "not found"), IsTrue)

// config item not found
l = map[string]interface{}{
Expand All @@ -136,7 +133,7 @@ func (s *testConfigSuite) TestConfigAll(c *C) {
postData, err = json.Marshal(l)
c.Assert(err, IsNil)
err = postJSON(addr, postData)
c.Assert(strings.Contains(err.Error(), "config item not found"), IsTrue)
c.Assert(strings.Contains(err.Error(), "not found"), IsTrue)
}

func (s *testConfigSuite) TestConfigSchedule(c *C) {
Expand Down
4 changes: 2 additions & 2 deletions server/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (s *testConfigSuite) TestReloadConfig(c *C) {
scheduleCfg := opt.Load()
scheduleCfg.MaxSnapshotCount = 10
opt.SetMaxReplicas(5)
opt.LoadPDServerConfig().UseRegionStorage = true
opt.GetPDServerConfig().UseRegionStorage = true
c.Assert(opt.Persist(storage), IsNil)

// suppose we add a new default enable scheduler "adjacent-region"
Expand All @@ -70,7 +70,7 @@ func (s *testConfigSuite) TestReloadConfig(c *C) {
c.Assert(newOpt.Reload(storage), IsNil)
schedulers := newOpt.GetSchedulers()
c.Assert(schedulers, HasLen, 5)
c.Assert(newOpt.LoadPDServerConfig().UseRegionStorage, IsTrue)
c.Assert(newOpt.GetPDServerConfig().UseRegionStorage, IsTrue)
for i, s := range schedulers {
c.Assert(s.Type, Equals, defaultSchedulers[i])
c.Assert(s.Disable, IsFalse)
Expand Down
Loading

0 comments on commit 33368e9

Please sign in to comment.