Skip to content

Commit

Permalink
Rename Array2RESP to ArrayOfBulkStrings
Browse files Browse the repository at this point in the history
  • Loading branch information
git-hulk committed Jan 15, 2024
1 parent e239e83 commit 5a1e1da
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 42 deletions.
16 changes: 8 additions & 8 deletions src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ void ReplicationThread::run() {
}

ReplicationThread::CBState ReplicationThread::authWriteCB(bufferevent *bev) {
SendString(bev, redis::Array2RESP({"AUTH", srv_->GetConfig()->masterauth}));
SendString(bev, redis::ArrayOfBulkStrings({"AUTH", srv_->GetConfig()->masterauth}));
LOG(INFO) << "[replication] Auth request was sent, waiting for response";
repl_state_.store(kReplSendAuth, std::memory_order_relaxed);
return CBState::NEXT;
Expand All @@ -418,7 +418,7 @@ ReplicationThread::CBState ReplicationThread::authReadCB(bufferevent *bev) { //
}

ReplicationThread::CBState ReplicationThread::checkDBNameWriteCB(bufferevent *bev) {
SendString(bev, redis::Array2RESP({"_db_name"}));
SendString(bev, redis::ArrayOfBulkStrings({"_db_name"}));
repl_state_.store(kReplCheckDBName, std::memory_order_relaxed);
LOG(INFO) << "[replication] Check db name request was sent, waiting for response";
return CBState::NEXT;
Expand Down Expand Up @@ -456,7 +456,7 @@ ReplicationThread::CBState ReplicationThread::replConfWriteCB(bufferevent *bev)
data_to_send.emplace_back("ip-address");
data_to_send.emplace_back(config->replica_announce_ip);
}
SendString(bev, redis::Array2RESP(data_to_send));
SendString(bev, redis::ArrayOfBulkStrings(data_to_send));
repl_state_.store(kReplReplConf, std::memory_order_relaxed);
LOG(INFO) << "[replication] replconf request was sent, waiting for response";
return CBState::NEXT;
Expand Down Expand Up @@ -513,11 +513,11 @@ ReplicationThread::CBState ReplicationThread::tryPSyncWriteCB(bufferevent *bev)
// Also use old PSYNC if replica can't find replication id from WAL and DB.
if (!srv_->GetConfig()->use_rsid_psync || next_try_old_psync_ || replid.length() != kReplIdLength) {
next_try_old_psync_ = false; // Reset next_try_old_psync_
SendString(bev, redis::Array2RESP({"PSYNC", std::to_string(next_seq)}));
SendString(bev, redis::ArrayOfBulkStrings({"PSYNC", std::to_string(next_seq)}));
LOG(INFO) << "[replication] Try to use psync, next seq: " << next_seq;
} else {
// NEW PSYNC "Unique Replication Sequence ID": replication id and sequence id
SendString(bev, redis::Array2RESP({"PSYNC", replid, std::to_string(next_seq)}));
SendString(bev, redis::ArrayOfBulkStrings({"PSYNC", replid, std::to_string(next_seq)}));
LOG(INFO) << "[replication] Try to use new psync, current unique replication sequence id: " << replid << ":"
<< cur_seq;
}
Expand Down Expand Up @@ -607,7 +607,7 @@ ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent *
}

ReplicationThread::CBState ReplicationThread::fullSyncWriteCB(bufferevent *bev) {
SendString(bev, redis::Array2RESP({"_fetch_meta"}));
SendString(bev, redis::ArrayOfBulkStrings({"_fetch_meta"}));
repl_state_.store(kReplFetchMeta, std::memory_order_relaxed);
LOG(INFO) << "[replication] Start syncing data with fullsync";
return CBState::NEXT;
Expand Down Expand Up @@ -835,7 +835,7 @@ Status ReplicationThread::sendAuth(int sock_fd, ssl_st *ssl) {
std::string auth = srv_->GetConfig()->masterauth;
if (!auth.empty()) {
UniqueEvbuf evbuf;
const auto auth_command = redis::Array2RESP({"AUTH", auth});
const auto auth_command = redis::ArrayOfBulkStrings({"AUTH", auth});
auto s = util::SockSend(sock_fd, auth_command, ssl);
if (!s.IsOK()) return s.Prefixed("send auth command err");
while (true) {
Expand Down Expand Up @@ -921,7 +921,7 @@ Status ReplicationThread::fetchFiles(int sock_fd, const std::string &dir, const
}
files_str.pop_back();

const auto fetch_command = redis::Array2RESP({"_fetch_file", files_str});
const auto fetch_command = redis::ArrayOfBulkStrings({"_fetch_file", files_str});
auto s = util::SockSend(sock_fd, fetch_command, ssl);
if (!s.IsOK()) return s.Prefixed("send fetch file command");

Expand Down
25 changes: 13 additions & 12 deletions src/cluster/slot_migrate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ void SlotMigrator::clean() {
}

Status SlotMigrator::authOnDstNode(int sock_fd, const std::string &password) {
std::string cmd = redis::Array2RESP({"auth", password});
std::string cmd = redis::ArrayOfBulkStrings({"auth", password});
auto s = util::SockSend(sock_fd, cmd);
if (!s.IsOK()) {
return s.Prefixed("failed to send AUTH command");
Expand All @@ -455,7 +455,8 @@ Status SlotMigrator::authOnDstNode(int sock_fd, const std::string &password) {
Status SlotMigrator::setImportStatusOnDstNode(int sock_fd, int status) {
if (sock_fd <= 0) return {Status::NotOK, "invalid socket descriptor"};

std::string cmd = redis::Array2RESP({"cluster", "import", std::to_string(migrating_slot_), std::to_string(status)});
std::string cmd =
redis::ArrayOfBulkStrings({"cluster", "import", std::to_string(migrating_slot_), std::to_string(status)});
auto s = util::SockSend(sock_fd, cmd);
if (!s.IsOK()) {
return s.Prefixed("failed to send command to the destination node");
Expand Down Expand Up @@ -665,7 +666,7 @@ Status SlotMigrator::migrateSimpleKey(const rocksdb::Slice &key, const Metadata
command.emplace_back("PXAT");
command.emplace_back(std::to_string(metadata.expire));
}
*restore_cmds += redis::Array2RESP(command);
*restore_cmds += redis::ArrayOfBulkStrings(command);
current_pipeline_size_++;

// Check whether pipeline needs to be sent
Expand Down Expand Up @@ -746,7 +747,7 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata
if (metadata.Type() != kRedisBitmap) {
item_count++;
if (item_count >= kMaxItemsInCommand) {
*restore_cmds += redis::Array2RESP(user_cmd);
*restore_cmds += redis::ArrayOfBulkStrings(user_cmd);
current_pipeline_size_++;
item_count = 0;
// Have to clear saved items
Expand All @@ -763,13 +764,13 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata

// Have to check the item count of the last command list
if (item_count % kMaxItemsInCommand != 0) {
*restore_cmds += redis::Array2RESP(user_cmd);
*restore_cmds += redis::ArrayOfBulkStrings(user_cmd);
current_pipeline_size_++;
}

// Add TTL for complex key
if (metadata.expire > 0) {
*restore_cmds += redis::Array2RESP({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)});
*restore_cmds += redis::ArrayOfBulkStrings({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)});
current_pipeline_size_++;
}

Expand Down Expand Up @@ -808,7 +809,7 @@ Status SlotMigrator::migrateStream(const Slice &key, const StreamMetadata &metad
if (!s.IsOK()) {
return s;
}
*restore_cmds += redis::Array2RESP(user_cmd);
*restore_cmds += redis::ArrayOfBulkStrings(user_cmd);
current_pipeline_size_++;

user_cmd.erase(user_cmd.begin() + 2, user_cmd.end());
Expand All @@ -821,14 +822,14 @@ Status SlotMigrator::migrateStream(const Slice &key, const StreamMetadata &metad

// commands like XTRIM and XDEL affect stream's metadata, but we use only XADD for a slot migration
// XSETID is used to adjust stream's info on the destination node according to the current values on the source
*restore_cmds += redis::Array2RESP({"XSETID", key.ToString(), metadata.last_generated_id.ToString(), "ENTRIESADDED",
std::to_string(metadata.entries_added), "MAXDELETEDID",
metadata.max_deleted_entry_id.ToString()});
*restore_cmds += redis::ArrayOfBulkStrings({"XSETID", key.ToString(), metadata.last_generated_id.ToString(),
"ENTRIESADDED", std::to_string(metadata.entries_added), "MAXDELETEDID",
metadata.max_deleted_entry_id.ToString()});
current_pipeline_size_++;

// Add TTL
if (metadata.expire > 0) {
*restore_cmds += redis::Array2RESP({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)});
*restore_cmds += redis::ArrayOfBulkStrings({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)});
current_pipeline_size_++;
}

Expand Down Expand Up @@ -860,7 +861,7 @@ Status SlotMigrator::migrateBitmapKey(const InternalKey &inkey, std::unique_ptr<
uint32_t offset = (index * 8) + (byte_idx * 8) + bit_idx;
user_cmd->emplace_back(std::to_string(offset));
user_cmd->emplace_back("1");
*restore_cmds += redis::Array2RESP(*user_cmd);
*restore_cmds += redis::ArrayOfBulkStrings(*user_cmd);
current_pipeline_size_++;
user_cmd->erase(user_cmd->begin() + 2, user_cmd->end());
}
Expand Down
2 changes: 1 addition & 1 deletion src/server/redis_reply.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ std::string Array(const std::vector<std::string> &list) {
return result;
}

std::string Array2RESP(const std::vector<std::string> &elems) {
std::string ArrayOfBulkStrings(const std::vector<std::string> &elems) {
std::string result = "*" + std::to_string(elems.size()) + CRLF;
for (const auto &elem : elems) {
result += BulkString(elem);
Expand Down
2 changes: 1 addition & 1 deletion src/server/redis_reply.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,6 @@ std::string MultiLen(T len) {
}

std::string Array(const std::vector<std::string> &list);
std::string Array2RESP(const std::vector<std::string> &elements);
std::string ArrayOfBulkStrings(const std::vector<std::string> &elements);

} // namespace redis
4 changes: 2 additions & 2 deletions src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1024,15 +1024,15 @@ void Server::GetRoleInfo(std::string *info) {
roles.emplace_back("connecting");
}
roles.emplace_back(std::to_string(storage->LatestSeqNumber()));
*info = redis::Array2RESP(roles);
*info = redis::ArrayOfBulkStrings(roles);
} else {
std::vector<std::string> list;

slave_threads_mu_.lock();
for (const auto &slave : slave_threads_) {
if (slave->IsStopped()) continue;

list.emplace_back(redis::Array2RESP({
list.emplace_back(redis::ArrayOfBulkStrings({
slave->GetConn()->GetAnnounceIP(),
std::to_string(slave->GetConn()->GetListeningPort()),
std::to_string(slave->GetCurrentReplSeq()),
Expand Down
2 changes: 1 addition & 1 deletion src/stats/log_collector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ std::string SlowEntry::ToRedisString() const {
output.append(redis::Integer(id));
output.append(redis::Integer(time));
output.append(redis::Integer(duration));
output.append(redis::Array2RESP(args));
output.append(redis::ArrayOfBulkStrings(args));
output.append(redis::BulkString(ip + ":" + std::to_string(port)));
output.append(redis::BulkString(client_name));
return output;
Expand Down
12 changes: 6 additions & 6 deletions src/storage/batch_extractor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic

if (metadata.Type() == kRedisString) {
command_args = {"SET", user_key, value.ToString().substr(Metadata::GetOffsetAfterExpire(value[0]))};
resp_commands_[ns].emplace_back(redis::Array2RESP(command_args));
resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
if (metadata.expire > 0) {
command_args = {"PEXPIREAT", user_key, std::to_string(metadata.expire)};
resp_commands_[ns].emplace_back(redis::Array2RESP(command_args));
resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
}
} else if (metadata.expire > 0) {
auto args = log_data_.GetArguments();
Expand All @@ -80,7 +80,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
auto cmd = static_cast<RedisCommand>(*parse_result);
if (cmd == kRedisCmdExpire) {
command_args = {"PEXPIREAT", user_key, std::to_string(metadata.expire)};
resp_commands_[ns].emplace_back(redis::Array2RESP(command_args));
resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
}
}
}
Expand All @@ -103,7 +103,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
std::to_string(stream_metadata.entries_added),
"MAXDELETEDID",
stream_metadata.max_deleted_entry_id.ToString()};
resp_commands_[ns].emplace_back(redis::Array2RESP(command_args));
resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
}

return rocksdb::Status::OK();
Expand Down Expand Up @@ -262,7 +262,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
}

if (!command_args.empty()) {
resp_commands_[ns].emplace_back(redis::Array2RESP(command_args));
resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
}

return rocksdb::Status::OK();
Expand Down Expand Up @@ -387,7 +387,7 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t column_family_id, const S
}

if (!command_args.empty()) {
resp_commands_[ns].emplace_back(redis::Array2RESP(command_args));
resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
}

return rocksdb::Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion tests/cppunit/string_reply_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class StringReplyTest : public testing::Test {
std::vector<std::string> StringReplyTest::values;

TEST_F(StringReplyTest, MultiBulkString) {
std::string result = redis::Array2RESP(values);
std::string result = redis::ArrayOfBulkStrings(values);
ASSERT_EQ(result.length(), 13 * 10 + 14 * 90 + 15 * 900 + 17 * 9000 + 18 * 90000 + 9);
}

Expand Down
18 changes: 9 additions & 9 deletions utils/kvrocks2redis/parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ Status Parser::parseSimpleKV(const Slice &ns_key, const Slice &value, uint64_t e
auto [ns, user_key] = ExtractNamespaceKey<std::string>(ns_key, slot_id_encoded_);

auto command =
redis::Array2RESP({"SET", user_key, value.ToString().substr(Metadata::GetOffsetAfterExpire(value[0]))});
redis::ArrayOfBulkStrings({"SET", user_key, value.ToString().substr(Metadata::GetOffsetAfterExpire(value[0]))});
Status s = writer_->Write(ns, {command});
if (!s.IsOK()) return s;

if (expire > 0) {
command = redis::Array2RESP({"EXPIREAT", user_key, std::to_string(expire / 1000)});
command = redis::ArrayOfBulkStrings({"EXPIREAT", user_key, std::to_string(expire / 1000)});
s = writer_->Write(ns, {command});
}

Expand Down Expand Up @@ -105,17 +105,17 @@ Status Parser::parseComplexKV(const Slice &ns_key, const Metadata &metadata) {
std::string value = iter->value().ToString();
switch (type) {
case kRedisHash:
output = redis::Array2RESP({"HSET", user_key, sub_key, value});
output = redis::ArrayOfBulkStrings({"HSET", user_key, sub_key, value});
break;
case kRedisSet:
output = redis::Array2RESP({"SADD", user_key, sub_key});
output = redis::ArrayOfBulkStrings({"SADD", user_key, sub_key});
break;
case kRedisList:
output = redis::Array2RESP({"RPUSH", user_key, value});
output = redis::ArrayOfBulkStrings({"RPUSH", user_key, value});
break;
case kRedisZSet: {
double score = DecodeDouble(value.data());
output = redis::Array2RESP({"ZADD", user_key, util::Float2String(score), sub_key});
output = redis::ArrayOfBulkStrings({"ZADD", user_key, util::Float2String(score), sub_key});
break;
}
case kRedisBitmap: {
Expand All @@ -126,7 +126,7 @@ Status Parser::parseComplexKV(const Slice &ns_key, const Metadata &metadata) {
}
case kRedisSortedint: {
std::string val = std::to_string(DecodeFixed64(ikey.GetSubKey().data()));
output = redis::Array2RESP({"ZADD", user_key, val, val});
output = redis::ArrayOfBulkStrings({"ZADD", user_key, val, val});
break;
}
default:
Expand All @@ -140,7 +140,7 @@ Status Parser::parseComplexKV(const Slice &ns_key, const Metadata &metadata) {
}

if (metadata.expire > 0) {
output = redis::Array2RESP({"EXPIREAT", user_key, std::to_string(metadata.expire / 1000)});
output = redis::ArrayOfBulkStrings({"EXPIREAT", user_key, std::to_string(metadata.expire / 1000)});
Status s = writer_->Write(ns, {output});
if (!s.IsOK()) return s.Prefixed("failed to write the EXPIREAT command to AOF");
}
Expand All @@ -158,7 +158,7 @@ Status Parser::parseBitmapSegment(const Slice &ns, const Slice &user_key, int in

s = writer_->Write(
ns.ToString(),
{redis::Array2RESP({"SETBIT", user_key.ToString(), std::to_string(index * 8 + i * 8 + j), "1"})});
{redis::ArrayOfBulkStrings({"SETBIT", user_key.ToString(), std::to_string(index * 8 + i * 8 + j), "1"})});
if (!s.IsOK()) return s.Prefixed("failed to write SETBIT command to AOF");
}
}
Expand Down
2 changes: 1 addition & 1 deletion utils/kvrocks2redis/redis_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ Status RedisWriter::FlushDB(const std::string &ns) {
return s;
}

s = Write(ns, {redis::Array2RESP({"FLUSHDB"})});
s = Write(ns, {redis::ArrayOfBulkStrings({"FLUSHDB"})});
if (!s.IsOK()) return s;

return Status::OK();
Expand Down

0 comments on commit 5a1e1da

Please sign in to comment.