Skip to content

Commit

Permalink
fix: fix race condition in remote calls (#329)
Browse files Browse the repository at this point in the history
The wrapper, `_remote.RemoteCall`, added to help attach debugging into
to gRPC calls, inadvertantly introduced a race condition between the
gRPC thread and the NDB thread, where the gRPC thread might call
`RemoteCall._finish` *during* a call to `RemoteCall.add_done_callback`.

The solution, here, is to turn `RemoteCall.add_done_callback` into a
direct pass-through to `grpc.Future.add_done_callback` on the wrapped
future. The callback which is eventually executed in the gRPC thread,
only pushes the finished RPC onto a `queue.Queue` which is eventually
consumed by the event loop running in the NDB thread.

Fixes #302.
  • Loading branch information
Chris Rossi authored Feb 7, 2020
1 parent 0484c7a commit f550510
Showing 1 changed file with 8 additions and 17 deletions.
25 changes: 8 additions & 17 deletions google/cloud/ndb/_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,12 @@ class RemoteCall(object):
def __init__(self, future, info):
self.future = future
self.info = info
self._callbacks = []

future.add_done_callback(self._finish)

def __repr__(self):
return self.info

def exception(self):
"""Calls :meth:`grpc.Future.exception` on attr:`future`."""
"""Calls :meth:`grpc.Future.exception` on :attr:`future`."""
# GRPC will actually raise FutureCancelledError.
# We'll translate that to our own Cancelled exception and *return* it,
# which is far more polite for a method that *returns exceptions*.
Expand All @@ -57,7 +54,7 @@ def exception(self):
return exceptions.Cancelled()

def result(self):
"""Calls :meth:`grpc.Future.result` on attr:`future`."""
"""Calls :meth:`grpc.Future.result` on :attr:`future`."""
return self.future.result()

def add_done_callback(self, callback):
Expand All @@ -67,19 +64,13 @@ def add_done_callback(self, callback):
Args:
callback (Callable): The function to execute.
"""
if self.future.done():
callback(self)
else:
self._callbacks.append(callback)
remote = self

def wrapper(rpc):
return callback(remote)

self.future.add_done_callback(wrapper)

def cancel(self):
"""Calls :meth:`grpc.Future.cancel` on attr:`cancel`."""
return self.future.cancel()

def _finish(self, rpc):
"""Called when remote future is finished.
Used to call our own done callbacks.
"""
for callback in self._callbacks:
callback(self)

0 comments on commit f550510

Please sign in to comment.