From 62930071fd2fd0ef21008a9d51ed0423f5ace895 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 12 Oct 2023 19:14:56 +0800 Subject: [PATCH] add rule forward and test Signed-off-by: lhy1024 --- errors.toml | 5 + pkg/errs/errno.go | 1 + pkg/mcs/scheduling/server/apis/v1/api.go | 301 +++++++++++++++++- server/api/rule.go | 2 +- server/api/server.go | 2 +- tests/integrations/mcs/scheduling/api_test.go | 103 +++++- 6 files changed, 385 insertions(+), 29 deletions(-) diff --git a/errors.toml b/errors.toml index 04d3a5fd732..ef61a01dd64 100644 --- a/errors.toml +++ b/errors.toml @@ -566,6 +566,11 @@ error = ''' invalid rule content, %s ''' +["PD:placement:ErrRuleNotFound"] +error = ''' +rule not found +''' + ["PD:plugin:ErrLoadPlugin"] error = ''' failed to load plugin diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index 801324ec64e..0edc97daf8c 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -161,6 +161,7 @@ var ( ErrBuildRuleList = errors.Normalize("build rule list failed, %s", errors.RFCCodeText("PD:placement:ErrBuildRuleList")) ErrPlacementDisabled = errors.Normalize("placement rules feature is disabled", errors.RFCCodeText("PD:placement:ErrPlacementDisabled")) ErrKeyFormat = errors.Normalize("key should be in hex format", errors.RFCCodeText("PD:placement:ErrKeyFormat")) + ErrRuleNotFound = errors.Normalize("rule not found", errors.RFCCodeText("PD:placement:ErrRuleNotFound")) ) // region label errors diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index b364a64be5f..90124d49034 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -15,6 +15,7 @@ package apis import ( + "encoding/hex" "fmt" "net/http" "strconv" @@ -26,6 +27,7 @@ import ( "github.com/gin-gonic/gin" "github.com/joho/godotenv" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server" mcsutils "github.com/tikv/pd/pkg/mcs/utils" sche "github.com/tikv/pd/pkg/schedule/core" @@ -114,6 +116,7 @@ func NewService(srv *scheserver.Service) *Service { s.RegisterSchedulersRouter() s.RegisterCheckersRouter() s.RegisterHotspotRouter() + s.RegisterConfigRouter() return s } @@ -150,16 +153,6 @@ func (s *Service) RegisterHotspotRouter() { router.GET("/buckets", getHotBuckets) } -// RegisterConfigRouter registers the router of the config handler. -func (s *Service) RegisterConfigRouter() { - // router := s.root.Group("config") - // router.GET("/rule", getHotBuckets) - // router.GET("/rules", getHotBuckets) - // router.GET("/rule_group", getHotBuckets) - // router.GET("/rule_groups", getHotBuckets) - // router.GET("/placement-rule", getHotBuckets) -} - // RegisterOperatorsRouter registers the router of the operators handler. func (s *Service) RegisterOperatorsRouter() { router := s.root.Group("operators") @@ -170,6 +163,31 @@ func (s *Service) RegisterOperatorsRouter() { router.GET("/records", getOperatorRecords) } +// RegisterConfigRouter registers the router of the config handler. +func (s *Service) RegisterConfigRouter() { + router := s.root.Group("config") + + rules := router.Group("rules") + rules.GET("", getAllRules) + rules.GET("/group/:group", getRuleByGroup) + rules.GET("/region/:region", getRulesByRegion) + rules.GET("/region/:region/detail", checkRegionPlacementRule) + rules.GET("/key/:key", getRulesByKey) + + // We cannot merge `/rule` and `/rules`, because we allow `group_id` to be "group", + // which is the same as the prefix of `/rules/group/:group`. + rule := router.Group("rule") + rule.GET("/:group/:id", getRuleByGroupAndID) + + groups := router.Group("rule_groups") + groups.GET("", getAllGroupConfigs) + groups.GET("/:id", getGroupConfig) + + placementRule := router.Group("placement-rule") + placementRule.GET("", getPlacementRules) + placementRule.GET("/:group", getPlacementRuleByGroup) +} + func changeLogLevel(c *gin.Context) { svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) var level string @@ -558,3 +576,266 @@ func getHistoryHotRegions(c *gin.Context) { var res storage.HistoryHotRegions c.IndentedJSON(http.StatusOK, res) } + +// @Tags rule +// @Summary List all rules of cluster. +// @Produce json +// @Success 200 {array} placement.Rule +// @Failure 412 {string} string "Placement rules feature is disabled." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /config/rules [get] +func getAllRules(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + manager, err := handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + c.String(http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + rules := manager.GetAllRules() + c.IndentedJSON(http.StatusOK, rules) +} + +// @Tags rule +// @Summary List all rules of cluster by group. +// @Param group path string true "The name of group" +// @Produce json +// @Success 200 {array} placement.Rule +// @Failure 412 {string} string "Placement rules feature is disabled." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /config/rules/group/{group} [get] +func getRuleByGroup(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + manager, err := handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + c.String(http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + group := c.Param("group") + rules := manager.GetRulesByGroup(group) + c.IndentedJSON(http.StatusOK, rules) +} + +// @Tags rule +// @Summary List all rules of cluster by region. +// @Param id path integer true "Region Id" +// @Produce json +// @Success 200 {array} placement.Rule +// @Failure 400 {string} string "The input is invalid." +// @Failure 404 {string} string "The region does not exist." +// @Failure 412 {string} string "Placement rules feature is disabled." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /config/rules/region/{region} [get] +func getRulesByRegion(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + manager, err := handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + c.String(http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + regionStr := c.Param("region") + region, code, err := handler.PreCheckForRegion(regionStr) + if err != nil { + c.String(code, err.Error()) + return + } + rules := manager.GetRulesForApplyRegion(region) + c.IndentedJSON(http.StatusOK, rules) +} + +// @Tags rule +// @Summary List rules and matched peers related to the given region. +// @Param id path integer true "Region Id" +// @Produce json +// @Success 200 {object} placement.RegionFit +// @Failure 400 {string} string "The input is invalid." +// @Failure 404 {string} string "The region does not exist." +// @Failure 412 {string} string "Placement rules feature is disabled." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /config/rules/region/{region}/detail [get] +func checkRegionPlacementRule(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + regionStr := c.Param("region") + region, code, err := handler.PreCheckForRegion(regionStr) + if err != nil { + c.String(code, err.Error()) + return + } + regionFit, err := handler.CheckRegionPlacementRule(region) + if err == errs.ErrPlacementDisabled { + c.String(http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + c.IndentedJSON(http.StatusOK, regionFit) +} + +// @Tags rule +// @Summary List all rules of cluster by key. +// @Param key path string true "The name of key" +// @Produce json +// @Success 200 {array} placement.Rule +// @Failure 400 {string} string "The input is invalid." +// @Failure 412 {string} string "Placement rules feature is disabled." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /config/rules/key/{key} [get] +func getRulesByKey(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + manager, err := handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + c.String(http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + keyHex := c.Param("key") + key, err := hex.DecodeString(keyHex) + if err != nil { + c.String(http.StatusBadRequest, errs.ErrKeyFormat.Error()) + return + } + rules := manager.GetRulesByKey(key) + c.IndentedJSON(http.StatusOK, rules) +} + +// @Tags rule +// @Summary Get rule of cluster by group and id. +// @Param group path string true "The name of group" +// @Param id path string true "Rule Id" +// @Produce json +// @Success 200 {object} placement.Rule +// @Failure 404 {string} string "The rule does not exist." +// @Failure 412 {string} string "Placement rules feature is disabled." +// @Router /config/rule/{group}/{id} [get] +func getRuleByGroupAndID(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + manager, err := handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + c.String(http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + group, id := c.Param("group"), c.Param("id") + rule := manager.GetRule(group, id) + if rule == nil { + c.String(http.StatusNotFound, errs.ErrRuleNotFound.Error()) + return + } + c.IndentedJSON(http.StatusOK, rule) +} + +// @Tags rule +// @Summary List all rule group configs. +// @Produce json +// @Success 200 {array} placement.RuleGroup +// @Failure 412 {string} string "Placement rules feature is disabled." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /config/rule_groups [get] +func getAllGroupConfigs(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + manager, err := handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + c.String(http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + ruleGroups := manager.GetRuleGroups() + c.IndentedJSON(http.StatusOK, ruleGroups) +} + +// @Tags rule +// @Summary Get rule group config by group id. +// @Param id path string true "Group Id" +// @Produce json +// @Success 200 {object} placement.RuleGroup +// @Failure 404 {string} string "The RuleGroup does not exist." +// @Failure 412 {string} string "Placement rules feature is disabled." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /config/rule_groups/{id} [get] +func getGroupConfig(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + manager, err := handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + c.String(http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + id := c.Param("id") + group := manager.GetRuleGroup(id) + if group == nil { + c.String(http.StatusNotFound, errs.ErrRuleNotFound.Error()) + return + } + c.IndentedJSON(http.StatusOK, group) +} + +// @Tags rule +// @Summary List all rules and groups configuration. +// @Produce json +// @Success 200 {array} placement.GroupBundle +// @Failure 412 {string} string "Placement rules feature is disabled." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /config/placement-rules [get] +func getPlacementRules(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + manager, err := handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + c.String(http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + bundles := manager.GetAllGroupBundles() + c.IndentedJSON(http.StatusOK, bundles) +} + +// @Tags rule +// @Summary Get group config and all rules belong to the group. +// @Param group path string true "The name of group" +// @Produce json +// @Success 200 {object} placement.GroupBundle +// @Failure 412 {string} string "Placement rules feature is disabled." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /config/placement-rules/{group} [get] +func getPlacementRuleByGroup(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + manager, err := handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + c.String(http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + g := c.Param("group") + group := manager.GetGroupBundle(g) + c.IndentedJSON(http.StatusOK, group) +} diff --git a/server/api/rule.go b/server/api/rule.go index 3ef3d51ea89..116a50f6259 100644 --- a/server/api/rule.go +++ b/server/api/rule.go @@ -237,7 +237,7 @@ func (h *ruleHandler) GetRuleByGroupAndID(w http.ResponseWriter, r *http.Request group, id := mux.Vars(r)["group"], mux.Vars(r)["id"] rule := manager.GetRule(group, id) if rule == nil { - h.rd.JSON(w, http.StatusNotFound, nil) + h.rd.JSON(w, http.StatusNotFound, errs.ErrRuleNotFound.Error()) return } h.rd.JSON(w, http.StatusOK, rule) diff --git a/server/api/server.go b/server/api/server.go index 7871478ca7f..4a4764293d5 100644 --- a/server/api/server.go +++ b/server/api/server.go @@ -95,7 +95,7 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP []string{http.MethodGet}), serverapi.MicroserviceRedirectRule( prefix+"/config/rule_group", - scheapi.APIPathPrefix+"/config/rule_group", + scheapi.APIPathPrefix+"/config/rule_groups", // Note: this is a typo in the original code mcs.SchedulingServiceName, []string{http.MethodGet}), serverapi.MicroserviceRedirectRule( diff --git a/tests/integrations/mcs/scheduling/api_test.go b/tests/integrations/mcs/scheduling/api_test.go index 5284913813c..140fc9d3ebe 100644 --- a/tests/integrations/mcs/scheduling/api_test.go +++ b/tests/integrations/mcs/scheduling/api_test.go @@ -10,12 +10,13 @@ import ( "github.com/pingcap/failpoint" "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/core" _ "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1" "github.com/tikv/pd/pkg/schedule/handler" + "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/utils/apiutil" - "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/tests" ) @@ -40,7 +41,7 @@ func TestAPI(t *testing.T) { suite.Run(t, &apiTestSuite{}) } -func (suite *apiTestSuite) SetupSuite() { +func (suite *apiTestSuite) SetupTest() { ctx, cancel := context.WithCancel(context.Background()) suite.ctx = ctx cluster, err := tests.NewTestAPICluster(suite.ctx, 1) @@ -59,14 +60,19 @@ func (suite *apiTestSuite) SetupSuite() { suite.cleanupFunc = func() { cancel() } + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 2, suite.backendEndpoints) + suite.NoError(err) + suite.cluster.SetSchedulingCluster(tc) + tc.WaitForPrimaryServing(suite.Require()) } -func (suite *apiTestSuite) TearDownSuite() { +func (suite *apiTestSuite) TearDownTest() { suite.cluster.Destroy() suite.cleanupFunc() } func (suite *apiTestSuite) TestGetCheckerByName() { + re := suite.Require() testCases := []struct { name string }{ @@ -78,14 +84,8 @@ func (suite *apiTestSuite) TestGetCheckerByName() { {name: "joint-state"}, } - re := suite.Require() - s, cleanup := tests.StartSingleSchedulingTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc()) - defer cleanup() - testutil.Eventually(re, func() bool { - return s.IsServing() - }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) - addr := s.GetAddr() - urlPrefix := fmt.Sprintf("%s/scheduling/api/v1/checkers", addr) + s := suite.cluster.GetSchedulingPrimaryServer() + urlPrefix := fmt.Sprintf("%s/scheduling/api/v1/checkers", s.GetAddr()) co := s.GetCoordinator() for _, testCase := range testCases { @@ -120,17 +120,12 @@ func (suite *apiTestSuite) TestAPIForward() { re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/apiutil/serverapi/checkHeader")) }() - tc, err := tests.NewTestSchedulingCluster(suite.ctx, 2, suite.backendEndpoints) - re.NoError(err) - defer tc.Destroy() - tc.WaitForPrimaryServing(re) - urlPrefix := fmt.Sprintf("%s/pd/api/v1", suite.backendEndpoints) var slice []string var resp map[string]interface{} // Test opeartor - err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "operators"), &slice, + err := testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "operators"), &slice, testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) re.NoError(err) re.Len(slice, 0) @@ -217,4 +212,78 @@ func (suite *apiTestSuite) TestAPIForward() { err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "hotspot/regions/history"), &history, testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) re.NoError(err) + + // Test rules: only forward `GET` request + var rules []*placement.Rule + tests.MustPutRegion(re, suite.cluster, 2, 1, []byte("a"), []byte("b"), core.SetApproximateSize(60)) + rules = []*placement.Rule{ + { + GroupID: "pd", + ID: "default", + Role: "voter", + Count: 3, + LocationLabels: []string{}, + }, + } + rulesArgs, err := json.Marshal(rules) + suite.NoError(err) + + err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "/config/rules"), &rules, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/rules"), rulesArgs, + testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader)) + re.NoError(err) + err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/rules/batch"), rulesArgs, + testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader)) + re.NoError(err) + err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/rules/group/pd"), &rules, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/rules/region/2"), &rules, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + var fit placement.RegionFit + err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/rules/region/2/detail"), &fit, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/rules/key/0000000000000001"), &rules, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/rule/pd/2"), nil, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + err = testutil.CheckDelete(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/rule/pd/2"), + testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader)) + re.NoError(err) + err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/rule"), rulesArgs, + testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader)) + re.NoError(err) + err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/rule_group/pd"), nil, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + err = testutil.CheckDelete(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/rule_group/pd"), + testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader)) + re.NoError(err) + err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/rule_group"), rulesArgs, + testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader)) + re.NoError(err) + err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/rule_groups"), nil, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/placement-rule"), nil, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/placement-rule"), rulesArgs, + testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader)) + re.NoError(err) + err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/placement-rule/pd"), nil, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + err = testutil.CheckDelete(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/placement-rule/pd"), + testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader)) + re.NoError(err) + err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/placement-rule/pd"), rulesArgs, + testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader)) + re.NoError(err) }