diff --git a/pkg/mcs/resourcemanager/server/apis/v1/api.go b/pkg/mcs/resourcemanager/server/apis/v1/api.go index 411933e55c34..7c5e3e010dc2 100644 --- a/pkg/mcs/resourcemanager/server/apis/v1/api.go +++ b/pkg/mcs/resourcemanager/server/apis/v1/api.go @@ -73,7 +73,7 @@ func NewService(srv *rmserver.Service) *Service { manager := srv.GetManager() apiHandlerEngine.Use(func(c *gin.Context) { // manager implements the interface of basicserver.Service. - c.Set("service", manager.GetBasicServer()) + c.Set(multiservicesapi.ServiceContextKey, manager.GetBasicServer()) c.Next() }) apiHandlerEngine.Use(multiservicesapi.ServiceRedirector()) diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index e8c4faa5d559..3d1c3921470a 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -34,7 +34,7 @@ import ( ) // APIPathPrefix is the prefix of the API path. -const APIPathPrefix = "/scheduling/api/v1/" +const APIPathPrefix = "/scheduling/api/v1" var ( once sync.Once diff --git a/pkg/utils/apiutil/apiutil.go b/pkg/utils/apiutil/apiutil.go index 6c32640218e0..c5d7c247aca7 100644 --- a/pkg/utils/apiutil/apiutil.go +++ b/pkg/utils/apiutil/apiutil.go @@ -196,12 +196,14 @@ func PostJSON(client *http.Client, url string, data []byte) (*http.Response, err return nil, err } req.Header.Set("Content-Type", "application/json") + req.Header.Add("Accept-Encoding", "identity") return client.Do(req) } // GetJSON is used to send GET request to specific url func GetJSON(client *http.Client, url string, data []byte) (*http.Response, error) { req, err := http.NewRequest(http.MethodGet, url, bytes.NewBuffer(data)) + req.Header.Add("Accept-Encoding", "identity") if err != nil { return nil, err } diff --git a/pkg/utils/apiutil/serverapi/middleware.go b/pkg/utils/apiutil/serverapi/middleware.go index 7d403ecef13d..d7850029e2ed 100644 --- a/pkg/utils/apiutil/serverapi/middleware.go +++ b/pkg/utils/apiutil/serverapi/middleware.go @@ -17,6 +17,7 @@ package serverapi import ( "net/http" "net/url" + "strings" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" @@ -108,13 +109,19 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri return false, "" } for _, rule := range h.microserviceRedirectRules { - if rule.matchPath == r.URL.Path { + if strings.HasPrefix(r.URL.Path, rule.matchPath) { addr, ok := h.s.GetServicePrimaryAddr(r.Context(), rule.targetServiceName) if !ok || addr == "" { log.Warn("failed to get the service primary addr when try match redirect rules", zap.String("path", r.URL.Path)) } - r.URL.Path = rule.targetPath + // Extract parameters from the URL path + pathParams := strings.TrimPrefix(r.URL.Path, rule.matchPath) + if len(pathParams) > 0 && pathParams[0] == '/' { + pathParams = pathParams[1:] // Remove leading '/' + } + r.URL.Path = rule.targetPath + "/" + pathParams + r.URL.Path = strings.TrimRight(r.URL.Path, "/") return true, addr } } @@ -122,10 +129,10 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri } func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) { - matchedFlag, targetAddr := h.matchMicroServiceRedirectRules(r) + needRedirectToMicroService, targetAddr := h.matchMicroServiceRedirectRules(r) allowFollowerHandle := len(r.Header.Get(apiutil.PDAllowFollowerHandleHeader)) > 0 isLeader := h.s.GetMember().IsLeader() - if !h.s.IsClosed() && (allowFollowerHandle || isLeader) && !matchedFlag { + if !h.s.IsClosed() && (allowFollowerHandle || isLeader) && !needRedirectToMicroService { next(w, r) return } @@ -150,7 +157,7 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http } var clientUrls []string - if matchedFlag { + if needRedirectToMicroService { if len(targetAddr) == 0 { http.Error(w, apiutil.ErrRedirectFailed, http.StatusInternalServerError) return diff --git a/server/api/server.go b/server/api/server.go index 1d881022c042..5df5d22f2d03 100644 --- a/server/api/server.go +++ b/server/api/server.go @@ -19,6 +19,7 @@ import ( "net/http" "github.com/gorilla/mux" + scheapi "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1" tsoapi "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1" mcs "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/apiutil" @@ -35,14 +36,29 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP Name: "core", IsCore: true, } - router := mux.NewRouter() + prefix := apiPrefix + "/api/v1" r := createRouter(apiPrefix, svr) + router := mux.NewRouter() router.PathPrefix(apiPrefix).Handler(negroni.New( serverapi.NewRuntimeServiceValidator(svr, group), - serverapi.NewRedirector(svr, serverapi.MicroserviceRedirectRule( - apiPrefix+"/api/v1"+"/admin/reset-ts", - tsoapi.APIPathPrefix+"/admin/reset-ts", - mcs.TSOServiceName)), + serverapi.NewRedirector(svr, + serverapi.MicroserviceRedirectRule( + prefix+"/admin/reset-ts", + tsoapi.APIPathPrefix+"/admin/reset-ts", + mcs.TSOServiceName), + serverapi.MicroserviceRedirectRule( + prefix+"/operators", + scheapi.APIPathPrefix+"/operators", + mcs.SchedulingServiceName), + serverapi.MicroserviceRedirectRule( + prefix+"/checker", // Note: this is a typo in the original code + scheapi.APIPathPrefix+"/checkers", + mcs.SchedulingServiceName), + serverapi.MicroserviceRedirectRule( + prefix+"/schedulers", + scheapi.APIPathPrefix+"/schedulers", + mcs.SchedulingServiceName), + ), negroni.Wrap(r)), ) diff --git a/tests/integrations/mcs/scheduling/api_test.go b/tests/integrations/mcs/scheduling/api_test.go index 48bdf1ab95c6..03db3433ec94 100644 --- a/tests/integrations/mcs/scheduling/api_test.go +++ b/tests/integrations/mcs/scheduling/api_test.go @@ -106,3 +106,34 @@ func (suite *apiTestSuite) TestGetCheckerByName() { suite.False(resp["paused"].(bool)) } } + +func (suite *apiTestSuite) TestAPIForward() { + re := suite.Require() + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 2, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForPrimaryServing(re) + + urlPrefix := fmt.Sprintf("%s/pd/api/v1", suite.backendEndpoints) + var slice []string + var resp map[string]interface{} + + // Test opeartor + err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "operators"), &slice) + re.NoError(err) + re.Len(slice, 0) + + err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "operators/2"), &resp) + re.NoError(err) + re.Nil(resp) + + // Test checker + err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "checker/merge"), &resp) + re.NoError(err) + suite.False(resp["paused"].(bool)) + + // Test scheduler + err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "schedulers"), &slice) + re.NoError(err) + re.Contains(slice, "balance-leader-scheduler") +} diff --git a/tests/integrations/mcs/tso/api_test.go b/tests/integrations/mcs/tso/api_test.go index fde6bcb8da05..9040ab41b36f 100644 --- a/tests/integrations/mcs/tso/api_test.go +++ b/tests/integrations/mcs/tso/api_test.go @@ -30,6 +30,7 @@ import ( apis "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" ) @@ -47,10 +48,11 @@ var dialClient = &http.Client{ type tsoAPITestSuite struct { suite.Suite - ctx context.Context - cancel context.CancelFunc - pdCluster *tests.TestCluster - tsoCluster *tests.TestTSOCluster + ctx context.Context + cancel context.CancelFunc + pdCluster *tests.TestCluster + tsoCluster *tests.TestTSOCluster + backendEndpoints string } func TestTSOAPI(t *testing.T) { @@ -69,7 +71,8 @@ func (suite *tsoAPITestSuite) SetupTest() { leaderName := suite.pdCluster.WaitLeader() pdLeaderServer := suite.pdCluster.GetServer(leaderName) re.NoError(pdLeaderServer.BootstrapCluster()) - suite.tsoCluster, err = tests.NewTestTSOCluster(suite.ctx, 1, pdLeaderServer.GetAddr()) + suite.backendEndpoints = pdLeaderServer.GetAddr() + suite.tsoCluster, err = tests.NewTestTSOCluster(suite.ctx, 1, suite.backendEndpoints) re.NoError(err) } @@ -95,6 +98,25 @@ func (suite *tsoAPITestSuite) TestGetKeyspaceGroupMembers() { re.Equal(primaryMember.GetLeaderID(), defaultGroupMember.PrimaryID) } +func (suite *tsoAPITestSuite) TestResetTS() { + re := suite.Require() + primary := suite.tsoCluster.WaitForDefaultPrimaryServing(re) + re.NotNil(primary) + url := suite.backendEndpoints + "/pd/api/v1/admin/reset-ts" + + // Test reset ts + input := []byte(`{"tso":"121312", "force-use-larger":true}`) + err := testutil.CheckPostJSON(dialClient, url, input, + testutil.StatusOK(re), testutil.StringContain(re, "Reset ts successfully")) + suite.NoError(err) + + // Test reset ts with invalid tso + input = []byte(`{}`) + err = testutil.CheckPostJSON(dialClient, url, input, + testutil.StatusNotOK(re), testutil.StringContain(re, "invalid tso value")) + re.NoError(err) +} + func mustGetKeyspaceGroupMembers(re *require.Assertions, server *tso.Server) map[uint32]*apis.KeyspaceGroupMember { httpReq, err := http.NewRequest(http.MethodGet, server.GetAddr()+tsoKeyspaceGroupsPrefix+"/members", nil) re.NoError(err) diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index d87f15421799..58006b87eebf 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -106,23 +106,19 @@ func (suite *tsoServerTestSuite) TestTSOServerStartAndStopNormally() { cc, err := grpc.DialContext(suite.ctx, s.GetAddr(), grpc.WithInsecure()) re.NoError(err) cc.Close() - url := s.GetAddr() + tsoapi.APIPathPrefix - { - resetJSON := `{"tso":"121312", "force-use-larger":true}` - re.NoError(err) - resp, err := http.Post(url+"/admin/reset-ts", "application/json", strings.NewReader(resetJSON)) - re.NoError(err) - defer resp.Body.Close() - re.Equal(http.StatusOK, resp.StatusCode) - } - { - resetJSON := `{}` - re.NoError(err) - resp, err := http.Post(url+"/admin/reset-ts", "application/json", strings.NewReader(resetJSON)) - re.NoError(err) - defer resp.Body.Close() - re.Equal(http.StatusBadRequest, resp.StatusCode) - } + + url := s.GetAddr() + tsoapi.APIPathPrefix + "/admin/reset-ts" + // Test reset ts + input := []byte(`{"tso":"121312", "force-use-larger":true}`) + err = testutil.CheckPostJSON(dialClient, url, input, + testutil.StatusOK(re), testutil.StringContain(re, "Reset ts successfully")) + suite.NoError(err) + + // Test reset ts with invalid tso + input = []byte(`{}`) + err = testutil.CheckPostJSON(dialClient, suite.backendEndpoints+"/pd/api/v1/admin/reset-ts", input, + testutil.StatusNotOK(re), testutil.StringContain(re, "invalid tso value")) + re.NoError(err) } func (suite *tsoServerTestSuite) TestParticipantStartWithAdvertiseListenAddr() { diff --git a/tests/pdctl/operator/operator_test.go b/tests/pdctl/operator/operator_test.go index ab5687cdc042..63d62c8a8748 100644 --- a/tests/pdctl/operator/operator_test.go +++ b/tests/pdctl/operator/operator_test.go @@ -17,6 +17,7 @@ package operator_test import ( "context" "encoding/hex" + "encoding/json" "strconv" "strings" "testing" @@ -251,3 +252,32 @@ func TestOperator(t *testing.T) { return strings.Contains(string(output1), "Success!") || strings.Contains(string(output2), "Success!") }) } + +func TestMicroservice(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := tests.NewTestAPICluster(ctx, 1) + re.NoError(err) + re.NoError(cluster.RunInitialServers()) + re.NotEmpty(cluster.WaitLeader()) + server := cluster.GetServer(cluster.GetLeader()) + re.NoError(server.BootstrapCluster()) + backendEndpoints := server.GetAddr() + tc, err := tests.NewTestSchedulingCluster(ctx, 2, backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForPrimaryServing(re) + + cmd := pdctlCmd.GetRootCmd() + args := []string{"-u", backendEndpoints, "operator", "show"} + var slice []string + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + re.NoError(json.Unmarshal(output, &slice)) + re.Len(slice, 0) + args = []string{"-u", backendEndpoints, "operator", "check", "2"} + output, err = pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + re.Contains(string(output), "null") +} diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index e1c0a210c01c..bc3cf6e931e1 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -486,3 +486,28 @@ func TestScheduler(t *testing.T) { re.NoError(err) checkSchedulerWithStatusCommand(nil, "disabled", nil) } + +func TestMicroservice(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := tests.NewTestAPICluster(ctx, 1) + re.NoError(err) + re.NoError(cluster.RunInitialServers()) + re.NotEmpty(cluster.WaitLeader()) + server := cluster.GetServer(cluster.GetLeader()) + re.NoError(server.BootstrapCluster()) + backendEndpoints := server.GetAddr() + tc, err := tests.NewTestSchedulingCluster(ctx, 2, backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForPrimaryServing(re) + + cmd := pdctlCmd.GetRootCmd() + args := []string{"-u", backendEndpoints, "scheduler", "show"} + var slice []string + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + re.NoError(json.Unmarshal(output, &slice)) + re.Contains(slice, "balance-leader-scheduler") +} diff --git a/tools/pd-ctl/pdctl/command/global.go b/tools/pd-ctl/pdctl/command/global.go index 5d8552da51a0..47e8a18eceff 100644 --- a/tools/pd-ctl/pdctl/command/global.go +++ b/tools/pd-ctl/pdctl/command/global.go @@ -185,6 +185,7 @@ func requestJSON(cmd *cobra.Command, method, prefix string, input map[string]int return err } req.Header.Set("Content-Type", "application/json") + req.Header.Add("Accept-Encoding", "identity") resp, err = dialClient.Do(req) default: err := errors.Errorf("method %s not supported", method) @@ -228,6 +229,7 @@ func do(endpoint, prefix, method string, resp *string, customHeader http.Header, var req *http.Request req, err = http.NewRequest(method, url, b.body) + req.Header.Add("Accept-Encoding", "identity") if err != nil { return err }