Skip to content

Commit

Permalink
Some random fixes on REST client (#31)
Browse files Browse the repository at this point in the history
- rm useless setRestMode
- rm fall back to string error parser because to complicated
- define expected status code where error is definitive for each calls
- actually return an error when such such situation occurs
- retry all 409
  • Loading branch information
FrancoisPoinsot authored Jan 21, 2019
1 parent 8adee01 commit 381d081
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 32 deletions.
41 changes: 36 additions & 5 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
name = "github.com/stretchr/testify"
version = "1.1.4"


[[constraint]]
name = "gopkg.in/resty.v1"
version = "1.11.0"

[prune]
go-tests = true
unused-packages = true
22 changes: 5 additions & 17 deletions lib/connectors/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package connectors

import (
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"gopkg.in/resty.v1"
"time"
Expand All @@ -27,24 +25,14 @@ func (err ErrorResponse) Error() string {
//NewClient generates a new client
func NewClient(url string) *Client {
restClient := resty.New().
OnAfterResponse(func(c *resty.Client, res *resty.Response) error {
// The default error handling given by `SetRESTMode` is a bit weak. This is the override

if res.StatusCode() >= 400 && res.StatusCode() != 404 {
restErr := ErrorResponse{}
decodeErr := json.Unmarshal(res.Body(), &restErr)
if decodeErr != nil {
return restErr
}
return errors.New(fmt.Sprintf("Error while decoding body while error: %v", res.Body()))
}
return nil
}).
SetRESTMode().
SetError(ErrorResponse{}).
SetHostURL(url).
SetHeader("Accept", "application/json").
SetRetryCount(3).
SetTimeout(5 * time.Second)
SetTimeout(5 * time.Second).
AddRetryCondition(func(resp *resty.Response) (bool, error) {
return resp.StatusCode() == 409, nil
})

return &Client{restClient: restClient}
}
Expand Down
25 changes: 17 additions & 8 deletions lib/connectors/connector_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ func TestCreateConnector(t *testing.T) {
ConnectorRequest: ConnectorRequest{Name: "test-create-connector"},
Config: map[string]interface{}{
"connector.class": "FileStreamSource",
"tasks.max": "1",
"file": testFile,
"topic": "connect-test",
},
Expand All @@ -40,14 +39,30 @@ func TestCreateConnector(t *testing.T) {
assert.Equal(t, 201, resp.Code)
}

func TestErrorCode(t *testing.T) {
client := NewClient(hostConnect)
_, err := client.CreateConnector(
CreateConnectorRequest{
ConnectorRequest: ConnectorRequest{Name: "not-a-valid-connector"},
Config: map[string]interface{}{
"connector.class": "not a valid connector class",
"file": testFile,
"topic": "connect-test",
},
},
true,
)

assert.Error(t, err)
}

func TestGetConnector(t *testing.T) {
client := NewClient(hostConnect)
_, err := client.CreateConnector(
CreateConnectorRequest{
ConnectorRequest: ConnectorRequest{Name: "test-get-connector"},
Config: map[string]interface{}{
"connector.class": "FileStreamSource",
"tasks.max": "1",
"file": testFile,
"topic": "connect-test",
},
Expand Down Expand Up @@ -75,7 +90,6 @@ func TestGetAllConnectors(t *testing.T) {
ConnectorRequest: ConnectorRequest{Name: "test-get-all-connectors"},
Config: map[string]interface{}{
"connector.class": "FileStreamSource",
"tasks.max": "1",
"file": testFile,
"topic": "connect-test",
},
Expand Down Expand Up @@ -166,7 +180,6 @@ func TestDeleteConnector(t *testing.T) {
ConnectorRequest: ConnectorRequest{Name: "test-delete-connectors"},
Config: map[string]interface{}{
"connector.class": "FileStreamSource",
"tasks.max": "1",
"file": testFile,
"topic": "connect-test",
},
Expand Down Expand Up @@ -262,7 +275,6 @@ func TestGetConnectorStatus(t *testing.T) {
ConnectorRequest: ConnectorRequest{Name: "test-get-connector-status"},
Config: map[string]interface{}{
"connector.class": "FileStreamSource",
"tasks.max": "1",
"file": testFile,
"topic": "connect-test",
},
Expand All @@ -289,7 +301,6 @@ func TestRestartConnector(t *testing.T) {
ConnectorRequest: ConnectorRequest{Name: "test-restart-connector"},
Config: map[string]interface{}{
"connector.class": "FileStreamSource",
"tasks.max": "1",
"file": testFile,
"topic": "connect-test",
},
Expand All @@ -314,7 +325,6 @@ func TestPauseAndResumeConnector(t *testing.T) {
ConnectorRequest: ConnectorRequest{Name: "test-pause-and-resume-connector"},
Config: map[string]interface{}{
"connector.class": "FileStreamSource",
"tasks.max": "1",
"file": testFile,
"topic": "connect-test",
},
Expand Down Expand Up @@ -354,7 +364,6 @@ func TestRestartTask(t *testing.T) {
ConnectorRequest: ConnectorRequest{Name: "test-restart-task"},
Config: map[string]interface{}{
"connector.class": "FileStreamSource",
"tasks.max": "1",
"file": testFile,
"topic": "connect-test",
},
Expand Down
33 changes: 32 additions & 1 deletion lib/connectors/connectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ func (c *Client) GetAll() (GetAllConnectorsResponse, error) {
if err != nil {
return GetAllConnectorsResponse{}, err
}
if resp.Error() != nil {
return GetAllConnectorsResponse{}, resp.Error().(*ErrorResponse)
}

result.Code = resp.StatusCode()
result.Connectors = connectors
Expand All @@ -82,6 +85,9 @@ func (c Client) GetConnector(req ConnectorRequest) (ConnectorResponse, error) {
if err != nil {
return ConnectorResponse{}, err
}
if resp.Error() != nil && resp.StatusCode() != 404 {
return ConnectorResponse{}, resp.Error().(*ErrorResponse)
}

result.Code = resp.StatusCode()
return result, nil
Expand All @@ -98,6 +104,9 @@ func (c *Client) CreateConnector(req CreateConnectorRequest, sync bool) (Connect
if err != nil {
return ConnectorResponse{}, err
}
if resp.Error() != nil {
return ConnectorResponse{}, resp.Error().(*ErrorResponse)
}

result.Code = resp.StatusCode()

Expand Down Expand Up @@ -128,6 +137,9 @@ func (c Client) UpdateConnector(req CreateConnectorRequest, sync bool) (Connecto
if err != nil {
return ConnectorResponse{}, err
}
if resp.Error() != nil {
return ConnectorResponse{}, resp.Error().(*ErrorResponse)
}

result.Code = resp.StatusCode()

Expand Down Expand Up @@ -157,6 +169,9 @@ func (c Client) DeleteConnector(req ConnectorRequest, sync bool) (EmptyResponse,
if err != nil {
return EmptyResponse{}, err
}
if resp.Error() != nil {
return EmptyResponse{}, resp.Error().(*ErrorResponse)
}

result.Code = resp.StatusCode()

Expand Down Expand Up @@ -187,6 +202,9 @@ func (c Client) GetConnectorConfig(req ConnectorRequest) (GetConnectorConfigResp
if err != nil {
return GetConnectorConfigResponse{}, err
}
if resp.Error() != nil && resp.StatusCode() != 404 {
return GetConnectorConfigResponse{}, resp.Error().(*ErrorResponse)
}

result.Code = resp.StatusCode()
result.Config = config
Expand All @@ -204,6 +222,9 @@ func (c Client) GetConnectorStatus(req ConnectorRequest) (GetConnectorStatusResp
if err != nil {
return GetConnectorStatusResponse{}, err
}
if resp.Error() != nil && resp.StatusCode() != 404 {
return GetConnectorStatusResponse{}, resp.Error().(*ErrorResponse)
}

result.Code = resp.StatusCode()
return result, nil
Expand All @@ -220,6 +241,9 @@ func (c Client) RestartConnector(req ConnectorRequest) (EmptyResponse, error) {
if err != nil {
return EmptyResponse{}, err
}
if resp.Error() != nil {
return EmptyResponse{}, resp.Error().(*ErrorResponse)
}

result.Code = resp.StatusCode()
return result, nil
Expand All @@ -237,6 +261,9 @@ func (c Client) PauseConnector(req ConnectorRequest, sync bool) (EmptyResponse,
if err != nil {
return EmptyResponse{}, err
}
if resp.Error() != nil {
return EmptyResponse{}, resp.Error().(*ErrorResponse)
}

result.Code = resp.StatusCode()

Expand Down Expand Up @@ -266,6 +293,9 @@ func (c Client) ResumeConnector(req ConnectorRequest, sync bool) (EmptyResponse,
if err != nil {
return EmptyResponse{}, err
}
if resp.Error() != nil {
return EmptyResponse{}, resp.Error().(*ErrorResponse)
}

result.Code = resp.StatusCode()

Expand Down Expand Up @@ -322,7 +352,8 @@ func convertConfigValueToString(value interface{}) string {
}
}

//TryUntil repeats the request
// TryUntil repeats exec until it return true or timeout is reached
// TryUntil itself return true if `exec` has return true (success), false if timeout (failure)
func TryUntil(exec func() bool, limit time.Duration) bool {
timeLimit := time.After(limit)

Expand Down
9 changes: 9 additions & 0 deletions lib/connectors/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ func (c Client) GetAllTasks(req ConnectorRequest) (GetAllTasksResponse, error) {
if err != nil {
return GetAllTasksResponse{}, err
}
if resp.Error() != nil {
return GetAllTasksResponse{}, resp.Error().(*ErrorResponse)
}

result.Code = resp.StatusCode()
return result, nil
Expand All @@ -69,6 +72,9 @@ func (c Client) GetTaskStatus(req TaskRequest) (TaskStatusResponse, error) {
if err != nil {
return TaskStatusResponse{}, err
}
if resp.Error() != nil && resp.StatusCode() != 404 {
return TaskStatusResponse{}, resp.Error().(*ErrorResponse)
}

result.Code = resp.StatusCode()

Expand All @@ -86,6 +92,9 @@ func (c Client) RestartTask(req TaskRequest) (EmptyResponse, error) {
if err != nil {
return EmptyResponse{}, err
}
if resp.Error() != nil {
return EmptyResponse{}, resp.Error().(*ErrorResponse)
}

result.Code = resp.StatusCode()

Expand Down
4 changes: 3 additions & 1 deletion lib/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@ services:
- DISABLE_JMX=1
- DEBUG=1
- SUPERVISORWEB=0
- CONNECTORS=file
- CONNECTORS=file

# you will find config in '/var/run/{SERVICE_NAME}' folder inside container

0 comments on commit 381d081

Please sign in to comment.