diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go index 2f3488c9c24..d455e17406a 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go @@ -46,15 +46,18 @@ const ( type ErrCode uint32 const ( - // TODO: Finalize these - ErrCodeNil ErrCode = 0 - ErrCodeStreamsPartialContent ErrCode = 808206 - ErrCodeStreamsBadRequest ErrCode = 808400 - ErrCodeStreamsUnauthorized ErrCode = 808401 - ErrCodeStreamsInternalError ErrCode = 808500 - ErrCodeStreamsBadResponse ErrCode = 808600 - ErrCodeStreamsTimeout ErrCode = 808602 - ErrCodeStreamsUnknownError ErrCode = 808700 + ErrCodeNil ErrCode = 0 + ErrCodeStreamsPartialContent ErrCode = 808206 + ErrCodeStreamsBadRequest ErrCode = 808400 + ErrCodeStreamsUnauthorized ErrCode = 808401 + ErrCodeStreamsNotFound ErrCode = 808404 + ErrCodeStreamsInternalError ErrCode = 808500 + ErrCodeStreamsBadGateway ErrCode = 808502 + ErrCodeStreamsServiceUnavailable ErrCode = 808503 + ErrCodeStreamsStatusGatewayTimeout ErrCode = 808504 + ErrCodeStreamsBadResponse ErrCode = 808600 + ErrCodeStreamsTimeout ErrCode = 808601 + ErrCodeStreamsUnknownError ErrCode = 808700 ) func HttpToStreamsErrCode(statusCode int) ErrCode { @@ -67,10 +70,18 @@ func HttpToStreamsErrCode(statusCode int) ErrCode { return ErrCodeStreamsBadRequest case http.StatusUnauthorized: return ErrCodeStreamsUnauthorized - case http.StatusInternalServerError, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout: + case http.StatusNotFound: + return ErrCodeStreamsNotFound + case http.StatusInternalServerError: return ErrCodeStreamsInternalError + case http.StatusBadGateway: + return ErrCodeStreamsBadGateway + case http.StatusServiceUnavailable: + return ErrCodeStreamsServiceUnavailable + case http.StatusGatewayTimeout: + return ErrCodeStreamsStatusGatewayTimeout default: - return 0 + return ErrCodeStreamsUnknownError } } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/request.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/request.go index ed2de04ff9c..6ad3601398a 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/request.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/request.go @@ -59,7 +59,7 @@ func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLo resultLen := len(streamsLookup.Feeds) ch := make(chan mercury.MercuryData, resultLen) if len(streamsLookup.Feeds) == 0 { - return encoding.NoPipelineError, [][]byte{}, encoding.ErrCodeStreamsBadRequest, false, 0 * time.Second, nil + return encoding.NoPipelineError, nil, encoding.ErrCodeStreamsBadRequest, false, 0 * time.Second, nil } for i := range streamsLookup.Feeds { // TODO (AUTO-7209): limit the number of concurrent requests @@ -184,7 +184,7 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur // Not a pipeline error, a bad streams response, send back error code ch <- mercury.MercuryData{ Index: index, - Bytes: [][]byte{}, + Bytes: nil, ErrCode: encoding.ErrCodeStreamsBadResponse, State: encoding.NoPipelineError, } @@ -207,8 +207,8 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur c.lggr.Errorf("at block %s upkeep %s received unhandled status code %d for feed %s", sl.Time.String(), sl.UpkeepId.String(), httpResponse.StatusCode, sl.Feeds[index]) ch <- mercury.MercuryData{ Index: index, - Bytes: [][]byte{}, - ErrCode: encoding.ErrCodeStreamsUnknownError, + Bytes: nil, + ErrCode: encoding.HttpToStreamsErrCode(httpResponse.StatusCode), State: encoding.NoPipelineError, } sent = true @@ -222,7 +222,7 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur c.lggr.Warnf("at block %s upkeep %s failed to unmarshal body to MercuryV02Response for feed %s: %v", sl.Time.String(), sl.UpkeepId.String(), sl.Feeds[index], err) ch <- mercury.MercuryData{ Index: index, - Bytes: [][]byte{}, + Bytes: nil, ErrCode: encoding.ErrCodeStreamsBadResponse, State: encoding.NoPipelineError, } @@ -233,7 +233,7 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur c.lggr.Warnf("at block %s upkeep %s failed to decode chainlinkBlob %s for feed %s: %v", sl.Time.String(), sl.UpkeepId.String(), m.ChainlinkBlob, sl.Feeds[index], err) ch <- mercury.MercuryData{ Index: index, - Bytes: [][]byte{}, + Bytes: nil, ErrCode: encoding.ErrCodeStreamsBadResponse, State: encoding.NoPipelineError, } @@ -261,7 +261,7 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur if !sent { ch <- mercury.MercuryData{ Index: index, - Bytes: [][]byte{}, + Bytes: nil, ErrCode: errCode, State: state, Retryable: retryable, diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/v02_request_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/v02_request_test.go index ad9d2279727..2a5483c9a29 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/v02_request_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/v02_request_test.go @@ -213,12 +213,13 @@ func TestV02_SingleFeedRequest(t *testing.T) { }, UpkeepId: upkeepId, }, - blob: "0xab2123dc", - retryNumber: totalAttempt, - statusCode: http.StatusServiceUnavailable, - state: encoding.MercuryFlakyFailure, - retryable: true, - errorMessage: "failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: 503\n#2: 503\n#3: 503", + blob: "0xab2123dc", + retryNumber: totalAttempt, + statusCode: http.StatusServiceUnavailable, + streamsErrCode: encoding.ErrCodeStreamsServiceUnavailable, + state: encoding.MercuryFlakyFailure, + retryable: true, + errorMessage: "failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: 503\n#2: 503\n#3: 503", }, { name: "failure StatusBadGateway - returns retryable", @@ -232,12 +233,13 @@ func TestV02_SingleFeedRequest(t *testing.T) { }, UpkeepId: upkeepId, }, - blob: "0xab2123dc", - retryNumber: totalAttempt, - statusCode: http.StatusBadGateway, - state: encoding.MercuryFlakyFailure, - retryable: true, - errorMessage: "failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: 502\n#2: 502\n#3: 502", + blob: "0xab2123dc", + retryNumber: totalAttempt, + statusCode: http.StatusBadGateway, + streamsErrCode: encoding.ErrCodeStreamsBadGateway, + state: encoding.MercuryFlakyFailure, + retryable: true, + errorMessage: "failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: 502\n#2: 502\n#3: 502", }, { name: "failure - returns retryable and then non-retryable", @@ -364,10 +366,10 @@ func TestV02_SingleFeedRequest(t *testing.T) { assert.Equal(t, tt.state, m.State) if tt.streamsErrCode != encoding.ErrCodeNil { assert.Equal(t, tt.streamsErrCode, m.ErrCode) - assert.Equal(t, [][]byte{}, m.Bytes) + assert.Equal(t, [][]byte(nil), m.Bytes) } else if tt.retryNumber >= totalAttempt || tt.errorMessage != "" { assert.Equal(t, tt.errorMessage, m.Error.Error()) - assert.Equal(t, [][]byte{}, m.Bytes) + assert.Equal(t, [][]byte(nil), m.Bytes) } else { blobBytes, err := hexutil.Decode(tt.blob) assert.Nil(t, err) @@ -435,6 +437,28 @@ func TestV02_DoMercuryRequestV02(t *testing.T) { expectedErrCode: encoding.ErrCodeStreamsInternalError, expectedState: encoding.NoPipelineError, }, + { + name: "failure - retryable but out of retries for conditional 404 error code", + lookup: &mercury.StreamsLookup{ + StreamsLookupError: &mercury.StreamsLookupError{ + FeedParamKey: mercury.FeedIdHex, + Feeds: []string{"0x4554482d5553442d415242495452554d2d544553544e45540000000000000000"}, + TimeParamKey: mercury.BlockNumber, + Time: big.NewInt(25880526), + ExtraData: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 100}, + }, + UpkeepId: upkeepId, + }, + upkeepType: automationTypes.ConditionTrigger, + mockHttpStatusCode: http.StatusNotFound, + mockChainlinkBlobs: []string{"0x00066dfcd1ed2d95b18c948dbc5bd64c687afe93e4ca7d663ddec14c20090ad80000000000000000000000000000000000000000000000000000000000081401000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e000000000000000000000000000000000000000000000000000000000000002200000000000000000000000000000000000000000000000000000000000000280000100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001204554482d5553442d415242495452554d2d544553544e455400000000000000000000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000289ad8d367000000000000000000000000000000000000000000000000000000289acf0b38000000000000000000000000000000000000000000000000000000289b3da40000000000000000000000000000000000000000000000000000000000018ae7ce74d9fa252a8983976eab600dc7590c778d04813430841bc6e765c34cd81a168d00000000000000000000000000000000000000000000000000000000018ae7cb0000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000000000000260412b94e525ca6cedc9f544fd86f77606d52fe731a5d069dbe836a8bfc0fb8c911963b0ae7a14971f3b4621bffb802ef0605392b9a6c89c7fab1df8633a5ade00000000000000000000000000000000000000000000000000000000000000024500c2f521f83fba5efc2bf3effaaedde43d0a4adff785c1213b712a3aed0d8157642a84324db0cf9695ebd27708d4608eb0337e0dd87b0e43f0fa70c700d911"}, + expectedValues: [][]byte(nil), + expectedRetryable: false, + pluginRetries: 0, + expectedRetryInterval: 0 * time.Second, + expectedErrCode: encoding.ErrCodeStreamsNotFound, + expectedState: encoding.NoPipelineError, + }, { name: "failure - retryable and interval is 1s", lookup: &mercury.StreamsLookup{ @@ -495,11 +519,11 @@ func TestV02_DoMercuryRequestV02(t *testing.T) { }, upkeepType: automationTypes.LogTrigger, pluginRetries: 6, - mockHttpStatusCode: http.StatusInternalServerError, + mockHttpStatusCode: http.StatusBadGateway, mockChainlinkBlobs: []string{"0x00066dfcd1ed2d95b18c948dbc5bd64c687afe93e4ca7d663ddec14c20090ad80000000000000000000000000000000000000000000000000000000000081401000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e000000000000000000000000000000000000000000000000000000000000002200000000000000000000000000000000000000000000000000000000000000280000100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001204554482d5553442d415242495452554d2d544553544e455400000000000000000000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000289ad8d367000000000000000000000000000000000000000000000000000000289acf0b38000000000000000000000000000000000000000000000000000000289b3da40000000000000000000000000000000000000000000000000000000000018ae7ce74d9fa252a8983976eab600dc7590c778d04813430841bc6e765c34cd81a168d00000000000000000000000000000000000000000000000000000000018ae7cb0000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000000000000260412b94e525ca6cedc9f544fd86f77606d52fe731a5d069dbe836a8bfc0fb8c911963b0ae7a14971f3b4621bffb802ef0605392b9a6c89c7fab1df8633a5ade00000000000000000000000000000000000000000000000000000000000000024500c2f521f83fba5efc2bf3effaaedde43d0a4adff785c1213b712a3aed0d8157642a84324db0cf9695ebd27708d4608eb0337e0dd87b0e43f0fa70c700d911"}, expectedValues: [][]byte(nil), expectedRetryInterval: 0 * time.Second, - expectedErrCode: encoding.ErrCodeStreamsInternalError, + expectedErrCode: encoding.ErrCodeStreamsBadGateway, expectedRetryable: false, expectedState: encoding.NoPipelineError, }, @@ -534,7 +558,7 @@ func TestV02_DoMercuryRequestV02(t *testing.T) { }, UpkeepId: upkeepId, }, - expectedValues: [][]byte{}, + expectedValues: [][]byte(nil), expectedErrCode: encoding.ErrCodeStreamsBadRequest, }, { @@ -549,7 +573,7 @@ func TestV02_DoMercuryRequestV02(t *testing.T) { }, UpkeepId: upkeepId, }, - expectedValues: [][]byte{}, + expectedValues: [][]byte(nil), expectedErrCode: encoding.ErrCodeStreamsBadRequest, }, } @@ -661,7 +685,7 @@ func TestV02_DoMercuryRequestV02_OneFeedSuccessOneFeedPipelineError(t *testing.T hc.On("Do", mock.Anything).Return(resp, nil).Once() // Second request returns MercuryFlakyError resp = &http.Response{ - StatusCode: http.StatusBadGateway, + StatusCode: http.StatusServiceUnavailable, Body: io.NopCloser(bytes.NewReader(b)), } hc.On("Do", mock.Anything).Return(resp, nil).Times(totalAttempt) @@ -681,7 +705,7 @@ func TestV02_DoMercuryRequestV02_OneFeedSuccessOneFeedPipelineError(t *testing.T state, values, errCode, retryable, retryInterval, _ := c.DoRequest(testutils.Context(t), lookup, automationTypes.LogTrigger, pluginRetryKey) assert.Equal(t, true, retryable) assert.Equal(t, 1*time.Second, retryInterval) - assert.Equal(t, encoding.ErrCodeStreamsInternalError, errCode) + assert.Equal(t, encoding.ErrCodeStreamsServiceUnavailable, errCode) assert.Equal(t, encoding.MercuryFlakyFailure, state) assert.Equal(t, [][]byte(nil), values) } @@ -755,7 +779,7 @@ func TestV02_DoMercuryRequestV02_OneFeedSuccessOneFeedPipelineErrorConvertedErro hc.On("Do", mock.Anything).Return(resp, nil).Once() // Second request returns MercuryFlakyError resp = &http.Response{ - StatusCode: http.StatusBadGateway, + StatusCode: http.StatusGatewayTimeout, Body: io.NopCloser(bytes.NewReader(b)), } hc.On("Do", mock.Anything).Return(resp, nil).Times(totalAttempt) @@ -775,7 +799,7 @@ func TestV02_DoMercuryRequestV02_OneFeedSuccessOneFeedPipelineErrorConvertedErro state, values, errCode, retryable, retryInterval, _ := c.DoRequest(testutils.Context(t), lookup, automationTypes.ConditionTrigger, pluginRetryKey) assert.Equal(t, false, retryable) assert.Equal(t, 0*time.Second, retryInterval) - assert.Equal(t, encoding.ErrCodeStreamsInternalError, errCode) + assert.Equal(t, encoding.ErrCodeStreamsStatusGatewayTimeout, errCode) assert.Equal(t, encoding.NoPipelineError, state) assert.Equal(t, [][]byte(nil), values) } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go index 34f60042d92..98f943177af 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go @@ -65,7 +65,7 @@ func NewClient(mercuryConfig mercury.MercuryConfigProvider, httpClient mercury.H func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLookup, upkeepType automationTypes.UpkeepType, pluginRetryKey string) (encoding.PipelineExecutionState, [][]byte, encoding.ErrCode, bool, time.Duration, error) { if len(streamsLookup.Feeds) == 0 { - return encoding.NoPipelineError, [][]byte{}, encoding.ErrCodeStreamsBadRequest, false, 0 * time.Second, nil + return encoding.NoPipelineError, nil, encoding.ErrCodeStreamsBadRequest, false, 0 * time.Second, nil } resultLen := 1 // Only 1 multi-feed request is made for all feeds ch := make(chan mercury.MercuryData, resultLen) @@ -203,7 +203,7 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur c.lggr.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode) ch <- mercury.MercuryData{ Index: 0, - ErrCode: encoding.ErrCodeStreamsUnknownError, + ErrCode: encoding.HttpToStreamsErrCode(resp.StatusCode), State: encoding.NoPipelineError, } sent = true @@ -234,7 +234,7 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur retryable = true state = encoding.MercuryFlakyFailure errCode = encoding.HttpToStreamsErrCode(http.StatusPartialContent) - return fmt.Errorf("%d", http.StatusNotFound) + return fmt.Errorf("%d", http.StatusPartialContent) } var reportBytes [][]byte for _, rsp := range response.Reports { @@ -271,7 +271,7 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur if !sent { ch <- mercury.MercuryData{ Index: 0, - Bytes: [][]byte{}, + Bytes: nil, Retryable: retryable, Error: retryErr, ErrCode: errCode, diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/v03_request_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/v03_request_test.go index ebe5f95f7be..3e5f74b9a24 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/v03_request_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/v03_request_test.go @@ -280,7 +280,7 @@ func TestV03_DoMercuryRequestV03_OneFeedSuccessOneFeedPipelineError(t *testing.T state, values, errCode, retryable, retryInterval, _ := c.DoRequest(testutils.Context(t), lookup, automationTypes.LogTrigger, pluginRetryKey) assert.Equal(t, true, retryable) assert.Equal(t, 1*time.Second, retryInterval) - assert.Equal(t, encoding.ErrCodeStreamsInternalError, errCode) + assert.Equal(t, encoding.ErrCodeStreamsBadGateway, errCode) assert.Equal(t, encoding.MercuryFlakyFailure, state) assert.Equal(t, [][]byte(nil), values) } @@ -611,11 +611,12 @@ func TestV03_MultiFeedRequest(t *testing.T) { }, UpkeepId: upkeepId, }, - retryNumber: totalAttempt, - statusCode: http.StatusGatewayTimeout, - state: encoding.MercuryFlakyFailure, - retryable: true, - errorMessage: "All attempts fail:\n#1: 504\n#2: 504\n#3: 504", + retryNumber: totalAttempt, + statusCode: http.StatusGatewayTimeout, + state: encoding.MercuryFlakyFailure, + retryable: true, + streamsErrCode: encoding.ErrCodeStreamsStatusGatewayTimeout, + errorMessage: "All attempts fail:\n#1: 504\n#2: 504\n#3: 504", }, { name: "failure - StatusServiceUnavailable - returns retryable", @@ -628,11 +629,12 @@ func TestV03_MultiFeedRequest(t *testing.T) { }, UpkeepId: upkeepId, }, - retryNumber: totalAttempt, - statusCode: http.StatusServiceUnavailable, - state: encoding.MercuryFlakyFailure, - retryable: true, - errorMessage: "All attempts fail:\n#1: 503\n#2: 503\n#3: 503", + retryNumber: totalAttempt, + statusCode: http.StatusServiceUnavailable, + state: encoding.MercuryFlakyFailure, + retryable: true, + streamsErrCode: encoding.ErrCodeStreamsServiceUnavailable, + errorMessage: "All attempts fail:\n#1: 503\n#2: 503\n#3: 503", }, { name: "failure - StatusBadGateway - returns retryable", @@ -645,11 +647,12 @@ func TestV03_MultiFeedRequest(t *testing.T) { }, UpkeepId: upkeepId, }, - retryNumber: totalAttempt, - statusCode: http.StatusBadGateway, - state: encoding.MercuryFlakyFailure, - retryable: true, - errorMessage: "All attempts fail:\n#1: 502\n#2: 502\n#3: 502", + retryNumber: totalAttempt, + statusCode: http.StatusBadGateway, + streamsErrCode: encoding.ErrCodeStreamsBadGateway, + state: encoding.MercuryFlakyFailure, + retryable: true, + errorMessage: "All attempts fail:\n#1: 502\n#2: 502\n#3: 502", }, { @@ -673,11 +676,12 @@ func TestV03_MultiFeedRequest(t *testing.T) { }, }, }, - statusCode: http.StatusOK, - retryNumber: totalAttempt, - retryable: true, - errorMessage: "All attempts fail:\n#1: 404\n#2: 404\n#3: 404", - state: encoding.MercuryFlakyFailure, + statusCode: http.StatusOK, + retryNumber: totalAttempt, + retryable: true, + streamsErrCode: encoding.ErrCodeStreamsPartialContent, + errorMessage: "All attempts fail:\n#1: 404\n#2: 404\n#3: 404", + state: encoding.MercuryFlakyFailure, }, { name: "failure - partial content three times with status partial content", @@ -818,7 +822,7 @@ func TestV03_MultiFeedRequest(t *testing.T) { assert.Equal(t, [][]byte(nil), m.Bytes) } else if tt.retryNumber >= totalAttempt || tt.errorMessage != "" { assert.Equal(t, tt.errorMessage, m.Error.Error()) - assert.Equal(t, [][]byte{}, m.Bytes) + assert.Equal(t, [][]byte(nil), m.Bytes) } else { assert.Nil(t, m.Error) var reports [][]byte