diff --git a/cdc/api/v1/api.go b/cdc/api/v1/api.go index dde50f4980d..f6939a8d7f7 100644 --- a/cdc/api/v1/api.go +++ b/cdc/api/v1/api.go @@ -46,24 +46,24 @@ const ( forWardFromCapture = "TiCDC-ForwardFromCapture" ) -// openAPI provides capture APIs. -type openAPI struct { +// OpenAPI provides capture APIs. +type OpenAPI struct { capture *capture.Capture // use for unit test only testStatusProvider owner.StatusProvider } -// NewOpenAPI creates a new openAPI. -func NewOpenAPI(c *capture.Capture) openAPI { - return openAPI{capture: c} +// NewOpenAPI creates a new OpenAPI. +func NewOpenAPI(c *capture.Capture) OpenAPI { + return OpenAPI{capture: c} } -// NewOpenAPI4Test return a openAPI for test -func NewOpenAPI4Test(c *capture.Capture, p owner.StatusProvider) openAPI { - return openAPI{capture: c, testStatusProvider: p} +// NewOpenAPI4Test return a OpenAPI for test +func NewOpenAPI4Test(c *capture.Capture, p owner.StatusProvider) OpenAPI { + return OpenAPI{capture: c, testStatusProvider: p} } -func (h *openAPI) statusProvider() owner.StatusProvider { +func (h *OpenAPI) statusProvider() owner.StatusProvider { if h.testStatusProvider != nil { return h.testStatusProvider } @@ -71,7 +71,7 @@ func (h *openAPI) statusProvider() owner.StatusProvider { } // RegisterOpenAPIRoutes registers routes for OpenAPI -func RegisterOpenAPIRoutes(router *gin.Engine, api openAPI) { +func RegisterOpenAPIRoutes(router *gin.Engine, api OpenAPI) { v1 := router.Group("/api/v1") v1.Use(middleware.LogMiddleware()) @@ -118,7 +118,7 @@ func RegisterOpenAPIRoutes(router *gin.Engine, api openAPI) { // @Success 200 {array} model.ChangefeedCommonInfo // @Failure 500 {object} model.HTTPError // @Router /api/v1/changefeeds [get] -func (h *openAPI) ListChangefeed(c *gin.Context) { +func (h *OpenAPI) ListChangefeed(c *gin.Context) { if !h.capture.IsOwner() { h.forwardToOwner(c) return @@ -184,7 +184,7 @@ func (h *openAPI) ListChangefeed(c *gin.Context) { // @Success 200 {object} model.ChangefeedDetail // @Failure 500,400 {object} model.HTTPError // @Router /api/v1/changefeeds/{changefeed_id} [get] -func (h *openAPI) GetChangefeed(c *gin.Context) { +func (h *OpenAPI) GetChangefeed(c *gin.Context) { if !h.capture.IsOwner() { h.forwardToOwner(c) return @@ -253,7 +253,7 @@ func (h *openAPI) GetChangefeed(c *gin.Context) { // @Success 202 // @Failure 500,400 {object} model.HTTPError // @Router /api/v1/changefeeds [post] -func (h *openAPI) CreateChangefeed(c *gin.Context) { +func (h *OpenAPI) CreateChangefeed(c *gin.Context) { if !h.capture.IsOwner() { h.forwardToOwner(c) return @@ -302,7 +302,7 @@ func (h *openAPI) CreateChangefeed(c *gin.Context) { // @Success 202 // @Failure 500,400 {object} model.HTTPError // @Router /api/v1/changefeeds/{changefeed_id}/pause [post] -func (h *openAPI) PauseChangefeed(c *gin.Context) { +func (h *OpenAPI) PauseChangefeed(c *gin.Context) { if !h.capture.IsOwner() { h.forwardToOwner(c) return @@ -345,7 +345,7 @@ func (h *openAPI) PauseChangefeed(c *gin.Context) { // @Success 202 // @Failure 500,400 {object} model.HTTPError // @Router /api/v1/changefeeds/{changefeed_id}/resume [post] -func (h *openAPI) ResumeChangefeed(c *gin.Context) { +func (h *OpenAPI) ResumeChangefeed(c *gin.Context) { if !h.capture.IsOwner() { h.forwardToOwner(c) return @@ -393,7 +393,7 @@ func (h *openAPI) ResumeChangefeed(c *gin.Context) { // @Success 202 // @Failure 500,400 {object} model.HTTPError // @Router /api/v1/changefeeds/{changefeed_id} [put] -func (h *openAPI) UpdateChangefeed(c *gin.Context) { +func (h *OpenAPI) UpdateChangefeed(c *gin.Context) { if !h.capture.IsOwner() { h.forwardToOwner(c) return @@ -450,7 +450,7 @@ func (h *openAPI) UpdateChangefeed(c *gin.Context) { // @Success 202 // @Failure 500,400 {object} model.HTTPError // @Router /api/v1/changefeeds/{changefeed_id} [delete] -func (h *openAPI) RemoveChangefeed(c *gin.Context) { +func (h *OpenAPI) RemoveChangefeed(c *gin.Context) { if !h.capture.IsOwner() { h.forwardToOwner(c) return @@ -492,7 +492,7 @@ func (h *openAPI) RemoveChangefeed(c *gin.Context) { // @Success 202 // @Failure 500,400 {object} model.HTTPError // @Router /api/v1/changefeeds/{changefeed_id}/tables/rebalance_table [post] -func (h *openAPI) RebalanceTables(c *gin.Context) { +func (h *OpenAPI) RebalanceTables(c *gin.Context) { if !h.capture.IsOwner() { h.forwardToOwner(c) return @@ -532,7 +532,7 @@ func (h *openAPI) RebalanceTables(c *gin.Context) { // @Success 202 // @Failure 500,400 {object} model.HTTPError // @Router /api/v1/changefeeds/{changefeed_id}/tables/move_table [post] -func (h *openAPI) MoveTable(c *gin.Context) { +func (h *OpenAPI) MoveTable(c *gin.Context) { if !h.capture.IsOwner() { h.forwardToOwner(c) return @@ -585,7 +585,7 @@ func (h *openAPI) MoveTable(c *gin.Context) { // @Success 202 // @Failure 500,400 {object} model.HTTPError // @Router /api/v1/owner/resign [post] -func (h *openAPI) ResignOwner(c *gin.Context) { +func (h *OpenAPI) ResignOwner(c *gin.Context) { if !h.capture.IsOwner() { h.forwardToOwner(c) return @@ -608,7 +608,7 @@ func (h *openAPI) ResignOwner(c *gin.Context) { // @Success 200 {object} model.ProcessorDetail // @Failure 500,400 {object} model.HTTPError // @Router /api/v1/processors/{changefeed_id}/{capture_id} [get] -func (h *openAPI) GetProcessor(c *gin.Context) { +func (h *OpenAPI) GetProcessor(c *gin.Context) { if !h.capture.IsOwner() { h.forwardToOwner(c) return @@ -688,7 +688,7 @@ func (h *openAPI) GetProcessor(c *gin.Context) { // @Success 200 {array} model.ProcessorCommonInfo // @Failure 500,400 {object} model.HTTPError // @Router /api/v1/processors [get] -func (h *openAPI) ListProcessor(c *gin.Context) { +func (h *OpenAPI) ListProcessor(c *gin.Context) { if !h.capture.IsOwner() { h.forwardToOwner(c) return @@ -720,7 +720,7 @@ func (h *openAPI) ListProcessor(c *gin.Context) { // @Success 200 {array} model.Capture // @Failure 500,400 {object} model.HTTPError // @Router /api/v1/captures [get] -func (h *openAPI) ListCapture(c *gin.Context) { +func (h *OpenAPI) ListCapture(c *gin.Context) { if !h.capture.IsOwner() { h.forwardToOwner(c) return @@ -754,7 +754,7 @@ func (h *openAPI) ListCapture(c *gin.Context) { // @Success 200 {object} model.ServerStatus // @Failure 500,400 {object} model.HTTPError // @Router /api/v1/status [get] -func (h *openAPI) ServerStatus(c *gin.Context) { +func (h *OpenAPI) ServerStatus(c *gin.Context) { status := model.ServerStatus{ Version: version.ReleaseVersion, GitHash: version.GitHash, @@ -774,7 +774,7 @@ func (h *openAPI) ServerStatus(c *gin.Context) { // @Success 200 // @Failure 500 {object} model.HTTPError // @Router /api/v1/health [get] -func (h *openAPI) Health(c *gin.Context) { +func (h *OpenAPI) Health(c *gin.Context) { ctx := c.Request.Context() if _, err := h.capture.GetOwnerCaptureInfo(ctx); err != nil { @@ -815,7 +815,7 @@ func SetLogLevel(c *gin.Context) { } // forwardToOwner forward an request to owner -func (h *openAPI) forwardToOwner(c *gin.Context) { +func (h *OpenAPI) forwardToOwner(c *gin.Context) { ctx := c.Request.Context() // every request can only forward to owner one time if len(c.GetHeader(forWardFromCapture)) != 0 { diff --git a/cdc/api/v2/api.go b/cdc/api/v2/api.go index 29e41591cc3..f957c9e0609 100644 --- a/cdc/api/v2/api.go +++ b/cdc/api/v2/api.go @@ -24,15 +24,18 @@ type OpenAPIV2 struct { capture *capture.Capture } -// NewOpenAPI creates a new openAPIs. -func NewOpenAPI(c *capture.Capture) *OpenAPIV2 { - return &OpenAPIV2{capture: c} +// NewOpenAPIV2 creates a new OpenAPIV2. +func NewOpenAPIV2(c *capture.Capture) OpenAPIV2 { + return OpenAPIV2{capture: c} } -// RegisterOpenAPIRoutes registers routes for OpenAPI -func RegisterOpenAPIRoutes(router *gin.Engine, api *OpenAPIV2) { +// RegisterOpenAPIV2Routes registers routes for OpenAPI +func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) { v2 := router.Group("/api/v2") v2.Use(middleware.LogMiddleware()) v2.Use(middleware.ErrorHandleMiddleware()) + + // common APIs + v2.GET("/tso", api.GetTso) } diff --git a/cdc/api/v2/api_test.go b/cdc/api/v2/api_test.go new file mode 100644 index 00000000000..bd636589159 --- /dev/null +++ b/cdc/api/v2/api_test.go @@ -0,0 +1,44 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package v2 + +import ( + "context" + "time" + + "github.com/gin-gonic/gin" + "github.com/pingcap/tiflow/cdc/capture" + "github.com/tikv/client-go/v2/oracle" + pd "github.com/tikv/pd/client" +) + +// MockPDClient mocks pd.Client to facilitate unit testing. +type MockPDClient struct { + pd.Client +} + +func (m *MockPDClient) GetTS(ctx context.Context) (int64, int64, error) { + return oracle.GetPhysical(time.Now()), 0, nil +} + +type testCase struct { + url string + method string +} + +func newRouter(c *capture.Capture) *gin.Engine { + router := gin.New() + RegisterOpenAPIV2Routes(router, NewOpenAPIV2(c)) + return router +} diff --git a/cdc/api/v2/tso.go b/cdc/api/v2/tso.go new file mode 100644 index 00000000000..8ae9af52b4c --- /dev/null +++ b/cdc/api/v2/tso.go @@ -0,0 +1,46 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package v2 + +import ( + "net/http" + + "github.com/gin-gonic/gin" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/upstream" +) + +// GetTso request and returns a TSO from PD +func (h *OpenAPIV2) GetTso(c *gin.Context) { + ctx := c.Request.Context() + if h.capture.UpstreamManager == nil { + c.Status(http.StatusServiceUnavailable) + return + } + pdClient := h.capture.UpstreamManager.Get(upstream.DefaultUpstreamID).PDClient + if pdClient == nil { + c.Status(http.StatusServiceUnavailable) + return + } + + timestamp, logicalTime, err := pdClient.GetTS(ctx) + if err != nil { + _ = c.Error(err) + c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + return + } + + resp := Tso{timestamp, logicalTime} + c.IndentedJSON(http.StatusOK, resp) +} diff --git a/cdc/api/v2/tso_test.go b/cdc/api/v2/tso_test.go new file mode 100644 index 00000000000..ea464b25b92 --- /dev/null +++ b/cdc/api/v2/tso_test.go @@ -0,0 +1,46 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package v2 + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + + "github.com/golang/mock/gomock" + "github.com/pingcap/tiflow/cdc/capture" + mock_owner "github.com/pingcap/tiflow/cdc/owner/mock" + "github.com/pingcap/tiflow/pkg/upstream" + "github.com/stretchr/testify/require" +) + +func TestGetTso(t *testing.T) { + t.Parallel() + + ctrl := gomock.NewController(t) + mockOwner := mock_owner.NewMockOwner(ctrl) + mockPDClient := &MockPDClient{} + mockManager := upstream.NewManager4Test(mockPDClient) + cp := capture.NewCaptureWithManager4Test(mockOwner, mockManager) + + router := newRouter(cp) + w := httptest.NewRecorder() + + tc := testCase{url: "/api/v2/tso", method: "GET"} + req, err := http.NewRequestWithContext(context.Background(), tc.method, tc.url, nil) + require.Nil(t, err) + router.ServeHTTP(w, req) + require.Equal(t, 200, w.Code) +} diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index 4a719512bf4..7be91a01f01 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -106,6 +106,15 @@ func NewCapture4Test(o owner.Owner) *Capture { return res } +// NewCaptureWithManager4Test returns a new Capture instance for test. +func NewCaptureWithManager4Test(o owner.Owner, m *upstream.Manager) *Capture { + res := &Capture{ + UpstreamManager: m, + } + res.owner = o + return res +} + func (c *Capture) reset(ctx context.Context) error { conf := config.GetGlobalServerConfig() sess, err := concurrency.NewSession(c.EtcdClient.Client.Unwrap(), diff --git a/cdc/http.go b/cdc/http.go index 240adbe5412..8d1a782ba1f 100644 --- a/cdc/http.go +++ b/cdc/http.go @@ -46,7 +46,7 @@ func RegisterRoutes( // Open API V1 v1.RegisterOpenAPIRoutes(router, v1.NewOpenAPI(capture)) // Open API V2 - v2.RegisterOpenAPIRoutes(router, v2.NewOpenAPI(capture)) + v2.RegisterOpenAPIV2Routes(router, v2.NewOpenAPIV2(capture)) // Owner API owner.RegisterOwnerAPIRoutes(router, capture) diff --git a/tests/integration_tests/http_api/run.sh b/tests/integration_tests/http_api/run.sh index c33becb6010..74e74bf417b 100644 --- a/tests/integration_tests/http_api/run.sh +++ b/tests/integration_tests/http_api/run.sh @@ -109,6 +109,7 @@ function run() { "set_log_level" "remove_changefeed" "resign_owner" + "get_tso" ) for case in ${sequential_cases[@]}; do diff --git a/tests/integration_tests/http_api/util/test_case.py b/tests/integration_tests/http_api/util/test_case.py index b02f2a3c0cc..77c884e8833 100644 --- a/tests/integration_tests/http_api/util/test_case.py +++ b/tests/integration_tests/http_api/util/test_case.py @@ -10,6 +10,8 @@ BASE_URL0 = "https://127.0.0.1:8300/api/v1" BASE_URL1 = "https://127.0.0.1:8301/api/v1" +V2_BASE_URL0 = "https://127.0.0.1:8300/api/v2" + # we should write some SQLs in the run.sh after call create_changefeed def create_changefeed(sink_uri): url = BASE_URL1+"/changefeeds" @@ -298,6 +300,14 @@ def set_log_level(): print("pass test: set log level") +def get_tso(): + # test state: all + url = V2_BASE_URL0+"/tso" + resp = rq.get(url, cert=CERT, verify=VERIFY) + assert resp.status_code == rq.codes.ok + + print("pass test: get tso") + # arg1: test case name # arg2: cetificates dir # arg3: sink uri @@ -327,6 +337,7 @@ def set_log_level(): "set_log_level": set_log_level, "remove_changefeed": remove_changefeed, "resign_owner": resign_owner, + "get_tso": get_tso } func = FUNC_MAP[sys.argv[1]]