From 91bfc94fbb0c4644be7ba2f4ea76b72a195f3a14 Mon Sep 17 00:00:00 2001 From: Andrew Dunstall Date: Sun, 29 Sep 2024 12:24:05 +0100 Subject: [PATCH 1/3] feat(rdb_load): add support for loading huge hmaps --- src/server/rdb_load.cc | 81 ++++++++++++++++++++++++++++++------------ src/server/rdb_test.cc | 32 +++++++++++++---- 2 files changed, 85 insertions(+), 28 deletions(-) diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 60ae8d4e09e1..1762197759c5 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -657,12 +657,37 @@ void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) { lp = lpShrinkToFit(lp); pv_->InitRobj(OBJ_HASH, kEncodingListPack, lp); } else { - StringMap* string_map = CompactObj::AllocateMR(); - string_map->set_time(MemberTimeSeconds(GetCurrentTimeMs())); + StringMap* string_map; + if (config_.append) { + // Note we only use append_ when the map size exceeds kMaxBlobLen, + // which is greater than 64 so we'll always use a StringMap set not + // listpack. + if (pv_->ObjType() != OBJ_HASH) { + LOG(ERROR) << "Invalid RDB type " << pv_->ObjType(); + ec_ = RdbError(errc::invalid_rdb_type); + return; + } + if (pv_->Encoding() != kEncodingStrMap2) { + LOG(ERROR) << "Invalid encoding " << pv_->Encoding(); + ec_ = RdbError(errc::invalid_encoding); + return; + } + + string_map = static_cast(pv_->RObjPtr()); + } else { + string_map = CompactObj::AllocateMR(); + string_map->set_time(MemberTimeSeconds(GetCurrentTimeMs())); + + // Expand the map up front to avoid rehashing. + string_map->Reserve((config_.reserve > len) ? config_.reserve : len); + } - auto cleanup = absl::MakeCleanup([&] { CompactObj::DeleteMR(string_map); }); + auto cleanup = absl::MakeCleanup([&] { + if (!config_.append) { + CompactObj::DeleteMR(string_map); + } + }); std::string key; - string_map->Reserve(len); for (const auto& seg : ltrace->arr) { for (size_t i = 0; i < seg.size(); i += increment) { // ToSV may reference an internal buffer, therefore we can use only before the @@ -700,7 +725,9 @@ void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) { } } } - pv_->InitRobj(OBJ_HASH, kEncodingStrMap2, string_map); + if (!config_.append) { + pv_->InitRobj(OBJ_HASH, kEncodingStrMap2, string_map); + } std::move(cleanup).Cancel(); } } @@ -1581,27 +1608,37 @@ auto RdbLoaderBase::ReadGeneric(int rdbtype) -> io::Result { auto RdbLoaderBase::ReadHMap(int rdbtype) -> io::Result { size_t len; - SET_OR_UNEXPECT(LoadLen(nullptr), len); + if (pending_read_.remaining > 0) { + len = pending_read_.remaining; + } else { + SET_OR_UNEXPECT(LoadLen(NULL), len); - unique_ptr load_trace(new LoadTrace); + if (rdbtype == RDB_TYPE_HASH) { + len *= 2; + } else { + DCHECK_EQ(rdbtype, RDB_TYPE_HASH_WITH_EXPIRY); + len *= 3; + } - if (rdbtype == RDB_TYPE_HASH) { - len *= 2; - } else { - DCHECK_EQ(rdbtype, RDB_TYPE_HASH_WITH_EXPIRY); - len *= 3; + pending_read_.reserve = len; } - load_trace->arr.resize((len + kMaxBlobLen - 1) / kMaxBlobLen); - for (size_t i = 0; i < load_trace->arr.size(); ++i) { - size_t n = std::min(len, kMaxBlobLen); - load_trace->arr[i].resize(n); - for (size_t j = 0; j < n; ++j) { - error_code ec = ReadStringObj(&load_trace->arr[i][j].rdb_var); - if (ec) - return make_unexpected(ec); - } - len -= n; + unique_ptr load_trace(new LoadTrace); + load_trace->arr.resize(1); + size_t n = std::min(len, kMaxBlobLen); + load_trace->arr[0].resize(n); + for (size_t i = 0; i < n; ++i) { + error_code ec = ReadStringObj(&load_trace->arr[0][i].rdb_var); + if (ec) + return make_unexpected(ec); + } + + // If there are still unread elements, cache the number of remaining + // elements, or clear if the full object has been read. + if (len > n) { + pending_read_.remaining = len - n; + } else if (pending_read_.remaining > 0) { + pending_read_.remaining = 0; } return OpaqueObj{std::move(load_trace), rdbtype}; diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc index b60ff7e5b09f..4da8222e7c5f 100644 --- a/src/server/rdb_test.cc +++ b/src/server/rdb_test.cc @@ -567,11 +567,11 @@ TEST_F(RdbTest, DflyLoadAppend) { // Tests loading a huge set, where the set is loaded in multiple partial reads. TEST_F(RdbTest, LoadHugeSet) { - // Add 2 sets with 200k elements each (note must have more than kMaxBlobLen + // Add 2 sets with 100k elements each (note must have more than kMaxBlobLen // elements to test partial reads). - Run({"debug", "populate", "2", "test", "100", "rand", "type", "set", "elements", "200000"}); - ASSERT_EQ(200000, CheckedInt({"scard", "test:0"})); - ASSERT_EQ(200000, CheckedInt({"scard", "test:1"})); + Run({"debug", "populate", "2", "test", "100", "rand", "type", "set", "elements", "100000"}); + ASSERT_EQ(100000, CheckedInt({"scard", "test:0"})); + ASSERT_EQ(100000, CheckedInt({"scard", "test:1"})); RespExpr resp = Run({"save", "df"}); ASSERT_EQ(resp, "OK"); @@ -580,8 +580,28 @@ TEST_F(RdbTest, LoadHugeSet) { resp = Run({"dfly", "load", save_info.file_name}); ASSERT_EQ(resp, "OK"); - ASSERT_EQ(200000, CheckedInt({"scard", "test:0"})); - ASSERT_EQ(200000, CheckedInt({"scard", "test:1"})); + ASSERT_EQ(100000, CheckedInt({"scard", "test:0"})); + ASSERT_EQ(100000, CheckedInt({"scard", "test:1"})); +} + +// Tests loading a huge hmap, where the map is loaded in multiple partial +// reads. +TEST_F(RdbTest, LoadHugeHMap) { + // Add 2 sets with 100k elements each (note must have more than kMaxBlobLen + // elements to test partial reads). + Run({"debug", "populate", "2", "test", "100", "rand", "type", "hash", "elements", "100000"}); + ASSERT_EQ(100000, CheckedInt({"hlen", "test:0"})); + ASSERT_EQ(100000, CheckedInt({"hlen", "test:1"})); + + RespExpr resp = Run({"save", "df"}); + ASSERT_EQ(resp, "OK"); + + auto save_info = service_->server_family().GetLastSaveInfo(); + resp = Run({"dfly", "load", save_info.file_name}); + ASSERT_EQ(resp, "OK"); + + ASSERT_EQ(100000, CheckedInt({"hlen", "test:0"})); + ASSERT_EQ(100000, CheckedInt({"hlen", "test:1"})); } } // namespace dfly From d665a313fe41a42edccc3cd6b8a88b9de7f187c7 Mon Sep 17 00:00:00 2001 From: Andrew Dunstall Date: Sun, 29 Sep 2024 12:38:12 +0100 Subject: [PATCH 2/3] feat(rdb_load): add support for loading huge zsets --- src/server/rdb_load.cc | 99 +++++++++++++++++++++++++++++------------- src/server/rdb_test.cc | 20 +++++++++ 2 files changed, 88 insertions(+), 31 deletions(-) diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 1762197759c5..a68a8b4011e0 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -802,16 +802,42 @@ void RdbLoaderBase::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) { void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) { size_t zsetlen = ltrace->blob_count(); - detail::SortedMap* zs = CompactObj::AllocateMR(); + unsigned encoding = OBJ_ENCODING_SKIPLIST; - auto cleanup = absl::MakeCleanup([&] { CompactObj::DeleteMR(zs); }); + detail::SortedMap* zs; + if (config_.append) { + // Note we only use append_ when the set size exceeds kMaxBlobLen, + // which is greater than server.zset_max_listpack_entries so we'll always + // use a SortedMap set not listpack. + if (pv_->ObjType() != OBJ_ZSET) { + LOG(ERROR) << "Invalid RDB type " << pv_->ObjType(); + ec_ = RdbError(errc::invalid_rdb_type); + return; + } + if (pv_->Encoding() != OBJ_ENCODING_SKIPLIST) { + LOG(ERROR) << "Invalid encoding " << pv_->Encoding(); + ec_ = RdbError(errc::invalid_encoding); + return; + } - if (zsetlen > 2 && !zs->Reserve(zsetlen)) { - LOG(ERROR) << "OOM in dictTryExpand " << zsetlen; - ec_ = RdbError(errc::out_of_memory); - return; + zs = static_cast(pv_->RObjPtr()); + } else { + zs = CompactObj::AllocateMR(); + + size_t reserve = (config_.reserve > zsetlen) ? config_.reserve : zsetlen; + if (reserve > 2 && !zs->Reserve(reserve)) { + LOG(ERROR) << "OOM in dictTryExpand " << zsetlen; + ec_ = RdbError(errc::out_of_memory); + return; + } } + auto cleanup = absl::MakeCleanup([&] { + if (!config_.append) { + CompactObj::DeleteMR(zs); + } + }); + size_t maxelelen = 0, totelelen = 0; Iterate(*ltrace, [&](const LoadBlob& blob) { @@ -849,7 +875,9 @@ void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) { std::move(cleanup).Cancel(); - pv_->InitRobj(OBJ_ZSET, encoding, inner); + if (!config_.append) { + pv_->InitRobj(OBJ_ZSET, encoding, inner); + } } void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) { @@ -1623,6 +1651,7 @@ auto RdbLoaderBase::ReadHMap(int rdbtype) -> io::Result { pending_read_.reserve = len; } + // Limit each read to kMaxBlobLen elements. unique_ptr load_trace(new LoadTrace); load_trace->arr.resize(1); size_t n = std::min(len, kMaxBlobLen); @@ -1645,38 +1674,46 @@ auto RdbLoaderBase::ReadHMap(int rdbtype) -> io::Result { } auto RdbLoaderBase::ReadZSet(int rdbtype) -> io::Result { - /* Read sorted set value. */ uint64_t zsetlen; - SET_OR_UNEXPECT(LoadLen(nullptr), zsetlen); + if (pending_read_.remaining > 0) { + zsetlen = pending_read_.remaining; + } else { + SET_OR_UNEXPECT(LoadLen(nullptr), zsetlen); + pending_read_.reserve = zsetlen; + } if (zsetlen == 0) return Unexpected(errc::empty_key); - unique_ptr load_trace(new LoadTrace); - load_trace->arr.resize((zsetlen + kMaxBlobLen - 1) / kMaxBlobLen); - double score; - for (size_t i = 0; i < load_trace->arr.size(); ++i) { - size_t n = std::min(zsetlen, kMaxBlobLen); - load_trace->arr[i].resize(n); - for (size_t j = 0; j < n; ++j) { - error_code ec = ReadStringObj(&load_trace->arr[i][j].rdb_var); - if (ec) - return make_unexpected(ec); - if (rdbtype == RDB_TYPE_ZSET_2) { - SET_OR_UNEXPECT(FetchBinaryDouble(), score); - } else { - SET_OR_UNEXPECT(FetchDouble(), score); - } - - if (isnan(score)) { - LOG(ERROR) << "Zset with NAN score detected"; - return Unexpected(errc::rdb_file_corrupted); - } - load_trace->arr[i][j].score = score; + // Limit each read to kMaxBlobLen elements. + unique_ptr load_trace(new LoadTrace); + load_trace->arr.resize(1); + size_t n = std::min(zsetlen, kMaxBlobLen); + load_trace->arr[0].resize(n); + for (size_t i = 0; i < n; ++i) { + error_code ec = ReadStringObj(&load_trace->arr[0][i].rdb_var); + if (ec) + return make_unexpected(ec); + if (rdbtype == RDB_TYPE_ZSET_2) { + SET_OR_UNEXPECT(FetchBinaryDouble(), score); + } else { + SET_OR_UNEXPECT(FetchDouble(), score); + } + if (isnan(score)) { + LOG(ERROR) << "Zset with NAN score detected"; + return Unexpected(errc::rdb_file_corrupted); } - zsetlen -= n; + load_trace->arr[0][i].score = score; + } + + // If there are still unread elements, cache the number of remaining + // elements, or clear if the full object has been read. + if (zsetlen > n) { + pending_read_.remaining = zsetlen - n; + } else if (pending_read_.remaining > 0) { + pending_read_.remaining = 0; } return OpaqueObj{std::move(load_trace), rdbtype}; diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc index 4da8222e7c5f..3f6025fc3721 100644 --- a/src/server/rdb_test.cc +++ b/src/server/rdb_test.cc @@ -604,4 +604,24 @@ TEST_F(RdbTest, LoadHugeHMap) { ASSERT_EQ(100000, CheckedInt({"hlen", "test:1"})); } +// Tests loading a huge zset, where the zset is loaded in multiple partial +// reads. +TEST_F(RdbTest, LoadHugeZSet) { + // Add 2 sets with 100k elements each (note must have more than kMaxBlobLen + // elements to test partial reads). + Run({"debug", "populate", "2", "test", "100", "rand", "type", "zset", "elements", "100000"}); + ASSERT_EQ(100000, CheckedInt({"zcard", "test:0"})); + ASSERT_EQ(100000, CheckedInt({"zcard", "test:1"})); + + RespExpr resp = Run({"save", "df"}); + ASSERT_EQ(resp, "OK"); + + auto save_info = service_->server_family().GetLastSaveInfo(); + resp = Run({"dfly", "load", save_info.file_name}); + ASSERT_EQ(resp, "OK"); + + ASSERT_EQ(100000, CheckedInt({"zcard", "test:0"})); + ASSERT_EQ(100000, CheckedInt({"zcard", "test:1"})); +} + } // namespace dfly From 72524626e55f8b4b4cffd8619ad8098cecdd7f3e Mon Sep 17 00:00:00 2001 From: Andrew Dunstall Date: Mon, 30 Sep 2024 08:56:49 +0100 Subject: [PATCH 3/3] feat(rdb_load): log DFATAL when append fails --- src/server/rdb_load.cc | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index a68a8b4011e0..75162c09cba1 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -545,12 +545,12 @@ void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) { // which is greater than SetFamily::MaxIntsetEntries so we'll always use // a string set not an int set. if (pv_->ObjType() != OBJ_SET) { - LOG(ERROR) << "Invalid RDB type " << pv_->ObjType(); + LOG(DFATAL) << "Invalid RDB type " << pv_->ObjType(); ec_ = RdbError(errc::invalid_rdb_type); return; } if (pv_->Encoding() != kEncodingStrMap2) { - LOG(ERROR) << "Invalid encoding " << pv_->Encoding(); + LOG(DFATAL) << "Invalid encoding " << pv_->Encoding(); ec_ = RdbError(errc::invalid_encoding); return; } @@ -663,12 +663,12 @@ void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) { // which is greater than 64 so we'll always use a StringMap set not // listpack. if (pv_->ObjType() != OBJ_HASH) { - LOG(ERROR) << "Invalid RDB type " << pv_->ObjType(); + LOG(DFATAL) << "Invalid RDB type " << pv_->ObjType(); ec_ = RdbError(errc::invalid_rdb_type); return; } if (pv_->Encoding() != kEncodingStrMap2) { - LOG(ERROR) << "Invalid encoding " << pv_->Encoding(); + LOG(DFATAL) << "Invalid encoding " << pv_->Encoding(); ec_ = RdbError(errc::invalid_encoding); return; } @@ -810,12 +810,12 @@ void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) { // which is greater than server.zset_max_listpack_entries so we'll always // use a SortedMap set not listpack. if (pv_->ObjType() != OBJ_ZSET) { - LOG(ERROR) << "Invalid RDB type " << pv_->ObjType(); + LOG(DFATAL) << "Invalid RDB type " << pv_->ObjType(); ec_ = RdbError(errc::invalid_rdb_type); return; } if (pv_->Encoding() != OBJ_ENCODING_SKIPLIST) { - LOG(ERROR) << "Invalid encoding " << pv_->Encoding(); + LOG(DFATAL) << "Invalid encoding " << pv_->Encoding(); ec_ = RdbError(errc::invalid_encoding); return; }