Skip to content

Commit

Permalink
Add Guard Clause
Browse files Browse the repository at this point in the history
  • Loading branch information
Neal Chambers committed Dec 5, 2024
1 parent 4c2488a commit 6aaa9f8
Showing 1 changed file with 19 additions and 22 deletions.
41 changes: 19 additions & 22 deletions src/dali/abstract_ccxt_pair_converter_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@
DAYS_IN_WEEK: int = 7
MANY_YEARS_IN_THE_FUTURE: relativedelta = relativedelta(years=100)


class AssetPairAndHistoricalPrice(NamedTuple):
from_asset: str
to_asset: str
Expand Down Expand Up @@ -513,6 +514,18 @@ def find_historical_bar(self, from_asset: str, to_asset: str, timestamp: datetim
def find_historical_bars(
self, from_asset: str, to_asset: str, timestamp: datetime, exchange: str, all_bars: bool = False, timespan: str = _MINUTE
) -> Optional[List[HistoricalBar]]:

# Guard clause to pull from cache
if all_bars:
cached_bundle = self._get_bundle_from_cache(AssetPairAndTimestamp(timestamp, from_asset, to_asset, exchange))
if cached_bundle:
# If the last bar in the bundle is within the last week, return the bundle
if (datetime.now(timezone.utc) - cached_bundle[-1].timestamp).total_seconds() <= _TIME_GRANULARITY_STRING_TO_SECONDS[_ONE_WEEK]:
return cached_bundle
# If the last bar in the bundle is older than a week, we need to start from the next millisecond
# We will pull the rest later and add to this bundle
timestamp = cached_bundle[-1].timestamp + timedelta(milliseconds=1)

# Stage 1 Initialization
result: List[HistoricalBar] = []
self.__transaction_count += 1
Expand Down Expand Up @@ -561,18 +574,6 @@ def find_historical_bars(

within_last_week: bool = False

# Get bundles of bars if they exist, saving us from making a call to the API
if all_bars:
cached_bundle: Optional[List[HistoricalBar]] = self._get_bundle_from_cache(AssetPairAndTimestamp(timestamp, from_asset, to_asset, exchange))
if cached_bundle:
result.extend(cached_bundle)
timestamp = cached_bundle[-1].timestamp + timedelta(milliseconds=1)
ms_timestamp = int(timestamp.timestamp() * _MS_IN_SECOND)

# If the bundle of bars is within the last week, we don't need to pull new optimization data.
if result and (datetime.now(timezone.utc) - result[-1].timestamp).total_seconds() > _TIME_GRANULARITY_STRING_TO_SECONDS[_ONE_WEEK]:
within_last_week = True

while (retry_count < len(_TIME_GRANULARITY_DICT.get(exchange, _TIME_GRANULARITY))) and not within_last_week:
timeframe: str = _TIME_GRANULARITY_DICT.get(exchange, _TIME_GRANULARITY)[retry_count]
request_count: int = 0
Expand Down Expand Up @@ -713,6 +714,8 @@ def find_historical_bars(
)
)
elif all_bars:
if cached_bundle:
result = cached_bundle + result
self._add_bundle_to_cache(AssetPairAndTimestamp(timestamp, from_asset, to_asset, exchange), result)
break # If historical_data is empty we have hit the end of records and need to return
else:
Expand All @@ -726,20 +729,14 @@ def find_historical_bars(
def _initialize_retry_count(self, exchange: str, timespan: str) -> int:
if timespan not in _TIME_GRANULARITY_SET:
raise RP2ValueError(f"Internal error: Invalid time span '{timespan}' passed to find_historical_bars.")

granularity = (
_TIME_GRANULARITY_DICT[exchange]
if exchange in _NONSTANDARD_GRANULARITY_EXCHANGE_SET
else _TIME_GRANULARITY
)

granularity = _TIME_GRANULARITY_DICT[exchange] if exchange in _NONSTANDARD_GRANULARITY_EXCHANGE_SET else _TIME_GRANULARITY

if timespan not in granularity:
raise RP2ValueError(
f"Internal error: Time span '{timespan}' is not valid for exchange '{exchange}'."
)
raise RP2ValueError(f"Internal error: Time span '{timespan}' is not valid for exchange '{exchange}'.")

return granularity.index(timespan)

def _add_alternative_markets(self, graph: MappedGraph[str], current_markets: Dict[str, List[str]]) -> None:
for base_asset, quote_asset in _ALT_MARKET_BY_BASE_DICT.items():
alt_market = base_asset + quote_asset
Expand Down

0 comments on commit 6aaa9f8

Please sign in to comment.