From dc3e9ac4015521b2f1962ddecbce8fde168b48bf Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 26 Oct 2023 22:45:36 +0800 Subject: [PATCH 1/4] mcs: support admin/cache http interface in scheduling server Signed-off-by: lhy1024 --- errors.toml | 10 ++++ pkg/errs/errno.go | 6 ++ pkg/mcs/scheduling/server/apis/v1/api.go | 52 ++++++++++++++++- pkg/mcs/scheduling/server/cluster.go | 10 ++++ server/api/admin.go | 57 +++++++++++++++++-- tests/integrations/mcs/scheduling/api_test.go | 56 ++++++++++++++++++ 6 files changed, 185 insertions(+), 6 deletions(-) diff --git a/errors.toml b/errors.toml index 1b96de8a209..1d10d40d294 100644 --- a/errors.toml +++ b/errors.toml @@ -496,6 +496,16 @@ error = ''' init file log error, %s ''' +["PD:mcs:ErrNotFoundSchedulingAddr"] +error = ''' +cannot find scheduling address +''' + +["PD:mcs:ErrSchedulingServer"] +error = ''' +scheduling server meets %v +''' + ["PD:member:ErrCheckCampaign"] error = ''' check campaign failed diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index 181dfc9b393..e5bac8519be 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -403,3 +403,9 @@ var ( ErrDeleteReservedGroup = errors.Normalize("cannot delete reserved group", errors.RFCCodeText("PD:resourcemanager:ErrDeleteReservedGroup")) ErrInvalidGroup = errors.Normalize("invalid group settings, please check the group name, priority and the number of resources", errors.RFCCodeText("PD:resourcemanager:ErrInvalidGroup")) ) + +// Micro service errors +var ( + ErrNotFoundSchedulingAddr = errors.Normalize("cannot find scheduling address", errors.RFCCodeText("PD:mcs:ErrNotFoundSchedulingAddr")) + ErrSchedulingServer = errors.Normalize("scheduling server meets %v", errors.RFCCodeText("PD:mcs:ErrSchedulingServer")) +) diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index 39be00ef9a0..cec075d73fe 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -15,7 +15,6 @@ package apis import ( - "fmt" "net/http" "strconv" "sync" @@ -26,6 +25,7 @@ import ( "github.com/gin-gonic/gin" "github.com/joho/godotenv" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server" mcsutils "github.com/tikv/pd/pkg/mcs/utils" sche "github.com/tikv/pd/pkg/schedule/core" @@ -121,6 +121,9 @@ func NewService(srv *scheserver.Service) *Service { func (s *Service) RegisterAdminRouter() { router := s.root.Group("admin") router.PUT("/log", changeLogLevel) + cacheRouter := router.Group("cache/regions") + cacheRouter.DELETE("", deleteAllRegionCache) + cacheRouter.DELETE("/:id", deleteRegionCacheByID) } // RegisterSchedulersRouter registers the router of the schedulers handler. @@ -160,6 +163,11 @@ func (s *Service) RegisterOperatorsRouter() { router.GET("/records", getOperatorRecords) } +// @Tags admin +// @Summary Change the log level. +// @Produce json +// @Success 200 {string} string "The log level is updated." +// @Router /admin/log [put] func changeLogLevel(c *gin.Context) { svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) var level string @@ -176,6 +184,46 @@ func changeLogLevel(c *gin.Context) { c.String(http.StatusOK, "The log level is updated.") } +// @Tags admin +// @Summary Drop all regions from cache. +// @Produce json +// @Success 200 {string} string "All regions are removed from server cache." +// @Router /admin/cache/regions [delete] +func deleteAllRegionCache(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) + cluster := svr.GetCluster() + if cluster == nil { + c.String(http.StatusInternalServerError, errs.ErrNotBootstrapped.GenWithStackByArgs().Error()) + return + } + cluster.DropCacheAllRegion() + c.String(http.StatusOK, "All regions are removed from server cache.") +} + +// @Tags admin +// @Summary Drop a specific region from cache. +// @Param id path integer true "Region Id" +// @Produce json +// @Success 200 {string} string "The region is removed from server cache." +// @Failure 400 {string} string "The input is invalid." +// @Router /admin/cache/regions/{id} [delete] +func deleteRegionCacheByID(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) + cluster := svr.GetCluster() + if cluster == nil { + c.String(http.StatusInternalServerError, errs.ErrNotBootstrapped.GenWithStackByArgs().Error()) + return + } + regionIDStr := c.Param("id") + regionID, err := strconv.ParseUint(regionIDStr, 10, 64) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + cluster.DropCacheRegion(regionID) + c.String(http.StatusOK, "The region is removed from server cache.") +} + // @Tags operators // @Summary Get an operator by ID. // @Param region_id path int true "A Region's Id" @@ -475,7 +523,7 @@ func getHotRegions(typ utils.RWType, c *gin.Context) { for _, storeID := range storeIDs { id, err := strconv.ParseUint(storeID, 10, 64) if err != nil { - c.String(http.StatusBadRequest, fmt.Sprintf("invalid store id: %s", storeID)) + c.String(http.StatusBadRequest, errs.ErrInvalidStoreID.FastGenByArgs(storeID).Error()) return } _, err = handler.GetStore(id) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 09ae4ede528..4f9768a2d31 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -586,3 +586,13 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error { func (c *Cluster) IsPrepared() bool { return c.coordinator.GetPrepareChecker().IsPrepared() } + +// DropCacheAllRegion removes all cached regions. +func (c *Cluster) DropCacheAllRegion() { + c.ResetRegionCache() +} + +// DropCacheRegion removes a region from the cache. +func (c *Cluster) DropCacheRegion(id uint64) { + c.RemoveRegionIfExist(id) +} diff --git a/server/api/admin.go b/server/api/admin.go index 7a1dfb0f1e8..b3def7c8e48 100644 --- a/server/api/admin.go +++ b/server/api/admin.go @@ -16,6 +16,7 @@ package api import ( "encoding/json" + "fmt" "io" "net/http" "strconv" @@ -24,6 +25,7 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/server" "github.com/unrolled/render" @@ -59,7 +61,11 @@ func (h *adminHandler) DeleteRegionCache(w http.ResponseWriter, r *http.Request) return } rc.DropCacheRegion(regionID) - h.rd.JSON(w, http.StatusOK, "The region is removed from server cache.") + if h.svr.IsAPIServiceMode() { + err = h.DeleteRegionCacheInSchedulingServer(regionID) + } + msg := "The region is removed from server cache." + h.rd.JSON(w, http.StatusOK, h.buildMsg(msg, err)) } // @Tags admin @@ -95,8 +101,11 @@ func (h *adminHandler) DeleteRegionStorage(w http.ResponseWriter, r *http.Reques } // Remove region from cache. rc.DropCacheRegion(regionID) - - h.rd.JSON(w, http.StatusOK, "The region is removed from server cache and region meta storage.") + if h.svr.IsAPIServiceMode() { + err = h.DeleteRegionCacheInSchedulingServer(regionID) + } + msg := "The region is removed from server cache and region meta storage." + h.rd.JSON(w, http.StatusOK, h.buildMsg(msg, err)) } // @Tags admin @@ -105,9 +114,14 @@ func (h *adminHandler) DeleteRegionStorage(w http.ResponseWriter, r *http.Reques // @Success 200 {string} string "All regions are removed from server cache." // @Router /admin/cache/regions [delete] func (h *adminHandler) DeleteAllRegionCache(w http.ResponseWriter, r *http.Request) { + var err error rc := getCluster(r) rc.DropCacheAllRegion() - h.rd.JSON(w, http.StatusOK, "All regions are removed from server cache.") + if h.svr.IsAPIServiceMode() { + err = h.DeleteRegionCacheInSchedulingServer() + } + msg := "All regions are removed from server cache." + h.rd.JSON(w, http.StatusOK, h.buildMsg(msg, err)) } // Intentionally no swagger mark as it is supposed to be only used in @@ -200,3 +214,38 @@ func (h *adminHandler) RecoverAllocID(w http.ResponseWriter, r *http.Request) { _ = h.rd.Text(w, http.StatusOK, "") } + +func (h *adminHandler) DeleteRegionCacheInSchedulingServer(id ...uint64) error { + addr, ok := h.svr.GetServicePrimaryAddr(h.svr.Context(), utils.SchedulingServiceName) + if !ok { + return errs.ErrNotFoundSchedulingAddr.FastGenByArgs() + } + var idStr string + if len(id) > 0 { + idStr = strconv.FormatUint(id[0], 10) + } + url := fmt.Sprintf("%s/scheduling/api/v1/admin/cache/regions/%s", addr, idStr) + req, err := http.NewRequest(http.MethodDelete, url, nil) + if err != nil { + return err + } + resp, err := h.svr.GetHTTPClient().Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return errs.ErrSchedulingServer.FastGenByArgs(resp.StatusCode) + } + return nil +} + +func (h *adminHandler) buildMsg(msg string, err error) string { + if !h.svr.IsAPIServiceMode() { + return msg + } + if err != nil { + return fmt.Sprintf("%s But the scheduling server meets error: %s", msg, err.Error()) + } + return msg +} diff --git a/tests/integrations/mcs/scheduling/api_test.go b/tests/integrations/mcs/scheduling/api_test.go index 5284913813c..ade2e1fd268 100644 --- a/tests/integrations/mcs/scheduling/api_test.go +++ b/tests/integrations/mcs/scheduling/api_test.go @@ -9,7 +9,9 @@ import ( "time" "github.com/pingcap/failpoint" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/core" _ "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1" "github.com/tikv/pd/pkg/schedule/handler" "github.com/tikv/pd/pkg/statistics" @@ -218,3 +220,57 @@ func (suite *apiTestSuite) TestAPIForward() { testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) re.NoError(err) } + +func TestAdminRegionCache(t *testing.T) { + re := require.New(t) + checkAdminRegionCache := func(cluster *tests.TestCluster) { + r1 := core.NewTestRegionInfo(10, 1, []byte(""), []byte("b"), core.SetRegionConfVer(100), core.SetRegionVersion(100)) + tests.MustPutRegionInfo(re, cluster, r1) + r2 := core.NewTestRegionInfo(20, 1, []byte("b"), []byte("c"), core.SetRegionConfVer(100), core.SetRegionVersion(100)) + tests.MustPutRegionInfo(re, cluster, r2) + r3 := core.NewTestRegionInfo(30, 1, []byte("c"), []byte(""), core.SetRegionConfVer(100), core.SetRegionVersion(100)) + tests.MustPutRegionInfo(re, cluster, r3) + + s := cluster.GetSchedulingPrimaryServer() + re.Equal(3, s.GetCluster().GetRegionCount([]byte{}, []byte{})) + + addr := s.GetAddr() + urlPrefix := fmt.Sprintf("%s/scheduling/api/v1/admin/cache/regions", addr) + err := testutil.CheckDelete(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "30"), testutil.StatusOK(re)) + re.NoError(err) + re.Equal(2, s.GetCluster().GetRegionCount([]byte{}, []byte{})) + + err = testutil.CheckDelete(testDialClient, urlPrefix, testutil.StatusOK(re)) + re.NoError(err) + re.Equal(0, s.GetCluster().GetRegionCount([]byte{}, []byte{})) + } + env := tests.NewSchedulingTestEnvironment(t) + env.RunTestInAPIMode(checkAdminRegionCache) +} + +func TestAdminRegionCacheForward(t *testing.T) { + re := require.New(t) + checkAdminRegionCache := func(cluster *tests.TestCluster) { + r1 := core.NewTestRegionInfo(10, 1, []byte(""), []byte("b"), core.SetRegionConfVer(100), core.SetRegionVersion(100)) + tests.MustPutRegionInfo(re, cluster, r1) + r2 := core.NewTestRegionInfo(20, 1, []byte("b"), []byte("c"), core.SetRegionConfVer(100), core.SetRegionVersion(100)) + tests.MustPutRegionInfo(re, cluster, r2) + r3 := core.NewTestRegionInfo(30, 1, []byte("c"), []byte(""), core.SetRegionConfVer(100), core.SetRegionVersion(100)) + tests.MustPutRegionInfo(re, cluster, r3) + + s := cluster.GetSchedulingPrimaryServer() + re.Equal(3, s.GetCluster().GetRegionCount([]byte{}, []byte{})) + + addr := cluster.GetLeaderServer().GetAddr() + urlPrefix := fmt.Sprintf("%s/pd/api/v1/admin/cache/region", addr) + err := testutil.CheckDelete(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "30"), testutil.StatusOK(re)) + re.NoError(err) + re.Equal(2, s.GetCluster().GetRegionCount([]byte{}, []byte{})) + + err = testutil.CheckDelete(testDialClient, urlPrefix+"s", testutil.StatusOK(re)) + re.NoError(err) + re.Equal(0, s.GetCluster().GetRegionCount([]byte{}, []byte{})) + } + env := tests.NewSchedulingTestEnvironment(t) + env.RunTestInAPIMode(checkAdminRegionCache) +} From c6bcd4fc397c8ddafab3d53f41ad5a6ab650d259 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 27 Oct 2023 15:47:14 +0800 Subject: [PATCH 2/4] address comments Signed-off-by: lhy1024 --- pkg/mcs/scheduling/server/apis/v1/api.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index cec075d73fe..d0acdf39a09 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -121,9 +121,8 @@ func NewService(srv *scheserver.Service) *Service { func (s *Service) RegisterAdminRouter() { router := s.root.Group("admin") router.PUT("/log", changeLogLevel) - cacheRouter := router.Group("cache/regions") - cacheRouter.DELETE("", deleteAllRegionCache) - cacheRouter.DELETE("/:id", deleteRegionCacheByID) + router.DELETE("cache/regions", deleteAllRegionCache) + router.DELETE("cache/regions/:id", deleteRegionCacheByID) } // RegisterSchedulersRouter registers the router of the schedulers handler. From 8d4c72d566bd2e2c5cd0a85cdec570980c8fc3e9 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 30 Oct 2023 13:46:05 +0800 Subject: [PATCH 3/4] address comments Signed-off-by: lhy1024 --- tests/integrations/mcs/scheduling/api_test.go | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/tests/integrations/mcs/scheduling/api_test.go b/tests/integrations/mcs/scheduling/api_test.go index ade2e1fd268..d6028204325 100644 --- a/tests/integrations/mcs/scheduling/api_test.go +++ b/tests/integrations/mcs/scheduling/api_test.go @@ -231,18 +231,18 @@ func TestAdminRegionCache(t *testing.T) { r3 := core.NewTestRegionInfo(30, 1, []byte("c"), []byte(""), core.SetRegionConfVer(100), core.SetRegionVersion(100)) tests.MustPutRegionInfo(re, cluster, r3) - s := cluster.GetSchedulingPrimaryServer() - re.Equal(3, s.GetCluster().GetRegionCount([]byte{}, []byte{})) + schedulingServer := cluster.GetSchedulingPrimaryServer() + re.Equal(3, schedulingServer.GetCluster().GetRegionCount([]byte{}, []byte{})) - addr := s.GetAddr() + addr := schedulingServer.GetAddr() urlPrefix := fmt.Sprintf("%s/scheduling/api/v1/admin/cache/regions", addr) err := testutil.CheckDelete(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "30"), testutil.StatusOK(re)) re.NoError(err) - re.Equal(2, s.GetCluster().GetRegionCount([]byte{}, []byte{})) + re.Equal(2, schedulingServer.GetCluster().GetRegionCount([]byte{}, []byte{})) err = testutil.CheckDelete(testDialClient, urlPrefix, testutil.StatusOK(re)) re.NoError(err) - re.Equal(0, s.GetCluster().GetRegionCount([]byte{}, []byte{})) + re.Equal(0, schedulingServer.GetCluster().GetRegionCount([]byte{}, []byte{})) } env := tests.NewSchedulingTestEnvironment(t) env.RunTestInAPIMode(checkAdminRegionCache) @@ -258,18 +258,22 @@ func TestAdminRegionCacheForward(t *testing.T) { r3 := core.NewTestRegionInfo(30, 1, []byte("c"), []byte(""), core.SetRegionConfVer(100), core.SetRegionVersion(100)) tests.MustPutRegionInfo(re, cluster, r3) - s := cluster.GetSchedulingPrimaryServer() - re.Equal(3, s.GetCluster().GetRegionCount([]byte{}, []byte{})) + apiServer := cluster.GetLeaderServer().GetServer() + schedulingServer := cluster.GetSchedulingPrimaryServer() + re.Equal(3, schedulingServer.GetCluster().GetRegionCount([]byte{}, []byte{})) + re.Equal(3, apiServer.GetRaftCluster().GetRegionCount([]byte{}, []byte{}).Count) addr := cluster.GetLeaderServer().GetAddr() urlPrefix := fmt.Sprintf("%s/pd/api/v1/admin/cache/region", addr) err := testutil.CheckDelete(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "30"), testutil.StatusOK(re)) re.NoError(err) - re.Equal(2, s.GetCluster().GetRegionCount([]byte{}, []byte{})) + re.Equal(2, schedulingServer.GetCluster().GetRegionCount([]byte{}, []byte{})) + re.Equal(2, apiServer.GetRaftCluster().GetRegionCount([]byte{}, []byte{}).Count) err = testutil.CheckDelete(testDialClient, urlPrefix+"s", testutil.StatusOK(re)) re.NoError(err) - re.Equal(0, s.GetCluster().GetRegionCount([]byte{}, []byte{})) + re.Equal(0, schedulingServer.GetCluster().GetRegionCount([]byte{}, []byte{})) + re.Equal(0, apiServer.GetRaftCluster().GetRegionCount([]byte{}, []byte{}).Count) } env := tests.NewSchedulingTestEnvironment(t) env.RunTestInAPIMode(checkAdminRegionCache) From 0cc98c46eaaaa598b89a89156a516b6d47a42eaa Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 30 Oct 2023 14:16:57 +0800 Subject: [PATCH 4/4] address comments Signed-off-by: lhy1024 --- server/api/admin.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/server/api/admin.go b/server/api/admin.go index b3def7c8e48..246c9239f59 100644 --- a/server/api/admin.go +++ b/server/api/admin.go @@ -241,11 +241,8 @@ func (h *adminHandler) DeleteRegionCacheInSchedulingServer(id ...uint64) error { } func (h *adminHandler) buildMsg(msg string, err error) string { - if !h.svr.IsAPIServiceMode() { - return msg - } - if err != nil { - return fmt.Sprintf("%s But the scheduling server meets error: %s", msg, err.Error()) + if h.svr.IsAPIServiceMode() && err != nil { + return fmt.Sprintf("This operation was executed in API server but needs to be re-executed on scheduling server due to the following error: %s", err.Error()) } return msg }