diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index e03ad2493037..b288f3213a8a 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -134,13 +134,15 @@ class RdbRestoreValue : protected RdbLoaderBase { std::optional RdbRestoreValue::Parse(std::string_view payload) { InMemSource source(payload); src_ = &source; - if (auto type_id = FetchType(); type_id && rdbIsObjectType(type_id.value())) { - io::Result io_res = ReadObj(type_id.value()); // load the type from the input stream - if (!io_res) { + if (io::Result type_id = FetchType(); type_id && rdbIsObjectType(type_id.value())) { + OpaqueObj obj; + error_code ec = ReadObj(type_id.value(), &obj); // load the type from the input stream + if (ec) { LOG(ERROR) << "failed to load data for type id " << (unsigned int)type_id.value(); return std::nullopt; } - return std::optional(std::move(io_res.value())); + + return std::optional(std::move(obj)); } else { LOG(ERROR) << "failed to load type id from the input stream or type id is invalid"; return std::nullopt; @@ -149,20 +151,20 @@ std::optional RdbRestoreValue::Parse(std::string_view bool RdbRestoreValue::Add(std::string_view data, std::string_view key, DbSlice& db_slice, DbIndex index, uint64_t expire_ms) { - auto value_to_load = Parse(data); - if (!value_to_load) { + auto opaque_res = Parse(data); + if (!opaque_res) { return false; } - Item item{ - .key = std::string(key), .val = std::move(value_to_load.value()), .expire_ms = expire_ms}; + PrimeValue pv; - if (auto ec = Visit(item, &pv); ec) { + if (auto ec = FromOpaque(*opaque_res, &pv); ec) { // we failed - report and exit LOG(WARNING) << "error while trying to save data: " << ec; return false; } + DbContext context{.db_index = index, .time_now_ms = GetCurrentTimeMs()}; - auto [it, added] = db_slice.AddOrSkip(context, key, std::move(pv), item.expire_ms); + auto [it, added] = db_slice.AddOrSkip(context, key, std::move(pv), expire_ms); return added; } diff --git 
a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 56e3bdb7f9b0..097fb28a2cd6 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -1183,69 +1183,90 @@ auto RdbLoaderBase::ReadKey() -> io::Result { return FetchGenericString(); } -auto RdbLoaderBase::ReadObj(int rdbtype) -> io::Result { +error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) { + io::Result iores; + switch (rdbtype) { case RDB_TYPE_STRING: { + dest->rdb_type = RDB_TYPE_STRING; + /* Read string value */ - auto fetch = ReadStringObj(); - if (!fetch) - return make_unexpected(fetch.error()); - return OpaqueObj{std::move(*fetch), RDB_TYPE_STRING}; + return ReadStringObj(&dest->obj); } case RDB_TYPE_SET: - return ReadSet(); + iores = ReadSet(); + break; case RDB_TYPE_SET_INTSET: - return ReadIntSet(); + iores = ReadIntSet(); + break; case RDB_TYPE_HASH_ZIPLIST: - return ReadHZiplist(); + iores = ReadHZiplist(); + break; case RDB_TYPE_HASH: - return ReadHMap(); + iores = ReadHMap(); + break; case RDB_TYPE_ZSET: case RDB_TYPE_ZSET_2: - return ReadZSet(rdbtype); + iores = ReadZSet(rdbtype); + break; case RDB_TYPE_ZSET_ZIPLIST: - return ReadZSetZL(); + iores = ReadZSetZL(); + break; case RDB_TYPE_LIST_QUICKLIST: case RDB_TYPE_LIST_QUICKLIST_2: - return ReadListQuicklist(rdbtype); + iores = ReadListQuicklist(rdbtype); + break; case RDB_TYPE_STREAM_LISTPACKS: - return ReadStreams(); + iores = ReadStreams(); break; - } + break; + default: + LOG(ERROR) << "Unsupported rdb type " << rdbtype; - LOG(ERROR) << "Unsupported rdb type " << rdbtype; + return RdbError(errc::invalid_encoding); + } - return Unexpected(errc::invalid_encoding); + if (!iores) + return iores.error(); + *dest = std::move(*iores); + return error_code{}; } -auto RdbLoaderBase::ReadStringObj() -> io::Result { +error_code RdbLoaderBase::ReadStringObj(RdbVariant* dest) { bool isencoded; size_t len; - SET_OR_UNEXPECT(LoadLen(&isencoded), len); + SET_OR_RETURN(LoadLen(&isencoded), len); if (isencoded) { switch (len) { case 
RDB_ENC_INT8: case RDB_ENC_INT16: - case RDB_ENC_INT32: - return ReadIntObj(len); - case RDB_ENC_LZF: - return ReadLzf(); + case RDB_ENC_INT32: { + io::Result io_int = ReadIntObj(len); + if (!io_int) + return io_int.error(); + dest->emplace(*io_int); + return error_code{}; + } + case RDB_ENC_LZF: { + io::Result lzf = ReadLzf(); + if (!lzf) + return lzf.error(); + + dest->emplace(std::move(lzf.value())); + return error_code{}; + } default: LOG(ERROR) << "Unknown RDB string encoding " << len; - return Unexpected(errc::rdb_file_corrupted); + return RdbError(errc::rdb_file_corrupted); } } - base::PODArray blob; + auto& blob = dest->emplace>(); blob.resize(len); error_code ec = FetchBuf(len, blob.data()); - if (ec) { - return make_unexpected(ec); - } - - return blob; + return ec; } io::Result RdbLoaderBase::ReadIntObj(int enctype) { @@ -1295,24 +1316,24 @@ auto RdbLoaderBase::ReadSet() -> io::Result { unique_ptr res(new LoadTrace); res->arr.resize(len); for (size_t i = 0; i < len; i++) { - io::Result fetch = ReadStringObj(); - if (!fetch) { - return make_unexpected(fetch.error()); + error_code ec = ReadStringObj(&res->arr[i].rdb_var); + if (ec) { + return make_unexpected(ec); } - res->arr[i].rdb_var = std::move(fetch.value()); } return OpaqueObj{std::move(res), RDB_TYPE_SET}; } auto RdbLoaderBase::ReadIntSet() -> io::Result { - io::Result fetch = ReadStringObj(); - if (!fetch) { - return make_unexpected(fetch.error()); + RdbVariant obj; + error_code ec = ReadStringObj(&obj); + if (ec) { + return make_unexpected(ec); } - const LzfString* lzf = get_if(&fetch.value()); - const base::PODArray* arr = get_if>(&fetch.value()); + const LzfString* lzf = get_if(&obj); + const base::PODArray* arr = get_if>(&obj); if (lzf) { if (lzf->uncompressed_len == 0 || lzf->compressed_blob.empty()) @@ -1324,12 +1345,14 @@ auto RdbLoaderBase::ReadIntSet() -> io::Result { return Unexpected(errc::rdb_file_corrupted); } - return OpaqueObj{std::move(*fetch), RDB_TYPE_SET_INTSET}; + return 
OpaqueObj{std::move(obj), RDB_TYPE_SET_INTSET}; } auto RdbLoaderBase::ReadHZiplist() -> io::Result { RdbVariant str_obj; - SET_OR_UNEXPECT(ReadStringObj(), str_obj); + error_code ec = ReadStringObj(&str_obj); + if (ec) + return make_unexpected(ec); if (StrLen(str_obj) == 0) { return Unexpected(errc::rdb_file_corrupted); @@ -1349,7 +1372,9 @@ auto RdbLoaderBase::ReadHMap() -> io::Result { load_trace->arr.resize(len * 2); for (size_t i = 0; i < load_trace->arr.size(); ++i) { - SET_OR_UNEXPECT(ReadStringObj(), load_trace->arr[i].rdb_var); + error_code ec = ReadStringObj(&load_trace->arr[i].rdb_var); + if (ec) + return make_unexpected(ec); } return OpaqueObj{std::move(load_trace), RDB_TYPE_HASH}; @@ -1369,7 +1394,9 @@ auto RdbLoaderBase::ReadZSet(int rdbtype) -> io::Result { double score; for (size_t i = 0; i < load_trace->arr.size(); ++i) { - SET_OR_UNEXPECT(ReadStringObj(), load_trace->arr[i].rdb_var); + error_code ec = ReadStringObj(&load_trace->arr[i].rdb_var); + if (ec) + return make_unexpected(ec); if (rdbtype == RDB_TYPE_ZSET_2) { SET_OR_UNEXPECT(FetchBinaryDouble(), score); } else { @@ -1388,7 +1415,9 @@ auto RdbLoaderBase::ReadZSet(int rdbtype) -> io::Result { auto RdbLoaderBase::ReadZSetZL() -> io::Result { RdbVariant str_obj; - SET_OR_UNEXPECT(ReadStringObj(), str_obj); + error_code ec = ReadStringObj(&str_obj); + if (ec) + return make_unexpected(ec); if (StrLen(str_obj) == 0) { return Unexpected(errc::rdb_file_corrupted); @@ -1420,7 +1449,10 @@ auto RdbLoaderBase::ReadListQuicklist(int rdbtype) -> io::Result { } RdbVariant var; - SET_OR_UNEXPECT(ReadStringObj(), var); + error_code ec = ReadStringObj(&var); + if (ec) + return make_unexpected(ec); + if (StrLen(var) == 0) { return Unexpected(errc::rdb_file_corrupted); } @@ -1438,21 +1470,24 @@ auto RdbLoaderBase::ReadStreams() -> io::Result { unique_ptr load_trace(new LoadTrace); load_trace->arr.resize(listpacks * 2); + error_code ec; for (size_t i = 0; i < listpacks; ++i) { /* Get the master ID, the one we'll 
use as key of the radix tree * node: the entries inside the listpack itself are delta-encoded * relatively to this ID. */ RdbVariant stream_id, blob; - SET_OR_UNEXPECT(ReadStringObj(), stream_id); - + ec = ReadStringObj(&stream_id); + if (ec) + return make_unexpected(ec); if (StrLen(stream_id) != sizeof(streamID)) { LOG(ERROR) << "Stream node key entry is not the size of a stream ID"; return Unexpected(errc::rdb_file_corrupted); } - SET_OR_UNEXPECT(ReadStringObj(), blob); - + ec = ReadStringObj(&blob); + if (ec) + return make_unexpected(ec); if (StrLen(blob) == 0) { LOG(ERROR) << "Stream listpacks loading failed"; return Unexpected(errc::rdb_file_corrupted); @@ -1484,7 +1519,9 @@ auto RdbLoaderBase::ReadStreams() -> io::Result { // sds cgname; RdbVariant cgname; - SET_OR_UNEXPECT(ReadStringObj(), cgname); + ec = ReadStringObj(&cgname); + if (ec) + return make_unexpected(ec); cgroup.name = std::move(cgname); SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.ms); @@ -1529,7 +1566,9 @@ auto RdbLoaderBase::ReadStreams() -> io::Result { for (size_t j = 0; j < consumers_num; ++j) { auto& consumer = cgroup.cons_arr[j]; - SET_OR_UNEXPECT(ReadStringObj(), consumer.name); + ec = ReadStringObj(&consumer.name); + if (ec) + return make_unexpected(ec); SET_OR_UNEXPECT(FetchInt(), consumer.seen_time); @@ -1788,10 +1827,10 @@ error_code RdbLoader::Load(io::Source* src) { std::error_code RdbLoaderBase::EnsureRead(size_t min_sz) { // In the flow of reading compressed data, we store the uncompressed data to in uncompressed - // buffer. When parsing entries we call ensure read with 9 bytes to read the length of key/value. - // If the key/value is very small (less than 9 bytes) the remainded data in uncompressed buffer - // might contain less than 9 bytes. We need to make sure that we dont read from sink to the - // uncompressed buffer and therefor in this flow we return here. + // buffer. When parsing entries we call ensure read with 9 bytes to read the length of + // key/value. 
If the key/value is very small (less than 9 bytes) the remaining data in + // uncompressed buffer might contain less than 9 bytes. We need to make sure that we don't read + // from sink to the uncompressed buffer and therefore in this flow we return here. if (mem_buf_ != &origin_mem_buf_) return std::error_code{}; if (mem_buf_->InputLen() >= min_sz) @@ -1984,18 +2023,17 @@ void RdbLoader::FlushShardAsync(ShardId sid) { if (out_buf.empty()) return; - ItemsBuf* ib = new ItemsBuf{std::move(out_buf)}; - auto cb = [indx = this->cur_db_index_, ib, this] { - this->LoadItemsBuffer(indx, *ib); - delete ib; + auto cb = [indx = this->cur_db_index_, this, ib = std::move(out_buf)] { + this->LoadItemsBuffer(indx, ib); }; shard_set->Add(sid, std::move(cb)); } -std::error_code RdbLoaderBase::Visit(const Item& item, CompactObj* pv) { - OpaqueObjLoader visitor(item.val.rdb_type, pv); - std::visit(visitor, item.val.obj); +std::error_code RdbLoaderBase::FromOpaque(const OpaqueObj& opaque, CompactObj* pv) { + OpaqueObjLoader visitor(opaque.rdb_type, pv); + std::visit(visitor, opaque.obj); + return visitor.ec(); } @@ -2003,21 +2041,25 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) { DbSlice& db_slice = EngineShard::tlocal()->db_slice(); DbContext db_cntx{.db_index = db_ind, .time_now_ms = GetCurrentTimeMs()}; - for (const auto& item : ib) { + for (const auto* item : ib) { PrimeValue pv; - if (ec_ = Visit(item, &pv); ec_) { + if (ec_ = FromOpaque(item->val, &pv); ec_) { stop_early_ = true; break; } - if (item.expire_ms > 0 && db_cntx.time_now_ms >= item.expire_ms) + if (item->expire_ms > 0 && db_cntx.time_now_ms >= item->expire_ms) continue; - auto [it, added] = db_slice.AddOrUpdate(db_cntx, item.key, std::move(pv), item.expire_ms); + auto [it, added] = db_slice.AddOrUpdate(db_cntx, item->key, std::move(pv), item->expire_ms); if (!added) { - LOG(WARNING) << "RDB has duplicated key '" << item.key << "' in DB " << db_ind; + LOG(WARNING) << "RDB has duplicated key '" 
<< item->key << "' in DB " << db_ind; } } + + for (auto* item : ib) { + delete item; + } } void RdbLoader::ResizeDb(size_t key_num, size_t expire_num) { @@ -2026,22 +2068,19 @@ void RdbLoader::ResizeDb(size_t key_num, size_t expire_num) { } error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) { - /* Read key */ - string key; - OpaqueObj val; + Item* item = new Item; - // We free key in LoadItemsBuffer. - SET_OR_RETURN(ReadKey(), key); + // Read key + // We free item in LoadItemsBuffer. + SET_OR_RETURN(ReadKey(), item->key); - io::Result io_res = ReadObj(type); + error_code ec = ReadObj(type, &item->val); - if (!io_res) { - VLOG(1) << "ReadObj error " << io_res.error() << " for key " << key; - return io_res.error(); + if (ec) { + VLOG(1) << "ReadObj error " << ec << " for key " << item->key; + return ec; } - val = std::move(io_res.value()); - /* Check if the key already expired. This function is used when loading * an RDB file from disk, either at startup, or when an RDB was * received from the master. 
In the latter case, the master is @@ -2055,12 +2094,12 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) { if (should_expire) { // decrRefCount(val); } else { - ShardId sid = Shard(key, shard_set->size()); - uint64_t expire_at_ms = settings->expiretime; + ShardId sid = Shard(item->key, shard_set->size()); + item->expire_ms = settings->expiretime; auto& out_buf = shard_buf_[sid]; - out_buf.emplace_back(Item{std::move(key), std::move(val), expire_at_ms}); + out_buf.emplace_back(item); constexpr size_t kBufSize = 128; if (out_buf.size() >= kBufSize) { diff --git a/src/server/rdb_load.h b/src/server/rdb_load.h index 2acf46ea011c..f4a3cfae2417 100644 --- a/src/server/rdb_load.h +++ b/src/server/rdb_load.h @@ -87,20 +87,13 @@ class RdbLoaderBase { class OpaqueObjLoader; - struct Item { - std::string key; - OpaqueObj val; - uint64_t expire_ms; - }; - using ItemsBuf = std::vector; - ::io::Result FetchType() { return FetchInt(); } template io::Result FetchInt(); - std::error_code Visit(const Item& item, CompactObj* pv); + static std::error_code FromOpaque(const OpaqueObj& opaque, CompactObj* pv); io::Result LoadLen(bool* is_encoded); std::error_code FetchBuf(size_t size, void* dest); @@ -114,8 +107,8 @@ class RdbLoaderBase { ::io::Result ReadKey(); - ::io::Result ReadObj(int rdbtype); - ::io::Result ReadStringObj(); + std::error_code ReadObj(int rdbtype, OpaqueObj* dest); + std::error_code ReadStringObj(RdbVariant* rdb_variant); ::io::Result ReadIntObj(int encoding); ::io::Result ReadLzf(); @@ -184,6 +177,23 @@ class RdbLoader : protected RdbLoaderBase { } private: + struct Item { + std::string key; + OpaqueObj val; + uint64_t expire_ms; + std::atomic next; + + friend void MPSC_intrusive_store_next(Item* dest, Item* nxt) { + dest->next.store(nxt, std::memory_order_release); + } + + friend Item* MPSC_intrusive_load_next(const Item& src) { + return src.next.load(std::memory_order_acquire); + } + }; + + using ItemsBuf = std::vector; + struct ObjSettings; 
std::error_code LoadKeyValPair(int type, ObjSettings* settings); void ResizeDb(size_t key_num, size_t expire_num);