Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use rocksdb_wrapper::get to reimplement incr #651

Merged
merged 25 commits into from
Dec 10, 2020
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ class pegasus_server_impl : public pegasus_read_service

friend class pegasus_manual_compact_service;
friend class pegasus_write_service;
friend class rocksdb_wrapper;

// parse checkpoint directories in the data dir
// checkpoint directory format is: "checkpoint.{decree}"
Expand Down
1 change: 1 addition & 0 deletions src/server/pegasus_server_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class pegasus_server_write : public dsn::replication::replica_base
friend class pegasus_server_write_test;
friend class pegasus_write_service_test;
friend class pegasus_write_service_impl_test;
friend class rocksdb_wrapper_test;

std::unique_ptr<pegasus_write_service> _write_svc;
std::vector<put_rpc> _put_rpc_batch;
Expand Down
1 change: 1 addition & 0 deletions src/server/pegasus_write_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ class pegasus_write_service : dsn::replication::replica_base
friend class pegasus_write_service_test;
friend class pegasus_write_service_impl_test;
friend class pegasus_server_write_test;
friend class rocksdb_wrapper_test;

pegasus_server_impl *_server;

Expand Down
10 changes: 7 additions & 3 deletions src/server/pegasus_write_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "base/pegasus_key_schema.h"
#include "meta_store.h"
#include "rocksdb_wrapper.h"

#include <dsn/utility/fail_point.h>
#include <dsn/utility/filesystem.h>
Expand All @@ -38,7 +39,7 @@ namespace server {
static constexpr int FAIL_DB_WRITE_BATCH_PUT = -101;
static constexpr int FAIL_DB_WRITE_BATCH_DELETE = -102;
static constexpr int FAIL_DB_WRITE = -103;
static constexpr int FAIL_DB_GET = -104;
extern const int FAIL_DB_GET;

struct db_get_context
{
Expand Down Expand Up @@ -97,6 +98,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
{
// disable write ahead logging as replication handles logging instead now
_wt_opts.disableWAL = true;
_rocksdb_wrapper = dsn::make_unique<rocksdb_wrapper>(server);
}

int empty_put(int64_t decree)
Expand Down Expand Up @@ -196,7 +198,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
int64_t new_value = 0;
uint32_t new_expire_ts = 0;
db_get_context get_ctx;
int err = db_get(raw_key, &get_ctx);
int err = _rocksdb_wrapper->get(raw_key, &get_ctx);
if (err != 0) {
resp.error = err;
return err;
Expand All @@ -207,7 +209,6 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0;
} else if (get_ctx.expired) {
// ttl timeout, set to 0 before increment
_pfc_recent_expire_count->increment();
new_value = update.increment;
new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0;
} else {
Expand Down Expand Up @@ -861,6 +862,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
friend class pegasus_write_service_test;
friend class pegasus_server_write_test;
friend class pegasus_write_service_impl_test;
friend class rocksdb_wrapper_test;
FRIEND_TEST(pegasus_write_service_impl_test, put_verify_timetag);
FRIEND_TEST(pegasus_write_service_impl_test, verify_timetag_compatible_with_version_0);

Expand All @@ -877,6 +879,8 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
::dsn::perf_counter_wrapper &_pfc_recent_expire_count;
pegasus_value_generator _value_generator;

std::unique_ptr<rocksdb_wrapper> _rocksdb_wrapper;

// for setting update_response.error after committed.
std::vector<dsn::apps::update_response *> _update_responses;
};
Expand Down
70 changes: 70 additions & 0 deletions src/server/rocksdb_wrapper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "rocksdb_wrapper.h"

#include <rocksdb/db.h>
#include "pegasus_write_service_impl.h"

namespace pegasus {
namespace server {

const int FAIL_DB_GET = -104;

rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server)
: replica_base(server),
_db(server->_db),
_rd_opts(server->_data_cf_rd_opts),
_pegasus_data_version(server->_pegasus_data_version),
_pfc_recent_expire_count(server->_pfc_recent_expire_count)
{
}

int rocksdb_wrapper::get(dsn::string_view raw_key, /*out*/ db_get_context *ctx)
{
FAIL_POINT_INJECT_F("db_get", [](dsn::string_view) -> int { return FAIL_DB_GET; });

rocksdb::Status s = _db->Get(_rd_opts, utils::to_rocksdb_slice(raw_key), &(ctx->raw_value));
if (dsn_likely(s.ok())) {
// success
ctx->found = true;
ctx->expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, ctx->raw_value);
if (check_if_ts_expired(utils::epoch_now(), ctx->expire_ts)) {
ctx->expired = true;
_pfc_recent_expire_count->increment();
}
return rocksdb::Status::kOk;
} else if (s.IsNotFound()) {
// NotFound is an acceptable error
ctx->found = false;
return rocksdb::Status::kOk;
}

dsn::blob hash_key, sort_key;
pegasus_restore_key(dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key);
derror_rocksdb("Get",
s.ToString(),
"hash_key: {}, sort_key: {}",
utils::c_escape_string(hash_key),
utils::c_escape_string(sort_key));
return s.code();
}

} // namespace server
} // namespace pegasus
56 changes: 56 additions & 0 deletions src/server/rocksdb_wrapper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <dsn/dist/replication/replica_base.h>

namespace rocksdb {
class DB;
class ReadOptions;
} // namespace rocksdb

namespace dsn {
class perf_counter_wrapper;
} // namespace dsn

namespace pegasus {
namespace server {
struct db_get_context;
class pegasus_server_impl;

class rocksdb_wrapper : public dsn::replication::replica_base
{
public:
rocksdb_wrapper(pegasus_server_impl *server);

/// Calls RocksDB Get and store the result into `db_get_context`.
/// \returns 0 if Get succeeded. On failure, a non-zero rocksdb status code is returned.
/// \result ctx.expired=true if record expired. Still 0 is returned.
/// \result ctx.found=false if record is not found. Still 0 is returned.
int get(dsn::string_view raw_key, /*out*/ db_get_context *ctx);

private:
rocksdb::DB *_db;
rocksdb::ReadOptions &_rd_opts;
const uint32_t _pegasus_data_version;
dsn::perf_counter_wrapper &_pfc_recent_expire_count;
};
} // namespace server
} // namespace pegasus
1 change: 1 addition & 0 deletions src/server/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ set(MY_PROJ_SRC "../pegasus_server_impl.cpp"
"../hotspot_partition_calculator.cpp"
"../meta_store.cpp"
"../hotkey_collector.cpp"
"../rocksdb_wrapper.cpp"
)

set(MY_SRC_SEARCH_MODE "GLOB")
Expand Down
25 changes: 24 additions & 1 deletion src/server/test/pegasus_write_service_impl_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,22 @@

namespace pegasus {
namespace server {
extern const int FAIL_DB_GET;

class pegasus_write_service_impl_test : public pegasus_server_test_base
{
protected:
std::unique_ptr<pegasus_server_write> _server_write;
pegasus_write_service::impl *_write_impl{nullptr};
rocksdb_wrapper *_rocksdb_wrapper{nullptr};

public:
void SetUp() override
{
start();
_server_write = dsn::make_unique<pegasus_server_write>(_server.get(), true);
_write_impl = _server_write->_write_svc->_impl.get();
_rocksdb_wrapper = _write_impl->_rocksdb_wrapper.get();
}

uint64_t read_timestamp_from(dsn::string_view raw_key)
Expand Down Expand Up @@ -70,7 +73,7 @@ class pegasus_write_service_impl_test : public pegasus_server_test_base

int db_get(dsn::string_view raw_key, db_get_context *get_ctx)
{
return _write_impl->db_get(raw_key, get_ctx);
return _rocksdb_wrapper->get(raw_key, get_ctx);
}

void single_set(dsn::blob raw_key, dsn::blob user_value)
Expand Down Expand Up @@ -258,5 +261,25 @@ TEST_F(incr_test, fail_on_put)
dsn::fail::teardown();
}

TEST_F(incr_test, incr_on_expire_record)
{
// make the key expired
req.expire_ts_seconds = 1;
_write_impl->incr(0, req, resp);

// check whether the key is expired
db_get_context get_ctx;
db_get(req.key, &get_ctx);
ASSERT_TRUE(get_ctx.expired);

// incr the expired key
req.increment = 100;
req.expire_ts_seconds = 0;
_write_impl->incr(0, req, resp);
ASSERT_EQ(resp.new_value, 100);

db_get(req.key, &get_ctx);
ASSERT_TRUE(get_ctx.found);
}
} // namespace server
} // namespace pegasus
85 changes: 85 additions & 0 deletions src/server/test/rocksdb_wrapper_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "server/pegasus_server_write.h"
#include "server/pegasus_write_service_impl.h"
#include "pegasus_server_test_base.h"

namespace pegasus {
namespace server {
class rocksdb_wrapper_test : public pegasus_server_test_base
{
protected:
std::unique_ptr<pegasus_server_write> _server_write;
pegasus_write_service::impl *_write_impl{nullptr};
rocksdb_wrapper *_rocksdb_wrapper{nullptr};
dsn::blob _raw_key;

public:
void SetUp() override
{
start();
_server_write = dsn::make_unique<pegasus_server_write>(_server.get(), true);
_write_impl = _server_write->_write_svc->_impl.get();
_rocksdb_wrapper = _write_impl->_rocksdb_wrapper.get();

pegasus::pegasus_generate_key(
_raw_key, dsn::string_view("hash_key"), dsn::string_view("sort_key"));
}

void single_set(dsn::blob raw_key, dsn::blob user_value, int32_t expire_ts_seconds)
{
dsn::apps::update_request put;
put.key = raw_key;
put.value = user_value;
put.expire_ts_seconds = expire_ts_seconds;
db_write_context write_ctx;
dsn::apps::update_response put_resp;
_write_impl->batch_put(write_ctx, put, put_resp);
ASSERT_EQ(_write_impl->batch_commit(0), 0);
}
};

TEST_F(rocksdb_wrapper_test, get)
{
// not found
db_get_context get_ctx1;
_rocksdb_wrapper->get(_raw_key, &get_ctx1);
ASSERT_FALSE(get_ctx1.found);

// expired
int32_t expired_ts = utils::epoch_now();
db_get_context get_ctx2;
single_set(_raw_key, dsn::blob::create_from_bytes("abc"), expired_ts);
_rocksdb_wrapper->get(_raw_key, &get_ctx2);
ASSERT_TRUE(get_ctx2.found);
ASSERT_TRUE(get_ctx2.expired);
ASSERT_EQ(get_ctx2.expire_ts, expired_ts);

// found
expired_ts = INT32_MAX;
db_get_context get_ctx3;
single_set(_raw_key, dsn::blob::create_from_bytes("abc"), expired_ts);
_rocksdb_wrapper->get(_raw_key, &get_ctx3);
ASSERT_TRUE(get_ctx2.found);
ASSERT_FALSE(get_ctx3.expired);
ASSERT_EQ(get_ctx3.expire_ts, expired_ts);
}
} // namespace server
} // namespace pegasus