[core][dashboard] Update nodes on delta. #47367
Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
subscriber = GcsAioNodeInfoSubscriber(address=gcs_addr)
await subscriber.subscribe()

# Get all node info from GCS. For TOCTOU, it happens after the subscription.
What's TOCTOU?
If you do:
1. get all
2. subscribe
3. for each update, ...

then between steps 1 and 2 you can miss some nodes. So we do:
1. subscribe
2. get all
3. for each update, ...
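To make the ordering concrete, here is a minimal sketch of the race. `FakeGcs`, `register`, and `drain` are hypothetical in-memory stand-ins invented for illustration; they are not Ray's GCS client or subscriber API. A node registers in the gap between the two steps, in both orderings.

```python
import asyncio


class FakeGcs:
    """Hypothetical in-memory stand-in for GCS; not Ray's real API."""

    def __init__(self):
        self.nodes = {}
        self.queues = []

    def subscribe(self):
        # Every node registered after this call is delivered to the queue.
        queue = asyncio.Queue()
        self.queues.append(queue)
        return queue

    def get_all(self):
        return dict(self.nodes)

    def register(self, node_id, info):
        self.nodes[node_id] = info
        for queue in self.queues:
            queue.put_nowait((node_id, info))


def drain(queue, view):
    # Apply every pending delta; applying the same delta twice is harmless.
    while not queue.empty():
        node_id, info = queue.get_nowait()
        view[node_id] = info


async def get_all_then_subscribe(gcs, late_node):
    view = gcs.get_all()
    gcs.register(*late_node)  # registers in the gap between the two steps
    queue = gcs.subscribe()
    drain(queue, view)
    return view


async def subscribe_then_get_all(gcs, late_node):
    queue = gcs.subscribe()
    gcs.register(*late_node)  # same timing, but the subscription is live
    view = gcs.get_all()
    drain(queue, view)
    return view
```

In the first ordering the late node is in neither the snapshot nor the queue. In the second it may appear in both, which is fine as long as applying a delta is idempotent.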
Let's expand the acronym as it's not a commonly used one
I think actor_head is not doing that; it does get-all and then subscribe, so does it have the TOCTOU problem?
Fixed all comments, reverted state_aggregator.py TPE changes (will be in another PR). PTAL! @jjyao @alexeykudinkin
LGTM, minor comments
for actor_id, actor_table_data in actors.items()
}

def convert(actors) -> Dict[str, dict]:
Why are we duplicating the changes already made in https://github.com/anyscale/runtime/pull/928?
OK I can leave this to you.
Thanks for the quick turnaround! Let's do another round of reviews @jjyao @alexeykudinkin
lg
@@ -136,6 +136,20 @@ def __init__(self, dashboard_head):
        self.accumulative_event_processing_s = 0

    async def _update_actors(self):
        """
        Yields actor info. First yields all actors from GCS, then subscribes to
this method doesn't yield anything?
we should make the code consistent for actor_head and node_head: e.g. both use AsyncGenerator
yes but let's make it another PR.
we should at least fix the comment to match the current PR
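For reference, a method that really does yield in the AsyncGenerator style suggested above might look roughly like the sketch below. This is not the code in this PR: `iter_updates`, `consume`, and the `None` stop sentinel are hypothetical names, and the sketch assumes the delta queue was subscribed before the snapshot was taken.

```python
import asyncio
from typing import AsyncGenerator, Dict, Optional, Tuple

Update = Tuple[str, dict]


async def iter_updates(
    snapshot: Dict[str, dict],
    deltas: "asyncio.Queue[Optional[Update]]",
) -> AsyncGenerator[Update, None]:
    # Yield the initial snapshot first, one entry at a time.
    for key, info in snapshot.items():
        yield key, info
    # Then yield deltas as they arrive; None is a hypothetical stop sentinel.
    while True:
        item = await deltas.get()
        if item is None:
            return
        yield item


async def consume(snapshot, pending):
    # Hypothetical caller: pre-fill a queue, then fold every update into a dict.
    deltas = asyncio.Queue()
    for item in pending:
        deltas.put_nowait(item)
    deltas.put_nowait(None)
    store = {}
    async for key, info in iter_updates(snapshot, deltas):
        store[key] = info
    return store
```

With this shape the caller owns the data store, and both heads could share the same consumption loop.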
Like actor_head.py, we now update DataSource.nodes on delta. It first subscribes to node deltas, then queries all node infos. Each delta updates:
1. DataSource.nodes[node_id]
2. DataSource.agents[node_id]
3. a warning generated after RAY_DASHBOARD_HEAD_NODE_REGISTRATION_TIMEOUT = 10s

Note on (2) agents: the agent port is read from internal KV, and is not available until agent.py is spawned and writes its own port to internal KV. So we create an async task for each node to poll this port every 1s.

It turns out that the get-all-then-subscribe code has a TOCTOU (time-of-check to time-of-use) problem, so actor_head.py is also updated to first subscribe and then get all actors.

Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
Signed-off-by: ujjawal-khare <ujjawal.khare@dream11.com>
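The per-node polling for the agent port could be sketched as below. `poll_agent_port`, `fetch_port`, and `demo` are hypothetical names; `fetch_port` stands in for the internal KV read, and the port value and shortened interval are arbitrary choices for the demo, not values from this PR.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Optional


async def poll_agent_port(
    node_id: str,
    fetch_port: Callable[[str], Awaitable[Optional[int]]],
    agents: Dict[str, int],
    interval_s: float = 1.0,
) -> int:
    # Retry until the agent has written its port, then record it.
    while True:
        port = await fetch_port(node_id)
        if port is not None:
            agents[node_id] = port
            return port
        await asyncio.sleep(interval_s)


async def demo() -> Dict[str, int]:
    attempts = {"n1": 0, "n2": 0}

    async def fake_fetch(node_id: str) -> Optional[int]:
        # Hypothetical KV read: the port shows up on the third attempt.
        attempts[node_id] += 1
        return 52365 if attempts[node_id] >= 3 else None

    agents: Dict[str, int] = {}
    # One polling task per node, so a slow agent does not block the others.
    tasks = [
        asyncio.create_task(poll_agent_port(n, fake_fetch, agents, interval_s=0.01))
        for n in attempts
    ]
    await asyncio.gather(*tasks)
    return agents
```

A real version would probably also need to cancel the task when the node dies before its agent ever reports a port; that is left out here.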