From a3150db55ab9a1ebadbc9b9525a220f20067de6f Mon Sep 17 00:00:00 2001 From: Andrew Dunstall Date: Sun, 29 Sep 2024 12:38:12 +0100 Subject: [PATCH] feat(rdb_load): add support for loading huge zsets --- src/server/rdb_load.cc | 95 ++++++++++++++++++++++++++++-------------- src/server/rdb_test.cc | 20 +++++++++ 2 files changed, 84 insertions(+), 31 deletions(-) diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 1762197759c5..1af7998dcf7e 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -802,16 +802,39 @@ void RdbLoaderBase::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) { void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) { size_t zsetlen = ltrace->blob_count(); - detail::SortedMap* zs = CompactObj::AllocateMR(); + unsigned encoding = OBJ_ENCODING_SKIPLIST; - auto cleanup = absl::MakeCleanup([&] { CompactObj::DeleteMR(zs); }); + detail::SortedMap* zs; + if (config_.append) { + if (pv_->ObjType() != OBJ_ZSET) { + LOG(ERROR) << "Invalid RDB type " << pv_->ObjType(); + ec_ = RdbError(errc::invalid_rdb_type); + return; + } + if (pv_->Encoding() != OBJ_ENCODING_SKIPLIST) { + LOG(ERROR) << "Invalid encoding " << pv_->Encoding(); + ec_ = RdbError(errc::invalid_encoding); + return; + } - if (zsetlen > 2 && !zs->Reserve(zsetlen)) { - LOG(ERROR) << "OOM in dictTryExpand " << zsetlen; - ec_ = RdbError(errc::out_of_memory); - return; + zs = static_cast(pv_->RObjPtr()); + } else { + zs = CompactObj::AllocateMR(); + + size_t reserve = (config_.reserve > zsetlen) ? config_.reserve : zsetlen; + if (reserve > 2 && !zs->Reserve(reserve)) { + LOG(ERROR) << "OOM in dictTryExpand " << zsetlen; + ec_ = RdbError(errc::out_of_memory); + return; + } } + auto cleanup = absl::MakeCleanup([&] { + if (!config_.append) { + CompactObj::DeleteMR(zs); + } + }); + size_t maxelelen = 0, totelelen = 0; Iterate(*ltrace, [&](const LoadBlob& blob) { @@ -849,7 +872,9 @@ void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) { std::move(cleanup).Cancel(); - pv_->InitRobj(OBJ_ZSET, encoding, inner); + if (!config_.append) { + pv_->InitRobj(OBJ_ZSET, encoding, inner); + } } void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) { @@ -1645,38 +1670,46 @@ auto RdbLoaderBase::ReadHMap(int rdbtype) -> io::Result { } auto RdbLoaderBase::ReadZSet(int rdbtype) -> io::Result { - /* Read sorted set value. */ uint64_t zsetlen; - SET_OR_UNEXPECT(LoadLen(nullptr), zsetlen); + if (pending_read_.remaining > 0) { + zsetlen = pending_read_.remaining; + } else { + SET_OR_UNEXPECT(LoadLen(nullptr), zsetlen); + pending_read_.reserve = zsetlen; + } if (zsetlen == 0) return Unexpected(errc::empty_key); - unique_ptr load_trace(new LoadTrace); - load_trace->arr.resize((zsetlen + kMaxBlobLen - 1) / kMaxBlobLen); - double score; - for (size_t i = 0; i < load_trace->arr.size(); ++i) { - size_t n = std::min(zsetlen, kMaxBlobLen); - load_trace->arr[i].resize(n); - for (size_t j = 0; j < n; ++j) { - error_code ec = ReadStringObj(&load_trace->arr[i][j].rdb_var); - if (ec) - return make_unexpected(ec); - if (rdbtype == RDB_TYPE_ZSET_2) { - SET_OR_UNEXPECT(FetchBinaryDouble(), score); - } else { - SET_OR_UNEXPECT(FetchDouble(), score); - } - - if (isnan(score)) { - LOG(ERROR) << "Zset with NAN score detected"; - return Unexpected(errc::rdb_file_corrupted); - } - load_trace->arr[i][j].score = score; + // Limit each read to kMaxBlobLen elements. + unique_ptr load_trace(new LoadTrace); + load_trace->arr.resize(1); + size_t n = std::min(zsetlen, kMaxBlobLen); + load_trace->arr[0].resize(n); + for (size_t i = 0; i < n; ++i) { + error_code ec = ReadStringObj(&load_trace->arr[0][i].rdb_var); + if (ec) + return make_unexpected(ec); + if (rdbtype == RDB_TYPE_ZSET_2) { + SET_OR_UNEXPECT(FetchBinaryDouble(), score); + } else { + SET_OR_UNEXPECT(FetchDouble(), score); + } + if (isnan(score)) { + LOG(ERROR) << "Zset with NAN score detected"; + return Unexpected(errc::rdb_file_corrupted); } - zsetlen -= n; + load_trace->arr[0][i].score = score; + } + + // If there are still unread elements, cache the number of remaining + // elements, or clear if the full object has been read. + if (zsetlen > n) { + pending_read_.remaining = zsetlen - n; + } else if (pending_read_.remaining > 0) { + pending_read_.remaining = 0; } return OpaqueObj{std::move(load_trace), rdbtype}; diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc index 4da8222e7c5f..3f6025fc3721 100644 --- a/src/server/rdb_test.cc +++ b/src/server/rdb_test.cc @@ -604,4 +604,24 @@ TEST_F(RdbTest, LoadHugeHMap) { ASSERT_EQ(100000, CheckedInt({"hlen", "test:1"})); } +// Tests loading a huge zset, where the zset is loaded in multiple partial +// reads. +TEST_F(RdbTest, LoadHugeZSet) { + // Add 2 sets with 100k elements each (note must have more than kMaxBlobLen + // elements to test partial reads). + Run({"debug", "populate", "2", "test", "100", "rand", "type", "zset", "elements", "100000"}); + ASSERT_EQ(100000, CheckedInt({"zcard", "test:0"})); + ASSERT_EQ(100000, CheckedInt({"zcard", "test:1"})); + + RespExpr resp = Run({"save", "df"}); + ASSERT_EQ(resp, "OK"); + + auto save_info = service_->server_family().GetLastSaveInfo(); + resp = Run({"dfly", "load", save_info.file_name}); + ASSERT_EQ(resp, "OK"); + + ASSERT_EQ(100000, CheckedInt({"zcard", "test:0"})); + ASSERT_EQ(100000, CheckedInt({"zcard", "test:1"})); +} + } // namespace dfly