Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http_api (ticdc): Add some unit tests for HTTP API #3935

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
2540180
http_api (ticdc): Fix list processors API panic
asddongmen Dec 16, 2021
5844757
http_api (ticdc): move http_router.go from pkg cdc to pkg capture
asddongmen Dec 16, 2021
9ad2e2f
http_api (ticdc): add some unit tests for http api
asddongmen Dec 17, 2021
49deec7
fix data race
asddongmen Dec 17, 2021
892d929
Merge branch 'master' into fix_http_api_get_processor_panic
asddongmen Dec 17, 2021
f1926f5
fmt
asddongmen Dec 17, 2021
69d1907
Merge remote-tracking branch 'origin/fix_http_api_get_processor_panic…
asddongmen Dec 17, 2021
a9d09ba
http_api (ticdc): fix error
asddongmen Dec 17, 2021
05e4c48
Merge branch 'master' into fix_http_api_get_processor_panic
asddongmen Dec 17, 2021
cb8442c
Merge branch 'master' into fix_http_api_get_processor_panic
asddongmen Dec 20, 2021
4971962
resolves conflict
asddongmen Dec 20, 2021
afac99c
Merge branch 'master' into fix_http_api_get_processor_panic
asddongmen Dec 20, 2021
7e6e259
fix error
asddongmen Dec 20, 2021
71187b5
Merge branch 'master' into fix_http_api_get_processor_panic
asddongmen Dec 20, 2021
18329c9
http_api (ticdc): refine the code struct of http api related files
asddongmen Dec 22, 2021
6663b88
fix make error
asddongmen Dec 22, 2021
da892dc
http_status: clean useless codes
asddongmen Dec 22, 2021
1c5d1be
Merge branch 'master' into fix_http_api_get_processor_panic
asddongmen Dec 22, 2021
184dc55
Merge remote-tracking branch 'origin/fix_http_api_get_processor_panic…
asddongmen Dec 22, 2021
e667bd7
*: fix data race
asddongmen Dec 23, 2021
87bb84e
Merge branch 'master' into fix_http_api_get_processor_panic
asddongmen Dec 23, 2021
b975691
fix test error
asddongmen Dec 23, 2021
39abf5c
Merge branch 'master' into fix_http_api_get_processor_panic
asddongmen Dec 23, 2021
7f091a0
fix test error
asddongmen Dec 23, 2021
a427835
Merge remote-tracking branch 'origin/fix_http_api_get_processor_panic…
asddongmen Dec 23, 2021
496b346
Merge branch 'master' into fix_http_api_get_processor_panic
asddongmen Dec 23, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ func (c *Capture) AsyncClose() {
})
c.captureMu.Lock()
defer c.captureMu.Unlock()

if c.processorManager != nil {
c.processorManager.AsyncClose()
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/cdc_test.go → cdc/capture/cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package cdc
package capture

import (
"testing"
Expand Down
2 changes: 1 addition & 1 deletion cdc/capture/http_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var httpBadRequestError = []*errors.Error{
cerror.ErrAPIInvalidParam, cerror.ErrSinkURIInvalid, cerror.ErrStartTsBeforeGC,
cerror.ErrChangeFeedNotExists, cerror.ErrTargetTsBeforeStartTs, cerror.ErrTableIneligible,
cerror.ErrFilterRuleInvalid, cerror.ErrChangefeedUpdateRefused, cerror.ErrMySQLConnectionError,
cerror.ErrMySQLInvalidConfig,
cerror.ErrMySQLInvalidConfig, cerror.ErrCaptureNotExist,
}

// IsHTTPBadRequestError check if a error is a http bad request error
Expand Down
59 changes: 30 additions & 29 deletions cdc/capture/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@ const (

// HTTPHandler is a HTTPHandler of capture
type HTTPHandler struct {
capture *Capture
capture *Capture
statusProvider owner.StatusProvider
}

// NewHTTPHandler return a HTTPHandler for OpenAPI
func NewHTTPHandler(capture *Capture) HTTPHandler {
func NewHTTPHandler(capture *Capture, statusProvider owner.StatusProvider) HTTPHandler {
return HTTPHandler{
capture: capture,
capture: capture,
statusProvider: statusProvider,
}
}

Expand All @@ -72,17 +74,17 @@ func (h *HTTPHandler) ListChangefeed(c *gin.Context) {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.owner.StatusProvider()

ctx := c.Request.Context()
state := c.Query(apiOpVarChangefeedState)
// get all changefeed status
statuses, err := statusProvider.GetAllChangeFeedStatuses(ctx)
statuses, err := h.statusProvider.GetAllChangeFeedStatuses(ctx)
if err != nil {
_ = c.Error(err)
return
}
// get all changefeed infos
infos, err := statusProvider.GetAllChangeFeedInfo(ctx)
infos, err := h.statusProvider.GetAllChangeFeedInfo(ctx)
if err != nil {
// this call will return a parsedError generated by the error we passed in
// so it is no need to check the parsedError
Expand Down Expand Up @@ -137,7 +139,6 @@ func (h *HTTPHandler) GetChangefeed(c *gin.Context) {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.owner.StatusProvider()

ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)
Expand All @@ -146,19 +147,19 @@ func (h *HTTPHandler) GetChangefeed(c *gin.Context) {
return
}

info, err := statusProvider.GetChangeFeedInfo(ctx, changefeedID)
info, err := h.statusProvider.GetChangeFeedInfo(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
}

status, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID)
status, err := h.statusProvider.GetChangeFeedStatus(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
}

processorInfos, err := statusProvider.GetAllTaskStatuses(ctx, changefeedID)
processorInfos, err := h.statusProvider.GetAllTaskStatuses(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -249,7 +250,7 @@ func (h *HTTPHandler) PauseChangefeed(c *gin.Context) {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.owner.StatusProvider()

ctx := c.Request.Context()

changefeedID := c.Param(apiOpVarChangefeedID)
Expand All @@ -258,7 +259,7 @@ func (h *HTTPHandler) PauseChangefeed(c *gin.Context) {
return
}
// check if the changefeed exists
_, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID)
_, err := h.statusProvider.GetChangeFeedStatus(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -292,15 +293,15 @@ func (h *HTTPHandler) ResumeChangefeed(c *gin.Context) {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.owner.StatusProvider()

ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)
if err := model.ValidateChangefeedID(changefeedID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
return
}
// check if the changefeed exists
_, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID)
_, err := h.statusProvider.GetChangeFeedStatus(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -340,15 +341,15 @@ func (h *HTTPHandler) UpdateChangefeed(c *gin.Context) {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.owner.StatusProvider()

ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)

if err := model.ValidateChangefeedID(changefeedID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
return
}
info, err := statusProvider.GetChangeFeedInfo(ctx, changefeedID)
info, err := h.statusProvider.GetChangeFeedInfo(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -396,15 +397,15 @@ func (h *HTTPHandler) RemoveChangefeed(c *gin.Context) {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.owner.StatusProvider()

ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)
if err := model.ValidateChangefeedID(changefeedID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
return
}
// check if the changefeed exists
_, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID)
_, err := h.statusProvider.GetChangeFeedStatus(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -438,7 +439,7 @@ func (h *HTTPHandler) RebalanceTable(c *gin.Context) {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.owner.StatusProvider()

ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)

Expand All @@ -447,7 +448,7 @@ func (h *HTTPHandler) RebalanceTable(c *gin.Context) {
return
}
// check if the changefeed exists
_, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID)
_, err := h.statusProvider.GetChangeFeedStatus(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -478,15 +479,15 @@ func (h *HTTPHandler) MoveTable(c *gin.Context) {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.owner.StatusProvider()

ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)
if err := model.ValidateChangefeedID(changefeedID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
return
}
// check if the changefeed exists
_, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID)
_, err := h.statusProvider.GetChangeFeedStatus(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -552,7 +553,6 @@ func (h *HTTPHandler) GetProcessor(c *gin.Context) {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.owner.StatusProvider()

ctx := c.Request.Context()

Expand All @@ -568,24 +568,27 @@ func (h *HTTPHandler) GetProcessor(c *gin.Context) {
return
}

statuses, err := statusProvider.GetAllTaskStatuses(ctx, changefeedID)
statuses, err := h.statusProvider.GetAllTaskStatuses(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
}
status, exist := statuses[captureID]
if !exist {
_ = c.Error(cerror.ErrCaptureNotExist.GenWithStackByArgs(captureID))
return
}

positions, err := statusProvider.GetTaskPositions(ctx, changefeedID)
positions, err := h.statusProvider.GetTaskPositions(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
}

position, exist := positions[captureID]
if !exist {
_ = c.Error(cerror.ErrCaptureNotExist.GenWithStackByArgs(captureID))
return
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
}

processorDetail := &model.ProcessorDetail{CheckPointTs: position.CheckPointTs, ResolvedTs: position.ResolvedTs, Error: position.Error}
Expand All @@ -611,10 +614,9 @@ func (h *HTTPHandler) ListProcessor(c *gin.Context) {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.owner.StatusProvider()

ctx := c.Request.Context()
infos, err := statusProvider.GetProcessors(ctx)
infos, err := h.statusProvider.GetProcessors(ctx)
if err != nil {
_ = c.Error(err)
return
Expand All @@ -641,10 +643,9 @@ func (h *HTTPHandler) ListCapture(c *gin.Context) {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.owner.StatusProvider()

ctx := c.Request.Context()
captureInfos, err := statusProvider.GetCaptures(ctx)
captureInfos, err := h.statusProvider.GetCaptures(ctx)
if err != nil {
_ = c.Error(err)
return
Expand Down
Loading