Skip to content

Commit

Permalink
Finalize stream error codes, polish requests to return consistent nil…
Browse files Browse the repository at this point in the history
… bytes upon error, use HttpToStreamsErrCode everywhere
  • Loading branch information
infiloop2 committed Feb 22, 2024
1 parent bad4376 commit 4540b21
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,18 @@ const (
type ErrCode uint32

const (
// TODO: Finalize these
ErrCodeNil ErrCode = 0
ErrCodeStreamsPartialContent ErrCode = 808206
ErrCodeStreamsBadRequest ErrCode = 808400
ErrCodeStreamsUnauthorized ErrCode = 808401
ErrCodeStreamsInternalError ErrCode = 808500
ErrCodeStreamsBadResponse ErrCode = 808600
ErrCodeStreamsTimeout ErrCode = 808602
ErrCodeStreamsUnknownError ErrCode = 808700
ErrCodeNil ErrCode = 0
ErrCodeStreamsPartialContent ErrCode = 808206
ErrCodeStreamsBadRequest ErrCode = 808400
ErrCodeStreamsUnauthorized ErrCode = 808401
ErrCodeStreamsNotFound ErrCode = 808404
ErrCodeStreamsInternalError ErrCode = 808500
ErrCodeStreamsBadGateway ErrCode = 808502
ErrCodeStreamsServiceUnavailable ErrCode = 808503
ErrCodeStreamsStatusGatewayTimeout ErrCode = 808504
ErrCodeStreamsBadResponse ErrCode = 808600
ErrCodeStreamsTimeout ErrCode = 808601
ErrCodeStreamsUnknownError ErrCode = 808700
)

func HttpToStreamsErrCode(statusCode int) ErrCode {
Expand All @@ -67,10 +70,18 @@ func HttpToStreamsErrCode(statusCode int) ErrCode {
return ErrCodeStreamsBadRequest
case http.StatusUnauthorized:
return ErrCodeStreamsUnauthorized
case http.StatusInternalServerError, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout:
case http.StatusNotFound:
return ErrCodeStreamsNotFound
case http.StatusInternalServerError:
return ErrCodeStreamsInternalError
case http.StatusBadGateway:
return ErrCodeStreamsBadGateway
case http.StatusServiceUnavailable:
return ErrCodeStreamsServiceUnavailable
case http.StatusGatewayTimeout:
return ErrCodeStreamsStatusGatewayTimeout
default:
return 0
return ErrCodeStreamsUnknownError
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLo
resultLen := len(streamsLookup.Feeds)
ch := make(chan mercury.MercuryData, resultLen)
if len(streamsLookup.Feeds) == 0 {
return encoding.NoPipelineError, [][]byte{}, encoding.ErrCodeStreamsBadRequest, false, 0 * time.Second, nil
return encoding.NoPipelineError, nil, encoding.ErrCodeStreamsBadRequest, false, 0 * time.Second, nil
}
for i := range streamsLookup.Feeds {
// TODO (AUTO-7209): limit the number of concurrent requests
Expand Down Expand Up @@ -184,7 +184,7 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur
// Not a pipeline error, a bad streams response, send back error code
ch <- mercury.MercuryData{
Index: index,
Bytes: [][]byte{},
Bytes: nil,
ErrCode: encoding.ErrCodeStreamsBadResponse,
State: encoding.NoPipelineError,
}
Expand All @@ -207,8 +207,8 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur
c.lggr.Errorf("at block %s upkeep %s received unhandled status code %d for feed %s", sl.Time.String(), sl.UpkeepId.String(), httpResponse.StatusCode, sl.Feeds[index])
ch <- mercury.MercuryData{
Index: index,
Bytes: [][]byte{},
ErrCode: encoding.ErrCodeStreamsUnknownError,
Bytes: nil,
ErrCode: encoding.HttpToStreamsErrCode(httpResponse.StatusCode),
State: encoding.NoPipelineError,
}
sent = true
Expand All @@ -222,7 +222,7 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur
c.lggr.Warnf("at block %s upkeep %s failed to unmarshal body to MercuryV02Response for feed %s: %v", sl.Time.String(), sl.UpkeepId.String(), sl.Feeds[index], err)
ch <- mercury.MercuryData{
Index: index,
Bytes: [][]byte{},
Bytes: nil,
ErrCode: encoding.ErrCodeStreamsBadResponse,
State: encoding.NoPipelineError,
}
Expand All @@ -233,7 +233,7 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur
c.lggr.Warnf("at block %s upkeep %s failed to decode chainlinkBlob %s for feed %s: %v", sl.Time.String(), sl.UpkeepId.String(), m.ChainlinkBlob, sl.Feeds[index], err)
ch <- mercury.MercuryData{
Index: index,
Bytes: [][]byte{},
Bytes: nil,
ErrCode: encoding.ErrCodeStreamsBadResponse,
State: encoding.NoPipelineError,
}
Expand Down Expand Up @@ -261,7 +261,7 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur
if !sent {
ch <- mercury.MercuryData{
Index: index,
Bytes: [][]byte{},
Bytes: nil,
ErrCode: errCode,
State: state,
Retryable: retryable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,12 +213,13 @@ func TestV02_SingleFeedRequest(t *testing.T) {
},
UpkeepId: upkeepId,
},
blob: "0xab2123dc",
retryNumber: totalAttempt,
statusCode: http.StatusServiceUnavailable,
state: encoding.MercuryFlakyFailure,
retryable: true,
errorMessage: "failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: 503\n#2: 503\n#3: 503",
blob: "0xab2123dc",
retryNumber: totalAttempt,
statusCode: http.StatusServiceUnavailable,
streamsErrCode: encoding.ErrCodeStreamsServiceUnavailable,
state: encoding.MercuryFlakyFailure,
retryable: true,
errorMessage: "failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: 503\n#2: 503\n#3: 503",
},
{
name: "failure StatusBadGateway - returns retryable",
Expand All @@ -232,12 +233,13 @@ func TestV02_SingleFeedRequest(t *testing.T) {
},
UpkeepId: upkeepId,
},
blob: "0xab2123dc",
retryNumber: totalAttempt,
statusCode: http.StatusBadGateway,
state: encoding.MercuryFlakyFailure,
retryable: true,
errorMessage: "failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: 502\n#2: 502\n#3: 502",
blob: "0xab2123dc",
retryNumber: totalAttempt,
statusCode: http.StatusBadGateway,
streamsErrCode: encoding.ErrCodeStreamsBadGateway,
state: encoding.MercuryFlakyFailure,
retryable: true,
errorMessage: "failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: 502\n#2: 502\n#3: 502",
},
{
name: "failure - returns retryable and then non-retryable",
Expand Down Expand Up @@ -364,10 +366,10 @@ func TestV02_SingleFeedRequest(t *testing.T) {
assert.Equal(t, tt.state, m.State)
if tt.streamsErrCode != encoding.ErrCodeNil {
assert.Equal(t, tt.streamsErrCode, m.ErrCode)
assert.Equal(t, [][]byte{}, m.Bytes)
assert.Equal(t, [][]byte(nil), m.Bytes)
} else if tt.retryNumber >= totalAttempt || tt.errorMessage != "" {
assert.Equal(t, tt.errorMessage, m.Error.Error())
assert.Equal(t, [][]byte{}, m.Bytes)
assert.Equal(t, [][]byte(nil), m.Bytes)
} else {
blobBytes, err := hexutil.Decode(tt.blob)
assert.Nil(t, err)
Expand Down Expand Up @@ -435,6 +437,28 @@ func TestV02_DoMercuryRequestV02(t *testing.T) {
expectedErrCode: encoding.ErrCodeStreamsInternalError,
expectedState: encoding.NoPipelineError,
},
{
name: "failure - retryable but out of retries for conditional 404 error code",
lookup: &mercury.StreamsLookup{
StreamsLookupError: &mercury.StreamsLookupError{
FeedParamKey: mercury.FeedIdHex,
Feeds: []string{"0x4554482d5553442d415242495452554d2d544553544e45540000000000000000"},
TimeParamKey: mercury.BlockNumber,
Time: big.NewInt(25880526),
ExtraData: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 100},
},
UpkeepId: upkeepId,
},
upkeepType: automationTypes.ConditionTrigger,
mockHttpStatusCode: http.StatusNotFound,
mockChainlinkBlobs: []string{"0x00066dfcd1ed2d95b18c948dbc5bd64c687afe93e4ca7d663ddec14c20090ad80000000000000000000000000000000000000000000000000000000000081401000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e000000000000000000000000000000000000000000000000000000000000002200000000000000000000000000000000000000000000000000000000000000280000100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001204554482d5553442d415242495452554d2d544553544e455400000000000000000000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000289ad8d367000000000000000000000000000000000000000000000000000000289acf0b38000000000000000000000000000000000000000000000000000000289b3da40000000000000000000000000000000000000000000000000000000000018ae7ce74d9fa252a8983976eab600dc7590c778d04813430841bc6e765c34cd81a168d00000000000000000000000000000000000000000000000000000000018ae7cb0000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000000000000260412b94e525ca6cedc9f544fd86f77606d52fe731a5d069dbe836a8bfc0fb8c911963b0ae7a14971f3b4621bffb802ef0605392b9a6c89c7fab1df8633a5ade00000000000000000000000000000000000000000000000000000000000000024500c2f521f83fba5efc2bf3effaaedde43d0a4adff785c1213b712a3aed0d8157642a84324db0cf9695ebd27708d4608eb0337e0dd87b0e43f0fa70c700d911"},
expectedValues: [][]byte(nil),
expectedRetryable: false,
pluginRetries: 0,
expectedRetryInterval: 0 * time.Second,
expectedErrCode: encoding.ErrCodeStreamsNotFound,
expectedState: encoding.NoPipelineError,
},
{
name: "failure - retryable and interval is 1s",
lookup: &mercury.StreamsLookup{
Expand Down Expand Up @@ -495,11 +519,11 @@ func TestV02_DoMercuryRequestV02(t *testing.T) {
},
upkeepType: automationTypes.LogTrigger,
pluginRetries: 6,
mockHttpStatusCode: http.StatusInternalServerError,
mockHttpStatusCode: http.StatusBadGateway,
mockChainlinkBlobs: []string{"0x00066dfcd1ed2d95b18c948dbc5bd64c687afe93e4ca7d663ddec14c20090ad80000000000000000000000000000000000000000000000000000000000081401000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e000000000000000000000000000000000000000000000000000000000000002200000000000000000000000000000000000000000000000000000000000000280000100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001204554482d5553442d415242495452554d2d544553544e455400000000000000000000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000289ad8d367000000000000000000000000000000000000000000000000000000289acf0b38000000000000000000000000000000000000000000000000000000289b3da40000000000000000000000000000000000000000000000000000000000018ae7ce74d9fa252a8983976eab600dc7590c778d04813430841bc6e765c34cd81a168d00000000000000000000000000000000000000000000000000000000018ae7cb0000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000000000000260412b94e525ca6cedc9f544fd86f77606d52fe731a5d069dbe836a8bfc0fb8c911963b0ae7a14971f3b4621bffb802ef0605392b9a6c89c7fab1df8633a5ade00000000000000000000000000000000000000000000000000000000000000024500c2f521f83fba5efc2bf3effaaedde43d0a4adff785c1213b712a3aed0d8157642a84324db0cf9695ebd27708d4608eb0337e0dd87b0e43f0fa70c700d911"},
expectedValues: [][]byte(nil),
expectedRetryInterval: 0 * time.Second,
expectedErrCode: encoding.ErrCodeStreamsInternalError,
expectedErrCode: encoding.ErrCodeStreamsBadGateway,
expectedRetryable: false,
expectedState: encoding.NoPipelineError,
},
Expand Down Expand Up @@ -534,7 +558,7 @@ func TestV02_DoMercuryRequestV02(t *testing.T) {
},
UpkeepId: upkeepId,
},
expectedValues: [][]byte{},
expectedValues: [][]byte(nil),
expectedErrCode: encoding.ErrCodeStreamsBadRequest,
},
{
Expand All @@ -549,7 +573,7 @@ func TestV02_DoMercuryRequestV02(t *testing.T) {
},
UpkeepId: upkeepId,
},
expectedValues: [][]byte{},
expectedValues: [][]byte(nil),
expectedErrCode: encoding.ErrCodeStreamsBadRequest,
},
}
Expand Down Expand Up @@ -661,7 +685,7 @@ func TestV02_DoMercuryRequestV02_OneFeedSuccessOneFeedPipelineError(t *testing.T
hc.On("Do", mock.Anything).Return(resp, nil).Once()
// Second request returns MercuryFlakyError
resp = &http.Response{
StatusCode: http.StatusBadGateway,
StatusCode: http.StatusServiceUnavailable,
Body: io.NopCloser(bytes.NewReader(b)),
}
hc.On("Do", mock.Anything).Return(resp, nil).Times(totalAttempt)
Expand All @@ -681,7 +705,7 @@ func TestV02_DoMercuryRequestV02_OneFeedSuccessOneFeedPipelineError(t *testing.T
state, values, errCode, retryable, retryInterval, _ := c.DoRequest(testutils.Context(t), lookup, automationTypes.LogTrigger, pluginRetryKey)
assert.Equal(t, true, retryable)
assert.Equal(t, 1*time.Second, retryInterval)
assert.Equal(t, encoding.ErrCodeStreamsInternalError, errCode)
assert.Equal(t, encoding.ErrCodeStreamsServiceUnavailable, errCode)
assert.Equal(t, encoding.MercuryFlakyFailure, state)
assert.Equal(t, [][]byte(nil), values)
}
Expand Down Expand Up @@ -755,7 +779,7 @@ func TestV02_DoMercuryRequestV02_OneFeedSuccessOneFeedPipelineErrorConvertedErro
hc.On("Do", mock.Anything).Return(resp, nil).Once()
// Second request returns MercuryFlakyError
resp = &http.Response{
StatusCode: http.StatusBadGateway,
StatusCode: http.StatusGatewayTimeout,
Body: io.NopCloser(bytes.NewReader(b)),
}
hc.On("Do", mock.Anything).Return(resp, nil).Times(totalAttempt)
Expand All @@ -775,7 +799,7 @@ func TestV02_DoMercuryRequestV02_OneFeedSuccessOneFeedPipelineErrorConvertedErro
state, values, errCode, retryable, retryInterval, _ := c.DoRequest(testutils.Context(t), lookup, automationTypes.ConditionTrigger, pluginRetryKey)
assert.Equal(t, false, retryable)
assert.Equal(t, 0*time.Second, retryInterval)
assert.Equal(t, encoding.ErrCodeStreamsInternalError, errCode)
assert.Equal(t, encoding.ErrCodeStreamsStatusGatewayTimeout, errCode)
assert.Equal(t, encoding.NoPipelineError, state)
assert.Equal(t, [][]byte(nil), values)
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func NewClient(mercuryConfig mercury.MercuryConfigProvider, httpClient mercury.H

func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLookup, upkeepType automationTypes.UpkeepType, pluginRetryKey string) (encoding.PipelineExecutionState, [][]byte, encoding.ErrCode, bool, time.Duration, error) {
if len(streamsLookup.Feeds) == 0 {
return encoding.NoPipelineError, [][]byte{}, encoding.ErrCodeStreamsBadRequest, false, 0 * time.Second, nil
return encoding.NoPipelineError, nil, encoding.ErrCodeStreamsBadRequest, false, 0 * time.Second, nil
}
resultLen := 1 // Only 1 multi-feed request is made for all feeds
ch := make(chan mercury.MercuryData, resultLen)
Expand Down Expand Up @@ -203,7 +203,7 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur
c.lggr.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode)
ch <- mercury.MercuryData{
Index: 0,
ErrCode: encoding.ErrCodeStreamsUnknownError,
ErrCode: encoding.HttpToStreamsErrCode(resp.StatusCode),
State: encoding.NoPipelineError,
}
sent = true
Expand Down Expand Up @@ -234,7 +234,7 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur
retryable = true
state = encoding.MercuryFlakyFailure
errCode = encoding.HttpToStreamsErrCode(http.StatusPartialContent)
return fmt.Errorf("%d", http.StatusNotFound)
return fmt.Errorf("%d", http.StatusPartialContent)
}
var reportBytes [][]byte
for _, rsp := range response.Reports {
Expand Down Expand Up @@ -271,7 +271,7 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur
if !sent {
ch <- mercury.MercuryData{
Index: 0,
Bytes: [][]byte{},
Bytes: nil,
Retryable: retryable,
Error: retryErr,
ErrCode: errCode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func TestV03_DoMercuryRequestV03_OneFeedSuccessOneFeedPipelineError(t *testing.T
state, values, errCode, retryable, retryInterval, _ := c.DoRequest(testutils.Context(t), lookup, automationTypes.LogTrigger, pluginRetryKey)
assert.Equal(t, true, retryable)
assert.Equal(t, 1*time.Second, retryInterval)
assert.Equal(t, encoding.ErrCodeStreamsInternalError, errCode)
assert.Equal(t, encoding.ErrCodeStreamsBadGateway, errCode)
assert.Equal(t, encoding.MercuryFlakyFailure, state)
assert.Equal(t, [][]byte(nil), values)
}
Expand Down Expand Up @@ -611,11 +611,12 @@ func TestV03_MultiFeedRequest(t *testing.T) {
},
UpkeepId: upkeepId,
},
retryNumber: totalAttempt,
statusCode: http.StatusGatewayTimeout,
state: encoding.MercuryFlakyFailure,
retryable: true,
errorMessage: "All attempts fail:\n#1: 504\n#2: 504\n#3: 504",
retryNumber: totalAttempt,
statusCode: http.StatusGatewayTimeout,
state: encoding.MercuryFlakyFailure,
retryable: true,
streamsErrCode: encoding.ErrCodeStreamsStatusGatewayTimeout,
errorMessage: "All attempts fail:\n#1: 504\n#2: 504\n#3: 504",
},
{
name: "failure - StatusServiceUnavailable - returns retryable",
Expand All @@ -628,11 +629,12 @@ func TestV03_MultiFeedRequest(t *testing.T) {
},
UpkeepId: upkeepId,
},
retryNumber: totalAttempt,
statusCode: http.StatusServiceUnavailable,
state: encoding.MercuryFlakyFailure,
retryable: true,
errorMessage: "All attempts fail:\n#1: 503\n#2: 503\n#3: 503",
retryNumber: totalAttempt,
statusCode: http.StatusServiceUnavailable,
state: encoding.MercuryFlakyFailure,
retryable: true,
streamsErrCode: encoding.ErrCodeStreamsServiceUnavailable,
errorMessage: "All attempts fail:\n#1: 503\n#2: 503\n#3: 503",
},
{
name: "failure - StatusBadGateway - returns retryable",
Expand All @@ -645,11 +647,12 @@ func TestV03_MultiFeedRequest(t *testing.T) {
},
UpkeepId: upkeepId,
},
retryNumber: totalAttempt,
statusCode: http.StatusBadGateway,
state: encoding.MercuryFlakyFailure,
retryable: true,
errorMessage: "All attempts fail:\n#1: 502\n#2: 502\n#3: 502",
retryNumber: totalAttempt,
statusCode: http.StatusBadGateway,
streamsErrCode: encoding.ErrCodeStreamsBadGateway,
state: encoding.MercuryFlakyFailure,
retryable: true,
errorMessage: "All attempts fail:\n#1: 502\n#2: 502\n#3: 502",
},

{
Expand All @@ -673,11 +676,12 @@ func TestV03_MultiFeedRequest(t *testing.T) {
},
},
},
statusCode: http.StatusOK,
retryNumber: totalAttempt,
retryable: true,
errorMessage: "All attempts fail:\n#1: 404\n#2: 404\n#3: 404",
state: encoding.MercuryFlakyFailure,
statusCode: http.StatusOK,
retryNumber: totalAttempt,
retryable: true,
streamsErrCode: encoding.ErrCodeStreamsPartialContent,
errorMessage: "All attempts fail:\n#1: 404\n#2: 404\n#3: 404",
state: encoding.MercuryFlakyFailure,
},
{
name: "failure - partial content three times with status partial content",
Expand Down Expand Up @@ -818,7 +822,7 @@ func TestV03_MultiFeedRequest(t *testing.T) {
assert.Equal(t, [][]byte(nil), m.Bytes)
} else if tt.retryNumber >= totalAttempt || tt.errorMessage != "" {
assert.Equal(t, tt.errorMessage, m.Error.Error())
assert.Equal(t, [][]byte{}, m.Bytes)
assert.Equal(t, [][]byte(nil), m.Bytes)
} else {
assert.Nil(t, m.Error)
var reports [][]byte
Expand Down

0 comments on commit 4540b21

Please sign in to comment.