diff --git a/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md b/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md index 2051e819960c..59325fb7d166 100644 --- a/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md +++ b/doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md @@ -136,12 +136,22 @@ export REDIS_POD=$(kubectl get pods --selector=app=redis -o custom-columns=POD:m kubectl exec -it $REDIS_POD -- redis-cli -a "5241590000000000" # Step 6.4: Check the keys in Redis. +# Note: the schema changed in Ray 2.38.0. Previously we use a single HASH table, +# now we use multiple HASH tables with a common prefix. + KEYS * # [Example output]: -# 1) "864b004c-6305-42e3-ac46-adfa8eb6f752" +# 1) "RAY864b004c-6305-42e3-ac46-adfa8eb6f752@INTERNAL_CONFIG" +# 2) "RAY864b004c-6305-42e3-ac46-adfa8eb6f752@KV" +# 3) "RAY864b004c-6305-42e3-ac46-adfa8eb6f752@NODE" +# [Example output Before Ray 2.38.0]: +# 2) "864b004c-6305-42e3-ac46-adfa8eb6f752" +# # Step 6.5: Check the value of the key. -HGETALL 864b004c-6305-42e3-ac46-adfa8eb6f752 +HGETALL RAY864b004c-6305-42e3-ac46-adfa8eb6f752@NODE +# Before Ray 2.38.0: +# HGETALL 864b004c-6305-42e3-ac46-adfa8eb6f752 ``` In [ray-cluster.external-redis.yaml](https://github.com/ray-project/kuberay/blob/v1.0.0/ray-operator/config/samples/ray-cluster.external-redis.yaml), the `ray.io/external-storage-namespace` annotation isn't set for the RayCluster. diff --git a/python/ray/_private/gcs_utils.py b/python/ray/_private/gcs_utils.py index 37479d94f804..1cf15b404859 100644 --- a/python/ray/_private/gcs_utils.py +++ b/python/ray/_private/gcs_utils.py @@ -125,7 +125,7 @@ def cleanup_redis_storage( storage_namespace: The namespace of the storage to be deleted. """ - from ray._raylet import del_key_from_storage # type: ignore + from ray._raylet import del_key_prefix_from_storage # type: ignore if not isinstance(host, str): raise ValueError("Host must be a string") @@ -142,6 +142,8 @@ def cleanup_redis_storage( if not isinstance(storage_namespace, str): raise ValueError("storage namespace must be a string") - # Right now, GCS store all data into a hash set key by storage_namespace. - # So we only need to delete the specific key to cleanup the cluster. - return del_key_from_storage(host, port, password, use_ssl, storage_namespace) + # Right now, GCS stores all data into multiple hashes with keys prefixed by + # storage_namespace. So we only need to delete the specific key prefix to cleanup + # the cluster. + # Note this deletes all keys with prefix `RAY{key_prefix}@`, not `{key_prefix}`. + return del_key_prefix_from_storage(host, port, password, use_ssl, storage_namespace) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index debd07d65d52..5bb3e4603b77 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -161,7 +161,10 @@ from ray.includes.libcoreworker cimport ( from ray.includes.ray_config cimport RayConfig from ray.includes.global_state_accessor cimport CGlobalStateAccessor -from ray.includes.global_state_accessor cimport RedisDelKeySync, RedisGetKeySync +from ray.includes.global_state_accessor cimport ( + RedisDelKeyPrefixSync, + RedisGetKeySync +) from ray.includes.optional cimport ( optional, nullopt ) @@ -5176,8 +5179,10 @@ cdef void async_callback(shared_ptr[CRayObject] obj, cpython.Py_DECREF(user_callback) -def del_key_from_storage(host, port, password, use_ssl, key): - return RedisDelKeySync(host, port, password, use_ssl, key) +# Note this deletes keys with prefix `RAY{key_prefix}@` +# Example: with key_prefix = `default`, we remove all `RAYdefault@...` keys. +def del_key_prefix_from_storage(host, port, password, use_ssl, key_prefix): + return RedisDelKeyPrefixSync(host, port, password, use_ssl, key_prefix) def get_session_key_from_storage(host, port, password, use_ssl, config, key): diff --git a/python/ray/includes/global_state_accessor.pxd b/python/ray/includes/global_state_accessor.pxd index c5f939f6aae6..a38db9fb0403 100644 --- a/python/ray/includes/global_state_accessor.pxd +++ b/python/ray/includes/global_state_accessor.pxd @@ -129,60 +129,8 @@ cdef extern from * namespace "ray::gcs" nogil: cdef extern from * namespace "ray::gcs" nogil: - """ - #include - #include "ray/gcs/redis_client.h" - namespace ray { - namespace gcs { - - class Cleanup { - public: - Cleanup(std::function f): f_(f) {} - ~Cleanup() { f_(); } - private: - std::function f_; - }; - - bool RedisDelKeySync(const std::string& host, - int32_t port, - const std::string& password, - bool use_ssl, - const std::string& key) { - RedisClientOptions options(host, port, password, use_ssl); - auto cli = std::make_unique(options); - - instrumented_io_context io_service; - - auto thread = std::make_unique([&]() { - boost::asio::io_service::work work(io_service); - io_service.run(); - }); - - Cleanup _([&](){ - io_service.stop(); - thread->join(); - }); - - auto status = cli->Connect(io_service); - RAY_CHECK(status.ok()) << "Failed to connect to redis: " << status.ToString(); - - auto context = cli->GetPrimaryContext(); - auto cmd = std::vector{"DEL", key}; - auto reply = context->RunArgvSync(cmd); - if(reply->ReadAsInteger() == 1) { - RAY_LOG(INFO) << "Successfully deleted " << key; - return true; - } else { - RAY_LOG(ERROR) << "Failed to delete " << key; - return false; - } - } - - } - } - """ - c_bool RedisDelKeySync(const c_string& host, - c_int32_t port, - const c_string& password, - c_bool use_ssl, - const c_string& key) + c_bool RedisDelKeyPrefixSync(const c_string& host, + c_int32_t port, + const c_string& password, + c_bool use_ssl, + const c_string& key_prefix) diff --git a/python/ray/includes/global_state_accessor.pxi b/python/ray/includes/global_state_accessor.pxi index 401b6e7539cc..a48957dc0e71 100644 --- a/python/ray/includes/global_state_accessor.pxi +++ b/python/ray/includes/global_state_accessor.pxi @@ -15,7 +15,7 @@ from ray.includes.unique_ids cimport ( from ray.includes.global_state_accessor cimport ( CGlobalStateAccessor, - RedisDelKeySync, + RedisDelKeyPrefixSync, ) from ray.includes.optional cimport ( diff --git a/python/ray/tests/test_gcs_utils.py b/python/ray/tests/test_gcs_utils.py index 30ccfa9e31c5..82f34214046e 100644 --- a/python/ray/tests/test_gcs_utils.py +++ b/python/ray/tests/test_gcs_utils.py @@ -295,9 +295,12 @@ def test_redis_cleanup(redis_replicas, shutdown_only): else: cli = redis.Redis(host, int(port)) - assert set(cli.keys()) == {b"c1", b"c2"} + table_names = ["KV", "INTERNAL_CONFIG", "WORKERS", "JobCounter", "NODE", "JOB"] + c1_keys = [f"RAYc1@{name}".encode() for name in table_names] + c2_keys = [f"RAYc2@{name}".encode() for name in table_names] + assert set(cli.keys()) == set(c1_keys + c2_keys) gcs_utils.cleanup_redis_storage(host, int(port), "", False, "c1") - assert set(cli.keys()) == {b"c2"} + assert set(cli.keys()) == set(c2_keys) gcs_utils.cleanup_redis_storage(host, int(port), "", False, "c2") assert len(cli.keys()) == 0 diff --git a/src/ray/gcs/gcs_server/store_client_kv.cc b/src/ray/gcs/gcs_server/store_client_kv.cc index 11bf15b10291..17228bff6f86 100644 --- a/src/ray/gcs/gcs_server/store_client_kv.cc +++ b/src/ray/gcs/gcs_server/store_client_kv.cc @@ -150,6 +150,7 @@ void StoreClientInternalKV::Keys(const std::string &ns, MakeKey(ns, prefix), [callback = std::move(callback)](std::vector keys) { std::vector true_keys; + true_keys.reserve(keys.size()); for (auto &key : keys) { true_keys.emplace_back(ExtractKey(key)); } diff --git a/src/ray/gcs/redis_client.cc b/src/ray/gcs/redis_client.cc index b98428b7e432..4464e8098968 100644 --- a/src/ray/gcs/redis_client.cc +++ b/src/ray/gcs/redis_client.cc @@ -25,69 +25,6 @@ namespace ray { namespace gcs { -/// Run redis command using specified context and store the result in `reply`. Return true -/// if the number of attemps didn't reach `redis_db_connect_retries`. -static bool RunRedisCommandWithRetries( - redisContext *context, - const char *command, - redisReply **reply, - const std::function &condition) { - int num_attempts = 0; - while (num_attempts < RayConfig::instance().redis_db_connect_retries()) { - // Try to execute the command. - *reply = reinterpret_cast(redisCommand(context, command)); - if (condition(*reply)) { - break; - } - - // Sleep for a little, and try again if the entry isn't there yet. - freeReplyObject(*reply); - std::this_thread::sleep_for(std::chrono::milliseconds( - RayConfig::instance().redis_db_connect_wait_milliseconds())); - num_attempts++; - } - return num_attempts < RayConfig::instance().redis_db_connect_retries(); -} - -static int DoGetNextJobID(redisContext *context) { - // This is bad since duplicate logic lives in redis_client - // and redis_store_client. - // A refactoring is needed to make things clean. - // src/ray/gcs/store_client/redis_store_client.cc#L42 - // TODO (iycheng): Unify the way redis key is formated. - static const std::string kTableSeparator = ":"; - static const std::string kClusterSeparator = "@"; - static std::string key = RayConfig::instance().external_storage_namespace() + - kClusterSeparator + kTableSeparator + "JobCounter"; - static std::string cmd = - "HINCRBY " + RayConfig::instance().external_storage_namespace() + " " + key + " 1"; - - redisReply *reply = nullptr; - bool under_retry_limit = RunRedisCommandWithRetries( - context, cmd.c_str(), &reply, [](const redisReply *reply) { - if (reply == nullptr) { - RAY_LOG(WARNING) << "Didn't get reply for " << cmd; - return false; - } - if (reply->type == REDIS_REPLY_NIL) { - RAY_LOG(WARNING) << "Got nil reply for " << cmd; - return false; - } - if (reply->type == REDIS_REPLY_ERROR) { - RAY_LOG(WARNING) << "Got error reply for " << cmd << " Error is " << reply->str; - return false; - } - return true; - }); - RAY_CHECK(reply); - RAY_CHECK(under_retry_limit) << "No entry found for JobCounter"; - RAY_CHECK(reply->type == REDIS_REPLY_INTEGER) - << "Expected integer, found Redis type " << reply->type << " for JobCounter"; - int counter = reply->integer; - freeReplyObject(reply); - return counter; -} - RedisClient::RedisClient(const RedisClientOptions &options) : options_(options) {} Status RedisClient::Connect(instrumented_io_context &io_service) { @@ -127,11 +64,6 @@ void RedisClient::Disconnect() { RAY_LOG(DEBUG) << "RedisClient disconnected."; } -int RedisClient::GetNextJobID() { - RAY_CHECK(primary_context_); - return DoGetNextJobID(primary_context_->sync_context()); -} - } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/redis_client.h b/src/ray/gcs/redis_client.h index 893031fb8771..5207311d6d3f 100644 --- a/src/ray/gcs/redis_client.h +++ b/src/ray/gcs/redis_client.h @@ -69,8 +69,6 @@ class RedisClient { std::shared_ptr GetPrimaryContext() { return primary_context_; } - int GetNextJobID(); - protected: /// Attach this client to an asio event loop. Note that only /// one event loop should be attached at a time. diff --git a/src/ray/gcs/store_client/redis_store_client.cc b/src/ray/gcs/store_client/redis_store_client.cc index 81d1ce292aa7..0a6e53f66f48 100644 --- a/src/ray/gcs/store_client/redis_store_client.cc +++ b/src/ray/gcs/store_client/redis_store_client.cc @@ -16,111 +16,83 @@ #include #include +#include +#include +#include "absl/cleanup/cleanup.h" #include "absl/strings/match.h" #include "absl/strings/str_cat.h" #include "ray/gcs/redis_context.h" +#include "ray/util/container_util.h" #include "ray/util/logging.h" namespace ray { - namespace gcs { namespace { -const std::string_view kTableSeparator = ":"; const std::string_view kClusterSeparator = "@"; // "[, ], -, ?, *, ^, \" are special chars in Redis pattern matching. // escape them with / according to the doc: // https://redis.io/commands/keys/ std::string EscapeMatchPattern(const std::string &s) { - static std::regex kSpecialChars("\\[|\\]|-|\\?|\\*|\\^|\\\\"); + static std::regex kSpecialChars(R"(\[|\]|-|\?|\*|\^|\\)"); return std::regex_replace(s, kSpecialChars, "\\$&"); } -std::string GenRedisKey(const std::string &external_storage_namespace, - const std::string &table_name, - const std::string &key) { - return absl::StrCat( - external_storage_namespace, kClusterSeparator, table_name, kTableSeparator, key); -} - -std::string GenKeyRedisMatchPattern(const std::string &external_storage_namespace, - const std::string &table_name) { - return absl::StrCat(EscapeMatchPattern(external_storage_namespace), - kClusterSeparator, - EscapeMatchPattern(table_name), - kTableSeparator, - "*"); -} - -std::string GenKeyRedisMatchPattern(const std::string &external_storage_namespace, - const std::string &table_name, - const std::string &key) { - return absl::StrCat(EscapeMatchPattern(external_storage_namespace), - kClusterSeparator, - EscapeMatchPattern(table_name), - kTableSeparator, - EscapeMatchPattern(key), - "*"); -} - -std::vector> GenCommandsBatched( - const std::string &command, - const std::string &hash_field, - const std::vector &keys) { - std::vector> batched_requests; - for (auto &key : keys) { +// Assume `command` can take arbitary number of keys. Chunk the args into multiple +// commands with the same command name and the same redis_key. Each chunk has at most +// `maximum_gcs_storage_operation_batch_size` keys. +std::vector GenCommandsBatched(const std::string &command, + const RedisKey &redis_key, + const std::vector &args) { + std::vector batched_requests; + for (auto &arg : args) { // If it's empty or the last batch is full, add a new batch. if (batched_requests.empty() || - batched_requests.back().size() >= - RayConfig::instance().maximum_gcs_storage_operation_batch_size() + 2) { - batched_requests.emplace_back(std::vector{command, hash_field}); + batched_requests.back().args.size() >= + RayConfig::instance().maximum_gcs_storage_operation_batch_size()) { + batched_requests.emplace_back(RedisCommand{command, redis_key, {}}); } - batched_requests.back().push_back(key); + batched_requests.back().args.push_back(arg); } return batched_requests; } -std::string GetKeyFromRedisKey(const std::string &external_storage_namespace, - const std::string &redis_key, - const std::string &table_name) { - auto pos = external_storage_namespace.size() + kClusterSeparator.size() + - table_name.size() + kTableSeparator.size(); - return redis_key.substr(pos, redis_key.size() - pos); +} // namespace + +std::string RedisKey::ToString() const { + // Something like RAY864b004c-6305-42e3-ac46-adfa8eb6f752@NODE + return absl::StrCat("RAY", external_storage_namespace, kClusterSeparator, table_name); } -} // namespace +RedisMatchPattern RedisMatchPattern::Prefix(const std::string &prefix) { + return RedisMatchPattern(absl::StrCat(EscapeMatchPattern(prefix), "*")); +} void RedisStoreClient::MGetValues(const std::string &table_name, const std::vector &keys, const MapCallback &callback) { - // The `MGET` command for each shard. - auto batched_commands = GenCommandsBatched("HMGET", external_storage_namespace_, keys); + // The `HMGET` command for each shard. + auto batched_commands = GenCommandsBatched( + "HMGET", RedisKey{external_storage_namespace_, table_name}, keys); auto total_count = batched_commands.size(); auto finished_count = std::make_shared(0); auto key_value_map = std::make_shared>(); for (auto &command : batched_commands) { - auto mget_keys = std::move(command); - std::vector partition_keys(mget_keys.begin() + 2, mget_keys.end()); - auto mget_callback = [this, - table_name, - finished_count, + auto mget_callback = [finished_count, total_count, - mget_keys, + // Copies! + args = command.args, callback, key_value_map](const std::shared_ptr &reply) { if (!reply->IsNil()) { auto value = reply->ReadAsStringArray(); - // The 0 th element of mget_keys is "MGET", so we start from the 1 th - // element. for (size_t index = 0; index < value.size(); ++index) { if (value[index].has_value()) { - (*key_value_map)[GetKeyFromRedisKey( - external_storage_namespace_, mget_keys[index + 2], table_name)] = - *(value[index]); + (*key_value_map)[args[index]] = *(value[index]); } } } @@ -130,8 +102,7 @@ void RedisStoreClient::MGetValues(const std::string &table_name, callback(std::move(*key_value_map)); } }; - SendRedisCmd( - std::move(partition_keys), std::move(mget_keys), std::move(mget_callback)); + SendRedisCmdArgsAsKeys(std::move(command), std::move(mget_callback)); } } @@ -148,10 +119,19 @@ Status RedisStoreClient::AsyncPut(const std::string &table_name, const std::string &data, bool overwrite, std::function callback) { - return DoPut(GenRedisKey(external_storage_namespace_, table_name, key), - data, - overwrite, - callback); + RedisCommand command{/*command=*/overwrite ? "HSET" : "HSETNX", + RedisKey{external_storage_namespace_, table_name}, + /*args=*/{key, data}}; + RedisCallback write_callback = nullptr; + if (callback) { + write_callback = + [callback = std::move(callback)](const std::shared_ptr &reply) { + auto added_num = reply->ReadAsInteger(); + callback(added_num != 0); + }; + } + SendRedisCmdWithKeys({key}, std::move(command), std::move(write_callback)); + return Status::OK(); } Status RedisStoreClient::AsyncGet(const std::string &table_name, @@ -169,9 +149,10 @@ Status RedisStoreClient::AsyncGet(const std::string &table_name, callback(Status::OK(), std::move(result)); }; - std::string redis_key = GenRedisKey(external_storage_namespace_, table_name, key); - std::vector args = {"HGET", external_storage_namespace_, redis_key}; - SendRedisCmd({redis_key}, std::move(args), std::move(redis_callback)); + RedisCommand command{/*command=*/"HGET", + RedisKey{external_storage_namespace_, table_name}, + /*args=*/{key}}; + SendRedisCmdArgsAsKeys(std::move(command), std::move(redis_callback)); return Status::OK(); } @@ -179,15 +160,11 @@ Status RedisStoreClient::AsyncGetAll( const std::string &table_name, const MapCallback &callback) { RAY_CHECK(callback); - std::string match_pattern = - GenKeyRedisMatchPattern(external_storage_namespace_, table_name); - auto scanner = std::make_shared( - redis_client_, external_storage_namespace_, table_name); - auto on_done = [callback, - scanner](absl::flat_hash_map &&result) { - callback(std::move(result)); - }; - return scanner->ScanKeysAndValues(match_pattern, on_done); + RedisScanner::ScanKeysAndValues(redis_client_, + RedisKey{external_storage_namespace_, table_name}, + RedisMatchPattern::Any(), + callback); + return Status::OK(); } Status RedisStoreClient::AsyncDelete(const std::string &table_name, @@ -209,12 +186,7 @@ Status RedisStoreClient::AsyncBatchDelete(const std::string &table_name, } return Status::OK(); } - std::vector redis_keys; - redis_keys.reserve(keys.size()); - for (auto &key : keys) { - redis_keys.push_back(GenRedisKey(external_storage_namespace_, table_name, key)); - } - return DeleteByKeys(redis_keys, callback); + return DeleteByKeys(table_name, keys, callback); } Status RedisStoreClient::AsyncMultiGet( @@ -226,15 +198,11 @@ Status RedisStoreClient::AsyncMultiGet( callback({}); return Status::OK(); } - std::vector true_keys; - for (auto &key : keys) { - true_keys.push_back(GenRedisKey(external_storage_namespace_, table_name, key)); - } - MGetValues(table_name, true_keys, callback); + MGetValues(table_name, keys, callback); return Status::OK(); } -size_t RedisStoreClient::PushToSendingQueue(const std::vector &keys, +size_t RedisStoreClient::PushToSendingQueue(const std::vector &keys, std::function send_request) { size_t queue_added = 0; for (const auto &key : keys) { @@ -260,7 +228,7 @@ size_t RedisStoreClient::PushToSendingQueue(const std::vector &keys } std::vector> RedisStoreClient::TakeRequestsFromSendingQueue( - const std::vector &keys) { + const std::vector &keys) { std::vector> send_requests; for (const auto &key : keys) { auto [op_iter, added] = @@ -277,10 +245,21 @@ std::vector> RedisStoreClient::TakeRequestsFromSendingQueu return send_requests; } -void RedisStoreClient::SendRedisCmd(std::vector keys, - std::vector args, - RedisCallback redis_callback) { +void RedisStoreClient::SendRedisCmdArgsAsKeys(RedisCommand command, + RedisCallback redis_callback) { + auto copied = command.args; + SendRedisCmdWithKeys(std::move(copied), std::move(command), std::move(redis_callback)); +} + +void RedisStoreClient::SendRedisCmdWithKeys(std::vector keys, + RedisCommand command, + RedisCallback redis_callback) { RAY_CHECK(!keys.empty()); + auto concurrency_keys = + ray::move_mapped(std::move(keys), [&command](std::string &&key) { + return RedisConcurrencyKey{command.redis_key.table_name, std::move(key)}; + }); + // The number of keys that's ready for this request. // For a query reading or writing multiple keys, we need a counter // to check whether all existing requests for this keys have been @@ -288,29 +267,29 @@ void RedisStoreClient::SendRedisCmd(std::vector keys, auto num_ready_keys = std::make_shared(0); std::function send_redis = [this, num_ready_keys = num_ready_keys, - keys, - args = std::move(args), + concurrency_keys, // Copied! + command = std::move(command), redis_callback = std::move(redis_callback)]() mutable { { absl::MutexLock lock(&mu_); *num_ready_keys += 1; - RAY_CHECK(*num_ready_keys <= keys.size()); + RAY_CHECK(*num_ready_keys <= concurrency_keys.size()); // There are still pending requests for these keys. - if (*num_ready_keys != keys.size()) { + if (*num_ready_keys != concurrency_keys.size()) { return; } } // Send the actual request auto cxt = redis_client_->GetPrimaryContext(); - cxt->RunArgvAsync(std::move(args), + cxt->RunArgvAsync(command.ToRedisArgs(), [this, - keys = std::move(keys), + concurrency_keys, // Copied! redis_callback = std::move(redis_callback)](auto reply) { std::vector> requests; { absl::MutexLock lock(&mu_); - requests = TakeRequestsFromSendingQueue(keys); + requests = TakeRequestsFromSendingQueue(concurrency_keys); } for (auto &request : requests) { request(); @@ -323,7 +302,7 @@ void RedisStoreClient::SendRedisCmd(std::vector keys, { absl::MutexLock lock(&mu_); - auto keys_ready = PushToSendingQueue(keys, send_redis); + auto keys_ready = PushToSendingQueue(concurrency_keys, send_redis); *num_ready_keys += keys_ready; // If all queues are empty for each key this request depends on // we are safe to fire the request immediately. @@ -338,33 +317,17 @@ void RedisStoreClient::SendRedisCmd(std::vector keys, } } -Status RedisStoreClient::DoPut(const std::string &key, - const std::string &data, - bool overwrite, - std::function callback) { - std::vector args = { - overwrite ? "HSET" : "HSETNX", external_storage_namespace_, key, data}; - RedisCallback write_callback = nullptr; - if (callback) { - write_callback = - [callback = std::move(callback)](const std::shared_ptr &reply) { - auto added_num = reply->ReadAsInteger(); - callback(added_num != 0); - }; - } - SendRedisCmd({key}, std::move(args), std::move(write_callback)); - return Status::OK(); -} - -Status RedisStoreClient::DeleteByKeys(const std::vector &keys, +Status RedisStoreClient::DeleteByKeys(const std::string &table, + const std::vector &keys, std::function callback) { - auto del_cmds = GenCommandsBatched("HDEL", external_storage_namespace_, keys); + auto del_cmds = + GenCommandsBatched("HDEL", RedisKey{external_storage_namespace_, table}, keys); auto total_count = del_cmds.size(); auto finished_count = std::make_shared(0); auto num_deleted = std::make_shared(0); auto context = redis_client_->GetPrimaryContext(); for (auto &command : del_cmds) { - std::vector partition_keys(command.begin() + 2, command.end()); + // `callback` is copied to each `delete_callback` lambda. Don't move. auto delete_callback = [num_deleted, finished_count, total_count, callback]( const std::shared_ptr &reply) { (*num_deleted) += reply->ReadAsInteger(); @@ -375,66 +338,73 @@ Status RedisStoreClient::DeleteByKeys(const std::vector &keys, } } }; - SendRedisCmd( - std::move(partition_keys), std::move(command), std::move(delete_callback)); + SendRedisCmdArgsAsKeys(std::move(command), std::move(delete_callback)); } return Status::OK(); } RedisStoreClient::RedisScanner::RedisScanner( + PrivateCtorTag ctor_tag, std::shared_ptr redis_client, - const std::string &external_storage_namespace, - const std::string &table_name) - : table_name_(table_name), - external_storage_namespace_(external_storage_namespace), - redis_client_(std::move(redis_client)) { + RedisKey redis_key, + RedisMatchPattern match_pattern, + MapCallback callback) + : redis_key_(std::move(redis_key)), + match_pattern_(std::move(match_pattern)), + redis_client_(std::move(redis_client)), + callback_(std::move(callback)) { cursor_ = 0; + pending_request_count_ = 0; } -Status RedisStoreClient::RedisScanner::ScanKeysAndValues( - const std::string &match_pattern, - const MapCallback &callback) { - auto on_done = [this, callback](const Status &status) { - callback(std::move(results_)); - }; - Scan(match_pattern, on_done); - return Status::OK(); +void RedisStoreClient::RedisScanner::ScanKeysAndValues( + std::shared_ptr redis_client, + RedisKey redis_key, + RedisMatchPattern match_pattern, + MapCallback callback) { + auto scanner = std::make_shared(PrivateCtorTag(), + std::move(redis_client), + std::move(redis_key), + std::move(match_pattern), + std::move(callback)); + scanner->self_ref_ = scanner; + scanner->Scan(); } -void RedisStoreClient::RedisScanner::Scan(const std::string &match_pattern, - const StatusCallback &callback) { +void RedisStoreClient::RedisScanner::Scan() { // This lock guards cursor_ because the callbacks // can modify cursor_. If performance is a concern, // we should consider using a reader-writer lock. absl::MutexLock lock(&mutex_); if (!cursor_.has_value()) { - callback(Status::OK()); + callback_(std::move(results_)); + self_ref_.reset(); return; } size_t batch_count = RayConfig::instance().maximum_gcs_storage_operation_batch_size(); ++pending_request_count_; - auto scan_callback = - [this, match_pattern, callback](const std::shared_ptr &reply) { - OnScanCallback(match_pattern, reply, callback); - }; // Scan by prefix from Redis. - std::vector args = {"HSCAN", - external_storage_namespace_, - std::to_string(cursor_.value()), - "MATCH", - match_pattern, - "COUNT", - std::to_string(batch_count)}; + RedisCommand command = {"HSCAN", redis_key_, {std::to_string(cursor_.value())}}; + if (match_pattern_.escaped != "*") { + command.args.push_back("MATCH"); + command.args.push_back(match_pattern_.escaped); + } + command.args.push_back("COUNT"); + command.args.push_back(std::to_string(batch_count)); auto primary_context = redis_client_->GetPrimaryContext(); - primary_context->RunArgvAsync(args, scan_callback); + primary_context->RunArgvAsync( + command.ToRedisArgs(), + // self_ref to keep the scanner alive until the callback is called, even if it + // releases its self_ref in Scan(). + [this, self_ref = self_ref_](const std::shared_ptr &reply) { + OnScanCallback(reply); + }); } void RedisStoreClient::RedisScanner::OnScanCallback( - const std::string &match_pattern, - const std::shared_ptr &reply, - const StatusCallback &callback) { + const std::shared_ptr &reply) { RAY_CHECK(reply); std::vector scan_result; size_t cursor = reply->ReadAsScanArray(&scan_result); @@ -448,51 +418,61 @@ void RedisStoreClient::RedisScanner::OnScanCallback( } else { cursor_ = cursor; } + // Result is an array of key-value pairs. + // scan_result[i] = key, scan_result[i+1] = value + // Example req: HSCAN hash_with_cluster_id_for_Jobs + // scan_result = job1 job1_value job2 job2_value RAY_CHECK(scan_result.size() % 2 == 0); for (size_t i = 0; i < scan_result.size(); i += 2) { - auto key = GetKeyFromRedisKey( - external_storage_namespace_, std::move(scan_result[i]), table_name_); - results_.emplace(std::move(key), std::move(scan_result[i + 1])); + results_.emplace(std::move(scan_result[i]), std::move(scan_result[i + 1])); } } // If pending_request_count_ is equal to 0, it means that the scan of this batch is // completed and the next batch is started if any. if (--pending_request_count_ == 0) { - Scan(match_pattern, callback); + Scan(); } } -int RedisStoreClient::GetNextJobID() { return redis_client_->GetNextJobID(); } +int RedisStoreClient::GetNextJobID() { + // Note: This is not a HASH! It's a simple key-value pair. + // Key: "RAYexternal_storage_namespace@JobCounter" + // Value: The next job ID. + RedisCommand command = { + "INCRBY", RedisKey{external_storage_namespace_, "JobCounter"}, {"1"}}; + + auto cxt = redis_client_->GetPrimaryContext(); + auto reply = cxt->RunArgvSync(command.ToRedisArgs()); + return static_cast(reply->ReadAsInteger()); +} Status RedisStoreClient::AsyncGetKeys( const std::string &table_name, const std::string &prefix, std::function)> callback) { - std::string match_pattern = - GenKeyRedisMatchPattern(external_storage_namespace_, table_name, prefix); - auto scanner = std::make_shared( - redis_client_, external_storage_namespace_, table_name); - - auto on_done = [table_name, callback, scanner](auto redis_result) { - std::vector result; - result.reserve(redis_result.size()); - for (const auto &[key, _] : redis_result) { - result.push_back(key); - } - callback(std::move(result)); - }; - return scanner->ScanKeysAndValues(match_pattern, on_done); + RedisScanner::ScanKeysAndValues( + redis_client_, + RedisKey{external_storage_namespace_, table_name}, + RedisMatchPattern::Prefix(prefix), + [callback](absl::flat_hash_map &&result) { + std::vector keys; + keys.reserve(result.size()); + for (const auto &[k, v] : result) { + keys.push_back(k); + } + callback(std::move(keys)); + }); + return Status::OK(); } Status RedisStoreClient::AsyncExists(const std::string &table_name, const std::string &key, std::function callback) { - std::string redis_key = GenRedisKey(external_storage_namespace_, table_name, key); - std::vector args = {"HEXISTS", external_storage_namespace_, redis_key}; - SendRedisCmd( - {redis_key}, - std::move(args), + RedisCommand command = { + "HEXISTS", RedisKey{external_storage_namespace_, table_name}, {key}}; + SendRedisCmdArgsAsKeys( + std::move(command), [callback = std::move(callback)](const std::shared_ptr &reply) { bool exists = reply->ReadAsInteger() > 0; callback(exists); @@ -500,6 +480,65 @@ Status RedisStoreClient::AsyncExists(const std::string &table_name, return Status::OK(); } +// Returns True if at least 1 key is deleted, False otherwise. +bool RedisDelKeyPrefixSync(const std::string &host, + int32_t port, + const std::string &password, + bool use_ssl, + const std::string &external_storage_namespace) { + RedisClientOptions options(host, port, password, use_ssl); + auto cli = std::make_unique(options); + + instrumented_io_context io_service; + + auto thread = std::make_unique([&]() { + boost::asio::io_service::work work(io_service); + io_service.run(); + }); + + auto cleanup_guard = absl::MakeCleanup([&]() { + io_service.stop(); + thread->join(); + }); + + auto status = cli->Connect(io_service); + RAY_CHECK(status.ok()) << "Failed to connect to redis: " << status.ToString(); + + auto context = cli->GetPrimaryContext(); + // Delete all such keys by using empty table name. + RedisKey redis_key{external_storage_namespace, /*table_name=*/""}; + std::vector cmd{"KEYS", + RedisMatchPattern::Prefix(redis_key.ToString()).escaped}; + auto reply = context->RunArgvSync(cmd); + const auto &keys = reply->ReadAsStringArray(); + if (keys.empty()) { + RAY_LOG(INFO) << "No keys found for external storage namespace " + << external_storage_namespace; + return true; + } + auto delete_one_sync = [context](const std::string &key) { + auto del_cmd = std::vector{"DEL", key}; + auto del_reply = context->RunArgvSync(del_cmd); + return del_reply->ReadAsInteger() > 0; + }; + size_t num_deleted = 0; + size_t num_failed = 0; + for (const auto &key : keys) { + if ((!key.has_value()) || key->empty()) { + continue; + } + if (delete_one_sync(*key)) { + num_deleted++; + } else { + num_failed++; + } + } + RAY_LOG(INFO) << "Finished deleting keys with external storage namespace " + << external_storage_namespace << ". Deleted table count: " << num_deleted + << ", Failed table count: " << num_failed; + return num_failed == 0; +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/store_client/redis_store_client.h b/src/ray/gcs/store_client/redis_store_client.h index 9bbba79e02e8..b9fc2643d7af 100644 --- a/src/ray/gcs/store_client/redis_store_client.h +++ b/src/ray/gcs/store_client/redis_store_client.h @@ -30,6 +30,79 @@ namespace ray { namespace gcs { +// Typed key to avoid forgetting to prepend external_storage_namespace. +struct RedisKey { + const std::string external_storage_namespace; + const std::string table_name; + std::string ToString() const; +}; + +struct RedisMatchPattern { + static RedisMatchPattern Prefix(const std::string &prefix); + static RedisMatchPattern Any() { + static const RedisMatchPattern kAny("*"); + return kAny; + } + const std::string escaped; + + private: + explicit RedisMatchPattern(std::string escaped) : escaped(std::move(escaped)) {} +}; + +struct RedisCommand { + std::string command; + // Redis "key" referring to a HASH. + RedisKey redis_key; + std::vector args; + + std::vector ToRedisArgs() const { + std::vector redis_args; + redis_args.reserve(2 + args.size()); + redis_args.push_back(command); + redis_args.push_back(redis_key.ToString()); + for (const auto &arg : args) { + redis_args.push_back(arg); + } + return redis_args; + } +}; + +struct RedisConcurrencyKey { + std::string table_name; + std::string key; + + template + friend H AbslHashValue(H h, const RedisConcurrencyKey &k) { + return H::combine(std::move(h), k.table_name, k.key); + } + bool operator==(const RedisConcurrencyKey &other) const { + return table_name == other.table_name && key == other.key; + } +}; + +inline std::ostream &operator<<(std::ostream &os, const RedisConcurrencyKey &key) { + os << "{" << key.table_name << ", " << key.key << "}"; + return os; +} + +// StoreClient using Redis as persistence backend. +// Note in redis term a "key" points to a hash table and a "field" is a key, a "value" +// is just a value. We double quote "key" and "field" to avoid confusion. +// +// In variable namings, we stick to the table - key - value terminology. +// +// Schema: +// - Each table is a Redis HASH. The HASH "key" is +// "RAY" + `external_storage_namespace` + "@" + `table_name`. +// - Each key-value pair in the hash is a row in the table. The "field" is the key. +// +// Consistency: +// - All Put/Get/Delete operations to a same (table, key) pair are serialized, see #35123. +// - For MultiGet/BatchDelete operations, they are subject to *all* keys in the operation, +// i.e. only after it's at the queue front of all keys, it will be processed. +// - A big loophole is GetAll and AsyncGetKeys. They're not serialized with other +// operations, since "since it's either RPC call or used during initializing GCS". [1] +// [1] https://github.com/ray-project/ray/pull/35123#issuecomment-1546549046 class RedisStoreClient : public StoreClient { public: explicit RedisStoreClient(std::shared_ptr redis_client); @@ -71,30 +144,42 @@ class RedisStoreClient : public StoreClient { private: /// \class RedisScanner - /// This class is used to scan data from Redis. /// - /// If you called one method, should never call the other methods. - /// Otherwise it will disturb the status of the RedisScanner. + /// This class is used to HSCAN data from a Redis table. + /// + /// The scan is not locked with other operations. It's not guaranteed to be consistent + /// with other operations. It's batched by + /// RAY_maximum_gcs_storage_operation_batch_size. class RedisScanner { + private: + // We want a private ctor but can use make_shared. + // See https://en.cppreference.com/w/cpp/memory/enable_shared_from_this + struct PrivateCtorTag {}; + public: - explicit RedisScanner(std::shared_ptr redis_client, - const std::string &external_storage_namespace, - const std::string &table_name); + explicit RedisScanner(PrivateCtorTag tag, + std::shared_ptr redis_client, + RedisKey redis_key, + RedisMatchPattern match_pattern, + MapCallback callback); - Status ScanKeysAndValues(const std::string &match_pattern, - const MapCallback &callback); + static void ScanKeysAndValues(std::shared_ptr redis_client, + RedisKey redis_key, + RedisMatchPattern match_pattern, + MapCallback callback); private: - void Scan(const std::string &match_pattern, const StatusCallback &callback); + // Scans the keys and values, one batch a time. Once all keys are scanned, the + // callback will be called. When the calls are in progress, the scanner temporarily + // holds its own reference so users don't need to keep it alive. + void Scan(); - void OnScanCallback(const std::string &match_pattern, - const std::shared_ptr &reply, - const StatusCallback &callback); + void OnScanCallback(const std::shared_ptr &reply); /// The table name that the scanner will scan. - std::string table_name_; + RedisKey redis_key_; - // The namespace of the external storage. Used for isolation. - std::string external_storage_namespace_; + /// The pattern to match the keys. + RedisMatchPattern match_pattern_; /// Mutex to protect the cursor_ field and the keys_ field and the /// key_value_map_ field. @@ -110,6 +195,11 @@ class RedisStoreClient : public StoreClient { std::atomic pending_request_count_{0}; std::shared_ptr redis_client_; + + MapCallback callback_; + + // Holds a self-ref until the scan is done. + std::shared_ptr self_ref_; }; // Push a request to the sending queue. @@ -119,7 +209,7 @@ class RedisStoreClient : public StoreClient { // // \return The number of queues newly added. A queue will be added // only when there is no in-flight request for the key. - size_t PushToSendingQueue(const std::vector &keys, + size_t PushToSendingQueue(const std::vector &keys, std::function send_request) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); @@ -130,27 +220,32 @@ class RedisStoreClient : public StoreClient { // // \return The requests to send. std::vector> TakeRequestsFromSendingQueue( - const std::vector &keys) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); - - Status DoPut(const std::string &key, - const std::string &data, - bool overwrite, - std::function callback); + const std::vector &keys) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); - Status DeleteByKeys(const std::vector &keys, + Status DeleteByKeys(const std::string &table_name, + const std::vector &keys, std::function callback); // Send the redis command to the server. This method will make request to be - // serialized for each key in keys. At a given time, only one request for a key - // will be in flight. + // serialized for each key in keys. At a given time, only one request for a {table_name, + // key} will be in flight. // - // \param keys The keys in the request. + // \param keys Used as concurrency key. // \param args The redis commands // \param redis_callback The callback to call when the reply is received. - void SendRedisCmd(std::vector keys, - std::vector args, - RedisCallback redis_callback); - + void SendRedisCmdWithKeys(std::vector keys, + RedisCommand command, + RedisCallback redis_callback); + + // Conveinence method for SendRedisCmdWithKeys with keys = command.args. + // Reason for this method: if you call SendRedisCmdWithKeys(command.args, + // std::move(command)), it's UB because C++ don't have arg evaluation order guarantee, + // hence command.args may become empty. + void SendRedisCmdArgsAsKeys(RedisCommand command, RedisCallback redis_callback); + + // HMGET external_storage_namespace@table_name key1 key2 ... + // `keys` are chunked to multiple HMGET commands by + // RAY_maximum_gcs_storage_operation_batch_size. void MGetValues(const std::string &table_name, const std::vector &keys, const MapCallback &callback); @@ -161,11 +256,18 @@ class RedisStoreClient : public StoreClient { // The pending redis requests queue for each key. // The queue will be poped when the request is processed. - absl::flat_hash_map>> + absl::flat_hash_map>> pending_redis_request_by_key_ ABSL_GUARDED_BY(mu_); FRIEND_TEST(RedisStoreClientTest, Random); }; +// Helper function used by Python to delete all redis HASHes with a given prefix. +bool RedisDelKeyPrefixSync(const std::string &host, + int32_t port, + const std::string &password, + bool use_ssl, + const std::string &external_storage_namespace); + } // namespace gcs } // namespace ray diff --git a/src/ray/util/container_util.h b/src/ray/util/container_util.h index 6a363dc09d0f..d1a1bb851c91 100644 --- a/src/ray/util/container_util.h +++ b/src/ray/util/container_util.h @@ -130,4 +130,16 @@ void erase_if(std::list &list, std::function predicate) { } } +// [T] -> (T -> U) -> [U] +// Only supports && input. +template +auto move_mapped(std::vector &&vec, F transform) { + std::vector()))> result; + result.reserve(vec.size()); + for (T &elem : vec) { + result.emplace_back(transform(std::move(elem))); + } + return result; +} + } // namespace ray