Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): write journal record with optional await based on flag… #791

Merged
merged 4 commits into from
Feb 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/server/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ bool ParseDouble(string_view src, double* value) {

void RecordJournal(const OpArgs& op_args, string_view cmd, ArgSlice args, uint32_t shard_cnt,
bool multi_commands) {
op_args.tx->LogJournalOnShard(op_args.shard, make_pair(cmd, args), shard_cnt, multi_commands);
op_args.tx->LogJournalOnShard(op_args.shard, make_pair(cmd, args), shard_cnt, multi_commands,
false);
}

void RecordJournalFinish(const OpArgs& op_args, uint32_t shard_cnt) {
Expand All @@ -174,7 +175,13 @@ void RecordJournalFinish(const OpArgs& op_args, uint32_t shard_cnt) {
void RecordExpiry(DbIndex dbid, string_view key) {
auto journal = EngineShard::tlocal()->journal();
CHECK(journal);
journal->RecordEntry(0, journal::Op::EXPIRED, dbid, 1, make_pair("DEL", ArgSlice{key}));
journal->RecordEntry(0, journal::Op::EXPIRED, dbid, 1, make_pair("DEL", ArgSlice{key}), false);
}

void TriggerJournalWriteToSink() {
auto journal = EngineShard::tlocal()->journal();
CHECK(journal);
journal->RecordEntry(0, journal::Op::NOOP, 0, 0, {}, true);
}

#define ADD(x) (x) += o.x
Expand Down
4 changes: 4 additions & 0 deletions src/server/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ void RecordJournalFinish(const OpArgs& op_args, uint32_t shard_cnt);
// key.
void RecordExpiry(DbIndex dbid, std::string_view key);

// Trigger journal write to sink, no journal record will be added to journal.
// Must be called from shard thread of journal to sink.
void TriggerJournalWriteToSink();

struct TieredStats {
size_t tiered_reads = 0;
size_t tiered_writes = 0;
Expand Down
5 changes: 5 additions & 0 deletions src/server/engine_shard_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,11 @@ void EngineShard::Heartbeat() {
db_slice_.FreeMemWithEvictionStep(i, redline - db_slice_.memory_budget());
}
}
// Journal entries for expired entries are not written to socket in the loop above.
// Trigger write to socket when loop finishes.
if (auto journal = EngineShard::tlocal()->journal(); journal) {
TriggerJournalWriteToSink();
}
}

void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms) {
Expand Down
14 changes: 12 additions & 2 deletions src/server/io_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,24 @@ io::Result<size_t> BufferedStreamerBase::WriteSome(const iovec* vec, uint32_t le
return io::BufSink{&producer_buf_}.WriteSome(vec, len);
}

void BufferedStreamerBase::NotifyWritten() {
void BufferedStreamerBase::NotifyWritten(bool allow_await) {
if (IsStopped())
return;
buffered_++;
// Wake up the consumer.
waker_.notify();
// Block if we're stalled because the consumer is not keeping up.
waker_.await([this]() { return !IsStalled() || IsStopped(); });
if (allow_await) {
waker_.await([this]() { return !IsStalled() || IsStopped(); });
}
}

// Waits for the consumer to catch up, but only when there is something buffered.
// Returns immediately if the streamer is stopped or buffered_ is zero; otherwise blocks
// until the streamer is no longer stalled or has been stopped.
void BufferedStreamerBase::AwaitIfWritten() {
  // Nothing to wait for: either we are shutting down or no batch was reported as written.
  if (IsStopped() || !buffered_)
    return;

  waker_.await([this]() { return !IsStalled() || IsStopped(); });
}

error_code BufferedStreamerBase::ConsumeIntoSink(io::Sink* dest) {
Expand Down
7 changes: 5 additions & 2 deletions src/server/io_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,11 @@ class BufferedStreamerBase : public io::Sink {
io::Result<size_t> WriteSome(const iovec* vec, uint32_t len) override;

// Report that a batch of data has been written and the consumer can be woken up.
// Blocks if the consumer is not keeping up.
void NotifyWritten();
// Blocks if the consumer is not keeping up and allow_await is set to true.
void NotifyWritten(bool allow_await);

// Blocks if the consumer is not keeping up and data has already been written.
void AwaitIfWritten();

// Report producer finished.
void Finalize();
Expand Down
4 changes: 2 additions & 2 deletions src/server/journal/journal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ bool Journal::EnterLameDuck() {
}

void Journal::RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt,
Entry::Payload payload) {
journal_slice.AddLogRecord(Entry{txid, opcode, dbid, shard_cnt, std::move(payload)});
Entry::Payload payload, bool await) {
journal_slice.AddLogRecord(Entry{txid, opcode, dbid, shard_cnt, std::move(payload)}, await);
}

/*
Expand Down
3 changes: 2 additions & 1 deletion src/server/journal/journal.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ class Journal {
*/
LSN GetLsn() const;

void RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt, Entry::Payload payload);
void RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt, Entry::Payload payload,
bool await);

private:
mutable boost::fibers::mutex state_mu_;
Expand Down
4 changes: 2 additions & 2 deletions src/server/journal/journal_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,11 @@ error_code JournalSlice::Close() {
return ec;
}

void JournalSlice::AddLogRecord(const Entry& entry) {
void JournalSlice::AddLogRecord(const Entry& entry, bool await) {
DCHECK(ring_buffer_);
iterating_cb_arr_ = true;
for (const auto& k_v : change_cb_arr_) {
k_v.second(entry);
k_v.second(entry, await);
}
iterating_cb_arr_ = false;

Expand Down
2 changes: 1 addition & 1 deletion src/server/journal/journal_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class JournalSlice {
return bool(shard_file_);
}

void AddLogRecord(const Entry& entry);
void AddLogRecord(const Entry& entry, bool await);

uint32_t RegisterOnChange(ChangeCallback cb);
void UnregisterOnChange(uint32_t);
Expand Down
15 changes: 10 additions & 5 deletions src/server/journal/streamer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,16 @@ namespace dfly {

void JournalStreamer::Start(io::Sink* dest) {
write_fb_ = util::fibers_ext::Fiber(&JournalStreamer::WriterFb, this, dest);
journal_cb_id_ = journal_->RegisterOnChange([this](const journal::Entry& entry) {
writer_.Write(entry);
record_cnt_.fetch_add(1, std::memory_order_relaxed);
NotifyWritten();
});
journal_cb_id_ =
journal_->RegisterOnChange([this](const journal::Entry& entry, bool allow_await) {
if (entry.opcode == journal::Op::NOOP) {
// No record to write, just await if data was written so consumer will read the data.
return AwaitIfWritten();
}
writer_.Write(entry);
record_cnt_.fetch_add(1, std::memory_order_relaxed);
NotifyWritten(allow_await);
});
}

uint64_t JournalStreamer::GetRecordCount() const {
Expand Down
2 changes: 1 addition & 1 deletion src/server/journal/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ struct ParsedEntry : public EntryBase {
CmdData cmd;
};

using ChangeCallback = std::function<void(const Entry&)>;
using ChangeCallback = std::function<void(const Entry&, bool await)>;

} // namespace journal
} // namespace dfly
4 changes: 3 additions & 1 deletion src/server/snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,9 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
// value. This is guaranteed by the fact that OnJournalEntry runs always after OnDbChange, and
// no database switch can be performed between those two calls, because they are part of one
// transaction.
void SliceSnapshot::OnJournalEntry(const journal::Entry& entry) {
// OnJournalEntry registers for changes in the journal; the journal change callback signature is
// (const journal::Entry& entry, bool await). In the snapshot flow we don't use the await argument.
void SliceSnapshot::OnJournalEntry(const journal::Entry& entry, bool unused_await_arg) {
// We ignore non payload entries like EXEC because we have no transactional ordering during
// LOAD phase on replica.
if (!entry.HasPayload()) {
Expand Down
2 changes: 1 addition & 1 deletion src/server/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class SliceSnapshot {
void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);

// Journal listener
void OnJournalEntry(const journal::Entry& entry);
void OnJournalEntry(const journal::Entry& entry, bool unused_await_arg);

// Close dest channel if not closed yet.
void CloseRecordChannel();
Expand Down
11 changes: 6 additions & 5 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1032,7 +1032,7 @@ void Transaction::UnlockMultiShardCb(const std::vector<KeyList>& sharded_keys, E
auto journal = shard->journal();

if (journal != nullptr && multi_->shard_journal_write[shard->shard_id()] == true) {
journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_journals_cnt, {});
journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_journals_cnt, {}, true);
}

if (multi_->multi_opts & CO::GLOBAL_TRANS) {
Expand Down Expand Up @@ -1178,18 +1178,19 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard) {
auto cmd = facade::ToSV(cmd_with_full_args_.front());
entry_payload = make_pair(cmd, GetShardArgs(shard->shard_id()));
}
LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_, false);
LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_, false, true);
}

void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload,
uint32_t shard_cnt, bool multi_commands) const {
uint32_t shard_cnt, bool multi_commands,
bool allow_await) const {
auto journal = shard->journal();
CHECK(journal);
if (multi_) {
multi_->shard_journal_write[shard->shard_id()] = true;
}
auto opcode = (multi_ || multi_commands) ? journal::Op::MULTI_COMMAND : journal::Op::COMMAND;
journal->RecordEntry(txid_, opcode, db_index_, shard_cnt, std::move(payload));
journal->RecordEntry(txid_, opcode, db_index_, shard_cnt, std::move(payload), allow_await);
}

void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const {
Expand All @@ -1198,7 +1199,7 @@ void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt
}
auto journal = shard->journal();
CHECK(journal);
journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_cnt, {});
journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_cnt, {}, false);
}

void Transaction::BreakOnShutdown() {
Expand Down
6 changes: 1 addition & 5 deletions src/server/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,6 @@ class Transaction {
// Cancel all blocking watches on shutdown. Set COORD_CANCELLED.
void BreakOnShutdown();

// Log a journal entry on shard with payload and shard count.
void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function was not implemented

uint32_t shard_cnt) const;

// In some cases for non auto-journaling commands we want to enable the auto journal flow.
void RenableAutoJournal() {
renabled_auto_journal_.store(true, std::memory_order_relaxed);
Expand Down Expand Up @@ -209,7 +205,7 @@ class Transaction {
// multi_commands to true and call the FinishLogJournalOnShard function after logging the final
// entry.
void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload, uint32_t shard_cnt,
bool multi_commands) const;
bool multi_commands, bool allow_await) const;
void FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const;

private:
Expand Down
2 changes: 1 addition & 1 deletion tests/dragonfly/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ async def test_expiry(df_local_factory, n_keys=1000):
# Set a different expiry time in ms for each key
pipe = c_master.pipeline(transaction=True)
for k, _ in gen_test_data(n_keys):
ms = random.randint(100, 500)
ms = random.randint(20, 500)
pipe.pexpire(k, ms)
await pipe.execute()

Expand Down