Skip to content

Commit

Permalink
parallelize mercury requests in pipeline (#9744)
Browse files Browse the repository at this point in the history
* parallelize mercury calls

* parallelize mercury requests in pipeline

* refactor

* fix tests

* address comments
  • Loading branch information
FelixFan1992 committed Jul 12, 2023
1 parent bfc91ea commit 42e3dcc
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 144 deletions.
152 changes: 88 additions & 64 deletions core/services/ocr2/plugins/ocr2keeper/evm21/feed_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"net/url"
"strconv"
"strings"
"sync"
"time"

"github.com/avast/retry-go/v4"
Expand All @@ -25,8 +26,8 @@ import (

const (
BlockNumber = "blockNumber" // valid for v0.2
FeedID = "feedID" // valid for v0.3
FeedIDHex = "feedIDHex" // valid for v0.2
FeedId = "feedId" // valid for v0.3
FeedIdHex = "feedIdHex" // valid for v0.2
MercuryPathV2 = "/client?"
MercuryPathV3 = "/v1/reports?"
MercuryBatchPathV3 = "/v1/reports/bulk?"
Expand All @@ -46,6 +47,8 @@ type FeedLookup struct {
timeParamKey string
time *big.Int
extraData []byte
upkeepId *big.Int
block uint64
}

// MercuryResponse is used in both single feed endpoint and bulk endpoint because bulk endpoint will return ONE
Expand All @@ -69,7 +72,7 @@ type AdminOffchainConfig struct {

// feedLookup looks through check upkeep results looking for any that need off chain lookup
func (r *EvmRegistry) feedLookup(ctx context.Context, upkeepResults []EVMAutomationUpkeepResult21) ([]EVMAutomationUpkeepResult21, error) {
// TODO (AUTO-2862): parallelize the feed lookup work for all upkeeps
lookups := map[int]*FeedLookup{}
for i := range upkeepResults {
if upkeepResults[i].FailureReason != UPKEEP_FAILURE_REASON_TARGET_CHECK_REVERTED {
continue
Expand All @@ -84,7 +87,6 @@ func (r *EvmRegistry) feedLookup(ctx context.Context, upkeepResults []EVMAutomat
}

allowed, err := r.allowedToUseMercury(opts, upkeepId)
r.lggr.Info(allowed)
if err != nil {
r.lggr.Errorf("[FeedLookup] upkeep %s block %d failed to time mercury allow list: %v", upkeepId, block, err)
continue
Expand All @@ -96,52 +98,73 @@ func (r *EvmRegistry) feedLookup(ctx context.Context, upkeepResults []EVMAutomat
continue
}

feedLookup, err := r.decodeFeedLookup(upkeepResults[i].PerformData)
r.lggr.Infof("[FeedLookup] upkeep %s block %d decodeFeedLookup performData=%s", upkeepId, block, hexutil.Encode(upkeepResults[i].PerformData))
lookup, err := r.decodeFeedLookup(upkeepResults[i].PerformData)
if err != nil {
r.lggr.Errorf("[FeedLookup] upkeep %s block %d decodeFeedLookup: %v", upkeepId, block, err)
continue
}
r.lggr.Infof("[FeedLookup] upkeep %s block %d feedLookup=%v", upkeepId, block, feedLookup)
lookup.upkeepId = upkeepId
// The block here is used exclusively to call checkCallback at this block. It is not to be confused with the block number
// in the revert for mercury v0.2, which is denoted by the time field in the struct because, starting from v0.3, only timestamps will be supported.
lookup.block = uint64(block)
r.lggr.Infof("[FeedLookup] upkeep %s block %d decodeFeedLookup feedKey=%s timeKey=%s feeds=%v time=%s extraData=%s", upkeepId, block, lookup.feedParamKey, lookup.timeParamKey, lookup.feeds, lookup.time, hexutil.Encode(lookup.extraData))
lookups[i] = lookup
}

values, retryable, err := r.doMercuryRequest(ctx, feedLookup, upkeepId)
if err != nil {
r.lggr.Errorf("[FeedLookup] upkeep %s block %d retryable %v doMercuryRequest: %v", upkeepId, block, retryable, err)
upkeepResults[i].Retryable = retryable
continue
}
var wg sync.WaitGroup
for i, lookup := range lookups {
wg.Add(1)
go r.doLookup(ctx, &wg, lookup, i, upkeepResults)
}
wg.Wait()

r.lggr.Debugf("[FeedLookup] upkeep %s block %d values: %v\nextraData: %v", upkeepId, block, values, feedLookup.extraData)
mercuryBytes, err := r.checkCallback(ctx, upkeepId, values, feedLookup.extraData, block)
if err != nil {
r.lggr.Errorf("[FeedLookup] upkeep %s block %d checkCallback err: %v", upkeepId, block, err)
continue
}
// Don't surface the error to the plugin because the FeedLookup process should be self-contained.
return upkeepResults, nil
}

needed, performData, failureReason, _, err := r.packer.UnpackCheckCallbackResult(mercuryBytes)
if err != nil {
r.lggr.Errorf("[FeedLookup] upkeep %s block %d UnpackCheckCallbackResult err: %v", upkeepId, block, err)
continue
}
func (r *EvmRegistry) doLookup(ctx context.Context, wg *sync.WaitGroup, lookup *FeedLookup, i int, upkeepResults []EVMAutomationUpkeepResult21) {
defer wg.Done()

if int(failureReason) == UPKEEP_FAILURE_REASON_MERCURY_CALLBACK_REVERTED {
upkeepResults[i].FailureReason = UPKEEP_FAILURE_REASON_MERCURY_CALLBACK_REVERTED
r.lggr.Debugf("[FeedLookup] upkeep %s block %d mercury callback reverts", upkeepId, block)
continue
}
values, retryable, err := r.doMercuryRequest(ctx, lookup)
if err != nil {
r.lggr.Errorf("[FeedLookup] upkeep %s retryable %v doMercuryRequest: %v", lookup.upkeepId, retryable, err)
upkeepResults[i].Retryable = retryable
return
}
for j, v := range values {
r.lggr.Infof("[FeedLookup] checkCallback values[%d]=%s", j, hexutil.Encode(v))
}

if !needed {
upkeepResults[i].FailureReason = UPKEEP_FAILURE_REASON_UPKEEP_NOT_NEEDED
r.lggr.Debugf("[FeedLookup] upkeep %s block %d callback reports upkeep not needed", upkeepId, block)
continue
}
mercuryBytes, err := r.checkCallback(ctx, values, lookup)
if err != nil {
r.lggr.Errorf("[FeedLookup] upkeep %s block %d checkCallback err: %v", lookup.upkeepId, lookup.block, err)
return
}
r.lggr.Infof("[FeedLookup] checkCallback mercuryBytes=%s", hexutil.Encode(mercuryBytes))

needed, performData, failureReason, _, err := r.packer.UnpackCheckCallbackResult(mercuryBytes)
if err != nil {
r.lggr.Errorf("[FeedLookup] upkeep %s block %d UnpackCheckCallbackResult err: %v", lookup.upkeepId, lookup.block, err)
return
}

upkeepResults[i].FailureReason = UPKEEP_FAILURE_REASON_NONE
upkeepResults[i].Eligible = true
upkeepResults[i].PerformData = performData
r.lggr.Infof("[FeedLookup] upkeep %s block %d successful with perform data: %+v", upkeepId, block, performData)
if int(failureReason) == UPKEEP_FAILURE_REASON_MERCURY_CALLBACK_REVERTED {
upkeepResults[i].FailureReason = UPKEEP_FAILURE_REASON_MERCURY_CALLBACK_REVERTED
r.lggr.Debugf("[FeedLookup] upkeep %s block %d mercury callback reverts", lookup.upkeepId, lookup.block)
return
}
// don't surface error to plugin bc FeedLookup process should be self-contained.
return upkeepResults, nil

if !needed {
upkeepResults[i].FailureReason = UPKEEP_FAILURE_REASON_UPKEEP_NOT_NEEDED
r.lggr.Debugf("[FeedLookup] upkeep %s block %d callback reports upkeep not needed", lookup.upkeepId, lookup.block)
return
}

upkeepResults[i].FailureReason = UPKEEP_FAILURE_REASON_NONE
upkeepResults[i].Eligible = true
upkeepResults[i].PerformData = performData
r.lggr.Infof("[FeedLookup] upkeep %s block %d successful with perform data: %s", lookup.upkeepId, lookup.block, hexutil.Encode(performData))
}

// allowedToUseMercury retrieves upkeep's administrative offchain config and decode a mercuryEnabled bool to indicate if
Expand Down Expand Up @@ -184,8 +207,8 @@ func (r *EvmRegistry) decodeFeedLookup(data []byte) (*FeedLookup, error) {
}, nil
}

func (r *EvmRegistry) checkCallback(ctx context.Context, upkeepID *big.Int, values [][]byte, ed []byte, block uint32) (hexutil.Bytes, error) {
payload, err := r.abi.Pack("checkCallback", upkeepID, values, ed)
func (r *EvmRegistry) checkCallback(ctx context.Context, values [][]byte, lookup *FeedLookup) (hexutil.Bytes, error) {
payload, err := r.abi.Pack("checkCallback", lookup.upkeepId, values, lookup.extraData)
if err != nil {
return nil, err
}
Expand All @@ -196,32 +219,33 @@ func (r *EvmRegistry) checkCallback(ctx context.Context, upkeepID *big.Int, valu
"data": hexutil.Bytes(payload),
}

err = r.client.CallContext(ctx, &b, "eth_call", args, hexutil.EncodeUint64(uint64(block)))
// call checkCallback function at the block which OCR3 has agreed upon
err = r.client.CallContext(ctx, &b, "eth_call", args, hexutil.EncodeUint64(lookup.block))
if err != nil {
return nil, err
}
return b, nil
}

// doMercuryRequest sends requests to Mercury API to retrieve ChainlinkBlob.
func (r *EvmRegistry) doMercuryRequest(ctx context.Context, ml *FeedLookup, upkeepId *big.Int) ([][]byte, bool, error) {
func (r *EvmRegistry) doMercuryRequest(ctx context.Context, ml *FeedLookup) ([][]byte, bool, error) {
// TODO (AUTO-3253): if no feed labels are provided in v0.3, request for all feeds
resultLen := len(ml.feeds)
ch := make(chan MercuryBytes, resultLen)
if ml.feedParamKey == FeedIDHex && ml.timeParamKey == BlockNumber {
if ml.feedParamKey == FeedIdHex && ml.timeParamKey == BlockNumber {
// only mercury v0.2
for i := range ml.feeds {
go r.singleFeedRequest(ctx, ch, upkeepId, i, ml, MercuryV02)
go r.singleFeedRequest(ctx, ch, i, ml, MercuryV02)
}
} else if ml.feedParamKey == FeedID && ml.timeParamKey == Timestamp {
} else if ml.feedParamKey == FeedId && ml.timeParamKey == Timestamp {
// only mercury v0.3
if resultLen == 1 {
go r.singleFeedRequest(ctx, ch, upkeepId, 0, ml, MercuryV03)
go r.singleFeedRequest(ctx, ch, 0, ml, MercuryV03)
} else {
// create a new channel with buffer size 1 since the batch endpoint will only return 1 blob
resultLen = 1
ch = make(chan MercuryBytes, resultLen)
go r.multiFeedsRequest(ctx, ch, upkeepId, ml)
go r.multiFeedsRequest(ctx, ch, ml)
}
} else {
return nil, false, fmt.Errorf("invalid label combination: feed param key %s and time param key %s", ml.feedParamKey, ml.timeParamKey)
Expand All @@ -240,17 +264,17 @@ func (r *EvmRegistry) doMercuryRequest(ctx context.Context, ml *FeedLookup, upke
}
results[m.Index] = m.Bytes
}
r.lggr.Debugf("FeedLookup upkeep %s retryable %s reqErr %w", upkeepId.String(), retryable && !allSuccess, reqErr)
r.lggr.Debugf("FeedLookup upkeep %s retryable %s reqErr %w", ml.upkeepId.String(), retryable && !allSuccess, reqErr)
// only retry when not every request succeeded AND no request hit a non-retryable failure
return results, retryable && !allSuccess, reqErr
}

// singleFeedRequest sends a Mercury request for a single feed report.
func (r *EvmRegistry) singleFeedRequest(ctx context.Context, ch chan<- MercuryBytes, upkeepId *big.Int, index int, ml *FeedLookup, mv MercuryVersion) {
func (r *EvmRegistry) singleFeedRequest(ctx context.Context, ch chan<- MercuryBytes, index int, ml *FeedLookup, mv MercuryVersion) {
q := url.Values{
ml.feedParamKey: {ml.feeds[index]},
ml.timeParamKey: {ml.time.String()},
UserId: {upkeepId.String()},
UserId: {ml.upkeepId.String()},
}
mercuryURL := r.mercury.cred.URL
path := MercuryPathV2
Expand Down Expand Up @@ -279,7 +303,7 @@ func (r *EvmRegistry) singleFeedRequest(ctx context.Context, ch chan<- MercuryBy
retryable = false
resp, err1 := r.hc.Do(req)
if err1 != nil {
r.lggr.Errorf("FeedLookup upkeep %s block %s GET request fails for feed %s: %v", upkeepId.String(), ml.time.String(), ml.feeds[index], err1)
r.lggr.Errorf("FeedLookup upkeep %s block %s GET request fails for feed %s: %v", ml.upkeepId.String(), ml.time.String(), ml.feeds[index], err1)
return err1
}
defer resp.Body.Close()
Expand All @@ -289,22 +313,22 @@ func (r *EvmRegistry) singleFeedRequest(ctx context.Context, ch chan<- MercuryBy
}

if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusInternalServerError {
r.lggr.Errorf("FeedLookup upkeep %s block %s received status code %d for feed %s", upkeepId.String(), ml.time.String(), resp.StatusCode, ml.feeds[index])
r.lggr.Errorf("FeedLookup upkeep %s block %s received status code %d for feed %s", ml.upkeepId.String(), ml.time.String(), resp.StatusCode, ml.feeds[index])
retryable = true
return errors.New(strconv.FormatInt(int64(resp.StatusCode), 10))
} else if resp.StatusCode != http.StatusOK {
return fmt.Errorf("FeedLookup upkeep %s block %s received status code %d for feed %s", upkeepId.String(), ml.time.String(), resp.StatusCode, ml.feeds[index])
return fmt.Errorf("FeedLookup upkeep %s block %s received status code %d for feed %s", ml.upkeepId.String(), ml.time.String(), resp.StatusCode, ml.feeds[index])
}

var m MercuryResponse
err1 = json.Unmarshal(body, &m)
if err1 != nil {
r.lggr.Errorf("FeedLookup upkeep %s block %s failed to unmarshal body to MercuryResponse for feed %s: %v", upkeepId.String(), ml.time.String(), ml.feeds[index], err1)
r.lggr.Errorf("FeedLookup upkeep %s block %s failed to unmarshal body to MercuryResponse for feed %s: %v", ml.upkeepId.String(), ml.time.String(), ml.feeds[index], err1)
return err1
}
blobBytes, err1 := hexutil.Decode(m.ChainlinkBlob)
if err1 != nil {
r.lggr.Errorf("FeedLookup upkeep %s block %s failed to decode chainlinkBlob %s for feed %s: %v", upkeepId.String(), ml.time.String(), m.ChainlinkBlob, ml.feeds[index], err1)
r.lggr.Errorf("FeedLookup upkeep %s block %s failed to decode chainlinkBlob %s for feed %s: %v", ml.upkeepId.String(), ml.time.String(), m.ChainlinkBlob, ml.feeds[index], err1)
return err1
}
ch <- MercuryBytes{Index: index, Bytes: blobBytes}
Expand All @@ -330,11 +354,11 @@ func (r *EvmRegistry) singleFeedRequest(ctx context.Context, ch chan<- MercuryBy
}

// multiFeedsRequest sends a Mercury request for a multi-feed report
func (r *EvmRegistry) multiFeedsRequest(ctx context.Context, ch chan<- MercuryBytes, upkeepId *big.Int, ml *FeedLookup) {
func (r *EvmRegistry) multiFeedsRequest(ctx context.Context, ch chan<- MercuryBytes, ml *FeedLookup) {
q := url.Values{
FeedID: {strings.Join(ml.feeds, ",")},
FeedId: {strings.Join(ml.feeds, ",")},
Timestamp: {ml.time.String()},
UserId: {upkeepId.String()},
UserId: {ml.upkeepId.String()},
}

reqUrl := fmt.Sprintf("%s%s%s", r.mercury.cred.URL, MercuryBatchPathV3, q.Encode())
Expand All @@ -359,7 +383,7 @@ func (r *EvmRegistry) multiFeedsRequest(ctx context.Context, ch chan<- MercuryBy
retryable = false
resp, err1 := r.hc.Do(req)
if err1 != nil {
r.lggr.Errorf("FeedLookup upkeep %s block %s GET request fails for multi feed: %v", upkeepId.String(), ml.time.String(), err1)
r.lggr.Errorf("FeedLookup upkeep %s block %s GET request fails for multi feed: %v", ml.upkeepId.String(), ml.time.String(), err1)
return err1
}
defer resp.Body.Close()
Expand All @@ -369,22 +393,22 @@ func (r *EvmRegistry) multiFeedsRequest(ctx context.Context, ch chan<- MercuryBy
}

if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusInternalServerError {
r.lggr.Errorf("FeedLookup upkeep %s block %s received status code %d for multi feed", upkeepId.String(), ml.time.String(), resp.StatusCode)
r.lggr.Errorf("FeedLookup upkeep %s block %s received status code %d for multi feed", ml.upkeepId.String(), ml.time.String(), resp.StatusCode)
retryable = true
return errors.New(strconv.FormatInt(int64(resp.StatusCode), 10))
} else if resp.StatusCode != http.StatusOK {
return fmt.Errorf("FeedLookup upkeep %s block %s received status code %d for multi feed", upkeepId.String(), ml.time.String(), resp.StatusCode)
return fmt.Errorf("FeedLookup upkeep %s block %s received status code %d for multi feed", ml.upkeepId.String(), ml.time.String(), resp.StatusCode)
}

var m MercuryResponse
err1 = json.Unmarshal(body, &m)
if err1 != nil {
r.lggr.Errorf("FeedLookup upkeep %s block %s failed to unmarshal body to MercuryResponse for multi feed: %v", upkeepId.String(), ml.time.String(), err1)
r.lggr.Errorf("FeedLookup upkeep %s block %s failed to unmarshal body to MercuryResponse for multi feed: %v", ml.upkeepId.String(), ml.time.String(), err1)
return err1
}
blobBytes, err1 := hexutil.Decode(m.ChainlinkBlob)
if err1 != nil {
r.lggr.Errorf("FeedLookup upkeep %s block %s failed to decode chainlinkBlob %s for multi feed: %v", upkeepId.String(), ml.time.String(), m.ChainlinkBlob, err1)
r.lggr.Errorf("FeedLookup upkeep %s block %s failed to decode chainlinkBlob %s for multi feed: %v", ml.upkeepId.String(), ml.time.String(), m.ChainlinkBlob, err1)
return err1
}
ch <- MercuryBytes{
Expand Down
Loading

0 comments on commit 42e3dcc

Please sign in to comment.