Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): Switch to stable state replication #473

Merged
merged 9 commits into from
Nov 17, 2022
3 changes: 3 additions & 0 deletions src/redis/rdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@
/* Test if a type is an object type. */
#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 18))

// DFLY EXTENSIONS: WHAT FILE TO PUT THEM?
#define RDB_OPCODE_FULLSYNC_END 233

dranikpg marked this conversation as resolved.
Show resolved Hide resolved
/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
#define RDB_OPCODE_FUNCTION 246 /* engine data */
#define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */
Expand Down
146 changes: 127 additions & 19 deletions src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) {
return Sync(args, cntx);
}

if (sub_cmd == "STARTSTABLE" && args.size() == 3) {
return StartStable(args, cntx);
}

if (sub_cmd == "EXPIRE") {
return Expire(args, cntx);
}
Expand All @@ -101,6 +105,8 @@ void DflyCmd::OnClose(ConnectionContext* cntx) {
if (!session_id)
return;

VLOG(0) << "Disconnected !!! " << flow_id;

if (flow_id == kuint32max) {
DeleteSyncSession(session_id);
} else {
Expand Down Expand Up @@ -258,16 +264,8 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) {
return;

unique_lock lk(sync_info->mu);
if (sync_info->state != SyncState::PREPARATION)
return rb->SendError(kInvalidState);

// Check all flows are connected.
// This might happen if a flow abruptly disconnected before sending the SYNC request.
for (const FlowInfo& flow : sync_info->flows) {
if (!flow.conn) {
return rb->SendError(kInvalidState);
}
}
if (!CheckReplicaStateOrReply(*sync_info, SyncState::PREPARATION, rb))
return;

// Start full sync.
{
Expand All @@ -288,6 +286,60 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) {
return rb->SendOk();
}

// Handles `DFLY STARTSTABLE <syncid>`.
//
// Looks up the sync session named by args[2], verifies it is in FULL_SYNC with
// every flow connected, runs StartStableSyncInThread for each flow, and on
// success marks the session STABLE_SYNC and replies OK.
//
// Replies kInvalidState if the state/flow check fails or if the aggregated
// per-flow status is not OK. If the session lookup fails the function returns
// without replying here (presumably GetSyncInfoOrReply already sent the error
// - confirm against its definition).
void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) {
RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
// The sync id is the third argument of the command.
string_view sync_id_str = ArgS(args, 2);

VLOG(0) << "Got DFLY STARTSTABLE " << sync_id_str;

auto [sync_id, sync_info] = GetSyncInfoOrReply(sync_id_str, rb);
// A zero sync_id means the lookup did not produce a usable session.
if (!sync_id)
return;

// The session mutex stays locked until this function returns, so the state
// check below and the state update at the end happen under the same lock.
unique_lock lk(sync_info->mu);
if (!CheckReplicaStateOrReply(*sync_info, SyncState::FULL_SYNC, rb))
return;

// TODO: Temporary solution
// The switch is done in two passes that write into the same `status`:
//   1. `cb` is scheduled through the command's transaction and receives an
//      EngineShard; the shard id is used as the index into `flows`.
//   2. `cb2` is run via the pool on every thread and only acts on threads
//      whose EngineShard::tlocal() is null, i.e. the ones pass 1 did not
//      receive a shard for.
{
AggregateStatus status;

// `sync_info = sync_info` copies the structured binding into the closure;
// `status` is captured by reference and must outlive both calls below.
auto cb = [this, &status, sync_info = sync_info](Transaction* t, EngineShard* shard) {
unsigned index = shard->shard_id();
status = StartStableSyncInThread(&sync_info->flows[index], shard);
return OpStatus::OK;
};
cntx->transaction->ScheduleSingleHop(std::move(cb));

auto cb2 = [this, &status, sync_info = sync_info](unsigned index, auto*) {
// Threads that own a shard are skipped here.
if (EngineShard::tlocal() != nullptr) return OpStatus::OK;
// tlocal() is null at this point, so the flow is switched without a shard.
status = StartStableSyncInThread(&sync_info->flows[index], EngineShard::tlocal());
return OpStatus::OK;
};
shard_set->pool()->AwaitFiberOnAll(std::move(cb2));

// NOTE(review): StartStableSyncInThread, as written in this change, always
// returns OpStatus::OK, so this branch looks unreachable for now - confirm.
if (*status != OpStatus::OK)
return rb->SendError(kInvalidState);
}

// Disabled alternative: a single pass over all pool threads under a
// TransactionGuard instead of the two passes above.
//{
// TransactionGuard tg{cntx->transaction};
// AggregateStatus status;

// auto cb = [this, &status, sync_info = sync_info](unsigned index, auto*) {
// status = StartStableSyncInThread(&sync_info->flows[index], EngineShard::tlocal());
// return OpStatus::OK;
// };
// shard_set->pool()->AwaitFiberOnAll(std::move(cb));

// if (*status != OpStatus::OK)
// return rb->SendError(kInvalidState);
//}

// All flows were switched: record the new session state and acknowledge.
sync_info->state = SyncState::STABLE_SYNC;
return rb->SendOk();
}

void DflyCmd::Expire(CmdArgList args, ConnectionContext* cntx) {
RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
cntx->transaction->ScheduleSingleHop([](Transaction* t, EngineShard* shard) {
Expand All @@ -305,13 +357,38 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, EngineShard* shard) {
flow->saver.reset(new RdbSaver(flow->conn->socket(), save_mode, false));

if (shard != nullptr) {
flow->saver->StartSnapshotInShard(false, shard);
auto ec = sf_->journal()->OpenInThread(false, string_view());
CHECK(!ec);
flow->saver->StartSnapshotInShard(true, shard);
}

flow->fb = ::boost::fibers::fiber(&DflyCmd::FullSyncFb, this, flow);
return OpStatus::OK;
}

// Switches a single flow from full sync to stable sync.
//
// flow:  the flow to switch; its `saver` and `fb` members are used and modified.
// shard: the EngineShard for this flow, or nullptr when the flow has none.
//
// With a shard: stops the snapshot, waits for the current fiber, releases the
// saver, then starts StableSyncFb in a new fiber and joins it.
// Without a shard: only waits for the current fiber.
// Always returns OpStatus::OK.
OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, EngineShard* shard) {
// The snapshot is stopped before joining the fiber below.
if (shard != nullptr) {
dranikpg marked this conversation as resolved.
Show resolved Hide resolved
flow->saver->StopSnapshotInShard(shard);
}

// Wait for full sync to finish.
if (flow->fb.joinable()) {
flow->fb.join();
}

// NOTE(review): the saver is released only on the shard path; for a null shard
// it is left untouched here - confirm this is intended.
if (shard != nullptr) {
flow->saver.reset();
romange marked this conversation as resolved.
Show resolved Hide resolved

// Start stable state fiber.
flow->fb = boost::fibers::fiber(&DflyCmd::StableSyncFb, this, flow);

// TODO: Temporary solution
// Joining immediately means this call does not return until StableSyncFb has
// returned.
flow->fb.join();
}

return OpStatus::OK;
}

void DflyCmd::FullSyncFb(FlowInfo* flow) {
error_code ec;
RdbSaver* saver = flow->saver.get();
Expand All @@ -328,22 +405,34 @@ void DflyCmd::FullSyncFb(FlowInfo* flow) {
return;
}

if (saver->Mode() != SaveMode::SUMMARY) {
// TODO: we should be able to stop earlier if requested.
ec = saver->SaveBody(nullptr);
if (ec) {
LOG(ERROR) << ec;
return;
}
// TODO: we should be able to stop earlier if requested.
romange marked this conversation as resolved.
Show resolved Hide resolved
ec = saver->SaveBody(nullptr);
if (ec) {
LOG(ERROR) << ec;
return;
}

VLOG(1) << "Sending full sync EOF";

ec = flow->conn->socket()->Write(io::Buffer(flow->eof_token));
if (ec) {
LOG(ERROR) << ec;
return;
}

ec = flow->conn->socket()->Shutdown(SHUT_RDWR);
// ec = flow->conn->socket()->Shutdown(SHUT_RDWR);
}

// Stable-sync fiber body for one flow: registers a journal change callback
// that sends every journal entry to the flow's connection as a textual
// "SET <key> <value>" command.
void DflyCmd::StableSyncFb(FlowInfo* flow) {
  auto on_change = [flow](const journal::Entry& entry) {
    // TODO: Temporary solution
    // Entries are dropped while the flow has no connection.
    if (flow->conn != nullptr) {
      // TODO: Serialize event.
      ReqSerializer req_serializer{flow->conn->socket()};
      req_serializer.SendCommand(absl::StrCat("SET ", entry.key, " ", entry.pval_ptr->ToString()));
      // CHECK(!req_serializer.ec());
    }
  };

  // The value returned by the registration is kept in a local, as before.
  auto cb = sf_->journal()->RegisterOnChange(std::move(on_change));
}

uint32_t DflyCmd::CreateSyncSession() {
Expand Down Expand Up @@ -429,6 +518,25 @@ pair<uint32_t, shared_ptr<DflyCmd::SyncInfo>> DflyCmd::GetSyncInfoOrReply(std::s
return {sync_id, sync_it->second};
}

bool DflyCmd::CheckReplicaStateOrReply(const SyncInfo& sync_info, SyncState expected,
RedisReplyBuilder* rb) {
if (sync_info.state != expected) {
rb->SendError(kInvalidState);
return false;
}

// Check all flows are connected.
// This might happen if a flow abruptly disconnected before sending the SYNC request.
for (const FlowInfo& flow : sync_info.flows) {
if (!flow.conn) {
rb->SendError(kInvalidState);
return false;
}
}

return true;
}

void DflyCmd::BreakOnShutdown() {
VLOG(1) << "BreakOnShutdown";
}
Expand Down
19 changes: 15 additions & 4 deletions src/server/dflycmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class Journal;

class DflyCmd {
public:
enum class SyncState { PREPARATION, FULL_SYNC, CANCELLED };
enum class SyncState { PREPARATION, FULL_SYNC, STABLE_SYNC, CANCELLED };

struct FlowInfo {
FlowInfo() = default;
Expand Down Expand Up @@ -80,21 +80,30 @@ class DflyCmd {
// Register connection as flow for sync session.
void Flow(CmdArgList args, ConnectionContext* cntx);

// SYNC <masterid> <syncid> <flowid>
// Migrate connection to required flow thread.
// Stub: will be replcaed with full sync.
// SYNC <syncid>
// Initiate full sync.
void Sync(CmdArgList args, ConnectionContext* cntx);

// STARTSTABLE <syncid>
// Switch to stable state replication.
void StartStable(CmdArgList args, ConnectionContext* cntx);

// EXPIRE
// Check all keys for expiry.
void Expire(CmdArgList args, ConnectionContext* cntx);

// Start full sync in thread. Start FullSyncFb. Called for each flow.
facade::OpStatus StartFullSyncInThread(FlowInfo* flow, EngineShard* shard);

// Start stable sync in thread. Start StableSyncFB. Called for each flow.
facade::OpStatus StartStableSyncInThread(FlowInfo* flow, EngineShard* shard);

// Fiber that runs full sync for each flow.
void FullSyncFb(FlowInfo* flow);

// Fiber that runs stable sync for each flow.
void StableSyncFb(FlowInfo* flow);

// Unregister flow. Must be called when flow disconnects.
void UnregisterFlow(FlowInfo*);

Expand All @@ -108,6 +117,8 @@ class DflyCmd {
std::pair<uint32_t, std::shared_ptr<SyncInfo>> GetSyncInfoOrReply(std::string_view id,
facade::RedisReplyBuilder* rb);

bool CheckReplicaStateOrReply(const SyncInfo& si, SyncState expected, facade::RedisReplyBuilder* rb);

ServerFamily* sf_;

util::ListenerInterface* listener_;
Expand Down
12 changes: 12 additions & 0 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1553,6 +1553,14 @@ error_code RdbLoader::Load(io::Source* src) {
break;
}

if (type == RDB_OPCODE_FULLSYNC_END) {
VLOG(0) << "GOT FULLSYNC OPCODE";
if (fullsyncb)
fullsyncb();
// notify full sync end
continue;
}

if (type == RDB_OPCODE_SELECTDB) {
unsigned dbid = 0;

Expand Down Expand Up @@ -1815,6 +1823,10 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
if (item.expire_ms > 0 && db_cntx.time_now_ms >= item.expire_ms)
continue;

auto [fit, _] = db_slice.FindExt(db_cntx, item.key);
if (IsValid(fit))
db_slice.Del(db_cntx.db_index, fit);

auto [it, added] = db_slice.AddEntry(db_cntx, item.key, std::move(pv), item.expire_ms);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's inefficient, but it's just a temporary stub.

The issue with AddEntry is that I give up my primevalue by moving it. In theory it should:

  • insert or fail
  • update if failed

Because looking it up separately would make the hot path a lot slower.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added AddOrUpdate to DbSlice as a possible solution


if (!added) {
Expand Down
2 changes: 2 additions & 0 deletions src/server/rdb_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ class RdbLoader : protected RdbLoaderBase {
return load_time_;
}

std::function<void()> fullsyncb;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How should the loader be extended to react to events? Callback?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which events? what are you trying to do? (I have not reviewed the code yet).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To the full-sync cut opcodes. The loader only finishes and unblocks once it has received the EOF code, but we need to record the FULLSYNC_CUT code earlier.

private:
struct ObjSettings;
std::error_code LoadKeyValPair(int type, ObjSettings* settings);
Expand Down
20 changes: 14 additions & 6 deletions src/server/rdb_save.cc
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,11 @@ error_code RdbSerializer::SaveStreamConsumers(streamCG* cg) {
return error_code{};
}

error_code RdbSerializer::SendFullSyncCut() {
RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_FULLSYNC_END));
return FlushMem();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need FlushMem here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I want it to be sent immediately to the replica. Can't it be stuck inside the buffer if I don't flush it? It seems like it can

Copy link
Collaborator

@romange romange Nov 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not really matter, but you call this twice: once in the summary flow, where it does not matter, and a second time in a place that calls FlushMem right afterwards, as far as I remember.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I call it twice for the SUMMARY flow, right. But don't forget about the snapshot that sends a FS cut as well. It can be stuck there

}

// TODO: if buf is large enough, it makes sense to write both mem_buf and buf
// directly to sink_.
error_code RdbSerializer::WriteRaw(const io::Bytes& buf) {
Expand Down Expand Up @@ -918,12 +923,15 @@ error_code RdbSaver::SaveHeader(const StringVec& lua_scripts) {
error_code RdbSaver::SaveBody(RdbTypeFreqMap* freq_map) {
RETURN_ON_ERR(impl_->serializer()->FlushMem());

VLOG(1) << "SaveBody , snapshots count: " << impl_->Size();

error_code io_error = impl_->ConsumeChannel();
if (io_error) {
VLOG(1) << "io error " << io_error;
return io_error;
if (save_mode_ == SaveMode::SUMMARY) {
impl_->serializer()->SendFullSyncCut();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need to send it here as well? it's not for a flow channel, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The IO thread runs the RDB saver in SUMMARY mode to transfer only the header and Lua scripts. I just decided it'll be more consistent if all threads use this opcode without any corner cases, so I need the summary mode to write it as well.

} else {
VLOG(1) << "SaveBody , snapshots count: " << impl_->Size();
error_code io_error = impl_->ConsumeChannel();
if (io_error) {
VLOG(1) << "io error " << io_error;
return io_error;
}
}

RETURN_ON_ERR(SaveEpilog());
Expand Down
2 changes: 2 additions & 0 deletions src/server/rdb_save.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ class RdbSerializer {
// for the dump command - thus it is public function
std::error_code SaveValue(const PrimeValue& pv);

std::error_code SendFullSyncCut();

private:
std::error_code SaveLzfBlob(const ::io::Bytes& src, size_t uncompressed_len);
std::error_code SaveObject(const PrimeValue& pv);
Expand Down
Loading