From 9452785486f893f58fbcfd027fb42210faa4bdf6 Mon Sep 17 00:00:00 2001 From: Yingchun Lai Date: Sat, 25 Mar 2023 17:32:24 +0800 Subject: [PATCH] refactor: return Status::code instead of meanless int --- src/common/storage_serverlet.h | 2 ++ src/replica/replica.cpp | 1 + src/replica/replication_app_base.cpp | 14 +++++++------- src/replica/replication_app_base.h | 5 +++-- src/server/pegasus_read_service.h | 8 ++------ src/server/pegasus_server_write.cpp | 14 ++++++++------ src/server/pegasus_server_write.h | 2 +- src/server/pegasus_write_service.cpp | 4 ++-- src/server/pegasus_write_service.h | 8 ++++---- src/server/pegasus_write_service_impl.h | 22 +++++++++++----------- src/server/rocksdb_wrapper.cpp | 2 +- src/server/rocksdb_wrapper.h | 7 ++++--- 12 files changed, 46 insertions(+), 43 deletions(-) diff --git a/src/common/storage_serverlet.h b/src/common/storage_serverlet.h index 8eb7c67e0f..30fa562872 100644 --- a/src/common/storage_serverlet.h +++ b/src/common/storage_serverlet.h @@ -122,11 +122,13 @@ class storage_serverlet return nullptr; } + // The return type is generated by storage engine, e.g. rocksdb::Status::Code, 0 always mean OK. int handle_request(dsn::message_ex *request) { dsn::task_code t = request->rpc_code(); const rpc_handler *ptr = find_handler(t); if (ptr != nullptr) { + // TODO(yingchun): add return value (*ptr)(static_cast(this), request); } else { LOG_WARNING("recv message with unhandled rpc name {} from {}, trace_id = {:#018x} ", diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp index ef363b2fe2..be2cbd3092 100644 --- a/src/replica/replica.cpp +++ b/src/replica/replica.cpp @@ -299,6 +299,7 @@ void replica::on_client_read(dsn::message_ex *request, bool ignore_throttling) uint64_t start_time_ns = dsn_now_ns(); CHECK(_app, ""); + // TODO(yingchun): check the return value. _app->on_request(request); // If the corresponding perf counter exist, count the duration of this operation. diff --git a/src/replica/replication_app_base.cpp b/src/replica/replication_app_base.cpp index 596a4e674c..5ae07b90bb 100644 --- a/src/replica/replication_app_base.cpp +++ b/src/replica/replication_app_base.cpp @@ -26,6 +26,7 @@ #include #include +#include #include #include #include @@ -373,12 +374,11 @@ int replication_app_base::on_batched_write_requests(int64_t decree, message_ex **requests, int request_length) { - int storage_error = 0; + int storage_error = rocksdb::Status::kOk; for (int i = 0; i < request_length; ++i) { - // TODO(yingchun): better to return error_code int e = on_request(requests[i]); - if (e != 0) { - LOG_ERROR_PREFIX("got storage error when handler request({})", + if (e != rocksdb::Status::kOk) { + LOG_ERROR_PREFIX("got storage engine error when handler request({})", requests[i]->header->rpc_name); storage_error = e; } @@ -425,7 +425,7 @@ error_code replication_app_base::apply_mutation(const mutation *mu) } } - int perror = on_batched_write_requests( + int storage_error = on_batched_write_requests( mu->data.header.decree, mu->data.header.timestamp, batched_requests, batched_count); // release faked requests @@ -433,8 +433,8 @@ error_code replication_app_base::apply_mutation(const mutation *mu) faked_requests[i]->release_ref(); } - if (perror != 0) { - LOG_ERROR_PREFIX("mutation {}: get internal error {}", mu->name(), perror); + if (storage_error != rocksdb::Status::kOk) { + LOG_ERROR_PREFIX("mutation {}: get internal error {}", mu->name(), storage_error); // For normal write requests, if got rocksdb error, this replica will be set error and evoke // learn. // For ingestion requests, should not do as normal write requests, there are two reasons: diff --git a/src/replica/replication_app_base.h b/src/replica/replication_app_base.h index 9e9257e95c..e0e292d6d0 100644 --- a/src/replica/replication_app_base.h +++ b/src/replica/replication_app_base.h @@ -215,7 +215,7 @@ class replication_app_base : public replica_base // virtual replication::decree last_durable_decree() const = 0; virtual replication::decree last_flushed_decree() const { return last_durable_decree(); } - // return the error generated by storage engine + // The return type is generated by storage engine, e.g. rocksdb::Status::Code, 0 always mean OK. virtual int on_request(message_ex *request) = 0; // @@ -225,8 +225,9 @@ class replication_app_base : public replica_base // The base class gives a naive implementation that just call on_request // repeatedly. Storage engine may override this function to get better performance. // + // The return type is generated by storage engine, e.g. rocksdb::Status::Code, 0 always mean OK. virtual int on_batched_write_requests(int64_t decree, - uint64_t timstamp, + uint64_t timestamp, message_ex **requests, int request_length); diff --git a/src/server/pegasus_read_service.h b/src/server/pegasus_read_service.h index 697274d832..163a529d3a 100644 --- a/src/server/pegasus_read_service.h +++ b/src/server/pegasus_read_service.h @@ -42,12 +42,8 @@ class pegasus_read_service : public dsn::replication::replication_app_base, pegasus_read_service(dsn::replication::replica *r) : dsn::replication::replication_app_base(r) { } - virtual ~pegasus_read_service() {} - virtual int on_request(dsn::message_ex *request) override - { - handle_request(request); - return 0; - } + + int on_request(dsn::message_ex *request) override { return handle_request(request); } protected: // all service handlers to be implemented further diff --git a/src/server/pegasus_server_write.cpp b/src/server/pegasus_server_write.cpp index 8daacc48b5..0147050a65 100644 --- a/src/server/pegasus_server_write.cpp +++ b/src/server/pegasus_server_write.cpp @@ -17,6 +17,7 @@ * under the License. */ +#include #include #include #include @@ -84,7 +85,7 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests, LOG_ERROR_PREFIX("pegasus not batch write handler failed, from = {}, exception = {}", requests[0]->header->from_address.to_string(), ex.what()); - return 0; + return rocksdb::Status::kOk; } return on_batched_writes(requests, count); @@ -94,7 +95,7 @@ void pegasus_server_write::set_default_ttl(uint32_t ttl) { _write_svc->set_defau int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int count) { - int err = 0; + int err = rocksdb::Status::kOk; { _write_svc->batch_prepare(_decree); @@ -104,7 +105,7 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun // Make sure all writes are batched even if they are failed, // since we need to record the total qps and rpc latencies, // and respond for all RPCs regardless of their result. - int local_err = 0; + int local_err = rocksdb::Status::kOk; try { dsn::task_code rpc_code(requests[i]->rpc_code()); if (rpc_code == dsn::apps::RPC_RRDB_RRDB_PUT) { @@ -130,13 +131,14 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun ex.what()); } - if (!err && local_err) { + if (err == rocksdb::Status::kOk && local_err != rocksdb::Status::kOk) { err = local_err; } } - if (dsn_unlikely(err != 0 || _put_rpc_batch.size() + _remove_rpc_batch.size() == 0)) { - _write_svc->batch_abort(_decree, err == 0 ? -1 : err); + if (dsn_unlikely(err != rocksdb::Status::kOk || + (_put_rpc_batch.empty() && _remove_rpc_batch.empty()))) { + _write_svc->batch_abort(_decree, err == rocksdb::Status::kOk ? -1 : err); } else { err = _write_svc->batch_commit(_decree); } diff --git a/src/server/pegasus_server_write.h b/src/server/pegasus_server_write.h index 7bf35b3471..6a002c2993 100644 --- a/src/server/pegasus_server_write.h +++ b/src/server/pegasus_server_write.h @@ -52,7 +52,7 @@ class pegasus_server_write : public dsn::replication::replica_base /// Error returned is regarded as the failure of replica, thus will trigger /// cluster membership changes. Make sure no error is returned because of /// invalid user argument. - /// As long as the returned error is 0, the operation is guaranteed to be + /// As long as the returned error is rocksdb::Status::kOk, the operation is guaranteed to be /// successfully applied into rocksdb, which means an empty_put will be called /// even if there's no write. int on_batched_write_requests(dsn::message_ex **requests, diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp index dd82e205d8..73f6cc8d6f 100644 --- a/src/server/pegasus_write_service.cpp +++ b/src/server/pegasus_write_service.cpp @@ -386,7 +386,7 @@ int pegasus_write_service::duplicate(int64_t decree, remove_rpc remove; if (request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT || request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) { - int err = 0; + int err = rocksdb::Status::kOk; if (request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT) { put = put_rpc(write); err = _impl->batch_put(ctx, put.request(), put.response()); @@ -422,7 +422,7 @@ int pegasus_write_service::ingest_files(int64_t decree, resp.err = dsn::ERR_OK; // write empty put to flush decree resp.rocksdb_error = empty_put(decree); - if (resp.rocksdb_error != 0) { + if (resp.rocksdb_error != rocksdb::Status::kOk) { resp.err = dsn::ERR_TRY_AGAIN; return resp.rocksdb_error; } diff --git a/src/server/pegasus_write_service.h b/src/server/pegasus_write_service.h index 4136f02b67..9fb854ffda 100644 --- a/src/server/pegasus_write_service.h +++ b/src/server/pegasus_write_service.h @@ -176,20 +176,20 @@ class pegasus_write_service : dsn::replication::replica_base void batch_prepare(int64_t decree); // Add PUT record in batch write. - // \returns 0 if success, non-0 if failure. + // \returns rocksdb::Status::Code. // NOTE that `resp` should not be moved or freed while the batch is not committed. int batch_put(const db_write_context &ctx, const dsn::apps::update_request &update, dsn::apps::update_response &resp); // Add REMOVE record in batch write. - // \returns 0 if success, non-0 if failure. + // \returns rocksdb::Status::Code. // NOTE that `resp` should not be moved or freed while the batch is not committed. int batch_remove(int64_t decree, const dsn::blob &key, dsn::apps::update_response &resp); // Commit batch write. - // \returns 0 if success, non-0 if failure. - // NOTE that if the batch contains no updates, 0 is returned. + // \returns rocksdb::Status::Code. + // NOTE that if the batch contains no updates, rocksdb::Status::kOk is returned. int batch_commit(int64_t decree); // Abort batch write. diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h index e0310d7419..b75b379a3f 100644 --- a/src/server/pegasus_write_service_impl.h +++ b/src/server/pegasus_write_service_impl.h @@ -36,6 +36,7 @@ namespace pegasus { namespace server { /// internal error codes used for fail injection +// TODO(yingchun): Use real rocksdb::Status::code. static constexpr int FAIL_DB_WRITE_BATCH_PUT = -101; static constexpr int FAIL_DB_WRITE_BATCH_DELETE = -102; static constexpr int FAIL_DB_WRITE = -103; @@ -99,12 +100,11 @@ class pegasus_write_service::impl : public dsn::replication::replica_base int err = _rocksdb_wrapper->write_batch_put(decree, dsn::string_view(), dsn::string_view(), 0); auto cleanup = dsn::defer([this]() { _rocksdb_wrapper->clear_up_write_batch(); }); - if (err) { + if (err != rocksdb::Status::kOk) { return err; } - err = _rocksdb_wrapper->write(decree); - return err; + return _rocksdb_wrapper->write(decree); } int multi_put(const db_write_context &ctx, @@ -170,7 +170,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base } resp.error = _rocksdb_wrapper->write(decree); - if (resp.error == 0) { + if (resp.error == rocksdb::Status::kOk) { resp.count = update.sort_keys.size(); } return resp.error; @@ -188,7 +188,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base uint32_t new_expire_ts = 0; db_get_context get_ctx; int err = _rocksdb_wrapper->get(raw_key, &get_ctx); - if (err != 0) { + if (err != rocksdb::Status::kOk) { resp.error = err; return err; } @@ -252,7 +252,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base } resp.error = _rocksdb_wrapper->write(decree); - if (resp.error == 0) { + if (resp.error == rocksdb::Status::kOk) { resp.new_value = new_value; } return resp.error; @@ -283,7 +283,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base db_get_context get_context; dsn::string_view check_raw_key(check_key.data(), check_key.length()); int err = _rocksdb_wrapper->get(check_raw_key, &get_context); - if (err != 0) { + if (err != rocksdb::Status::kOk) { // read check value failed LOG_ERROR_ROCKSDB("Error to GetCheckValue for CheckAndSet decree: {}, hash_key: {}, " "check_sort_key: {}", @@ -352,7 +352,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base invalid_argument ? rocksdb::Status::kInvalidArgument : rocksdb::Status::kTryAgain; } - return 0; + return rocksdb::Status::kOk; } int check_and_mutate(int64_t decree, @@ -404,7 +404,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base db_get_context get_context; dsn::string_view check_raw_key(check_key.data(), check_key.length()); int err = _rocksdb_wrapper->get(check_raw_key, &get_context); - if (err != 0) { + if (err != rocksdb::Status::kOk) { // read check value failed LOG_ERROR_ROCKSDB("Error to GetCheckValue for CheckAndMutate decree: {}, hash_key: {}, " "check_sort_key: {}", @@ -475,7 +475,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base resp.error = invalid_argument ? rocksdb::Status::kInvalidArgument : rocksdb::Status::kTryAgain; } - return 0; + return rocksdb::Status::kOk; } // \return ERR_INVALID_VERSION: replay or commit out-date ingest request @@ -508,7 +508,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base // ingest external files if (dsn_unlikely(_rocksdb_wrapper->ingest_files(decree, sst_file_list, req.ingest_behind) != - 0)) { + rocksdb::Status::kOk)) { return dsn::ERR_INGESTION_FAILED; } return dsn::ERR_OK; diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp index 574b00a843..c5872ec13d 100644 --- a/src/server/rocksdb_wrapper.cpp +++ b/src/server/rocksdb_wrapper.cpp @@ -115,7 +115,7 @@ int rocksdb_wrapper::write_batch_put_ctx(const db_write_context &ctx, db_get_context get_ctx; int err = get(raw_key, &get_ctx); - if (dsn_unlikely(err != 0)) { + if (dsn_unlikely(err != rocksdb::Status::kOk)) { return err; } // if record exists and is not expired. diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h index 05b73dc760..e3c713512d 100644 --- a/src/server/rocksdb_wrapper.h +++ b/src/server/rocksdb_wrapper.h @@ -53,9 +53,10 @@ class rocksdb_wrapper : public dsn::replication::replica_base rocksdb_wrapper(pegasus_server_impl *server); /// Calls RocksDB Get and store the result into `db_get_context`. - /// \returns 0 if Get succeeded. On failure, a non-zero rocksdb status code is returned. - /// \result ctx.expired=true if record expired. Still 0 is returned. - /// \result ctx.found=false if record is not found. Still 0 is returned. + /// \returns rocksdb::Status::kOk if Get succeeded. On failure, a non-zero rocksdb status code + /// is returned. + /// \result ctx.expired=true if record expired. Still rocksdb::Status::kOk is returned. + /// \result ctx.found=false if record is not found. Still rocksdb::Status::kOk is returned. int get(dsn::string_view raw_key, /*out*/ db_get_context *ctx); int write_batch_put(int64_t decree,