Skip to content

Commit

Permalink
[core] Refactor how we store data in redis. (#46861)
Browse files Browse the repository at this point in the history
Ray GCS Fault Tolerance replies on an external Redis. We write all data for a Ray cluster into a single Redis HASH. Within the HASH, we use a Redis field prefix to represent a Ray table. This is OK for single field CRUD, but for "get all entries for a Ray table" we would have to do a full table HSCAN and filter out other Ray tables by key prefix. This slows things down and the call to get_all_node_info and get_all_actor_info are in fact fairly common.

Updates the Redis usage by creating multiple Redis HASHes, 1 for each Ray table. The single field CRUD are largely the same; but "get all" operations now become a simple HSCAN on the target HASH only. This saves lots of compute.

---------

Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
  • Loading branch information
rynewang committed Sep 18, 2024
1 parent cfe32c0 commit f298a75
Show file tree
Hide file tree
Showing 12 changed files with 402 additions and 350 deletions.
14 changes: 12 additions & 2 deletions doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,22 @@ export REDIS_POD=$(kubectl get pods --selector=app=redis -o custom-columns=POD:m
kubectl exec -it $REDIS_POD -- redis-cli -a "5241590000000000"

# Step 6.4: Check the keys in Redis.
# Note: the schema changed in Ray 2.38.0. Previously we use a single HASH table,
# now we use multiple HASH tables with a common prefix.

KEYS *
# [Example output]:
# 1) "864b004c-6305-42e3-ac46-adfa8eb6f752"
# 1) "RAY864b004c-6305-42e3-ac46-adfa8eb6f752@INTERNAL_CONFIG"
# 2) "RAY864b004c-6305-42e3-ac46-adfa8eb6f752@KV"
# 3) "RAY864b004c-6305-42e3-ac46-adfa8eb6f752@NODE"
# [Example output Before Ray 2.38.0]:
# 2) "864b004c-6305-42e3-ac46-adfa8eb6f752"
#

# Step 6.5: Check the value of the key.
HGETALL 864b004c-6305-42e3-ac46-adfa8eb6f752
HGETALL RAY864b004c-6305-42e3-ac46-adfa8eb6f752@NODE
# Before Ray 2.38.0:
# HGETALL 864b004c-6305-42e3-ac46-adfa8eb6f752
```

In [ray-cluster.external-redis.yaml](https://github.com/ray-project/kuberay/blob/v1.0.0/ray-operator/config/samples/ray-cluster.external-redis.yaml), the `ray.io/external-storage-namespace` annotation isn't set for the RayCluster.
Expand Down
10 changes: 6 additions & 4 deletions python/ray/_private/gcs_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def cleanup_redis_storage(
storage_namespace: The namespace of the storage to be deleted.
"""

from ray._raylet import del_key_from_storage # type: ignore
from ray._raylet import del_key_prefix_from_storage # type: ignore

if not isinstance(host, str):
raise ValueError("Host must be a string")
Expand All @@ -142,6 +142,8 @@ def cleanup_redis_storage(
if not isinstance(storage_namespace, str):
raise ValueError("storage namespace must be a string")

# Right now, GCS store all data into a hash set key by storage_namespace.
# So we only need to delete the specific key to cleanup the cluster.
return del_key_from_storage(host, port, password, use_ssl, storage_namespace)
# Right now, GCS stores all data into multiple hashes with keys prefixed by
# storage_namespace. So we only need to delete the specific key prefix to cleanup
# the cluster.
# Note this deletes all keys with prefix `RAY{key_prefix}@`, not `{key_prefix}`.
return del_key_prefix_from_storage(host, port, password, use_ssl, storage_namespace)
11 changes: 8 additions & 3 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,10 @@ from ray.includes.libcoreworker cimport (

from ray.includes.ray_config cimport RayConfig
from ray.includes.global_state_accessor cimport CGlobalStateAccessor
from ray.includes.global_state_accessor cimport RedisDelKeySync, RedisGetKeySync
from ray.includes.global_state_accessor cimport (
RedisDelKeyPrefixSync,
RedisGetKeySync
)
from ray.includes.optional cimport (
optional, nullopt
)
Expand Down Expand Up @@ -5176,8 +5179,10 @@ cdef void async_callback(shared_ptr[CRayObject] obj,
cpython.Py_DECREF(user_callback)


def del_key_from_storage(host, port, password, use_ssl, key):
return RedisDelKeySync(host, port, password, use_ssl, key)
# Note this deletes keys with prefix `RAY{key_prefix}@`
# Example: with key_prefix = `default`, we remove all `RAYdefault@...` keys.
def del_key_prefix_from_storage(host, port, password, use_ssl, key_prefix):
return RedisDelKeyPrefixSync(host, port, password, use_ssl, key_prefix)


def get_session_key_from_storage(host, port, password, use_ssl, config, key):
Expand Down
62 changes: 5 additions & 57 deletions python/ray/includes/global_state_accessor.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -129,60 +129,8 @@ cdef extern from * namespace "ray::gcs" nogil:


cdef extern from * namespace "ray::gcs" nogil:
"""
#include <thread>
#include "ray/gcs/redis_client.h"
namespace ray {
namespace gcs {
class Cleanup {
public:
Cleanup(std::function<void()> f): f_(f) {}
~Cleanup() { f_(); }
private:
std::function<void()> f_;
};
bool RedisDelKeySync(const std::string& host,
int32_t port,
const std::string& password,
bool use_ssl,
const std::string& key) {
RedisClientOptions options(host, port, password, use_ssl);
auto cli = std::make_unique<RedisClient>(options);
instrumented_io_context io_service;
auto thread = std::make_unique<std::thread>([&]() {
boost::asio::io_service::work work(io_service);
io_service.run();
});
Cleanup _([&](){
io_service.stop();
thread->join();
});
auto status = cli->Connect(io_service);
RAY_CHECK(status.ok()) << "Failed to connect to redis: " << status.ToString();
auto context = cli->GetPrimaryContext();
auto cmd = std::vector<std::string>{"DEL", key};
auto reply = context->RunArgvSync(cmd);
if(reply->ReadAsInteger() == 1) {
RAY_LOG(INFO) << "Successfully deleted " << key;
return true;
} else {
RAY_LOG(ERROR) << "Failed to delete " << key;
return false;
}
}
}
}
"""
c_bool RedisDelKeySync(const c_string& host,
c_int32_t port,
const c_string& password,
c_bool use_ssl,
const c_string& key)
c_bool RedisDelKeyPrefixSync(const c_string& host,
c_int32_t port,
const c_string& password,
c_bool use_ssl,
const c_string& key_prefix)
2 changes: 1 addition & 1 deletion python/ray/includes/global_state_accessor.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ from ray.includes.unique_ids cimport (

from ray.includes.global_state_accessor cimport (
CGlobalStateAccessor,
RedisDelKeySync,
RedisDelKeyPrefixSync,
)

from ray.includes.optional cimport (
Expand Down
7 changes: 5 additions & 2 deletions python/ray/tests/test_gcs_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,12 @@ def test_redis_cleanup(redis_replicas, shutdown_only):
else:
cli = redis.Redis(host, int(port))

assert set(cli.keys()) == {b"c1", b"c2"}
table_names = ["KV", "INTERNAL_CONFIG", "WORKERS", "JobCounter", "NODE", "JOB"]
c1_keys = [f"RAYc1@{name}".encode() for name in table_names]
c2_keys = [f"RAYc2@{name}".encode() for name in table_names]
assert set(cli.keys()) == set(c1_keys + c2_keys)
gcs_utils.cleanup_redis_storage(host, int(port), "", False, "c1")
assert set(cli.keys()) == {b"c2"}
assert set(cli.keys()) == set(c2_keys)
gcs_utils.cleanup_redis_storage(host, int(port), "", False, "c2")
assert len(cli.keys()) == 0

Expand Down
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/store_client_kv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ void StoreClientInternalKV::Keys(const std::string &ns,
MakeKey(ns, prefix),
[callback = std::move(callback)](std::vector<std::string> keys) {
std::vector<std::string> true_keys;
true_keys.reserve(keys.size());
for (auto &key : keys) {
true_keys.emplace_back(ExtractKey(key));
}
Expand Down
68 changes: 0 additions & 68 deletions src/ray/gcs/redis_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,69 +25,6 @@ namespace ray {

namespace gcs {

/// Run redis command using specified context and store the result in `reply`. Return true
/// if the number of attemps didn't reach `redis_db_connect_retries`.
static bool RunRedisCommandWithRetries(
redisContext *context,
const char *command,
redisReply **reply,
const std::function<bool(const redisReply *)> &condition) {
int num_attempts = 0;
while (num_attempts < RayConfig::instance().redis_db_connect_retries()) {
// Try to execute the command.
*reply = reinterpret_cast<redisReply *>(redisCommand(context, command));
if (condition(*reply)) {
break;
}

// Sleep for a little, and try again if the entry isn't there yet.
freeReplyObject(*reply);
std::this_thread::sleep_for(std::chrono::milliseconds(
RayConfig::instance().redis_db_connect_wait_milliseconds()));
num_attempts++;
}
return num_attempts < RayConfig::instance().redis_db_connect_retries();
}

static int DoGetNextJobID(redisContext *context) {
// This is bad since duplicate logic lives in redis_client
// and redis_store_client.
// A refactoring is needed to make things clean.
// src/ray/gcs/store_client/redis_store_client.cc#L42
// TODO (iycheng): Unify the way redis key is formated.
static const std::string kTableSeparator = ":";
static const std::string kClusterSeparator = "@";
static std::string key = RayConfig::instance().external_storage_namespace() +
kClusterSeparator + kTableSeparator + "JobCounter";
static std::string cmd =
"HINCRBY " + RayConfig::instance().external_storage_namespace() + " " + key + " 1";

redisReply *reply = nullptr;
bool under_retry_limit = RunRedisCommandWithRetries(
context, cmd.c_str(), &reply, [](const redisReply *reply) {
if (reply == nullptr) {
RAY_LOG(WARNING) << "Didn't get reply for " << cmd;
return false;
}
if (reply->type == REDIS_REPLY_NIL) {
RAY_LOG(WARNING) << "Got nil reply for " << cmd;
return false;
}
if (reply->type == REDIS_REPLY_ERROR) {
RAY_LOG(WARNING) << "Got error reply for " << cmd << " Error is " << reply->str;
return false;
}
return true;
});
RAY_CHECK(reply);
RAY_CHECK(under_retry_limit) << "No entry found for JobCounter";
RAY_CHECK(reply->type == REDIS_REPLY_INTEGER)
<< "Expected integer, found Redis type " << reply->type << " for JobCounter";
int counter = reply->integer;
freeReplyObject(reply);
return counter;
}

RedisClient::RedisClient(const RedisClientOptions &options) : options_(options) {}

Status RedisClient::Connect(instrumented_io_context &io_service) {
Expand Down Expand Up @@ -127,11 +64,6 @@ void RedisClient::Disconnect() {
RAY_LOG(DEBUG) << "RedisClient disconnected.";
}

int RedisClient::GetNextJobID() {
RAY_CHECK(primary_context_);
return DoGetNextJobID(primary_context_->sync_context());
}

} // namespace gcs

} // namespace ray
2 changes: 0 additions & 2 deletions src/ray/gcs/redis_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ class RedisClient {

std::shared_ptr<RedisContext> GetPrimaryContext() { return primary_context_; }

int GetNextJobID();

protected:
/// Attach this client to an asio event loop. Note that only
/// one event loop should be attached at a time.
Expand Down
Loading

0 comments on commit f298a75

Please sign in to comment.