From d62febb37b0de9639ae85cb2d3fc2643ef8ca4b1 Mon Sep 17 00:00:00 2001 From: shikharsg Date: Wed, 29 Aug 2018 13:17:48 +0530 Subject: [PATCH 01/34] first attempt at chunk_cache layer --- zarr/core.py | 36 +++++++++-- zarr/storage.py | 118 +++++++++++++++++++++++++++++++++++++ zarr/tests/test_core.py | 47 ++++++++++++++- zarr/tests/test_storage.py | 57 +++++++++++++++++- 4 files changed, 252 insertions(+), 6 deletions(-) diff --git a/zarr/core.py b/zarr/core.py index 03d9bdc667..dc4f5f4a47 100644 --- a/zarr/core.py +++ b/zarr/core.py @@ -103,7 +103,8 @@ class Array(object): """ def __init__(self, store, path=None, read_only=False, chunk_store=None, - synchronizer=None, cache_metadata=True, cache_attrs=True): + synchronizer=None, cache_metadata=True, cache_attrs=True, + chunk_cache=None): # N.B., expect at this point store is fully initialized with all # configuration metadata fully specified and normalized @@ -118,6 +119,7 @@ def __init__(self, store, path=None, read_only=False, chunk_store=None, self._synchronizer = synchronizer self._cache_metadata = cache_metadata self._is_view = False + self._chunk_cache = chunk_cache # initialize metadata self._load_metadata() @@ -1562,8 +1564,21 @@ def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection, ckey = self._chunk_key(chunk_coords) try: + + cdata = None + chunk_was_cached = False + + # first try getting from cache (if one has been provided) + if self._chunk_cache is not None: + try: + cdata = self._chunk_cache[ckey] + chunk_was_cached = True + except KeyError: + pass + # obtain compressed data for chunk - cdata = self.chunk_store[ckey] + if not chunk_was_cached: + cdata = self.chunk_store[ckey] except KeyError: # chunk not initialized @@ -1593,8 +1608,12 @@ def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection, # contiguous, so we can decompress directly from the chunk # into the destination array - if self._compressor: + if chunk_was_cached: + np.copyto(dest, cdata) + elif self._compressor: self._compressor.decode(cdata, dest) + if self._chunk_cache is not None: + self._chunk_cache[ckey] = np.copy(dest) else: if isinstance(cdata, np.ndarray): chunk = cdata.view(self._dtype) @@ -1602,10 +1621,17 @@ def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection, chunk = np.frombuffer(cdata, dtype=self._dtype) chunk = chunk.reshape(self._chunks, order=self._order) np.copyto(dest, chunk) + if self._chunk_cache is not None: + self._chunk_cache[ckey] = np.copy(chunk) return # decode chunk - chunk = self._decode_chunk(cdata) + if not chunk_was_cached: + chunk = self._decode_chunk(cdata) + if self._chunk_cache is not None: + self._chunk_cache[ckey] = np.copy(chunk) + else: + chunk = np.copy(cdata) # select data from chunk if fields: @@ -1714,6 +1740,8 @@ def _chunk_setitem_nosync(self, chunk_coords, chunk_selection, value, fields=Non else: chunk[chunk_selection] = value + if self._chunk_cache is not None: + self._chunk_cache[ckey] = np.copy(chunk) # encode chunk cdata = self._encode_chunk(chunk) diff --git a/zarr/storage.py b/zarr/storage.py index 39a497d08b..07e511944e 100644 --- a/zarr/storage.py +++ b/zarr/storage.py @@ -1883,3 +1883,121 @@ def __delitem__(self, key): with self._mutex: self._invalidate_keys() self._invalidate_value(key) + + +class ChunkCache(MutableMapping): + """Storage class that implements a least-recently-used (LRU) cache for array chunks. 
+ Intended primarily for use with stores that can be slow to access, e.g., remote stores that + require network communication to store and retrieve data. + + Parameters + ---------- + max_size : int + The maximum size that the cache may grow to, in number of bytes. Provide `None` + if you would like the cache to have unlimited size. + + """ + + def __init__(self, max_size): + self._max_size = max_size + self._current_size = 0 + self._values_cache = OrderedDict() + self._mutex = Lock() + self.hits = self.misses = 0 + + def __getstate__(self): + return (self._max_size, self._current_size, + self._values_cache, self.hits, + self.misses) + + def __setstate__(self, state): + (self._store, self._max_size, self._current_size, + self._values_cache, self.hits, + self.misses) = state + self._mutex = Lock() + + def __len__(self): + return len(self._keys()) + + def __iter__(self): + return self.keys() + + def __contains__(self, key): + with self._mutex: + return key in self._keys() + + def clear(self): + self.invalidate() + + def keys(self): + with self._mutex: + return iter(self._keys()) + + def _keys(self): + return self._values_cache.keys() + + def _pop_value(self): + # remove the first value from the cache, as this will be the least recently + # used value + _, v = self._values_cache.popitem(last=False) + return v + + def _accommodate_value(self, value_size): + if self._max_size is None: + return + # ensure there is enough space in the cache for a new value + while self._current_size + value_size > self._max_size: + v = self._pop_value() + self._current_size -= buffer_size(v) + + def _cache_value(self, key, value): + # cache a value + value_size = buffer_size(value) + # check size of the value against max size, as if the value itself exceeds max + # size then we are never going to cache it + if self._max_size is None or value_size <= self._max_size: + self._accommodate_value(value_size) + self._values_cache[key] = value + self._current_size += value_size + + def invalidate(self): + """Completely clear the cache.""" + with self._mutex: + self._values_cache.clear() + + def invalidate_values(self): + """Clear the values cache.""" + with self._mutex: + self._values_cache.clear() + + def _invalidate_value(self, key): + if key in self._values_cache: + value = self._values_cache.pop(key) + self._current_size -= buffer_size(value) + + def __getitem__(self, key): + try: + # try to obtain the value from the cache + with self._mutex: + value = self._values_cache[key] + # cache hit if no KeyError is raised + self.hits += 1 + # treat the end as most recently used + OrderedDict_move_to_end(self._values_cache, key) + + except KeyError: + # cache miss + with self._mutex: + self.misses += 1 + raise KeyError + + return value + + def __setitem__(self, key, value): + with self._mutex: + self._invalidate_value(key) + self._cache_value(key, value) + + def __delitem__(self, key): + with self._mutex: + self._invalidate_value(key) diff --git a/zarr/tests/test_core.py b/zarr/tests/test_core.py index 390f888287..bb2271088e 100644 --- a/zarr/tests/test_core.py +++ b/zarr/tests/test_core.py @@ -16,7 +16,7 @@ from zarr.storage import (DirectoryStore, init_array, init_group, NestedDirectoryStore, DBMStore, LMDBStore, atexit_rmtree, atexit_rmglob, - LRUStoreCache) + LRUStoreCache, ChunkCache) from zarr.core import Array from zarr.errors import PermissionError from zarr.compat import PY2, text_type, binary_type @@ -1698,3 +1698,48 @@ def create_array(read_only=False, **kwargs): init_array(store, **kwargs) return Array(store, 
read_only=read_only, cache_metadata=cache_metadata, cache_attrs=cache_attrs) + + +class TestArrayWithChunkCache(TestArray): + + @staticmethod + def create_array(read_only=False, **kwargs): + store = dict() + kwargs.setdefault('compressor', Zlib(level=1)) + cache_metadata = kwargs.pop('cache_metadata', True) + cache_attrs = kwargs.pop('cache_attrs', True) + init_array(store, **kwargs) + return Array(store, read_only=read_only, cache_metadata=cache_metadata, + cache_attrs=cache_attrs, chunk_cache=ChunkCache(max_size=None)) + + @staticmethod + def create_array_with_cache(read_only=False, **kwargs): + store = dict() + kwargs.setdefault('compressor', Zlib(level=1)) + cache_metadata = kwargs.pop('cache_metadata', True) + cache_attrs = kwargs.pop('cache_attrs', True) + init_array(store, **kwargs) + cache = ChunkCache(max_size=None) + return Array(store, read_only=read_only, cache_metadata=cache_metadata, + cache_attrs=cache_attrs, chunk_cache=cache), cache + + def test_hit_miss(self): + a = np.arange(100).reshape((10, 10)) + z, cache = self.create_array_with_cache(shape=a.shape, chunks=(10,1), dtype=a.dtype) + + # test write cache + z[:] = a + assert cache.misses == 0 and cache.hits == 0 + _ = z[:] + assert cache.misses == 0 and cache.hits == 10 + + cache.clear() + cache.misses = 0 + cache.hits = 0 + + # test read cache + assert cache.misses == 0 and cache.hits == 0 + _ = z[:] + assert cache.misses == 10 and cache.hits == 0 + _ = z[:] + assert cache.misses == 10 and cache.hits == 10 diff --git a/zarr/tests/test_storage.py b/zarr/tests/test_storage.py index f68f8a6ed6..ee9770ce08 100644 --- a/zarr/tests/test_storage.py +++ b/zarr/tests/test_storage.py @@ -19,7 +19,7 @@ DirectoryStore, ZipStore, init_group, group_meta_key, getsize, migrate_1to2, TempStore, atexit_rmtree, NestedDirectoryStore, default_compressor, DBMStore, - LMDBStore, atexit_rmglob, LRUStoreCache) + LMDBStore, atexit_rmglob, LRUStoreCache, ChunkCache) from zarr.meta import (decode_array_metadata, encode_array_metadata, ZARR_FORMAT, decode_group_metadata, encode_group_metadata) from zarr.compat import PY2 @@ -1071,6 +1071,61 @@ def test_cache_keys(self): assert 1 == store.counter['__iter__'] +class TestChunkCache(object): + + def test_cache_values_no_max_size(self): + + # setup cache + cache = ChunkCache(max_size=None) + assert 0 == cache.hits + assert 0 == cache.misses + cache['foo'] = b'xxx' + cache['bar'] = b'yyy' + assert 0 == cache.hits + assert 0 == cache.misses + + # test __getitem__ hits cache with existent key + assert b'xxx' == cache['foo'] + assert 1 == cache.hits + assert 0 == cache.misses + + # test __getitem__ misses and raises KeyError with non existent key + with pytest.raises(KeyError): + _ = cache['baz'] + assert 1 == cache.hits + assert 1 == cache.misses + + # test second __getitem__, cache hit + assert b'xxx' == cache['foo'] + assert 2 == cache.hits + assert 1 == cache.misses + + # test __setitem__, __getitem__ + cache['foo'] = b'zzz' + # should be a cache hit + assert b'zzz' == cache['foo'] + assert 3 == cache.hits + assert 1 == cache.misses + + # manually invalidate all cached values + cache.invalidate_values() + with pytest.raises(KeyError): + assert b'zzz' == cache['foo'] + cache['foo'] = b'zzz' + assert b'zzz' == cache['foo'] + cache.invalidate() + with pytest.raises(KeyError): + assert b'zzz' == cache['foo'] + + # test __delitem__ + cache['foo'] = b'zzz' + assert b'zzz' == cache['foo'] + del cache['foo'] + with pytest.raises(KeyError): + # noinspection PyStatementEffect + cache['foo'] + + def 
test_getsize(): store = dict() store['foo'] = b'aaa' From f796ea78aeb00336a675d52a01a319df10b65543 Mon Sep 17 00:00:00 2001 From: shikharsg Date: Wed, 29 Aug 2018 14:32:23 +0530 Subject: [PATCH 02/34] ChunkCache test with MockChunkCacheArray --- zarr/tests/test_storage.py | 184 ++++++++++++++++++++++++++++++++----- 1 file changed, 159 insertions(+), 25 deletions(-) diff --git a/zarr/tests/test_storage.py b/zarr/tests/test_storage.py index ee9770ce08..2a4e5702e1 100644 --- a/zarr/tests/test_storage.py +++ b/zarr/tests/test_storage.py @@ -1073,58 +1073,192 @@ def test_cache_keys(self): class TestChunkCache(object): + class MockChunkCacheArray(object): + + def __init__(self, chunk_cache, store): + self._chunk_cache = chunk_cache + self._store = store + + def __setitem__(self, key, value): + self._store[key] = value + self._chunk_cache[key] = value + + def __getitem__(self, item): + try: + return self._chunk_cache[item] + except KeyError: + value = self._store[item] + self._chunk_cache[item] = value + return value + def test_cache_values_no_max_size(self): + # setup store + store = CountingDict() + store['foo'] = b'xxx' + store['bar'] = b'yyy' + assert 0 == store.counter['__getitem__', 'foo'] + assert 1 == store.counter['__setitem__', 'foo'] + assert 0 == store.counter['__getitem__', 'bar'] + assert 1 == store.counter['__setitem__', 'bar'] + # setup cache cache = ChunkCache(max_size=None) assert 0 == cache.hits assert 0 == cache.misses - cache['foo'] = b'xxx' - cache['bar'] = b'yyy' - assert 0 == cache.hits - assert 0 == cache.misses - # test __getitem__ hits cache with existent key - assert b'xxx' == cache['foo'] - assert 1 == cache.hits - assert 0 == cache.misses + # setup array with cache and store + z = self.MockChunkCacheArray(cache, store) - # test __getitem__ misses and raises KeyError with non existent key - with pytest.raises(KeyError): - _ = cache['baz'] - assert 1 == cache.hits + # test first __getitem__, cache miss + assert b'xxx' == z['foo'] + assert 1 == store.counter['__getitem__', 'foo'] + assert 1 == store.counter['__setitem__', 'foo'] + assert 0 == cache.hits assert 1 == cache.misses # test second __getitem__, cache hit - assert b'xxx' == cache['foo'] - assert 2 == cache.hits + assert b'xxx' == z['foo'] + assert 1 == store.counter['__getitem__', 'foo'] + assert 1 == store.counter['__setitem__', 'foo'] + assert 1 == cache.hits assert 1 == cache.misses # test __setitem__, __getitem__ - cache['foo'] = b'zzz' + z['foo'] = b'zzz' + assert 1 == store.counter['__getitem__', 'foo'] + assert 2 == store.counter['__setitem__', 'foo'] # should be a cache hit - assert b'zzz' == cache['foo'] - assert 3 == cache.hits + assert b'zzz' == z['foo'] + assert 1 == store.counter['__getitem__', 'foo'] + assert 2 == store.counter['__setitem__', 'foo'] + assert 2 == cache.hits assert 1 == cache.misses # manually invalidate all cached values cache.invalidate_values() - with pytest.raises(KeyError): - assert b'zzz' == cache['foo'] - cache['foo'] = b'zzz' - assert b'zzz' == cache['foo'] + assert b'zzz' == z['foo'] + assert 2 == store.counter['__getitem__', 'foo'] + assert 2 == store.counter['__setitem__', 'foo'] cache.invalidate() - with pytest.raises(KeyError): - assert b'zzz' == cache['foo'] + assert b'zzz' == z['foo'] + assert 3 == store.counter['__getitem__', 'foo'] + assert 2 == store.counter['__setitem__', 'foo'] # test __delitem__ - cache['foo'] = b'zzz' - assert b'zzz' == cache['foo'] del cache['foo'] with pytest.raises(KeyError): # noinspection PyStatementEffect cache['foo'] + # verify other 
keys untouched + assert 0 == store.counter['__getitem__', 'bar'] + assert 1 == store.counter['__setitem__', 'bar'] + + def test_cache_values_with_max_size(self): + + # setup store + store = CountingDict() + store['foo'] = b'xxx' + store['bar'] = b'yyy' + assert 0 == store.counter['__getitem__', 'foo'] + assert 0 == store.counter['__getitem__', 'bar'] + + # setup cache can only hold one item + cache = ChunkCache(max_size=5) + assert 0 == cache.hits + assert 0 == cache.misses + + # setup array with cache and store + z = self.MockChunkCacheArray(cache, store) + + # test first 'foo' __getitem__, cache miss + assert b'xxx' == z['foo'] + assert 1 == store.counter['__getitem__', 'foo'] + assert 0 == cache.hits + assert 1 == cache.misses + + # test second 'foo' __getitem__, cache hit + assert b'xxx' == z['foo'] + assert 1 == store.counter['__getitem__', 'foo'] + assert 1 == cache.hits + assert 1 == cache.misses + + # test first 'bar' __getitem__, cache miss + assert b'yyy' == z['bar'] + assert 1 == store.counter['__getitem__', 'bar'] + assert 1 == cache.hits + assert 2 == cache.misses + + # test second 'bar' __getitem__, cache hit + assert b'yyy' == z['bar'] + assert 1 == store.counter['__getitem__', 'bar'] + assert 2 == cache.hits + assert 2 == cache.misses + + # test 'foo' __getitem__, should have been evicted, cache miss + assert b'xxx' == z['foo'] + assert 2 == store.counter['__getitem__', 'foo'] + assert 2 == cache.hits + assert 3 == cache.misses + + # test 'bar' __getitem__, should have been evicted, cache miss + assert b'yyy' == z['bar'] + assert 2 == store.counter['__getitem__', 'bar'] + assert 2 == cache.hits + assert 4 == cache.misses + + # setup store + store = CountingDict() + store['foo'] = b'xxx' + store['bar'] = b'yyy' + assert 0 == store.counter['__getitem__', 'foo'] + assert 0 == store.counter['__getitem__', 'bar'] + + # setup cache can hold 2 items + cache = ChunkCache(max_size=6) + assert 0 == cache.hits + assert 0 == cache.misses + + # setup array with cache and store + z = self.MockChunkCacheArray(cache, store) + + # test first 'foo' __getitem__, cache miss + assert b'xxx' == z['foo'] + assert 1 == store.counter['__getitem__', 'foo'] + assert 0 == cache.hits + assert 1 == cache.misses + + # test second 'foo' __getitem__, cache hit + assert b'xxx' == z['foo'] + assert 1 == store.counter['__getitem__', 'foo'] + assert 1 == cache.hits + assert 1 == cache.misses + + # test first 'bar' __getitem__, cache miss + assert b'yyy' == z['bar'] + assert 1 == store.counter['__getitem__', 'bar'] + assert 1 == cache.hits + assert 2 == cache.misses + + # test second 'bar' __getitem__, cache hit + assert b'yyy' == z['bar'] + assert 1 == store.counter['__getitem__', 'bar'] + assert 2 == cache.hits + assert 2 == cache.misses + + # test 'foo' __getitem__, should still be cached + assert b'xxx' == z['foo'] + assert 1 == store.counter['__getitem__', 'foo'] + assert 3 == cache.hits + assert 2 == cache.misses + + # test 'bar' __getitem__, should still be cached + assert b'yyy' == z['bar'] + assert 1 == store.counter['__getitem__', 'bar'] + assert 4 == cache.hits + assert 2 == cache.misses + def test_getsize(): store = dict() From 32141a98aebcd38a3b40a67d39b104e3490469ed Mon Sep 17 00:00:00 2001 From: Shikhar Goenka Date: Tue, 9 Oct 2018 12:00:20 +0100 Subject: [PATCH 03/34] np.copy not needed when accessing a subset of a chunk --- zarr/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zarr/core.py b/zarr/core.py index dc4f5f4a47..26dc91fe16 100644 --- a/zarr/core.py +++ 
b/zarr/core.py @@ -1631,7 +1631,7 @@ def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection, if self._chunk_cache is not None: self._chunk_cache[ckey] = np.copy(chunk) else: - chunk = np.copy(cdata) + chunk = cdata # select data from chunk if fields: From b35139bbf2d6b1230c1d969654773e697365871c Mon Sep 17 00:00:00 2001 From: Shikhar Goenka Date: Sat, 13 Oct 2018 09:59:46 +0100 Subject: [PATCH 04/34] fixed 'Mm' dtype error for buffersize function --- zarr/util.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/zarr/util.py b/zarr/util.py index b79865bfe8..5edb4498cf 100644 --- a/zarr/util.py +++ b/zarr/util.py @@ -315,6 +315,9 @@ def normalize_storage_path(path): def buffer_size(v): from array import array as _stdlib_array + # special case for 'Mm' dtypes + if hasattr(v, 'dtype') and v.dtype.kind in 'Mm': + return v.nbytes if PY2 and isinstance(v, _stdlib_array): # pragma: py3 no cover # special case array.array because does not support buffer # interface in PY2 From 3c45176acaaf2e878edac8902b5579fe9ae07ac2 Mon Sep 17 00:00:00 2001 From: shikhar Date: Sat, 13 Oct 2018 11:45:01 +0100 Subject: [PATCH 05/34] renamed ChunkCache to LRUChunkCache --- zarr/storage.py | 2 +- zarr/tests/test_core.py | 8 ++++---- zarr/tests/test_storage.py | 10 +++++----- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/zarr/storage.py b/zarr/storage.py index 07e511944e..62409f2326 100644 --- a/zarr/storage.py +++ b/zarr/storage.py @@ -1885,7 +1885,7 @@ def __delitem__(self, key): self._invalidate_value(key) -class ChunkCache(MutableMapping): +class LRUChunkCache(MutableMapping): """Storage class that implements a least-recently-used (LRU) cache for array chunks. Intended primarily for use with stores that can be slow to access, e.g., remote stores that require network communication to store and retrieve data. 
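
Usage after the rename is unchanged apart from the class name. A minimal
sketch of the intended wiring, mirroring the test setup below (the in-memory
dict store and the Zlib compressor are illustrative choices, not
requirements):

    import numpy as np
    from numcodecs import Zlib
    from zarr.core import Array
    from zarr.storage import init_array, LRUChunkCache

    store = dict()
    init_array(store, shape=(100, 100), chunks=(10, 10), dtype='f8',
               compressor=Zlib(level=1))
    # cache decoded chunks, evicting least-recently-used entries above 64 MiB
    z = Array(store, chunk_cache=LRUChunkCache(max_size=2**26))
    z[:] = np.random.randn(100, 100)  # writes also populate the cache
    _ = z[:]  # reads are served from the cache: no decompression needed
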
diff --git a/zarr/tests/test_core.py b/zarr/tests/test_core.py index bb2271088e..7eff24cb06 100644 --- a/zarr/tests/test_core.py +++ b/zarr/tests/test_core.py @@ -16,7 +16,7 @@ from zarr.storage import (DirectoryStore, init_array, init_group, NestedDirectoryStore, DBMStore, LMDBStore, atexit_rmtree, atexit_rmglob, - LRUStoreCache, ChunkCache) + LRUStoreCache, LRUChunkCache) from zarr.core import Array from zarr.errors import PermissionError from zarr.compat import PY2, text_type, binary_type @@ -1700,7 +1700,7 @@ def create_array(read_only=False, **kwargs): cache_attrs=cache_attrs) -class TestArrayWithChunkCache(TestArray): +class TestArrayWithLRUChunkCache(TestArray): @staticmethod def create_array(read_only=False, **kwargs): @@ -1710,7 +1710,7 @@ def create_array(read_only=False, **kwargs): cache_attrs = kwargs.pop('cache_attrs', True) init_array(store, **kwargs) return Array(store, read_only=read_only, cache_metadata=cache_metadata, - cache_attrs=cache_attrs, chunk_cache=ChunkCache(max_size=None)) + cache_attrs=cache_attrs, chunk_cache=LRUChunkCache(max_size=None)) @staticmethod def create_array_with_cache(read_only=False, **kwargs): @@ -1719,7 +1719,7 @@ def create_array_with_cache(read_only=False, **kwargs): cache_metadata = kwargs.pop('cache_metadata', True) cache_attrs = kwargs.pop('cache_attrs', True) init_array(store, **kwargs) - cache = ChunkCache(max_size=None) + cache = LRUChunkCache(max_size=None) return Array(store, read_only=read_only, cache_metadata=cache_metadata, cache_attrs=cache_attrs, chunk_cache=cache), cache diff --git a/zarr/tests/test_storage.py b/zarr/tests/test_storage.py index 2a4e5702e1..211c3e5c65 100644 --- a/zarr/tests/test_storage.py +++ b/zarr/tests/test_storage.py @@ -19,7 +19,7 @@ DirectoryStore, ZipStore, init_group, group_meta_key, getsize, migrate_1to2, TempStore, atexit_rmtree, NestedDirectoryStore, default_compressor, DBMStore, - LMDBStore, atexit_rmglob, LRUStoreCache, ChunkCache) + LMDBStore, atexit_rmglob, LRUStoreCache, LRUChunkCache) from zarr.meta import (decode_array_metadata, encode_array_metadata, ZARR_FORMAT, decode_group_metadata, encode_group_metadata) from zarr.compat import PY2 @@ -1071,7 +1071,7 @@ def test_cache_keys(self): assert 1 == store.counter['__iter__'] -class TestChunkCache(object): +class TestLRUChunkCache(object): class MockChunkCacheArray(object): @@ -1103,7 +1103,7 @@ def test_cache_values_no_max_size(self): assert 1 == store.counter['__setitem__', 'bar'] # setup cache - cache = ChunkCache(max_size=None) + cache = LRUChunkCache(max_size=None) assert 0 == cache.hits assert 0 == cache.misses @@ -1165,7 +1165,7 @@ def test_cache_values_with_max_size(self): assert 0 == store.counter['__getitem__', 'bar'] # setup cache can only hold one item - cache = ChunkCache(max_size=5) + cache = LRUChunkCache(max_size=5) assert 0 == cache.hits assert 0 == cache.misses @@ -1216,7 +1216,7 @@ def test_cache_values_with_max_size(self): assert 0 == store.counter['__getitem__', 'bar'] # setup cache can hold 2 items - cache = ChunkCache(max_size=6) + cache = LRUChunkCache(max_size=6) assert 0 == cache.hits assert 0 == cache.misses From 46dcf94108ae402e26025d2c4a18b099a58c191a Mon Sep 17 00:00:00 2001 From: shikhar Date: Sat, 13 Oct 2018 11:45:57 +0100 Subject: [PATCH 06/34] LRUChunkCache in zarr root namespace --- zarr/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/zarr/__init__.py b/zarr/__init__.py index 56d060fdac..5759c127e5 100644 --- a/zarr/__init__.py +++ b/zarr/__init__.py @@ -7,7 +7,8 @@ from 
zarr.creation import (empty, zeros, ones, full, array, empty_like, zeros_like, ones_like, full_like, open_array, open_like, create) from zarr.storage import (DictStore, DirectoryStore, ZipStore, TempStore, - NestedDirectoryStore, DBMStore, LMDBStore, LRUStoreCache) + NestedDirectoryStore, DBMStore, LMDBStore, LRUStoreCache, + LRUChunkCache) from zarr.hierarchy import group, open_group, Group from zarr.sync import ThreadSynchronizer, ProcessSynchronizer from zarr.codecs import * From c69c751586c4cc38840a632fc8ba0998d69f9b96 Mon Sep 17 00:00:00 2001 From: shikhar Date: Sat, 13 Oct 2018 12:31:09 +0100 Subject: [PATCH 07/34] LRUChunkCache example --- zarr/core.py | 4 ++++ zarr/storage.py | 27 +++++++++++++++++++++++++-- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/zarr/core.py b/zarr/core.py index 26dc91fe16..3629312668 100644 --- a/zarr/core.py +++ b/zarr/core.py @@ -51,6 +51,10 @@ class Array(object): If True (default), user attributes will be cached for attribute read operations. If False, user attributes are reloaded from the store prior to all attribute read operations. + chunk_cache: MutableMapping, optional + Mapping to store decoded chunks for caching. Can be used in repeated + chunk access scenarios when decoding of data is computationally + expensive. Attributes ---------- diff --git a/zarr/storage.py b/zarr/storage.py index 62409f2326..71ecb68ad9 100644 --- a/zarr/storage.py +++ b/zarr/storage.py @@ -1886,9 +1886,10 @@ def __delitem__(self, key): class LRUChunkCache(MutableMapping): - """Storage class that implements a least-recently-used (LRU) cache for array chunks. + """Class that implements a least-recently-used (LRU) cache for array chunks. Intended primarily for use with stores that can be slow to access, e.g., remote stores that - require network communication to store and retrieve data. + require network communication to store and retrieve data, and/or arrays where decompression + of data is computationally expensive. Parameters ---------- @@ -1896,6 +1897,28 @@ class LRUChunkCache(MutableMapping): The maximum size that the cache may grow to, in number of bytes. Provide `None` if you would like the cache to have unlimited size. + Examples + -------- + The example below uses a dict store to store the encoded array and uses LRUChunkCache to + store decoded chunks:: + + >>> import zarr + >>> from numcodecs import LZMA + >>> import numpy as np + >>> store = zarr.DictStore() + >>> z = zarr.array(np.random.randn(1000000).reshape(1000,1000), chunks=(100,100), store=store, compressor=LZMA()) + >>> from timeit import timeit + >>> # data access without cache + ... timeit('z[:]', number=1, globals=globals()) # doctest: +SKIP + 0.6703157789888792 + >>> z_with_cache = zarr.Array(store=store, chunk_cache=zarr.LRUChunkCache(max_size=None)) + >>> # first data access about the same as without cache + ... timeit('z_with_cache[:]', number=1, globals=globals()) # doctest: +SKIP + 0.681269913999131 + >>> # second time accesses the decoded chunks in the cache + ... 
timeit('z_with_cache[:]', number=1, globals=globals()) # doctest: +SKIP + 0.007617925992235541 + """ def __init__(self, max_size): From 2cb143ecf740cedc82d5f7eca215c123045e156b Mon Sep 17 00:00:00 2001 From: shikhar Date: Mon, 15 Oct 2018 08:18:57 +0100 Subject: [PATCH 08/34] write caching of chunk should be done after encoding --- zarr/core.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/zarr/core.py b/zarr/core.py index 3629312668..094830f65a 100644 --- a/zarr/core.py +++ b/zarr/core.py @@ -1744,14 +1744,16 @@ def _chunk_setitem_nosync(self, chunk_coords, chunk_selection, value, fields=Non else: chunk[chunk_selection] = value - if self._chunk_cache is not None: - self._chunk_cache[ckey] = np.copy(chunk) # encode chunk cdata = self._encode_chunk(chunk) # store self.chunk_store[ckey] = cdata + # cache the chunk + if self._chunk_cache is not None: + self._chunk_cache[ckey] = np.copy(chunk) + def _chunk_key(self, chunk_coords): return self._key_prefix + '.'.join(map(str, chunk_coords)) From 2fb169e36806e295221873adce76a29e1c2d7e63 Mon Sep 17 00:00:00 2001 From: shikhar Date: Mon, 15 Oct 2018 08:26:34 +0100 Subject: [PATCH 09/34] ensure cached chunk has been round tripped through encode-decode if dtype=object. See this: https://github.com/zarr-developers/zarr/pull/306#issuecomment-429528823 --- zarr/core.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/zarr/core.py b/zarr/core.py index 094830f65a..4e17e37ce7 100644 --- a/zarr/core.py +++ b/zarr/core.py @@ -1752,6 +1752,9 @@ def _chunk_setitem_nosync(self, chunk_coords, chunk_selection, value, fields=Non # cache the chunk if self._chunk_cache is not None: + # ensure cached chunk has been round tripped through encode-decode if dtype=object + if self.dtype == object: + chunk = self._decode_chunk(cdata) self._chunk_cache[ckey] = np.copy(chunk) def _chunk_key(self, chunk_coords): From 31e4dfb425a51ada888cfb3edb8652ba47dfbf09 Mon Sep 17 00:00:00 2001 From: shikhar Date: Mon, 15 Oct 2018 08:41:18 +0100 Subject: [PATCH 10/34] flake8 fixes --- zarr/storage.py | 3 ++- zarr/tests/test_core.py | 8 ++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/zarr/storage.py b/zarr/storage.py index 71ecb68ad9..11af690892 100644 --- a/zarr/storage.py +++ b/zarr/storage.py @@ -1906,7 +1906,8 @@ class LRUChunkCache(MutableMapping): >>> from numcodecs import LZMA >>> import numpy as np >>> store = zarr.DictStore() - >>> z = zarr.array(np.random.randn(1000000).reshape(1000,1000), chunks=(100,100), store=store, compressor=LZMA()) + >>> z = zarr.array(np.random.randn(1000000).reshape(1000,1000), chunks=(100,100), + ... store=store, compressor=LZMA()) >>> from timeit import timeit >>> # data access without cache ... 
timeit('z[:]', number=1, globals=globals()) # doctest: +SKIP diff --git a/zarr/tests/test_core.py b/zarr/tests/test_core.py index 7eff24cb06..a0d5a306f3 100644 --- a/zarr/tests/test_core.py +++ b/zarr/tests/test_core.py @@ -1725,12 +1725,12 @@ def create_array_with_cache(read_only=False, **kwargs): def test_hit_miss(self): a = np.arange(100).reshape((10, 10)) - z, cache = self.create_array_with_cache(shape=a.shape, chunks=(10,1), dtype=a.dtype) + z, cache = self.create_array_with_cache(shape=a.shape, chunks=(10, 1), dtype=a.dtype) # test write cache z[:] = a assert cache.misses == 0 and cache.hits == 0 - _ = z[:] + z[:] assert cache.misses == 0 and cache.hits == 10 cache.clear() @@ -1739,7 +1739,7 @@ def test_hit_miss(self): # test read cache assert cache.misses == 0 and cache.hits == 0 - _ = z[:] + z[:] assert cache.misses == 10 and cache.hits == 0 - _ = z[:] + z[:] assert cache.misses == 10 and cache.hits == 10 From 5559c4fde5d7b6fbe69714be1860a2686b91c4c2 Mon Sep 17 00:00:00 2001 From: shikhar Date: Mon, 15 Oct 2018 09:26:54 +0100 Subject: [PATCH 11/34] read write cache for 0-d arrays --- zarr/core.py | 74 ++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 51 insertions(+), 23 deletions(-) diff --git a/zarr/core.py b/zarr/core.py index 4e17e37ce7..c6d88b53da 100644 --- a/zarr/core.py +++ b/zarr/core.py @@ -698,19 +698,36 @@ def _get_basic_selection_zd(self, selection, out=None, fields=None): if selection not in ((), (Ellipsis,)): err_too_many_indices(selection, ()) - try: - # obtain encoded data for chunk - ckey = self._chunk_key((0,)) - cdata = self.chunk_store[ckey] + # obtain key for chunk + ckey = self._chunk_key((0,)) - except KeyError: - # chunk not initialized - chunk = np.zeros((), dtype=self._dtype) - if self._fill_value is not None: - chunk.fill(self._fill_value) + # setup variable to hold decoded chunk + chunk = None - else: - chunk = self._decode_chunk(cdata) + # check for cached chunk + if self._chunk_cache is not None: + try: + chunk = self._chunk_cache[ckey] + except KeyError: + pass + + if chunk is None: + try: + # obtain encoded data for chunk + cdata = self.chunk_store[ckey] + + except KeyError: + # chunk not initialized + chunk = np.zeros((), dtype=self._dtype) + if self._fill_value is not None: + chunk.fill(self._fill_value) + + else: + chunk = self._decode_chunk(cdata) + + # cache decoded chunk + if self._chunk_cache is not None: + self._chunk_cache[ckey] = chunk # handle fields if fields: @@ -1460,20 +1477,29 @@ def _set_basic_selection_zd(self, selection, value, fields=None): # obtain key for chunk ckey = self._chunk_key((0,)) - # setup chunk - try: - # obtain compressed data for chunk - cdata = self.chunk_store[ckey] + chunk = None - except KeyError: - # chunk not initialized - chunk = np.zeros((), dtype=self._dtype) - if self._fill_value is not None: - chunk.fill(self._fill_value) + if self._chunk_cache is not None: + try: + chunk = self._chunk_cache[ckey] + except KeyError: + pass - else: - # decode chunk - chunk = self._decode_chunk(cdata).copy() + if chunk is None: + # setup chunk + try: + # obtain compressed data for chunk + cdata = self.chunk_store[ckey] + + except KeyError: + # chunk not initialized + chunk = np.zeros((), dtype=self._dtype) + if self._fill_value is not None: + chunk.fill(self._fill_value) + + else: + # decode chunk + chunk = self._decode_chunk(cdata).copy() # set value if fields: @@ -1484,6 +1510,8 @@ def _set_basic_selection_zd(self, selection, value, fields=None): # encode and store cdata = self._encode_chunk(chunk) 
self.chunk_store[ckey] = cdata + if self._chunk_cache is not None: + self._chunk_cache[ckey] = chunk def _set_basic_selection_nd(self, selection, value, fields=None): # implementation of __setitem__ for array with at least one dimension From 2a0124a16e413fb37bc0f9bcc3c5d6bb4404b522 Mon Sep 17 00:00:00 2001 From: shikhar Date: Mon, 15 Oct 2018 10:33:44 +0100 Subject: [PATCH 12/34] added tutorial and api docs --- docs/api/storage.rst | 4 ++++ docs/tutorial.rst | 24 ++++++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/docs/api/storage.rst b/docs/api/storage.rst index 2365359fa9..8e23730f8e 100644 --- a/docs/api/storage.rst +++ b/docs/api/storage.rst @@ -27,6 +27,10 @@ Storage (``zarr.storage``) .. automethod:: invalidate_values .. automethod:: invalidate_keys +.. autoclass:: LRUChunkCache + + .. automethod:: invalidate + .. autofunction:: init_array .. autofunction:: init_group .. autofunction:: contains_array diff --git a/docs/tutorial.rst b/docs/tutorial.rst index c174a57ae5..53c81dd133 100644 --- a/docs/tutorial.rst +++ b/docs/tutorial.rst @@ -797,6 +797,30 @@ layer over a remote store. E.g.:: b'Hello from the cloud!' 0.0009490990014455747 +The above :class:`zarr.storage.LRUStoreCache` wraps any Zarr storage class, and stores +encoded chunks. So every time cache is accessed, the chunk has to be decoded. For cases +where decoding is computationally expensive, Zarr also provides a +:class:`zarr.storage.LRUChunkCache` which can store decoded chunks, e.g.:: + + >>> import zarr + >>> from numcodecs import LZMA + >>> import numpy as np + >>> store = zarr.DictStore() + >>> z = zarr.array(np.random.randn(1000000).reshape(1000,1000), chunks=(100,100), + ... store=store, compressor=LZMA()) + >>> from timeit import timeit + >>> # data access without cache + ... timeit('z[:]', number=1, globals=globals()) # doctest: +SKIP + 0.6703157789888792 + >>> z_with_cache = zarr.Array(store=store, chunk_cache=zarr.LRUChunkCache(max_size=None)) + >>> # first data access about the same as without cache + ... timeit('z_with_cache[:]', number=1, globals=globals()) # doctest: +SKIP + 0.681269913999131 + >>> # second time accesses the decoded chunks in the cache + ... 
timeit('z_with_cache[:]', number=1, globals=globals()) # doctest: +SKIP + 0.007617925992235541 + + If you are still experiencing poor performance with distributed/cloud storage, please raise an issue on the GitHub issue tracker with any profiling data you can provide, as there may be opportunities to optimise further either within Zarr or within the mapping From 6fac2adc4ab20289e84c254a3cb259a8717b4a14 Mon Sep 17 00:00:00 2001 From: shikharsg Date: Sat, 20 Oct 2018 14:33:03 +0530 Subject: [PATCH 13/34] separated store tests from mutable mapping tests in test_storage.py --- zarr/tests/test_storage.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/zarr/tests/test_storage.py b/zarr/tests/test_storage.py index 211c3e5c65..bfadb0d9fa 100644 --- a/zarr/tests/test_storage.py +++ b/zarr/tests/test_storage.py @@ -29,8 +29,8 @@ from zarr.tests.util import CountingDict -class StoreTests(object): - """Abstract store tests.""" +class MutableMappingStoreTests(object): + """Abstract Mutable Mapping Tests""" def create_store(self, **kwargs): # pragma: no cover # implement in sub-class @@ -157,6 +157,10 @@ def test_getsize(self): assert 15 == getsize(store) assert 5 == getsize(store, 'spong') + +class StoreTests(MutableMappingStoreTests): + """Abstract store tests.""" + # noinspection PyStatementEffect def test_hierarchy(self): # setup @@ -1071,7 +1075,7 @@ def test_cache_keys(self): assert 1 == store.counter['__iter__'] -class TestLRUChunkCache(object): +class TestLRUChunkCache(MutableMappingStoreTests, unittest.TestCase): class MockChunkCacheArray(object): @@ -1091,6 +1095,9 @@ def __getitem__(self, item): self._chunk_cache[item] = value return value + def create_store(self): + return LRUChunkCache(max_size=None) + def test_cache_values_no_max_size(self): # setup store From 4e79d0bc23752d20f7546a124688e482f5a2cf59 Mon Sep 17 00:00:00 2001 From: shikharsg Date: Sat, 20 Oct 2018 14:34:01 +0530 Subject: [PATCH 14/34] fixed pickle, __delitem__ and ordered dict iteration bugs --- zarr/storage.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/zarr/storage.py b/zarr/storage.py index 11af690892..3e4f7f3baa 100644 --- a/zarr/storage.py +++ b/zarr/storage.py @@ -1935,7 +1935,7 @@ def __getstate__(self): self.misses) def __setstate__(self, state): - (self._store, self._max_size, self._current_size, + (self._max_size, self._current_size, self._values_cache, self.hits, self.misses) = state self._mutex = Lock() @@ -1960,6 +1960,12 @@ def keys(self): def _keys(self): return self._values_cache.keys() + def values(self): + return self._values_cache.values() + + def items(self): + return self._values_cache.items() + def _pop_value(self): # remove the first value from the cache, as this will be the least recently # used value @@ -2023,5 +2029,7 @@ def __setitem__(self, key, value): self._cache_value(key, value) def __delitem__(self, key): + if key not in self._values_cache: + raise KeyError with self._mutex: self._invalidate_value(key) From 5fd6fc84ed2cd9a47b644a3f193c8f31d8556fb1 Mon Sep 17 00:00:00 2001 From: shikharsg Date: Sat, 20 Oct 2018 15:50:56 +0530 Subject: [PATCH 15/34] documenting slowdown when using write cache with object arrays --- zarr/core.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/zarr/core.py b/zarr/core.py index c6d88b53da..5f54bcdea1 100644 --- a/zarr/core.py +++ b/zarr/core.py @@ -55,6 +55,11 @@ class Array(object): Mapping to store decoded chunks for caching. 
Can be used in repeated
        chunk access scenarios when decoding of data is computationally
        expensive.
+        NOTE: Using the write cache feature with object arrays (i.e.,
+        writing to an array whose dtype is 'object' while a chunk_cache is
+        provided) can result in a slight slowdown, as some dtypes, like
+        VLenArray, have to go through an encode-decode round trip before
+        they have the correct dtype.
 
     Attributes
     ----------
 
From 422f9eb22f26869c24af8018085ba465ef622eeb Mon Sep 17 00:00:00 2001
From: shikharsg
Date: Tue, 23 Oct 2018 19:02:37 +0530
Subject: [PATCH 16/34] factoring out mapping code from LRUStoreCache and
 LRUChunkCache

---
 zarr/storage.py | 172 ++++++++++++++++++++----------------------------
 1 file changed, 72 insertions(+), 100 deletions(-)

diff --git a/zarr/storage.py b/zarr/storage.py
index 3e4f7f3baa..e36903a3e9 100644
--- a/zarr/storage.py
+++ b/zarr/storage.py
@@ -1703,7 +1703,75 @@ def __len__(self):
         return self.db.stat()['entries']
 
-class LRUStoreCache(MutableMapping):
+class LRUMappingCache(MutableMapping):
+    """Abstract base class for LRU mapping caches.
+    """
+
+    def __init__(self, max_size):
+        self._max_size = max_size
+        self._current_size = 0
+        self._values_cache = OrderedDict()
+        self._mutex = Lock()
+        self.hits = self.misses = 0
+
+    def __len__(self):
+        return len(self._keys())
+
+    def __iter__(self):
+        return self.keys()
+
+    def keys(self):
+        with self._mutex:
+            return iter(self._keys())
+
+    def _keys(self):
+        raise NotImplementedError
+
+    def _pop_value(self):
+        # remove the first value from the cache, as this will be the least recently
+        # used value
+        _, v = self._values_cache.popitem(last=False)
+        return v
+
+    def _accommodate_value(self, value_size):
+        if self._max_size is None:
+            return
+        # ensure there is enough space in the cache for a new value
+        while self._current_size + value_size > self._max_size:
+            v = self._pop_value()
+            self._current_size -= buffer_size(v)
+
+    def _cache_value(self, key, value):
+        # cache a value
+        value_size = buffer_size(value)
+        # check size of the value against max size, as if the value itself exceeds max
+        # size then we are never going to cache it
+        if self._max_size is None or value_size <= self._max_size:
+            self._accommodate_value(value_size)
+            self._values_cache[key] = value
+            self._current_size += value_size
+
+    def invalidate_values(self):
+        """Clear the values cache."""
+        with self._mutex:
+            self._values_cache.clear()
+
+    def _invalidate_value(self, key):
+        if key in self._values_cache:
+            value = self._values_cache.pop(key)
+            self._current_size -= buffer_size(value)
+
+    def __getitem__(self, key):
+        raise NotImplementedError
+
+    def __setitem__(self, key, value):
+        raise NotImplementedError
+
+    def __delitem__(self, key):
+        raise NotImplementedError
+
+
+class LRUStoreCache(LRUMappingCache):
     """Storage class that implements a least-recently-used (LRU) cache layer over some other store.
Intended primarily for use with stores that can be slow to access, e.g., remote stores that require network communication to store and @@ -1741,15 +1809,11 @@ class LRUStoreCache(MutableMapping): """ def __init__(self, store, max_size): + super().__init__(max_size) self._store = store - self._max_size = max_size - self._current_size = 0 self._keys_cache = None self._contains_cache = None self._listdir_cache = dict() - self._values_cache = OrderedDict() - self._mutex = Lock() - self.hits = self.misses = 0 def __getstate__(self): return (self._store, self._max_size, self._current_size, self._keys_cache, @@ -1762,12 +1826,6 @@ def __setstate__(self, state): self.misses) = state self._mutex = Lock() - def __len__(self): - return len(self._keys()) - - def __iter__(self): - return self.keys() - def __contains__(self, key): with self._mutex: if self._contains_cache is None: @@ -1778,10 +1836,6 @@ def clear(self): self._store.clear() self.invalidate() - def keys(self): - with self._mutex: - return iter(self._keys()) - def _keys(self): if self._keys_cache is None: self._keys_cache = list(self._store.keys()) @@ -1799,41 +1853,12 @@ def listdir(self, path=None): def getsize(self, path=None): return getsize(self._store, path=path) - def _pop_value(self): - # remove the first value from the cache, as this will be the least recently - # used value - _, v = self._values_cache.popitem(last=False) - return v - - def _accommodate_value(self, value_size): - if self._max_size is None: - return - # ensure there is enough space in the cache for a new value - while self._current_size + value_size > self._max_size: - v = self._pop_value() - self._current_size -= buffer_size(v) - - def _cache_value(self, key, value): - # cache a value - value_size = buffer_size(value) - # check size of the value against max size, as if the value itself exceeds max - # size then we are never going to cache it - if self._max_size is None or value_size <= self._max_size: - self._accommodate_value(value_size) - self._values_cache[key] = value - self._current_size += value_size - def invalidate(self): """Completely clear the cache.""" with self._mutex: self._values_cache.clear() self._invalidate_keys() - def invalidate_values(self): - """Clear the values cache.""" - with self._mutex: - self._values_cache.clear() - def invalidate_keys(self): """Clear the keys cache.""" with self._mutex: @@ -1844,11 +1869,6 @@ def _invalidate_keys(self): self._contains_cache = None self._listdir_cache.clear() - def _invalidate_value(self, key): - if key in self._values_cache: - value = self._values_cache.pop(key) - self._current_size -= buffer_size(value) - def __getitem__(self, key): try: # first try to obtain the value from the cache @@ -1885,7 +1905,7 @@ def __delitem__(self, key): self._invalidate_value(key) -class LRUChunkCache(MutableMapping): +class LRUChunkCache(LRUMappingCache): """Class that implements a least-recently-used (LRU) cache for array chunks. 
Intended primarily for use with stores that can be slow to access, e.g., remote stores that require network communication to store and retrieve data, and/or arrays where decompression @@ -1923,11 +1943,7 @@ class LRUChunkCache(MutableMapping): """ def __init__(self, max_size): - self._max_size = max_size - self._current_size = 0 - self._values_cache = OrderedDict() - self._mutex = Lock() - self.hits = self.misses = 0 + super().__init__(max_size) def __getstate__(self): return (self._max_size, self._current_size, @@ -1940,12 +1956,6 @@ def __setstate__(self, state): self.misses) = state self._mutex = Lock() - def __len__(self): - return len(self._keys()) - - def __iter__(self): - return self.keys() - def __contains__(self, key): with self._mutex: return key in self._keys() @@ -1953,10 +1963,6 @@ def __contains__(self, key): def clear(self): self.invalidate() - def keys(self): - with self._mutex: - return iter(self._keys()) - def _keys(self): return self._values_cache.keys() @@ -1966,45 +1972,11 @@ def values(self): def items(self): return self._values_cache.items() - def _pop_value(self): - # remove the first value from the cache, as this will be the least recently - # used value - _, v = self._values_cache.popitem(last=False) - return v - - def _accommodate_value(self, value_size): - if self._max_size is None: - return - # ensure there is enough space in the cache for a new value - while self._current_size + value_size > self._max_size: - v = self._pop_value() - self._current_size -= buffer_size(v) - - def _cache_value(self, key, value): - # cache a value - value_size = buffer_size(value) - # check size of the value against max size, as if the value itself exceeds max - # size then we are never going to cache it - if self._max_size is None or value_size <= self._max_size: - self._accommodate_value(value_size) - self._values_cache[key] = value - self._current_size += value_size - def invalidate(self): """Completely clear the cache.""" with self._mutex: self._values_cache.clear() - def invalidate_values(self): - """Clear the values cache.""" - with self._mutex: - self._values_cache.clear() - - def _invalidate_value(self, key): - if key in self._values_cache: - value = self._values_cache.pop(key) - self._current_size -= buffer_size(value) - def __getitem__(self, key): try: # try to obtain the value from the cache From 44cea834e8802492faa35afb95dba2c25a5023cb Mon Sep 17 00:00:00 2001 From: shikhar Date: Sun, 11 Nov 2018 23:54:21 +0530 Subject: [PATCH 17/34] consistent variable naming in _chunk_getitem --- zarr/core.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/zarr/core.py b/zarr/core.py index 5f54bcdea1..aa07481506 100644 --- a/zarr/core.py +++ b/zarr/core.py @@ -1602,19 +1602,17 @@ def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection, try: - cdata = None - chunk_was_cached = False + cdata, chunk = None, None # first try getting from cache (if one has been provided) if self._chunk_cache is not None: try: - cdata = self._chunk_cache[ckey] - chunk_was_cached = True + chunk = self._chunk_cache[ckey] except KeyError: pass # obtain compressed data for chunk - if not chunk_was_cached: + if chunk is None: cdata = self.chunk_store[ckey] except KeyError: @@ -1645,8 +1643,8 @@ def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection, # contiguous, so we can decompress directly from the chunk # into the destination array - if chunk_was_cached: - np.copyto(dest, cdata) + if chunk is not None: + np.copyto(dest, chunk) elif 
self._compressor: self._compressor.decode(cdata, dest) if self._chunk_cache is not None: @@ -1663,12 +1661,10 @@ def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection, return # decode chunk - if not chunk_was_cached: + if chunk is None: chunk = self._decode_chunk(cdata) if self._chunk_cache is not None: self._chunk_cache[ckey] = np.copy(chunk) - else: - chunk = cdata # select data from chunk if fields: From 1b67e902f507baca26e5c78a760629999199429d Mon Sep 17 00:00:00 2001 From: shikhar Date: Mon, 12 Nov 2018 01:04:19 +0530 Subject: [PATCH 18/34] removed unnecesary code from _set_basic_selection_zd and added encode decode round trip for object arrays(with tests) --- zarr/core.py | 37 ++++++++++++++------------------ zarr/tests/test_core.py | 47 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 21 deletions(-) diff --git a/zarr/core.py b/zarr/core.py index aa07481506..ef6188ed7e 100644 --- a/zarr/core.py +++ b/zarr/core.py @@ -1482,29 +1482,20 @@ def _set_basic_selection_zd(self, selection, value, fields=None): # obtain key for chunk ckey = self._chunk_key((0,)) - chunk = None - - if self._chunk_cache is not None: - try: - chunk = self._chunk_cache[ckey] - except KeyError: - pass - - if chunk is None: - # setup chunk - try: - # obtain compressed data for chunk - cdata = self.chunk_store[ckey] + # setup chunk + try: + # obtain compressed data for chunk + cdata = self.chunk_store[ckey] - except KeyError: - # chunk not initialized - chunk = np.zeros((), dtype=self._dtype) - if self._fill_value is not None: - chunk.fill(self._fill_value) + except KeyError: + # chunk not initialized + chunk = np.zeros((), dtype=self._dtype) + if self._fill_value is not None: + chunk.fill(self._fill_value) - else: - # decode chunk - chunk = self._decode_chunk(cdata).copy() + else: + # decode chunk + chunk = self._decode_chunk(cdata).copy() # set value if fields: @@ -1515,7 +1506,11 @@ def _set_basic_selection_zd(self, selection, value, fields=None): # encode and store cdata = self._encode_chunk(chunk) self.chunk_store[ckey] = cdata + if self._chunk_cache is not None: + # ensure cached chunk has been round tripped through encode-decode if dtype=object + if self.dtype == object: + chunk = self._decode_chunk(cdata) self._chunk_cache[ckey] = chunk def _set_basic_selection_nd(self, selection, value, fields=None): diff --git a/zarr/tests/test_core.py b/zarr/tests/test_core.py index a0d5a306f3..33da770b84 100644 --- a/zarr/tests/test_core.py +++ b/zarr/tests/test_core.py @@ -1743,3 +1743,50 @@ def test_hit_miss(self): assert cache.misses == 10 and cache.hits == 0 z[:] assert cache.misses == 10 and cache.hits == 10 + + # noinspection PyStatementEffect + def test_array_0d_with_object_arrays(self): + # test behaviour for array with 0 dimensions + + # setup + a = np.zeros((), dtype=object) + z = self.create_array(shape=(), dtype=a.dtype, fill_value=0, object_codec=Pickle()) + + # check properties + assert a.ndim == z.ndim + assert a.shape == z.shape + assert a.size == z.size + assert a.dtype == z.dtype + assert a.nbytes == z.nbytes + with pytest.raises(TypeError): + len(z) + assert () == z.chunks + assert 1 == z.nchunks + assert (1,) == z.cdata_shape + # compressor always None - no point in compressing a single value + assert z.compressor is None + + # check __getitem__ + b = z[...] 
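+        # (reads of this 0-d array go through the chunk cache wired up by
+        # create_array above, so these checks also exercise the cached
+        # _get_basic_selection_zd path)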
+ assert isinstance(b, np.ndarray) + assert a.shape == b.shape + assert a.dtype == b.dtype + assert_array_equal(a, np.array(z)) + assert_array_equal(a, z[...]) + assert a[()] == z[()] + with pytest.raises(IndexError): + z[0] + with pytest.raises(IndexError): + z[:] + + # check __setitem__ + z[...] = 42 + assert 42 == z[()] + z[()] = 43 + assert 43 == z[()] + with pytest.raises(IndexError): + z[0] = 42 + with pytest.raises(IndexError): + z[:] = 42 + with pytest.raises(ValueError): + z[...] = np.array([1, 2, 3]) From 9b0cc2908286621decabdb39625fa2010f3e608c Mon Sep 17 00:00:00 2001 From: shikhar Date: Mon, 12 Nov 2018 01:28:40 +0530 Subject: [PATCH 19/34] flake 8 fixes --- zarr/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zarr/core.py b/zarr/core.py index ef6188ed7e..49176fd2f2 100644 --- a/zarr/core.py +++ b/zarr/core.py @@ -1482,7 +1482,7 @@ def _set_basic_selection_zd(self, selection, value, fields=None): # obtain key for chunk ckey = self._chunk_key((0,)) - # setup chunk + # setup chunk try: # obtain compressed data for chunk cdata = self.chunk_store[ckey] From 0013f951c8caabd813dfa882dc2bd296e8d08cde Mon Sep 17 00:00:00 2001 From: shikhar Date: Thu, 15 Nov 2018 00:38:39 +0530 Subject: [PATCH 20/34] fixed coverage --- zarr/core.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/zarr/core.py b/zarr/core.py index 29658aa4d8..cd3c34a435 100644 --- a/zarr/core.py +++ b/zarr/core.py @@ -1646,8 +1646,6 @@ def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection, np.copyto(dest, chunk) elif self._compressor: self._compressor.decode(cdata, dest) - if self._chunk_cache is not None: - self._chunk_cache[ckey] = np.copy(dest) else: if isinstance(cdata, np.ndarray): chunk = cdata.view(self._dtype) @@ -1655,8 +1653,6 @@ def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection, chunk = np.frombuffer(cdata, dtype=self._dtype) chunk = chunk.reshape(self._chunks, order=self._order) np.copyto(dest, chunk) - if self._chunk_cache is not None: - self._chunk_cache[ckey] = np.copy(chunk) return # decode chunk From f19d43e61d3ea392ce5e40bb517f946a0bb04cb5 Mon Sep 17 00:00:00 2001 From: shikharsg Date: Sun, 27 Jan 2019 16:15:37 +0000 Subject: [PATCH 21/34] bug fix --- zarr/storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zarr/storage.py b/zarr/storage.py index e6e108b318..995fbc4af4 100644 --- a/zarr/storage.py +++ b/zarr/storage.py @@ -1976,7 +1976,7 @@ def listdir(self, path): return listdir(self.meta_store, path) -class LRUChunkCache(MutableMapping): +class LRUChunkCache(LRUMappingCache): """Class that implements a least-recently-used (LRU) cache for array chunks. 
Intended primarily for use with stores that can be slow to access, e.g., remote stores that require network communication to store and retrieve data, and/or arrays where decompression From 697d46e7b6eb21800265e9dddd5dcc8ef8358382 Mon Sep 17 00:00:00 2001 From: shikharsg Date: Sun, 27 Jan 2019 16:38:40 +0000 Subject: [PATCH 22/34] python 2 and 3 compatibility --- zarr/storage.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zarr/storage.py b/zarr/storage.py index 956c79f211..a67e8762b4 100644 --- a/zarr/storage.py +++ b/zarr/storage.py @@ -1803,7 +1803,7 @@ class LRUStoreCache(LRUMappingCache): """ def __init__(self, store, max_size): - super().__init__(max_size) + super(LRUStoreCache, self).__init__(max_size) self._store = store self._keys_cache = None self._contains_cache = None @@ -2221,7 +2221,7 @@ class LRUChunkCache(LRUMappingCache): """ def __init__(self, max_size): - super().__init__(max_size) + super(LRUChunkCache, self).__init__(max_size) def __getstate__(self): return (self._max_size, self._current_size, From 377ece70177a25213fce495d55d15b61451e5070 Mon Sep 17 00:00:00 2001 From: shikharsg Date: Sun, 10 Feb 2019 13:56:17 +0000 Subject: [PATCH 23/34] coverage fix and __init__.py LRUChunkCache import --- zarr/__init__.py | 2 +- zarr/storage.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/zarr/__init__.py b/zarr/__init__.py index e208b8ae82..656049c82f 100644 --- a/zarr/__init__.py +++ b/zarr/__init__.py @@ -8,7 +8,7 @@ ones_like, full_like, open_array, open_like, create) from zarr.storage import (DictStore, DirectoryStore, ZipStore, TempStore, NestedDirectoryStore, DBMStore, LMDBStore, SQLiteStore, - LRUStoreCache, RedisStore, MongoDBStore) + LRUStoreCache, RedisStore, MongoDBStore, LRUChunkCache) from zarr.hierarchy import group, open_group, Group from zarr.sync import ThreadSynchronizer, ProcessSynchronizer from zarr.codecs import * diff --git a/zarr/storage.py b/zarr/storage.py index 6265a42b48..3e48f823d8 100644 --- a/zarr/storage.py +++ b/zarr/storage.py @@ -1719,7 +1719,7 @@ def keys(self): return iter(self._keys()) def _keys(self): - raise NotImplementedError + raise NotImplementedError # pragma: no cover def _pop_value(self): # remove the first value from the cache, as this will be the least recently @@ -1756,13 +1756,13 @@ def _invalidate_value(self, key): self._current_size -= buffer_size(value) def __getitem__(self, key): - raise NotImplementedError + raise NotImplementedError # pragma: no cover def __setitem__(self, key, value): - raise NotImplementedError + raise NotImplementedError # pragma: no cover def __delitem__(self, key): - raise NotImplementedError + raise NotImplementedError # pragma: no cover class LRUStoreCache(LRUMappingCache): From df84c89c39b8d9ffb3bc93d1c87efca284d04e83 Mon Sep 17 00:00:00 2001 From: shikharsg Date: Mon, 4 Mar 2019 13:39:31 +0000 Subject: [PATCH 24/34] flake8 fix --- zarr/tests/test_core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zarr/tests/test_core.py b/zarr/tests/test_core.py index abcc3bb4d4..a88c5633e5 100644 --- a/zarr/tests/test_core.py +++ b/zarr/tests/test_core.py @@ -2331,4 +2331,4 @@ def test_array_0d_with_object_arrays(self): with pytest.raises(IndexError): z[:] = 42 with pytest.raises(ValueError): - z[...] = np.array([1, 2, 3]) \ No newline at end of file + z[...] 
= np.array([1, 2, 3]) From a816014275c1e05604f05f635e9621b535710ccb Mon Sep 17 00:00:00 2001 From: Shikhar Goenka Date: Thu, 11 Apr 2019 19:21:40 +0100 Subject: [PATCH 25/34] Implemented https://github.com/zarr-developers/zarr/pull/306/files#r224507203 and https://github.com/zarr-developers/zarr/pull/306/files#r225615705 --- zarr/core.py | 123 +++++++++++++++++++++++++++------------------------ 1 file changed, 64 insertions(+), 59 deletions(-) diff --git a/zarr/core.py b/zarr/core.py index 4ecdf63b7a..8861e80e7b 100644 --- a/zarr/core.py +++ b/zarr/core.py @@ -743,9 +743,9 @@ def _get_basic_selection_zd(self, selection, out=None, fields=None): else: chunk = self._decode_chunk(cdata) - # cache decoded chunk - if self._chunk_cache is not None: - self._chunk_cache[ckey] = chunk + # cache decoded chunk + if self._chunk_cache is not None: + self._chunk_cache[ckey] = chunk # handle fields if fields: @@ -1609,78 +1609,83 @@ def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection, # obtain key for chunk ckey = self._chunk_key(chunk_coords) - try: + # setup variable to hold decoded chunk + chunk = None - cdata, chunk = None, None + # check for cached chunk + if self._chunk_cache is not None: + try: + chunk = self._chunk_cache[ckey] + except KeyError: + pass - # first try getting from cache (if one has been provided) - if self._chunk_cache is not None: - try: - chunk = self._chunk_cache[ckey] - except KeyError: - pass + if chunk is None: - # obtain compressed data for chunk - if chunk is None: + try: + # obtain compressed data for chunk cdata = self.chunk_store[ckey] - except KeyError: - # chunk not initialized - if self._fill_value is not None: - if fields: - fill_value = self._fill_value[fields] - else: - fill_value = self._fill_value - out[out_selection] = fill_value + except KeyError: + # chunk not initialized + if self._fill_value is not None: + if fields: + fill_value = self._fill_value[fields] + else: + fill_value = self._fill_value + out[out_selection] = fill_value + return - else: + else: - if (isinstance(out, np.ndarray) and - not fields and - is_contiguous_selection(out_selection) and - is_total_slice(chunk_selection, self._chunks) and - not self._filters and - self._dtype != object): - - dest = out[out_selection] - write_direct = ( - dest.flags.writeable and ( - (self._order == 'C' and dest.flags.c_contiguous) or - (self._order == 'F' and dest.flags.f_contiguous) + # look for a possible optimisation where data can be decompressed directly + # into destination, which avoids a memory copy + if (isinstance(out, np.ndarray) and + not fields and + is_contiguous_selection(out_selection) and + is_total_slice(chunk_selection, self._chunks) and + not self._filters and + self._dtype != object): + + dest = out[out_selection] + write_direct = ( + dest.flags.writeable and ( + (self._order == 'C' and dest.flags.c_contiguous) or + (self._order == 'F' and dest.flags.f_contiguous) + ) ) - ) - if write_direct: + if write_direct: - # optimization: we want the whole chunk, and the destination is - # contiguous, so we can decompress directly from the chunk - # into the destination array + # optimization: we want the whole chunk, and the destination is + # contiguous, so we can decompress directly from the chunk + # into the destination array - if chunk is not None: - np.copyto(dest, chunk) - elif self._compressor: - self._compressor.decode(cdata, dest) - else: - chunk = ensure_ndarray(cdata).view(self._dtype) - chunk = chunk.reshape(self._chunks, order=self._order) - np.copyto(dest, chunk) 
- return + if self._compressor: + self._compressor.decode(cdata, dest) + else: + if isinstance(cdata, np.ndarray): + chunk = cdata.view(self._dtype) + else: + chunk = np.frombuffer(cdata, dtype=self._dtype) + chunk = chunk.reshape(self._chunks, order=self._order) + np.copyto(dest, chunk) + return - # decode chunk - if chunk is None: + # decode chunk chunk = self._decode_chunk(cdata) if self._chunk_cache is not None: - self._chunk_cache[ckey] = np.copy(chunk) + # cache the decoded chunk + self._chunk_cache[ckey] = chunk - # select data from chunk - if fields: - chunk = chunk[fields] - tmp = chunk[chunk_selection] - if drop_axes: - tmp = np.squeeze(tmp, axis=drop_axes) + # select data from chunk + if fields: + chunk = chunk[fields] + tmp = chunk[chunk_selection] + if drop_axes: + tmp = np.squeeze(tmp, axis=drop_axes) - # store selected data in output - out[out_selection] = tmp + # store selected data in output + out[out_selection] = tmp def _chunk_setitem(self, chunk_coords, chunk_selection, value, fields=None): """Replace part or whole of a chunk. From 8cc083b331fd74f1b305fb494fd5e25932f527c6 Mon Sep 17 00:00:00 2001 From: Shikhar Goenka Date: Sat, 4 May 2019 00:05:21 +0100 Subject: [PATCH 26/34] cache tests refactor --- zarr/tests/test_storage.py | 247 +++++++++++-------------------------- 1 file changed, 69 insertions(+), 178 deletions(-) diff --git a/zarr/tests/test_storage.py b/zarr/tests/test_storage.py index a0be1868da..f1aa49dbd6 100644 --- a/zarr/tests/test_storage.py +++ b/zarr/tests/test_storage.py @@ -1128,10 +1128,13 @@ def create_store(self): return store -class TestLRUStoreCache(StoreTests, unittest.TestCase): +class CacheTests(unittest.TestCase): - def create_store(self): - return LRUStoreCache(dict(), max_size=2**27) + def create_store(self): # pragma: no cover + raise NotImplementedError + + def create_cache(self, store, max_size=None): # pragma: no cover + raise NotImplementedError def test_cache_values_no_max_size(self): @@ -1145,7 +1148,7 @@ def test_cache_values_no_max_size(self): assert 1 == store.counter['__setitem__', 'bar'] # setup cache - cache = LRUStoreCache(store, max_size=None) + cache = self.create_cache(store) assert 0 == cache.hits assert 0 == cache.misses @@ -1184,19 +1187,6 @@ def test_cache_values_no_max_size(self): assert 3 == store.counter['__getitem__', 'foo'] assert 2 == store.counter['__setitem__', 'foo'] - # test __delitem__ - del cache['foo'] - with pytest.raises(KeyError): - # noinspection PyStatementEffect - cache['foo'] - with pytest.raises(KeyError): - # noinspection PyStatementEffect - store['foo'] - - # verify other keys untouched - assert 0 == store.counter['__getitem__', 'bar'] - assert 1 == store.counter['__setitem__', 'bar'] - def test_cache_values_with_max_size(self): # setup store @@ -1206,7 +1196,7 @@ def test_cache_values_with_max_size(self): assert 0 == store.counter['__getitem__', 'foo'] assert 0 == store.counter['__getitem__', 'bar'] # setup cache - can only hold one item - cache = LRUStoreCache(store, max_size=5) + cache = self.create_cache(store, max_size=5) assert 0 == cache.hits assert 0 == cache.misses @@ -1293,6 +1283,35 @@ def test_cache_values_with_max_size(self): assert 4 == cache.hits assert 2 == cache.misses +class TestLRUStoreCache(StoreTests, CacheTests): + + def create_store(self): + return LRUStoreCache(dict(), max_size=2**27) + + def create_cache(self, store, max_size=None): + return LRUStoreCache(store=store, max_size=max_size) + + def test_delitem(self): + # setup store + store = CountingDict() + store['foo'] = 
b'xxx' + store['bar'] = b'yyy' + + # setup cache + cache = self.create_cache(store) + + del cache['foo'] + with pytest.raises(KeyError): + # noinspection PyStatementEffect + cache['foo'] + with pytest.raises(KeyError): + # noinspection PyStatementEffect + store['foo'] + + # verify other keys untouched + assert 0 == store.counter['__getitem__', 'bar'] + assert 1 == store.counter['__setitem__', 'bar'] + def test_cache_keys(self): # setup @@ -1302,7 +1321,7 @@ def test_cache_keys(self): assert 0 == store.counter['__contains__', 'foo'] assert 0 == store.counter['__iter__'] assert 0 == store.counter['keys'] - cache = LRUStoreCache(store, max_size=None) + cache = self.create_cache(store, max_size=None) # keys should be cached on first call keys = sorted(cache.keys()) @@ -1352,197 +1371,69 @@ def test_cache_keys(self): assert 1 == store.counter['__iter__'] -class TestLRUChunkCache(MutableMappingStoreTests, unittest.TestCase): +class TestLRUChunkCache(MutableMappingStoreTests, CacheTests): + # mock test object that will act as both the cache and the array class MockChunkCacheArray(object): def __init__(self, chunk_cache, store): - self._chunk_cache = chunk_cache + self.chunk_cache = chunk_cache self._store = store + self.hits = 0 + self.misses = 0 def __setitem__(self, key, value): self._store[key] = value - self._chunk_cache[key] = value + self.chunk_cache[key] = value + self._reset_hits_misses() def __getitem__(self, item): try: - return self._chunk_cache[item] + value = self.chunk_cache[item] except KeyError: value = self._store[item] - self._chunk_cache[item] = value - return value + self.chunk_cache[item] = value + self._reset_hits_misses() + return value + + def __delitem__(self, key): + self.chunk_cache.__delitem__(key) + + def _reset_hits_misses(self): + self.hits = self.chunk_cache.hits + self.misses = self.chunk_cache.misses + + def invalidate(self): + self.chunk_cache.invalidate() + + def invalidate_values(self): + self.chunk_cache.invalidate_values() def create_store(self): - return LRUChunkCache(max_size=None) + return LRUChunkCache(max_size=2**27) - def test_cache_values_no_max_size(self): + def create_cache(self, store, max_size=None): + return self.MockChunkCacheArray(LRUChunkCache(max_size=max_size), store=store) + def test_delitem(self): # setup store store = CountingDict() store['foo'] = b'xxx' store['bar'] = b'yyy' - assert 0 == store.counter['__getitem__', 'foo'] - assert 1 == store.counter['__setitem__', 'foo'] - assert 0 == store.counter['__getitem__', 'bar'] - assert 1 == store.counter['__setitem__', 'bar'] # setup cache - cache = LRUChunkCache(max_size=None) - assert 0 == cache.hits - assert 0 == cache.misses - - # setup array with cache and store - z = self.MockChunkCacheArray(cache, store) - - # test first __getitem__, cache miss - assert b'xxx' == z['foo'] - assert 1 == store.counter['__getitem__', 'foo'] - assert 1 == store.counter['__setitem__', 'foo'] - assert 0 == cache.hits - assert 1 == cache.misses - - # test second __getitem__, cache hit - assert b'xxx' == z['foo'] - assert 1 == store.counter['__getitem__', 'foo'] - assert 1 == store.counter['__setitem__', 'foo'] - assert 1 == cache.hits - assert 1 == cache.misses - - # test __setitem__, __getitem__ - z['foo'] = b'zzz' - assert 1 == store.counter['__getitem__', 'foo'] - assert 2 == store.counter['__setitem__', 'foo'] - # should be a cache hit - assert b'zzz' == z['foo'] - assert 1 == store.counter['__getitem__', 'foo'] - assert 2 == store.counter['__setitem__', 'foo'] - assert 2 == cache.hits - assert 1 == 
cache.misses + cache = self.create_cache(store) - # manually invalidate all cached values - cache.invalidate_values() - assert b'zzz' == z['foo'] - assert 2 == store.counter['__getitem__', 'foo'] - assert 2 == store.counter['__setitem__', 'foo'] - cache.invalidate() - assert b'zzz' == z['foo'] - assert 3 == store.counter['__getitem__', 'foo'] - assert 2 == store.counter['__setitem__', 'foo'] - - # test __delitem__ + cache['foo'] del cache['foo'] with pytest.raises(KeyError): # noinspection PyStatementEffect - cache['foo'] + cache.chunk_cache['foo'] # verify other keys untouched assert 0 == store.counter['__getitem__', 'bar'] assert 1 == store.counter['__setitem__', 'bar'] - def test_cache_values_with_max_size(self): - - # setup store - store = CountingDict() - store['foo'] = b'xxx' - store['bar'] = b'yyy' - assert 0 == store.counter['__getitem__', 'foo'] - assert 0 == store.counter['__getitem__', 'bar'] - - # setup cache can only hold one item - cache = LRUChunkCache(max_size=5) - assert 0 == cache.hits - assert 0 == cache.misses - - # setup array with cache and store - z = self.MockChunkCacheArray(cache, store) - - # test first 'foo' __getitem__, cache miss - assert b'xxx' == z['foo'] - assert 1 == store.counter['__getitem__', 'foo'] - assert 0 == cache.hits - assert 1 == cache.misses - - # test second 'foo' __getitem__, cache hit - assert b'xxx' == z['foo'] - assert 1 == store.counter['__getitem__', 'foo'] - assert 1 == cache.hits - assert 1 == cache.misses - - # test first 'bar' __getitem__, cache miss - assert b'yyy' == z['bar'] - assert 1 == store.counter['__getitem__', 'bar'] - assert 1 == cache.hits - assert 2 == cache.misses - - # test second 'bar' __getitem__, cache hit - assert b'yyy' == z['bar'] - assert 1 == store.counter['__getitem__', 'bar'] - assert 2 == cache.hits - assert 2 == cache.misses - - # test 'foo' __getitem__, should have been evicted, cache miss - assert b'xxx' == z['foo'] - assert 2 == store.counter['__getitem__', 'foo'] - assert 2 == cache.hits - assert 3 == cache.misses - - # test 'bar' __getitem__, should have been evicted, cache miss - assert b'yyy' == z['bar'] - assert 2 == store.counter['__getitem__', 'bar'] - assert 2 == cache.hits - assert 4 == cache.misses - - # setup store - store = CountingDict() - store['foo'] = b'xxx' - store['bar'] = b'yyy' - assert 0 == store.counter['__getitem__', 'foo'] - assert 0 == store.counter['__getitem__', 'bar'] - - # setup cache can hold 2 items - cache = LRUChunkCache(max_size=6) - assert 0 == cache.hits - assert 0 == cache.misses - - # setup array with cache and store - z = self.MockChunkCacheArray(cache, store) - - # test first 'foo' __getitem__, cache miss - assert b'xxx' == z['foo'] - assert 1 == store.counter['__getitem__', 'foo'] - assert 0 == cache.hits - assert 1 == cache.misses - - # test second 'foo' __getitem__, cache hit - assert b'xxx' == z['foo'] - assert 1 == store.counter['__getitem__', 'foo'] - assert 1 == cache.hits - assert 1 == cache.misses - - # test first 'bar' __getitem__, cache miss - assert b'yyy' == z['bar'] - assert 1 == store.counter['__getitem__', 'bar'] - assert 1 == cache.hits - assert 2 == cache.misses - - # test second 'bar' __getitem__, cache hit - assert b'yyy' == z['bar'] - assert 1 == store.counter['__getitem__', 'bar'] - assert 2 == cache.hits - assert 2 == cache.misses - - # test 'foo' __getitem__, should still be cached - assert b'xxx' == z['foo'] - assert 1 == store.counter['__getitem__', 'foo'] - assert 3 == cache.hits - assert 2 == cache.misses - - # test 'bar' __getitem__, 
should still be cached
-        assert b'yyy' == z['bar']
-        assert 1 == store.counter['__getitem__', 'bar']
-        assert 4 == cache.hits
-        assert 2 == cache.misses
-

 def test_getsize():
     store = dict()

From 23fcdead24163a0591da142e3d5b49b6881ca869 Mon Sep 17 00:00:00 2001
From: Shikhar Goenka
Date: Sat, 4 May 2019 00:10:06 +0100
Subject: [PATCH 27/34] fixed minor tests mistake

---
 zarr/tests/test_storage.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/zarr/tests/test_storage.py b/zarr/tests/test_storage.py
index f1aa49dbd6..d1094ccaf6 100644
--- a/zarr/tests/test_storage.py
+++ b/zarr/tests/test_storage.py
@@ -1128,7 +1128,7 @@ def create_store(self):
         return store


-class CacheTests(unittest.TestCase):
+class CacheTests(object):

     def create_store(self):  # pragma: no cover
         raise NotImplementedError
@@ -1283,7 +1283,7 @@ def test_cache_values_with_max_size(self):
         assert 4 == cache.hits
         assert 2 == cache.misses

-class TestLRUStoreCache(StoreTests, CacheTests):
+class TestLRUStoreCache(StoreTests, CacheTests, unittest.TestCase):
@@ -1371,7 +1371,7 @@ def test_cache_keys(self):
         assert 1 == store.counter['__iter__']

-class TestLRUChunkCache(MutableMappingStoreTests, CacheTests):
+class TestLRUChunkCache(MutableMappingStoreTests, CacheTests, unittest.TestCase):

From 635ec87a09ae5e192feaaf4876f1c5c5d04190aa Mon Sep 17 00:00:00 2001
From: Shikhar Goenka
Date: Sat, 4 May 2019 00:52:02 +0100
Subject: [PATCH 28/34] flake8 fix

---
 zarr/tests/test_storage.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/zarr/tests/test_storage.py b/zarr/tests/test_storage.py
index 827af7a721..52e6dfb081 100644
--- a/zarr/tests/test_storage.py
+++ b/zarr/tests/test_storage.py
@@ -1283,6 +1283,7 @@ def test_cache_values_with_max_size(self):
         assert 4 == cache.hits
         assert 2 == cache.misses

+
 class TestLRUStoreCache(StoreTests, CacheTests, unittest.TestCase):

     def create_store(self):

From 875c24ffa9e0450e0e8d0e9297fd7a7531546548 Mon Sep 17 00:00:00 2001
From: Shikhar Goenka
Date: Wed, 20 Nov 2019 15:31:20 +0000
Subject: [PATCH 29/34] added chunk cache to Group

---
 zarr/hierarchy.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/zarr/hierarchy.py b/zarr/hierarchy.py
index a5bf4bede6..ba776542be 100644
--- a/zarr/hierarchy.py
+++ b/zarr/hierarchy.py
@@ -88,9 +88,10 @@ class Group(MutableMapping):
     """

     def __init__(self, store, path=None, read_only=False, chunk_store=None,
-                 cache_attrs=True, synchronizer=None):
+                 cache_attrs=True, synchronizer=None, chunk_cache=None):
         self._store = store
         self._chunk_store = chunk_store
+        self._chunk_cache = chunk_cache
         self._path = normalize_storage_path(path)
         if self._path:
             self._key_prefix = self._path + '/'
@@ -321,7 +322,7 @@ def __getitem__(self, item):
         path = self._item_path(item)
         if contains_array(self._store, path):
             return Array(self._store, read_only=self._read_only, path=path,
-                         chunk_store=self._chunk_store,
+                         chunk_store=self._chunk_store, chunk_cache=self._chunk_cache,
                          synchronizer=self._synchronizer, cache_attrs=self.attrs.cache)
         elif contains_group(self._store, path):
             return Group(self._store, read_only=self._read_only, path=path,
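[Editor's note: the block below is an illustration added for this review and is
not part of the patch series. The next patch threads chunk_cache through the
public creation functions, completing the user-facing API. A minimal usage
sketch, assuming LRUChunkCache is importable from zarr.storage and exposes the
hits/misses counters exercised in the tests above; the max_size figure is an
arbitrary example.]

    import numpy as np
    import zarr
    from zarr.storage import LRUChunkCache

    # one decoded-chunk cache shared by every array reached through the group;
    # max_size=2**26 caps the cache at ~64 MiB of decoded chunks
    chunk_cache = LRUChunkCache(max_size=2**26)
    grp = zarr.group(chunk_cache=chunk_cache)
    grp.create_dataset('x', shape=(1000, 1000), chunks=(100, 100), dtype='f8')

    z = grp['x']                         # Group.__getitem__ passes the cache on
    z[:] = np.random.randn(1000, 1000)   # writes also populate the cache
    a = z[:100, :100]                    # served from the cache, no re-decode
    print(chunk_cache.hits, chunk_cache.misses)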
From 4a1baa9975adb79304293c1fb266d8f221470e1f Mon Sep 17 00:00:00 2001
From: Shikhar Goenka
Date: Wed, 20 Nov 2019 16:37:26 +0000
Subject: [PATCH 30/34] added chunk_cache to all relevant functions

---
 zarr/creation.py | 27 +++++++++++++++++++++++----
 zarr/hierarchy.py | 21 +++++++++++++--------
 2 files changed, 36 insertions(+), 12 deletions(-)

diff --git a/zarr/creation.py b/zarr/creation.py
index 632dd24915..a932d4c9c9 100644
--- a/zarr/creation.py
+++ b/zarr/creation.py
@@ -17,7 +17,7 @@ def create(shape, chunks=True, dtype=None, compressor='default',
            fill_value=0, order='C', store=None, synchronizer=None,
            overwrite=False, path=None, chunk_store=None, filters=None,
            cache_metadata=True, cache_attrs=True, read_only=False,
-           object_codec=None, **kwargs):
+           object_codec=None, chunk_cache=None, **kwargs):
     """Create an array.

     Parameters
@@ -49,6 +49,15 @@ def create(shape, chunks=True, dtype=None, compressor='default',
     chunk_store : MutableMapping, optional
         Separate storage for chunks. If not provided, `store` will be used
         for storage of both chunks and metadata.
+    chunk_cache : MutableMapping, optional
+        Mapping to store decoded chunks for caching. Can be used in repeated
+        chunk access scenarios when decoding of data is computationally
+        expensive.
+        NOTE: using the write cache feature with object arrays (i.e.
+        writing to an array whose dtype is 'object' while a chunk_cache
+        is provided) may result in a slight slowdown, as some dtypes,
+        like VLenArray, have to go through the encode-decode phase before
+        having the correct dtype.
     filters : sequence of Codecs, optional
         Sequence of filters to use to encode chunk data prior to compression.
     cache_metadata : bool, optional
@@ -122,7 +131,8 @@ def create(shape, chunks=True, dtype=None, compressor='default',

     # instantiate array
     z = Array(store, path=path, chunk_store=chunk_store, synchronizer=synchronizer,
-              cache_metadata=cache_metadata, cache_attrs=cache_attrs, read_only=read_only)
+              cache_metadata=cache_metadata, cache_attrs=cache_attrs, read_only=read_only,
+              chunk_cache=chunk_cache)

     return z

@@ -353,7 +363,7 @@ def array(data, **kwargs):
 def open_array(store=None, mode='a', shape=None, chunks=True, dtype=None,
                compressor='default', fill_value=0, order='C', synchronizer=None,
                filters=None, cache_metadata=True, cache_attrs=True, path=None,
-               object_codec=None, chunk_store=None, **kwargs):
+               object_codec=None, chunk_store=None, chunk_cache=None, **kwargs):
     """Open an array using file-mode-like semantics.

     Parameters
@@ -399,6 +409,15 @@ def open_array(store=None, mode='a', shape=None, chunks=True, dtype=None,
         A codec to encode object arrays, only needed if dtype=object.
     chunk_store : MutableMapping or string, optional
         Store or path to directory in file system or name of zip file.
+    chunk_cache : MutableMapping, optional
+        Mapping to store decoded chunks for caching. Can be used in repeated
+        chunk access scenarios when decoding of data is computationally
+        expensive.
+        NOTE: using the write cache feature with object arrays (i.e.
+        writing to an array whose dtype is 'object' while a chunk_cache
+        is provided) may result in a slight slowdown, as some dtypes,
+        like VLenArray, have to go through the encode-decode phase before
+        having the correct dtype.
Returns ------- @@ -487,7 +506,7 @@ def open_array(store=None, mode='a', shape=None, chunks=True, dtype=None, # instantiate array z = Array(store, read_only=read_only, synchronizer=synchronizer, cache_metadata=cache_metadata, cache_attrs=cache_attrs, path=path, - chunk_store=chunk_store) + chunk_store=chunk_store, chunk_cache=chunk_cache) return z diff --git a/zarr/hierarchy.py b/zarr/hierarchy.py index ba776542be..54342c63f5 100644 --- a/zarr/hierarchy.py +++ b/zarr/hierarchy.py @@ -327,7 +327,7 @@ def __getitem__(self, item): elif contains_group(self._store, path): return Group(self._store, read_only=self._read_only, path=path, chunk_store=self._chunk_store, cache_attrs=self.attrs.cache, - synchronizer=self._synchronizer) + synchronizer=self._synchronizer, chunk_cache=self._chunk_cache) else: raise KeyError(item) @@ -404,6 +404,7 @@ def groups(self): if contains_group(self._store, path): yield key, Group(self._store, path=path, read_only=self._read_only, chunk_store=self._chunk_store, + chunk_cache=self._chunk_cache, cache_attrs=self.attrs.cache, synchronizer=self._synchronizer) @@ -678,7 +679,7 @@ def _create_group_nosync(self, name, overwrite=False): return Group(self._store, path=path, read_only=self._read_only, chunk_store=self._chunk_store, cache_attrs=self.attrs.cache, - synchronizer=self._synchronizer) + synchronizer=self._synchronizer, chunk_cache=self._chunk_cache) def create_groups(self, *names, **kwargs): """Convenience method to create multiple groups in a single call.""" @@ -722,7 +723,7 @@ def _require_group_nosync(self, name, overwrite=False): return Group(self._store, path=path, read_only=self._read_only, chunk_store=self._chunk_store, cache_attrs=self.attrs.cache, - synchronizer=self._synchronizer) + synchronizer=self._synchronizer, chunk_cache=self._chunk_cache) def require_groups(self, *names): """Convenience method to require multiple groups in a single call.""" @@ -839,7 +840,8 @@ def _require_dataset_nosync(self, name, shape, dtype=None, exact=False, cache_attrs = kwargs.get('cache_attrs', self.attrs.cache) a = Array(self._store, path=path, read_only=self._read_only, chunk_store=self._chunk_store, synchronizer=synchronizer, - cache_metadata=cache_metadata, cache_attrs=cache_attrs) + cache_metadata=cache_metadata, cache_attrs=cache_attrs, + chunk_cache=self._chunk_cache) shape = normalize_shape(shape) if shape != a.shape: raise TypeError('shape do not match existing array; expected {}, got {}' @@ -1016,7 +1018,8 @@ def _normalize_store_arg(store, clobber=False): def group(store=None, overwrite=False, chunk_store=None, - cache_attrs=True, synchronizer=None, path=None): + cache_attrs=True, synchronizer=None, path=None, + chunk_cache=None): """Create a group. Parameters @@ -1070,11 +1073,12 @@ def group(store=None, overwrite=False, chunk_store=None, path=path) return Group(store, read_only=False, chunk_store=chunk_store, - cache_attrs=cache_attrs, synchronizer=synchronizer, path=path) + cache_attrs=cache_attrs, synchronizer=synchronizer, path=path, + chunk_cache=chunk_cache) def open_group(store=None, mode='a', cache_attrs=True, synchronizer=None, path=None, - chunk_store=None): + chunk_store=None, chunk_cache=None): """Open a group using file-mode-like semantics. 
Parameters
@@ -1152,4 +1156,5 @@ def open_group(store=None, mode='a', cache_attrs=True, synchronizer=None, path=N
     read_only = mode == 'r'

     return Group(store, read_only=read_only, cache_attrs=cache_attrs,
-                 synchronizer=synchronizer, path=path, chunk_store=chunk_store)
+                 synchronizer=synchronizer, path=path, chunk_store=chunk_store,
+                 chunk_cache=chunk_cache)

From 6571382df83c8d65003ab453047b4b8f4d9e5be9 Mon Sep 17 00:00:00 2001
From: Shikhar Goenka
Date: Wed, 30 Sep 2020 22:19:24 +0100
Subject: [PATCH 31/34] fixed failing doctest

---
 zarr/storage.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/zarr/storage.py b/zarr/storage.py
index a5876c8130..5d3ff44816 100644
--- a/zarr/storage.py
+++ b/zarr/storage.py
@@ -2747,7 +2747,7 @@ class LRUChunkCache(LRUMappingCache):
     >>> import zarr
     >>> from numcodecs import LZMA
     >>> import numpy as np
-    >>> store = zarr.DictStore()
+    >>> store = zarr.MemoryStore()
     >>> z = zarr.array(np.random.randn(1000000).reshape(1000,1000), chunks=(100,100),
     ...                store=store, compressor=LZMA())
     >>> from timeit import timeit

From e0e52544b27bb4ca9336033063f053db59a4f572 Mon Sep 17 00:00:00 2001
From: Shikhar Goenka
Date: Sat, 20 Feb 2021 20:42:45 +0000
Subject: [PATCH 32/34] fixed setitem caching order

---
 zarr/core.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/zarr/core.py b/zarr/core.py
index c7980371dc..88f25bf65b 100644
--- a/zarr/core.py
+++ b/zarr/core.py
@@ -1650,7 +1650,6 @@ def _set_selection(self, indexer, value, fields=None):
             self._chunk_setitems(lchunk_coords, lchunk_selection, chunk_values,
                                  fields=fields)
-
     def _select_and_set_out(self, fields, chunk, chunk_selection, drop_axes,
                             out, out_selection):

         # select data from chunk
@@ -1955,7 +1954,7 @@ def _process_for_setitem(self, ckey, chunk_selection, value, fields=None):
                 chunk[chunk_selection] = value

         # encode chunk
-        return self._encode_chunk(chunk)
+        cdata = self._encode_chunk(chunk)

         # cache the chunk
         if self._chunk_cache is not None:
             chunk = self._decode_chunk(cdata)
             self._chunk_cache[ckey] = np.copy(chunk)

+        return cdata
+
     def _chunk_key(self, chunk_coords):
         return self._key_prefix + '.'.join(map(str, chunk_coords))

From 38ee62234d89480c89c066c725c66f28722b2204 Mon Sep 17 00:00:00 2001
From: Shikhar Goenka
Date: Sun, 21 Feb 2021 13:59:41 +0000
Subject: [PATCH 33/34] refactor

---
 zarr/core.py | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/zarr/core.py b/zarr/core.py
index 88f25bf65b..00da520bc5 100644
--- a/zarr/core.py
+++ b/zarr/core.py
@@ -758,10 +758,7 @@ def _get_basic_selection_zd(self, selection, out=None, fields=None):

         # check for cached chunk
         if self._chunk_cache is not None:
-            try:
-                chunk = self._chunk_cache[ckey]
-            except KeyError:
-                pass
+            chunk = self._chunk_cache.get(ckey)

         if chunk is None:
             try:
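[Editor's note: illustration for review, not part of the patch series. Patch 32
above reorders _process_for_setitem so the chunk is encoded first and the
cached copy is produced from those encoded bytes. The reason, per the
chunk_cache docstring NOTE, is that for object arrays the encode-decode round
trip is what normalises element types, so the cached chunk must match what a
later read from the store would return. A small self-contained numcodecs
sketch of that normalisation (VLenArray is the codec the NOTE names):]

    import numpy as np
    from numcodecs import VLenArray

    codec = VLenArray('<i4')
    raw = np.empty(2, dtype=object)
    raw[0] = [1, 2, 3]        # plain Python lists going in
    raw[1] = [4, 5]
    decoded = codec.decode(codec.encode(raw))
    print(type(decoded[0]), decoded[0].dtype)   # numpy.ndarray int32 coming out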
From 7cdce5f200066fb30a6990bffc86667952fa1e46 Mon Sep 17 00:00:00 2001
From: jmoore
Date: Fri, 27 Aug 2021 14:51:41 +0200
Subject: [PATCH 34/34] Remove use of unittest

---
 zarr/tests/test_storage.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/zarr/tests/test_storage.py b/zarr/tests/test_storage.py
index cc1dc50f2b..ff278a2eac 100644
--- a/zarr/tests/test_storage.py
+++ b/zarr/tests/test_storage.py
@@ -1697,7 +1697,7 @@ def test_cache_values_with_max_size(self):
         assert 2 == cache.misses


-class TestLRUStoreCache(StoreTests, CacheTests, unittest.TestCase):
+class TestLRUStoreCache(StoreTests, CacheTests):

     def create_store(self, **kwargs):
         # wrapper therefore no dimension_separator argument
@@ -1787,7 +1787,7 @@ def test_cache_keys(self):
         assert 1 == store.counter['__iter__']


-class TestLRUChunkCache(MutableMappingStoreTests, CacheTests, unittest.TestCase):
+class TestLRUChunkCache(MutableMappingStoreTests, CacheTests):

     # mock test object that will act as both the cache and the array
     class MockChunkCacheArray(object):