From 090c3e0b337e611915be6741e05bfd115e7ce5c7 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Tue, 30 Jul 2024 00:48:12 -0700 Subject: [PATCH 01/29] initial Signed-off-by: Ruiyang Wang --- .../kubernetes/user-guides/kuberay-gcs-ft.md | 4 +- python/ray/_private/gcs_utils.py | 6 +- python/ray/_raylet.pyx | 6 +- python/ray/includes/global_state_accessor.pxd | 54 +--- python/ray/includes/global_state_accessor.pxi | 2 +- src/ray/gcs/redis_client.cc | 68 ---- src/ray/gcs/redis_client.h | 2 - .../gcs/store_client/redis_store_client.cc | 305 +++++++++--------- src/ray/gcs/store_client/redis_store_client.h | 64 ++-- 9 files changed, 211 insertions(+), 300 deletions(-) diff --git a/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md b/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md index 036ebd176d34..469f7dcca96e 100644 --- a/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md +++ b/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md @@ -48,7 +48,7 @@ curl -LO https://raw.githubusercontent.com/ray-project/kuberay/v1.0.0/ray-operat kubectl apply -f ray-cluster.external-redis.yaml ``` -### Step 4: Verify the Kubernetes cluster status +### Step 4: Verify the Kubernetes cluster status ```sh # Step 4.1: List all Pods in the `default` namespace. @@ -141,7 +141,7 @@ KEYS * # 1) "864b004c-6305-42e3-ac46-adfa8eb6f752" # Step 6.5: Check the value of the key. -HGETALL 864b004c-6305-42e3-ac46-adfa8eb6f752 +SCAN 0 MATCH 864b004c-6305-42e3-ac46-adfa8eb6f752* ``` In [ray-cluster.external-redis.yaml](https://github.com/ray-project/kuberay/blob/v1.0.0/ray-operator/config/samples/ray-cluster.external-redis.yaml), the `ray.io/external-storage-namespace` annotation isn't set for the RayCluster. 
diff --git a/python/ray/_private/gcs_utils.py b/python/ray/_private/gcs_utils.py index dcfb328b545d..4f32933db900 100644 --- a/python/ray/_private/gcs_utils.py +++ b/python/ray/_private/gcs_utils.py @@ -127,7 +127,7 @@ def cleanup_redis_storage( storage_namespace: The namespace of the storage to be deleted. """ - from ray._raylet import del_key_from_storage # type: ignore + from ray._raylet import del_key_prefix_from_storage # type: ignore if not isinstance(host, str): raise ValueError("Host must be a string") @@ -144,6 +144,6 @@ def cleanup_redis_storage( if not isinstance(storage_namespace, str): raise ValueError("storage namespace must be a string") - # Right now, GCS store all data into a hash set key by storage_namespace. + # Right now, GCS store all data into a hash keys prefixed by storage_namespace. # So we only need to delete the specific key to cleanup the cluster. - return del_key_from_storage(host, port, password, use_ssl, storage_namespace) + return del_key_prefix_from_storage(host, port, password, use_ssl, storage_namespace) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index e89158484f9c..4009a5edf407 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -160,7 +160,7 @@ from ray.includes.libcoreworker cimport ( from ray.includes.ray_config cimport RayConfig from ray.includes.global_state_accessor cimport CGlobalStateAccessor -from ray.includes.global_state_accessor cimport RedisDelKeySync, RedisGetKeySync +from ray.includes.global_state_accessor cimport RedisDelKeyPrefixSync, RedisGetKeySync from ray.includes.optional cimport ( optional, nullopt ) @@ -5177,8 +5177,8 @@ cdef void async_callback(shared_ptr[CRayObject] obj, cpython.Py_DECREF(user_callback) -def del_key_from_storage(host, port, password, use_ssl, key): - return RedisDelKeySync(host, port, password, use_ssl, key) +def del_key_prefix_from_storage(host, port, password, use_ssl, key): + return RedisDelKeyPrefixSync(host, port, password, use_ssl, key) def 
get_session_key_from_storage(host, port, password, use_ssl, config, key): diff --git a/python/ray/includes/global_state_accessor.pxd b/python/ray/includes/global_state_accessor.pxd index 417e4f2c5cac..9c81a3bd504f 100644 --- a/python/ray/includes/global_state_accessor.pxd +++ b/python/ray/includes/global_state_accessor.pxd @@ -128,59 +128,7 @@ cdef extern from * namespace "ray::gcs" nogil: cdef extern from * namespace "ray::gcs" nogil: - """ - #include - #include "ray/gcs/redis_client.h" - namespace ray { - namespace gcs { - - class Cleanup { - public: - Cleanup(std::function f): f_(f) {} - ~Cleanup() { f_(); } - private: - std::function f_; - }; - - bool RedisDelKeySync(const std::string& host, - int32_t port, - const std::string& password, - bool use_ssl, - const std::string& key) { - RedisClientOptions options(host, port, password, use_ssl); - auto cli = std::make_unique(options); - - instrumented_io_context io_service; - - auto thread = std::make_unique([&]() { - boost::asio::io_service::work work(io_service); - io_service.run(); - }); - - Cleanup _([&](){ - io_service.stop(); - thread->join(); - }); - - auto status = cli->Connect(io_service); - RAY_CHECK(status.ok()) << "Failed to connect to redis: " << status.ToString(); - - auto context = cli->GetPrimaryContext(); - auto cmd = std::vector{"DEL", key}; - auto reply = context->RunArgvSync(cmd); - if(reply->ReadAsInteger() == 1) { - RAY_LOG(INFO) << "Successfully deleted " << key; - return true; - } else { - RAY_LOG(ERROR) << "Failed to delete " << key; - return false; - } - } - - } - } - """ - c_bool RedisDelKeySync(const c_string& host, + c_bool RedisDelKeyPrefixSync(const c_string& host, c_int32_t port, const c_string& password, c_bool use_ssl, diff --git a/python/ray/includes/global_state_accessor.pxi b/python/ray/includes/global_state_accessor.pxi index 0b312e40ebaa..49c23b803ffd 100644 --- a/python/ray/includes/global_state_accessor.pxi +++ b/python/ray/includes/global_state_accessor.pxi @@ -15,7 +15,7 @@ 
from ray.includes.unique_ids cimport ( from ray.includes.global_state_accessor cimport ( CGlobalStateAccessor, - RedisDelKeySync, + RedisDelKeyPrefixSync, ) from ray.includes.optional cimport ( diff --git a/src/ray/gcs/redis_client.cc b/src/ray/gcs/redis_client.cc index b98428b7e432..4464e8098968 100644 --- a/src/ray/gcs/redis_client.cc +++ b/src/ray/gcs/redis_client.cc @@ -25,69 +25,6 @@ namespace ray { namespace gcs { -/// Run redis command using specified context and store the result in `reply`. Return true -/// if the number of attemps didn't reach `redis_db_connect_retries`. -static bool RunRedisCommandWithRetries( - redisContext *context, - const char *command, - redisReply **reply, - const std::function &condition) { - int num_attempts = 0; - while (num_attempts < RayConfig::instance().redis_db_connect_retries()) { - // Try to execute the command. - *reply = reinterpret_cast(redisCommand(context, command)); - if (condition(*reply)) { - break; - } - - // Sleep for a little, and try again if the entry isn't there yet. - freeReplyObject(*reply); - std::this_thread::sleep_for(std::chrono::milliseconds( - RayConfig::instance().redis_db_connect_wait_milliseconds())); - num_attempts++; - } - return num_attempts < RayConfig::instance().redis_db_connect_retries(); -} - -static int DoGetNextJobID(redisContext *context) { - // This is bad since duplicate logic lives in redis_client - // and redis_store_client. - // A refactoring is needed to make things clean. - // src/ray/gcs/store_client/redis_store_client.cc#L42 - // TODO (iycheng): Unify the way redis key is formated. 
- static const std::string kTableSeparator = ":"; - static const std::string kClusterSeparator = "@"; - static std::string key = RayConfig::instance().external_storage_namespace() + - kClusterSeparator + kTableSeparator + "JobCounter"; - static std::string cmd = - "HINCRBY " + RayConfig::instance().external_storage_namespace() + " " + key + " 1"; - - redisReply *reply = nullptr; - bool under_retry_limit = RunRedisCommandWithRetries( - context, cmd.c_str(), &reply, [](const redisReply *reply) { - if (reply == nullptr) { - RAY_LOG(WARNING) << "Didn't get reply for " << cmd; - return false; - } - if (reply->type == REDIS_REPLY_NIL) { - RAY_LOG(WARNING) << "Got nil reply for " << cmd; - return false; - } - if (reply->type == REDIS_REPLY_ERROR) { - RAY_LOG(WARNING) << "Got error reply for " << cmd << " Error is " << reply->str; - return false; - } - return true; - }); - RAY_CHECK(reply); - RAY_CHECK(under_retry_limit) << "No entry found for JobCounter"; - RAY_CHECK(reply->type == REDIS_REPLY_INTEGER) - << "Expected integer, found Redis type " << reply->type << " for JobCounter"; - int counter = reply->integer; - freeReplyObject(reply); - return counter; -} - RedisClient::RedisClient(const RedisClientOptions &options) : options_(options) {} Status RedisClient::Connect(instrumented_io_context &io_service) { @@ -127,11 +64,6 @@ void RedisClient::Disconnect() { RAY_LOG(DEBUG) << "RedisClient disconnected."; } -int RedisClient::GetNextJobID() { - RAY_CHECK(primary_context_); - return DoGetNextJobID(primary_context_->sync_context()); -} - } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/redis_client.h b/src/ray/gcs/redis_client.h index 893031fb8771..5207311d6d3f 100644 --- a/src/ray/gcs/redis_client.h +++ b/src/ray/gcs/redis_client.h @@ -69,8 +69,6 @@ class RedisClient { std::shared_ptr GetPrimaryContext() { return primary_context_; } - int GetNextJobID(); - protected: /// Attach this client to an asio event loop. 
Note that only /// one event loop should be attached at a time. diff --git a/src/ray/gcs/store_client/redis_store_client.cc b/src/ray/gcs/store_client/redis_store_client.cc index 81d1ce292aa7..ddb54be0084c 100644 --- a/src/ray/gcs/store_client/redis_store_client.cc +++ b/src/ray/gcs/store_client/redis_store_client.cc @@ -16,6 +16,7 @@ #include #include +#include #include "absl/strings/match.h" #include "absl/strings/str_cat.h" @@ -23,12 +24,10 @@ #include "ray/util/logging.h" namespace ray { - namespace gcs { namespace { -const std::string_view kTableSeparator = ":"; const std::string_view kClusterSeparator = "@"; // "[, ], -, ?, *, ^, \" are special chars in Redis pattern matching. @@ -40,30 +39,12 @@ std::string EscapeMatchPattern(const std::string &s) { } std::string GenRedisKey(const std::string &external_storage_namespace, - const std::string &table_name, - const std::string &key) { - return absl::StrCat( - external_storage_namespace, kClusterSeparator, table_name, kTableSeparator, key); -} - -std::string GenKeyRedisMatchPattern(const std::string &external_storage_namespace, - const std::string &table_name) { - return absl::StrCat(EscapeMatchPattern(external_storage_namespace), - kClusterSeparator, - EscapeMatchPattern(table_name), - kTableSeparator, - "*"); + const std::string &table_name) { + return absl::StrCat("RAY", external_storage_namespace, kClusterSeparator, table_name); } -std::string GenKeyRedisMatchPattern(const std::string &external_storage_namespace, - const std::string &table_name, - const std::string &key) { - return absl::StrCat(EscapeMatchPattern(external_storage_namespace), - kClusterSeparator, - EscapeMatchPattern(table_name), - kTableSeparator, - EscapeMatchPattern(key), - "*"); +std::string GenRedisKeyPrefixMatchPattern(const std::string &key_prefix) { + return absl::StrCat(EscapeMatchPattern(key_prefix), "*"); } std::vector> GenCommandsBatched( @@ -83,55 +64,38 @@ std::vector> GenCommandsBatched( return batched_requests; } -std::string 
GetKeyFromRedisKey(const std::string &external_storage_namespace, - const std::string &redis_key, - const std::string &table_name) { - auto pos = external_storage_namespace.size() + kClusterSeparator.size() + - table_name.size() + kTableSeparator.size(); - return redis_key.substr(pos, redis_key.size() - pos); -} - } // namespace void RedisStoreClient::MGetValues(const std::string &table_name, const std::vector &keys, const MapCallback &callback) { - // The `MGET` command for each shard. - auto batched_commands = GenCommandsBatched("HMGET", external_storage_namespace_, keys); + // The `HMGET` command for each shard. + auto batched_commands = GenCommandsBatched( + "HMGET", GenRedisKey(external_storage_namespace_, table_name), keys); auto total_count = batched_commands.size(); auto finished_count = std::make_shared(0); auto key_value_map = std::make_shared>(); for (auto &command : batched_commands) { - auto mget_keys = std::move(command); - std::vector partition_keys(mget_keys.begin() + 2, mget_keys.end()); - auto mget_callback = [this, - table_name, - finished_count, - total_count, - mget_keys, - callback, - key_value_map](const std::shared_ptr &reply) { - if (!reply->IsNil()) { - auto value = reply->ReadAsStringArray(); - // The 0 th element of mget_keys is "MGET", so we start from the 1 th - // element. 
- for (size_t index = 0; index < value.size(); ++index) { - if (value[index].has_value()) { - (*key_value_map)[GetKeyFromRedisKey( - external_storage_namespace_, mget_keys[index + 2], table_name)] = - *(value[index]); + std::vector partition_keys(command.begin() + 2, command.end()); + auto mget_callback = + [finished_count, total_count, partition_keys, callback, key_value_map]( + const std::shared_ptr &reply) { + if (!reply->IsNil()) { + auto value = reply->ReadAsStringArray(); + for (size_t index = 0; index < value.size(); ++index) { + if (value[index].has_value()) { + (*key_value_map)[partition_keys[index]] = *(value[index]); + } + } } - } - } - ++(*finished_count); - if (*finished_count == total_count) { - callback(std::move(*key_value_map)); - } - }; - SendRedisCmd( - std::move(partition_keys), std::move(mget_keys), std::move(mget_callback)); + ++(*finished_count); + if (*finished_count == total_count) { + callback(std::move(*key_value_map)); + } + }; + SendRedisCmd(std::move(partition_keys), std::move(command), std::move(mget_callback)); } } @@ -148,10 +112,20 @@ Status RedisStoreClient::AsyncPut(const std::string &table_name, const std::string &data, bool overwrite, std::function callback) { - return DoPut(GenRedisKey(external_storage_namespace_, table_name, key), - data, - overwrite, - callback); + std::vector args = {overwrite ? 
"HSET" : "HSETNX", + GenRedisKey(external_storage_namespace_, table_name), + key, + data}; + RedisCallback write_callback = nullptr; + if (callback) { + write_callback = + [callback = std::move(callback)](const std::shared_ptr &reply) { + auto added_num = reply->ReadAsInteger(); + callback(added_num != 0); + }; + } + SendRedisCmd({key}, std::move(args), std::move(write_callback)); + return Status::OK(); } Status RedisStoreClient::AsyncGet(const std::string &table_name, @@ -169,8 +143,8 @@ Status RedisStoreClient::AsyncGet(const std::string &table_name, callback(Status::OK(), std::move(result)); }; - std::string redis_key = GenRedisKey(external_storage_namespace_, table_name, key); - std::vector args = {"HGET", external_storage_namespace_, redis_key}; + std::string redis_key = GenRedisKey(external_storage_namespace_, table_name); + std::vector args = {"HGET", redis_key, /*field=*/key}; SendRedisCmd({redis_key}, std::move(args), std::move(redis_callback)); return Status::OK(); } @@ -179,15 +153,10 @@ Status RedisStoreClient::AsyncGetAll( const std::string &table_name, const MapCallback &callback) { RAY_CHECK(callback); - std::string match_pattern = - GenKeyRedisMatchPattern(external_storage_namespace_, table_name); auto scanner = std::make_shared( - redis_client_, external_storage_namespace_, table_name); - auto on_done = [callback, - scanner](absl::flat_hash_map &&result) { - callback(std::move(result)); - }; - return scanner->ScanKeysAndValues(match_pattern, on_done); + redis_client_, GenRedisKey(external_storage_namespace_, table_name), "*"); + return scanner->ScanKeysAndValues( + [scanner, callback](auto kv) { callback(std::move(kv)); }); } Status RedisStoreClient::AsyncDelete(const std::string &table_name, @@ -209,12 +178,7 @@ Status RedisStoreClient::AsyncBatchDelete(const std::string &table_name, } return Status::OK(); } - std::vector redis_keys; - redis_keys.reserve(keys.size()); - for (auto &key : keys) { - 
redis_keys.push_back(GenRedisKey(external_storage_namespace_, table_name, key)); - } - return DeleteByKeys(redis_keys, callback); + return DeleteByKeys(table_name, keys, callback); } Status RedisStoreClient::AsyncMultiGet( @@ -226,11 +190,7 @@ Status RedisStoreClient::AsyncMultiGet( callback({}); return Status::OK(); } - std::vector true_keys; - for (auto &key : keys) { - true_keys.push_back(GenRedisKey(external_storage_namespace_, table_name, key)); - } - MGetValues(table_name, true_keys, callback); + MGetValues(table_name, keys, callback); return Status::OK(); } @@ -338,27 +298,11 @@ void RedisStoreClient::SendRedisCmd(std::vector keys, } } -Status RedisStoreClient::DoPut(const std::string &key, - const std::string &data, - bool overwrite, - std::function callback) { - std::vector args = { - overwrite ? "HSET" : "HSETNX", external_storage_namespace_, key, data}; - RedisCallback write_callback = nullptr; - if (callback) { - write_callback = - [callback = std::move(callback)](const std::shared_ptr &reply) { - auto added_num = reply->ReadAsInteger(); - callback(added_num != 0); - }; - } - SendRedisCmd({key}, std::move(args), std::move(write_callback)); - return Status::OK(); -} - -Status RedisStoreClient::DeleteByKeys(const std::vector &keys, +Status RedisStoreClient::DeleteByKeys(const std::string &table, + const std::vector &keys, std::function callback) { - auto del_cmds = GenCommandsBatched("HDEL", external_storage_namespace_, keys); + auto del_cmds = + GenCommandsBatched("HDEL", GenRedisKey(external_storage_namespace_, table), keys); auto total_count = del_cmds.size(); auto finished_count = std::make_shared(0); auto num_deleted = std::make_shared(0); @@ -381,28 +325,27 @@ Status RedisStoreClient::DeleteByKeys(const std::vector &keys, return Status::OK(); } -RedisStoreClient::RedisScanner::RedisScanner( - std::shared_ptr redis_client, - const std::string &external_storage_namespace, - const std::string &table_name) 
+RedisStoreClient::RedisScanner::RedisScanner(std::shared_ptr redis_client, + const std::string &table_name, + const std::string &match_pattern) : table_name_(table_name), - external_storage_namespace_(external_storage_namespace), + match_pattern_(match_pattern), redis_client_(std::move(redis_client)) { + RAY_CHECK(!match_pattern.empty()); cursor_ = 0; + pending_request_count_ = 0; } Status RedisStoreClient::RedisScanner::ScanKeysAndValues( - const std::string &match_pattern, const MapCallback &callback) { auto on_done = [this, callback](const Status &status) { callback(std::move(results_)); }; - Scan(match_pattern, on_done); + Scan(on_done); return Status::OK(); } -void RedisStoreClient::RedisScanner::Scan(const std::string &match_pattern, - const StatusCallback &callback) { +void RedisStoreClient::RedisScanner::Scan(const StatusCallback &callback) { // This lock guards cursor_ because the callbacks // can modify cursor_. If performance is a concern, // we should consider using a reader-writer lock. @@ -415,26 +358,23 @@ void RedisStoreClient::RedisScanner::Scan(const std::string &match_pattern, size_t batch_count = RayConfig::instance().maximum_gcs_storage_operation_batch_size(); ++pending_request_count_; - auto scan_callback = - [this, match_pattern, callback](const std::shared_ptr &reply) { - OnScanCallback(match_pattern, reply, callback); - }; + auto scan_callback = [this, callback](const std::shared_ptr &reply) { + OnScanCallback(reply, callback); + }; // Scan by prefix from Redis. 
- std::vector args = {"HSCAN", - external_storage_namespace_, - std::to_string(cursor_.value()), - "MATCH", - match_pattern, - "COUNT", - std::to_string(batch_count)}; + std::vector args = {"HSCAN", table_name_, std::to_string(cursor_.value())}; + if (match_pattern_ != "*") { + args.push_back("MATCH"); + args.push_back(match_pattern_); + } + args.push_back("COUNT"); + args.push_back(std::to_string(batch_count)); auto primary_context = redis_client_->GetPrimaryContext(); primary_context->RunArgvAsync(args, scan_callback); } void RedisStoreClient::RedisScanner::OnScanCallback( - const std::string &match_pattern, - const std::shared_ptr &reply, - const StatusCallback &callback) { + const std::shared_ptr &reply, const StatusCallback &callback) { RAY_CHECK(reply); std::vector scan_result; size_t cursor = reply->ReadAsScanArray(&scan_result); @@ -448,50 +388,59 @@ void RedisStoreClient::RedisScanner::OnScanCallback( } else { cursor_ = cursor; } + // Result is an array of key-value pairs. RAY_CHECK(scan_result.size() % 2 == 0); for (size_t i = 0; i < scan_result.size(); i += 2) { - auto key = GetKeyFromRedisKey( - external_storage_namespace_, std::move(scan_result[i]), table_name_); - results_.emplace(std::move(key), std::move(scan_result[i + 1])); + results_.emplace(std::move(scan_result[i]), std::move(scan_result[i + 1])); } } // If pending_request_count_ is equal to 0, it means that the scan of this batch is // completed and the next batch is started if any. if (--pending_request_count_ == 0) { - Scan(match_pattern, callback); + Scan(callback); } } -int RedisStoreClient::GetNextJobID() { return redis_client_->GetNextJobID(); } +int RedisStoreClient::GetNextJobID() { + // Note: This is not a HASH! It's a simple key-value pair. + // Key: "RAYexternal_storage_namespace@JobCounter" + // Value: The next job ID. 
+ std::string key = GenRedisKey(external_storage_namespace_, "JobCounter"); + std::vector args = {"INCRBY", key, "1"}; + + auto cxt = redis_client_->GetPrimaryContext(); + auto reply = cxt->RunArgvSync(args); + return reply->ReadAsInteger(); +} Status RedisStoreClient::AsyncGetKeys( const std::string &table_name, const std::string &prefix, std::function)> callback) { - std::string match_pattern = - GenKeyRedisMatchPattern(external_storage_namespace_, table_name, prefix); auto scanner = std::make_shared( - redis_client_, external_storage_namespace_, table_name); - - auto on_done = [table_name, callback, scanner](auto redis_result) { - std::vector result; - result.reserve(redis_result.size()); - for (const auto &[key, _] : redis_result) { - result.push_back(key); - } - callback(std::move(result)); - }; - return scanner->ScanKeysAndValues(match_pattern, on_done); + redis_client_, + /*table_name=*/GenRedisKey(external_storage_namespace_, table_name), + GenRedisKeyPrefixMatchPattern(prefix)); + // Needs to keep a reference to the scanner object. 
+ return scanner->ScanKeysAndValues( + [scanner, callback](absl::flat_hash_map &&result) { + std::vector keys; + keys.reserve(result.size()); + for (const auto &[k, v] : result) { + keys.push_back(k); + } + callback(std::move(keys)); + }); } Status RedisStoreClient::AsyncExists(const std::string &table_name, const std::string &key, std::function callback) { - std::string redis_key = GenRedisKey(external_storage_namespace_, table_name, key); - std::vector args = {"HEXISTS", external_storage_namespace_, redis_key}; + std::vector args = { + "HEXISTS", GenRedisKey(external_storage_namespace_, table_name), key}; SendRedisCmd( - {redis_key}, + {key}, std::move(args), [callback = std::move(callback)](const std::shared_ptr &reply) { bool exists = reply->ReadAsInteger() > 0; @@ -500,6 +449,62 @@ Status RedisStoreClient::AsyncExists(const std::string &table_name, return Status::OK(); } +class Cleanup { + public: + Cleanup(std::function f) : f_(f) {} + ~Cleanup() { f_(); } + + private: + std::function f_; +}; + +bool RedisDelKeyPrefixSync(const std::string &host, + int32_t port, + const std::string &password, + bool use_ssl, + const std::string &key_prefix) { + RedisClientOptions options(host, port, password, use_ssl); + auto cli = std::make_unique(options); + + instrumented_io_context io_service; + + auto thread = std::make_unique([&]() { + boost::asio::io_service::work work(io_service); + io_service.run(); + }); + + Cleanup _([&]() { + io_service.stop(); + thread->join(); + }); + + auto status = cli->Connect(io_service); + RAY_CHECK(status.ok()) << "Failed to connect to redis: " << status.ToString(); + + auto context = cli->GetPrimaryContext(); + auto cmd = std::vector{"KEYS", GenRedisKeyPrefixMatchPattern(key_prefix)}; + auto reply = context->RunArgvSync(cmd); + const auto keys = reply->ReadAsStringArray(); + if (keys.empty()) { + RAY_LOG(INFO) << "No keys found for prefix " << key_prefix; + return true; + } + auto del_cmd = std::vector{"DEL"}; + for (const auto &key : 
keys) { + if (key.has_value()) { + del_cmd.push_back(key.value()); + } + } + auto del_reply = context->RunArgvSync(del_cmd); + if (del_reply->ReadAsInteger() > 0) { + RAY_LOG(INFO) << "Successfully deleted keys with prefix " << key_prefix; + return true; + } else { + RAY_LOG(ERROR) << "Failed to delete keys with prefix " << key_prefix; + return false; + } +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/store_client/redis_store_client.h b/src/ray/gcs/store_client/redis_store_client.h index 9bbba79e02e8..2a234fd0a34b 100644 --- a/src/ray/gcs/store_client/redis_store_client.h +++ b/src/ray/gcs/store_client/redis_store_client.h @@ -30,6 +30,25 @@ namespace ray { namespace gcs { +// StoreClient using Redis as persistence backend. +// Note in redis term a "key" points to a hash table and a "field" is a key, a "value" +// is just a value. We double quote "key" and "field" to avoid confusion. +// +// In variable namings, we stick to the table - key - value terminology. +// +// Schema: +// - Each table is a Redis HASH. The HASH "key" is +// "RAY" + `external_storage_namespace` + "@" + `table_name`. +// The RAY prefix is in case external_storage_namespace being empty. +// - Each key-value pair in the hash is a row in the table. The "field" is the key. +// +// Consistency: +// - All Put/Get/Delete operations to a same (table, key) pair are serialized, see #35123. +// - For MultiGet/BatchDelete operations, they are subject to *all* keys in the operation, +// i.e. only after it's at the queue front of all keys, it will be processed. +// - A big loophole is GetAll and AsyncGetKeys. They're not serialized with other +// operations, since "since it's either RPC call or used during initializing GCS". 
[1] +// [1] https://github.com/ray-project/ray/pull/35123#issuecomment-1546549046 class RedisStoreClient : public StoreClient { public: explicit RedisStoreClient(std::shared_ptr redis_client); @@ -71,30 +90,33 @@ class RedisStoreClient : public StoreClient { private: /// \class RedisScanner - /// This class is used to scan data from Redis. /// - /// If you called one method, should never call the other methods. - /// Otherwise it will disturb the status of the RedisScanner. + /// This class is used to HSCAN data from a Redis table. For our purpose, pattern + /// matching is not supported (always returns all keys in the table, that is "*"). + /// + /// The scan is not locked with other operations. It's not guaranteed to be consistent + /// with other operations. It's batched by + /// RAY_maximum_gcs_storage_operation_batch_size. + /// + /// Callers can only call ScanKeysAndValues once. class RedisScanner { public: explicit RedisScanner(std::shared_ptr redis_client, - const std::string &external_storage_namespace, - const std::string &table_name); + const std::string &table_name, + const std::string &match_pattern); - Status ScanKeysAndValues(const std::string &match_pattern, - const MapCallback &callback); + Status ScanKeysAndValues(const MapCallback &callback); private: - void Scan(const std::string &match_pattern, const StatusCallback &callback); + void Scan(const StatusCallback &callback); - void OnScanCallback(const std::string &match_pattern, - const std::shared_ptr &reply, + void OnScanCallback(const std::shared_ptr &reply, const StatusCallback &callback); /// The table name that the scanner will scan. std::string table_name_; - // The namespace of the external storage. Used for isolation. - std::string external_storage_namespace_; + /// The pattern to match the keys. + std::string match_pattern_; /// Mutex to protect the cursor_ field and the keys_ field and the /// key_value_map_ field. 
@@ -132,12 +154,8 @@ class RedisStoreClient : public StoreClient { std::vector> TakeRequestsFromSendingQueue( const std::vector &keys) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); - Status DoPut(const std::string &key, - const std::string &data, - bool overwrite, - std::function callback); - - Status DeleteByKeys(const std::vector &keys, + Status DeleteByKeys(const std::string &table_name, + const std::vector &keys, std::function callback); // Send the redis command to the server. This method will make request to be @@ -151,6 +169,9 @@ class RedisStoreClient : public StoreClient { std::vector args, RedisCallback redis_callback); + // HMGET external_storage_namespace@table_name key1 key2 ... + // `keys` are chunked to multiple HMGET commands by + // RAY_maximum_gcs_storage_operation_batch_size. void MGetValues(const std::string &table_name, const std::vector &keys, const MapCallback &callback); @@ -166,6 +187,13 @@ class RedisStoreClient : public StoreClient { FRIEND_TEST(RedisStoreClientTest, Random); }; +// Helper function used by Python to delete all redis HASHes with a given prefix. +bool RedisDelKeyPrefixSync(const std::string &host, + int32_t port, + const std::string &password, + bool use_ssl, + const std::string &key_prefix); + } // namespace gcs } // namespace ray From a1fc834a8048ef2fa9a1b1b0462df94b889ca7a4 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Tue, 30 Jul 2024 00:51:53 -0700 Subject: [PATCH 02/29] wording Signed-off-by: Ruiyang Wang --- src/ray/gcs/store_client/redis_store_client.h | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/ray/gcs/store_client/redis_store_client.h b/src/ray/gcs/store_client/redis_store_client.h index 2a234fd0a34b..c2eef286f321 100644 --- a/src/ray/gcs/store_client/redis_store_client.h +++ b/src/ray/gcs/store_client/redis_store_client.h @@ -39,7 +39,6 @@ namespace gcs { // Schema: // - Each table is a Redis HASH. The HASH "key" is // "RAY" + `external_storage_namespace` + "@" + `table_name`. 
-// The RAY prefix is in case external_storage_namespace being empty. // - Each key-value pair in the hash is a row in the table. The "field" is the key. // // Consistency: @@ -91,8 +90,7 @@ class RedisStoreClient : public StoreClient { private: /// \class RedisScanner /// - /// This class is used to HSCAN data from a Redis table. For our purpose, pattern - /// matching is not supported (always returns all keys in the table, that is "*"). + /// This class is used to HSCAN data from a Redis table. /// /// The scan is not locked with other operations. It's not guaranteed to be consistent /// with other operations. It's batched by From a5dad23b21e0c217599d826c3050850c59f7c48e Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Tue, 20 Aug 2024 15:04:44 -0700 Subject: [PATCH 03/29] fixes Signed-off-by: Ruiyang Wang --- src/ray/gcs/gcs_server/store_client_kv.cc | 1 + .../gcs/store_client/redis_store_client.cc | 48 +++++++++++-------- 2 files changed, 29 insertions(+), 20 deletions(-) diff --git a/src/ray/gcs/gcs_server/store_client_kv.cc b/src/ray/gcs/gcs_server/store_client_kv.cc index 11bf15b10291..17228bff6f86 100644 --- a/src/ray/gcs/gcs_server/store_client_kv.cc +++ b/src/ray/gcs/gcs_server/store_client_kv.cc @@ -150,6 +150,7 @@ void StoreClientInternalKV::Keys(const std::string &ns, MakeKey(ns, prefix), [callback = std::move(callback)](std::vector keys) { std::vector true_keys; + true_keys.reserve(keys.size()); for (auto &key : keys) { true_keys.emplace_back(ExtractKey(key)); } diff --git a/src/ray/gcs/store_client/redis_store_client.cc b/src/ray/gcs/store_client/redis_store_client.cc index ddb54be0084c..1ab429f14b81 100644 --- a/src/ray/gcs/store_client/redis_store_client.cc +++ b/src/ray/gcs/store_client/redis_store_client.cc @@ -17,6 +17,7 @@ #include #include #include +#include #include "absl/strings/match.h" #include "absl/strings/str_cat.h" @@ -34,7 +35,7 @@ const std::string_view kClusterSeparator = "@"; // escape them with / according to the doc: // 
https://redis.io/commands/keys/ std::string EscapeMatchPattern(const std::string &s) { - static std::regex kSpecialChars("\\[|\\]|-|\\?|\\*|\\^|\\\\"); + static std::regex kSpecialChars(R"(\[|\]|-|\?|\*|\^|\\)"); return std::regex_replace(s, kSpecialChars, "\\$&"); } @@ -77,24 +78,28 @@ void RedisStoreClient::MGetValues(const std::string &table_name, auto key_value_map = std::make_shared>(); for (auto &command : batched_commands) { + // Each command is "HMGET ...". + // Get the keys from the command by skipping the first two elements (HMGET and hash) std::vector partition_keys(command.begin() + 2, command.end()); - auto mget_callback = - [finished_count, total_count, partition_keys, callback, key_value_map]( - const std::shared_ptr &reply) { - if (!reply->IsNil()) { - auto value = reply->ReadAsStringArray(); - for (size_t index = 0; index < value.size(); ++index) { - if (value[index].has_value()) { - (*key_value_map)[partition_keys[index]] = *(value[index]); - } - } + auto mget_callback = [finished_count, + total_count, + partition_keys, + callback = std::move(callback), + key_value_map](const std::shared_ptr &reply) { + if (!reply->IsNil()) { + auto value = reply->ReadAsStringArray(); + for (size_t index = 0; index < value.size(); ++index) { + if (value[index].has_value()) { + (*key_value_map)[partition_keys[index]] = *(value[index]); } + } + } - ++(*finished_count); - if (*finished_count == total_count) { - callback(std::move(*key_value_map)); - } - }; + ++(*finished_count); + if (*finished_count == total_count) { + callback(std::move(*key_value_map)); + } + }; SendRedisCmd(std::move(partition_keys), std::move(command), std::move(mget_callback)); } } @@ -331,7 +336,8 @@ RedisStoreClient::RedisScanner::RedisScanner(std::shared_ptr redis_ : table_name_(table_name), match_pattern_(match_pattern), redis_client_(std::move(redis_client)) { - RAY_CHECK(!match_pattern.empty()); + RAY_CHECK(!match_pattern.empty()) << "Match pattern should not be empty. 
Are you " + "calling AsyncGetKeys with an empty prefix?"; cursor_ = 0; pending_request_count_ = 0; } @@ -358,7 +364,8 @@ void RedisStoreClient::RedisScanner::Scan(const StatusCallback &callback) { size_t batch_count = RayConfig::instance().maximum_gcs_storage_operation_batch_size(); ++pending_request_count_; - auto scan_callback = [this, callback](const std::shared_ptr &reply) { + auto scan_callback = [this, callback = std::move(callback)]( + const std::shared_ptr &reply) { OnScanCallback(reply, callback); }; // Scan by prefix from Redis. @@ -451,13 +458,14 @@ Status RedisStoreClient::AsyncExists(const std::string &table_name, class Cleanup { public: - Cleanup(std::function f) : f_(f) {} + explicit Cleanup(std::function f) : f_(std::move(f)) {} ~Cleanup() { f_(); } private: std::function f_; }; +// Returns True if at least 1 key is deleted, False otherwise. bool RedisDelKeyPrefixSync(const std::string &host, int32_t port, const std::string &password, @@ -484,7 +492,7 @@ bool RedisDelKeyPrefixSync(const std::string &host, auto context = cli->GetPrimaryContext(); auto cmd = std::vector{"KEYS", GenRedisKeyPrefixMatchPattern(key_prefix)}; auto reply = context->RunArgvSync(cmd); - const auto keys = reply->ReadAsStringArray(); + const auto &keys = reply->ReadAsStringArray(); if (keys.empty()) { RAY_LOG(INFO) << "No keys found for prefix " << key_prefix; return true; From 35c67766647bb517bbe0ed0f73e73d3eb405902c Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Tue, 20 Aug 2024 16:27:01 -0700 Subject: [PATCH 04/29] lint Signed-off-by: Ruiyang Wang --- python/ray/includes/global_state_accessor.pxd | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/ray/includes/global_state_accessor.pxd b/python/ray/includes/global_state_accessor.pxd index 9c81a3bd504f..3b0310f17d78 100644 --- a/python/ray/includes/global_state_accessor.pxd +++ b/python/ray/includes/global_state_accessor.pxd @@ -129,7 +129,7 @@ cdef extern from * namespace "ray::gcs" nogil: 
cdef extern from * namespace "ray::gcs" nogil: c_bool RedisDelKeyPrefixSync(const c_string& host, - c_int32_t port, - const c_string& password, - c_bool use_ssl, - const c_string& key) + c_int32_t port, + const c_string& password, + c_bool use_ssl, + const c_string& key) From dc0069857ce2108b759720160cf3287dcb1e76b7 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Wed, 21 Aug 2024 13:21:44 -0700 Subject: [PATCH 05/29] typed Signed-off-by: Ruiyang Wang --- .../gcs/store_client/redis_store_client.cc | 217 +++++++++--------- src/ray/gcs/store_client/redis_store_client.h | 74 ++++-- 2 files changed, 172 insertions(+), 119 deletions(-) diff --git a/src/ray/gcs/store_client/redis_store_client.cc b/src/ray/gcs/store_client/redis_store_client.cc index 1ab429f14b81..bf25113f4473 100644 --- a/src/ray/gcs/store_client/redis_store_client.cc +++ b/src/ray/gcs/store_client/redis_store_client.cc @@ -39,58 +39,58 @@ std::string EscapeMatchPattern(const std::string &s) { return std::regex_replace(s, kSpecialChars, "\\$&"); } -std::string GenRedisKey(const std::string &external_storage_namespace, - const std::string &table_name) { - return absl::StrCat("RAY", external_storage_namespace, kClusterSeparator, table_name); -} - -std::string GenRedisKeyPrefixMatchPattern(const std::string &key_prefix) { - return absl::StrCat(EscapeMatchPattern(key_prefix), "*"); -} - -std::vector> GenCommandsBatched( - const std::string &command, - const std::string &hash_field, - const std::vector &keys) { - std::vector> batched_requests; - for (auto &key : keys) { +// Assume `command` can take an arbitrary number of keys. Chunk the args into multiple +// commands with the same command name and the same redis_key. Each chunk has at most +// `maximum_gcs_storage_operation_batch_size` keys.
+std::vector GenCommandsBatched(const std::string &command, + const RedisKey &redis_key, + const std::vector &args) { + std::vector batched_requests; + for (auto &arg : args) { // If it's empty or the last batch is full, add a new batch. if (batched_requests.empty() || - batched_requests.back().size() >= - RayConfig::instance().maximum_gcs_storage_operation_batch_size() + 2) { - batched_requests.emplace_back(std::vector{command, hash_field}); + batched_requests.back().args.size() >= + RayConfig::instance().maximum_gcs_storage_operation_batch_size()) { + batched_requests.emplace_back(RedisCommand{command, redis_key, {}}); } - batched_requests.back().push_back(key); + batched_requests.back().args.push_back(arg); } return batched_requests; } } // namespace +RedisKey::RedisKey(absl::string_view external_storage_namespace, + absl::string_view table_name) + : redis_key(absl::StrCat( + "RAY", external_storage_namespace, kClusterSeparator, table_name)) {} + +RedisMatchPattern RedisMatchPattern::Prefix(const std::string &prefix) { + return RedisMatchPattern(absl::StrCat(EscapeMatchPattern(prefix), "*")); +} + void RedisStoreClient::MGetValues(const std::string &table_name, const std::vector &keys, const MapCallback &callback) { // The `HMGET` command for each shard. auto batched_commands = GenCommandsBatched( - "HMGET", GenRedisKey(external_storage_namespace_, table_name), keys); + "HMGET", RedisKey(external_storage_namespace_, table_name), keys); auto total_count = batched_commands.size(); auto finished_count = std::make_shared(0); auto key_value_map = std::make_shared>(); for (auto &command : batched_commands) { - // Each command is "HMGET ...". - // Get the keys from the command by skipping the first two elements (HMGET and hash) - std::vector partition_keys(command.begin() + 2, command.end()); auto mget_callback = [finished_count, total_count, - partition_keys, - callback = std::move(callback), + // Copies! 
+ args = command.args, + callback, key_value_map](const std::shared_ptr &reply) { if (!reply->IsNil()) { auto value = reply->ReadAsStringArray(); for (size_t index = 0; index < value.size(); ++index) { if (value[index].has_value()) { - (*key_value_map)[partition_keys[index]] = *(value[index]); + (*key_value_map)[args[index]] = *(value[index]); } } } @@ -100,7 +100,7 @@ void RedisStoreClient::MGetValues(const std::string &table_name, callback(std::move(*key_value_map)); } }; - SendRedisCmd(std::move(partition_keys), std::move(command), std::move(mget_callback)); + SendRedisCmd(command.args, std::move(command), std::move(mget_callback)); } } @@ -117,10 +117,9 @@ Status RedisStoreClient::AsyncPut(const std::string &table_name, const std::string &data, bool overwrite, std::function callback) { - std::vector args = {overwrite ? "HSET" : "HSETNX", - GenRedisKey(external_storage_namespace_, table_name), - key, - data}; + RedisCommand command{.command = overwrite ? "HSET" : "HSETNX", + .redis_key = RedisKey(external_storage_namespace_, table_name), + .args = {key, data}}; RedisCallback write_callback = nullptr; if (callback) { write_callback = @@ -129,7 +128,7 @@ Status RedisStoreClient::AsyncPut(const std::string &table_name, callback(added_num != 0); }; } - SendRedisCmd({key}, std::move(args), std::move(write_callback)); + SendRedisCmd({key}, std::move(command), std::move(write_callback)); return Status::OK(); } @@ -145,12 +144,13 @@ Status RedisStoreClient::AsyncGet(const std::string &table_name, } RAY_CHECK(!reply->IsError()) << "Failed to get from Redis with status: " << reply->ReadAsStatus(); - callback(Status::OK(), std::move(result)); + callback(Status::OK(), result); }; - std::string redis_key = GenRedisKey(external_storage_namespace_, table_name); - std::vector args = {"HGET", redis_key, /*field=*/key}; - SendRedisCmd({redis_key}, std::move(args), std::move(redis_callback)); + RedisCommand command{.command = "HGET", + .redis_key = 
RedisKey(external_storage_namespace_, table_name), + .args = {key}}; + SendRedisCmd({key}, std::move(command), std::move(redis_callback)); return Status::OK(); } @@ -158,10 +158,11 @@ Status RedisStoreClient::AsyncGetAll( const std::string &table_name, const MapCallback &callback) { RAY_CHECK(callback); - auto scanner = std::make_shared( - redis_client_, GenRedisKey(external_storage_namespace_, table_name), "*"); - return scanner->ScanKeysAndValues( - [scanner, callback](auto kv) { callback(std::move(kv)); }); + RedisScanner::ScanKeysAndValues(redis_client_, + RedisKey(external_storage_namespace_, table_name), + RedisMatchPattern::Any(), + callback); + return Status::OK(); } Status RedisStoreClient::AsyncDelete(const std::string &table_name, @@ -218,7 +219,7 @@ size_t RedisStoreClient::PushToSendingQueue(const std::vector &keys // this queue. op_iter->second.push(nullptr); } else { - op_iter->second.push(send_request); + op_iter->second.push(std::move(send_request)); } } return queue_added; @@ -243,7 +244,7 @@ std::vector> RedisStoreClient::TakeRequestsFromSendingQueu } void RedisStoreClient::SendRedisCmd(std::vector keys, - std::vector args, + RedisCommand command, RedisCallback redis_callback) { RAY_CHECK(!keys.empty()); // The number of keys that's ready for this request. 
@@ -254,7 +255,7 @@ void RedisStoreClient::SendRedisCmd(std::vector keys, std::function send_redis = [this, num_ready_keys = num_ready_keys, keys, - args = std::move(args), + command = std::move(command), redis_callback = std::move(redis_callback)]() mutable { { @@ -268,7 +269,7 @@ void RedisStoreClient::SendRedisCmd(std::vector keys, } // Send the actual request auto cxt = redis_client_->GetPrimaryContext(); - cxt->RunArgvAsync(std::move(args), + cxt->RunArgvAsync(command.ToRedisArgs(), [this, keys = std::move(keys), redis_callback = std::move(redis_callback)](auto reply) { @@ -307,81 +308,87 @@ Status RedisStoreClient::DeleteByKeys(const std::string &table, const std::vector &keys, std::function callback) { auto del_cmds = - GenCommandsBatched("HDEL", GenRedisKey(external_storage_namespace_, table), keys); + GenCommandsBatched("HDEL", RedisKey(external_storage_namespace_, table), keys); auto total_count = del_cmds.size(); auto finished_count = std::make_shared(0); auto num_deleted = std::make_shared(0); auto context = redis_client_->GetPrimaryContext(); for (auto &command : del_cmds) { - std::vector partition_keys(command.begin() + 2, command.end()); - auto delete_callback = [num_deleted, finished_count, total_count, callback]( - const std::shared_ptr &reply) { - (*num_deleted) += reply->ReadAsInteger(); - ++(*finished_count); - if (*finished_count == total_count) { - if (callback) { - callback(*num_deleted); - } - } - }; - SendRedisCmd( - std::move(partition_keys), std::move(command), std::move(delete_callback)); + auto delete_callback = + [num_deleted, finished_count, total_count, callback = std::move(callback)]( + const std::shared_ptr &reply) { + (*num_deleted) += reply->ReadAsInteger(); + ++(*finished_count); + if (*finished_count == total_count) { + if (callback) { + callback(*num_deleted); + } + } + }; + SendRedisCmd(command.args, std::move(command), std::move(delete_callback)); } return Status::OK(); } 
-RedisStoreClient::RedisScanner::RedisScanner(std::shared_ptr redis_client, - const std::string &table_name, - const std::string &match_pattern) - : table_name_(table_name), - match_pattern_(match_pattern), - redis_client_(std::move(redis_client)) { - RAY_CHECK(!match_pattern.empty()) << "Match pattern should not be empty. Are you " - "calling AsyncGetKeys with an empty prefix?"; +RedisStoreClient::RedisScanner::RedisScanner( + PrivateCtorTag ctor_tag, + std::shared_ptr redis_client, + RedisKey redis_key, + RedisMatchPattern match_pattern, + MapCallback callback) + : redis_key_(std::move(redis_key)), + match_pattern_(std::move(match_pattern)), + redis_client_(std::move(redis_client)), + callback_(std::move(callback)) { cursor_ = 0; pending_request_count_ = 0; } -Status RedisStoreClient::RedisScanner::ScanKeysAndValues( - const MapCallback &callback) { - auto on_done = [this, callback](const Status &status) { - callback(std::move(results_)); - }; - Scan(on_done); - return Status::OK(); +void RedisStoreClient::RedisScanner::ScanKeysAndValues( + std::shared_ptr redis_client, + RedisKey redis_key, + RedisMatchPattern match_pattern, + MapCallback callback) { + auto scanner = std::make_shared(PrivateCtorTag(), + std::move(redis_client), + std::move(redis_key), + std::move(match_pattern), + std::move(callback)); + scanner->Scan(); } -void RedisStoreClient::RedisScanner::Scan(const StatusCallback &callback) { +void RedisStoreClient::RedisScanner::Scan() { // This lock guards cursor_ because the callbacks // can modify cursor_. If performance is a concern, // we should consider using a reader-writer lock. 
absl::MutexLock lock(&mutex_); if (!cursor_.has_value()) { - callback(Status::OK()); - return; + callback_(std::move(results_)); } size_t batch_count = RayConfig::instance().maximum_gcs_storage_operation_batch_size(); ++pending_request_count_; - auto scan_callback = [this, callback = std::move(callback)]( - const std::shared_ptr &reply) { - OnScanCallback(reply, callback); - }; // Scan by prefix from Redis. - std::vector args = {"HSCAN", table_name_, std::to_string(cursor_.value())}; - if (match_pattern_ != "*") { - args.push_back("MATCH"); - args.push_back(match_pattern_); + RedisCommand command = {"HSCAN", redis_key_, {std::to_string(cursor_.value())}}; + if (match_pattern_.escaped != "*") { + command.args.push_back("MATCH"); + command.args.push_back(match_pattern_.escaped); } - args.push_back("COUNT"); - args.push_back(std::to_string(batch_count)); + command.args.push_back("COUNT"); + command.args.push_back(std::to_string(batch_count)); auto primary_context = redis_client_->GetPrimaryContext(); - primary_context->RunArgvAsync(args, scan_callback); + primary_context->RunArgvAsync( + command.ToRedisArgs(), + [this, + // keeps myself from being destructed waiting for async. + shared_this = shared_from_this()](const std::shared_ptr &reply) { + OnScanCallback(reply); + }); } void RedisStoreClient::RedisScanner::OnScanCallback( - const std::shared_ptr &reply, const StatusCallback &callback) { + const std::shared_ptr &reply) { RAY_CHECK(reply); std::vector scan_result; size_t cursor = reply->ReadAsScanArray(&scan_result); @@ -405,7 +412,7 @@ void RedisStoreClient::RedisScanner::OnScanCallback( // If pending_request_count_ is equal to 0, it means that the scan of this batch is // completed and the next batch is started if any. if (--pending_request_count_ == 0) { - Scan(callback); + Scan(); } } @@ -413,25 +420,23 @@ int RedisStoreClient::GetNextJobID() { // Note: This is not a HASH! It's a simple key-value pair. 
// Key: "RAYexternal_storage_namespace@JobCounter" // Value: The next job ID. - std::string key = GenRedisKey(external_storage_namespace_, "JobCounter"); - std::vector args = {"INCRBY", key, "1"}; + RedisCommand command = { + "INCRBY", RedisKey(external_storage_namespace_, "JobCounter"), {"1"}}; auto cxt = redis_client_->GetPrimaryContext(); - auto reply = cxt->RunArgvSync(args); - return reply->ReadAsInteger(); + auto reply = cxt->RunArgvSync(command.ToRedisArgs()); + return static_cast(reply->ReadAsInteger()); } Status RedisStoreClient::AsyncGetKeys( const std::string &table_name, const std::string &prefix, std::function)> callback) { - auto scanner = std::make_shared( + RedisScanner::ScanKeysAndValues( redis_client_, - /*table_name=*/GenRedisKey(external_storage_namespace_, table_name), - GenRedisKeyPrefixMatchPattern(prefix)); - // Needs to keep a reference to the scanner object. - return scanner->ScanKeysAndValues( - [scanner, callback](absl::flat_hash_map &&result) { + RedisKey(external_storage_namespace_, table_name), + RedisMatchPattern::Prefix(prefix), + [callback](absl::flat_hash_map &&result) { std::vector keys; keys.reserve(result.size()); for (const auto &[k, v] : result) { @@ -439,16 +444,17 @@ Status RedisStoreClient::AsyncGetKeys( } callback(std::move(keys)); }); + return Status::OK(); } Status RedisStoreClient::AsyncExists(const std::string &table_name, const std::string &key, std::function callback) { - std::vector args = { - "HEXISTS", GenRedisKey(external_storage_namespace_, table_name), key}; + RedisCommand command = { + "HEXISTS", RedisKey(external_storage_namespace_, table_name), {key}}; SendRedisCmd( {key}, - std::move(args), + std::move(command), [callback = std::move(callback)](const std::shared_ptr &reply) { bool exists = reply->ReadAsInteger() > 0; callback(exists); @@ -490,7 +496,7 @@ bool RedisDelKeyPrefixSync(const std::string &host, RAY_CHECK(status.ok()) << "Failed to connect to redis: " << status.ToString(); auto context = 
cli->GetPrimaryContext(); - auto cmd = std::vector{"KEYS", GenRedisKeyPrefixMatchPattern(key_prefix)}; + std::vector cmd{"KEYS", RedisMatchPattern::Prefix(key_prefix).escaped}; auto reply = context->RunArgvSync(cmd); const auto &keys = reply->ReadAsStringArray(); if (keys.empty()) { @@ -507,10 +513,9 @@ bool RedisDelKeyPrefixSync(const std::string &host, if (del_reply->ReadAsInteger() > 0) { RAY_LOG(INFO) << "Successfully deleted keys with prefix " << key_prefix; return true; - } else { - RAY_LOG(ERROR) << "Failed to delete keys with prefix " << key_prefix; - return false; } + RAY_LOG(ERROR) << "Failed to delete keys with prefix " << key_prefix; + return false; } } // namespace gcs diff --git a/src/ray/gcs/store_client/redis_store_client.h b/src/ray/gcs/store_client/redis_store_client.h index c2eef286f321..5d2894466127 100644 --- a/src/ray/gcs/store_client/redis_store_client.h +++ b/src/ray/gcs/store_client/redis_store_client.h @@ -30,6 +30,42 @@ namespace ray { namespace gcs { +// Typed key to avoid forgetting to prepend external_storage_namespace. +struct RedisKey { + explicit RedisKey(absl::string_view external_storage_namespace, + absl::string_view table_name); + const std::string redis_key; +}; + +struct RedisMatchPattern { + static RedisMatchPattern Prefix(const std::string &prefix); + static RedisMatchPattern Any() { + static const RedisMatchPattern kAny("*"); + return kAny; + } + const std::string escaped; + + private: + explicit RedisMatchPattern(std::string escaped) : escaped(std::move(escaped)) {} +}; + +struct RedisCommand { + std::string command; + // Redis "key" referring to a HASH. + RedisKey redis_key; + std::vector args; + + std::vector ToRedisArgs() const { + std::vector redis_args; + redis_args.push_back(command); + redis_args.push_back(redis_key.redis_key); + for (const auto &arg : args) { + redis_args.push_back(arg); + } + return redis_args; + } +}; + // StoreClient using Redis as persistence backend. 
// Note in redis term a "key" points to a hash table and a "field" is a key, a "value" // is just a value. We double quote "key" and "field" to avoid confusion. @@ -95,26 +131,36 @@ class RedisStoreClient : public StoreClient { /// The scan is not locked with other operations. It's not guaranteed to be consistent /// with other operations. It's batched by /// RAY_maximum_gcs_storage_operation_batch_size. - /// - /// Callers can only call ScanKeysAndValues or ScanKeys once. - class RedisScanner { + class RedisScanner : std::enable_shared_from_this { + private: + // We want a private ctor but can use make_shared. + // See https://en.cppreference.com/w/cpp/memory/enable_shared_from_this + struct PrivateCtorTag {}; + public: - explicit RedisScanner(std::shared_ptr redis_client, - const std::string &table_name, - const std::string &match_pattern); + explicit RedisScanner(PrivateCtorTag tag, + std::shared_ptr redis_client, + RedisKey redis_key, + RedisMatchPattern match_pattern, + MapCallback callback); - Status ScanKeysAndValues(const MapCallback &callback); + static void ScanKeysAndValues(std::shared_ptr redis_client, + RedisKey redis_key, + RedisMatchPattern match_pattern, + MapCallback callback); private: - void Scan(const StatusCallback &callback); + // Scans the keys and values, one batch at a time. Once all keys are scanned, the + // callback will be called. When the calls are in progress, the scanner temporarily + // holds its own reference so users don't need to keep it alive. + void Scan(); - void OnScanCallback(const std::shared_ptr &reply, - const StatusCallback &callback); + void OnScanCallback(const std::shared_ptr &reply); /// The table name that the scanner will scan. - std::string table_name_; + RedisKey redis_key_; /// The pattern to match the keys. - std::string match_pattern_; + RedisMatchPattern match_pattern_; /// Mutex to protect the cursor_ field and the keys_ field and the /// key_value_map_ field.
@@ -130,6 +176,8 @@ class RedisStoreClient : public StoreClient { std::atomic pending_request_count_{0}; std::shared_ptr redis_client_; + + MapCallback callback_; }; // Push a request to the sending queue. @@ -164,7 +212,7 @@ class RedisStoreClient : public StoreClient { // \param args The redis commands // \param redis_callback The callback to call when the reply is received. void SendRedisCmd(std::vector keys, - std::vector args, + RedisCommand command, RedisCallback redis_callback); // HMGET external_storage_namespace@table_name key1 key2 ... From 627273803c9360c139e8bbe3999d5e17a25951fe Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Wed, 21 Aug 2024 13:28:20 -0700 Subject: [PATCH 06/29] comment example Signed-off-by: Ruiyang Wang --- src/ray/gcs/store_client/redis_store_client.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/ray/gcs/store_client/redis_store_client.cc b/src/ray/gcs/store_client/redis_store_client.cc index bf25113f4473..a92480fb8c20 100644 --- a/src/ray/gcs/store_client/redis_store_client.cc +++ b/src/ray/gcs/store_client/redis_store_client.cc @@ -403,6 +403,9 @@ void RedisStoreClient::RedisScanner::OnScanCallback( cursor_ = cursor; } // Result is an array of key-value pairs. 
+ // scan_result[i] = key, scan_result[i+1] = value + // Example req: HSCAN hash_with_cluster_id_for_Jobs + // scan_result = job1 job1_value job2 job2_value RAY_CHECK(scan_result.size() % 2 == 0); for (size_t i = 0; i < scan_result.size(); i += 2) { results_.emplace(std::move(scan_result[i]), std::move(scan_result[i + 1])); From f1f527fb196ca5f5dae3b7a34794e704875a3dec Mon Sep 17 00:00:00 2001 From: Ruiyang Wang <56065503+rynewang@users.noreply.github.com> Date: Wed, 21 Aug 2024 16:15:24 -0700 Subject: [PATCH 07/29] Update python/ray/_private/gcs_utils.py Co-authored-by: Jiajun Yao Signed-off-by: Ruiyang Wang <56065503+rynewang@users.noreply.github.com> --- python/ray/_private/gcs_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/_private/gcs_utils.py b/python/ray/_private/gcs_utils.py index 4f32933db900..3c51b7cabbf1 100644 --- a/python/ray/_private/gcs_utils.py +++ b/python/ray/_private/gcs_utils.py @@ -144,6 +144,6 @@ def cleanup_redis_storage( if not isinstance(storage_namespace, str): raise ValueError("storage namespace must be a string") - # Right now, GCS store all data into a hash keys prefixed by storage_namespace. + # Right now, GCS stores all data into multiple hashes with keys prefixed by storage_namespace. # So we only need to delete the specific key to cleanup the cluster. 
return del_key_prefix_from_storage(host, port, password, use_ssl, storage_namespace) From 2fd9155f14e06d17c2ab56a457b418bf53db01db Mon Sep 17 00:00:00 2001 From: Ruiyang Wang <56065503+rynewang@users.noreply.github.com> Date: Wed, 21 Aug 2024 16:15:31 -0700 Subject: [PATCH 08/29] Update python/ray/_raylet.pyx Co-authored-by: Jiajun Yao Signed-off-by: Ruiyang Wang <56065503+rynewang@users.noreply.github.com> --- python/ray/_raylet.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 4009a5edf407..f45738e53de7 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -5177,7 +5177,7 @@ cdef void async_callback(shared_ptr[CRayObject] obj, cpython.Py_DECREF(user_callback) -def del_key_prefix_from_storage(host, port, password, use_ssl, key): +def del_key_prefix_from_storage(host, port, password, use_ssl, key_prefix): return RedisDelKeyPrefixSync(host, port, password, use_ssl, key) From cc1a7b5b28165d744c2cdeae3e644bc7bb242410 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang <56065503+rynewang@users.noreply.github.com> Date: Wed, 21 Aug 2024 16:15:42 -0700 Subject: [PATCH 09/29] Update python/ray/includes/global_state_accessor.pxd Co-authored-by: Jiajun Yao Signed-off-by: Ruiyang Wang <56065503+rynewang@users.noreply.github.com> --- python/ray/includes/global_state_accessor.pxd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/includes/global_state_accessor.pxd b/python/ray/includes/global_state_accessor.pxd index 3b0310f17d78..75c5e46c2559 100644 --- a/python/ray/includes/global_state_accessor.pxd +++ b/python/ray/includes/global_state_accessor.pxd @@ -132,4 +132,4 @@ cdef extern from * namespace "ray::gcs" nogil: c_int32_t port, const c_string& password, c_bool use_ssl, - const c_string& key) + const c_string& key_prefix) From 6c245a397ba503bda00f8f336f23f4afdd431945 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Wed, 21 Aug 2024 18:15:57 -0700 Subject: [PATCH 
10/29] RedisConcurrentKey Signed-off-by: Ruiyang Wang --- .../kubernetes/user-guides/kuberay-gcs-ft.md | 10 +++++-- .../gcs/store_client/redis_store_client.cc | 27 +++++++++++++------ src/ray/gcs/store_client/redis_store_client.h | 26 +++++++++++++++--- src/ray/util/container_util.h | 11 ++++++++ 4 files changed, 60 insertions(+), 14 deletions(-) diff --git a/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md b/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md index 469f7dcca96e..c8ae06a52e77 100644 --- a/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md +++ b/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md @@ -136,12 +136,18 @@ export REDIS_POD=$(kubectl get pods --selector=app=redis -o custom-columns=POD:m kubectl exec -it $REDIS_POD -- redis-cli -a "5241590000000000" # Step 6.4: Check the keys in Redis. +# Note: the schema changed in Ray 2.35.0. Previously we used a single HASH table, +# now we use multiple HASH tables with a common prefix. + KEYS * # [Example output]: -# 1) "864b004c-6305-42e3-ac46-adfa8eb6f752" +# 1) "RAY864b004c-6305-42e3-ac46-adfa8eb6f752:actors" +# [Example output Before Ray 2.35.0]: +# 2) "864b004c-6305-42e3-ac46-adfa8eb6f752" +# # Step 6.5: Check the value of the key. -SCAN 0 MATCH 864b004c-6305-42e3-ac46-adfa8eb6f752* +SCAN 0 MATCH RAY864b004c-6305-42e3-ac46-adfa8eb6f752* ``` In [ray-cluster.external-redis.yaml](https://github.com/ray-project/kuberay/blob/v1.0.0/ray-operator/config/samples/ray-cluster.external-redis.yaml), the `ray.io/external-storage-namespace` annotation isn't set for the RayCluster.
diff --git a/src/ray/gcs/store_client/redis_store_client.cc b/src/ray/gcs/store_client/redis_store_client.cc index a92480fb8c20..73110f575172 100644 --- a/src/ray/gcs/store_client/redis_store_client.cc +++ b/src/ray/gcs/store_client/redis_store_client.cc @@ -22,6 +22,7 @@ #include "absl/strings/match.h" #include "absl/strings/str_cat.h" #include "ray/gcs/redis_context.h" +#include "ray/util/container_util.h" #include "ray/util/logging.h" namespace ray { @@ -100,7 +101,12 @@ void RedisStoreClient::MGetValues(const std::string &table_name, callback(std::move(*key_value_map)); } }; - SendRedisCmd(command.args, std::move(command), std::move(mget_callback)); + SendRedisCmd(ray::mapped(command.args, + [&table_name](const std::string &key) { + return RedisConcurrencyKey{table_name, key}; + }), + std::move(command), + std::move(mget_callback)); } } @@ -128,7 +134,7 @@ Status RedisStoreClient::AsyncPut(const std::string &table_name, callback(added_num != 0); }; } - SendRedisCmd({key}, std::move(command), std::move(write_callback)); + SendRedisCmd({{table_name, key}}, std::move(command), std::move(write_callback)); return Status::OK(); } @@ -150,7 +156,7 @@ Status RedisStoreClient::AsyncGet(const std::string &table_name, RedisCommand command{.command = "HGET", .redis_key = RedisKey(external_storage_namespace_, table_name), .args = {key}}; - SendRedisCmd({key}, std::move(command), std::move(redis_callback)); + SendRedisCmd({{table_name, key}}, std::move(command), std::move(redis_callback)); return Status::OK(); } @@ -200,7 +206,7 @@ Status RedisStoreClient::AsyncMultiGet( return Status::OK(); } -size_t RedisStoreClient::PushToSendingQueue(const std::vector &keys, +size_t RedisStoreClient::PushToSendingQueue(const std::vector &keys, std::function send_request) { size_t queue_added = 0; for (const auto &key : keys) { @@ -226,7 +232,7 @@ size_t RedisStoreClient::PushToSendingQueue(const std::vector &keys } std::vector> RedisStoreClient::TakeRequestsFromSendingQueue( - const 
std::vector &keys) { + const std::vector &keys) { std::vector> send_requests; for (const auto &key : keys) { auto [op_iter, added] = @@ -243,7 +249,7 @@ std::vector> RedisStoreClient::TakeRequestsFromSendingQueu return send_requests; } -void RedisStoreClient::SendRedisCmd(std::vector keys, +void RedisStoreClient::SendRedisCmd(std::vector keys, RedisCommand command, RedisCallback redis_callback) { RAY_CHECK(!keys.empty()); @@ -325,7 +331,12 @@ Status RedisStoreClient::DeleteByKeys(const std::string &table, } } }; - SendRedisCmd(command.args, std::move(command), std::move(delete_callback)); + SendRedisCmd(ray::mapped(command.args, + [&table](const std::string &key) { + return RedisConcurrencyKey{table, key}; + }), + std::move(command), + std::move(delete_callback)); } return Status::OK(); } @@ -456,7 +467,7 @@ Status RedisStoreClient::AsyncExists(const std::string &table_name, RedisCommand command = { "HEXISTS", RedisKey(external_storage_namespace_, table_name), {key}}; SendRedisCmd( - {key}, + {{table_name, key}}, std::move(command), [callback = std::move(callback)](const std::shared_ptr &reply) { bool exists = reply->ReadAsInteger() > 0; diff --git a/src/ray/gcs/store_client/redis_store_client.h b/src/ray/gcs/store_client/redis_store_client.h index 5d2894466127..8170565d7bc1 100644 --- a/src/ray/gcs/store_client/redis_store_client.h +++ b/src/ray/gcs/store_client/redis_store_client.h @@ -66,6 +66,24 @@ struct RedisCommand { } }; +struct RedisConcurrencyKey { + std::string table_name; + std::string key; + + template + friend H AbslHashValue(H h, const RedisConcurrencyKey &k) { + return H::combine(std::move(h), k.table_name, k.key); + } + bool operator==(const RedisConcurrencyKey &other) const { + return table_name == other.table_name && key == other.key; + } +}; + +inline std::ostream &operator<<(std::ostream &os, const RedisConcurrencyKey &key) { + os << "{" << key.table_name << ", " << key.key << "}"; + return os; +} + // StoreClient using Redis as persistence 
backend. // Note in redis term a "key" points to a hash table and a "field" is a key, a "value" // is just a value. We double quote "key" and "field" to avoid confusion. @@ -187,7 +205,7 @@ class RedisStoreClient : public StoreClient { // // \return The number of queues newly added. A queue will be added // only when there is no in-flight request for the key. - size_t PushToSendingQueue(const std::vector &keys, + size_t PushToSendingQueue(const std::vector &keys, std::function send_request) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); @@ -198,7 +216,7 @@ class RedisStoreClient : public StoreClient { // // \return The requests to send. std::vector> TakeRequestsFromSendingQueue( - const std::vector &keys) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + const std::vector &keys) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); Status DeleteByKeys(const std::string &table_name, const std::vector &keys, @@ -211,7 +229,7 @@ class RedisStoreClient : public StoreClient { // \param keys The keys in the request. // \param args The redis commands // \param redis_callback The callback to call when the reply is received. - void SendRedisCmd(std::vector keys, + void SendRedisCmd(std::vector keys, RedisCommand command, RedisCallback redis_callback); @@ -228,7 +246,7 @@ class RedisStoreClient : public StoreClient { // The pending redis requests queue for each key. // The queue will be poped when the request is processed. 
- absl::flat_hash_map>> + absl::flat_hash_map>> pending_redis_request_by_key_ ABSL_GUARDED_BY(mu_); FRIEND_TEST(RedisStoreClientTest, Random); }; diff --git a/src/ray/util/container_util.h b/src/ray/util/container_util.h index 6a363dc09d0f..8f28713fcd44 100644 --- a/src/ray/util/container_util.h +++ b/src/ray/util/container_util.h @@ -130,4 +130,15 @@ void erase_if(std::list &list, std::function predicate) { } } +// [T] -> (T -> U) -> [U] +template +auto mapped(const std::vector &vec, F transform) { + std::vector()))> result; + result.reserve(vec.size()); + for (const auto &elem : vec) { + result.push_back(transform(elem)); + } + return result; +} + } // namespace ray From 33800dbff8c7a0e8fd7b7916eb2d2ceddee3af00 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Wed, 21 Aug 2024 18:20:03 -0700 Subject: [PATCH 11/29] update doc Signed-off-by: Ruiyang Wang --- .../cluster/kubernetes/user-guides/kuberay-gcs-ft.md | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md b/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md index c8ae06a52e77..7bf1ca5a607c 100644 --- a/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md +++ b/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md @@ -141,13 +141,17 @@ kubectl exec -it $REDIS_POD -- redis-cli -a "5241590000000000" KEYS * # [Example output]: -# 1) "RAY864b004c-6305-42e3-ac46-adfa8eb6f752:actors" +# 1) "RAY864b004c-6305-42e3-ac46-adfa8eb6f752@INTERNAL_CONFIG" +# 2) "RAY864b004c-6305-42e3-ac46-adfa8eb6f752@KV" +# 3) "RAY864b004c-6305-42e3-ac46-adfa8eb6f752@NODE" # [Example output Before Ray 2.35.0]: # 2) "864b004c-6305-42e3-ac46-adfa8eb6f752" # # Step 6.5: Check the value of the key. 
-SCAN 0 MATCH RAY864b004c-6305-42e3-ac46-adfa8eb6f752* +HGETALL RAY864b004c-6305-42e3-ac46-adfa8eb6f752@NODE +# Before Ray 2.35.0: +# HGETALL 864b004c-6305-42e3-ac46-adfa8eb6f752 ``` In [ray-cluster.external-redis.yaml](https://github.com/ray-project/kuberay/blob/v1.0.0/ray-operator/config/samples/ray-cluster.external-redis.yaml), the `ray.io/external-storage-namespace` annotation isn't set for the RayCluster. From 4755de571b89ebab3d2cecfea3ecdd89472f5b2b Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Wed, 21 Aug 2024 18:20:36 -0700 Subject: [PATCH 12/29] lint Signed-off-by: Ruiyang Wang --- python/ray/_private/gcs_utils.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/ray/_private/gcs_utils.py b/python/ray/_private/gcs_utils.py index 3c51b7cabbf1..4aa9a829e645 100644 --- a/python/ray/_private/gcs_utils.py +++ b/python/ray/_private/gcs_utils.py @@ -144,6 +144,7 @@ def cleanup_redis_storage( if not isinstance(storage_namespace, str): raise ValueError("storage namespace must be a string") - # Right now, GCS stores all data into multiple hashes with keys prefixed by storage_namespace. - # So we only need to delete the specific key to cleanup the cluster. + # Right now, GCS stores all data into multiple hashes with keys prefixed by + # storage_namespace. So we only need to delete the specific key to cleanup the + # cluster. 
return del_key_prefix_from_storage(host, port, password, use_ssl, storage_namespace) From 42a63f42252e110e26e919c5958bc1c58426d4d6 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Thu, 22 Aug 2024 11:12:58 -0700 Subject: [PATCH 13/29] fixes Signed-off-by: Ruiyang Wang --- .../kubernetes/user-guides/kuberay-gcs-ft.md | 6 +- python/ray/_raylet.pyx | 2 +- .../gcs/store_client/redis_store_client.cc | 71 +++++++++---------- src/ray/gcs/store_client/redis_store_client.h | 26 ++++--- src/ray/util/container_util.h | 7 +- 5 files changed, 59 insertions(+), 53 deletions(-) diff --git a/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md b/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md index 7bf1ca5a607c..8843c395367a 100644 --- a/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md +++ b/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md @@ -136,7 +136,7 @@ export REDIS_POD=$(kubectl get pods --selector=app=redis -o custom-columns=POD:m kubectl exec -it $REDIS_POD -- redis-cli -a "5241590000000000" # Step 6.4: Check the keys in Redis. -# Note: the schema changed in Ray 2.35.0. Previously we use a single HASH table, +# Note: the schema changed in Ray 2.36.0. Previously we use a single HASH table, # now we use multiple HASH tables with a common prefix. KEYS * @@ -144,13 +144,13 @@ KEYS * # 1) "RAY864b004c-6305-42e3-ac46-adfa8eb6f752@INTERNAL_CONFIG" # 2) "RAY864b004c-6305-42e3-ac46-adfa8eb6f752@KV" # 3) "RAY864b004c-6305-42e3-ac46-adfa8eb6f752@NODE" -# [Example output Before Ray 2.35.0]: +# [Example output Before Ray 2.36.0]: # 2) "864b004c-6305-42e3-ac46-adfa8eb6f752" # # Step 6.5: Check the value of the key. 
HGETALL RAY864b004c-6305-42e3-ac46-adfa8eb6f752@NODE -# Before Ray 2.35.0: +# Before Ray 2.36.0: # HGETALL 864b004c-6305-42e3-ac46-adfa8eb6f752 ``` diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index f45738e53de7..b9547fcf5680 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -5178,7 +5178,7 @@ cdef void async_callback(shared_ptr[CRayObject] obj, def del_key_prefix_from_storage(host, port, password, use_ssl, key_prefix): - return RedisDelKeyPrefixSync(host, port, password, use_ssl, key) + return RedisDelKeyPrefixSync(host, port, password, use_ssl, key_prefix) def get_session_key_from_storage(host, port, password, use_ssl, config, key): diff --git a/src/ray/gcs/store_client/redis_store_client.cc b/src/ray/gcs/store_client/redis_store_client.cc index 73110f575172..6795d0027a5f 100644 --- a/src/ray/gcs/store_client/redis_store_client.cc +++ b/src/ray/gcs/store_client/redis_store_client.cc @@ -61,10 +61,10 @@ std::vector GenCommandsBatched(const std::string &command, } // namespace -RedisKey::RedisKey(absl::string_view external_storage_namespace, - absl::string_view table_name) - : redis_key(absl::StrCat( - "RAY", external_storage_namespace, kClusterSeparator, table_name)) {} +std::string RedisKey::ToString() const { + // Something like RAY864b004c-6305-42e3-ac46-adfa8eb6f752@NODE + return absl::StrCat("RAY", external_storage_namespace, kClusterSeparator, table_name); +} RedisMatchPattern RedisMatchPattern::Prefix(const std::string &prefix) { return RedisMatchPattern(absl::StrCat(EscapeMatchPattern(prefix), "*")); @@ -75,7 +75,7 @@ void RedisStoreClient::MGetValues(const std::string &table_name, const MapCallback &callback) { // The `HMGET` command for each shard. 
auto batched_commands = GenCommandsBatched( - "HMGET", RedisKey(external_storage_namespace_, table_name), keys); + "HMGET", RedisKey{external_storage_namespace_, table_name}, keys); auto total_count = batched_commands.size(); auto finished_count = std::make_shared(0); auto key_value_map = std::make_shared>(); @@ -101,12 +101,7 @@ void RedisStoreClient::MGetValues(const std::string &table_name, callback(std::move(*key_value_map)); } }; - SendRedisCmd(ray::mapped(command.args, - [&table_name](const std::string &key) { - return RedisConcurrencyKey{table_name, key}; - }), - std::move(command), - std::move(mget_callback)); + SendRedisCmd(std::move(command), std::move(mget_callback)); } } @@ -124,7 +119,7 @@ Status RedisStoreClient::AsyncPut(const std::string &table_name, bool overwrite, std::function callback) { RedisCommand command{.command = overwrite ? "HSET" : "HSETNX", - .redis_key = RedisKey(external_storage_namespace_, table_name), + .redis_key = RedisKey{external_storage_namespace_, table_name}, .args = {key, data}}; RedisCallback write_callback = nullptr; if (callback) { @@ -134,7 +129,7 @@ Status RedisStoreClient::AsyncPut(const std::string &table_name, callback(added_num != 0); }; } - SendRedisCmd({{table_name, key}}, std::move(command), std::move(write_callback)); + SendRedisCmdWithKeys({key}, std::move(command), std::move(write_callback)); return Status::OK(); } @@ -154,9 +149,9 @@ Status RedisStoreClient::AsyncGet(const std::string &table_name, }; RedisCommand command{.command = "HGET", - .redis_key = RedisKey(external_storage_namespace_, table_name), + .redis_key = RedisKey{external_storage_namespace_, table_name}, .args = {key}}; - SendRedisCmd({{table_name, key}}, std::move(command), std::move(redis_callback)); + SendRedisCmd(std::move(command), std::move(redis_callback)); return Status::OK(); } @@ -165,7 +160,7 @@ Status RedisStoreClient::AsyncGetAll( const MapCallback &callback) { RAY_CHECK(callback); RedisScanner::ScanKeysAndValues(redis_client_, - 
RedisKey(external_storage_namespace_, table_name), + RedisKey{external_storage_namespace_, table_name}, RedisMatchPattern::Any(), callback); return Status::OK(); @@ -249,10 +244,20 @@ std::vector> RedisStoreClient::TakeRequestsFromSendingQueu return send_requests; } -void RedisStoreClient::SendRedisCmd(std::vector keys, - RedisCommand command, - RedisCallback redis_callback) { +void RedisStoreClient::SendRedisCmd(RedisCommand command, RedisCallback redis_callback) { + auto copied = command.args; + SendRedisCmdWithKeys(std::move(copied), std::move(command), std::move(redis_callback)); +} + +void RedisStoreClient::SendRedisCmdWithKeys(std::vector keys, + RedisCommand command, + RedisCallback redis_callback) { RAY_CHECK(!keys.empty()); + auto concurrency_keys = + ray::move_mapped(std::move(keys), [&command](std::string &&key) { + return RedisConcurrencyKey{command.redis_key.table_name, std::move(key)}; + }); + // The number of keys that's ready for this request. // For a query reading or writing multiple keys, we need a counter // to check whether all existing requests for this keys have been @@ -260,16 +265,16 @@ void RedisStoreClient::SendRedisCmd(std::vector keys, auto num_ready_keys = std::make_shared(0); std::function send_redis = [this, num_ready_keys = num_ready_keys, - keys, + concurrency_keys, // Copied! command = std::move(command), redis_callback = std::move(redis_callback)]() mutable { { absl::MutexLock lock(&mu_); *num_ready_keys += 1; - RAY_CHECK(*num_ready_keys <= keys.size()); + RAY_CHECK(*num_ready_keys <= concurrency_keys.size()); // There are still pending requests for these keys. - if (*num_ready_keys != keys.size()) { + if (*num_ready_keys != concurrency_keys.size()) { return; } } @@ -277,12 +282,12 @@ void RedisStoreClient::SendRedisCmd(std::vector keys, auto cxt = redis_client_->GetPrimaryContext(); cxt->RunArgvAsync(command.ToRedisArgs(), [this, - keys = std::move(keys), + concurrency_keys, // Copied! 
redis_callback = std::move(redis_callback)](auto reply) { std::vector> requests; { absl::MutexLock lock(&mu_); - requests = TakeRequestsFromSendingQueue(keys); + requests = TakeRequestsFromSendingQueue(concurrency_keys); } for (auto &request : requests) { request(); @@ -295,7 +300,7 @@ void RedisStoreClient::SendRedisCmd(std::vector keys, { absl::MutexLock lock(&mu_); - auto keys_ready = PushToSendingQueue(keys, send_redis); + auto keys_ready = PushToSendingQueue(concurrency_keys, send_redis); *num_ready_keys += keys_ready; // If all queues are empty for each key this request depends on // we are safe to fire the request immediately. @@ -314,7 +319,7 @@ Status RedisStoreClient::DeleteByKeys(const std::string &table, const std::vector &keys, std::function callback) { auto del_cmds = - GenCommandsBatched("HDEL", RedisKey(external_storage_namespace_, table), keys); + GenCommandsBatched("HDEL", RedisKey{external_storage_namespace_, table}, keys); auto total_count = del_cmds.size(); auto finished_count = std::make_shared(0); auto num_deleted = std::make_shared(0); @@ -331,12 +336,7 @@ Status RedisStoreClient::DeleteByKeys(const std::string &table, } } }; - SendRedisCmd(ray::mapped(command.args, - [&table](const std::string &key) { - return RedisConcurrencyKey{table, key}; - }), - std::move(command), - std::move(delete_callback)); + SendRedisCmd(std::move(command), std::move(delete_callback)); } return Status::OK(); } @@ -435,7 +435,7 @@ int RedisStoreClient::GetNextJobID() { // Key: "RAYexternal_storage_namespace@JobCounter" // Value: The next job ID. 
RedisCommand command = { - "INCRBY", RedisKey(external_storage_namespace_, "JobCounter"), {"1"}}; + "INCRBY", RedisKey{external_storage_namespace_, "JobCounter"}, {"1"}}; auto cxt = redis_client_->GetPrimaryContext(); auto reply = cxt->RunArgvSync(command.ToRedisArgs()); @@ -448,7 +448,7 @@ Status RedisStoreClient::AsyncGetKeys( std::function)> callback) { RedisScanner::ScanKeysAndValues( redis_client_, - RedisKey(external_storage_namespace_, table_name), + RedisKey{external_storage_namespace_, table_name}, RedisMatchPattern::Prefix(prefix), [callback](absl::flat_hash_map &&result) { std::vector keys; @@ -465,9 +465,8 @@ Status RedisStoreClient::AsyncExists(const std::string &table_name, const std::string &key, std::function callback) { RedisCommand command = { - "HEXISTS", RedisKey(external_storage_namespace_, table_name), {key}}; + "HEXISTS", RedisKey{external_storage_namespace_, table_name}, {key}}; SendRedisCmd( - {{table_name, key}}, std::move(command), [callback = std::move(callback)](const std::shared_ptr &reply) { bool exists = reply->ReadAsInteger() > 0; diff --git a/src/ray/gcs/store_client/redis_store_client.h b/src/ray/gcs/store_client/redis_store_client.h index 8170565d7bc1..c2fac2dbb211 100644 --- a/src/ray/gcs/store_client/redis_store_client.h +++ b/src/ray/gcs/store_client/redis_store_client.h @@ -32,9 +32,9 @@ namespace gcs { // Typed key to avoid forgetting to prepend external_storage_namespace. 
struct RedisKey { - explicit RedisKey(absl::string_view external_storage_namespace, - absl::string_view table_name); - const std::string redis_key; + const std::string external_storage_namespace; + const std::string table_name; + std::string ToString() const; }; struct RedisMatchPattern { @@ -58,7 +58,7 @@ struct RedisCommand { std::vector ToRedisArgs() const { std::vector redis_args; redis_args.push_back(command); - redis_args.push_back(redis_key.redis_key); + redis_args.push_back(redis_key.ToString()); for (const auto &arg : args) { redis_args.push_back(arg); } @@ -223,15 +223,21 @@ class RedisStoreClient : public StoreClient { std::function callback); // Send the redis command to the server. This method will make request to be - // serialized for each key in keys. At a given time, only one request for a key - // will be in flight. + // serialized for each key in keys. At a given time, only one request for a {table_name, + // key} will be in flight. // - // \param keys The keys in the request. + // \param keys Used as concurrency key. // \param args The redis commands // \param redis_callback The callback to call when the reply is received. - void SendRedisCmd(std::vector keys, - RedisCommand command, - RedisCallback redis_callback); + void SendRedisCmdWithKeys(std::vector keys, + RedisCommand command, + RedisCallback redis_callback); + + // Conveinence method for SendRedisCmdWithKeys with keys = command.args. + // Reason for this method: if you call SendRedisCmd(command.args, std::move(command)), + // it's UB because C++ don't have arg evaluation order guarantee, hence command.args + // may become empty. + void SendRedisCmd(RedisCommand command, RedisCallback redis_callback); // HMGET external_storage_namespace@table_name key1 key2 ... 
// `keys` are chunked to multiple HMGET commands by diff --git a/src/ray/util/container_util.h b/src/ray/util/container_util.h index 8f28713fcd44..d1a1bb851c91 100644 --- a/src/ray/util/container_util.h +++ b/src/ray/util/container_util.h @@ -131,12 +131,13 @@ void erase_if(std::list &list, std::function predicate) { } // [T] -> (T -> U) -> [U] +// Only supports && input. template -auto mapped(const std::vector &vec, F transform) { +auto move_mapped(std::vector &&vec, F transform) { std::vector()))> result; result.reserve(vec.size()); - for (const auto &elem : vec) { - result.push_back(transform(elem)); + for (T &elem : vec) { + result.emplace_back(transform(std::move(elem))); } return result; } From 842f3b7b2e029cfe4713b547a92897bd5e28cb08 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Mon, 26 Aug 2024 10:14:00 -0700 Subject: [PATCH 14/29] nits Signed-off-by: Ruiyang Wang --- .../gcs/store_client/redis_store_client.cc | 23 ++++++++++--------- src/ray/gcs/store_client/redis_store_client.h | 9 ++++---- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/src/ray/gcs/store_client/redis_store_client.cc b/src/ray/gcs/store_client/redis_store_client.cc index 6795d0027a5f..6387b2fb7eb5 100644 --- a/src/ray/gcs/store_client/redis_store_client.cc +++ b/src/ray/gcs/store_client/redis_store_client.cc @@ -101,7 +101,7 @@ void RedisStoreClient::MGetValues(const std::string &table_name, callback(std::move(*key_value_map)); } }; - SendRedisCmd(std::move(command), std::move(mget_callback)); + SendRedisCmdArgsAsKeys(std::move(command), std::move(mget_callback)); } } @@ -118,9 +118,9 @@ Status RedisStoreClient::AsyncPut(const std::string &table_name, const std::string &data, bool overwrite, std::function callback) { - RedisCommand command{.command = overwrite ? "HSET" : "HSETNX", - .redis_key = RedisKey{external_storage_namespace_, table_name}, - .args = {key, data}}; + RedisCommand command{/*command=*/overwrite ? 
"HSET" : "HSETNX", + RedisKey{external_storage_namespace_, table_name}, + /*args=*/{key, data}}; RedisCallback write_callback = nullptr; if (callback) { write_callback = @@ -148,10 +148,10 @@ Status RedisStoreClient::AsyncGet(const std::string &table_name, callback(Status::OK(), result); }; - RedisCommand command{.command = "HGET", - .redis_key = RedisKey{external_storage_namespace_, table_name}, - .args = {key}}; - SendRedisCmd(std::move(command), std::move(redis_callback)); + RedisCommand command{/*command=*/"HGET", + RedisKey{external_storage_namespace_, table_name}, + /*args=*/{key}}; + SendRedisCmdArgsAsKeys(std::move(command), std::move(redis_callback)); return Status::OK(); } @@ -244,7 +244,8 @@ std::vector> RedisStoreClient::TakeRequestsFromSendingQueu return send_requests; } -void RedisStoreClient::SendRedisCmd(RedisCommand command, RedisCallback redis_callback) { +void RedisStoreClient::SendRedisCmdArgsAsKeys(RedisCommand command, + RedisCallback redis_callback) { auto copied = command.args; SendRedisCmdWithKeys(std::move(copied), std::move(command), std::move(redis_callback)); } @@ -336,7 +337,7 @@ Status RedisStoreClient::DeleteByKeys(const std::string &table, } } }; - SendRedisCmd(std::move(command), std::move(delete_callback)); + SendRedisCmdArgsAsKeys(std::move(command), std::move(delete_callback)); } return Status::OK(); } @@ -466,7 +467,7 @@ Status RedisStoreClient::AsyncExists(const std::string &table_name, std::function callback) { RedisCommand command = { "HEXISTS", RedisKey{external_storage_namespace_, table_name}, {key}}; - SendRedisCmd( + SendRedisCmdArgsAsKeys( std::move(command), [callback = std::move(callback)](const std::shared_ptr &reply) { bool exists = reply->ReadAsInteger() > 0; diff --git a/src/ray/gcs/store_client/redis_store_client.h b/src/ray/gcs/store_client/redis_store_client.h index c2fac2dbb211..95a743991f98 100644 --- a/src/ray/gcs/store_client/redis_store_client.h +++ b/src/ray/gcs/store_client/redis_store_client.h @@ -57,6 
+57,7 @@ struct RedisCommand { std::vector ToRedisArgs() const { std::vector redis_args; + redis_args.reserve(2 + args.size()); redis_args.push_back(command); redis_args.push_back(redis_key.ToString()); for (const auto &arg : args) { @@ -234,10 +235,10 @@ class RedisStoreClient : public StoreClient { RedisCallback redis_callback); // Conveinence method for SendRedisCmdWithKeys with keys = command.args. - // Reason for this method: if you call SendRedisCmd(command.args, std::move(command)), - // it's UB because C++ don't have arg evaluation order guarantee, hence command.args - // may become empty. - void SendRedisCmd(RedisCommand command, RedisCallback redis_callback); + // Reason for this method: if you call SendRedisCmdWithKeys(command.args, + // std::move(command)), it's UB because C++ don't have arg evaluation order guarantee, + // hence command.args may become empty. + void SendRedisCmdArgsAsKeys(RedisCommand command, RedisCallback redis_callback); // HMGET external_storage_namespace@table_name key1 key2 ... 
// `keys` are chunked to multiple HMGET commands by From f4eae4c05d139c6c49c4b7735e3669c85911135a Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Mon, 26 Aug 2024 11:12:10 -0700 Subject: [PATCH 15/29] merge churn Signed-off-by: Ruiyang Wang --- src/ray/gcs/store_client/redis_store_client.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/gcs/store_client/redis_store_client.cc b/src/ray/gcs/store_client/redis_store_client.cc index 6387b2fb7eb5..a4e013c3b04d 100644 --- a/src/ray/gcs/store_client/redis_store_client.cc +++ b/src/ray/gcs/store_client/redis_store_client.cc @@ -145,7 +145,7 @@ Status RedisStoreClient::AsyncGet(const std::string &table_name, } RAY_CHECK(!reply->IsError()) << "Failed to get from Redis with status: " << reply->ReadAsStatus(); - callback(Status::OK(), result); + callback(Status::OK(), std::move(result)); }; RedisCommand command{/*command=*/"HGET", From 5f8d8480352f7cec15b6900400cb4ffaaa1f4493 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Mon, 26 Aug 2024 15:24:37 -0700 Subject: [PATCH 16/29] self-ref Signed-off-by: Ruiyang Wang --- src/ray/gcs/store_client/redis_store_client.cc | 9 ++++++--- src/ray/gcs/store_client/redis_store_client.h | 5 ++++- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/ray/gcs/store_client/redis_store_client.cc b/src/ray/gcs/store_client/redis_store_client.cc index a4e013c3b04d..21908bbfff4f 100644 --- a/src/ray/gcs/store_client/redis_store_client.cc +++ b/src/ray/gcs/store_client/redis_store_client.cc @@ -366,6 +366,7 @@ void RedisStoreClient::RedisScanner::ScanKeysAndValues( std::move(redis_key), std::move(match_pattern), std::move(callback)); + scanner->self_ref_ = scanner; scanner->Scan(); } @@ -376,6 +377,8 @@ void RedisStoreClient::RedisScanner::Scan() { absl::MutexLock lock(&mutex_); if (!cursor_.has_value()) { callback_(std::move(results_)); + self_ref_.reset(); + return; } size_t batch_count = RayConfig::instance().maximum_gcs_storage_operation_batch_size(); 
@@ -392,9 +395,9 @@ void RedisStoreClient::RedisScanner::Scan() { auto primary_context = redis_client_->GetPrimaryContext(); primary_context->RunArgvAsync( command.ToRedisArgs(), - [this, - // keeps myself from being destructed waiting for async. - shared_this = shared_from_this()](const std::shared_ptr &reply) { + // self_ref to keep the scanner alive until the callback is called, even if it + // releases its self_ref in Scan(). + [this, self_ref = self_ref_](const std::shared_ptr &reply) { OnScanCallback(reply); }); } diff --git a/src/ray/gcs/store_client/redis_store_client.h b/src/ray/gcs/store_client/redis_store_client.h index 95a743991f98..7170ea2b67b6 100644 --- a/src/ray/gcs/store_client/redis_store_client.h +++ b/src/ray/gcs/store_client/redis_store_client.h @@ -150,7 +150,7 @@ class RedisStoreClient : public StoreClient { /// The scan is not locked with other operations. It's not guaranteed to be consistent /// with other operations. It's batched by /// RAY_maximum_gcs_storage_operation_batch_size. - class RedisScanner : std::enable_shared_from_this { + class RedisScanner { private: // We want a private ctor but can use make_shared. // See https://en.cppreference.com/w/cpp/memory/enable_shared_from_this @@ -197,6 +197,9 @@ class RedisStoreClient : public StoreClient { std::shared_ptr redis_client_; MapCallback callback_; + + // Holds a self-ref until the scan is done. + std::shared_ptr self_ref_; }; // Push a request to the sending queue. 
From 3f578a105a3ed4e1c008de626cade52c9dd186c0 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Mon, 26 Aug 2024 17:14:06 -0700 Subject: [PATCH 17/29] fix removal Signed-off-by: Ruiyang Wang --- python/ray/_private/gcs_utils.py | 6 ++- python/ray/_raylet.pyx | 6 +-- python/ray/includes/global_state_accessor.pxd | 2 +- python/ray/includes/global_state_accessor.pxi | 2 +- python/ray/tests/test_gcs_utils.py | 7 ++- .../gcs/store_client/redis_store_client.cc | 46 ++++++++++++------- src/ray/gcs/store_client/redis_store_client.h | 10 ++-- 7 files changed, 48 insertions(+), 31 deletions(-) diff --git a/python/ray/_private/gcs_utils.py b/python/ray/_private/gcs_utils.py index fc1014b7c0a7..bcaf917e048b 100644 --- a/python/ray/_private/gcs_utils.py +++ b/python/ray/_private/gcs_utils.py @@ -125,7 +125,7 @@ def cleanup_redis_storage( storage_namespace: The namespace of the storage to be deleted. """ - from ray._raylet import del_key_prefix_from_storage # type: ignore + from ray._raylet import del_external_storage_namespace_from_storage # type: ignore if not isinstance(host, str): raise ValueError("Host must be a string") @@ -145,4 +145,6 @@ def cleanup_redis_storage( # Right now, GCS stores all data into multiple hashes with keys prefixed by # storage_namespace. So we only need to delete the specific key to cleanup the # cluster. 
- return del_key_prefix_from_storage(host, port, password, use_ssl, storage_namespace) + return del_external_storage_namespace_from_storage( + host, port, password, use_ssl, storage_namespace + ) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 89d6f03f304f..0f66427e6e01 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -160,7 +160,7 @@ from ray.includes.libcoreworker cimport ( from ray.includes.ray_config cimport RayConfig from ray.includes.global_state_accessor cimport CGlobalStateAccessor -from ray.includes.global_state_accessor cimport RedisDelKeyPrefixSync, RedisGetKeySync +from ray.includes.global_state_accessor cimport RedisDelExternalStorageNamespaceSync, RedisGetKeySync from ray.includes.optional cimport ( optional, nullopt ) @@ -5176,8 +5176,8 @@ cdef void async_callback(shared_ptr[CRayObject] obj, cpython.Py_DECREF(user_callback) -def del_key_prefix_from_storage(host, port, password, use_ssl, key_prefix): - return RedisDelKeyPrefixSync(host, port, password, use_ssl, key_prefix) +def del_external_storage_namespace_from_storage(host, port, password, use_ssl, key_prefix): + return RedisDelExternalStorageNamespaceSync(host, port, password, use_ssl, key_prefix) def get_session_key_from_storage(host, port, password, use_ssl, config, key): diff --git a/python/ray/includes/global_state_accessor.pxd b/python/ray/includes/global_state_accessor.pxd index 75c5e46c2559..94817f6eee34 100644 --- a/python/ray/includes/global_state_accessor.pxd +++ b/python/ray/includes/global_state_accessor.pxd @@ -128,7 +128,7 @@ cdef extern from * namespace "ray::gcs" nogil: cdef extern from * namespace "ray::gcs" nogil: - c_bool RedisDelKeyPrefixSync(const c_string& host, + c_bool RedisDelExternalStorageNamespaceSync(const c_string& host, c_int32_t port, const c_string& password, c_bool use_ssl, diff --git a/python/ray/includes/global_state_accessor.pxi b/python/ray/includes/global_state_accessor.pxi index 49c23b803ffd..b15ff472185e 100644 --- 
a/python/ray/includes/global_state_accessor.pxi +++ b/python/ray/includes/global_state_accessor.pxi @@ -15,7 +15,7 @@ from ray.includes.unique_ids cimport ( from ray.includes.global_state_accessor cimport ( CGlobalStateAccessor, - RedisDelKeyPrefixSync, + RedisDelExternalStorageNamespaceSync, ) from ray.includes.optional cimport ( diff --git a/python/ray/tests/test_gcs_utils.py b/python/ray/tests/test_gcs_utils.py index 30ccfa9e31c5..82f34214046e 100644 --- a/python/ray/tests/test_gcs_utils.py +++ b/python/ray/tests/test_gcs_utils.py @@ -295,9 +295,12 @@ def test_redis_cleanup(redis_replicas, shutdown_only): else: cli = redis.Redis(host, int(port)) - assert set(cli.keys()) == {b"c1", b"c2"} + table_names = ["KV", "INTERNAL_CONFIG", "WORKERS", "JobCounter", "NODE", "JOB"] + c1_keys = [f"RAYc1@{name}".encode() for name in table_names] + c2_keys = [f"RAYc2@{name}".encode() for name in table_names] + assert set(cli.keys()) == set(c1_keys + c2_keys) gcs_utils.cleanup_redis_storage(host, int(port), "", False, "c1") - assert set(cli.keys()) == {b"c2"} + assert set(cli.keys()) == set(c2_keys) gcs_utils.cleanup_redis_storage(host, int(port), "", False, "c2") assert len(cli.keys()) == 0 diff --git a/src/ray/gcs/store_client/redis_store_client.cc b/src/ray/gcs/store_client/redis_store_client.cc index 21908bbfff4f..ce6e986dbb22 100644 --- a/src/ray/gcs/store_client/redis_store_client.cc +++ b/src/ray/gcs/store_client/redis_store_client.cc @@ -489,11 +489,11 @@ class Cleanup { }; // Returns True if at least 1 key is deleted, False otherwise. 
-bool RedisDelKeyPrefixSync(const std::string &host, - int32_t port, - const std::string &password, - bool use_ssl, - const std::string &key_prefix) { +bool RedisDelExternalStorageNamespaceSync(const std::string &host, + int32_t port, + const std::string &password, + bool use_ssl, + const std::string &external_storage_namespace) { RedisClientOptions options(host, port, password, use_ssl); auto cli = std::make_unique(options); @@ -513,26 +513,38 @@ bool RedisDelKeyPrefixSync(const std::string &host, RAY_CHECK(status.ok()) << "Failed to connect to redis: " << status.ToString(); auto context = cli->GetPrimaryContext(); - std::vector cmd{"KEYS", RedisMatchPattern::Prefix(key_prefix).escaped}; + // Delete all such keys by using empty table name. + RedisKey redis_key{external_storage_namespace, /*table_name=*/""}; + std::vector cmd{"KEYS", + RedisMatchPattern::Prefix(redis_key.ToString()).escaped}; auto reply = context->RunArgvSync(cmd); const auto &keys = reply->ReadAsStringArray(); if (keys.empty()) { - RAY_LOG(INFO) << "No keys found for prefix " << key_prefix; + RAY_LOG(INFO) << "No keys found for external storage namespace " + << external_storage_namespace; return true; } - auto del_cmd = std::vector{"DEL"}; + auto delete_one_sync = [context](const std::string &key) { + auto del_cmd = std::vector{"DEL", key}; + auto del_reply = context->RunArgvSync(del_cmd); + return del_reply->ReadAsInteger() > 0; + }; + size_t num_deleted = 0; + size_t num_failed = 0; for (const auto &key : keys) { - if (key.has_value()) { - del_cmd.push_back(key.value()); + if ((!key.has_value()) || key->empty()) { + continue; + } + if (delete_one_sync(*key)) { + num_deleted++; + } else { + num_failed++; } } - auto del_reply = context->RunArgvSync(del_cmd); - if (del_reply->ReadAsInteger() > 0) { - RAY_LOG(INFO) << "Successfully deleted keys with prefix " << key_prefix; - return true; - } - RAY_LOG(ERROR) << "Failed to delete keys with prefix " << key_prefix; - return false; + RAY_LOG(INFO) << 
"Finished deleting keys with external storage namespace " + << external_storage_namespace << ". Deleted table count: " << num_deleted + << ", Failed table count: " << num_failed; + return num_failed == 0; } } // namespace gcs diff --git a/src/ray/gcs/store_client/redis_store_client.h b/src/ray/gcs/store_client/redis_store_client.h index 7170ea2b67b6..c9b37bc2ce9f 100644 --- a/src/ray/gcs/store_client/redis_store_client.h +++ b/src/ray/gcs/store_client/redis_store_client.h @@ -262,11 +262,11 @@ class RedisStoreClient : public StoreClient { }; // Helper function used by Python to delete all redis HASHes with a given prefix. -bool RedisDelKeyPrefixSync(const std::string &host, - int32_t port, - const std::string &password, - bool use_ssl, - const std::string &key_prefix); +bool RedisDelExternalStorageNamespaceSync(const std::string &host, + int32_t port, + const std::string &password, + bool use_ssl, + const std::string &external_storage_namespace); } // namespace gcs From bb905a88a93fbbb2dff1e93dc160596f8a852701 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Mon, 26 Aug 2024 17:38:02 -0700 Subject: [PATCH 18/29] lint Signed-off-by: Ruiyang Wang --- python/ray/includes/global_state_accessor.pxd | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/ray/includes/global_state_accessor.pxd b/python/ray/includes/global_state_accessor.pxd index 94817f6eee34..6ae6ee4ed539 100644 --- a/python/ray/includes/global_state_accessor.pxd +++ b/python/ray/includes/global_state_accessor.pxd @@ -129,7 +129,7 @@ cdef extern from * namespace "ray::gcs" nogil: cdef extern from * namespace "ray::gcs" nogil: c_bool RedisDelExternalStorageNamespaceSync(const c_string& host, - c_int32_t port, - const c_string& password, - c_bool use_ssl, - const c_string& key_prefix) + c_int32_t port, + const c_string& password, + c_bool use_ssl, + const c_string& key_prefix) From 1ff20d79daa106a9daf9ef3afafca7f22df20ddc Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Mon, 26 
Aug 2024 17:39:40 -0700 Subject: [PATCH 19/29] lint Signed-off-by: Ruiyang Wang --- python/ray/_raylet.pyx | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 0f66427e6e01..3b721676b6eb 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -5176,8 +5176,10 @@ cdef void async_callback(shared_ptr[CRayObject] obj, cpython.Py_DECREF(user_callback) -def del_external_storage_namespace_from_storage(host, port, password, use_ssl, key_prefix): - return RedisDelExternalStorageNamespaceSync(host, port, password, use_ssl, key_prefix) +def del_external_storage_namespace_from_storage( + host, port, password, use_ssl, key_prefix): + return RedisDelExternalStorageNamespaceSync( + host, port, password, use_ssl, key_prefix) def get_session_key_from_storage(host, port, password, use_ssl, config, key): From 602f2fadc10ad8a9909b732fc3c51cbe2d2987bb Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Mon, 26 Aug 2024 17:40:19 -0700 Subject: [PATCH 20/29] lint Signed-off-by: Ruiyang Wang --- python/ray/_raylet.pyx | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 3b721676b6eb..a6e5b214a2ab 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -160,7 +160,10 @@ from ray.includes.libcoreworker cimport ( from ray.includes.ray_config cimport RayConfig from ray.includes.global_state_accessor cimport CGlobalStateAccessor -from ray.includes.global_state_accessor cimport RedisDelExternalStorageNamespaceSync, RedisGetKeySync +from ray.includes.global_state_accessor cimport ( + RedisDelExternalStorageNamespaceSync, + RedisGetKeySync +) from ray.includes.optional cimport ( optional, nullopt ) From 4f1b68cf5f9d719cfac68e85fb36b77b1576b654 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Wed, 4 Sep 2024 22:37:54 -0700 Subject: [PATCH 21/29] fix cpp Signed-off-by: Ruiyang Wang --- src/ray/gcs/store_client/redis_store_client.cc | 
2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/gcs/store_client/redis_store_client.cc b/src/ray/gcs/store_client/redis_store_client.cc index ce6e986dbb22..b09d31b49b36 100644 --- a/src/ray/gcs/store_client/redis_store_client.cc +++ b/src/ray/gcs/store_client/redis_store_client.cc @@ -220,7 +220,7 @@ size_t RedisStoreClient::PushToSendingQueue(const std::vectorsecond.push(nullptr); } else { - op_iter->second.push(std::move(send_request)); + op_iter->second.push(send_request); } } return queue_added; From 2467782e4bd081edae720fd3e79b808ba0503a99 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Wed, 4 Sep 2024 22:41:37 -0700 Subject: [PATCH 22/29] remove hand built cleanup Signed-off-by: Ruiyang Wang --- src/ray/gcs/store_client/redis_store_client.cc | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/src/ray/gcs/store_client/redis_store_client.cc b/src/ray/gcs/store_client/redis_store_client.cc index b09d31b49b36..a3a143d7be9a 100644 --- a/src/ray/gcs/store_client/redis_store_client.cc +++ b/src/ray/gcs/store_client/redis_store_client.cc @@ -19,6 +19,7 @@ #include #include +#include "absl/cleanup/cleanup.h" #include "absl/strings/match.h" #include "absl/strings/str_cat.h" #include "ray/gcs/redis_context.h" @@ -479,15 +480,6 @@ Status RedisStoreClient::AsyncExists(const std::string &table_name, return Status::OK(); } -class Cleanup { - public: - explicit Cleanup(std::function f) : f_(std::move(f)) {} - ~Cleanup() { f_(); } - - private: - std::function f_; -}; - // Returns True if at least 1 key is deleted, False otherwise. 
bool RedisDelExternalStorageNamespaceSync(const std::string &host, int32_t port, @@ -504,7 +496,7 @@ bool RedisDelExternalStorageNamespaceSync(const std::string &host, io_service.run(); }); - Cleanup _([&]() { + auto cleanup_guard = absl::MakeCleanup([&]() { io_service.stop(); thread->join(); }); From 308cd4ae1e92802cca9953b957c134535876a9a8 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Fri, 6 Sep 2024 10:59:47 -0700 Subject: [PATCH 23/29] relax the timeout Signed-off-by: Ruiyang Wang --- src/ray/gcs/store_client/test/store_client_test_base.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/gcs/store_client/test/store_client_test_base.h b/src/ray/gcs/store_client/test/store_client_test_base.h index ca373604e30e..c885de33dbcc 100644 --- a/src/ray/gcs/store_client/test/store_client_test_base.h +++ b/src/ray/gcs/store_client/test/store_client_test_base.h @@ -264,7 +264,7 @@ class StoreClientTestBase : public ::testing::Test { std::vector keys_; std::atomic pending_count_{0}; - std::chrono::milliseconds wait_pending_timeout_{5000}; + std::chrono::milliseconds wait_pending_timeout_{30000}; }; } // namespace gcs From c732e118f167d28b8829551416b4665843081bcb Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Wed, 11 Sep 2024 16:00:49 -0700 Subject: [PATCH 24/29] up Signed-off-by: Jiajun Yao --- BUILD.bazel | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/BUILD.bazel b/BUILD.bazel index 10b14b4d0624..53db9c18ebb2 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -2233,7 +2233,7 @@ ray_cc_library( ray_cc_test( name = "redis_store_client_test", - size = "small", + size = "medium", srcs = ["src/ray/gcs/store_client/test/redis_store_client_test.cc"], args = [ "$(location redis-server)", @@ -2254,7 +2254,7 @@ ray_cc_test( ray_cc_test( name = "chaos_redis_store_client_test", - size = "small", + size = "medium", srcs = ["src/ray/gcs/store_client/test/redis_store_client_test.cc"], args = [ "$(location redis-server)", From 
a7ff71eaf8a1ce5e5343e151ada29b7031187cec Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Mon, 16 Sep 2024 13:54:34 -0700 Subject: [PATCH 25/29] fix bad BatchDelete and revert test relaxing Signed-off-by: Ruiyang Wang --- BUILD.bazel | 4 ++-- .../gcs/store_client/redis_store_client.cc | 22 +++++++++---------- .../test/store_client_test_base.h | 2 +- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/BUILD.bazel b/BUILD.bazel index 53db9c18ebb2..10b14b4d0624 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -2233,7 +2233,7 @@ ray_cc_library( ray_cc_test( name = "redis_store_client_test", - size = "medium", + size = "small", srcs = ["src/ray/gcs/store_client/test/redis_store_client_test.cc"], args = [ "$(location redis-server)", @@ -2254,7 +2254,7 @@ ray_cc_test( ray_cc_test( name = "chaos_redis_store_client_test", - size = "medium", + size = "small", srcs = ["src/ray/gcs/store_client/test/redis_store_client_test.cc"], args = [ "$(location redis-server)", diff --git a/src/ray/gcs/store_client/redis_store_client.cc b/src/ray/gcs/store_client/redis_store_client.cc index a3a143d7be9a..a7a604a1aad9 100644 --- a/src/ray/gcs/store_client/redis_store_client.cc +++ b/src/ray/gcs/store_client/redis_store_client.cc @@ -327,17 +327,17 @@ Status RedisStoreClient::DeleteByKeys(const std::string &table, auto num_deleted = std::make_shared(0); auto context = redis_client_->GetPrimaryContext(); for (auto &command : del_cmds) { - auto delete_callback = - [num_deleted, finished_count, total_count, callback = std::move(callback)]( - const std::shared_ptr &reply) { - (*num_deleted) += reply->ReadAsInteger(); - ++(*finished_count); - if (*finished_count == total_count) { - if (callback) { - callback(*num_deleted); - } - } - }; + // `callback` is copied to each `delete_callback` lambda. Don't move. 
+ auto delete_callback = [num_deleted, finished_count, total_count, callback]( + const std::shared_ptr &reply) { + (*num_deleted) += reply->ReadAsInteger(); + ++(*finished_count); + if (*finished_count == total_count) { + if (callback) { + callback(*num_deleted); + } + } + }; SendRedisCmdArgsAsKeys(std::move(command), std::move(delete_callback)); } return Status::OK(); diff --git a/src/ray/gcs/store_client/test/store_client_test_base.h b/src/ray/gcs/store_client/test/store_client_test_base.h index c885de33dbcc..ca373604e30e 100644 --- a/src/ray/gcs/store_client/test/store_client_test_base.h +++ b/src/ray/gcs/store_client/test/store_client_test_base.h @@ -264,7 +264,7 @@ class StoreClientTestBase : public ::testing::Test { std::vector keys_; std::atomic pending_count_{0}; - std::chrono::milliseconds wait_pending_timeout_{30000}; + std::chrono::milliseconds wait_pending_timeout_{5000}; }; } // namespace gcs From 8b4ed5f9f7061709ffc7e93c66061c135b1c6182 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang <56065503+rynewang@users.noreply.github.com> Date: Tue, 17 Sep 2024 11:39:52 -0700 Subject: [PATCH 26/29] Update kuberay-gcs-ft.md 2.38 Signed-off-by: Ruiyang Wang <56065503+rynewang@users.noreply.github.com> --- doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md b/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md index 8843c395367a..7efe202b801b 100644 --- a/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md +++ b/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md @@ -136,7 +136,7 @@ export REDIS_POD=$(kubectl get pods --selector=app=redis -o custom-columns=POD:m kubectl exec -it $REDIS_POD -- redis-cli -a "5241590000000000" # Step 6.4: Check the keys in Redis. -# Note: the schema changed in Ray 2.36.0. Previously we use a single HASH table, +# Note: the schema changed in Ray 2.38.0. 
Previously we use a single HASH table, # now we use multiple HASH tables with a common prefix. KEYS * From 58cd649bd3609b7e43a5286b36b4a72e69001739 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang <56065503+rynewang@users.noreply.github.com> Date: Wed, 18 Sep 2024 11:31:57 -0700 Subject: [PATCH 27/29] Update doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md Co-authored-by: Jiajun Yao Signed-off-by: Ruiyang Wang <56065503+rynewang@users.noreply.github.com> --- doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md b/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md index 7efe202b801b..5bae1ee7619e 100644 --- a/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md +++ b/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md @@ -144,7 +144,7 @@ KEYS * # 1) "RAY864b004c-6305-42e3-ac46-adfa8eb6f752@INTERNAL_CONFIG" # 2) "RAY864b004c-6305-42e3-ac46-adfa8eb6f752@KV" # 3) "RAY864b004c-6305-42e3-ac46-adfa8eb6f752@NODE" -# [Example output Before Ray 2.36.0]: +# [Example output Before Ray 2.38.0]: # 2) "864b004c-6305-42e3-ac46-adfa8eb6f752" # From 21e4eb5ec761712763470884e5da752ba36b3d5c Mon Sep 17 00:00:00 2001 From: Ruiyang Wang <56065503+rynewang@users.noreply.github.com> Date: Wed, 18 Sep 2024 11:32:04 -0700 Subject: [PATCH 28/29] Update doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md Co-authored-by: Jiajun Yao Signed-off-by: Ruiyang Wang <56065503+rynewang@users.noreply.github.com> --- doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md b/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md index 5bae1ee7619e..59325fb7d166 100644 --- a/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md +++ b/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md @@ 
-150,7 +150,7 @@ KEYS * # Step 6.5: Check the value of the key. HGETALL RAY864b004c-6305-42e3-ac46-adfa8eb6f752@NODE -# Before Ray 2.36.0: +# Before Ray 2.38.0: # HGETALL 864b004c-6305-42e3-ac46-adfa8eb6f752 ``` From f2638397edff6ad873194daf1c809427e3e54e2d Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Wed, 18 Sep 2024 11:48:36 -0700 Subject: [PATCH 29/29] rename func Signed-off-by: Ruiyang Wang --- python/ray/_private/gcs_utils.py | 11 +++++------ python/ray/_raylet.pyx | 10 +++++----- python/ray/includes/global_state_accessor.pxd | 10 +++++----- python/ray/includes/global_state_accessor.pxi | 2 +- src/ray/gcs/store_client/redis_store_client.cc | 10 +++++----- src/ray/gcs/store_client/redis_store_client.h | 10 +++++----- 6 files changed, 26 insertions(+), 27 deletions(-) diff --git a/python/ray/_private/gcs_utils.py b/python/ray/_private/gcs_utils.py index bcaf917e048b..1cf15b404859 100644 --- a/python/ray/_private/gcs_utils.py +++ b/python/ray/_private/gcs_utils.py @@ -125,7 +125,7 @@ def cleanup_redis_storage( storage_namespace: The namespace of the storage to be deleted. """ - from ray._raylet import del_external_storage_namespace_from_storage # type: ignore + from ray._raylet import del_key_prefix_from_storage # type: ignore if not isinstance(host, str): raise ValueError("Host must be a string") @@ -143,8 +143,7 @@ def cleanup_redis_storage( raise ValueError("storage namespace must be a string") # Right now, GCS stores all data into multiple hashes with keys prefixed by - # storage_namespace. So we only need to delete the specific key to cleanup the - # cluster. - return del_external_storage_namespace_from_storage( - host, port, password, use_ssl, storage_namespace - ) + # storage_namespace. So we only need to delete the specific key prefix to cleanup + # the cluster. + # Note this deletes all keys with prefix `RAY{key_prefix}@`, not `{key_prefix}`. 
+ return del_key_prefix_from_storage(host, port, password, use_ssl, storage_namespace) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 7263acb1e738..5bb3e4603b77 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -162,7 +162,7 @@ from ray.includes.libcoreworker cimport ( from ray.includes.ray_config cimport RayConfig from ray.includes.global_state_accessor cimport CGlobalStateAccessor from ray.includes.global_state_accessor cimport ( - RedisDelExternalStorageNamespaceSync, + RedisDelKeyPrefixSync, RedisGetKeySync ) from ray.includes.optional cimport ( @@ -5179,10 +5179,10 @@ cdef void async_callback(shared_ptr[CRayObject] obj, cpython.Py_DECREF(user_callback) -def del_external_storage_namespace_from_storage( - host, port, password, use_ssl, key_prefix): - return RedisDelExternalStorageNamespaceSync( - host, port, password, use_ssl, key_prefix) +# Note this deletes keys with prefix `RAY{key_prefix}@` +# Example: with key_prefix = `default`, we remove all `RAYdefault@...` keys. 
+def del_key_prefix_from_storage(host, port, password, use_ssl, key_prefix): + return RedisDelKeyPrefixSync(host, port, password, use_ssl, key_prefix) def get_session_key_from_storage(host, port, password, use_ssl, config, key): diff --git a/python/ray/includes/global_state_accessor.pxd b/python/ray/includes/global_state_accessor.pxd index cbcc9b52223a..a38db9fb0403 100644 --- a/python/ray/includes/global_state_accessor.pxd +++ b/python/ray/includes/global_state_accessor.pxd @@ -129,8 +129,8 @@ cdef extern from * namespace "ray::gcs" nogil: cdef extern from * namespace "ray::gcs" nogil: - c_bool RedisDelExternalStorageNamespaceSync(const c_string& host, - c_int32_t port, - const c_string& password, - c_bool use_ssl, - const c_string& key_prefix) + c_bool RedisDelKeyPrefixSync(const c_string& host, + c_int32_t port, + const c_string& password, + c_bool use_ssl, + const c_string& key_prefix) diff --git a/python/ray/includes/global_state_accessor.pxi b/python/ray/includes/global_state_accessor.pxi index 55a4a2619806..a48957dc0e71 100644 --- a/python/ray/includes/global_state_accessor.pxi +++ b/python/ray/includes/global_state_accessor.pxi @@ -15,7 +15,7 @@ from ray.includes.unique_ids cimport ( from ray.includes.global_state_accessor cimport ( CGlobalStateAccessor, - RedisDelExternalStorageNamespaceSync, + RedisDelKeyPrefixSync, ) from ray.includes.optional cimport ( diff --git a/src/ray/gcs/store_client/redis_store_client.cc b/src/ray/gcs/store_client/redis_store_client.cc index a7a604a1aad9..0a6e53f66f48 100644 --- a/src/ray/gcs/store_client/redis_store_client.cc +++ b/src/ray/gcs/store_client/redis_store_client.cc @@ -481,11 +481,11 @@ Status RedisStoreClient::AsyncExists(const std::string &table_name, } // Returns True if at least 1 key is deleted, False otherwise. 
-bool RedisDelExternalStorageNamespaceSync(const std::string &host, - int32_t port, - const std::string &password, - bool use_ssl, - const std::string &external_storage_namespace) { +bool RedisDelKeyPrefixSync(const std::string &host, + int32_t port, + const std::string &password, + bool use_ssl, + const std::string &external_storage_namespace) { RedisClientOptions options(host, port, password, use_ssl); auto cli = std::make_unique(options); diff --git a/src/ray/gcs/store_client/redis_store_client.h b/src/ray/gcs/store_client/redis_store_client.h index c9b37bc2ce9f..b9fc2643d7af 100644 --- a/src/ray/gcs/store_client/redis_store_client.h +++ b/src/ray/gcs/store_client/redis_store_client.h @@ -262,11 +262,11 @@ class RedisStoreClient : public StoreClient { }; // Helper function used by Python to delete all redis HASHes with a given prefix. -bool RedisDelExternalStorageNamespaceSync(const std::string &host, - int32_t port, - const std::string &password, - bool use_ssl, - const std::string &external_storage_namespace); +bool RedisDelKeyPrefixSync(const std::string &host, + int32_t port, + const std::string &password, + bool use_ssl, + const std::string &external_storage_namespace); } // namespace gcs