Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: support admin/cache http interface in scheduling server #7279

Merged
merged 5 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,16 @@ error = '''
init file log error, %s
'''

["PD:mcs:ErrNotFoundSchedulingAddr"]
error = '''
cannot find scheduling address
'''

["PD:mcs:ErrSchedulingServer"]
error = '''
scheduling server meets %v
'''

["PD:member:ErrCheckCampaign"]
error = '''
check campaign failed
Expand Down
6 changes: 6 additions & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,3 +403,9 @@ var (
ErrDeleteReservedGroup = errors.Normalize("cannot delete reserved group", errors.RFCCodeText("PD:resourcemanager:ErrDeleteReservedGroup"))
ErrInvalidGroup = errors.Normalize("invalid group settings, please check the group name, priority and the number of resources", errors.RFCCodeText("PD:resourcemanager:ErrInvalidGroup"))
)

// Micro service errors
var (
ErrNotFoundSchedulingAddr = errors.Normalize("cannot find scheduling address", errors.RFCCodeText("PD:mcs:ErrNotFoundSchedulingAddr"))
ErrSchedulingServer = errors.Normalize("scheduling server meets %v", errors.RFCCodeText("PD:mcs:ErrSchedulingServer"))
)
51 changes: 49 additions & 2 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package apis

import (
"fmt"
"net/http"
"strconv"
"sync"
Expand All @@ -26,6 +25,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/joho/godotenv"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
sche "github.com/tikv/pd/pkg/schedule/core"
Expand Down Expand Up @@ -121,6 +121,8 @@ func NewService(srv *scheserver.Service) *Service {
func (s *Service) RegisterAdminRouter() {
router := s.root.Group("admin")
router.PUT("/log", changeLogLevel)
router.DELETE("cache/regions", deleteAllRegionCache)
router.DELETE("cache/regions/:id", deleteRegionCacheByID)
}

// RegisterSchedulersRouter registers the router of the schedulers handler.
Expand Down Expand Up @@ -160,6 +162,11 @@ func (s *Service) RegisterOperatorsRouter() {
router.GET("/records", getOperatorRecords)
}

// @Tags admin
// @Summary Change the log level.
// @Produce json
// @Success 200 {string} string "The log level is updated."
// @Router /admin/log [put]
func changeLogLevel(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
var level string
Expand All @@ -176,6 +183,46 @@ func changeLogLevel(c *gin.Context) {
c.String(http.StatusOK, "The log level is updated.")
}

// @Tags admin
// @Summary Drop all regions from cache.
// @Produce json
// @Success 200 {string} string "All regions are removed from server cache."
// @Router /admin/cache/regions [delete]
func deleteAllRegionCache(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
cluster := svr.GetCluster()
if cluster == nil {
c.String(http.StatusInternalServerError, errs.ErrNotBootstrapped.GenWithStackByArgs().Error())
return
}
cluster.DropCacheAllRegion()
c.String(http.StatusOK, "All regions are removed from server cache.")
}

// @Tags admin
// @Summary Drop a specific region from cache.
// @Param id path integer true "Region Id"
// @Produce json
// @Success 200 {string} string "The region is removed from server cache."
// @Failure 400 {string} string "The input is invalid."
// @Router /admin/cache/regions/{id} [delete]
func deleteRegionCacheByID(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
cluster := svr.GetCluster()
if cluster == nil {
c.String(http.StatusInternalServerError, errs.ErrNotBootstrapped.GenWithStackByArgs().Error())
return
}
regionIDStr := c.Param("id")
regionID, err := strconv.ParseUint(regionIDStr, 10, 64)
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}
cluster.DropCacheRegion(regionID)
c.String(http.StatusOK, "The region is removed from server cache.")
}

// @Tags operators
// @Summary Get an operator by ID.
// @Param region_id path int true "A Region's Id"
Expand Down Expand Up @@ -475,7 +522,7 @@ func getHotRegions(typ utils.RWType, c *gin.Context) {
for _, storeID := range storeIDs {
id, err := strconv.ParseUint(storeID, 10, 64)
if err != nil {
c.String(http.StatusBadRequest, fmt.Sprintf("invalid store id: %s", storeID))
c.String(http.StatusBadRequest, errs.ErrInvalidStoreID.FastGenByArgs(storeID).Error())
return
}
_, err = handler.GetStore(id)
Expand Down
10 changes: 10 additions & 0 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,3 +593,13 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
func (c *Cluster) IsPrepared() bool {
return c.coordinator.GetPrepareChecker().IsPrepared()
}

// DropCacheAllRegion removes all cached regions.
func (c *Cluster) DropCacheAllRegion() {
c.ResetRegionCache()
}

// DropCacheRegion removes a region from the cache.
func (c *Cluster) DropCacheRegion(id uint64) {
c.RemoveRegionIfExist(id)
}
57 changes: 53 additions & 4 deletions server/api/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package api

import (
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
Expand All @@ -24,6 +25,7 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/server"
"github.com/unrolled/render"
Expand Down Expand Up @@ -59,7 +61,11 @@ func (h *adminHandler) DeleteRegionCache(w http.ResponseWriter, r *http.Request)
return
}
rc.DropCacheRegion(regionID)
h.rd.JSON(w, http.StatusOK, "The region is removed from server cache.")
if h.svr.IsAPIServiceMode() {
err = h.DeleteRegionCacheInSchedulingServer(regionID)
}
msg := "The region is removed from server cache."
h.rd.JSON(w, http.StatusOK, h.buildMsg(msg, err))
}

// @Tags admin
Expand Down Expand Up @@ -95,8 +101,11 @@ func (h *adminHandler) DeleteRegionStorage(w http.ResponseWriter, r *http.Reques
}
// Remove region from cache.
rc.DropCacheRegion(regionID)

h.rd.JSON(w, http.StatusOK, "The region is removed from server cache and region meta storage.")
if h.svr.IsAPIServiceMode() {
err = h.DeleteRegionCacheInSchedulingServer(regionID)
}
msg := "The region is removed from server cache and region meta storage."
h.rd.JSON(w, http.StatusOK, h.buildMsg(msg, err))
}

// @Tags admin
Expand All @@ -105,9 +114,14 @@ func (h *adminHandler) DeleteRegionStorage(w http.ResponseWriter, r *http.Reques
// @Success 200 {string} string "All regions are removed from server cache."
// @Router /admin/cache/regions [delete]
func (h *adminHandler) DeleteAllRegionCache(w http.ResponseWriter, r *http.Request) {
var err error
rc := getCluster(r)
rc.DropCacheAllRegion()
h.rd.JSON(w, http.StatusOK, "All regions are removed from server cache.")
if h.svr.IsAPIServiceMode() {
err = h.DeleteRegionCacheInSchedulingServer()
}
msg := "All regions are removed from server cache."
h.rd.JSON(w, http.StatusOK, h.buildMsg(msg, err))
}

// Intentionally no swagger mark as it is supposed to be only used in
Expand Down Expand Up @@ -200,3 +214,38 @@ func (h *adminHandler) RecoverAllocID(w http.ResponseWriter, r *http.Request) {

_ = h.rd.Text(w, http.StatusOK, "")
}

func (h *adminHandler) DeleteRegionCacheInSchedulingServer(id ...uint64) error {
addr, ok := h.svr.GetServicePrimaryAddr(h.svr.Context(), utils.SchedulingServiceName)
if !ok {
return errs.ErrNotFoundSchedulingAddr.FastGenByArgs()
}
var idStr string
if len(id) > 0 {
idStr = strconv.FormatUint(id[0], 10)
}
url := fmt.Sprintf("%s/scheduling/api/v1/admin/cache/regions/%s", addr, idStr)
req, err := http.NewRequest(http.MethodDelete, url, nil)
if err != nil {
return err
}
resp, err := h.svr.GetHTTPClient().Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return errs.ErrSchedulingServer.FastGenByArgs(resp.StatusCode)
}
return nil
}

func (h *adminHandler) buildMsg(msg string, err error) string {
if !h.svr.IsAPIServiceMode() {
return msg
}
if err != nil {
return fmt.Sprintf("%s But the scheduling server meets error: %s", msg, err.Error())
}
return msg
}
56 changes: 56 additions & 0 deletions tests/integrations/mcs/scheduling/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"time"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/core"
_ "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1"
"github.com/tikv/pd/pkg/schedule/handler"
"github.com/tikv/pd/pkg/statistics"
Expand Down Expand Up @@ -218,3 +220,57 @@ func (suite *apiTestSuite) TestAPIForward() {
testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true"))
re.NoError(err)
}

func TestAdminRegionCache(t *testing.T) {
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
re := require.New(t)
checkAdminRegionCache := func(cluster *tests.TestCluster) {
r1 := core.NewTestRegionInfo(10, 1, []byte(""), []byte("b"), core.SetRegionConfVer(100), core.SetRegionVersion(100))
tests.MustPutRegionInfo(re, cluster, r1)
r2 := core.NewTestRegionInfo(20, 1, []byte("b"), []byte("c"), core.SetRegionConfVer(100), core.SetRegionVersion(100))
tests.MustPutRegionInfo(re, cluster, r2)
r3 := core.NewTestRegionInfo(30, 1, []byte("c"), []byte(""), core.SetRegionConfVer(100), core.SetRegionVersion(100))
tests.MustPutRegionInfo(re, cluster, r3)

s := cluster.GetSchedulingPrimaryServer()
re.Equal(3, s.GetCluster().GetRegionCount([]byte{}, []byte{}))

addr := s.GetAddr()
urlPrefix := fmt.Sprintf("%s/scheduling/api/v1/admin/cache/regions", addr)
err := testutil.CheckDelete(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "30"), testutil.StatusOK(re))
re.NoError(err)
re.Equal(2, s.GetCluster().GetRegionCount([]byte{}, []byte{}))

err = testutil.CheckDelete(testDialClient, urlPrefix, testutil.StatusOK(re))
re.NoError(err)
re.Equal(0, s.GetCluster().GetRegionCount([]byte{}, []byte{}))
}
env := tests.NewSchedulingTestEnvironment(t)
env.RunTestInAPIMode(checkAdminRegionCache)
}

func TestAdminRegionCacheForward(t *testing.T) {
re := require.New(t)
checkAdminRegionCache := func(cluster *tests.TestCluster) {
r1 := core.NewTestRegionInfo(10, 1, []byte(""), []byte("b"), core.SetRegionConfVer(100), core.SetRegionVersion(100))
tests.MustPutRegionInfo(re, cluster, r1)
r2 := core.NewTestRegionInfo(20, 1, []byte("b"), []byte("c"), core.SetRegionConfVer(100), core.SetRegionVersion(100))
tests.MustPutRegionInfo(re, cluster, r2)
r3 := core.NewTestRegionInfo(30, 1, []byte("c"), []byte(""), core.SetRegionConfVer(100), core.SetRegionVersion(100))
tests.MustPutRegionInfo(re, cluster, r3)

s := cluster.GetSchedulingPrimaryServer()
re.Equal(3, s.GetCluster().GetRegionCount([]byte{}, []byte{}))

addr := cluster.GetLeaderServer().GetAddr()
urlPrefix := fmt.Sprintf("%s/pd/api/v1/admin/cache/region", addr)
err := testutil.CheckDelete(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "30"), testutil.StatusOK(re))
re.NoError(err)
re.Equal(2, s.GetCluster().GetRegionCount([]byte{}, []byte{}))

err = testutil.CheckDelete(testDialClient, urlPrefix+"s", testutil.StatusOK(re))
re.NoError(err)
re.Equal(0, s.GetCluster().GetRegionCount([]byte{}, []byte{}))
}
env := tests.NewSchedulingTestEnvironment(t)
env.RunTestInAPIMode(checkAdminRegionCache)
}