feat(rdb_load): add support for loading huge hmaps and zsets #3823

Merged: 3 commits, Oct 1, 2024
src/server/rdb_load.cc: 129 additions & 55 deletions
@@ -545,12 +545,12 @@ void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
   // which is greater than SetFamily::MaxIntsetEntries so we'll always use
   // a string set not an int set.
   if (pv_->ObjType() != OBJ_SET) {
-    LOG(ERROR) << "Invalid RDB type " << pv_->ObjType();
+    LOG(DFATAL) << "Invalid RDB type " << pv_->ObjType();
     ec_ = RdbError(errc::invalid_rdb_type);
     return;
   }
   if (pv_->Encoding() != kEncodingStrMap2) {
-    LOG(ERROR) << "Invalid encoding " << pv_->Encoding();
+    LOG(DFATAL) << "Invalid encoding " << pv_->Encoding();
     ec_ = RdbError(errc::invalid_encoding);
     return;
   }
@@ -657,12 +657,37 @@ void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) {
     lp = lpShrinkToFit(lp);
     pv_->InitRobj(OBJ_HASH, kEncodingListPack, lp);
   } else {
-    StringMap* string_map = CompactObj::AllocateMR<StringMap>();
-    string_map->set_time(MemberTimeSeconds(GetCurrentTimeMs()));
+    StringMap* string_map;
+    if (config_.append) {
+      // Note we only use config_.append when the map size exceeds kMaxBlobLen,
+      // which is greater than 64, so we'll always have a StringMap here,
+      // not a listpack.
+      if (pv_->ObjType() != OBJ_HASH) {
+        LOG(DFATAL) << "Invalid RDB type " << pv_->ObjType();
+        ec_ = RdbError(errc::invalid_rdb_type);
+        return;
+      }
+      if (pv_->Encoding() != kEncodingStrMap2) {
+        LOG(DFATAL) << "Invalid encoding " << pv_->Encoding();
+        ec_ = RdbError(errc::invalid_encoding);
+        return;
+      }
+
+      string_map = static_cast<StringMap*>(pv_->RObjPtr());
+    } else {
+      string_map = CompactObj::AllocateMR<StringMap>();
+      string_map->set_time(MemberTimeSeconds(GetCurrentTimeMs()));
+
+      // Expand the map up front to avoid rehashing.
+      string_map->Reserve((config_.reserve > len) ? config_.reserve : len);
+    }
 
-    auto cleanup = absl::MakeCleanup([&] { CompactObj::DeleteMR<StringMap>(string_map); });
+    auto cleanup = absl::MakeCleanup([&] {
+      if (!config_.append) {
+        CompactObj::DeleteMR<StringMap>(string_map);
+      }
+    });
     std::string key;
-    string_map->Reserve(len);
     for (const auto& seg : ltrace->arr) {
       for (size_t i = 0; i < seg.size(); i += increment) {
         // ToSV may reference an internal buffer, therefore we can use only before the
@@ -700,7 +725,9 @@ void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) {
       }
     }
   }
-  pv_->InitRobj(OBJ_HASH, kEncodingStrMap2, string_map);
+  if (!config_.append) {
+    pv_->InitRobj(OBJ_HASH, kEncodingStrMap2, string_map);
+  }
   std::move(cleanup).Cancel();
 }
 }
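Aside on the pattern above: when `config_.append` is set, the value a previous chunk already attached to the key is reused instead of allocating a fresh one, and `config_.reserve` carries the full object size so capacity is reserved once. A condensed sketch of that create-or-append decision, using stand-in types rather than Dragonfly's actual `StringMap`/`CompactObj` API:

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <unordered_map>

// Stand-ins for illustration only; the real types are Dragonfly's StringMap
// and the loader's config_ member.
using StringMap = std::unordered_map<std::string, std::string>;
struct LoadConfig {
  bool append = false;  // true for every chunk after the first
  size_t reserve = 0;   // total element count from the RDB header
};

// First chunk: allocate and pre-size for the whole object. Later chunks:
// reuse the map the first chunk already attached to the key.
StringMap* GetOrCreateMap(const LoadConfig& config, StringMap* existing, size_t chunk_len) {
  if (config.append)
    return existing;  // type/encoding were validated by the caller
  auto* map = new StringMap();
  // Reserve up front to avoid rehashing while the remaining chunks append.
  map->reserve(std::max(config.reserve, chunk_len));
  return map;
}
```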
@@ -775,16 +802,42 @@ void RdbLoaderBase::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) {
 
 void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) {
   size_t zsetlen = ltrace->blob_count();
-  detail::SortedMap* zs = CompactObj::AllocateMR<detail::SortedMap>();
 
   unsigned encoding = OBJ_ENCODING_SKIPLIST;
-  auto cleanup = absl::MakeCleanup([&] { CompactObj::DeleteMR<detail::SortedMap>(zs); });
-
-  if (zsetlen > 2 && !zs->Reserve(zsetlen)) {
-    LOG(ERROR) << "OOM in dictTryExpand " << zsetlen;
-    ec_ = RdbError(errc::out_of_memory);
-    return;
-  }
+  detail::SortedMap* zs;
+  if (config_.append) {
+    // Note we only use config_.append when the set size exceeds kMaxBlobLen,
+    // which is greater than server.zset_max_listpack_entries, so we'll always
+    // have a SortedMap here, not a listpack.
+    if (pv_->ObjType() != OBJ_ZSET) {
+      LOG(DFATAL) << "Invalid RDB type " << pv_->ObjType();
+      ec_ = RdbError(errc::invalid_rdb_type);
+      return;
+    }
+    if (pv_->Encoding() != OBJ_ENCODING_SKIPLIST) {
+      LOG(DFATAL) << "Invalid encoding " << pv_->Encoding();
+      ec_ = RdbError(errc::invalid_encoding);
+      return;
+    }
+
+    zs = static_cast<detail::SortedMap*>(pv_->RObjPtr());
+  } else {
+    zs = CompactObj::AllocateMR<detail::SortedMap>();
+
+    size_t reserve = (config_.reserve > zsetlen) ? config_.reserve : zsetlen;
+    if (reserve > 2 && !zs->Reserve(reserve)) {
+      LOG(ERROR) << "OOM in dictTryExpand " << zsetlen;
+      ec_ = RdbError(errc::out_of_memory);
+      return;
+    }
+  }
+
+  auto cleanup = absl::MakeCleanup([&] {
+    if (!config_.append) {
+      CompactObj::DeleteMR<detail::SortedMap>(zs);
+    }
+  });
 
   size_t maxelelen = 0, totelelen = 0;
 
   Iterate(*ltrace, [&](const LoadBlob& blob) {
@@ -822,7 +875,9 @@ void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) {
 
   std::move(cleanup).Cancel();
 
-  pv_->InitRobj(OBJ_ZSET, encoding, inner);
+  if (!config_.append) {
+    pv_->InitRobj(OBJ_ZSET, encoding, inner);
+  }
 }
 
 void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) {
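Both `CreateHMap` and `CreateZSet` now share the same ownership idiom: `absl::MakeCleanup` destroys a partially built value if an error path returns early, `std::move(cleanup).Cancel()` disarms it on success, and in append mode the deleter must do nothing because the value is already owned by the inserted key. A standalone sketch of the idiom; the `SortedMap` stand-in and `LoadChunk` signature are illustrative, not the PR's API:

```cpp
#include <utility>

#include "absl/cleanup/cleanup.h"

struct SortedMap { /* placeholder for the real value type */ };

bool LoadChunk(bool append, SortedMap* zs /* owned by the key when append */) {
  auto cleanup = absl::MakeCleanup([&] {
    // Only delete what this call allocated; an appended-to value belongs
    // to the already-inserted key and must survive an error return.
    if (!append)
      delete zs;
  });

  bool ok = true;  // ... parse elements, set ok = false on error ...
  if (!ok)
    return false;  // cleanup fires here and frees a freshly built value

  std::move(cleanup).Cancel();  // success: ownership passes to the key
  return true;
}
```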
@@ -1581,65 +1636,84 @@ auto RdbLoaderBase::ReadGeneric(int rdbtype) -> io::Result<OpaqueObj> {
 
 auto RdbLoaderBase::ReadHMap(int rdbtype) -> io::Result<OpaqueObj> {
   size_t len;
-  SET_OR_UNEXPECT(LoadLen(nullptr), len);
+  if (pending_read_.remaining > 0) {
+    len = pending_read_.remaining;
+  } else {
+    SET_OR_UNEXPECT(LoadLen(NULL), len);
 
-  unique_ptr<LoadTrace> load_trace(new LoadTrace);
-  if (rdbtype == RDB_TYPE_HASH) {
-    len *= 2;
-  } else {
-    DCHECK_EQ(rdbtype, RDB_TYPE_HASH_WITH_EXPIRY);
-    len *= 3;
-  }
+    if (rdbtype == RDB_TYPE_HASH) {
+      len *= 2;
+    } else {
+      DCHECK_EQ(rdbtype, RDB_TYPE_HASH_WITH_EXPIRY);
+      len *= 3;
+    }
+    pending_read_.reserve = len;
+  }
 
-  load_trace->arr.resize((len + kMaxBlobLen - 1) / kMaxBlobLen);
-  for (size_t i = 0; i < load_trace->arr.size(); ++i) {
-    size_t n = std::min<size_t>(len, kMaxBlobLen);
-    load_trace->arr[i].resize(n);
-    for (size_t j = 0; j < n; ++j) {
-      error_code ec = ReadStringObj(&load_trace->arr[i][j].rdb_var);
-      if (ec)
-        return make_unexpected(ec);
-    }
-    len -= n;
-  }
+  // Limit each read to kMaxBlobLen elements.
+  unique_ptr<LoadTrace> load_trace(new LoadTrace);
+  load_trace->arr.resize(1);
+  size_t n = std::min<size_t>(len, kMaxBlobLen);
+  load_trace->arr[0].resize(n);
+  for (size_t i = 0; i < n; ++i) {
+    error_code ec = ReadStringObj(&load_trace->arr[0][i].rdb_var);
+    if (ec)
+      return make_unexpected(ec);
+  }
+
+  // If there are still unread elements, cache the number of remaining
+  // elements, or clear if the full object has been read.
+  if (len > n) {
+    pending_read_.remaining = len - n;
+  } else if (pending_read_.remaining > 0) {
+    pending_read_.remaining = 0;
+  }
 
   return OpaqueObj{std::move(load_trace), rdbtype};
 }
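`ReadHMap` consumes `pending_read_.remaining` and writes `pending_read_.reserve`, so some loader state must persist between successive reads of the same key. The declaration is not visible in this diff view; the struct below is an assumption reconstructed from the uses in the diff, not a copy of the PR's code:

```cpp
#include <cstddef>

// Assumed shape of the pending-read state (field names taken from the diff;
// the real declaration is not shown in this view).
struct PendingRead {
  // Number of elements of the current object still waiting to be read.
  // Zero means the next Read*() call starts a fresh object.
  size_t remaining = 0;
  // Total element count from the object's header, so the first CreateHMap/
  // CreateZSet call can Reserve() capacity for the whole object at once.
  size_t reserve = 0;
};
```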

 auto RdbLoaderBase::ReadZSet(int rdbtype) -> io::Result<OpaqueObj> {
   /* Read sorted set value. */
   uint64_t zsetlen;
-  SET_OR_UNEXPECT(LoadLen(nullptr), zsetlen);
+  if (pending_read_.remaining > 0) {
+    zsetlen = pending_read_.remaining;
+  } else {
+    SET_OR_UNEXPECT(LoadLen(nullptr), zsetlen);
+    pending_read_.reserve = zsetlen;
+  }
 
   if (zsetlen == 0)
     return Unexpected(errc::empty_key);
 
-  unique_ptr<LoadTrace> load_trace(new LoadTrace);
-  load_trace->arr.resize((zsetlen + kMaxBlobLen - 1) / kMaxBlobLen);
-
   double score;
 
-  for (size_t i = 0; i < load_trace->arr.size(); ++i) {
-    size_t n = std::min<size_t>(zsetlen, kMaxBlobLen);
-    load_trace->arr[i].resize(n);
-    for (size_t j = 0; j < n; ++j) {
-      error_code ec = ReadStringObj(&load_trace->arr[i][j].rdb_var);
-      if (ec)
-        return make_unexpected(ec);
-      if (rdbtype == RDB_TYPE_ZSET_2) {
-        SET_OR_UNEXPECT(FetchBinaryDouble(), score);
-      } else {
-        SET_OR_UNEXPECT(FetchDouble(), score);
-      }
-
-      if (isnan(score)) {
-        LOG(ERROR) << "Zset with NAN score detected";
-        return Unexpected(errc::rdb_file_corrupted);
-      }
-      load_trace->arr[i][j].score = score;
-    }
-    zsetlen -= n;
-  }
+  // Limit each read to kMaxBlobLen elements.
+  unique_ptr<LoadTrace> load_trace(new LoadTrace);
+  load_trace->arr.resize(1);
+  size_t n = std::min<size_t>(zsetlen, kMaxBlobLen);
+  load_trace->arr[0].resize(n);
+  for (size_t i = 0; i < n; ++i) {
+    error_code ec = ReadStringObj(&load_trace->arr[0][i].rdb_var);
+    if (ec)
+      return make_unexpected(ec);
+    if (rdbtype == RDB_TYPE_ZSET_2) {
+      SET_OR_UNEXPECT(FetchBinaryDouble(), score);
+    } else {
+      SET_OR_UNEXPECT(FetchDouble(), score);
+    }
+    if (isnan(score)) {
+      LOG(ERROR) << "Zset with NAN score detected";
+      return Unexpected(errc::rdb_file_corrupted);
+    }
+    load_trace->arr[0][i].score = score;
+  }
+
+  // If there are still unread elements, cache the number of remaining
+  // elements, or clear if the full object has been read.
+  if (zsetlen > n) {
+    pending_read_.remaining = zsetlen - n;
+  } else if (pending_read_.remaining > 0) {
+    pending_read_.remaining = 0;
+  }
 
   return OpaqueObj{std::move(load_trace), rdbtype};
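The two halves fit together like this: each `ReadHMap`/`ReadZSet` call returns at most `kMaxBlobLen` elements, the first chunk creates the value with a `reserve` hint covering the whole object, and every later chunk arrives with `append` set until `remaining` drops to zero. A self-contained sketch of that chunking arithmetic; the names and the `kMaxBlobLen` value are illustrative assumptions, not the PR's constants:

```cpp
#include <cstddef>
#include <vector>

// Illustrative stand-in mirroring the diff's config_ fields.
struct LoadConfig {
  bool append = false;
  size_t reserve = 0;
};

constexpr size_t kMaxBlobLen = 4096;  // assumed value, for illustration only

// Splits `total` elements into kMaxBlobLen-sized chunks the way the reader
// side does, and shows which config the loader side would see per chunk.
std::vector<LoadConfig> PlanChunks(size_t total) {
  std::vector<LoadConfig> plan;
  size_t remaining = total;
  while (remaining > 0) {
    size_t n = remaining < kMaxBlobLen ? remaining : kMaxBlobLen;
    LoadConfig cfg;
    cfg.append = !plan.empty();  // only the first chunk creates the object
    cfg.reserve = total;         // full size hint so Reserve() happens once
    plan.push_back(cfg);
    remaining -= n;
  }
  return plan;
}
```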
src/server/rdb_test.cc: 46 additions & 6 deletions
@@ -567,11 +567,11 @@ TEST_F(RdbTest, DflyLoadAppend) {
 
 // Tests loading a huge set, where the set is loaded in multiple partial reads.
 TEST_F(RdbTest, LoadHugeSet) {
-  // Add 2 sets with 200k elements each (note must have more than kMaxBlobLen
+  // Add 2 sets with 100k elements each (note must have more than kMaxBlobLen
   // elements to test partial reads).
-  Run({"debug", "populate", "2", "test", "100", "rand", "type", "set", "elements", "200000"});
-  ASSERT_EQ(200000, CheckedInt({"scard", "test:0"}));
-  ASSERT_EQ(200000, CheckedInt({"scard", "test:1"}));
+  Run({"debug", "populate", "2", "test", "100", "rand", "type", "set", "elements", "100000"});
+  ASSERT_EQ(100000, CheckedInt({"scard", "test:0"}));
+  ASSERT_EQ(100000, CheckedInt({"scard", "test:1"}));
 
   RespExpr resp = Run({"save", "df"});
   ASSERT_EQ(resp, "OK");
@@ -580,8 +580,48 @@ TEST_F(RdbTest, LoadHugeSet) {
   auto save_info = service_->server_family().GetLastSaveInfo();
   resp = Run({"dfly", "load", save_info.file_name});
   ASSERT_EQ(resp, "OK");
 
-  ASSERT_EQ(200000, CheckedInt({"scard", "test:0"}));
-  ASSERT_EQ(200000, CheckedInt({"scard", "test:1"}));
+  ASSERT_EQ(100000, CheckedInt({"scard", "test:0"}));
+  ASSERT_EQ(100000, CheckedInt({"scard", "test:1"}));
 }
+
+// Tests loading a huge hmap, where the map is loaded in multiple partial
+// reads.
+TEST_F(RdbTest, LoadHugeHMap) {
+  // Add 2 hashes with 100k elements each (note must have more than kMaxBlobLen
+  // elements to test partial reads).
+  Run({"debug", "populate", "2", "test", "100", "rand", "type", "hash", "elements", "100000"});
+  ASSERT_EQ(100000, CheckedInt({"hlen", "test:0"}));
+  ASSERT_EQ(100000, CheckedInt({"hlen", "test:1"}));
+
+  RespExpr resp = Run({"save", "df"});
+  ASSERT_EQ(resp, "OK");
+
+  auto save_info = service_->server_family().GetLastSaveInfo();
+  resp = Run({"dfly", "load", save_info.file_name});
+  ASSERT_EQ(resp, "OK");
+
+  ASSERT_EQ(100000, CheckedInt({"hlen", "test:0"}));
+  ASSERT_EQ(100000, CheckedInt({"hlen", "test:1"}));
+}
+
+// Tests loading a huge zset, where the zset is loaded in multiple partial
+// reads.
+TEST_F(RdbTest, LoadHugeZSet) {
+  // Add 2 zsets with 100k elements each (note must have more than kMaxBlobLen
+  // elements to test partial reads).
+  Run({"debug", "populate", "2", "test", "100", "rand", "type", "zset", "elements", "100000"});
+  ASSERT_EQ(100000, CheckedInt({"zcard", "test:0"}));
+  ASSERT_EQ(100000, CheckedInt({"zcard", "test:1"}));
+
+  RespExpr resp = Run({"save", "df"});
+  ASSERT_EQ(resp, "OK");
+
+  auto save_info = service_->server_family().GetLastSaveInfo();
+  resp = Run({"dfly", "load", save_info.file_name});
+  ASSERT_EQ(resp, "OK");
+
+  ASSERT_EQ(100000, CheckedInt({"zcard", "test:0"}));
+  ASSERT_EQ(100000, CheckedInt({"zcard", "test:1"}));
+}
 
 } // namespace dfly