Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix]curvefs/client init core dump && add ut #2164

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions curvefs/src/mds/topology/topology.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1735,12 +1735,12 @@ TopoStatusCode TopologyImpl::AllocOrGetMemcacheCluster(
int randId =
static_cast<int>(butil::fast_rand()) % memcacheClusterMap_.size();
auto iter = memcacheClusterMap_.cbegin();
for (int i = 0; i < randId; ++i, ++iter) continue;
*cluster = iter->second;
if (!storage_->StorageFs2MemcacheCluster(fsId, cluster->clusterid())) {
std::advance(iter, randId);
if (!storage_->StorageFs2MemcacheCluster(fsId, iter->first)) {
ret = TopoStatusCode::TOPO_STORGE_FAIL;
} else {
fs2MemcacheCluster_[fsId] = cluster->clusterid();
fs2MemcacheCluster_[fsId] = iter->first;
*cluster = iter->second;
}
}
return ret;
Expand Down
19 changes: 19 additions & 0 deletions curvefs/src/mds/topology/topology_item.h
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,8 @@ class MemcacheServer {
MemcacheServer() : port_(0) {}
explicit MemcacheServer(const MemcacheServerInfo& info)
: ip_(info.ip()), port_(info.port()) {}
explicit MemcacheServer(const std::string&& ip, uint32_t port)
: ip_(ip), port_(port) {}

MemcacheServer& operator=(const MemcacheServerInfo& info) {
ip_ = info.ip();
Expand Down Expand Up @@ -888,6 +890,19 @@ class MemcacheCluster {
return info;
}

bool operator==(const MemcacheCluster& rhs) const {
if (rhs.id_ != id_ || rhs.servers_.size() != servers_.size()) {
return false;
}
for (auto const& server : servers_) {
if (std::find(rhs.servers_.cbegin(), rhs.servers_.cend(), server) ==
rhs.servers_.cend()) {
return false;
}
}
return true;
}

std::list<MemcacheServer> GetServers() const {
return servers_;
}
Expand All @@ -899,6 +914,10 @@ class MemcacheCluster {
bool ParseFromString(const std::string& value);
bool SerializeToString(std::string* value) const;

void SetId(MetaServerIdType id) {
id_ = id;
}

private:
MetaServerIdType id_;
std::list<MemcacheServer> servers_;
Expand Down
14 changes: 14 additions & 0 deletions curvefs/src/mds/topology/topology_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1327,6 +1327,20 @@ void TopologyManager::RegistMemcacheCluster(
// register memcacheCluster as server
WriteLockGuard lock(registMemcacheClusterMutex_);

// idempotence
std::list<MemcacheCluster> clusterList = topology_->ListMemcacheClusters();
MemcacheCluster mCluster(
0, std::list<MemcacheServer>(request->servers().begin(),
request->servers().end()));
for (auto const& cluster : clusterList) {
mCluster.SetId(cluster.GetId());
if (cluster == mCluster) {
// has registered memcache cluster
response->set_clusterid(cluster.GetId());
return;
}
}

// Guarantee the uniqueness of memcacheServer
std::list<MemcacheServer> serverRegisted = topology_->ListMemcacheServers();
std::list<MemcacheServer> serverList;
Expand Down
9 changes: 9 additions & 0 deletions curvefs/src/mds/topology/topology_storage_codec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
#include "curvefs/src/mds/topology/topology_storage_codec.h"

#include <string>
#include <utility>
#include "curvefs/src/mds/common/storage_key.h"
#include "src/common/encode.h"

namespace curvefs {
namespace mds {
Expand Down Expand Up @@ -178,6 +180,13 @@ std::string TopologyStorageCodec::EncodeFs2MemcacheClusterKey(FsIdType fsId) {
return key;
}

bool TopologyStorageCodec::DecodeFs2MemcacheClusterKey(const std::string& value,
FsIdType* data) {
size_t prefixLen = TOPOLOGY_PREFIX_LENGTH;
*data = curve::common::DecodeBigEndian(&(value[prefixLen]));
return true;
}

} // namespace topology
} // namespace mds
} // namespace curvefs
1 change: 1 addition & 0 deletions curvefs/src/mds/topology/topology_storage_codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class TopologyStorageCodec {
MemcacheCluster* data);

std::string EncodeFs2MemcacheClusterKey(FsIdType fsId);
bool DecodeFs2MemcacheClusterKey(const std::string& value, FsIdType* data);
};


Expand Down
15 changes: 13 additions & 2 deletions curvefs/src/mds/topology/topology_storge_etcd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*/
#include "curvefs/src/mds/topology/topology_storge_etcd.h"

#include <exception>
#include <string>
#include <vector>
#include <map>
Expand Down Expand Up @@ -625,8 +626,18 @@ bool TopologyStorageEtcd::LoadFs2MemcacheCluster(
return false;
}
for (auto const& data : out) {
fs2MemcacheCluster->emplace(
std::make_pair(std::stoul(data.first), std::stoul(data.second)));
FsIdType id;
codec_->DecodeFs2MemcacheClusterKey(data.first, &id);
if (fs2MemcacheCluster->find(id) != fs2MemcacheCluster->end()) {
// Duplicated id
return false;
}
try {
fs2MemcacheCluster->emplace(id, std::stoul(data.second));
} catch (std::exception) {
// for std::stoul(data.second) exceptions
return false;
}
}
return true;
}
Expand Down
24 changes: 24 additions & 0 deletions curvefs/test/client/rpcclient/base_client_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,30 @@ TEST_F(BaseClientTest, test_RefreshSession) {
<< response.ShortDebugString();
}

TEST_F(BaseClientTest, test_AllocOrGetMemcacheCluster) {
AllocOrGetMemcacheClusterResponse response;
AllocOrGetMemcacheClusterResponse resp;
response.set_statuscode(mds::topology::TOPO_OK);
EXPECT_CALL(mockTopologyService_, AllocOrGetMemcacheCluster(_, _, _, _))
.WillOnce(DoAll(SetArgPointee<2>(response),
Invoke(RpcService<AllocOrGetMemcacheClusterRequest,
AllocOrGetMemcacheClusterResponse>)));

brpc::Controller cntl;
cntl.set_timeout_ms(1000);
brpc::Channel ch;
ASSERT_EQ(0, ch.Init(addr_.c_str(), nullptr));

mdsbasecli_.AllocOrGetMemcacheCluster(1, &resp, &cntl, &ch);

ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_TRUE(
google::protobuf::util::MessageDifferencer::Equals(resp, response))
<< "resp:\n"
<< resp.ShortDebugString() << "response:\n"
<< response.ShortDebugString();
}

} // namespace rpcclient
} // namespace client
} // namespace curvefs
42 changes: 42 additions & 0 deletions curvefs/test/client/rpcclient/mds_client_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <gmock/gmock-more-actions.h>
#include <google/protobuf/util/message_differencer.h>
#include <gtest/gtest.h>
#include <cstdint>

#include "curvefs/src/client/rpcclient/mds_client.h"
#include "curvefs/test/client/rpcclient/mock_mds_base_client.h"
Expand Down Expand Up @@ -57,6 +58,12 @@ void UmountFsRpcFailed(const std::string &fsName, const Mountpoint &mountPt,
cntl->SetFailed(112, "Not connected to");
}

void AllocOrGetMemcacheClusterRpcFailed(
uint32_t fsId, AllocOrGetMemcacheClusterResponse* response,
brpc::Controller* cntl, brpc::Channel* channel) {
cntl->SetFailed(112, "Not connected to");
}

void GetFsInfoByFsnameRpcFailed(const std::string &fsName,
GetFsInfoResponse *response,
brpc::Controller *cntl,
Expand Down Expand Up @@ -998,6 +1005,41 @@ TEST_F(MdsClientImplTest, TestReleaseVolumeBlockGroup) {
}
}

TEST_F(MdsClientImplTest, test_AllocOrGetMemcacheCluster) {
AllocOrGetMemcacheClusterResponse response;
MemcacheClusterInfo cluster1;
cluster1.set_clusterid(1);
mds::topology::MemcacheServerInfo server;
server.set_ip("127.0.0.1");
server.set_port(1);
*cluster1.add_servers() = server;
response.set_allocated_cluster(new MemcacheClusterInfo(cluster1));

// 1. ok
response.set_statuscode(curvefs::mds::topology::TOPO_OK);
EXPECT_CALL(mockmdsbasecli_, AllocOrGetMemcacheCluster(_, _, _, _))
.WillOnce(SetArgPointee<1>(response));
MemcacheClusterInfo cluster2;
ASSERT_EQ(true,
mdsclient_.AllocOrGetMemcacheCluster(1, &cluster2));

// 2. no memcached
response.set_statuscode(
curvefs::mds::topology::TOPO_MEMCACHECLUSTER_NOT_FOUND);
EXPECT_CALL(mockmdsbasecli_, AllocOrGetMemcacheCluster(_, _, _, _))
.WillOnce(SetArgPointee<1>(response));
ASSERT_EQ(false,
mdsclient_.AllocOrGetMemcacheCluster(1, &cluster2));

// 3. rpc error
brpc::Controller cntl;
cntl.SetFailed(ECONNRESET, "error connect reset");
EXPECT_CALL(mockmdsbasecli_, AllocOrGetMemcacheCluster(_, _, _, _))
.WillRepeatedly(Invoke(AllocOrGetMemcacheClusterRpcFailed));
ASSERT_EQ(false,
mdsclient_.AllocOrGetMemcacheCluster(1, &cluster2));
}

} // namespace rpcclient
} // namespace client
} // namespace curvefs
5 changes: 5 additions & 0 deletions curvefs/test/client/rpcclient/mock_mds_base_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ class MockMDSBaseClient : public MDSBaseClient {
ReleaseBlockGroupResponse *response,
brpc::Controller *cntl,
brpc::Channel *channel));

MOCK_METHOD4(AllocOrGetMemcacheCluster,
void(uint32_t fsId,
AllocOrGetMemcacheClusterResponse* response,
brpc::Controller* cntl, brpc::Channel* channel));
};
} // namespace rpcclient
} // namespace client
Expand Down
8 changes: 8 additions & 0 deletions curvefs/test/client/rpcclient/mock_topology_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ class MockTopologyService : public curvefs::mds::topology::TopologyService {
request,
::curvefs::mds::topology::GetCopysetOfPartitionResponse* response,
::google::protobuf::Closure* done));
MOCK_METHOD4(
AllocOrGetMemcacheCluster,
void(::google::protobuf::RpcController* controller,
const ::curvefs::mds::topology::AllocOrGetMemcacheClusterRequest*
request,
::curvefs::mds::topology::AllocOrGetMemcacheClusterResponse*
response,
::google::protobuf::Closure* done));
};
} // namespace rpcclient
} // namespace client
Expand Down
6 changes: 6 additions & 0 deletions curvefs/test/mds/mock/mock_topology.h
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,12 @@ class MockTopologyManager : public TopologyManager {
MOCK_METHOD2(GetLatestPartitionsTxId,
void(const std::vector<PartitionTxId> &txIds,
std::vector<PartitionTxId> *needUpdate));

MOCK_METHOD2(RegistMemcacheCluster,
void(const RegistMemcacheClusterRequest*,
RegistMemcacheClusterResponse*));

MOCK_METHOD1(ListMemcacheCluster, void(ListMemcacheClusterResponse*));
};

} // namespace topology
Expand Down
29 changes: 29 additions & 0 deletions curvefs/test/mds/topology/test_topology.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2343,6 +2343,35 @@ TEST_F(TestTopology, GenCopysetAddrBatch_2pool) {
ASSERT_EQ(pool1Count, 3);
ASSERT_EQ(pool2Count, 12);
}

TEST_F(TestTopology, test_AddMemcacheCluster_success) {
MemcacheCluster data(
1, std::list<MemcacheServer>{MemcacheServer("127.0.0.1", 1),
MemcacheServer("127.0.0.1", 2),
MemcacheServer("127.0.0.1", 3)});

EXPECT_CALL(*storage_, StorageMemcacheCluster(_)).WillOnce(Return(true));

int ret = topology_->AddMemcacheCluster(data);

ASSERT_EQ(TopoStatusCode::TOPO_OK, ret);

ASSERT_EQ(*topology_->ListMemcacheClusters().begin(), data);
}

TEST_F(TestTopology, test_AddMemcacheCluster_fail) {
MemcacheCluster data(
1, std::list<MemcacheServer>{MemcacheServer("127.0.0.1", 1),
MemcacheServer("127.0.0.1", 2),
MemcacheServer("127.0.0.1", 3)});

EXPECT_CALL(*storage_, StorageMemcacheCluster(_)).WillOnce(Return(false));

int ret = topology_->AddMemcacheCluster(data);

ASSERT_EQ(TopoStatusCode::TOPO_STORGE_FAIL, ret);
}

} // namespace topology
} // namespace mds
} // namespace curvefs
Loading