-
Notifications
You must be signed in to change notification settings - Fork 981
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(server): write journal record with optional await based on flag… #791
Conversation
… issue #788 Signed-off-by: adi_holden <adi@dragonflydb.io>
Signed-off-by: adi_holden <adi@dragonflydb.io>
@@ -120,10 +120,6 @@ class Transaction { | |||
// Cancel all blocking watches on shutdown. Set COORD_CANCELLED. | |||
void BreakOnShutdown(); | |||
|
|||
// Log a journal entry on shard with payload and shard count. | |||
void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this function was not implemented
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Lets store the field inside the entry, then we won't need all the other changes and ambiguous bool fields
- Please review the journal streamer, I assume the code does not quite do what we expect of it
src/server/io_utils.cc
Outdated
waker_.await([this]() { return !IsStalled() || IsStopped(); }); | ||
waker_.await([this, allow_await]() { return !allow_await || !IsStalled() || IsStopped(); }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A wrapping if would more clearly show that we're not awaiting
src/server/io_utils.h
Outdated
// Blocks the if the consumer if not keeping up. | ||
void WakeIfWritten(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The name is a bit misleading
src/server/transaction.h
Outdated
@@ -209,7 +205,7 @@ class Transaction { | |||
// multi_commands to true and call the FinishLogJournalOnShard function after logging the final | |||
// entry. | |||
void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload, uint32_t shard_cnt, | |||
bool multi_commands) const; | |||
bool multi_commands, bool flag) const; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets not call this flag, lets either use explicitly bool wake or create an enum with flags
src/server/journal/types.h
Outdated
using ChangeCallback = | ||
std::variant<std::function<void(const Entry&)>, std::function<void(const Entry&, bool flag)>>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its used in one or two places, we can just make the others accept the flags
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we use flags, we can actually make them part of the entry instead of passing them around everywhere
src/server/io_utils.cc
Outdated
void BufferedStreamerBase::WakeIfWritten() { | ||
if (IsStopped()) | ||
return; | ||
if (buffered_) { | ||
waker_.await([this]() { return !IsStalled() || IsStopped(); }); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We wake by using waker_.notify()
, the only thing that this can do is blocking
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I will change the function name, we already notiflied the consumer, we just need to block now till consumer runs
src/server/journal/journal_slice.cc
Outdated
void JournalSlice::AddLogRecord(const Entry& entry) { | ||
template <class... Ts> struct Overloaded : Ts... { using Ts::operator()...; }; | ||
template <class... Ts> Overloaded(Ts...) -> Overloaded<Ts...>; | ||
|
||
void JournalSlice::AddLogRecord(const Entry& entry, bool flag_val) { | ||
DCHECK(ring_buffer_); | ||
iterating_cb_arr_ = true; | ||
for (const auto& k_v : change_cb_arr_) { | ||
k_v.second(entry); | ||
std::visit(Overloaded{[entry](std::function<void(const Entry&)> cb) { cb(entry); }, | ||
[entry, flag_val](std::function<void(const Entry&, bool flag)> cb) { | ||
cb(entry, flag_val); | ||
}}, | ||
k_v.second); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And then we won't need this monster
@dranikpg I dont understand this comment. What do we expect of the journal steamer that it does not do.. |
if (entry.opcode == journal::Op::NOOP) { | ||
return WakeIfWritten(); // No recode to write, just wake the consumer if needed. | ||
// No recode to write, just await if data was written so consumer will read the data. | ||
return AwaitIfWritten(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The only nit is that NOOP now implicitly means 'flush', but we can think about it later
… issue #788
Signed-off-by: adi_holden adi@dragonflydb.io