From bda453c0ab11b6fd6b62049937735f2f19f29bad Mon Sep 17 00:00:00 2001
From: Wu Tao
Date: Mon, 16 Mar 2020 16:16:13 +0800
Subject: [PATCH] feat(dup): support replica http api for duplication state query (#415)

---
 .../duplication/duplication_sync_timer.cpp    | 25 ++++++++
 .../lib/duplication/duplication_sync_timer.h  |  9 +++
 .../replica_duplicator_manager.cpp            | 19 ++++++
 .../duplication/replica_duplicator_manager.h  |  9 +++
 .../duplication/test/duplication_test_base.h  |  2 +
 .../test/replica_http_service_test.cpp        | 59 +++++++++++++++++++
 .../replication/lib/replica_http_service.cpp  | 58 ++++++++++++++++++
 .../replication/lib/replica_http_service.h    | 34 +++++++++++
 src/dist/replication/lib/replica_stub.h       |  2 +
 .../lib/replication_service_app.cpp           |  2 +
 10 files changed, 219 insertions(+)
 create mode 100644 src/dist/replication/lib/duplication/test/replica_http_service_test.cpp
 create mode 100644 src/dist/replication/lib/replica_http_service.cpp
 create mode 100644 src/dist/replication/lib/replica_http_service.h

diff --git a/src/dist/replication/lib/duplication/duplication_sync_timer.cpp b/src/dist/replication/lib/duplication/duplication_sync_timer.cpp
index cfbd142280..bce490179f 100644
--- a/src/dist/replication/lib/duplication/duplication_sync_timer.cpp
+++ b/src/dist/replication/lib/duplication/duplication_sync_timer.cpp
@@ -153,5 +153,34 @@ void duplication_sync_timer::start()
                                DUPLICATION_SYNC_PERIOD_SECOND * 1_s);
 }
 
+// Collects the duplication state of each primary replica of `app_id` returned by
+// get_all_primaries(), keyed by dupid. `*app_found` is set to whether any such primary
+// exists, letting callers tell an unknown app apart from an app without duplication.
+std::multimap<dupid_t, duplication_sync_timer::replica_dup_state>
+duplication_sync_timer::get_dup_states(int app_id, /*out*/ bool *app_found)
+{
+    *app_found = false;
+    std::multimap<dupid_t, replica_dup_state> result;
+    for (const replica_ptr &r : get_all_primaries()) {
+        gpid rid = r->get_gpid();
+        if (rid.get_app_id() != app_id) {
+            continue;
+        }
+        *app_found = true;
+        replica_dup_state state;
+        state.id = rid;
+        auto states = r->get_duplication_manager()->get_dup_states();
+        decree last_committed_decree = r->last_committed_decree();
+        for (const auto &s : states) {
+            state.duplicating = s.duplicating;
+            // Differences are clamped at 0 so the reported counts are never negative.
+            state.not_confirmed = std::max(decree(0), last_committed_decree - s.confirmed_decree);
+            state.not_duplicated = std::max(decree(0), last_committed_decree - s.last_decree);
+            result.emplace(s.dupid, state);
+        }
+    }
+    return result;
+}
+
 } // namespace replication
 } // namespace dsn
diff --git a/src/dist/replication/lib/duplication/duplication_sync_timer.h b/src/dist/replication/lib/duplication/duplication_sync_timer.h
index 2cc5c5390e..83427cf18e 100644
--- a/src/dist/replication/lib/duplication/duplication_sync_timer.h
+++ b/src/dist/replication/lib/duplication/duplication_sync_timer.h
@@ -30,6 +30,19 @@ class duplication_sync_timer
 
     void close();
 
+    // Duplication state of one primary replica for one duplication.
+    struct replica_dup_state
+    {
+        gpid id;
+        bool duplicating{false};
+        // Committed decrees not yet duplicated / confirmed, clamped at 0 by get_dup_states().
+        decree not_duplicated{0};
+        decree not_confirmed{0};
+    };
+    // Returns the states of all primaries of `app_id`, keyed by dupid.
+    // `*app_found` is set to whether any primary of the app was seen.
+    std::multimap<dupid_t, replica_dup_state> get_dup_states(int app_id, /*out*/ bool *app_found);
+
 private:
     // replica server periodically uploads current confirm points to meta server by sending
     // `duplication_sync_request`.
diff --git a/src/dist/replication/lib/duplication/replica_duplicator_manager.cpp b/src/dist/replication/lib/duplication/replica_duplicator_manager.cpp
index 9139813f22..85ae458577 100644
--- a/src/dist/replication/lib/duplication/replica_duplicator_manager.cpp
+++ b/src/dist/replication/lib/duplication/replica_duplicator_manager.cpp
@@ -128,5 +128,25 @@ int64_t replica_duplicator_manager::get_pending_mutations_count() const
     return total;
 }
 
+// Takes a snapshot, while holding `_lock`, of the progress of every entry in `_duplications`.
+std::vector<replica_duplicator_manager::dup_state>
+replica_duplicator_manager::get_dup_states() const
+{
+    zauto_lock l(_lock);
+
+    std::vector<dup_state> ret;
+    ret.reserve(_duplications.size());
+    for (const auto &dup : _duplications) {
+        dup_state state;
+        state.dupid = dup.first;
+        state.duplicating = !dup.second->paused();
+        auto progress = dup.second->progress();
+        state.last_decree = progress.last_decree;
+        state.confirmed_decree = progress.confirmed_decree;
+        ret.emplace_back(state);
+    }
+    return ret;
+}
+
 } // namespace replication
 } // namespace dsn
diff --git 
a/src/dist/replication/lib/duplication/replica_duplicator_manager.h b/src/dist/replication/lib/duplication/replica_duplicator_manager.h
index 6c0eabb00b..60de1a2fe6 100644
--- a/src/dist/replication/lib/duplication/replica_duplicator_manager.h
+++ b/src/dist/replication/lib/duplication/replica_duplicator_manager.h
@@ -58,6 +58,17 @@ class replica_duplicator_manager : public replica_base
     /// on this replica, for metric "dup.pending_mutations_count".
     int64_t get_pending_mutations_count() const;
 
+    /// Snapshot of the progress of one duplication on this replica.
+    struct dup_state
+    {
+        dupid_t dupid{0};
+        bool duplicating{false};
+        decree last_decree{invalid_decree};
+        decree confirmed_decree{invalid_decree};
+    };
+    /// Returns one `dup_state` per duplication currently held by this manager.
+    std::vector<dup_state> get_dup_states() const;
+
 private:
     void sync_duplication(const duplication_entry &ent);
 
diff --git a/src/dist/replication/lib/duplication/test/duplication_test_base.h b/src/dist/replication/lib/duplication/test/duplication_test_base.h
index 22cd40b49f..18237ebe75 100644
--- a/src/dist/replication/lib/duplication/test/duplication_test_base.h
+++ b/src/dist/replication/lib/duplication/test/duplication_test_base.h
@@ -8,6 +8,7 @@
 #include "dist/replication/test/replica_test/unit_test/replica_test_base.h"
 #include "dist/replication/lib/duplication/replica_duplicator.h"
 #include "dist/replication/lib/duplication/replica_duplicator_manager.h"
+#include "dist/replication/lib/duplication/duplication_sync_timer.h"
 
 namespace dsn {
 namespace replication {
@@ -20,6 +21,8 @@ class duplication_test_base : public replica_test_base
         mutation_duplicator::creator = [](replica_base *r, dsn::string_view, dsn::string_view) {
             return make_unique(r);
         };
+        // replica_http_service answers not_found while the stub has no sync timer.
+        stub->_duplication_sync_timer = make_unique<duplication_sync_timer>(stub.get());
     }
 
     void add_dup(mock_replica *r, replica_duplicator_u_ptr dup)
diff --git a/src/dist/replication/lib/duplication/test/replica_http_service_test.cpp b/src/dist/replication/lib/duplication/test/replica_http_service_test.cpp
new file mode 100644
index 0000000000..7050b88d21
--- /dev/null
+++ 
b/src/dist/replication/lib/duplication/test/replica_http_service_test.cpp
@@ -0,0 +1,62 @@
+// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved.
+// This source code is licensed under the Apache License Version 2.0, which
+// can be found in the LICENSE file in the root directory of this source tree.
+
+#include "dist/replication/lib/replica_http_service.h"
+#include "duplication_test_base.h"
+
+namespace dsn {
+namespace replication {
+
+class replica_http_service_test : public duplication_test_base
+{
+};
+
+// Drives query_duplication_handler through its bad_request / not_found / ok paths, then
+// checks the JSON body produced for an app with one paused duplication on a primary.
+TEST_F(replica_http_service_test, query_duplication_handler)
+{
+    auto pri = stub->add_primary_replica(1, 1);
+    auto sec = stub->add_non_primary_replica(1, 2);
+    sec->as_secondary();
+
+    // primary confirmed_decree
+    duplication_entry ent;
+    ent.dupid = 1583306653;
+    ent.progress[pri->get_gpid().get_partition_index()] = 1000;
+    ent.status = duplication_status::DS_PAUSE;
+    auto dup = dsn::make_unique<replica_duplicator>(ent, pri);
+    add_dup(pri, std::move(dup));
+
+    sec->get_duplication_manager()->update_confirmed_decree_if_secondary(899);
+
+    replica_http_service http_svc(stub.get());
+
+    http_request req;
+    http_response resp;
+    http_svc.query_duplication_handler(req, resp);
+    ASSERT_EQ(resp.status_code, http_status_code::bad_request); // no appid
+
+    req.query_args["appid"] = "2";
+    http_svc.query_duplication_handler(req, resp);
+    ASSERT_EQ(resp.status_code, http_status_code::not_found);
+
+    req.query_args["appid"] = "2xx";
+    http_svc.query_duplication_handler(req, resp);
+    ASSERT_EQ(resp.status_code, http_status_code::bad_request);
+
+    req.query_args["appid"] = "1";
+    http_svc.query_duplication_handler(req, resp);
+    ASSERT_EQ(resp.status_code, http_status_code::ok);
+    // Only primaries are reported, using the keys written by query_duplication_handler.
+    // NOTE(review): both counters are expected to clamp to 0 on the assumption that the mock
+    // primary's last committed decree does not exceed 1000 -- confirm against mock_replica.
+    ASSERT_EQ(resp.body,
+              R"({)"
+              R"("1583306653":)"
+              R"({"1.1":{"duplicating":false,"not_confirmed":0,"not_duplicated":0}})"
+              R"(})");
+}
+
+} // namespace replication
+} // namespace dsn
diff --git 
a/src/dist/replication/lib/replica_http_service.cpp b/src/dist/replication/lib/replica_http_service.cpp
new file mode 100644
index 0000000000..0e43bebefe
--- /dev/null
+++ b/src/dist/replication/lib/replica_http_service.cpp
@@ -0,0 +1,63 @@
+// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved.
+// This source code is licensed under the Apache License Version 2.0, which
+// can be found in the LICENSE file in the root directory of this source tree.
+
+#include <fmt/format.h>
+#include <nlohmann/json.hpp>
+#include "replica_http_service.h"
+#include "duplication/duplication_sync_timer.h"
+
+namespace dsn {
+namespace replication {
+
+// Handles the "duplication" endpoint. On success the body is a JSON object keyed by
+// dupid, then by gpid, holding each primary's duplication state for the requested app.
+// Fails with bad_request if `appid` is missing or is not a non-negative int32, and with
+// not_found if the stub has no sync timer, no primary of the app is found, or the app
+// has no duplication.
+void replica_http_service::query_duplication_handler(const http_request &req, http_response &resp)
+{
+    if (!_stub->_duplication_sync_timer) {
+        resp.body = "duplication is not enabled [duplication_enabled=false]";
+        resp.status_code = http_status_code::not_found;
+        return;
+    }
+    auto it = req.query_args.find("appid");
+    if (it == req.query_args.end()) {
+        resp.body = "appid should not be empty";
+        resp.status_code = http_status_code::bad_request;
+        return;
+    }
+    int32_t appid = -1;
+    if (!buf2int32(it->second, appid) || appid < 0) {
+        resp.status_code = http_status_code::bad_request;
+        resp.body = fmt::format("invalid appid={}", it->second);
+        return;
+    }
+    bool app_found = false;
+    auto states = _stub->_duplication_sync_timer->get_dup_states(appid, &app_found);
+    if (!app_found) {
+        resp.status_code = http_status_code::not_found;
+        resp.body = fmt::format("no primary for app [appid={}]", appid);
+        return;
+    }
+    if (states.empty()) {
+        resp.status_code = http_status_code::not_found;
+        resp.body = fmt::format("no duplication assigned for app [appid={}]", appid);
+        return;
+    }
+
+    nlohmann::json json;
+    for (const auto &s : states) {
+        json[std::to_string(s.first)][s.second.id.to_string()] = nlohmann::json{
+            {"duplicating", s.second.duplicating},
+            {"not_confirmed", s.second.not_confirmed},
+            {"not_duplicated", s.second.not_duplicated},
+        };
+    }
+    resp.status_code = http_status_code::ok;
+    resp.body = json.dump();
+}
+
+} // namespace replication
+} // namespace dsn
diff --git a/src/dist/replication/lib/replica_http_service.h b/src/dist/replication/lib/replica_http_service.h
new file mode 100644
index 0000000000..9c0d5f91d7
--- /dev/null
+++ b/src/dist/replication/lib/replica_http_service.h
@@ -0,0 +1,38 @@
+// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved.
+// This source code is licensed under the Apache License Version 2.0, which
+// can be found in the LICENSE file in the root directory of this source tree.
+
+#pragma once
+
+#include <dsn/tool-api/http_server.h> // NOTE(review): include path reconstructed -- confirm
+
+#include "replica_stub.h"
+
+namespace dsn {
+namespace replication {
+
+// HTTP service whose path() is "replica"; registers the "duplication" handler.
+class replica_http_service : public http_service
+{
+public:
+    explicit replica_http_service(replica_stub *stub) : _stub(stub)
+    {
+        register_handler("duplication",
+                         std::bind(&replica_http_service::query_duplication_handler,
+                                   this,
+                                   std::placeholders::_1,
+                                   std::placeholders::_2),
+                         "ip:port/replica/duplication?appid=");
+    }
+
+    std::string path() const override { return "replica"; }
+
+    // Reports the duplication state of the primaries of the app given by query arg `appid`.
+    void query_duplication_handler(const http_request &req, http_response &resp);
+
+private:
+    replica_stub *_stub;
+};
+
+} // namespace replication
+} // namespace dsn
diff --git a/src/dist/replication/lib/replica_stub.h b/src/dist/replication/lib/replica_stub.h
index a3b90ae7c4..d415fe63b1 100644
--- a/src/dist/replication/lib/replica_stub.h
+++ b/src/dist/replication/lib/replica_stub.h
@@ -268,11 +268,13 @@ class replica_stub : public serverlet, public ref_counter
     friend class load_from_private_log;
     friend class ship_mutation;
     friend class replica_duplicator;
+    friend class replica_http_service;
 
     friend class mock_replica_stub;
     friend class duplication_sync_timer;
     friend class duplication_sync_timer_test;
     friend class replica_duplicator_manager_test;
+    friend class duplication_test_base;
     friend class replica_test;
 
     typedef std::unordered_map opening_replicas;
diff --git 
a/src/dist/replication/lib/replication_service_app.cpp b/src/dist/replication/lib/replication_service_app.cpp
index 0f27520495..b9289b0185 100644
--- a/src/dist/replication/lib/replication_service_app.cpp
+++ b/src/dist/replication/lib/replication_service_app.cpp
@@ -30,6 +30,7 @@
 #include "dist/replication/common/replication_common.h"
 #include "dist/http/server_info_http_services.h"
 #include "replica_stub.h"
+#include "replica_http_service.h"
 
 namespace dsn {
 namespace replication {
@@ -48,6 +49,7 @@ replication_service_app::replication_service_app(const service_app_info *info)
     _version_http_service = new version_http_service();
     _http_server->add_service(_version_http_service);
     _http_server->add_service(new recent_start_time_http_service());
+    _http_server->add_service(new replica_http_service(_stub.get())); // "replica" endpoints
 }
 
 replication_service_app::~replication_service_app(void) {}