Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: return Status::code instead of meanless integer #1417

Merged
merged 1 commit into from
Mar 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/common/storage_serverlet.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,13 @@ class storage_serverlet
return nullptr;
}

// The return type is generated by storage engine, e.g. rocksdb::Status::Code, 0 always mean OK.
int handle_request(dsn::message_ex *request)
{
dsn::task_code t = request->rpc_code();
const rpc_handler *ptr = find_handler(t);
if (ptr != nullptr) {
// TODO(yingchun): add return value
(*ptr)(static_cast<T *>(this), request);
} else {
LOG_WARNING("recv message with unhandled rpc name {} from {}, trace_id = {:#018x} ",
Expand Down
1 change: 1 addition & 0 deletions src/replica/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ void replica::on_client_read(dsn::message_ex *request, bool ignore_throttling)

uint64_t start_time_ns = dsn_now_ns();
CHECK(_app, "");
// TODO(yingchun): check the return value.
_app->on_request(request);

// If the corresponding perf counter exist, count the duration of this operation.
Expand Down
14 changes: 7 additions & 7 deletions src/replica/replication_app_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include <alloca.h>
#include <fcntl.h>
#include <rocksdb/status.h>
#include <algorithm>
#include <fstream>
#include <memory>
Expand Down Expand Up @@ -373,12 +374,11 @@ int replication_app_base::on_batched_write_requests(int64_t decree,
message_ex **requests,
int request_length)
{
int storage_error = 0;
int storage_error = rocksdb::Status::kOk;
for (int i = 0; i < request_length; ++i) {
// TODO(yingchun): better to return error_code
int e = on_request(requests[i]);
if (e != 0) {
LOG_ERROR_PREFIX("got storage error when handler request({})",
if (e != rocksdb::Status::kOk) {
LOG_ERROR_PREFIX("got storage engine error when handler request({})",
requests[i]->header->rpc_name);
storage_error = e;
}
Expand Down Expand Up @@ -425,16 +425,16 @@ error_code replication_app_base::apply_mutation(const mutation *mu)
}
}

int perror = on_batched_write_requests(
int storage_error = on_batched_write_requests(
mu->data.header.decree, mu->data.header.timestamp, batched_requests, batched_count);

// release faked requests
for (int i = 0; i < faked_count; i++) {
faked_requests[i]->release_ref();
}

if (perror != 0) {
LOG_ERROR_PREFIX("mutation {}: get internal error {}", mu->name(), perror);
if (storage_error != rocksdb::Status::kOk) {
LOG_ERROR_PREFIX("mutation {}: get internal error {}", mu->name(), storage_error);
// For normal write requests, if got rocksdb error, this replica will be set error and evoke
// learn.
// For ingestion requests, should not do as normal write requests, there are two reasons:
Expand Down
5 changes: 3 additions & 2 deletions src/replica/replication_app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ class replication_app_base : public replica_base
//
virtual replication::decree last_durable_decree() const = 0;
virtual replication::decree last_flushed_decree() const { return last_durable_decree(); }
// return the error generated by storage engine
// The return type is generated by storage engine, e.g. rocksdb::Status::Code, 0 always mean OK.
virtual int on_request(message_ex *request) = 0;

//
Expand All @@ -225,8 +225,9 @@ class replication_app_base : public replica_base
// The base class gives a naive implementation that just call on_request
// repeatedly. Storage engine may override this function to get better performance.
//
// The return type is generated by storage engine, e.g. rocksdb::Status::Code, 0 always mean OK.
virtual int on_batched_write_requests(int64_t decree,
uint64_t timstamp,
uint64_t timestamp,
message_ex **requests,
int request_length);

Expand Down
8 changes: 2 additions & 6 deletions src/server/pegasus_read_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,8 @@ class pegasus_read_service : public dsn::replication::replication_app_base,
pegasus_read_service(dsn::replication::replica *r) : dsn::replication::replication_app_base(r)
{
}
virtual ~pegasus_read_service() {}
virtual int on_request(dsn::message_ex *request) override
{
handle_request(request);
return 0;
}

int on_request(dsn::message_ex *request) override { return handle_request(request); }

protected:
// all service handlers to be implemented further
Expand Down
14 changes: 8 additions & 6 deletions src/server/pegasus_server_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* under the License.
*/

#include <rocksdb/status.h>
#include <stdio.h>
#include <thrift/transport/TTransportException.h>
#include <algorithm>
Expand Down Expand Up @@ -84,7 +85,7 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
LOG_ERROR_PREFIX("pegasus not batch write handler failed, from = {}, exception = {}",
requests[0]->header->from_address.to_string(),
ex.what());
return 0;
return rocksdb::Status::kOk;
}

return on_batched_writes(requests, count);
Expand All @@ -94,7 +95,7 @@ void pegasus_server_write::set_default_ttl(uint32_t ttl) { _write_svc->set_defau

int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int count)
{
int err = 0;
int err = rocksdb::Status::kOk;
{
_write_svc->batch_prepare(_decree);

Expand All @@ -104,7 +105,7 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun
// Make sure all writes are batched even if they are failed,
// since we need to record the total qps and rpc latencies,
// and respond for all RPCs regardless of their result.
int local_err = 0;
int local_err = rocksdb::Status::kOk;
try {
dsn::task_code rpc_code(requests[i]->rpc_code());
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
Expand All @@ -130,13 +131,14 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun
ex.what());
}

if (!err && local_err) {
if (err == rocksdb::Status::kOk && local_err != rocksdb::Status::kOk) {
err = local_err;
}
}

if (dsn_unlikely(err != 0 || _put_rpc_batch.size() + _remove_rpc_batch.size() == 0)) {
_write_svc->batch_abort(_decree, err == 0 ? -1 : err);
if (dsn_unlikely(err != rocksdb::Status::kOk ||
(_put_rpc_batch.empty() && _remove_rpc_batch.empty()))) {
_write_svc->batch_abort(_decree, err == rocksdb::Status::kOk ? -1 : err);
} else {
err = _write_svc->batch_commit(_decree);
}
Expand Down
2 changes: 1 addition & 1 deletion src/server/pegasus_server_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class pegasus_server_write : public dsn::replication::replica_base
/// Error returned is regarded as the failure of replica, thus will trigger
/// cluster membership changes. Make sure no error is returned because of
/// invalid user argument.
/// As long as the returned error is 0, the operation is guaranteed to be
/// As long as the returned error is rocksdb::Status::kOk, the operation is guaranteed to be
/// successfully applied into rocksdb, which means an empty_put will be called
/// even if there's no write.
int on_batched_write_requests(dsn::message_ex **requests,
Expand Down
4 changes: 2 additions & 2 deletions src/server/pegasus_write_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ int pegasus_write_service::duplicate(int64_t decree,
remove_rpc remove;
if (request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT ||
request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
int err = 0;
int err = rocksdb::Status::kOk;
if (request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
put = put_rpc(write);
err = _impl->batch_put(ctx, put.request(), put.response());
Expand Down Expand Up @@ -422,7 +422,7 @@ int pegasus_write_service::ingest_files(int64_t decree,
resp.err = dsn::ERR_OK;
// write empty put to flush decree
resp.rocksdb_error = empty_put(decree);
if (resp.rocksdb_error != 0) {
if (resp.rocksdb_error != rocksdb::Status::kOk) {
resp.err = dsn::ERR_TRY_AGAIN;
return resp.rocksdb_error;
}
Expand Down
8 changes: 4 additions & 4 deletions src/server/pegasus_write_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,20 +176,20 @@ class pegasus_write_service : dsn::replication::replica_base
void batch_prepare(int64_t decree);

// Add PUT record in batch write.
// \returns 0 if success, non-0 if failure.
// \returns rocksdb::Status::Code.
// NOTE that `resp` should not be moved or freed while the batch is not committed.
int batch_put(const db_write_context &ctx,
const dsn::apps::update_request &update,
dsn::apps::update_response &resp);

// Add REMOVE record in batch write.
// \returns 0 if success, non-0 if failure.
// \returns rocksdb::Status::Code.
// NOTE that `resp` should not be moved or freed while the batch is not committed.
int batch_remove(int64_t decree, const dsn::blob &key, dsn::apps::update_response &resp);

// Commit batch write.
// \returns 0 if success, non-0 if failure.
// NOTE that if the batch contains no updates, 0 is returned.
// \returns rocksdb::Status::Code.
// NOTE that if the batch contains no updates, rocksdb::Status::kOk is returned.
int batch_commit(int64_t decree);

// Abort batch write.
Expand Down
22 changes: 11 additions & 11 deletions src/server/pegasus_write_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ namespace pegasus {
namespace server {

/// internal error codes used for fail injection
// TODO(yingchun): Use real rocksdb::Status::code.
static constexpr int FAIL_DB_WRITE_BATCH_PUT = -101;
static constexpr int FAIL_DB_WRITE_BATCH_DELETE = -102;
static constexpr int FAIL_DB_WRITE = -103;
Expand Down Expand Up @@ -99,12 +100,11 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
int err =
_rocksdb_wrapper->write_batch_put(decree, dsn::string_view(), dsn::string_view(), 0);
auto cleanup = dsn::defer([this]() { _rocksdb_wrapper->clear_up_write_batch(); });
if (err) {
if (err != rocksdb::Status::kOk) {
return err;
}

err = _rocksdb_wrapper->write(decree);
return err;
return _rocksdb_wrapper->write(decree);
}

int multi_put(const db_write_context &ctx,
Expand Down Expand Up @@ -170,7 +170,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
}

resp.error = _rocksdb_wrapper->write(decree);
if (resp.error == 0) {
if (resp.error == rocksdb::Status::kOk) {
resp.count = update.sort_keys.size();
}
return resp.error;
Expand All @@ -188,7 +188,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
uint32_t new_expire_ts = 0;
db_get_context get_ctx;
int err = _rocksdb_wrapper->get(raw_key, &get_ctx);
if (err != 0) {
if (err != rocksdb::Status::kOk) {
resp.error = err;
return err;
}
Expand Down Expand Up @@ -252,7 +252,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
}

resp.error = _rocksdb_wrapper->write(decree);
if (resp.error == 0) {
if (resp.error == rocksdb::Status::kOk) {
resp.new_value = new_value;
}
return resp.error;
Expand Down Expand Up @@ -283,7 +283,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
db_get_context get_context;
dsn::string_view check_raw_key(check_key.data(), check_key.length());
int err = _rocksdb_wrapper->get(check_raw_key, &get_context);
if (err != 0) {
if (err != rocksdb::Status::kOk) {
// read check value failed
LOG_ERROR_ROCKSDB("Error to GetCheckValue for CheckAndSet decree: {}, hash_key: {}, "
"check_sort_key: {}",
Expand Down Expand Up @@ -352,7 +352,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
invalid_argument ? rocksdb::Status::kInvalidArgument : rocksdb::Status::kTryAgain;
}

return 0;
return rocksdb::Status::kOk;
}

int check_and_mutate(int64_t decree,
Expand Down Expand Up @@ -404,7 +404,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
db_get_context get_context;
dsn::string_view check_raw_key(check_key.data(), check_key.length());
int err = _rocksdb_wrapper->get(check_raw_key, &get_context);
if (err != 0) {
if (err != rocksdb::Status::kOk) {
// read check value failed
LOG_ERROR_ROCKSDB("Error to GetCheckValue for CheckAndMutate decree: {}, hash_key: {}, "
"check_sort_key: {}",
Expand Down Expand Up @@ -475,7 +475,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
resp.error =
invalid_argument ? rocksdb::Status::kInvalidArgument : rocksdb::Status::kTryAgain;
}
return 0;
return rocksdb::Status::kOk;
}

// \return ERR_INVALID_VERSION: replay or commit out-date ingest request
Expand Down Expand Up @@ -508,7 +508,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base

// ingest external files
if (dsn_unlikely(_rocksdb_wrapper->ingest_files(decree, sst_file_list, req.ingest_behind) !=
0)) {
rocksdb::Status::kOk)) {
return dsn::ERR_INGESTION_FAILED;
}
return dsn::ERR_OK;
Expand Down
2 changes: 1 addition & 1 deletion src/server/rocksdb_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ int rocksdb_wrapper::write_batch_put_ctx(const db_write_context &ctx,

db_get_context get_ctx;
int err = get(raw_key, &get_ctx);
if (dsn_unlikely(err != 0)) {
if (dsn_unlikely(err != rocksdb::Status::kOk)) {
return err;
}
// if record exists and is not expired.
Expand Down
7 changes: 4 additions & 3 deletions src/server/rocksdb_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ class rocksdb_wrapper : public dsn::replication::replica_base
rocksdb_wrapper(pegasus_server_impl *server);

/// Calls RocksDB Get and store the result into `db_get_context`.
/// \returns 0 if Get succeeded. On failure, a non-zero rocksdb status code is returned.
/// \result ctx.expired=true if record expired. Still 0 is returned.
/// \result ctx.found=false if record is not found. Still 0 is returned.
/// \returns rocksdb::Status::kOk if Get succeeded. On failure, a non-zero rocksdb status code
/// is returned.
/// \result ctx.expired=true if record expired. Still rocksdb::Status::kOk is returned.
/// \result ctx.found=false if record is not found. Still rocksdb::Status::kOk is returned.
int get(dsn::string_view raw_key, /*out*/ db_get_context *ctx);

int write_batch_put(int64_t decree,
Expand Down