Skip to content

Commit

Permalink
refactor: use rocksdb_wrapper::get to reimplement incr (#651)
Browse files Browse the repository at this point in the history
  • Loading branch information
levy5307 authored Dec 10, 2020
1 parent cea104d commit c212da6
Show file tree
Hide file tree
Showing 9 changed files with 246 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ class pegasus_server_impl : public pegasus_read_service

friend class pegasus_manual_compact_service;
friend class pegasus_write_service;
friend class rocksdb_wrapper;

// parse checkpoint directories in the data dir
// checkpoint directory format is: "checkpoint.{decree}"
Expand Down
1 change: 1 addition & 0 deletions src/server/pegasus_server_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class pegasus_server_write : public dsn::replication::replica_base
friend class pegasus_server_write_test;
friend class pegasus_write_service_test;
friend class pegasus_write_service_impl_test;
friend class rocksdb_wrapper_test;

std::unique_ptr<pegasus_write_service> _write_svc;
std::vector<put_rpc> _put_rpc_batch;
Expand Down
1 change: 1 addition & 0 deletions src/server/pegasus_write_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ class pegasus_write_service : dsn::replication::replica_base
friend class pegasus_write_service_test;
friend class pegasus_write_service_impl_test;
friend class pegasus_server_write_test;
friend class rocksdb_wrapper_test;

pegasus_server_impl *_server;

Expand Down
10 changes: 7 additions & 3 deletions src/server/pegasus_write_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "base/pegasus_key_schema.h"
#include "meta_store.h"
#include "rocksdb_wrapper.h"

#include <dsn/utility/fail_point.h>
#include <dsn/utility/filesystem.h>
Expand All @@ -38,7 +39,7 @@ namespace server {
static constexpr int FAIL_DB_WRITE_BATCH_PUT = -101;
static constexpr int FAIL_DB_WRITE_BATCH_DELETE = -102;
static constexpr int FAIL_DB_WRITE = -103;
static constexpr int FAIL_DB_GET = -104;
extern const int FAIL_DB_GET;

struct db_get_context
{
Expand Down Expand Up @@ -97,6 +98,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
{
// disable write ahead logging as replication handles logging instead now
_wt_opts.disableWAL = true;
_rocksdb_wrapper = dsn::make_unique<rocksdb_wrapper>(server);
}

int empty_put(int64_t decree)
Expand Down Expand Up @@ -196,7 +198,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
int64_t new_value = 0;
uint32_t new_expire_ts = 0;
db_get_context get_ctx;
int err = db_get(raw_key, &get_ctx);
int err = _rocksdb_wrapper->get(raw_key, &get_ctx);
if (err != 0) {
resp.error = err;
return err;
Expand All @@ -207,7 +209,6 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0;
} else if (get_ctx.expired) {
// ttl timeout, set to 0 before increment
_pfc_recent_expire_count->increment();
new_value = update.increment;
new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0;
} else {
Expand Down Expand Up @@ -861,6 +862,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
friend class pegasus_write_service_test;
friend class pegasus_server_write_test;
friend class pegasus_write_service_impl_test;
friend class rocksdb_wrapper_test;
FRIEND_TEST(pegasus_write_service_impl_test, put_verify_timetag);
FRIEND_TEST(pegasus_write_service_impl_test, verify_timetag_compatible_with_version_0);

Expand All @@ -877,6 +879,8 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
::dsn::perf_counter_wrapper &_pfc_recent_expire_count;
pegasus_value_generator _value_generator;

std::unique_ptr<rocksdb_wrapper> _rocksdb_wrapper;

// for setting update_response.error after committed.
std::vector<dsn::apps::update_response *> _update_responses;
};
Expand Down
70 changes: 70 additions & 0 deletions src/server/rocksdb_wrapper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "rocksdb_wrapper.h"

#include <rocksdb/db.h>
#include "pegasus_write_service_impl.h"

namespace pegasus {
namespace server {

const int FAIL_DB_GET = -104;

rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server)
: replica_base(server),
_db(server->_db),
_rd_opts(server->_data_cf_rd_opts),
_pegasus_data_version(server->_pegasus_data_version),
_pfc_recent_expire_count(server->_pfc_recent_expire_count)
{
}

int rocksdb_wrapper::get(dsn::string_view raw_key, /*out*/ db_get_context *ctx)
{
FAIL_POINT_INJECT_F("db_get", [](dsn::string_view) -> int { return FAIL_DB_GET; });

rocksdb::Status s = _db->Get(_rd_opts, utils::to_rocksdb_slice(raw_key), &(ctx->raw_value));
if (dsn_likely(s.ok())) {
// success
ctx->found = true;
ctx->expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, ctx->raw_value);
if (check_if_ts_expired(utils::epoch_now(), ctx->expire_ts)) {
ctx->expired = true;
_pfc_recent_expire_count->increment();
}
return rocksdb::Status::kOk;
} else if (s.IsNotFound()) {
// NotFound is an acceptable error
ctx->found = false;
return rocksdb::Status::kOk;
}

dsn::blob hash_key, sort_key;
pegasus_restore_key(dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key);
derror_rocksdb("Get",
s.ToString(),
"hash_key: {}, sort_key: {}",
utils::c_escape_string(hash_key),
utils::c_escape_string(sort_key));
return s.code();
}

} // namespace server
} // namespace pegasus
56 changes: 56 additions & 0 deletions src/server/rocksdb_wrapper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <dsn/dist/replication/replica_base.h>

namespace rocksdb {
class DB;
class ReadOptions;
} // namespace rocksdb

namespace dsn {
class perf_counter_wrapper;
} // namespace dsn

namespace pegasus {
namespace server {
struct db_get_context;
class pegasus_server_impl;

class rocksdb_wrapper : public dsn::replication::replica_base
{
public:
rocksdb_wrapper(pegasus_server_impl *server);

/// Calls RocksDB Get and store the result into `db_get_context`.
/// \returns 0 if Get succeeded. On failure, a non-zero rocksdb status code is returned.
/// \result ctx.expired=true if record expired. Still 0 is returned.
/// \result ctx.found=false if record is not found. Still 0 is returned.
int get(dsn::string_view raw_key, /*out*/ db_get_context *ctx);

private:
rocksdb::DB *_db;
rocksdb::ReadOptions &_rd_opts;
const uint32_t _pegasus_data_version;
dsn::perf_counter_wrapper &_pfc_recent_expire_count;
};
} // namespace server
} // namespace pegasus
1 change: 1 addition & 0 deletions src/server/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ set(MY_PROJ_SRC "../pegasus_server_impl.cpp"
"../hotspot_partition_calculator.cpp"
"../meta_store.cpp"
"../hotkey_collector.cpp"
"../rocksdb_wrapper.cpp"
)

set(MY_SRC_SEARCH_MODE "GLOB")
Expand Down
25 changes: 24 additions & 1 deletion src/server/test/pegasus_write_service_impl_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,22 @@

namespace pegasus {
namespace server {
extern const int FAIL_DB_GET;

class pegasus_write_service_impl_test : public pegasus_server_test_base
{
protected:
std::unique_ptr<pegasus_server_write> _server_write;
pegasus_write_service::impl *_write_impl{nullptr};
rocksdb_wrapper *_rocksdb_wrapper{nullptr};

public:
void SetUp() override
{
start();
_server_write = dsn::make_unique<pegasus_server_write>(_server.get(), true);
_write_impl = _server_write->_write_svc->_impl.get();
_rocksdb_wrapper = _write_impl->_rocksdb_wrapper.get();
}

uint64_t read_timestamp_from(dsn::string_view raw_key)
Expand Down Expand Up @@ -70,7 +73,7 @@ class pegasus_write_service_impl_test : public pegasus_server_test_base

int db_get(dsn::string_view raw_key, db_get_context *get_ctx)
{
return _write_impl->db_get(raw_key, get_ctx);
return _rocksdb_wrapper->get(raw_key, get_ctx);
}

void single_set(dsn::blob raw_key, dsn::blob user_value)
Expand Down Expand Up @@ -258,5 +261,25 @@ TEST_F(incr_test, fail_on_put)
dsn::fail::teardown();
}

TEST_F(incr_test, incr_on_expire_record)
{
// make the key expired
req.expire_ts_seconds = 1;
_write_impl->incr(0, req, resp);

// check whether the key is expired
db_get_context get_ctx;
db_get(req.key, &get_ctx);
ASSERT_TRUE(get_ctx.expired);

// incr the expired key
req.increment = 100;
req.expire_ts_seconds = 0;
_write_impl->incr(0, req, resp);
ASSERT_EQ(resp.new_value, 100);

db_get(req.key, &get_ctx);
ASSERT_TRUE(get_ctx.found);
}
} // namespace server
} // namespace pegasus
85 changes: 85 additions & 0 deletions src/server/test/rocksdb_wrapper_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "server/pegasus_server_write.h"
#include "server/pegasus_write_service_impl.h"
#include "pegasus_server_test_base.h"

namespace pegasus {
namespace server {
class rocksdb_wrapper_test : public pegasus_server_test_base
{
protected:
std::unique_ptr<pegasus_server_write> _server_write;
pegasus_write_service::impl *_write_impl{nullptr};
rocksdb_wrapper *_rocksdb_wrapper{nullptr};
dsn::blob _raw_key;

public:
void SetUp() override
{
start();
_server_write = dsn::make_unique<pegasus_server_write>(_server.get(), true);
_write_impl = _server_write->_write_svc->_impl.get();
_rocksdb_wrapper = _write_impl->_rocksdb_wrapper.get();

pegasus::pegasus_generate_key(
_raw_key, dsn::string_view("hash_key"), dsn::string_view("sort_key"));
}

void single_set(dsn::blob raw_key, dsn::blob user_value, int32_t expire_ts_seconds)
{
dsn::apps::update_request put;
put.key = raw_key;
put.value = user_value;
put.expire_ts_seconds = expire_ts_seconds;
db_write_context write_ctx;
dsn::apps::update_response put_resp;
_write_impl->batch_put(write_ctx, put, put_resp);
ASSERT_EQ(_write_impl->batch_commit(0), 0);
}
};

TEST_F(rocksdb_wrapper_test, get)
{
// not found
db_get_context get_ctx1;
_rocksdb_wrapper->get(_raw_key, &get_ctx1);
ASSERT_FALSE(get_ctx1.found);

// expired
int32_t expired_ts = utils::epoch_now();
db_get_context get_ctx2;
single_set(_raw_key, dsn::blob::create_from_bytes("abc"), expired_ts);
_rocksdb_wrapper->get(_raw_key, &get_ctx2);
ASSERT_TRUE(get_ctx2.found);
ASSERT_TRUE(get_ctx2.expired);
ASSERT_EQ(get_ctx2.expire_ts, expired_ts);

// found
expired_ts = INT32_MAX;
db_get_context get_ctx3;
single_set(_raw_key, dsn::blob::create_from_bytes("abc"), expired_ts);
_rocksdb_wrapper->get(_raw_key, &get_ctx3);
ASSERT_TRUE(get_ctx2.found);
ASSERT_FALSE(get_ctx3.expired);
ASSERT_EQ(get_ctx3.expire_ts, expired_ts);
}
} // namespace server
} // namespace pegasus

0 comments on commit c212da6

Please sign in to comment.