Increase disablement of cache if LatestReportTTL=0 (#11636)
* Increase disablement of cache if LatestReportTTL=0

* Also reset transport on consecutive LatestReport request failures

* Optimize case where caching disabled

* Fix comment

* Fix tests
samsondav authored Dec 21, 2023
1 parent be53519 commit c5aa49b
Showing 4 changed files with 48 additions and 31 deletions.
core/services/relay/evm/mercury/wsrpc/cache/cache_set.go (6 changes: 5 additions & 1 deletion)

@@ -46,7 +46,7 @@ func newCacheSet(lggr logger.Logger, cfg Config) *cacheSet {
 
 func (cs *cacheSet) Start(context.Context) error {
     return cs.StartOnce("CacheSet", func() error {
-        cs.lggr.Debugw("CacheSet starting", "config", cs.cfg)
+        cs.lggr.Debugw("CacheSet starting", "config", cs.cfg, "cachingEnabled", cs.cfg.LatestReportTTL > 0)
         return nil
     })
 }
@@ -65,6 +65,10 @@ func (cs *cacheSet) Close() error {
 }
 
 func (cs *cacheSet) Get(ctx context.Context, client Client) (f Fetcher, err error) {
+    if cs.cfg.LatestReportTTL == 0 {
+        // caching disabled
+        return client, nil
+    }
     ok := cs.IfStarted(func() {
         f, err = cs.get(ctx, client)
     })
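
The early return works because the Client passed to Get already satisfies the Fetcher return type (otherwise return client, nil would not compile), so with LatestReportTTL=0 the cache set hands back the client itself instead of constructing a caching wrapper. The following is a minimal, self-contained sketch of that pass-through shape; the interface and type definitions are simplified stand-ins for illustration, not the actual package types.

package main

import (
    "context"
    "fmt"
    "time"
)

// Fetcher is a simplified stand-in for the package's Fetcher interface:
// anything that can serve the latest report.
type Fetcher interface {
    LatestReport(ctx context.Context, feedID string) (string, error)
}

// Client is a stand-in for the wsrpc client; note that it is itself a Fetcher.
type Client struct{ serverURL string }

func (c *Client) LatestReport(ctx context.Context, feedID string) (string, error) {
    return fmt.Sprintf("uncached report for %s from %s", feedID, c.serverURL), nil
}

// cachingFetcher stands in for the per-server cache that wraps a Client.
type cachingFetcher struct {
    client *Client
    ttl    time.Duration
    // stored reports, locking, eviction and so on omitted
}

func (cf *cachingFetcher) LatestReport(ctx context.Context, feedID string) (string, error) {
    // A real cache would serve a stored report younger than ttl here.
    return cf.client.LatestReport(ctx, feedID)
}

// Config mirrors the rule from the diff: LatestReportTTL == 0 disables caching.
type Config struct{ LatestReportTTL time.Duration }

// Get returns the client unwrapped when caching is disabled, otherwise a caching wrapper.
func Get(cfg Config, client *Client) Fetcher {
    if cfg.LatestReportTTL == 0 {
        // caching disabled
        return client
    }
    return &cachingFetcher{client: client, ttl: cfg.LatestReportTTL}
}

func main() {
    c := &Client{serverURL: "wss://mercury.example"}

    f := Get(Config{LatestReportTTL: 0}, c)
    fmt.Println(f == Fetcher(c)) // true: the same instance, no wrapper allocated

    f = Get(Config{LatestReportTTL: time.Second}, c)
    _, wrapped := f.(*cachingFetcher)
    fmt.Println(wrapped) // true: the caching path returns a wrapper
}

Returning the untouched client is also what the new test below checks: pointer identity via assert.Same, and no cache entry being created.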
core/services/relay/evm/mercury/wsrpc/cache/cache_set_test.go (13 changes: 12 additions & 1 deletion)

@@ -13,7 +13,8 @@ import (
 
 func Test_CacheSet(t *testing.T) {
     lggr := logger.TestLogger(t)
-    cs := newCacheSet(lggr, Config{})
+    cs := newCacheSet(lggr, Config{LatestReportTTL: 1})
+    disabledCs := newCacheSet(lggr, Config{LatestReportTTL: 0})
     ctx := testutils.Context(t)
     servicetest.Run(t, cs)
 
@@ -22,6 +23,16 @@ func Test_CacheSet(t *testing.T) {
 
     var err error
     var f Fetcher
+    t.Run("with caching disabled, returns the passed client", func(t *testing.T) {
+        assert.Len(t, disabledCs.caches, 0)
+
+        f, err = disabledCs.Get(ctx, c)
+        require.NoError(t, err)
+
+        assert.Same(t, c, f)
+        assert.Len(t, disabledCs.caches, 0)
+    })
+
     t.Run("with virgin cacheset, makes new entry and returns it", func(t *testing.T) {
         assert.Len(t, cs.caches, 0)
 
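
The new subtest leans on testify's assert.Same, which asserts pointer identity rather than deep equality, so it only passes if Get returned exactly the instance it was handed. A small standalone illustration of the distinction, using a hypothetical client type rather than the real one:

package cache_test

import (
    "testing"

    "github.com/stretchr/testify/assert"
)

// client is a hypothetical stand-in for the wsrpc client used in the real test.
type client struct{ serverURL string }

func TestSameVsEqual(t *testing.T) {
    c := &client{serverURL: "wss://mercury.example"}
    clone := &client{serverURL: "wss://mercury.example"} // equal contents, distinct instance

    assert.Equal(t, c, clone) // passes: deep equality compares field values
    assert.Same(t, c, c)      // passes: identical pointer

    // assert.Same(t, c, clone) would fail: equal contents but not the same
    // instance, which is exactly what distinguishes "cache bypassed, client
    // returned as-is" from "an equivalent wrapper was returned".
}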
core/services/relay/evm/mercury/wsrpc/client.go (45 changes: 25 additions & 20 deletions)

@@ -24,9 +24,9 @@ import (
     "github.com/smartcontractkit/chainlink/v2/core/utils"
 )
 
-// MaxConsecutiveTransmitFailures controls how many consecutive requests are
+// MaxConsecutiveRequestFailures controls how many consecutive requests are
 // allowed to time out before we reset the connection
-const MaxConsecutiveTransmitFailures = 5
+const MaxConsecutiveRequestFailures = 10
 
 var (
     timeoutCount = promauto.NewCounterVec(prometheus.CounterOpts{
@@ -55,7 +55,7 @@ var (
     )
     connectionResetCount = promauto.NewCounterVec(prometheus.CounterOpts{
         Name: "mercury_connection_reset_count",
-        Help: fmt.Sprintf("Running count of times connection to mercury server has been reset (connection reset happens automatically after %d consecutive transmit failures)", MaxConsecutiveTransmitFailures),
+        Help: fmt.Sprintf("Running count of times connection to mercury server has been reset (connection reset happens automatically after %d consecutive request failures)", MaxConsecutiveRequestFailures),
     },
         []string{"serverURL"},
     )
@@ -256,13 +256,26 @@ func (w *client) Transmit(ctx context.Context, req *pb.TransmitRequest) (resp *p
         return nil, errors.Wrap(err, "Transmit call failed")
     }
     resp, err = w.rawClient.Transmit(ctx, req)
+    w.handleTimeout(err)
+    if err != nil {
+        w.logger.Warnw("Transmit call failed due to networking error", "err", err, "resp", resp)
+        incRequestStatusMetric(statusFailed)
+    } else {
+        w.logger.Tracew("Transmit call succeeded", "resp", resp)
+        incRequestStatusMetric(statusSuccess)
+        setRequestLatencyMetric(float64(time.Since(start).Milliseconds()))
+    }
+    return
+}
+
+func (w *client) handleTimeout(err error) {
     if errors.Is(err, context.DeadlineExceeded) {
         w.timeoutCountMetric.Inc()
         cnt := w.consecutiveTimeoutCnt.Add(1)
-        if cnt == MaxConsecutiveTransmitFailures {
+        if cnt == MaxConsecutiveRequestFailures {
             w.logger.Errorf("Timed out on %d consecutive transmits, resetting transport", cnt)
-            // NOTE: If we get 5+ request timeouts in a row, close and re-open
-            // the websocket connection.
+            // NOTE: If we get at least MaxConsecutiveRequestFailures request
+            // timeouts in a row, close and re-open the websocket connection.
             //
             // This *shouldn't* be necessary in theory (ideally, wsrpc would
             // handle it for us) but it acts as a "belts and braces" approach
@@ -271,11 +284,11 @@ func (w *client) Transmit(ctx context.Context, req *pb.TransmitRequest) (resp *p
             select {
             case w.chResetTransport <- struct{}{}:
             default:
-                // This can happen if we had 5 consecutive timeouts, already
-                // sent a reset signal, then the connection started working
-                // again (resetting the count) then we got 5 additional
-                // failures before the runloop was able to close the bad
-                // connection.
+                // This can happen if we had MaxConsecutiveRequestFailures
+                // consecutive timeouts, already sent a reset signal, then the
+                // connection started working again (resetting the count) then
+                // we got MaxConsecutiveRequestFailures additional failures
+                // before the runloop was able to close the bad connection.
                 //
                 // It should be safe to just ignore in this case.
                 //
@@ -286,15 +299,6 @@ func (w *client) Transmit(ctx context.Context, req *pb.TransmitRequest) (resp *p
     } else {
         w.consecutiveTimeoutCnt.Store(0)
    }
-    if err != nil {
-        w.logger.Warnw("Transmit call failed due to networking error", "err", err, "resp", resp)
-        incRequestStatusMetric(statusFailed)
-    } else {
-        w.logger.Tracew("Transmit call succeeded", "resp", resp)
-        incRequestStatusMetric(statusSuccess)
-        setRequestLatencyMetric(float64(time.Since(start).Milliseconds()))
-    }
-    return
 }
 
 func (w *client) LatestReport(ctx context.Context, req *pb.LatestReportRequest) (resp *pb.LatestReportResponse, err error) {
@@ -306,6 +310,7 @@ func (w *client) LatestReport(ctx context.Context, req *pb.LatestReportRequest)
     var cached bool
     if w.cache == nil {
         resp, err = w.rawClient.LatestReport(ctx, req)
+        w.handleTimeout(err)
     } else {
         cached = true
         resp, err = w.cache.LatestReport(ctx, req)
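
The extracted handleTimeout is the mechanism behind both reset-related commit bullets: an atomic count of consecutive context.DeadlineExceeded errors, and a non-blocking send on a reset channel once the count reaches MaxConsecutiveRequestFailures, so the request path never blocks on the runloop that re-dials the connection. Below is a stripped-down sketch of that pattern under simplified, assumed names; it is not the package's actual client type.

package main

import (
    "context"
    "errors"
    "fmt"
    "sync/atomic"
)

const maxConsecutiveRequestFailures = 10 // mirrors MaxConsecutiveRequestFailures in the diff

type resettableClient struct {
    consecutiveTimeouts atomic.Int64
    chResetTransport    chan struct{} // drained by a separate runloop that re-dials
}

// handleTimeout tracks consecutive deadline errors and, on hitting the
// threshold, signals the runloop to tear down and re-open the transport.
func (c *resettableClient) handleTimeout(err error) {
    if errors.Is(err, context.DeadlineExceeded) {
        cnt := c.consecutiveTimeouts.Add(1)
        if cnt == maxConsecutiveRequestFailures {
            select {
            case c.chResetTransport <- struct{}{}:
            default:
                // A reset is already pending; dropping the extra signal is safe.
            }
        }
        return
    }
    // Any request that does not time out breaks the streak.
    c.consecutiveTimeouts.Store(0)
}

func main() {
    c := &resettableClient{chResetTransport: make(chan struct{}, 1)}
    for i := 0; i < maxConsecutiveRequestFailures; i++ {
        c.handleTimeout(context.DeadlineExceeded)
    }
    select {
    case <-c.chResetTransport:
        fmt.Println("reset signalled after", maxConsecutiveRequestFailures, "consecutive timeouts")
    default:
        fmt.Println("no reset signalled")
    }
}

The non-blocking select/default matters for the reason the in-code comment gives: a reset signal may already be pending, and dropping the duplicate is harmless.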
core/services/relay/evm/mercury/wsrpc/client_test.go (15 changes: 6 additions & 9 deletions)

@@ -54,7 +54,7 @@ func Test_Client_Transmit(t *testing.T) {
 
     noopCacheSet := newNoopCacheSet()
 
-    t.Run("sends on reset channel after MaxConsecutiveTransmitFailures timed out transmits", func(t *testing.T) {
+    t.Run("sends on reset channel after MaxConsecutiveRequestFailures timed out transmits", func(t *testing.T) {
         calls := 0
         transmitErr := context.DeadlineExceeded
         wsrpcClient := &mocks.MockWSRPCClient{
@@ -70,19 +70,19 @@
         c.conn = conn
         c.rawClient = wsrpcClient
         require.NoError(t, c.StartOnce("Mock WSRPC Client", func() error { return nil }))
-        for i := 1; i < MaxConsecutiveTransmitFailures; i++ {
+        for i := 1; i < MaxConsecutiveRequestFailures; i++ {
             _, err := c.Transmit(ctx, req)
             require.EqualError(t, err, "context deadline exceeded")
         }
-        assert.Equal(t, 4, calls)
+        assert.Equal(t, MaxConsecutiveRequestFailures-1, calls)
         select {
         case <-c.chResetTransport:
             t.Fatal("unexpected send on chResetTransport")
         default:
         }
         _, err := c.Transmit(ctx, req)
         require.EqualError(t, err, "context deadline exceeded")
-        assert.Equal(t, 5, calls)
+        assert.Equal(t, MaxConsecutiveRequestFailures, calls)
         select {
         case <-c.chResetTransport:
         default:
@@ -94,14 +94,14 @@
         // working transmit to reset counter
         _, err = c.Transmit(ctx, req)
         require.NoError(t, err)
-        assert.Equal(t, 6, calls)
+        assert.Equal(t, MaxConsecutiveRequestFailures+1, calls)
         assert.Equal(t, 0, int(c.consecutiveTimeoutCnt.Load()))
     })
 
     t.Run("doesn't block in case channel is full", func(t *testing.T) {
         transmitErr = context.DeadlineExceeded
         c.chResetTransport = nil // simulate full channel
-        for i := 0; i < MaxConsecutiveTransmitFailures; i++ {
+        for i := 0; i < MaxConsecutiveRequestFailures; i++ {
             _, err := c.Transmit(ctx, req)
             require.EqualError(t, err, "context deadline exceeded")
         }
@@ -162,10 +162,7 @@ func Test_Client_LatestReport(t *testing.T) {
 
         // simulate start without dialling
         require.NoError(t, c.StartOnce("Mock WSRPC Client", func() error { return nil }))
-        var err error
         servicetest.Run(t, cacheSet)
-        c.cache, err = cacheSet.Get(ctx, c)
-        require.NoError(t, err)
 
         for i := 0; i < 5; i++ {
             r, err := c.LatestReport(ctx, req)
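
The mock's Transmit body is collapsed in this view; from the assertions it evidently increments calls and returns whatever transmitErr currently holds, which is how the test drives an exact number of consecutive context.DeadlineExceeded failures, and why the hard-coded 4/5/6 expectations could be rewritten in terms of MaxConsecutiveRequestFailures. A hypothetical, self-contained sketch of that mock shape, not the actual mocks.MockWSRPCClient:

package main

import (
    "context"
    "fmt"
)

// mockTransmitter is a hypothetical stand-in for the collapsed mock in
// client_test.go: it counts calls and returns whatever error it is
// currently configured with.
type mockTransmitter struct {
    calls       int
    transmitErr error
}

func (m *mockTransmitter) Transmit(ctx context.Context) error {
    m.calls++
    return m.transmitErr
}

func main() {
    const maxConsecutiveRequestFailures = 10 // mirrors the constant in the diff

    m := &mockTransmitter{transmitErr: context.DeadlineExceeded}

    // Drive maxConsecutiveRequestFailures-1 timed-out calls, as the test's first loop does.
    for i := 1; i < maxConsecutiveRequestFailures; i++ {
        _ = m.Transmit(context.Background())
    }
    fmt.Println(m.calls == maxConsecutiveRequestFailures-1) // true; previously asserted as the literal 4

    // One more failing call reaches the threshold.
    _ = m.Transmit(context.Background())
    fmt.Println(m.calls == maxConsecutiveRequestFailures) // true; previously asserted as the literal 5

    // Flip the mock back to success; in the real test this resets the timeout streak.
    m.transmitErr = nil
    fmt.Println(m.Transmit(context.Background()) == nil) // true
}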
