From c212da634c177c7cfd3ee64bea4c2f1658c05342 Mon Sep 17 00:00:00 2001 From: zhao liwei Date: Thu, 10 Dec 2020 08:45:10 +0800 Subject: [PATCH] refactor: use rocksdb_wrapper::get to reimplement incr (#651) --- src/server/pegasus_server_impl.h | 1 + src/server/pegasus_server_write.h | 1 + src/server/pegasus_write_service.h | 1 + src/server/pegasus_write_service_impl.h | 10 ++- src/server/rocksdb_wrapper.cpp | 70 +++++++++++++++ src/server/rocksdb_wrapper.h | 56 ++++++++++++ src/server/test/CMakeLists.txt | 1 + .../test/pegasus_write_service_impl_test.cpp | 25 +++++- src/server/test/rocksdb_wrapper_test.cpp | 85 +++++++++++++++++++ 9 files changed, 246 insertions(+), 4 deletions(-) create mode 100644 src/server/rocksdb_wrapper.cpp create mode 100644 src/server/rocksdb_wrapper.h create mode 100644 src/server/test/rocksdb_wrapper_test.cpp diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h index 5226212c3d..6f58ee501e 100644 --- a/src/server/pegasus_server_impl.h +++ b/src/server/pegasus_server_impl.h @@ -189,6 +189,7 @@ class pegasus_server_impl : public pegasus_read_service friend class pegasus_manual_compact_service; friend class pegasus_write_service; + friend class rocksdb_wrapper; // parse checkpoint directories in the data dir // checkpoint directory format is: "checkpoint.{decree}" diff --git a/src/server/pegasus_server_write.h b/src/server/pegasus_server_write.h index a2d0043454..33386cfd88 100644 --- a/src/server/pegasus_server_write.h +++ b/src/server/pegasus_server_write.h @@ -74,6 +74,7 @@ class pegasus_server_write : public dsn::replication::replica_base friend class pegasus_server_write_test; friend class pegasus_write_service_test; friend class pegasus_write_service_impl_test; + friend class rocksdb_wrapper_test; std::unique_ptr _write_svc; std::vector _put_rpc_batch; diff --git a/src/server/pegasus_write_service.h b/src/server/pegasus_write_service.h index cb26c6f6a3..6852a4cfdb 100644 --- a/src/server/pegasus_write_service.h +++ b/src/server/pegasus_write_service.h @@ -180,6 +180,7 @@ class pegasus_write_service : dsn::replication::replica_base friend class pegasus_write_service_test; friend class pegasus_write_service_impl_test; friend class pegasus_server_write_test; + friend class rocksdb_wrapper_test; pegasus_server_impl *_server; diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h index baa0507fb9..d9bdb75c7a 100644 --- a/src/server/pegasus_write_service_impl.h +++ b/src/server/pegasus_write_service_impl.h @@ -25,6 +25,7 @@ #include "base/pegasus_key_schema.h" #include "meta_store.h" +#include "rocksdb_wrapper.h" #include #include @@ -38,7 +39,7 @@ namespace server { static constexpr int FAIL_DB_WRITE_BATCH_PUT = -101; static constexpr int FAIL_DB_WRITE_BATCH_DELETE = -102; static constexpr int FAIL_DB_WRITE = -103; -static constexpr int FAIL_DB_GET = -104; +extern const int FAIL_DB_GET; struct db_get_context { @@ -97,6 +98,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base { // disable write ahead logging as replication handles logging instead now _wt_opts.disableWAL = true; + _rocksdb_wrapper = dsn::make_unique(server); } int empty_put(int64_t decree) @@ -196,7 +198,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base int64_t new_value = 0; uint32_t new_expire_ts = 0; db_get_context get_ctx; - int err = db_get(raw_key, &get_ctx); + int err = _rocksdb_wrapper->get(raw_key, &get_ctx); if (err != 0) { resp.error = err; return err; @@ -207,7 +209,6 @@ class pegasus_write_service::impl : public dsn::replication::replica_base new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0; } else if (get_ctx.expired) { // ttl timeout, set to 0 before increment - _pfc_recent_expire_count->increment(); new_value = update.increment; new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0; } else { @@ -861,6 +862,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base friend class pegasus_write_service_test; friend class pegasus_server_write_test; friend class pegasus_write_service_impl_test; + friend class rocksdb_wrapper_test; FRIEND_TEST(pegasus_write_service_impl_test, put_verify_timetag); FRIEND_TEST(pegasus_write_service_impl_test, verify_timetag_compatible_with_version_0); @@ -877,6 +879,8 @@ class pegasus_write_service::impl : public dsn::replication::replica_base ::dsn::perf_counter_wrapper &_pfc_recent_expire_count; pegasus_value_generator _value_generator; + std::unique_ptr _rocksdb_wrapper; + // for setting update_response.error after committed. std::vector _update_responses; }; diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp new file mode 100644 index 0000000000..e7e8bb5f32 --- /dev/null +++ b/src/server/rocksdb_wrapper.cpp @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "rocksdb_wrapper.h" + +#include +#include "pegasus_write_service_impl.h" + +namespace pegasus { +namespace server { + +const int FAIL_DB_GET = -104; + +rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server) + : replica_base(server), + _db(server->_db), + _rd_opts(server->_data_cf_rd_opts), + _pegasus_data_version(server->_pegasus_data_version), + _pfc_recent_expire_count(server->_pfc_recent_expire_count) +{ +} + +int rocksdb_wrapper::get(dsn::string_view raw_key, /*out*/ db_get_context *ctx) +{ + FAIL_POINT_INJECT_F("db_get", [](dsn::string_view) -> int { return FAIL_DB_GET; }); + + rocksdb::Status s = _db->Get(_rd_opts, utils::to_rocksdb_slice(raw_key), &(ctx->raw_value)); + if (dsn_likely(s.ok())) { + // success + ctx->found = true; + ctx->expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, ctx->raw_value); + if (check_if_ts_expired(utils::epoch_now(), ctx->expire_ts)) { + ctx->expired = true; + _pfc_recent_expire_count->increment(); + } + return rocksdb::Status::kOk; + } else if (s.IsNotFound()) { + // NotFound is an acceptable error + ctx->found = false; + return rocksdb::Status::kOk; + } + + dsn::blob hash_key, sort_key; + pegasus_restore_key(dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key); + derror_rocksdb("Get", + s.ToString(), + "hash_key: {}, sort_key: {}", + utils::c_escape_string(hash_key), + utils::c_escape_string(sort_key)); + return s.code(); +} + +} // namespace server +} // namespace pegasus diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h new file mode 100644 index 0000000000..07f49a4db2 --- /dev/null +++ b/src/server/rocksdb_wrapper.h @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +namespace rocksdb { +class DB; +class ReadOptions; +} // namespace rocksdb + +namespace dsn { +class perf_counter_wrapper; +} // namespace dsn + +namespace pegasus { +namespace server { +struct db_get_context; +class pegasus_server_impl; + +class rocksdb_wrapper : public dsn::replication::replica_base +{ +public: + rocksdb_wrapper(pegasus_server_impl *server); + + /// Calls RocksDB Get and store the result into `db_get_context`. + /// \returns 0 if Get succeeded. On failure, a non-zero rocksdb status code is returned. + /// \result ctx.expired=true if record expired. Still 0 is returned. + /// \result ctx.found=false if record is not found. Still 0 is returned. + int get(dsn::string_view raw_key, /*out*/ db_get_context *ctx); + +private: + rocksdb::DB *_db; + rocksdb::ReadOptions &_rd_opts; + const uint32_t _pegasus_data_version; + dsn::perf_counter_wrapper &_pfc_recent_expire_count; +}; +} // namespace server +} // namespace pegasus diff --git a/src/server/test/CMakeLists.txt b/src/server/test/CMakeLists.txt index 08834451a3..80dc4d6403 100644 --- a/src/server/test/CMakeLists.txt +++ b/src/server/test/CMakeLists.txt @@ -28,6 +28,7 @@ set(MY_PROJ_SRC "../pegasus_server_impl.cpp" "../hotspot_partition_calculator.cpp" "../meta_store.cpp" "../hotkey_collector.cpp" + "../rocksdb_wrapper.cpp" ) set(MY_SRC_SEARCH_MODE "GLOB") diff --git a/src/server/test/pegasus_write_service_impl_test.cpp b/src/server/test/pegasus_write_service_impl_test.cpp index 48f9fa8087..4ba778aa64 100644 --- a/src/server/test/pegasus_write_service_impl_test.cpp +++ b/src/server/test/pegasus_write_service_impl_test.cpp @@ -26,12 +26,14 @@ namespace pegasus { namespace server { +extern const int FAIL_DB_GET; class pegasus_write_service_impl_test : public pegasus_server_test_base { protected: std::unique_ptr _server_write; pegasus_write_service::impl *_write_impl{nullptr}; + rocksdb_wrapper *_rocksdb_wrapper{nullptr}; public: void SetUp() override @@ -39,6 +41,7 @@ class pegasus_write_service_impl_test : public pegasus_server_test_base start(); _server_write = dsn::make_unique(_server.get(), true); _write_impl = _server_write->_write_svc->_impl.get(); + _rocksdb_wrapper = _write_impl->_rocksdb_wrapper.get(); } uint64_t read_timestamp_from(dsn::string_view raw_key) @@ -70,7 +73,7 @@ class pegasus_write_service_impl_test : public pegasus_server_test_base int db_get(dsn::string_view raw_key, db_get_context *get_ctx) { - return _write_impl->db_get(raw_key, get_ctx); + return _rocksdb_wrapper->get(raw_key, get_ctx); } void single_set(dsn::blob raw_key, dsn::blob user_value) @@ -258,5 +261,25 @@ TEST_F(incr_test, fail_on_put) dsn::fail::teardown(); } +TEST_F(incr_test, incr_on_expire_record) +{ + // make the key expired + req.expire_ts_seconds = 1; + _write_impl->incr(0, req, resp); + + // check whether the key is expired + db_get_context get_ctx; + db_get(req.key, &get_ctx); + ASSERT_TRUE(get_ctx.expired); + + // incr the expired key + req.increment = 100; + req.expire_ts_seconds = 0; + _write_impl->incr(0, req, resp); + ASSERT_EQ(resp.new_value, 100); + + db_get(req.key, &get_ctx); + ASSERT_TRUE(get_ctx.found); +} } // namespace server } // namespace pegasus diff --git a/src/server/test/rocksdb_wrapper_test.cpp b/src/server/test/rocksdb_wrapper_test.cpp new file mode 100644 index 0000000000..6931db5a9a --- /dev/null +++ b/src/server/test/rocksdb_wrapper_test.cpp @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "server/pegasus_server_write.h" +#include "server/pegasus_write_service_impl.h" +#include "pegasus_server_test_base.h" + +namespace pegasus { +namespace server { +class rocksdb_wrapper_test : public pegasus_server_test_base +{ +protected: + std::unique_ptr _server_write; + pegasus_write_service::impl *_write_impl{nullptr}; + rocksdb_wrapper *_rocksdb_wrapper{nullptr}; + dsn::blob _raw_key; + +public: + void SetUp() override + { + start(); + _server_write = dsn::make_unique(_server.get(), true); + _write_impl = _server_write->_write_svc->_impl.get(); + _rocksdb_wrapper = _write_impl->_rocksdb_wrapper.get(); + + pegasus::pegasus_generate_key( + _raw_key, dsn::string_view("hash_key"), dsn::string_view("sort_key")); + } + + void single_set(dsn::blob raw_key, dsn::blob user_value, int32_t expire_ts_seconds) + { + dsn::apps::update_request put; + put.key = raw_key; + put.value = user_value; + put.expire_ts_seconds = expire_ts_seconds; + db_write_context write_ctx; + dsn::apps::update_response put_resp; + _write_impl->batch_put(write_ctx, put, put_resp); + ASSERT_EQ(_write_impl->batch_commit(0), 0); + } +}; + +TEST_F(rocksdb_wrapper_test, get) +{ + // not found + db_get_context get_ctx1; + _rocksdb_wrapper->get(_raw_key, &get_ctx1); + ASSERT_FALSE(get_ctx1.found); + + // expired + int32_t expired_ts = utils::epoch_now(); + db_get_context get_ctx2; + single_set(_raw_key, dsn::blob::create_from_bytes("abc"), expired_ts); + _rocksdb_wrapper->get(_raw_key, &get_ctx2); + ASSERT_TRUE(get_ctx2.found); + ASSERT_TRUE(get_ctx2.expired); + ASSERT_EQ(get_ctx2.expire_ts, expired_ts); + + // found + expired_ts = INT32_MAX; + db_get_context get_ctx3; + single_set(_raw_key, dsn::blob::create_from_bytes("abc"), expired_ts); + _rocksdb_wrapper->get(_raw_key, &get_ctx3); + ASSERT_TRUE(get_ctx2.found); + ASSERT_FALSE(get_ctx3.expired); + ASSERT_EQ(get_ctx3.expire_ts, expired_ts); +} +} // namespace server +} // namespace pegasus