squashed changes before rebase
[core] Add ClusterID token to GRPC server [1/n] (ray-project#36517)

First in a stack of changes that plumb token exchange between the GCS client and server. This change adds a ClusterID token that can be passed to a gRPC server. The server initializes each component gRPC service with the token by passing it to the ServerCallFactory objects when they are set up. When the factories create ServerCall objects for the gRPC service completion queue, the token is also passed to each ServerCall so that it can be checked against inbound request metadata. The authentication check itself is not part of this PR.

Note: This change also makes a minor cleanup in the GCS server (a string check now uses an enum).

Next change (client-side analogue): ray-project#36526
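
The plumbing described above (server to factory to call) can be sketched roughly as follows. This is a minimal illustration only, not Ray's code: every class name here (`SketchGrpcServer`, `SketchServerCallFactory`, `SketchServerCall`), the `Metadata` alias, and the `"cluster_id"` metadata key are assumptions made for the example, and the token is a plain string rather than a ClusterID.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for inbound request metadata.
using Metadata = std::map<std::string, std::string>;
// Assumed metadata key; the real key name is not given in this commit.
const std::string kClusterIdMetadataKey = "cluster_id";

// One in-flight request. It keeps the token it was created with.
class SketchServerCall {
 public:
  explicit SketchServerCall(std::string cluster_id)
      : cluster_id_(std::move(cluster_id)) {}

  // Compares the token carried in the request metadata with the stored one.
  bool TokenMatches(const Metadata &metadata) const {
    auto it = metadata.find(kClusterIdMetadataKey);
    return it != metadata.end() && it->second == cluster_id_;
  }

 private:
  std::string cluster_id_;
};

// Creates calls for one service; it receives the token at setup time.
class SketchServerCallFactory {
 public:
  explicit SketchServerCallFactory(std::string cluster_id)
      : cluster_id_(std::move(cluster_id)) {}

  std::unique_ptr<SketchServerCall> CreateCall() const {
    return std::make_unique<SketchServerCall>(cluster_id_);
  }

 private:
  std::string cluster_id_;
};

// Owns the token and hands it to every factory it creates.
class SketchGrpcServer {
 public:
  void SetClusterId(std::string cluster_id) {
    assert(!cluster_id.empty());
    cluster_id_ = std::move(cluster_id);
  }

  SketchServerCallFactory MakeFactory() const {
    return SketchServerCallFactory(cluster_id_);
  }

 private:
  std::string cluster_id_;
};
```

In this sketch a caller would set the token once on the server, obtain a factory per service, and ask each created call whether the request metadata carries a matching token. Whether and where the real server rejects a mismatch is left to the later changes in the stack.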

[core] Generate GCS server token

Signed-off-by: vitsai <victoria@anyscale.com>

Add client-side logic for setting cluster ID.

Signed-off-by: vitsai <victoria@anyscale.com>

bug fixes

Signed-off-by: vitsai <victoria@anyscale.com>

comments

Signed-off-by: vitsai <victoria@anyscale.com>

bug workaround

Signed-off-by: vitsai <victoria@anyscale.com>

Fix windows build

Signed-off-by: vitsai <victoria@anyscale.com>

fix bug

Signed-off-by: vitsai <victoria@anyscale.com>

remove auth stuff from this pr

Signed-off-by: vitsai <victoria@anyscale.com>

fix mock build

Signed-off-by: vitsai <victoria@anyscale.com>

comments

Signed-off-by: vitsai <victoria@anyscale.com>

remove future

Signed-off-by: vitsai <victoria@anyscale.com>

Remove top-level changes

Signed-off-by: vitsai <victoria@anyscale.com>

comments

Signed-off-by: vitsai <victoria@anyscale.com>

Peel back everything that's not grpc-layer changes

Signed-off-by: vitsai <victoria@anyscale.com>

Change atomic to mutex

Signed-off-by: vitsai <victoria@anyscale.com>

Fix alignment of SafeClusterID

Signed-off-by: vitsai <victoria@anyscale.com>

comments

Signed-off-by: vitsai <victoria@anyscale.com>

Add back everything in GCS server except RPC definition

Signed-off-by: vitsai <victoria@anyscale.com>

fix bug

Signed-off-by: vitsai <victoria@anyscale.com>

comments

Signed-off-by: vitsai <victoria@anyscale.com>

comments

Signed-off-by: vitsai <victoria@anyscale.com>
vitsai committed Jul 10, 2023
1 parent 19a9e3a commit 6cb5508
Showing 2 changed files with 40 additions and 0 deletions.
38 changes: 38 additions & 0 deletions src/ray/gcs/gcs_server/gcs_server.cc
@@ -83,6 +83,11 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config,
    RAY_LOG(FATAL) << "Unexpected storage type: " << storage_type_;
  }

  // Init KV Manager. This needs to be initialized first here so that
  // it can be used to retrieve the cluster ID.
  InitKVManager();
  CacheAndSetClusterId();

  auto on_done = [this](const ray::Status &status) {
    RAY_CHECK(status.ok()) << "Failed to put internal config";
    this->main_service_.stop();
@@ -181,6 +186,39 @@ void GcsServer::GetOrGenerateClusterId(
});
}

void GcsServer::CacheAndSetClusterId() {
  static std::string const kTokenNamespace = "cluster";
  kv_manager_->GetInstance().Get(
      kTokenNamespace, kClusterIdKey, [this](std::optional<std::string> token) mutable {
        if (!token.has_value()) {
          ClusterID cluster_id = ClusterID::FromRandom();
          RAY_LOG(INFO) << "No existing server token found. Generating new token: "
                        << cluster_id.Hex();
          // Capture cluster_id by value: the Put callback may run after this
          // lambda has returned, so a reference to the local could dangle.
          kv_manager_->GetInstance().Put(kTokenNamespace,
                                         kClusterIdKey,
                                         cluster_id.Binary(),
                                         false,
                                         [this, cluster_id](bool added_entry) mutable {
                                           RAY_CHECK(added_entry)
                                               << "Failed to persist new token!";
                                           rpc_server_.SetClusterId(cluster_id);
                                           main_service_.stop();
                                         });
        } else {
          ClusterID cluster_id = ClusterID::FromBinary(token.value());
          RAY_LOG(INFO) << "Found existing server token: " << cluster_id;
          rpc_server_.SetClusterId(cluster_id);
          main_service_.stop();
        }
      });
  // This will run the async Get and Put inline.
  main_service_.run();
  main_service_.restart();

  // Check the cluster ID exists. There is a RAY_CHECK in here.
  RAY_UNUSED(rpc_server_.GetClusterId());
}

void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
  // Init cluster resource scheduler.
  InitClusterResourceScheduler();
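
The hunk above follows a "get or generate, then block on the event loop" pattern: issue an async Get, fall back to an async Put of a fresh ID, stop the loop from whichever callback finishes, and run the loop inline until then. A standalone sketch of that pattern is below. It uses only the standard library, and everything in it is hypothetical: `SketchLoop` is a toy single-threaded loop standing in for the server's `main_service_`, `SketchKV` stands in for the KV manager, and the ID is a caller-supplied string instead of `ClusterID::FromRandom()`.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <optional>
#include <queue>
#include <string>
#include <utility>

// Hypothetical single-threaded event loop with run/stop/restart semantics.
class SketchLoop {
 public:
  void Post(std::function<void()> task) { tasks_.push(std::move(task)); }
  void Stop() { stopped_ = true; }
  void Restart() { stopped_ = false; }
  // Runs queued tasks until the queue is empty or Stop() is called.
  void Run() {
    while (!stopped_ && !tasks_.empty()) {
      std::function<void()> task = std::move(tasks_.front());
      tasks_.pop();
      task();
    }
  }

 private:
  std::queue<std::function<void()>> tasks_;
  bool stopped_ = false;
};

// Hypothetical async key-value store; callbacks are deferred onto the loop.
class SketchKV {
 public:
  explicit SketchKV(SketchLoop &loop) : loop_(loop) {}

  void Get(const std::string &key,
           std::function<void(std::optional<std::string>)> cb) {
    loop_.Post([this, key, cb = std::move(cb)] {
      std::optional<std::string> found;
      auto it = data_.find(key);
      if (it != data_.end()) found = it->second;
      cb(found);
    });
  }

  // Inserts only if the key is absent; reports whether an entry was added.
  void Put(const std::string &key, std::string value,
           std::function<void(bool)> cb) {
    loop_.Post([this, key, value = std::move(value), cb = std::move(cb)] {
      cb(data_.emplace(key, value).second);
    });
  }

 private:
  SketchLoop &loop_;
  std::map<std::string, std::string> data_;
};

// Blocks until an ID is loaded from the store or fresh_id is persisted.
std::string GetOrCreateClusterId(SketchLoop &loop, SketchKV &kv,
                                 const std::string &fresh_id) {
  std::string result;
  kv.Get("cluster_id",
         [&loop, &kv, &result, fresh_id](std::optional<std::string> token) {
           if (token.has_value()) {
             result = *token;
             loop.Stop();
             return;
           }
           // fresh_id is captured by value because the Put callback runs
           // only after this lambda has returned.
           kv.Put("cluster_id", fresh_id, [&loop, &result, fresh_id](bool added) {
             assert(added && "failed to persist new cluster ID");
             result = fresh_id;
             loop.Stop();
           });
         });
  loop.Run();      // drives the Get (and, if needed, Put) callbacks inline
  loop.Restart();  // leave the loop usable for later work
  assert(!result.empty());
  return result;
}
```

The references captured in the outer lambda (`loop`, `kv`, `result`) are safe in this sketch only because `Run()` is called before the function returns; values created inside a callback need to be copied into any nested callback.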
2 changes: 2 additions & 0 deletions src/ray/gcs/gcs_server/gcs_server.h
@@ -14,6 +14,8 @@

#pragma once

#include <atomic>

#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/ray_syncer/ray_syncer.h"
#include "ray/common/runtime_env_manager.h"