Skip to content

Commit

Permalink
refactor: reimplement pegasus_server_write::on_batched_write_requests (
Browse files Browse the repository at this point in the history
  • Loading branch information
levy5307 authored Jan 18, 2021
1 parent 2344660 commit fa62e0d
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 43 deletions.
90 changes: 48 additions & 42 deletions src/server/pegasus_server_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ namespace server {
pegasus_server_write::pegasus_server_write(pegasus_server_impl *server, bool verbose_log)
: replica_base(server), _write_svc(new pegasus_write_service(server)), _verbose_log(verbose_log)
{
init_non_batch_write_handlers();
}

int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
Expand All @@ -50,43 +51,11 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
return _write_svc->empty_put(_decree);
}

dsn::task_code rpc_code(requests[0]->rpc_code());
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) {
dassert(count == 1, "count = %d", count);
auto rpc = multi_put_rpc::auto_reply(requests[0]);
return _write_svc->multi_put(_write_ctx, rpc.request(), rpc.response());
auto iter = _non_batch_write_handlers.find(requests[0]->rpc_code());
if (iter != _non_batch_write_handlers.end()) {
dassert_f(count == 1, "count = {}", count);
return iter->second(requests[0]);
}
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) {
dassert(count == 1, "count = %d", count);
auto rpc = multi_remove_rpc::auto_reply(requests[0]);
return _write_svc->multi_remove(_decree, rpc.request(), rpc.response());
}
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_INCR) {
dassert(count == 1, "count = %d", count);
auto rpc = incr_rpc::auto_reply(requests[0]);
return _write_svc->incr(_decree, rpc.request(), rpc.response());
}
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_DUPLICATE) {
dassert(count == 1, "count = %d", count);
auto rpc = duplicate_rpc::auto_reply(requests[0]);
return _write_svc->duplicate(_decree, rpc.request(), rpc.response());
}
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) {
dassert(count == 1, "count = %d", count);
auto rpc = check_and_set_rpc::auto_reply(requests[0]);
return _write_svc->check_and_set(_decree, rpc.request(), rpc.response());
}
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) {
dassert(count == 1, "count = %d", count);
auto rpc = check_and_mutate_rpc::auto_reply(requests[0]);
return _write_svc->check_and_mutate(_decree, rpc.request(), rpc.response());
}
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_BULK_LOAD) {
dassert(count == 1, "count = %d", count);
auto rpc = ingestion_rpc::auto_reply(requests[0]);
return _write_svc->ingestion_files(_decree, rpc.request(), rpc.response());
}

return on_batched_writes(requests, count);
}

Expand Down Expand Up @@ -116,13 +85,10 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun
local_err = on_single_remove_in_batch(rpc);
_remove_rpc_batch.emplace_back(std::move(rpc));
} else {
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT ||
rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE ||
rpc_code == dsn::apps::RPC_RRDB_RRDB_INCR ||
rpc_code == dsn::apps::RPC_RRDB_RRDB_DUPLICATE) {
dfatal("rpc code not allow batch: %s", rpc_code.to_string());
if (_non_batch_write_handlers.find(rpc_code) != _non_batch_write_handlers.end()) {
dfatal_f("rpc code not allow batch: {}", rpc_code.to_string());
} else {
dfatal("rpc code not handled: %s", rpc_code.to_string());
dfatal_f("rpc code not handled: {}", rpc_code.to_string());
}
}

Expand Down Expand Up @@ -170,5 +136,45 @@ void pegasus_server_write::request_key_check(int64_t decree,
}
}

void pegasus_server_write::init_non_batch_write_handlers()
{
_non_batch_write_handlers = {
{dsn::apps::RPC_RRDB_RRDB_MULTI_PUT,
[this](dsn::message_ex *request) -> int {
auto rpc = multi_put_rpc::auto_reply(request);
return _write_svc->multi_put(_write_ctx, rpc.request(), rpc.response());
}},
{dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE,
[this](dsn::message_ex *request) -> int {
auto rpc = multi_remove_rpc::auto_reply(request);
return _write_svc->multi_remove(_decree, rpc.request(), rpc.response());
}},
{dsn::apps::RPC_RRDB_RRDB_INCR,
[this](dsn::message_ex *request) -> int {
auto rpc = incr_rpc::auto_reply(request);
return _write_svc->incr(_decree, rpc.request(), rpc.response());
}},
{dsn::apps::RPC_RRDB_RRDB_DUPLICATE,
[this](dsn::message_ex *request) -> int {
auto rpc = duplicate_rpc::auto_reply(request);
return _write_svc->duplicate(_decree, rpc.request(), rpc.response());
}},
{dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET,
[this](dsn::message_ex *request) -> int {
auto rpc = check_and_set_rpc::auto_reply(request);
return _write_svc->check_and_set(_decree, rpc.request(), rpc.response());
}},
{dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE,
[this](dsn::message_ex *request) -> int {
auto rpc = check_and_mutate_rpc::auto_reply(request);
return _write_svc->check_and_mutate(_decree, rpc.request(), rpc.response());
}},
{dsn::apps::RPC_RRDB_RRDB_BULK_LOAD,
[this](dsn::message_ex *request) -> int {
auto rpc = ingestion_rpc::auto_reply(request);
return _write_svc->ingestion_files(_decree, rpc.request(), rpc.response());
}},
};
}
} // namespace server
} // namespace pegasus
5 changes: 5 additions & 0 deletions src/server/pegasus_server_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class pegasus_server_write : public dsn::replication::replica_base
void request_key_check(int64_t decree, dsn::message_ex *m, const dsn::blob &key);

private:
void init_non_batch_write_handlers();

friend class pegasus_server_write_test;
friend class pegasus_write_service_test;
friend class pegasus_write_service_impl_test;
Expand All @@ -84,6 +86,9 @@ class pegasus_server_write : public dsn::replication::replica_base
int64_t _decree;

const bool _verbose_log;

typedef std::map<dsn::task_code, std::function<int(dsn::message_ex *)>> non_batch_writes_map;
non_batch_writes_map _non_batch_write_handlers;
};

} // namespace server
Expand Down

0 comments on commit fa62e0d

Please sign in to comment.