Skip to content

Commit

Permalink
Merge pull request #12089 from smartcontractkit/AUTO-9012-investigate…
Browse files Browse the repository at this point in the history
…-http-timeouts-for-mercury-server-calls

Fix timeouts for mercury calls
  • Loading branch information
infiloop2 authored Feb 22, 2024
2 parents fcc1fa2 + 528efd0 commit bad4376
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

const (
requestTimeout = 10 * time.Second
)

type Lookup interface {
Lookup(ctx context.Context, checkResults []ocr2keepers.CheckResult) []ocr2keepers.CheckResult
}
Expand Down Expand Up @@ -101,7 +105,7 @@ func (s *streams) Lookup(ctx context.Context, checkResults []ocr2keepers.CheckRe
for i, lookup := range lookups {
wg.Add(1)
func(i int, lookup *mercury.StreamsLookup) {
s.threadCtrl.Go(func(ctx context.Context) {
s.threadCtrl.GoCtx(ctx, func(ctx context.Context) {
s.doLookup(ctx, &wg, lookup, i, checkResults)
})
}(i, lookup)
Expand Down Expand Up @@ -249,10 +253,12 @@ func (s *streams) DoMercuryRequest(ctx context.Context, lookup *mercury.StreamsL
pluginRetryKey := generatePluginRetryKey(checkResults[i].WorkID, lookup.Block)
upkeepType := core.GetUpkeepType(checkResults[i].UpkeepID)

reqCtx, cancel := context.WithTimeout(ctx, requestTimeout)
defer cancel()
if lookup.IsMercuryV02() {
state, values, errCode, retryable, retryInterval, err = s.v02Client.DoRequest(ctx, lookup, upkeepType, pluginRetryKey)
state, values, errCode, retryable, retryInterval, err = s.v02Client.DoRequest(reqCtx, lookup, upkeepType, pluginRetryKey)
} else if lookup.IsMercuryV03() {
state, values, errCode, retryable, retryInterval, err = s.v03Client.DoRequest(ctx, lookup, upkeepType, pluginRetryKey)
state, values, errCode, retryable, retryInterval, err = s.v03Client.DoRequest(reqCtx, lookup, upkeepType, pluginRetryKey)
}

if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ import (
"strconv"
"time"

"github.com/avast/retry-go/v4"
"github.com/ethereum/go-ethereum/common/hexutil"
automationTypes "github.com/smartcontractkit/chainlink-automation/pkg/v3/types"

"github.com/smartcontractkit/chainlink-common/pkg/services"

"github.com/avast/retry-go/v4"
"github.com/ethereum/go-ethereum/common/hexutil"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury"
Expand Down Expand Up @@ -63,7 +64,7 @@ func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLo
for i := range streamsLookup.Feeds {
// TODO (AUTO-7209): limit the number of concurrent requests
i := i
c.threadCtrl.Go(func(ctx context.Context) {
c.threadCtrl.GoCtx(ctx, func(ctx context.Context) {
c.singleFeedRequest(ctx, ch, i, streamsLookup)
})
}
Expand Down Expand Up @@ -172,6 +173,9 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur
retryable = true
state = encoding.MercuryFlakyFailure
errCode = encoding.ErrCodeStreamsUnknownError
if ctx.Err() != nil {
errCode = encoding.ErrCodeStreamsTimeout
}
return err
}
defer httpResponse.Body.Close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (
"testing"
"time"

"github.com/ethereum/go-ethereum/common/hexutil"
automationTypes "github.com/smartcontractkit/chainlink-automation/pkg/v3/types"
"github.com/smartcontractkit/chainlink-common/pkg/types"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/patrickmn/go-cache"
"github.com/stretchr/testify/mock"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLo
}
resultLen := 1 // Only 1 multi-feed request is made for all feeds
ch := make(chan mercury.MercuryData, resultLen)
c.threadCtrl.Go(func(ctx context.Context) {
c.threadCtrl.GoCtx(ctx, func(ctx context.Context) {
c.multiFeedsRequest(ctx, ch, streamsLookup)
})

Expand Down Expand Up @@ -145,6 +145,9 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur
retryable = true
state = encoding.MercuryFlakyFailure
errCode = encoding.ErrCodeStreamsUnknownError
if ctx.Err() != nil {
errCode = encoding.ErrCodeStreamsTimeout
}
return err
}
defer resp.Body.Close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import (
"testing"
"time"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/patrickmn/go-cache"
automationTypes "github.com/smartcontractkit/chainlink-automation/pkg/v3/types"

"github.com/smartcontractkit/chainlink-common/pkg/types"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/patrickmn/go-cache"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (r *EvmRegistry) CheckUpkeeps(ctx context.Context, keys ...ocr2keepers.Upke

chResult := make(chan checkResult, 1)

r.threadCtrl.Go(func(ctx context.Context) {
r.threadCtrl.GoCtx(ctx, func(ctx context.Context) {
r.doCheck(ctx, keys, chResult)
})

Expand Down
14 changes: 14 additions & 0 deletions core/utils/thread_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ var _ ThreadControl = &threadControl{}
type ThreadControl interface {
// Go starts a goroutine and tracks the lifetime of the goroutine.
Go(fn func(context.Context))
// GoCtx starts a goroutine with a given context and tracks the lifetime of the goroutine.
GoCtx(ctx context.Context, fn func(context.Context))
// Close cancels the goroutines and waits for all of them to exit.
Close()
}
Expand Down Expand Up @@ -40,6 +42,18 @@ func (tc *threadControl) Go(fn func(context.Context)) {
}()
}

func (tc *threadControl) GoCtx(ctx context.Context, fn func(context.Context)) {
tc.threadsWG.Add(1)
go func() {
defer tc.threadsWG.Done()
c, cn := context.WithCancel(ctx)
defer cn()
ctx, cancel := tc.stop.CtxCancel(c, cn)
defer cancel()
fn(ctx)
}()
}

func (tc *threadControl) Close() {
close(tc.stop)
tc.threadsWG.Wait()
Expand Down
27 changes: 27 additions & 0 deletions core/utils/thread_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package utils

import (
"context"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
)
Expand All @@ -25,3 +27,28 @@ func TestThreadControl_Close(t *testing.T) {

require.Equal(t, int32(n), finished.Load())
}

func TestThreadControl_GoCtx(t *testing.T) {
tc := NewThreadControl()
defer tc.Close()

var wg sync.WaitGroup
finished := atomic.Int32{}

ctx, cancel := context.WithCancel(context.Background())

wg.Add(1)
tc.GoCtx(ctx, func(c context.Context) {
defer wg.Done()
<-c.Done()
finished.Add(1)
})

go func() {
time.After(1 * time.Millisecond)
cancel()
}()

wg.Wait()
require.Equal(t, int32(1), finished.Load())
}

0 comments on commit bad4376

Please sign in to comment.