Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Refactor how we store data in redis. #46861

Merged
merged 36 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
090c3e0
initial
rynewang Jul 30, 2024
a1fc834
wording
rynewang Jul 30, 2024
a5dad23
fixes
rynewang Aug 20, 2024
35c6776
lint
rynewang Aug 20, 2024
dc00698
typed
rynewang Aug 21, 2024
6272738
comment example
rynewang Aug 21, 2024
f1f527f
Update python/ray/_private/gcs_utils.py
rynewang Aug 21, 2024
2fd9155
Update python/ray/_raylet.pyx
rynewang Aug 21, 2024
cc1a7b5
Update python/ray/includes/global_state_accessor.pxd
rynewang Aug 21, 2024
6c245a3
RedisConcurrentKey
rynewang Aug 22, 2024
33800db
update doc
rynewang Aug 22, 2024
4755de5
lint
rynewang Aug 22, 2024
42a63f4
fixes
rynewang Aug 22, 2024
842f3b7
nits
rynewang Aug 26, 2024
7c9bebb
Merge branch 'master' into redis-refactor
rynewang Aug 26, 2024
4a227b4
Merge branch 'master' into redis-refactor
rynewang Aug 26, 2024
f4eae4c
merge churn
rynewang Aug 26, 2024
d2d2810
Merge remote-tracking branch 'ryw/redis-refactor' into redis-refactor
rynewang Aug 26, 2024
5f8d848
self-ref
rynewang Aug 26, 2024
3f578a1
fix removal
rynewang Aug 27, 2024
bb905a8
lint
rynewang Aug 27, 2024
1ff20d7
lint
rynewang Aug 27, 2024
602f2fa
lint
rynewang Aug 27, 2024
cf68677
Merge remote-tracking branch 'origin/master' into redis-refactor
rynewang Sep 5, 2024
4f1b68c
fix cpp
rynewang Sep 5, 2024
2467782
remove hand built cleanup
rynewang Sep 5, 2024
a7dc3bd
Merge branch 'master' into redis-refactor
rynewang Sep 5, 2024
308cd4a
relax the timeout
rynewang Sep 6, 2024
c732e11
up
jjyao Sep 11, 2024
a7ff71e
fix bad BatchDelete and revert test relaxing
rynewang Sep 16, 2024
8b4ed5f
Update kuberay-gcs-ft.md 2.38
rynewang Sep 17, 2024
ca7bcdb
Merge branch 'master' into redis-refactor
rynewang Sep 17, 2024
58cd649
Update doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md
rynewang Sep 18, 2024
21e4eb5
Update doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md
rynewang Sep 18, 2024
f263839
rename func
rynewang Sep 18, 2024
3bcc147
Merge branch 'master' into redis-refactor
rynewang Sep 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ curl -LO https://raw.githubusercontent.com/ray-project/kuberay/v1.0.0/ray-operat
kubectl apply -f ray-cluster.external-redis.yaml
```

### Step 4: Verify the Kubernetes cluster status
### Step 4: Verify the Kubernetes cluster status

```sh
# Step 4.1: List all Pods in the `default` namespace.
Expand Down Expand Up @@ -141,7 +141,7 @@ KEYS *
# 1) "864b004c-6305-42e3-ac46-adfa8eb6f752"

# Step 6.5: Check the value of the key.
HGETALL 864b004c-6305-42e3-ac46-adfa8eb6f752
SCAN 0 MATCH 864b004c-6305-42e3-ac46-adfa8eb6f752*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @kevin85421 this is breaking change for kuberay users I think. Would this be okay?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we expect users to directly operate ray redis?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this change is fine. My only concern is about cleaning up Redis. @rynewang will provide me with a Docker image, and I will manually test it.

Copy link
Contributor Author

@rynewang rynewang Aug 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kuberay cleans redis by calling cleanup_redis_storage which I reimplemented so should be OK. I'm giving @kevin85421 a docker image to test

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not easy to get a Docker image. Let's merge it and then I will use the nightly image to test.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also cc @edoakes we are using the function to clean up redis or the raw command?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I checked with @edoakes before, we also use cleanup_redis_storage

rynewang marked this conversation as resolved.
Show resolved Hide resolved
rynewang marked this conversation as resolved.
Show resolved Hide resolved
```

In [ray-cluster.external-redis.yaml](https://github.com/ray-project/kuberay/blob/v1.0.0/ray-operator/config/samples/ray-cluster.external-redis.yaml), the `ray.io/external-storage-namespace` annotation isn't set for the RayCluster.
Expand Down
6 changes: 3 additions & 3 deletions python/ray/_private/gcs_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def cleanup_redis_storage(
storage_namespace: The namespace of the storage to be deleted.
"""

from ray._raylet import del_key_from_storage # type: ignore
from ray._raylet import del_key_prefix_from_storage # type: ignore

if not isinstance(host, str):
raise ValueError("Host must be a string")
Expand All @@ -144,6 +144,6 @@ def cleanup_redis_storage(
if not isinstance(storage_namespace, str):
raise ValueError("storage namespace must be a string")

# Right now, GCS store all data into a hash set key by storage_namespace.
# Right now, GCS store all data into a hash keys prefixed by storage_namespace.
rynewang marked this conversation as resolved.
Show resolved Hide resolved
# So we only need to delete the specific key to cleanup the cluster.
return del_key_from_storage(host, port, password, use_ssl, storage_namespace)
return del_key_prefix_from_storage(host, port, password, use_ssl, storage_namespace)
6 changes: 3 additions & 3 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ from ray.includes.libcoreworker cimport (

from ray.includes.ray_config cimport RayConfig
from ray.includes.global_state_accessor cimport CGlobalStateAccessor
from ray.includes.global_state_accessor cimport RedisDelKeySync, RedisGetKeySync
from ray.includes.global_state_accessor cimport RedisDelKeyPrefixSync, RedisGetKeySync
from ray.includes.optional cimport (
optional, nullopt
)
Expand Down Expand Up @@ -5177,8 +5177,8 @@ cdef void async_callback(shared_ptr[CRayObject] obj,
cpython.Py_DECREF(user_callback)


def del_key_from_storage(host, port, password, use_ssl, key):
return RedisDelKeySync(host, port, password, use_ssl, key)
def del_key_prefix_from_storage(host, port, password, use_ssl, key):
rynewang marked this conversation as resolved.
Show resolved Hide resolved
return RedisDelKeyPrefixSync(host, port, password, use_ssl, key)


def get_session_key_from_storage(host, port, password, use_ssl, config, key):
Expand Down
62 changes: 5 additions & 57 deletions python/ray/includes/global_state_accessor.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -128,60 +128,8 @@ cdef extern from * namespace "ray::gcs" nogil:


cdef extern from * namespace "ray::gcs" nogil:
"""
#include <thread>
#include "ray/gcs/redis_client.h"
namespace ray {
namespace gcs {

class Cleanup {
public:
Cleanup(std::function<void()> f): f_(f) {}
~Cleanup() { f_(); }
private:
std::function<void()> f_;
};

bool RedisDelKeySync(const std::string& host,
int32_t port,
const std::string& password,
bool use_ssl,
const std::string& key) {
RedisClientOptions options(host, port, password, use_ssl);
auto cli = std::make_unique<RedisClient>(options);

instrumented_io_context io_service;

auto thread = std::make_unique<std::thread>([&]() {
boost::asio::io_service::work work(io_service);
io_service.run();
});

Cleanup _([&](){
io_service.stop();
thread->join();
});

auto status = cli->Connect(io_service);
RAY_CHECK(status.ok()) << "Failed to connect to redis: " << status.ToString();

auto context = cli->GetPrimaryContext();
auto cmd = std::vector<std::string>{"DEL", key};
auto reply = context->RunArgvSync(cmd);
if(reply->ReadAsInteger() == 1) {
RAY_LOG(INFO) << "Successfully deleted " << key;
return true;
} else {
RAY_LOG(ERROR) << "Failed to delete " << key;
return false;
}
}

}
}
"""
c_bool RedisDelKeySync(const c_string& host,
c_int32_t port,
const c_string& password,
c_bool use_ssl,
const c_string& key)
c_bool RedisDelKeyPrefixSync(const c_string& host,
c_int32_t port,
const c_string& password,
c_bool use_ssl,
const c_string& key)
rynewang marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 1 addition & 1 deletion python/ray/includes/global_state_accessor.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ from ray.includes.unique_ids cimport (

from ray.includes.global_state_accessor cimport (
CGlobalStateAccessor,
RedisDelKeySync,
RedisDelKeyPrefixSync,
)

from ray.includes.optional cimport (
Expand Down
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/store_client_kv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ void StoreClientInternalKV::Keys(const std::string &ns,
MakeKey(ns, prefix),
[callback = std::move(callback)](std::vector<std::string> keys) {
std::vector<std::string> true_keys;
true_keys.reserve(keys.size());
for (auto &key : keys) {
true_keys.emplace_back(ExtractKey(key));
}
Expand Down
68 changes: 0 additions & 68 deletions src/ray/gcs/redis_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,69 +25,6 @@ namespace ray {

namespace gcs {

/// Run redis command using specified context and store the result in `reply`. Return true
/// if the number of attemps didn't reach `redis_db_connect_retries`.
static bool RunRedisCommandWithRetries(
redisContext *context,
const char *command,
redisReply **reply,
const std::function<bool(const redisReply *)> &condition) {
int num_attempts = 0;
while (num_attempts < RayConfig::instance().redis_db_connect_retries()) {
// Try to execute the command.
*reply = reinterpret_cast<redisReply *>(redisCommand(context, command));
if (condition(*reply)) {
break;
}

// Sleep for a little, and try again if the entry isn't there yet.
freeReplyObject(*reply);
std::this_thread::sleep_for(std::chrono::milliseconds(
RayConfig::instance().redis_db_connect_wait_milliseconds()));
num_attempts++;
}
return num_attempts < RayConfig::instance().redis_db_connect_retries();
}

static int DoGetNextJobID(redisContext *context) {
// This is bad since duplicate logic lives in redis_client
// and redis_store_client.
// A refactoring is needed to make things clean.
// src/ray/gcs/store_client/redis_store_client.cc#L42
// TODO (iycheng): Unify the way redis key is formated.
static const std::string kTableSeparator = ":";
static const std::string kClusterSeparator = "@";
static std::string key = RayConfig::instance().external_storage_namespace() +
kClusterSeparator + kTableSeparator + "JobCounter";
static std::string cmd =
"HINCRBY " + RayConfig::instance().external_storage_namespace() + " " + key + " 1";

redisReply *reply = nullptr;
bool under_retry_limit = RunRedisCommandWithRetries(
context, cmd.c_str(), &reply, [](const redisReply *reply) {
if (reply == nullptr) {
RAY_LOG(WARNING) << "Didn't get reply for " << cmd;
return false;
}
if (reply->type == REDIS_REPLY_NIL) {
RAY_LOG(WARNING) << "Got nil reply for " << cmd;
return false;
}
if (reply->type == REDIS_REPLY_ERROR) {
RAY_LOG(WARNING) << "Got error reply for " << cmd << " Error is " << reply->str;
return false;
}
return true;
});
RAY_CHECK(reply);
RAY_CHECK(under_retry_limit) << "No entry found for JobCounter";
RAY_CHECK(reply->type == REDIS_REPLY_INTEGER)
<< "Expected integer, found Redis type " << reply->type << " for JobCounter";
int counter = reply->integer;
freeReplyObject(reply);
return counter;
}

RedisClient::RedisClient(const RedisClientOptions &options) : options_(options) {}

Status RedisClient::Connect(instrumented_io_context &io_service) {
Expand Down Expand Up @@ -127,11 +64,6 @@ void RedisClient::Disconnect() {
RAY_LOG(DEBUG) << "RedisClient disconnected.";
}

int RedisClient::GetNextJobID() {
RAY_CHECK(primary_context_);
return DoGetNextJobID(primary_context_->sync_context());
}

} // namespace gcs

} // namespace ray
2 changes: 0 additions & 2 deletions src/ray/gcs/redis_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ class RedisClient {

std::shared_ptr<RedisContext> GetPrimaryContext() { return primary_context_; }

int GetNextJobID();

protected:
/// Attach this client to an asio event loop. Note that only
/// one event loop should be attached at a time.
Expand Down
Loading