Skip to content

Commit

Permalink
Merge pull request #1675 from ranaroussi/hotfix/database-error
Browse files Browse the repository at this point in the history
Replace sqlite3 with peewee for 100% thread-safety
  • Loading branch information
ValueRaider committed Sep 17, 2023
2 parents de59f0b + 7d6d856 commit 32e569f
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 120 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ To install `yfinance` using `conda`, see
- [frozendict](https://pypi.org/project/frozendict) \>= 2.3.4
- [beautifulsoup4](https://pypi.org/project/beautifulsoup4) \>= 4.11.1
- [html5lib](https://pypi.org/project/html5lib) \>= 1.1
- [peewee](https://pypi.org/project/peewee) \>= 3.16.2

#### Optional (if you want to use `pandas_datareader`)

Expand Down
2 changes: 2 additions & 0 deletions meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ requirements:
- frozendict >=2.3.4
- beautifulsoup4 >=4.11.1
- html5lib >=1.1
- peewee >=3.16.2
# - pycryptodome >=3.6.6
- pip
- python
Expand All @@ -41,6 +42,7 @@ requirements:
- frozendict >=2.3.4
- beautifulsoup4 >=4.11.1
- html5lib >=1.1
- peewee >=3.16.2
# - pycryptodome >=3.6.6
- python

Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ pytz>=2022.5
frozendict>=2.3.4
beautifulsoup4>=4.11.1
html5lib>=1.1
peewee>=3.16.2
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
install_requires=['pandas>=1.3.0', 'numpy>=1.16.5',
'requests>=2.31', 'multitasking>=0.0.7',
'lxml>=4.9.1', 'appdirs>=1.4.4', 'pytz>=2022.5',
'frozendict>=2.3.4',
'frozendict>=2.3.4', 'peewee>=3.16.2',
'beautifulsoup4>=4.11.1', 'html5lib>=1.1'],
# Note: Pandas.read_html() needs html5lib & beautifulsoup4
entry_points={
Expand Down
160 changes: 41 additions & 119 deletions yfinance/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import logging
import os as _os
import re as _re
import sqlite3 as _sqlite3
import peewee as _peewee
import sys as _sys
import threading
from functools import lru_cache
Expand Down Expand Up @@ -891,136 +891,60 @@ def __str__(self):
# TimeZone cache related code
# ---------------------------------

class _KVStore:
"""Simple Sqlite backed key/value store, key and value are strings. Should be thread safe."""

def __init__(self, filename):
self._cache_mutex = Lock()
with self._cache_mutex:
self.conn = _sqlite3.connect(filename, timeout=10, check_same_thread=False)
self.conn.execute('pragma journal_mode=wal')
try:
self.conn.execute('create table if not exists "kv" (key TEXT primary key, value TEXT) without rowid')
except Exception as e:
if 'near "without": syntax error' in str(e):
# "without rowid" requires sqlite 3.8.2. Older versions will raise exception
self.conn.execute('create table if not exists "kv" (key TEXT primary key, value TEXT)')
else:
raise
self.conn.commit()
_atexit.register(self.close)

def close(self):
if self.conn is not None:
with self._cache_mutex:
self.conn.close()
self.conn = None

def get(self, key: str) -> Union[str, None]:
"""Get value for key if it exists else returns None"""
try:
item = self.conn.execute('select value from "kv" where key=?', (key,))
except _sqlite3.IntegrityError as e:
self.delete(key)
return None
if item:
return next(item, (None,))[0]

def set(self, key: str, value: str) -> None:
if value is None:
self.delete(key)
else:
with self._cache_mutex:
self.conn.execute('replace into "kv" (key, value) values (?,?)', (key, value))
self.conn.commit()

def bulk_set(self, kvdata: Dict[str, str]):
records = tuple(i for i in kvdata.items())
with self._cache_mutex:
self.conn.executemany('replace into "kv" (key, value) values (?,?)', records)
self.conn.commit()

def delete(self, key: str):
with self._cache_mutex:
self.conn.execute('delete from "kv" where key=?', (key,))
self.conn.commit()
_cache_dir = _os.path.join(_ad.user_cache_dir(), "py-yfinance")
DB_PATH = _os.path.join(_cache_dir, 'tkr-tz.db')
db = _peewee.SqliteDatabase(DB_PATH, pragmas={'journal_mode': 'wal', 'cache_size': -64})
_tz_cache = None


class _TzCacheException(Exception):
pass


class _TzCache:
"""Simple sqlite file cache of ticker->timezone"""

def __init__(self):
self._setup_cache_folder()
# Must init db here, where is thread-safe
try:
self._tz_db = _KVStore(_os.path.join(self._db_dir, "tkr-tz.db"))
except _sqlite3.DatabaseError as err:
raise _TzCacheException(f"Error creating TzCache folder: '{self._db_dir}' reason: {err}")
self._migrate_cache_tkr_tz()

def _setup_cache_folder(self):
if not _os.path.isdir(self._db_dir):
try:
_os.makedirs(self._db_dir)
except OSError as err:
raise _TzCacheException(f"Error creating TzCache folder: '{self._db_dir}' reason: {err}")

elif not (_os.access(self._db_dir, _os.R_OK) and _os.access(self._db_dir, _os.W_OK)):
raise _TzCacheException(f"Cannot read and write in TzCache folder: '{self._db_dir}'")

def lookup(self, tkr):
return self.tz_db.get(tkr)

def store(self, tkr, tz):
if tz is None:
self.tz_db.delete(tkr)
else:
tz_db = self.tz_db.get(tkr)
if tz_db is not None:
if tz != tz_db:
get_yf_logger().debug(f'{tkr}: Overwriting cached TZ "{tz_db}" with different TZ "{tz}"')
self.tz_db.set(tkr, tz)
else:
self.tz_db.set(tkr, tz)
class KV(_peewee.Model):
key = _peewee.CharField(primary_key=True)
value = _peewee.CharField(null=True)

class Meta:
database = db
without_rowid = True

@property
def _db_dir(self):
global _cache_dir
return _os.path.join(_cache_dir, "py-yfinance")

@property
def tz_db(self):
return self._tz_db
class _TzCache:
def __init__(self):
db.connect()
db.create_tables([KV])

def _migrate_cache_tkr_tz(self):
"""Migrate contents from old ticker CSV-cache to SQLite db"""
old_cache_file_path = _os.path.join(self._db_dir, "tkr-tz.csv")
old_cache_file_path = _os.path.join(_cache_dir, "tkr-tz.csv")
if _os.path.isfile(old_cache_file_path):
_os.remove(old_cache_file_path)

if not _os.path.isfile(old_cache_file_path):
def lookup(self, key):
try:
return KV.get(KV.key == key).value
except KV.DoesNotExist:
return None

def store(self, key, value):
try:
df = _pd.read_csv(old_cache_file_path, index_col="Ticker", on_bad_lines="skip")
except _pd.errors.EmptyDataError:
_os.remove(old_cache_file_path)
except TypeError:
_os.remove(old_cache_file_path)
else:
# Discard corrupt data:
df = df[~df["Tz"].isna().to_numpy()]
df = df[~(df["Tz"] == '').to_numpy()]
df = df[~df.index.isna()]
if not df.empty:
try:
self.tz_db.bulk_set(df.to_dict()['Tz'])
except Exception as e:
# Ignore
pass
if value is None:
q = KV.delete().where(KV.key == key)
q.execute()
return
with db.atomic():
KV.insert(key=key, value=value).execute()
except IntegrityError:
# Integrity error means the key already exists. Try updating the key.
old_value = self.lookup(key)
if old_value != value:
get_yf_logger().debug(f"Value for key {key} changed from {old_value} to {value}.")
with db.atomic():
q = KV.update(value=value).where(KV.key == key)
q.execute()

_os.remove(old_cache_file_path)
def close(self):
db.close()


class _TzCacheDummy:
Expand Down Expand Up @@ -1058,9 +982,7 @@ def get_tz_cache():
return _tz_cache


_cache_dir = _ad.user_cache_dir()
_cache_init_lock = Lock()
_tz_cache = None


def set_tz_cache_location(cache_dir: str):
Expand Down

0 comments on commit 32e569f

Please sign in to comment.