Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core][aDAG] support multi readers in multi node when dag is created from an actor #47601

Merged
merged 7 commits into from
Sep 14, 2024

Conversation

rkooo567
Copy link
Contributor

@rkooo567 rkooo567 commented Sep 11, 2024

Why are these changes needed?

Currently, when a DAG is created from an actor, we are using different mechanism from a driver. In a driver we create a ProxyActor vs actor we are just using the actor itself.

This inconsistent mechanism is prone to error. As an example, I found when we support multi reader in multi node, we have deadlock because the driver actor needs to call ray.get(a.allocate_channel.remote()) for a downstream actor while the downstream actor calls ray.get(driver_actor.create_ref.remote()).

This fixes the issue by making ProxyActor as the default mechanism even when a dag is created inside an actor.

Related issue number

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Copy link
Member

@kevin85421 kevin85421 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, it looks good to me. My only concern is how much overhead will be introduced with the additional proxy actor? The overhead may be significant, especially since the main use case for creating a DAG from an actor is Ray Serve, which is sensitive to latency.

python/ray/experimental/channel/shared_memory_channel.py Outdated Show resolved Hide resolved
python/ray/dag/compiled_dag_node.py Outdated Show resolved Hide resolved
Copy link
Member

@kevin85421 kevin85421 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. My only concern is how much overhead will be introduced with the additional proxy actor when the driver is an actor?

@@ -66,27 +66,32 @@ def do_allocate_channel(
self,
reader_and_node_list: List[Tuple["ray.actor.ActorHandle", str]],
typ: ChannelOutputType,
read_by_adag_driver: bool,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this equivalent to is_adag_output_channel? Should we use that name which seems easier to follow

Can we have a default value = False as most of the cases would be false

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is better not having default as it is kind of important flag that can cause hangs if set incorrectly.

Regarding the name, I thought is_adag_output_channel exposes more implementation detail than this (that read by driver == output), and that was the original name before I changed it to this one. I don't have strong preference. Lmk if you want me to change.

Copy link
Contributor

@ruisearch42 ruisearch42 Sep 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with adag_driver is it's not a well defined concept so you need to explain it everywhere and can be confused with ray driver. I don't have a good idea right now, so will leave it to you to decide.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm imo it is a reasonable wording, so unless there's strong pushback, I will probably just keep it

"Compiled DAGs currently require the InputNode() to be the "
"driver process or an actor method. Ray task is not supported."
)
def _get_proxy_actor() -> "ray.actor.ActorHandle":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: _create_proxy_actor

python/ray/dag/compiled_dag_node.py Outdated Show resolved Hide resolved
@@ -246,6 +246,9 @@ def _execute_impl(self, *args, **kwargs):
def __str__(self) -> str:
return get_dag_node_str(self, f"{self._method_name}()")

def __repr__(self) -> str:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: why is this needed?
(repr is not added in other DAGNode)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was to make debugging easy when it was nested inside a list (in this case, repr is used).

@rkooo567
Copy link
Contributor Author

LGTM. My only concern is how much overhead will be introduced with the additional proxy actor when the driver is an actor?

I believe this mechanism is not supposed to introduce any additional delay (it is how driver -> actor works). What's the potential overhead coming from?

Copy link
Contributor

@ruisearch42 ruisearch42 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise LGTM

@kevin85421
Copy link
Member

I believe this mechanism is not supposed to introduce any additional delay (it is how driver -> actor works). What's the potential overhead coming from?

  • Without this PR: actor 1 -> actor 2 -> actor 1
  • With this PR: actor 1 -> actor 2 -> proxy actor -> actor 1

It needs to go through an additional proxy actor in this case.

@rkooo567
Copy link
Contributor Author

@kevin85421 Does it affect the runtime performance? IIUC, we are not going through proxy actor when we read, so it should be okay.

@kevin85421
Copy link
Member

IIUC, we are not going through proxy actor when we read, so it should be okay.

Oh, I missed that part. Thanks!

@rkooo567 rkooo567 enabled auto-merge (squash) September 14, 2024 08:11
@github-actions github-actions bot added the go add ONLY when ready to merge, run all tests label Sep 14, 2024
@rkooo567 rkooo567 merged commit 4b2f6a0 into ray-project:master Sep 14, 2024
6 of 7 checks passed
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Oct 15, 2024
…from an actor (ray-project#47601)

Currently, when a DAG is created from an actor, we are using different mechanism from a driver. In a driver we create a ProxyActor vs actor we are just using the actor itself.

This inconsistent mechanism is prone to error. As an example, I found when we support multi reader in multi node, we have deadlock because the driver actor needs to call ray.get(a.allocate_channel.remote()) for a downstream actor while the downstream actor calls ray.get(driver_actor.create_ref.remote()).

This fixes the issue by making ProxyActor as the default mechanism even when a dag is created inside an actor.

Signed-off-by: ujjawal-khare <ujjawal.khare@dream11.com>
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Oct 15, 2024
…from an actor (ray-project#47601)

Currently, when a DAG is created from an actor, we are using different mechanism from a driver. In a driver we create a ProxyActor vs actor we are just using the actor itself.

This inconsistent mechanism is prone to error. As an example, I found when we support multi reader in multi node, we have deadlock because the driver actor needs to call ray.get(a.allocate_channel.remote()) for a downstream actor while the downstream actor calls ray.get(driver_actor.create_ref.remote()).

This fixes the issue by making ProxyActor as the default mechanism even when a dag is created inside an actor.

Signed-off-by: ujjawal-khare <ujjawal.khare@dream11.com>
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Oct 15, 2024
…from an actor (ray-project#47601)

Currently, when a DAG is created from an actor, we are using different mechanism from a driver. In a driver we create a ProxyActor vs actor we are just using the actor itself.

This inconsistent mechanism is prone to error. As an example, I found when we support multi reader in multi node, we have deadlock because the driver actor needs to call ray.get(a.allocate_channel.remote()) for a downstream actor while the downstream actor calls ray.get(driver_actor.create_ref.remote()).

This fixes the issue by making ProxyActor as the default mechanism even when a dag is created inside an actor.

Signed-off-by: ujjawal-khare <ujjawal.khare@dream11.com>
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Oct 15, 2024
…from an actor (ray-project#47601)

Currently, when a DAG is created from an actor, we are using different mechanism from a driver. In a driver we create a ProxyActor vs actor we are just using the actor itself.

This inconsistent mechanism is prone to error. As an example, I found when we support multi reader in multi node, we have deadlock because the driver actor needs to call ray.get(a.allocate_channel.remote()) for a downstream actor while the downstream actor calls ray.get(driver_actor.create_ref.remote()).

This fixes the issue by making ProxyActor as the default mechanism even when a dag is created inside an actor.

Signed-off-by: ujjawal-khare <ujjawal.khare@dream11.com>
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Oct 15, 2024
…from an actor (ray-project#47601)

Currently, when a DAG is created from an actor, we are using different mechanism from a driver. In a driver we create a ProxyActor vs actor we are just using the actor itself.

This inconsistent mechanism is prone to error. As an example, I found when we support multi reader in multi node, we have deadlock because the driver actor needs to call ray.get(a.allocate_channel.remote()) for a downstream actor while the downstream actor calls ray.get(driver_actor.create_ref.remote()).

This fixes the issue by making ProxyActor as the default mechanism even when a dag is created inside an actor.

Signed-off-by: ujjawal-khare <ujjawal.khare@dream11.com>
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Oct 15, 2024
…from an actor (ray-project#47601)

Currently, when a DAG is created from an actor, we are using different mechanism from a driver. In a driver we create a ProxyActor vs actor we are just using the actor itself.

This inconsistent mechanism is prone to error. As an example, I found when we support multi reader in multi node, we have deadlock because the driver actor needs to call ray.get(a.allocate_channel.remote()) for a downstream actor while the downstream actor calls ray.get(driver_actor.create_ref.remote()).

This fixes the issue by making ProxyActor as the default mechanism even when a dag is created inside an actor.

Signed-off-by: ujjawal-khare <ujjawal.khare@dream11.com>
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Oct 15, 2024
…from an actor (ray-project#47601)

Currently, when a DAG is created from an actor, we are using different mechanism from a driver. In a driver we create a ProxyActor vs actor we are just using the actor itself.

This inconsistent mechanism is prone to error. As an example, I found when we support multi reader in multi node, we have deadlock because the driver actor needs to call ray.get(a.allocate_channel.remote()) for a downstream actor while the downstream actor calls ray.get(driver_actor.create_ref.remote()).

This fixes the issue by making ProxyActor as the default mechanism even when a dag is created inside an actor.

Signed-off-by: ujjawal-khare <ujjawal.khare@dream11.com>
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Oct 15, 2024
…from an actor (ray-project#47601)

Currently, when a DAG is created from an actor, we are using different mechanism from a driver. In a driver we create a ProxyActor vs actor we are just using the actor itself.

This inconsistent mechanism is prone to error. As an example, I found when we support multi reader in multi node, we have deadlock because the driver actor needs to call ray.get(a.allocate_channel.remote()) for a downstream actor while the downstream actor calls ray.get(driver_actor.create_ref.remote()).

This fixes the issue by making ProxyActor as the default mechanism even when a dag is created inside an actor.

Signed-off-by: ujjawal-khare <ujjawal.khare@dream11.com>
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Oct 15, 2024
…from an actor (ray-project#47601)

Currently, when a DAG is created from an actor, we are using different mechanism from a driver. In a driver we create a ProxyActor vs actor we are just using the actor itself.

This inconsistent mechanism is prone to error. As an example, I found when we support multi reader in multi node, we have deadlock because the driver actor needs to call ray.get(a.allocate_channel.remote()) for a downstream actor while the downstream actor calls ray.get(driver_actor.create_ref.remote()).

This fixes the issue by making ProxyActor as the default mechanism even when a dag is created inside an actor.

Signed-off-by: ujjawal-khare <ujjawal.khare@dream11.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
go add ONLY when ready to merge, run all tests
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants