Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat(dup): implement ship_mutation stage and mutation_batch (#312)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wu Tao authored and qinzuoyan committed Sep 19, 2019
1 parent f3d0031 commit 782adba
Show file tree
Hide file tree
Showing 10 changed files with 440 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/dist/replication/lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ set(DUPLICATION_SRC
duplication/replica_duplicator.cpp
duplication/duplication_pipeline.cpp
duplication/load_from_private_log.cpp
duplication/mutation_batch.cpp
)

# Source files under CURRENT project directory will be automatically included.
Expand Down
47 changes: 44 additions & 3 deletions src/dist/replication/lib/duplication/duplication_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@
namespace dsn {
namespace replication {

// //
// mutation_duplicator //
// //

/*static*/ std::function<std::unique_ptr<mutation_duplicator>(
replica_base *, string_view /*remote cluster*/, string_view /*app*/)>
mutation_duplicator::creator;

// //
// load_mutation //
// //
Expand All @@ -35,14 +43,47 @@ load_mutation::load_mutation(replica_duplicator *duplicator,
// ship_mutation //
// //

void ship_mutation::ship(mutation_tuple_set &&in)
{
_mutation_duplicator->duplicate(std::move(in), [this](size_t total_shipped_size) mutable {
update_progress();
step_down_next_stage();
});
}

void ship_mutation::run(decree &&last_decree, mutation_tuple_set &&in)
{
// TBD
_last_decree = last_decree;

if (in.empty()) {
update_progress();
step_down_next_stage();
return;
}

ship(std::move(in));
}

ship_mutation::ship_mutation(replica_duplicator *duplicator) : replica_base(duplicator)
void ship_mutation::update_progress()
{
// TBD
dcheck_eq_replica(
_duplicator->update_progress(duplication_progress().set_last_decree(_last_decree)),
error_s::ok());

// committed decree never decreases
decree last_committed_decree = _replica->last_committed_decree();
dcheck_ge_replica(last_committed_decree, _last_decree);
}

ship_mutation::ship_mutation(replica_duplicator *duplicator)
: replica_base(duplicator),
_duplicator(duplicator),
_replica(duplicator->_replica),
_stub(duplicator->_replica->get_replica_stub())
{
_mutation_duplicator = new_mutation_duplicator(
duplicator, _duplicator->remote_cluster_name(), _replica->get_app_info()->app_name);
_mutation_duplicator->set_task_environment(duplicator);
}

} // namespace replication
Expand Down
16 changes: 16 additions & 0 deletions src/dist/replication/lib/duplication/duplication_pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,22 @@ class ship_mutation : public replica_base,
/// ==== Implementation ==== ///

explicit ship_mutation(replica_duplicator *duplicator);

void ship(mutation_tuple_set &&in);

private:
void update_progress();

friend struct ship_mutation_test;
friend class replica_duplicator_test;

std::unique_ptr<mutation_duplicator> _mutation_duplicator;

replica_duplicator *_duplicator;
replica *_replica;
replica_stub *_stub;

decree _last_decree{invalid_decree};
};

} // namespace replication
Expand Down
119 changes: 119 additions & 0 deletions src/dist/replication/lib/duplication/mutation_batch.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

#include <dsn/dist/fmt_logging.h>
#include <dsn/cpp/message_utils.h>

#include "replica_duplicator.h"
#include "mutation_batch.h"
#include "dist/replication/lib/prepare_list.h"

namespace dsn {
namespace replication {

/*static*/ constexpr int64_t mutation_batch::PREPARE_LIST_NUM_ENTRIES;

error_s mutation_batch::add(mutation_ptr mu)
{
if (mu->get_decree() <= _mutation_buffer->last_committed_decree()) {
// ignore
return error_s::ok();
}

auto old = _mutation_buffer->get_mutation_by_decree(mu->get_decree());
if (old != nullptr && old->data.header.ballot >= mu->data.header.ballot) {
// ignore
return error_s::ok();
}

error_code ec = _mutation_buffer->prepare(mu, partition_status::PS_INACTIVE);
if (ec != ERR_OK) {
return FMT_ERR(
ERR_INVALID_DATA,
"failed to add mutation [err:{}, logged:{}, decree:{}, committed:{}, start_decree:{}]",
ec.to_string(),
mu->is_logged(),
mu->get_decree(),
mu->data.header.last_committed_decree,
_start_decree);
}

return error_s::ok();
}

decree mutation_batch::last_decree() const { return _mutation_buffer->last_committed_decree(); }

void mutation_batch::set_start_decree(decree d) { _start_decree = d; }

mutation_tuple_set mutation_batch::move_all_mutations()
{
// free the internal space
_mutation_buffer->truncate(last_decree());
return std::move(_loaded_mutations);
}

mutation_batch::mutation_batch(replica_duplicator *r) : replica_base(r)
{
// Prepend a special tag identifying this is a mutation_batch,
// so `dxxx_replica` logging in prepare_list will print along with its real caller.
// This helps for debugging.
replica_base base(r->get_gpid(), std::string("mutation_batch@") + r->replica_name());
_mutation_buffer =
make_unique<prepare_list>(&base, 0, PREPARE_LIST_NUM_ENTRIES, [this](mutation_ptr &mu) {
// committer
add_mutation_if_valid(mu, _loaded_mutations, _start_decree);
});

// start duplication from confirmed_decree
_mutation_buffer->reset(r->progress().confirmed_decree);
}

/*extern*/ void
add_mutation_if_valid(mutation_ptr &mu, mutation_tuple_set &mutations, decree start_decree)
{
if (mu->get_decree() < start_decree) {
// ignore
return;
}
for (mutation_update &update : mu->data.updates) {
// ignore WRITE_EMPTY
if (update.code == RPC_REPLICATION_WRITE_EMPTY) {
continue;
}

blob bb;
if (update.data.buffer() != nullptr) {
bb = std::move(update.data);
} else {
bb = blob::create_from_bytes(update.data.data(), update.data.length());
}

mutations.emplace(std::make_tuple(mu->data.header.timestamp, update.code, std::move(bb)));
}
}

} // namespace replication
} // namespace dsn
73 changes: 73 additions & 0 deletions src/dist/replication/lib/duplication/mutation_batch.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

#pragma once

#include <dsn/dist/replication/mutation_duplicator.h>

#include "dist/replication/lib/mutation.h"

namespace dsn {
namespace replication {

class replica_duplicator;
class prepare_list;

// A sorted array of committed mutations that are ready for duplication.
// Not thread-safe.
class mutation_batch : replica_base
{
public:
static constexpr int64_t PREPARE_LIST_NUM_ENTRIES{200};

explicit mutation_batch(replica_duplicator *r);

error_s add(mutation_ptr mu);

mutation_tuple_set move_all_mutations();

decree last_decree() const;

// mutations with decree < d will be ignored.
void set_start_decree(decree d);

size_t size() const { return _loaded_mutations.size(); }

private:
friend class replica_duplicator_test;

std::unique_ptr<prepare_list> _mutation_buffer;
mutation_tuple_set _loaded_mutations;
decree _start_decree{invalid_decree};
};

using mutation_batch_u_ptr = std::unique_ptr<mutation_batch>;

/// Extract mutations into mutation_tuple_set if they are not WRITE_EMPTY.
extern void add_mutation_if_valid(mutation_ptr &, mutation_tuple_set &, decree start_decree);

} // namespace replication
} // namespace dsn
2 changes: 2 additions & 0 deletions src/dist/replication/lib/duplication/replica_duplicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class replica_duplicator : public replica_base, public pipeline::base

dupid_t id() const { return _id; }

const std::string &remote_cluster_name() const { return _remote_cluster_name; }

// Thread-safe
duplication_progress progress() const
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ namespace replication {
class duplication_test_base : public replica_test_base
{
public:
duplication_test_base() {}
duplication_test_base()
{
mutation_duplicator::creator = [](replica_base *r, dsn::string_view, dsn::string_view) {
return make_unique<mock_mutation_duplicator>(r);
};
}

void add_dup(mock_replica *r, replica_duplicator_u_ptr dup)
{
Expand All @@ -29,6 +34,16 @@ class duplication_test_base : public replica_test_base
}
return dup_entities[dupid].get();
}

std::unique_ptr<replica_duplicator> create_test_duplicator(decree confirmed = invalid_decree)
{
duplication_entry dup_ent;
dup_ent.dupid = 1;
dup_ent.remote = "remote_address";
dup_ent.status = duplication_status::DS_PAUSE;
dup_ent.progress[_replica->get_gpid().get_partition_index()] = confirmed;
return make_unique<replica_duplicator>(dup_ent, _replica.get());
}
};

} // namespace replication
Expand Down
63 changes: 63 additions & 0 deletions src/dist/replication/lib/duplication/test/mutation_batch_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

#include "dist/replication/test/replica_test/unit_test/replica_test_base.h"
#include "dist/replication/lib/duplication/mutation_batch.h"

namespace dsn {
namespace replication {

class mutation_batch_test : public replica_test_base
{
};

TEST_F(mutation_batch_test, add_mutation_if_valid)
{
mutation_tuple_set result;

std::string s = "hello";
mutation_ptr mu1 = create_test_mutation(1, s);
add_mutation_if_valid(mu1, result, 0);
mutation_tuple mt1 = *result.begin();

result.clear();

s = "world";
mutation_ptr mu2 = create_test_mutation(2, s);
add_mutation_if_valid(mu2, result, 0);
mutation_tuple mt2 = *result.begin();

ASSERT_EQ(std::get<2>(mt1).to_string(), "hello");
ASSERT_EQ(std::get<2>(mt2).to_string(), "world");

// decree 1 should be ignored
mutation_ptr mu3 = create_test_mutation(1, s);
add_mutation_if_valid(mu2, result, 2);
ASSERT_EQ(result.size(), 2);
}

} // namespace replication
} // namespace dsn
Loading

0 comments on commit 782adba

Please sign in to comment.