From 39691b1e0b5b51d1ae0194f77f874ae15cca5e96 Mon Sep 17 00:00:00 2001 From: levy Date: Tue, 1 Dec 2020 17:23:55 +0800 Subject: [PATCH 01/21] refactor: db_get --- src/server/pegasus_write_service_impl.h | 118 +++++------------- src/server/rocksdb_wrapper.cpp | 65 ++++++++++ src/server/rocksdb_wrapper.h | 52 ++++++++ src/server/test/CMakeLists.txt | 1 + .../test/pegasus_write_service_impl_test.cpp | 4 +- 5 files changed, 155 insertions(+), 85 deletions(-) create mode 100644 src/server/rocksdb_wrapper.cpp create mode 100644 src/server/rocksdb_wrapper.h diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h index baa0507fb9..b3608d803a 100644 --- a/src/server/pegasus_write_service_impl.h +++ b/src/server/pegasus_write_service_impl.h @@ -25,6 +25,7 @@ #include "base/pegasus_key_schema.h" #include "meta_store.h" +#include "rocksdb_wrapper.h" #include #include @@ -38,7 +39,6 @@ namespace server { static constexpr int FAIL_DB_WRITE_BATCH_PUT = -101; static constexpr int FAIL_DB_WRITE_BATCH_DELETE = -102; static constexpr int FAIL_DB_WRITE = -103; -static constexpr int FAIL_DB_GET = -104; struct db_get_context { @@ -97,6 +97,8 @@ class pegasus_write_service::impl : public dsn::replication::replica_base { // disable write ahead logging as replication handles logging instead now _wt_opts.disableWAL = true; + _rocksdb_wrapper = dsn::make_unique( + server, server->_db, _pegasus_data_version, server->_data_cf_rd_opts); } int empty_put(int64_t decree) @@ -196,7 +198,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base int64_t new_value = 0; uint32_t new_expire_ts = 0; db_get_context get_ctx; - int err = db_get(raw_key, &get_ctx); + int err = _rocksdb_wrapper->get(raw_key, &get_ctx); if (err != 0) { resp.error = err; return err; @@ -291,41 +293,31 @@ class pegasus_write_service::impl : public dsn::replication::replica_base ::dsn::blob check_key; pegasus_generate_key(check_key, update.hash_key, update.check_sort_key); - rocksdb::Slice check_raw_key(check_key.data(), check_key.length()); - std::string check_raw_value; - rocksdb::Status check_status = _db->Get(_rd_opts, check_raw_key, &check_raw_value); - if (check_status.ok()) { - // read check value succeed - if (check_if_record_expired( - _pegasus_data_version, utils::epoch_now(), check_raw_value)) { - // check value ttl timeout - _pfc_recent_expire_count->increment(); - check_status = rocksdb::Status::NotFound(); - } - } else if (!check_status.IsNotFound()) { + + db_get_context get_context; + dsn::string_view check_raw_key(check_key.data(), check_key.length()); + int err = _rocksdb_wrapper->get(check_raw_key, &get_context); + if (err != 0) { // read check value failed - derror_rocksdb("GetCheckValue for CheckAndSet", - check_status.ToString(), - "decree: {}, hash_key: {}, check_sort_key: {}", + derror_rocksdb("Error to GetCheckValue for CheckAndSet decree: {}, hash_key: {}, " + "check_sort_key: {}", decree, utils::c_escape_string(update.hash_key), utils::c_escape_string(update.check_sort_key)); - resp.error = check_status.code(); + resp.error = err; return resp.error; } - dassert_f(check_status.ok() || check_status.IsNotFound(), - "status = %s", - check_status.ToString().c_str()); ::dsn::blob check_value; - if (check_status.ok()) { + bool value_exist = !get_context.expired && get_context.found; + if (value_exist) { pegasus_extract_user_data( - _pegasus_data_version, std::move(check_raw_value), check_value); + _pegasus_data_version, std::move(get_context.raw_value), check_value); } if (update.return_check_value) { resp.check_value_returned = true; - if (check_status.ok()) { + if (value_exist) { resp.check_value_exist = true; resp.check_value = check_value; } @@ -335,7 +327,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base bool passed = validate_check(decree, update.check_type, update.check_operand, - check_status.ok(), + value_exist, check_value, invalid_argument); @@ -421,41 +413,31 @@ class pegasus_write_service::impl : public dsn::replication::replica_base ::dsn::blob check_key; pegasus_generate_key(check_key, update.hash_key, update.check_sort_key); - rocksdb::Slice check_raw_key(check_key.data(), check_key.length()); - std::string check_raw_value; - rocksdb::Status check_status = _db->Get(_rd_opts, check_raw_key, &check_raw_value); - if (check_status.ok()) { - // read check value succeed - if (check_if_record_expired( - _pegasus_data_version, utils::epoch_now(), check_raw_value)) { - // check value ttl timeout - _pfc_recent_expire_count->increment(); - check_status = rocksdb::Status::NotFound(); - } - } else if (!check_status.IsNotFound()) { + + db_get_context get_context; + dsn::string_view check_raw_key(check_key.data(), check_key.length()); + int err = _rocksdb_wrapper->get(check_raw_key, &get_context); + if (err != 0) { // read check value failed - derror_rocksdb("GetCheckValue for CheckAndMutate", - check_status.ToString(), - "decree: {}, hash_key: {}, check_sort_key: {}", + derror_rocksdb("Error to GetCheckValue for CheckAndMutate decree: {}, hash_key: {}, " + "check_sort_key: {}", decree, utils::c_escape_string(update.hash_key), utils::c_escape_string(update.check_sort_key)); - resp.error = check_status.code(); + resp.error = err; return resp.error; } - dassert_f(check_status.ok() || check_status.IsNotFound(), - "status = %s", - check_status.ToString().c_str()); - ::dsn::blob check_value; - if (check_status.ok()) { + bool value_exist = !get_context.expired && get_context.found; + dsn::blob check_value; + if (value_exist) { pegasus_extract_user_data( - _pegasus_data_version, std::move(check_raw_value), check_value); + _pegasus_data_version, std::move(get_context.raw_value), check_value); } if (update.return_check_value) { resp.check_value_returned = true; - if (check_status.ok()) { + if (value_exist) { resp.check_value_exist = true; resp.check_value = check_value; } @@ -465,7 +447,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base bool passed = validate_check(decree, update.check_type, update.check_operand, - check_status.ok(), + value_exist, check_value, invalid_argument); @@ -602,7 +584,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base !raw_key.empty()) { // not an empty write db_get_context get_ctx; - int err = db_get(raw_key, &get_ctx); + int err = _rocksdb_wrapper->get(raw_key, &get_ctx); if (dsn_unlikely(err != 0)) { return err; } @@ -681,40 +663,6 @@ class pegasus_write_service::impl : public dsn::replication::replica_base return status.code(); } - /// Calls RocksDB Get and store the result into `db_get_context`. - /// \returns 0 if Get succeeded. On failure, a non-zero rocksdb status code is returned. - /// \result ctx.expired=true if record expired. Still 0 is returned. - /// \result ctx.found=false if record is not found. Still 0 is returned. - int db_get(dsn::string_view raw_key, - /*out*/ db_get_context *ctx) - { - FAIL_POINT_INJECT_F("db_get", [](dsn::string_view) -> int { return FAIL_DB_GET; }); - - rocksdb::Status s = _db->Get(_rd_opts, utils::to_rocksdb_slice(raw_key), &(ctx->raw_value)); - if (dsn_likely(s.ok())) { - // success - ctx->found = true; - ctx->expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, ctx->raw_value); - if (check_if_ts_expired(utils::epoch_now(), ctx->expire_ts)) { - ctx->expired = true; - } - return 0; - } - if (s.IsNotFound()) { - // NotFound is an acceptable error - ctx->found = false; - return 0; - } - ::dsn::blob hash_key, sort_key; - pegasus_restore_key(::dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key); - derror_rocksdb("Get", - s.ToString(), - "hash_key: {}, sort_key: {}", - utils::c_escape_string(hash_key), - utils::c_escape_string(sort_key)); - return s.code(); - } - void clear_up_batch_states(int64_t decree, int err) { if (!_update_responses.empty()) { @@ -877,6 +825,8 @@ class pegasus_write_service::impl : public dsn::replication::replica_base ::dsn::perf_counter_wrapper &_pfc_recent_expire_count; pegasus_value_generator _value_generator; + std::unique_ptr _rocksdb_wrapper; + // for setting update_response.error after committed. std::vector _update_responses; }; diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp new file mode 100644 index 0000000000..fb8348d8b7 --- /dev/null +++ b/src/server/rocksdb_wrapper.cpp @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "rocksdb_wrapper.h" + +#include "pegasus_write_service_impl.h" + +namespace pegasus { +namespace server { + +rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server, + rocksdb::DB *db, + const uint32_t pegasus_data_version, + rocksdb::ReadOptions &rd_opts) + : replica_base(server), _db(db), _rd_opts(rd_opts), _pegasus_data_version(pegasus_data_version) +{ +} + +int rocksdb_wrapper::get(dsn::string_view raw_key, /*out*/ db_get_context *ctx) +{ + FAIL_POINT_INJECT_F("db_get", [](dsn::string_view) -> int { return FAIL_DB_GET; }); + + rocksdb::Status s = _db->Get(_rd_opts, utils::to_rocksdb_slice(raw_key), &(ctx->raw_value)); + if (dsn_likely(s.ok())) { + // success + ctx->found = true; + ctx->expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, ctx->raw_value); + if (check_if_ts_expired(utils::epoch_now(), ctx->expire_ts)) { + ctx->expired = true; + } + return 0; + } else if (s.IsNotFound()) { + // NotFound is an acceptable error + ctx->found = false; + return 0; + } + + ::dsn::blob hash_key, sort_key; + pegasus_restore_key(::dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key); + derror_rocksdb("Get", + s.ToString(), + "hash_key: {}, sort_key: {}", + utils::c_escape_string(hash_key), + utils::c_escape_string(sort_key)); + return s.code(); +} + +} // namespace server +} // namespace pegasus diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h new file mode 100644 index 0000000000..3e2cf7bc12 --- /dev/null +++ b/src/server/rocksdb_wrapper.h @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +namespace pegasus { +namespace server { +struct db_get_context; +class pegasus_server_impl; + +static constexpr int FAIL_DB_GET = -104; + +class rocksdb_wrapper : public dsn::replication::replica_base +{ +public: + rocksdb_wrapper(pegasus_server_impl *server, + rocksdb::DB *db, + const uint32_t _pegasus_data_version, + rocksdb::ReadOptions &_rd_opts); + + /// Calls RocksDB Get and store the result into `db_get_context`. + /// \returns 0 if Get succeeded. On failure, a non-zero rocksdb status code is returned. + /// \result ctx.expired=true if record expired. Still 0 is returned. + /// \result ctx.found=false if record is not found. Still 0 is returned. + int get(dsn::string_view raw_key, /*out*/ db_get_context *ctx); + +private: + rocksdb::DB *_db; + rocksdb::ReadOptions &_rd_opts; + const uint32_t _pegasus_data_version; +}; +} // namespace server +} // namespace pegasus diff --git a/src/server/test/CMakeLists.txt b/src/server/test/CMakeLists.txt index 08834451a3..80dc4d6403 100644 --- a/src/server/test/CMakeLists.txt +++ b/src/server/test/CMakeLists.txt @@ -28,6 +28,7 @@ set(MY_PROJ_SRC "../pegasus_server_impl.cpp" "../hotspot_partition_calculator.cpp" "../meta_store.cpp" "../hotkey_collector.cpp" + "../rocksdb_wrapper.cpp" ) set(MY_SRC_SEARCH_MODE "GLOB") diff --git a/src/server/test/pegasus_write_service_impl_test.cpp b/src/server/test/pegasus_write_service_impl_test.cpp index 48f9fa8087..35557bf5b0 100644 --- a/src/server/test/pegasus_write_service_impl_test.cpp +++ b/src/server/test/pegasus_write_service_impl_test.cpp @@ -32,6 +32,7 @@ class pegasus_write_service_impl_test : public pegasus_server_test_base protected: std::unique_ptr _server_write; pegasus_write_service::impl *_write_impl{nullptr}; + rocksdb_wrapper *_rocksdb_wrapper{nullptr}; public: void SetUp() override @@ -39,6 +40,7 @@ class pegasus_write_service_impl_test : public pegasus_server_test_base start(); _server_write = dsn::make_unique(_server.get(), true); _write_impl = _server_write->_write_svc->_impl.get(); + _rocksdb_wrapper = _write_impl->_rocksdb_wrapper.get(); } uint64_t read_timestamp_from(dsn::string_view raw_key) @@ -70,7 +72,7 @@ class pegasus_write_service_impl_test : public pegasus_server_test_base int db_get(dsn::string_view raw_key, db_get_context *get_ctx) { - return _write_impl->db_get(raw_key, get_ctx); + return _rocksdb_wrapper->get(raw_key, get_ctx); } void single_set(dsn::blob raw_key, dsn::blob user_value) From 279e90b0262ec2bf9b8ea58853a52ec85d1917d8 Mon Sep 17 00:00:00 2001 From: levy Date: Tue, 1 Dec 2020 18:28:06 +0800 Subject: [PATCH 02/21] fix --- src/server/pegasus_write_service_impl.h | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h index b3608d803a..4cd720c4b3 100644 --- a/src/server/pegasus_write_service_impl.h +++ b/src/server/pegasus_write_service_impl.h @@ -299,11 +299,6 @@ class pegasus_write_service::impl : public dsn::replication::replica_base int err = _rocksdb_wrapper->get(check_raw_key, &get_context); if (err != 0) { // read check value failed - derror_rocksdb("Error to GetCheckValue for CheckAndSet decree: {}, hash_key: {}, " - "check_sort_key: {}", - decree, - utils::c_escape_string(update.hash_key), - utils::c_escape_string(update.check_sort_key)); resp.error = err; return resp.error; } @@ -419,11 +414,6 @@ class pegasus_write_service::impl : public dsn::replication::replica_base int err = _rocksdb_wrapper->get(check_raw_key, &get_context); if (err != 0) { // read check value failed - derror_rocksdb("Error to GetCheckValue for CheckAndMutate decree: {}, hash_key: {}, " - "check_sort_key: {}", - decree, - utils::c_escape_string(update.hash_key), - utils::c_escape_string(update.check_sort_key)); resp.error = err; return resp.error; } From f7e5f0495e62f261d9c87c03f6aaf3ae324e47b3 Mon Sep 17 00:00:00 2001 From: levy Date: Tue, 1 Dec 2020 18:37:49 +0800 Subject: [PATCH 03/21] fix --- src/server/rocksdb_wrapper.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp index fb8348d8b7..00269e498c 100644 --- a/src/server/rocksdb_wrapper.cpp +++ b/src/server/rocksdb_wrapper.cpp @@ -51,8 +51,8 @@ int rocksdb_wrapper::get(dsn::string_view raw_key, /*out*/ db_get_context *ctx) return 0; } - ::dsn::blob hash_key, sort_key; - pegasus_restore_key(::dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key); + dsn::blob hash_key, sort_key; + pegasus_restore_key(dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key); derror_rocksdb("Get", s.ToString(), "hash_key: {}, sort_key: {}", From a70711bfc8b1f47e4c2690d47fd712a837dab6ba Mon Sep 17 00:00:00 2001 From: levy Date: Tue, 1 Dec 2020 18:42:48 +0800 Subject: [PATCH 04/21] fix --- rdsn | 2 +- src/server/pegasus_write_service_impl.h | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/rdsn b/rdsn index 64b9e11703..65edf61e37 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit 64b9e11703aea7acfa771037cbc3ad8b363abfaa +Subproject commit 65edf61e376b692f41845ab3deb71cb686ea5575 diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h index 4cd720c4b3..09b2d20540 100644 --- a/src/server/pegasus_write_service_impl.h +++ b/src/server/pegasus_write_service_impl.h @@ -291,7 +291,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base return empty_put(decree); } - ::dsn::blob check_key; + dsn::blob check_key; pegasus_generate_key(check_key, update.hash_key, update.check_sort_key); db_get_context get_context; @@ -303,8 +303,8 @@ class pegasus_write_service::impl : public dsn::replication::replica_base return resp.error; } - ::dsn::blob check_value; bool value_exist = !get_context.expired && get_context.found; + dsn::blob check_value; if (value_exist) { pegasus_extract_user_data( _pegasus_data_version, std::move(get_context.raw_value), check_value); @@ -406,7 +406,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base return empty_put(decree); } - ::dsn::blob check_key; + dsn::blob check_key; pegasus_generate_key(check_key, update.hash_key, update.check_sort_key); db_get_context get_context; From e0d8a0cf0256f0707da67b155d183d99028bd01a Mon Sep 17 00:00:00 2001 From: levy Date: Wed, 2 Dec 2020 10:41:33 +0800 Subject: [PATCH 05/21] fix --- src/server/pegasus_write_service_impl.h | 7 +++++-- src/server/rocksdb_wrapper.cpp | 1 + src/server/rocksdb_wrapper.h | 5 ++++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h index 09b2d20540..c02bfe6595 100644 --- a/src/server/pegasus_write_service_impl.h +++ b/src/server/pegasus_write_service_impl.h @@ -97,8 +97,11 @@ class pegasus_write_service::impl : public dsn::replication::replica_base { // disable write ahead logging as replication handles logging instead now _wt_opts.disableWAL = true; - _rocksdb_wrapper = dsn::make_unique( - server, server->_db, _pegasus_data_version, server->_data_cf_rd_opts); + _rocksdb_wrapper = dsn::make_unique(server, + server->_db, + _pegasus_data_version, + server->_data_cf_rd_opts, + server->_pfc_recent_expire_count); } int empty_put(int64_t decree) diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp index 00269e498c..7eea6c1db2 100644 --- a/src/server/rocksdb_wrapper.cpp +++ b/src/server/rocksdb_wrapper.cpp @@ -43,6 +43,7 @@ int rocksdb_wrapper::get(dsn::string_view raw_key, /*out*/ db_get_context *ctx) ctx->expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, ctx->raw_value); if (check_if_ts_expired(utils::epoch_now(), ctx->expire_ts)) { ctx->expired = true; + _pfc_recent_expire_count->increment(); } return 0; } else if (s.IsNotFound()) { diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h index 3e2cf7bc12..5a5aa72660 100644 --- a/src/server/rocksdb_wrapper.h +++ b/src/server/rocksdb_wrapper.h @@ -21,6 +21,7 @@ #include #include +#include namespace pegasus { namespace server { @@ -35,7 +36,8 @@ class rocksdb_wrapper : public dsn::replication::replica_base rocksdb_wrapper(pegasus_server_impl *server, rocksdb::DB *db, const uint32_t _pegasus_data_version, - rocksdb::ReadOptions &_rd_opts); + rocksdb::ReadOptions &_rd_opts, + dsn::perf_counter_wrapper &_pfc_recent_expire_count); /// Calls RocksDB Get and store the result into `db_get_context`. /// \returns 0 if Get succeeded. On failure, a non-zero rocksdb status code is returned. @@ -47,6 +49,7 @@ class rocksdb_wrapper : public dsn::replication::replica_base rocksdb::DB *_db; rocksdb::ReadOptions &_rd_opts; const uint32_t _pegasus_data_version; + dsn::perf_counter_wrapper &_pfc_recent_expire_count; }; } // namespace server } // namespace pegasus From b29581ac5c803541b564617dc4820786998227e6 Mon Sep 17 00:00:00 2001 From: levy Date: Wed, 2 Dec 2020 11:07:07 +0800 Subject: [PATCH 06/21] fix --- src/server/pegasus_write_service_impl.h | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h index c02bfe6595..256b1db0f7 100644 --- a/src/server/pegasus_write_service_impl.h +++ b/src/server/pegasus_write_service_impl.h @@ -93,7 +93,6 @@ class pegasus_write_service::impl : public dsn::replication::replica_base _meta_cf(server->_meta_cf), _rd_opts(server->_data_cf_rd_opts), _default_ttl(0), - _pfc_recent_expire_count(server->_pfc_recent_expire_count) { // disable write ahead logging as replication handles logging instead now _wt_opts.disableWAL = true; @@ -212,7 +211,6 @@ class pegasus_write_service::impl : public dsn::replication::replica_base new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0; } else if (get_ctx.expired) { // ttl timeout, set to 0 before increment - _pfc_recent_expire_count->increment(); new_value = update.increment; new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0; } else { @@ -815,7 +813,6 @@ class pegasus_write_service::impl : public dsn::replication::replica_base rocksdb::WriteOptions _wt_opts; rocksdb::ReadOptions &_rd_opts; volatile uint32_t _default_ttl; - ::dsn::perf_counter_wrapper &_pfc_recent_expire_count; pegasus_value_generator _value_generator; std::unique_ptr _rocksdb_wrapper; From 3a591a8d031f00233c782c8f6b6a4c21c74dd257 Mon Sep 17 00:00:00 2001 From: levy Date: Wed, 2 Dec 2020 11:21:36 +0800 Subject: [PATCH 07/21] fix --- src/server/pegasus_write_service_impl.h | 2 +- src/server/rocksdb_wrapper.cpp | 9 +++++++-- src/server/rocksdb_wrapper.h | 6 +++--- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h index 256b1db0f7..3dc67f4b08 100644 --- a/src/server/pegasus_write_service_impl.h +++ b/src/server/pegasus_write_service_impl.h @@ -92,7 +92,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base _data_cf(server->_data_cf), _meta_cf(server->_meta_cf), _rd_opts(server->_data_cf_rd_opts), - _default_ttl(0), + _default_ttl(0) { // disable write ahead logging as replication handles logging instead now _wt_opts.disableWAL = true; diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp index 7eea6c1db2..ff56f52d5e 100644 --- a/src/server/rocksdb_wrapper.cpp +++ b/src/server/rocksdb_wrapper.cpp @@ -27,8 +27,13 @@ namespace server { rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server, rocksdb::DB *db, const uint32_t pegasus_data_version, - rocksdb::ReadOptions &rd_opts) - : replica_base(server), _db(db), _rd_opts(rd_opts), _pegasus_data_version(pegasus_data_version) + rocksdb::ReadOptions &rd_opts, + dsn::perf_counter_wrapper &pfc_recent_expire_count) + : replica_base(server), + _db(db), + _rd_opts(rd_opts), + _pegasus_data_version(pegasus_data_version), + _pfc_recent_expire_count(pfc_recent_expire_count) { } diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h index 5a5aa72660..d3275d552b 100644 --- a/src/server/rocksdb_wrapper.h +++ b/src/server/rocksdb_wrapper.h @@ -35,9 +35,9 @@ class rocksdb_wrapper : public dsn::replication::replica_base public: rocksdb_wrapper(pegasus_server_impl *server, rocksdb::DB *db, - const uint32_t _pegasus_data_version, - rocksdb::ReadOptions &_rd_opts, - dsn::perf_counter_wrapper &_pfc_recent_expire_count); + const uint32_t pegasus_data_version, + rocksdb::ReadOptions &rd_opts, + dsn::perf_counter_wrapper &pfc_recent_expire_count); /// Calls RocksDB Get and store the result into `db_get_context`. /// \returns 0 if Get succeeded. On failure, a non-zero rocksdb status code is returned. From 1167769e4dbb41d0c8f3a76ae0b14489ac1e6129 Mon Sep 17 00:00:00 2001 From: levy Date: Wed, 2 Dec 2020 19:42:36 +0800 Subject: [PATCH 08/21] fix --- src/server/pegasus_write_service_impl.h | 86 +++++++++++++++++-------- 1 file changed, 59 insertions(+), 27 deletions(-) diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h index 3dc67f4b08..c08bf86266 100644 --- a/src/server/pegasus_write_service_impl.h +++ b/src/server/pegasus_write_service_impl.h @@ -92,7 +92,8 @@ class pegasus_write_service::impl : public dsn::replication::replica_base _data_cf(server->_data_cf), _meta_cf(server->_meta_cf), _rd_opts(server->_data_cf_rd_opts), - _default_ttl(0) + _default_ttl(0), + _pfc_recent_expire_count(server->_pfc_recent_expire_count) { // disable write ahead logging as replication handles logging instead now _wt_opts.disableWAL = true; @@ -292,28 +293,43 @@ class pegasus_write_service::impl : public dsn::replication::replica_base return empty_put(decree); } - dsn::blob check_key; + ::dsn::blob check_key; pegasus_generate_key(check_key, update.hash_key, update.check_sort_key); - - db_get_context get_context; - dsn::string_view check_raw_key(check_key.data(), check_key.length()); - int err = _rocksdb_wrapper->get(check_raw_key, &get_context); - if (err != 0) { + rocksdb::Slice check_raw_key(check_key.data(), check_key.length()); + std::string check_raw_value; + rocksdb::Status check_status = _db->Get(_rd_opts, check_raw_key, &check_raw_value); + if (check_status.ok()) { + // read check value succeed + if (check_if_record_expired( + _pegasus_data_version, utils::epoch_now(), check_raw_value)) { + // check value ttl timeout + _pfc_recent_expire_count->increment(); + check_status = rocksdb::Status::NotFound(); + } + } else if (!check_status.IsNotFound()) { // read check value failed - resp.error = err; + derror_rocksdb("GetCheckValue for CheckAndSet", + check_status.ToString(), + "decree: {}, hash_key: {}, check_sort_key: {}", + decree, + utils::c_escape_string(update.hash_key), + utils::c_escape_string(update.check_sort_key)); + resp.error = check_status.code(); return resp.error; } + dassert_f(check_status.ok() || check_status.IsNotFound(), + "status = %s", + check_status.ToString().c_str()); - bool value_exist = !get_context.expired && get_context.found; - dsn::blob check_value; - if (value_exist) { + ::dsn::blob check_value; + if (check_status.ok()) { pegasus_extract_user_data( - _pegasus_data_version, std::move(get_context.raw_value), check_value); + _pegasus_data_version, std::move(check_raw_value), check_value); } if (update.return_check_value) { resp.check_value_returned = true; - if (value_exist) { + if (check_status.ok()) { resp.check_value_exist = true; resp.check_value = check_value; } @@ -323,7 +339,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base bool passed = validate_check(decree, update.check_type, update.check_operand, - value_exist, + check_status.ok(), check_value, invalid_argument); @@ -407,28 +423,43 @@ class pegasus_write_service::impl : public dsn::replication::replica_base return empty_put(decree); } - dsn::blob check_key; + ::dsn::blob check_key; pegasus_generate_key(check_key, update.hash_key, update.check_sort_key); - - db_get_context get_context; - dsn::string_view check_raw_key(check_key.data(), check_key.length()); - int err = _rocksdb_wrapper->get(check_raw_key, &get_context); - if (err != 0) { + rocksdb::Slice check_raw_key(check_key.data(), check_key.length()); + std::string check_raw_value; + rocksdb::Status check_status = _db->Get(_rd_opts, check_raw_key, &check_raw_value); + if (check_status.ok()) { + // read check value succeed + if (check_if_record_expired( + _pegasus_data_version, utils::epoch_now(), check_raw_value)) { + // check value ttl timeout + _pfc_recent_expire_count->increment(); + check_status = rocksdb::Status::NotFound(); + } + } else if (!check_status.IsNotFound()) { // read check value failed - resp.error = err; + derror_rocksdb("GetCheckValue for CheckAndMutate", + check_status.ToString(), + "decree: {}, hash_key: {}, check_sort_key: {}", + decree, + utils::c_escape_string(update.hash_key), + utils::c_escape_string(update.check_sort_key)); + resp.error = check_status.code(); return resp.error; } + dassert_f(check_status.ok() || check_status.IsNotFound(), + "status = %s", + check_status.ToString().c_str()); - bool value_exist = !get_context.expired && get_context.found; - dsn::blob check_value; - if (value_exist) { + ::dsn::blob check_value; + if (check_status.ok()) { pegasus_extract_user_data( - _pegasus_data_version, std::move(get_context.raw_value), check_value); + _pegasus_data_version, std::move(check_raw_value), check_value); } if (update.return_check_value) { resp.check_value_returned = true; - if (value_exist) { + if (check_status.ok()) { resp.check_value_exist = true; resp.check_value = check_value; } @@ -438,7 +469,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base bool passed = validate_check(decree, update.check_type, update.check_operand, - value_exist, + check_status.ok(), check_value, invalid_argument); @@ -813,6 +844,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base rocksdb::WriteOptions _wt_opts; rocksdb::ReadOptions &_rd_opts; volatile uint32_t _default_ttl; + ::dsn::perf_counter_wrapper &_pfc_recent_expire_count; pegasus_value_generator _value_generator; std::unique_ptr _rocksdb_wrapper; From cae55c12ae651a3a42d51b3ff28f1fa456c180de Mon Sep 17 00:00:00 2001 From: levy Date: Wed, 2 Dec 2020 20:14:34 +0800 Subject: [PATCH 09/21] fix --- src/server/rocksdb_wrapper.cpp | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp index ff56f52d5e..c1c75b0ee4 100644 --- a/src/server/rocksdb_wrapper.cpp +++ b/src/server/rocksdb_wrapper.cpp @@ -39,7 +39,36 @@ rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server, int rocksdb_wrapper::get(dsn::string_view raw_key, /*out*/ db_get_context *ctx) { - FAIL_POINT_INJECT_F("db_get", [](dsn::string_view) -> int { return FAIL_DB_GET; }); + // An example for str: err=ERR_UNKNOWN,found=false,expired=false,expire_ts=1234 + FAIL_POINT_INJECT_F("db_get", [ctx](dsn::string_view str) -> int { + std::vector args; + dsn::utils::split_args(str.data(), args, ','); + assert(args.size() == 4); + + int res = 0; + for (const auto &arg : args) { + std::vector sub_args; + dsn::utils::split_args(arg.c_str(), sub_args, '='); + assert(args.size() == 2); + + bool convert_res = true; + if (sub_args[0] == "err") { + res = dsn::error_code::try_get(sub_args[1], dsn::ERR_UNKNOWN); + } else if (sub_args[0] == "found") { + convert_res = dsn::buf2bool(sub_args[1], ctx->found); + } else if (sub_args[0] == "expired") { + convert_res = dsn::buf2bool(sub_args[1], ctx->expired); + } else if (sub_args[0] == "expire_ts") { + uint64_t expire_ts; + convert_res = dsn::buf2uint64(sub_args[1], expire_ts); + ctx->expire_ts = expire_ts; + } + if (!convert_res) { + dassert_f(false, "wrong format with {}", arg); + } + } + return res; + }); rocksdb::Status s = _db->Get(_rd_opts, utils::to_rocksdb_slice(raw_key), &(ctx->raw_value)); if (dsn_likely(s.ok())) { From ff4a806475cf68251136a618be34cf2f60f7e111 Mon Sep 17 00:00:00 2001 From: levy Date: Thu, 3 Dec 2020 10:57:36 +0800 Subject: [PATCH 10/21] fix --- src/server/rocksdb_wrapper.cpp | 31 +------------------ .../test/pegasus_write_service_impl_test.cpp | 20 ++++++++++++ 2 files changed, 21 insertions(+), 30 deletions(-) diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp index c1c75b0ee4..ff56f52d5e 100644 --- a/src/server/rocksdb_wrapper.cpp +++ b/src/server/rocksdb_wrapper.cpp @@ -39,36 +39,7 @@ rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server, int rocksdb_wrapper::get(dsn::string_view raw_key, /*out*/ db_get_context *ctx) { - // An example for str: err=ERR_UNKNOWN,found=false,expired=false,expire_ts=1234 - FAIL_POINT_INJECT_F("db_get", [ctx](dsn::string_view str) -> int { - std::vector args; - dsn::utils::split_args(str.data(), args, ','); - assert(args.size() == 4); - - int res = 0; - for (const auto &arg : args) { - std::vector sub_args; - dsn::utils::split_args(arg.c_str(), sub_args, '='); - assert(args.size() == 2); - - bool convert_res = true; - if (sub_args[0] == "err") { - res = dsn::error_code::try_get(sub_args[1], dsn::ERR_UNKNOWN); - } else if (sub_args[0] == "found") { - convert_res = dsn::buf2bool(sub_args[1], ctx->found); - } else if (sub_args[0] == "expired") { - convert_res = dsn::buf2bool(sub_args[1], ctx->expired); - } else if (sub_args[0] == "expire_ts") { - uint64_t expire_ts; - convert_res = dsn::buf2uint64(sub_args[1], expire_ts); - ctx->expire_ts = expire_ts; - } - if (!convert_res) { - dassert_f(false, "wrong format with {}", arg); - } - } - return res; - }); + FAIL_POINT_INJECT_F("db_get", [](dsn::string_view) -> int { return FAIL_DB_GET; }); rocksdb::Status s = _db->Get(_rd_opts, utils::to_rocksdb_slice(raw_key), &(ctx->raw_value)); if (dsn_likely(s.ok())) { diff --git a/src/server/test/pegasus_write_service_impl_test.cpp b/src/server/test/pegasus_write_service_impl_test.cpp index 35557bf5b0..9a83c11450 100644 --- a/src/server/test/pegasus_write_service_impl_test.cpp +++ b/src/server/test/pegasus_write_service_impl_test.cpp @@ -260,5 +260,25 @@ TEST_F(incr_test, fail_on_put) dsn::fail::teardown(); } +TEST_F(incr_test, incr_on_expire_record) +{ + // make the key expired + req.expire_ts_seconds = 1; + _write_impl->incr(0, req, resp); + + // check whether the key is expired + db_get_context get_ctx; + db_get(req.key, &get_ctx); + ASSERT_TRUE(get_ctx.expired); + + // incr the expired key + req.increment = 100; + req.expire_ts_seconds = 0; + _write_impl->incr(0, req, resp); + ASSERT_EQ(resp.new_value, 100); + + db_get(req.key, &get_ctx); + ASSERT_TRUE(get_ctx.found); +} } // namespace server } // namespace pegasus From 05cbc0c5d98a70398415f157b1539c620d75217c Mon Sep 17 00:00:00 2001 From: levy Date: Thu, 3 Dec 2020 11:05:17 +0800 Subject: [PATCH 11/21] fix --- src/server/pegasus_write_service_impl.h | 36 ++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h index c08bf86266..b0d06311b1 100644 --- a/src/server/pegasus_write_service_impl.h +++ b/src/server/pegasus_write_service_impl.h @@ -606,7 +606,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base !raw_key.empty()) { // not an empty write db_get_context get_ctx; - int err = _rocksdb_wrapper->get(raw_key, &get_ctx); + int err = db_get(raw_key, &get_ctx); if (dsn_unlikely(err != 0)) { return err; } @@ -685,6 +685,40 @@ class pegasus_write_service::impl : public dsn::replication::replica_base return status.code(); } + /// Calls RocksDB Get and store the result into `db_get_context`. + /// \returns 0 if Get succeeded. On failure, a non-zero rocksdb status code is returned. + /// \result ctx.expired=true if record expired. Still 0 is returned. + /// \result ctx.found=false if record is not found. Still 0 is returned. + int db_get(dsn::string_view raw_key, + /*out*/ db_get_context *ctx) + { + FAIL_POINT_INJECT_F("db_get", [](dsn::string_view) -> int { return FAIL_DB_GET; }); + + rocksdb::Status s = _db->Get(_rd_opts, utils::to_rocksdb_slice(raw_key), &(ctx->raw_value)); + if (dsn_likely(s.ok())) { + // success + ctx->found = true; + ctx->expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, ctx->raw_value); + if (check_if_ts_expired(utils::epoch_now(), ctx->expire_ts)) { + ctx->expired = true; + } + return 0; + } + if (s.IsNotFound()) { + // NotFound is an acceptable error + ctx->found = false; + return 0; + } + ::dsn::blob hash_key, sort_key; + pegasus_restore_key(::dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key); + derror_rocksdb("Get", + s.ToString(), + "hash_key: {}, sort_key: {}", + utils::c_escape_string(hash_key), + utils::c_escape_string(sort_key)); + return s.code(); + } + void clear_up_batch_states(int64_t decree, int err) { if (!_update_responses.empty()) { From ca4ff691e0757139d5974fcf9c957d19fc968d09 Mon Sep 17 00:00:00 2001 From: levy Date: Thu, 3 Dec 2020 12:14:17 +0800 Subject: [PATCH 12/21] fix --- src/server/pegasus_server_write.h | 1 + src/server/pegasus_write_service.h | 1 + src/server/pegasus_write_service_impl.h | 1 + src/server/test/rocksdb_wrapper_test.cpp | 85 ++++++++++++++++++++++++ 4 files changed, 88 insertions(+) create mode 100644 src/server/test/rocksdb_wrapper_test.cpp diff --git a/src/server/pegasus_server_write.h b/src/server/pegasus_server_write.h index a2d0043454..33386cfd88 100644 --- a/src/server/pegasus_server_write.h +++ b/src/server/pegasus_server_write.h @@ -74,6 +74,7 @@ class pegasus_server_write : public dsn::replication::replica_base friend class pegasus_server_write_test; friend class pegasus_write_service_test; friend class pegasus_write_service_impl_test; + friend class rocksdb_wrapper_test; std::unique_ptr _write_svc; std::vector _put_rpc_batch; diff --git a/src/server/pegasus_write_service.h b/src/server/pegasus_write_service.h index cb26c6f6a3..6852a4cfdb 100644 --- a/src/server/pegasus_write_service.h +++ b/src/server/pegasus_write_service.h @@ -180,6 +180,7 @@ class pegasus_write_service : dsn::replication::replica_base friend class pegasus_write_service_test; friend class pegasus_write_service_impl_test; friend class pegasus_server_write_test; + friend class rocksdb_wrapper_test; pegasus_server_impl *_server; diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h index b0d06311b1..cf91729ac4 100644 --- a/src/server/pegasus_write_service_impl.h +++ b/src/server/pegasus_write_service_impl.h @@ -865,6 +865,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base friend class pegasus_write_service_test; friend class pegasus_server_write_test; friend class pegasus_write_service_impl_test; + friend class rocksdb_wrapper_test; FRIEND_TEST(pegasus_write_service_impl_test, put_verify_timetag); FRIEND_TEST(pegasus_write_service_impl_test, verify_timetag_compatible_with_version_0); diff --git a/src/server/test/rocksdb_wrapper_test.cpp b/src/server/test/rocksdb_wrapper_test.cpp new file mode 100644 index 0000000000..80afdd361b --- /dev/null +++ b/src/server/test/rocksdb_wrapper_test.cpp @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "server/pegasus_server_write.h" +#include "server/pegasus_write_service_impl.h" +#include "pegasus_server_test_base.h" + +namespace pegasus { +namespace server { +class rocksdb_wrapper_test : public pegasus_server_test_base +{ +protected: + std::unique_ptr _server_write; + pegasus_write_service::impl *_write_impl{nullptr}; + rocksdb_wrapper *_rocksdb_wrapper{nullptr}; + dsn::blob _raw_key; + +public: + void SetUp() override + { + start(); + _server_write = dsn::make_unique(_server.get(), true); + _write_impl = _server_write->_write_svc->_impl.get(); + _rocksdb_wrapper = _write_impl->_rocksdb_wrapper.get(); + + pegasus::pegasus_generate_key( + _raw_key, dsn::string_view("hash_key"), dsn::string_view("sort_key")); + } + + void single_set(dsn::blob raw_key, dsn::blob user_value, int32_t expire_ts_seconds) + { + dsn::apps::update_request put; + put.key = raw_key; + put.value = user_value; + put.expire_ts_seconds = expire_ts_seconds; + db_write_context write_ctx; + dsn::apps::update_response put_resp; + _write_impl->batch_put(write_ctx, put, put_resp); + ASSERT_EQ(_write_impl->batch_commit(0), 0); + } +}; + +TEST_F(rocksdb_wrapper_test, get) +{ + // not found + db_get_context get_ctx1; + _rocksdb_wrapper->get(_raw_key, &get_ctx1); + ASSERT_FALSE(get_ctx1.found); + + // expired + int32_t expired_ts = 123; + db_get_context get_ctx2; + single_set(_raw_key, dsn::blob::create_from_bytes("abc"), expired_ts); + _rocksdb_wrapper->get(_raw_key, &get_ctx2); + ASSERT_TRUE(get_ctx2.found); + ASSERT_TRUE(get_ctx2.expired); + ASSERT_EQ(get_ctx2.expire_ts, expired_ts); + + // found + expired_ts = INT32_MAX; + db_get_context get_ctx3; + single_set(_raw_key, dsn::blob::create_from_bytes("abc"), expired_ts); + _rocksdb_wrapper->get(_raw_key, &get_ctx3); + ASSERT_TRUE(get_ctx2.found); + ASSERT_FALSE(get_ctx3.expired); + ASSERT_EQ(get_ctx3.expire_ts, expired_ts); +} +} // namespace server +} // namespace pegasus From 848b926c7d9b86df1d028cfd9533ae0253390127 Mon Sep 17 00:00:00 2001 From: levy Date: Thu, 3 Dec 2020 12:33:20 +0800 Subject: [PATCH 13/21] f --- src/server/rocksdb_wrapper.cpp | 2 ++ src/server/rocksdb_wrapper.h | 11 +++++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp index ff56f52d5e..050883b153 100644 --- a/src/server/rocksdb_wrapper.cpp +++ b/src/server/rocksdb_wrapper.cpp @@ -21,6 +21,8 @@ #include "pegasus_write_service_impl.h" +#include + namespace pegasus { namespace server { diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h index d3275d552b..0fd1ce7ef9 100644 --- a/src/server/rocksdb_wrapper.h +++ b/src/server/rocksdb_wrapper.h @@ -20,8 +20,15 @@ #pragma once #include -#include -#include + +namespace rocksdb { +class DB; +class ReadOptions; +} // namespace rocksdb + +namespace dsn { +class perf_counter_wrapper; +} namespace pegasus { namespace server { From 29869ade2533170755fb4fc2b5f4c4b115f516fd Mon Sep 17 00:00:00 2001 From: levy Date: Thu, 3 Dec 2020 12:33:54 +0800 Subject: [PATCH 14/21] fix --- src/server/rocksdb_wrapper.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h index 0fd1ce7ef9..a0c84c181e 100644 --- a/src/server/rocksdb_wrapper.h +++ b/src/server/rocksdb_wrapper.h @@ -28,7 +28,7 @@ class ReadOptions; namespace dsn { class perf_counter_wrapper; -} +} // namespace dsn namespace pegasus { namespace server { From 1a69dbbbddbf0aa1be548bdcc3d1c4aaeb78dafd Mon Sep 17 00:00:00 2001 From: levy Date: Thu, 3 Dec 2020 12:35:12 +0800 Subject: [PATCH 15/21] fix --- src/server/rocksdb_wrapper.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp index 050883b153..da16771d62 100644 --- a/src/server/rocksdb_wrapper.cpp +++ b/src/server/rocksdb_wrapper.cpp @@ -19,9 +19,8 @@ #include "rocksdb_wrapper.h" -#include "pegasus_write_service_impl.h" - #include +#include "pegasus_write_service_impl.h" namespace pegasus { namespace server { From 1d793f235a4efd0828f839623eafeb4acfaf9fae Mon Sep 17 00:00:00 2001 From: levy Date: Thu, 3 Dec 2020 14:28:05 +0800 Subject: [PATCH 16/21] fix --- src/server/test/rocksdb_wrapper_test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/test/rocksdb_wrapper_test.cpp b/src/server/test/rocksdb_wrapper_test.cpp index 80afdd361b..6931db5a9a 100644 --- a/src/server/test/rocksdb_wrapper_test.cpp +++ b/src/server/test/rocksdb_wrapper_test.cpp @@ -64,7 +64,7 @@ TEST_F(rocksdb_wrapper_test, get) ASSERT_FALSE(get_ctx1.found); // expired - int32_t expired_ts = 123; + int32_t expired_ts = utils::epoch_now(); db_get_context get_ctx2; single_set(_raw_key, dsn::blob::create_from_bytes("abc"), expired_ts); _rocksdb_wrapper->get(_raw_key, &get_ctx2); From c102b86127dc11abcda461642ef4f5512da2dc4d Mon Sep 17 00:00:00 2001 From: levy Date: Thu, 3 Dec 2020 17:12:52 +0800 Subject: [PATCH 17/21] fix --- src/server/pegasus_server_impl.h | 1 + src/server/pegasus_write_service_impl.h | 6 +----- src/server/rocksdb_wrapper.cpp | 12 ++++-------- src/server/rocksdb_wrapper.h | 6 +----- 4 files changed, 7 insertions(+), 18 deletions(-) diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h index fa1c849b40..cd70aff913 100644 --- a/src/server/pegasus_server_impl.h +++ b/src/server/pegasus_server_impl.h @@ -188,6 +188,7 @@ class pegasus_server_impl : public pegasus_read_service friend class pegasus_manual_compact_service; friend class pegasus_write_service; + friend class rocksdb_wrapper; // parse checkpoint directories in the data dir // checkpoint directory format is: "checkpoint.{decree}" diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h index cf91729ac4..7ac3f20645 100644 --- a/src/server/pegasus_write_service_impl.h +++ b/src/server/pegasus_write_service_impl.h @@ -97,11 +97,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base { // disable write ahead logging as replication handles logging instead now _wt_opts.disableWAL = true; - _rocksdb_wrapper = dsn::make_unique(server, - server->_db, - _pegasus_data_version, - server->_data_cf_rd_opts, - server->_pfc_recent_expire_count); + _rocksdb_wrapper = dsn::make_unique(server, _pegasus_data_version); } int empty_put(int64_t decree) diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp index da16771d62..8a8c739f38 100644 --- a/src/server/rocksdb_wrapper.cpp +++ b/src/server/rocksdb_wrapper.cpp @@ -25,16 +25,12 @@ namespace pegasus { namespace server { -rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server, - rocksdb::DB *db, - const uint32_t pegasus_data_version, - rocksdb::ReadOptions &rd_opts, - dsn::perf_counter_wrapper &pfc_recent_expire_count) +rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server, const uint32_t pegasus_data_version) : replica_base(server), - _db(db), - _rd_opts(rd_opts), + _db(server->_db), + _rd_opts(server->_data_cf_rd_opts), _pegasus_data_version(pegasus_data_version), - _pfc_recent_expire_count(pfc_recent_expire_count) + _pfc_recent_expire_count(server->_pfc_recent_expire_count) { } diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h index a0c84c181e..bdb1c2e808 100644 --- a/src/server/rocksdb_wrapper.h +++ b/src/server/rocksdb_wrapper.h @@ -40,11 +40,7 @@ static constexpr int FAIL_DB_GET = -104; class rocksdb_wrapper : public dsn::replication::replica_base { public: - rocksdb_wrapper(pegasus_server_impl *server, - rocksdb::DB *db, - const uint32_t pegasus_data_version, - rocksdb::ReadOptions &rd_opts, - dsn::perf_counter_wrapper &pfc_recent_expire_count); + rocksdb_wrapper(pegasus_server_impl *server, const uint32_t pegasus_data_version); /// Calls RocksDB Get and store the result into `db_get_context`. /// \returns 0 if Get succeeded. On failure, a non-zero rocksdb status code is returned. From b3671f820f6ab16a9eae1c9f06cbaeb5beb83307 Mon Sep 17 00:00:00 2001 From: levy Date: Thu, 3 Dec 2020 17:22:12 +0800 Subject: [PATCH 18/21] fix --- src/server/pegasus_write_service_impl.h | 2 +- src/server/rocksdb_wrapper.cpp | 4 ++-- src/server/rocksdb_wrapper.h | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h index 7ac3f20645..fc0abe53f4 100644 --- a/src/server/pegasus_write_service_impl.h +++ b/src/server/pegasus_write_service_impl.h @@ -97,7 +97,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base { // disable write ahead logging as replication handles logging instead now _wt_opts.disableWAL = true; - _rocksdb_wrapper = dsn::make_unique(server, _pegasus_data_version); + _rocksdb_wrapper = dsn::make_unique(server); } int empty_put(int64_t decree) diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp index 8a8c739f38..7ea422803d 100644 --- a/src/server/rocksdb_wrapper.cpp +++ b/src/server/rocksdb_wrapper.cpp @@ -25,11 +25,11 @@ namespace pegasus { namespace server { -rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server, const uint32_t pegasus_data_version) +rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server) : replica_base(server), _db(server->_db), _rd_opts(server->_data_cf_rd_opts), - _pegasus_data_version(pegasus_data_version), + _pegasus_data_version(server->_pegasus_data_version), _pfc_recent_expire_count(server->_pfc_recent_expire_count) { } diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h index bdb1c2e808..42d2e006f1 100644 --- a/src/server/rocksdb_wrapper.h +++ b/src/server/rocksdb_wrapper.h @@ -40,7 +40,7 @@ static constexpr int FAIL_DB_GET = -104; class rocksdb_wrapper : public dsn::replication::replica_base { public: - rocksdb_wrapper(pegasus_server_impl *server, const uint32_t pegasus_data_version); + rocksdb_wrapper(pegasus_server_impl *server); /// Calls RocksDB Get and store the result into `db_get_context`. /// \returns 0 if Get succeeded. On failure, a non-zero rocksdb status code is returned. From e7d9eaf0de4a436a1a3286e62aaa1073c3399a74 Mon Sep 17 00:00:00 2001 From: levy Date: Mon, 7 Dec 2020 10:26:25 +0800 Subject: [PATCH 19/21] fix --- rdsn | 2 +- src/server/rocksdb_wrapper.cpp | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rdsn b/rdsn index 65edf61e37..d3a07c09fc 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit 65edf61e376b692f41845ab3deb71cb686ea5575 +Subproject commit d3a07c09fcdc444559174522454ef6b90887066b diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp index 7ea422803d..bd8f3c932d 100644 --- a/src/server/rocksdb_wrapper.cpp +++ b/src/server/rocksdb_wrapper.cpp @@ -47,11 +47,11 @@ int rocksdb_wrapper::get(dsn::string_view raw_key, /*out*/ db_get_context *ctx) ctx->expired = true; _pfc_recent_expire_count->increment(); } - return 0; + return rocksdb::Status::kOk; } else if (s.IsNotFound()) { // NotFound is an acceptable error ctx->found = false; - return 0; + return rocksdb::Status::kOk; } dsn::blob hash_key, sort_key; From cb0708b7c9bda03087bd9b0eb1177c2ec55b2075 Mon Sep 17 00:00:00 2001 From: levy Date: Mon, 7 Dec 2020 14:36:29 +0800 Subject: [PATCH 20/21] fix --- rdsn | 2 +- src/server/pegasus_write_service_impl.h | 1 + src/server/rocksdb_wrapper.cpp | 2 ++ src/server/rocksdb_wrapper.h | 2 -- src/server/test/pegasus_write_service_impl_test.cpp | 1 + 5 files changed, 5 insertions(+), 3 deletions(-) diff --git a/rdsn b/rdsn index d3a07c09fc..4a3a2c3f28 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit d3a07c09fcdc444559174522454ef6b90887066b +Subproject commit 4a3a2c3f28fc74a83b67b9af48f9b701ff80db66 diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h index fc0abe53f4..d9bdb75c7a 100644 --- a/src/server/pegasus_write_service_impl.h +++ b/src/server/pegasus_write_service_impl.h @@ -39,6 +39,7 @@ namespace server { static constexpr int FAIL_DB_WRITE_BATCH_PUT = -101; static constexpr int FAIL_DB_WRITE_BATCH_DELETE = -102; static constexpr int FAIL_DB_WRITE = -103; +extern const int FAIL_DB_GET; struct db_get_context { diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp index bd8f3c932d..e7e8bb5f32 100644 --- a/src/server/rocksdb_wrapper.cpp +++ b/src/server/rocksdb_wrapper.cpp @@ -25,6 +25,8 @@ namespace pegasus { namespace server { +const int FAIL_DB_GET = -104; + rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server) : replica_base(server), _db(server->_db), diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h index 42d2e006f1..07f49a4db2 100644 --- a/src/server/rocksdb_wrapper.h +++ b/src/server/rocksdb_wrapper.h @@ -35,8 +35,6 @@ namespace server { struct db_get_context; class pegasus_server_impl; -static constexpr int FAIL_DB_GET = -104; - class rocksdb_wrapper : public dsn::replication::replica_base { public: diff --git a/src/server/test/pegasus_write_service_impl_test.cpp b/src/server/test/pegasus_write_service_impl_test.cpp index 9a83c11450..4ba778aa64 100644 --- a/src/server/test/pegasus_write_service_impl_test.cpp +++ b/src/server/test/pegasus_write_service_impl_test.cpp @@ -26,6 +26,7 @@ namespace pegasus { namespace server { +extern const int FAIL_DB_GET; class pegasus_write_service_impl_test : public pegasus_server_test_base { From 49956581dd4c889aee0535c26b4383c85d547f10 Mon Sep 17 00:00:00 2001 From: levy Date: Tue, 8 Dec 2020 10:06:09 +0800 Subject: [PATCH 21/21] fix --- rdsn | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rdsn b/rdsn index 4a3a2c3f28..65edf61e37 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit 4a3a2c3f28fc74a83b67b9af48f9b701ff80db66 +Subproject commit 65edf61e376b692f41845ab3deb71cb686ea5575