Skip to content

Commit

Permalink
Merge pull request #5635 from crelax/cdc-cli-v2-tso-query
Browse files Browse the repository at this point in the history
 api(ticdc): add /api/v2/tso: get tso from pd
  • Loading branch information
Xuanhe Chen authored May 31, 2022
2 parents 5868852 + e7ccc25 commit 2e578fb
Show file tree
Hide file tree
Showing 7 changed files with 228 additions and 32 deletions.
52 changes: 26 additions & 26 deletions cdc/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,32 +46,32 @@ const (
forWardFromCapture = "TiCDC-ForwardFromCapture"
)

// openAPI provides capture APIs.
type openAPI struct {
// OpenAPI provides capture APIs.
type OpenAPI struct {
capture *capture.Capture
// use for unit test only
testStatusProvider owner.StatusProvider
}

// NewOpenAPI creates a new openAPI.
func NewOpenAPI(c *capture.Capture) openAPI {
return openAPI{capture: c}
// NewOpenAPI creates a new OpenAPI.
func NewOpenAPI(c *capture.Capture) OpenAPI {
return OpenAPI{capture: c}
}

// NewOpenAPI4Test return a openAPI for test
func NewOpenAPI4Test(c *capture.Capture, p owner.StatusProvider) openAPI {
return openAPI{capture: c, testStatusProvider: p}
// NewOpenAPI4Test returns a OpenAPI for test
func NewOpenAPI4Test(c *capture.Capture, p owner.StatusProvider) OpenAPI {
return OpenAPI{capture: c, testStatusProvider: p}
}

func (h *openAPI) statusProvider() owner.StatusProvider {
func (h *OpenAPI) statusProvider() owner.StatusProvider {
if h.testStatusProvider != nil {
return h.testStatusProvider
}
return h.capture.StatusProvider()
}

// RegisterOpenAPIRoutes registers routes for OpenAPI
func RegisterOpenAPIRoutes(router *gin.Engine, api openAPI) {
func RegisterOpenAPIRoutes(router *gin.Engine, api OpenAPI) {
v1 := router.Group("/api/v1")

v1.Use(middleware.LogMiddleware())
Expand Down Expand Up @@ -118,7 +118,7 @@ func RegisterOpenAPIRoutes(router *gin.Engine, api openAPI) {
// @Success 200 {array} model.ChangefeedCommonInfo
// @Failure 500 {object} model.HTTPError
// @Router /api/v1/changefeeds [get]
func (h *openAPI) ListChangefeed(c *gin.Context) {
func (h *OpenAPI) ListChangefeed(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
Expand Down Expand Up @@ -184,7 +184,7 @@ func (h *openAPI) ListChangefeed(c *gin.Context) {
// @Success 200 {object} model.ChangefeedDetail
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/changefeeds/{changefeed_id} [get]
func (h *openAPI) GetChangefeed(c *gin.Context) {
func (h *OpenAPI) GetChangefeed(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
Expand Down Expand Up @@ -253,7 +253,7 @@ func (h *openAPI) GetChangefeed(c *gin.Context) {
// @Success 202
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/changefeeds [post]
func (h *openAPI) CreateChangefeed(c *gin.Context) {
func (h *OpenAPI) CreateChangefeed(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
Expand Down Expand Up @@ -302,7 +302,7 @@ func (h *openAPI) CreateChangefeed(c *gin.Context) {
// @Success 202
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/changefeeds/{changefeed_id}/pause [post]
func (h *openAPI) PauseChangefeed(c *gin.Context) {
func (h *OpenAPI) PauseChangefeed(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
Expand Down Expand Up @@ -345,7 +345,7 @@ func (h *openAPI) PauseChangefeed(c *gin.Context) {
// @Success 202
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/changefeeds/{changefeed_id}/resume [post]
func (h *openAPI) ResumeChangefeed(c *gin.Context) {
func (h *OpenAPI) ResumeChangefeed(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
Expand Down Expand Up @@ -393,7 +393,7 @@ func (h *openAPI) ResumeChangefeed(c *gin.Context) {
// @Success 202
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/changefeeds/{changefeed_id} [put]
func (h *openAPI) UpdateChangefeed(c *gin.Context) {
func (h *OpenAPI) UpdateChangefeed(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
Expand Down Expand Up @@ -450,7 +450,7 @@ func (h *openAPI) UpdateChangefeed(c *gin.Context) {
// @Success 202
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/changefeeds/{changefeed_id} [delete]
func (h *openAPI) RemoveChangefeed(c *gin.Context) {
func (h *OpenAPI) RemoveChangefeed(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
Expand Down Expand Up @@ -492,7 +492,7 @@ func (h *openAPI) RemoveChangefeed(c *gin.Context) {
// @Success 202
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/changefeeds/{changefeed_id}/tables/rebalance_table [post]
func (h *openAPI) RebalanceTables(c *gin.Context) {
func (h *OpenAPI) RebalanceTables(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
Expand Down Expand Up @@ -532,7 +532,7 @@ func (h *openAPI) RebalanceTables(c *gin.Context) {
// @Success 202
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/changefeeds/{changefeed_id}/tables/move_table [post]
func (h *openAPI) MoveTable(c *gin.Context) {
func (h *OpenAPI) MoveTable(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
Expand Down Expand Up @@ -585,7 +585,7 @@ func (h *openAPI) MoveTable(c *gin.Context) {
// @Success 202
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/owner/resign [post]
func (h *openAPI) ResignOwner(c *gin.Context) {
func (h *OpenAPI) ResignOwner(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
Expand All @@ -608,7 +608,7 @@ func (h *openAPI) ResignOwner(c *gin.Context) {
// @Success 200 {object} model.ProcessorDetail
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/processors/{changefeed_id}/{capture_id} [get]
func (h *openAPI) GetProcessor(c *gin.Context) {
func (h *OpenAPI) GetProcessor(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
Expand Down Expand Up @@ -688,7 +688,7 @@ func (h *openAPI) GetProcessor(c *gin.Context) {
// @Success 200 {array} model.ProcessorCommonInfo
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/processors [get]
func (h *openAPI) ListProcessor(c *gin.Context) {
func (h *OpenAPI) ListProcessor(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
Expand Down Expand Up @@ -720,7 +720,7 @@ func (h *openAPI) ListProcessor(c *gin.Context) {
// @Success 200 {array} model.Capture
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/captures [get]
func (h *openAPI) ListCapture(c *gin.Context) {
func (h *OpenAPI) ListCapture(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
Expand Down Expand Up @@ -754,7 +754,7 @@ func (h *openAPI) ListCapture(c *gin.Context) {
// @Success 200 {object} model.ServerStatus
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/status [get]
func (h *openAPI) ServerStatus(c *gin.Context) {
func (h *OpenAPI) ServerStatus(c *gin.Context) {
status := model.ServerStatus{
Version: version.ReleaseVersion,
GitHash: version.GitHash,
Expand All @@ -774,7 +774,7 @@ func (h *openAPI) ServerStatus(c *gin.Context) {
// @Success 200
// @Failure 500 {object} model.HTTPError
// @Router /api/v1/health [get]
func (h *openAPI) Health(c *gin.Context) {
func (h *OpenAPI) Health(c *gin.Context) {
ctx := c.Request.Context()

if _, err := h.capture.GetOwnerCaptureInfo(ctx); err != nil {
Expand Down Expand Up @@ -815,7 +815,7 @@ func SetLogLevel(c *gin.Context) {
}

// forwardToOwner forward an request to owner
func (h *openAPI) forwardToOwner(c *gin.Context) {
func (h *OpenAPI) forwardToOwner(c *gin.Context) {
ctx := c.Request.Context()
// every request can only forward to owner one time
if len(c.GetHeader(forWardFromCapture)) != 0 {
Expand Down
56 changes: 51 additions & 5 deletions cdc/api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,71 @@
package v2

import (
"net/http"

"github.com/gin-gonic/gin"
"github.com/pingcap/tiflow/cdc/api/middleware"
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/owner"
"github.com/pingcap/tiflow/pkg/upstream"
)

// OpenAPIV2 provides CDC v2 APIs
type OpenAPIV2 struct {
capture *capture.Capture
// use for unit test only
testStatusProvider owner.StatusProvider
}

// NewOpenAPIV2 creates a new OpenAPIV2.
func NewOpenAPIV2(c *capture.Capture) OpenAPIV2 {
return OpenAPIV2{capture: c}
}

// NewOpenAPIV2ForTest returns a OpenAPIV2 for test
func NewOpenAPIV2ForTest(c *capture.Capture, p owner.StatusProvider) OpenAPIV2 {
return OpenAPIV2{capture: c, testStatusProvider: p}
}

// NewOpenAPI creates a new openAPIs.
func NewOpenAPI(c *capture.Capture) *OpenAPIV2 {
return &OpenAPIV2{capture: c}
func (h *OpenAPIV2) statusProvider() owner.StatusProvider {
if h.testStatusProvider != nil {
return h.testStatusProvider
}
return h.capture.StatusProvider()
}

// RegisterOpenAPIRoutes registers routes for OpenAPI
func RegisterOpenAPIRoutes(router *gin.Engine, api *OpenAPIV2) {
// RegisterOpenAPIV2Routes registers routes for OpenAPI
func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) {
v2 := router.Group("/api/v2")

v2.Use(middleware.LogMiddleware())
v2.Use(middleware.ErrorHandleMiddleware())

// common APIs
v2.GET("/tso", api.GetTso)
}

// GetTso request and returns a TSO from PD
func (h *OpenAPIV2) GetTso(c *gin.Context) {
ctx := c.Request.Context()
if h.capture.UpstreamManager == nil {
c.Status(http.StatusServiceUnavailable)
return
}
pdClient := h.capture.UpstreamManager.Get(upstream.DefaultUpstreamID).PDClient
if pdClient == nil {
c.Status(http.StatusServiceUnavailable)
return
}

timestamp, logicalTime, err := pdClient.GetTS(ctx)
if err != nil {
_ = c.Error(err)
c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
return
}

resp := Tso{timestamp, logicalTime}
c.IndentedJSON(http.StatusOK, resp)
}
130 changes: 130 additions & 0 deletions cdc/api/v2/api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package v2

import (
"context"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/gin-gonic/gin"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

type mockStatusProvider struct {
mock.Mock
}

// MockPDClient mocks pd.Client to facilitate unit testing.
type MockPDClient struct {
pd.Client
}

func (m *MockPDClient) GetTS(ctx context.Context) (int64, int64, error) {
return oracle.GetPhysical(time.Now()), 0, nil
}

func (p *mockStatusProvider) GetAllChangeFeedStatuses(ctx context.Context) (
map[model.ChangeFeedID]*model.ChangeFeedStatus, error,
) {
args := p.Called(ctx)
return args.Get(0).(map[model.ChangeFeedID]*model.ChangeFeedStatus), args.Error(1)
}

func (p *mockStatusProvider) GetChangeFeedStatus(ctx context.Context,
changefeedID model.ChangeFeedID,
) (*model.ChangeFeedStatus, error) {
args := p.Called(ctx, changefeedID)
log.Info("err", zap.Error(args.Error(1)))
return args.Get(0).(*model.ChangeFeedStatus), args.Error(1)
}

func (p *mockStatusProvider) GetAllChangeFeedInfo(ctx context.Context) (
map[model.ChangeFeedID]*model.ChangeFeedInfo, error,
) {
args := p.Called(ctx)
return args.Get(0).(map[model.ChangeFeedID]*model.ChangeFeedInfo), args.Error(1)
}

func (p *mockStatusProvider) GetChangeFeedInfo(ctx context.Context,
changefeedID model.ChangeFeedID,
) (*model.ChangeFeedInfo, error) {
args := p.Called(ctx)
return args.Get(0).(*model.ChangeFeedInfo), args.Error(1)
}

func (p *mockStatusProvider) GetAllTaskStatuses(ctx context.Context,
changefeedID model.ChangeFeedID,
) (map[model.CaptureID]*model.TaskStatus, error) {
args := p.Called(ctx)
return args.Get(0).(map[model.CaptureID]*model.TaskStatus), args.Error(1)
}

func (p *mockStatusProvider) GetTaskPositions(ctx context.Context,
changefeedID model.ChangeFeedID,
) (map[model.CaptureID]*model.TaskPosition, error) {
args := p.Called(ctx)
return args.Get(0).(map[model.CaptureID]*model.TaskPosition), args.Error(1)
}

func (p *mockStatusProvider) GetProcessors(ctx context.Context) ([]*model.ProcInfoSnap, error) {
args := p.Called(ctx)
return args.Get(0).([]*model.ProcInfoSnap), args.Error(1)
}

func (p *mockStatusProvider) GetCaptures(ctx context.Context) ([]*model.CaptureInfo, error) {
args := p.Called(ctx)
return args.Get(0).([]*model.CaptureInfo), args.Error(1)
}

type testCase struct {
url string
method string
}

func newRouter(c *capture.Capture, p *mockStatusProvider) *gin.Engine {
router := gin.New()
RegisterOpenAPIV2Routes(router, NewOpenAPIV2ForTest(c, p))
return router
}

func newStatusProvider() *mockStatusProvider {
statusProvider := &mockStatusProvider{}
return statusProvider
}

func TestGetTso(t *testing.T) {
t.Parallel()
mockPDClient := &MockPDClient{}
mockManager := upstream.NewManager4Test(mockPDClient)
cp := capture.NewCaptureWithManager4Test(mockManager)
router := newRouter(cp, newStatusProvider())
w := httptest.NewRecorder()

tc := testCase{url: "/api/v2/tso", method: "GET"}
req, err := http.NewRequestWithContext(context.Background(), tc.method, tc.url, nil)
require.Nil(t, err)
router.ServeHTTP(w, req)
require.Equal(t, 200, w.Code)
}
Loading

0 comments on commit 2e578fb

Please sign in to comment.