# speedup for reading and writing cached data with sqlalchemy #48

Merged 3 commits on Sep 14, 2023
## README.md (11 additions, 11 deletions)
```diff
@@ -128,7 +128,7 @@ Cachew gives the best of two worlds and makes it both **easy and efficient**. Th
 - first your objects get [converted](src/cachew/marshall/cachew.py#L34) into a simpler JSON-like representation
 - after that, they are mapped into byte blobs via [`orjson`](https://github.com/ijl/orjson).

-When the function is called, cachew [computes the hash of your function's arguments](src/cachew/__init__.py#L503)
+When the function is called, cachew [computes the hash of your function's arguments](src/cachew/__init__.py#L504)
 and compares it against the previously stored hash value.

 - If they match, it would deserialize and yield whatever is stored in the cache database
```
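The flow described in this hunk (hash the arguments, compare against the stored hash, then either replay the cache or recompute and overwrite it) can be sketched with only the stdlib. This toy decorator is illustrative and shares no code with cachew; the name `cached`, the table layout, and the hashing scheme are all made up for the example:

```python
import hashlib
import json
import sqlite3

def cached(db_path: str):
    """Toy sketch of cachew's hash-check logic, not its actual API."""
    def decorator(func):
        def wrapper(*args):
            # hash of the function's arguments, compared against the stored hash
            new_hash = hashlib.sha256(repr(args).encode()).hexdigest()
            conn = sqlite3.connect(db_path)
            conn.execute('CREATE TABLE IF NOT EXISTS hash (value TEXT)')
            conn.execute('CREATE TABLE IF NOT EXISTS cache (blob TEXT)')
            row = conn.execute('SELECT value FROM hash').fetchone()
            if row is not None and row[0] == new_hash:
                # hashes match: deserialize and replay whatever is in the cache
                result = [json.loads(blob) for (blob,) in conn.execute('SELECT blob FROM cache')]
            else:
                # first call or hash mismatch: recompute and overwrite the cache
                result = list(func(*args))
                conn.execute('DELETE FROM cache')
                conn.execute('DELETE FROM hash')
                conn.executemany('INSERT INTO cache VALUES (?)',
                                 [(json.dumps(o),) for o in result])
                conn.execute('INSERT INTO hash VALUES (?)', (new_hash,))
                conn.commit()
            conn.close()
            return result
        return wrapper
    return decorator
```

The real implementation additionally handles generators, schema versioning, and concurrent writers, which this sketch ignores.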
```diff
@@ -140,18 +140,18 @@ and compares it against the previously stored hash value.

-* automatic schema inference: [1](src/cachew/tests/test_cachew.py#L349), [2](src/cachew/tests/test_cachew.py#L363)
+* automatic schema inference: [1](src/cachew/tests/test_cachew.py#L350), [2](src/cachew/tests/test_cachew.py#L364)
 * supported types:

     * primitive: `str`, `int`, `float`, `bool`, `datetime`, `date`, `Exception`

-      See [tests.test_types](src/cachew/tests/test_cachew.py#L675), [tests.test_primitive](src/cachew/tests/test_cachew.py#L709), [tests.test_dates](src/cachew/tests/test_cachew.py#L629), [tests.test_exceptions](src/cachew/tests/test_cachew.py#L1036)
-    * [@dataclass and NamedTuple](src/cachew/tests/test_cachew.py#L591)
-    * [Optional](src/cachew/tests/test_cachew.py#L493) types
-    * [Union](src/cachew/tests/test_cachew.py#L787) types
-    * [nested datatypes](src/cachew/tests/test_cachew.py#L409)
+      See [tests.test_types](src/cachew/tests/test_cachew.py#L676), [tests.test_primitive](src/cachew/tests/test_cachew.py#L710), [tests.test_dates](src/cachew/tests/test_cachew.py#L630), [tests.test_exceptions](src/cachew/tests/test_cachew.py#L1037)
+    * [@dataclass and NamedTuple](src/cachew/tests/test_cachew.py#L592)
+    * [Optional](src/cachew/tests/test_cachew.py#L494) types
+    * [Union](src/cachew/tests/test_cachew.py#L788) types
+    * [nested datatypes](src/cachew/tests/test_cachew.py#L410)

-* detects [datatype schema changes](src/cachew/tests/test_cachew.py#L439) and discards old data automatically
+* detects [datatype schema changes](src/cachew/tests/test_cachew.py#L440) and discards old data automatically


 # Performance
```
```diff
@@ -165,12 +165,12 @@ You can find some of my performance tests in [benchmarks/](benchmarks) dir, and

 # Using
-See [docstring](src/cachew/__init__.py#L328) for up-to-date documentation on parameters and return types.
+See [docstring](src/cachew/__init__.py#L329) for up-to-date documentation on parameters and return types.
 You can also use [extensive unit tests](src/cachew/tests/test_cachew.py) as a reference.

 Some useful (but optional) arguments of `@cachew` decorator:

-* `cache_path` can be a directory, or a callable that [returns a path](src/cachew/tests/test_cachew.py#L386) and depends on function's arguments.
+* `cache_path` can be a directory, or a callable that [returns a path](src/cachew/tests/test_cachew.py#L387) and depends on function's arguments.

     By default, `settings.DEFAULT_CACHEW_DIR` is used.
```
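The `cache_path` argument mentioned above accepts a directory, a callable, or nothing. How such an argument might be resolved can be sketched as follows; this helper is hypothetical (`resolve_cache_path` and `DEFAULT_CACHEW_DIR` here are stand-ins, not cachew's real code):

```python
from pathlib import Path
from typing import Callable, Optional, Union

# stand-in for settings.DEFAULT_CACHEW_DIR
DEFAULT_CACHEW_DIR = Path('/tmp/cachew')

PathLike = Union[str, Path]

def resolve_cache_path(
    cache_path: Optional[Union[PathLike, Callable[..., PathLike]]],
    func_name: str,
    *args,
) -> Path:
    """Hypothetical resolution of a cache_path-style argument."""
    if cache_path is None:
        # no argument: fall back to a per-function file in the default directory
        return DEFAULT_CACHEW_DIR / func_name
    if callable(cache_path):
        # callable: let it pick a path based on the function's arguments
        return Path(cache_path(*args))
    p = Path(cache_path)
    # a directory gets a per-function file inside it; otherwise use as-is
    return p / func_name if p.is_dir() else p
```

A callable path is handy when, say, each user id should get its own cache file.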
```diff
@@ -264,7 +264,7 @@ Now you can use `@mcachew` in place of `@cachew`, and be certain things don't break.
 ## Settings

-[cachew.settings](src/cachew/__init__.py#L67) exposes some parameters that allow you to control `cachew` behaviour:
+[cachew.settings](src/cachew/__init__.py#L68) exposes some parameters that allow you to control `cachew` behaviour:
 - `ENABLE`: set to `False` if you want to disable caching without removing the decorators (useful for testing and debugging).
   You can also use [cachew.extra.disabled_cachew](src/cachew/extra.py#L18) context manager to do it temporarily.
 - `DEFAULT_CACHEW_DIR`: override to set a different base directory. The default is the "user cache directory" (see [appdirs docs](https://github.com/ActiveState/appdirs#some-example-output)).
```
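The `ENABLE` flag plus a context manager that flips it temporarily (the role `disabled_cachew` plays) is a small pattern worth seeing in isolation. A minimal sketch, with illustrative names rather than cachew's actual module layout:

```python
from contextlib import contextmanager

class Settings:
    # mirrors the spirit of cachew.settings; the single flag is illustrative
    ENABLE: bool = True

settings = Settings()

@contextmanager
def disabled_caching():
    """Temporarily set ENABLE to False, restoring the previous value on exit."""
    original = settings.ENABLE
    settings.ENABLE = False
    try:
        yield
    finally:
        # restore even if the body raised
        settings.ENABLE = original
```

The try/finally is the important part: the flag is restored even when the wrapped code raises, so one failing test cannot leave caching disabled for the rest of the suite.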
## src/cachew/__init__.py (33 additions, 14 deletions)
```diff
@@ -46,6 +46,7 @@ def orjson_dumps(*args, **kwargs):  # type: ignore[misc]
 import appdirs
 import sqlalchemy
 from sqlalchemy import Column, Table, event, text
+from sqlalchemy.dialects import sqlite

 from .logging_helper import makeLogger
 from .marshall.cachew import CachewMarshall, build_schema
```
```diff
@@ -617,8 +618,8 @@ def cachew_wrapper(
     else:
         old_hashes = cursor.fetchall()

-
     assert len(old_hashes) <= 1, old_hashes  # shouldn't happen
+
     old_hash: Optional[SourceHash]
     if len(old_hashes) == 0:
         old_hash = None
```
```diff
@@ -627,11 +628,30 @@

     logger.debug('old hash: %s', old_hash)

-
     def cached_items():
         rows = conn.execute(table_cache.select())
-        for row in rows:
-            j = orjson_loads(row[0])
+
+        # by default, sqlalchemy wraps all results into a Row object
+        # this can cause quite a lot of overhead if you're reading many rows
+        # it seems that in principle sqlalchemy supports returning the bare underlying tuple from the dbapi,
+        # but from browsing the code it doesn't seem like this functionality is exposed
+        # if you're looking for cues, see
+        # - ._source_supports_scalars
+        # - ._generate_rows
+        # - ._row_getter
+        # by using this raw iterator we speed up reading the cache quite a bit
+        # asked here https://github.com/sqlalchemy/sqlalchemy/discussions/10350
+        raw_row_iterator = getattr(rows, '_raw_row_iterator', None)
+        if raw_row_iterator is None:
+            warnings.warn(
+                "CursorResult._raw_row_iterator method isn't found. This could lead to degraded cache reading performance."
+            )
+            row_iterator = rows
+        else:
+            row_iterator = raw_row_iterator()
+
+        for (blob,) in row_iterator:
+            j = orjson_loads(blob)
             obj = marshall.load(j)
             yield obj
```
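The hunk above leans on a private SQLAlchemy method, so it probes for it with `getattr` and degrades gracefully if a future release removes it. That defensive pattern stands on its own; here is a self-contained sketch using dummy result classes (the class names and the "fast"/"slow" distinction are illustrative, not SQLAlchemy types):

```python
import warnings

def iter_rows(result):
    """Prefer a private fast iterator when available, else fall back with a warning."""
    raw = getattr(result, '_raw_row_iterator', None)
    if raw is None:
        warnings.warn("_raw_row_iterator not found; falling back to slower iteration")
        return iter(result)
    return raw()

class FastResult:
    """Stand-in for a result object exposing the private raw iterator."""
    def __init__(self, rows):
        self._rows = rows
    def __iter__(self):
        # the "slow" path, standing in for Row-wrapped iteration
        return iter(self._rows)
    def _raw_row_iterator(self):
        # the "fast" path, standing in for bare dbapi tuples
        return iter(self._rows)

class PlainResult:
    """Stand-in for a result object without the private method."""
    def __init__(self, rows):
        self._rows = rows
    def __iter__(self):
        return iter(self._rows)
```

The warning matters: silently falling back would hide a performance regression, while crashing on a missing private attribute would break users on newer SQLAlchemy versions.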

```diff
@@ -730,19 +750,18 @@ def missing_keys(cached: List[str], wanted: List[str]) -> Optional[List[str]]:
     # at this point we're guaranteed to have an exclusive write transaction

     datas = func(*args, **kwargs)
-    column_names = [c.name for c in table_cache_tmp.columns]
-    insert_into_table_cache_tmp = table_cache_tmp.insert()
+    # uhh. this gives a huge speedup for inserting
+    # since we don't have to create intermediate dictionaries
+    insert_into_table_cache_tmp_raw = str(table_cache_tmp.insert().compile(dialect=sqlite.dialect(paramstyle='qmark')))
+    # I also tried setting paramstyle='qmark' in create_engine, but it seems to be ignored :(
+    # idk what benefit sqlalchemy gives at this point, seems to just complicate things

     chunk: List[Any] = []

     def flush() -> None:
         nonlocal chunk
         if len(chunk) > 0:
-            # TODO optimize this, we really don't need to make extra dicts here just to insert
-            chunk_dict = [
-                dict(zip(column_names, row))
-                for row in chunk
-            ]
-            conn.execute(insert_into_table_cache_tmp, chunk_dict)
+            conn.exec_driver_sql(insert_into_table_cache_tmp_raw, [(c,) for c in chunk])
             chunk = []

     total_objects = 0
```
```diff
@@ -755,8 +774,8 @@ def flush() -> None:
             return

         dct = marshall.dump(obj)
-        j = orjson_dumps(dct)
-        chunk.append((j,))
+        blob = orjson_dumps(dct)
+        chunk.append(blob)
         if len(chunk) >= chunk_by:
             flush()
     flush()
```
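The write path above compiles the INSERT to a qmark-style string once and then hands each buffered chunk straight to the driver, avoiding per-row dictionaries. The same chunk-and-flush shape can be shown against plain `sqlite3` (no SQLAlchemy); the function name, table, and default chunk size are illustrative:

```python
import sqlite3
from typing import Any, Iterable, List

def write_blobs(conn: sqlite3.Connection, blobs: Iterable[bytes], chunk_by: int = 100) -> int:
    """Buffer serialized blobs and flush each chunk with one executemany call."""
    conn.execute('CREATE TABLE IF NOT EXISTS cache (data)')
    chunk: List[Any] = []
    total = 0

    def flush() -> None:
        nonlocal chunk
        if chunk:
            # qmark paramstyle, one parameter tuple per row -- no intermediate dicts
            conn.executemany('INSERT INTO cache VALUES (?)', [(b,) for b in chunk])
            chunk = []

    for blob in blobs:
        chunk.append(blob)
        total += 1
        if len(chunk) >= chunk_by:
            flush()
    flush()  # trailing partial chunk
    conn.commit()
    return total
```

Chunking keeps memory bounded for large result sets while still amortizing statement overhead across many rows.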
## src/cachew/tests/test_cachew.py (5 additions, 4 deletions)
```diff
@@ -23,7 +23,7 @@

 from .. import cachew, get_logger, NTBinder, CachewException, settings

-from .utils import running_on_ci
+from .utils import running_on_ci, gc_control


 logger = get_logger()
```
```diff
@@ -284,9 +284,10 @@ class TE2(NamedTuple):

 # you can run one specific test (e.g. to profile) by passing it as -k to pytest
 # e.g. -k 'test_many[500000-False]'
-@pytest.mark.parametrize('count', [100_000, 500_000, 1_000_000])
-def test_many(count: int, tmp_path: Path) -> None:
-    if count > 100_000 and running_on_ci:
+@pytest.mark.parametrize('count', [99, 500_000, 1_000_000])
+@pytest.mark.parametrize('gc_on', [True, False], ids=['gc_on', 'gc_off'])
+def test_many(count: int, tmp_path: Path, gc_control) -> None:
+    if count > 99 and running_on_ci:
         pytest.skip("test would be too slow on CI, only meant to run manually")
     # should be a parametrized test perhaps
     src = tmp_path / 'source'
```
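The benchmark now runs with the garbage collector both on and off, via the imported `gc_control` fixture. The fixture's body isn't shown in this diff; its core would presumably be a small stdlib helper along these lines (a guess at the internals, not the actual `tests/utils.py` code):

```python
import gc
from contextlib import contextmanager

@contextmanager
def gc_control(gc_on: bool):
    """Run the block with the GC enabled or disabled, restoring the prior state."""
    was_enabled = gc.isenabled()
    if gc_on:
        gc.enable()
    else:
        gc.disable()
    try:
        yield
    finally:
        # restore whatever state we started in, even if the block raised
        if was_enabled:
            gc.enable()
        else:
            gc.disable()
```

In pytest this would be wrapped in a fixture parametrized over `gc_on`, which matches the `ids=['gc_on', 'gc_off']` labels in the diff. Disabling the GC is a common trick when benchmarking allocation-heavy code, since collection pauses can dominate the measured time.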