Skip to content

Commit

Permalink
check multiple responses:
Browse files Browse the repository at this point in the history
- server timeout
- unauthorized
- bad req
- internal server err
- not found
  • Loading branch information
amirylm committed Feb 23, 2024
1 parent 5b18f3b commit 0bcbe19
Showing 1 changed file with 54 additions and 41 deletions.
95 changes: 54 additions & 41 deletions core/services/ocr2/plugins/ocr2keeper/integration_21_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ package ocr2keeper_test

import (
"context"
"crypto/rand"
crand "crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"math/big"
"math/rand"
"net/http"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -60,7 +62,7 @@ import (

func TestFilterNamesFromSpec21(t *testing.T) {
b := make([]byte, 20)
_, err := rand.Read(b)
_, err := crand.Read(b)
require.NoError(t, err)
address := common.HexToAddress(hexutil.Encode(b))

Expand Down Expand Up @@ -415,48 +417,39 @@ func TestIntegration_KeeperPluginLogUpkeep_ErrHandler(t *testing.T) {

_, mercuryServer := setupNodes(t, nodeKeys, registry, backend, registryOwner)

const upkeepCount = 10
const mercuryFailRounds = 7
const mercuryFailCount = upkeepCount * 3 * mercuryFailRounds

// testing with the mercury server involves mocking responses. currently,
// there is not a way to connect a mercury call to an upkeep id (though we
// could add custom headers) so the test must be fairly basic and just
// count calls before switching to successes
var (
mu sync.Mutex
count int
)

mercuryServer.RegisterHandler(func(w http.ResponseWriter, r *http.Request) {
mu.Lock()
defer mu.Unlock()

count++

_ = r.ParseForm()

t.Logf("MercuryHTTPServe:RequestURI: %s", r.RequestURI)
upkeepCount := 10

for key, value := range r.Form {
t.Logf("MercuryHTTPServe:FormValue: key: %s; value: %s;", key, value)
}

// the streams lookup retries against the remote server 3 times before
// returning a result as retryable.
// the simulation here should force the streams lookup process to return
// retryable 2 times.
// the total count of failures should be (upkeepCount * 3 * tryCount)
if count <= mercuryFailCount {
w.WriteHeader(http.StatusNotFound)
return
respTimeout := -1
errResponses := []int{
respTimeout,
http.StatusUnauthorized,
http.StatusBadRequest,
http.StatusInternalServerError,
}
// inline retries (3) * retry rounds (6) + timeout round (1)
mercuryFailCountPerUpkeep := 3*6 + 1
// mercuryFailCount is the number of times the mercury server will return an
// error before returning a success
mercuryFailCount := 2 + (mercuryFailCountPerUpkeep * 3)
for i := 3; i < mercuryFailCount; i++ {
errResponses = append(errResponses, http.StatusNotFound)
}
startMercuryServer(t, mercuryServer, func(i int) (int, []byte) {
if i < len(errResponses) {
resp := errResponses[i]
switch resp {
case http.StatusNotFound, http.StatusInternalServerError:
// in case we got a 404 or 500, we wait a bit to simulate real behavior
time.Sleep(time.Duration(rand.Intn(2500)) * time.Millisecond)
case respTimeout: // mercury server timeout
time.Sleep(60 * time.Second) // 60s is the default timeout
resp = http.StatusNotFound
default:
}
return resp, nil
}

// start sending success messages
output := `{"chainlinkBlob":"0x0001c38d71fed6c320b90e84b6f559459814d068e2a1700adc931ca9717d4fe70000000000000000000000000000000000000000000000000000000001a80b52b4bf1233f9cb71144a253a1791b202113c4ab4a92fa1b176d684b4959666ff8200000000000000000000000000000000000000000000000000000000000000e000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000260000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001004254432d5553442d415242495452554d2d544553544e4554000000000000000000000000000000000000000000000000000000000000000000000000645570be000000000000000000000000000000000000000000000000000002af2b818dc5000000000000000000000000000000000000000000000000000002af2426faf3000000000000000000000000000000000000000000000000000002af32dc209700000000000000000000000000000000000000000000000000000000012130f8df0a9745bb6ad5e2df605e158ba8ad8a33ef8a0acf9851f0f01668a3a3f2b68600000000000000000000000000000000000000000000000000000000012130f60000000000000000000000000000000000000000000000000000000000000002c4a7958dce105089cf5edb68dad7dcfe8618d7784eb397f97d5a5fade78c11a58275aebda478968e545f7e3657aba9dcbe8d44605e4c6fde3e24edd5e22c94270000000000000000000000000000000000000000000000000000000000000002459c12d33986018a8959566d145225f0c4a4e61a9a3f50361ccff397899314f0018162cf10cd89897635a0bb62a822355bd199d09f4abe76e4d05261bb44733d"}`

w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(output))
return http.StatusOK, []byte(output)
})
defer mercuryServer.Stop()

Expand Down Expand Up @@ -508,6 +501,25 @@ func TestIntegration_KeeperPluginLogUpkeep_ErrHandler(t *testing.T) {
done()
}

func startMercuryServer(t *testing.T, mercuryServer *SimulatedMercuryServer, responder func(i int) (int, []byte)) {
i := atomic.Int32{}
mercuryServer.RegisterHandler(func(w http.ResponseWriter, r *http.Request) {
_ = r.ParseForm()
t.Logf("MercuryHTTPServe:RequestURI: %s", r.RequestURI)
for key, value := range r.Form {
t.Logf("MercuryHTTPServe:FormValue: key: %s; value: %s;", key, value)
}

ii := int(i.Load())
i.Add(1)
status, body := responder(ii)
w.WriteHeader(status)
if len(body) > 0 {
_, _ = w.Write(body)
}
})
}

func listenEvents(t *testing.T, backend *backends.SimulatedBackend, addrs []common.Address, topic common.Hash, startBlock int64, count int) (func() bool, func()) {
ctx, cancel := context.WithCancel(testutils.Context(t))
visited := make(map[string]bool)
Expand Down Expand Up @@ -536,6 +548,7 @@ func listenEvents(t *testing.T, backend *backends.SimulatedBackend, addrs []comm
continue
}
visited[visitedID] = true
t.Logf("found err handler log in block %d [%s]", log.BlockNumber, log.BlockHash.Hex())
cacheID := log.Address.Hex()
count, ok := cache.Load(cacheID)
if !ok {
Expand Down

0 comments on commit 0bcbe19

Please sign in to comment.