From 87e96c1f60c0b256a133e2c88049df53d6bacb15 Mon Sep 17 00:00:00 2001 From: Krish Suchak <42231639+suchak1@users.noreply.github.com> Date: Sun, 21 Feb 2021 18:07:10 -0500 Subject: [PATCH] =?UTF-8?q?=F0=9F=86=93=20feat:=20handle=20polygon=20api?= =?UTF-8?q?=20changes=20=F0=9F=86=93=20(#92)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * changes * upgrade deps * fix * add print statements if tests are skipped during update --- .github/workflows/build.yml | 2 +- .github/workflows/dividends.yml | 2 +- .github/workflows/intraday.yml | 5 +-- .github/workflows/ohlc.yml | 3 +- .github/workflows/sandbox.yml | 17 ++++---- .github/workflows/splits.yml | 6 +-- .github/workflows/unemployment.yml | 2 +- README.md | 2 +- requirements.txt | 8 ++-- scripts/update_hist_intraday.py | 46 ++++------------------ scripts/update_hist_ohlc.py | 38 ++++-------------- scripts/update_intraday.py | 44 +++++---------------- scripts/update_ohlc.py | 41 ++++---------------- scripts/upload_data.py | 13 ++++--- src/Broker.py | 4 +- src/Constants.py | 11 +++--- src/DataSource.py | 35 ++++++++++++----- src/Workflow.py | 51 ++++++++++++++++++++++++ test/test_DataSource.py | 62 ++++++++++++++++++++++-------- test/test_TimeMachine.py | 13 ++++++- test/test_Workflow.py | 19 +++++++++ 21 files changed, 224 insertions(+), 200 deletions(-) create mode 100644 src/Workflow.py create mode 100644 test/test_Workflow.py diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 4c44e37d..40ff589d 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -15,7 +15,7 @@ env: IEXCLOUD: ${{ secrets.IEXCLOUD_SANDBOX }} STOCKTWITS: ${{ secrets.STOCKTWITS }} BLS: ${{ secrets.BLS }} - APCA_API_KEY_ID: ${{ secrets.APCA_API_KEY_ID }} + POLYGON: ${{ secrets.POLYGON }} AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} AWS_DEFAULT_REGION: ${{ secrets.AWS_DEFAULT_REGION }} diff --git a/.github/workflows/dividends.yml b/.github/workflows/dividends.yml index da412906..3550a1c2 100644 --- a/.github/workflows/dividends.yml +++ b/.github/workflows/dividends.yml @@ -43,5 +43,5 @@ jobs: AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} AWS_DEFAULT_REGION: ${{ secrets.AWS_DEFAULT_REGION }} S3_BUCKET: ${{ secrets.S3_BUCKET }} - APCA_API_KEY_ID: ${{ secrets.APCA_API_KEY_ID }} + POLYGON: ${{ secrets.POLYGON }} run: python scripts/update_dividends.py diff --git a/.github/workflows/intraday.yml b/.github/workflows/intraday.yml index 57c7c20a..92162f7b 100644 --- a/.github/workflows/intraday.yml +++ b/.github/workflows/intraday.yml @@ -5,8 +5,8 @@ name: Intraday on: schedule: - - cron: "0 16 * * *" - # 12pm EST every day (for crypto) + - cron: "30 19 * * *" + # 2:30pm EST every day (for crypto) jobs: build: @@ -43,6 +43,5 @@ jobs: AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} AWS_DEFAULT_REGION: ${{ secrets.AWS_DEFAULT_REGION }} S3_BUCKET: ${{ secrets.S3_BUCKET }} - APCA_API_KEY_ID: ${{ secrets.APCA_API_KEY_ID }} POLYGON: ${{ secrets.POLYGON }} run: python scripts/update_intraday.py diff --git a/.github/workflows/ohlc.yml b/.github/workflows/ohlc.yml index 1562f5a3..e44cd2f8 100644 --- a/.github/workflows/ohlc.yml +++ b/.github/workflows/ohlc.yml @@ -5,7 +5,7 @@ name: OHLC on: schedule: - - cron: "0 15 * * *" + - cron: "0 16 * * *" # 11am EST every day (for crypto) jobs: @@ -43,6 +43,5 @@ jobs: AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} AWS_DEFAULT_REGION: ${{ secrets.AWS_DEFAULT_REGION }} S3_BUCKET: ${{ secrets.S3_BUCKET }} - APCA_API_KEY_ID: ${{ secrets.APCA_API_KEY_ID }} POLYGON: ${{ secrets.POLYGON }} run: python scripts/update_ohlc.py diff --git a/.github/workflows/sandbox.yml b/.github/workflows/sandbox.yml index 05ce2ff4..1e5013b4 100644 --- a/.github/workflows/sandbox.yml +++ b/.github/workflows/sandbox.yml @@ -15,7 +15,6 @@ env: IEXCLOUD: ${{ secrets.IEXCLOUD_SANDBOX }} STOCKTWITS: ${{ secrets.STOCKTWITS }} S3_BUCKET: ${{ secrets.S3_DEV_BUCKET }} - APCA_API_KEY_ID: ${{ secrets.APCA_API_KEY_ID }} POLYGON: ${{ secrets.POLYGON }} BLS: ${{ secrets.BLS }} AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} @@ -66,20 +65,20 @@ jobs: - name: Update symbols run: python scripts/update_symbols.py - - name: Update dividends - run: python scripts/update_dividends.py + # - name: Update dividends + # run: python scripts/update_dividends.py - - name: Update splits - run: python scripts/update_splits.py + # - name: Update splits + # run: python scripts/update_splits.py - - name: Update OHLC - run: python scripts/update_ohlc.py + # - name: Update OHLC + # run: python scripts/update_ohlc.py - name: Update social sentiment run: python scripts/update_sentiment.py - - name: Update intraday - run: python scripts/update_intraday.py + # - name: Update intraday + # run: python scripts/update_intraday.py - name: Update unemployment run: python scripts/update_unrate.py diff --git a/.github/workflows/splits.yml b/.github/workflows/splits.yml index e9aa1e10..351f7b7e 100644 --- a/.github/workflows/splits.yml +++ b/.github/workflows/splits.yml @@ -5,8 +5,8 @@ name: Splits on: schedule: - - cron: "30 12 1 * *" - # 8:30am EST + - cron: "30 13 1 * *" + # 9:30am EST jobs: build: @@ -43,5 +43,5 @@ jobs: AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} AWS_DEFAULT_REGION: ${{ secrets.AWS_DEFAULT_REGION }} S3_BUCKET: ${{ secrets.S3_BUCKET }} - APCA_API_KEY_ID: ${{ secrets.APCA_API_KEY_ID }} + POLYGON: ${{ secrets.POLYGON }} run: python scripts/update_splits.py diff --git a/.github/workflows/unemployment.yml b/.github/workflows/unemployment.yml index 60256154..95d35c35 100644 --- a/.github/workflows/unemployment.yml +++ b/.github/workflows/unemployment.yml @@ -1,7 +1,7 @@ # This workflow will automatically update data files # For more information see: https://help.github.com/en/actions/reference/events-that-trigger-workflows#scheduled-events-schedule -name: Unemployment Rate +name: Unemployment on: schedule: diff --git a/README.md b/README.md index 597525e7..7b00f02f 100644 --- a/README.md +++ b/README.md @@ -84,7 +84,7 @@ Using Robinhood 2FA, we can simply provide our MFA one-time password in the `.en - [ ] Cash Flow - [ ] CEO Compensation - [ ] Government / Macro - - [x] [![Unemployment Rate]()](https://github.com/suchak1/hyperdrive/actions?query=workflow%3A%22Unemployment+Rate%22) + - [x] [![Unemployment]()](https://github.com/suchak1/hyperdrive/actions?query=workflow%3AUnemployment) - [ ] Real GDP - [ ] US Recession Probabilities - [ ] Market diff --git a/requirements.txt b/requirements.txt index 6ed1af9f..bc0721c7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ python-dotenv == 0.15.0 -pandas == 1.2.1 -robin-stocks == 1.7.0 -boto3 == 1.16.63 +pandas == 1.2.2 +robin-stocks == 2.0.0 +boto3 == 1.17.12 polygon-api-client == 0.1.9 -pytz == 2020.5 +pytz == 2021.1 diff --git a/scripts/update_hist_intraday.py b/scripts/update_hist_intraday.py index b39f0814..7044b7f6 100644 --- a/scripts/update_hist_intraday.py +++ b/scripts/update_hist_intraday.py @@ -2,23 +2,19 @@ import sys from time import sleep from datetime import datetime -from multiprocessing import Process sys.path.append('src') from DataSource import IEXCloud, Polygon # noqa autopep8 -from Constants import CI, PathFinder, POLY_CRYPTO_SYMBOLS, POLY_CRYPTO_DELAY # noqa autopep8 +from Constants import CI, PathFinder, POLY_CRYPTO_SYMBOLS # noqa autopep8 -poly_stocks = Polygon() -poly_crypto = Polygon(os.environ['POLYGON']) -stock_symbols = poly_stocks.get_symbols() -# last_completed_stock = 'FOXA' -# stock_symbols = stock_symbols[stock_symbols.index(last_completed_stock)+1:] -# [250:] +poly = Polygon(os.environ['POLYGON']) +stock_symbols = poly.get_symbols() crypto_symbols = POLY_CRYPTO_SYMBOLS +all_symbols = stock_symbols + crypto_symbols -def update_poly_stocks_intraday(): - for symbol in stock_symbols: +def update_poly_intraday(): + for symbol in all_symbols: now = datetime.now() hour = now.hour while hour in set(range(8, 12)): @@ -28,7 +24,7 @@ def update_poly_stocks_intraday(): hour = datetime.now().hour filenames = [] try: - filenames = poly_stocks.save_intraday( + filenames = poly.save_intraday( symbol=symbol, timeframe='30d', retries=1) except Exception as e: print(f'Polygon.io intraday update failed for {symbol}.') @@ -38,32 +34,6 @@ def update_poly_stocks_intraday(): for filename in filenames: if os.path.exists(filename): os.remove(filename) -# Crypto pass -def update_poly_crypto_intraday(): - - for idx, symbol in enumerate(crypto_symbols): - filenames = [] - try: - filenames = poly_crypto.save_intraday( - symbol=symbol, timeframe='9m', - delay=POLY_CRYPTO_DELAY, retries=2) - except Exception as e: - print(f'Polygon.io intraday update failed for {symbol}.') - print(e) - finally: - if CI: - for filename in filenames: - if os.path.exists(filename): - os.remove(filename) - - if idx != len(crypto_symbols) - 1: - sleep(POLY_CRYPTO_DELAY) - - -p2 = Process(target=update_poly_stocks_intraday) -# p3 = Process(target=update_poly_crypto_intraday) -if __name__ == '__main__': - p2.start() -# p3.start() +update_poly_intraday() diff --git a/scripts/update_hist_ohlc.py b/scripts/update_hist_ohlc.py index 1156a384..c219ee3b 100644 --- a/scripts/update_hist_ohlc.py +++ b/scripts/update_hist_ohlc.py @@ -1,18 +1,16 @@ import os import sys -from time import sleep from multiprocessing import Process sys.path.append('src') from DataSource import IEXCloud, Polygon # noqa autopep8 -from Constants import CI, PathFinder, POLY_CRYPTO_SYMBOLS, POLY_CRYPTO_DELAY # noqa autopep8 +from Constants import CI, PathFinder, POLY_CRYPTO_SYMBOLS # noqa autopep8 iex = IEXCloud() -poly_stocks = Polygon() -poly_crypto = Polygon(os.environ['POLYGON']) +poly = Polygon(os.environ['POLYGON']) stock_symbols = iex.get_symbols() -# [250:] crypto_symbols = POLY_CRYPTO_SYMBOLS +all_symbols = stock_symbols + crypto_symbols timeframe = '2m' # Double redundancy @@ -34,27 +32,12 @@ def update_iex_ohlc(): # 2nd pass -def update_poly_stocks_ohlc(): - for symbol in stock_symbols: - filename = PathFinder().get_ohlc_path( - symbol=symbol, provider=poly_stocks.provider) - try: - poly_stocks.save_ohlc(symbol=symbol, timeframe=timeframe) - except Exception as e: - print(f'Polygon.io OHLC update failed for {symbol}.') - print(e) - finally: - if CI and os.path.exists(filename): - os.remove(filename) -# Crypto pass - - -def update_poly_crypto_ohlc(): - for idx, symbol in enumerate(crypto_symbols): +def update_poly_ohlc(): + for symbol in all_symbols: filename = PathFinder().get_ohlc_path( - symbol=symbol, provider=poly_crypto.provider) + symbol=symbol, provider=poly.provider) try: - poly_crypto.save_ohlc(symbol=symbol, timeframe=timeframe) + poly.save_ohlc(symbol=symbol, timeframe=timeframe) except Exception as e: print(f'Polygon.io OHLC update failed for {symbol}.') print(e) @@ -62,13 +45,8 @@ def update_poly_crypto_ohlc(): if CI and os.path.exists(filename): os.remove(filename) - if idx != len(crypto_symbols) - 1: - sleep(POLY_CRYPTO_DELAY) - p1 = Process(target=update_iex_ohlc) -p2 = Process(target=update_poly_stocks_ohlc) -p3 = Process(target=update_poly_crypto_ohlc) +p2 = Process(target=update_poly_ohlc) p1.start() p2.start() -p3.start() diff --git a/scripts/update_intraday.py b/scripts/update_intraday.py index ef2246aa..e3866a98 100644 --- a/scripts/update_intraday.py +++ b/scripts/update_intraday.py @@ -1,18 +1,16 @@ import os import sys -from time import sleep from multiprocessing import Process sys.path.append('src') from DataSource import IEXCloud, Polygon # noqa autopep8 -from Constants import PathFinder, POLY_CRYPTO_DELAY # noqa autopep8 +from Constants import PathFinder, POLY_CRYPTO_SYMBOLS, FEW_DAYS # noqa autopep8 import Constants as C # noqa autopep8 iex = IEXCloud() -poly_stocks = Polygon() -poly_crypto = Polygon(os.environ['POLYGON']) +poly = Polygon(os.environ['POLYGON']) stock_symbols = iex.get_symbols() -crypto_symbols = C.POLY_CRYPTO_SYMBOLS -few_days = '3d' +crypto_symbols = POLY_CRYPTO_SYMBOLS +all_symbols = stock_symbols + crypto_symbols # Double redundancy @@ -37,31 +35,12 @@ def update_iex_intraday(): # 2nd pass -def update_poly_stocks_intraday(): - for symbol in stock_symbols: - filenames = [] - try: - filenames = poly_stocks.save_intraday( - symbol=symbol, timeframe=few_days, - retries=1 if C.TEST else C.DEFAULT_RETRIES) - except Exception as e: - print(f'Polygon.io intraday update failed for {symbol}.') - print(e) - finally: - if C.CI: - for filename in filenames: - if os.path.exists(filename): - os.remove(filename) -# Crypto pass - - -def update_poly_crypto_intraday(): - for idx, symbol in enumerate(crypto_symbols): +def update_poly_intraday(): + for symbol in all_symbols: filenames = [] try: - filenames = poly_crypto.save_intraday( - symbol=symbol, timeframe=few_days, - retries=1 if C.TEST else C.DEFAULT_RETRIES) + filenames = poly.save_intraday( + symbol=symbol, timeframe=FEW_DAYS, retries=1) except Exception as e: print(f'Polygon.io intraday update failed for {symbol}.') print(e) @@ -71,13 +50,8 @@ def update_poly_crypto_intraday(): if os.path.exists(filename): os.remove(filename) - if idx != len(crypto_symbols) - 1: - sleep(POLY_CRYPTO_DELAY) - p1 = Process(target=update_iex_intraday) -p2 = Process(target=update_poly_stocks_intraday) -p3 = Process(target=update_poly_crypto_intraday) +p2 = Process(target=update_poly_intraday) p1.start() p2.start() -p3.start() diff --git a/scripts/update_ohlc.py b/scripts/update_ohlc.py index aaa9b5a0..0a3e30ce 100644 --- a/scripts/update_ohlc.py +++ b/scripts/update_ohlc.py @@ -1,18 +1,16 @@ import os import sys -from time import sleep from multiprocessing import Process sys.path.append('src') from DataSource import IEXCloud, Polygon # noqa autopep8 -from Constants import PathFinder, POLY_CRYPTO_DELAY # noqa autopep8 +from Constants import PathFinder, POLY_CRYPTO_SYMBOLS, FEW_DAYS # noqa autopep8 import Constants as C # noqa autopep8 iex = IEXCloud() -poly_stocks = Polygon() -poly_crypto = Polygon(os.environ['POLYGON']) +poly = Polygon(os.environ['POLYGON']) stock_symbols = iex.get_symbols() -crypto_symbols = C.POLY_CRYPTO_SYMBOLS -few_days = '3d' +crypto_symbols = POLY_CRYPTO_SYMBOLS +all_symbols = stock_symbols + crypto_symbols # Double redundancy @@ -35,44 +33,21 @@ def update_iex_ohlc(): # 2nd pass -def update_poly_stocks_ohlc(): +def update_poly_ohlc(): for symbol in stock_symbols: try: - poly_stocks.save_ohlc(symbol=symbol, timeframe=few_days, - retries=1 if C.TEST else C.DEFAULT_RETRIES) + poly.save_ohlc(symbol=symbol, timeframe=FEW_DAYS, retries=1) except Exception as e: print(f'Polygon.io OHLC update failed for {symbol}.') print(e) finally: filename = PathFinder().get_ohlc_path( - symbol=symbol, provider=poly_stocks.provider) + symbol=symbol, provider=poly.provider) if C.CI and os.path.exists(filename): os.remove(filename) -# Crypto pass - - -def update_poly_crypto_ohlc(): - - for idx, symbol in enumerate(crypto_symbols): - try: - poly_crypto.save_ohlc(symbol=symbol, timeframe=few_days, - retries=1 if C.TEST else C.DEFAULT_RETRIES) - except Exception as e: - print(f'Polygon.io OHLC update failed for {symbol}.') - print(e) - finally: - filename = PathFinder().get_ohlc_path( - symbol=symbol, provider=poly_crypto.provider) - if C.CI and os.path.exists(filename): - os.remove(filename) - - if idx != len(crypto_symbols) - 1: - sleep(POLY_CRYPTO_DELAY) p1 = Process(target=update_iex_ohlc) -p2 = Process(target=update_poly_stocks_ohlc) -p3 = Process(target=update_poly_crypto_ohlc) +p2 = Process(target=update_poly_ohlc) p1.start() p2.start() -p3.start() diff --git a/scripts/upload_data.py b/scripts/upload_data.py index f3ce129c..22508993 100644 --- a/scripts/upload_data.py +++ b/scripts/upload_data.py @@ -7,13 +7,14 @@ store = Store() poly = Polygon() - -to_download = ['DUK', 'FOX', 'GD', 'GE', 'GILD', 'GLD', 'GM', 'HD', 'INSG'] +# stocks to download all 15+ years of data +to_download = [] # s3://hyperdrive.pro/data/intraday/polygon/ -symbols = ['IBM', 'ICLN', 'INO', 'INTC', - 'IPO', 'IQ', 'ISRG'] -# F, G, H, I + +# stocks to update (last month of data) +symbols = [] + if __name__ == '__main__': for symbol in symbols: store.upload_dir(path=f'data/intraday/polygon/{symbol}') @@ -25,6 +26,6 @@ for symbol in to_download: poly.save_intraday( symbol=symbol, - timeframe='6250d', + timeframe='6300d', retries=1 ) diff --git a/src/Broker.py b/src/Broker.py index 234f6423..30788edb 100644 --- a/src/Broker.py +++ b/src/Broker.py @@ -1,6 +1,6 @@ import os import pyotp -import robin_stocks as rh +import robin_stocks.robinhood as rh from dotenv import load_dotenv, find_dotenv import pandas as pd from Constants import PathFinder @@ -65,7 +65,7 @@ def save_symbols(self): C.SYMBOL: symbols, C.NAME: names }) - self.writer.update_csv(self.finder.get_symbols_path(), df) + self.writer.save_csv(self.finder.get_symbols_path(), df) def get_holdings(self): if not hasattr(self, 'holdings'): diff --git a/src/Constants.py b/src/Constants.py index 190adfd2..badcfe38 100644 --- a/src/Constants.py +++ b/src/Constants.py @@ -85,13 +85,14 @@ def get_env_bool(var_name): SENTIMENT_SYMBOLS_IGNORE = { 'SPYD', 'VWDRY', 'BPMP', 'FOX', 'YYY', 'SDIV', - 'DIV', 'SHECY', 'PALL', - 'DWDP', 'TFCF', 'SPAR', - 'TMUSR', 'OXY+', 'BNTX^'} + 'DIV', 'SHECY', 'PALL' +} -DEFAULT_RETRIES = 3 +DEFAULT_RETRIES = 2 DEFAULT_DELAY = 2 -POLY_CRYPTO_DELAY = 15 +POLY_FREE_DELAY = 13 +FEW = 3 +FEW_DAYS = str(FEW) + 'd' class PathFinder: diff --git a/src/DataSource.py b/src/DataSource.py index f56099a4..10a031c3 100644 --- a/src/DataSource.py +++ b/src/DataSource.py @@ -1,6 +1,6 @@ import os import requests -from time import sleep +from time import sleep, time import pandas as pd from polygon import RESTClient from dotenv import load_dotenv, find_dotenv @@ -323,7 +323,7 @@ class IEXCloud(MarketData): def __init__(self): super().__init__() self.base = 'https://cloud.iexapis.com' - self.version = 'stable' + self.version = 'v1' self.token = os.environ['IEXCLOUD'] self.provider = 'iexcloud' @@ -509,14 +509,26 @@ def _get_intraday(symbol, min=1, timeframe='max', extra_hrs=True): class Polygon(MarketData): - def __init__(self, token=os.environ.get('APCA_API_KEY_ID')): + def __init__(self, token=os.environ.get('POLYGON'), free=True): super().__init__() self.client = RESTClient(token) self.provider = 'polygon' + self.free = free + + def obey_free_limit(self): + if self.free and hasattr(self, 'last_api_call_time'): + time_since_last_call = time() - self.last_api_call_time + sleep(C.POLY_FREE_DELAY - time_since_last_call) + + def log_api_call_time(self): + self.last_api_call_time = time() + print(time()) def get_dividends(self, **kwargs): def _get_dividends(symbol, timeframe='max'): + self.obey_free_limit() response = self.client.reference_stock_dividends(symbol) + self.log_api_call_time() raw = pd.DataFrame(response.results) df = self.standardize_dividends(symbol, raw) return self.reader.data_in_timeframe(df, C.EX, timeframe) @@ -524,7 +536,9 @@ def _get_dividends(symbol, timeframe='max'): def get_splits(self, **kwargs): def _get_splits(symbol, timeframe='max'): + self.obey_free_limit() response = self.client.reference_stock_splits(symbol) + self.log_api_call_time() raw = pd.DataFrame(response.results) df = self.standardize_splits(symbol, raw) return self.reader.data_in_timeframe(df, C.EX, timeframe) @@ -535,15 +549,18 @@ def _get_ohlc(symbol, timeframe='max'): is_crypto = symbol.find('X%3A') == 0 formatted_start, formatted_end = self.traveller.convert_dates( timeframe) + self.obey_free_limit() response = self.client.stocks_equities_aggregates( symbol, 1, 'day', from_=formatted_start, to=formatted_end, unadjusted=False - ).results + ) + self.log_api_call_time() + raw = response.results columns = {'t': 'date', 'o': 'open', 'h': 'high', 'l': 'low', 'c': 'close', 'v': 'volume', 'vw': 'average', 'n': 'trades'} - df = pd.DataFrame(response).rename(columns=columns) + df = pd.DataFrame(raw).rename(columns=columns) if is_crypto: df['date'] = pd.to_datetime( df['date'], unit='ms') @@ -566,27 +583,25 @@ def _get_intraday(symbol, min=1, timeframe='max', extra_hrs=True): raise Exception(f'No dates in timeframe: {timeframe}.') for idx, date in enumerate(dates): + self.obey_free_limit() response = self.client.stocks_equities_aggregates( symbol, min, 'minute', from_=date, to=date, unadjusted=False ) + self.log_api_call_time() if hasattr(response, 'results'): response = response.results else: - if is_crypto and idx != len(dates) - 1: - sleep(C.POLY_CRYPTO_DELAY) continue columns = {'t': 'date', 'o': 'open', 'h': 'high', 'l': 'low', 'c': 'close', 'v': 'volume', 'vw': 'average', 'n': 'trades'} df = pd.DataFrame(response).rename(columns=columns) - if symbol.find('X%3A') == 0: + if is_crypto: df['date'] = pd.to_datetime( df['date'], unit='ms') - if idx != len(dates) - 1: - sleep(C.POLY_CRYPTO_DELAY) else: df['date'] = pd.to_datetime( df['date'], unit='ms').dt.tz_localize( diff --git a/src/Workflow.py b/src/Workflow.py new file mode 100644 index 00000000..4129e9ba --- /dev/null +++ b/src/Workflow.py @@ -0,0 +1,51 @@ +import re +import sys +from datetime import datetime, timedelta +sys.path.append('src') +from DataSource import MarketData # noqa +from Constants import POLY_FREE_DELAY, FEW, POLY_CRYPTO_SYMBOLS # noqa + + +class Flow: + def get_workflow_start_time(self, workflow_name): + with open(f'.github/workflows/{workflow_name}.yml') as file: + workflow_content = file.read() + line_pattern = '- cron: "(.*)"' + try: + cron_line = re.search(line_pattern, workflow_content).group(1) + except AttributeError: + raise Exception( + f"{workflow_name}.yml doesn't have a scheduled cron job") + + now = datetime.utcnow() + default_times = [now.minute, now.hour, now.day, now.month] + times = [default_times[idx] if time == + '*' else int(time) for idx, time in enumerate( + cron_line.split(' ')[:-1])] + + minute, hour, day, month = times + return datetime(now.year, month, day, hour, minute) + + def is_workflow_running(self, workflow_name, buffer_min=30): + md = MarketData() + start_time = self.get_workflow_start_time(workflow_name) + num_stock = len(md.get_symbols()) + num_crypto = len(POLY_CRYPTO_SYMBOLS) + duration = timedelta(seconds=POLY_FREE_DELAY) + now = datetime.utcnow() + + if workflow_name in {'ohlc', 'intraday'}: + duration *= (num_stock + num_crypto) * FEW + elif workflow_name in {'dividends', 'splits'}: + duration *= num_stock + else: + return False + + buffer = timedelta(minutes=buffer_min) + return (now < start_time + duration + buffer and + now > start_time - buffer) + + def is_any_workflow_running(self): + workflows = ['ohlc', 'intraday', 'dividends', 'splits'] + return any( + [self.is_workflow_running(workflow) for workflow in workflows]) diff --git a/test/test_DataSource.py b/test/test_DataSource.py index 8b2b74eb..2bd947e8 100644 --- a/test/test_DataSource.py +++ b/test/test_DataSource.py @@ -1,17 +1,20 @@ import os import sys -from time import sleep +from time import sleep, time from random import choice import pandas as pd sys.path.append('src') from DataSource import MarketData, IEXCloud, Polygon, StockTwits, LaborStats # noqa autopep8 import Constants as C # noqa autopep8 +from Workflow import Flow # noqa autopep8 + md = MarketData() iex = IEXCloud() poly = Polygon() twit = StockTwits() bls = LaborStats() +flow = Flow() if not C.CI: iex.token = os.environ['IEXCLOUD_SANDBOX'] @@ -359,26 +362,55 @@ def test_init(self): assert hasattr(poly, 'provider') def test_get_dividends(self): - df = poly.get_dividends(symbol='AAPL', timeframe='5y') - assert {C.EX, C.PAY, C.DEC, C.DIV}.issubset(df.columns) - assert len(df) > 0 + if not flow.is_any_workflow_running(): + df = poly.get_dividends(symbol='AAPL', timeframe='5y') + assert {C.EX, C.PAY, C.DEC, C.DIV}.issubset(df.columns) + assert len(df) > 0 + else: + print('Skipping Polygon.io dividends test because update in progress') def test_get_splits(self): - df = poly.get_splits(symbol='AAPL') - assert {C.EX, C.DEC, C.RATIO}.issubset(df.columns) - assert len(df) > 0 + if not flow.is_any_workflow_running(): + df = poly.get_splits(symbol='AAPL') + assert {C.EX, C.DEC, C.RATIO}.issubset(df.columns) + assert len(df) > 0 + else: + print('Skipping Polygon.io splits test because update in progress') def test_get_ohlc(self): - df = poly.get_ohlc(symbol='AAPL', timeframe='1m') - assert {C.TIME, C.OPEN, C.HIGH, C.LOW, - C.CLOSE, C.VOL, C.AVG}.issubset(df.columns) - assert len(df) > 10 + if not flow.is_any_workflow_running(): + df = poly.get_ohlc(symbol='AAPL', timeframe='1m') + assert {C.TIME, C.OPEN, C.HIGH, C.LOW, + C.CLOSE, C.VOL, C.AVG}.issubset(df.columns) + assert len(df) > 10 + else: + print('Skipping Polygon.io OHLC test because update in progress') def test_get_intraday(self): - df = pd.concat(poly.get_intraday(symbol='AAPL', timeframe='1w')) - assert {C.TIME, C.OPEN, C.HIGH, C.LOW, - C.CLOSE, C.VOL}.issubset(df.columns) - assert len(df) > 1000 + if not flow.is_any_workflow_running(): + df = pd.concat(poly.get_intraday(symbol='AAPL', timeframe='1w')) + assert {C.TIME, C.OPEN, C.HIGH, C.LOW, + C.CLOSE, C.VOL}.issubset(df.columns) + assert len(df) > 1000 + else: + print( + 'Skipping Polygon.io intraday test because update in progress') + + def test_log_api_call_time(self): + if hasattr(poly, 'last_api_call_time'): + delattr(poly, 'last_api_call_time') + poly.log_api_call_time() + assert hasattr(poly, 'last_api_call_time') + + def test_obey_free_limit(self): + if hasattr(poly, 'last_api_call_time'): + delattr(poly, 'last_api_call_time') + + then = time() + poly.log_api_call_time() + poly.obey_free_limit() + now = time() + assert now - then > C.POLY_FREE_DELAY class TestStockTwits: diff --git a/test/test_TimeMachine.py b/test/test_TimeMachine.py index f309f02e..d85ce6f5 100644 --- a/test/test_TimeMachine.py +++ b/test/test_TimeMachine.py @@ -1,6 +1,7 @@ +import re import sys import pytest -from datetime import timedelta +from datetime import datetime, timedelta sys.path.append('src') from TimeMachine import TimeTraveller # noqa autopep8 @@ -24,5 +25,15 @@ def test_convert_delta(self): with pytest.raises(ValueError): traveller.convert_delta('0') + def test_convert_dates(self): + pattern = '[0-9]{4}-[0-9]{2}-[0-9]{2}' + start, end = traveller.convert_dates('7d') + assert re.match(pattern, start) + assert re.match(pattern, end) + def test_dates_in_range(self): assert len(traveller.dates_in_range('1m')) > 20 + + def test_combine_date_time(self): + dt = traveller.combine_date_time('2020-01-02', '09:30') + assert dt == datetime(2020, 1, 2, 9, 30) diff --git a/test/test_Workflow.py b/test/test_Workflow.py new file mode 100644 index 00000000..f3752e5d --- /dev/null +++ b/test/test_Workflow.py @@ -0,0 +1,19 @@ +import sys +from datetime import datetime +sys.path.append('src') +from Workflow import Flow # noqa autopep8 + +flow = Flow() +now = datetime.utcnow() + + +class TestWorkFlow: + def test_get_workflow_start_time(self): + assert flow.get_workflow_start_time('dividends') == datetime( + now.year, now.month, 1, 12) + + def test_is_workflow_running(self): + pass + + def test_is_any_workflow_running(self): + pass