[core] Fault tolerance for compiled DAGs #41943
Conversation
```
            debugger_breakpoint,
        )
        values = self.deserialize_objects(data_metadata_pairs, object_refs)
        for i, value in enumerate(values):
```
Copied from non-experimental path.
Force-pushed from fd3459a to 477cadf.
Force-pushed from 477cadf to 13e8bd7.
```
@@ -751,8 +756,14 @@ Status PlasmaClient::Impl::EnsureGetAcquired(
  }

  int64_t version_read = 0;

  // Need to unlock the client mutex since ReadAcquire() is blocking.
  // TODO(ekl) is this entirely thread-safe?
```
We are probably assured the object cannot be deallocated while a reader has the reference. @stephanie-wang @rkooo567 does this seem right?
Right now mutable objects are never deallocated :D
But yeah in general this should be okay as long as we make sure to increment the PlasmaClient's local ref count for the object before we unlock.
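To make the ordering concrete, here is a minimal Python sketch of the "pin before unlock" pattern being discussed; the class and method names are illustrative stand-ins, not Ray's actual C++ PlasmaClient code.

```python
import threading


class PlasmaClientSketch:
    """Illustrative only: shows the ordering, not the real implementation."""

    def __init__(self):
        self._client_mutex = threading.Lock()
        self._local_ref_counts = {}  # object_id -> local reference count

    def read_acquire(self, object_id, blocking_read):
        with self._client_mutex:
            # Pin the object locally *before* dropping the client mutex, so it
            # cannot be deallocated while we block in the read below.
            self._local_ref_counts[object_id] = (
                self._local_ref_counts.get(object_id, 0) + 1
            )
        # The blocking ReadAcquire() analogue runs outside the client mutex.
        return blocking_read(object_id)
```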
I think there is an edge case here where an actor dies after WriteAcquire but before WriteRelease. For that case, we would need to make sure to write the exception and have one process Release (or make Release safe for multiple writers).
python/ray/experimental/channel.py (outdated)
```
                try_wait=True,
            )
        except Exception as e:
            if not _is_write_acquire_failed_error(e):
```
Hmm I didn't quite understand this condition. It seems to fail silently if we fail to acquire?
Also, could you comment on what cases we expect to fail to acquire?
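For context, here is how I read the pattern in the diff above, as a self-contained sketch; the predicate body and the `write(..., try_wait=True)` call are assumptions standing in for the PR's actual helpers.

```python
def _is_write_acquire_failed_error(e: Exception) -> bool:
    # Hypothetical check; the real predicate presumably inspects the error
    # raised when a non-blocking WriteAcquire cannot take the write lock.
    return "write acquire failed" in str(e).lower()


def best_effort_write(channel, value):
    try:
        channel.write(value, try_wait=True)
    except Exception as e:
        if not _is_write_acquire_failed_error(e):
            # Unexpected failure: surface it to the caller.
            raise
        # Otherwise the acquire failed (e.g., the channel is being torn down),
        # and the write is silently dropped, which is the behavior questioned above.
```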
python/ray/dag/compiled_dag_node.py (outdated)
```diff
@@ -75,10 +73,21 @@ def do_exec_compiled_task(
             channel.end_read()
 
     except Exception as e:
-        logging.warn(f"Compiled DAG task aborted with exception: {e}")
+        logging.info(f"Compiled DAG task exited with exception: {e}")
```
For non-Ray exceptions, I wonder if we should instead store the error and keep looping?
+1
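A rough sketch of that alternative: catch non-Ray exceptions from application code, write the exception into the output channel as the result, and keep looping instead of exiting the task. The channel method names (`begin_read`, `end_read`, `write`) follow the snippets in this PR, but this is not the actual implementation.

```python
def exec_compiled_task_loop(input_channels, output_channel, func):
    while True:
        args = [chan.begin_read() for chan in input_channels]
        try:
            result = func(*args)       # application code
        except Exception as e:         # non-Ray exception from user code
            result = e                 # store the error instead of exiting
        for chan in input_channels:
            chan.end_read()
        output_channel.write(result)   # the reader can re-raise the exception
```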
```
@@ -176,6 +192,32 @@ def f(x):
    dag.experimental_compile()


@pytest.mark.parametrize("num_actors", [1, 4])
def test_dag_fault_tolerance(ray_start_regular, num_actors):
```
Can you also add a test for worker process dying?
I see, would we need a timeout here to force release of the lock?
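A hedged sketch of the kind of worker-death test being asked for; the exact test added in this PR may differ, and the `RaySystemError` on read is an assumption based on the PR description.

```python
import pytest
import ray
from ray.dag import InputNode
from ray.exceptions import RaySystemError


@ray.remote
class Echo:
    def echo(self, x):
        return x


def test_dag_fault_tolerance_worker_death(ray_start_regular):
    actor = Echo.remote()
    with InputNode() as inp:
        dag = actor.echo.bind(inp)
    compiled_dag = dag.experimental_compile()

    # Sanity check: the DAG works before the failure.
    chan = compiled_dag.execute(1)
    assert chan.begin_read() == 1
    chan.end_read()

    # Kill the actor process and check that subsequent use fails instead of hanging.
    ray.kill(actor, no_restart=True)
    with pytest.raises(RaySystemError):
        chan = compiled_dag.execute(1)
        chan.begin_read()
    compiled_dag.teardown()
```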
If we know that the original writer has definitely died, it should be okay to directly write the exception object into the plasma buffer and WriteRelease. But yeah, it is a bit tricky if the process fails while holding the pthread mutex in WriteAcquire or WriteRelease. I think we need to rethink that concurrency mechanism...
A long enough timeout on the mutex acquire (pthread_mutex_timedlock rather than a plain pthread_mutex_lock) seems okay for now; we can probably improve it later.
By the way, it seems like we need something like this to support multi-node too, so that we have a way to signal that we should stop waiting for values to send to the other node. I think it'd be best if we can send a special value like "EOF" instead of storing an exception, so that it works for both Python and C++ readers.
So how about this: we can switch the error writing path of WriteAcquire to…
Can you explain more?
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Force-pushed from 96b78b6 to d12ef99.
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Force-pushed from d12ef99 to 94c5ba4.
From offline discussion, this should be refactored so that EOS will be implemented as an "error bit" that can be set on the channel without locking. This allows error handling to avoid race conditions while still preserving exception messages in most cases.
What's missing to support "all errors" in this case?
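A minimal sketch of the error-bit idea, written as a hypothetical in-process Python channel (the real channel is backed by plasma shared memory, and all names here are made up): the bit is monotonic and can be set without taking the write lock, so it remains safe even if a writer died mid-write.

```python
import threading


class ChannelSketch:
    def __init__(self):
        self._value = None
        self._error_bit = False          # set once, never cleared
        self._ready = threading.Event()

    def write(self, value):
        self._value = value
        self._ready.set()

    def set_error(self):
        # No lock needed: we only flip a monotonic flag and wake any readers,
        # so this works even if the writer crashed while holding its lock.
        self._error_bit = True
        self._ready.set()

    def begin_read(self):
        self._ready.wait()
        if self._error_bit:
            raise RuntimeError("channel closed")  # stands in for RaySystemError
        return self._value
```

A bare bit cannot carry the original exception message on its own; preserving messages "in most cases" presumably means writing the exception through the normal path when the lock is still usable and falling back to the bit otherwise.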
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Force-pushed from 0a2c8fd to 2e7d2a4.
```
@@ -68,17 +69,41 @@ def do_exec_compiled_task(
    for idx, channel in input_channel_idxs:
        resolved_inputs[idx] = channel.begin_read()
```
It might be good to explicitly try-catch the channel calls so that we can differentiate between expected errors (channel closed), application code errors, and anything else that might error in this loop (most likely system bugs). The try-catch at the end can be for system errors only.
Hmm I played around with this and deciding the semantics is tricky, so I think we should tackle this later on for productionization.
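For the record, a sketch of the classification being suggested; the exception type is hypothetical, and as noted the actual semantics were deferred.

```python
import logging


class ChannelClosedError(Exception):
    """Hypothetical marker for the expected "channel closed" condition."""


def read_inputs(input_channel_idxs, resolved_inputs):
    for idx, channel in input_channel_idxs:
        try:
            resolved_inputs[idx] = channel.begin_read()
        except ChannelClosedError:
            # Expected during teardown: propagate so the task loop exits cleanly.
            raise
        except Exception:
            # Anything else raised here is most likely a system bug, not user code.
            logging.exception("Unexpected error while reading DAG input %s", idx)
            raise
```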
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Force-pushed from 2e7d2a4 to 482296d.
Co-authored-by: Stephanie Wang <swang@cs.berkeley.edu> Signed-off-by: Eric Liang <ekhliang@gmail.com>
Comments addressed.
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Force-pushed from 1c32efb to 1e19048.
Tests look good, retrying the flaky one.
Why are these changes needed?

This adds fault tolerance and a `teardown` method for compiled DAGs. When a participating worker dies or the DAG is shut down, operations on the DAG's channels raise `RaySystemError("channel closed")`. The DAG can be shut down explicitly with `compiled_dag.teardown()`.

Note: cancellation is best-effort and currently requires running a task on the actor's main concurrency group. If the actor is busy with some other task submitted by the user, cancellation will be delayed.
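A rough usage sketch based on the API names that appear in this PR (`experimental_compile`, `execute`, `begin_read`/`end_read`, `teardown`); exact signatures and error behavior may differ.

```python
import ray
from ray.dag import InputNode
from ray.exceptions import RaySystemError


@ray.remote
class Doubler:
    def double(self, x):
        return 2 * x


actor = Doubler.remote()
with InputNode() as inp:
    dag = actor.double.bind(inp)

compiled_dag = dag.experimental_compile()
chan = compiled_dag.execute(21)
print(chan.begin_read())  # 42
chan.end_read()

# Shut the DAG down; per the description above, later channel operations
# raise RaySystemError("channel closed").
compiled_dag.teardown()
try:
    chan = compiled_dag.execute(1)
    chan.begin_read()
except RaySystemError:
    print("channel closed")
```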
Related issue number
#41769
Checks
- I've signed off every commit (using `git commit -s`) in this PR.
- I've run `scripts/format.sh` to lint the changes in this PR.
- I've added any new APIs to the API Reference: for example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.