[WIP] Attempt to continue LRU cache for decoded chunks #1214

Closed · wants to merge 58 commits into from

Changes from all commits · 58 commits
d62febb
first attempt at chunk_cache layer
shikharsg Aug 29, 2018
f796ea7
ChunkCache test with MockChunkCacheArray
shikharsg Aug 29, 2018
32141a9
np.copy not needed when accessing a subset of a chunk
shikharsg Oct 9, 2018
b35139b
fixed 'Mm' dtype error for buffersize function
shikharsg Oct 13, 2018
3c45176
renamed ChunkCache to LRUChunkCache
Oct 13, 2018
46dcf94
LRUChunkCache in zarr root namespace
Oct 13, 2018
c69c751
LRUChunkCache example
Oct 13, 2018
2cb143e
write caching of chunk should be done after encoding
Oct 15, 2018
2fb169e
ensure cached chunk has been round tripped through encode-decode if d…
Oct 15, 2018
31e4dfb
flake8 fixes
Oct 15, 2018
5559c4f
read write cache for 0-d arrays
Oct 15, 2018
2a0124a
added tutorial and api docs
Oct 15, 2018
6fac2ad
separated store tests from mutable mapping tests in test_storage.py
shikharsg Oct 20, 2018
4e79d0b
fixed pickle, __delitem__ and ordered dict iteration bugs
shikharsg Oct 20, 2018
5fd6fc8
documenting slowdown when using write cache with object arrays
shikharsg Oct 20, 2018
422f9eb
factoring out mapping code from LRUStoreCache and LRUChunkCache
shikharsg Oct 23, 2018
44cea83
consistent variable naming in _chunk_getitem
shikharsg Nov 11, 2018
1b67e90
removed unnecesary code from _set_basic_selection_zd and added encode…
shikharsg Nov 11, 2018
9b0cc29
flake 8 fixes
shikharsg Nov 11, 2018
715f86d
Merge remote-tracking branch 'upstream/master' into chunk_cache
shikharsg Nov 13, 2018
0013f95
fixed coverage
shikharsg Nov 14, 2018
b70c348
Merge branch 'chunk_cache' into master
shikharsg Nov 14, 2018
c4f2487
Merge pull request #4 from shikharsg/master
shikharsg Nov 14, 2018
245f661
Merge branch 'master' into chunk_cache
shikharsg Nov 15, 2018
a2a05fb
Merge branch 'master' into chunk_cache
shikharsg Jan 8, 2019
b8b9056
Merge branch 'chunk_cache' into chunk_cache_mapping_refactor
shikharsg Jan 9, 2019
04f0367
Merge pull request #3 from shikharsg/chunk_cache_mapping_refactor
shikharsg Jan 9, 2019
f19d43e
bug fix
shikharsg Jan 27, 2019
52a43bf
Merge branch 'master' into chunk_cache
shikharsg Jan 27, 2019
697d46e
python 2 and 3 compatibility
shikharsg Jan 27, 2019
1e727c7
Merge branch 'master' into chunk_cache
shikharsg Feb 10, 2019
377ece7
coverage fix and __init__.py LRUChunkCache import
shikharsg Feb 10, 2019
3473adb
merged chunk_cache with master
shikharsg Mar 4, 2019
df84c89
flake8 fix
shikharsg Mar 4, 2019
88fe66d
Merge branch 'master' into chunk_cache
Mar 30, 2019
a816014
Implemented https://github.com/zarr-developers/zarr/pull/306/files#r2…
Apr 11, 2019
8cc083b
cache tests refactor
May 3, 2019
23fcdea
fixed minor tests mistak
May 3, 2019
309cc48
Merge branch 'master' into chunk_cache
May 3, 2019
635ec87
flake8 fix
May 3, 2019
a85d156
Merge remote-tracking branch 'upstream/master' into chunk_cache
Aug 20, 2019
ef86184
merged with master
Oct 30, 2019
875c24f
added chunk cache to Group
Nov 20, 2019
dcd4ee7
merge with master
Nov 20, 2019
4a1baa9
added chunk_cache to all relevant function
Nov 20, 2019
e6540e1
Merge branch 'master' into chunk_cache
Dec 12, 2019
9f9d176
merge with master
Sep 30, 2020
6571382
fixed failing doctest
Sep 30, 2020
8c8a69f
Merge remote-tracking branch 'origin/master' into pr-306
joshmoore Feb 19, 2021
e0e5254
fixed setitem caching order
Feb 20, 2021
992b48a
Merge branch 'master' into chunk_cache
jakirkham Feb 21, 2021
38ee622
refactor
Feb 21, 2021
8b6a699
Merge branch 'master' into chunk_cache
Apr 5, 2021
ba5c0ed
Merge 'origin/master' into pr-306
joshmoore Aug 27, 2021
7cdce5f
Remove use of unittest
joshmoore Aug 27, 2021
06c899b
Merge branch 'master' into chunk_cache
joshmoore Sep 23, 2021
c9be163
Merge remote-tracking branch 'origin/main' into chunk_cache
croth1 Oct 21, 2022
16793f5
code style cleanup
croth1 Oct 24, 2022
4 changes: 4 additions & 0 deletions docs/api/storage.rst
@@ -39,6 +39,10 @@ Storage (``zarr.storage``)

.. autoclass:: ConsolidatedMetadataStore

.. autoclass:: LRUChunkCache

.. automethod:: invalidate

.. autofunction:: init_array
.. autofunction:: init_group
.. autofunction:: contains_array
32 changes: 28 additions & 4 deletions docs/tutorial.rst
@@ -863,10 +863,34 @@ store. E.g.::
b'Hello from the cloud!'
0.0009490990014455747

The above :class:`zarr.storage.LRUStoreCache` wraps any Zarr storage class and stores
encoded chunks, so every time the cache is accessed the chunk has to be decoded. For
cases where decoding is computationally expensive, Zarr also provides a
:class:`zarr.storage.LRUChunkCache` which can store decoded chunks, e.g.::

>>> import zarr
>>> from numcodecs import LZMA
>>> import numpy as np
>>> store = zarr.DictStore()
>>> z = zarr.array(np.random.randn(1000000).reshape(1000,1000), chunks=(100,100),
... store=store, compressor=LZMA())
>>> from timeit import timeit
>>> # data access without cache
... timeit('z[:]', number=1, globals=globals()) # doctest: +SKIP
0.6703157789888792
>>> z_with_cache = zarr.Array(store=store, chunk_cache=zarr.LRUChunkCache(max_size=None))
>>> # first data access about the same as without cache
... timeit('z_with_cache[:]', number=1, globals=globals()) # doctest: +SKIP
0.681269913999131
>>> # second time accesses the decoded chunks in the cache
... timeit('z_with_cache[:]', number=1, globals=globals()) # doctest: +SKIP
0.007617925992235541
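
The above example keeps every decoded chunk (``max_size=None``). Assuming
:class:`zarr.storage.LRUChunkCache` follows the same ``max_size`` convention as
:class:`zarr.storage.LRUStoreCache` (a limit in bytes), the memory used by the
cache can also be bounded, e.g.::

>>> cache = zarr.LRUChunkCache(max_size=2**28)  # assumption: max_size is a byte limit
>>> z_bounded = zarr.Array(store=store, chunk_cache=cache)  # doctest: +SKIP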


If you are still experiencing poor performance with distributed/cloud storage, please
raise an issue on the GitHub issue tracker with any profiling data you can provide, as
there may be opportunities to optimise further either within Zarr or within the mapping
interface to the storage.

IO with ``fsspec``
~~~~~~~~~~~~~~~~~~
2 changes: 1 addition & 1 deletion zarr/__init__.py
@@ -14,7 +14,7 @@
from zarr.storage import (ABSStore, DBMStore, DictStore, DirectoryStore,
KVStore, LMDBStore, LRUStoreCache, MemoryStore, MongoDBStore,
NestedDirectoryStore, RedisStore, SQLiteStore,
TempStore, ZipStore, LRUChunkCache)
from zarr.sync import ProcessSynchronizer, ThreadSynchronizer
from zarr.version import version as __version__

144 changes: 110 additions & 34 deletions zarr/core.py
@@ -87,6 +87,15 @@ class Array:
read and decompressed when possible.

.. versionadded:: 2.7
Author review comment: this will need to get updated.
chunk_cache : MutableMapping, optional
Mapping to store decoded chunks for caching. Can be used in repeated
chunk access scenarios when decoding of data is computationally
expensive.
NOTE: Using the write cache feature with object arrays (i.e. when the
dtype of the array is 'object' and the array is written to with a
chunk_cache provided) can result in a slight slowdown, as some dtypes,
like VLenArray, have to go through the encode-decode phase before
having the correct dtype.

write_empty_chunks : bool, optional
If True, all chunks will be stored regardless of their contents. If
@@ -169,6 +178,7 @@ def __init__(
cache_metadata=True,
cache_attrs=True,
partial_decompress=False,
chunk_cache=None,
write_empty_chunks=True,
zarr_version=None,
meta_array=None,
@@ -199,6 +209,7 @@
self._cache_metadata = cache_metadata
self._is_view = False
self._partial_decompress = partial_decompress
self._chunk_cache = chunk_cache
self._write_empty_chunks = write_empty_chunks
if meta_array is not None:
self._meta_array = np.empty_like(meta_array, shape=())
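
With the cache stored on the array, any ``MutableMapping`` can back it. A
minimal sketch (hypothetical usage, assuming this PR's ``chunk_cache`` keyword;
a plain dict gives unbounded caching with no LRU eviction)::

import numpy as np
import zarr

z = zarr.array(np.arange(100).reshape(10, 10), chunks=(5, 5))
z_cached = zarr.Array(store=z.store, chunk_cache={})  # dict as an unbounded cache
_ = z_cached[:]  # first read decodes chunks and fills the cache
_ = z_cached[:]  # second read is served from the decoded-chunk cache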
@@ -941,19 +952,33 @@ def _get_basic_selection_zd(self, selection, out=None, fields=None):
if selection not in ((), (Ellipsis,)):
err_too_many_indices(selection, ())

# obtain key for chunk
ckey = self._chunk_key((0,))

# setup variable to hold decoded chunk
chunk = None

# check for cached chunk
if self._chunk_cache is not None:
chunk = self._chunk_cache.get(ckey)

if chunk is None:
try:
# obtain encoded data for chunk
cdata = self.chunk_store[ckey]

except KeyError:
# chunk not initialized
chunk = np.zeros_like(self._meta_array, shape=(), dtype=self._dtype)
if self._fill_value is not None:
chunk.fill(self._fill_value)

else:
chunk = self._decode_chunk(cdata)

# cache decoded chunk
if self._chunk_cache is not None:
self._chunk_cache[ckey] = chunk

# handle fields
if fields:
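
The read path above is a read-through cache; as a standalone sketch
(illustrative names, simplified from the diff)::

def get_chunk(ckey, chunk_store, chunk_cache, decode):
    # try the cache of decoded chunks first
    if chunk_cache is not None:
        chunk = chunk_cache.get(ckey)
        if chunk is not None:
            return chunk
    # miss: fetch the encoded chunk and decode it
    cdata = chunk_store[ckey]  # KeyError here means the chunk is not initialized
    chunk = decode(cdata)
    # populate the cache so later reads skip decoding
    if chunk_cache is not None:
        chunk_cache[ckey] = chunk
    return chunk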
@@ -1752,6 +1777,10 @@ def _set_basic_selection_zd(self, selection, value, fields=None):

# remove chunk if write_empty_chunks is false and it only contains the fill value
if (not self.write_empty_chunks) and all_equal(self.fill_value, chunk):
# invalidate value in cache
if self._chunk_cache is not None:
if ckey in self._chunk_cache:
del self._chunk_cache[ckey]
try:
del self.chunk_store[ckey]
return
@@ -1763,6 +1792,12 @@
cdata = self._encode_chunk(chunk)
self.chunk_store[ckey] = cdata

if self._chunk_cache is not None:
# ensure cached chunk has been round tripped through encode-decode if dtype=object
if self.dtype == object:
chunk = self._decode_chunk(cdata)
self._chunk_cache[ckey] = chunk
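
The encode-decode round trip for ``dtype=object`` is needed because some codecs
only produce the final dtype on decode. A runnable illustration with
``numcodecs.VLenUTF8`` (chosen here as an example; the docstring mentions
VLenArray)::

import numpy as np
from numcodecs import VLenUTF8

codec = VLenUTF8()
a = np.array(['a', 'bb'], dtype=object)
# encode() serializes to a buffer; decode() rebuilds an object array of str,
# so a written value only takes its final form after this round trip, which
# is why the cache stores the decoded result rather than the raw input.
roundtripped = codec.decode(codec.encode(a))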

def _set_basic_selection_nd(self, selection, value, fields=None):
# implementation of __setitem__ for array with at least one dimension

@@ -1822,6 +1857,7 @@ def _set_selection(self, indexer, value, fields=None):

# put data
self._chunk_setitem(chunk_coords, chunk_selection, chunk_value, fields=fields)

else:
lchunk_coords, lchunk_selection, lout_selection = zip(*indexer)
chunk_values = []
@@ -1844,6 +1880,18 @@
self._chunk_setitems(lchunk_coords, lchunk_selection, chunk_values,
fields=fields)

def _select_and_set_out(self, fields, chunk, chunk_selection, drop_axes,
out, out_selection):
# select data from chunk
if fields:
chunk = chunk[fields]
tmp = chunk[chunk_selection]
if drop_axes:
tmp = np.squeeze(tmp, axis=drop_axes)

# store selected data in output
out[out_selection] = tmp

def _process_chunk(
self,
out,
@@ -1853,6 +1901,7 @@
out_is_ndarray,
fields,
out_selection,
ckey,
partial_read_decode=False,
):
"""Take binary data from storage and fill output array"""
@@ -1919,16 +1968,12 @@
except ArrayIndexError:
cdata = cdata.read_full()
chunk = self._decode_chunk(cdata)
if self._chunk_cache is not None:
# cache the decoded chunk
self._chunk_cache[ckey] = chunk

self._select_and_set_out(fields, chunk, chunk_selection, drop_axes,
out, out_selection)

def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection,
drop_axes=None, fields=None):
@@ -1961,22 +2006,38 @@ def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection,
# obtain key for chunk
ckey = self._chunk_key(chunk_coords)

# setup variable to hold decoded chunk
chunk = None

# check for cached chunk
if self._chunk_cache is not None:
try:
chunk = self._chunk_cache[ckey]
self._select_and_set_out(fields, chunk, chunk_selection,
drop_axes, out, out_selection)
except KeyError:
pass

if chunk is None:

try:
# obtain compressed data for chunk
cdata = self.chunk_store[ckey]

except KeyError:
# chunk not initialized
if self._fill_value is not None:
if fields:
fill_value = self._fill_value[fields]
else:
fill_value = self._fill_value
out[out_selection] = fill_value
return

else:
self._process_chunk(out, cdata, chunk_selection, drop_axes,
out_is_ndarray, fields, out_selection,
ckey)

def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection,
drop_axes=None, fields=None):
@@ -2020,6 +2081,7 @@ def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection,
out_is_ndarray,
fields,
out_select,
ckey,
partial_read_decode=partial_read_decode,
)
else:
@@ -2159,6 +2221,13 @@ def _process_for_setitem(self, ckey, chunk_selection, value, fields=None):
else:
chunk[chunk_selection] = value

# cache the chunk
if self._chunk_cache is not None:
# ensure cached chunk has been round tripped through encode-decode if dtype=object
if self.dtype == object:
chunk = self._decode_chunk(self._encode_chunk(chunk))
self._chunk_cache[ckey] = np.copy(chunk)

return chunk

def _chunk_key(self, chunk_coords):
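
Note the ``np.copy`` when caching on the write path: without it the cache could
alias a buffer the caller may still mutate. A sketch of the hazard (hypothetical
chunk key)::

import numpy as np

chunk = np.zeros(4)
cache = {'0.0': chunk}       # cached without copying
chunk[:] = 1                 # later in-place modification by the caller...
assert cache['0.0'][0] == 1  # ...silently changes the cached value as well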
@@ -2502,6 +2571,13 @@ def _resize_nosync(self, *args):
except KeyError:
# chunk not initialized
pass
if self._chunk_cache is not None:
try:
del self._chunk_cache[key]
except KeyError:
# chunk not cached
pass

old_cdata_shape_working_list[idx_cdata] = min(val_old_cdata, val_new_cdata)

def append(self, data, axis=0):
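
Resize therefore evicts cached chunks along with stored ones. The same eviction
idiom as a standalone sketch (hypothetical helper name)::

def evict(chunk_cache, stale_keys):
    # drop decoded chunks that no longer exist after the resize
    for key in stale_keys:
        try:
            del chunk_cache[key]
        except KeyError:
            pass  # chunk was never cached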
28 changes: 25 additions & 3 deletions zarr/creation.py
@@ -20,7 +20,7 @@ def create(shape, chunks=True, dtype=None, compressor='default',
fill_value=0, order='C', store=None, synchronizer=None,
overwrite=False, path=None, chunk_store=None, filters=None,
cache_metadata=True, cache_attrs=True, read_only=False,
object_codec=None, dimension_separator=None, write_empty_chunks=True,
object_codec=None, dimension_separator=None, chunk_cache=None, write_empty_chunks=True,
*, zarr_version=None, meta_array=None, **kwargs):
"""Create an array.

@@ -53,6 +53,15 @@ def create(shape, chunks=True, dtype=None, compressor='default',
chunk_store : MutableMapping, optional
Separate storage for chunks. If not provided, `store` will be used
for storage of both chunks and metadata.
chunk_cache : MutableMapping, optional
Mapping to store decoded chunks for caching. Can be used in repeated
chunk access scenarios when decoding of data is computationally
expensive.
NOTE: Using the write cache feature with object arrays (i.e. when the
dtype of the array is 'object' and the array is written to with a
chunk_cache provided) can result in a slight slowdown, as some dtypes,
like VLenArray, have to go through the encode-decode phase before
having the correct dtype.
filters : sequence of Codecs, optional
Sequence of filters to use to encode chunk data prior to compression.
cache_metadata : bool, optional
@@ -174,7 +183,8 @@ def create(shape, chunks=True, dtype=None, compressor='default',
# instantiate array
z = Array(store, path=path, chunk_store=chunk_store, synchronizer=synchronizer,
cache_metadata=cache_metadata, cache_attrs=cache_attrs, read_only=read_only,
chunk_cache=chunk_cache, write_empty_chunks=write_empty_chunks,
meta_array=meta_array)

return z
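
A hedged usage sketch for ``create`` with the new parameter (other arguments
left at their defaults)::

>>> import zarr
>>> z = zarr.create((1000, 1000), chunks=(100, 100),
...                 chunk_cache=zarr.LRUChunkCache(max_size=None))  # doctest: +SKIP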

@@ -411,6 +421,7 @@ def open_array(
chunk_store=None,
storage_options=None,
partial_decompress=False,
chunk_cache=None,
write_empty_chunks=True,
*,
zarr_version=None,
@@ -478,6 +489,16 @@
non-fill-value data are stored, at the expense of overhead associated
with checking the data of each chunk.

.. versionadded:: 2.7
Author review comment: needs to get updated.

chunk_cache : MutableMapping, optional
Mapping to store decoded chunks for caching. Can be used in repeated
chunk access scenarios when decoding of data is computationally
expensive.
NOTE: Using the write cache feature with object arrays (i.e. when the
dtype of the array is 'object' and the array is written to with a
chunk_cache provided) can result in a slight slowdown, as some dtypes,
like VLenArray, have to go through the encode-decode phase before
having the correct dtype.
.. versionadded:: 2.11

zarr_version : {None, 2, 3}, optional
@@ -598,7 +619,8 @@
# instantiate array
z = Array(store, read_only=read_only, synchronizer=synchronizer,
cache_metadata=cache_metadata, cache_attrs=cache_attrs, path=path,
chunk_store=chunk_store, chunk_cache=chunk_cache,
write_empty_chunks=write_empty_chunks)

return z
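
And similarly for ``open_array`` (hedged sketch; the path and mode are
illustrative)::

>>> z = zarr.open_array('data/example.zarr', mode='a',
...                     chunk_cache=zarr.LRUChunkCache(max_size=None))  # doctest: +SKIP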
