Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into robj/fallocate-blockd…
Browse files Browse the repository at this point in the history
…ev-fix
  • Loading branch information
rtjohnso committed May 16, 2024
2 parents a1873ab + ac426e4 commit ab286ad
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 12 deletions.
3 changes: 2 additions & 1 deletion src/splinterdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "splinterdb/splinterdb.h"
#include "platform.h"
#include "clockcache.h"
#include "platform_linux/platform.h"
#include "rc_allocator.h"
#include "trunk.h"
#include "btree_private.h"
Expand Down Expand Up @@ -215,7 +216,7 @@ splinterdb_init_config(const splinterdb_config *kvs_cfg, // IN
cfg.use_log,
cfg.use_stats,
FALSE,
NULL);
Platform_default_log_handle);
if (!SUCCESS(rc)) {
return rc;
}
Expand Down
52 changes: 41 additions & 11 deletions src/trunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ trunk_log_node_if_enabled(platform_stream_handle *stream,
typedef struct ONDISK trunk_super_block {
uint64 root_addr; // Address of the root of the trunk for the instance
// referenced by this superblock.
uint64 next_node_id;
uint64 meta_tail;
uint64 log_addr;
uint64 log_meta_addr;
Expand Down Expand Up @@ -527,6 +528,7 @@ typedef struct ONDISK trunk_bundle {
*-----------------------------------------------------------------------------
*/
typedef struct ONDISK trunk_hdr {
uint64 node_id;
uint16 num_pivot_keys; // number of used pivot keys (== num_children + 1)
uint16 height; // height of the node
uint64 pivot_generation; // counter incremented when new pivots are added
Expand Down Expand Up @@ -712,6 +714,7 @@ struct trunk_compact_bundle_req {
uint64 addr;
key_buffer start_key;
key_buffer end_key;
uint64 node_id;
uint16 height;
uint16 bundle_no;
trunk_compaction_type type;
Expand Down Expand Up @@ -925,9 +928,10 @@ trunk_set_super_block(trunk_handle *spl,
wait = 1;
cache_lock(spl->cc, super_page);

super = (trunk_super_block *)super_page->data;
super->root_addr = spl->root_addr;
super->meta_tail = mini_meta_tail(&spl->mini);
super = (trunk_super_block *)super_page->data;
super->root_addr = spl->root_addr;
super->next_node_id = spl->next_node_id;
super->meta_tail = mini_meta_tail(&spl->mini);
if (spl->cfg.use_log) {
if (spl->log) {
super->log_addr = log_addr(spl->log);
Expand Down Expand Up @@ -3939,8 +3943,7 @@ trunk_compact_bundle_node_has_split(trunk_handle *spl,
trunk_compact_bundle_req *req,
trunk_node *node)
{
return trunk_key_compare(
spl, key_buffer_key(&req->end_key), trunk_max_key(spl, node));
return req->node_id != node->hdr->node_id;
}

static inline platform_status
Expand Down Expand Up @@ -4463,6 +4466,8 @@ trunk_flush_into_bundle(trunk_handle *spl, // IN
key_buffer_init_from_key(
&req->end_key, spl->heap_id, trunk_max_key(spl, child));

req->node_id = child->hdr->node_id;

uint16 num_children = trunk_num_children(spl, child);
for (uint16 pivot_no = 0; pivot_no < num_children; pivot_no++) {
trunk_pivot_data *pdata = trunk_get_pivot_data(spl, child, pivot_no);
Expand Down Expand Up @@ -5128,14 +5133,18 @@ trunk_compact_bundle(void *arg, void *scratch_buf)
"addr: %lu\n"
"node range: %s-%s\n"
"req range: %s-%s\n"
"key compare: %d\n",
"key compare: %d\n"
"req->node_id: %lu\n"
"node->node_id: %lu\n",
node.addr,
key_string(trunk_data_config(spl), trunk_min_key(spl, &node)),
key_string(trunk_data_config(spl), trunk_max_key(spl, &node)),
key_string(trunk_data_config(spl), key_buffer_key(&req->start_key)),
key_string(trunk_data_config(spl), key_buffer_key(&req->end_key)),
trunk_key_compare(
spl, trunk_max_key(spl, &node), key_buffer_key(&req->end_key)));
spl, trunk_max_key(spl, &node), key_buffer_key(&req->end_key)),
req->node_id,
node.hdr->node_id);

/*
* 2. The bundle may have been completely flushed, if so abort
Expand Down Expand Up @@ -5360,7 +5369,9 @@ trunk_compact_bundle(void *arg, void *scratch_buf)
/*
* 11b. ...unless node is internal and bundle has been flushed
*/
platform_assert(height != 0);
platform_assert(height != 0,
"impossible: bundles flushed from leaf: %lu\n",
node.addr);
trunk_log_stream_if_enabled(
spl, &stream, "compact_bundle discarded flushed %lu\n", node.addr);
}
Expand Down Expand Up @@ -5463,6 +5474,12 @@ trunk_needs_split(trunk_handle *spl, trunk_node *node)
return trunk_num_children(spl, node) > spl->cfg.fanout;
}

static inline uint64
trunk_next_node_id(trunk_handle *spl)
{
return __sync_fetch_and_add(&spl->next_node_id, 1);
}

void
trunk_split_index(trunk_handle *spl,
trunk_node *parent,
Expand Down Expand Up @@ -5549,7 +5566,12 @@ trunk_split_index(trunk_handle *spl,
trunk_log_node_if_enabled(&stream, spl, &right_node);
trunk_close_log_stream_if_enabled(spl, &stream);

right_node.hdr->node_id = trunk_next_node_id(spl);
left_node->hdr->node_id = trunk_next_node_id(spl);

if (req != NULL) {
req->node_id = left_node->hdr->node_id;

trunk_compact_bundle_req *next_req = TYPED_MALLOC(spl->heap_id, next_req);
memmove(next_req, req, sizeof(trunk_compact_bundle_req));
next_req->addr = right_node.addr;
Expand All @@ -5558,6 +5580,8 @@ trunk_split_index(trunk_handle *spl,
key_buffer_init_from_key(
&next_req->end_key, spl->heap_id, trunk_max_key(spl, &right_node));

next_req->node_id = right_node.hdr->node_id;

platform_assert(!trunk_key_compare(
spl, key_buffer_key(&req->start_key), trunk_min_key(spl, left_node)));
key_buffer_copy_key(&req->end_key, trunk_max_key(spl, left_node));
Expand Down Expand Up @@ -5890,6 +5914,8 @@ trunk_split_leaf(trunk_handle *spl,
new_leaf = *leaf;
}

new_leaf.hdr->node_id = trunk_next_node_id(spl);

/* Adjust max key first so that we always have ordered pivots (enforced by
* trunk_set_pivot in debug mode) */
// adjust max key
Expand Down Expand Up @@ -5956,6 +5982,7 @@ trunk_split_leaf(trunk_handle *spl,
&req->start_key, spl->heap_id, trunk_min_key(spl, leaf));
key_buffer_init_from_key(
&req->end_key, spl->heap_id, trunk_max_key(spl, leaf));
req->node_id = leaf->hdr->node_id;

rc = trunk_compact_bundle_enqueue(spl, "enqueue", req);
platform_assert_status_ok(rc);
Expand Down Expand Up @@ -5990,6 +6017,7 @@ trunk_split_leaf(trunk_handle *spl,
&req->start_key, spl->heap_id, trunk_min_key(spl, leaf));
key_buffer_init_from_key(
&req->end_key, spl->heap_id, trunk_max_key(spl, leaf));
req->node_id = leaf->hdr->node_id;

// issue compact_bundle for leaf and release
rc = trunk_compact_bundle_enqueue(spl, "enqueue", req);
Expand Down Expand Up @@ -6554,6 +6582,7 @@ trunk_compact_leaf(trunk_handle *spl, trunk_node *leaf)
&req->start_key, spl->heap_id, trunk_min_key(spl, leaf));
key_buffer_init_from_key(
&req->end_key, spl->heap_id, trunk_max_key(spl, leaf));
req->node_id = leaf->hdr->node_id;

rc = trunk_compact_bundle_enqueue(spl, "enqueue", req);
platform_assert_status_ok(rc);
Expand Down Expand Up @@ -7725,9 +7754,10 @@ trunk_mount(trunk_config *cfg,
trunk_super_block *super = trunk_get_super_block_if_valid(spl, &super_page);
if (super != NULL) {
if (super->unmounted && super->timestamp > latest_timestamp) {
spl->root_addr = super->root_addr;
meta_tail = super->meta_tail;
latest_timestamp = super->timestamp;
spl->root_addr = super->root_addr;
spl->next_node_id = super->next_node_id;
meta_tail = super->meta_tail;
latest_timestamp = super->timestamp;
}
trunk_release_super_block(spl, super_page);
}
Expand Down
1 change: 1 addition & 0 deletions src/trunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ typedef struct trunk_compacted_memtable {
struct trunk_handle {
volatile uint64 root_addr;
uint64 super_block_idx;
uint64 next_node_id;
trunk_config cfg;
platform_heap_id heap_id;
platform_batch_rwlock trunk_root_lock;
Expand Down

0 comments on commit ab286ad

Please sign in to comment.