Skip to content

Commit

Permalink
💎 Feature: Add Polygon.io (Double Redundancy) 💎 (#23)
Browse files Browse the repository at this point in the history
* still need to write get_splits fx

* made iexcloud subdirs

* start polygon class

* fix test

* init function

* switch to polygon api

* auto dep check

* polygon dividend redundancy added

* tests

* update env var in workflows

* secrets

* fixes?

* fix test

* done
  • Loading branch information
suchak1 committed Aug 20, 2020
1 parent 6b8d47e commit 99418a3
Show file tree
Hide file tree
Showing 13 changed files with 215 additions and 46 deletions.
11 changes: 11 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://help.github.com/github/administering-a-repository/configuration-options-for-dependency-updates

version: 2
updates:
  - package-ecosystem: "pip" # See documentation for possible values
    directory: "/" # Location of package manifests
    schedule:
      interval: "daily"
1 change: 1 addition & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ jobs:
RH_2FA: ${{ secrets.RH_2FA }}
IEXCLOUD: ${{ secrets.IEXCLOUD_SANDBOX }}
S3_BUCKET: ${{ secrets.S3_DEV_BUCKET }}
APCA_API_KEY_ID: ${{ secrets.APCA_API_KEY_ID }}
run: coverage run -m pytest -vv

- name: Generate test coverage report
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/dividends.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ jobs:
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
AWS_DEFAULT_REGION: ${{ secrets.AWS_DEFAULT_REGION }}
S3_BUCKET: ${{ secrets.S3_BUCKET }}
APCA_API_KEY_ID: ${{ secrets.APCA_API_KEY_ID }}
run: python scripts/update_dividends.py
2 changes: 2 additions & 0 deletions .github/workflows/sandbox.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ jobs:
RH_2FA: ${{ secrets.RH_2FA }}
IEXCLOUD: ${{ secrets.IEXCLOUD_SANDBOX }}
S3_BUCKET: ${{ secrets.S3_DEV_BUCKET }}
APCA_API_KEY_ID: ${{ secrets.APCA_API_KEY_ID }}
run: coverage run -m pytest -vv

- name: Generate test coverage report
Expand All @@ -72,6 +73,7 @@ jobs:
env:
IEXCLOUD: ${{ secrets.IEXCLOUD_SANDBOX }}
S3_BUCKET: ${{ secrets.S3_DEV_BUCKET }}
APCA_API_KEY_ID: ${{ secrets.APCA_API_KEY_ID }}
run: python scripts/update_dividends.py

- name: Upload repo to S3
Expand Down
5 changes: 3 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
python-dotenv == 0.13.0
pandas == 1.0.5
pandas == 1.1.0
robin-stocks == 1.3.0
boto3 == 1.14.35
boto3 == 1.14.35
polygon-api-client == 0.1.6
26 changes: 16 additions & 10 deletions scripts/update_dividends.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
import sys
from multiprocessing import Pool
sys.path.append('src')
from DataSource import IEXCloud # noqa autopep8

from DataSource import IEXCloud, Polygon # noqa autopep8

iex = IEXCloud()
poly = Polygon()
symbols = iex.get_symbols()

# Double redundancy

def multi_div(symbol):
# define fx to be pickled in multiprocessing
iex.save_dividends(symbol)

for symbol in symbols:
# 1st pass
try:
iex.save_dividends(symbol=symbol, timeframe='3m')
except Exception as e:
print(f'IEX Cloud dividend update failed for {symbol}.')
print(e)

# save files as CSVs and uploads to S3
with Pool() as p:
p.map(multi_div, symbols)
# 2nd pass
try:
poly.save_dividends(symbol=symbol, timeframe='max')
except Exception as e:
print(f'Polygon.io dividend update failed for {symbol}.')
print(e)
12 changes: 10 additions & 2 deletions src/Constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,14 @@
DEV_DIR = 'dev'
DIV_DIR = 'dividends'
SPLT_DIR = 'splits'
FULL_DIV_DIR = os.path.join(DATA_DIR, DIV_DIR)
IEX_DIR = 'iexcloud'
POLY_DIR = 'polygon'

folders = {
'iexcloud': IEX_DIR,
'polygon': POLY_DIR
}
# PLYGN

# Column Names
# Symbols / Generic
Expand All @@ -37,12 +44,13 @@ def get_symbols_path(self):
'symbols.csv'
)

def get_dividends_path(self, symbol):
def get_dividends_path(self, symbol, provider='iexcloud'):
# given a symbol
# return the path to its csv
return os.path.join(
DATA_DIR,
DIV_DIR,
folders[provider],
f'{symbol.upper()}.csv'
)

Expand Down
82 changes: 61 additions & 21 deletions src/DataSource.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import requests
import pandas as pd
from polygon import RESTClient
from dotenv import load_dotenv
from FileOps import FileReader, FileWriter
from Constants import PathFinder
Expand All @@ -15,29 +16,60 @@ def __init__(self, broker=None):
self.writer = FileWriter()
self.reader = FileReader()
self.finder = PathFinder()
self.provider = 'iexcloud'

def get_symbols(self):
    # read the symbols csv located by the path finder
    # and return its symbol column as a plain list
    path = self.finder.get_symbols_path()
    cached = self.reader.load_csv(path)
    return list(cached[C.SYMBOL])

def get_dividends(self, symbol, timeframe=None):
def get_dividends(self, symbol, timeframe='max'):
# given a symbol, return a cached dataframe
return self.reader.load_csv(self.finder.get_dividends_path(symbol))
df = self.reader.load_csv(
self.finder.get_dividends_path(symbol, self.provider))
filtered = self.reader.data_in_timeframe(df, C.EX, timeframe)
return filtered

def standardize_dividends(self, symbol, df):
full_mapping = dict(
zip(
['exDate', 'paymentDate', 'declaredDate', 'amount'],
[C.EX, C.PAY, C.DEC, C.DIV]
)
)
mapping = {k: v for k, v in full_mapping.items() if k in df}
columns = list(mapping)

df = df[columns].rename(columns=mapping)
filename = self.finder.get_dividends_path(symbol, self.provider)

if C.EX in df and C.DIV in df:
df = self.reader.update_df(
filename, df, C.EX).sort_values(by=[C.EX])
df[C.DIV] = df[C.DIV].apply(lambda amt: float(amt) if amt else 0)

def save_dividends(self, symbol, timeframe=None):
# given a symbol, save its dividend history
if timeframe:
df = self.get_dividends(symbol, timeframe)
else:
df = self.get_dividends(symbol)
self.writer.update_csv(self.finder.get_dividends_path(symbol), df)
return df

# def save_splits(self, symbol):
# # given a symbol, save its stock split history
# df = self.get_splits(symbol)
def save_dividends(self, **kwargs):
    # given keyword arguments that include a symbol,
    # fetch its dividend history with get_dividends (all keyword
    # arguments are forwarded) and write it to this provider's csv
    ticker = kwargs['symbol']
    dividends = self.get_dividends(**kwargs)
    target = self.finder.get_dividends_path(ticker, self.provider)
    self.writer.update_csv(target, dividends)

# def get_splits(self, symbol, timeframe='max'):
# # given a symbol, return a cached dataframe
# df = self.reader.load_csv(self.finder.get_splits_path(symbol))
# filtered = self.reader.data_in_timeframe(df, C.EX, timeframe)
# return filtered

# def save_splits(self, **kwargs):
# # given a symbol, save its splits history
# symbol = kwargs['symbol']
# df = self.get_splits(**kwargs)
# self.writer.update_csv(self.finder.get_splits_path(symbol), df)


# make tiingo OR IEX CLOUD!! version of get dividends which
# fetches existing dividend csv and adds a row if dividend
# today or fetches last 5 years, joins with existing and updates if new
Expand All @@ -50,6 +82,7 @@ def __init__(self, broker=None):
self.base = 'https://cloud.iexapis.com'
self.version = 'stable'
self.token = os.environ['IEXCLOUD']
self.provider = 'iexcloud'

def get_endpoint(self, parts):
# given a url
Expand Down Expand Up @@ -84,16 +117,9 @@ def get_dividends(self, symbol, timeframe='3m'):
if not response.ok or data == []:
return empty

columns = ['exDate', 'paymentDate', 'declaredDate', 'amount']
mapping = dict(zip(columns, [C.EX, C.PAY, C.DEC, C.DIV]))
df = pd.DataFrame(data)[columns].rename(columns=mapping)

filename = self.finder.get_dividends_path(symbol)
df = pd.DataFrame(data)

df = self.reader.update_df(filename, df, C.EX).sort_values(by=[C.EX])
df[C.DIV] = df[C.DIV].apply(lambda amt: float(amt) if amt else 0)

return df
return self.standardize_dividends(symbol, df)

# def get_splits(self, symbol):
# # given a symbol, return the stock splits
Expand All @@ -104,3 +130,17 @@ def get_dividends(self, symbol, timeframe='3m'):
# )
# df = df[df['Stock Splits'] != 0]
# return df


class Polygon(MarketData):
    # market data source that reads dividends through the polygon
    # RESTClient and stores them under the 'polygon' provider folder

    def __init__(self, broker=None):
        super().__init__(broker=broker)
        load_dotenv()
        # the client is keyed by the APCA_API_KEY_ID environment variable
        api_key = os.environ['APCA_API_KEY_ID']
        self.client = RESTClient(api_key)
        self.provider = 'polygon'

    def get_dividends(self, symbol, timeframe='max'):
        # given a symbol, fetch its dividends from the client,
        # standardize them and keep only the rows
        # whose ex-date falls within the timeframe
        results = self.client.reference_stock_dividends(symbol).results
        standardized = self.standardize_dividends(
            symbol, pd.DataFrame(results))
        return self.reader.data_in_timeframe(standardized, C.EX, timeframe)
37 changes: 36 additions & 1 deletion src/FileOps.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import json
import time
from datetime import datetime
from datetime import date, datetime, timedelta
import pandas as pd
from Storage import Store
# consider combining fileoperations into one class
Expand Down Expand Up @@ -49,13 +49,48 @@ def check_update(self, filename, df):
def update_df(self, filename, new, column):
    # given a csv filename, a dataframe of new rows and the name of
    # a date column, return the rows already stored in the file
    # combined with the new rows
    # stored rows whose date also appears in the new rows are dropped,
    # so the new rows take precedence
    old = self.load_csv(filename)
    if old.empty:
        # nothing stored yet, so the new rows are returned unchanged
        return new

    old[column] = pd.to_datetime(old[column])
    # convert a copy so the caller's dataframe is not modified
    new = new.copy()
    new[column] = pd.to_datetime(new[column])
    old = old[~old[column].isin(new[column])]
    # DataFrame.append was removed in pandas 2.0;
    # concat with ignore_index=True gives the same result
    return pd.concat([old, new], ignore_index=True)

def check_file_exists(self, filename):
    # a file counts as existing only if it is present on local disk
    # and the store also reports a key for it
    if not os.path.exists(filename):
        return False
    return self.store.key_exists(filename)

def convert_delta(self, timeframe):
    # given a timeframe string such as '3m', '5y', '2w', '10d' or 'max',
    # return the equivalent timedelta
    # (a year counts as 365 days and a month as 30 days)
    # raises ValueError if the string has no supported suffix
    # or no whole number in front of it
    if timeframe == 'max':
        # 'max' is treated as 36500 days (about 100 years)
        return timedelta(days=36500)

    periods = {'y': 365, 'm': 30, 'w': 7, 'd': 1}

    # use the earliest suffix character in the string so the text before
    # it can only be the number; probing the suffixes in a fixed order
    # mis-parsed inputs such as '1day' (it matched the trailing 'y')
    idx = next(
        (i for i, char in enumerate(timeframe) if char in periods), -1)

    if idx == -1:
        supported = ', '.join(periods)
        error_msg = f'Only certain suffixes ({supported}) are supported.'
        raise ValueError(error_msg)

    try:
        num = int(timeframe[:idx])
    except ValueError:
        # e.g. 'm' or 'abcd': there is a suffix but no number before it
        error_msg = (
            f'Timeframe {timeframe!r} must start with a whole number '
            'followed by a suffix, e.g. 3m.'
        )
        raise ValueError(error_msg) from None

    return timedelta(days=periods[timeframe[idx]] * num)

def data_in_timeframe(self, df, col, timeframe='max'):
    # given a dataframe, the name of its date column and a timeframe
    # (e.g. '3m' or 'max'), return only the rows dated after
    # today minus that timeframe, with the date column as datetimes
    # a dataframe without that column is returned as is
    if col not in df:
        return df
    delta = self.convert_delta(timeframe)
    # convert on a copy so the caller's dataframe is left unmodified
    dated = df.copy()
    dated[col] = pd.to_datetime(dated[col])
    cutoff = pd.to_datetime(date.today() - delta)
    return dated[dated[col] > cutoff]


class FileWriter:
# file write operations
Expand Down
9 changes: 6 additions & 3 deletions src/Storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@ def get_all_keys(self):

def key_exists(self, key, download=False):
try:
s3 = boto3.resource('s3')
bucket = s3.Bucket(self.bucket_name)
bucket.Object(key).load()
if download:
self.download_file(key)
else:
s3 = boto3.resource('s3')
bucket = s3.Bucket(self.bucket_name)
bucket.Object(key).load()
except ClientError:
return False
else:
Expand Down
6 changes: 4 additions & 2 deletions test/test_Constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ def test_get_symbols_path(self):
assert finder.get_symbols_path() == 'data/symbols.csv'

def test_get_dividends_path(self):
assert finder.get_dividends_path('aapl') == 'data/dividends/AAPL.csv'
assert finder.get_dividends_path('AMD') == 'data/dividends/AMD.csv'
assert finder.get_dividends_path(
'aapl') == 'data/dividends/iexcloud/AAPL.csv'
assert finder.get_dividends_path(
'AMD') == 'data/dividends/iexcloud/AMD.csv'

def test_get_splits_path(self):
assert finder.get_splits_path('aapl') == 'data/splits/AAPL.csv'
Expand Down
Loading

0 comments on commit 99418a3

Please sign in to comment.