Skip to content

Commit

Permalink
Merge branch 'master' into fix_deadlock_in_sink
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Dec 27, 2021
2 parents d0da11f + 528ca40 commit a73b9a9
Show file tree
Hide file tree
Showing 8 changed files with 995 additions and 0 deletions.
15 changes: 15 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,11 @@ error = '''
invalid key: %s
'''

["CDC:ErrInvalidHost"]
error = '''
host must be a URL or a host:port pair: %q
'''

["CDC:ErrInvalidRecordKey"]
error = '''
invalid record key - %q
Expand Down Expand Up @@ -826,6 +831,11 @@ error = '''
resolve locks failed
'''

["CDC:ErrRewindRequestBodyError"]
error = '''
failed to seek to the beginning of request body
'''

["CDC:ErrS3StorageAPI"]
error = '''
s3 storage api
Expand Down Expand Up @@ -1061,6 +1071,11 @@ error = '''
wrong table info in unflatten, table id %d, index table id: %d
'''

["CDC:ErrZeroLengthResponseBody"]
error = '''
0-length response with status code: %d
'''

["ErrConflictingFileLocks"]
error = '''
file lock conflict: %s
Expand Down
109 changes: 109 additions & 0 deletions pkg/api/internal/rest/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package rest

import (
"net/url"
"strings"

"github.com/pingcap/tiflow/pkg/httputil"
)

// Enum types for HTTP methods.
type HTTPMethod int

// Valid HTTP methods.
const (
HTTPMethodPost = iota + 1
HTTPMethodPut
HTTPMethodGet
HTTPMethodDelete
)

// String implements Stringer.String.
func (h HTTPMethod) String() string {
switch h {
case HTTPMethodPost:
return "POST"
case HTTPMethodPut:
return "PUT"
case HTTPMethodGet:
return "GET"
case HTTPMethodDelete:
return "DELETE"
default:
return "unknown"
}
}

// CDCRESTInterface includes a set of operations to interact with TiCDC RESTful apis.
type CDCRESTInterface interface {
Method(method HTTPMethod) *Request
Post() *Request
Put() *Request
Get() *Request
Delete() *Request
}

// CDCRESTClient defines a TiCDC RESTful client
type CDCRESTClient struct {
// base is the root URL for all invocations of the client.
base *url.URL

// versionedAPIPath is a http url prefix with api version. eg. /api/v1.
versionedAPIPath string

// Client is a wrapped http client.
Client *httputil.Client
}

// NewCDCRESTClient creates a new CDCRESTClient.
func NewCDCRESTClient(baseURL *url.URL, versionedAPIPath string, client *httputil.Client) (*CDCRESTClient, error) {
if !strings.HasSuffix(baseURL.Path, "/") {
baseURL.Path += "/"
}
baseURL.RawQuery = ""
baseURL.Fragment = ""

return &CDCRESTClient{
base: baseURL,
versionedAPIPath: versionedAPIPath,
Client: client,
}, nil
}

// Method begins a request with a http method (GET, POST, PUT, DELETE).
func (c *CDCRESTClient) Method(method HTTPMethod) *Request {
return NewRequest(c).WithMethod(method)
}

// Post begins a POST request. Short for c.Method(HTTPMethodPost).
func (c *CDCRESTClient) Post() *Request {
return c.Method(HTTPMethodPost)
}

// Put begins a PUT request. Short for c.Method(HTTPMethodPut).
func (c *CDCRESTClient) Put() *Request {
return c.Method(HTTPMethodPut)
}

// Delete begins a DELETE request. Short for c.Method(HTTPMethodDelete).
func (c *CDCRESTClient) Delete() *Request {
return c.Method(HTTPMethodDelete)
}

// Get begins a GET request. Short for c.Method(HTTPMethodGet).
func (c *CDCRESTClient) Get() *Request {
return c.Method(HTTPMethodGet)
}
103 changes: 103 additions & 0 deletions pkg/api/internal/rest/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package rest

import (
"context"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/require"
)

func restClient(testServer *httptest.Server) (*CDCRESTClient, error) {
c, err := CDCRESTClientFromConfig(&Config{
Host: testServer.URL,
APIPath: "/api",
Version: "v1",
})
return c, err
}

func TestRestRequestSuccess(t *testing.T) {
testServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
rw.Header().Set("Content-Type", "application/json")
rw.WriteHeader(http.StatusOK)
if r.URL.Path == "/api/v1/test" {
_, _ = rw.Write([]byte(`{"cdc": "hello world"}`))
}
}))
defer testServer.Close()

c, err := restClient(testServer)
require.Nil(t, err)
body, err := c.Get().WithPrefix("test").Do(context.Background()).Raw()
require.Equal(t, `{"cdc": "hello world"}`, string(body))
require.NoError(t, err)
}

func TestRestRequestFailed(t *testing.T) {
testServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
rw.WriteHeader(http.StatusNotFound)
_, _ = rw.Write([]byte(`{
"error_msg": "test rest request failed",
"error_code": "test rest request failed"
}`))
}))
defer testServer.Close()

c, err := restClient(testServer)
require.Nil(t, err)
err = c.Get().WithMaxRetries(1).Do(context.Background()).Error()
require.NotNil(t, err)
}

func TestRestRawRequestFailed(t *testing.T) {
testServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
rw.WriteHeader(http.StatusNotFound)
_, _ = rw.Write([]byte(`{
"error_msg": "test rest request failed",
"error_code": "test rest request failed"
}`))
}))
defer testServer.Close()

c, err := restClient(testServer)
require.Nil(t, err)
body, err := c.Get().WithMaxRetries(1).Do(context.Background()).Raw()
require.NotNil(t, body)
require.NotNil(t, err)
}

func TestHTTPMethods(t *testing.T) {
testServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
rw.WriteHeader(http.StatusOK)
}))
defer testServer.Close()

c, _ := restClient(testServer)

req := c.Post()
require.NotNil(t, req)

req = c.Get()
require.NotNil(t, req)

req = c.Put()
require.NotNil(t, req)

req = c.Delete()
require.NotNil(t, req)
}
88 changes: 88 additions & 0 deletions pkg/api/internal/rest/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package rest

import (
"net/url"
"path"

"github.com/pingcap/errors"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/httputil"
"github.com/pingcap/tiflow/pkg/security"
)

// Config holds the common attributes that can be passed to a cdc REST client
type Config struct {
// Host must be a host string, a host:port pair, or a URL to the base of the cdc server.
Host string
// APIPath is a sub-path that points to an API root.
APIPath string
// Credential holds the security Credential used for generating tls config
Credential *security.Credential
// API verion
Version string
}

// defaultServerURLFromConfig is used to build base URL and api path.
func defaultServerURLFromConfig(config *Config) (*url.URL, string, error) {
host := config.Host
if host == "" {
host = "127.0.0.1:8300"
}
base := host
hostURL, err := url.Parse(base)
if err != nil || hostURL.Scheme == "" || hostURL.Host == "" {
scheme := "http://"
if config.Credential != nil && config.Credential.IsTLSEnabled() {
scheme = "https://"
}
hostURL, err = url.Parse(scheme + base)
if err != nil {
return nil, "", errors.Trace(err)
}
if hostURL.Path != "" && hostURL.Path != "/" {
return nil, "", cerrors.ErrInvalidHost.GenWithStackByArgs(base)
}
}
versionedPath := path.Join("/", config.APIPath, config.Version)
return hostURL, versionedPath, nil
}

// CDCRESTClientFromConfig creates a CDCRESTClient from specific config items.
func CDCRESTClientFromConfig(config *Config) (*CDCRESTClient, error) {
if config.Version == "" {
return nil, errors.New("Version is required when initializing a CDCRESTClient")
}
if config.APIPath == "" {
return nil, errors.New("APIPath is required when initializing a CDCRESTClient")
}

httpClient, err := httputil.NewClient(config.Credential)
if err != nil {
return nil, errors.Trace(err)
}

baseURL, versionedAPIPath, err := defaultServerURLFromConfig(config)
if err != nil {
return nil, errors.Trace(err)
}

restClient, err := NewCDCRESTClient(baseURL, versionedAPIPath, httpClient)
if err != nil {
return nil, errors.Trace(err)
}

return restClient, nil
}
Loading

0 comments on commit a73b9a9

Please sign in to comment.