feat(replica): support multi shard commands new flow #645

Merged · 2 commits · Jan 8, 2023
11 changes: 0 additions & 11 deletions src/server/journal/executor.cc
@@ -16,21 +16,10 @@ JournalExecutor::JournalExecutor(Service* service)
}

void JournalExecutor::Execute(DbIndex dbid, std::vector<journal::ParsedEntry::CmdData>& cmds) {
DCHECK_GT(cmds.size(), 1U);
conn_context_.conn_state.db_index = dbid;
std::string multi_cmd = {"MULTI"};
auto ms = MutableSlice{&multi_cmd[0], multi_cmd.size()};
auto span = CmdArgList{&ms, 1};
service_->DispatchCommand(span, &conn_context_);

for (auto& cmd : cmds) {
Execute(cmd);
}

std::string exec_cmd = {"EXEC"};
ms = {&exec_cmd[0], exec_cmd.size()};
span = {&ms, 1};
service_->DispatchCommand(span, &conn_context_);
}

void JournalExecutor::Execute(DbIndex dbid, journal::ParsedEntry::CmdData& cmd) {
176 changes: 119 additions & 57 deletions src/server/replica.cc
@@ -106,8 +106,12 @@ Replica::Replica(const MasterContext& context, uint32_t dfly_flow_id, Service* s
}

Replica::~Replica() {
if (sync_fb_.joinable())
if (sync_fb_.joinable()) {
sync_fb_.join();
}
if (execution_fb_.joinable()) {
execution_fb_.join();
}

if (sock_) {
auto ec = sock_->Close();
@@ -160,7 +164,8 @@ void Replica::Stop()

// Make sure the replica fully stopped and did all cleanup,
// so we can freely release resources (connections).
sync_fb_.join();
if (sync_fb_.joinable())
sync_fb_.join();
}

void Replica::Pause(bool pause) {
@@ -561,8 +566,19 @@ error_code Replica::ConsumeDflyStream()
// Make sure the flows are not in a state transition
lock_guard lk{flows_op_mu_};
DefaultErrorHandler(ge);
for (auto& flow : shard_flows_)
for (auto& flow : shard_flows_) {
flow->CloseSocket();
flow->waker_.notifyAll();
}

// Iterate over the map and cancel all blocking entities
{
lock_guard lk{multi_shard_exe_->map_mu};
for (auto& tx_data : multi_shard_exe_->tx_sync_execution) {
tx_data.second.barrier.Cancel();
tx_data.second.block.Cancel();
}
}
};
Review comment from @dranikpg (Contributor), Jan 5, 2023, on lines +580 to 582:

You forgot to unblock all the wakers; that's why it's stuck (stuck on joining the execution fibers). I've added it and it seems to work.
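The fix matters because Barrier::Wait() and BlockingCounter::Wait() park the calling fiber: on error, every parked fiber must be released, or the execution fibers can never be joined. A minimal, self-contained sketch of the Cancel() semantics the error handler relies on, written with std::condition_variable standing in for the util::fibers_ext primitives (the class below is an illustration, not the library's implementation):

#include <condition_variable>
#include <mutex>

// Toy cancellable blocking counter: Wait() returns either when the count
// reaches zero or when Cancel() is called, so an error handler can unblock
// all waiters without completing the protocol.
class CancellableBlockingCounter {
 public:
  explicit CancellableBlockingCounter(unsigned count) : count_(count) {}

  void Dec() {
    std::lock_guard lk{mu_};
    if (count_ > 0 && --count_ == 0)
      cv_.notify_all();
  }

  void Wait() {
    std::unique_lock lk{mu_};
    cv_.wait(lk, [this] { return count_ == 0 || cancelled_; });
  }

  void Cancel() {  // wake every waiter; Wait() returns even though count_ > 0
    std::lock_guard lk{mu_};
    cancelled_ = true;
    cv_.notify_all();
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  unsigned count_;
  bool cancelled_ = false;
};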

RETURN_ON_ERR(cntx_.SwitchErrorHandler(std::move(err_handler)));

@@ -605,6 +621,9 @@ void Replica::JoinAllFlows() {
if (flow->sync_fb_.joinable()) {
flow->sync_fb_.join();
}
if (flow->execution_fb_.joinable()) {
flow->execution_fb_.join();
}
}
}

@@ -688,7 +707,10 @@ error_code Replica::StartStableSyncFlow(Context* cntx) {
CHECK(sock_->IsOpen());
// sock_.reset(mythread->CreateSocket());
// RETURN_ON_ERR(sock_->Connect(master_context_.master_ep));
sync_fb_ = ::boost::fibers::fiber(&Replica::StableSyncDflyFb, this, cntx);
sync_fb_ = ::boost::fibers::fiber(&Replica::StableSyncDflyReadFb, this, cntx);
// TODO(ADI): add a flag for skipping the flow of blocking on multi shard entries
// and compare performance against the blocking on multi shard entries flow.
execution_fb_ = ::boost::fibers::fiber(&Replica::StableSyncDflyExecFb, this, cntx);

return std::error_code{};
}
@@ -739,7 +761,7 @@ void Replica::FullSyncDflyFb(string eof_token, fibers_ext::BlockingCounter bc, C
VLOG(1) << "FullSyncDflyFb finished after reading " << loader.bytes_read() << " bytes";
}

void Replica::StableSyncDflyFb(Context* cntx) {
void Replica::StableSyncDflyReadFb(Context* cntx) {
// Check leftover from full sync.
io::Bytes prefix{};
if (leftover_buf_ && leftover_buf_->InputLen() > 0) {
@@ -750,91 +772,131 @@ void Replica::StableSyncDflyFb(Context* cntx) {
io::PrefixSource ps{prefix, &ss};

JournalReader reader{&ps, 0};
JournalExecutor executor{&service_};

while (!cntx->IsCancelled()) {
TranactionData tx_data;
while (!cntx->IsCancelled()) {
waker_.await([&]() {
return ((trans_data_queue_.size() < kYieldAfterItemsInQueue) || cntx->IsCancelled());
});
if (cntx->IsCancelled()) {
return;
}
auto res = reader.ReadEntry();
if (!res) {
cntx->ReportError(res.error(), "Journal format error");
return;
}
bool should_execute = tx_data.UpdateFromParsedEntry(std::move(*res));
if (should_execute == true) {
last_io_time_ = sock_->proactor()->GetMonotonicTimeNs();
bool tx_data_full = tx_data.UpdateFromParsedEntry(std::move(*res));
if (tx_data_full == true) {
break;
}
}
ExecuteCmd(&executor, std::move(tx_data), cntx);
last_io_time_ = sock_->proactor()->GetMonotonicTimeNs();
InsertTxDataToShardResource(std::move(tx_data), cntx);
waker_.notify();
}
return;
}

void Replica::ExecuteCmd(JournalExecutor* executor, TranactionData&& tx_data, Context* cntx) {
void Replica::InsertTxDataToShardResource(TranactionData&& tx_data, Context* cntx) {
bool me_insert = false;
if (tx_data.shard_cnt > 1) {
multi_shard_exe_->map_mu.lock();

bool global_cmd =
(tx_data.commands.size() == 1 && tx_data.commands.front().cmd_args.size() == 1);

auto [it, was_insert] = multi_shard_exe_->tx_sync_execution.emplace(
std::piecewise_construct, std::forward_as_tuple(tx_data.txid),
std::forward_as_tuple(tx_data.shard_cnt, global_cmd));
VLOG(2) << "txid: " << tx_data.txid << " unique_shard_cnt_: " << tx_data.shard_cnt
<< " was_insert: " << was_insert;
me_insert = was_insert;
it->second.block.Dec();
multi_shard_exe_->map_mu.unlock();
}

VLOG(2) << "txid: " << tx_data.txid << " pushed to queue";
trans_data_queue_.push(std::make_pair(std::move(tx_data), me_insert));
}
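TxExecutionSync owns synchronization primitives, so it is built in place with std::piecewise_construct rather than copied or moved into the map; was_insert then tells the caller whether this flow was the first to reach the transaction. A self-contained illustration of the idiom (SyncState is a hypothetical stand-in for TxExecutionSync):

#include <cstdint>
#include <iostream>
#include <tuple>
#include <unordered_map>
#include <utility>

// Stand-in for TxExecutionSync: constructible only in place, like a struct
// holding a barrier (no copy, no move).
struct SyncState {
  SyncState(uint32_t shards, bool global) : shard_cnt(shards), is_global_cmd(global) {}
  SyncState(const SyncState&) = delete;
  SyncState& operator=(const SyncState&) = delete;
  uint32_t shard_cnt;
  bool is_global_cmd;
};

int main() {
  std::unordered_map<uint64_t, SyncState> tx_sync_execution;
  uint64_t txid = 42;

  // Key and value are constructed directly inside the map node; was_insert
  // reports whether this call created the entry or found an existing one.
  auto [it, was_insert] = tx_sync_execution.emplace(
      std::piecewise_construct, std::forward_as_tuple(txid),
      std::forward_as_tuple(/*shards=*/3, /*global=*/false));
  std::cout << "inserted: " << was_insert << " shards: " << it->second.shard_cnt << '\n';
}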

void Replica::StableSyncDflyExecFb(Context* cntx) {
JournalExecutor executor{&service_};

while (!cntx->IsCancelled()) {
waker_.await([&]() { return (!trans_data_queue_.empty() || cntx->IsCancelled()); });
if (cntx->IsCancelled()) {
return;
}
DCHECK(!trans_data_queue_.empty());
auto& data = trans_data_queue_.front();
ExecuteTx(&executor, std::move(data.first), data.second, cntx);
trans_data_queue_.pop();
waker_.notify();
}

return;
}
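StableSyncDflyReadFb and StableSyncDflyExecFb form a bounded producer/consumer pair: the reader parks once trans_data_queue_ holds kYieldAfterItemsInQueue entries, the executor parks when the queue is empty, and each side notifies the shared EventCount (waker_) after making progress. A self-contained sketch of the same backpressure scheme, using two condition variables in place of the single fiber waker (the queue class is illustrative, not from the PR):

#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <utility>

// Bounded queue: producers block at the capacity limit, consumers block on
// empty, mirroring the reader/executor fiber handshake above.
template <typename T>
class BoundedTxQueue {
  static constexpr std::size_t kYieldAfterItemsInQueue = 50;

 public:
  void Push(T item) {  // producer side (the read fiber)
    std::unique_lock lk{mu_};
    not_full_.wait(lk, [this] { return q_.size() < kYieldAfterItemsInQueue; });
    q_.push(std::move(item));
    not_empty_.notify_one();
  }

  T Pop() {  // consumer side (the execution fiber)
    std::unique_lock lk{mu_};
    not_empty_.wait(lk, [this] { return !q_.empty(); });
    T item = std::move(q_.front());
    q_.pop();
    not_full_.notify_one();
    return item;
  }

 private:
  std::mutex mu_;
  std::condition_variable not_full_, not_empty_;
  std::queue<T> q_;
};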

void Replica::ExecuteTx(JournalExecutor* executor, TranactionData&& tx_data, bool inserted_by_me,
Context* cntx) {
if (cntx->IsCancelled()) {
return;
}
if (tx_data.shard_cnt <= 1) { // not multi shard cmd
VLOG(2) << "Execute single shard cmd";
executor->Execute(tx_data.dbid, tx_data.commands.front());
return;
}

// Multi shard command flow:
// step 1: Fiber waits until all the fibers that should execute this transaction have reached
// the journal entry of the transaction. This step enforces that the replica executes only multi
// shard commands that finished on the master.
// step 2: Execute the commands from one fiber. This step ensures atomicity on the replica.
// step 3: Fiber waits until all fibers have finished the execution. This step ensures atomicity
// of operations on the replica.

// TODO: support error handler in this flow

// Only the first fiber to reach the transaction will create the transaction data in the map
VLOG(2) << "Execute txid: " << tx_data.txid;
multi_shard_exe_->map_mu.lock();
auto [it, was_insert] =
multi_shard_exe_->tx_sync_execution.emplace(tx_data.txid, tx_data.shard_cnt);
VLOG(2) << "txid: " << tx_data.txid << " unique_shard_cnt_: " << tx_data.shard_cnt
<< " was_insert: " << was_insert;

TxId txid = tx_data.txid;
// entries_vec will store all entries of the transaction and will be executed by the fiber that
// inserted the txid into the map. In case of a global command, the inserting fiber will execute
// its own entry.
bool global_cmd = (tx_data.commands.size() == 1 && tx_data.commands.front().cmd_args.size() == 1);
if (!global_cmd) {
for (auto& cmd : tx_data.commands) {
it->second.commands.push_back(std::move(cmd));
}
}
auto& tx_sync = it->second;

// Note: we must release the mutex before calling wait on the barrier
auto it = multi_shard_exe_->tx_sync_execution.find(tx_data.txid);
DCHECK(it != multi_shard_exe_->tx_sync_execution.end());
auto& multi_shard_data = it->second;
multi_shard_exe_->map_mu.unlock();

// step 1
tx_sync.barrier.wait();

// step 2
if (was_insert) {
if (global_cmd) {
executor->Execute(tx_data.dbid, tx_data.commands.front());

} else {
executor->Execute(tx_data.dbid, tx_sync.commands);
VLOG(2) << "Execute txid: " << tx_data.txid << " waiting for data in all shards";
// Wait until all shard flows have received the transaction data and inserted it into the map.
// This step enforces that the replica executes only multi shard commands that finished on the
// master, and that the replica has received the commands from all shards.
multi_shard_data.block.Wait();
// Check if we woke up due to cancellation.
if (cntx_.IsCancelled())
return;
VLOG(2) << "Execute txid: " << tx_data.txid << " block wait finished";

if (multi_shard_data.is_global_cmd) {
VLOG(2) << "Execute txid: " << tx_data.txid << " global command execution";
// Wait until all shard flows get to the execution step of this transaction.
multi_shard_data.barrier.Wait();
// Check if we woke up due to cancellation.
if (cntx_.IsCancelled())
return;
// A global command will be executed from only one flow fiber. This ensures correctness of the
// data on the replica.
if (inserted_by_me) {
executor->Execute(tx_data.dbid, tx_data.commands);
}
// Wait until the execution is done, to make sure we don't execute the next commands while the
// global command is being executed.
multi_shard_data.barrier.Wait();
// Check if we woke up due to cancellation.
if (cntx_.IsCancelled())
return;
} else {  // A non-global command will be executed by each flow fiber
VLOG(2) << "Execute txid: " << tx_data.txid << " executing shard transaction commands";
executor->Execute(tx_data.dbid, tx_data.commands);
}

// step 3
tx_sync.barrier.wait();

// Note: erase from map can be done only after all fibers returned from wait.
// Erasing from the map can be done only after all flow fibers have executed the transaction
// commands. The last fiber to decrease the counter to 0 erases the entry from the map.
auto val = tx_sync.counter.fetch_sub(1, std::memory_order_relaxed);
VLOG(2) << "txid: " << txid << " counter: " << val;
auto val = multi_shard_data.counter.fetch_sub(1, std::memory_order_relaxed);
VLOG(2) << "txid: " << tx_data.txid << " counter: " << val;
if (val == 1) {
std::lock_guard lg{multi_shard_exe_->map_mu};
multi_shard_exe_->tx_sync_execution.erase(txid);
multi_shard_exe_->tx_sync_execution.erase(tx_data.txid);
}
}

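Putting the pieces of ExecuteTx together: each flow first waits until every shard's data has arrived (the BlockingCounter), a global command is then fenced by two barrier phases so exactly one flow runs it while the others hold, and the last flow to decrement the shared counter erases the map entry. A self-contained C++20 simulation of that choreography, with std::thread, std::latch, and std::barrier standing in for the fiber primitives (the printfs are illustrative):

#include <atomic>
#include <barrier>
#include <cstdio>
#include <latch>
#include <thread>
#include <vector>

int main() {
  constexpr int kShards = 3;
  std::latch block(kShards);      // ~BlockingCounter: all shard data arrived
  std::barrier barrier(kShards);  // ~Barrier: fences around execution
  std::atomic<int> counter{kShards};

  // One "flow" per shard; only the flow that inserted the tx entry executes.
  auto flow = [&](int id, bool inserted_by_me) {
    block.arrive_and_wait();      // wait for the transaction data of all shards
    barrier.arrive_and_wait();    // all flows reached the execution step
    if (inserted_by_me)
      std::printf("flow %d executes the global command\n", id);
    barrier.arrive_and_wait();    // nobody proceeds while the command runs
    if (counter.fetch_sub(1, std::memory_order_relaxed) == 1)
      std::printf("flow %d erases the tx entry from the map\n", id);
  };

  std::vector<std::thread> threads;
  for (int i = 0; i < kShards; ++i)
    threads.emplace_back(flow, i, /*inserted_by_me=*/i == 0);
  for (auto& t : threads)
    t.join();
}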
25 changes: 19 additions & 6 deletions src/server/replica.h
@@ -6,6 +6,7 @@
#include <boost/fiber/barrier.hpp>
#include <boost/fiber/fiber.hpp>
#include <boost/fiber/mutex.hpp>
#include <queue>
#include <variant>

#include "base/io_buf.h"
@@ -57,19 +58,21 @@ class Replica {
uint32_t shard_cnt;
DbIndex dbid;
std::vector<journal::ParsedEntry::CmdData> commands;
// Update the data from ParsedEntry and return whether it's ready for execution.
// Update the data from ParsedEntry and return whether all shard transaction commands were received.
bool UpdateFromParsedEntry(journal::ParsedEntry&& entry);
};

struct MultiShardExecution {
boost::fibers::mutex map_mu;

struct TxExecutionSync {
boost::fibers::barrier barrier;
util::fibers_ext::Barrier barrier;
std::atomic_uint32_t counter;
TxExecutionSync(uint32_t counter) : barrier(counter), counter(counter) {
util::fibers_ext::BlockingCounter block;
bool is_global_cmd;
TxExecutionSync(uint32_t counter, bool is_global)
: barrier(counter), counter(counter), block(counter), is_global_cmd(is_global) {
}
std::vector<journal::ParsedEntry::CmdData> commands;
};

std::unordered_map<TxId, TxExecutionSync> tx_sync_execution;
@@ -125,7 +128,9 @@ class Replica {
Context* cntx);

// Single flow stable state sync fiber spawned by StartStableSyncFlow.
void StableSyncDflyFb(Context* cntx);
void StableSyncDflyReadFb(Context* cntx);

void StableSyncDflyExecFb(Context* cntx);

private: /* Utility */
struct PSyncResponse {
@@ -153,7 +158,9 @@ class Replica {
// Send command, update last_io_time, return error.
std::error_code SendCommand(std::string_view command, facade::ReqSerializer* serializer);

void ExecuteCmd(JournalExecutor* executor, TranactionData&& tx_data, Context* cntx);
void ExecuteTx(JournalExecutor* executor, TranactionData&& tx_data, bool inserted_by_me,
Context* cntx);
void InsertTxDataToShardResource(TranactionData&& tx_data, Context* cntx);

public: /* Utility */
struct Info {
@@ -189,8 +196,14 @@

std::shared_ptr<MultiShardExecution> multi_shard_exe_;

std::queue<std::pair<TranactionData, bool>> trans_data_queue_;
static constexpr size_t kYieldAfterItemsInQueue = 50;
::util::fibers_ext::EventCount waker_;

// MainReplicationFb in standalone mode, FullSyncDflyFb in flow mode.
::boost::fibers::fiber sync_fb_;
::boost::fibers::fiber execution_fb_;

std::vector<std::unique_ptr<Replica>> shard_flows_;

// Guard operations where flows might be in a mixed state (transition/setup)