From f7623351bc313ea5b050823215d6f22270fb6df9 Mon Sep 17 00:00:00 2001 From: Adam Hamrick Date: Tue, 11 Jul 2023 18:57:20 -0400 Subject: [PATCH] PoC on collect-after --- integration-tests/go.mod | 2 + integration-tests/go.sum | 4 +- integration-tests/testreporters/ocr.go | 393 ++----------------------- integration-tests/testsetups/ocr.go | 292 +++++++++--------- 4 files changed, 170 insertions(+), 521 deletions(-) diff --git a/integration-tests/go.mod b/integration-tests/go.mod index 6f5a708622a..3bb1cdd5f20 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -31,6 +31,8 @@ require ( gopkg.in/guregu/null.v4 v4.0.0 ) +replace github.com/smartcontractkit/chainlink-testing-framework => github.com/smartcontractkit/chainlink-testing-framework v1.12.1-0.20230711191356-9433e54d5d36 + require ( contrib.go.opencensus.io/exporter/stackdriver v0.13.5 // indirect filippo.io/edwards25519 v1.0.0-rc.1 // indirect diff --git a/integration-tests/go.sum b/integration-tests/go.sum index e8c3dfd0768..7d140398b6c 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -1374,8 +1374,8 @@ github.com/smartcontractkit/chainlink-relay v0.1.7-0.20230620171700-bbcb3a99b7d3 github.com/smartcontractkit/chainlink-relay v0.1.7-0.20230620171700-bbcb3a99b7d3/go.mod h1:MfZBUifutkv3aK7abyw5YmTJbqt8iFwcQDFikrxC/uI= github.com/smartcontractkit/chainlink-solana v1.0.3-0.20230612131011-369bfb503592 h1:3Ul/LkULxrolCVguHUFnWJamgUDsSGISlm/DzclstmE= github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20230622060316-7ce48476dd7d h1:4jSCp6i/p/EIaAkYQDxPK8nXDuv0fBXzKIcVYzetCoI= -github.com/smartcontractkit/chainlink-testing-framework v1.11.8 h1:OIENwV//X8+IMUxWVDDoTxqrVjtutHX1uv7gq7kJsNQ= -github.com/smartcontractkit/chainlink-testing-framework v1.11.8/go.mod h1:yPCQgZEkKK56mOAnsvSsnMnnT+3I6sAWJVxG9t3iRgQ= +github.com/smartcontractkit/chainlink-testing-framework v1.12.1-0.20230711191356-9433e54d5d36 h1:g3PV/c0nvdpaYj+1cWaFied3V3bRDtU+C8Bu+FqzS1I= +github.com/smartcontractkit/chainlink-testing-framework v1.12.1-0.20230711191356-9433e54d5d36/go.mod h1:yPCQgZEkKK56mOAnsvSsnMnnT+3I6sAWJVxG9t3iRgQ= github.com/smartcontractkit/libocr v0.0.0-20230606215712-82b910bef5c1 h1:caG9BWjnCxN/HPBA5ltDGadDraZAsjGIct4S8lh8D5c= github.com/smartcontractkit/libocr v0.0.0-20230606215712-82b910bef5c1/go.mod h1:2lyRkw/qLQgUWlrWWmq5nj0y90rWeO6Y+v+fCakRgb0= github.com/smartcontractkit/ocr2keepers v0.7.0 h1:5csjgDFQEO+kXr45vPX3o/v//X2bznNCWhgn94Qp6n0= diff --git a/integration-tests/testreporters/ocr.go b/integration-tests/testreporters/ocr.go index 37c950c05dd..bc407c82bab 100644 --- a/integration-tests/testreporters/ocr.go +++ b/integration-tests/testreporters/ocr.go @@ -3,11 +3,8 @@ package testreporters import ( "encoding/csv" "fmt" - "math" "os" "path/filepath" - "strings" - "sync" "testing" "time" @@ -19,34 +16,46 @@ import ( // OCRSoakTestReporter collates all OCRAnswerUpdated events into a single report type OCRSoakTestReporter struct { - ContractReports map[string]*OCRSoakTestReport // contractAddress: Answers ExpectedRoundDuration time.Duration AnomaliesDetected bool + timeLine []*OCRTestState namespace string csvLocation string } +type OCRTestState struct { + Time time.Time + Message string +} + // SetNamespace sets the namespace of the report for clean reports func (o *OCRSoakTestReporter) SetNamespace(namespace string) { o.namespace = namespace } +func (o *OCRSoakTestReporter) RecordEvents(expectedEvents, actualEvents []*OCRTestState) { + expectedEventIndex, actualEventIndex := 0, 0 + for expectedEventIndex < len(expectedEvents) || actualEventIndex < len(actualEvents) { + if expectedEventIndex >= len(expectedEvents) { + o.timeLine = append(o.timeLine, actualEvents[actualEventIndex]) + actualEventIndex++ + } else if actualEventIndex >= len(actualEvents) { + o.timeLine = append(o.timeLine, expectedEvents[expectedEventIndex]) + expectedEventIndex++ + } else if expectedEvents[expectedEventIndex].Time.Before(actualEvents[actualEventIndex].Time) { + o.timeLine = append(o.timeLine, expectedEvents[expectedEventIndex]) + expectedEventIndex++ + } else { + o.timeLine = append(o.timeLine, actualEvents[actualEventIndex]) + actualEventIndex++ + } + } +} + // WriteReport writes OCR Soak test report to logs func (o *OCRSoakTestReporter) WriteReport(folderLocation string) error { log.Debug().Msg("Writing OCR Soak Test Report") - var reportGroup sync.WaitGroup - for _, report := range o.ContractReports { - reportGroup.Add(1) - go func(report *OCRSoakTestReport) { - defer reportGroup.Done() - if report.ProcessOCRReport() { - o.AnomaliesDetected = true - } - }(report) - } - reportGroup.Wait() - log.Debug().Int("Count", len(o.ContractReports)).Msg("Processed OCR Soak Test Reports") return o.writeCSV(folderLocation) } @@ -96,365 +105,25 @@ func (o *OCRSoakTestReporter) writeCSV(folderLocation string) error { ocrReportWriter := csv.NewWriter(ocrReportFile) err = ocrReportWriter.Write([]string{ - "Contract Address", - "Total Rounds Processed", - "Average Round Time", - "Longest Round Time", - "Shortest Round Time", - "Average Round Blocks", - "Longest Round Blocks", - "Shortest Round Blocks", + "Time", + "Message", }) if err != nil { return err } - for contractAddress, report := range o.ContractReports { + + for _, event := range o.timeLine { err = ocrReportWriter.Write([]string{ - contractAddress, - fmt.Sprint(report.totalRounds), - report.averageRoundTime.Truncate(time.Second).String(), - report.longestRoundTime.Truncate(time.Second).String(), - report.shortestRoundTime.Truncate(time.Second).String(), - fmt.Sprint(report.averageRoundBlocks), - fmt.Sprint(report.longestRoundBlocks), - fmt.Sprint(report.shortestRoundBlocks), + event.Time.String(), + event.Message, }) if err != nil { return err } } - err = ocrReportWriter.Write([]string{}) - if err != nil { - return err - } - - // Anomalous reports - err = ocrReportWriter.Write([]string{"Updates With Anomalies"}) - if err != nil { - return err - } - - for _, report := range o.ContractReports { - if len(report.AnomalousAnswerIndexes) > 0 { - err = ocrReportWriter.Write(report.csvHeaders()) - if err != nil { - return err - } - } - for _, values := range report.anomalousCSVValues() { - err = ocrReportWriter.Write(values) - if err != nil { - return err - } - } - } - - err = ocrReportWriter.Write([]string{}) - if err != nil { - return err - } - - // All reports - err = ocrReportWriter.Write([]string{"All Updated Answers"}) - if err != nil { - return err - } - - for _, report := range o.ContractReports { - if len(report.UpdatedAnswers) > 0 { - err = ocrReportWriter.Write(report.csvHeaders()) - if err != nil { - return err - } - } - for _, values := range report.allCSVValues() { - err = ocrReportWriter.Write(values) - if err != nil { - return err - } - } - } - ocrReportWriter.Flush() log.Info().Str("Location", reportLocation).Msg("Wrote CSV file") return nil } - -// OCRSoakTestReport holds all answered rounds and summary data for an OCR contract -type OCRSoakTestReport struct { - ContractAddress string - UpdatedAnswers []*OCRAnswerUpdated - AnomalousAnswerIndexes []int - ExpectedRoundDuration time.Duration - - newRoundExpected bool - newRoundExpectedId uint64 - newRoundExpectedAnswer int - newRoundStartTime time.Time - newRoundStartBlock uint64 - - totalRounds uint64 - longestRoundTime time.Duration - shortestRoundTime time.Duration - averageRoundTime time.Duration - longestRoundBlocks uint64 - shortestRoundBlocks uint64 - averageRoundBlocks uint64 -} - -// NewOCRSoakTestReport initializes a new soak test report for a new tests -func NewOCRSoakTestReport(contractAddress string, startingAnswer int, expectedRoundDuration time.Duration) *OCRSoakTestReport { - return &OCRSoakTestReport{ - ContractAddress: contractAddress, - UpdatedAnswers: make([]*OCRAnswerUpdated, 0), - AnomalousAnswerIndexes: make([]int, 0), - ExpectedRoundDuration: expectedRoundDuration, - newRoundExpected: true, - newRoundExpectedId: 1, - newRoundExpectedAnswer: startingAnswer, - } -} - -// ProcessOCRReport summarizes all data collected from OCR rounds, and returns if there are any anomalies detected -func (o *OCRSoakTestReport) ProcessOCRReport() bool { - log.Debug().Str("OCR Address", o.ContractAddress).Msg("Processing OCR Soak Report") - o.AnomalousAnswerIndexes = make([]int, 0) - - var ( - totalRoundBlocks uint64 - totalRoundTime time.Duration - ) - - o.longestRoundTime = 0 - o.shortestRoundTime = math.MaxInt64 - o.longestRoundBlocks = 0 - o.shortestRoundBlocks = math.MaxUint64 - for index, updatedAnswer := range o.UpdatedAnswers { - if updatedAnswer.ProcessAnomalies(o.ExpectedRoundDuration) { - o.AnomalousAnswerIndexes = append(o.AnomalousAnswerIndexes, index) - } - if !updatedAnswer.Anomalous { // Anomalous answers can have outlier values that throw averages off - o.totalRounds++ - updatedAnswer.RoundDuration = updatedAnswer.UpdatedTime.Sub(updatedAnswer.StartingTime) - updatedAnswer.BlockDuration = updatedAnswer.UpdatedBlockNum - updatedAnswer.StartingBlockNum - totalRoundTime += updatedAnswer.RoundDuration - totalRoundBlocks += updatedAnswer.BlockDuration - if o.longestRoundTime < updatedAnswer.RoundDuration { - o.longestRoundTime = updatedAnswer.RoundDuration - } - if o.shortestRoundTime > updatedAnswer.RoundDuration { - o.shortestRoundTime = updatedAnswer.RoundDuration - } - if o.longestRoundBlocks < updatedAnswer.BlockDuration { - o.longestRoundBlocks = updatedAnswer.BlockDuration - } - if o.shortestRoundBlocks > updatedAnswer.BlockDuration { - o.shortestRoundBlocks = updatedAnswer.BlockDuration - } - } - } - if o.totalRounds > 0 { - o.averageRoundBlocks = totalRoundBlocks / o.totalRounds - o.averageRoundTime = totalRoundTime / time.Duration(o.totalRounds) - } - - return len(o.AnomalousAnswerIndexes) > 0 -} - -// NewAnswerUpdated records a new round, updating expectations for the next one if necessary. Returns true if the answer -// comes in as fully expected. -func (o *OCRSoakTestReport) NewAnswerUpdated(newAnswer *OCRAnswerUpdated) bool { - fullyExpected := newAnswer.UpdatedRoundId == o.newRoundExpectedId && - o.newRoundExpected && - o.newRoundExpectedAnswer == newAnswer.UpdatedAnswer - newAnswer.ContractAddress = o.ContractAddress - newAnswer.ExpectedAnswer = o.newRoundExpectedAnswer - newAnswer.ExpectedUpdate = o.newRoundExpected - newAnswer.ExpectedRoundId = o.newRoundExpectedId - if newAnswer.UpdatedRoundId >= o.newRoundExpectedId { - o.newRoundExpectedId = newAnswer.UpdatedRoundId + 1 - } - - if fullyExpected { // Expected round came in correctly - newAnswer.StartingBlockNum = o.newRoundStartBlock - newAnswer.StartingTime = o.newRoundStartTime - o.newRoundExpected = false - } - - o.UpdatedAnswers = append(o.UpdatedAnswers, newAnswer) - log.Info(). - Uint64("Updated Round ID", newAnswer.UpdatedRoundId). - Uint64("Expected Round ID", newAnswer.ExpectedRoundId). - Int("Updated Answer", newAnswer.UpdatedAnswer). - Int("Expected Answer", newAnswer.ExpectedAnswer). - Str("Address", o.ContractAddress). - Uint64("Block Number", newAnswer.UpdatedBlockNum). - Str("Block Hash", newAnswer.UpdatedBlockHash). - Str("Event Tx Hash", newAnswer.RoundTxHash). - Msg("Answer Updated") - return fullyExpected -} - -// NewAnswerExpected indicates that we're expecting a new answer on an OCR contract -func (o *OCRSoakTestReport) NewAnswerExpected(answer int, startingBlock uint64) { - o.newRoundExpected = true - o.newRoundExpectedAnswer = answer - o.newRoundStartTime = time.Now() - o.newRoundStartBlock = startingBlock - log.Debug(). - Str("Address", o.ContractAddress). - Uint64("Expected Round ID", o.newRoundExpectedId). - Int("Expected Answer", o.newRoundExpectedAnswer). - Msg("Expecting a New OCR Round") -} - -func (o *OCRSoakTestReport) csvHeaders() []string { - return []string{ - "Contract Address", - "Update Expected?", - "Expected Round ID", - "Event Round ID", - "On-Chain Round ID", - "Round Start Time", - "Round End Time", - "Round Duration", - "Round Triggered Block Number", - "Round Updated Block Number", - "Round Block Duration", - "Round Updated Block Hash", - "Round Tx Hash", - "Expected Answer", - "Event Answer", - "On-Chain Answer", - "Anomalous?", - "Anomalies", - } -} - -// returns CSV formatted values for all updated answers -func (o *OCRSoakTestReport) allCSVValues() [][]string { - csvValues := [][]string{} - for _, updatedAnswer := range o.UpdatedAnswers { - csvValues = append(csvValues, updatedAnswer.toCSV()) - } - return csvValues -} - -// returns all CSV formatted values of anomalous answers -func (o *OCRSoakTestReport) anomalousCSVValues() [][]string { - csvValues := [][]string{} - for _, anomalousIndex := range o.AnomalousAnswerIndexes { - updatedAnswer := o.UpdatedAnswers[anomalousIndex] - csvValues = append(csvValues, updatedAnswer.toCSV()) - } - return csvValues -} - -// OCRAnswerUpdated records details of an OCRAnswerUpdated event and compares them against expectations -type OCRAnswerUpdated struct { - // metadata - ContractAddress string - ExpectedUpdate bool - StartingBlockNum uint64 - UpdatedBlockHash string - RoundTxHash string - BlockDuration uint64 - StartingTime time.Time - RoundDuration time.Duration - - // round data - ExpectedRoundId uint64 - ExpectedAnswer int - - UpdatedRoundId uint64 - UpdatedBlockNum uint64 - UpdatedTime time.Time - UpdatedAnswer int - - OnChainRoundId uint64 - OnChainAnswer int - - Anomalous bool - Anomalies []string -} - -// ProcessAnomalies checks received data against expected data of the updated answer, returning if anything mismatches -func (o *OCRAnswerUpdated) ProcessAnomalies(expectedRoundDuration time.Duration) bool { - if o.UpdatedRoundId == 0 && o.OnChainRoundId == 0 { - o.Anomalous = true - o.Anomalies = []string{fmt.Sprintf("Test likely ended before the round could be confirmed. Check for round %d on chain", o.ExpectedRoundId)} - return o.Anomalous - } - - var isAnomaly bool - anomalies := []string{} - if !o.ExpectedUpdate { - isAnomaly = true - anomalies = append(anomalies, "Unexpected new round, possible double transmission") - } - - if o.ExpectedRoundId != o.UpdatedRoundId || o.ExpectedRoundId != o.OnChainRoundId { - isAnomaly = true - anomalies = append(anomalies, "RoundID mismatch, possible double transmission") - } - if o.ExpectedAnswer != o.UpdatedAnswer || o.ExpectedAnswer != o.OnChainAnswer { - isAnomaly = true - anomalies = append(anomalies, "! ANSWER MISMATCH !") - } - if o.RoundDuration > expectedRoundDuration { - isAnomaly = true - anomalies = append(anomalies, fmt.Sprintf( - "Round took %s to complete, longer than expected time of %s", o.RoundDuration, expectedRoundDuration.String()), - ) - } - o.Anomalous, o.Anomalies = isAnomaly, anomalies - return isAnomaly -} - -func (o *OCRAnswerUpdated) toCSV() []string { - var ( // Values that could be affected by anomalies - startTime string - roundTimeDuration string - startingBlock string - roundBlockDuration string - ) - - if o.StartingTime.IsZero() { - startTime = "Unknown" - roundTimeDuration = "Unknown" - } else { - startTime = o.StartingTime.Truncate(time.Second).String() - roundTimeDuration = o.UpdatedTime.Sub(o.StartingTime).Truncate(time.Second).String() - } - - if o.StartingBlockNum == 0 { - startingBlock = "Unknown" - roundBlockDuration = "Unknown" - } else { - startingBlock = fmt.Sprint(o.StartingBlockNum) - roundBlockDuration = fmt.Sprint(o.UpdatedBlockNum - o.StartingBlockNum) - } - - return []string{ - o.ContractAddress, - fmt.Sprint(o.ExpectedUpdate), - fmt.Sprint(o.ExpectedRoundId), - fmt.Sprint(o.UpdatedRoundId), - fmt.Sprint(o.OnChainRoundId), - startTime, - o.UpdatedTime.Truncate(time.Second).String(), - roundTimeDuration, - startingBlock, - fmt.Sprint(o.UpdatedBlockNum), - roundBlockDuration, - o.UpdatedBlockHash, - o.RoundTxHash, - fmt.Sprint(o.ExpectedAnswer), - fmt.Sprint(o.UpdatedAnswer), - fmt.Sprint(o.OnChainAnswer), - fmt.Sprint(o.Anomalous), - strings.Join(o.Anomalies, " | "), - } -} diff --git a/integration-tests/testsetups/ocr.go b/integration-tests/testsetups/ocr.go index 3c2b1edea77..474a5ff1faf 100644 --- a/integration-tests/testsetups/ocr.go +++ b/integration-tests/testsetups/ocr.go @@ -3,15 +3,16 @@ package testsetups import ( "context" + "fmt" "math/big" - "math/rand" "testing" "time" geth "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink-env/environment" @@ -29,8 +30,9 @@ import ( // OCRSoakTest defines a typical OCR soak test type OCRSoakTest struct { - Inputs *OCRSoakTestInputs - TestReporter testreporters.OCRSoakTestReporter + Inputs *OCRSoakTestInputs + TestReporter testreporters.OCRSoakTestReporter + OperatorForwarderFlow bool testEnvironment *environment.Environment bootstrapNode *client.Chainlink @@ -38,10 +40,14 @@ type OCRSoakTest struct { chainClient blockchain.EVMClient mockServer *ctfClient.MockserverClient mockPath string + filterQuery geth.FilterQuery - ocrInstances []contracts.OffchainAggregator - ocrInstanceMap map[string]contracts.OffchainAggregator // address : instance - OperatorForwarderFlow bool + expectedEvents []*testreporters.OCRTestState + actualEvents []*testreporters.OCRTestState + combinedEvents [][]string + + ocrInstances []contracts.OffchainAggregator + ocrInstanceMap map[string]contracts.OffchainAggregator // address : instance } // OCRSoakTestInputs define required inputs to run an OCR soak test @@ -64,9 +70,10 @@ func NewOCRSoakTest(inputs *OCRSoakTestInputs) *OCRSoakTest { return &OCRSoakTest{ Inputs: inputs, TestReporter: testreporters.OCRSoakTestReporter{ - ContractReports: make(map[string]*testreporters.OCRSoakTestReport), ExpectedRoundDuration: inputs.ExpectedRoundTime, }, + expectedEvents: make([]*testreporters.OCRTestState, 0), + actualEvents: make([]*testreporters.OCRTestState, 0), mockPath: "ocr", ocrInstanceMap: make(map[string]contracts.OffchainAggregator), } @@ -139,11 +146,6 @@ func (o *OCRSoakTest) Setup(t *testing.T, env *environment.Environment) { require.NoError(t, err, "Error waiting for OCR contracts to be deployed") for _, ocrInstance := range o.ocrInstances { o.ocrInstanceMap[ocrInstance.Address()] = ocrInstance - o.TestReporter.ContractReports[ocrInstance.Address()] = testreporters.NewOCRSoakTestReport( - ocrInstance.Address(), - o.Inputs.StartingAdapterValue, - o.Inputs.ExpectedRoundTime, - ) } l.Info().Msg("OCR Soak Test Setup Complete") } @@ -151,6 +153,24 @@ func (o *OCRSoakTest) Setup(t *testing.T, env *environment.Environment) { // Run starts the OCR soak test func (o *OCRSoakTest) Run(t *testing.T) { l := utils.GetTestLogger(t) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + latestBlockNum, err := o.chainClient.LatestBlockNumber(ctx) + cancel() + require.NoError(t, err, "Error getting current block number") + + ocrAddresses := make([]common.Address, len(o.ocrInstances)) + for i, ocrInstance := range o.ocrInstances { + ocrAddresses[i] = common.HexToAddress(ocrInstance.Address()) + } + contractABI, err := offchainaggregator.OffchainAggregatorMetaData.GetAbi() + require.NoError(t, err, "Error retrieving OCR contract ABI") + o.filterQuery = geth.FilterQuery{ + Addresses: ocrAddresses, + Topics: [][]common.Hash{{contractABI.Events["AnswerUpdated"].ID}}, + FromBlock: big.NewInt(0).SetUint64(latestBlockNum), + } + if o.OperatorForwarderFlow { actions.CreateOCRJobsWithForwarder(t, o.ocrInstances, o.bootstrapNode, o.workerNodes, 5, o.mockServer) } else { @@ -164,47 +184,47 @@ func (o *OCRSoakTest) Run(t *testing.T) { Int("Number of OCR Contracts", len(o.ocrInstances)). Msg("Starting OCR Soak Test") - testDuration := time.NewTimer(o.Inputs.TestDuration) + testDuration := time.After(o.Inputs.TestDuration) // ********************* // ***** Test Loop ***** // ********************* lastAdapterValue, currentAdapterValue := o.Inputs.StartingAdapterValue, o.Inputs.StartingAdapterValue*25 - newRoundTrigger, expiredRoundTrigger := time.NewTimer(0), time.NewTimer(o.Inputs.RoundTimeout) - answerUpdated := make(chan *offchainaggregator.OffchainAggregatorAnswerUpdated) - o.subscribeOCREvents(t, answerUpdated) - remainingExpectedAnswers := len(o.ocrInstances) - testOver := false + newRoundTrigger := time.NewTimer(0) + defer newRoundTrigger.Stop() + err = o.subscribeOCREvents(l) + require.NoError(t, err, "Error subscribing to OCR events") + +testLoop: for { select { - case <-testDuration.C: - testOver = true - l.Warn().Msg("Soak Test Duration Reached. Completing Final Round") - case answer := <-answerUpdated: - if o.processNewAnswer(t, answer) { - remainingExpectedAnswers-- - } - if remainingExpectedAnswers <= 0 { - if testOver { - l.Info().Msg("Soak Test Complete") - return - } - l.Info(). - Str("Wait time", o.Inputs.TimeBetweenRounds.String()). - Msg("All Expected Answers Reported. Waiting to Start a New Round") - remainingExpectedAnswers = len(o.ocrInstances) - newRoundTrigger, expiredRoundTrigger = time.NewTimer(o.Inputs.TimeBetweenRounds), time.NewTimer(o.Inputs.RoundTimeout) - } + case <-testDuration: + break testLoop case <-newRoundTrigger.C: lastAdapterValue, currentAdapterValue = currentAdapterValue, lastAdapterValue o.triggerNewRound(t, currentAdapterValue) - case <-expiredRoundTrigger.C: - l.Warn().Msg("OCR round timed out") - expiredRoundTrigger = time.NewTimer(o.Inputs.RoundTimeout) - remainingExpectedAnswers = len(o.ocrInstances) - o.triggerNewRound(t, rand.Intn(o.Inputs.StartingAdapterValue*25-1-o.Inputs.StartingAdapterValue)+o.Inputs.StartingAdapterValue) // #nosec G404 | Just triggering a random number + newRoundTrigger.Reset(o.Inputs.TimeBetweenRounds) + case t := <-o.chainClient.ConnectionIssue(): + o.expectedEvents = append(o.expectedEvents, &testreporters.OCRTestState{ + Time: t, + Message: "Lost Connection", + }) + case t := <-o.chainClient.ConnectionRestored(): + o.expectedEvents = append(o.expectedEvents, &testreporters.OCRTestState{ + Time: t, + Message: "Reconnected", + }) } } + + l.Info().Msg("Test Complete, collecting on-chain events to be collected") + // Keep trying to collect events until we get them, no + timeout := time.Second * 5 + err = o.collectEvents(l, timeout) + for err != nil { + timeout *= 2 + err = o.collectEvents(l, timeout) + } } // Networks returns the networks that the test is running on @@ -222,96 +242,97 @@ func (o *OCRSoakTest) TearDownVals(t *testing.T) ( // ****** Helpers ****** // ********************* -func (o *OCRSoakTest) processNewEvent( - t *testing.T, - eventSub geth.Subscription, - answerUpdated chan *offchainaggregator.OffchainAggregatorAnswerUpdated, - event *types.Log, - eventDetails *abi.Event, - ocrInstance contracts.OffchainAggregator, - contractABI *abi.ABI, -) { - l := utils.GetTestLogger(t) - errorChan := make(chan error) - eventConfirmed := make(chan bool) - err := o.chainClient.ProcessEvent(eventDetails.Name, event, eventConfirmed, errorChan) +// subscribeOCREvents subscribes to OCR events and logs them to the test logger +func (o *OCRSoakTest) subscribeOCREvents(logger zerolog.Logger) error { + eventLogs := make(chan types.Log) + errorChan := make(chan error, 1) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + eventSub, err := o.chainClient.SubscribeFilterLogs(ctx, o.filterQuery, eventLogs) if err != nil { - l.Error().Err(err).Str("Hash", event.TxHash.Hex()).Str("Event", eventDetails.Name).Msg("Error trying to process event") - return + errorChan <- err } - l.Debug(). - Str("Event", eventDetails.Name). - Str("Address", event.Address.Hex()). - Str("Hash", event.TxHash.Hex()). - Msg("Attempting to Confirm Event") - for { - select { - case err := <-errorChan: - l.Error().Err(err).Msg("Error while confirming event") - return - case confirmed := <-eventConfirmed: - if confirmed { - if eventDetails.Name == "AnswerUpdated" { // Send AnswerUpdated events to answerUpdated channel to handle in main loop - answer, err := ocrInstance.ParseEventAnswerUpdated(*event) - require.NoError(t, err, "Parsing AnswerUpdated event log in OCR instance shouldn't fail") - answerUpdated <- answer + + go func() { + defer cancel() + for { + select { + case event := <-eventLogs: + logger.Info(). + Str("Address", event.Address.Hex()). + Str("Event", "AnswerUpdated"). + Uint64("Block Number", event.BlockNumber). + Msg("Found Event") + case err = <-eventSub.Err(): + errorChan <- err + case err = <-errorChan: + logger.Warn(). + Err(err). + Interface("Query", o.filterQuery). + Msg("Error while subscribed to OCR Logs. Resubscribing") + ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) + eventSub, err = o.chainClient.SubscribeFilterLogs(ctx, o.filterQuery, eventLogs) + if err != nil { // We failed subscription, come on back and try again + errorChan <- err } - l.Info(). - Str("Contract", event.Address.Hex()). - Str("Event Name", eventDetails.Name). - Uint64("Header Number", event.BlockNumber). - Msg("Contract Event Published") } - return } - } -} + }() -// marshalls new answer events into manageable Go struct for further processing and reporting -func (o *OCRSoakTest) processNewAnswer(t *testing.T, newAnswer *offchainaggregator.OffchainAggregatorAnswerUpdated) bool { - l := utils.GetTestLogger(t) - // Updated Info - answerAddress := newAnswer.Raw.Address.Hex() - _, tracked := o.TestReporter.ContractReports[answerAddress] - if !tracked { - l.Error().Str("Untracked Address", answerAddress).Msg("Received AnswerUpdated event on an untracked OCR instance") - return false - } - processedAnswer := &testreporters.OCRAnswerUpdated{} - processedAnswer.ContractAddress = newAnswer.Raw.Address.Hex() - processedAnswer.UpdatedTime = time.Unix(newAnswer.UpdatedAt.Int64(), 0) - processedAnswer.UpdatedRoundId = newAnswer.RoundId.Uint64() - processedAnswer.UpdatedBlockNum = newAnswer.Raw.BlockNumber - processedAnswer.UpdatedAnswer = int(newAnswer.Current.Int64()) - processedAnswer.UpdatedBlockHash = newAnswer.Raw.BlockHash.Hex() - processedAnswer.RoundTxHash = newAnswer.Raw.TxHash.Hex() - - // On-Chain Info - updatedOCRInstance := o.ocrInstanceMap[answerAddress] - onChainData, err := updatedOCRInstance.GetRound(context.Background(), newAnswer.RoundId) - require.NoError(t, err, "Error retrieving on-chain data for '%s' at round '%d'", answerAddress, processedAnswer.UpdatedRoundId) - processedAnswer.OnChainAnswer = int(onChainData.Answer.Int64()) - processedAnswer.OnChainRoundId = onChainData.RoundId.Uint64() - - return o.TestReporter.ContractReports[answerAddress].NewAnswerUpdated(processedAnswer) + return nil } // triggers a new OCR round by setting a new mock adapter value func (o *OCRSoakTest) triggerNewRound(t *testing.T, currentAdapterValue int) { l := utils.GetTestLogger(t) - startingBlockNum, err := o.chainClient.LatestBlockNumber(context.Background()) - require.NoError(t, err, "Error retrieving latest block number") - for _, report := range o.TestReporter.ContractReports { - report.NewAnswerExpected(currentAdapterValue, startingBlockNum) - } - err = actions.SetAllAdapterResponsesToTheSameValue(currentAdapterValue, o.ocrInstances, o.workerNodes, o.mockServer) + err := actions.SetAllAdapterResponsesToTheSameValue(currentAdapterValue, o.ocrInstances, o.workerNodes, o.mockServer) require.NoError(t, err, "Error setting adapter responses") + o.expectedEvents = append(o.expectedEvents, &testreporters.OCRTestState{ + Time: time.Now(), + Message: fmt.Sprintf("New Round Started, Adapter Value: %d", currentAdapterValue), + }) l.Info(). Int("Value", currentAdapterValue). Msg("Starting a New OCR Round") } +func (o *OCRSoakTest) collectEvents(logger zerolog.Logger, timeout time.Duration) error { + start := time.Now() + logger.Info().Msg("Collecting on-chain events") + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + contractEvents, err := o.chainClient.FilterLogs(ctx, o.filterQuery) + if err != nil { + log.Error(). + Err(err). + Str("Time", time.Since(start).String()). + Msg("Error collecting on-chain events") + return err + } + + for _, event := range contractEvents { + if event.Removed { + continue + } + answerUpdated, err := o.ocrInstances[0].ParseEventAnswerUpdated(event) + if err != nil { + log.Error(). + Err(err). + Str("Time", time.Since(start).String()). + Msg("Error collecting on-chain events") + return err + } + o.actualEvents = append(o.actualEvents, &testreporters.OCRTestState{ + Time: time.Unix(answerUpdated.UpdatedAt.Int64(), 0), + Message: fmt.Sprintf("%s Round: %d Answer: %d", event.Address.Hex(), answerUpdated.RoundId.Uint64(), answerUpdated.Current.Int64()), + }) + } + logger.Info(). + Str("Time", time.Since(start).String()). + Msg("Collected on-chain events") + return nil +} + // ensureValues ensures that all values needed to run the test are present func (o *OCRSoakTest) ensureInputValues(t *testing.T) { inputs := o.Inputs @@ -326,46 +347,3 @@ func (o *OCRSoakTest) ensureInputValues(t *testing.T) { require.NotNil(t, inputs.TimeBetweenRounds, "Expected TimeBetweenRounds to be set") require.Less(t, inputs.TimeBetweenRounds, time.Hour, "TimeBetweenRounds must be less than 1 hour") } - -// subscribeToAnswerUpdatedEvent subscribes to the event log for AnswerUpdated event and -// verifies if the answer is matching with the expected value -func (o *OCRSoakTest) subscribeOCREvents( - t *testing.T, - answerUpdated chan *offchainaggregator.OffchainAggregatorAnswerUpdated, -) { - l := utils.GetTestLogger(t) - contractABI, err := offchainaggregator.OffchainAggregatorMetaData.GetAbi() - require.NoError(t, err, "Getting contract abi for OCR shouldn't fail") - latestBlockNum, err := o.chainClient.LatestBlockNumber(context.Background()) - require.NoError(t, err, "Subscribing to contract event log for OCR instance shouldn't fail") - query := geth.FilterQuery{ - FromBlock: big.NewInt(0).SetUint64(latestBlockNum), - Addresses: []common.Address{}, - } - for i := 0; i < len(o.ocrInstances); i++ { - query.Addresses = append(query.Addresses, common.HexToAddress(o.ocrInstances[i].Address())) - } - eventLogs := make(chan types.Log) - sub, err := o.chainClient.SubscribeFilterLogs(context.Background(), query, eventLogs) - require.NoError(t, err, "Subscribing to contract event log for OCR instance shouldn't fail") - - go func() { - defer sub.Unsubscribe() - - for { - select { - case err := <-sub.Err(): - l.Error().Err(err).Msg("Error while watching for new contract events. Retrying Subscription") - sub.Unsubscribe() - - sub, err = o.chainClient.SubscribeFilterLogs(context.Background(), query, eventLogs) - require.NoError(t, err, "Subscribing to contract event log for OCR instance shouldn't fail") - case vLog := <-eventLogs: - eventDetails, err := contractABI.EventByID(vLog.Topics[0]) - require.NoError(t, err, "Getting event details for OCR instances shouldn't fail") - - go o.processNewEvent(t, sub, answerUpdated, &vLog, eventDetails, o.ocrInstances[0], contractABI) - } - } - }() -}