Skip to content

Commit

Permalink
feat(dup): support replica http api for duplication state query (#415)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wu Tao authored Mar 16, 2020
1 parent 6af76cf commit bda453c
Show file tree
Hide file tree
Showing 10 changed files with 219 additions and 0 deletions.
25 changes: 25 additions & 0 deletions src/dist/replication/lib/duplication/duplication_sync_timer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,5 +153,30 @@ void duplication_sync_timer::start()
DUPLICATION_SYNC_PERIOD_SECOND * 1_s);
}

std::multimap<dupid_t, duplication_sync_timer::replica_dup_state>
duplication_sync_timer::get_dup_states(int app_id, /*out*/ bool *app_found)
{
*app_found = false;
std::multimap<dupid_t, replica_dup_state> result;
for (const replica_ptr &r : get_all_primaries()) {
gpid rid = r->get_gpid();
if (rid.get_app_id() != app_id) {
continue;
}
*app_found = true;
replica_dup_state state;
state.id = rid;
auto states = r->get_duplication_manager()->get_dup_states();
decree last_committed_decree = r->last_committed_decree();
for (const auto &s : states) {
state.duplicating = s.duplicating;
state.not_confirmed = std::max(decree(0), last_committed_decree - s.confirmed_decree);
state.not_duplicated = std::max(decree(0), last_committed_decree - s.last_decree);
result.emplace(std::make_pair(s.dupid, state));
}
}
return result;
}

} // namespace replication
} // namespace dsn
9 changes: 9 additions & 0 deletions src/dist/replication/lib/duplication/duplication_sync_timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ class duplication_sync_timer

void close();

struct replica_dup_state
{
gpid id;
bool duplicating{false};
decree not_duplicated{0};
decree not_confirmed{0};
};
std::multimap<dupid_t, replica_dup_state> get_dup_states(int app_id, /*out*/ bool *app_found);

private:
// replica server periodically uploads current confirm points to meta server by sending
// `duplication_sync_request`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,5 +128,24 @@ int64_t replica_duplicator_manager::get_pending_mutations_count() const
return total;
}

std::vector<replica_duplicator_manager::dup_state>
replica_duplicator_manager::get_dup_states() const
{
zauto_lock l(_lock);

std::vector<dup_state> ret;
ret.reserve(_duplications.size());
for (const auto &dup : _duplications) {
dup_state state;
state.dupid = dup.first;
state.duplicating = !dup.second->paused();
auto progress = dup.second->progress();
state.last_decree = progress.last_decree;
state.confirmed_decree = progress.confirmed_decree;
ret.emplace_back(state);
}
return ret;
}

} // namespace replication
} // namespace dsn
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ class replica_duplicator_manager : public replica_base
/// on this replica, for metric "dup.pending_mutations_count".
int64_t get_pending_mutations_count() const;

struct dup_state
{
dupid_t dupid{0};
bool duplicating{false};
decree last_decree{invalid_decree};
decree confirmed_decree{invalid_decree};
};
std::vector<dup_state> get_dup_states() const;

private:
void sync_duplication(const duplication_entry &ent);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "dist/replication/test/replica_test/unit_test/replica_test_base.h"
#include "dist/replication/lib/duplication/replica_duplicator.h"
#include "dist/replication/lib/duplication/replica_duplicator_manager.h"
#include "dist/replication/lib/duplication/duplication_sync_timer.h"

namespace dsn {
namespace replication {
Expand All @@ -20,6 +21,7 @@ class duplication_test_base : public replica_test_base
mutation_duplicator::creator = [](replica_base *r, dsn::string_view, dsn::string_view) {
return make_unique<mock_mutation_duplicator>(r);
};
stub->_duplication_sync_timer = make_unique<duplication_sync_timer>(stub.get());
}

void add_dup(mock_replica *r, replica_duplicator_u_ptr dup)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include "dist/replication/lib/replica_http_service.h"
#include "duplication_test_base.h"

namespace dsn {
namespace replication {

class replica_http_service_test : public duplication_test_base
{
};

TEST_F(replica_http_service_test, query_duplication_handler)
{
auto pri = stub->add_primary_replica(1, 1);
auto sec = stub->add_non_primary_replica(1, 2);
sec->as_secondary();

// primary confirmed_decree
duplication_entry ent;
ent.dupid = 1583306653;
ent.progress[pri->get_gpid().get_partition_index()] = 1000;
ent.status = duplication_status::DS_PAUSE;
auto dup = dsn::make_unique<replica_duplicator>(ent, pri);
add_dup(pri, std::move(dup));

sec->get_duplication_manager()->update_confirmed_decree_if_secondary(899);

replica_http_service http_svc(stub.get());

http_request req;
http_response resp;
http_svc.query_duplication_handler(req, resp);
ASSERT_EQ(resp.status_code, http_status_code::bad_request); // no appid

req.query_args["appid"] = "2";
http_svc.query_duplication_handler(req, resp);
ASSERT_EQ(resp.status_code, http_status_code::not_found);

req.query_args["appid"] = "2xx";
http_svc.query_duplication_handler(req, resp);
ASSERT_EQ(resp.status_code, http_status_code::bad_request);

req.query_args["appid"] = "1";
http_svc.query_duplication_handler(req, resp);
ASSERT_EQ(resp.status_code, http_status_code::ok);
ASSERT_EQ(resp.body,
R"({)"
R"("1583306653":)"
R"({"1.1":{"confirmed_decree":1000,"duplicating":false,"last_decree":1000}},)"
R"("non-primaries":)"
R"({"1.2":{"confirmed_decree":899}})"
R"(})");
}

} // namespace replication
} // namespace dsn
58 changes: 58 additions & 0 deletions src/dist/replication/lib/replica_http_service.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include <nlohmann/json.hpp>
#include <fmt/format.h>
#include "replica_http_service.h"
#include "duplication/duplication_sync_timer.h"

namespace dsn {
namespace replication {

void replica_http_service::query_duplication_handler(const http_request &req, http_response &resp)
{
if (!_stub->_duplication_sync_timer) {
resp.body = "duplication is not enabled [duplication_enabled=false]";
resp.status_code = http_status_code::not_found;
return;
}
auto it = req.query_args.find("appid");
if (it == req.query_args.end()) {
resp.body = "appid should not be empty";
resp.status_code = http_status_code::bad_request;
return;
}
int32_t appid = -1;
if (!buf2int32(it->second, appid) || appid < 0) {
resp.status_code = http_status_code::bad_request;
resp.body = fmt::format("invalid appid={}", it->second);
return;
}
bool app_found = false;
auto states = _stub->_duplication_sync_timer->get_dup_states(appid, &app_found);
if (!app_found) {
resp.status_code = http_status_code::not_found;
resp.body = fmt::format("no primary for app [appid={}]", appid);
return;
}
if (states.empty()) {
resp.status_code = http_status_code::not_found;
resp.body = fmt::format("no duplication assigned for app [appid={}]", appid);
return;
}

nlohmann::json json;
for (const auto &s : states) {
json[std::to_string(s.first)][s.second.id.to_string()] = nlohmann::json{
{"duplicating", s.second.duplicating},
{"not_confirmed", s.second.not_confirmed},
{"not_duplicated", s.second.not_duplicated},
};
}
resp.status_code = http_status_code::ok;
resp.body = json.dump();
}

} // namespace replication
} // namespace dsn
34 changes: 34 additions & 0 deletions src/dist/replication/lib/replica_http_service.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include <dsn/tool-api/http_server.h>

#include "replica_stub.h"

namespace dsn {
namespace replication {

class replica_http_service : public http_service
{
public:
explicit replica_http_service(replica_stub *stub) : _stub(stub)
{
register_handler("duplication",
std::bind(&replica_http_service::query_duplication_handler,
this,
std::placeholders::_1,
std::placeholders::_2),
"ip:port/replica/duplication?appid=<appid>");
}

std::string path() const override { return "replica"; }

void query_duplication_handler(const http_request &req, http_response &resp);

private:
replica_stub *_stub;
};

} // namespace replication
} // namespace dsn
2 changes: 2 additions & 0 deletions src/dist/replication/lib/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,11 +268,13 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
friend class load_from_private_log;
friend class ship_mutation;
friend class replica_duplicator;
friend class replica_http_service;

friend class mock_replica_stub;
friend class duplication_sync_timer;
friend class duplication_sync_timer_test;
friend class replica_duplicator_manager_test;
friend class duplication_test_base;
friend class replica_test;

typedef std::unordered_map<gpid, ::dsn::task_ptr> opening_replicas;
Expand Down
2 changes: 2 additions & 0 deletions src/dist/replication/lib/replication_service_app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "dist/replication/common/replication_common.h"
#include "dist/http/server_info_http_services.h"
#include "replica_stub.h"
#include "replica_http_service.h"

namespace dsn {
namespace replication {
Expand All @@ -48,6 +49,7 @@ replication_service_app::replication_service_app(const service_app_info *info)
_version_http_service = new version_http_service();
_http_server->add_service(_version_http_service);
_http_server->add_service(new recent_start_time_http_service());
_http_server->add_service(new replica_http_service(_stub.get()));
}

replication_service_app::~replication_service_app(void) {}
Expand Down

0 comments on commit bda453c

Please sign in to comment.