From 6aaa9f88c38266241d7b740079c3d4f4b85b9d16 Mon Sep 17 00:00:00 2001 From: Neal Chambers Date: Thu, 5 Dec 2024 08:54:28 +0900 Subject: [PATCH] Add Guard Clause --- .../abstract_ccxt_pair_converter_plugin.py | 41 +++++++++---------- 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/src/dali/abstract_ccxt_pair_converter_plugin.py b/src/dali/abstract_ccxt_pair_converter_plugin.py index 809f0a53..ba7690e1 100644 --- a/src/dali/abstract_ccxt_pair_converter_plugin.py +++ b/src/dali/abstract_ccxt_pair_converter_plugin.py @@ -247,6 +247,7 @@ DAYS_IN_WEEK: int = 7 MANY_YEARS_IN_THE_FUTURE: relativedelta = relativedelta(years=100) + class AssetPairAndHistoricalPrice(NamedTuple): from_asset: str to_asset: str @@ -513,6 +514,18 @@ def find_historical_bar(self, from_asset: str, to_asset: str, timestamp: datetim def find_historical_bars( self, from_asset: str, to_asset: str, timestamp: datetime, exchange: str, all_bars: bool = False, timespan: str = _MINUTE ) -> Optional[List[HistoricalBar]]: + + # Guard clause to pull from cache + if all_bars: + cached_bundle = self._get_bundle_from_cache(AssetPairAndTimestamp(timestamp, from_asset, to_asset, exchange)) + if cached_bundle: + # If the last bar in the bundle is within the last week, return the bundle + if (datetime.now(timezone.utc) - cached_bundle[-1].timestamp).total_seconds() <= _TIME_GRANULARITY_STRING_TO_SECONDS[_ONE_WEEK]: + return cached_bundle + # If the last bar in the bundle is older than a week, we need to start from the next millisecond + # We will pull the rest later and add to this bundle + timestamp = cached_bundle[-1].timestamp + timedelta(milliseconds=1) + # Stage 1 Initialization result: List[HistoricalBar] = [] self.__transaction_count += 1 @@ -561,18 +574,6 @@ def find_historical_bars( within_last_week: bool = False - # Get bundles of bars if they exist, saving us from making a call to the API - if all_bars: - cached_bundle: Optional[List[HistoricalBar]] = self._get_bundle_from_cache(AssetPairAndTimestamp(timestamp, from_asset, to_asset, exchange)) - if cached_bundle: - result.extend(cached_bundle) - timestamp = cached_bundle[-1].timestamp + timedelta(milliseconds=1) - ms_timestamp = int(timestamp.timestamp() * _MS_IN_SECOND) - - # If the bundle of bars is within the last week, we don't need to pull new optimization data. - if result and (datetime.now(timezone.utc) - result[-1].timestamp).total_seconds() > _TIME_GRANULARITY_STRING_TO_SECONDS[_ONE_WEEK]: - within_last_week = True - while (retry_count < len(_TIME_GRANULARITY_DICT.get(exchange, _TIME_GRANULARITY))) and not within_last_week: timeframe: str = _TIME_GRANULARITY_DICT.get(exchange, _TIME_GRANULARITY)[retry_count] request_count: int = 0 @@ -713,6 +714,8 @@ def find_historical_bars( ) ) elif all_bars: + if cached_bundle: + result = cached_bundle + result self._add_bundle_to_cache(AssetPairAndTimestamp(timestamp, from_asset, to_asset, exchange), result) break # If historical_data is empty we have hit the end of records and need to return else: @@ -726,20 +729,14 @@ def find_historical_bars( def _initialize_retry_count(self, exchange: str, timespan: str) -> int: if timespan not in _TIME_GRANULARITY_SET: raise RP2ValueError(f"Internal error: Invalid time span '{timespan}' passed to find_historical_bars.") - - granularity = ( - _TIME_GRANULARITY_DICT[exchange] - if exchange in _NONSTANDARD_GRANULARITY_EXCHANGE_SET - else _TIME_GRANULARITY - ) + + granularity = _TIME_GRANULARITY_DICT[exchange] if exchange in _NONSTANDARD_GRANULARITY_EXCHANGE_SET else _TIME_GRANULARITY if timespan not in granularity: - raise RP2ValueError( - f"Internal error: Time span '{timespan}' is not valid for exchange '{exchange}'." - ) + raise RP2ValueError(f"Internal error: Time span '{timespan}' is not valid for exchange '{exchange}'.") return granularity.index(timespan) - + def _add_alternative_markets(self, graph: MappedGraph[str], current_markets: Dict[str, List[str]]) -> None: for base_asset, quote_asset in _ALT_MARKET_BY_BASE_DICT.items(): alt_market = base_asset + quote_asset