Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use rocksdb_wrapper to reimplement batch operations #678

Merged
merged 10 commits on Jan 11, 2021
186 changes: 6 additions & 180 deletions src/server/pegasus_write_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
#include "meta_store.h"
#include "rocksdb_wrapper.h"

#include <dsn/utility/fail_point.h>
#include <dsn/utility/filesystem.h>
#include <dsn/utility/string_conv.h>
#include <gtest/gtest_prod.h>
Expand All @@ -40,7 +39,7 @@ namespace server {
// Return codes produced by the FAIL_POINT_INJECT_F hooks in the db_* helpers
// ("db_write_batch_put", "db_write_batch_delete", "db_write", "db_get"), one
// distinct negative value per hook.
static constexpr int FAIL_DB_WRITE_BATCH_PUT = -101;
static constexpr int FAIL_DB_WRITE_BATCH_DELETE = -102;
static constexpr int FAIL_DB_WRITE = -103;
// NOTE(review): this text is a rendered diff with the +/- markers stripped; the
// extern declaration and the constexpr definition of FAIL_DB_GET below look like the
// before/after forms of the same line — confirm against the merged header.
extern const int FAIL_DB_GET;
static constexpr int FAIL_DB_GET = -104;

struct db_get_context
{
Expand Down Expand Up @@ -90,15 +89,8 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
: replica_base(server),
_primary_address(server->_primary_address),
_pegasus_data_version(server->_pegasus_data_version),
_db(server->_db),
_data_cf(server->_data_cf),
_meta_cf(server->_meta_cf),
_rd_opts(server->_data_cf_rd_opts),
_default_ttl(0),
_pfc_recent_expire_count(server->_pfc_recent_expire_count)
{
// disable write ahead logging as replication handles logging instead now
_wt_opts.disableWAL = true;
_rocksdb_wrapper = dsn::make_unique<rocksdb_wrapper>(server);
}

Expand Down Expand Up @@ -515,179 +507,31 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
const dsn::apps::update_request &update,
dsn::apps::update_response &resp)
{
resp.error = db_write_batch_put_ctx(
resp.error = _rocksdb_wrapper->write_batch_put_ctx(
ctx, update.key, update.value, static_cast<uint32_t>(update.expire_ts_seconds));
_update_responses.emplace_back(&resp);
return resp.error;
}

// Adds a delete of `key` to the pending write batch for `decree`.
// The resulting status code is stored in resp.error and also returned, and a pointer
// to `resp` is appended to _update_responses.
// NOTE(review): this text is a rendered diff with the +/- markers stripped; the two
// consecutive assignments to resp.error look like the removed (db_write_batch_delete)
// and added (_rocksdb_wrapper->write_batch_delete) versions of one line — confirm
// against the merged file.
int batch_remove(int64_t decree, const dsn::blob &key, dsn::apps::update_response &resp)
{
resp.error = db_write_batch_delete(decree, key);
resp.error = _rocksdb_wrapper->write_batch_delete(decree, key);
_update_responses.emplace_back(&resp);
return resp.error;
}

// Writes the pending batch for `decree`, then calls clear_up_batch_states(decree, err)
// regardless of whether the write succeeded. Returns the status code of the write.
// NOTE(review): this text is a rendered diff with the +/- markers stripped; the two
// `int err` lines look like the removed (db_write) and added (_rocksdb_wrapper->write)
// versions of one line — confirm against the merged file.
int batch_commit(int64_t decree)
{
int err = db_write(decree);
int err = _rocksdb_wrapper->write(decree);
clear_up_batch_states(decree, err);
return err;
}

/// Abandons the current batch: nothing is written here, the call only forwards
/// `decree` and the caller-supplied error code `err` to clear_up_batch_states().
void batch_abort(int64_t decree, int err)
{
    clear_up_batch_states(decree, err);
}

// Sets the default TTL by forwarding `ttl` to _rocksdb_wrapper->set_default_ttl().
// The longer form additionally keeps the local _default_ttl copy in sync and logs
// when the value changes.
// NOTE(review): this text is a rendered diff with the +/- markers stripped; the
// multi-line definition and the one-line definition below look like the before/after
// forms of the same function — confirm against the merged file.
void set_default_ttl(uint32_t ttl)
{
// TODO(zlw): remove these lines after the refactor is done
if (_default_ttl != ttl) {
_default_ttl = ttl;
ddebug_replica("update _default_ttl to {}.", ttl);
}

_rocksdb_wrapper->set_default_ttl(ttl);
}
void set_default_ttl(uint32_t ttl) { _rocksdb_wrapper->set_default_ttl(ttl); }

private:
/// Shorthand for db_write_batch_put_ctx() that uses the context returned by
/// db_write_context::empty(decree).
/// \returns whatever db_write_batch_put_ctx() returns.
int db_write_batch_put(int64_t decree,
                       dsn::string_view raw_key,
                       dsn::string_view value,
                       uint32_t expire_sec)
{
    const auto &write_ctx = db_write_context::empty(decree);
    return db_write_batch_put_ctx(write_ctx, raw_key, value, expire_sec);
}

/// Adds a Put of (raw_key, value) to the pending write batch `_batch`.
///
/// The stored value is produced by `_value_generator` from `value`, the expire
/// timestamp returned by db_expire_ts(expire_sec), and a timetag. The timetag is
/// ctx.remote_timetag for duplicated writes and is generated from ctx.timestamp and
/// the cluster id otherwise.
///
/// When ctx.verify_timetag is set, the data version is >= 1 and raw_key is non-empty,
/// the existing record is read first; if it exists, is not expired, and carries a
/// timetag >= the new one, an empty key/value is put instead of the update.
///
/// \returns FAIL_DB_WRITE_BATCH_PUT when the "db_write_batch_put" fail point fires,
///          the non-zero result of db_get() if the read-before-write fails,
///          otherwise the rocksdb status code of the batch Put.
int db_write_batch_put_ctx(const db_write_context &ctx,
dsn::string_view raw_key,
dsn::string_view value,
uint32_t expire_sec)
{
FAIL_POINT_INJECT_F("db_write_batch_put",
[](dsn::string_view) -> int { return FAIL_DB_WRITE_BATCH_PUT; });

// Start from the remote timetag; it is replaced below for local writes.
uint64_t new_timetag = ctx.remote_timetag;
if (!ctx.is_duplicated_write()) { // local write
new_timetag = generate_timetag(ctx.timestamp, get_cluster_id_if_exists(), false);
}

if (ctx.verify_timetag && // needs read-before-write
_pegasus_data_version >= 1 && // data version 0 doesn't support timetag.
!raw_key.empty()) { // not an empty write

db_get_context get_ctx;
int err = db_get(raw_key, &get_ctx);
if (dsn_unlikely(err != 0)) {
return err;
}
// if record exists and is not expired.
if (get_ctx.found && !get_ctx.expired) {
uint64_t local_timetag =
pegasus_extract_timetag(_pegasus_data_version, get_ctx.raw_value);

if (local_timetag >= new_timetag) {
// ignore this stale update with lower timetag,
// and write an empty record instead
raw_key = value = dsn::string_view();
}
}
}

// Build the key as a single-part SliceParts and let the value generator assemble
// the value parts.
rocksdb::Slice skey = utils::to_rocksdb_slice(raw_key);
rocksdb::SliceParts skey_parts(&skey, 1);
rocksdb::SliceParts svalue = _value_generator.generate_value(
_pegasus_data_version, value, db_expire_ts(expire_sec), new_timetag);
rocksdb::Status s = _batch.Put(skey_parts, svalue);
if (dsn_unlikely(!s.ok())) {
// Split the raw key back into hash key / sort key for the error log.
::dsn::blob hash_key, sort_key;
pegasus_restore_key(::dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key);
derror_rocksdb("WriteBatchPut",
s.ToString(),
"decree: {}, hash_key: {}, sort_key: {}, expire_ts: {}",
ctx.decree,
utils::c_escape_string(hash_key),
utils::c_escape_string(sort_key),
expire_sec);
}
return s.code();
}

int db_write_batch_delete(int64_t decree, dsn::string_view raw_key)
{
FAIL_POINT_INJECT_F("db_write_batch_delete",
[](dsn::string_view) -> int { return FAIL_DB_WRITE_BATCH_DELETE; });

rocksdb::Status s = _batch.Delete(utils::to_rocksdb_slice(raw_key));
if (dsn_unlikely(!s.ok())) {
::dsn::blob hash_key, sort_key;
pegasus_restore_key(::dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key);
derror_rocksdb("WriteBatchDelete",
s.ToString(),
"decree: {}, hash_key: {}, sort_key: {}",
decree,
utils::c_escape_string(hash_key),
utils::c_escape_string(sort_key));
}
return s.code();
}

// Apply the write batch into rocksdb.
int db_write(int64_t decree)
{
dassert(_batch.Count() != 0, "");

FAIL_POINT_INJECT_F("db_write", [](dsn::string_view) -> int { return FAIL_DB_WRITE; });

rocksdb::Status status =
_batch.Put(_meta_cf, meta_store::LAST_FLUSHED_DECREE, std::to_string(decree));
if (dsn_unlikely(!status.ok())) {
derror_rocksdb("Write",
status.ToString(),
"put decree of meta cf into batch error, decree: {}",
decree);
return status.code();
}

status = _db->Write(_wt_opts, &_batch);
if (dsn_unlikely(!status.ok())) {
derror_rocksdb("Write", status.ToString(), "write rocksdb error, decree: {}", decree);
}
return status.code();
}

/// Reads `raw_key` from RocksDB and fills `ctx` with the outcome.
/// \returns 0 when the Get succeeded or the key was not found; FAIL_DB_GET when the
///          "db_get" fail point fires; otherwise the non-zero rocksdb status code.
/// \result ctx.found=true, ctx.raw_value and ctx.expire_ts filled when the record exists.
/// \result ctx.expired=true if the record exists but its expire timestamp has passed.
///         0 is still returned.
/// \result ctx.found=false if the record does not exist. 0 is still returned.
int db_get(dsn::string_view raw_key,
           /*out*/ db_get_context *ctx)
{
    FAIL_POINT_INJECT_F("db_get", [](dsn::string_view) -> int { return FAIL_DB_GET; });

    rocksdb::Slice key_slice = utils::to_rocksdb_slice(raw_key);
    rocksdb::Status get_status = _db->Get(_rd_opts, key_slice, &(ctx->raw_value));

    if (dsn_likely(get_status.ok())) {
        // Record exists: decode its expire timestamp and flag it if already expired.
        ctx->found = true;
        ctx->expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, ctx->raw_value);
        const bool has_expired = check_if_ts_expired(utils::epoch_now(), ctx->expire_ts);
        if (has_expired) {
            ctx->expired = true;
        }
        return 0;
    }

    if (s_is_not_found(get_status)) {
        return 0;
    }

    // Any other status is a real failure: log it with the decoded key parts.
    ::dsn::blob hk, sk;
    pegasus_restore_key(::dsn::blob(raw_key.data(), 0, raw_key.size()), hk, sk);
    derror_rocksdb("Get",
                   get_status.ToString(),
                   "hash_key: {}, sort_key: {}",
                   utils::c_escape_string(hk),
                   utils::c_escape_string(sk));
    return get_status.code();
}

void clear_up_batch_states(int64_t decree, int err)
{
if (!_update_responses.empty()) {
Expand All @@ -703,7 +547,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
_update_responses.clear();
}

_batch.Clear();
_rocksdb_wrapper->clear_up_write_batch();
}

static dsn::blob composite_raw_key(dsn::string_view hash_key, dsn::string_view sort_key)
Expand Down Expand Up @@ -820,16 +664,6 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
return false;
}

uint32_t db_expire_ts(uint32_t expire_ts)
{
// use '_default_ttl' when ttl is not set for this write operation.
if (_default_ttl != 0 && expire_ts == 0) {
return utils::epoch_now() + _default_ttl;
}

return expire_ts;
}

private:
friend class pegasus_write_service_test;
friend class pegasus_server_write_test;
Expand All @@ -841,15 +675,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
const std::string _primary_address;
const uint32_t _pegasus_data_version;

rocksdb::WriteBatch _batch;
rocksdb::DB *_db;
rocksdb::ColumnFamilyHandle *_data_cf;
rocksdb::ColumnFamilyHandle *_meta_cf;
rocksdb::WriteOptions _wt_opts;
rocksdb::ReadOptions &_rd_opts;
volatile uint32_t _default_ttl;
::dsn::perf_counter_wrapper &_pfc_recent_expire_count;
pegasus_value_generator _value_generator;

std::unique_ptr<rocksdb_wrapper> _rocksdb_wrapper;

Expand Down
3 changes: 1 addition & 2 deletions src/server/rocksdb_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@

#include "rocksdb_wrapper.h"

#include <dsn/utility/fail_point.h>
#include <rocksdb/db.h>
#include "pegasus_write_service_impl.h"
#include "base/pegasus_value_schema.h"

namespace pegasus {
namespace server {

const int FAIL_DB_GET = -104;

rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server)
: replica_base(server),
_db(server->_db),
Expand Down
2 changes: 2 additions & 0 deletions src/server/rocksdb_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ class rocksdb_wrapper : public dsn::replication::replica_base
volatile uint32_t _default_ttl;

friend class rocksdb_wrapper_test;
friend class pegasus_write_service_test;
friend class pegasus_server_write_test;
FRIEND_TEST(rocksdb_wrapper_test, put_verify_timetag);
FRIEND_TEST(rocksdb_wrapper_test, verify_timetag_compatible_with_version_0);
FRIEND_TEST(rocksdb_wrapper_test, get);
Expand Down
3 changes: 2 additions & 1 deletion src/server/test/pegasus_server_write_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ class pegasus_server_write_test : public pegasus_server_test_base
ASSERT_TRUE(_server_write->_write_svc->_batch_qps_perfcounters.empty());
ASSERT_TRUE(_server_write->_write_svc->_batch_latency_perfcounters.empty());
ASSERT_EQ(_server_write->_write_svc->_batch_start_time, 0);
ASSERT_EQ(_server_write->_write_svc->_impl->_batch.Count(), 0);
ASSERT_EQ(_server_write->_write_svc->_impl->_rocksdb_wrapper->_write_batch->Count(),
0);
ASSERT_EQ(_server_write->_write_svc->_impl->_update_responses.size(), 0);

ASSERT_EQ(put_rpc::mail_box().size(), put_rpc_cnt);
Expand Down
2 changes: 1 addition & 1 deletion src/server/test/pegasus_write_service_impl_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
* under the License.
*/

#include <dsn/utility/fail_point.h>
#include "pegasus_server_test_base.h"
#include "server/pegasus_server_write.h"
#include "server/pegasus_write_service_impl.h"
#include "message_utils.h"

namespace pegasus {
namespace server {
extern const int FAIL_DB_GET;

class pegasus_write_service_impl_test : public pegasus_server_test_base
{
Expand Down
3 changes: 2 additions & 1 deletion src/server/test/pegasus_write_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* under the License.
*/

#include <dsn/utility/fail_point.h>
#include "base/pegasus_key_schema.h"
#include "pegasus_server_test_base.h"
#include "server/pegasus_server_write.h"
Expand Down Expand Up @@ -195,7 +196,7 @@ class pegasus_write_service_test : public pegasus_server_test_base
ASSERT_EQ(response.partition_index, _gpid.get_partition_index());
ASSERT_EQ(response.decree, decree);
ASSERT_EQ(response.server, _write_svc->_impl->_primary_address);
ASSERT_EQ(_write_svc->_impl->_batch.Count(), 0);
ASSERT_EQ(_write_svc->_impl->_rocksdb_wrapper->_write_batch->Count(), 0);
ASSERT_EQ(_write_svc->_impl->_update_responses.size(), 0);
}
};
Expand Down