Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mercury feeds starts at the first block number not at zero #9664

7 changes: 7 additions & 0 deletions core/null/int64.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,10 @@ func (i *Int64) Scan(value interface{}) error {
}
return nil
}

func (i Int64) Ptr() *int64 {
if i.Valid {
return &i.Int64
}
return nil
}
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ require (
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/smartcontractkit/caigo v0.0.0-20230530082629-53a5a4bdb25e // indirect
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230525203711-20bed74ac906 // indirect
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20230615205306-31dec2180682 // indirect
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20230620171700-bbcb3a99b7d3 // indirect
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20230612131011-369bfb503592 // indirect
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.0-20230601080524-3d8186742482 // indirect
github.com/smartcontractkit/wsrpc v0.7.2 // indirect
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1388,8 +1388,8 @@ github.com/smartcontractkit/caigo v0.0.0-20230530082629-53a5a4bdb25e h1:XY8DncHI
github.com/smartcontractkit/caigo v0.0.0-20230530082629-53a5a4bdb25e/go.mod h1:2QuJdEouTWjh5BDy5o/vgGXQtR4Gz8yH1IYB5eT7u4M=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230525203711-20bed74ac906 h1:u7Lw7oqLEjADlJPJQnzlCLNSbj038QttaKY0lCa3V78=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230525203711-20bed74ac906/go.mod h1:MH+MRJaG4SZAbRq5g7//AFY9H9sg5+lLDQnm85aHP6A=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20230615205306-31dec2180682 h1:64rRfZdrojeqd30s4nJbE2bfWavQlJxlLRc+4L4IEsc=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20230615205306-31dec2180682/go.mod h1:MfZBUifutkv3aK7abyw5YmTJbqt8iFwcQDFikrxC/uI=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20230620171700-bbcb3a99b7d3 h1:rlNWHk15A2im/e9U95q4AkHZk5Wbc77lpx6ys4kUyCE=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20230620171700-bbcb3a99b7d3/go.mod h1:MfZBUifutkv3aK7abyw5YmTJbqt8iFwcQDFikrxC/uI=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20230612131011-369bfb503592 h1:3Ul/LkULxrolCVguHUFnWJamgUDsSGISlm/DzclstmE=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20230612131011-369bfb503592/go.mod h1:km46XAo6xebV4Q+WyRFfo3E2t80YqTkegJM4FEfo5/Y=
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.0-20230601080524-3d8186742482 h1:ZblU/X27pIHAZ7c/37TZf7ykZ4jkfVUmvIcchyO2rnw=
Expand Down
3 changes: 2 additions & 1 deletion core/services/ocr2/plugins/mercury/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

pkgerrors "github.com/pkg/errors"

"github.com/smartcontractkit/chainlink/v2/core/null"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

Expand All @@ -21,7 +22,7 @@ type PluginConfig struct {
// the first ever report in the case of a brand new feed, where the mercury
// server does not have any previous reports. For a brand new feed, this
// effectively sets the "first" validFromBlockNumber.
InitialBlockNumber int64 `json:"initialBlockNumber" toml:"initialBlockNumber"`
InitialBlockNumber null.Int64 `json:"initialBlockNumber" toml:"initialBlockNumber"`
}

func ValidatePluginConfig(config PluginConfig) (merr error) {
Expand Down
13 changes: 2 additions & 11 deletions core/services/ocr2/plugins/mercury/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ func TestIntegration_Mercury(t *testing.T) {
const n = 4 // number of nodes
const fromBlock = 1 // cannot use zero, start from block 1
const multiplier = 100000000
initialBlockNumber := int64(rand.Int31n(10))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this because by far the most common case will to have this not be set

testStartTimeStamp := uint32(time.Now().Unix())

// test vars
Expand Down Expand Up @@ -120,11 +119,7 @@ func TestIntegration_Mercury(t *testing.T) {
steve := testutils.MustNewSimTransactor(t) // config contract deployer and owner
genesisData := core.GenesisAlloc{steve.From: {Balance: assets.Ether(1000).ToInt()}}
backend := cltest.NewSimulatedBackend(t, genesisData, uint32(ethconfig.Defaults.Miner.GasCeil))
backend.Commit() // ensure starting block number at least 1
// Ensure initialBlockNumber is at or below current block number
for i := 1; i < int(initialBlockNumber); i++ {
backend.Commit()
}
backend.Commit() // ensure starting block number at least 1
stopMining := cltest.Mine(backend, 1*time.Second) // Should be greater than deltaRound since we cannot access old blocks on simulated blockchain
t.Cleanup(stopMining)

Expand Down Expand Up @@ -227,7 +222,6 @@ func TestIntegration_Mercury(t *testing.T) {
feed.id,
chainID,
fromBlock,
initialBlockNumber,
)
}
}
Expand Down Expand Up @@ -342,7 +336,7 @@ func TestIntegration_Mercury(t *testing.T) {
assert.GreaterOrEqual(t, currentBlock.Time(), reportElems["currentBlockTimestamp"].(uint64))
assert.NotEqual(t, common.Hash{}, common.Hash(reportElems["currentBlockHash"].([32]uint8)))
assert.LessOrEqual(t, int(reportElems["validFromBlockNum"].(uint64)), int(reportElems["currentBlockNum"].(uint64)))
assert.LessOrEqual(t, initialBlockNumber, int64(reportElems["validFromBlockNum"].(uint64)))
assert.Less(t, int64(0), int64(reportElems["validFromBlockNum"].(uint64)))

t.Logf("oracle %x reported for feed %s (0x%x)", req.pk, feed.name, feed.id)

Expand Down Expand Up @@ -606,7 +600,6 @@ func addMercuryJob(
feedID [32]byte,
chainID *big.Int,
fromBlock int,
initialBlockNumber int64,
) {
node.AddJob(t, fmt.Sprintf(`
type = "offchainreporting2"
Expand Down Expand Up @@ -650,7 +643,6 @@ observationSource = """
[pluginConfig]
serverURL = "%[8]s"
serverPubKey = "%[9]x"
initialBlockNumber = %[15]d

[relayConfig]
chainID = %[12]d
Expand All @@ -670,6 +662,5 @@ fromBlock = %[13]d
chainID,
fromBlock,
feedName,
initialBlockNumber,
))
}
1 change: 1 addition & 0 deletions core/services/ocr2/plugins/mercury/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func NewServices(
chEnhancedTelem,
chainHeadTracker,
ocr2Provider.ContractTransmitter(),
pluginConfig.InitialBlockNumber.Ptr(),
)
argsNoPlugin.MercuryPluginFactory = relaymercury.NewFactory(
ds,
Expand Down
2 changes: 1 addition & 1 deletion core/services/relay/evm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (r *Relayer) NewMercuryProvider(rargs relaytypes.RelayArgs, pargs relaytype
if err != nil {
return nil, err
}
transmitter := mercury.NewTransmitter(r.lggr, configWatcher.ContractConfigTracker(), client, privKey.PublicKey, *relayConfig.FeedID, mercuryConfig.InitialBlockNumber)
transmitter := mercury.NewTransmitter(r.lggr, configWatcher.ContractConfigTracker(), client, privKey.PublicKey, *relayConfig.FeedID)

return NewMercuryProvider(configWatcher, transmitter, reportCodec, r.lggr), nil
}
Expand Down
47 changes: 41 additions & 6 deletions core/services/relay/evm/mercury/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ type Runner interface {
ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run pipeline.Run, trrs pipeline.TaskRunResults, err error)
}

type Fetcher interface {
// FetchInitialMaxFinalizedBlockNumber should fetch the initial max
// finalized block number from the mercury server.
FetchInitialMaxFinalizedBlockNumber(context.Context) (*int64, error)
}

type datasource struct {
pipelineRunner Runner
jb job.Job
Expand All @@ -42,15 +48,16 @@ type datasource struct {

mu sync.RWMutex

chEnhancedTelem chan<- ocrcommon.EnhancedTelemetryMercuryData
chainHeadTracker ChainHeadTracker
fetcher relaymercury.Fetcher
chEnhancedTelem chan<- ocrcommon.EnhancedTelemetryMercuryData
chainHeadTracker ChainHeadTracker
fetcher Fetcher
initialBlockNumber *int64
}

var _ relaymercury.DataSource = &datasource{}

func NewDataSource(pr pipeline.Runner, jb job.Job, spec pipeline.Spec, lggr logger.Logger, rr chan pipeline.Run, enhancedTelemChan chan ocrcommon.EnhancedTelemetryMercuryData, chainHeadTracker ChainHeadTracker, fetcher relaymercury.Fetcher) *datasource {
return &datasource{pr, jb, spec, lggr, rr, sync.RWMutex{}, enhancedTelemChan, chainHeadTracker, fetcher}
func NewDataSource(pr pipeline.Runner, jb job.Job, spec pipeline.Spec, lggr logger.Logger, rr chan pipeline.Run, enhancedTelemChan chan ocrcommon.EnhancedTelemetryMercuryData, chainHeadTracker ChainHeadTracker, fetcher Fetcher, initialBlockNumber *int64) *datasource {
return &datasource{pr, jb, spec, lggr, rr, sync.RWMutex{}, enhancedTelemChan, chainHeadTracker, fetcher, initialBlockNumber}
}

func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestamp, fetchMaxFinalizedBlockNum bool) (obs relaymercury.Observation, err error) {
Expand All @@ -63,7 +70,35 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam
wg.Add(1)
go func() {
defer wg.Done()
obs.MaxFinalizedBlockNumber.Val, obs.MaxFinalizedBlockNumber.Err = ds.fetcher.FetchInitialMaxFinalizedBlockNumber(ctx)
val, fetchErr := ds.fetcher.FetchInitialMaxFinalizedBlockNumber(ctx)
if fetchErr != nil {
obs.MaxFinalizedBlockNumber.Err = fetchErr
return
}
if val != nil {
obs.MaxFinalizedBlockNumber.Val = *val
return
}
if ds.initialBlockNumber == nil {
if obs.CurrentBlockNum.Err != nil {
obs.MaxFinalizedBlockNumber.Err = fmt.Errorf("FetchInitialMaxFinalizedBlockNumber returned empty LatestReport; this is a new feed. No initialBlockNumber was set, tried to use current block number to determine maxFinalizedBlockNumber but got error: %w", obs.CurrentBlockNum.Err)
} else {
// Subract 1 here because we will later add 1 to the
// maxFinalizedBlockNumber to get the first validFromBlockNum, which
// ought to be the same as current block num.
obs.MaxFinalizedBlockNumber.Val = obs.CurrentBlockNum.Val - 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: How about using negative value (aka invalid) to denote MaxFinalizedBlockNumber for empty report and handle this case when calculating validFromBlockNum?

Copy link
Collaborator Author

@samsondav samsondav Jun 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-1 is already used as a legitimate value to indicate that we want the feed to start from 0, so this is not available to use as a sentinel value

ds.lggr.Infof("FetchInitialMaxFinalizedBlockNumber returned empty LatestReport; this is a new feed so maxFinalizedBlockNumber=%d (initialBlockNumber unset, using currentBlockNum=%d-1)", obs.MaxFinalizedBlockNumber.Val, obs.CurrentBlockNum.Val)
}
} else {
// NOTE: It's important to subtract 1 if the server is missing any past
// report (brand new feed) since we will add 1 to the
// maxFinalizedBlockNumber to get the first validFromBlockNum, which
// ought to be zero.
//
// If "initialBlockNumber" is set to zero, this will give a starting block of zero.
obs.MaxFinalizedBlockNumber.Val = *ds.initialBlockNumber - 1
ds.lggr.Infof("FetchInitialMaxFinalizedBlockNumber returned empty LatestReport; this is a new feed so maxFinalizedBlockNumber=%d (initialBlockNumber=%d)", obs.MaxFinalizedBlockNumber.Val, *ds.initialBlockNumber)
}
}()
} else {
obs.MaxFinalizedBlockNumber.Err = errors.New("fetchMaxFinalizedBlockNum=false")
Expand Down
49 changes: 46 additions & 3 deletions core/services/relay/evm/mercury/data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ import (
var _ relaymercury.Fetcher = &mockFetcher{}

type mockFetcher struct {
num int64
num *int64
err error
}

func (m *mockFetcher) FetchInitialMaxFinalizedBlockNumber(context.Context) (int64, error) {
func (m *mockFetcher) FetchInitialMaxFinalizedBlockNumber(context.Context) (*int64, error) {
return m.num, m.err
}

Expand Down Expand Up @@ -143,15 +143,58 @@ func TestMercury_Observe(t *testing.T) {
})
t.Run("if FetchInitialMaxFinalizedBlockNumber succeeds", func(t *testing.T) {
fetcher.err = nil
fetcher.num = 32
var num int64 = 32
fetcher.num = &num

obs, err := ds.Observe(ctx, repts, true)
assert.NoError(t, err)

assert.NoError(t, obs.MaxFinalizedBlockNumber.Err)
assert.Equal(t, int64(32), obs.MaxFinalizedBlockNumber.Val)
})
t.Run("if FetchInitialMaxFinalizedBlockNumber returns nil (new feed) and initialBlockNumber is set", func(t *testing.T) {
var initialBlockNumber int64 = 50
ds.initialBlockNumber = &initialBlockNumber
fetcher.err = nil
fetcher.num = nil

obs, err := ds.Observe(ctx, repts, true)
assert.NoError(t, err)

assert.NoError(t, obs.MaxFinalizedBlockNumber.Err)
assert.Equal(t, int64(49), obs.MaxFinalizedBlockNumber.Val)
})
t.Run("if FetchInitialMaxFinalizedBlockNumber returns nil (new feed) and initialBlockNumber is not set", func(t *testing.T) {
ds.initialBlockNumber = nil
t.Run("if current block num is valid", func(t *testing.T) {
fetcher.err = nil
fetcher.num = nil

obs, err := ds.Observe(ctx, repts, true)
assert.NoError(t, err)

assert.NoError(t, obs.MaxFinalizedBlockNumber.Err)
assert.Equal(t, head.Number-1, obs.MaxFinalizedBlockNumber.Val)
})
t.Run("if current block num errored", func(t *testing.T) {
h2 := htmocks.NewHeadTracker(t)
h2.On("LatestChain").Return(nil)
ht.h = h2
c2 := evmtest.NewEthClientMock(t)
c2.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(nil, errors.New("head retrieval failed"))
ht.c = c2

obs, err := ds.Observe(ctx, repts, true)
assert.NoError(t, err)

assert.EqualError(t, obs.MaxFinalizedBlockNumber.Err, "FetchInitialMaxFinalizedBlockNumber returned empty LatestReport; this is a new feed. No initialBlockNumber was set, tried to use current block number to determine maxFinalizedBlockNumber but got error: head retrieval failed")
})
})
})

ht.h = h
ht.c = c

t.Run("when fetchMaxFinalizedBlockNum=false", func(t *testing.T) {
t.Run("when run execution fails, returns error", func(t *testing.T) {
t.Cleanup(func() {
Expand Down
34 changes: 12 additions & 22 deletions core/services/relay/evm/mercury/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,9 @@ var _ Transmitter = &mercuryTransmitter{}

type mercuryTransmitter struct {
utils.StartStopOnce
lggr logger.Logger
rpcClient wsrpc.Client
cfgTracker ConfigTracker
initialBlockNumber int64
lggr logger.Logger
rpcClient wsrpc.Client
cfgTracker ConfigTracker

feedID [32]byte
feedIDHex string
Expand Down Expand Up @@ -116,14 +115,13 @@ func getPayloadTypes() abi.Arguments {
})
}

func NewTransmitter(lggr logger.Logger, cfgTracker ConfigTracker, rpcClient wsrpc.Client, fromAccount ed25519.PublicKey, feedID [32]byte, initialBlockNumber int64) *mercuryTransmitter {
func NewTransmitter(lggr logger.Logger, cfgTracker ConfigTracker, rpcClient wsrpc.Client, fromAccount ed25519.PublicKey, feedID [32]byte) *mercuryTransmitter {
feedIDHex := fmt.Sprintf("0x%x", feedID[:])
return &mercuryTransmitter{
utils.StartStopOnce{},
lggr.Named("MercuryTransmitter").With("feedID", feedIDHex),
rpcClient,
cfgTracker,
initialBlockNumber,
feedID,
feedIDHex,
fmt.Sprintf("%x", fromAccount),
Expand Down Expand Up @@ -291,39 +289,31 @@ func (mt *mercuryTransmitter) LatestConfigDigestAndEpoch(ctx context.Context) (c
panic("not needed for OCR3")
}

func (mt *mercuryTransmitter) FetchInitialMaxFinalizedBlockNumber(ctx context.Context) (int64, error) {
mt.lggr.Debug("FetchInitialMaxFinalizedBlockNumber")
func (mt *mercuryTransmitter) FetchInitialMaxFinalizedBlockNumber(ctx context.Context) (*int64, error) {
mt.lggr.Trace("FetchInitialMaxFinalizedBlockNumber")
req := &pb.LatestReportRequest{
FeedId: mt.feedID[:],
}
resp, err := mt.rpcClient.LatestReport(ctx, req)
if err != nil {
mt.lggr.Errorw("FetchInitialMaxFinalizedBlockNumber failed", "err", err)
return 0, pkgerrors.Wrap(err, "FetchInitialMaxFinalizedBlockNumber failed to fetch LatestReport")
return nil, pkgerrors.Wrap(err, "FetchInitialMaxFinalizedBlockNumber failed to fetch LatestReport")
}
if resp == nil {
return 0, errors.New("FetchInitialMaxFinalizedBlockNumber expected LatestReport to return non-nil response")
return nil, errors.New("FetchInitialMaxFinalizedBlockNumber expected LatestReport to return non-nil response")
}
if resp.Error != "" {
err = errors.New(resp.Error)
mt.lggr.Errorw("FetchInitialMaxFinalizedBlockNumber failed; mercury server returned error", "err", err)
return 0, err
return nil, err
}
if resp.Report == nil {
maxFinalizedBlockNumber := mt.initialBlockNumber - 1
mt.lggr.Infof("FetchInitialMaxFinalizedBlockNumber returned empty LatestReport; this is a new feed so maxFinalizedBlockNumber=%d (initialBlockNumber=%d)", maxFinalizedBlockNumber, mt.initialBlockNumber)
// NOTE: It's important to return -1 if the server is missing any past
// report (brand new feed) since we will add 1 to the
// maxFinalizedBlockNumber to get the first validFromBlockNum, which
// ought to be zero.
//
// If "initialBlockNumber" is unset, this will give a starting block of zero.
return maxFinalizedBlockNumber, nil
return nil, nil
} else if !bytes.Equal(resp.Report.FeedId, mt.feedID[:]) {
return 0, fmt.Errorf("FetchInitialMaxFinalizedBlockNumber failed; mismatched feed IDs, expected: 0x%x, got: 0x%x", mt.feedID, resp.Report.FeedId)
return nil, fmt.Errorf("FetchInitialMaxFinalizedBlockNumber failed; mismatched feed IDs, expected: 0x%x, got: 0x%x", mt.feedID, resp.Report.FeedId)
}

mt.lggr.Debugw("FetchInitialMaxFinalizedBlockNumber success", "currentBlockNum", resp.Report.CurrentBlockNumber)

return resp.Report.CurrentBlockNumber, nil
return &resp.Report.CurrentBlockNumber, nil
}
Loading