
[core] Add cluster ID to the Python layer #37583

Merged: 27 commits merged into ray-project:master on Aug 10, 2023

Conversation

@vitsai vitsai (Contributor) commented Jul 19, 2023

Why are these changes needed?

An earlier change (#37399) added a new RPC to each GCS client, causing some stress tests to fail. Instead of adding O(num_worker) RPCs, add O(num_node) RPCs by plumbing the cluster ID through the Python layer.

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@vitsai vitsai requested a review from a team as a code owner July 19, 2023 21:19
@vitsai vitsai force-pushed the gcs-raylet-plumb branch from 16b5e03 to 5e60f9a on July 19, 2023 21:35
@fishbone fishbone self-assigned this Jul 19, 2023
src/ray/raylet/main.cc (outdated; resolved)
Comment on lines 244 to 256
inline grpc::ClientContext &&PrepareContext(int64_t timeout_ms) {
  grpc::ClientContext context;
  if (timeout_ms != -1) {
    context.set_deadline(std::chrono::system_clock::now() +
                         std::chrono::milliseconds(timeout_ms));
  }
  if (!cluster_id_.IsNil()) {
    context.AddMetadata(kClusterIdKey, cluster_id_.Hex());
  }
  return std::move(context);
}
Contributor:

Suggested change
inline grpc::ClientContext &&PrepareContext(int64_t timeout_ms) {
  grpc::ClientContext context;
  if (timeout_ms != -1) {
    context.set_deadline(std::chrono::system_clock::now() +
                         std::chrono::milliseconds(timeout_ms));
  }
  if (!cluster_id_.IsNil()) {
    context.AddMetadata(kClusterIdKey, cluster_id_.Hex());
  }
  return std::move(context);
}
grpc::ClientContext PrepareContext(int64_t timeout_ms) {
  grpc::ClientContext context;
  if (timeout_ms != -1) {
    context.set_deadline(std::chrono::system_clock::now() +
                         std::chrono::milliseconds(timeout_ms));
  }
  if (!cluster_id_.IsNil()) {
    context.AddMetadata(kClusterIdKey, cluster_id_.Hex());
  }
  return context;
}

vitsai (Contributor, Author):

Compiler complains because there's no copy constructor for ClientContext. Moved the ClientContext out of the helper the way it was before.
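The pattern being reverted to is roughly the sketch below, with the caller owning the ClientContext and a helper only configuring it; this is an illustration of the described approach, not the exact code that was merged.

// Caller owns the ClientContext; the helper only configures it, so the
// non-copyable, non-movable context never has to be returned.
grpc::ClientContext context;
GrpcClientContextWithTimeoutMs(context, timeout_ms);
if (!cluster_id_.IsNil()) {
  context.AddMetadata(kClusterIdKey, cluster_id_.Hex());
}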

Contributor:

hmmm, interesting... because you actually can return a unique_ptr directly.
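That alternative would look roughly like the member-function sketch below; it is not what the PR ended up doing, just an illustration of the reviewer's suggestion.

// Sketch of the reviewer's alternative: grpc::ClientContext is neither
// copyable nor movable, but ownership can still be handed back through a
// std::unique_ptr (requires <memory>).
std::unique_ptr<grpc::ClientContext> PrepareContext(int64_t timeout_ms) {
  auto context = std::make_unique<grpc::ClientContext>();
  if (timeout_ms != -1) {
    context->set_deadline(std::chrono::system_clock::now() +
                          std::chrono::milliseconds(timeout_ms));
  }
  if (!cluster_id_.IsNil()) {
    context->AddMetadata(kClusterIdKey, cluster_id_.Hex());
  }
  return context;
}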

@@ -432,8 +416,7 @@ Status PythonGcsClient::GetAllJobInfo(int64_t timeout_ms,
Status PythonGcsClient::RequestClusterResourceConstraint(
    int64_t timeout_ms,
    const std::vector<std::unordered_map<std::string, double>> &bundles) {
  grpc::ClientContext context;
  GrpcClientContextWithTimeoutMs(context, timeout_ms);
  grpc::ClientContext &&context = PrepareContext(timeout_ms);
Contributor:

Suggested change
grpc::ClientContext &&context = PrepareContext(timeout_ms);
grpc::ClientContext context = PrepareContext(timeout_ms);

@fishbone fishbone (Contributor) left a comment:

Overall it looks good.

@fishbone fishbone (Contributor):

Could you run the failed release test to make sure it's working?

src/ray/gcs/gcs_client/gcs_client.cc (outdated; resolved)
src/ray/raylet/main.cc (outdated; resolved)
@@ -83,6 +83,9 @@ def __init__(
        )
        self.all_processes: dict = {}
        self.removal_lock = threading.Lock()
        self.cluster_id = (
            ray_params.cluster_id if hasattr(ray_params, "cluster_id") else None
Collaborator:

Why hasattr?

vitsai (Contributor, Author):

Originally it was because there was no enforcement that the attr was even declared, and when it's not declared, it throws an exception instead of returning None (i.e. if ray_params.cluster_id = cluster_id is only called sometimes).

But RayParams is fixed now per your other comment.

python/ray/_private/workers/default_worker.py (resolved)
python/ray/_private/node.py (outdated; resolved)
python/ray/_private/worker.py (outdated; resolved)
src/ray/gcs/gcs_client/gcs_client.cc (outdated; resolved)
@vitsai vitsai force-pushed the gcs-raylet-plumb branch from 5e60f9a to 983eb0c on July 20, 2023 08:32
@vitsai vitsai (Contributor, Author) left a comment:

good comments

@@ -231,7 +232,20 @@ class RAY_EXPORT PythonGcsClient {
      const std::vector<std::unordered_map<std::string, double>> &bundles);
  Status GetClusterStatus(int64_t timeout_ms, std::string &serialized_reply);

  ClusterID GetClusterId() const { return cluster_id_; }
Contributor:

nit: maybe const ClusterID&? Not a big deal btw.
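Concretely, the nit amounts to something like the one-liner below; since ClusterID is a small value type, either form works, so this is illustration only.

// Returning a const reference avoids copying the stored id on every call.
const ClusterID &GetClusterId() const { return cluster_id_; }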

@fishbone fishbone (Contributor) left a comment:

Thanks! Let's make sure the broken release test passes

@jjyao jjyao (Collaborator) left a comment:

LGTM

python/ray/_private/node.py (outdated; resolved)
python/ray/_private/node.py (outdated; resolved)
python/ray/_private/parameter.py (resolved)
python/ray/includes/unique_ids.pxd (outdated; resolved)
src/ray/gcs/gcs_client/gcs_client.cc (outdated; resolved)

  auto status =
      GrpcStatusToRayStatus(node_info_stub_->GetClusterId(&context, request, &reply));
  while (!status.IsTimedOut()) {
Collaborator:

Should we fail after several retries?

vitsai (Contributor, Author):

Looks like all the other methods use timeouts

vitsai (Contributor, Author):

Added retries to emulate @_auto_reconnect on the Python side.
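For illustration, a bounded version of that retry loop might look like the sketch below; the retry count, the fresh ClientContext per attempt, and the initial TimedOut status are assumptions for the sketch, not the code that was merged.

// Minimal bounded-retry sketch for fetching the cluster ID from the GCS.
constexpr int kGetClusterIdRetries = 5;  // hypothetical bound
Status status = Status::TimedOut("GetClusterId not yet attempted");
for (int attempt = 0; attempt < kGetClusterIdRetries && !status.ok(); ++attempt) {
  grpc::ClientContext context;
  GrpcClientContextWithTimeoutMs(context, timeout_ms);
  status = GrpcStatusToRayStatus(
      node_info_stub_->GetClusterId(&context, request, &reply));
}
// After the loop a non-OK status can be surfaced to the caller instead of
// retrying forever, which gives the "fail after several retries" behavior.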

src/ray/raylet/main.cc (outdated; resolved)
python/ray/_private/node.py (outdated; resolved)
@vitsai vitsai force-pushed the gcs-raylet-plumb branch 2 times, most recently from e50f292 to f7028b9 on July 31, 2023 21:12
@vitsai vitsai force-pushed the gcs-raylet-plumb branch from 68b3cd5 to 9b6f671 on August 1, 2023 21:03
@vitsai vitsai force-pushed the gcs-raylet-plumb branch from 9be9cf3 to 40c36cf on August 3, 2023 04:34
assert "(raylet)" not in out_str
assert "(raylet)" not in err_str

def check_output(blob):
Contributor:

Not quite sure, but the test is to make sure no logs are streamed to the worker from the raylet. Should we keep that?

vitsai (Contributor, Author):

But we added new logs in this change that aren't errors, so we broke that invariant. Probably doesn't make sense to test for it anymore

vitsai (Contributor, Author):

ok fixed

f"Please check {os.path.join(self._logs_dir, 'gcs_server.out')}"
" for details"
)
if hasattr(self, "_logs_dir"):
Contributor:

Why do we need to check the attr now? Could you add a comment there?

vitsai (Contributor, Author):

I think this was always a bug, but the path never got triggered. In the init path, _logs_dir is initialized after the GcsClient.

@@ -1126,6 +1126,7 @@ def init(
namespace: Optional[str] = None,
runtime_env: Optional[Union[Dict[str, Any], "RuntimeEnv"]] = None, # noqa: F821
storage: Optional[str] = None,
no_gcs: bool = False,
Contributor:

We shouldn't change the API.

vitsai (Contributor, Author):

Should we have a global variable then? Initialization order might be tricky.

@vitsai vitsai (Contributor, Author) commented Aug 3, 2023:

So the problem is that some tests don't start GCS, and then the newly added RPC hangs.

@fishbone fishbone (Contributor) left a comment:

We shouldn't change the API. Still in the process of reviewing.

vitsai added 3 commits August 8, 2023 23:44
Signed-off-by: vitsai <victoria@anyscale.com>
Signed-off-by: vitsai <victoria@anyscale.com>
Signed-off-by: vitsai <victoria@anyscale.com>
@vitsai vitsai force-pushed the gcs-raylet-plumb branch from 6702492 to af111c7 on August 9, 2023 06:56
vitsai added 2 commits August 9, 2023 22:10
Signed-off-by: vitsai <vitsai@cs.stanford.edu>
Signed-off-by: vitsai <vitsai@cs.stanford.edu>
@fishbone fishbone merged commit cfe608c into ray-project:master Aug 10, 2023
edoakes added a commit to edoakes/ray that referenced this pull request Aug 10, 2023
This reverts commit cfe608c.

Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
edoakes added a commit that referenced this pull request Aug 10, 2023
This reverts commit cfe608c.

Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
shrekris-anyscale pushed a commit to shrekris-anyscale/ray that referenced this pull request Aug 10, 2023
An earlier [change](ray-project#37399) added a new RPC to each GCS client, causing some stress tests to fail. Instead of adding O(num_worker) RPCs, add O(num_node) RPCs by plumbing through the Python layer.

Signed-off-by: Shreyas Krishnaswamy <shrekris@anyscale.com>
shrekris-anyscale pushed a commit to shrekris-anyscale/ray that referenced this pull request Aug 10, 2023
ray-project#38320)

This reverts commit cfe608c.

Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Signed-off-by: Shreyas Krishnaswamy <shrekris@anyscale.com>
jjyao pushed a commit that referenced this pull request Aug 11, 2023
Redo #37583

Signed-off-by: vitsai <victoria@anyscale.com>
Signed-off-by: vitsai <vitsai@cs.stanford.edu>
NripeshN pushed a commit to NripeshN/ray that referenced this pull request Aug 15, 2023
An earlier [change](ray-project#37399) added a new RPC to each GCS client, causing some stress tests to fail. Instead of adding O(num_worker) RPCs, add O(num_node) RPCs by plumbing through the Python layer.

Signed-off-by: NripeshN <nn2012@hw.ac.uk>
NripeshN pushed a commit to NripeshN/ray that referenced this pull request Aug 15, 2023
ray-project#38320)

This reverts commit cfe608c.

Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Signed-off-by: NripeshN <nn2012@hw.ac.uk>
NripeshN pushed a commit to NripeshN/ray that referenced this pull request Aug 15, 2023
Redo ray-project#37583

Signed-off-by: vitsai <victoria@anyscale.com>
Signed-off-by: vitsai <vitsai@cs.stanford.edu>
Signed-off-by: NripeshN <nn2012@hw.ac.uk>
arvind-chandra pushed a commit to lmco/ray that referenced this pull request Aug 31, 2023
An earlier [change](ray-project#37399) added a new RPC to each GCS client, causing some stress tests to fail. Instead of adding O(num_worker) RPCs, add O(num_node) RPCs by plumbing through the Python layer.

Signed-off-by: e428265 <arvind.chandramouli@lmco.com>
arvind-chandra pushed a commit to lmco/ray that referenced this pull request Aug 31, 2023
ray-project#38320)

This reverts commit cfe608c.

Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Signed-off-by: e428265 <arvind.chandramouli@lmco.com>
arvind-chandra pushed a commit to lmco/ray that referenced this pull request Aug 31, 2023
Redo ray-project#37583

Signed-off-by: vitsai <victoria@anyscale.com>
Signed-off-by: vitsai <vitsai@cs.stanford.edu>
Signed-off-by: e428265 <arvind.chandramouli@lmco.com>
vymao pushed a commit to vymao/ray that referenced this pull request Oct 11, 2023
An earlier [change](ray-project#37399) added a new RPC to each GCS client, causing some stress tests to fail. Instead of adding O(num_worker) RPCs, add O(num_node) RPCs by plumbing through the Python layer.

Signed-off-by: Victor <vctr.y.m@example.com>
vymao pushed a commit to vymao/ray that referenced this pull request Oct 11, 2023
ray-project#38320)

This reverts commit cfe608c.

Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Signed-off-by: Victor <vctr.y.m@example.com>
vymao pushed a commit to vymao/ray that referenced this pull request Oct 11, 2023
Redo ray-project#37583

Signed-off-by: vitsai <victoria@anyscale.com>
Signed-off-by: vitsai <vitsai@cs.stanford.edu>
Signed-off-by: Victor <vctr.y.m@example.com>
Labels: none yet
Projects: none yet
3 participants