Skip to content

Commit

Permalink
Use a close callback with UCX to improve error handling
Browse files Browse the repository at this point in the history
Register a close callback function with UCX to prevent writing when the
endpoint has already closed. This prevents errors often raised when a
remote process closes too quickly before the local process is able to
send the close message.

Closes rapidsai/dask-cuda#713
  • Loading branch information
pentschev committed Oct 18, 2021
1 parent 6a0217e commit 6be30d3
Showing 1 changed file with 31 additions and 1 deletion.
32 changes: 31 additions & 1 deletion distributed/comm/ucx.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
.. _UCX: https://github.com/openucx/ucx
"""
import functools
import logging
import os
import struct
Expand Down Expand Up @@ -166,6 +167,18 @@ def device_array(n):
ucx_create_listener = EndpointReuse.create_listener


def _close_comm(ref):
"""Callback to close Dask Comm when UCX Endpoint closes or errors
Parameters
----------
ref: weak reference to a Dask UCX comm
"""
comm = ref()
if comm is not None:
comm._closed = True


class UCX(Comm):
"""Comm object using UCP.
Expand Down Expand Up @@ -210,6 +223,17 @@ def __init__(self, ep, local_addr: str, peer_addr: str, deserialize=True):
self._peer_addr = peer_addr
self.deserialize = deserialize
self.comm_flag = None

# When the UCX endpoint closes or errors the registered callback
# is called.
if hasattr(self._ep, "set_close_callback"):
ref = weakref.ref(self)
self._ep.set_close_callback(functools.partial(_close_comm, ref))
self._closed = False
self._has_close_callback = True
else:
self._has_close_callback = False

logger.debug("UCX.__init__ %s", self)

@property
Expand Down Expand Up @@ -369,7 +393,13 @@ def ep(self):
raise CommClosedError("UCX Endpoint is closed")

def closed(self):
return self._ep is None
if self._has_close_callback is True:
# The self._closed flag is separate from the endpoint's lifetime, even when
# the endpoint has closed or errored, there may be messages on its buffer
# still to be received, even though sending is not possible anymore.
return self._closed
else:
return self._ep is None


class UCXConnector(Connector):
Expand Down

0 comments on commit 6be30d3

Please sign in to comment.