Skip to content

Commit

Permalink
fix(dup_enhancement#21): add batch replay log to support batch send m…
Browse files Browse the repository at this point in the history
…utation (apache#1080)
  • Loading branch information
foreverneverer committed Mar 29, 2022
1 parent 516ff0c commit 997c2dc
Show file tree
Hide file tree
Showing 12 changed files with 74 additions and 27 deletions.
3 changes: 3 additions & 0 deletions include/dsn/dist/replication/duplication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
#include <dsn/cpp/rpc_holder.h>
#include <dsn/utility/errors.h>
#include <dsn/dist/replication/replication_types.h>
#include <dsn/utility/flags.h>

namespace dsn {
namespace replication {

// Declares FLAGS_duplicate_log_batch_bytes for code that includes this header; the matching
// DSN_DEFINE_uint32 is in src/common/duplication_common.cpp.
DSN_DECLARE_uint32(duplicate_log_batch_bytes);

typedef rpc_holder<duplication_modify_request, duplication_modify_response> duplication_modify_rpc;
typedef rpc_holder<duplication_add_request, duplication_add_response> duplication_add_rpc;
typedef rpc_holder<duplication_query_request, duplication_query_response> duplication_query_rpc;
Expand Down
6 changes: 6 additions & 0 deletions src/common/duplication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@
namespace dsn {
namespace replication {

// Byte threshold for batching replayed private-log mutations during duplication: the log loader
// keeps replaying blocks while the accumulated batch is smaller than this value, then hands the
// batch to the next stage. Default is 4096.
// NOTE(review): "replication" is presumably the config section name, and FT_MUTABLE presumably
// marks the flag as changeable at runtime — confirm against the flags utility.
DSN_DEFINE_uint32("replication",
duplicate_log_batch_bytes,
4096,
"send mutation log batch bytes size per rpc");
DSN_TAG_VARIABLE(duplicate_log_batch_bytes, FT_MUTABLE);

const std::string duplication_constants::kDuplicationCheckpointRootDir /*NOLINT*/ = "duplication";
const std::string duplication_constants::kClustersSectionName /*NOLINT*/ = "pegasus.clusters";
const std::string duplication_constants::kDuplicationEnvMasterClusterKey /*NOLINT*/ =
Expand Down
25 changes: 17 additions & 8 deletions src/replica/duplication/load_from_private_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,7 @@ void load_from_private_log::replay_log_block()
},
_start_offset,
_current_global_end_offset);
if (!err.is_ok()) {
if (err.code() == ERR_HANDLE_EOF && switch_to_next_log_file()) {
repeat();
return;
}

if (!err.is_ok() && err.code() != ERR_HANDLE_EOF) {
// Error handling on loading failure:
// - If block loading failed for `MAX_ALLOWED_REPEATS` times, it restarts reading the file.
// - If file loading failed for `MAX_ALLOWED_FILE_REPEATS` times, which means it
Expand Down Expand Up @@ -220,9 +215,23 @@ void load_from_private_log::replay_log_block()
return;
}

_start_offset = static_cast<size_t>(_current_global_end_offset - _current->start_offset());

if (err.is_ok()) {
_start_offset = static_cast<size_t>(_current_global_end_offset - _current->start_offset());
if (_mutation_batch.bytes() < FLAGS_duplicate_log_batch_bytes) {
repeat();
return;
}
} else if (switch_to_next_log_file()) {
// Reaching here with !err.is_ok() implies err.code() == ERR_HANDLE_EOF: the current file has
// been read completely, so continue with the next file.
repeat();
return;
}
// update last_decree even for empty batch.
// case1: err.is_ok() (so err.code() != ERR_HANDLE_EOF) and _mutation_batch.bytes() >=
// FLAGS_duplicate_log_batch_bytes: the batch has reached the size threshold.
// case2: !err.is_ok() (so err.code() == ERR_HANDLE_EOF) and there is no next file: the
// mutations loaded so far still need to be committed.
step_down_next_stage(_mutation_batch.last_decree(), _mutation_batch.move_all_mutations());
}

Expand Down
10 changes: 6 additions & 4 deletions src/replica/duplication/mutation_batch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ mutation_tuple_set mutation_batch::move_all_mutations()
{
// Hands every accumulated mutation to the caller and leaves this batch ready to be reused.

// free the internal space
_mutation_buffer->truncate(last_decree());
// bytes() must only account for mutations collected after this call.
_total_bytes = 0;

// A moved-from container is only guaranteed to be in a valid but unspecified state. This
// batch keeps accumulating mutations after the call, so empty it explicitly instead of
// relying on the move leaving it empty.
mutation_tuple_set mutations = std::move(_loaded_mutations);
_loaded_mutations.clear();
return mutations;
}

Expand All @@ -135,15 +136,14 @@ mutation_batch::mutation_batch(replica_duplicator *r) : replica_base(r)
_mutation_buffer =
make_unique<mutation_buffer>(&base, 0, PREPARE_LIST_NUM_ENTRIES, [this](mutation_ptr &mu) {
// committer
add_mutation_if_valid(mu, _loaded_mutations, _start_decree);
add_mutation_if_valid(mu, _start_decree);
});

// start duplication from confirmed_decree
_mutation_buffer->reset(r->progress().confirmed_decree);
}

/*extern*/ void
add_mutation_if_valid(mutation_ptr &mu, mutation_tuple_set &mutations, decree start_decree)
void mutation_batch::add_mutation_if_valid(mutation_ptr &mu, decree start_decree)
{
if (mu->get_decree() < start_decree) {
// ignore
Expand All @@ -169,7 +169,9 @@ add_mutation_if_valid(mutation_ptr &mu, mutation_tuple_set &mutations, decree st
bb = blob::create_from_bytes(update.data.data(), update.data.length());
}

mutations.emplace(std::make_tuple(mu->data.header.timestamp, update.code, std::move(bb)));
_total_bytes += bb.length();
_loaded_mutations.emplace(
std::make_tuple(mu->data.header.timestamp, update.code, std::move(bb)));
}
}

Expand Down
7 changes: 5 additions & 2 deletions src/replica/duplication/mutation_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class mutation_batch : replica_base

error_s add(mutation_ptr mu);

void add_mutation_if_valid(mutation_ptr &, decree start_decree);

mutation_tuple_set move_all_mutations();

decree last_decree() const;
Expand All @@ -64,19 +66,20 @@ class mutation_batch : replica_base

size_t size() const { return _loaded_mutations.size(); }

// Total length in bytes of the mutation blobs accumulated in this batch; the counter is reset
// to 0 by move_all_mutations().
uint64_t bytes() const { return _total_bytes; }

private:
friend class replica_duplicator_test;
friend class mutation_batch_test;

std::unique_ptr<prepare_list> _mutation_buffer;
mutation_tuple_set _loaded_mutations;
decree _start_decree{invalid_decree};
uint64_t _total_bytes{0};
};

using mutation_batch_u_ptr = std::unique_ptr<mutation_batch>;

/// Extract mutations into mutation_tuple_set if they are not WRITE_EMPTY.
extern void add_mutation_if_valid(mutation_ptr &, mutation_tuple_set &, decree start_decree);

} // namespace replication
} // namespace dsn
12 changes: 9 additions & 3 deletions src/replica/duplication/replica_duplicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,17 @@ replica_duplicator::replica_duplicator(const duplication_entry &ent, replica *r)
} else {
_progress.last_decree = _progress.confirmed_decree = it->second;
}
ddebug_replica(
"initialize replica_duplicator [dupid:{}, meta_confirmed_decree:{}]", id(), it->second);
ddebug_replica("initialize replica_duplicator[{}] [dupid:{}, meta_confirmed_decree:{}]",
duplication_status_to_string(_status),
id(),
it->second);
thread_pool(LPC_REPLICATION_LOW).task_tracker(tracker()).thread_hash(get_gpid().thread_hash());

prepare_dup();
if (_status == duplication_status::DS_PREPARE) {
prepare_dup();
} else if (_status == duplication_status::DS_LOG) {
start_dup_log();
}
}

void replica_duplicator::prepare_dup()
Expand Down
2 changes: 2 additions & 0 deletions src/replica/duplication/replica_duplicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ class replica_duplicator : public replica_base, public pipeline::base
// For metric "dup.pending_mutations_count"
uint64_t get_pending_mutations_count() const;

// Returns the duplication status currently held by this duplicator (the _status member).
duplication_status::type status() const { return _status; }

private:
friend class duplication_test_base;
friend class replica_duplicator_test;
Expand Down
10 changes: 8 additions & 2 deletions src/replica/duplication/replica_duplicator_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,14 @@ replica_duplicator_manager::get_duplication_confirms_to_update() const
for (const auto &kv : _duplications) {
replica_duplicator *duplicator = kv.second.get();
duplication_progress p = duplicator->progress();
if (p.last_decree != p.confirmed_decree && p.checkpoint_has_prepared) {
dcheck_gt_replica(p.last_decree, p.confirmed_decree);
if (p.last_decree != p.confirmed_decree ||
(kv.second->status() == duplication_status::DS_PREPARE && p.checkpoint_has_prepared)) {
if (p.last_decree < p.confirmed_decree) {
derror_replica("invalid decree state: p.last_decree({}) < p.confirmed_decree({})",
p.last_decree,
p.confirmed_decree);
continue;
}
duplication_confirm_entry entry;
entry.dupid = duplicator->id();
entry.confirmed_decree = p.last_decree;
Expand Down
2 changes: 2 additions & 0 deletions src/replica/duplication/replica_duplicator_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ class replica_duplicator_manager : public replica_base
if (_duplications.empty())
return;

dwarn_replica("remove all duplication, replica status = {}",
enum_to_string(_replica->status()));
_duplications.clear();
}

Expand Down
21 changes: 14 additions & 7 deletions src/replica/duplication/test/mutation_batch_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,37 +40,44 @@ class mutation_batch_test : public duplication_test_base

TEST_F(mutation_batch_test, add_mutation_if_valid)
{
auto duplicator = create_test_duplicator(0);
mutation_batch batcher(duplicator.get());

mutation_tuple_set result;

std::string s = "hello";
mutation_ptr mu1 = create_test_mutation(1, s);
add_mutation_if_valid(mu1, result, 0);
batcher.add_mutation_if_valid(mu1, 0);
result = batcher.move_all_mutations();
mutation_tuple mt1 = *result.begin();

result.clear();

s = "world";
mutation_ptr mu2 = create_test_mutation(2, s);
add_mutation_if_valid(mu2, result, 0);
batcher.add_mutation_if_valid(mu2, 0);
result = batcher.move_all_mutations();
mutation_tuple mt2 = *result.begin();

ASSERT_EQ(std::get<2>(mt1).to_string(), "hello");
ASSERT_EQ(std::get<2>(mt2).to_string(), "world");

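// NOTE(review): a mutation is only ignored when its decree is less than start_decree. Neither
// call below satisfies that (assuming the first argument of create_test_mutation is the
// decree), so both mutations are expected to be kept.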
mutation_ptr mu3 = create_test_mutation(1, s);
add_mutation_if_valid(mu2, result, 2);
batcher.add_mutation_if_valid(mu2, 2);
batcher.add_mutation_if_valid(mu3, 1);
result = batcher.move_all_mutations();
ASSERT_EQ(result.size(), 2);
}

TEST_F(mutation_batch_test, ignore_non_idempotent_write)
{
mutation_tuple_set result;
auto duplicator = create_test_duplicator(0);
mutation_batch batcher(duplicator.get());

std::string s = "hello";
mutation_ptr mu = create_test_mutation(1, s);
mu->data.updates[0].code = RPC_DUPLICATION_NON_IDEMPOTENT_WRITE;
add_mutation_if_valid(mu, result, 0);
batcher.add_mutation_if_valid(mu, 0);
mutation_tuple_set result = batcher.move_all_mutations();
ASSERT_EQ(result.size(), 0);
}

Expand Down
2 changes: 1 addition & 1 deletion src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ DSN_DEFINE_bool("replication",

DSN_DEFINE_uint32("replication",
max_concurrent_manual_emergency_checkpointing_count,
1,
10,
"max concurrent manual emergency checkpoint running count");
DSN_TAG_VARIABLE(max_concurrent_manual_emergency_checkpointing_count, FT_MUTABLE);

Expand Down
1 change: 1 addition & 0 deletions src/replica/test/replica_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ TEST_F(replica_test, test_trigger_manual_emergency_checkpoint)
// test exceed max concurrent count
ASSERT_EQ(_mock_replica->trigger_manual_emergency_checkpoint(101), ERR_OK);
force_update_checkpointing(false);
FLAGS_max_concurrent_manual_emergency_checkpointing_count = 1;
ASSERT_EQ(_mock_replica->trigger_manual_emergency_checkpoint(101), ERR_TRY_AGAIN);
ASSERT_FALSE(is_checkpointing());
_mock_replica->tracker()->wait_outstanding_tasks();
Expand Down

0 comments on commit 997c2dc

Please sign in to comment.