LRU cache for decoded chunks #306

Closed
wants to merge 56 commits into from
Changes from 53 commits
Commits
56 commits
d62febb
first attempt at chunk_cache layer
shikharsg Aug 29, 2018
f796ea7
ChunkCache test with MockChunkCacheArray
shikharsg Aug 29, 2018
32141a9
np.copy not needed when accessing a subset of a chunk
shikharsg Oct 9, 2018
b35139b
fixed 'Mm' dtype error for buffersize function
shikharsg Oct 13, 2018
3c45176
renamed ChunkCache to LRUChunkCache
Oct 13, 2018
46dcf94
LRUChunkCache in zarr root namespace
Oct 13, 2018
c69c751
LRUChunkCache example
Oct 13, 2018
2cb143e
write caching of chunk should be done after encoding
Oct 15, 2018
2fb169e
ensure cached chunk has been round tripped through encode-decode if d…
Oct 15, 2018
31e4dfb
flake8 fixes
Oct 15, 2018
5559c4f
read write cache for 0-d arrays
Oct 15, 2018
2a0124a
added tutorial and api docs
Oct 15, 2018
6fac2ad
separated store tests from mutable mapping tests in test_storage.py
shikharsg Oct 20, 2018
4e79d0b
fixed pickle, __delitem__ and ordered dict iteration bugs
shikharsg Oct 20, 2018
5fd6fc8
documenting slowdown when using write cache with object arrays
shikharsg Oct 20, 2018
422f9eb
factoring out mapping code from LRUStoreCache and LRUChunkCache
shikharsg Oct 23, 2018
44cea83
consistent variable naming in _chunk_getitem
shikharsg Nov 11, 2018
1b67e90
removed unnecesary code from _set_basic_selection_zd and added encode…
shikharsg Nov 11, 2018
9b0cc29
flake 8 fixes
shikharsg Nov 11, 2018
715f86d
Merge remote-tracking branch 'upstream/master' into chunk_cache
shikharsg Nov 13, 2018
0013f95
fixed coverage
shikharsg Nov 14, 2018
b70c348
Merge branch 'chunk_cache' into master
shikharsg Nov 14, 2018
c4f2487
Merge pull request #4 from shikharsg/master
shikharsg Nov 14, 2018
245f661
Merge branch 'master' into chunk_cache
shikharsg Nov 15, 2018
a2a05fb
Merge branch 'master' into chunk_cache
shikharsg Jan 8, 2019
b8b9056
Merge branch 'chunk_cache' into chunk_cache_mapping_refactor
shikharsg Jan 9, 2019
04f0367
Merge pull request #3 from shikharsg/chunk_cache_mapping_refactor
shikharsg Jan 9, 2019
f19d43e
bug fix
shikharsg Jan 27, 2019
52a43bf
Merge branch 'master' into chunk_cache
shikharsg Jan 27, 2019
697d46e
python 2 and 3 compatibility
shikharsg Jan 27, 2019
1e727c7
Merge branch 'master' into chunk_cache
shikharsg Feb 10, 2019
377ece7
coverage fix and __init__.py LRUChunkCache import
shikharsg Feb 10, 2019
3473adb
merged chunk_cache with master
shikharsg Mar 4, 2019
df84c89
flake8 fix
shikharsg Mar 4, 2019
88fe66d
Merge branch 'master' into chunk_cache
Mar 30, 2019
a816014
Implemented https://github.com/zarr-developers/zarr/pull/306/files#r2…
Apr 11, 2019
8cc083b
cache tests refactor
May 3, 2019
23fcdea
fixed minor tests mistak
May 3, 2019
309cc48
Merge branch 'master' into chunk_cache
May 3, 2019
635ec87
flake8 fix
May 3, 2019
a85d156
Merge remote-tracking branch 'upstream/master' into chunk_cache
Aug 20, 2019
ef86184
merged with master
Oct 30, 2019
875c24f
added chunk cache to Group
Nov 20, 2019
dcd4ee7
merge with master
Nov 20, 2019
4a1baa9
added chunk_cache to all relevant function
Nov 20, 2019
e6540e1
Merge branch 'master' into chunk_cache
Dec 12, 2019
9f9d176
merge with master
Sep 30, 2020
6571382
fixed failing doctest
Sep 30, 2020
8c8a69f
Merge remote-tracking branch 'origin/master' into pr-306
joshmoore Feb 19, 2021
e0e5254
fixed setitem caching order
Feb 20, 2021
992b48a
Merge branch 'master' into chunk_cache
jakirkham Feb 21, 2021
38ee622
refactor
Feb 21, 2021
8b6a699
Merge branch 'master' into chunk_cache
Apr 5, 2021
ba5c0ed
Merge 'origin/master' into pr-306
joshmoore Aug 27, 2021
7cdce5f
Remove use of unittest
joshmoore Aug 27, 2021
06c899b
Merge branch 'master' into chunk_cache
joshmoore Sep 23, 2021
4 changes: 4 additions & 0 deletions docs/api/storage.rst
@@ -39,6 +39,10 @@ Storage (``zarr.storage``)

.. autoclass:: ConsolidatedMetadataStore

.. autoclass:: LRUChunkCache

.. automethod:: invalidate

.. autofunction:: init_array
.. autofunction:: init_group
.. autofunction:: contains_array
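The API addition above documents ``LRUChunkCache`` and its ``invalidate`` method. A minimal
sketch of attaching the cache to an array and then clearing it, assuming ``invalidate()``
takes no arguments and simply drops all cached entries (mirroring ``LRUStoreCache``)::

    import numpy as np
    import zarr

    store = zarr.DictStore()
    zarr.array(np.arange(10000).reshape(100, 100), chunks=(10, 10), store=store)

    cache = zarr.LRUChunkCache(max_size=2**28)  # keep up to ~256 MiB of decoded chunks
    z = zarr.Array(store=store, chunk_cache=cache)
    _ = z[:]  # reads populate the cache with decoded chunks

    # drop all cached chunks, e.g. after the store was modified out of band
    cache.invalidate()
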
32 changes: 28 additions & 4 deletions docs/tutorial.rst
@@ -849,10 +849,34 @@ store. E.g.::
b'Hello from the cloud!'
0.0009490990014455747

If you are still experiencing poor performance with distributed/cloud storage,
please raise an issue on the GitHub issue tracker with any profiling data you
can provide, as there may be opportunities to optimise further either within
Zarr or within the mapping interface to the storage.
The above :class:`zarr.storage.LRUStoreCache` wraps any Zarr storage class, and stores
encoded chunks. So every time cache is accessed, the chunk has to be decoded. For cases
Suggested change
encoded chunks. So every time cache is accessed, the chunk has to be decoded. For cases
encoded chunks. Every time the cache is accessed, the chunk must be decoded. For cases

where decoding is computationally expensive, Zarr also provides a
:class:`zarr.storage.LRUChunkCache` which can store decoded chunks, e.g.::

>>> import zarr
>>> from numcodecs import LZMA
>>> import numpy as np
>>> store = zarr.DictStore()
>>> z = zarr.array(np.random.randn(1000000).reshape(1000,1000), chunks=(100,100),
... store=store, compressor=LZMA())
>>> from timeit import timeit
>>> # data access without cache
... timeit('z[:]', number=1, globals=globals()) # doctest: +SKIP
0.6703157789888792
>>> z_with_cache = zarr.Array(store=store, chunk_cache=zarr.LRUChunkCache(max_size=None))
>>> # first data access about the same as without cache
... timeit('z_with_cache[:]', number=1, globals=globals()) # doctest: +SKIP
0.681269913999131
>>> # second time accesses the decoded chunks in the cache
... timeit('z_with_cache[:]', number=1, globals=globals()) # doctest: +SKIP
0.007617925992235541


If you are still experiencing poor performance with distributed/cloud storage, please
raise an issue on the GitHub issue tracker with any profiling data you can provide, as
there may be opportunities to optimise further either within Zarr or within the mapping
interface to the storage.

IO with ``fsspec``
~~~~~~~~~~~~~~~~~~
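Not part of the PR's docs, but since ``LRUStoreCache`` caches encoded chunks below the codec
while ``chunk_cache`` holds decoded chunks above it, the two layers can in principle be
combined; a hedged sketch with a local store standing in for a slow remote one::

    import numpy as np
    import zarr
    from numcodecs import LZMA

    store = zarr.DictStore()  # stands in for a slow remote store
    zarr.array(np.random.randn(1000, 1000), chunks=(100, 100),
               store=store, compressor=LZMA())

    cached_store = zarr.LRUStoreCache(store, max_size=2**28)
    z = zarr.Array(cached_store, chunk_cache=zarr.LRUChunkCache(max_size=2**28))

    z[:]  # first access: read from store, decode, fill both caches
    z[:]  # later accesses are served from the decoded-chunk cache
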
2 changes: 1 addition & 1 deletion zarr/__init__.py
@@ -13,7 +13,7 @@
from zarr.storage import (ABSStore, DBMStore, DictStore, DirectoryStore,
LMDBStore, LRUStoreCache, MemoryStore, MongoDBStore,
NestedDirectoryStore, RedisStore, SQLiteStore,
TempStore, ZipStore)
TempStore, ZipStore, LRUChunkCache)
from zarr.sync import ProcessSynchronizer, ThreadSynchronizer
from zarr.version import version as __version__

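With this one-line import change, the class is available from the package root as well as
from ``zarr.storage``::

    import zarr
    from zarr.storage import LRUChunkCache

    # both names refer to the same class after this change
    assert zarr.LRUChunkCache is LRUChunkCache
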
137 changes: 102 additions & 35 deletions zarr/core.py
@@ -76,6 +76,15 @@ class Array:
read and decompressed when possible.

.. versionadded:: 2.7
chunk_cache: MutableMapping, optional
Mapping to store decoded chunks for caching. Can be used in repeated
chunk access scenarios when decoding of data is computationally
expensive.
NOTE: When using the write cache feature with object arrays(i.e.
Suggested change
NOTE: When using the write cache feature with object arrays(i.e.
NOTE: When using the write cache feature with object arrays (i.e.

when dtype of array is 'object' and when writing to the array with
chunk_cache provided) could result in a slight slowdown as some
dtypes, like VLenArray, have to go through the encode-decode phase
before having the correct dtype.

Attributes
----------
@@ -137,6 +146,7 @@ def __init__(
cache_metadata=True,
cache_attrs=True,
partial_decompress=False,
chunk_cache=None,
):
# N.B., expect at this point store is fully initialized with all
# configuration metadata fully specified and normalized
@@ -153,6 +163,7 @@
self._cache_metadata = cache_metadata
self._is_view = False
self._partial_decompress = partial_decompress
self._chunk_cache = chunk_cache

# initialize metadata
self._load_metadata()
@@ -793,19 +804,33 @@ def _get_basic_selection_zd(self, selection, out=None, fields=None):
if selection not in ((), (Ellipsis,)):
err_too_many_indices(selection, ())

try:
# obtain encoded data for chunk
ckey = self._chunk_key((0,))
cdata = self.chunk_store[ckey]
# obtain key for chunk
ckey = self._chunk_key((0,))

except KeyError:
# chunk not initialized
chunk = np.zeros((), dtype=self._dtype)
if self._fill_value is not None:
chunk.fill(self._fill_value)
# setup variable to hold decoded chunk
chunk = None

else:
chunk = self._decode_chunk(cdata)
# check for cached chunk
if self._chunk_cache is not None:
chunk = self._chunk_cache.get(ckey)

if chunk is None:
try:
# obtain encoded data for chunk
cdata = self.chunk_store[ckey]

except KeyError:
# chunk not initialized
chunk = np.zeros((), dtype=self._dtype)
if self._fill_value is not None:
chunk.fill(self._fill_value)

else:
chunk = self._decode_chunk(cdata)

# cache decoded chunk
if self._chunk_cache is not None:
self._chunk_cache[ckey] = chunk

# handle fields
if fields:
@@ -1588,6 +1613,12 @@ def _set_basic_selection_zd(self, selection, value, fields=None):
cdata = self._encode_chunk(chunk)
self.chunk_store[ckey] = cdata

if self._chunk_cache is not None:
# ensure cached chunk has been round tripped through encode-decode if dtype=object
if self.dtype == object:
chunk = self._decode_chunk(cdata)
self._chunk_cache[ckey] = chunk

def _set_basic_selection_nd(self, selection, value, fields=None):
# implementation of __setitem__ for array with at least one dimension

@@ -1647,6 +1678,7 @@ def _set_selection(self, indexer, value, fields=None):

# put data
self._chunk_setitem(chunk_coords, chunk_selection, chunk_value, fields=fields)

else:
lchunk_coords, lchunk_selection, lout_selection = zip(*indexer)
chunk_values = []
@@ -1669,6 +1701,18 @@
self._chunk_setitems(lchunk_coords, lchunk_selection, chunk_values,
fields=fields)

def _select_and_set_out(self, fields, chunk, chunk_selection, drop_axes,
out, out_selection):
# select data from chunk
if fields:
chunk = chunk[fields]
tmp = chunk[chunk_selection]
if drop_axes:
tmp = np.squeeze(tmp, axis=drop_axes)

# store selected data in output
out[out_selection] = tmp

def _process_chunk(
self,
out,
@@ -1678,6 +1722,7 @@ def _process_chunk(
out_is_ndarray,
fields,
out_selection,
ckey,
partial_read_decode=False,
):
"""Take binary data from storage and fill output array"""
@@ -1741,16 +1786,12 @@
except ArrayIndexError:
cdata = cdata.read_full()
chunk = self._decode_chunk(cdata)
if self._chunk_cache is not None:
# cache the decoded chunk
self._chunk_cache[ckey] = chunk

# select data from chunk
if fields:
chunk = chunk[fields]
tmp = chunk[chunk_selection]
if drop_axes:
tmp = np.squeeze(tmp, axis=drop_axes)

# store selected data in output
out[out_selection] = tmp
self._select_and_set_out(fields, chunk, chunk_selection, drop_axes,
out, out_selection)

def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection,
drop_axes=None, fields=None):
@@ -1783,22 +1824,38 @@ def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection,
# obtain key for chunk
ckey = self._chunk_key(chunk_coords)

try:
# obtain compressed data for chunk
cdata = self.chunk_store[ckey]
# setup variable to hold decoded chunk
chunk = None

except KeyError:
# chunk not initialized
if self._fill_value is not None:
if fields:
fill_value = self._fill_value[fields]
else:
fill_value = self._fill_value
out[out_selection] = fill_value
# check for cached chunk
if self._chunk_cache is not None:
try:
chunk = self._chunk_cache[ckey]
self._select_and_set_out(fields, chunk, chunk_selection,
drop_axes, out, out_selection)
except KeyError:
pass

else:
self._process_chunk(out, cdata, chunk_selection, drop_axes,
out_is_ndarray, fields, out_selection)
if chunk is None:

try:
# obtain compressed data for chunk
cdata = self.chunk_store[ckey]

except KeyError:
# chunk not initialized
if self._fill_value is not None:
if fields:
fill_value = self._fill_value[fields]
else:
fill_value = self._fill_value
out[out_selection] = fill_value
return

else:
self._process_chunk(out, cdata, chunk_selection, drop_axes,
out_is_ndarray, fields, out_selection,
ckey)

def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection,
drop_axes=None, fields=None):
@@ -1842,6 +1899,7 @@ def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection,
out_is_ndarray,
fields,
out_select,
ckey,
partial_read_decode=partial_read_decode,
)
else:
@@ -1947,7 +2005,16 @@ def _process_for_setitem(self, ckey, chunk_selection, value, fields=None):
chunk[chunk_selection] = value

# encode chunk
return self._encode_chunk(chunk)
cdata = self._encode_chunk(chunk)

# cache the chunk
if self._chunk_cache is not None:
# ensure cached chunk has been round tripped through encode-decode if dtype=object
if self.dtype == object:
chunk = self._decode_chunk(cdata)
self._chunk_cache[ckey] = np.copy(chunk)

return cdata

def _chunk_key(self, chunk_coords):
return self._key_prefix + '.'.join(map(str, chunk_coords))
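Taken together, the ``core.py`` hunks wrap the decode step in a read-through cache: look the
decoded chunk up by its key, fall back to the store on a miss, and populate the cache after
decoding. A simplified, self-contained sketch of that read path (not the PR code itself;
``decode`` stands in for ``Array._decode_chunk``)::

    from typing import Callable, MutableMapping, Optional

    import numpy as np

    def get_chunk(ckey: str,
                  chunk_store: MutableMapping,
                  decode: Callable[[bytes], np.ndarray],
                  chunk_cache: Optional[MutableMapping] = None) -> Optional[np.ndarray]:
        """Return the decoded chunk for ``ckey``, or None if it is uninitialized."""
        if chunk_cache is not None:
            chunk = chunk_cache.get(ckey)
            if chunk is not None:
                return chunk  # cache hit: no store access, no decoding

        try:
            cdata = chunk_store[ckey]  # encoded bytes from the store
        except KeyError:
            return None  # chunk not initialized; caller fills with fill_value

        chunk = decode(cdata)
        if chunk_cache is not None:
            chunk_cache[ckey] = chunk  # cache the decoded chunk for next time
        return chunk
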
26 changes: 23 additions & 3 deletions zarr/creation.py
@@ -19,7 +19,7 @@ def create(shape, chunks=True, dtype=None, compressor='default',
fill_value=0, order='C', store=None, synchronizer=None,
overwrite=False, path=None, chunk_store=None, filters=None,
cache_metadata=True, cache_attrs=True, read_only=False,
object_codec=None, **kwargs):
object_codec=None, chunk_cache=None, **kwargs):
"""Create an array.

Parameters
@@ -51,6 +51,15 @@ def create(shape, chunks=True, dtype=None, compressor='default',
chunk_store : MutableMapping, optional
Separate storage for chunks. If not provided, `store` will be used
for storage of both chunks and metadata.
chunk_cache: MutableMapping, optional
Mapping to store decoded chunks for caching. Can be used in repeated
chunk access scenarios when decoding of data is computationally
expensive.
NOTE: When using the write cache feature with object arrays(i.e.
Suggested change
NOTE: When using the write cache feature with object arrays(i.e.
NOTE: When using the write cache feature with object arrays (i.e.

when dtype of array is 'object' and when writing to the array with
chunk_cache provided) could result in a slight slowdown as some
dtypes, like VLenArray, have to go through the encode-decode phase
before having the correct dtype.
filters : sequence of Codecs, optional
Sequence of filters to use to encode chunk data prior to compression.
cache_metadata : bool, optional
@@ -124,7 +133,8 @@ def create(shape, chunks=True, dtype=None, compressor='default',

# instantiate array
z = Array(store, path=path, chunk_store=chunk_store, synchronizer=synchronizer,
cache_metadata=cache_metadata, cache_attrs=cache_attrs, read_only=read_only)
cache_metadata=cache_metadata, cache_attrs=cache_attrs, read_only=read_only,
chunk_cache=chunk_cache)

return z

@@ -380,6 +390,7 @@ def open_array(
chunk_store=None,
storage_options=None,
partial_decompress=False,
chunk_cache=None,
**kwargs
):
"""Open an array using file-mode-like semantics.
Expand Down Expand Up @@ -436,6 +447,15 @@ def open_array(
read and decompressed when possible.

.. versionadded:: 2.7
chunk_cache: MutableMapping, optional
Mapping to store decoded chunks for caching. Can be used in repeated
chunk access scenarios when decoding of data is computationally
expensive.
NOTE: When using the write cache feature with object arrays(i.e.
Suggested change
NOTE: When using the write cache feature with object arrays(i.e.
NOTE: When using the write cache feature with object arrays (i.e.

when dtype of array is 'object' and when writing to the array with
chunk_cache provided) could result in a slight slowdown as some
dtypes, like VLenArray, have to go through the encode-decode phase
before having the correct dtype.

Returns
-------
Expand Down Expand Up @@ -525,7 +545,7 @@ def open_array(
# instantiate array
z = Array(store, read_only=read_only, synchronizer=synchronizer,
cache_metadata=cache_metadata, cache_attrs=cache_attrs, path=path,
chunk_store=chunk_store)
chunk_store=chunk_store, chunk_cache=chunk_cache)

return z

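These changes thread ``chunk_cache`` through ``create`` and ``open_array`` so a cache can be
supplied at creation or open time; a short usage sketch (the on-disk path is hypothetical)::

    import zarr

    cache = zarr.LRUChunkCache(max_size=2**27)

    # create a new on-disk array with a decoded-chunk cache attached
    z = zarr.create(shape=(10000, 10000), chunks=(1000, 1000), dtype='f4',
                    store='data/example.zarr', overwrite=True, chunk_cache=cache)

    # ...or re-open an existing array with the same cache
    z = zarr.open_array('data/example.zarr', mode='r+', chunk_cache=cache)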