From 1007af1922e3f53abc6cb6d50ba510d599095d67 Mon Sep 17 00:00:00 2001 From: vitsai Date: Tue, 18 Jul 2023 11:35:50 -0700 Subject: [PATCH 01/27] add cluster ID to python layer Signed-off-by: vitsai --- python/ray/_private/node.py | 7 +- python/ray/_private/services.py | 3 + python/ray/_private/worker.py | 2 + python/ray/_private/workers/default_worker.py | 7 ++ python/ray/_raylet.pxd | 1 - python/ray/_raylet.pyx | 21 ++++- python/ray/includes/common.pxd | 7 +- python/ray/includes/libcoreworker.pxd | 2 + python/ray/includes/unique_ids.pxd | 11 +++ src/ray/common/id.h | 1 + src/ray/core_worker/core_worker.cc | 2 +- src/ray/core_worker/core_worker_options.h | 3 + src/ray/gcs/gcs_client/gcs_client.cc | 92 +++++++++++-------- src/ray/gcs/gcs_client/gcs_client.h | 15 ++- src/ray/raylet/main.cc | 9 +- src/ray/rpc/gcs_server/gcs_rpc_client.h | 2 +- 16 files changed, 133 insertions(+), 52 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index cdf4dd079734..a325ae504fa2 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -83,6 +83,9 @@ def __init__( ) self.all_processes: dict = {} self.removal_lock = threading.Lock() + self.cluster_id = ( + ray_params.cluster_id if hasattr(ray_params, "cluster_id") else None + ) # Set up external Redis when `RAY_REDIS_ADDRESS` is specified. redis_address_env = os.environ.get("RAY_REDIS_ADDRESS") @@ -648,7 +651,8 @@ def _init_gcs_client(self): last_ex = None try: gcs_address = self.gcs_address - client = GcsClient(address=gcs_address) + client = GcsClient(address=gcs_address, cluster_id=self.cluster_id) + self.cluster_id = client.get_cluster_id() if self.head: # Send a simple request to make sure GCS is alive # if it's a head node. @@ -1064,6 +1068,7 @@ def start_raylet( self._ray_params.node_manager_port, self._raylet_socket_name, self._plasma_store_socket_name, + self._cluster_id, self._ray_params.worker_path, self._ray_params.setup_worker_path, self._ray_params.storage, diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py index 96671793928b..996a2289cfa2 100644 --- a/python/ray/_private/services.py +++ b/python/ray/_private/services.py @@ -1357,6 +1357,7 @@ def start_raylet( node_manager_port: int, raylet_name: str, plasma_store_name: str, + cluster_id: str, worker_path: str, setup_worker_path: str, storage: str, @@ -1538,6 +1539,7 @@ def start_raylet( f"--session-name={session_name}", f"--temp-dir={temp_dir}", f"--webui={webui}", + f"--cluster-id={cluster_id}", ] ) @@ -1643,6 +1645,7 @@ def start_raylet( f"--gcs-address={gcs_address}", f"--session-name={session_name}", f"--labels={labels_json_str}", + f"--cluster-id={cluster_id}", ] if is_head_node: diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 6b053b43e49d..550c8429c03e 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -463,6 +463,7 @@ def __init__(self): # different drivers that connect to the same Serve instance. # See https://github.com/ray-project/ray/pull/35070. 
self._filter_logs_by_job = True + self.cluster_id = None @property def connected(self): @@ -2262,6 +2263,7 @@ def connect( runtime_env_hash, startup_token, session_name, + node.cluster_id, "" if mode != SCRIPT_MODE else entrypoint, worker_launch_time_ms, worker_launched_time_ms, diff --git a/python/ray/_private/workers/default_worker.py b/python/ray/_private/workers/default_worker.py index 6000b4bd6fe2..6b49b685f707 100644 --- a/python/ray/_private/workers/default_worker.py +++ b/python/ray/_private/workers/default_worker.py @@ -17,6 +17,12 @@ parser = argparse.ArgumentParser( description=("Parse addresses for the worker to connect to.") ) +parser.add_argument( + "--cluster-id", + required=True, + type=str, + help="the auto-generated ID of the cluster", +) parser.add_argument( "--node-ip-address", required=True, @@ -207,6 +213,7 @@ gcs_address=args.gcs_address, session_name=args.session_name, webui=args.webui, + cluster_id=args.cluster_id, ) node = ray._private.node.Node( ray_params, diff --git a/python/ray/_raylet.pxd b/python/ray/_raylet.pxd index 4099ac64645e..a465c15f952f 100644 --- a/python/ray/_raylet.pxd +++ b/python/ray/_raylet.pxd @@ -102,7 +102,6 @@ cdef class ObjectRef(BaseID): cdef CObjectID native(self) - cdef class ActorID(BaseID): cdef CActorID data diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 950ba8e22014..b4070bc3d5aa 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -129,8 +129,9 @@ from ray.includes.common cimport ( ) from ray.includes.unique_ids cimport ( CActorID, - CObjectID, + CClusterID, CNodeID, + CObjectID, CPlacementGroupID, ObjectIDIndexType, ) @@ -2335,16 +2336,25 @@ cdef class GcsClient: shared_ptr[CPythonGcsClient] inner object address object _nums_reconnect_retry + CClusterID cluster_id - def __cinit__(self, address, nums_reconnect_retry=5): + def __cinit__(self, address, nums_reconnect_retry=5, cluster_id=None): cdef GcsClientOptions gcs_options = GcsClientOptions.from_gcs_address(address) self.inner.reset(new CPythonGcsClient(dereference(gcs_options.native()))) self.address = address self._nums_reconnect_retry = nums_reconnect_retry + self.cluster_id = CClusterID.Nil() if cluster_id is None else CClusterID.FromHex(cluster_id) self._connect() - def _connect(self): - check_status(self.inner.get().Connect()) + def _connect(self, timeout=None): + cdef: + int64_t timeout_ms = round(1000 * timeout) if timeout else -1 + with nogil: + status = self.inner.get().Connect(self.cluster_id, timeout_ms) + check_status(status) + + def get_cluster_id(self): + return self.cluster_id.Hex() @property def address(self): @@ -2844,7 +2854,7 @@ cdef class CoreWorker: node_ip_address, node_manager_port, raylet_ip_address, local_mode, driver_name, stdout_file, stderr_file, serialized_job_config, metrics_agent_port, runtime_env_hash, - startup_token, session_name, entrypoint, + startup_token, session_name, cluster_id, entrypoint, worker_launch_time_ms, worker_launched_time_ms): self.is_local_mode = local_mode @@ -2896,6 +2906,7 @@ cdef class CoreWorker: options.runtime_env_hash = runtime_env_hash options.startup_token = startup_token options.session_name = session_name + options.cluster_id = CClusterID.FromHex(cluster_id) options.entrypoint = entrypoint options.worker_launch_time_ms = worker_launch_time_ms options.worker_launched_time_ms = worker_launched_time_ms diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 66c54b12f285..e546a06222ad 100644 --- a/python/ray/includes/common.pxd +++ 
b/python/ray/includes/common.pxd @@ -12,6 +12,7 @@ from ray.includes.optional cimport ( from ray.includes.unique_ids cimport ( CActorID, CJobID, + CClusterID, CWorkerID, CObjectID, CTaskID, @@ -367,8 +368,9 @@ cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil: cdef cppclass CPythonGcsClient "ray::gcs::PythonGcsClient": CPythonGcsClient(const CGcsClientOptions &options) - CRayStatus Connect() - + CRayStatus Connect( + CClusterID &cluster_id, + int64_t timeout_ms) CRayStatus CheckAlive( const c_vector[c_string] &raylet_addresses, int64_t timeout_ms, @@ -412,7 +414,6 @@ cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil: int64_t timeout_ms, c_bool &is_accepted) - cdef extern from "ray/gcs/gcs_client/gcs_client.h" namespace "ray::gcs" nogil: unordered_map[c_string, double] PythonGetResourcesTotal( const CGcsNodeInfo& node_info) diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 3afb811405d2..1f52bbea0af0 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -13,6 +13,7 @@ from libcpp.vector cimport vector as c_vector from ray.includes.unique_ids cimport ( CActorID, + CClusterID, CNodeID, CJobID, CTaskID, @@ -359,6 +360,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: c_bool connect_on_start int runtime_env_hash int startup_token + CClusterID cluster_id c_string session_name c_string entrypoint int64_t worker_launch_time_ms diff --git a/python/ray/includes/unique_ids.pxd b/python/ray/includes/unique_ids.pxd index 2fb14e6322c0..ab0f3663c499 100644 --- a/python/ray/includes/unique_ids.pxd +++ b/python/ray/includes/unique_ids.pxd @@ -7,6 +7,9 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil: @staticmethod T FromBinary(const c_string &binary) + @staticmethod + T FromHex(const c_string &hex) + @staticmethod const T Nil() @@ -154,6 +157,14 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil: CTaskID TaskId() const + cdef cppclass CClusterID "ray::ClusterID"(CUniqueID): + + @staticmethod + CClusterID FromHex(const c_string &hex_str) + + @staticmethod + const CClusterID Nil() + cdef cppclass CWorkerID "ray::WorkerID"(CUniqueID): @staticmethod diff --git a/src/ray/common/id.h b/src/ray/common/id.h index 7c5f430830ab..6ed0af57f819 100644 --- a/src/ray/common/id.h +++ b/src/ray/common/id.h @@ -395,6 +395,7 @@ std::ostream &operator<<(std::ostream &os, const PlacementGroupID &id); type() : UniqueID() {} \ static type FromRandom() { return type(UniqueID::FromRandom()); } \ static type FromBinary(const std::string &binary) { return type(binary); } \ + static type FromHex(const std::string &hex) { return type(UniqueID::FromHex(hex)); } \ static type Nil() { return type(UniqueID::Nil()); } \ static constexpr size_t Size() { return kUniqueIDSize; } \ \ diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 2990f121efa5..8e63513eac75 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -229,7 +229,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ gcs_client_ = std::make_shared(options_.gcs_options, GetWorkerID()); - RAY_CHECK_OK(gcs_client_->Connect(io_service_)); + RAY_CHECK_OK(gcs_client_->Connect(io_service_, options_.cluster_id)); RegisterToGcs(options_.worker_launch_time_ms, options_.worker_launched_time_ms); // Initialize the task state event buffer. 
diff --git a/src/ray/core_worker/core_worker_options.h b/src/ray/core_worker/core_worker_options.h index be1b5e002775..05623bb25d36 100644 --- a/src/ray/core_worker/core_worker_options.h +++ b/src/ray/core_worker/core_worker_options.h @@ -92,6 +92,7 @@ struct CoreWorkerOptions { metrics_agent_port(-1), connect_on_start(true), runtime_env_hash(0), + cluster_id(ClusterID::Nil()), session_name(""), entrypoint(""), worker_launch_time_ms(-1), @@ -182,6 +183,8 @@ struct CoreWorkerOptions { /// may not have the same pid as the process the worker pool /// starts (due to shim processes). StartupToken startup_token{0}; + /// Cluster ID associated with the core worker. + ClusterID cluster_id; /// The function to allocate a new object for the memory store. /// This allows allocating the objects in the language frontend's memory. /// For example, for the Java worker, we can allocate the objects in the JVM heap diff --git a/src/ray/gcs/gcs_client/gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc index dfc4b2c5cfd9..d69576ac70ce 100644 --- a/src/ray/gcs/gcs_client/gcs_client.cc +++ b/src/ray/gcs/gcs_client/gcs_client.cc @@ -83,7 +83,7 @@ GcsClient::GcsClient(const GcsClientOptions &options, UniqueID gcs_client_id) Status GcsClient::Connect(instrumented_io_context &io_service) { // Connect to gcs service. - client_call_manager_ = std::make_unique<rpc::ClientCallManager>(io_service); + client_call_manager_ = std::make_unique<rpc::ClientCallManager>(io_service, cluster_id); gcs_rpc_client_ = std::make_shared<rpc::GcsRpcClient>( options_.gcs_address_, options_.gcs_port_, *client_call_manager_); @@ -145,7 +145,44 @@ std::pair<std::string, int> GcsClient::GetGcsServerAddress() const { PythonGcsClient::PythonGcsClient(const GcsClientOptions &options) : options_(options) {} -Status PythonGcsClient::Connect() { +namespace { +Status HandleGcsError(rpc::GcsStatus status) { + RAY_CHECK(status.code() != static_cast<int>(StatusCode::OK)); + return Status::Invalid(status.message() + + " [GCS status code: " + std::to_string(status.code()) + "]"); +} +} // namespace + +Status PythonGcsClient::Connect(ClusterID &cluster_id, int64_t timeout_ms) { + if (cluster_id.IsNil()) { + RAY_LOG(INFO) << "Retrieving cluster ID from GCS server."; + grpc::ClientContext &&context = PrepareContext(timeout_ms); + + rpc::GetClusterIdRequest request; + rpc::GetClusterIdReply reply; + + auto status = + GrpcStatusToRayStatus(node_info_stub_->GetClusterId(&context, request, &reply)); + while (!status.IsTimedOut()) { + if (status.ok()) { + cluster_id = ClusterID::FromBinary(reply.cluster_id()); + RAY_CHECK(!cluster_id.IsNil()); + cluster_id_ = cluster_id; + RAY_LOG(INFO) << "Received cluster ID from GCS server: " << cluster_id_; + break; + } else if (!status.IsGrpcError()) { + return HandleGcsError(reply.status()); + } else { + status = GrpcStatusToRayStatus( + node_info_stub_->GetClusterId(&context, request, &reply)); + } + } + } else { + cluster_id_ = cluster_id; + RAY_LOG(INFO) << "Client initialized with provided cluster ID: " << cluster_id_; + } + + RAY_CHECK(!cluster_id.IsNil()) << "Unexpected nil cluster ID."; channel_ = rpc::GcsRpcClient::CreateGcsChannel(options_.gcs_address_, options_.gcs_port_); kv_stub_ = rpc::InternalKVGcsService::NewStub(channel_); @@ -156,24 +193,10 @@ Status PythonGcsClient::Connect() { return Status::OK(); } -Status HandleGcsError(rpc::GcsStatus status) { - RAY_CHECK(status.code() != static_cast<int>(StatusCode::OK)); - return Status::Invalid(status.message() + - " [GCS status code: " + std::to_string(status.code()) + "]"); -} - -void GrpcClientContextWithTimeoutMs(grpc::ClientContext &context, int64_t 
timeout_ms) { - if (timeout_ms != -1) { - context.set_deadline(std::chrono::system_clock::now() + - std::chrono::milliseconds(timeout_ms)); - } -} - Status PythonGcsClient::CheckAlive(const std::vector<std::string> &raylet_addresses, int64_t timeout_ms, std::vector<bool> &result) { - grpc::ClientContext context; - GrpcClientContextWithTimeoutMs(context, timeout_ms); + grpc::ClientContext &&context = PrepareContext(timeout_ms); rpc::CheckAliveRequest request; for (const auto &address : raylet_addresses) { @@ -198,8 +221,7 @@ Status PythonGcsClient::InternalKVGet(const std::string &ns, const std::string &key, int64_t timeout_ms, std::string &value) { - grpc::ClientContext context; - GrpcClientContextWithTimeoutMs(context, timeout_ms); + grpc::ClientContext &&context = PrepareContext(timeout_ms); rpc::InternalKVGetRequest request; request.set_namespace_(ns); @@ -225,8 +247,7 @@ Status PythonGcsClient::InternalKVMultiGet( const std::vector<std::string> &keys, int64_t timeout_ms, std::unordered_map<std::string, std::string> &result) { - grpc::ClientContext context; - GrpcClientContextWithTimeoutMs(context, timeout_ms); + grpc::ClientContext &&context = PrepareContext(timeout_ms); rpc::InternalKVMultiGetRequest request; request.set_namespace_(ns); @@ -257,8 +278,7 @@ Status PythonGcsClient::InternalKVPut(const std::string &ns, bool overwrite, int64_t timeout_ms, int &added_num) { - grpc::ClientContext context; - GrpcClientContextWithTimeoutMs(context, timeout_ms); + grpc::ClientContext &&context = PrepareContext(timeout_ms); rpc::InternalKVPutRequest request; request.set_namespace_(ns); @@ -284,8 +304,7 @@ Status PythonGcsClient::InternalKVDel(const std::string &ns, bool del_by_prefix, int64_t timeout_ms, int &deleted_num) { - grpc::ClientContext context; - GrpcClientContextWithTimeoutMs(context, timeout_ms); + grpc::ClientContext &&context = PrepareContext(timeout_ms); rpc::InternalKVDelRequest request; request.set_namespace_(ns); @@ -309,8 +328,7 @@ Status PythonGcsClient::InternalKVKeys(const std::string &ns, const std::string &prefix, int64_t timeout_ms, std::vector<std::string> &results) { - grpc::ClientContext context; - GrpcClientContextWithTimeoutMs(context, timeout_ms); + grpc::ClientContext &&context = PrepareContext(timeout_ms); rpc::InternalKVKeysRequest request; request.set_namespace_(ns); @@ -333,8 +351,7 @@ Status PythonGcsClient::InternalKVExists(const std::string &ns, const std::string &key, int64_t timeout_ms, bool &exists) { - grpc::ClientContext context; - GrpcClientContextWithTimeoutMs(context, timeout_ms); + grpc::ClientContext &&context = PrepareContext(timeout_ms); rpc::InternalKVExistsRequest request; request.set_namespace_(ns); @@ -356,8 +373,7 @@ Status PythonGcsClient::InternalKVExists(const std::string &ns, Status PythonGcsClient::PinRuntimeEnvUri(const std::string &uri, int expiration_s, int64_t timeout_ms) { - grpc::ClientContext context; - GrpcClientContextWithTimeoutMs(context, timeout_ms); + grpc::ClientContext &&context = PrepareContext(timeout_ms); rpc::PinRuntimeEnvURIRequest request; request.set_uri(uri); @@ -384,8 +400,7 @@ Status PythonGcsClient::PinRuntimeEnvUri(const std::string &uri, Status PythonGcsClient::GetAllNodeInfo(int64_t timeout_ms, std::vector<rpc::GcsNodeInfo> &result) { - grpc::ClientContext context; - GrpcClientContextWithTimeoutMs(context, timeout_ms); + grpc::ClientContext &&context = PrepareContext(timeout_ms); rpc::GetAllNodeInfoRequest request; rpc::GetAllNodeInfoReply reply; @@ -404,8 +419,7 @@ Status PythonGcsClient::GetAllNodeInfo(int64_t timeout_ms, Status PythonGcsClient::GetAllJobInfo(int64_t timeout_ms, std::vector<rpc::JobTableData> 
&result) { - grpc::ClientContext context; - GrpcClientContextWithTimeoutMs(context, timeout_ms); + grpc::ClientContext &&context = PrepareContext(timeout_ms); rpc::GetAllJobInfoRequest request; rpc::GetAllJobInfoReply reply; @@ -426,8 +440,7 @@ Status PythonGcsClient::RequestClusterResourceConstraint( int64_t timeout_ms, const std::vector<std::unordered_map<std::string, double>> &bundles, const std::vector<int64_t> &count_array) { - grpc::ClientContext context; - GrpcClientContextWithTimeoutMs(context, timeout_ms); + grpc::ClientContext &&context = PrepareContext(timeout_ms); rpc::autoscaler::RequestClusterResourceConstraintRequest request; rpc::autoscaler::RequestClusterResourceConstraintReply reply; @@ -516,7 +529,10 @@ Status PythonCheckGcsHealth(const std::string &gcs_address, auto channel = rpc::GcsRpcClient::CreateGcsChannel(gcs_address, gcs_port); auto stub = rpc::NodeInfoGcsService::NewStub(channel); grpc::ClientContext context; - GrpcClientContextWithTimeoutMs(context, timeout_ms); + if (timeout_ms != -1) { + context.set_deadline(std::chrono::system_clock::now() + + std::chrono::milliseconds(timeout_ms)); + } rpc::CheckAliveRequest request; rpc::CheckAliveReply reply; grpc::Status status = stub->CheckAlive(&context, request, &reply); diff --git a/src/ray/gcs/gcs_client/gcs_client.h b/src/ray/gcs/gcs_client/gcs_client.h index 318ef69d6763..3b8f3019bcda 100644 --- a/src/ray/gcs/gcs_client/gcs_client.h +++ b/src/ray/gcs/gcs_client/gcs_client.h @@ -191,7 +191,7 @@ class RAY_EXPORT GcsClient : public std::enable_shared_from_this<GcsClient> { class RAY_EXPORT PythonGcsClient { public: explicit PythonGcsClient(const GcsClientOptions &options); - Status Connect(); + Status Connect(ClusterID &cluster_id, int64_t timeout_ms = -1); Status CheckAlive(const std::vector<std::string> &raylet_addresses, int64_t timeout_ms, @@ -242,6 +242,19 @@ class RAY_EXPORT PythonGcsClient { bool &is_accepted); private: + inline grpc::ClientContext &&PrepareContext(int64_t timeout_ms) { + grpc::ClientContext context; + if (timeout_ms != -1) { + context.set_deadline(std::chrono::system_clock::now() + + std::chrono::milliseconds(timeout_ms)); + } + if (!cluster_id_.IsNil()) { + context.AddMetadata(kClusterIdKey, cluster_id_.Hex()); + } + return std::move(context); + } + + ClusterID cluster_id_; GcsClientOptions options_; std::unique_ptr<rpc::InternalKVGcsService::Stub> kv_stub_; std::unique_ptr<rpc::RuntimeEnvGcsService::Stub> runtime_env_stub_; diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 65d5f672c5aa..d01f8f6b6c81 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -69,6 +69,8 @@ DEFINE_int32(ray_debugger_external, 0, "Make Ray debugger externally accessible. DEFINE_int64(object_store_memory, -1, "The initial memory of the object store."); DEFINE_string(node_name, "", "The user-provided identifier or name for this node."); DEFINE_string(session_name, "", "Session name (ClusterID) of the cluster."); +DEFINE_string(cluster_id, "", "ID of the cluster, separate from observability.");  #ifdef __linux__ DEFINE_string(plasma_directory, "/dev/shm", @@ -153,6 +155,11 @@ int main(int argc, char *argv[]) { const std::string session_name = FLAGS_session_name; const bool is_head_node = FLAGS_head; const std::string labels_json_str = FLAGS_labels; + + ray::ClusterID cluster_id = ray::ClusterID::Nil(); + if (FLAGS_cluster_id != "") { + cluster_id = ray::ClusterID::FromHex(FLAGS_cluster_id); + } gflags::ShutDownCommandLineFlags(); // Configuration for the node manager. 
@@ -171,7 +178,7 @@ int main(int argc, char *argv[]) { ray::gcs::GcsClientOptions client_options(FLAGS_gcs_address); gcs_client = std::make_shared(client_options); - RAY_CHECK_OK(gcs_client->Connect(main_service)); + RAY_CHECK_OK(gcs_client->Connect(main_service, cluster_id)); std::unique_ptr raylet; RAY_CHECK_OK(gcs_client->Nodes().AsyncGetInternalConfig( diff --git a/src/ray/rpc/gcs_server/gcs_rpc_client.h b/src/ray/rpc/gcs_server/gcs_rpc_client.h index 7c602cebda0a..94b0ca4d1b9e 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_client.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_client.h @@ -117,7 +117,7 @@ class Executor { } \ delete executor; \ } else { \ - /* In case of GCS failure, we queue the request and these requets will be */ \ + /* In case of GCS failure, we queue the request and these requests will be */ \ /* executed once GCS is back. */ \ gcs_is_down_ = true; \ auto request_bytes = request.ByteSizeLong(); \ From d8b333d3487fe4aa9e3a00f4037f1048e2cdd193 Mon Sep 17 00:00:00 2001 From: vitsai Date: Thu, 20 Jul 2023 00:33:49 -0700 Subject: [PATCH 02/27] comments Signed-off-by: vitsai --- python/ray/_private/node.py | 6 +-- python/ray/_private/parameter.py | 2 + python/ray/_private/worker.py | 1 - python/ray/_raylet.pyx | 4 ++ python/ray/includes/common.pxd | 3 +- python/ray/includes/unique_ids.pxd | 2 + src/ray/common/id.h | 21 ---------- src/ray/gcs/gcs_client/gcs_client.cc | 58 +++++++++++++++++----------- src/ray/gcs/gcs_client/gcs_client.h | 11 +++--- src/ray/raylet/main.cc | 7 ++-- src/ray/rpc/client_call.h | 14 ++----- 11 files changed, 59 insertions(+), 70 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index a325ae504fa2..08190daac548 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -83,9 +83,7 @@ def __init__( ) self.all_processes: dict = {} self.removal_lock = threading.Lock() - self.cluster_id = ( - ray_params.cluster_id if hasattr(ray_params, "cluster_id") else None - ) + self.cluster_id = ray_params.cluster_id # Set up external Redis when `RAY_REDIS_ADDRESS` is specified. redis_address_env = os.environ.get("RAY_REDIS_ADDRESS") @@ -1068,7 +1066,7 @@ def start_raylet( self._ray_params.node_manager_port, self._raylet_socket_name, self._plasma_store_socket_name, - self._cluster_id, + self.cluster_id, self._ray_params.worker_path, self._ray_params.setup_worker_path, self._ray_params.storage, diff --git a/python/ray/_private/parameter.py b/python/ray/_private/parameter.py index d383f76370a4..6c1160681012 100644 --- a/python/ray/_private/parameter.py +++ b/python/ray/_private/parameter.py @@ -188,6 +188,7 @@ def __init__( env_vars: Optional[Dict[str, str]] = None, session_name: Optional[str] = None, webui: Optional[str] = None, + cluster_id: Optional[str] = None, ): self.redis_address = redis_address self.gcs_address = gcs_address @@ -249,6 +250,7 @@ def __init__( self._enable_object_reconstruction = enable_object_reconstruction self.labels = labels self._check_usage() + self.cluster_id = cluster_id # Set the internal config options for object reconstruction. if enable_object_reconstruction: diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 550c8429c03e..de2d4cd9a8f2 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -463,7 +463,6 @@ def __init__(self): # different drivers that connect to the same Serve instance. # See https://github.com/ray-project/ray/pull/35070. 
self._filter_logs_by_job = True - self.cluster_id = None @property def connected(self): diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index b4070bc3d5aa..b42f51d56f3a 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -2351,6 +2351,10 @@ cdef class GcsClient: int64_t timeout_ms = round(1000 * timeout) if timeout else -1 with nogil: status = self.inner.get().Connect(self.cluster_id, timeout_ms) + + if self.cluster_id.IsNil(): + self.cluster_id = self.inner.get().GetClusterId() + assert not self.cluster_id.IsNil() check_status(status) def get_cluster_id(self): diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index e546a06222ad..0c318e7173e5 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -369,7 +369,7 @@ cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil: CPythonGcsClient(const CGcsClientOptions &options) CRayStatus Connect( - CClusterID &cluster_id, + const CClusterID &cluster_id, int64_t timeout_ms) CRayStatus CheckAlive( const c_vector[c_string] &raylet_addresses, @@ -407,6 +407,7 @@ cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil: CRayStatus GetClusterStatus( int64_t timeout_ms, c_string &serialized_reply) + CClusterID GetClusterId() CRayStatus DrainNode( const c_string &node_id, int32_t reason, diff --git a/python/ray/includes/unique_ids.pxd b/python/ray/includes/unique_ids.pxd index ab0f3663c499..8f07ffdab14e 100644 --- a/python/ray/includes/unique_ids.pxd +++ b/python/ray/includes/unique_ids.pxd @@ -165,6 +165,8 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil: @staticmethod const CClusterID Nil() +# c_bool IsNil() const + cdef cppclass CWorkerID "ray::WorkerID"(CUniqueID): @staticmethod diff --git a/src/ray/common/id.h b/src/ray/common/id.h index 6ed0af57f819..bca6c66492ef 100644 --- a/src/ray/common/id.h +++ b/src/ray/common/id.h @@ -415,27 +415,6 @@ std::ostream &operator<<(std::ostream &os, const PlacementGroupID &id); // Restore the compiler alignment to default (8 bytes). #pragma pack(pop) -struct SafeClusterID { - private: - mutable absl::Mutex m_; - ClusterID id_ GUARDED_BY(m_); - - public: - SafeClusterID(const ClusterID &id) : id_(id) {} - - const ClusterID load() const { - absl::MutexLock l(&m_); - return id_; - } - - ClusterID exchange(const ClusterID &newId) { - absl::MutexLock l(&m_); - ClusterID old = id_; - id_ = newId; - return old; - } -}; - template BaseID::BaseID() { // Using const_cast to directly change data is dangerous. The cached diff --git a/src/ray/gcs/gcs_client/gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc index d69576ac70ce..9eefe217ec41 100644 --- a/src/ray/gcs/gcs_client/gcs_client.cc +++ b/src/ray/gcs/gcs_client/gcs_client.cc @@ -81,7 +81,8 @@ void GcsSubscriberClient::PubsubCommandBatch( GcsClient::GcsClient(const GcsClientOptions &options, UniqueID gcs_client_id) : options_(options), gcs_client_id_(gcs_client_id) {} -Status GcsClient::Connect(instrumented_io_context &io_service) { +Status GcsClient::Connect(instrumented_io_context &io_service, + const ClusterID &cluster_id) { // Connect to gcs service. 
client_call_manager_ = std::make_unique(io_service, cluster_id); gcs_rpc_client_ = std::make_shared( @@ -153,10 +154,19 @@ Status HandleGcsError(rpc::GcsStatus status) { } } // namespace -Status PythonGcsClient::Connect(ClusterID &cluster_id, int64_t timeout_ms) { +Status PythonGcsClient::Connect(const ClusterID &cluster_id, int64_t timeout_ms) { + channel_ = + rpc::GcsRpcClient::CreateGcsChannel(options_.gcs_address_, options_.gcs_port_); + kv_stub_ = rpc::InternalKVGcsService::NewStub(channel_); + runtime_env_stub_ = rpc::RuntimeEnvGcsService::NewStub(channel_); + node_info_stub_ = rpc::NodeInfoGcsService::NewStub(channel_); + job_info_stub_ = rpc::JobInfoGcsService::NewStub(channel_); + autoscaler_stub_ = rpc::autoscaler::AutoscalerStateService::NewStub(channel_); + if (cluster_id.IsNil()) { RAY_LOG(INFO) << "Retrieving cluster ID from GCS server."; - grpc::ClientContext &&context = PrepareContext(timeout_ms); + grpc::ClientContext context; + PrepareContext(context, timeout_ms); rpc::GetClusterIdRequest request; rpc::GetClusterIdReply reply; @@ -165,9 +175,8 @@ Status PythonGcsClient::Connect(ClusterID &cluster_id, int64_t timeout_ms) { GrpcStatusToRayStatus(node_info_stub_->GetClusterId(&context, request, &reply)); while (!status.IsTimedOut()) { if (status.ok()) { - cluster_id = ClusterID::FromBinary(reply.cluster_id()); - RAY_CHECK(!cluster_id.IsNil()); - cluster_id_ = cluster_id; + cluster_id_ = ClusterID::FromBinary(reply.cluster_id()); + RAY_CHECK(!cluster_id_.IsNil()); RAY_LOG(INFO) << "Received cluster ID from GCS server: " << cluster_id_; break; } else if (!status.IsGrpcError()) { @@ -183,13 +192,6 @@ Status PythonGcsClient::Connect(ClusterID &cluster_id, int64_t timeout_ms) { } RAY_CHECK(!cluster_id.IsNil()) << "Unexpected nil cluster ID."; - channel_ = - rpc::GcsRpcClient::CreateGcsChannel(options_.gcs_address_, options_.gcs_port_); - kv_stub_ = rpc::InternalKVGcsService::NewStub(channel_); - runtime_env_stub_ = rpc::RuntimeEnvGcsService::NewStub(channel_); - node_info_stub_ = rpc::NodeInfoGcsService::NewStub(channel_); - job_info_stub_ = rpc::JobInfoGcsService::NewStub(channel_); - autoscaler_stub_ = rpc::autoscaler::AutoscalerStateService::NewStub(channel_); return Status::OK(); } @@ -221,7 +223,8 @@ Status PythonGcsClient::InternalKVGet(const std::string &ns, const std::string &key, int64_t timeout_ms, std::string &value) { - grpc::ClientContext &&context = PrepareContext(timeout_ms); + grpc::ClientContext context; + PrepareContext(context, timeout_ms); rpc::InternalKVGetRequest request; request.set_namespace_(ns); @@ -247,7 +250,8 @@ Status PythonGcsClient::InternalKVMultiGet( const std::vector &keys, int64_t timeout_ms, std::unordered_map &result) { - grpc::ClientContext &&context = PrepareContext(timeout_ms); + grpc::ClientContext context; + PrepareContext(context, timeout_ms); rpc::InternalKVMultiGetRequest request; request.set_namespace_(ns); @@ -278,7 +282,8 @@ Status PythonGcsClient::InternalKVPut(const std::string &ns, bool overwrite, int64_t timeout_ms, int &added_num) { - grpc::ClientContext &&context = PrepareContext(timeout_ms); + grpc::ClientContext context; + PrepareContext(context, timeout_ms); rpc::InternalKVPutRequest request; request.set_namespace_(ns); @@ -304,7 +309,8 @@ Status PythonGcsClient::InternalKVDel(const std::string &ns, bool del_by_prefix, int64_t timeout_ms, int &deleted_num) { - grpc::ClientContext &&context = PrepareContext(timeout_ms); + grpc::ClientContext context; + PrepareContext(context, timeout_ms); rpc::InternalKVDelRequest request; 
request.set_namespace_(ns); @@ -328,7 +334,8 @@ Status PythonGcsClient::InternalKVKeys(const std::string &ns, const std::string &prefix, int64_t timeout_ms, std::vector &results) { - grpc::ClientContext &&context = PrepareContext(timeout_ms); + grpc::ClientContext context; + PrepareContext(context, timeout_ms); rpc::InternalKVKeysRequest request; request.set_namespace_(ns); @@ -351,7 +358,8 @@ Status PythonGcsClient::InternalKVExists(const std::string &ns, const std::string &key, int64_t timeout_ms, bool &exists) { - grpc::ClientContext &&context = PrepareContext(timeout_ms); + grpc::ClientContext context; + PrepareContext(context, timeout_ms); rpc::InternalKVExistsRequest request; request.set_namespace_(ns); @@ -373,7 +381,8 @@ Status PythonGcsClient::InternalKVExists(const std::string &ns, Status PythonGcsClient::PinRuntimeEnvUri(const std::string &uri, int expiration_s, int64_t timeout_ms) { - grpc::ClientContext &&context = PrepareContext(timeout_ms); + grpc::ClientContext context; + PrepareContext(context, timeout_ms); rpc::PinRuntimeEnvURIRequest request; request.set_uri(uri); @@ -400,7 +409,8 @@ Status PythonGcsClient::PinRuntimeEnvUri(const std::string &uri, Status PythonGcsClient::GetAllNodeInfo(int64_t timeout_ms, std::vector &result) { - grpc::ClientContext &&context = PrepareContext(timeout_ms); + grpc::ClientContext context; + PrepareContext(context, timeout_ms); rpc::GetAllNodeInfoRequest request; rpc::GetAllNodeInfoReply reply; @@ -419,7 +429,8 @@ Status PythonGcsClient::GetAllNodeInfo(int64_t timeout_ms, Status PythonGcsClient::GetAllJobInfo(int64_t timeout_ms, std::vector &result) { - grpc::ClientContext &&context = PrepareContext(timeout_ms); + grpc::ClientContext context; + PrepareContext(context, timeout_ms); rpc::GetAllJobInfoRequest request; rpc::GetAllJobInfoReply reply; @@ -440,7 +451,8 @@ Status PythonGcsClient::RequestClusterResourceConstraint( int64_t timeout_ms, const std::vector> &bundles, const std::vector &count_array) { - grpc::ClientContext &&context = PrepareContext(timeout_ms); + grpc::ClientContext context; + PrepareContext(context, timeout_ms); rpc::autoscaler::RequestClusterResourceConstraintRequest request; rpc::autoscaler::RequestClusterResourceConstraintReply reply; diff --git a/src/ray/gcs/gcs_client/gcs_client.h b/src/ray/gcs/gcs_client/gcs_client.h index 3b8f3019bcda..af0d6ffb8cb8 100644 --- a/src/ray/gcs/gcs_client/gcs_client.h +++ b/src/ray/gcs/gcs_client/gcs_client.h @@ -84,7 +84,8 @@ class RAY_EXPORT GcsClient : public std::enable_shared_from_this { /// \param instrumented_io_context IO execution service. /// /// \return Status - virtual Status Connect(instrumented_io_context &io_service); + virtual Status Connect(instrumented_io_context &io_service, + const ClusterID &cluster_id = ClusterID::Nil()); /// Disconnect with GCS Service. Non-thread safe. 
virtual void Disconnect(); @@ -191,7 +192,7 @@ class RAY_EXPORT GcsClient : public std::enable_shared_from_this { class RAY_EXPORT PythonGcsClient { public: explicit PythonGcsClient(const GcsClientOptions &options); - Status Connect(ClusterID &cluster_id, int64_t timeout_ms = -1); + Status Connect(const ClusterID &cluster_id, int64_t timeout_ms); Status CheckAlive(const std::vector &raylet_addresses, int64_t timeout_ms, @@ -241,9 +242,10 @@ class RAY_EXPORT PythonGcsClient { int64_t timeout_ms, bool &is_accepted); + ClusterID GetClusterId() const { return cluster_id_; } + private: - inline grpc::ClientContext &&PrepareContext(int64_t timeout_ms) { - grpc::ClientContext context; + void PrepareContext(grpc::ClientContext &context, int64_t timeout_ms) { if (timeout_ms != -1) { context.set_deadline(std::chrono::system_clock::now() + std::chrono::milliseconds(timeout_ms)); @@ -251,7 +253,6 @@ class RAY_EXPORT PythonGcsClient { if (!cluster_id_.IsNil()) { context.AddMetadata(kClusterIdKey, cluster_id_.Hex()); } - return std::move(context); } ClusterID cluster_id_; diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index d01f8f6b6c81..36d357cc8d76 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -156,10 +156,9 @@ int main(int argc, char *argv[]) { const bool is_head_node = FLAGS_head; const std::string labels_json_str = FLAGS_labels; - ray::ClusterID cluster_id = ray::ClusterID::Nil(); - if (FLAGS_cluster_id != "") { - cluster_id = ray::ClusterID::FromHex(FLAGS_cluster_id); - } + RAY_CHECK(FLAGS_cluster_id != "") << "Expected cluster ID."; + ray::ClusterID cluster_id = ray::ClusterID::FromHex(FLAGS_cluster_id); + RAY_LOG(INFO) << "Setting cluster ID to: " << cluster_id; gflags::ShutDownCommandLineFlags(); // Configuration for the node manager. diff --git a/src/ray/rpc/client_call.h b/src/ray/rpc/client_call.h index 98c1e519d3c5..b0f998aa9e8d 100644 --- a/src/ray/rpc/client_call.h +++ b/src/ray/rpc/client_call.h @@ -194,7 +194,7 @@ class ClientCallManager { const ClusterID &cluster_id = ClusterID::Nil(), int num_threads = 1, int64_t call_timeout_ms = -1) - : cluster_id_(ClusterID::Nil()), + : cluster_id_(cluster_id), main_service_(main_service), num_threads_(num_threads), shutdown_(false), @@ -249,7 +249,7 @@ class ClientCallManager { } auto call = std::make_shared>( - callback, cluster_id_.load(), std::move(stats_handle), method_timeout_ms); + callback, cluster_id_, std::move(stats_handle), method_timeout_ms); // Send request. // Find the next completion queue to wait for response. call->response_reader_ = (stub.*prepare_async_function)( @@ -267,14 +267,6 @@ class ClientCallManager { return call; } - void SetClusterId(const ClusterID &cluster_id) { - auto old_id = cluster_id_.exchange(ClusterID::Nil()); - if (!old_id.IsNil() && (old_id != cluster_id)) { - RAY_LOG(FATAL) << "Expected cluster ID to be Nil or " << cluster_id << ", but got" - << old_id; - } - } - /// Get the main service of this rpc. instrumented_io_context &GetMainService() { return main_service_; } @@ -328,7 +320,7 @@ class ClientCallManager { /// UUID of the cluster. Potential race between creating a ClientCall object /// and setting the cluster ID. - SafeClusterID cluster_id_; + ClusterID cluster_id_; /// The main event loop, to which the callback functions will be posted. 
instrumented_io_context &main_service_; From 9c7ac391606e53e6344e91304964f6a8d622851b Mon Sep 17 00:00:00 2001 From: vitsai Date: Thu, 20 Jul 2023 02:00:23 -0700 Subject: [PATCH 03/27] oops Signed-off-by: vitsai --- src/ray/gcs/gcs_client/gcs_client.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ray/gcs/gcs_client/gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc index 9eefe217ec41..fb968fc7a1bd 100644 --- a/src/ray/gcs/gcs_client/gcs_client.cc +++ b/src/ray/gcs/gcs_client/gcs_client.cc @@ -483,7 +483,7 @@ Status PythonGcsClient::GetClusterStatus(int64_t timeout_ms, rpc::autoscaler::GetClusterStatusRequest request; rpc::autoscaler::GetClusterStatusReply reply; grpc::ClientContext context; - GrpcClientContextWithTimeoutMs(context, timeout_ms); + PrepareContext(context, timeout_ms); grpc::Status status = autoscaler_stub_->GetClusterStatus(&context, request, &reply); @@ -509,7 +509,7 @@ Status PythonGcsClient::DrainNode(const std::string &node_id, rpc::autoscaler::DrainNodeReply reply; grpc::ClientContext context; - GrpcClientContextWithTimeoutMs(context, timeout_ms); + PrepareContext(context, timeout_ms); grpc::Status status = autoscaler_stub_->DrainNode(&context, request, &reply); From 6561d65c89dc5f0a92b2f6b6e910b0a4ac81f8a2 Mon Sep 17 00:00:00 2001 From: vitsai Date: Thu, 20 Jul 2023 03:26:13 -0700 Subject: [PATCH 04/27] fix mock Signed-off-by: vitsai --- src/mock/ray/gcs/gcs_client/gcs_client.h | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/mock/ray/gcs/gcs_client/gcs_client.h b/src/mock/ray/gcs/gcs_client/gcs_client.h index e7b687d04e7d..cf232a712f51 100644 --- a/src/mock/ray/gcs/gcs_client/gcs_client.h +++ b/src/mock/ray/gcs/gcs_client/gcs_client.h @@ -31,7 +31,10 @@ namespace gcs { class MockGcsClient : public GcsClient { public: - MOCK_METHOD(Status, Connect, (instrumented_io_context & io_service), (override)); + MOCK_METHOD(Status, + Connect, + (instrumented_io_context & io_service, const ClusterID &cluster_id), + (override)); MOCK_METHOD(void, Disconnect, (), (override)); MOCK_METHOD((std::pair), GetGcsServerAddress, (), (const, override)); MOCK_METHOD(std::string, DebugString, (), (const, override)); From e3e6d0b6cf0d9a872b7be76ab4d7566cd82dc050 Mon Sep 17 00:00:00 2001 From: vitsai Date: Thu, 20 Jul 2023 16:31:22 -0700 Subject: [PATCH 05/27] fix bug Signed-off-by: vitsai --- python/ray/_private/node.py | 2 +- python/ray/_raylet.pyx | 7 ++++++- src/ray/gcs/gcs_client/gcs_client.cc | 4 +++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 08190daac548..5c58ced33822 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -1066,7 +1066,7 @@ def start_raylet( self._ray_params.node_manager_port, self._raylet_socket_name, self._plasma_store_socket_name, - self.cluster_id, + self.cluster_id.decode(), self._ray_params.worker_path, self._ray_params.setup_worker_path, self._ray_params.storage, diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index b42f51d56f3a..67ba1af17624 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -2343,7 +2343,12 @@ cdef class GcsClient: self.inner.reset(new CPythonGcsClient(dereference(gcs_options.native()))) self.address = address self._nums_reconnect_retry = nums_reconnect_retry - self.cluster_id = CClusterID.Nil() if cluster_id is None else CClusterID.FromHex(cluster_id) + cdef c_string c_cluster_id + if cluster_id is None: + self.cluster_id = CClusterID.Nil() + else: + 
c_cluster_id = cluster_id + self.cluster_id = CClusterID.FromHex(c_cluster_id) self._connect() def _connect(self, timeout=None): diff --git a/src/ray/gcs/gcs_client/gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc index fb968fc7a1bd..a980e387c904 100644 --- a/src/ray/gcs/gcs_client/gcs_client.cc +++ b/src/ray/gcs/gcs_client/gcs_client.cc @@ -182,6 +182,8 @@ Status PythonGcsClient::Connect(const ClusterID &cluster_id, int64_t timeout_ms) } else if (!status.IsGrpcError()) { return HandleGcsError(reply.status()); } else { + grpc::ClientContext context; + PrepareContext(context, timeout_ms); status = GrpcStatusToRayStatus( node_info_stub_->GetClusterId(&context, request, &reply)); } @@ -191,7 +193,7 @@ Status PythonGcsClient::Connect(const ClusterID &cluster_id, int64_t timeout_ms) RAY_LOG(INFO) << "Client initialized with provided cluster ID: " << cluster_id_; } - RAY_CHECK(!cluster_id.IsNil()) << "Unexpected nil cluster ID."; + RAY_CHECK(!cluster_id_.IsNil()) << "Unexpected nil cluster ID."; return Status::OK(); } From d5ad7bce6c2c51fc42df2c2fe4a4eecf48ff7840 Mon Sep 17 00:00:00 2001 From: vitsai Date: Mon, 24 Jul 2023 07:41:13 -0700 Subject: [PATCH 06/27] comments Signed-off-by: vitsai --- python/ray/_private/node.py | 5 +++-- python/ray/_private/parameter.py | 1 + python/ray/includes/unique_ids.pxd | 2 -- src/ray/gcs/gcs_client/gcs_client.cc | 2 +- src/ray/gcs/gcs_client/gcs_client.h | 2 +- src/ray/raylet/main.cc | 2 +- 6 files changed, 7 insertions(+), 7 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 5c58ced33822..067ae62e91a5 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -83,7 +83,6 @@ def __init__( ) self.all_processes: dict = {} self.removal_lock = threading.Lock() - self.cluster_id = ray_params.cluster_id # Set up external Redis when `RAY_REDIS_ADDRESS` is specified. redis_address_env = os.environ.get("RAY_REDIS_ADDRESS") @@ -649,7 +648,9 @@ def _init_gcs_client(self): last_ex = None try: gcs_address = self.gcs_address - client = GcsClient(address=gcs_address, cluster_id=self.cluster_id) + client = GcsClient( + address=gcs_address, cluster_id=self._ray_params.cluster_id + ) self.cluster_id = client.get_cluster_id() if self.head: # Send a simple request to make sure GCS is alive diff --git a/python/ray/_private/parameter.py b/python/ray/_private/parameter.py index 6c1160681012..538fe09a505d 100644 --- a/python/ray/_private/parameter.py +++ b/python/ray/_private/parameter.py @@ -127,6 +127,7 @@ class RayParams: env_vars: Override environment variables for the raylet. session_name: The name of the session of the ray cluster. webui: The url of the UI. + cluster_id: The cluster ID. 
""" def __init__( diff --git a/python/ray/includes/unique_ids.pxd b/python/ray/includes/unique_ids.pxd index 8f07ffdab14e..ab0f3663c499 100644 --- a/python/ray/includes/unique_ids.pxd +++ b/python/ray/includes/unique_ids.pxd @@ -165,8 +165,6 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil: @staticmethod const CClusterID Nil() -# c_bool IsNil() const - cdef cppclass CWorkerID "ray::WorkerID"(CUniqueID): @staticmethod diff --git a/src/ray/gcs/gcs_client/gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc index a980e387c904..b44e4deb94e7 100644 --- a/src/ray/gcs/gcs_client/gcs_client.cc +++ b/src/ray/gcs/gcs_client/gcs_client.cc @@ -148,7 +148,7 @@ PythonGcsClient::PythonGcsClient(const GcsClientOptions &options) : options_(opt namespace { Status HandleGcsError(rpc::GcsStatus status) { - RAY_CHECK(status.code() != static_cast(StatusCode::OK)); + RAY_CHECK_NE(status.code(), static_cast(StatusCode::OK)); return Status::Invalid(status.message() + " [GCS status code: " + std::to_string(status.code()) + "]"); } diff --git a/src/ray/gcs/gcs_client/gcs_client.h b/src/ray/gcs/gcs_client/gcs_client.h index af0d6ffb8cb8..11f72513d611 100644 --- a/src/ray/gcs/gcs_client/gcs_client.h +++ b/src/ray/gcs/gcs_client/gcs_client.h @@ -242,7 +242,7 @@ class RAY_EXPORT PythonGcsClient { int64_t timeout_ms, bool &is_accepted); - ClusterID GetClusterId() const { return cluster_id_; } + const ClusterID &GetClusterId() const { return cluster_id_; } private: void PrepareContext(grpc::ClientContext &context, int64_t timeout_ms) { diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 36d357cc8d76..bd74db30f2a3 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -156,7 +156,7 @@ int main(int argc, char *argv[]) { const bool is_head_node = FLAGS_head; const std::string labels_json_str = FLAGS_labels; - RAY_CHECK(FLAGS_cluster_id != "") << "Expected cluster ID."; + RAY_CHECK_NE(FLAGS_cluster_id, "") << "Expected cluster ID."; ray::ClusterID cluster_id = ray::ClusterID::FromHex(FLAGS_cluster_id); RAY_LOG(INFO) << "Setting cluster ID to: " << cluster_id; gflags::ShutDownCommandLineFlags(); From 0ad6e04bfa7fbaa06f22e5ac6698508361f4359f Mon Sep 17 00:00:00 2001 From: vitsai Date: Mon, 24 Jul 2023 10:33:31 -0700 Subject: [PATCH 07/27] fix test failures Signed-off-by: vitsai --- .../test/ownership_based_object_directory_test.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ray/object_manager/test/ownership_based_object_directory_test.cc b/src/ray/object_manager/test/ownership_based_object_directory_test.cc index 3b7624a604a3..a326b2c9eb65 100644 --- a/src/ray/object_manager/test/ownership_based_object_directory_test.cc +++ b/src/ray/object_manager/test/ownership_based_object_directory_test.cc @@ -101,7 +101,8 @@ class MockGcsClient : public gcs::GcsClient { return *node_accessor_; } - MOCK_METHOD1(Connect, Status(instrumented_io_context &io_service)); + MOCK_METHOD2(Connect, + Status(instrumented_io_context &io_service, const ClusterID &cluster_id)); MOCK_METHOD0(Disconnect, void()); }; From 155832845f0e1d06f49960b383dce9936e0b523d Mon Sep 17 00:00:00 2001 From: vitsai Date: Mon, 24 Jul 2023 13:16:43 -0700 Subject: [PATCH 08/27] comment Signed-off-by: vitsai --- python/ray/_private/node.py | 2 +- python/ray/_raylet.pyx | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 067ae62e91a5..c2fdfa6810bf 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -1067,7 
+1067,7 @@ def start_raylet( self._ray_params.node_manager_port, self._raylet_socket_name, self._plasma_store_socket_name, - self.cluster_id.decode(), + self.cluster_id, self._ray_params.worker_path, self._ray_params.setup_worker_path, self._ray_params.storage, diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 67ba1af17624..3d485ec5d60d 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -2363,7 +2363,7 @@ cdef class GcsClient: check_status(status) def get_cluster_id(self): - return self.cluster_id.Hex() + return self.cluster_id.Hex().decode() @property def address(self): From 8c5d42bc2154e1695ce83c775c97bd3df8adc091 Mon Sep 17 00:00:00 2001 From: vitsai Date: Wed, 26 Jul 2023 09:18:43 -0700 Subject: [PATCH 09/27] change to retry Signed-off-by: vitsai --- python/ray/_raylet.pyx | 3 ++- python/ray/includes/common.pxd | 3 ++- src/ray/gcs/gcs_client/gcs_client.cc | 27 +++++++++++++-------------- src/ray/gcs/gcs_client/gcs_client.h | 2 +- 4 files changed, 18 insertions(+), 17 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 3d485ec5d60d..39b5f6bbd77f 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -2354,8 +2354,9 @@ cdef class GcsClient: def _connect(self, timeout=None): cdef: int64_t timeout_ms = round(1000 * timeout) if timeout else -1 + size_t num_retries = self._nums_reconnect_retry with nogil: - status = self.inner.get().Connect(self.cluster_id, timeout_ms) + status = self.inner.get().Connect(self.cluster_id, timeout_ms, num_retries) if self.cluster_id.IsNil(): self.cluster_id = self.inner.get().GetClusterId() diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 0c318e7173e5..1f8c25ee525a 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -370,7 +370,8 @@ cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil: CRayStatus Connect( const CClusterID &cluster_id, - int64_t timeout_ms) + int64_t timeout_ms, + size_t num_retries) CRayStatus CheckAlive( const c_vector[c_string] &raylet_addresses, int64_t timeout_ms, diff --git a/src/ray/gcs/gcs_client/gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc index b44e4deb94e7..5f58c79b8d10 100644 --- a/src/ray/gcs/gcs_client/gcs_client.cc +++ b/src/ray/gcs/gcs_client/gcs_client.cc @@ -154,7 +154,9 @@ Status HandleGcsError(rpc::GcsStatus status) { } } // namespace -Status PythonGcsClient::Connect(const ClusterID &cluster_id, int64_t timeout_ms) { +Status PythonGcsClient::Connect(const ClusterID &cluster_id, + int64_t timeout_ms, + size_t num_retries) { channel_ = rpc::GcsRpcClient::CreateGcsChannel(options_.gcs_address_, options_.gcs_port_); kv_stub_ = rpc::InternalKVGcsService::NewStub(channel_); @@ -165,29 +167,26 @@ Status PythonGcsClient::Connect(const ClusterID &cluster_id, int64_t timeout_ms) if (cluster_id.IsNil()) { RAY_LOG(INFO) << "Retrieving cluster ID from GCS server."; - grpc::ClientContext context; - PrepareContext(context, timeout_ms); - rpc::GetClusterIdRequest request; rpc::GetClusterIdReply reply; - auto status = - GrpcStatusToRayStatus(node_info_stub_->GetClusterId(&context, request, &reply)); - while (!status.IsTimedOut()) { - if (status.ok()) { + Status connect_status; + for (; num_retries > 0; num_retries--) { + grpc::ClientContext context; + PrepareContext(context, timeout_ms); + connect_status = + GrpcStatusToRayStatus(node_info_stub_->GetClusterId(&context, request, &reply)); + + if (connect_status.ok()) { cluster_id_ = ClusterID::FromBinary(reply.cluster_id()); 
RAY_CHECK(!cluster_id_.IsNil()); RAY_LOG(INFO) << "Received cluster ID from GCS server: " << cluster_id_; break; - } else if (!status.IsGrpcError()) { + } else if (!connect_status.IsGrpcError()) { return HandleGcsError(reply.status()); - } else { - grpc::ClientContext context; - PrepareContext(context, timeout_ms); - status = GrpcStatusToRayStatus( - node_info_stub_->GetClusterId(&context, request, &reply)); } } + RAY_RETURN_NOT_OK(connect_status); } else { cluster_id_ = cluster_id; RAY_LOG(INFO) << "Client initialized with provided cluster ID: " << cluster_id_; diff --git a/src/ray/gcs/gcs_client/gcs_client.h b/src/ray/gcs/gcs_client/gcs_client.h index 11f72513d611..5773a0229b0b 100644 --- a/src/ray/gcs/gcs_client/gcs_client.h +++ b/src/ray/gcs/gcs_client/gcs_client.h @@ -192,7 +192,7 @@ class RAY_EXPORT GcsClient : public std::enable_shared_from_this { class RAY_EXPORT PythonGcsClient { public: explicit PythonGcsClient(const GcsClientOptions &options); - Status Connect(const ClusterID &cluster_id, int64_t timeout_ms); + Status Connect(const ClusterID &cluster_id, int64_t timeout_ms, size_t num_retries); Status CheckAlive(const std::vector &raylet_addresses, int64_t timeout_ms, From d1cf54b8d4314ae41c1bce8b13d942ab052bf9ad Mon Sep 17 00:00:00 2001 From: vitsai Date: Wed, 26 Jul 2023 13:31:26 -0700 Subject: [PATCH 10/27] zero retry should still execute once Signed-off-by: vitsai --- src/ray/gcs/gcs_client/gcs_client.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/ray/gcs/gcs_client/gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc index 5f58c79b8d10..a63379bb2bfa 100644 --- a/src/ray/gcs/gcs_client/gcs_client.cc +++ b/src/ray/gcs/gcs_client/gcs_client.cc @@ -165,13 +165,15 @@ Status PythonGcsClient::Connect(const ClusterID &cluster_id, job_info_stub_ = rpc::JobInfoGcsService::NewStub(channel_); autoscaler_stub_ = rpc::autoscaler::AutoscalerStateService::NewStub(channel_); + RAY_CHECK(num_retries >= 0) << "Expected non-negative retries, but got " << num_retries; + if (cluster_id.IsNil()) { RAY_LOG(INFO) << "Retrieving cluster ID from GCS server."; rpc::GetClusterIdRequest request; rpc::GetClusterIdReply reply; Status connect_status; - for (; num_retries > 0; num_retries--) { + for (; num_retries >= 0; num_retries--) { grpc::ClientContext context; PrepareContext(context, timeout_ms); connect_status = @@ -179,8 +181,8 @@ Status PythonGcsClient::Connect(const ClusterID &cluster_id, if (connect_status.ok()) { cluster_id_ = ClusterID::FromBinary(reply.cluster_id()); - RAY_CHECK(!cluster_id_.IsNil()); RAY_LOG(INFO) << "Received cluster ID from GCS server: " << cluster_id_; + RAY_CHECK(!cluster_id_.IsNil()); break; } else if (!connect_status.IsGrpcError()) { return HandleGcsError(reply.status()); From f69f2a0157c8bddbeeb36b73a03f66edc6a7ed43 Mon Sep 17 00:00:00 2001 From: vitsai Date: Thu, 27 Jul 2023 14:09:52 +0000 Subject: [PATCH 11/27] fix retries Signed-off-by: vitsai --- src/ray/gcs/gcs_client/gcs_client.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/ray/gcs/gcs_client/gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc index a63379bb2bfa..150e86b17b38 100644 --- a/src/ray/gcs/gcs_client/gcs_client.cc +++ b/src/ray/gcs/gcs_client/gcs_client.cc @@ -165,7 +165,8 @@ Status PythonGcsClient::Connect(const ClusterID &cluster_id, job_info_stub_ = rpc::JobInfoGcsService::NewStub(channel_); autoscaler_stub_ = rpc::autoscaler::AutoscalerStateService::NewStub(channel_); - RAY_CHECK(num_retries >= 0) << "Expected non-negative retries, but 
got " << num_retries; + size_t tries = num_retries + 1; + RAY_CHECK(tries > 0) << "Expected positive retries, but got " << tries; if (cluster_id.IsNil()) { RAY_LOG(INFO) << "Retrieving cluster ID from GCS server."; @@ -173,7 +174,7 @@ Status PythonGcsClient::Connect(const ClusterID &cluster_id, rpc::GetClusterIdReply reply; Status connect_status; - for (; num_retries >= 0; num_retries--) { + for (; tries > 0; tries--) { grpc::ClientContext context; PrepareContext(context, timeout_ms); connect_status = @@ -187,6 +188,7 @@ Status PythonGcsClient::Connect(const ClusterID &cluster_id, } else if (!connect_status.IsGrpcError()) { return HandleGcsError(reply.status()); } + sleep(1); } RAY_RETURN_NOT_OK(connect_status); } else { From 43760dd1bb67682e56a8d765d8d5f407456a53f0 Mon Sep 17 00:00:00 2001 From: vitsai Date: Mon, 31 Jul 2023 14:04:09 -0700 Subject: [PATCH 12/27] split up gpu tests Signed-off-by: vitsai --- python/ray/train/BUILD | 8 +++ python/ray/train/tests/test_gpu.py | 29 ---------- python/ray/train/tests/test_gpu_2.py | 85 ++++++++++++++++++++++++++++ 3 files changed, 93 insertions(+), 29 deletions(-) create mode 100644 python/ray/train/tests/test_gpu_2.py diff --git a/python/ray/train/BUILD b/python/ray/train/BUILD index 0df895e95a9f..f6ee27a7804d 100644 --- a/python/ray/train/BUILD +++ b/python/ray/train/BUILD @@ -342,6 +342,14 @@ py_test( deps = [":train_lib", ":conftest"] ) +py_test( + name = "test_gpu_2", + size = "large", + srcs = ["tests/test_gpu_2.py"], + tags = ["team:ml", "exclusive", "gpu_only"], + deps = [":train_lib", ":conftest"] +) + py_test( name = "test_gpu_auto_transfer", size = "medium", diff --git a/python/ray/train/tests/test_gpu.py b/python/ray/train/tests/test_gpu.py index 7ce50a9a153a..19d271c88c84 100644 --- a/python/ray/train/tests/test_gpu.py +++ b/python/ray/train/tests/test_gpu.py @@ -359,35 +359,6 @@ def train_fn(): assert isinstance(exc_info.value.__cause__, RayTaskError) -@pytest.mark.parametrize("use_gpu", (True, False)) -def test_torch_iter_torch_batches_auto_device(ray_start_4_cpus_2_gpus, use_gpu): - """ - Tests that iter_torch_batches in TorchTrainer worker function uses the - default device. 
- """ - - def train_fn(): - dataset = train.get_dataset_shard("train") - for batch in dataset.iter_torch_batches(dtypes=torch.float, device="cpu"): - assert str(batch["data"].device) == "cpu" - - # Autodetect - for batch in dataset.iter_torch_batches(dtypes=torch.float): - assert str(batch["data"].device) == str(train.torch.get_device()) - - dataset = ray.data.from_numpy(np.array([[1, 2, 3, 4, 5], [1, 2, 3, 4, 5]]).T) - # Test that this works outside a Train function - for batch in dataset.iter_torch_batches(dtypes=torch.float, device="cpu"): - assert str(batch["data"].device) == "cpu" - - trainer = TorchTrainer( - train_fn, - scaling_config=ScalingConfig(num_workers=2, use_gpu=use_gpu), - datasets={"train": dataset}, - ) - trainer.fit() - - if __name__ == "__main__": import sys diff --git a/python/ray/train/tests/test_gpu_2.py b/python/ray/train/tests/test_gpu_2.py new file mode 100644 index 000000000000..1c772d38a1e3 --- /dev/null +++ b/python/ray/train/tests/test_gpu_2.py @@ -0,0 +1,85 @@ +import os +import time + +from unittest.mock import patch +import pytest +import numpy as np +import torch +import torchvision +from torch.nn.parallel import DistributedDataParallel +from torch.utils.data import DataLoader, DistributedSampler + +import ray +import ray.data +from ray.exceptions import RayTaskError +from ray.air import session +from ray import tune + +import ray.train as train +from ray.air.config import ScalingConfig +from ray.train.constants import DEFAULT_NCCL_SOCKET_IFNAME +from ray.train.examples.pytorch.torch_linear_example import LinearDataset +from ray.train.torch.config import TorchConfig, _TorchBackend +from ray.train.torch.torch_trainer import TorchTrainer +from ray.train.trainer import TrainingFailedError +from ray.train._internal.worker_group import WorkerGroup + + +class LinearDatasetDict(LinearDataset): + """Modifies the LinearDataset to return a Dict instead of a Tuple.""" + + def __getitem__(self, index): + return {"x": self.x[index, None], "y": self.y[index, None]} + + +class NonTensorDataset(LinearDataset): + """Modifies the LinearDataset to also return non-tensor objects.""" + + def __getitem__(self, index): + return {"x": self.x[index, None], "y": 2} + + +# Currently in DataParallelTrainers we only report metrics from rank 0. +# For testing purposes here, we need to be able to report from all +# workers. +class TorchTrainerPatchedMultipleReturns(TorchTrainer): + def _report(self, training_iterator) -> None: + for results in training_iterator: + tune.report(results=results) + + +@pytest.mark.parametrize("use_gpu", (True, False)) +def test_torch_iter_torch_batches_auto_device(ray_start_4_cpus_2_gpus, use_gpu): + """ + Tests that iter_torch_batches in TorchTrainer worker function uses the + default device. 
+ """ + + def train_fn(): + dataset = train.get_dataset_shard("train") + for batch in dataset.iter_torch_batches(dtypes=torch.float, device="cpu"): + assert str(batch["data"].device) == "cpu" + + # Autodetect + for batch in dataset.iter_torch_batches(dtypes=torch.float): + assert str(batch["data"].device) == str(train.torch.get_device()) + + dataset = ray.data.from_numpy(np.array([[1, 2, 3, 4, 5], [1, 2, 3, 4, 5]]).T) + # Test that this works outside a Train function + for batch in dataset.iter_torch_batches(dtypes=torch.float, device="cpu"): + assert str(batch["data"].device) == "cpu" + + trainer = TorchTrainer( + train_fn, + scaling_config=ScalingConfig(num_workers=2, use_gpu=use_gpu), + datasets={"train": dataset}, + ) + trainer.fit() + + +if __name__ == "__main__": + import sys + + import pytest + + sys.exit(pytest.main(["-v", "-x", "-s", __file__])) From 198f7aec7c5da646dfc264e1a4c57bd1729727c6 Mon Sep 17 00:00:00 2001 From: vitsai Date: Thu, 27 Jul 2023 14:09:52 +0000 Subject: [PATCH 13/27] fix retries Signed-off-by: vitsai --- src/ray/gcs/gcs_client/gcs_client.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/ray/gcs/gcs_client/gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc index 150e86b17b38..fe02dd002d13 100644 --- a/src/ray/gcs/gcs_client/gcs_client.cc +++ b/src/ray/gcs/gcs_client/gcs_client.cc @@ -14,6 +14,8 @@ #include "ray/gcs/gcs_client/gcs_client.h" +#include +#include #include #include "ray/common/ray_config.h" @@ -188,7 +190,7 @@ Status PythonGcsClient::Connect(const ClusterID &cluster_id, } else if (!connect_status.IsGrpcError()) { return HandleGcsError(reply.status()); } - sleep(1); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } RAY_RETURN_NOT_OK(connect_status); } else { From 7711e542767395d4a7d3e6fbd1d0089aded78a58 Mon Sep 17 00:00:00 2001 From: vitsai Date: Mon, 31 Jul 2023 16:53:55 -0700 Subject: [PATCH 14/27] fix progress reporter test Signed-off-by: vitsai --- python/ray/tune/tests/test_progress_reporter.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/ray/tune/tests/test_progress_reporter.py b/python/ray/tune/tests/test_progress_reporter.py index d1e8a0c93119..87855f66ca3c 100644 --- a/python/ray/tune/tests/test_progress_reporter.py +++ b/python/ray/tune/tests/test_progress_reporter.py @@ -689,7 +689,9 @@ def testEndToEndReporting(self): if os.environ.get("TUNE_NEW_EXECUTION") == "0": assert EXPECTED_END_TO_END_START in output assert EXPECTED_END_TO_END_END in output - assert "(raylet)" not in output, "Unexpected raylet log messages" + for line in output.splitlines(): + if "(raylet)" in line: + assert "cluster ID" in line, "Unexpected raylet log messages" except Exception: print("*** BEGIN OUTPUT ***") print(output) From 1f90063e70e96e5d69748fd5626bcd1472cb1d3e Mon Sep 17 00:00:00 2001 From: vitsai Date: Tue, 1 Aug 2023 13:52:13 -0700 Subject: [PATCH 15/27] fix several more tests Signed-off-by: vitsai --- python/ray/_private/node.py | 37 +++++++++++++++++++----------- python/ray/_private/parameter.py | 3 +++ python/ray/_private/worker.py | 4 ++++ python/ray/_raylet.pyx | 5 ++-- python/ray/includes/unique_ids.pxd | 3 +++ python/ray/tests/test_ray_init.py | 2 +- python/ray/tests/test_tqdm.py | 4 ++-- 7 files changed, 39 insertions(+), 19 deletions(-) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index c2fdfa6810bf..3046f99bf378 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -649,7 +649,9 @@ def _init_gcs_client(self): try: 
gcs_address = self.gcs_address client = GcsClient( - address=gcs_address, cluster_id=self._ray_params.cluster_id + address=gcs_address, + cluster_id=self._ray_params.cluster_id, + no_gcs=self._ray_params.no_gcs, ) self.cluster_id = client.get_cluster_id() if self.head: @@ -667,19 +669,26 @@ def _init_gcs_client(self): time.sleep(1) if self._gcs_client is None: - with open(os.path.join(self._logs_dir, "gcs_server.err")) as err: - # Use " C " or " E " to exclude the stacktrace. - # This should work for most cases, especitally - # it's when GCS is starting. Only display last 10 lines of logs. - errors = [e for e in err.readlines() if " C " in e or " E " in e][-10:] - error_msg = "\n" + "".join(errors) + "\n" - raise RuntimeError( - f"Failed to {'start' if self.head else 'connect to'} GCS. " - f" Last {len(errors)} lines of error files:" - f"{error_msg}." - f"Please check {os.path.join(self._logs_dir, 'gcs_server.out')}" - " for details" - ) + if hasattr(self, "_logs_dir"): + with open(os.path.join(self._logs_dir, "gcs_server.err")) as err: + # Use " C " or " E " to exclude the stacktrace. + # This should work for most cases, especitally + # it's when GCS is starting. Only display last 10 lines of logs. + errors = [e for e in err.readlines() if " C " in e or " E " in e][ + -10: + ] + error_msg = "\n" + "".join(errors) + "\n" + raise RuntimeError( + f"Failed to {'start' if self.head else 'connect to'} GCS. " + f" Last {len(errors)} lines of error files:" + f"{error_msg}." + f"Please check {os.path.join(self._logs_dir, 'gcs_server.out')}" + " for details" + ) + else: + raise RuntimeError( + f"Failed to {'start' if self.head else 'connect to'} GCS." + ) ray.experimental.internal_kv._initialize_internal_kv(self._gcs_client) diff --git a/python/ray/_private/parameter.py b/python/ray/_private/parameter.py index 538fe09a505d..277d5e885c86 100644 --- a/python/ray/_private/parameter.py +++ b/python/ray/_private/parameter.py @@ -128,6 +128,7 @@ class RayParams: session_name: The name of the session of the ray cluster. webui: The url of the UI. cluster_id: The cluster ID. + no_gcs: Whether clients should connect to GCS by default. """ def __init__( @@ -190,6 +191,7 @@ def __init__( session_name: Optional[str] = None, webui: Optional[str] = None, cluster_id: Optional[str] = None, + no_gcs: bool = False, ): self.redis_address = redis_address self.gcs_address = gcs_address @@ -252,6 +254,7 @@ def __init__( self.labels = labels self._check_usage() self.cluster_id = cluster_id + self.no_gcs = no_gcs # Set the internal config options for object reconstruction. 
if enable_object_reconstruction: diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index de2d4cd9a8f2..058ccb318d00 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -1126,6 +1126,7 @@ def init( namespace: Optional[str] = None, runtime_env: Optional[Union[Dict[str, Any], "RuntimeEnv"]] = None, # noqa: F821 storage: Optional[str] = None, + no_gcs: bool = False, **kwargs, ) -> BaseContext: """ @@ -1429,6 +1430,8 @@ def init( if bootstrap_address is not None: gcs_address = bootstrap_address logger.info("Connecting to existing Ray cluster at address: %s...", gcs_address) + else: + assert not no_gcs if _node_ip_address is not None: node_ip_address = services.resolve_ip_for_localhost(_node_ip_address) @@ -1572,6 +1575,7 @@ def init( _system_config=_system_config, enable_object_reconstruction=_enable_object_reconstruction, metrics_export_port=_metrics_export_port, + no_gcs=no_gcs, ) try: _global_node = ray._private.node.Node( diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 39b5f6bbd77f..b9c2edcb090a 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -2338,7 +2338,7 @@ cdef class GcsClient: object _nums_reconnect_retry CClusterID cluster_id - def __cinit__(self, address, nums_reconnect_retry=5, cluster_id=None): + def __cinit__(self, address, nums_reconnect_retry=5, cluster_id=None, no_gcs=False): cdef GcsClientOptions gcs_options = GcsClientOptions.from_gcs_address(address) self.inner.reset(new CPythonGcsClient(dereference(gcs_options.native()))) self.address = address @@ -2349,7 +2349,8 @@ cdef class GcsClient: else: c_cluster_id = cluster_id self.cluster_id = CClusterID.FromHex(c_cluster_id) - self._connect() + if not no_gcs: + self._connect() def _connect(self, timeout=None): cdef: diff --git a/python/ray/includes/unique_ids.pxd b/python/ray/includes/unique_ids.pxd index ab0f3663c499..cdb9b58d9188 100644 --- a/python/ray/includes/unique_ids.pxd +++ b/python/ray/includes/unique_ids.pxd @@ -162,6 +162,9 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil: @staticmethod CClusterID FromHex(const c_string &hex_str) + @staticmethod + CClusterID FromRandom() + @staticmethod const CClusterID Nil() diff --git a/python/ray/tests/test_ray_init.py b/python/ray/tests/test_ray_init.py index 38e351a8a025..ebfc663c32ee 100644 --- a/python/ray/tests/test_ray_init.py +++ b/python/ray/tests/test_ray_init.py @@ -125,7 +125,7 @@ def test_ray_init_existing_instance_crashed(address): # If no address is specified, we will default to an existing cluster. ray._private.node.NUM_REDIS_GET_RETRIES = 1 with pytest.raises(ConnectionError): - ray.init(address=address) + ray.init(address=address, no_gcs=True) finally: ray._private.utils.reset_ray_address() diff --git a/python/ray/tests/test_tqdm.py b/python/ray/tests/test_tqdm.py index d7763fd062bd..89aaf1a4ba93 100644 --- a/python/ray/tests/test_tqdm.py +++ b/python/ray/tests/test_tqdm.py @@ -35,9 +35,9 @@ def update(self): assert not mgr.in_hidden_state # Test stdout save/restore clearing. 
- assert mgr.num_hides == 0 + num_hides = mgr.num_hides ray.get(a.print_something.remote()) - wait_for_condition(lambda: mgr.num_hides == 1) + wait_for_condition(lambda: mgr.num_hides == num_hides + 1) wait_for_condition(lambda: not mgr.in_hidden_state) From 146c4bfddfb09762a4ccd7adc5deef94d8b80dd7 Mon Sep 17 00:00:00 2001 From: vitsai Date: Wed, 2 Aug 2023 07:59:51 -0700 Subject: [PATCH 16/27] bye segfault Signed-off-by: vitsai --- python/ray/_raylet.pyx | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index b9c2edcb090a..3b0fec7c70e1 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -2349,8 +2349,9 @@ cdef class GcsClient: else: c_cluster_id = cluster_id self.cluster_id = CClusterID.FromHex(c_cluster_id) - if not no_gcs: - self._connect() + if no_gcs: + self.cluster_id = CClusterID.FromRandom() + self._connect() def _connect(self, timeout=None): cdef: From 3906905aec7f6dd8b70defc1ba14c84c44814352 Mon Sep 17 00:00:00 2001 From: vitsai Date: Wed, 2 Aug 2023 13:13:00 -0700 Subject: [PATCH 17/27] ignore gcs for monitor Signed-off-by: vitsai --- python/ray/_raylet.pyx | 7 +++---- python/ray/autoscaler/_private/monitor.py | 3 ++- src/ray/gcs/gcs_client/gcs_client.cc | 24 +++++++++++------------ 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 3b0fec7c70e1..8840b8293500 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -2349,9 +2349,8 @@ cdef class GcsClient: else: c_cluster_id = cluster_id self.cluster_id = CClusterID.FromHex(c_cluster_id) - if no_gcs: - self.cluster_id = CClusterID.FromRandom() - self._connect() + if not no_gcs: + self._connect() def _connect(self, timeout=None): cdef: @@ -2360,10 +2359,10 @@ cdef class GcsClient: with nogil: status = self.inner.get().Connect(self.cluster_id, timeout_ms, num_retries) + check_status(status) if self.cluster_id.IsNil(): self.cluster_id = self.inner.get().GetClusterId() assert not self.cluster_id.IsNil() - check_status(status) def get_cluster_id(self): return self.cluster_id.Hex().decode() diff --git a/python/ray/autoscaler/_private/monitor.py b/python/ray/autoscaler/_private/monitor.py index 9f22f283b54f..90fa3577d53a 100644 --- a/python/ray/autoscaler/_private/monitor.py +++ b/python/ray/autoscaler/_private/monitor.py @@ -150,7 +150,8 @@ def __init__( gcs_channel ) worker = ray._private.worker.global_worker - gcs_client = GcsClient(address=self.gcs_address) + # TODO: eventually plumb ClusterID through to here + gcs_client = GcsClient(address=self.gcs_address, no_gcs=True) if monitor_ip: monitor_addr = f"{monitor_ip}:{AUTOSCALER_METRIC_PORT}" diff --git a/src/ray/gcs/gcs_client/gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc index fe02dd002d13..b0d6bb803672 100644 --- a/src/ray/gcs/gcs_client/gcs_client.cc +++ b/src/ray/gcs/gcs_client/gcs_client.cc @@ -146,7 +146,15 @@ std::pair GcsClient::GetGcsServerAddress() const { return gcs_rpc_client_->GetAddress(); } -PythonGcsClient::PythonGcsClient(const GcsClientOptions &options) : options_(options) {} +PythonGcsClient::PythonGcsClient(const GcsClientOptions &options) : options_(options) { + channel_ = + rpc::GcsRpcClient::CreateGcsChannel(options_.gcs_address_, options_.gcs_port_); + kv_stub_ = rpc::InternalKVGcsService::NewStub(channel_); + runtime_env_stub_ = rpc::RuntimeEnvGcsService::NewStub(channel_); + node_info_stub_ = rpc::NodeInfoGcsService::NewStub(channel_); + job_info_stub_ = 
rpc::JobInfoGcsService::NewStub(channel_); + autoscaler_stub_ = rpc::autoscaler::AutoscalerStateService::NewStub(channel_); +} namespace { Status HandleGcsError(rpc::GcsStatus status) { @@ -159,18 +167,10 @@ Status HandleGcsError(rpc::GcsStatus status) { Status PythonGcsClient::Connect(const ClusterID &cluster_id, int64_t timeout_ms, size_t num_retries) { - channel_ = - rpc::GcsRpcClient::CreateGcsChannel(options_.gcs_address_, options_.gcs_port_); - kv_stub_ = rpc::InternalKVGcsService::NewStub(channel_); - runtime_env_stub_ = rpc::RuntimeEnvGcsService::NewStub(channel_); - node_info_stub_ = rpc::NodeInfoGcsService::NewStub(channel_); - job_info_stub_ = rpc::JobInfoGcsService::NewStub(channel_); - autoscaler_stub_ = rpc::autoscaler::AutoscalerStateService::NewStub(channel_); - - size_t tries = num_retries + 1; - RAY_CHECK(tries > 0) << "Expected positive retries, but got " << tries; - if (cluster_id.IsNil()) { + size_t tries = num_retries + 1; + RAY_CHECK(tries > 0) << "Expected positive retries, but got " << tries; + RAY_LOG(INFO) << "Retrieving cluster ID from GCS server."; rpc::GetClusterIdRequest request; rpc::GetClusterIdReply reply; From 3228ba0b4df7e9b3a13d18ed9a9002cc4807f4f6 Mon Sep 17 00:00:00 2001 From: vitsai Date: Wed, 2 Aug 2023 23:30:59 +0000 Subject: [PATCH 18/27] one job agent test fix Signed-off-by: vitsai --- dashboard/modules/job/tests/test_job_agent.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/dashboard/modules/job/tests/test_job_agent.py b/dashboard/modules/job/tests/test_job_agent.py index 37d51161ed77..34367b508ab7 100644 --- a/dashboard/modules/job/tests/test_job_agent.py +++ b/dashboard/modules/job/tests/test_job_agent.py @@ -532,8 +532,14 @@ def test_agent_logs_not_streamed_to_drivers(): err_str = proc.stderr.read().decode("ascii") print(out_str, err_str) - assert "(raylet)" not in out_str - assert "(raylet)" not in err_str + + def check_output(blob): + for line in blob.splitlines(): + if "(raylet)" in line: + assert "cluster ID" in line + + check_output(out_str) + check_output(err_str) @pytest.mark.asyncio From eb5ccc3b7dce15e983f189dd08f5ef64421fce8e Mon Sep 17 00:00:00 2001 From: vitsai Date: Thu, 3 Aug 2023 16:41:31 -0700 Subject: [PATCH 19/27] cleaner test fix and change to debug logs Signed-off-by: vitsai --- dashboard/modules/job/tests/test_job_agent.py | 9 ++------- python/ray/_private/node.py | 1 - python/ray/_private/parameter.py | 3 --- python/ray/_private/worker.py | 4 ---- python/ray/_raylet.pyx | 9 ++++----- python/ray/autoscaler/_private/monitor.py | 2 +- python/ray/tests/test_autoscaler.py | 6 +++++- src/ray/gcs/gcs_client/gcs_client.cc | 6 +++--- 8 files changed, 15 insertions(+), 25 deletions(-) diff --git a/dashboard/modules/job/tests/test_job_agent.py b/dashboard/modules/job/tests/test_job_agent.py index 34367b508ab7..b6e1a838c378 100644 --- a/dashboard/modules/job/tests/test_job_agent.py +++ b/dashboard/modules/job/tests/test_job_agent.py @@ -533,13 +533,8 @@ def test_agent_logs_not_streamed_to_drivers(): print(out_str, err_str) - def check_output(blob): - for line in blob.splitlines(): - if "(raylet)" in line: - assert "cluster ID" in line - - check_output(out_str) - check_output(err_str) + assert "(raylet)" not in out_str + assert "(raylet)" not in err_str @pytest.mark.asyncio diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 3046f99bf378..181396049f4e 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -651,7 +651,6 @@ def _init_gcs_client(self): 
client = GcsClient( address=gcs_address, cluster_id=self._ray_params.cluster_id, - no_gcs=self._ray_params.no_gcs, ) self.cluster_id = client.get_cluster_id() if self.head: diff --git a/python/ray/_private/parameter.py b/python/ray/_private/parameter.py index 277d5e885c86..538fe09a505d 100644 --- a/python/ray/_private/parameter.py +++ b/python/ray/_private/parameter.py @@ -128,7 +128,6 @@ class RayParams: session_name: The name of the session of the ray cluster. webui: The url of the UI. cluster_id: The cluster ID. - no_gcs: Whether clients should connect to GCS by default. """ def __init__( @@ -191,7 +190,6 @@ def __init__( session_name: Optional[str] = None, webui: Optional[str] = None, cluster_id: Optional[str] = None, - no_gcs: bool = False, ): self.redis_address = redis_address self.gcs_address = gcs_address @@ -254,7 +252,6 @@ def __init__( self.labels = labels self._check_usage() self.cluster_id = cluster_id - self.no_gcs = no_gcs # Set the internal config options for object reconstruction. if enable_object_reconstruction: diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 058ccb318d00..de2d4cd9a8f2 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -1126,7 +1126,6 @@ def init( namespace: Optional[str] = None, runtime_env: Optional[Union[Dict[str, Any], "RuntimeEnv"]] = None, # noqa: F821 storage: Optional[str] = None, - no_gcs: bool = False, **kwargs, ) -> BaseContext: """ @@ -1430,8 +1429,6 @@ def init( if bootstrap_address is not None: gcs_address = bootstrap_address logger.info("Connecting to existing Ray cluster at address: %s...", gcs_address) - else: - assert not no_gcs if _node_ip_address is not None: node_ip_address = services.resolve_ip_for_localhost(_node_ip_address) @@ -1575,7 +1572,6 @@ def init( _system_config=_system_config, enable_object_reconstruction=_enable_object_reconstruction, metrics_export_port=_metrics_export_port, - no_gcs=no_gcs, ) try: _global_node = ray._private.node.Node( diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 8840b8293500..1a6de2b281bd 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -2338,7 +2338,7 @@ cdef class GcsClient: object _nums_reconnect_retry CClusterID cluster_id - def __cinit__(self, address, nums_reconnect_retry=5, cluster_id=None, no_gcs=False): + def __cinit__(self, address, nums_reconnect_retry=5, cluster_id=None): cdef GcsClientOptions gcs_options = GcsClientOptions.from_gcs_address(address) self.inner.reset(new CPythonGcsClient(dereference(gcs_options.native()))) self.address = address @@ -2349,12 +2349,11 @@ cdef class GcsClient: else: c_cluster_id = cluster_id self.cluster_id = CClusterID.FromHex(c_cluster_id) - if not no_gcs: - self._connect() + self._connect(5) - def _connect(self, timeout=None): + def _connect(self, timeout_s=None): cdef: - int64_t timeout_ms = round(1000 * timeout) if timeout else -1 + int64_t timeout_ms = round(1000 * timeout_s) if timeout_s else -1 size_t num_retries = self._nums_reconnect_retry with nogil: status = self.inner.get().Connect(self.cluster_id, timeout_ms, num_retries) diff --git a/python/ray/autoscaler/_private/monitor.py b/python/ray/autoscaler/_private/monitor.py index 90fa3577d53a..34db56f511f9 100644 --- a/python/ray/autoscaler/_private/monitor.py +++ b/python/ray/autoscaler/_private/monitor.py @@ -151,7 +151,7 @@ def __init__( ) worker = ray._private.worker.global_worker # TODO: eventually plumb ClusterID through to here - gcs_client = GcsClient(address=self.gcs_address, no_gcs=True) + 
gcs_client = GcsClient(address=self.gcs_address) if monitor_ip: monitor_addr = f"{monitor_ip}:{AUTOSCALER_METRIC_PORT}" diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index 0b6b7210157e..736a5c2cabd6 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -3514,6 +3514,8 @@ class FaultyAutoscaler: def __init__(self, *args, **kwargs): raise AutoscalerInitFailException + os.environ["RAY_GCS_SERVER_PORT"] = "12345" + ray.init() with patch("ray._private.utils.publish_error_to_driver") as mock_publish: with patch.multiple( "ray.autoscaler._private.monitor", @@ -3521,7 +3523,9 @@ def __init__(self, *args, **kwargs): _internal_kv_initialized=Mock(return_value=False), ): monitor = Monitor( - address="here:12345", autoscaling_config="", log_dir=self.tmpdir + address="localhost:12345", + autoscaling_config="", + log_dir=self.tmpdir, ) with pytest.raises(AutoscalerInitFailException): monitor.run() diff --git a/src/ray/gcs/gcs_client/gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc index b0d6bb803672..927f0b13db9b 100644 --- a/src/ray/gcs/gcs_client/gcs_client.cc +++ b/src/ray/gcs/gcs_client/gcs_client.cc @@ -171,7 +171,7 @@ Status PythonGcsClient::Connect(const ClusterID &cluster_id, size_t tries = num_retries + 1; RAY_CHECK(tries > 0) << "Expected positive retries, but got " << tries; - RAY_LOG(INFO) << "Retrieving cluster ID from GCS server."; + RAY_LOG(DEBUG) << "Retrieving cluster ID from GCS server."; rpc::GetClusterIdRequest request; rpc::GetClusterIdReply reply; @@ -184,7 +184,7 @@ Status PythonGcsClient::Connect(const ClusterID &cluster_id, if (connect_status.ok()) { cluster_id_ = ClusterID::FromBinary(reply.cluster_id()); - RAY_LOG(INFO) << "Received cluster ID from GCS server: " << cluster_id_; + RAY_LOG(DEBUG) << "Received cluster ID from GCS server: " << cluster_id_; RAY_CHECK(!cluster_id_.IsNil()); break; } else if (!connect_status.IsGrpcError()) { @@ -195,7 +195,7 @@ Status PythonGcsClient::Connect(const ClusterID &cluster_id, RAY_RETURN_NOT_OK(connect_status); } else { cluster_id_ = cluster_id; - RAY_LOG(INFO) << "Client initialized with provided cluster ID: " << cluster_id_; + RAY_LOG(DEBUG) << "Client initialized with provided cluster ID: " << cluster_id_; } RAY_CHECK(!cluster_id_.IsNil()) << "Unexpected nil cluster ID."; From 07b904a668d8cef8d0f65e86fb7f5511390c3394 Mon Sep 17 00:00:00 2001 From: vitsai Date: Thu, 3 Aug 2023 22:45:13 -0700 Subject: [PATCH 20/27] comment Signed-off-by: vitsai --- python/ray/tests/test_ray_init.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/tests/test_ray_init.py b/python/ray/tests/test_ray_init.py index ebfc663c32ee..38e351a8a025 100644 --- a/python/ray/tests/test_ray_init.py +++ b/python/ray/tests/test_ray_init.py @@ -125,7 +125,7 @@ def test_ray_init_existing_instance_crashed(address): # If no address is specified, we will default to an existing cluster. 
ray._private.node.NUM_REDIS_GET_RETRIES = 1 with pytest.raises(ConnectionError): - ray.init(address=address, no_gcs=True) + ray.init(address=address) finally: ray._private.utils.reset_ray_address() From 4e70217e40246bb7e4564df1a26fcc35ff091c8b Mon Sep 17 00:00:00 2001 From: vitsai Date: Fri, 4 Aug 2023 15:41:48 -0700 Subject: [PATCH 21/27] comments Signed-off-by: vitsai --- python/ray/tests/test_autoscaler.py | 5 +++++ python/ray/tests/test_tqdm.py | 4 ++-- python/ray/train/BUILD | 2 +- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index 736a5c2cabd6..861349774ec7 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -3514,6 +3514,7 @@ class FaultyAutoscaler: def __init__(self, *args, **kwargs): raise AutoscalerInitFailException + prev_port = os.environ.get("RAY_GCS_SERVER_PORT") os.environ["RAY_GCS_SERVER_PORT"] = "12345" ray.init() with patch("ray._private.utils.publish_error_to_driver") as mock_publish: @@ -3530,6 +3531,10 @@ def __init__(self, *args, **kwargs): with pytest.raises(AutoscalerInitFailException): monitor.run() mock_publish.assert_called_once() + if prev_port is not None: + os.environ["RAY_GCS_SERVER_PORT"] = prev_port + else: + del os.environ["RAY_GCS_SERVER_PORT"] def testInitializeSDKArguments(self): # https://github.com/ray-project/ray/issues/23166 diff --git a/python/ray/tests/test_tqdm.py b/python/ray/tests/test_tqdm.py index 89aaf1a4ba93..d7763fd062bd 100644 --- a/python/ray/tests/test_tqdm.py +++ b/python/ray/tests/test_tqdm.py @@ -35,9 +35,9 @@ def update(self): assert not mgr.in_hidden_state # Test stdout save/restore clearing. - num_hides = mgr.num_hides + assert mgr.num_hides == 0 ray.get(a.print_something.remote()) - wait_for_condition(lambda: mgr.num_hides == num_hides + 1) + wait_for_condition(lambda: mgr.num_hides == 1) wait_for_condition(lambda: not mgr.in_hidden_state) diff --git a/python/ray/train/BUILD b/python/ray/train/BUILD index f6ee27a7804d..42e976e04109 100644 --- a/python/ray/train/BUILD +++ b/python/ray/train/BUILD @@ -344,7 +344,7 @@ py_test( py_test( name = "test_gpu_2", - size = "large", + size = "medium", srcs = ["tests/test_gpu_2.py"], tags = ["team:ml", "exclusive", "gpu_only"], deps = [":train_lib", ":conftest"] From d072ea387570dddaaa905deba7aa98e0755ac37d Mon Sep 17 00:00:00 2001 From: vitsai Date: Mon, 7 Aug 2023 10:45:00 -0700 Subject: [PATCH 22/27] lint Signed-off-by: vitsai --- python/ray/train/tests/test_gpu.py | 1 - python/ray/train/tests/test_gpu_2.py | 13 ------------- 2 files changed, 14 deletions(-) diff --git a/python/ray/train/tests/test_gpu.py b/python/ray/train/tests/test_gpu.py index 19d271c88c84..a5ac24e90c3e 100644 --- a/python/ray/train/tests/test_gpu.py +++ b/python/ray/train/tests/test_gpu.py @@ -3,7 +3,6 @@ from unittest.mock import patch import pytest -import numpy as np import torch import torchvision from torch.nn.parallel import DistributedDataParallel diff --git a/python/ray/train/tests/test_gpu_2.py b/python/ray/train/tests/test_gpu_2.py index 1c772d38a1e3..1889a7d3349e 100644 --- a/python/ray/train/tests/test_gpu_2.py +++ b/python/ray/train/tests/test_gpu_2.py @@ -1,28 +1,15 @@ -import os -import time - -from unittest.mock import patch import pytest import numpy as np import torch -import torchvision -from torch.nn.parallel import DistributedDataParallel -from torch.utils.data import DataLoader, DistributedSampler import ray import ray.data -from ray.exceptions import RayTaskError 
-from ray.air import session from ray import tune import ray.train as train from ray.air.config import ScalingConfig -from ray.train.constants import DEFAULT_NCCL_SOCKET_IFNAME from ray.train.examples.pytorch.torch_linear_example import LinearDataset -from ray.train.torch.config import TorchConfig, _TorchBackend from ray.train.torch.torch_trainer import TorchTrainer -from ray.train.trainer import TrainingFailedError -from ray.train._internal.worker_group import WorkerGroup class LinearDatasetDict(LinearDataset): From e4429550f4417bdd076b41f49095059dd13b2444 Mon Sep 17 00:00:00 2001 From: vitsai Date: Mon, 7 Aug 2023 16:47:58 -0700 Subject: [PATCH 23/27] preserve error types of existing logic Signed-off-by: vitsai --- python/ray/_private/worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index de2d4cd9a8f2..19e85bc97401 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -1581,7 +1581,7 @@ def init( spawn_reaper=False, connect_only=True, ) - except ConnectionError: + except (ConnectionError, RuntimeError): if gcs_address == ray._private.utils.read_ray_address(_temp_dir): logger.info( "Failed to connect to the default Ray cluster address at " @@ -1590,7 +1590,7 @@ def init( "address to connect to, run `ray stop` or restart Ray with " "`ray start`." ) - raise + raise ConnectionError # Log a message to find the Ray address that we connected to and the # dashboard URL. From 38070fc4dcf6bf535142721457240f9e18b8da91 Mon Sep 17 00:00:00 2001 From: vitsai Date: Tue, 8 Aug 2023 15:38:05 -0700 Subject: [PATCH 24/27] increase timeout for init test Signed-off-by: vitsai --- python/ray/tests/test_ray_init_2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/tests/test_ray_init_2.py b/python/ray/tests/test_ray_init_2.py index 55821eeebf1b..7118873288bf 100644 --- a/python/ray/tests/test_ray_init_2.py +++ b/python/ray/tests/test_ray_init_2.py @@ -271,7 +271,7 @@ def verify(): return True try: - wait_for_condition(verify, timeout=10, retry_interval_ms=2000) + wait_for_condition(verify, timeout=15, retry_interval_ms=2000) finally: proc.terminate() proc.wait() From af111c7e23262c60c7f215bff9083f0dba03945e Mon Sep 17 00:00:00 2001 From: vitsai Date: Tue, 8 Aug 2023 18:30:48 -0700 Subject: [PATCH 25/27] rebase fixes Signed-off-by: vitsai --- src/ray/gcs/gcs_client/gcs_client.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ray/gcs/gcs_client/gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc index 927f0b13db9b..4fcc2fad191f 100644 --- a/src/ray/gcs/gcs_client/gcs_client.cc +++ b/src/ray/gcs/gcs_client/gcs_client.cc @@ -205,7 +205,8 @@ Status PythonGcsClient::Connect(const ClusterID &cluster_id, Status PythonGcsClient::CheckAlive(const std::vector &raylet_addresses, int64_t timeout_ms, std::vector &result) { - grpc::ClientContext &&context = PrepareContext(timeout_ms); + grpc::ClientContext context; + PrepareContext(context, timeout_ms); rpc::CheckAliveRequest request; for (const auto &address : raylet_addresses) { From b0d51b39e874b9d87d70717425b110e631847f48 Mon Sep 17 00:00:00 2001 From: vitsai Date: Wed, 9 Aug 2023 22:10:07 +0000 Subject: [PATCH 26/27] fix gcs fault tolerance Signed-off-by: vitsai --- python/ray/_private/gcs_aio_client.py | 2 ++ python/ray/tests/BUILD | 2 +- python/ray/tests/test_gcs_fault_tolerance.py | 9 ++++++--- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/python/ray/_private/gcs_aio_client.py 
b/python/ray/_private/gcs_aio_client.py index d5502c6dcea1..f794fd7c7eee 100644 --- a/python/ray/_private/gcs_aio_client.py +++ b/python/ray/_private/gcs_aio_client.py @@ -56,7 +56,9 @@ def __init__( self._nums_reconnect_retry = nums_reconnect_retry def _connect(self): + print("vct connecting") self._gcs_client._connect() + print("vct connected") @property def address(self): diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index dc20b9c57934..4a04c76c082d 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -210,7 +210,6 @@ py_test_module_list( "test_unhandled_error.py", "test_utils.py", "test_widgets.py", - "test_node_labels.py", ], size = "small", tags = ["exclusive", "small_size_python_tests", "team:core"], @@ -221,6 +220,7 @@ py_test_module_list( files = [ "test_gcs_ha_e2e.py", "test_memory_pressure.py", + "test_node_labels.py", ], size = "medium", tags = ["exclusive", "team:core", "xcommit"], diff --git a/python/ray/tests/test_gcs_fault_tolerance.py b/python/ray/tests/test_gcs_fault_tolerance.py index 869c9ab648fe..fcd260a77307 100644 --- a/python/ray/tests/test_gcs_fault_tolerance.py +++ b/python/ray/tests/test_gcs_fault_tolerance.py @@ -433,13 +433,16 @@ def test_gcs_aio_client_reconnect( passed = [False] async def async_kv_get(): - gcs_aio_client = gcs_utils.GcsAioClient( - address=gcs_address, nums_reconnect_retry=20 if auto_reconnect else 0 - ) if not auto_reconnect: with pytest.raises(Exception): + gcs_aio_client = gcs_utils.GcsAioClient( + address=gcs_address, nums_reconnect_retry=0 + ) await gcs_aio_client.internal_kv_get(b"a", None) else: + gcs_aio_client = gcs_utils.GcsAioClient( + address=gcs_address, nums_reconnect_retry=20 + ) assert await gcs_aio_client.internal_kv_get(b"a", None) == b"b" return True From d00c49e4a7f5eae430996706ccfddea8d9abb67b Mon Sep 17 00:00:00 2001 From: vitsai Date: Thu, 10 Aug 2023 02:37:40 +0000 Subject: [PATCH 27/27] increase size of threshold tests Signed-off-by: vitsai --- python/ray/autoscaler/v2/BUILD | 2 +- python/ray/serve/BUILD | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/autoscaler/v2/BUILD b/python/ray/autoscaler/v2/BUILD index ddc043706ab8..839f29587e74 100644 --- a/python/ray/autoscaler/v2/BUILD +++ b/python/ray/autoscaler/v2/BUILD @@ -80,7 +80,7 @@ py_test( py_test( name = "test_sdk", - size = "small", + size = "medium", srcs = ["tests/test_sdk.py"], tags = ["team:core", "exclusive"], deps = ["//:ray_lib", ":conftest"], diff --git a/python/ray/serve/BUILD b/python/ray/serve/BUILD index f4682b56ea8e..be7a7f8e955f 100644 --- a/python/ray/serve/BUILD +++ b/python/ray/serve/BUILD @@ -669,7 +669,7 @@ py_test( py_test( name = "test_callback", - size = "small", + size = "medium", srcs = serve_tests_srcs, tags = ["exclusive", "team:serve"], deps = [":serve_lib"],
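
For readers tracing the Connect() changes threaded through the patches above: the behavior they converge on is a bounded handshake. The client tries num_retries + 1 times to fetch the cluster ID when the caller did not supply one, sleeps one second between transient gRPC transport failures, fails fast on application-level GCS errors, and trusts a caller-provided ID without issuing any RPC. The Python sketch below illustrates only that shape; connect_with_cluster_id, RpcError, and fake_get_cluster_id are illustrative names invented here, not Ray APIs, and the real implementation lives in PythonGcsClient::Connect in src/ray/gcs/gcs_client/gcs_client.cc.

import time
from typing import Callable, Optional


class RpcError(Exception):
    """Stand-in for a transient gRPC transport error (illustrative only)."""


def connect_with_cluster_id(
    get_cluster_id: Callable[[], str],
    provided_cluster_id: Optional[str] = None,
    num_retries: int = 5,
    retry_interval_s: float = 1.0,
) -> str:
    """Return a cluster ID, fetching it from the server when none is supplied.

    Mirrors the retry shape of the C++ Connect() above: num_retries + 1
    attempts, a sleep between transient transport errors, and fail-fast
    behavior on application-level errors.
    """
    # A caller-provided ID is trusted as-is; no RPC is made.
    if provided_cluster_id is not None:
        return provided_cluster_id

    tries = num_retries + 1
    last_error: Optional[Exception] = None
    for _ in range(tries):
        try:
            cluster_id = get_cluster_id()  # one GetClusterId RPC attempt
            assert cluster_id, "server returned a nil cluster ID"
            return cluster_id
        except RpcError as err:  # transient failure: back off and retry
            last_error = err
            time.sleep(retry_interval_s)
    raise ConnectionError(f"could not reach GCS: {last_error}")


# Example: the first two attempts fail with a transport error, the third succeeds.
attempts = iter([RpcError("unavailable"), RpcError("unavailable"), "abcd1234"])


def fake_get_cluster_id() -> str:
    result = next(attempts)
    if isinstance(result, Exception):
        raise result
    return result


print(connect_with_cluster_id(fake_get_cluster_id, retry_interval_s=0.0))  # abcd1234

The sketch also reflects why the retry count is converted to a positive attempt count (tries = num_retries + 1) rather than counting num_retries down to zero: it keeps the loop bound obviously positive and avoids the off-by-one that the "fix retries" patch corrects.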