Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use multiple batches of limited size for large operations #321

Merged
merged 2 commits into from
Feb 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions google/cloud/ndb/_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,17 @@ def get_batch(batch_cls, options=None):
options_key = ()

batch = batches.get(options_key)
if batch is not None:
if batch is not None and not batch.full():
return batch

def idle():
batch = batches.pop(options_key)
batch.idle_callback()
def idler(batch):
def idle():
if batches.get(options_key) is batch:
del batches[options_key]
batch.idle_callback()

return idle

batches[options_key] = batch = batch_cls(options)
_eventloop.add_idle(idle)
_eventloop.add_idle(idler(batch))
return batch
8 changes: 8 additions & 0 deletions google/cloud/ndb/_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ class _GlobalCacheBatch(object):
"""Abstract base for classes used to batch operations for the global cache.
"""

def full(self):
"""Indicates whether more work can be added to this batch.

Returns:
boolean: `False`, always.
"""
return False

def idle_callback(self):
"""Call the cache operation.

Expand Down
48 changes: 45 additions & 3 deletions google/cloud/ndb/_datastore_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,16 @@ def __init__(self, options):
self.options = options
self.todo = {}

def full(self):

"""Indicates whether more work can be added to this batch.

Returns:
boolean: `True` if number of keys to be looked up has reached 1000,
else `False`.
"""
return len(self.todo) >= 1000

def add(self, key):
"""Add a key to the batch to look up.

Expand Down Expand Up @@ -477,6 +487,15 @@ def __init__(self, options):
self.mutations = []
self.futures = []

def full(self):
"""Indicates whether more work can be added to this batch.

Returns:
boolean: `True` if number of mutations has reached 500, else
`False`.
"""
return len(self.mutations) >= 500

def put(self, entity_pb):
"""Add an entity to batch to be stored.

Expand Down Expand Up @@ -854,8 +873,15 @@ def allocate(keys, options):
Returns:
tasklets.Future: A future for the key completed with the allocated id.
"""
batch = _batch.get_batch(_AllocateIdsBatch, options)
return batch.add(keys)
futures = []
while keys:
batch = _batch.get_batch(_AllocateIdsBatch, options)
room_left = batch.room_left()
batch_keys = keys[:room_left]
futures.extend(batch.add(batch_keys))
keys = keys[room_left:]

return tasklets._MultiFuture(futures)


class _AllocateIdsBatch(object):
Expand All @@ -875,6 +901,22 @@ def __init__(self, options):
self.keys = []
self.futures = []

def full(self):
"""Indicates whether more work can be added to this batch.

Returns:
boolean: `True` if number of keys has reached 500, else `False`.
"""
return len(self.keys) >= 500

def room_left(self):
"""Get how many more keys can be added to this batch.

Returns:
int: 500 - number of keys already in batch
"""
return 500 - len(self.keys)

def add(self, keys):
"""Add incomplete keys to batch to allocate.

Expand All @@ -892,7 +934,7 @@ def add(self, keys):
self.keys.append(key)

self.futures.extend(futures)
return tasklets._MultiFuture(futures)
return futures

def idle_callback(self):
"""Perform a Datastore AllocateIds request on all batched keys."""
Expand Down
8 changes: 5 additions & 3 deletions tests/system/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,11 @@ def with_ds_client(ds_client, to_delete, deleted_keys):

yield ds_client

if to_delete:
ds_client.delete_multi(to_delete)
deleted_keys.update(to_delete)
while to_delete:
batch = to_delete[:500]
ds_client.delete_multi(batch)
deleted_keys.update(batch)
to_delete = to_delete[500:]

not_deleted = [
entity
Expand Down
36 changes: 36 additions & 0 deletions tests/system/test_crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -1131,3 +1131,39 @@ class SomeKind(ndb.Model):
assert len(keys_upd) == len(keys)
assert len(set(keys_upd)) == len(set(keys))
assert set(keys_upd) == set(keys)


@pytest.mark.usefixtures("client_context")
def test_multi_with_lots_of_keys(dispose_of):
"""Regression test for issue #318.

https://github.com/googleapis/python-ndb/issues/318
"""
N = 1001

class SomeKind(ndb.Model):
foo = ndb.IntegerProperty()

foos = list(range(N))
entities = [SomeKind(foo=foo) for foo in foos]
keys = ndb.put_multi(entities)
dispose_of(*(key._key for key in keys))
assert len(keys) == N

entities = ndb.get_multi(keys)
assert [entity.foo for entity in entities] == foos

ndb.delete_multi(keys)
entities = ndb.get_multi(keys)
assert entities == [None] * N


@pytest.mark.usefixtures("client_context")
def test_allocate_a_lot_of_keys():
N = 1001

class SomeKind(ndb.Model):
pass

keys = SomeKind.allocate_ids(N)
assert len(keys) == N
11 changes: 11 additions & 0 deletions tests/unit/test__batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,25 @@ def test_it(self):

assert _batch.get_batch(MockBatch, options) is batch

batch._full = True
batch2 = _batch.get_batch(MockBatch, options)
assert batch2 is not batch
assert not batch2.idle_called

_eventloop.run()
assert batch.idle_called
assert batch2.idle_called


class MockBatch:
_full = False

def __init__(self, options):
self.options = options
self.idle_called = False

def idle_callback(self):
self.idle_called = True

def full(self):
return self._full
5 changes: 5 additions & 0 deletions tests/unit/test__cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ def test_add_and_idle_and_done_callbacks_w_error(in_context):
assert future1.exception() is error
assert future2.exception() is error

@staticmethod
def test_full():
batch = _cache._GlobalCacheGetBatch(None)
assert batch.full() is False


class Test_global_set:
@staticmethod
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/test__datastore_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1207,9 +1207,9 @@ def test_constructor():
def test_add():
options = _options.Options()
batch = _api._AllocateIdsBatch(options)
future = batch.add(["key1", "key2"])
futures = batch.add(["key1", "key2"])
assert batch.keys == ["key1", "key2"]
assert batch.futures == future._dependencies
assert batch.futures == futures

@staticmethod
@mock.patch("google.cloud.ndb._datastore_api._datastore_allocate_ids")
Expand Down