Skip to content

Commit

Permalink
Remove check-fail in ExpireIfNeeded and introduce DFLY LOAD (#2699)
Browse files Browse the repository at this point in the history
* chore: prevent crashing upon inconsistent expiry table

Also, introduce "DFLY LOAD <filename>" command in addition to "DEBUG LOAD"
as an official command to load snapshots into the running server.


---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
  • Loading branch information
romange authored Mar 12, 2024
1 parent 8a5ed44 commit 954780e
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 21 deletions.
27 changes: 18 additions & 9 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "server/server_state.h"
#include "server/tiered_storage.h"
#include "strings/human_readable.h"
#include "util/fibers/stacktrace.h"

ABSL_FLAG(bool, enable_heartbeat_eviction, true,
"Enable eviction during heartbeat when memory is under pressure.");
Expand Down Expand Up @@ -1071,20 +1072,28 @@ void DbSlice::PostUpdate(DbIndex db_ind, PrimeIterator it, std::string_view key,
}

DbSlice::ItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterator it) {
DCHECK(it->second.HasExpire());
if (!it->second.HasExpire()) {
LOG(ERROR) << "Invalid call to ExpireIfNeeded";
return {it, ExpireIterator{}};
}

auto& db = db_arr_[cntx.db_index];

auto expire_it = db->expire.Find(it->first);

CHECK(IsValid(expire_it));

// TODO: to employ multi-generation update of expire-base and the underlying values.
time_t expire_time = ExpireTime(expire_it);

// Never do expiration on replica or if expiration is disabled.
if (time_t(cntx.time_now_ms) < expire_time || owner_->IsReplica() || !expire_allowed_)
return {it, expire_it};
if (IsValid(expire_it)) {
// TODO: to employ multi-generation update of expire-base and the underlying values.
time_t expire_time = ExpireTime(expire_it);

// Never do expiration on replica or if expiration is disabled.
if (time_t(cntx.time_now_ms) < expire_time || owner_->IsReplica() || !expire_allowed_)
return {it, expire_it};
} else {
LOG(ERROR) << "Internal error, entry " << it->first.ToString()
<< " not found in expire table, db_index: " << cntx.db_index
<< ", expire table size: " << db->expire.size()
<< ", prime table size: " << db->prime.size() << util::fb2::GetStacktrace();
}
string tmp_key_buf;
string_view tmp_key;

Expand Down
13 changes: 9 additions & 4 deletions src/server/debugcmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -531,10 +531,14 @@ void DebugCmd::Replica(CmdArgList args) {
}

void DebugCmd::Load(string_view filename) {
if (!ServerState::tlocal()->is_master) {
return cntx_->SendError("Replica cannot load data");
}

auto new_state = sf_.service().SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
if (new_state.first != GlobalState::LOADING) {
LOG(WARNING) << GlobalStateName(new_state.first) << " in progress, ignored";
return;
return cntx_->SendError("Could not load file");
}

absl::Cleanup rev_state = [this] {
Expand All @@ -561,10 +565,11 @@ void DebugCmd::Load(string_view filename) {

auto fut_ec = sf_.Load(path.generic_string());
if (fut_ec.valid()) {
ec = fut_ec.get();
GenericError ec = fut_ec.get();
if (ec) {
LOG(INFO) << "Could not load file " << ec.message();
return cntx_->SendError(ec.message());
string msg = ec.Format();
LOG(WARNING) << "Could not load file " << msg;
return cntx_->SendError(msg);
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/server/debugcmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ class DebugCmd {

void Run(CmdArgList args);

// A public function that loads a snapshot.
void Load(std::string_view filename);

static void Shutdown();

private:
Expand All @@ -39,7 +42,7 @@ class DebugCmd {

void Reload(CmdArgList args);
void Replica(CmdArgList args);
void Load(std::string_view filename);

void Exec();
void Inspect(std::string_view key, CmdArgList args);
void Watched();
Expand Down
10 changes: 6 additions & 4 deletions src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "facade/cmd_arg_parser.h"
#include "facade/dragonfly_connection.h"
#include "facade/dragonfly_listener.h"
#include "server/debugcmd.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/journal/journal.h"
Expand Down Expand Up @@ -109,10 +110,6 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) {
ToUpper(&args[0]);
string_view sub_cmd = ArgS(args, 0);

/*if (sub_cmd == "JOURNAL" && args.size() >= 2) {
return Journal(args, cntx);
}*/

if (sub_cmd == "THREAD") {
return Thread(args, cntx);
}
Expand Down Expand Up @@ -141,6 +138,11 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) {
return ReplicaOffset(args, cntx);
}

if (sub_cmd == "LOAD" && args.size() == 2) {
DebugCmd debug_cmd{sf_, cntx};
debug_cmd.Load(ArgS(args, 1));
return;
}
cntx->SendError(kSyntaxErr);
}

Expand Down
10 changes: 7 additions & 3 deletions src/server/rdb_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,13 @@ TEST_F(RdbTest, ReloadTtl) {
TEST_F(RdbTest, ReloadExpired) {
Run({"set", "key", "val"});
Run({"expire", "key", "2"});
sleep(2);
Run({"debug", "reload"});
auto resp = Run({"get", "key"});
RespExpr resp = Run({"save", "df"});
ASSERT_EQ(resp, "OK");
auto save_info = service_->server_family().GetLastSaveInfo();
AdvanceTime(2000);
resp = Run({"debug", "load", save_info.file_name});
ASSERT_EQ(resp, "OK");
resp = Run({"get", "key"});
ASSERT_THAT(resp, ArgType(RespExpr::NIL));
}

Expand Down

0 comments on commit 954780e

Please sign in to comment.