Skip to content

Commit

Permalink
chore: support qlist compression when accounting for memory
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Gershman <roman@dragonflydb.io>
  • Loading branch information
romange committed Nov 29, 2024
1 parent a4b3724 commit 2b1dfe1
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 34 deletions.
65 changes: 35 additions & 30 deletions src/core/qlist.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,6 @@ inline ssize_t NodeSetEntry(quicklistNode* node, uint8_t* entry) {
bool CompressNode(quicklistNode* node) {
DCHECK(node->encoding == QUICKLIST_NODE_ENCODING_RAW);
DCHECK(!node->dont_compress);
#ifdef SERVER_TEST
node->attempted_compress = 1;
#endif

/* validate that the node is neither
* tail nor head (it has prev and next)*/
Expand Down Expand Up @@ -219,12 +216,13 @@ bool CompressNode(quicklistNode* node) {
return true;
}

bool CompressNodeIfNeeded(quicklistNode* node) {
ssize_t CompressNodeIfNeeded(quicklistNode* node) {
DCHECK(node);
if (node->encoding == QUICKLIST_NODE_ENCODING_RAW && !node->dont_compress) {
return CompressNode(node);
if (CompressNode(node))
return ((quicklistLZF*)node->entry)->sz - node->sz;
}
return false;
return 0;
}

/* Uncompress the listpack in 'node' and update encoding details.
Expand All @@ -247,17 +245,24 @@ bool DecompressNode(bool recompress, quicklistNode* node) {

/* Decompress only compressed nodes.
recompress: if true, the node will be marked for recompression after decompression.
returns by how much the size of the node has increased.
*/
void DecompressNodeIfNeeded(bool recompress, quicklistNode* node) {
ssize_t DecompressNodeIfNeeded(bool recompress, quicklistNode* node) {
if ((node) && (node)->encoding == QUICKLIST_NODE_ENCODING_LZF) {
DecompressNode(recompress, node);
size_t compressed_sz = ((quicklistLZF*)node->entry)->sz;
if (DecompressNode(recompress, node)) {
return node->sz - compressed_sz;
}
}
return 0;
}

void RecompressOnly(quicklistNode* node) {
ssize_t RecompressOnly(quicklistNode* node) {
if (node->recompress && !node->dont_compress) {
CompressNode(node);
if (CompressNode(node))
return ((quicklistLZF*)node->entry)->sz - node->sz;
}
return 0;
}

quicklistNode* SplitNode(quicklistNode* node, int offset, bool after, ssize_t* diff) {
Expand Down Expand Up @@ -564,7 +569,7 @@ void QList::Insert(Iterator it, std::string_view elem, InsertOpt insert_opt) {
if (QL_NODE_IS_PLAIN(node) || (at_tail && after) || (at_head && !after)) {
InsertPlainNode(node, elem, insert_opt);
} else {
DecompressNodeIfNeeded(true, node);
malloc_size_ += DecompressNodeIfNeeded(true, node);
ssize_t diff_existing = 0;
quicklistNode* new_node = SplitNode(node, it.offset_, after, &diff_existing);
quicklistNode* entry_node = InsertPlainNode(node, elem, insert_opt);
Expand All @@ -576,32 +581,32 @@ void QList::Insert(Iterator it, std::string_view elem, InsertOpt insert_opt) {

/* Now determine where and how to insert the new element */
if (!full) {
DecompressNodeIfNeeded(true, node);
malloc_size_ += DecompressNodeIfNeeded(true, node);
uint8_t* new_entry = LP_Insert(node->entry, elem, it.zi_, after ? LP_AFTER : LP_BEFORE);
malloc_size_ += NodeSetEntry(node, new_entry);
node->count++;
RecompressOnly(node);
malloc_size_ += RecompressOnly(node);
} else {
bool insert_tail = at_tail && after;
bool insert_head = at_head && !after;
if (insert_tail && avail_next) {
/* If we are: at tail, next has free space, and inserting after:
* - insert entry at head of next node. */
auto* new_node = node->next;
DecompressNodeIfNeeded(true, new_node);
malloc_size_ += DecompressNodeIfNeeded(true, new_node);
malloc_size_ += NodeSetEntry(new_node, LP_Prepend(new_node->entry, elem));
new_node->count++;
RecompressOnly(new_node);
RecompressOnly(node);
malloc_size_ += RecompressOnly(new_node);
malloc_size_ += RecompressOnly(node);
} else if (insert_head && avail_prev) {
/* If we are: at head, previous has free space, and inserting before:
* - insert entry at tail of previous node. */
auto* new_node = node->prev;
DecompressNodeIfNeeded(true, new_node);
malloc_size_ += DecompressNodeIfNeeded(true, new_node);
malloc_size_ += NodeSetEntry(new_node, LP_Append(new_node->entry, elem));
new_node->count++;
RecompressOnly(new_node);
RecompressOnly(node);
malloc_size_ += RecompressOnly(new_node);
malloc_size_ += RecompressOnly(node);
} else if (insert_tail || insert_head) {
/* If we are: full, and our prev/next has no available space, then:
* - create new node and attach to qlist */
Expand All @@ -610,7 +615,7 @@ void QList::Insert(Iterator it, std::string_view elem, InsertOpt insert_opt) {
} else {
/* else, node is full we need to split it. */
/* covers both after and !after cases */
DecompressNodeIfNeeded(true, node);
malloc_size_ += DecompressNodeIfNeeded(true, node);
ssize_t diff_existing = 0;
auto* new_node = SplitNode(node, it.offset_, after, &diff_existing);
auto func = after ? LP_Prepend : LP_Append;
Expand Down Expand Up @@ -710,8 +715,8 @@ void QList::Compress(quicklistNode* node) {
int depth = 0;
int in_depth = 0;
while (depth++ < compress_) {
DecompressNodeIfNeeded(false, forward);
DecompressNodeIfNeeded(false, reverse);
malloc_size_ += DecompressNodeIfNeeded(false, forward);
malloc_size_ += DecompressNodeIfNeeded(false, reverse);

if (forward == node || reverse == node)
in_depth = 1;
Expand All @@ -726,11 +731,11 @@ void QList::Compress(quicklistNode* node) {
}

if (!in_depth && node)
CompressNodeIfNeeded(node);
malloc_size_ += CompressNodeIfNeeded(node);

/* At this point, forward and reverse are one node beyond depth */
CompressNodeIfNeeded(forward);
CompressNodeIfNeeded(reverse);
malloc_size_ += CompressNodeIfNeeded(forward);
malloc_size_ += CompressNodeIfNeeded(reverse);
}

/* Attempt to merge listpacks within two nodes on either side of 'center'.
Expand Down Expand Up @@ -801,8 +806,8 @@ quicklistNode* QList::MergeNodes(quicklistNode* center) {
* Returns the input node picked to merge against or NULL if
* merging was not possible. */
quicklistNode* QList::ListpackMerge(quicklistNode* a, quicklistNode* b) {
DecompressNodeIfNeeded(false, a);
DecompressNodeIfNeeded(false, b);
malloc_size_ += DecompressNodeIfNeeded(false, a);
malloc_size_ += DecompressNodeIfNeeded(false, b);
if ((lpMerge(&a->entry, &b->entry))) {
/* We merged listpacks! Now remove the unused quicklistNode. */
quicklistNode *keep = NULL, *nokeep = NULL;
Expand Down Expand Up @@ -1047,14 +1052,14 @@ bool QList::Erase(const long start, unsigned count) {
if (delete_entire_node || QL_NODE_IS_PLAIN(node)) {
DelNode(node);
} else {
DecompressNodeIfNeeded(true, node);
malloc_size_ += DecompressNodeIfNeeded(true, node);
malloc_size_ += NodeSetEntry(node, lpDeleteRange(node->entry, offset, del));
node->count -= del;
count_ -= del;
if (node->count == 0) {
DelNode(node);
} else {
RecompressOnly(node);
malloc_size_ += RecompressOnly(node);
}
}

Expand All @@ -1081,7 +1086,7 @@ bool QList::Iterator::Next() {
int plain = QL_NODE_IS_PLAIN(current_);
if (!zi_) {
/* If !zi, use current index. */
DecompressNodeIfNeeded(true, current_);
const_cast<QList*>(owner_)->malloc_size_ += DecompressNodeIfNeeded(true, current_);
if (ABSL_PREDICT_FALSE(plain))
zi_ = current_->entry;
else
Expand Down
44 changes: 40 additions & 4 deletions src/server/debugcmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ struct ObjInfo {

// for lists - how many nodes do they have.
unsigned num_nodes = 0;
unsigned num_compressed = 0;

enum LockStatus { NONE, S, X } lock_status = NONE;

Expand Down Expand Up @@ -322,6 +323,14 @@ ObjInfo InspectOp(ConnectionContext* cntx, string_view key) {
if (pv.ObjType() == OBJ_LIST && pv.Encoding() == kEncodingQL2) {
const QList* qlist = static_cast<const QList*>(pv.RObjPtr());
oinfo.num_nodes = qlist->node_count();
auto* node = qlist->Head();

while (node) {
if (node->encoding == QUICKLIST_NODE_ENCODING_LZF) {
++oinfo.num_compressed;
}
node = node->next;
}
}

if (pv.IsExternal()) {
Expand Down Expand Up @@ -357,13 +366,31 @@ OpResult<ValueCompressInfo> EstimateCompression(ConnectionContext* cntx, string_
}

// Only strings are supported right now.
if (it->second.ObjType() != OBJ_STRING) {
if (it->second.ObjType() != OBJ_STRING && it->second.ObjType() != OBJ_LIST) {
return OpStatus::WRONG_TYPE;
}
ValueCompressInfo info;

if (it->second.ObjType() == OBJ_LIST) {
if (it->second.Encoding() != kEncodingQL2) {
return OpStatus::WRONG_TYPE;
}

const QList* src = static_cast<const QList*>(it->second.RObjPtr());
info.raw_size = src->MallocUsed(true);
QList qlist(-2, 1);
auto copy_cb = [&](QList::Entry entry) {
qlist.Push(entry.view(), QList::HEAD);
return true;
};
src->Iterate(copy_cb, 0, -1);
info.compressed_size = qlist.MallocUsed(true);
return info;
}

string scratch;
string_view value = it->second.GetSlice(&scratch);

ValueCompressInfo info;
info.raw_size = value.size();
info.compressed_size = info.raw_size;

Expand Down Expand Up @@ -849,7 +876,10 @@ void DebugCmd::Inspect(string_view key, CmdArgList args, facade::SinkReplyBuilde
ShardId sid = Shard(key, ess.size());
VLOG(1) << "DebugCmd::Inspect " << key;

bool check_compression = (args.size() == 1) && ArgS(args, 0) == "COMPRESS";
bool check_compression = false;
if (args.size() == 1) {
check_compression = absl::AsciiStrToUpper(ArgS(args, 0)) == "COMPRESS";
}
string resp;

if (check_compression) {
Expand Down Expand Up @@ -886,7 +916,13 @@ void DebugCmd::Inspect(string_view key, CmdArgList args, facade::SinkReplyBuilde
}

if (res.num_nodes) {
StrAppend(&resp, " ns:", res.num_nodes);
// node count
StrAppend(&resp, " nc:", res.num_nodes);
}

if (res.num_compressed) {
// compressed nodes
StrAppend(&resp, " cn:", res.num_compressed);
}

if (res.lock_status != ObjInfo::NONE) {
Expand Down

0 comments on commit 2b1dfe1

Please sign in to comment.