Skip to content

Commit

Permalink
feat: Refactor partial failures (#741)
Browse files Browse the repository at this point in the history
  • Loading branch information
Eric-Warehime committed Sep 12, 2024
1 parent 00703fa commit ef03f40
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 92 deletions.
31 changes: 29 additions & 2 deletions oracle/market_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"os"
"strings"
"time"

"go.uber.org/zap"
Expand Down Expand Up @@ -42,8 +43,31 @@ func (o *OracleImpl) listenForMarketMapUpdates(ctx context.Context) {
continue
}

if o.lastUpdated != 0 && o.lastUpdated == result.Value.LastUpdated {
o.logger.Debug("skipping market map update on no lastUpdated change", zap.Uint64("lastUpdated", o.lastUpdated))
continue
}

validSubset, err := result.Value.MarketMap.GetValidSubset()
if err != nil {
o.logger.Error("failed to validate market map", zap.Error(err))
continue
}

// Detect removed markets and surface info about the removals
var removedMarkets []string
for t := range result.Value.MarketMap.Markets {
if _, in := validSubset.Markets[t]; !in {
removedMarkets = append(removedMarkets, t)
}
}
if len(validSubset.Markets) == 0 || len(validSubset.Markets) != len(result.Value.MarketMap.Markets) {
o.logger.Warn("invalid market map update has caused some markets to be removed")
o.logger.Info("markets removed from invalid market map", zap.String("markets", strings.Join(removedMarkets, " ")))
}

// Update the oracle with the latest market map iff the market map has changed.
updated := result.Value.MarketMap
updated := validSubset
if o.marketMap.Equal(updated) {
o.logger.Debug("market map has not changed")
continue
Expand All @@ -55,12 +79,15 @@ func (o *OracleImpl) listenForMarketMapUpdates(ctx context.Context) {
continue
}

o.lastUpdated = result.Value.GetLastUpdated()

// Write the market map to the configured path.
if err := o.WriteMarketMap(); err != nil {
o.logger.Error("failed to write market map", zap.Error(err))
}

o.logger.Info("updated oracle with new market map", zap.Any("market_map", updated))
o.logger.Info("updated oracle with new market map")
o.logger.Debug("updated oracle with new market map", zap.Any("market_map", updated))
}
}
}
Expand Down
42 changes: 42 additions & 0 deletions oracle/market_mapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,4 +296,46 @@ func TestListenForMarketMapUpdates(t *testing.T) {
// Clean up the file.
require.NoError(t, os.Remove(path))
})
t.Run("can update providers with a new market map and handle partially invalid state", func(t *testing.T) {
chains := []mmclienttypes.Chain{{ChainID: "dYdX"}}
handler, factory := marketMapperFactory(t, chains)
handler.On("CreateURL", mock.Anything).Return("", nil).Maybe()

resolved := make(mmclienttypes.ResolvedMarketMap)
resp := mmtypes.MarketMapResponse{
MarketMap: partialInvalidMarketMap,
}
resolved[chains[0]] = mmclienttypes.NewMarketMapResult(&resp, time.Now())
handler.On("ParseResponse", mock.Anything, mock.Anything).Return(mmclienttypes.NewMarketMapResponse(resolved, nil)).Maybe()

o, err := oracle.New(
oracleCfgWithMockMapper,
noOpPriceAggregator{},
oracle.WithLogger(logger),
oracle.WithMarketMapperFactory(factory),
oracle.WithPriceAPIQueryHandlerFactory(oraclefactory.APIQueryHandlerFactory),
oracle.WithPriceWebSocketQueryHandlerFactory(oraclefactory.WebSocketQueryHandlerFactory),
)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
err := o.Start(ctx)
if !errors.Is(err, context.Canceled) {
t.Errorf("Start() should have returned context.Canceled error")
}
}()

// Wait for the oracle to start.
time.Sleep(5000 * time.Millisecond)

// The oracle should have been updated.
require.Equal(t, validMarketMapSubset, o.GetMarketMap())

// Stop the oracle.
cancel()
o.Stop()
})
}
2 changes: 2 additions & 0 deletions oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type OracleImpl struct { //nolint:revive
cfg config.OracleConfig
// marketMap is the market map that the oracle is using.
marketMap mmtypes.MarketMap
// lastUpdated is the field in the marketmap module tracking the last block at which an update was posted
lastUpdated uint64
// writeTo is a path to write the market map to.
writeTo string

Expand Down
11 changes: 3 additions & 8 deletions oracle/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,14 @@ func (o *OracleImpl) UpdateMarketMap(marketMap mmtypes.MarketMap) error {
o.mut.Lock()
defer o.mut.Unlock()

validSubset, err := marketMap.GetValidSubset()
if err != nil {
if err := marketMap.ValidateBasic(); err != nil {
o.logger.Error("failed to validate market map", zap.Error(err))
return err
}

if len(validSubset.Markets) == 0 {
o.logger.Warn("market map update produced no valid markets to fetch")
}

// Iterate over all existing price providers and update their market maps.
for name, state := range o.priceProviders {
providerTickers, err := types.ProviderTickersFromMarketMap(name, validSubset)
providerTickers, err := types.ProviderTickersFromMarketMap(name, marketMap)
if err != nil {
o.logger.Error("failed to create provider market map", zap.String("provider", name), zap.Error(err))
return err
Expand All @@ -47,7 +42,7 @@ func (o *OracleImpl) UpdateMarketMap(marketMap mmtypes.MarketMap) error {
o.priceProviders[name] = updatedState
}

o.marketMap = validSubset
o.marketMap = marketMap
if o.aggregator != nil {
o.aggregator.UpdateMarketMap(o.marketMap)
}
Expand Down
60 changes: 2 additions & 58 deletions oracle/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

func TestUpdateWithMarketMap(t *testing.T) {
t.Run("bad market map is not rejected", func(t *testing.T) {
t.Run("bad market map is rejected", func(t *testing.T) {
orc, err := oracle.New(
oracleCfg,
noOpPriceAggregator{},
Expand All @@ -35,7 +35,7 @@ func TestUpdateWithMarketMap(t *testing.T) {
"bad": {},
},
})
require.NoError(t, err)
require.Error(t, err)

o.Stop()
})
Expand Down Expand Up @@ -626,60 +626,4 @@ func TestUpdateProviderState(t *testing.T) {
500*time.Millisecond,
)
})

t.Run("can update the market map with partial failure on NormalizeBy", func(t *testing.T) {
orc, err := oracle.New(
oracleCfg,
noOpPriceAggregator{},
oracle.WithLogger(logger),
oracle.WithPriceAPIQueryHandlerFactory(oraclefactory.APIQueryHandlerFactory),
oracle.WithPriceWebSocketQueryHandlerFactory(oraclefactory.WebSocketQueryHandlerFactory),
)
require.NoError(t, err)
o := orc.(*oracle.OracleImpl)
require.NoError(t, o.Init(context.TODO()))

providers := o.GetProviderState()
require.Len(t, providers, 3)

// Update the oracle's market map.
require.NoError(t, o.UpdateMarketMap(partialInvalidMarketMap))

providers = o.GetProviderState()

cbTickers, err := types.ProviderTickersFromMarketMap(coinbase.Name, validMarketMapSubset)
require.NoError(t, err)

// Check the state after the update.
coinbaseState, ok := providers[coinbase.Name]
require.True(t, ok)
checkProviderState(
t,
cbTickers,
coinbase.Name,
providertypes.API,
false,
coinbaseState,
)

okxTickers, err := types.ProviderTickersFromMarketMap(okx.Name, validMarketMapSubset)
require.NoError(t, err)

okxState, ok := providers[okx.Name]
require.True(t, ok)
checkProviderState(
t,
okxTickers,
okx.Name,
providertypes.WebSockets,
false,
okxState,
)

binanceState, ok := providers[binance.Name]
require.True(t, ok)
checkProviderState(t, nil, binance.Name, providertypes.API, false, binanceState)

o.Stop()
})
}
19 changes: 0 additions & 19 deletions providers/apis/marketmap/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,25 +122,6 @@ func (f *MarketMapFetcher) Fetch(
)
}

// Validate the market map response.
//
// TODO: Add checks on the chain ID.
if err := resp.MarketMap.ValidateBasic(); err != nil {
f.logger.Info(
"invalid market map response from module",
zap.Any("market_map", resp.MarketMap),
zap.Error(err),
)

return types.NewMarketMapResponseWithErr(
chains,
providertypes.NewErrorWithCode(
fmt.Errorf("invalid market map response: %w", err),
providertypes.ErrorInvalidResponse,
),
)
}

resolved := make(types.ResolvedMarketMap)
resolved[chains[0]] = types.NewMarketMapResult(resp, time.Now())

Expand Down
16 changes: 11 additions & 5 deletions providers/apis/marketmap/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,22 +120,28 @@ func TestFetch(t *testing.T) {
},
},
{
name: "errors when the market map response is invalid",
name: "does not error when the market map response is invalid",
chains: chains[:1],
client: func() mmtypes.QueryClient {
c := mocks.NewQueryClient(t)
c.On("MarketMap", mock.Anything, mock.Anything).Return(
&mmtypes.MarketMapResponse{
MarketMap: badMarketMap,
MarketMap: badMarketMap,
ChainId: chains[0].ChainID,
LastUpdated: 11,
},
nil,
)
return c
},
expected: types.MarketMapResponse{
UnResolved: types.UnResolvedMarketMap{
chains[0]: providertypes.UnresolvedResult{
ErrorWithCode: providertypes.NewErrorWithCode(fmt.Errorf("invalid market map response"), providertypes.ErrorAPIGeneral),
Resolved: types.ResolvedMarketMap{
chains[0]: types.MarketMapResult{
Value: &mmtypes.MarketMapResponse{
MarketMap: badMarketMap,
ChainId: chains[0].ChainID,
LastUpdated: 11,
},
},
},
},
Expand Down

0 comments on commit ef03f40

Please sign in to comment.