Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] GCS version auth [5/n] #36073

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/ray/common/status.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ namespace ray {
#define STATUS_CODE_NOT_FOUND "NotFound"
#define STATUS_CODE_DISCONNECTED "Disconnected"
#define STATUS_CODE_SCHEDULING_CANCELLED "SchedulingCancelled"
#define STATUS_CODE_AUTH_ERROR "AuthError"
// object store status
#define STATUS_CODE_OBJECT_EXISTS "ObjectExists"
#define STATUS_CODE_OBJECT_NOT_FOUND "ObjectNotFound"
Expand Down Expand Up @@ -114,6 +115,7 @@ std::string Status::CodeAsString() const {
{StatusCode::TransientObjectStoreFull, STATUS_CODE_TRANSIENT_OBJECT_STORE_FULL},
{StatusCode::GrpcUnavailable, STATUS_CODE_GRPC_UNAVAILABLE},
{StatusCode::GrpcUnknown, STATUS_CODE_GRPC_UNKNOWN},
{StatusCode::AuthError, STATUS_CODE_AUTH_ERROR},
};

auto it = code_to_str.find(code());
Expand Down Expand Up @@ -149,6 +151,7 @@ StatusCode Status::StringToCode(const std::string &str) {
{STATUS_CODE_OBJECT_UNKNOWN_OWNER, StatusCode::ObjectUnknownOwner},
{STATUS_CODE_OBJECT_STORE_FULL, StatusCode::ObjectStoreFull},
{STATUS_CODE_TRANSIENT_OBJECT_STORE_FULL, StatusCode::TransientObjectStoreFull},
{STATUS_CODE_AUTH_ERROR, StatusCode::AuthError},
};

auto it = str_to_code.find(str);
Expand Down
10 changes: 8 additions & 2 deletions src/ray/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ enum class StatusCode : char {
ObjectUnknownOwner = 29,
RpcError = 30,
OutOfResource = 31,
// Indicates that the ObjectRefStream has reached the end of the stream.
ObjectRefEndOfStream = 32
ObjectRefEndOfStream = 32,
AuthError = 33,
};

#if defined(__clang__)
Expand Down Expand Up @@ -252,6 +252,10 @@ class RAY_EXPORT Status {
return Status(StatusCode::OutOfResource, msg);
}

// Builds a Status whose code is StatusCode::AuthError and whose message is
// `msg`.
static Status AuthError(const std::string &msg) {
return Status(StatusCode::AuthError, msg);
}

static StatusCode StringToCode(const std::string &str);

// Returns true iff the status indicates success.
Expand Down Expand Up @@ -303,6 +307,8 @@ class RAY_EXPORT Status {

// Returns true iff the status code is StatusCode::OutOfResource.
bool IsOutOfResource() const { return code() == StatusCode::OutOfResource; }

// Returns true iff the status code is StatusCode::AuthError.
bool IsAuthError() const { return code() == StatusCode::AuthError; }

// Return a string representation of this status suitable for printing.
// Returns the string "OK" for success.
std::string ToString() const;
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_client/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ class JobInfoAccessor {
class NodeInfoAccessor {
public:
NodeInfoAccessor() = default;
explicit NodeInfoAccessor(GcsClient *client_impl);
NodeInfoAccessor(GcsClient *client_impl);
vitsai marked this conversation as resolved.
Show resolved Hide resolved
virtual ~NodeInfoAccessor() = default;
/// Register local node to GCS asynchronously.
///
Expand Down
1 change: 0 additions & 1 deletion src/ray/gcs/gcs_client/gcs_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ Status GcsClient::Connect(instrumented_io_context &io_service,

// Init GCS subscriber instance.
gcs_subscriber_ = std::make_unique<GcsSubscriber>(gcs_address, std::move(subscriber));

job_accessor_ = std::make_unique<JobInfoAccessor>(this);
actor_accessor_ = std::make_unique<ActorInfoAccessor>(this);
node_accessor_ = std::make_unique<NodeInfoAccessor>(this);
Expand Down
7 changes: 6 additions & 1 deletion src/ray/gcs/gcs_client/test/gcs_client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ class GcsClientTest : public ::testing::TestWithParam<bool> {
rpc::CheckAliveReply reply;
auto status = stub->CheckAlive(&context, request, &reply);
// If it is in memory, we don't have the new token until we connect again.
if (!status.ok()) {
if (!((!in_memory && !status.ok()) ||
(in_memory && GrpcStatusToRayStatus(status).IsAuthError()))) {
RAY_LOG(WARNING) << "Unable to reach GCS: " << status.error_code() << " "
<< status.error_message();
continue;
Expand Down Expand Up @@ -885,6 +886,7 @@ TEST_P(GcsClientTest, TestGcsTableReload) {

// Restart GCS.
RestartGcsServer();
RAY_CHECK_OK(gcs_client_->Connect(*client_io_service_));

// Get information of nodes from GCS.
std::vector<rpc::GcsNodeInfo> node_list = GetNodeInfoList();
Expand Down Expand Up @@ -981,6 +983,7 @@ TEST_P(GcsClientTest, TestEvictExpiredDestroyedActors) {

// Restart GCS.
RestartGcsServer();
RAY_CHECK_OK(gcs_client_->Connect(*client_io_service_));

for (int index = 0; index < actor_count; ++index) {
auto actor_table_data = Mocker::GenActorTableData(job_id);
Expand Down Expand Up @@ -1008,13 +1011,15 @@ TEST_P(GcsClientTest, TestGcsAuth) {
RestartGcsServer();
auto node_info = Mocker::GenNodeInfo();

EXPECT_FALSE(RegisterNode(*node_info));
RAY_CHECK_OK(gcs_client_->Connect(*client_io_service_));
EXPECT_TRUE(RegisterNode(*node_info));
}

TEST_P(GcsClientTest, TestEvictExpiredDeadNodes) {
// Restart GCS.
RestartGcsServer();
RAY_CHECK_OK(gcs_client_->Connect(*client_io_service_));
if (RayConfig::instance().gcs_storage() == gcs::GcsServer::kInMemoryStorage) {
RAY_CHECK_OK(gcs_client_->Connect(*client_io_service_));
}
Expand Down
14 changes: 8 additions & 6 deletions src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "ray/gcs/gcs_server/gcs_server.h"

#include <fstream>
#include <future>

#include "ray/common/asio/asio_util.h"
#include "ray/common/asio/instrumented_io_context.h"
Expand Down Expand Up @@ -59,6 +60,7 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config,
rpc_server_(config.grpc_server_name,
config.grpc_server_port,
config.node_ip_address == "127.0.0.1",
ClusterID::Nil(),
config.grpc_server_thread_num,
/*keepalive_time_ms=*/RayConfig::instance().grpc_keepalive_time_ms()),
client_call_manager_(main_service,
Expand Down Expand Up @@ -157,11 +159,11 @@ void GcsServer::GetOrGenerateClusterId(
kv_manager_->GetInstance().Get(
kTokenNamespace,
kClusterIdKey,
[this, continuation = std::move(continuation)](
std::optional<std::string> provided_cluster_id) mutable {
if (!provided_cluster_id.has_value()) {
[this,
continuation = std::move(continuation)](std::optional<std::string> token) mutable {
if (!token.has_value()) {
ClusterID cluster_id = ClusterID::FromRandom();
RAY_LOG(INFO) << "No existing server cluster ID found. Generating new ID: "
RAY_LOG(INFO) << "No existing server token found. Generating new token: "
<< cluster_id.Hex();
kv_manager_->GetInstance().Put(
kTokenNamespace,
Expand All @@ -170,11 +172,11 @@ void GcsServer::GetOrGenerateClusterId(
false,
// Capture cluster_id by value rather than by reference: it is a local of the
// enclosing callback's `if` branch, so a reference would dangle if Put
// invokes this callback after that scope has exited.
[cluster_id,
continuation = std::move(continuation)](bool added_entry) mutable {
RAY_CHECK(added_entry) << "Failed to persist new cluster ID!";
RAY_CHECK(added_entry) << "Failed to persist new token!";
continuation(cluster_id);
});
} else {
ClusterID cluster_id = ClusterID::FromBinary(provided_cluster_id.value());
ClusterID cluster_id = ClusterID::FromBinary(token.value());
RAY_LOG(INFO) << "Found existing server token: " << cluster_id;
continuation(cluster_id);
}
Expand Down
2 changes: 2 additions & 0 deletions src/ray/gcs/gcs_server/gcs_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#pragma once

#include <atomic>

#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/ray_syncer/ray_syncer.h"
#include "ray/common/runtime_env_manager.h"
Expand Down
1 change: 1 addition & 0 deletions src/ray/object_manager/object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ ObjectManager::ObjectManager(
object_manager_server_("ObjectManager",
config_.object_manager_port,
config_.object_manager_address == "127.0.0.1",
ClusterID::Nil(),
config_.rpc_service_threads_number),
object_manager_service_(rpc_service_, *this),
client_call_manager_(
Expand Down
Loading