Skip to content

Commit

Permalink
Mercury feeds starts at the first block number not at zero (#9664)
Browse files Browse the repository at this point in the history
* Mercury feeds starts at the first block number not at zero

* Update integration test

* chainlink-relay => bbcb3a9

* Update core/services/relay/evm/mercury/data_source.go

Co-authored-by: Sergei Drugalev <sergei.drugalev@smartcontract.com>

* Fix lint

* Fix test

---------

Co-authored-by: Sergei Drugalev <sergei.drugalev@smartcontract.com>
  • Loading branch information
2 people authored and FelixFan1992 committed Jul 6, 2023
1 parent 3b89b85 commit a328b09
Show file tree
Hide file tree
Showing 15 changed files with 139 additions and 86 deletions.
7 changes: 7 additions & 0 deletions core/null/int64.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,10 @@ func (i *Int64) Scan(value interface{}) error {
}
return nil
}

func (i Int64) Ptr() *int64 {
if i.Valid {
return &i.Int64
}
return nil
}
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ require (
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/smartcontractkit/caigo v0.0.0-20230530082629-53a5a4bdb25e // indirect
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230525203711-20bed74ac906 // indirect
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20230615205306-31dec2180682 // indirect
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20230620171700-bbcb3a99b7d3 // indirect
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20230612131011-369bfb503592 // indirect
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.0-20230601080524-3d8186742482 // indirect
github.com/smartcontractkit/wsrpc v0.7.2 // indirect
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1388,8 +1388,8 @@ github.com/smartcontractkit/caigo v0.0.0-20230530082629-53a5a4bdb25e h1:XY8DncHI
github.com/smartcontractkit/caigo v0.0.0-20230530082629-53a5a4bdb25e/go.mod h1:2QuJdEouTWjh5BDy5o/vgGXQtR4Gz8yH1IYB5eT7u4M=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230525203711-20bed74ac906 h1:u7Lw7oqLEjADlJPJQnzlCLNSbj038QttaKY0lCa3V78=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230525203711-20bed74ac906/go.mod h1:MH+MRJaG4SZAbRq5g7//AFY9H9sg5+lLDQnm85aHP6A=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20230615205306-31dec2180682 h1:64rRfZdrojeqd30s4nJbE2bfWavQlJxlLRc+4L4IEsc=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20230615205306-31dec2180682/go.mod h1:MfZBUifutkv3aK7abyw5YmTJbqt8iFwcQDFikrxC/uI=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20230620171700-bbcb3a99b7d3 h1:rlNWHk15A2im/e9U95q4AkHZk5Wbc77lpx6ys4kUyCE=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20230620171700-bbcb3a99b7d3/go.mod h1:MfZBUifutkv3aK7abyw5YmTJbqt8iFwcQDFikrxC/uI=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20230612131011-369bfb503592 h1:3Ul/LkULxrolCVguHUFnWJamgUDsSGISlm/DzclstmE=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20230612131011-369bfb503592/go.mod h1:km46XAo6xebV4Q+WyRFfo3E2t80YqTkegJM4FEfo5/Y=
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.0-20230601080524-3d8186742482 h1:ZblU/X27pIHAZ7c/37TZf7ykZ4jkfVUmvIcchyO2rnw=
Expand Down
3 changes: 2 additions & 1 deletion core/services/ocr2/plugins/mercury/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

pkgerrors "github.com/pkg/errors"

"github.com/smartcontractkit/chainlink/v2/core/null"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

Expand All @@ -21,7 +22,7 @@ type PluginConfig struct {
// the first ever report in the case of a brand new feed, where the mercury
// server does not have any previous reports. For a brand new feed, this
// effectively sets the "first" validFromBlockNumber.
InitialBlockNumber int64 `json:"initialBlockNumber" toml:"initialBlockNumber"`
InitialBlockNumber null.Int64 `json:"initialBlockNumber" toml:"initialBlockNumber"`
}

func ValidatePluginConfig(config PluginConfig) (merr error) {
Expand Down
13 changes: 2 additions & 11 deletions core/services/ocr2/plugins/mercury/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ func TestIntegration_Mercury(t *testing.T) {
const n = 4 // number of nodes
const fromBlock = 1 // cannot use zero, start from block 1
const multiplier = 100000000
initialBlockNumber := int64(rand.Int31n(10))
testStartTimeStamp := uint32(time.Now().Unix())

// test vars
Expand Down Expand Up @@ -120,11 +119,7 @@ func TestIntegration_Mercury(t *testing.T) {
steve := testutils.MustNewSimTransactor(t) // config contract deployer and owner
genesisData := core.GenesisAlloc{steve.From: {Balance: assets.Ether(1000).ToInt()}}
backend := cltest.NewSimulatedBackend(t, genesisData, uint32(ethconfig.Defaults.Miner.GasCeil))
backend.Commit() // ensure starting block number at least 1
// Ensure initialBlockNumber is at or below current block number
for i := 1; i < int(initialBlockNumber); i++ {
backend.Commit()
}
backend.Commit() // ensure starting block number at least 1
stopMining := cltest.Mine(backend, 1*time.Second) // Should be greater than deltaRound since we cannot access old blocks on simulated blockchain
t.Cleanup(stopMining)

Expand Down Expand Up @@ -227,7 +222,6 @@ func TestIntegration_Mercury(t *testing.T) {
feed.id,
chainID,
fromBlock,
initialBlockNumber,
)
}
}
Expand Down Expand Up @@ -342,7 +336,7 @@ func TestIntegration_Mercury(t *testing.T) {
assert.GreaterOrEqual(t, currentBlock.Time(), reportElems["currentBlockTimestamp"].(uint64))
assert.NotEqual(t, common.Hash{}, common.Hash(reportElems["currentBlockHash"].([32]uint8)))
assert.LessOrEqual(t, int(reportElems["validFromBlockNum"].(uint64)), int(reportElems["currentBlockNum"].(uint64)))
assert.LessOrEqual(t, initialBlockNumber, int64(reportElems["validFromBlockNum"].(uint64)))
assert.Less(t, int64(0), int64(reportElems["validFromBlockNum"].(uint64)))

t.Logf("oracle %x reported for feed %s (0x%x)", req.pk, feed.name, feed.id)

Expand Down Expand Up @@ -606,7 +600,6 @@ func addMercuryJob(
feedID [32]byte,
chainID *big.Int,
fromBlock int,
initialBlockNumber int64,
) {
node.AddJob(t, fmt.Sprintf(`
type = "offchainreporting2"
Expand Down Expand Up @@ -650,7 +643,6 @@ observationSource = """
[pluginConfig]
serverURL = "%[8]s"
serverPubKey = "%[9]x"
initialBlockNumber = %[15]d
[relayConfig]
chainID = %[12]d
Expand All @@ -670,6 +662,5 @@ fromBlock = %[13]d
chainID,
fromBlock,
feedName,
initialBlockNumber,
))
}
1 change: 1 addition & 0 deletions core/services/ocr2/plugins/mercury/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func NewServices(
chEnhancedTelem,
chainHeadTracker,
ocr2Provider.ContractTransmitter(),
pluginConfig.InitialBlockNumber.Ptr(),
)
argsNoPlugin.MercuryPluginFactory = relaymercury.NewFactory(
ds,
Expand Down
2 changes: 1 addition & 1 deletion core/services/relay/evm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (r *Relayer) NewMercuryProvider(rargs relaytypes.RelayArgs, pargs relaytype
if err != nil {
return nil, err
}
transmitter := mercury.NewTransmitter(r.lggr, configWatcher.ContractConfigTracker(), client, privKey.PublicKey, *relayConfig.FeedID, mercuryConfig.InitialBlockNumber)
transmitter := mercury.NewTransmitter(r.lggr, configWatcher.ContractConfigTracker(), client, privKey.PublicKey, *relayConfig.FeedID)

return NewMercuryProvider(configWatcher, transmitter, reportCodec, r.lggr), nil
}
Expand Down
47 changes: 41 additions & 6 deletions core/services/relay/evm/mercury/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ type Runner interface {
ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run pipeline.Run, trrs pipeline.TaskRunResults, err error)
}

type Fetcher interface {
// FetchInitialMaxFinalizedBlockNumber should fetch the initial max
// finalized block number from the mercury server.
FetchInitialMaxFinalizedBlockNumber(context.Context) (*int64, error)
}

type datasource struct {
pipelineRunner Runner
jb job.Job
Expand All @@ -42,15 +48,16 @@ type datasource struct {

mu sync.RWMutex

chEnhancedTelem chan<- ocrcommon.EnhancedTelemetryMercuryData
chainHeadTracker ChainHeadTracker
fetcher relaymercury.Fetcher
chEnhancedTelem chan<- ocrcommon.EnhancedTelemetryMercuryData
chainHeadTracker ChainHeadTracker
fetcher Fetcher
initialBlockNumber *int64
}

var _ relaymercury.DataSource = &datasource{}

func NewDataSource(pr pipeline.Runner, jb job.Job, spec pipeline.Spec, lggr logger.Logger, rr chan pipeline.Run, enhancedTelemChan chan ocrcommon.EnhancedTelemetryMercuryData, chainHeadTracker ChainHeadTracker, fetcher relaymercury.Fetcher) *datasource {
return &datasource{pr, jb, spec, lggr, rr, sync.RWMutex{}, enhancedTelemChan, chainHeadTracker, fetcher}
func NewDataSource(pr pipeline.Runner, jb job.Job, spec pipeline.Spec, lggr logger.Logger, rr chan pipeline.Run, enhancedTelemChan chan ocrcommon.EnhancedTelemetryMercuryData, chainHeadTracker ChainHeadTracker, fetcher Fetcher, initialBlockNumber *int64) *datasource {
return &datasource{pr, jb, spec, lggr, rr, sync.RWMutex{}, enhancedTelemChan, chainHeadTracker, fetcher, initialBlockNumber}
}

func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestamp, fetchMaxFinalizedBlockNum bool) (obs relaymercury.Observation, err error) {
Expand All @@ -63,7 +70,35 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam
wg.Add(1)
go func() {
defer wg.Done()
obs.MaxFinalizedBlockNumber.Val, obs.MaxFinalizedBlockNumber.Err = ds.fetcher.FetchInitialMaxFinalizedBlockNumber(ctx)
val, fetchErr := ds.fetcher.FetchInitialMaxFinalizedBlockNumber(ctx)
if fetchErr != nil {
obs.MaxFinalizedBlockNumber.Err = fetchErr
return
}
if val != nil {
obs.MaxFinalizedBlockNumber.Val = *val
return
}
if ds.initialBlockNumber == nil {
if obs.CurrentBlockNum.Err != nil {
obs.MaxFinalizedBlockNumber.Err = fmt.Errorf("FetchInitialMaxFinalizedBlockNumber returned empty LatestReport; this is a new feed. No initialBlockNumber was set, tried to use current block number to determine maxFinalizedBlockNumber but got error: %w", obs.CurrentBlockNum.Err)
} else {
// Subract 1 here because we will later add 1 to the
// maxFinalizedBlockNumber to get the first validFromBlockNum, which
// ought to be the same as current block num.
obs.MaxFinalizedBlockNumber.Val = obs.CurrentBlockNum.Val - 1
ds.lggr.Infof("FetchInitialMaxFinalizedBlockNumber returned empty LatestReport; this is a new feed so maxFinalizedBlockNumber=%d (initialBlockNumber unset, using currentBlockNum=%d-1)", obs.MaxFinalizedBlockNumber.Val, obs.CurrentBlockNum.Val)
}
} else {
// NOTE: It's important to subtract 1 if the server is missing any past
// report (brand new feed) since we will add 1 to the
// maxFinalizedBlockNumber to get the first validFromBlockNum, which
// ought to be zero.
//
// If "initialBlockNumber" is set to zero, this will give a starting block of zero.
obs.MaxFinalizedBlockNumber.Val = *ds.initialBlockNumber - 1
ds.lggr.Infof("FetchInitialMaxFinalizedBlockNumber returned empty LatestReport; this is a new feed so maxFinalizedBlockNumber=%d (initialBlockNumber=%d)", obs.MaxFinalizedBlockNumber.Val, *ds.initialBlockNumber)
}
}()
} else {
obs.MaxFinalizedBlockNumber.Err = errors.New("fetchMaxFinalizedBlockNum=false")
Expand Down
49 changes: 46 additions & 3 deletions core/services/relay/evm/mercury/data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ import (
var _ relaymercury.Fetcher = &mockFetcher{}

type mockFetcher struct {
num int64
num *int64
err error
}

func (m *mockFetcher) FetchInitialMaxFinalizedBlockNumber(context.Context) (int64, error) {
func (m *mockFetcher) FetchInitialMaxFinalizedBlockNumber(context.Context) (*int64, error) {
return m.num, m.err
}

Expand Down Expand Up @@ -143,15 +143,58 @@ func TestMercury_Observe(t *testing.T) {
})
t.Run("if FetchInitialMaxFinalizedBlockNumber succeeds", func(t *testing.T) {
fetcher.err = nil
fetcher.num = 32
var num int64 = 32
fetcher.num = &num

obs, err := ds.Observe(ctx, repts, true)
assert.NoError(t, err)

assert.NoError(t, obs.MaxFinalizedBlockNumber.Err)
assert.Equal(t, int64(32), obs.MaxFinalizedBlockNumber.Val)
})
t.Run("if FetchInitialMaxFinalizedBlockNumber returns nil (new feed) and initialBlockNumber is set", func(t *testing.T) {
var initialBlockNumber int64 = 50
ds.initialBlockNumber = &initialBlockNumber
fetcher.err = nil
fetcher.num = nil

obs, err := ds.Observe(ctx, repts, true)
assert.NoError(t, err)

assert.NoError(t, obs.MaxFinalizedBlockNumber.Err)
assert.Equal(t, int64(49), obs.MaxFinalizedBlockNumber.Val)
})
t.Run("if FetchInitialMaxFinalizedBlockNumber returns nil (new feed) and initialBlockNumber is not set", func(t *testing.T) {
ds.initialBlockNumber = nil
t.Run("if current block num is valid", func(t *testing.T) {
fetcher.err = nil
fetcher.num = nil

obs, err := ds.Observe(ctx, repts, true)
assert.NoError(t, err)

assert.NoError(t, obs.MaxFinalizedBlockNumber.Err)
assert.Equal(t, head.Number-1, obs.MaxFinalizedBlockNumber.Val)
})
t.Run("if current block num errored", func(t *testing.T) {
h2 := htmocks.NewHeadTracker(t)
h2.On("LatestChain").Return(nil)
ht.h = h2
c2 := evmtest.NewEthClientMock(t)
c2.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(nil, errors.New("head retrieval failed"))
ht.c = c2

obs, err := ds.Observe(ctx, repts, true)
assert.NoError(t, err)

assert.EqualError(t, obs.MaxFinalizedBlockNumber.Err, "FetchInitialMaxFinalizedBlockNumber returned empty LatestReport; this is a new feed. No initialBlockNumber was set, tried to use current block number to determine maxFinalizedBlockNumber but got error: head retrieval failed")
})
})
})

ht.h = h
ht.c = c

t.Run("when fetchMaxFinalizedBlockNum=false", func(t *testing.T) {
t.Run("when run execution fails, returns error", func(t *testing.T) {
t.Cleanup(func() {
Expand Down
34 changes: 12 additions & 22 deletions core/services/relay/evm/mercury/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,9 @@ var _ Transmitter = &mercuryTransmitter{}

type mercuryTransmitter struct {
utils.StartStopOnce
lggr logger.Logger
rpcClient wsrpc.Client
cfgTracker ConfigTracker
initialBlockNumber int64
lggr logger.Logger
rpcClient wsrpc.Client
cfgTracker ConfigTracker

feedID [32]byte
feedIDHex string
Expand Down Expand Up @@ -116,14 +115,13 @@ func getPayloadTypes() abi.Arguments {
})
}

func NewTransmitter(lggr logger.Logger, cfgTracker ConfigTracker, rpcClient wsrpc.Client, fromAccount ed25519.PublicKey, feedID [32]byte, initialBlockNumber int64) *mercuryTransmitter {
func NewTransmitter(lggr logger.Logger, cfgTracker ConfigTracker, rpcClient wsrpc.Client, fromAccount ed25519.PublicKey, feedID [32]byte) *mercuryTransmitter {
feedIDHex := fmt.Sprintf("0x%x", feedID[:])
return &mercuryTransmitter{
utils.StartStopOnce{},
lggr.Named("MercuryTransmitter").With("feedID", feedIDHex),
rpcClient,
cfgTracker,
initialBlockNumber,
feedID,
feedIDHex,
fmt.Sprintf("%x", fromAccount),
Expand Down Expand Up @@ -291,39 +289,31 @@ func (mt *mercuryTransmitter) LatestConfigDigestAndEpoch(ctx context.Context) (c
panic("not needed for OCR3")
}

func (mt *mercuryTransmitter) FetchInitialMaxFinalizedBlockNumber(ctx context.Context) (int64, error) {
mt.lggr.Debug("FetchInitialMaxFinalizedBlockNumber")
func (mt *mercuryTransmitter) FetchInitialMaxFinalizedBlockNumber(ctx context.Context) (*int64, error) {
mt.lggr.Trace("FetchInitialMaxFinalizedBlockNumber")
req := &pb.LatestReportRequest{
FeedId: mt.feedID[:],
}
resp, err := mt.rpcClient.LatestReport(ctx, req)
if err != nil {
mt.lggr.Errorw("FetchInitialMaxFinalizedBlockNumber failed", "err", err)
return 0, pkgerrors.Wrap(err, "FetchInitialMaxFinalizedBlockNumber failed to fetch LatestReport")
return nil, pkgerrors.Wrap(err, "FetchInitialMaxFinalizedBlockNumber failed to fetch LatestReport")
}
if resp == nil {
return 0, errors.New("FetchInitialMaxFinalizedBlockNumber expected LatestReport to return non-nil response")
return nil, errors.New("FetchInitialMaxFinalizedBlockNumber expected LatestReport to return non-nil response")
}
if resp.Error != "" {
err = errors.New(resp.Error)
mt.lggr.Errorw("FetchInitialMaxFinalizedBlockNumber failed; mercury server returned error", "err", err)
return 0, err
return nil, err
}
if resp.Report == nil {
maxFinalizedBlockNumber := mt.initialBlockNumber - 1
mt.lggr.Infof("FetchInitialMaxFinalizedBlockNumber returned empty LatestReport; this is a new feed so maxFinalizedBlockNumber=%d (initialBlockNumber=%d)", maxFinalizedBlockNumber, mt.initialBlockNumber)
// NOTE: It's important to return -1 if the server is missing any past
// report (brand new feed) since we will add 1 to the
// maxFinalizedBlockNumber to get the first validFromBlockNum, which
// ought to be zero.
//
// If "initialBlockNumber" is unset, this will give a starting block of zero.
return maxFinalizedBlockNumber, nil
return nil, nil
} else if !bytes.Equal(resp.Report.FeedId, mt.feedID[:]) {
return 0, fmt.Errorf("FetchInitialMaxFinalizedBlockNumber failed; mismatched feed IDs, expected: 0x%x, got: 0x%x", mt.feedID, resp.Report.FeedId)
return nil, fmt.Errorf("FetchInitialMaxFinalizedBlockNumber failed; mismatched feed IDs, expected: 0x%x, got: 0x%x", mt.feedID, resp.Report.FeedId)
}

mt.lggr.Debugw("FetchInitialMaxFinalizedBlockNumber success", "currentBlockNum", resp.Report.CurrentBlockNumber)

return resp.Report.CurrentBlockNumber, nil
return &resp.Report.CurrentBlockNumber, nil
}
Loading

0 comments on commit a328b09

Please sign in to comment.