Skip to content

Commit

Permalink
📊 fix(data): get last three days of ohlc data for accurate num_trades…
Browse files Browse the repository at this point in the history
… 📊 (#77)

* ohlc change

* 3d

* update hist_intraday script
  • Loading branch information
suchak1 authored Jan 13, 2021
1 parent 3463c23 commit bcb9af7
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 33 deletions.
59 changes: 59 additions & 0 deletions scripts/update_hist_intraday.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import os
import sys
from time import sleep
from multiprocessing import Process
sys.path.append('src')
from DataSource import IEXCloud, Polygon # noqa autopep8
from Constants import CI, PathFinder, POLY_CRYPTO_SYMBOLS, POLY_CRYPTO_DELAY # noqa autopep8


poly_stocks = Polygon()
poly_crypto = Polygon(os.environ['POLYGON'])
stock_symbols = poly_stocks.get_symbols()
stock_symbols = stock_symbols[stock_symbols.index('DIS')+1:]
# [250:]
crypto_symbols = POLY_CRYPTO_SYMBOLS


def update_poly_stocks_intraday():
for symbol in stock_symbols:
filenames = []
try:
filenames = poly_stocks.save_intraday(
symbol=symbol, timeframe='21y')
except Exception as e:
print(f'Polygon.io intraday update failed for {symbol}.')
print(e)
finally:
if CI:
for filename in filenames:
if os.path.exists(filename):
os.remove(filename)
# Crypto pass


def update_poly_crypto_intraday():

for idx, symbol in enumerate(crypto_symbols):
filenames = []
try:
filenames = poly_crypto.save_intraday(
symbol=symbol, timeframe='2y',
delay=POLY_CRYPTO_DELAY, retries=1)
except Exception as e:
print(f'Polygon.io intraday update failed for {symbol}.')
print(e)
finally:
if CI:
for filename in filenames:
if os.path.exists(filename):
os.remove(filename)

if idx != len(crypto_symbols) - 1:
sleep(POLY_CRYPTO_DELAY)


# p2 = Process(target=update_poly_stocks_intraday)
p3 = Process(target=update_poly_crypto_intraday)
# p2.start()
p3.start()
16 changes: 9 additions & 7 deletions scripts/update_hist_ohlc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@
from multiprocessing import Process
sys.path.append('src')
from DataSource import IEXCloud, Polygon # noqa autopep8
from Constants import CI, PathFinder, POLY_CRYPTO_SYMBOLS # noqa autopep8
from Constants import CI, PathFinder, POLY_CRYPTO_SYMBOLS, POLY_CRYPTO_DELAY # noqa autopep8


iex = IEXCloud()
poly_stocks = Polygon()
poly_crypto = Polygon(os.environ['POLYGON'])
stock_symbols = iex.get_symbols()[250:]
stock_symbols = iex.get_symbols()
# [250:]
crypto_symbols = POLY_CRYPTO_SYMBOLS
timeframe = '2m'
# Double redundancy

# 1st pass
Expand All @@ -22,7 +24,7 @@ def update_iex_ohlc():
filename = PathFinder().get_ohlc_path(
symbol=symbol, provider=iex.provider)
try:
iex.save_ohlc(symbol=symbol, timeframe='max')
iex.save_ohlc(symbol=symbol, timeframe=timeframe)
except Exception as e:
print(f'IEX Cloud OHLC update failed for {symbol}.')
print(e)
Expand All @@ -37,7 +39,7 @@ def update_poly_stocks_ohlc():
filename = PathFinder().get_ohlc_path(
symbol=symbol, provider=poly_stocks.provider)
try:
poly_stocks.save_ohlc(symbol=symbol, timeframe='max')
poly_stocks.save_ohlc(symbol=symbol, timeframe=timeframe)
except Exception as e:
print(f'Polygon.io OHLC update failed for {symbol}.')
print(e)
Expand All @@ -48,12 +50,12 @@ def update_poly_stocks_ohlc():


def update_poly_crypto_ohlc():
calls_per_min = 5

for idx, symbol in enumerate(crypto_symbols):
filename = PathFinder().get_ohlc_path(
symbol=symbol, provider=poly_crypto.provider)
try:
poly_crypto.save_ohlc(symbol=symbol, timeframe='max')
poly_crypto.save_ohlc(symbol=symbol, timeframe=timeframe)
except Exception as e:
print(f'Polygon.io OHLC update failed for {symbol}.')
print(e)
Expand All @@ -62,7 +64,7 @@ def update_poly_crypto_ohlc():
os.remove(filename)

if idx != len(crypto_symbols) - 1:
sleep(60 // calls_per_min + 5)
sleep(POLY_CRYPTO_DELAY)


p1 = Process(target=update_iex_ohlc)
Expand Down
42 changes: 23 additions & 19 deletions scripts/update_intraday.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@
from multiprocessing import Process
sys.path.append('src')
from DataSource import IEXCloud, Polygon # noqa autopep8
from Constants import PathFinder # noqa autopep8
from Constants import PathFinder, POLY_CRYPTO_DELAY # noqa autopep8
import Constants as C # noqa autopep8

iex = IEXCloud()
poly_stocks = Polygon()
poly_crypto = Polygon(os.environ['POLYGON'])
stock_symbols = iex.get_symbols()
crypto_symbols = C.POLY_CRYPTO_SYMBOLS
yesterday = iex.traveller.dates_in_range('1d')[0]
few_days = '3d'
last_few_days = iex.traveller.dates_in_range(few_days)

# Double redundancy

# 1st pass
Expand All @@ -29,7 +31,7 @@ def update_iex_intraday():
finally:
filename = PathFinder().get_intraday_path(
symbol=symbol,
date=yesterday,
date=last_few_days[-1],
provider=iex.provider)
if C.CI and os.path.exists(filename):
os.remove(filename)
Expand All @@ -40,41 +42,43 @@ def update_poly_stocks_intraday():
for symbol in stock_symbols:
try:
poly_stocks.save_intraday(
symbol=symbol, timeframe='1d',
symbol=symbol, timeframe=few_days,
retries=1 if C.TEST else C.DEFAULT_RETRIES)
except Exception as e:
print(f'Polygon.io intraday update failed for {symbol}.')
print(e)
finally:
filename = PathFinder().get_intraday_path(
symbol=symbol,
date=yesterday,
provider=poly_stocks.provider)
if C.CI and os.path.exists(filename):
os.remove(filename)
for date in few_days:
filename = PathFinder().get_intraday_path(
symbol=symbol,
date=date,
provider=poly_stocks.provider)
if C.CI and os.path.exists(filename):
os.remove(filename)
# Crypto pass


def update_poly_crypto_intraday():
calls_per_min = 5

for idx, symbol in enumerate(crypto_symbols):
try:
poly_crypto.save_intraday(
symbol=symbol, timeframe='3d',
symbol=symbol, timeframe=few_days,
retries=1 if C.TEST else C.DEFAULT_RETRIES)
except Exception as e:
print(f'Polygon.io intraday update failed for {symbol}.')
print(e)
finally:
filename = PathFinder().get_intraday_path(
symbol=symbol,
date=yesterday,
provider=poly_crypto.provider)
if C.CI and os.path.exists(filename):
os.remove(filename)
for date in few_days:
filename = PathFinder().get_intraday_path(
symbol=symbol,
date=date,
provider=poly_crypto.provider)
if C.CI and os.path.exists(filename):
os.remove(filename)

if idx != len(crypto_symbols) - 1:
sleep(60 // calls_per_min + 5)
sleep(POLY_CRYPTO_DELAY)


p1 = Process(target=update_iex_intraday)
Expand Down
12 changes: 7 additions & 5 deletions scripts/update_ohlc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@
from multiprocessing import Process
sys.path.append('src')
from DataSource import IEXCloud, Polygon # noqa autopep8
from Constants import PathFinder # noqa autopep8
from Constants import PathFinder, POLY_CRYPTO_DELAY # noqa autopep8
import Constants as C # noqa autopep8

iex = IEXCloud()
poly_stocks = Polygon()
poly_crypto = Polygon(os.environ['POLYGON'])
stock_symbols = iex.get_symbols()
crypto_symbols = C.POLY_CRYPTO_SYMBOLS
few_days = '3d'

# Double redundancy

# 1st pass
Expand All @@ -36,7 +38,7 @@ def update_iex_ohlc():
def update_poly_stocks_ohlc():
for symbol in stock_symbols:
try:
poly_stocks.save_ohlc(symbol=symbol, timeframe='1d',
poly_stocks.save_ohlc(symbol=symbol, timeframe=few_days,
retries=1 if C.TEST else C.DEFAULT_RETRIES)
except Exception as e:
print(f'Polygon.io OHLC update failed for {symbol}.')
Expand All @@ -50,10 +52,10 @@ def update_poly_stocks_ohlc():


def update_poly_crypto_ohlc():
calls_per_min = 5

for idx, symbol in enumerate(crypto_symbols):
try:
poly_crypto.save_ohlc(symbol=symbol, timeframe='1d',
poly_crypto.save_ohlc(symbol=symbol, timeframe=few_days,
retries=1 if C.TEST else C.DEFAULT_RETRIES)
except Exception as e:
print(f'Polygon.io OHLC update failed for {symbol}.')
Expand All @@ -65,7 +67,7 @@ def update_poly_crypto_ohlc():
os.remove(filename)

if idx != len(crypto_symbols) - 1:
sleep(60 // calls_per_min + 5)
sleep(POLY_CRYPTO_DELAY)


p1 = Process(target=update_iex_ohlc)
Expand Down
1 change: 1 addition & 0 deletions src/Constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def get_env_bool(var_name):

DEFAULT_RETRIES = 3
DEFAULT_DELAY = 2
POLY_CRYPTO_DELAY = 15


class PathFinder:
Expand Down
20 changes: 18 additions & 2 deletions src/DataSource.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ def save_dividends(self, **kwargs):
df = self.reader.update_df(
filename, self.get_dividends(**kwargs), C.EX, C.DATE_FMT)
self.writer.update_csv(filename, df)
if os.path.exists(filename):
return filename

def get_splits(self, symbol, timeframe='max'):
# given a symbol, return a cached dataframe
Expand Down Expand Up @@ -124,6 +126,8 @@ def save_splits(self, **kwargs):
df = self.reader.update_df(
filename, self.get_splits(**kwargs), C.EX, C.DATE_FMT)
self.writer.update_csv(filename, df)
if os.path.exists(filename):
return filename

def standardize_ohlc(self, symbol, df):
full_mapping = dict(
Expand Down Expand Up @@ -164,6 +168,8 @@ def save_ohlc(self, **kwargs):
df = self.reader.update_df(
filename, self.get_ohlc(**kwargs), C.TIME, C.DATE_FMT)
self.writer.update_csv(filename, df)
if os.path.exists(filename):
return filename

def get_social_sentiment(self, symbol, timeframe='max'):
# given a symbol, return a cached dataframe
Expand Down Expand Up @@ -206,6 +212,8 @@ def save_social_sentiment(self, **kwargs):
else:
return
self.writer.update_csv(filename, df)
if os.path.exists(filename):
return filename

def standardize_sentiment(self, symbol, df):
full_mapping = dict(
Expand Down Expand Up @@ -255,6 +263,7 @@ def get_intraday(self, symbol, min=1, timeframe='max', extra_hrs=False):
def save_intraday(self, **kwargs):
symbol = kwargs['symbol']
dfs = self.get_intraday(**kwargs)
filenames = []

for df in dfs:
date = df[C.TIME].iloc[0].strftime(C.DATE_FMT)
Expand All @@ -266,6 +275,9 @@ def save_intraday(self, **kwargs):
df = self.reader.update_df(
filename, df, C.TIME, save_fmt)
self.writer.update_csv(filename, df)
if os.path.exists(filename):
filenames.append(filename)
return filenames
# def handle_request(self, url, err_msg):


Expand Down Expand Up @@ -490,6 +502,7 @@ def _get_splits(symbol, timeframe='max'):

def get_ohlc(self, **kwargs):
def _get_ohlc(symbol, timeframe='max'):
is_crypto = symbol.find('X%3A') == 0
formatted_start, formatted_end = self.traveller.convert_dates(
timeframe)
response = self.client.stocks_equities_aggregates(
Expand All @@ -501,7 +514,7 @@ def _get_ohlc(symbol, timeframe='max'):
'vw': 'average', 'n': 'trades'}

df = pd.DataFrame(response).rename(columns=columns)
if symbol.find('X%3A') == 0:
if is_crypto:
df['date'] = pd.to_datetime(
df['date'], unit='ms')
else:
Expand All @@ -517,6 +530,7 @@ def _get_ohlc(symbol, timeframe='max'):
def get_intraday(self, **kwargs):
def _get_intraday(symbol, min=1, timeframe='max', extra_hrs=True):
# pass min directly into stock_aggs function as multiplier
is_crypto = symbol.find('X%3A') == 0
dates = self.traveller.dates_in_range(timeframe)
if dates == []:
raise Exception(f'No dates in timeframe: {timeframe}.')
Expand All @@ -530,6 +544,8 @@ def _get_intraday(symbol, min=1, timeframe='max', extra_hrs=True):
if hasattr(response, 'results'):
response = response.results
else:
if is_crypto and idx != len(dates) - 1:
sleep(C.POLY_CRYPTO_DELAY)
continue

columns = {'t': 'date', 'o': 'open', 'h': 'high',
Expand All @@ -540,7 +556,7 @@ def _get_intraday(symbol, min=1, timeframe='max', extra_hrs=True):
df['date'] = pd.to_datetime(
df['date'], unit='ms')
if idx != len(dates) - 1:
sleep(15)
sleep(C.POLY_CRYPTO_DELAY)
else:
df['date'] = pd.to_datetime(
df['date'], unit='ms').dt.tz_localize(
Expand Down

0 comments on commit bcb9af7

Please sign in to comment.