diff --git a/oracle/config/api.go b/oracle/config/api.go index 0eb839b15..5efdec5cc 100644 --- a/oracle/config/api.go +++ b/oracle/config/api.go @@ -40,6 +40,11 @@ type APIConfig struct { // Name is the name of the provider that corresponds to this config. Name string `json:"name"` + + // MaxBlockHeightAge is the oldest an update from an on-chain data source can be without having its + // block height incremented. In the case where a data source has exceeded this limit and the block + // height is not increasing, price reporting will be skipped until the block height increases. + MaxBlockHeightAge time.Duration `json:"maxBlockHeightAge"` } // Endpoint holds all data necessary for an API provider to connect to a given endpoint @@ -123,5 +128,9 @@ func (c *APIConfig) ValidateBasic() error { } } + if c.MaxBlockHeightAge < 0 { + return fmt.Errorf("max_block_height_age cannot be negative") + } + return nil } diff --git a/oracle/config/api_test.go b/oracle/config/api_test.go index 28d902ba6..a17507344 100644 --- a/oracle/config/api_test.go +++ b/oracle/config/api_test.go @@ -135,6 +135,36 @@ func TestAPIConfig(t *testing.T) { }, expectedErr: false, }, + { + name: "good config with max_block_height_age", + config: config.APIConfig{ + Enabled: true, + Timeout: time.Second, + Interval: time.Second, + ReconnectTimeout: time.Second, + MaxQueries: 1, + Name: "test", + Endpoints: []config.Endpoint{{URL: "http://test.com"}}, + BatchSize: 1, + MaxBlockHeightAge: 10 * time.Second, + }, + expectedErr: false, + }, + { + name: "bad config with negative max_block_height_age", + config: config.APIConfig{ + Enabled: true, + Timeout: time.Second, + Interval: time.Second, + ReconnectTimeout: time.Second, + MaxQueries: 1, + Name: "test", + Endpoints: []config.Endpoint{{URL: "http://test.com"}}, + BatchSize: 1, + MaxBlockHeightAge: -10 * time.Second, + }, + expectedErr: true, + }, { name: "bad config with invalid endpoint (no url)", config: config.APIConfig{ diff --git a/providers/apis/defi/ethmulticlient/multi_client.go b/providers/apis/defi/ethmulticlient/multi_client.go index d154ec124..afc89aa50 100644 --- a/providers/apis/defi/ethmulticlient/multi_client.go +++ b/providers/apis/defi/ethmulticlient/multi_client.go @@ -6,6 +6,8 @@ import ( "fmt" "sync" + "github.com/skip-mev/slinky/providers/apis/defi/types" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/rpc" "go.uber.org/zap" @@ -23,6 +25,8 @@ type MultiRPCClient struct { // underlying clients clients []EVMClient + + blockAgeChecker types.BlockAgeChecker } // NewMultiRPCClient returns a new MultiRPCClient. @@ -32,9 +36,10 @@ func NewMultiRPCClient( clients []EVMClient, ) EVMClient { return &MultiRPCClient{ - logger: logger, - clients: clients, - api: api, + logger: logger, + clients: clients, + api: api, + blockAgeChecker: types.NewBlockAgeChecker(api.MaxBlockHeightAge), } } @@ -81,12 +86,20 @@ func NewMultiRPCClientFromEndpoints( } return &MultiRPCClient{ - logger: logger.With(zap.String("multi_client", api.Name)), - api: api, - clients: clients, + logger: logger.With(zap.String("multi_client", api.Name)), + api: api, + clients: clients, + blockAgeChecker: types.NewBlockAgeChecker(api.MaxBlockHeightAge), }, nil } +// define a result struct that go routines will populate and append to a slice when they complete their request. +type result struct { + height uint64 + results []rpc.BatchElem + err error +} + // BatchCallContext injects a call to eth_blockNumber, and makes batch calls to the underlying EVMClients. // It returns the response that has the greatest height from the eth_blockNumber call. An error is returned // only when no client was able to successfully provide a height or errored when sending the BatchCall. @@ -95,15 +108,9 @@ func (m *MultiRPCClient) BatchCallContext(ctx context.Context, batchElems []rpc. m.logger.Debug("BatchCallContext called with 0 elems") return nil } - // define a result struct that go routines will populate and append to a slice when they complete their request. - type result struct { - height uint64 - results []rpc.BatchElem - } + results := make([]result, len(m.clients)) - // error slice to capture errors go routines encounter. - errs := make([]error, len(m.clients)) wg := new(sync.WaitGroup) // this is the index of where we will have an eth_blockNumber call. blockNumReqIndex := len(batchElems) @@ -124,7 +131,8 @@ func (m *MultiRPCClient) BatchCallContext(ctx context.Context, batchElems []rpc. // if there was an error, or if the block_num request didn't have result / errored // we log the error and append to error slice. if err != nil || req[blockNumReqIndex].Result == "" || req[blockNumReqIndex].Error != nil { - errs[i] = fmt.Errorf("endpoint request failed: %w, %w", err, req[blockNumReqIndex].Error) + resultErr := fmt.Errorf("endpoint request failed: %w, %w", err, req[blockNumReqIndex].Error) + results[i] = result{0, nil, resultErr} m.logger.Debug( "endpoint request failed", zap.Error(err), @@ -138,7 +146,8 @@ func (m *MultiRPCClient) BatchCallContext(ctx context.Context, batchElems []rpc. // try to get the block number. r, ok := req[blockNumReqIndex].Result.(*string) if !ok { - errs[i] = fmt.Errorf("result from eth_blockNumber was not a string") + resultErr := fmt.Errorf("result from eth_blockNumber was not a string") + results[i] = result{0, nil, resultErr} m.logger.Debug( "result from eth_blockNumber was not a string", zap.String("url", url), @@ -149,7 +158,8 @@ func (m *MultiRPCClient) BatchCallContext(ctx context.Context, batchElems []rpc. // decode the new height height, err := hexutil.DecodeUint64(*r) if err != nil { // if we can't decode the height, log an error. - errs[i] = fmt.Errorf("could not decode hex eth height: %w", err) + resultErr := fmt.Errorf("could not decode hex eth height: %w", err) + results[i] = result{0, nil, resultErr} m.logger.Debug( "could not decode hex eth height", zap.String("url", url), @@ -163,17 +173,31 @@ func (m *MultiRPCClient) BatchCallContext(ctx context.Context, batchElems []rpc. zap.String("url", url), ) // append the results, minus the appended eth_blockNumber request. - results[i] = result{height, req[:blockNumReqIndex]} + results[i] = result{height, req[:blockNumReqIndex], nil} }(clientIdx) } wg.Wait() + filtered, err := m.filterResponses(results) + if err != nil { + return fmt.Errorf("error filtering responses: %w", err) + } + + // copy the results from the results that had the largest height. + copy(batchElems, filtered) + return nil +} + +// filterAccountsResponses chooses the rpc response with the highest block number. +func (m *MultiRPCClient) filterResponses(responses []result) ([]rpc.BatchElem, error) { // see which of the results had the largest height, and store the index of that result. var ( maxHeight uint64 maxHeightIndex int + errs = make([]error, len(responses)) ) - for i, res := range results { + for i, res := range responses { + errs[i] = res.err if res.height > maxHeight { maxHeight = res.height maxHeightIndex = i @@ -183,12 +207,17 @@ func (m *MultiRPCClient) BatchCallContext(ctx context.Context, batchElems []rpc. if maxHeight == 0 { err := errors.Join(errs...) if err != nil { - return err + return nil, err } // this should never happen... but who knows. maybe something terrible happened. - return errors.New("no errors were encountered, however no go routine was able to report a height") + return nil, errors.New("no errors were encountered, however no go routine was able to report a height") + } - // copy the results from the results that had the largest height. - copy(batchElems, results[maxHeightIndex].results) - return nil + + // check the block height + if valid := m.blockAgeChecker.IsHeightValid(maxHeight); !valid { + return nil, fmt.Errorf("height %d is stale and older than %d", maxHeight, m.api.MaxBlockHeightAge) + } + + return responses[maxHeightIndex].results, nil } diff --git a/providers/apis/defi/osmosis/client.go b/providers/apis/defi/osmosis/client.go index f8869d573..beb92966c 100644 --- a/providers/apis/defi/osmosis/client.go +++ b/providers/apis/defi/osmosis/client.go @@ -4,7 +4,7 @@ import ( "context" "encoding/json" "fmt" - "math/rand" + "strconv" "sync" "time" @@ -12,9 +12,14 @@ import ( "github.com/skip-mev/slinky/oracle/config" "github.com/skip-mev/slinky/pkg/http" + "github.com/skip-mev/slinky/providers/apis/defi/types" "github.com/skip-mev/slinky/providers/base/api/metrics" ) +const ( + headerBlockHeight = "grpc-metadata-x-cosmos-block-height" +) + var ( _ Client = &ClientImpl{} _ Client = &MultiClientImpl{} @@ -28,7 +33,7 @@ type Client interface { poolID uint64, baseAsset, quoteAsset string, - ) (SpotPriceResponse, error) + ) (WrappedSpotPriceResponse, error) } // ClientImpl is an implementation of a client to Osmosis using a @@ -74,7 +79,7 @@ func NewClient( } // SpotPrice uses the underlying x/poolmanager client to access spot prices. -func (c *ClientImpl) SpotPrice(ctx context.Context, poolID uint64, baseAsset, quoteAsset string) (SpotPriceResponse, error) { +func (c *ClientImpl) SpotPrice(ctx context.Context, poolID uint64, baseAsset, quoteAsset string) (WrappedSpotPriceResponse, error) { start := time.Now() defer func() { c.apiMetrics.ObserveProviderResponseLatency(c.api.Name, c.redactedURL, time.Since(start)) @@ -82,23 +87,35 @@ func (c *ClientImpl) SpotPrice(ctx context.Context, poolID uint64, baseAsset, qu url, err := CreateURL(c.endpoint.URL, poolID, baseAsset, quoteAsset) if err != nil { - return SpotPriceResponse{}, err + return WrappedSpotPriceResponse{}, err } resp, err := c.httpClient.GetWithContext(ctx, url) if err != nil { - return SpotPriceResponse{}, err + return WrappedSpotPriceResponse{}, err } c.apiMetrics.AddHTTPStatusCode(c.api.Name, resp) + var blockHeight uint64 + heightStr := resp.Header.Get(headerBlockHeight) + if heightStr != "" { + blockHeight, err = strconv.ParseUint(heightStr, 10, 64) + if err != nil { + return WrappedSpotPriceResponse{}, fmt.Errorf("failed to parse block height: %w", err) + } + } + var spotPriceResponse SpotPriceResponse if err := json.NewDecoder(resp.Body).Decode(&spotPriceResponse); err != nil { - return SpotPriceResponse{}, err + return WrappedSpotPriceResponse{}, err } c.apiMetrics.AddHTTPStatusCode(c.api.Name, resp) - return spotPriceResponse, nil + return WrappedSpotPriceResponse{ + SpotPriceResponse: spotPriceResponse, + BlockHeight: blockHeight, + }, nil } // MultiClientImpl is an Osmosis client that wraps a set of multiple Clients. @@ -108,6 +125,8 @@ type MultiClientImpl struct { apiMetrics metrics.APIMetrics clients []Client + + blockAgeChecker types.BlockAgeChecker } // NewMultiClient creates a new Client. @@ -134,10 +153,11 @@ func NewMultiClient( } return &MultiClientImpl{ - logger: logger, - api: api, - apiMetrics: apiMetrics, - clients: clients, + logger: logger, + api: api, + apiMetrics: apiMetrics, + clients: clients, + blockAgeChecker: types.NewBlockAgeChecker(api.MaxBlockHeightAge), }, nil } @@ -174,17 +194,18 @@ func NewMultiClientFromEndpoints( } return &MultiClientImpl{ - logger: logger, - api: api, - apiMetrics: apiMetrics, - clients: clients, + logger: logger, + api: api, + apiMetrics: apiMetrics, + clients: clients, + blockAgeChecker: types.NewBlockAgeChecker(api.MaxBlockHeightAge), }, nil } // SpotPrice delegates the request to all underlying clients and applies a filter to the // set of responses. -func (mc *MultiClientImpl) SpotPrice(ctx context.Context, poolID uint64, baseAsset, quoteAsset string) (SpotPriceResponse, error) { - resps := make([]SpotPriceResponse, len(mc.clients)) +func (mc *MultiClientImpl) SpotPrice(ctx context.Context, poolID uint64, baseAsset, quoteAsset string) (WrappedSpotPriceResponse, error) { + resps := make([]WrappedSpotPriceResponse, len(mc.clients)) var wg sync.WaitGroup wg.Add(len(mc.clients)) @@ -209,22 +230,29 @@ func (mc *MultiClientImpl) SpotPrice(ctx context.Context, poolID uint64, baseAss wg.Wait() - return filterSpotPriceResponses(resps) + return mc.filterSpotPriceResponses(resps) } -// filterSpotPriceResponses currently just chooses a random response as there is no way to differentiate. -func filterSpotPriceResponses(responses []SpotPriceResponse) (SpotPriceResponse, error) { +// filterSpotPriceResponses chooses the response with the highest block height. +func (mc *MultiClientImpl) filterSpotPriceResponses(responses []WrappedSpotPriceResponse) (WrappedSpotPriceResponse, error) { if len(responses) == 0 { - return SpotPriceResponse{}, fmt.Errorf("no responses found") + return WrappedSpotPriceResponse{}, fmt.Errorf("no responses found") } - perm := rand.Perm(len(responses)) - for _, i := range perm { - resp := responses[perm[i]] - if resp.SpotPrice != "" { - return resp, nil + highestHeight := uint64(0) + highestHeightIndex := 0 + + for i, resp := range responses { + if resp.BlockHeight > highestHeight { + highestHeight = resp.BlockHeight + highestHeightIndex = i } } - return SpotPriceResponse{}, fmt.Errorf("no responses found") + // check the block height + if valid := mc.blockAgeChecker.IsHeightValid(highestHeight); !valid { + return WrappedSpotPriceResponse{}, fmt.Errorf("height %d is stale and older than %d", highestHeight, mc.api.MaxBlockHeightAge) + } + + return responses[highestHeightIndex], nil } diff --git a/providers/apis/defi/osmosis/client_test.go b/providers/apis/defi/osmosis/client_test.go index 7f1d32a6d..dd8640dd4 100644 --- a/providers/apis/defi/osmosis/client_test.go +++ b/providers/apis/defi/osmosis/client_test.go @@ -82,15 +82,15 @@ func TestMultiClient(t *testing.T) { defer cancel() // mocks - client1.On("SpotPrice", mock.Anything, poolID, baseAsset, quoteAsset).Return(osmosis.SpotPriceResponse{ - SpotPrice: expectedPrice, + client1.On("SpotPrice", mock.Anything, poolID, baseAsset, quoteAsset).Return(osmosis.WrappedSpotPriceResponse{ + SpotPriceResponse: osmosis.SpotPriceResponse{SpotPrice: expectedPrice}, }, nil).Once() - client2.On("SpotPrice", mock.Anything, poolID, baseAsset, quoteAsset).Return(osmosis.SpotPriceResponse{ - SpotPrice: expectedPrice, + client2.On("SpotPrice", mock.Anything, poolID, baseAsset, quoteAsset).Return(osmosis.WrappedSpotPriceResponse{ + SpotPriceResponse: osmosis.SpotPriceResponse{SpotPrice: expectedPrice}, }, nil).Once() - client3.On("SpotPrice", mock.Anything, poolID, baseAsset, quoteAsset).Return(osmosis.SpotPriceResponse{}, + client3.On("SpotPrice", mock.Anything, poolID, baseAsset, quoteAsset).Return(osmosis.WrappedSpotPriceResponse{}, fmt.Errorf("error")).Once() resp, err := client.SpotPrice(ctx, poolID, baseAsset, quoteAsset) @@ -112,14 +112,14 @@ func TestMultiClient(t *testing.T) { defer cancel() // mocks - client1.On("SpotPrice", mock.Anything, poolID, baseAsset, quoteAsset).Return(osmosis.SpotPriceResponse{ - SpotPrice: expectedPrice, + client1.On("SpotPrice", mock.Anything, poolID, baseAsset, quoteAsset).Return(osmosis.WrappedSpotPriceResponse{ + SpotPriceResponse: osmosis.SpotPriceResponse{SpotPrice: expectedPrice}, }, nil).Once() - client2.On("SpotPrice", mock.Anything, poolID, baseAsset, quoteAsset).Return(osmosis.SpotPriceResponse{ - SpotPrice: expectedPrice, + client2.On("SpotPrice", mock.Anything, poolID, baseAsset, quoteAsset).Return(osmosis.WrappedSpotPriceResponse{ + SpotPriceResponse: osmosis.SpotPriceResponse{SpotPrice: expectedPrice}, }, nil).Once() - client3.On("SpotPrice", mock.Anything, poolID, baseAsset, quoteAsset).Return(osmosis.SpotPriceResponse{ - SpotPrice: expectedPrice, + client3.On("SpotPrice", mock.Anything, poolID, baseAsset, quoteAsset).Return(osmosis.WrappedSpotPriceResponse{ + SpotPriceResponse: osmosis.SpotPriceResponse{SpotPrice: expectedPrice}, }, nil).Once() resp, err := client.SpotPrice(ctx, poolID, baseAsset, quoteAsset) diff --git a/providers/apis/defi/osmosis/mocks/client.go b/providers/apis/defi/osmosis/mocks/client.go index ddfd37c8d..bf20496a1 100644 --- a/providers/apis/defi/osmosis/mocks/client.go +++ b/providers/apis/defi/osmosis/mocks/client.go @@ -16,22 +16,22 @@ type Client struct { } // SpotPrice provides a mock function with given fields: ctx, poolID, baseAsset, quoteAsset -func (_m *Client) SpotPrice(ctx context.Context, poolID uint64, baseAsset string, quoteAsset string) (osmosis.SpotPriceResponse, error) { +func (_m *Client) SpotPrice(ctx context.Context, poolID uint64, baseAsset string, quoteAsset string) (osmosis.WrappedSpotPriceResponse, error) { ret := _m.Called(ctx, poolID, baseAsset, quoteAsset) if len(ret) == 0 { panic("no return value specified for SpotPrice") } - var r0 osmosis.SpotPriceResponse + var r0 osmosis.WrappedSpotPriceResponse var r1 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, string, string) (osmosis.SpotPriceResponse, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context, uint64, string, string) (osmosis.WrappedSpotPriceResponse, error)); ok { return rf(ctx, poolID, baseAsset, quoteAsset) } - if rf, ok := ret.Get(0).(func(context.Context, uint64, string, string) osmosis.SpotPriceResponse); ok { + if rf, ok := ret.Get(0).(func(context.Context, uint64, string, string) osmosis.WrappedSpotPriceResponse); ok { r0 = rf(ctx, poolID, baseAsset, quoteAsset) } else { - r0 = ret.Get(0).(osmosis.SpotPriceResponse) + r0 = ret.Get(0).(osmosis.WrappedSpotPriceResponse) } if rf, ok := ret.Get(1).(func(context.Context, uint64, string, string) error); ok { diff --git a/providers/apis/defi/osmosis/price_fetcher.go b/providers/apis/defi/osmosis/price_fetcher.go index 3cb49c464..b354ab157 100644 --- a/providers/apis/defi/osmosis/price_fetcher.go +++ b/providers/apis/defi/osmosis/price_fetcher.go @@ -7,9 +7,8 @@ import ( "sync" "time" - "golang.org/x/sync/errgroup" - "go.uber.org/zap" + "golang.org/x/sync/errgroup" "github.com/skip-mev/slinky/oracle/config" oracletypes "github.com/skip-mev/slinky/oracle/types" @@ -203,6 +202,6 @@ func (pf *APIPriceFetcher) Fetch( return oracletypes.NewPriceResponse(resolved, unresolved) } -func calculatePrice(resp SpotPriceResponse) (*big.Float, error) { +func calculatePrice(resp WrappedSpotPriceResponse) (*big.Float, error) { return math.Float64StringToBigFloat(resp.SpotPrice) } diff --git a/providers/apis/defi/osmosis/price_fetcher_test.go b/providers/apis/defi/osmosis/price_fetcher_test.go index cc4254e32..9ab70792b 100644 --- a/providers/apis/defi/osmosis/price_fetcher_test.go +++ b/providers/apis/defi/osmosis/price_fetcher_test.go @@ -203,8 +203,10 @@ func TestProviderFetch(t *testing.T) { client.On("SpotPrice", mock.Anything, btcUSDTMetadata.PoolID, btcUSDTMetadata.BaseTokenDenom, btcUSDTMetadata.QuoteTokenDenom, - ).Return(osmosis.SpotPriceResponse{ - SpotPrice: expectedBTCUSDTPrice, + ).Return(osmosis.WrappedSpotPriceResponse{ + SpotPriceResponse: osmosis.SpotPriceResponse{ + SpotPrice: expectedBTCUSDTPrice, + }, }, nil).Once() ts := defaultTickersToProviderTickers([]types.DefaultProviderTicker{tickers[0]}) @@ -223,8 +225,10 @@ func TestProviderFetch(t *testing.T) { err = fmt.Errorf("error") - client.On("SpotPrice", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(osmosis.SpotPriceResponse{ - SpotPrice: "", + client.On("SpotPrice", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(osmosis.WrappedSpotPriceResponse{ + SpotPriceResponse: osmosis.SpotPriceResponse{ + SpotPrice: "", + }, }, err).Times(3) ts := defaultTickersToProviderTickers(tickers) @@ -273,20 +277,20 @@ func TestProviderFetch(t *testing.T) { client.On("SpotPrice", mock.Anything, btcUSDTMetadata.PoolID, btcUSDTMetadata.BaseTokenDenom, btcUSDTMetadata.QuoteTokenDenom, - ).Return(osmosis.SpotPriceResponse{ - SpotPrice: "", + ).Return(osmosis.WrappedSpotPriceResponse{ + SpotPriceResponse: osmosis.SpotPriceResponse{SpotPrice: ""}, }, err).Once() client.On("SpotPrice", mock.Anything, ethUSDTMetadata.PoolID, ethUSDTMetadata.BaseTokenDenom, ethUSDTMetadata.QuoteTokenDenom, - ).Return(osmosis.SpotPriceResponse{ - SpotPrice: expectedETHUSDTPrice, + ).Return(osmosis.WrappedSpotPriceResponse{ + SpotPriceResponse: osmosis.SpotPriceResponse{SpotPrice: expectedETHUSDTPrice}, }, nil).Once() client.On("SpotPrice", mock.Anything, mogSOLMetadata.PoolID, mogSOLMetadata.BaseTokenDenom, mogSOLMetadata.QuoteTokenDenom, - ).Return(osmosis.SpotPriceResponse{ - SpotPrice: expectedMOGSOLPRICE, + ).Return(osmosis.WrappedSpotPriceResponse{ + SpotPriceResponse: osmosis.SpotPriceResponse{SpotPrice: expectedMOGSOLPRICE}, }, nil).Once() ts := defaultTickersToProviderTickers(tickers) @@ -310,20 +314,20 @@ func TestProviderFetch(t *testing.T) { client.On("SpotPrice", mock.Anything, btcUSDTMetadata.PoolID, btcUSDTMetadata.BaseTokenDenom, btcUSDTMetadata.QuoteTokenDenom, - ).Return(osmosis.SpotPriceResponse{ - SpotPrice: expectedBTCUSDTPrice, + ).Return(osmosis.WrappedSpotPriceResponse{ + SpotPriceResponse: osmosis.SpotPriceResponse{SpotPrice: expectedBTCUSDTPrice}, }, nil).Once() client.On("SpotPrice", mock.Anything, ethUSDTMetadata.PoolID, ethUSDTMetadata.BaseTokenDenom, ethUSDTMetadata.QuoteTokenDenom, - ).Return(osmosis.SpotPriceResponse{ - SpotPrice: expectedETHUSDTPrice, + ).Return(osmosis.WrappedSpotPriceResponse{ + SpotPriceResponse: osmosis.SpotPriceResponse{SpotPrice: expectedETHUSDTPrice}, }, nil).Once() client.On("SpotPrice", mock.Anything, mogSOLMetadata.PoolID, mogSOLMetadata.BaseTokenDenom, mogSOLMetadata.QuoteTokenDenom, - ).Return(osmosis.SpotPriceResponse{ - SpotPrice: expectedMOGSOLPRICE, + ).Return(osmosis.WrappedSpotPriceResponse{ + SpotPriceResponse: osmosis.SpotPriceResponse{SpotPrice: expectedMOGSOLPRICE}, }, nil).Once() ts := defaultTickersToProviderTickers(tickers) diff --git a/providers/apis/defi/osmosis/types.go b/providers/apis/defi/osmosis/types.go index 3d0308291..7e97ca816 100644 --- a/providers/apis/defi/osmosis/types.go +++ b/providers/apis/defi/osmosis/types.go @@ -127,8 +127,14 @@ var DefaultAPIConfig = config.APIConfig{ URL: "https://osmosis-api.polkachu.com", }, }, + MaxBlockHeightAge: 30 * time.Second, } type SpotPriceResponse struct { SpotPrice string `json:"spot_price"` } + +type WrappedSpotPriceResponse struct { + SpotPriceResponse + BlockHeight uint64 `json:"block_height"` +} diff --git a/providers/apis/defi/raydium/multi_client.go b/providers/apis/defi/raydium/multi_client.go index c8e8bf232..b16988132 100644 --- a/providers/apis/defi/raydium/multi_client.go +++ b/providers/apis/defi/raydium/multi_client.go @@ -11,6 +11,7 @@ import ( "go.uber.org/zap" "github.com/skip-mev/slinky/oracle/config" + "github.com/skip-mev/slinky/providers/apis/defi/types" "github.com/skip-mev/slinky/providers/base/api/metrics" ) @@ -23,6 +24,8 @@ type MultiJSONRPCClient struct { // underlying clients clients []SolanaJSONRPCClient + + blockAgeChecker types.BlockAgeChecker } // NewMultiJSONRPCClient returns a new MultiJSONRPCClient. @@ -33,10 +36,11 @@ func NewMultiJSONRPCClient( clients []SolanaJSONRPCClient, ) SolanaJSONRPCClient { return &MultiJSONRPCClient{ - logger: logger, - api: api, - apiMetrics: apiMetrics, - clients: clients, + logger: logger, + api: api, + apiMetrics: apiMetrics, + clients: clients, + blockAgeChecker: types.NewBlockAgeChecker(api.MaxBlockHeightAge), } } @@ -80,10 +84,11 @@ func NewMultiJSONRPCClientFromEndpoints( } return &MultiJSONRPCClient{ - logger: logger.With(zap.String("multi_client", Name)), - api: api, - apiMetrics: apiMetrics, - clients: clients, + logger: logger.With(zap.String("multi_client", Name)), + api: api, + apiMetrics: apiMetrics, + clients: clients, + blockAgeChecker: types.NewBlockAgeChecker(api.MaxBlockHeightAge), }, nil } @@ -138,11 +143,11 @@ func (c *MultiJSONRPCClient) GetMultipleAccountsWithOpts( } // filter the responses - return filterAccountsResponses(responses) + return c.filterAccountsResponses(responses) } // filterAccountsResponses chooses the rpc response with the highest slot number. -func filterAccountsResponses(responses []*rpc.GetMultipleAccountsResult) (*rpc.GetMultipleAccountsResult, error) { +func (c *MultiJSONRPCClient) filterAccountsResponses(responses []*rpc.GetMultipleAccountsResult) (*rpc.GetMultipleAccountsResult, error) { var ( maxSlot uint64 maxResp *rpc.GetMultipleAccountsResult @@ -159,5 +164,10 @@ func filterAccountsResponses(responses []*rpc.GetMultipleAccountsResult) (*rpc.G } } + // check the block height (slot) + if valid := c.blockAgeChecker.IsHeightValid(maxSlot); !valid { + return nil, fmt.Errorf("height %d is stale and older than %d", maxSlot, c.api.MaxBlockHeightAge) + } + return maxResp, nil } diff --git a/providers/apis/defi/raydium/types.go b/providers/apis/defi/raydium/types.go index 3c02b175a..b23420d66 100644 --- a/providers/apis/defi/raydium/types.go +++ b/providers/apis/defi/raydium/types.go @@ -150,4 +150,5 @@ var DefaultAPIConfig = config.APIConfig{ URL: "https://api.mainnet-beta.solana.com", }, }, + MaxBlockHeightAge: 30 * time.Second, } diff --git a/providers/apis/defi/types/block_age.go b/providers/apis/defi/types/block_age.go new file mode 100644 index 000000000..f998fdc94 --- /dev/null +++ b/providers/apis/defi/types/block_age.go @@ -0,0 +1,42 @@ +package types + +import "time" + +// BlockAgeChecker is a utility type to check if incoming block heights are validly updating. +// If the block heights are not increasing and the time since the last update has exceeded +// a configurable duration, this type will report that the updates are invalid. +type BlockAgeChecker struct { + lastHeight uint64 + lastTimeStamp time.Time + maxAge time.Duration +} + +// NewBlockAgeChecker returns a zeroed BlockAgeChecker using the provided maxAge. +func NewBlockAgeChecker(maxAge time.Duration) BlockAgeChecker { + return BlockAgeChecker{ + lastHeight: 0, + lastTimeStamp: time.Now(), + maxAge: maxAge, + } +} + +// IsHeightValid returns true if: +// - the new height is greater than the last height OR +// - the time past the last block height update is less than the configured max age +// returns false if: +// - the time is past the configured max age. +func (bc *BlockAgeChecker) IsHeightValid(newHeight uint64) bool { + now := time.Now() + + if newHeight > bc.lastHeight { + bc.lastHeight = newHeight + bc.lastTimeStamp = now + return true + } + + if now.Sub(bc.lastTimeStamp) > bc.maxAge { + return false + } + + return true +} diff --git a/providers/apis/defi/types/block_age_test.go b/providers/apis/defi/types/block_age_test.go new file mode 100644 index 000000000..fdaf89b62 --- /dev/null +++ b/providers/apis/defi/types/block_age_test.go @@ -0,0 +1,66 @@ +package types_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/skip-mev/slinky/providers/apis/defi/types" +) + +func TestBlockAgeChecker_IsHeightValid(t *testing.T) { + tests := []struct { + name string + lastHeight uint64 + waitTime time.Duration + maxAge time.Duration + newHeight uint64 + isValid bool + }{ + { + name: "valid 0s no timeout", + lastHeight: 0, + waitTime: 0, + maxAge: 10 * time.Minute, + newHeight: 0, + isValid: true, + }, + { + name: "valid new height no timeout", + lastHeight: 0, + waitTime: 0, + maxAge: 10 * time.Minute, + newHeight: 0, + isValid: true, + }, + { + name: "invalid 0s due to timeout", + lastHeight: 0, + waitTime: 10 * time.Millisecond, + maxAge: 1 * time.Millisecond, + newHeight: 0, + isValid: false, + }, + { + name: "valid timeout but block height increase", + lastHeight: 0, + waitTime: 10 * time.Millisecond, + maxAge: 1 * time.Millisecond, + newHeight: 1, + isValid: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + bc := types.NewBlockAgeChecker(tt.maxAge) + + got := bc.IsHeightValid(tt.lastHeight) + require.True(t, got) + time.Sleep(tt.waitTime) + + got = bc.IsHeightValid(tt.newHeight) + require.Equal(t, tt.isValid, got) + }) + } +} diff --git a/providers/apis/defi/uniswapv3/utils.go b/providers/apis/defi/uniswapv3/utils.go index 866d7632b..afd547999 100644 --- a/providers/apis/defi/uniswapv3/utils.go +++ b/providers/apis/defi/uniswapv3/utils.go @@ -92,26 +92,28 @@ var ( // DefaultETHAPIConfig is the default configuration for the Uniswap API. Specifically this is for // Ethereum mainnet. DefaultETHAPIConfig = config.APIConfig{ - Name: fmt.Sprintf("%s%s%s", BaseName, NameSeparator, constants.ETHEREUM), - Atomic: true, - Enabled: true, - Timeout: 1000 * time.Millisecond, - Interval: 2000 * time.Millisecond, - ReconnectTimeout: 2000 * time.Millisecond, - MaxQueries: 1, - Endpoints: []config.Endpoint{{URL: ETH_URL}}, + Name: fmt.Sprintf("%s%s%s", BaseName, NameSeparator, constants.ETHEREUM), + Atomic: true, + Enabled: true, + Timeout: 1000 * time.Millisecond, + Interval: 2000 * time.Millisecond, + ReconnectTimeout: 2000 * time.Millisecond, + MaxQueries: 1, + Endpoints: []config.Endpoint{{URL: ETH_URL}}, + MaxBlockHeightAge: 30 * time.Second, } // DefaultBaseAPIConfig is the default configuration for the Uniswap API. Specifically this is for // Base mainnet. DefaultBaseAPIConfig = config.APIConfig{ - Name: fmt.Sprintf("%s%s%s", BaseName, NameSeparator, constants.BASE), - Atomic: true, - Enabled: true, - Timeout: 1000 * time.Millisecond, - Interval: 2000 * time.Millisecond, - ReconnectTimeout: 2000 * time.Millisecond, - MaxQueries: 1, - Endpoints: []config.Endpoint{{URL: BASE_URL}}, + Name: fmt.Sprintf("%s%s%s", BaseName, NameSeparator, constants.BASE), + Atomic: true, + Enabled: true, + Timeout: 1000 * time.Millisecond, + Interval: 2000 * time.Millisecond, + ReconnectTimeout: 2000 * time.Millisecond, + MaxQueries: 1, + Endpoints: []config.Endpoint{{URL: BASE_URL}}, + MaxBlockHeightAge: 30 * time.Second, } )