Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): adding matrices for memory defrag #535

Merged
merged 1 commit into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/server/dragonfly_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -792,9 +792,9 @@ TEST_F(DefragDflyEngineTest, TestDefragOption) {
EngineShard* shard = EngineShard::tlocal();
ASSERT_FALSE(shard == nullptr); // we only have one and its should not be empty!
this_fiber::sleep_for(100ms);
EXPECT_EQ(shard->GetDefragStats().success_count, 0);
EXPECT_EQ(shard->stats().defrag_realloc_total, 0);
// we are expecting to have at least one try by now
EXPECT_GT(shard->GetDefragStats().tries, 0);
EXPECT_GT(shard->stats().defrag_attempt_total, 0);
});

ArgSlice delete_cmd(keys);
Expand All @@ -808,16 +808,16 @@ TEST_F(DefragDflyEngineTest, TestDefragOption) {
ASSERT_FALSE(shard == nullptr); // we only have one and its should not be empty!
// a "busy wait" to ensure that memory defragmentations was successful:
// the task ran and did it work
auto stats = shard->GetDefragStats();
auto stats = shard->stats();
for (int i = 0; i < kMaxDefragTriesForTests; i++) {
stats = shard->GetDefragStats();
if (stats.success_count > 0) {
stats = shard->stats();
if (stats.defrag_realloc_total > 0) {
break;
}
this_fiber::sleep_for(220ms);
}
// make sure that we successfully found places to defrag in memory
EXPECT_GT(stats.success_count, 0);
EXPECT_GT(stats.defrag_realloc_total, 0);
});
}

Expand Down
14 changes: 8 additions & 6 deletions src/server/engine_shard_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ uint64_t TEST_current_time_ms = 0;
EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) {
ooo_runs += o.ooo_runs;
quick_runs += o.quick_runs;
defrag_attempt_total += o.defrag_attempt_total;
defrag_realloc_total += o.defrag_realloc_total;

return *this;
}
Expand Down Expand Up @@ -115,7 +117,7 @@ bool EngineShard::DoDefrag() {
DCHECK(slice.IsDbValid(kDefaultDbIndex));
auto [prime_table, expire_table] = slice.GetTables(kDefaultDbIndex);
PrimeTable::Cursor cur = defrag_state_.cursor;
uint64_t defrag_count = 0;
uint64_t reallocations = 0;
unsigned traverses_count = 0;

do {
Expand All @@ -124,15 +126,15 @@ bool EngineShard::DoDefrag() {
// seats on underutilized page of memory, and if so, do it.
bool did = it->second.DefragIfNeeded(threshold);
if (did) {
defrag_count++;
reallocations++;
}
});
traverses_count++;
} while (traverses_count < kMaxTraverses && cur);

defrag_state_.cursor = cur.value();
if (defrag_count > 0) {
VLOG(1) << "shard " << slice.shard_id() << ": successfully defrag " << defrag_count
if (reallocations > 0) {
VLOG(1) << "shard " << slice.shard_id() << ": successfully defrag " << reallocations
<< " times, did it in " << traverses_count << " cursor is at the "
<< (defrag_state_.cursor == 0 ? "end" : "in progress");
} else {
Expand All @@ -141,8 +143,8 @@ bool EngineShard::DoDefrag() {
<< (defrag_state_.cursor == 0 ? "end" : "in progress")
<< " but no location for defrag were found";
}
defrag_state_.stats.success_count += defrag_count;
defrag_state_.stats.tries++;
stats_.defrag_realloc_total += reallocations;
stats_.defrag_attempt_total++;
return defrag_state_.cursor > kCursorDoneState;
}

Expand Down
12 changes: 2 additions & 10 deletions src/server/engine_shard_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,12 @@ class EngineShard {
struct Stats {
uint64_t ooo_runs = 0; // how many times transactions run as OOO.
uint64_t quick_runs = 0; // how many times single shard "RunQuickie" transaction run.
uint64_t defrag_attempt_total = 0;
uint64_t defrag_realloc_total = 0;

Stats& operator+=(const Stats&);
};

struct DefragStats {
uint64_t success_count = 0; // how many objects were moved
uint64_t tries = 0; // how many times we tried
};

// EngineShard() is private down below.
~EngineShard();

Expand Down Expand Up @@ -146,17 +143,12 @@ class EngineShard {
journal_ = j;
}

const DefragStats& GetDefragStats() const {
return defrag_state_.stats;
}

void TEST_EnableHeartbeat();

private:
struct DefragTaskState {
// we will add more data members later
uint64_t cursor = 0u;
DefragStats stats;

// check the current threshold and return true if
// we need to do the de-fermentation
Expand Down
2 changes: 2 additions & 0 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1304,6 +1304,8 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("total_writes_processed", m.conn_stats.io_write_cnt);
append("async_writes_count", m.conn_stats.async_writes_cnt);
append("parser_err_count", m.conn_stats.parser_err_cnt);
append("defrag_attempt_total", m.shard_stats.defrag_attempt_total);
append("defrag_realloc_total", m.shard_stats.defrag_realloc_total);
}

if (should_enter("TIERED", true)) {
Expand Down