Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace sqlite3 with peewee for 100% thread-safety #1675

Merged
merged 1 commit into from
Sep 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ To install `yfinance` using `conda`, see
- [frozendict](https://pypi.org/project/frozendict) \>= 2.3.4
- [beautifulsoup4](https://pypi.org/project/beautifulsoup4) \>= 4.11.1
- [html5lib](https://pypi.org/project/html5lib) \>= 1.1
- [peewee](https://pypi.org/project/peewee) \>= 3.16.2

#### Optional (if you want to use `pandas_datareader`)

Expand Down
2 changes: 2 additions & 0 deletions meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ requirements:
- frozendict >=2.3.4
- beautifulsoup4 >=4.11.1
- html5lib >=1.1
- peewee >=3.16.2
# - pycryptodome >=3.6.6
- pip
- python
Expand All @@ -41,6 +42,7 @@ requirements:
- frozendict >=2.3.4
- beautifulsoup4 >=4.11.1
- html5lib >=1.1
- peewee >=3.16.2
# - pycryptodome >=3.6.6
- python

Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ pytz>=2022.5
frozendict>=2.3.4
beautifulsoup4>=4.11.1
html5lib>=1.1
peewee>=3.16.2
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
install_requires=['pandas>=1.3.0', 'numpy>=1.16.5',
'requests>=2.31', 'multitasking>=0.0.7',
'lxml>=4.9.1', 'appdirs>=1.4.4', 'pytz>=2022.5',
'frozendict>=2.3.4',
'frozendict>=2.3.4', 'peewee>=3.16.2',
'beautifulsoup4>=4.11.1', 'html5lib>=1.1'],
# Note: Pandas.read_html() needs html5lib & beautifulsoup4
entry_points={
Expand Down
160 changes: 41 additions & 119 deletions yfinance/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import logging
import os as _os
import re as _re
import sqlite3 as _sqlite3
import peewee as _peewee
import sys as _sys
import threading
from functools import lru_cache
Expand Down Expand Up @@ -891,136 +891,60 @@ def __str__(self):
# TimeZone cache related code
# ---------------------------------

class _KVStore:
"""Simple Sqlite backed key/value store, key and value are strings. Should be thread safe."""

def __init__(self, filename):
self._cache_mutex = Lock()
with self._cache_mutex:
self.conn = _sqlite3.connect(filename, timeout=10, check_same_thread=False)
self.conn.execute('pragma journal_mode=wal')
try:
self.conn.execute('create table if not exists "kv" (key TEXT primary key, value TEXT) without rowid')
except Exception as e:
if 'near "without": syntax error' in str(e):
# "without rowid" requires sqlite 3.8.2. Older versions will raise exception
self.conn.execute('create table if not exists "kv" (key TEXT primary key, value TEXT)')
else:
raise
self.conn.commit()
_atexit.register(self.close)

def close(self):
if self.conn is not None:
with self._cache_mutex:
self.conn.close()
self.conn = None

def get(self, key: str) -> Union[str, None]:
"""Get value for key if it exists else returns None"""
try:
item = self.conn.execute('select value from "kv" where key=?', (key,))
except _sqlite3.IntegrityError as e:
self.delete(key)
return None
if item:
return next(item, (None,))[0]

def set(self, key: str, value: str) -> None:
if value is None:
self.delete(key)
else:
with self._cache_mutex:
self.conn.execute('replace into "kv" (key, value) values (?,?)', (key, value))
self.conn.commit()

def bulk_set(self, kvdata: Dict[str, str]):
records = tuple(i for i in kvdata.items())
with self._cache_mutex:
self.conn.executemany('replace into "kv" (key, value) values (?,?)', records)
self.conn.commit()

def delete(self, key: str):
with self._cache_mutex:
self.conn.execute('delete from "kv" where key=?', (key,))
self.conn.commit()
_cache_dir = _os.path.join(_ad.user_cache_dir(), "py-yfinance")
DB_PATH = _os.path.join(_cache_dir, 'tkr-tz.db')
db = _peewee.SqliteDatabase(DB_PATH, pragmas={'journal_mode': 'wal', 'cache_size': -64})
_tz_cache = None


class _TzCacheException(Exception):
pass


class _TzCache:
"""Simple sqlite file cache of ticker->timezone"""

def __init__(self):
self._setup_cache_folder()
# Must init db here, where is thread-safe
try:
self._tz_db = _KVStore(_os.path.join(self._db_dir, "tkr-tz.db"))
except _sqlite3.DatabaseError as err:
raise _TzCacheException(f"Error creating TzCache folder: '{self._db_dir}' reason: {err}")
self._migrate_cache_tkr_tz()

def _setup_cache_folder(self):
if not _os.path.isdir(self._db_dir):
try:
_os.makedirs(self._db_dir)
except OSError as err:
raise _TzCacheException(f"Error creating TzCache folder: '{self._db_dir}' reason: {err}")

elif not (_os.access(self._db_dir, _os.R_OK) and _os.access(self._db_dir, _os.W_OK)):
raise _TzCacheException(f"Cannot read and write in TzCache folder: '{self._db_dir}'")

def lookup(self, tkr):
return self.tz_db.get(tkr)

def store(self, tkr, tz):
if tz is None:
self.tz_db.delete(tkr)
else:
tz_db = self.tz_db.get(tkr)
if tz_db is not None:
if tz != tz_db:
get_yf_logger().debug(f'{tkr}: Overwriting cached TZ "{tz_db}" with different TZ "{tz}"')
self.tz_db.set(tkr, tz)
else:
self.tz_db.set(tkr, tz)
class KV(_peewee.Model):
key = _peewee.CharField(primary_key=True)
value = _peewee.CharField(null=True)

class Meta:
database = db
without_rowid = True

@property
def _db_dir(self):
global _cache_dir
return _os.path.join(_cache_dir, "py-yfinance")

@property
def tz_db(self):
return self._tz_db
class _TzCache:
def __init__(self):
db.connect()
db.create_tables([KV])

def _migrate_cache_tkr_tz(self):
"""Migrate contents from old ticker CSV-cache to SQLite db"""
old_cache_file_path = _os.path.join(self._db_dir, "tkr-tz.csv")
old_cache_file_path = _os.path.join(_cache_dir, "tkr-tz.csv")
if _os.path.isfile(old_cache_file_path):
_os.remove(old_cache_file_path)

if not _os.path.isfile(old_cache_file_path):
def lookup(self, key):
try:
return KV.get(KV.key == key).value
except KV.DoesNotExist:
return None

def store(self, key, value):
try:
df = _pd.read_csv(old_cache_file_path, index_col="Ticker", on_bad_lines="skip")
except _pd.errors.EmptyDataError:
_os.remove(old_cache_file_path)
except TypeError:
_os.remove(old_cache_file_path)
else:
# Discard corrupt data:
df = df[~df["Tz"].isna().to_numpy()]
df = df[~(df["Tz"] == '').to_numpy()]
df = df[~df.index.isna()]
if not df.empty:
try:
self.tz_db.bulk_set(df.to_dict()['Tz'])
except Exception as e:
# Ignore
pass
if value is None:
q = KV.delete().where(KV.key == key)
q.execute()
return
with db.atomic():
KV.insert(key=key, value=value).execute()
except IntegrityError:
# Integrity error means the key already exists. Try updating the key.
old_value = self.lookup(key)
if old_value != value:
get_yf_logger().debug(f"Value for key {key} changed from {old_value} to {value}.")
with db.atomic():
q = KV.update(value=value).where(KV.key == key)
q.execute()

_os.remove(old_cache_file_path)
def close(self):
db.close()


class _TzCacheDummy:
Expand Down Expand Up @@ -1058,9 +982,7 @@ def get_tz_cache():
return _tz_cache


_cache_dir = _ad.user_cache_dir()
_cache_init_lock = Lock()
_tz_cache = None


def set_tz_cache_location(cache_dir: str):
Expand Down