Skip to content

Commit

Permalink
fix: Fault-tolerant storage engine errors for read operations (#1447)
Browse files Browse the repository at this point in the history
#1383

ReplicaServer doesn't handle the error returned from storage engine, thus
even if the storage engine is corrupted, the server doesn't recognize these
situactions, and still running happily. However, the client always gets an
error status.
This situaction will not recover automatically except stopping the server
and moving away the corrupted RocksDB directories manually.

This patch handle the kCorruption error returned from storage engine, then
close the replcia, move the directory to ".err" trash path. The replica is
able to recover automatically (if RF > 1).
  • Loading branch information
acelyc111 authored Apr 24, 2023
1 parent 64414e6 commit 12c8b56
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 13 deletions.
37 changes: 30 additions & 7 deletions src/common/storage_serverlet.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
#include <unordered_map>
#include <functional>

#include "replica/storage/simple_kv/simple_kv.code.definition.h"
#include "rrdb/rrdb.code.definition.h"
#include "runtime/api_task.h"
#include "runtime/api_layer1.h"
#include "runtime/app_model.h"
Expand All @@ -50,7 +52,7 @@ template <typename T>
class storage_serverlet
{
protected:
typedef std::function<void(T *, dsn::message_ex *req)> rpc_handler;
typedef std::function<int(T *, dsn::message_ex *req)> rpc_handler;
static std::unordered_map<std::string, rpc_handler> s_handlers;
static std::vector<rpc_handler> s_vhandlers;

Expand All @@ -60,11 +62,19 @@ class storage_serverlet
const char *name,
void (*handler)(T *svc, const TReq &req, rpc_replier<TResp> &resp))
{
// Only allowed to register simple.kv rpc handler.
CHECK(dsn::replication::application::RPC_SIMPLE_KV_SIMPLE_KV_READ == rpc_code ||
dsn::replication::application::RPC_SIMPLE_KV_SIMPLE_KV_WRITE == rpc_code ||
dsn::replication::application::RPC_SIMPLE_KV_SIMPLE_KV_APPEND == rpc_code,
"Not allowed to register with rpc_code {}",
rpc_code);
rpc_handler h = [handler](T *p, dsn::message_ex *r) {
TReq req;
::dsn::unmarshall(r, req);
rpc_replier<TResp> replier(r->create_response());
handler(p, req, replier);
// For simple.kv, always return 0 which means success.
return 0;
};

return register_async_rpc_handler(rpc_code, name, h);
Expand All @@ -76,7 +86,9 @@ class storage_serverlet
void (*handler)(T *svc, TRpcHolder))
{
rpc_handler h = [handler](T *p, dsn::message_ex *request) {
handler(p, TRpcHolder::auto_reply(request));
auto rh = TRpcHolder::auto_reply(request);
handler(p, rh);
return rh.response().error;
};

return register_async_rpc_handler(rpc_code, name, h);
Expand All @@ -87,10 +99,17 @@ class storage_serverlet
const char *name,
void (*handler)(T *svc, const TReq &req))
{
// Only allowed to register RPC_RRDB_RRDB_CLEAR_SCANNER handler.
CHECK_EQ_MSG(dsn::apps::RPC_RRDB_RRDB_CLEAR_SCANNER,
rpc_code,
"Not allowed to register with rpc_code {}",
rpc_code);
rpc_handler h = [handler](T *p, dsn::message_ex *r) {
TReq req;
::dsn::unmarshall(r, req);
handler(p, req);
// For RPC_RRDB_RRDB_CLEAR_SCANNER, always return 0 which means success.
return 0;
};

return register_async_rpc_handler(rpc_code, name, h);
Expand All @@ -114,11 +133,15 @@ class storage_serverlet

static const rpc_handler *find_handler(dsn::task_code rpc_code)
{
if (rpc_code < s_vhandlers.size() && s_vhandlers[rpc_code] != nullptr)
if (rpc_code < s_vhandlers.size() && s_vhandlers[rpc_code] != nullptr) {
return &s_vhandlers[rpc_code];
}

auto iter = s_handlers.find(rpc_code.to_string());
if (iter != s_handlers.end())
if (iter != s_handlers.end()) {
return &(iter->second);
}

return nullptr;
}

Expand All @@ -128,16 +151,16 @@ class storage_serverlet
dsn::task_code t = request->rpc_code();
const rpc_handler *ptr = find_handler(t);
if (ptr != nullptr) {
// TODO(yingchun): add return value
(*ptr)(static_cast<T *>(this), request);
return (*ptr)(static_cast<T *>(this), request);
} else {
LOG_WARNING("recv message with unhandled rpc name {} from {}, trace_id = {:#018x} ",
t,
request->header->from_address,
request->header->trace_id);
dsn_rpc_reply(request->create_response(), ::dsn::ERR_HANDLER_NOT_FOUND);
// TODO(yingchun): return a non-zero value
return 0;
}
return 0;
}
};

Expand Down
16 changes: 14 additions & 2 deletions src/replica/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <algorithm>
#include <functional>
#include <iosfwd>
#include <rocksdb/status.h>
#include <set>

#include "backup/replica_backup_manager.h"
Expand Down Expand Up @@ -299,8 +300,19 @@ void replica::on_client_read(dsn::message_ex *request, bool ignore_throttling)

uint64_t start_time_ns = dsn_now_ns();
CHECK(_app, "");
// TODO(yingchun): check the return value.
_app->on_request(request);
auto storage_error = _app->on_request(request);
if (dsn_unlikely(storage_error != ERR_OK)) {
switch (storage_error) {
// TODO(yingchun): Now only kCorruption is dealt, consider to deal with more storage
// engine errors.
case rocksdb::Status::kCorruption:
handle_local_failure(ERR_RDB_CORRUPTION);
break;
default:
LOG_ERROR_PREFIX("client read encountered an unhandled error: {}", storage_error);
}
return;
}

// If the corresponding perf counter exist, count the duration of this operation.
// rpc code of request is already checked in message_ex::rpc_code, so it will always be legal
Expand Down
2 changes: 1 addition & 1 deletion src/replica/replication_app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ class replication_app_base : public replica_base
//
virtual replication::decree last_durable_decree() const = 0;
// The return type is generated by storage engine, e.g. rocksdb::Status::Code, 0 always mean OK.
virtual int on_request(message_ex *request) = 0;
virtual int on_request(message_ex *request) WARN_UNUSED_RESULT = 0;

//
// Parameters:
Expand Down
6 changes: 5 additions & 1 deletion src/replica/storage/simple_kv/simple_kv.server.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ class simple_kv_service : public replication_app_base, public storage_serverlet<
simple_kv_service(replica *r) : replication_app_base(r) {}
virtual ~simple_kv_service() {}

virtual int on_request(dsn::message_ex *request) override { return handle_request(request); }
virtual int on_request(dsn::message_ex *request) override WARN_UNUSED_RESULT
{
return handle_request(request);
}

protected:
// all service handlers to be implemented further
// RPC_SIMPLE_KV_SIMPLE_KV_READ
Expand Down
2 changes: 1 addition & 1 deletion src/replica/test/mock_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class mock_replication_app_base : public replication_app_base
utils::filesystem::create_file(fmt::format("{}/checkpoint.file", checkpoint_dir));
return ERR_OK;
}
int on_request(message_ex *request) override { return 0; }
int on_request(message_ex *request) override WARN_UNUSED_RESULT { return 0; }
std::string query_compact_state() const { return ""; };

// we mock the followings
Expand Down
5 changes: 4 additions & 1 deletion src/server/pegasus_read_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ class pegasus_read_service : public dsn::replication::replication_app_base,
{
}

int on_request(dsn::message_ex *request) override { return handle_request(request); }
int on_request(dsn::message_ex *request) override WARN_UNUSED_RESULT
{
return handle_request(request);
}

protected:
// all service handlers to be implemented further
Expand Down
10 changes: 10 additions & 0 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ DSN_DEFINE_int32(pegasus.server,
update_rdb_stat_interval,
60,
"The interval seconds to update RocksDB statistics, in seconds.");
DSN_DEFINE_int32(pegasus.server,
inject_read_error_for_test,
0,
"Which error code to inject in read path, 0 means no error. Only for test.");
DSN_TAG_VARIABLE(inject_read_error_for_test, FT_MUTABLE);

static std::string chkpt_get_dir_name(int64_t decree)
{
Expand Down Expand Up @@ -329,6 +334,11 @@ void pegasus_server_impl::on_get(get_rpc rpc)
resp.partition_index = _gpid.get_partition_index();
resp.server = _primary_address;

if (dsn_unlikely(FLAGS_inject_read_error_for_test != rocksdb::Status::kOk)) {
resp.error = FLAGS_inject_read_error_for_test;
return;
}

if (!_read_size_throttling_controller->available()) {
rpc.error() = dsn::ERR_BUSY;
_counter_recent_read_throttling_reject_count->increment();
Expand Down
62 changes: 62 additions & 0 deletions src/test/function_test/base_api_test/integration_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,65 @@ TEST_F(integration_test, write_corrupt_db)

ASSERT_IN_TIME([&] { ASSERT_EQ(3, get_alive_replica_server_count()); }, 60);
}

TEST_F(integration_test, read_corrupt_db)
{
// Make best effort to rebalance the cluster,
ASSERT_NO_FATAL_FAILURE(
run_cmd_from_project_root("echo 'set_meta_level lively' | ./run.sh shell"));
// Make sure RS-1 has some primaries of table 'temp'.
ASSERT_IN_TIME([&] { ASSERT_GT(get_leader_count("temp", 1), 0); }, 120);

// Inject a read error kCorruption to RS-1.
ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root(
"curl 'localhost:34801/updateConfig?inject_read_error_for_test=2'"));

std::string skey = "skey";
std::string value = "value";
for (int i = 0; i < 1000; i++) {
std::string hkey = fmt::format("hkey.read_corrupt_db.{}", i);
ASSERT_EQ(PERR_OK, client_->set(hkey, skey, value));
}

int ok_count = 0;
int corruption_count = 0;
for (int i = 0; i < 1000; i++) {
std::string hkey = fmt::format("hkey.read_corrupt_db.{}", i);
std::string got_value;
int ret = PERR_OK;
do {
ret = client_->get(hkey, skey, got_value);
if (ret == PERR_OK) {
ASSERT_EQ(value, got_value);
ok_count++;
break;
} else if (ret == PERR_CORRUPTION) {
// Suppose there must some primaries on RS-1.
corruption_count++;
break;
} else if (ret == PERR_TIMEOUT) {
corruption_count++;
// If RS-1 crashed before (encounter a read kCorruption error from storage engine),
// a new read operation on the primary replica it ever held will cause timeout.
// Force to fetch the latest route table.
client_ =
pegasus_client_factory::get_client(cluster_name_.c_str(), app_name_.c_str());
ASSERT_TRUE(client_ != nullptr);
} else {
ASSERT_TRUE(false) << ret;
}
} while (true);
}

ASSERT_GT(ok_count, 0);
ASSERT_GT(corruption_count, 0);
std::cout << "ok_count: " << ok_count << ", corruption_count: " << corruption_count
<< std::endl;

// All replica servers in this cluster are healthy.
ASSERT_IN_TIME([&] { ASSERT_EQ(3, get_alive_replica_server_count()); }, 60);

// Recover the injected read error for RS-1.
ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root(
"curl 'localhost:34801/updateConfig?inject_read_error_for_test=0'"));
}

0 comments on commit 12c8b56

Please sign in to comment.