Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes Future.done() behavior (resolves #24). #28

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion scaler/about.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.8.5"
__version__ = "1.8.6"
69 changes: 61 additions & 8 deletions scaler/client/future.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def __init__(self, task: Task, is_delayed: bool, group_task_id: Optional[bytes],
self._result_object_id: Optional[bytes] = None
self._result_ready_event = threading.Event()
self._result_request_sent = False
self._result_received = False

self._profiling_info: Optional[ProfileResult] = None

Expand All @@ -42,6 +43,8 @@ def set_result_ready(self, object_id: Optional[bytes], profile_result: Optional[
if self.done():
raise InvalidStateError(f"invalid future state: {self._state}")

self._state = "FINISHED"

if object_id is not None:
self._result_object_id = object_id

Expand All @@ -54,25 +57,74 @@ def set_result_ready(self, object_id: Optional[bytes], profile_result: Optional[

self._result_ready_event.set()

def set_exception(self, exception: Optional[BaseException], profile_result: Optional[ProfileResult] = None) -> None:
def _set_result_or_exception(
self,
result: Optional[Any] = None,
exception: Optional[BaseException] = None,
profiling_info: Optional[ProfileResult] = None
) -> None:
with self._condition: # type: ignore[attr-defined]
if profile_result is not None:
self._profiling_info = profile_result
if self.cancelled():
raise InvalidStateError(f"invalid future state: {self._state}")

if self._result_received:
raise InvalidStateError("future already received object data.")

if profiling_info is not None:
if self._profiling_info is not None:
raise InvalidStateError("cannot set profiling info twice.")

self._profiling_info = profiling_info

self._state = "FINISHED"
self._result_received = True

if exception is not None:
assert result is None
self._exception = exception
for waiter in self._waiters:
waiter.add_exception(self)
else:
self._result = result
for waiter in self._waiters:
waiter.add_result(self)

self._result_ready_event.set()
self._condition.notify_all()

return super().set_exception(exception)
self._invoke_callbacks() # type: ignore[attr-defined]

def result(self, timeout=None):
def set_result(self, result: Any, profiling_info: Optional[ProfileResult] = None) -> None:
rafa-be marked this conversation as resolved.
Show resolved Hide resolved
self._set_result_or_exception(result=result, profiling_info=profiling_info)

def set_exception(self, exception: Optional[BaseException], profiling_info: Optional[ProfileResult] = None) -> None:
self._set_result_or_exception(exception=exception, profiling_info=profiling_info)

def result(self, timeout: Optional[float] = None) -> Any:
self._result_ready_event.wait(timeout)

with self._condition: # type: ignore[attr-defined]
# if it's delayed future, get the result when future.result() get called
# if it's delayed future, get the result when future.result() gets called
if self._is_delayed:
self._request_result_object()

# wait for
return super().result(timeout)
if not self._result_received:
self._condition.wait(timeout)

return super().result()

def exception(self, timeout: Optional[float] = None) -> Optional[BaseException]:
self._result_ready_event.wait(timeout)

with self._condition: # type: ignore[attr-defined]
# if it's delayed future, get the result when future.exception() gets called
if self._is_delayed:
self._request_result_object()

if not self._result_received:
self._condition.wait(timeout)

return super().exception()

def cancel(self) -> bool:
with self._condition: # type: ignore[attr-defined]
Expand All @@ -88,6 +140,7 @@ def cancel(self) -> bool:
self._connector.send(TaskCancel.new_msg(self._task_id))

self._state = "CANCELLED"
self._result_received = True

self._result_ready_event.set()
self._condition.notify_all() # type: ignore[attr-defined]
Expand Down
20 changes: 17 additions & 3 deletions tests/test_future.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def test_callback(self):
done_called_event = Event()

def on_done_callback(fut):
self.assertTrue(fut.done())
self.assertAlmostEqual(fut.result(), 4.0)
done_called_event.set()

Expand All @@ -45,29 +46,42 @@ def test_as_completed(self):

def test_state(self):
with Client(address=self.address) as client:
fut = client.submit(noop_sleep, 1.0)
fut = client.submit(noop_sleep, 0.5)
self.assertTrue(fut.running())
self.assertFalse(fut.done())

fut.result()
time.sleep(1.5)

self.assertFalse(fut.running())
self.assertTrue(fut.done())

def test_cancel(self):
with Client(address=self.address) as client:
fut = client.submit(math.sqrt, 100.0)
fut.cancel()
self.assertTrue(fut.cancel())

self.assertTrue(fut.cancelled())
self.assertTrue(fut.done())

with self.assertRaises(CancelledError):
fut.result()

fut = client.submit(math.sqrt, 16)
fut.result()

# cancel() should fail on a completed future.
self.assertFalse(fut.cancel())
self.assertFalse(fut.cancelled())

def test_exception(self):
with Client(address=self.address) as client:
fut = client.submit(math.sqrt, "16")

with self.assertRaises(TypeError):
fut.result()

self.assertTrue(fut.done())

self.assertIsInstance(fut.exception(), TypeError)

def test_client_disconnected(self):
Expand Down
Loading