Skip to content

Commit

Permalink
chore_: refactor market manager priceCache to use cache abstraction
Browse files Browse the repository at this point in the history
  • Loading branch information
seanstrom committed Sep 16, 2024
1 parent 1b2b89b commit 460f631
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 49 deletions.
92 changes: 45 additions & 47 deletions services/wallet/market/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ type MarketValuesSnapshot struct {
type DataPerTokenAndCurrency = map[string]map[string]DataPoint
type MarketValuesPerCurrencyAndToken = map[string]map[string]MarketValuesSnapshot
type TokenMarketCache MarketValuesPerCurrencyAndToken
type TokenPriceCache DataPerTokenAndCurrency

type Manager struct {
feed *event.Feed
priceCache DataPerTokenAndCurrency
priceCacheLock sync.RWMutex
priceCache MarketCache[TokenPriceCache]
marketCache MarketCache[TokenMarketCache]
IsConnected bool
LastCheckedAt int64
Expand All @@ -54,7 +54,7 @@ func NewManager(providers []thirdparty.MarketDataProvider, feed *event.Feed) *Ma

return &Manager{
feed: feed,
priceCache: make(DataPerTokenAndCurrency),
priceCache: *NewCache(make(TokenPriceCache)),
marketCache: *NewCache(make(TokenMarketCache)),
IsConnected: true,
LastCheckedAt: time.Now().Unix(),
Expand Down Expand Up @@ -263,69 +263,67 @@ func (pm *Manager) FetchPrices(symbols []string, currencies []string) (map[strin
}

func (pm *Manager) getCachedPricesFor(symbols []string, currencies []string) DataPerTokenAndCurrency {
prices := make(DataPerTokenAndCurrency)

for _, symbol := range symbols {
prices[symbol] = make(map[string]DataPoint)
for _, currency := range currencies {
prices[symbol][currency] = pm.priceCache[symbol][currency]
return Read(&pm.priceCache, func(tokenPriceCache TokenPriceCache) DataPerTokenAndCurrency {
prices := make(DataPerTokenAndCurrency)
for _, symbol := range symbols {
prices[symbol] = make(map[string]DataPoint)
for _, currency := range currencies {
prices[symbol][currency] = tokenPriceCache[symbol][currency]
}
}
}

return prices
return prices
})
}

func (pm *Manager) updatePriceCache(prices map[string]map[string]float64) {
pm.priceCacheLock.Lock()
defer pm.priceCacheLock.Unlock()

for token, pricesPerCurrency := range prices {
_, present := pm.priceCache[token]
if !present {
pm.priceCache[token] = make(map[string]DataPoint)
}
for currency, price := range pricesPerCurrency {
pm.priceCache[token][currency] = DataPoint{
Price: price,
UpdatedAt: time.Now().Unix(),
Write(&pm.priceCache, func(tokenPriceCache TokenPriceCache) TokenPriceCache {
for token, pricesPerCurrency := range prices {
_, present := tokenPriceCache[token]
if !present {
tokenPriceCache[token] = make(map[string]DataPoint)
}
for currency, price := range pricesPerCurrency {
tokenPriceCache[token][currency] = DataPoint{
Price: price,
UpdatedAt: time.Now().Unix(),
}
}
}
}
}

func (pm *Manager) GetCachedPrices() DataPerTokenAndCurrency {
pm.priceCacheLock.RLock()
defer pm.priceCacheLock.RUnlock()

return pm.priceCache
return tokenPriceCache
})
}

// Return cached price if present in cache and age is less than maxAgeInSeconds. Fetch otherwise.
func (pm *Manager) GetOrFetchPrices(symbols []string, currencies []string, maxAgeInSeconds int64) (DataPerTokenAndCurrency, error) {
symbolsToFetchMap := make(map[string]bool)
symbolsToFetch := make([]string, 0, len(symbols))
symbolsToFetch := Read(&pm.priceCache, func(tokenPriceCache TokenPriceCache) []string {
symbolsToFetchMap := make(map[string]bool)
symbolsToFetch := make([]string, 0, len(symbols))

now := time.Now().Unix()
now := time.Now().Unix()

for _, symbol := range symbols {
tokenPriceCache, ok := pm.GetCachedPrices()[symbol]
if !ok {
if !symbolsToFetchMap[symbol] {
symbolsToFetchMap[symbol] = true
symbolsToFetch = append(symbolsToFetch, symbol)
}
continue
}
for _, currency := range currencies {
if now-tokenPriceCache[currency].UpdatedAt > maxAgeInSeconds {
for _, symbol := range symbols {
tokenPriceCache, ok := tokenPriceCache[symbol]
if !ok {
if !symbolsToFetchMap[symbol] {
symbolsToFetchMap[symbol] = true
symbolsToFetch = append(symbolsToFetch, symbol)
}
break
continue
}
for _, currency := range currencies {
if now-tokenPriceCache[currency].UpdatedAt > maxAgeInSeconds {
if !symbolsToFetchMap[symbol] {
symbolsToFetchMap[symbol] = true
symbolsToFetch = append(symbolsToFetch, symbol)
}
break
}
}
}
}

return symbolsToFetch
})

if len(symbolsToFetch) > 0 {
_, err := pm.FetchPrices(symbolsToFetch, currencies)
Expand Down
6 changes: 6 additions & 0 deletions services/wallet/market/market_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,9 @@ func Write[T any](cache *MarketCache[T], writer func(store T) T) *MarketCache[T]
cache.store = writer(cache.store)
return cache
}

func (cache *MarketCache[T]) Get() T {
cache.lock.RLock()
defer cache.lock.RUnlock()
return cache.store
}
4 changes: 2 additions & 2 deletions services/wallet/market/market_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestPrice(t *testing.T) {
manager := setupMarketManager(t, []thirdparty.MarketDataProvider{priceProvider, priceProvider})

{
rst := manager.GetCachedPrices()
rst := manager.priceCache.Get()
require.Empty(t, rst)
}

Expand Down Expand Up @@ -114,7 +114,7 @@ func TestPrice(t *testing.T) {
}
}

cache := manager.GetCachedPrices()
cache := manager.priceCache.Get()
for symbol, pricePerCurrency := range mockPrices {
for currency, price := range pricePerCurrency {
require.Equal(t, price, cache[symbol][currency].Price)
Expand Down

0 comments on commit 460f631

Please sign in to comment.