[WIP] Attempt to continue LRU cache for decoded chunks #1214

Status: Closed · wants to merge 58 commits

Commits (58)
d62febb
first attempt at chunk_cache layer
shikharsg Aug 29, 2018
f796ea7
ChunkCache test with MockChunkCacheArray
shikharsg Aug 29, 2018
32141a9
np.copy not needed when accessing a subset of a chunk
shikharsg Oct 9, 2018
b35139b
fixed 'Mm' dtype error for buffersize function
shikharsg Oct 13, 2018
3c45176
renamed ChunkCache to LRUChunkCache
Oct 13, 2018
46dcf94
LRUChunkCache in zarr root namespace
Oct 13, 2018
c69c751
LRUChunkCache example
Oct 13, 2018
2cb143e
write caching of chunk should be done after encoding
Oct 15, 2018
2fb169e
ensure cached chunk has been round tripped through encode-decode if d…
Oct 15, 2018
31e4dfb
flake8 fixes
Oct 15, 2018
5559c4f
read write cache for 0-d arrays
Oct 15, 2018
2a0124a
added tutorial and api docs
Oct 15, 2018
6fac2ad
separated store tests from mutable mapping tests in test_storage.py
shikharsg Oct 20, 2018
4e79d0b
fixed pickle, __delitem__ and ordered dict iteration bugs
shikharsg Oct 20, 2018
5fd6fc8
documenting slowdown when using write cache with object arrays
shikharsg Oct 20, 2018
422f9eb
factoring out mapping code from LRUStoreCache and LRUChunkCache
shikharsg Oct 23, 2018
44cea83
consistent variable naming in _chunk_getitem
shikharsg Nov 11, 2018
1b67e90
removed unnecesary code from _set_basic_selection_zd and added encode…
shikharsg Nov 11, 2018
9b0cc29
flake 8 fixes
shikharsg Nov 11, 2018
715f86d
Merge remote-tracking branch 'upstream/master' into chunk_cache
shikharsg Nov 13, 2018
0013f95
fixed coverage
shikharsg Nov 14, 2018
b70c348
Merge branch 'chunk_cache' into master
shikharsg Nov 14, 2018
c4f2487
Merge pull request #4 from shikharsg/master
shikharsg Nov 14, 2018
245f661
Merge branch 'master' into chunk_cache
shikharsg Nov 15, 2018
a2a05fb
Merge branch 'master' into chunk_cache
shikharsg Jan 8, 2019
b8b9056
Merge branch 'chunk_cache' into chunk_cache_mapping_refactor
shikharsg Jan 9, 2019
04f0367
Merge pull request #3 from shikharsg/chunk_cache_mapping_refactor
shikharsg Jan 9, 2019
f19d43e
bug fix
shikharsg Jan 27, 2019
52a43bf
Merge branch 'master' into chunk_cache
shikharsg Jan 27, 2019
697d46e
python 2 and 3 compatibility
shikharsg Jan 27, 2019
1e727c7
Merge branch 'master' into chunk_cache
shikharsg Feb 10, 2019
377ece7
coverage fix and __init__.py LRUChunkCache import
shikharsg Feb 10, 2019
3473adb
merged chunk_cache with master
shikharsg Mar 4, 2019
df84c89
flake8 fix
shikharsg Mar 4, 2019
88fe66d
Merge branch 'master' into chunk_cache
Mar 30, 2019
a816014
Implemented https://github.com/zarr-developers/zarr/pull/306/files#r2…
Apr 11, 2019
8cc083b
cache tests refactor
May 3, 2019
23fcdea
fixed minor tests mistak
May 3, 2019
309cc48
Merge branch 'master' into chunk_cache
May 3, 2019
635ec87
flake8 fix
May 3, 2019
a85d156
Merge remote-tracking branch 'upstream/master' into chunk_cache
Aug 20, 2019
ef86184
merged with master
Oct 30, 2019
875c24f
added chunk cache to Group
Nov 20, 2019
dcd4ee7
merge with master
Nov 20, 2019
4a1baa9
added chunk_cache to all relevant function
Nov 20, 2019
e6540e1
Merge branch 'master' into chunk_cache
Dec 12, 2019
9f9d176
merge with master
Sep 30, 2020
6571382
fixed failing doctest
Sep 30, 2020
8c8a69f
Merge remote-tracking branch 'origin/master' into pr-306
joshmoore Feb 19, 2021
e0e5254
fixed setitem caching order
Feb 20, 2021
992b48a
Merge branch 'master' into chunk_cache
jakirkham Feb 21, 2021
38ee622
refactor
Feb 21, 2021
8b6a699
Merge branch 'master' into chunk_cache
Apr 5, 2021
ba5c0ed
Merge 'origin/master' into pr-306
joshmoore Aug 27, 2021
7cdce5f
Remove use of unittest
joshmoore Aug 27, 2021
06c899b
Merge branch 'master' into chunk_cache
joshmoore Sep 23, 2021
c9be163
Merge remote-tracking branch 'origin/main' into chunk_cache
croth1 Oct 21, 2022
16793f5
code style cleanup
croth1 Oct 24, 2022
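For orientation before the diff: the feature under review is a least-recently-used cache that sits between an Array and its store and holds *decoded* chunks, so repeated reads of a hot chunk skip both storage access and decompression. A minimal usage sketch, assuming the `LRUChunkCache` class and `chunk_cache` keyword that the commits above introduce (the exact API may differ):

```python
import numpy as np
import zarr

# Assumed API from this PR: zarr.LRUChunkCache and the chunk_cache= keyword.
cache = zarr.LRUChunkCache(max_size=2**28)  # up to ~256 MiB of decoded chunks

z = zarr.open_array('data/example.zarr', mode='w', shape=(10000, 10000),
                    chunks=(1000, 1000), dtype='f8', chunk_cache=cache)
z[:] = np.random.random((10000, 10000))

_ = z[:1000, :1000]  # first read: fetch from store, decode, populate the cache
_ = z[:1000, :1000]  # repeat read: served from the cache, no decode needed
```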
37 changes: 31 additions & 6 deletions zarr/core.py
@@ -179,6 +179,9 @@ def __init__(
cache_attrs=True,
partial_decompress=False,
chunk_cache=None,
write_empty_chunks=True,
zarr_version=None,
meta_array=None,
):
# N.B., expect at this point store is fully initialized with all
# configuration metadata fully specified and normalized
@@ -207,6 +210,17 @@ def __init__(
self._is_view = False
self._partial_decompress = partial_decompress
self._chunk_cache = chunk_cache
self._write_empty_chunks = write_empty_chunks
if meta_array is not None:
self._meta_array = np.empty_like(meta_array, shape=())
else:
self._meta_array = np.empty(())
self._version = zarr_version
if self._version == 3:
self._data_key_prefix = 'data/root/' + self._key_prefix
self._data_path = 'data/root/' + self._path
self._hierarchy_metadata = _get_hierarchy_metadata(store=self._store)
self._metadata_key_suffix = self._hierarchy_metadata['metadata_key_suffix']

# initialize metadata
self._load_metadata()
@@ -955,7 +969,7 @@ def _get_basic_selection_zd(self, selection, out=None, fields=None):

except KeyError:
# chunk not initialized
chunk = np.zeros((), dtype=self._dtype)
chunk = np.zeros_like(self._meta_array, shape=(), dtype=self._dtype)
if self._fill_value is not None:
chunk.fill(self._fill_value)
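A note on the `zeros_like` change just above: `np.zeros_like` dispatches on the array type of its prototype, so a non-NumPy `meta_array` (e.g. a CuPy array) makes zarr allocate chunks of that type, while the default `np.empty(())` keeps plain NumPy behaviour. Illustration with NumPy only (the GPU case is analogous):

```python
import numpy as np

# default meta_array: ordinary numpy chunks
meta = np.empty(())
chunk = np.zeros_like(meta, shape=(), dtype='f8')
assert type(chunk) is np.ndarray and chunk.shape == ()
```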

@@ -1763,6 +1777,10 @@ def _set_basic_selection_zd(self, selection, value, fields=None):

# remove chunk if write_empty_chunks is false and it only contains the fill value
if (not self.write_empty_chunks) and all_equal(self.fill_value, chunk):
# invalidate value in cache
if self._chunk_cache is not None:
if ckey in self._chunk_cache:
del self._chunk_cache[ckey]
try:
del self.chunk_store[ckey]
return
@@ -2140,6 +2158,9 @@ def _chunk_setitem_nosync(self, chunk_coords, chunk_selection, value, fields=None):
# attempt to delete chunk if it only contains the fill value
if (not self.write_empty_chunks) and all_equal(self.fill_value, cdata):
self._chunk_delitem(ckey)
if self._chunk_cache is not None:
    # also invalidate the cached decoded copy, so subsequent reads
    # don't see stale data (same pattern as _set_basic_selection_zd)
    if ckey in self._chunk_cache:
        del self._chunk_cache[ckey]
else:
self.chunk_store[ckey] = self._encode_chunk(cdata)
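The hunk above, together with the `_set_basic_selection_zd` hunk earlier, enforces one invariant: when a chunk is deleted from the store because it only holds the fill value, any cached decoded copy must be dropped too, or later reads would return stale data. A condensed sketch of that rule (hypothetical helper, not the PR's literal code):

```python
def _delete_chunk_coherently(self, ckey):
    # remove the encoded chunk from the underlying store...
    self._chunk_delitem(ckey)
    # ...and drop any decoded copy so cache and store stay in sync
    if self._chunk_cache is not None and ckey in self._chunk_cache:
        del self._chunk_cache[ckey]
```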

@@ -2203,17 +2224,14 @@ def _process_for_setitem(self, ckey, chunk_selection, value, fields=None):
else:
chunk[chunk_selection] = value

# encode chunk
cdata = self._encode_chunk(chunk)

# cache the chunk
if self._chunk_cache is not None:
# ensure cached chunk has been round tripped through encode-decode if dtype=object
if self.dtype == object:
chunk = self._decode_chunk(cdata)
chunk = self._decode_chunk(self._encode_chunk(chunk))
self._chunk_cache[ckey] = np.copy(chunk)

return cdata
return chunk

def _chunk_key(self, chunk_coords):
if self._version == 3:
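The `_process_for_setitem` change above caches a chunk only after an encode-decode round trip when `dtype=object`. The reason: object-array codecs can normalize values while encoding, so caching the raw chunk could make cache hits disagree with later store reads. An illustrative round trip with a numcodecs VLen codec (illustration only, not code from this PR):

```python
import numpy as np
from numcodecs import VLenUTF8

codec = VLenUTF8()
chunk = np.array(['a', 'bb', 'ccc'], dtype=object)

# what a later read from the store would produce:
roundtripped = codec.decode(codec.encode(chunk))
# caching `roundtripped` rather than `chunk` guarantees that cache hits
# and store reads return identically processed values
assert roundtripped.dtype == object
```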
@@ -2556,6 +2574,13 @@ def _resize_nosync(self, *args):
except KeyError:
# chunk not initialized
pass
if self._chunk_cache is not None:
try:
del self._chunk_cache[key]
except KeyError:
# chunk not cached
pass

old_cdata_shape_working_list[idx_cdata] = min(val_old_cdata, val_new_cdata)

def append(self, data, axis=0):
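Taken together, the zarr/core.py changes give chunk reads a cache-first path. Roughly, as a paraphrase of the `_chunk_getitem` logic rather than the literal diff:

```python
def _load_chunk(self, ckey):
    # hypothetical condensation of the read path with a chunk cache
    if self._chunk_cache is not None:
        try:
            return self._chunk_cache[ckey]   # cache hit: already decoded
        except KeyError:
            pass
    cdata = self.chunk_store[ckey]           # cache miss: fetch encoded bytes
    chunk = self._decode_chunk(cdata)        # decompress / decode
    if self._chunk_cache is not None:
        self._chunk_cache[ckey] = chunk      # populate for the next read
    return chunk
```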
25 changes: 21 additions & 4 deletions zarr/creation.py
@@ -20,8 +20,8 @@ def create(shape, chunks=True, dtype=None, compressor='default',
fill_value=0, order='C', store=None, synchronizer=None,
overwrite=False, path=None, chunk_store=None, filters=None,
cache_metadata=True, cache_attrs=True, read_only=False,
object_codec=None, dimension_separator=None, chunk_cache=None,
**kwargs):
object_codec=None, dimension_separator=None, chunk_cache=None, write_empty_chunks=True,
*, zarr_version=None, meta_array=None, **kwargs):
"""Create an array.

Parameters
@@ -183,7 +183,8 @@ def create(shape, chunks=True, dtype=None, compressor='default',
# instantiate array
z = Array(store, path=path, chunk_store=chunk_store, synchronizer=synchronizer,
cache_metadata=cache_metadata, cache_attrs=cache_attrs, read_only=read_only,
chunk_cache=chunk_cache)
chunk_cache=chunk_cache, write_empty_chunks=write_empty_chunks,
meta_array=meta_array)

return z

@@ -421,6 +422,10 @@ def open_array(
storage_options=None,
partial_decompress=False,
chunk_cache=None,
write_empty_chunks=True,
*,
zarr_version=None,
dimension_separator=None,
**kwargs
):
"""Open an array using file-mode-like semantics.
@@ -494,6 +499,17 @@
chunk_cache provided) could result in a slight slowdown as some
dtypes, like VLenArray, have to go through the encode-decode phase
before having the correct dtype.
.. versionadded:: 2.11

zarr_version : {None, 2, 3}, optional
The zarr protocol version of the array to be opened. If None, it will
be inferred from ``store`` or ``chunk_store`` if they are provided,
otherwise defaulting to 2.
dimension_separator : {None, '.', '/'}, optional
Can be used to specify whether the array is in a flat ('.') or nested
('/') format. If None, the appropriate value will be read from `store`
when present. Otherwise, defaults to '.' when ``zarr_version == 2``
and `/` otherwise.

Returns
-------
@@ -603,7 +619,8 @@
# instantiate array
z = Array(store, read_only=read_only, synchronizer=synchronizer,
cache_metadata=cache_metadata, cache_attrs=cache_attrs, path=path,
chunk_store=chunk_store, chunk_cache=chunk_cache)
chunk_store=chunk_store, chunk_cache=chunk_cache,
write_empty_chunks=write_empty_chunks)

return z

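A hedged example combining the two keywords the creation API gains above, `chunk_cache` and `write_empty_chunks` (store path and sizes are made up):

```python
import zarr

z = zarr.create(shape=(100, 100), chunks=(10, 10), dtype='i4', fill_value=0,
                store='data/example2.zarr', write_empty_chunks=False,
                chunk_cache=zarr.LRUChunkCache(max_size=2**20))

z[0:10, 0:10] = 0   # all fill value: with write_empty_chunks=False, not stored
z[10:20, 0:10] = 1  # non-empty chunk: encoded, written to the store, and cached
```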
89 changes: 68 additions & 21 deletions zarr/hierarchy.py
@@ -121,7 +121,17 @@ class Group(MutableMapping):
"""

def __init__(self, store, path=None, read_only=False, chunk_store=None,
cache_attrs=True, synchronizer=None, chunk_cache=None):
cache_attrs=True, synchronizer=None, chunk_cache=None, zarr_version=None, *,
meta_array=None):
store: BaseStore = _normalize_store_arg(store, zarr_version=zarr_version)
if zarr_version is None:
zarr_version = getattr(store, '_store_version', DEFAULT_ZARR_VERSION)

if zarr_version != 2:
assert_zarr_v3_api_available()

if chunk_store is not None:
chunk_store: BaseStore = _normalize_store_arg(chunk_store, zarr_version=zarr_version)
self._store = store
self._chunk_store = chunk_store
self._chunk_cache = chunk_cache
@@ -427,11 +437,23 @@ def __getitem__(self, item):
if contains_array(self._store, path):
return Array(self._store, read_only=self._read_only, path=path,
chunk_store=self._chunk_store, chunk_cache=self._chunk_cache,
synchronizer=self._synchronizer, cache_attrs=self.attrs.cache)
elif contains_group(self._store, path):
synchronizer=self._synchronizer, cache_attrs=self.attrs.cache,
zarr_version=self._version, meta_array=self._meta_array)
elif contains_group(self._store, path, explicit_only=True):
return Group(self._store, read_only=self._read_only, path=path,
chunk_store=self._chunk_store, cache_attrs=self.attrs.cache,
synchronizer=self._synchronizer, chunk_cache=self._chunk_cache)
chunk_store=self._chunk_store, chunk_cache=self._chunk_cache, cache_attrs=self.attrs.cache,
synchronizer=self._synchronizer, zarr_version=self._version,
meta_array=self._meta_array)
elif self._version == 3:
implicit_group = meta_root + path + '/'
# non-empty folder in the metadata path implies an implicit group
if self._store.list_prefix(implicit_group):
return Group(self._store, read_only=self._read_only, path=path,
chunk_store=self._chunk_store, cache_attrs=self.attrs.cache,
synchronizer=self._synchronizer, zarr_version=self._version,
meta_array=self._meta_array)
else:
raise KeyError(item)
else:
raise KeyError(item)

@@ -516,14 +538,39 @@ def groups(self):
foo <class 'zarr.hierarchy.Group'>

"""
for key in sorted(listdir(self._store, self._path)):
path = self._key_prefix + key
if contains_group(self._store, path):
yield key, Group(self._store, path=path, read_only=self._read_only,
chunk_store=self._chunk_store,
chunk_cache=self._chunk_cache,
cache_attrs=self.attrs.cache,
synchronizer=self._synchronizer)
if self._version == 2:
for key in sorted(listdir(self._store, self._path)):
path = self._key_prefix + key
if contains_group(self._store, path, explicit_only=False):
yield key, Group(
self._store,
path=path,
read_only=self._read_only,
chunk_store=self._chunk_store,
chunk_cache=self._chunk_cache,
cache_attrs=self.attrs.cache,
synchronizer=self._synchronizer,
zarr_version=self._version)

else:
dir_name = meta_root + self._path
group_sfx = '.group' + self._metadata_key_suffix
for key in sorted(listdir(self._store, dir_name)):
if key.endswith(group_sfx):
key = key[:-len(group_sfx)]
path = self._key_prefix + key
if path.endswith(".array" + self._metadata_key_suffix):
# skip array keys
continue
if contains_group(self._store, path, explicit_only=False):
yield key, Group(
self._store,
path=path,
read_only=self._read_only,
chunk_store=self._chunk_store,
cache_attrs=self.attrs.cache,
synchronizer=self._synchronizer,
zarr_version=self._version)

def array_keys(self, recurse=False):
"""Return an iterator over member names for arrays only.
@@ -816,7 +863,7 @@ def _create_group_nosync(self, name, overwrite=False):

return Group(self._store, path=path, read_only=self._read_only,
chunk_store=self._chunk_store, cache_attrs=self.attrs.cache,
synchronizer=self._synchronizer, chunk_cache=self._chunk_cache)
synchronizer=self._synchronizer, chunk_cache=self._chunk_cache, zarr_version=self._version)

def create_groups(self, *names, **kwargs):
"""Convenience method to create multiple groups in a single call."""
@@ -860,7 +907,8 @@ def _require_group_nosync(self, name, overwrite=False):

return Group(self._store, path=path, read_only=self._read_only,
chunk_store=self._chunk_store, cache_attrs=self.attrs.cache,
synchronizer=self._synchronizer, chunk_cache=self._chunk_cache)
synchronizer=self._synchronizer, chunk_cache=self._chunk_cache,
zarr_version=self._version)

def require_groups(self, *names):
"""Convenience method to require multiple groups in a single call."""
@@ -989,7 +1037,7 @@ def _require_dataset_nosync(self, name, shape, dtype=None, exact=False,
a = Array(self._store, path=path, read_only=self._read_only,
chunk_store=self._chunk_store, synchronizer=synchronizer,
cache_metadata=cache_metadata, cache_attrs=cache_attrs,
chunk_cache=self._chunk_cache)
chunk_cache=self._chunk_cache, meta_array=self._meta_array)
shape = normalize_shape(shape)
if shape != a.shape:
raise TypeError('shape do not match existing array; expected {}, got {}'
@@ -1178,8 +1226,7 @@ def _normalize_store_arg(store, *, storage_options=None, mode="r",


def group(store=None, overwrite=False, chunk_store=None,
cache_attrs=True, synchronizer=None, path=None,
chunk_cache=None):
cache_attrs=True, synchronizer=None, path=None, chunk_cache=None, *, zarr_version=None):
"""Create a group.

Parameters
@@ -1244,11 +1291,11 @@

return Group(store, read_only=False, chunk_store=chunk_store,
cache_attrs=cache_attrs, synchronizer=synchronizer, path=path,
chunk_cache=chunk_cache)
zarr_version=zarr_version)


def open_group(store=None, mode='a', cache_attrs=True, synchronizer=None, path=None,
chunk_store=None, storage_options=None, chunk_cache=None):
chunk_store=None, storage_options=None, chunk_cache=None, *, zarr_version=None, meta_array=None):
"""Open a group using file-mode-like semantics.

Parameters
@@ -1351,4 +1398,4 @@ def open_group(store=None, mode='a', cache_attrs=True, synchronizer=None, path=None,

return Group(store, read_only=read_only, cache_attrs=cache_attrs,
synchronizer=synchronizer, path=path, chunk_store=chunk_store,
chunk_cache=chunk_cache)
chunk_cache=chunk_cache, zarr_version=zarr_version, meta_array=meta_array)
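The zarr/hierarchy.py changes thread a group-level `chunk_cache` down into every member Array and sub-Group, so one cache budget is shared across a whole hierarchy. A usage sketch under the same assumed API:

```python
import zarr

cache = zarr.LRUChunkCache(max_size=2**27)
root = zarr.open_group('data/group.zarr', mode='a', chunk_cache=cache)

a = root.require_dataset('a', shape=(100,), chunks=(10,), dtype='i4')
b = root.require_dataset('b', shape=(100,), chunks=(10,), dtype='i4')
# reads through either array populate the same shared decoded-chunk cache
```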
40 changes: 36 additions & 4 deletions zarr/storage.py
@@ -2348,7 +2348,7 @@ def __delitem__(self, key):
raise NotImplementedError # pragma: no cover


class LRUStoreCache(LRUMappingCache):
class LRUStoreCache(Store, LRUMappingCache):
"""Storage class that implements a least-recently-used (LRU) cache layer over
some other store. Intended primarily for use with stores that can be slow to
access, e.g., remote stores that require network communication to store and
@@ -2385,12 +2385,17 @@ class LRUStoreCache(LRUMappingCache):

"""

def __init__(self, store, max_size):
def __init__(self, store: StoreLike, max_size: int):
super(LRUStoreCache, self).__init__(max_size)
self._store = store
self._store: BaseStore = BaseStore._ensure_store(store)
self._max_size = max_size
self._current_size = 0
self._keys_cache = None
self._contains_cache = None
self._listdir_cache = dict()
self._listdir_cache: Dict[Path, Any] = dict()
self._values_cache: Dict[Path, Any] = OrderedDict()
self._mutex = Lock()
self.hits = self.misses = 0

def __getstate__(self):
return (self._store, self._max_size, self._current_size, self._keys_cache,
@@ -2430,6 +2435,30 @@ def listdir(self, path: Path = None):
def getsize(self, path=None) -> int:
return getsize(self._store, path=path)

def _pop_value(self):
# remove the first value from the cache, as this will be the least recently
# used value
_, v = self._values_cache.popitem(last=False)
return v

def _accommodate_value(self, value_size):
if self._max_size is None:
return
# ensure there is enough space in the cache for a new value
while self._current_size + value_size > self._max_size:
v = self._pop_value()
self._current_size -= buffer_size(v)

def _cache_value(self, key: Path, value):
# cache a value
value_size = buffer_size(value)
# check size of the value against max size, as if the value itself exceeds max
# size then we are never going to cache it
if self._max_size is None or value_size <= self._max_size:
self._accommodate_value(value_size)
self._values_cache[key] = value
self._current_size += value_size

def invalidate(self):
"""Completely clear the cache."""
with self._mutex:
@@ -2986,6 +3015,9 @@ def invalidate(self):
with self._mutex:
self._values_cache.clear()

def close(self):
pass

def __getitem__(self, key):
try:
# try to obtain the value from the cache
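Finally, the eviction discipline shown in `_pop_value` / `_accommodate_value` / `_cache_value` above is the standard OrderedDict LRU pattern. A self-contained distillation (illustrative only; the real classes measure value sizes in bytes via a buffer-size helper and guard access with a mutex):

```python
from collections import OrderedDict

class TinyLRU:
    """Minimal size-bounded LRU mirroring the logic in the diff above."""

    def __init__(self, max_size):
        self.max_size = max_size      # None means unbounded
        self.current_size = 0
        self.values = OrderedDict()   # first item = least recently used

    def __setitem__(self, key, value):
        size = len(value)
        # a value larger than the whole cache is never cached
        if self.max_size is not None and size > self.max_size:
            return
        # evict least-recently-used values until the new one fits
        while self.max_size is not None and self.current_size + size > self.max_size:
            _, evicted = self.values.popitem(last=False)
            self.current_size -= len(evicted)
        self.values[key] = value
        self.current_size += size

    def __getitem__(self, key):
        value = self.values.pop(key)  # raises KeyError on a miss
        self.values[key] = value      # re-append: now most recently used
        return value
```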