-
Notifications
You must be signed in to change notification settings - Fork 473
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support rapid slot migration based on raw key value #1534
Conversation
cool, thanks for your great efforts! |
kvrocks.conf
Outdated
# - redis_command: Migrate data by redis serialization protocol(RESP). | ||
# - raw_key_value: Migrate the raw key value data of the storage engine directly. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
redis-command
and raw-key-value
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I want to distinguish the item value from the item name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean dash instead of underlines
@@ -169,6 +195,7 @@ class SlotMigrator : public redis::Database { | |||
std::string dst_ip_; | |||
int dst_port_ = -1; | |||
UniqueFD dst_fd_; | |||
redisContext *dst_redis_context_ = nullptr; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems the redisFree call for deallocating dst_redis_context_ is missing?
if (reply != nullptr) { | ||
freeReplyObject(reply); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nullptr check seems unnecessary here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems the check still exists here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I recommend you to add unique_ptr wrappers for redisContext and redisReply pointers. You can refer to event_util.h.
It seems some memory leaks are found by ASAN. https://github.com/apache/kvrocks/actions/runs/5527836049/jobs/10085132766?pr=1534 |
@PragmaTwice I wanted to use unique_ptr to encapsulate |
}); | ||
|
||
if (redis_context->err != 0) { | ||
return {Status::NotOK, std::string(redis_context->errstr)}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return {Status::NotOK, std::string(redis_context->errstr)}; | |
return {Status::NotOK, redis_context->errstr}; |
auto error_str = std::string(reply->str); | ||
return {Status::NotOK, error_str}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
auto error_str = std::string(reply->str); | |
return {Status::NotOK, error_str}; | |
return {Status::NotOK, reply->str}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually i do not understand why construct a temp string here
return {Status::NotOK, "get invalid reply from TIME command"}; | ||
} | ||
|
||
uint64_t dst_timestamp = std::stoul(reply->element[0]->str) * 1000000 + std::stoul(reply->element[1]->str); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
stoul will throws exceptions while the parsing fails, so i wonder if the parsing will always succeed here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, normal TIME
command will parse successfully.
Status CreateRedisContextFromConnectedFd(int fd, int timeout, redisContext **redis_context) { | ||
*redis_context = redisConnectFd(fd); | ||
if (*redis_context == nullptr) { | ||
return {Status::NotOK, "init failed"}; | ||
} | ||
|
||
if ((*redis_context)->err != 0) { | ||
auto error_str = std::string((*redis_context)->errstr); | ||
redisFree(*redis_context); | ||
return {Status::NotOK, error_str}; | ||
} | ||
|
||
if (redisSetTimeout(*redis_context, timeval{timeout, 0}) != REDIS_OK) { | ||
redisFree(*redis_context); | ||
return {Status::NotOK, "set timeout failed"}; | ||
} | ||
return Status::OK(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Status CreateRedisContextFromConnectedFd(int fd, int timeout, redisContext **redis_context) { | |
*redis_context = redisConnectFd(fd); | |
if (*redis_context == nullptr) { | |
return {Status::NotOK, "init failed"}; | |
} | |
if ((*redis_context)->err != 0) { | |
auto error_str = std::string((*redis_context)->errstr); | |
redisFree(*redis_context); | |
return {Status::NotOK, error_str}; | |
} | |
if (redisSetTimeout(*redis_context, timeval{timeout, 0}) != REDIS_OK) { | |
redisFree(*redis_context); | |
return {Status::NotOK, "set timeout failed"}; | |
} | |
return Status::OK(); | |
} | |
using StaticRedisFree = StaticFunction<decltype(redisFree), redisFree>; | |
struct UniqueRedisContext : std::unique_ptr<redisContext, StaticRedisFree> { | |
using BaseType = std::unique_ptr<redisContext, StaticRedisFree>; | |
using BaseType::BaseType; | |
}; | |
StatusOr<UniqueRedisContext> CreateRedisContextFromConnectedFd(int fd, int timeout) { | |
auto redis_context = UniqueRedisContext{redisConnectFd(fd)}; | |
if (!redis_context) { | |
return {Status::NotOK, "init failed"}; | |
} | |
if (redis_context->err != 0) { | |
return {Status::NotOK, redis_context->errstr}; | |
} | |
if (redisSetTimeout(redis_context.get(), timeval{timeout, 0}) != REDIS_OK) { | |
return {Status::NotOK, "set timeout failed"}; | |
} | |
return redis_context; | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a bit more complex than my imagination, maybe it needs some time to test on the local.
return log_data_; | ||
} | ||
|
||
void MigrateIterator::findMetaData() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
findMetaData => findMetadata
if (redis_type == RedisType::kRedisStream) { | ||
stream_iter_->Reset(); | ||
} else { | ||
subdata_iter_->Reset(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
subdata_iter_ => subkey_iter_ to keep consistent with the column family name
|
||
items_.push_back(MigrateItem{metadata_cf_, metadata_iter_->key().ToString(), metadata_iter_->value().ToString()}); | ||
|
||
if (redis_type == RedisType::kRedisStream) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we can move this reset inside the initSubData?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it looks fine
For introducing hiredis, here are something need to be clarified:
|
There is no TLS requirement, we can simply wrapper In the future, if we need more, we can introduce some more complex clients, such as redis-plus-plus, for now I think a lightweight library |
TLS support for replication is an ongoing effort, and we will support TLS for slot migration in the future. So we need to consider these problems now, to prevent TLS being more hard to support. |
In the future, if there is a need, we can import other libraries and use this library on other existing code. For now, we can start with a simple library. |
We cannot introduce libraries so casually. When introducing a library, we need to make sure that it can indeed solve the requirements/simplify the code and is easy to maintain. We will not randomly introduce a library in the early stages and then introduce other similar libraries later when problems arise: this is not something a well-managed project should do, as it would make the code worse by having multiple libraries doing the same thing. We need to carefully consider before introducing any library. I am not against introducing hiredis, but I believe that these issues should be considered clearly when introducing it. Instead of "randomly introducing a simple library and considering what comes next later." hiredis seems to support TLS, but I'm not sure if it can reuse our SSL_CTX instead of reconstructing it. cc @git-hulk |
We certainly can't just arbitrarily import a library, so I've introduced the most official, and simplest library — |
Yes, need to think through before introducing the new library. Kvrocks's dependencies are more and more, we need to take care of this situation to avoid being out of control. |
I know that hiredis is the official library, but it doesn't seem to answer our question. I would like to provide more specific explanations and understand the context of introducing hiredis: In the current codebase, such as in replication, when we need to construct a client connection, we directly build a raw TCP connection and then use our own redis protocol tools (such as redis::SimpleString and other protocol encoding/decoding functions). These processes are not well encapsulated and there are even some manual protocol constructions, such as Of course, we can introduce a library to solve this problem. However, it is important to consider the aforementioned points when introducing a library:
|
I understand what you mean, you want to use a client library to do all the things you mentioned above. I introduced the These are the things you mention that I probably don't have the answers to yet, and I don't have the answers yet to figure out how to do them, so introducing a simple, easily replaceable library is the best option for now. If we come up with a better way to handle the things you mentioned above, we can easily replace |
I'm fine to import the hiredis which can make the read/write operation easier, but for other libraries like redis-plus-plus, it needs the solid reason why we need it. |
This PR introduces a new migration type called rawkv-based migration(in code named
raw_key_value
), which directly sends rocksdb's key values to the target.As mentioned in in #1223, rawkv-based migration does not need to determine the expiration time during migration, which solves the data consistency problem and avoids the migration failure problem that frequently occurs when migrating data to be expired.
It can save a lot of CPU by eliminating the need to convert Key-Value to the RESP. In my tests, it can save up to 2x CPU on the target side; for small values(100byte), the rawkv-based migration is 2.75 times faster than the command-based migration
For the stability and code review simplicity, I did not combine the two migration types in this CR. Instead, I used two different if else branches. I've included the
hiredis
library for interacting with the destination, and in the future we can usehiredis
to streamline command-based migrations as well.Here is the basic idea of this pull:
TIME
command to check the machine clock gap.BATCHSET
command so that the outside could use this command to inject data directly(may saving some CPU).BATCHSET
command returns an integer, currently is the return data size, the reason for returning an integer is that I plan to make adaptive throttling.MigrateIterator
) and a sender (BatchSender
). This decouples the traversal and sending, which makes the code clearer.