Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return string instead of output pointer in metadata #1671

Merged
merged 5 commits into from
Aug 12, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/cluster/redis_slot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ uint16_t Crc16(const char *buf, size_t len) {
return crc;
}

uint16_t GetSlotIdFromKey(const std::string &key) {
uint16_t GetSlotIdFromKey(std::string_view key) {
auto tag = GetTagFromKey(key);
if (tag.empty()) {
tag = key;
Expand All @@ -63,7 +63,7 @@ uint16_t GetSlotIdFromKey(const std::string &key) {
return crc & HASH_SLOTS_MASK;
}

std::string GetTagFromKey(const std::string &key) {
std::string_view GetTagFromKey(std::string_view key) {
auto left_pos = key.find('{');
if (left_pos == std::string::npos) return {};

Expand Down
4 changes: 2 additions & 2 deletions src/cluster/redis_slot.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ constexpr const uint16_t HASH_SLOTS_SIZE = HASH_SLOTS_MASK + 1; // 16384
constexpr const uint16_t HASH_SLOTS_MAX_ITERATIONS = 50;

uint16_t Crc16(const char *buf, size_t len);
uint16_t GetSlotIdFromKey(const std::string &key);
std::string GetTagFromKey(const std::string &key);
uint16_t GetSlotIdFromKey(std::string_view key);
std::string_view GetTagFromKey(std::string_view key);
17 changes: 6 additions & 11 deletions src/cluster/slot_migrate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,7 @@ Status SlotMigrator::sendSnapshot() {
auto iter = util::UniqueIterator(storage_->GetDB()->NewIterator(read_options, cf_handle));

// Construct key prefix to iterate the keys belong to the target slot
std::string prefix;
ComposeSlotKeyPrefix(namespace_, slot, &prefix);
std::string prefix = ComposeSlotKeyPrefix(namespace_, slot);
LOG(INFO) << "[migrate] Iterate keys of slot, key's prefix: " << prefix;

// Seek to the beginning of keys start with 'prefix' and iterate all these keys
Expand All @@ -331,8 +330,7 @@ Status SlotMigrator::sendSnapshot() {
}

// Get user key
std::string ns, user_key;
ExtractNamespaceKey(iter->key(), &ns, &user_key, true);
auto [_, user_key] = ExtractNamespaceKey(iter->key(), true);

// Add key's constructed commands to restore_cmds, send pipeline or not according to task's max_pipeline_size
auto result = migrateOneKey(user_key, iter->value(), &restore_cmds);
Expand Down Expand Up @@ -665,9 +663,8 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata
auto iter = util::UniqueIterator(storage_->GetDB()->NewIterator(read_options));

// Construct key prefix to iterate values of the complex type user key
std::string slot_key, prefix_subkey;
AppendNamespacePrefix(key, &slot_key);
InternalKey(slot_key, "", metadata.version, true).Encode(&prefix_subkey);
std::string slot_key = AppendNamespacePrefix(key);
std::string prefix_subkey = InternalKey(slot_key, "", metadata.version, true).Encode();
int item_count = 0;

for (iter->Seek(prefix_subkey); iter->Valid(); iter->Next()) {
Expand Down Expand Up @@ -766,11 +763,9 @@ Status SlotMigrator::migrateStream(const Slice &key, const StreamMetadata &metad
auto iter = util::UniqueIterator(
storage_->GetDB()->NewIterator(read_options, storage_->GetCFHandle(engine::kStreamColumnFamilyName)));

std::string ns_key;
AppendNamespacePrefix(key, &ns_key);
std::string ns_key = AppendNamespacePrefix(key);
// Construct key prefix to iterate values of the stream
std::string prefix_key;
InternalKey(ns_key, "", metadata.version, true).Encode(&prefix_key);
std::string prefix_key = InternalKey(ns_key, "", metadata.version, true).Encode();

std::vector<std::string> user_cmd = {type_to_cmd[metadata.Type()], key.ToString()};

Expand Down
3 changes: 1 addition & 2 deletions src/commands/cmd_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -814,8 +814,7 @@ class CommandCompact : public Commander {
auto ns = conn->GetNamespace();

if (ns != kDefaultNamespace) {
std::string prefix;
ComposeNamespaceKey(ns, "", &prefix, false);
std::string prefix = ComposeNamespaceKey(ns, "", false);

redis::Database redis_db(svr->storage, conn->GetNamespace());
auto s = redis_db.FindKeyRangeWithPrefix(prefix, std::string(), &begin_key, &end_key);
Expand Down
13 changes: 13 additions & 0 deletions src/common/db_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

#include <memory>

#include "fmt/ostream.h"
#include "fmt/std.h"
#include "rocksdb/db.h"
#include "rocksdb/iterator.h"
#include "rocksdb/utilities/backup_engine.h"
Expand Down Expand Up @@ -103,3 +105,14 @@ inline StatusOr<std::unique_ptr<rocksdb::BackupEngine>> BackupEngineOpen(rocksdb
}

} // namespace util

namespace rocksdb {

inline std::ostream& operator<<(std::ostream& os, const Slice& slice) {
return os << std::string_view{slice.data(), slice.size()};
}

} // namespace rocksdb

template <>
struct fmt::formatter<rocksdb::Slice> : fmt::ostream_formatter {};
9 changes: 4 additions & 5 deletions src/stats/disk_stats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ namespace redis {
rocksdb::Status Disk::GetApproximateSizes(const Metadata &metadata, const Slice &ns_key,
rocksdb::ColumnFamilyHandle *column_family, uint64_t *key_size,
Slice subkeyleft, Slice subkeyright) {
std::string prefix_key, next_version_prefix_key;
InternalKey(ns_key, subkeyleft, metadata.version, storage_->IsSlotIdEncoded()).Encode(&prefix_key);
InternalKey(ns_key, subkeyright, metadata.version + 1, storage_->IsSlotIdEncoded()).Encode(&next_version_prefix_key);
std::string prefix_key = InternalKey(ns_key, subkeyleft, metadata.version, storage_->IsSlotIdEncoded()).Encode();
std::string next_version_prefix_key =
InternalKey(ns_key, subkeyright, metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();
auto key_range = rocksdb::Range(prefix_key, next_version_prefix_key);
uint64_t tmp_size = 0;
rocksdb::Status s = storage_->GetDB()->GetApproximateSizes(option_, column_family, &key_range, 1, &tmp_size);
Expand All @@ -46,8 +46,7 @@ rocksdb::Status Disk::GetApproximateSizes(const Metadata &metadata, const Slice

rocksdb::Status Disk::GetKeySize(const Slice &user_key, RedisType type, uint64_t *key_size) {
*key_size = 0;
std::string ns_key;
AppendNamespacePrefix(user_key, &ns_key);
std::string ns_key = AppendNamespacePrefix(user_key);
switch (type) {
case RedisType::kRedisString:
return GetStringSize(ns_key, key_size);
Expand Down
27 changes: 15 additions & 12 deletions src/storage/batch_extractor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,26 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
return rocksdb::Status::OK();
}

std::string ns, user_key;
std::string ns;
std::vector<std::string> command_args;

if (column_family_id == kColumnFamilyIDMetadata) {
ExtractNamespaceKey(key, &ns, &user_key, is_slot_id_encoded_);
if (slot_id_ >= 0 && static_cast<uint16_t>(slot_id_) != GetSlotIdFromKey(user_key)) {
auto [ns_s, user_key] = ExtractNamespaceKey(key, is_slot_id_encoded_);
if (slot_id_ >= 0 && static_cast<uint16_t>(slot_id_) != GetSlotIdFromKey(user_key.ToStringView())) {
return rocksdb::Status::OK();
}

ns = ns_s.ToString();
auto user_key_str = user_key.ToString();

Metadata metadata(kRedisNone);
metadata.Decode(value.ToString());

if (metadata.Type() == kRedisString) {
command_args = {"SET", user_key, value.ToString().substr(Metadata::GetOffsetAfterExpire(value[0]))};
command_args = {"SET", user_key_str, value.ToString().substr(Metadata::GetOffsetAfterExpire(value[0]))};
resp_commands_[ns].emplace_back(redis::Command2RESP(command_args));
if (metadata.expire > 0) {
command_args = {"PEXPIREAT", user_key, std::to_string(metadata.expire)};
command_args = {"PEXPIREAT", user_key_str, std::to_string(metadata.expire)};
resp_commands_[ns].emplace_back(redis::Command2RESP(command_args));
}
} else if (metadata.expire > 0) {
Expand All @@ -78,7 +81,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic

auto cmd = static_cast<RedisCommand>(*parse_result);
if (cmd == kRedisCmdExpire) {
command_args = {"PEXPIREAT", user_key, std::to_string(metadata.expire)};
command_args = {"PEXPIREAT", user_key_str, std::to_string(metadata.expire)};
resp_commands_[ns].emplace_back(redis::Command2RESP(command_args));
}
}
Expand All @@ -96,7 +99,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
if (!s.ok()) return s;

command_args = {"XSETID",
user_key,
user_key_str,
stream_metadata.last_entry_id.ToString(),
"ENTRIESADDED",
std::to_string(stream_metadata.entries_added),
Expand All @@ -110,7 +113,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic

if (column_family_id == kColumnFamilyIDDefault) {
InternalKey ikey(key, is_slot_id_encoded_);
user_key = ikey.GetKey().ToString();
auto user_key = ikey.GetKey().ToString();
if (slot_id_ >= 0 && static_cast<uint16_t>(slot_id_) != GetSlotIdFromKey(user_key)) {
return rocksdb::Status::OK();
}
Expand Down Expand Up @@ -271,14 +274,14 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t column_family_id, const S
std::string ns;

if (column_family_id == kColumnFamilyIDMetadata) {
std::string user_key;
ExtractNamespaceKey(key, &ns, &user_key, is_slot_id_encoded_);
auto [ns_s, user_key] = ExtractNamespaceKey(key, is_slot_id_encoded_);
ns = ns_s.ToString();

if (slot_id_ >= 0 && static_cast<uint16_t>(slot_id_) != GetSlotIdFromKey(user_key)) {
if (slot_id_ >= 0 && static_cast<uint16_t>(slot_id_) != GetSlotIdFromKey(user_key.ToStringView())) {
return rocksdb::Status::OK();
}

command_args = {"DEL", user_key};
command_args = {"DEL", user_key.ToString()};
} else if (column_family_id == kColumnFamilyIDDefault) {
InternalKey ikey(key, is_slot_id_encoded_);
std::string user_key = ikey.GetKey().ToString();
Expand Down
9 changes: 4 additions & 5 deletions src/storage/compact_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <string>
#include <utility>

#include "db_util.h"
#include "time_util.h"
#include "types/redis_bitmap.h"

Expand All @@ -34,10 +35,10 @@ using rocksdb::Slice;

bool MetadataFilter::Filter(int level, const Slice &key, const Slice &value, std::string *new_value,
bool *modified) const {
std::string ns, user_key, bytes = value.ToString();
std::string bytes = value.ToString();
Metadata metadata(kRedisNone, false);
rocksdb::Status s = metadata.Decode(bytes);
ExtractNamespaceKey(key, &ns, &user_key, stor_->IsSlotIdEncoded());
auto [ns, user_key] = ExtractNamespaceKey(key, stor_->IsSlotIdEncoded());
if (!s.ok()) {
LOG(WARNING) << "[compact_filter/metadata] Failed to decode,"
<< ", namespace: " << ns << ", key: " << user_key << ", err: " << s.ToString();
Expand All @@ -50,13 +51,11 @@ bool MetadataFilter::Filter(int level, const Slice &key, const Slice &value, std
}

Status SubKeyFilter::GetMetadata(const InternalKey &ikey, Metadata *metadata) const {
std::string metadata_key;

auto db = stor_->GetDB();
const auto cf_handles = stor_->GetCFHandles();
// storage close the would delete the column family handler and DB
if (!db || cf_handles->size() < 2) return {Status::NotOK, "storage is closed"};
ComposeNamespaceKey(ikey.GetNamespace(), ikey.GetKey(), &metadata_key, stor_->IsSlotIdEncoded());
std::string metadata_key = ComposeNamespaceKey(ikey.GetNamespace(), ikey.GetKey(), stor_->IsSlotIdEncoded());

if (cached_key_.empty() || metadata_key != cached_key_) {
std::string bytes;
Expand Down
Loading