feat(rdb_load): add support for loading huge zsets
andydunstall committed Sep 29, 2024
1 parent 521db55 · commit a3150db
Showing 2 changed files with 84 additions and 31 deletions.
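In short, the diff below splits the loading of a large sorted set into multiple partial reads: ReadZSet now returns at most kMaxBlobLen entries per call and records the leftover count in pending_read_, while CreateZSet, when config_.append is set for the follow-up chunks, inserts into the SortedMap already attached to the destination key instead of building a fresh one. The sketch below is a minimal, self-contained model of that handshake, not Dragonfly's API: kMaxChunk, PendingRead, ReadZSetChunk and LoadZSet are illustrative stand-ins, and a std::map plays the role of detail::SortedMap.

#include <algorithm>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Illustrative constants/types; the real kMaxBlobLen, pending_read_ and
// SortedMap live in Dragonfly's rdb_load.cc and core code.
constexpr size_t kMaxChunk = 4096;

struct Entry {
  std::string member;
  double score;
};

struct PendingRead {
  size_t remaining = 0;  // elements of the current object still unread
  size_t reserve = 0;    // full length, available as a one-time size hint
};

// Producer, mirroring the reworked ReadZSet: emit at most kMaxChunk entries
// per call and stash how many are left.
std::vector<Entry> ReadZSetChunk(const std::vector<Entry>& rdb, size_t* pos,
                                 PendingRead* pending) {
  size_t len = pending->remaining ? pending->remaining : rdb.size() - *pos;
  if (pending->remaining == 0)
    pending->reserve = len;  // only the first chunk sees the full length
  size_t n = std::min(len, kMaxChunk);
  std::vector<Entry> chunk(rdb.begin() + *pos, rdb.begin() + *pos + n);
  *pos += n;
  pending->remaining = (len > n) ? len - n : 0;
  return chunk;
}

// Consumer loop: the first chunk creates the destination (and could reserve
// pending.reserve entries up front); every later chunk is appended to it,
// which is what CreateZSet's config_.append branch does for the real key.
std::map<std::string, double> LoadZSet(const std::vector<Entry>& rdb) {
  std::map<std::string, double> zset;  // stand-in for detail::SortedMap
  PendingRead pending;
  size_t pos = 0;
  do {
    for (const Entry& e : ReadZSetChunk(rdb, &pos, &pending))
      zset[e.member] = e.score;
  } while (pending.remaining > 0);
  return zset;
}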
src/server/rdb_load.cc (95 changes: 64 additions & 31 deletions)
@@ -802,16 +802,39 @@ void RdbLoaderBase::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) {
 
 void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) {
   size_t zsetlen = ltrace->blob_count();
-  detail::SortedMap* zs = CompactObj::AllocateMR<detail::SortedMap>();
 
   unsigned encoding = OBJ_ENCODING_SKIPLIST;
-  auto cleanup = absl::MakeCleanup([&] { CompactObj::DeleteMR<detail::SortedMap>(zs); });
+  detail::SortedMap* zs;
+  if (config_.append) {
+    if (pv_->ObjType() != OBJ_ZSET) {
+      LOG(ERROR) << "Invalid RDB type " << pv_->ObjType();
+      ec_ = RdbError(errc::invalid_rdb_type);
+      return;
+    }
+    if (pv_->Encoding() != OBJ_ENCODING_SKIPLIST) {
+      LOG(ERROR) << "Invalid encoding " << pv_->Encoding();
+      ec_ = RdbError(errc::invalid_encoding);
+      return;
+    }
 
-  if (zsetlen > 2 && !zs->Reserve(zsetlen)) {
-    LOG(ERROR) << "OOM in dictTryExpand " << zsetlen;
-    ec_ = RdbError(errc::out_of_memory);
-    return;
+    zs = static_cast<detail::SortedMap*>(pv_->RObjPtr());
+  } else {
+    zs = CompactObj::AllocateMR<detail::SortedMap>();
+
+    size_t reserve = (config_.reserve > zsetlen) ? config_.reserve : zsetlen;
+    if (reserve > 2 && !zs->Reserve(reserve)) {
+      LOG(ERROR) << "OOM in dictTryExpand " << zsetlen;
+      ec_ = RdbError(errc::out_of_memory);
+      return;
+    }
   }
+
+  auto cleanup = absl::MakeCleanup([&] {
+    if (!config_.append) {
+      CompactObj::DeleteMR<detail::SortedMap>(zs);
+    }
+  });
+
   size_t maxelelen = 0, totelelen = 0;
 
   Iterate(*ltrace, [&](const LoadBlob& blob) {
@@ -849,7 +872,9 @@ void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) {
 
   std::move(cleanup).Cancel();
 
-  pv_->InitRobj(OBJ_ZSET, encoding, inner);
+  if (!config_.append) {
+    pv_->InitRobj(OBJ_ZSET, encoding, inner);
+  }
 }
 
 void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) {
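One subtlety in the CreateZSet change above: the error-path cleanup can no longer delete the sorted set unconditionally, because in the append case the object is owned by the existing key and must survive a failed chunk. Below is a simplified, standalone illustration of that absl::Cleanup pattern (install the guard on entry, Cancel it on success); SortedSet and BuildOrAppend are invented names for the sketch, not Dragonfly code.

#include <cmath>
#include <map>
#include <string>
#include <utility>

#include "absl/cleanup/cleanup.h"

// Toy sorted set; Insert fails on a NAN score, mirroring the loader's
// rdb_file_corrupted path.
struct SortedSet {
  std::map<std::string, double> entries;
  bool Insert(const std::string& member, double score) {
    if (std::isnan(score)) return false;
    entries[member] = score;
    return true;
  }
};

SortedSet* BuildOrAppend(SortedSet* existing, bool append,
                         const std::map<std::string, double>& chunk) {
  SortedSet* zs = append ? existing : new SortedSet();

  // Fires on every early return; only deletes what this call allocated.
  auto cleanup = absl::MakeCleanup([&] {
    if (!append) delete zs;
  });

  for (const auto& [member, score] : chunk) {
    if (!zs->Insert(member, score)) return nullptr;  // cleanup runs here
  }

  std::move(cleanup).Cancel();  // success: the caller (or the key) keeps zs
  return zs;
}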
@@ -1645,38 +1670,46 @@ auto RdbLoaderBase::ReadHMap(int rdbtype) -> io::Result<OpaqueObj> {
 }
 
 auto RdbLoaderBase::ReadZSet(int rdbtype) -> io::Result<OpaqueObj> {
   /* Read sorted set value. */
   uint64_t zsetlen;
-  SET_OR_UNEXPECT(LoadLen(nullptr), zsetlen);
+  if (pending_read_.remaining > 0) {
+    zsetlen = pending_read_.remaining;
+  } else {
+    SET_OR_UNEXPECT(LoadLen(nullptr), zsetlen);
+    pending_read_.reserve = zsetlen;
+  }
 
   if (zsetlen == 0)
     return Unexpected(errc::empty_key);
 
-  unique_ptr<LoadTrace> load_trace(new LoadTrace);
-  load_trace->arr.resize((zsetlen + kMaxBlobLen - 1) / kMaxBlobLen);
-
   double score;
-
-  for (size_t i = 0; i < load_trace->arr.size(); ++i) {
-    size_t n = std::min<size_t>(zsetlen, kMaxBlobLen);
-    load_trace->arr[i].resize(n);
-    for (size_t j = 0; j < n; ++j) {
-      error_code ec = ReadStringObj(&load_trace->arr[i][j].rdb_var);
-      if (ec)
-        return make_unexpected(ec);
-      if (rdbtype == RDB_TYPE_ZSET_2) {
-        SET_OR_UNEXPECT(FetchBinaryDouble(), score);
-      } else {
-        SET_OR_UNEXPECT(FetchDouble(), score);
-      }
-
-      if (isnan(score)) {
-        LOG(ERROR) << "Zset with NAN score detected";
-        return Unexpected(errc::rdb_file_corrupted);
-      }
-      load_trace->arr[i][j].score = score;
+  // Limit each read to kMaxBlobLen elements.
+  unique_ptr<LoadTrace> load_trace(new LoadTrace);
+  load_trace->arr.resize(1);
+  size_t n = std::min<size_t>(zsetlen, kMaxBlobLen);
+  load_trace->arr[0].resize(n);
+  for (size_t i = 0; i < n; ++i) {
+    error_code ec = ReadStringObj(&load_trace->arr[0][i].rdb_var);
+    if (ec)
+      return make_unexpected(ec);
+    if (rdbtype == RDB_TYPE_ZSET_2) {
+      SET_OR_UNEXPECT(FetchBinaryDouble(), score);
+    } else {
+      SET_OR_UNEXPECT(FetchDouble(), score);
+    }
+    if (isnan(score)) {
+      LOG(ERROR) << "Zset with NAN score detected";
+      return Unexpected(errc::rdb_file_corrupted);
    }
-    zsetlen -= n;
+    load_trace->arr[0][i].score = score;
   }
 
+  // If there are still unread elements, cache the number of remaining
+  // elements, or clear if the full object has been read.
+  if (zsetlen > n) {
+    pending_read_.remaining = zsetlen - n;
+  } else if (pending_read_.remaining > 0) {
+    pending_read_.remaining = 0;
+  }
+
   return OpaqueObj{std::move(load_trace), rdbtype};
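Net effect of the ReadZSet change above: an object with more than kMaxBlobLen elements now arrives as ceil(len / kMaxBlobLen) partial reads, with pending_read_.remaining driving each follow-up call. A tiny helper makes the arithmetic concrete; max_blob_len is kept as a parameter because kMaxBlobLen's actual value is defined elsewhere in rdb_load.cc, and the 4096 used in the assert is only an assumed, illustrative figure.

#include <cstddef>

// Number of partial reads needed for an object of `len` elements when each
// read returns at most `max_blob_len` of them (ceiling division).
constexpr size_t NumPartialReads(size_t len, size_t max_blob_len) {
  return (len + max_blob_len - 1) / max_blob_len;
}

// With an assumed kMaxBlobLen of 4096, the 100k-element zsets created by the
// test below would each be delivered in 25 chunks.
static_assert(NumPartialReads(100'000, 4'096) == 25, "25 partial reads");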
src/server/rdb_test.cc (20 changes: 20 additions & 0 deletions)
@@ -604,4 +604,24 @@ TEST_F(RdbTest, LoadHugeHMap) {
   ASSERT_EQ(100000, CheckedInt({"hlen", "test:1"}));
 }
 
+// Tests loading a huge zset, where the zset is loaded in multiple partial
+// reads.
+TEST_F(RdbTest, LoadHugeZSet) {
+  // Add 2 sets with 100k elements each (note must have more than kMaxBlobLen
+  // elements to test partial reads).
+  Run({"debug", "populate", "2", "test", "100", "rand", "type", "zset", "elements", "100000"});
+  ASSERT_EQ(100000, CheckedInt({"zcard", "test:0"}));
+  ASSERT_EQ(100000, CheckedInt({"zcard", "test:1"}));
+
+  RespExpr resp = Run({"save", "df"});
+  ASSERT_EQ(resp, "OK");
+
+  auto save_info = service_->server_family().GetLastSaveInfo();
+  resp = Run({"dfly", "load", save_info.file_name});
+  ASSERT_EQ(resp, "OK");
+
+  ASSERT_EQ(100000, CheckedInt({"zcard", "test:0"}));
+  ASSERT_EQ(100000, CheckedInt({"zcard", "test:1"}));
+}
+
 }  // namespace dfly
