diff --git a/curvefs/src/client/inode_cache_manager.cpp b/curvefs/src/client/inode_cache_manager.cpp index eed8207ceb..2d8dc57e80 100644 --- a/curvefs/src/client/inode_cache_manager.cpp +++ b/curvefs/src/client/inode_cache_manager.cpp @@ -119,7 +119,7 @@ CURVEFS_ERROR InodeCacheManagerImpl::BatchGetInodeAttr( return CURVEFS_ERROR::OK; } - MetaStatusCode ret = metaClient_->BatchGetInodeAttr(fsId_, inodeIds, attr); + MetaStatusCode ret = metaClient_->BatchGetInodeAttr(fsId_, *inodeIds, attr); if (MetaStatusCode::OK != ret) { LOG(ERROR) << "metaClient BatchGetInodeAttr failed, MetaStatusCode = " << ret << ", MetaStatusCode_Name = " @@ -150,7 +150,7 @@ CURVEFS_ERROR InodeCacheManagerImpl::BatchGetXAttr( return CURVEFS_ERROR::OK; } - MetaStatusCode ret = metaClient_->BatchGetXAttr(fsId_, inodeIds, xattr); + MetaStatusCode ret = metaClient_->BatchGetXAttr(fsId_, *inodeIds, xattr); if (MetaStatusCode::OK != ret) { LOG(ERROR) << "metaClient BatchGetXAttr failed, MetaStatusCode = " << ret << ", MetaStatusCode_Name = " diff --git a/curvefs/src/client/rpcclient/metaserver_client.cpp b/curvefs/src/client/rpcclient/metaserver_client.cpp index 9033f7c92a..35b7fba9b6 100644 --- a/curvefs/src/client/rpcclient/metaserver_client.cpp +++ b/curvefs/src/client/rpcclient/metaserver_client.cpp @@ -477,14 +477,14 @@ MetaStatusCode MetaServerClientImpl::GetInode(uint32_t fsId, uint64_t inodeid, bool GroupInodeIdByPartition( uint32_t fsId, std::shared_ptr metaCache, - std::set *inodeIds, - std::unordered_map> *inodeGroups) { - for (const auto &it : *inodeIds) { + const std::set &inodeIds, + std::unordered_map> *inodeGroups) { + for (const auto &it : inodeIds) { uint32_t pId = 0; if (metaCache->GetPartitionIdByInodeId(fsId, it, &pId)) { auto iter = inodeGroups->find(pId); if (iter == inodeGroups->end()) { - inodeGroups->emplace(pId, std::list({it})); + inodeGroups->emplace(pId, std::vector({it})); } else { iter->second.push_back(it); } @@ -497,163 +497,182 @@ bool GroupInodeIdByPartition( return true; } +bool MetaServerClientImpl::SplitRequestInodes( + uint32_t fsId, + const std::set &inodeIds, + std::vector> *inodeGroups) { + std::unordered_map> groups; + bool ret = GroupInodeIdByPartition(fsId, metaCache_, inodeIds, &groups); + if (!ret) { + return false; + } + for (const auto &it : groups) { + auto iter = it.second.begin(); + while (iter != it.second.end()) { + std::vector tmp; + uint32_t batchLimit = opt_.batchLimit; + while (iter != it.second.end() && batchLimit > 0) { + tmp.emplace_back(*iter); + iter++; + batchLimit--; + } + inodeGroups->emplace_back(std::move(tmp)); + } + } + return true; +} + + MetaStatusCode MetaServerClientImpl::BatchGetInodeAttr(uint32_t fsId, - std::set *inodeIds, + const std::set &inodeIds, std::list *attr) { - uint32_t limit = opt_.batchLimit; - // group inodeid by partition - std::unordered_map> inodeGroups; - if (!GroupInodeIdByPartition(fsId, metaCache_, inodeIds, &inodeGroups)) { + // group inodeid by partition and batchlimit + std::vector> inodeGroups; + if (!SplitRequestInodes(fsId, inodeIds, &inodeGroups)) { return MetaStatusCode::NOT_FOUND; } - // send rpc + // TDOD(wanghai): send rpc parallelly for (const auto &it : inodeGroups) { - auto iter = it.second.begin(); - while (iter != it.second.end()) { - uint64_t inodeId = *iter; - auto task = RPCTask { - metaserverClientMetric_->batchGetInodeAttr.qps.count << 1; - LatencyUpdater updater( - &metaserverClientMetric_->batchGetInodeAttr.latency); - BatchGetInodeAttrRequest request; - BatchGetInodeAttrResponse response; - request.set_poolid(poolID); - request.set_copysetid(copysetID); - request.set_partitionid(partitionID); - request.set_fsid(fsId); - request.set_appliedindex( - metaCache_->GetApplyIndex(CopysetGroupID(poolID, - copysetID))); - uint32_t batchLimit = limit; - while (iter != it.second.end() && batchLimit > 0) { - request.add_inodeid(*iter); - iter++; - batchLimit--; - } - curvefs::metaserver::MetaServerService_Stub stub(channel); - stub.BatchGetInodeAttr(cntl, &request, &response, nullptr); - - if (cntl->Failed()) { - metaserverClientMetric_->batchGetInodeAttr.eps.count << 1; - LOG(WARNING) << "BatchGetInodeAttr Failed, errorcode = " - << cntl->ErrorCode() - << ", error content:" << cntl->ErrorText() - << ", log id = " << cntl->log_id(); - return -cntl->ErrorCode(); - } + if (it.empty()) { + LOG(WARNING) << "BatchGetInodeAttr request empty."; + return MetaStatusCode::PARAM_ERROR; + } + uint64_t inodeId = *it.begin(); + auto task = RPCTask { + metaserverClientMetric_->batchGetInodeAttr.qps.count << 1; + LatencyUpdater updater( + &metaserverClientMetric_->batchGetInodeAttr.latency); + BatchGetInodeAttrRequest request; + BatchGetInodeAttrResponse response; + request.set_poolid(poolID); + request.set_copysetid(copysetID); + request.set_partitionid(partitionID); + request.set_fsid(fsId); + request.set_appliedindex( + metaCache_->GetApplyIndex(CopysetGroupID(poolID, + copysetID))); + *request.mutable_inodeid() = { it.begin(), it.end() }; + + curvefs::metaserver::MetaServerService_Stub stub(channel); + stub.BatchGetInodeAttr(cntl, &request, &response, nullptr); + + if (cntl->Failed()) { + metaserverClientMetric_->batchGetInodeAttr.eps.count << 1; + LOG(WARNING) << "BatchGetInodeAttr Failed, errorcode = " + << cntl->ErrorCode() + << ", error content:" << cntl->ErrorText() + << ", log id = " << cntl->log_id(); + return -cntl->ErrorCode(); + } - MetaStatusCode ret = response.statuscode(); - if (ret != MetaStatusCode::OK) { - LOG_IF(WARNING, ret != MetaStatusCode::NOT_FOUND) - << "BatchGetInodeAttr failed, errcode = " << ret - << ", errmsg = " << MetaStatusCode_Name(ret); - } else if (response.attr_size() > 0 && - response.has_appliedindex()) { - auto retAttr = response.attr(); - for_each(retAttr.begin(), retAttr.end(), - [&](InodeAttr &a) { attr->push_back(a); }); - metaCache_->UpdateApplyIndex( - CopysetGroupID(poolID, copysetID), - response.appliedindex()); - } else { - LOG(WARNING) << "BatchGetInodeAttr ok, but" - << " applyIndex or attr not set in response: " - << response.DebugString(); - return -1; - } - return ret; - }; - auto taskCtx = std::make_shared( - MetaServerOpType::BatchGetInodeAttr, task, fsId, inodeId); - BatchGetInodeAttrExcutor excutor( - opt_, metaCache_, channelManager_, taskCtx); - auto ret = ConvertToMetaStatusCode(excutor.DoRPCTask()); + MetaStatusCode ret = response.statuscode(); if (ret != MetaStatusCode::OK) { - attr->clear(); - return ret; + LOG(ERROR) << "BatchGetInodeAttr failed, errcode = " << ret + << ", errmsg = " << MetaStatusCode_Name(ret); + } else if (response.attr_size() > 0 && + response.has_appliedindex()) { + auto *attrs = response.mutable_attr(); + attr->insert(attr->end(), + std::make_move_iterator(attrs->begin()), + std::make_move_iterator(attrs->end())); + metaCache_->UpdateApplyIndex( + CopysetGroupID(poolID, copysetID), + response.appliedindex()); + } else { + LOG(WARNING) << "BatchGetInodeAttr ok, but" + << " applyIndex or attr not set in response: " + << response.DebugString(); + return -1; } + return ret; + }; + auto taskCtx = std::make_shared( + MetaServerOpType::BatchGetInodeAttr, task, fsId, inodeId); + BatchGetInodeAttrExcutor excutor( + opt_, metaCache_, channelManager_, taskCtx); + auto ret = ConvertToMetaStatusCode(excutor.DoRPCTask()); + if (ret != MetaStatusCode::OK) { + attr->clear(); + return ret; } } return MetaStatusCode::OK; } MetaStatusCode MetaServerClientImpl::BatchGetXAttr(uint32_t fsId, - std::set *inodeIds, + const std::set &inodeIds, std::list *xattr) { - uint32_t limit = opt_.batchLimit; - // group inodeid by partition - std::unordered_map> inodeGroups; - if (!GroupInodeIdByPartition(fsId, metaCache_, inodeIds, &inodeGroups)) { + // group inodeid by partition and batchlimit + std::vector> inodeGroups; + if (!SplitRequestInodes(fsId, inodeIds, &inodeGroups)) { return MetaStatusCode::NOT_FOUND; } - // send rpc + // TDOD(wanghai): send rpc parallelly for (const auto &it : inodeGroups) { - auto iter = it.second.begin(); - while (iter != it.second.end()) { - uint64_t inodeId = *iter; - auto task = RPCTask { - metaserverClientMetric_->batchGetXattr.qps.count << 1; - LatencyUpdater updater( - &metaserverClientMetric_->batchGetXattr.latency); - BatchGetXAttrRequest request; - BatchGetXAttrResponse response; - request.set_poolid(poolID); - request.set_copysetid(copysetID); - request.set_partitionid(partitionID); - request.set_fsid(fsId); - request.set_appliedindex( - metaCache_->GetApplyIndex( - CopysetGroupID(poolID, copysetID))); - uint32_t batchLimit = limit; - while (iter != it.second.end() && batchLimit > 0) { - request.add_inodeid(*iter); - iter++; - batchLimit--; - } - curvefs::metaserver::MetaServerService_Stub stub(channel); - stub.BatchGetXAttr(cntl, &request, &response, nullptr); - - if (cntl->Failed()) { - metaserverClientMetric_->batchGetXattr.eps.count << 1; - LOG(WARNING) << "BatchGetXAttr Failed, errorcode = " - << cntl->ErrorCode() - << ", error content:" << cntl->ErrorText() - << ", log id = " << cntl->log_id(); - return -cntl->ErrorCode(); - } + if (it.empty()) { + LOG(WARNING) << "BatchGetInodeXAttr request empty."; + return MetaStatusCode::PARAM_ERROR; + } - MetaStatusCode ret = response.statuscode(); - if (ret != MetaStatusCode::OK) { - LOG_IF(WARNING, ret != MetaStatusCode::NOT_FOUND) - << "BatchGetXAttr failed, errcode = " << ret - << ", errmsg = " << MetaStatusCode_Name(ret); - } else if (response.xattr_size() > 0 && - response.has_appliedindex()) { - auto retXattr = response.xattr(); - for_each(retXattr.begin(), retXattr.end(), - [&](XAttr &a) { xattr->push_back(a); }); - metaCache_->UpdateApplyIndex( - CopysetGroupID(poolID, copysetID), - response.appliedindex()); - } else { - LOG(WARNING) << "BatchGetXAttr ok, but" - << " applyIndex or attr not set in response: " - << response.DebugString(); - return -1; - } - return ret; - }; - auto taskCtx = std::make_shared( - MetaServerOpType::BatchGetInodeAttr, task, fsId, inodeId); - BatchGetInodeAttrExcutor excutor( - opt_, metaCache_, channelManager_, taskCtx); - auto ret = ConvertToMetaStatusCode(excutor.DoRPCTask()); + uint64_t inodeId = *it.begin(); + auto task = RPCTask { + metaserverClientMetric_->batchGetXattr.qps.count << 1; + LatencyUpdater updater( + &metaserverClientMetric_->batchGetXattr.latency); + BatchGetXAttrRequest request; + BatchGetXAttrResponse response; + request.set_poolid(poolID); + request.set_copysetid(copysetID); + request.set_partitionid(partitionID); + request.set_fsid(fsId); + request.set_appliedindex( + metaCache_->GetApplyIndex( + CopysetGroupID(poolID, copysetID))); + *request.mutable_inodeid() = { it.begin(), it.end() }; + + curvefs::metaserver::MetaServerService_Stub stub(channel); + stub.BatchGetXAttr(cntl, &request, &response, nullptr); + + if (cntl->Failed()) { + metaserverClientMetric_->batchGetXattr.eps.count << 1; + LOG(WARNING) << "BatchGetXAttr Failed, errorcode = " + << cntl->ErrorCode() + << ", error content:" << cntl->ErrorText() + << ", log id = " << cntl->log_id(); + return -cntl->ErrorCode(); + } + + MetaStatusCode ret = response.statuscode(); if (ret != MetaStatusCode::OK) { - xattr->clear(); - return ret; + LOG(ERROR) << "BatchGetXAttr failed, errcode = " << ret + << ", errmsg = " << MetaStatusCode_Name(ret); + } else if (response.xattr_size() > 0 && + response.has_appliedindex()) { + auto *xattrs = response.mutable_xattr(); + xattr->insert(xattr->end(), + std::make_move_iterator(xattrs->begin()), + std::make_move_iterator(xattrs->end())); + metaCache_->UpdateApplyIndex( + CopysetGroupID(poolID, copysetID), + response.appliedindex()); + } else { + LOG(WARNING) << "BatchGetXAttr ok, but" + << " applyIndex or attr not set in response: " + << response.DebugString(); + return -1; } + return ret; + }; + auto taskCtx = std::make_shared( + MetaServerOpType::BatchGetInodeAttr, task, fsId, inodeId); + BatchGetInodeAttrExcutor excutor( + opt_, metaCache_, channelManager_, taskCtx); + auto ret = ConvertToMetaStatusCode(excutor.DoRPCTask()); + if (ret != MetaStatusCode::OK) { + xattr->clear(); + return ret; } } return MetaStatusCode::OK; diff --git a/curvefs/src/client/rpcclient/metaserver_client.h b/curvefs/src/client/rpcclient/metaserver_client.h index d1fa6573de..f37b838a2e 100644 --- a/curvefs/src/client/rpcclient/metaserver_client.h +++ b/curvefs/src/client/rpcclient/metaserver_client.h @@ -92,11 +92,11 @@ class MetaServerClient { Inode *out, bool* streaming) = 0; virtual MetaStatusCode BatchGetInodeAttr(uint32_t fsId, - std::set *inodeIds, + const std::set &inodeIds, std::list *attr) = 0; virtual MetaStatusCode BatchGetXAttr(uint32_t fsId, - std::set *inodeIds, + const std::set &inodeIds, std::list *xattr) = 0; virtual MetaStatusCode UpdateInode(const Inode &inode, @@ -125,6 +125,10 @@ class MetaServerClient { virtual MetaStatusCode CreateInode(const InodeParam ¶m, Inode *out) = 0; virtual MetaStatusCode DeleteInode(uint32_t fsId, uint64_t inodeid) = 0; + + virtual bool SplitRequestInodes(uint32_t fsId, + const std::set &inodeIds, + std::vector> *inodeGroups) = 0; }; class MetaServerClientImpl : public MetaServerClient { @@ -162,11 +166,11 @@ class MetaServerClientImpl : public MetaServerClient { Inode *out, bool* streaming) override; MetaStatusCode BatchGetInodeAttr(uint32_t fsId, - std::set *inodeIds, + const std::set &inodeIds, std::list *attr) override; MetaStatusCode BatchGetXAttr(uint32_t fsId, - std::set *inodeIds, + const std::set &inodeIds, std::list *xattr) override; MetaStatusCode UpdateInode(const Inode &inode, @@ -195,6 +199,10 @@ class MetaServerClientImpl : public MetaServerClient { MetaStatusCode DeleteInode(uint32_t fsId, uint64_t inodeid) override; + bool SplitRequestInodes(uint32_t fsId, + const std::set &inodeIds, + std::vector> *inodeGroups) override; + private: bool ParseS3MetaStreamBuffer(butil::IOBuf* buffer, uint64_t* chunkIndex, diff --git a/curvefs/test/client/mock_metaserver_client.h b/curvefs/test/client/mock_metaserver_client.h index 92dc7131dc..10e4cc4b2d 100644 --- a/curvefs/test/client/mock_metaserver_client.h +++ b/curvefs/test/client/mock_metaserver_client.h @@ -76,11 +76,11 @@ class MockMetaServerClient : public MetaServerClient { uint32_t fsId, uint64_t inodeid, Inode *out, bool* streaming)); MOCK_METHOD3(BatchGetInodeAttr, MetaStatusCode( - uint32_t fsId, std::set *inodeIds, + uint32_t fsId, const std::set &inodeIds, std::list *attr)); MOCK_METHOD3(BatchGetXAttr, MetaStatusCode( - uint32_t fsId, std::set *inodeIds, + uint32_t fsId, const std::set &inodeIds, std::list *xattr)); MOCK_METHOD2(UpdateInode, @@ -112,6 +112,10 @@ class MockMetaServerClient : public MetaServerClient { const InodeParam ¶m, Inode *out)); MOCK_METHOD2(DeleteInode, MetaStatusCode(uint32_t fsId, uint64_t inodeid)); + + MOCK_METHOD3(SplitRequestInodes, bool(uint32_t fsId, + const std::set &inodeIds, + std::vector> *inodeGroups)); }; } // namespace rpcclient diff --git a/curvefs/test/client/rpcclient/metaserver_client_test.cpp b/curvefs/test/client/rpcclient/metaserver_client_test.cpp index bd636948fe..ade0a84e52 100644 --- a/curvefs/test/client/rpcclient/metaserver_client_test.cpp +++ b/curvefs/test/client/rpcclient/metaserver_client_test.cpp @@ -1154,7 +1154,7 @@ TEST_F(MetaServerClientImplTest, test_BatchGetInodeAttr) { SetArgPointee<3>(applyIndex), Return(true))); MetaStatusCode status = metaserverCli_.BatchGetInodeAttr( - fsid, &inodeIds, &attr); + fsid, inodeIds, &attr); ASSERT_EQ(MetaStatusCode::RPC_ERROR, status); // test1: batchGetInodeAttr ok @@ -1176,7 +1176,7 @@ TEST_F(MetaServerClientImplTest, test_BatchGetInodeAttr) { BatchGetInodeAttrResponse>))); EXPECT_CALL(*mockMetacache_.get(), UpdateApplyIndex(_, _)); - status = metaserverCli_.BatchGetInodeAttr(fsid, &inodeIds, &attr); + status = metaserverCli_.BatchGetInodeAttr(fsid, inodeIds, &attr); ASSERT_EQ(MetaStatusCode::OK, status); ASSERT_EQ(attr.size(), 2); ASSERT_THAT(attr.begin()->inodeid(), AnyOf(inodeId1, inodeId2)); @@ -1192,7 +1192,7 @@ TEST_F(MetaServerClientImplTest, test_BatchGetInodeAttr) { DoAll(SetArgPointee<2>(response), Invoke(SetRpcService))); - status = metaserverCli_.BatchGetInodeAttr(fsid, &inodeIds, &attr); + status = metaserverCli_.BatchGetInodeAttr(fsid, inodeIds, &attr); ASSERT_EQ(MetaStatusCode::NOT_FOUND, status); // test3: test response do not have applyindex @@ -1208,7 +1208,7 @@ TEST_F(MetaServerClientImplTest, test_BatchGetInodeAttr) { Invoke(SetRpcService))); - status = metaserverCli_.BatchGetInodeAttr(fsid, &inodeIds, &attr); + status = metaserverCli_.BatchGetInodeAttr(fsid, inodeIds, &attr); ASSERT_EQ(MetaStatusCode::RPC_ERROR, status); } @@ -1256,7 +1256,7 @@ TEST_F(MetaServerClientImplTest, test_BatchGetXAttr) { SetArgPointee<3>(applyIndex), Return(true))); MetaStatusCode status = metaserverCli_.BatchGetXAttr( - fsid, &inodeIds, &xattr); + fsid, inodeIds, &xattr); ASSERT_EQ(MetaStatusCode::RPC_ERROR, status); // test1: batchGetXAttr ok @@ -1278,7 +1278,7 @@ TEST_F(MetaServerClientImplTest, test_BatchGetXAttr) { BatchGetXAttrResponse>))); EXPECT_CALL(*mockMetacache_.get(), UpdateApplyIndex(_, _)); - status = metaserverCli_.BatchGetXAttr(fsid, &inodeIds, &xattr); + status = metaserverCli_.BatchGetXAttr(fsid, inodeIds, &xattr); ASSERT_EQ(MetaStatusCode::OK, status); ASSERT_EQ(xattr.size(), 2); ASSERT_THAT(xattr.begin()->inodeid(), AnyOf(inodeId1, inodeId2)); @@ -1294,7 +1294,7 @@ TEST_F(MetaServerClientImplTest, test_BatchGetXAttr) { DoAll(SetArgPointee<2>(response), Invoke(SetRpcService))); - status = metaserverCli_.BatchGetXAttr(fsid, &inodeIds, &xattr); + status = metaserverCli_.BatchGetXAttr(fsid, inodeIds, &xattr); ASSERT_EQ(MetaStatusCode::NOT_FOUND, status); // test3: test response do not have applyindex @@ -1310,7 +1310,7 @@ TEST_F(MetaServerClientImplTest, test_BatchGetXAttr) { Invoke(SetRpcService))); - status = metaserverCli_.BatchGetXAttr(fsid, &inodeIds, &xattr); + status = metaserverCli_.BatchGetXAttr(fsid, inodeIds, &xattr); ASSERT_EQ(MetaStatusCode::RPC_ERROR, status); } diff --git a/curvefs/test/client/test_inode_cache_manager.cpp b/curvefs/test/client/test_inode_cache_manager.cpp index 14ca574f7a..87158968e9 100644 --- a/curvefs/test/client/test_inode_cache_manager.cpp +++ b/curvefs/test/client/test_inode_cache_manager.cpp @@ -234,7 +234,7 @@ TEST_F(TestInodeCacheManager, BatchGetInodeAttr) { attr.set_inodeid(inodeId2); attrs.emplace_back(attr); - EXPECT_CALL(*metaClient_, BatchGetInodeAttr(fsId_, &inodeIds, _)) + EXPECT_CALL(*metaClient_, BatchGetInodeAttr(fsId_, inodeIds, _)) .WillOnce(Return(MetaStatusCode::NOT_FOUND)) .WillOnce(DoAll(SetArgPointee<2>(attrs), Return(MetaStatusCode::OK))); @@ -274,7 +274,7 @@ TEST_F(TestInodeCacheManager, BatchGetXAttr) { xattr.mutable_xattrinfos()->find(XATTRFBYTES)->second = "200"; xattrs.emplace_back(xattr); - EXPECT_CALL(*metaClient_, BatchGetXAttr(fsId_, &inodeIds, _)) + EXPECT_CALL(*metaClient_, BatchGetXAttr(fsId_, inodeIds, _)) .WillOnce(Return(MetaStatusCode::NOT_FOUND)) .WillOnce(DoAll(SetArgPointee<2>(xattrs), Return(MetaStatusCode::OK)));