[aDAG] Overlap computation and communication #47586
Conversation
@@ -172,7 +189,7 @@ def recv(
    dtype: "torch.dtype",
    peer_rank: int,
    allocator=Optional[TorchTensorAllocator],
-) -> "torch.Tensor":
+) -> Union["torch.Tensor", Tuple["torch.Tensor", "cp.cuda.Event"]]:
This API needs to be refined:
- Consider introducing a new `recv_gpu_async()` API, but this complicates client-side code, which needs to decide which method to call.
- We should not have `cuda.Event` in the API; we may need our own wrapper API for the event.
Can we instead always return Tuple["torch.Tensor", Optional["cp.cuda.Event"]]? I think that will be a cleaner interface. You could also wrap this in our own dataclass like MaybeAsyncTensor.
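A minimal sketch of what such a wrapper could look like (the name MaybeAsyncTensor comes from this comment; the fields are assumptions, not an existing Ray class):

```python
from dataclasses import dataclass
from typing import Optional

import cupy as cp
import torch


@dataclass
class MaybeAsyncTensor:
    """Wraps a received tensor together with the CUDA event that guards it."""

    tensor: "torch.Tensor"
    # None means the recv was synchronous and the tensor is ready to use;
    # otherwise the consumer must synchronize on (or wait for) this event first.
    event: Optional["cp.cuda.Event"] = None
```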
What about generalizing all send/recv to return a Future? If the call is blocking, the returned future is already ready. (In terms of code complexity, I think it is the same as returning None, because you need to check whether the second output is None anyway.)
fut = recv()
val = fut.wait()
Yes, exactly!
Using Future sounds like a good idea. Will update.
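A rough sketch of what the Future-style API suggested above could look like (the class and method names here are hypothetical, just to illustrate the idea):

```python
from typing import Optional

import cupy as cp
import torch


class TensorFuture:
    """Returned by send/recv; already resolved when the transfer was blocking."""

    def __init__(self, value: "torch.Tensor", event: Optional["cp.cuda.Event"] = None):
        self._value = value
        self._event = event  # None: the value is already ready

    def wait(self) -> "torch.Tensor":
        # For a blocking transfer there is no event, so this returns immediately.
        if self._event is not None:
            self._event.synchronize()
            self._event = None
        return self._value


# Usage, matching the snippet above:
#   fut = recv(...)     # always returns a TensorFuture
#   val = fut.wait()    # no-op wait if the recv was blocking
```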
Main comment is to try to reuse existing codepaths for executing tasks and reading/writing local args. I think the code will be much more robust this way, plus we need to do it anyway to support enabling overlapping per task or per object.
It seems possible to do this if we wrap all inputs/outputs with a wrapper class like the one below; we may also need to update the channel reading/writing:
from dataclasses import dataclass
from typing import Any, Optional
import cupy as cp

@dataclass
class Value:
    value: Any
    # If set, then the reader needs to synchronize on this event before reading `value`.
    cuda_event: Optional[cp.cuda.Event]
We should also think about how we can unit-test this. Ideally we should try to write a mocked test, maybe something like this one.
@@ -173,6 +174,106 @@ def do_profile_tasks(
        raise


@DeveloperAPI
def do_stream_tasks(
Can we combine this logic with `do_exec_tasks`? I think the more codepaths we have here, the more brittle the code becomes.
Yeah, the first version of the code did that.
The downside is if/else branches in each of the _read()/_compute()/_write()/_exec_operation() methods, which is not quite clean. I get your point that adding a separate code path is brittle, and I agree as a general principle. Yet I feel reusing `do_exec_tasks` is arguably brittle as well, as people need to think about two different kinds of "execution loops" while maintaining a single method. I think the `do_stream_tasks` code path will evolve more after this PR (e.g., supporting more overlapping, supporting operations that involve shared memory), and there are likely interface changes that will make it more incompatible with `do_exec_tasks`. My original thinking was that these two code paths will eventually converge, but it is probably better to keep them separate in the beginning.
The other benefit of separate execution loops is that we don't introduce performance regressions for cases where overlapping is disabled.
Let me know what you think.
I get your point but I think in this case we do not want these paths to evolve separately too much, i.e. in one of the following PRs we should support more controls like disabling overlapping by task or specifying the stream to use. For these changes, it will be best to be able to use the same execution loop.
OK, I think I can change to that.
    tasks: List["ExecutableTask"],
    schedule: List[_DAGNodeOperation],
) -> None:
    """Similar to `do_stream_tasks`, but with torch profiling enabled.
I suggest starting/stopping the profiling context in a Ray task before/after sending the tasks that start the execution loop. That way we can reuse the current `do_exec_tasks` codepath.
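For example, the actor could expose explicit start/stop methods that the driver calls around the existing execution loop; a rough sketch (the method names are hypothetical, not existing Ray APIs):

```python
import torch


class ProfilingMixin:
    """Hypothetical actor-side helpers, so do_exec_tasks itself stays unchanged."""

    def start_torch_profiling(self) -> None:
        # Enter the profiler context manually; the normal execution loop
        # then runs inside it without any profiling-specific codepath.
        self._torch_profiler = torch.profiler.profile(
            activities=[
                torch.profiler.ProfilerActivity.CPU,
                torch.profiler.ProfilerActivity.CUDA,
            ]
        )
        self._torch_profiler.__enter__()

    def stop_torch_profiling(self, trace_path: str) -> None:
        self._torch_profiler.__exit__(None, None, None)
        self._torch_profiler.export_chrome_trace(trace_path)
```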
        # if the operation is COMPUTE, the value is the result from the READ with
        # the same exec_task_idx; if the operation is WRITE, the value is the result
        # from the COMPUTE with the same exec_task_idx.
        self._stream_buffer: Dict[_DAGNodeOperation, Any] = {}
Can we reuse `self._intermediate_buffer` instead?
I originally reused it, but as a Dict it has the additional overhead of hashing/lookups etc. (although arguably the overhead is small), so I kept the original path entirely intact.
This also ties into the decision of whether we reuse `do_exec_task` or add the new `do_stream_task`.
Hmm, my preference is that we reuse `do_exec_task` and try to do the `Future` kind of API that @rkooo567 suggested. I think with these changes it should be possible to introduce the stream execution in a clean way.
python/ray/dag/compiled_dag_node.py
import cupy as cp

exec_event = cp.cuda.Event()
with exec_stream:
It would be best if we could avoid implicitly setting the execution stream (users may have code that manages streams themselves).
I think the current behavior is that if the user uses another stream in their UDF, that stream takes precedence. Is that a behavior we don't want? Can you elaborate a bit on the problem and on the stream-management contract with the user that you have in mind?
Hmm, in general I'm wary of implicitly overwriting user defaults, but it could be okay in this case since, as you said, the user's stream will take precedence. Once we have an API to set the execution stream, this also becomes less of an issue and more of a debate about the better default behavior (just use the default stream, or create one).
One tricky thing is that the CUDA default stream also has different synchronization semantics depending on compilation flags: https://docs.nvidia.com/cuda/cuda-runtime-api/stream-sync-behavior.html
I want to avoid using a new stream for execution. It is confusing behavior for users, and if they do something on the main thread with the default stream, it can cause issues (though it is not common).
Is Stephanie's original suggestion also to use the passed-in stream as the execution stream rather than creating a new one, like Sang's comment here: #47586 (comment)? @stephanie-wang
If so, I can change to that.
I think we should not use any stream for execution (so it uses the user's default). Only when a stream is explicitly passed should we allow overwriting it (in reality, users may also just do it themselves if required). Let's minimize magic behaviors if possible.
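A small sketch of that behavior, assuming an optional user-provided stream (the parameter and helper names are illustrative only):

```python
import contextlib
from typing import Optional

import cupy as cp


def _execution_stream_ctx(user_stream: Optional["cp.cuda.Stream"]):
    """Only switch streams when the caller explicitly provided one;
    otherwise leave whatever default (or user-managed) stream in place."""
    if user_stream is None:
        return contextlib.nullcontext()
    return user_stream  # cupy streams are usable as context managers


# e.g. inside the compute step:
#   with _execution_stream_ctx(exec_stream):
#       output = task_method(*args)
```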
@rkooo567 I will have a quick chat with you tomorrow to make sure I understand your recommendation.
python/ray/dag/dag_node_operation.py
    # not supported (yet). For example, currently if a channel requires
    # both NCCL and shared memory transport, overlap optimization cannot
    # be applied.
    if out_of_order_limit == 0:
Can you add some comments to this code to explain the logic? Also, improve the name? `_optimize_execution_schedule` does not make it very clear what it's optimizing.
+1, or maybe just add the high-level approach in the docstring. Btw, the logic is that we move reads ahead, as many as the out-of-order limit allows, and then place the compute/send.
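A very rough sketch of that idea (illustrative only, not the actual implementation; it assumes each operation exposes a .type field with READ/COMPUTE/WRITE values):

```python
from collections import deque


def _hoist_reads(schedule, out_of_order_limit):
    """Keep up to `out_of_order_limit` READs issued ahead of the
    COMPUTE/WRITE operations that follow them in the original schedule."""
    reads = deque(op for op in schedule if op.type == "READ")
    others = deque(op for op in schedule if op.type != "READ")
    result = []
    # Prime the pipeline with the allowed number of early reads.
    for _ in range(min(out_of_order_limit + 1, len(reads))):
        result.append(reads.popleft())
    for op in others:
        result.append(op)
        # Admit the next read once a task's WRITE is scheduled, so the
        # number of reads running ahead stays bounded by the limit.
        if op.type == "WRITE" and reads:
            result.append(reads.popleft())
    return result
```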
I really think we need to unify the execution loop. The reason is that the test space becomes much larger (we also need to make sure the existing cases work correctly when overlap is used).
What about always assuming send/recv is non-blocking and returning a Future? If send/recv is blocking, the future is returned after the wait finishes; otherwise, it just returns the future. This is the same as how the Gloo APIs work.
# Feature flag to turn on torch profiling.
RAY_ADAG_ENABLE_TORCH_PROFILING = (
    os.environ.get("RAY_ADAG_ENABLE_TORCH_PROFILING", "0") == "1"
I think we will want to allow dynamic profiling (like profiling N iterations, with enable/disable at runtime). I think this one is okay for now. Can you create a corresponding issue?
Created #47745
""" | ||
self.exec_task_idx = exec_task_idx | ||
self.type = operation_type | ||
self.method_name = method_name | ||
|
||
def next_operation(self): |
Can you add a docstring to clarify the definition? I am asking because "next operation" can have two meanings: 1. the literal next op in the schedule, or 2. the next operation for the same bind index.
    def __repr__(self):
-        return f"(Task idx: {self.exec_task_idx}, Type: {self.type})"
+        return f"([{self.exec_task_idx}] {self.method_name} {self.type})"
Just return `__str__()`?
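i.e., a small sketch of that suggestion:

```python
def __repr__(self):
    # Delegate to __str__ so the two representations stay in sync.
    return self.__str__()
```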
    return actor_to_execution_schedule


def _optimize_execution_schedule(
can you add unit tests like @kevin85421 did before? (not e2e, but unit level testing)
yes, will do.
            peer_rank,
            self._recv_stream.ptr,
        )
        event.record(self._recv_stream)
Actually, I can imagine we could also do `recv_stream.synchronize()`. What are the pros and cons of using an event vs. `stream.synchronize()`?
Hmm, on the consumer side (the compute operation), we'd like to sync on a particular event on the recv_stream rather than the whole stream (there might be other operations launched on the same recv_stream).
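A small sketch of the difference (stream/event names are illustrative, not the actual variables in this PR):

```python
import cupy as cp

recv_stream = cp.cuda.Stream(non_blocking=True)
compute_stream = cp.cuda.Stream(non_blocking=True)

# Producer side: record an event right after the NCCL recv is enqueued,
# so consumers can wait for just this transfer.
recv_event = cp.cuda.Event(block=False, disable_timing=True)
recv_event.record(recv_stream)

# Consumer side, option A: make the compute stream wait only on that event;
# later work enqueued on recv_stream does not delay this compute.
compute_stream.wait_event(recv_event)

# Consumer side, option B: recv_stream.synchronize() blocks the host until
# *everything* queued on recv_stream so far has finished, not just this recv.
```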
        )
        if done:
            break
    profiler.export_chrome_trace(f"adag-proc-{pid}.json")
Log before doing this so that users can see where the file is generated.
            # exception in a RayTaskError here because it has already been wrapped
            # by the previous task.
            self.set_stream_buffer(exc)
            return False
how does it propagate back to the caller? Looks like the when an execution loop sees False return value, it just finishes the loop.
@@ -269,6 +271,52 @@ def test_torch_tensor_nccl(ray_start_regular):
    # ray.get(receiver.ping.remote())


@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True)
def test_torch_tensor_nccl_overlap(ray_start_regular):
Can you add test cases where an exception is raised from compute/recv/send (and verify it is propagated properly)?
python/ray/dag/compiled_dag_node.py
""" | ||
output_val, exec_event = self.reset_stream_buffer(op) | ||
exit = False | ||
exec_event.synchronize() |
maybe we should not block cpu here.
this way, we can overlap shm write and compute (if it runs in kernel)
I think we need to sync to make sure execution finishes, otherwise the value may be incorrect?
I think you can synchronize on compute stream, and then torch and cuda should handle the gpu synchronization
Isn't it better to sync on the event, since there may be additional operations on the compute stream? Wouldn't waiting on the whole compute stream take unnecessarily longer?
Could you also elaborate on "then torch and cuda should handle the gpu synchronization"? I'm not sure what it means.
Hmm, actually my bad. I think if the event is created with blocking=False (which is the default: https://docs.cupy.dev/en/stable/reference/generated/cupy.cuda.Event.html), this only blocks the relevant device, not the CPU. So I think the current code is fine.
Two additional comments:
- Before proceeding with the refactoring, is it possible to unblock @woshiyyya? Maybe one easy solution is to build a custom wheel (you can ping the reef team to learn how to do it) so that he can just use it for multi-node tests.
- Right now, we have one stream per recv/send per process. I wonder if we need a stream per channel, or should we manage a queue of streams to support multiple NCCL channels?
Yes that's my plan.
I think initially we can start with one stream. Whether multiple streams can provide better performance would depend on a few factors and may need a bit of design, although we can perhaps try it.
Yeah, sounds good. PP anyway has 1:1 only. We can revisit and benchmark when we have use cases with more than one input/output.
Newer commits are for the graph visualization prototype; it's just a draft for now, and there is no need to review the new code.
Why are these changes needed?
This PR supports overlapping computation and communication for GPU tasks, as described in https://docs.google.com/document/d/1AkAqrMPadk1rMyjKE4VN4bq058z36fgBcx0i4dHIW20/edit#heading=h.8jw8z0hmgva0
The scope is send/recv but does not include collectives.
Related issue number
Checks
- I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.
- If I've added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.