The above :class:`zarr.storage.LRUStoreCache` wraps any Zarr storage class and stores
encoded chunks, so every time the cache is accessed the chunk has to be decoded. For
cases where decoding is computationally expensive, Zarr also provides a
:class:`zarr.storage.LRUChunkCache`, which can store decoded chunks, e.g.::
        NOTE: Using the write cache feature with object arrays (i.e. when the
        array's dtype is 'object' and data is written to the array with a
        chunk_cache provided) can result in a slight slowdown, as some
        dtypes, like VLenArray, have to go through an encode-decode round
        trip before having the correct dtype.
Attributes ---------- @@ -138,6 +147,7 @@ def __init__( cache_metadata=True, cache_attrs=True, partial_decompress=False, + chunk_cache=None, ): # N.B., expect at this point store is fully initialized with all # configuration metadata fully specified and normalized @@ -154,6 +164,7 @@ def __init__( self._cache_metadata = cache_metadata self._is_view = False self._partial_decompress = partial_decompress + self._chunk_cache = chunk_cache # initialize metadata self._load_metadata() @@ -795,19 +806,33 @@ def _get_basic_selection_zd(self, selection, out=None, fields=None): if selection not in ((), (Ellipsis,)): err_too_many_indices(selection, ()) - try: - # obtain encoded data for chunk - ckey = self._chunk_key((0,)) - cdata = self.chunk_store[ckey] + # obtain key for chunk + ckey = self._chunk_key((0,)) - except KeyError: - # chunk not initialized - chunk = np.zeros((), dtype=self._dtype) - if self._fill_value is not None: - chunk.fill(self._fill_value) + # setup variable to hold decoded chunk + chunk = None - else: - chunk = self._decode_chunk(cdata) + # check for cached chunk + if self._chunk_cache is not None: + chunk = self._chunk_cache.get(ckey) + + if chunk is None: + try: + # obtain encoded data for chunk + cdata = self.chunk_store[ckey] + + except KeyError: + # chunk not initialized + chunk = np.zeros((), dtype=self._dtype) + if self._fill_value is not None: + chunk.fill(self._fill_value) + + else: + chunk = self._decode_chunk(cdata) + + # cache decoded chunk + if self._chunk_cache is not None: + self._chunk_cache[ckey] = chunk # handle fields if fields: @@ -1590,6 +1615,12 @@ def _set_basic_selection_zd(self, selection, value, fields=None): cdata = self._encode_chunk(chunk) self.chunk_store[ckey] = cdata + if self._chunk_cache is not None: + # ensure cached chunk has been round tripped through encode-decode if dtype=object + if self.dtype == object: + chunk = self._decode_chunk(cdata) + self._chunk_cache[ckey] = chunk + def _set_basic_selection_nd(self, 
selection, value, fields=None): # implementation of __setitem__ for array with at least one dimension @@ -1649,6 +1680,7 @@ def _set_selection(self, indexer, value, fields=None): # put data self._chunk_setitem(chunk_coords, chunk_selection, chunk_value, fields=fields) + else: lchunk_coords, lchunk_selection, lout_selection = zip(*indexer) chunk_values = [] @@ -1671,6 +1703,18 @@ def _set_selection(self, indexer, value, fields=None): self._chunk_setitems(lchunk_coords, lchunk_selection, chunk_values, fields=fields) + def _select_and_set_out(self, fields, chunk, chunk_selection, drop_axes, + out, out_selection): + # select data from chunk + if fields: + chunk = chunk[fields] + tmp = chunk[chunk_selection] + if drop_axes: + tmp = np.squeeze(tmp, axis=drop_axes) + + # store selected data in output + out[out_selection] = tmp + def _process_chunk( self, out, @@ -1680,6 +1724,7 @@ def _process_chunk( out_is_ndarray, fields, out_selection, + ckey, partial_read_decode=False, ): """Take binary data from storage and fill output array""" @@ -1743,16 +1788,12 @@ def _process_chunk( except ArrayIndexError: cdata = cdata.read_full() chunk = self._decode_chunk(cdata) + if self._chunk_cache is not None: + # cache the decoded chunk + self._chunk_cache[ckey] = chunk - # select data from chunk - if fields: - chunk = chunk[fields] - tmp = chunk[chunk_selection] - if drop_axes: - tmp = np.squeeze(tmp, axis=drop_axes) - - # store selected data in output - out[out_selection] = tmp + self._select_and_set_out(fields, chunk, chunk_selection, drop_axes, + out, out_selection) def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection, drop_axes=None, fields=None): @@ -1785,22 +1826,38 @@ def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection, # obtain key for chunk ckey = self._chunk_key(chunk_coords) - try: - # obtain compressed data for chunk - cdata = self.chunk_store[ckey] + # setup variable to hold decoded chunk + chunk = None - except KeyError: - # 
chunk not initialized - if self._fill_value is not None: - if fields: - fill_value = self._fill_value[fields] - else: - fill_value = self._fill_value - out[out_selection] = fill_value + # check for cached chunk + if self._chunk_cache is not None: + try: + chunk = self._chunk_cache[ckey] + self._select_and_set_out(fields, chunk, chunk_selection, + drop_axes, out, out_selection) + except KeyError: + pass - else: - self._process_chunk(out, cdata, chunk_selection, drop_axes, - out_is_ndarray, fields, out_selection) + if chunk is None: + + try: + # obtain compressed data for chunk + cdata = self.chunk_store[ckey] + + except KeyError: + # chunk not initialized + if self._fill_value is not None: + if fields: + fill_value = self._fill_value[fields] + else: + fill_value = self._fill_value + out[out_selection] = fill_value + return + + else: + self._process_chunk(out, cdata, chunk_selection, drop_axes, + out_is_ndarray, fields, out_selection, + ckey) def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection, drop_axes=None, fields=None): @@ -1844,6 +1901,7 @@ def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection, out_is_ndarray, fields, out_select, + ckey, partial_read_decode=partial_read_decode, ) else: @@ -1949,7 +2007,16 @@ def _process_for_setitem(self, ckey, chunk_selection, value, fields=None): chunk[chunk_selection] = value # encode chunk - return self._encode_chunk(chunk) + cdata = self._encode_chunk(chunk) + + # cache the chunk + if self._chunk_cache is not None: + # ensure cached chunk has been round tripped through encode-decode if dtype=object + if self.dtype == object: + chunk = self._decode_chunk(cdata) + self._chunk_cache[ckey] = np.copy(chunk) + + return cdata def _chunk_key(self, chunk_coords): return self._key_prefix + self._dimension_separator.join(map(str, chunk_coords)) diff --git a/zarr/creation.py b/zarr/creation.py index 0e2d2041ba..30eade5794 100644 --- a/zarr/creation.py +++ b/zarr/creation.py @@ -21,7 
    chunk_cache : MutableMapping, optional
        Mapping to store decoded chunks for caching. Can be used in repeated
        chunk access scenarios when decoding of data is computationally
        expensive.
        NOTE: Using the write cache feature with object arrays (i.e. when the
        array's dtype is 'object' and data is written to the array with a
        chunk_cache provided) can result in a slight slowdown, as some
        dtypes, like VLenArray, have to go through an encode-decode round
        trip before having the correct dtype.
    chunk_cache : MutableMapping, optional
        Mapping to store decoded chunks for caching. Can be used in repeated
        chunk access scenarios when decoding of data is computationally
        expensive.
        NOTE: Using the write cache feature with object arrays (i.e. when the
        array's dtype is 'object' and data is written to the array with a
        chunk_cache provided) can result in a slight slowdown, as some
        dtypes, like VLenArray, have to go through an encode-decode round
        trip before having the correct dtype.
def groups(self): if contains_group(self._store, path): yield key, Group(self._store, path=path, read_only=self._read_only, chunk_store=self._chunk_store, + chunk_cache=self._chunk_cache, cache_attrs=self.attrs.cache, synchronizer=self._synchronizer) @@ -695,7 +697,7 @@ def _create_group_nosync(self, name, overwrite=False): return Group(self._store, path=path, read_only=self._read_only, chunk_store=self._chunk_store, cache_attrs=self.attrs.cache, - synchronizer=self._synchronizer) + synchronizer=self._synchronizer, chunk_cache=self._chunk_cache) def create_groups(self, *names, **kwargs): """Convenience method to create multiple groups in a single call.""" @@ -739,7 +741,7 @@ def _require_group_nosync(self, name, overwrite=False): return Group(self._store, path=path, read_only=self._read_only, chunk_store=self._chunk_store, cache_attrs=self.attrs.cache, - synchronizer=self._synchronizer) + synchronizer=self._synchronizer, chunk_cache=self._chunk_cache) def require_groups(self, *names): """Convenience method to require multiple groups in a single call.""" @@ -865,7 +867,8 @@ def _require_dataset_nosync(self, name, shape, dtype=None, exact=False, cache_attrs = kwargs.get('cache_attrs', self.attrs.cache) a = Array(self._store, path=path, read_only=self._read_only, chunk_store=self._chunk_store, synchronizer=synchronizer, - cache_metadata=cache_metadata, cache_attrs=cache_attrs) + cache_metadata=cache_metadata, cache_attrs=cache_attrs, + chunk_cache=self._chunk_cache) shape = normalize_shape(shape) if shape != a.shape: raise TypeError('shape do not match existing array; expected {}, got {}' @@ -1045,7 +1048,8 @@ def _normalize_store_arg(store, *, clobber=False, storage_options=None, mode=Non def group(store=None, overwrite=False, chunk_store=None, - cache_attrs=True, synchronizer=None, path=None): + cache_attrs=True, synchronizer=None, path=None, + chunk_cache=None): """Create a group. 
Parameters @@ -1099,11 +1103,12 @@ def group(store=None, overwrite=False, chunk_store=None, path=path) return Group(store, read_only=False, chunk_store=chunk_store, - cache_attrs=cache_attrs, synchronizer=synchronizer, path=path) + cache_attrs=cache_attrs, synchronizer=synchronizer, path=path, + chunk_cache=chunk_cache) def open_group(store=None, mode='a', cache_attrs=True, synchronizer=None, path=None, - chunk_store=None, storage_options=None): + chunk_store=None, storage_options=None, chunk_cache=None): """Open a group using file-mode-like semantics. Parameters @@ -1188,4 +1193,5 @@ def open_group(store=None, mode='a', cache_attrs=True, synchronizer=None, path=N read_only = mode == 'r' return Group(store, read_only=read_only, cache_attrs=cache_attrs, - synchronizer=synchronizer, path=path, chunk_store=chunk_store) + synchronizer=synchronizer, path=path, chunk_store=chunk_store, + chunk_cache=chunk_cache) diff --git a/zarr/storage.py b/zarr/storage.py index 395551687f..c0ef85e673 100644 --- a/zarr/storage.py +++ b/zarr/storage.py @@ -2010,7 +2010,75 @@ def __len__(self): return self.db.stat()['entries'] -class LRUStoreCache(MutableMapping): +class LRUMappingCache(MutableMapping): + """Abstract base class for Mapping Cache + """ + + def __init__(self, max_size): + self._max_size = max_size + self._current_size = 0 + self._values_cache = OrderedDict() + self._mutex = Lock() + self.hits = self.misses = 0 + + def __len__(self): + return len(self._keys()) + + def __iter__(self): + return self.keys() + + def keys(self): + with self._mutex: + return iter(self._keys()) + + def _keys(self): + raise NotImplementedError # pragma: no cover + + def _pop_value(self): + # remove the first value from the cache, as this will be the least recently + # used value + _, v = self._values_cache.popitem(last=False) + return v + + def _accommodate_value(self, value_size): + if self._max_size is None: + return + # ensure there is enough space in the cache for a new value + while 
self._current_size + value_size > self._max_size: + v = self._pop_value() + self._current_size -= buffer_size(v) + + def _cache_value(self, key, value): + # cache a value + value_size = buffer_size(value) + # check size of the value against max size, as if the value itself exceeds max + # size then we are never going to cache it + if self._max_size is None or value_size <= self._max_size: + self._accommodate_value(value_size) + self._values_cache[key] = value + self._current_size += value_size + + def invalidate_values(self): + """Clear the values cache.""" + with self._mutex: + self._values_cache.clear() + + def _invalidate_value(self, key): + if key in self._values_cache: + value = self._values_cache.pop(key) + self._current_size -= buffer_size(value) + + def __getitem__(self, key): + raise NotImplementedError # pragma: no cover + + def __setitem__(self, key, value): + raise NotImplementedError # pragma: no cover + + def __delitem__(self, key): + raise NotImplementedError # pragma: no cover + + +class LRUStoreCache(LRUMappingCache): """Storage class that implements a least-recently-used (LRU) cache layer over some other store. 
Intended primarily for use with stores that can be slow to access, e.g., remote stores that require network communication to store and @@ -2048,15 +2116,11 @@ class LRUStoreCache(MutableMapping): """ def __init__(self, store, max_size): + super(LRUStoreCache, self).__init__(max_size) self._store = store - self._max_size = max_size - self._current_size = 0 self._keys_cache = None self._contains_cache = None self._listdir_cache = dict() - self._values_cache = OrderedDict() - self._mutex = Lock() - self.hits = self.misses = 0 def __getstate__(self): return (self._store, self._max_size, self._current_size, self._keys_cache, @@ -2069,12 +2133,6 @@ def __setstate__(self, state): self.misses) = state self._mutex = Lock() - def __len__(self): - return len(self._keys()) - - def __iter__(self): - return self.keys() - def __contains__(self, key): with self._mutex: if self._contains_cache is None: @@ -2085,10 +2143,6 @@ def clear(self): self._store.clear() self.invalidate() - def keys(self): - with self._mutex: - return iter(self._keys()) - def _keys(self): if self._keys_cache is None: self._keys_cache = list(self._store.keys()) @@ -2106,41 +2160,12 @@ def listdir(self, path=None): def getsize(self, path=None): return getsize(self._store, path=path) - def _pop_value(self): - # remove the first value from the cache, as this will be the least recently - # used value - _, v = self._values_cache.popitem(last=False) - return v - - def _accommodate_value(self, value_size): - if self._max_size is None: - return - # ensure there is enough space in the cache for a new value - while self._current_size + value_size > self._max_size: - v = self._pop_value() - self._current_size -= buffer_size(v) - - def _cache_value(self, key, value): - # cache a value - value_size = buffer_size(value) - # check size of the value against max size, as if the value itself exceeds max - # size then we are never going to cache it - if self._max_size is None or value_size <= self._max_size: - 
    The example below uses an in-memory store to hold the encoded array and an
    LRUChunkCache to store decoded chunks::
timeit('z[:]', number=1, globals=globals()) # doctest: +SKIP + 0.6703157789888792 + >>> z_with_cache = zarr.Array(store=store, chunk_cache=zarr.LRUChunkCache(max_size=None)) + >>> # first data access about the same as without cache + ... timeit('z_with_cache[:]', number=1, globals=globals()) # doctest: +SKIP + 0.681269913999131 + >>> # second time accesses the decoded chunks in the cache + ... timeit('z_with_cache[:]', number=1, globals=globals()) # doctest: +SKIP + 0.007617925992235541 + + """ + + def __init__(self, max_size): + super(LRUChunkCache, self).__init__(max_size) + + def __getstate__(self): + return (self._max_size, self._current_size, + self._values_cache, self.hits, + self.misses) + + def __setstate__(self, state): + (self._max_size, self._current_size, + self._values_cache, self.hits, + self.misses) = state + self._mutex = Lock() + + def __contains__(self, key): + with self._mutex: + return key in self._keys() + + def clear(self): + self.invalidate() + + def _keys(self): + return self._values_cache.keys() + + def values(self): + return self._values_cache.values() + + def items(self): + return self._values_cache.items() + + def invalidate(self): + """Completely clear the cache.""" + with self._mutex: + self._values_cache.clear() + + def __getitem__(self, key): + try: + # try to obtain the value from the cache + with self._mutex: + value = self._values_cache[key] + # cache hit if no KeyError is raised + self.hits += 1 + # treat the end as most recently used + self._values_cache.move_to_end(key) + + except KeyError: + # cache miss + with self._mutex: + self.misses += 1 + raise KeyError + + return value + + def __setitem__(self, key, value): + with self._mutex: + self._invalidate_value(key) + self._cache_value(key, value) + + def __delitem__(self, key): + if key not in self._values_cache: + raise KeyError + with self._mutex: + self._invalidate_value(key) diff --git a/zarr/tests/test_core.py b/zarr/tests/test_core.py index be2feffe8a..6a62270203 100644 
--- a/zarr/tests/test_core.py +++ b/zarr/tests/test_core.py @@ -25,6 +25,7 @@ DBMStore, DirectoryStore, LMDBStore, + LRUChunkCache, LRUStoreCache, NestedDirectoryStore, SQLiteStore, @@ -2541,6 +2542,98 @@ def test_store_has_bytes_values(self): pass +class TestArrayWithLRUChunkCache(TestArray): + + @staticmethod + def create_array(read_only=False, **kwargs): + store = dict() + kwargs.setdefault('compressor', Zlib(level=1)) + cache_metadata = kwargs.pop('cache_metadata', True) + cache_attrs = kwargs.pop('cache_attrs', True) + init_array(store, **kwargs) + return Array(store, read_only=read_only, cache_metadata=cache_metadata, + cache_attrs=cache_attrs, chunk_cache=LRUChunkCache(max_size=None)) + + @staticmethod + def create_array_with_cache(read_only=False, **kwargs): + store = dict() + kwargs.setdefault('compressor', Zlib(level=1)) + cache_metadata = kwargs.pop('cache_metadata', True) + cache_attrs = kwargs.pop('cache_attrs', True) + init_array(store, **kwargs) + cache = LRUChunkCache(max_size=None) + return Array(store, read_only=read_only, cache_metadata=cache_metadata, + cache_attrs=cache_attrs, chunk_cache=cache), cache + + def test_hit_miss(self): + a = np.arange(100).reshape((10, 10)) + z, cache = self.create_array_with_cache(shape=a.shape, chunks=(10, 1), dtype=a.dtype) + + # test write cache + z[:] = a + assert cache.misses == 0 and cache.hits == 0 + z[:] + assert cache.misses == 0 and cache.hits == 10 + + cache.clear() + cache.misses = 0 + cache.hits = 0 + + # test read cache + assert cache.misses == 0 and cache.hits == 0 + z[:] + assert cache.misses == 10 and cache.hits == 0 + z[:] + assert cache.misses == 10 and cache.hits == 10 + + # noinspection PyStatementEffect + def test_array_0d_with_object_arrays(self): + # test behaviour for array with 0 dimensions + + # setup + a = np.zeros((), dtype=object) + z = self.create_array(shape=(), dtype=a.dtype, fill_value=0, object_codec=Pickle()) + + # check properties + assert a.ndim == z.ndim + assert a.shape == 
z.shape + assert a.size == z.size + assert a.dtype == z.dtype + assert a.nbytes == z.nbytes + with pytest.raises(TypeError): + len(z) + assert () == z.chunks + assert 1 == z.nchunks + assert (1,) == z.cdata_shape + # compressor always None - no point in compressing a single value + assert z.compressor is None + + # check __getitem__ + b = z[...] + assert isinstance(b, np.ndarray) + assert a.shape == b.shape + assert a.dtype == b.dtype + assert_array_equal(a, np.array(z)) + assert_array_equal(a, z[...]) + assert a[()] == z[()] + with pytest.raises(IndexError): + z[0] + with pytest.raises(IndexError): + z[:] + + # check __setitem__ + z[...] = 42 + assert 42 == z[()] + z[()] = 43 + assert 43 == z[()] + with pytest.raises(IndexError): + z[0] = 42 + with pytest.raises(IndexError): + z[:] = 42 + with pytest.raises(ValueError): + z[...] = np.array([1, 2, 3]) + + @pytest.mark.skipif(have_fsspec is False, reason="needs fsspec") class TestArrayWithFSStore(TestArray): @staticmethod diff --git a/zarr/tests/test_storage.py b/zarr/tests/test_storage.py index 1412ec2099..c00166dde5 100644 --- a/zarr/tests/test_storage.py +++ b/zarr/tests/test_storage.py @@ -26,10 +26,10 @@ from zarr.n5 import N5Store, N5FSStore from zarr.storage import (ABSStore, ConsolidatedMetadataStore, DBMStore, DictStore, DirectoryStore, LMDBStore, LRUStoreCache, - MemoryStore, MongoDBStore, NestedDirectoryStore, - RedisStore, SQLiteStore, TempStore, ZipStore, - array_meta_key, atexit_rmglob, atexit_rmtree, - attrs_key, default_compressor, getsize, + LRUChunkCache, MemoryStore, MongoDBStore, + NestedDirectoryStore, RedisStore, SQLiteStore, + TempStore, ZipStore, array_meta_key, atexit_rmglob, + atexit_rmtree, attrs_key, default_compressor, getsize, group_meta_key, init_array, init_group, migrate_1to2) from zarr.storage import FSStore from zarr.tests.util import CountingDict, have_fsspec, skip_test_env_var, abs_container @@ -54,8 +54,8 @@ def skip_if_nested_chunks(**kwargs): pytest.skip("nested chunks are 
unsupported") -class StoreTests(object): - """Abstract store tests.""" +class MutableMappingStoreTests(object): + """Abstract Mutable Mapping Tests""" def create_store(self, **kwargs): # pragma: no cover # implement in sub-class @@ -249,6 +249,10 @@ def test_getsize(self): if hasattr(store, 'close'): store.close() + +class StoreTests(MutableMappingStoreTests): + """Abstract store tests.""" + # noinspection PyStatementEffect def test_hierarchy(self): # setup @@ -1663,12 +1667,13 @@ def create_store(self, **kwargs): return store -class TestLRUStoreCache(StoreTests): +class CacheTests(object): - def create_store(self, **kwargs): - # wrapper therefore no dimension_separator argument - skip_if_nested_chunks(**kwargs) - return LRUStoreCache(dict(), max_size=2**27) + def create_store(self): # pragma: no cover + raise NotImplementedError + + def create_cache(self, store, max_size=None): # pragma: no cover + raise NotImplementedError def test_cache_values_no_max_size(self): @@ -1682,7 +1687,7 @@ def test_cache_values_no_max_size(self): assert 1 == store.counter['__setitem__', 'bar'] # setup cache - cache = LRUStoreCache(store, max_size=None) + cache = self.create_cache(store) assert 0 == cache.hits assert 0 == cache.misses @@ -1721,19 +1726,6 @@ def test_cache_values_no_max_size(self): assert 3 == store.counter['__getitem__', 'foo'] assert 2 == store.counter['__setitem__', 'foo'] - # test __delitem__ - del cache['foo'] - with pytest.raises(KeyError): - # noinspection PyStatementEffect - cache['foo'] - with pytest.raises(KeyError): - # noinspection PyStatementEffect - store['foo'] - - # verify other keys untouched - assert 0 == store.counter['__getitem__', 'bar'] - assert 1 == store.counter['__setitem__', 'bar'] - def test_cache_values_with_max_size(self): # setup store @@ -1743,7 +1735,7 @@ def test_cache_values_with_max_size(self): assert 0 == store.counter['__getitem__', 'foo'] assert 0 == store.counter['__getitem__', 'bar'] # setup cache - can only hold one item - cache 
= LRUStoreCache(store, max_size=5) + cache = self.create_cache(store, max_size=5) assert 0 == cache.hits assert 0 == cache.misses @@ -1830,6 +1822,38 @@ def test_cache_values_with_max_size(self): assert 4 == cache.hits assert 2 == cache.misses + +class TestLRUStoreCache(StoreTests, CacheTests): + + def create_store(self, **kwargs): + # wrapper therefore no dimension_separator argument + skip_if_nested_chunks(**kwargs) + return LRUStoreCache(dict(), max_size=2**27) + + def create_cache(self, store, max_size=None): + return LRUStoreCache(store=store, max_size=max_size) + + def test_delitem(self): + # setup store + store = CountingDict() + store['foo'] = b'xxx' + store['bar'] = b'yyy' + + # setup cache + cache = self.create_cache(store) + + del cache['foo'] + with pytest.raises(KeyError): + # noinspection PyStatementEffect + cache['foo'] + with pytest.raises(KeyError): + # noinspection PyStatementEffect + store['foo'] + + # verify other keys untouched + assert 0 == store.counter['__getitem__', 'bar'] + assert 1 == store.counter['__setitem__', 'bar'] + def test_cache_keys(self): # setup @@ -1839,7 +1863,7 @@ def test_cache_keys(self): assert 0 == store.counter['__contains__', 'foo'] assert 0 == store.counter['__iter__'] assert 0 == store.counter['keys'] - cache = LRUStoreCache(store, max_size=None) + cache = self.create_cache(store, max_size=None) # keys should be cached on first call keys = sorted(cache.keys()) @@ -1889,6 +1913,70 @@ def test_cache_keys(self): assert 1 == store.counter['__iter__'] +class TestLRUChunkCache(MutableMappingStoreTests, CacheTests): + + # mock test object that will act as both the cache and the array + class MockChunkCacheArray(object): + + def __init__(self, chunk_cache, store): + self.chunk_cache = chunk_cache + self._store = store + self.hits = 0 + self.misses = 0 + + def __setitem__(self, key, value): + self._store[key] = value + self.chunk_cache[key] = value + self._reset_hits_misses() + + def __getitem__(self, item): + try: + value = 
self.chunk_cache[item] + except KeyError: + value = self._store[item] + self.chunk_cache[item] = value + self._reset_hits_misses() + return value + + def __delitem__(self, key): + self.chunk_cache.__delitem__(key) + + def _reset_hits_misses(self): + self.hits = self.chunk_cache.hits + self.misses = self.chunk_cache.misses + + def invalidate(self): + self.chunk_cache.invalidate() + + def invalidate_values(self): + self.chunk_cache.invalidate_values() + + def create_store(self): + return LRUChunkCache(max_size=2**27) + + def create_cache(self, store, max_size=None): + return self.MockChunkCacheArray(LRUChunkCache(max_size=max_size), store=store) + + def test_delitem(self): + # setup store + store = CountingDict() + store['foo'] = b'xxx' + store['bar'] = b'yyy' + + # setup cache + cache = self.create_cache(store) + + cache['foo'] + del cache['foo'] + with pytest.raises(KeyError): + # noinspection PyStatementEffect + cache.chunk_cache['foo'] + + # verify other keys untouched + assert 0 == store.counter['__getitem__', 'bar'] + assert 1 == store.counter['__setitem__', 'bar'] + + def test_getsize(): store = dict() store['foo'] = b'aaa'