-
Notifications
You must be signed in to change notification settings - Fork 981
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(server): Switch to stable state replication #473
Conversation
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
src/server/rdb_load.h
Outdated
std::function<void()> fullsyncb; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How should the loader be extended to react to events? Callback?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
which events? what are you trying to do? (I have not reviewed the code yet).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to fullsync cut opcodes. The loader only finishes and unblocks once it has received the EOF code, but we need to record the FULLSYNC_CUT code earlier
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
// pass leftover data from the loader. | ||
io::PrefixSource chained(loader.Leftover(), &ps); | ||
VLOG(1) << "Before reading from chained stream"; | ||
io::Result<size_t> eof_res = chained.Read(io::MutableBytes{buf.get(), eof_token.size()}); | ||
if (!eof_res || *eof_res != eof_token.size()) { | ||
unique_ptr<uint8_t[]> buf{new uint8_t[eof_token.size()]}; | ||
|
||
io::Result<size_t> res = | ||
chained_tail.ReadAtLeast(io::MutableBytes{buf.get(), eof_token.size()}, eof_token.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You had a bug here: We need to use ReadAtLeast
because the source might return less than n in a single call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are you sure? based on the comments in io.h
ReadAtLeast(buf, buf.size())
is equivalent to Read(buf)
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Read can return less than buf.size() in a single call (see the PrefixSource read, it returns only the prefix even if its smaller than buf.size())
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
1086978
to
e495985
Compare
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
0e08789
to
62b6f48
Compare
src/server/rdb_load.cc
Outdated
auto [fit, _] = db_slice.FindExt(db_cntx, item.key); | ||
if (IsValid(fit)) | ||
db_slice.Del(db_cntx.db_index, fit); | ||
|
||
auto [it, added] = db_slice.AddEntry(db_cntx, item.key, std::move(pv), item.expire_ms); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's ineffective but its just a temporary stub.
The issue with AddEntry is that I give up my primevalue by moving it. In theory it should:
- insert or fail
- update if failed
Because looking it up separately would make the hot path a lot slower.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added AddOrUpdate to DbSlice as a possible solution
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
src/server/transaction.cc
Outdated
bool lock_acquired = shard->db_slice().Acquire(mode, lock_args); | ||
sd.local_mask |= KEYLOCK_ACQUIRED; | ||
DCHECK(!lock_acquired); // Because CheckLock above failed. | ||
// DCHECK(!lock_acquired); // Because CheckLock above failed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs a careful check: If we failed to run quickly due to the shard being locked, then this lock acquisition doesn't fail. So I commented the DCHECK out... But does the logic change in this case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think it's all good. you can remove this DCHECK.
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
ff47e04
to
d57a17e
Compare
I added a line to the rdb.h file and now the ci-linter complains about formatting. Shouldn't this part of code be an exclusion? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please update the commit message and the pr description accordingly.
VLOG(1) << "io error " << io_error; | ||
return io_error; | ||
if (save_mode_ == SaveMode::SUMMARY) { | ||
impl_->serializer()->SendFullSyncCut(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do you need to send it here as well? it's not for a flow channel, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The io thread runs the rdb saver in SUMMARY
mode to transfer only the header and lua scripts. I just decided it'll be more consistent if all threads use this opcode without any corner cases. So I need the summary mode to write is as well.
@@ -581,6 +582,11 @@ error_code RdbSerializer::SaveStreamConsumers(streamCG* cg) { | |||
return error_code{}; | |||
} | |||
|
|||
error_code RdbSerializer::SendFullSyncCut() { | |||
RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_FULLSYNC_END)); | |||
return FlushMem(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do you need FlushMem here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because I want it to be sent immediately to the replica. Can't it be stuck inside the buffer if I don't flush it? It seems like it can
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does not really matter. but you call this twice: once in summary flow - it does not matter there, and the second place calls flushmem right after as far as i remember
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I call it twice for the SUMMARY flow, right. But don't forget about the snapshot that sends a FS cut as well. It can be stuck there
src/server/transaction.cc
Outdated
bool lock_acquired = shard->db_slice().Acquire(mode, lock_args); | ||
sd.local_mask |= KEYLOCK_ACQUIRED; | ||
DCHECK(!lock_acquired); // Because CheckLock above failed. | ||
// DCHECK(!lock_acquired); // Because CheckLock above failed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think it's all good. you can remove this DCHECK.
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
@@ -126,6 +126,9 @@ void SliceSnapshot::SerializeEntriesFb() { | |||
mu_.lock(); | |||
mu_.unlock(); | |||
|
|||
CHECK(!rdb_serializer_->SendFullSyncCut()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why check?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because I have no error propagation... I can just remove it
Signed-off-by: Vladislav <vlad@dragonflydb.io>
Can I get a reply on this? 🤨 😅 #473 (comment) |
You've been just volunteered to check whether it's possible to exclude dirs in pre-commit checks. Инициатива наказуема! |
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
Migrating replication branch on main, part 6
Add basic stable state replication.
Mixed implementation. The general concept it taken from my Replication MVP.