diff --git a/src/splinterdb.c b/src/splinterdb.c index a885789b..2386b33c 100644 --- a/src/splinterdb.c +++ b/src/splinterdb.c @@ -16,6 +16,7 @@ #include "splinterdb/splinterdb.h" #include "platform.h" #include "clockcache.h" +#include "platform_linux/platform.h" #include "rc_allocator.h" #include "trunk.h" #include "btree_private.h" @@ -215,7 +216,7 @@ splinterdb_init_config(const splinterdb_config *kvs_cfg, // IN cfg.use_log, cfg.use_stats, FALSE, - NULL); + Platform_default_log_handle); if (!SUCCESS(rc)) { return rc; } diff --git a/src/trunk.c b/src/trunk.c index 4bd0c415..ce004101 100644 --- a/src/trunk.c +++ b/src/trunk.c @@ -449,6 +449,7 @@ trunk_log_node_if_enabled(platform_stream_handle *stream, typedef struct ONDISK trunk_super_block { uint64 root_addr; // Address of the root of the trunk for the instance // referenced by this superblock. + uint64 next_node_id; uint64 meta_tail; uint64 log_addr; uint64 log_meta_addr; @@ -527,6 +528,7 @@ typedef struct ONDISK trunk_bundle { *----------------------------------------------------------------------------- */ typedef struct ONDISK trunk_hdr { + uint64 node_id; uint16 num_pivot_keys; // number of used pivot keys (== num_children + 1) uint16 height; // height of the node uint64 pivot_generation; // counter incremented when new pivots are added @@ -712,6 +714,7 @@ struct trunk_compact_bundle_req { uint64 addr; key_buffer start_key; key_buffer end_key; + uint64 node_id; uint16 height; uint16 bundle_no; trunk_compaction_type type; @@ -925,9 +928,10 @@ trunk_set_super_block(trunk_handle *spl, wait = 1; cache_lock(spl->cc, super_page); - super = (trunk_super_block *)super_page->data; - super->root_addr = spl->root_addr; - super->meta_tail = mini_meta_tail(&spl->mini); + super = (trunk_super_block *)super_page->data; + super->root_addr = spl->root_addr; + super->next_node_id = spl->next_node_id; + super->meta_tail = mini_meta_tail(&spl->mini); if (spl->cfg.use_log) { if (spl->log) { super->log_addr = log_addr(spl->log); @@ -3939,8 +3943,7 @@ trunk_compact_bundle_node_has_split(trunk_handle *spl, trunk_compact_bundle_req *req, trunk_node *node) { - return trunk_key_compare( - spl, key_buffer_key(&req->end_key), trunk_max_key(spl, node)); + return req->node_id != node->hdr->node_id; } static inline platform_status @@ -4463,6 +4466,8 @@ trunk_flush_into_bundle(trunk_handle *spl, // IN key_buffer_init_from_key( &req->end_key, spl->heap_id, trunk_max_key(spl, child)); + req->node_id = child->hdr->node_id; + uint16 num_children = trunk_num_children(spl, child); for (uint16 pivot_no = 0; pivot_no < num_children; pivot_no++) { trunk_pivot_data *pdata = trunk_get_pivot_data(spl, child, pivot_no); @@ -5128,14 +5133,18 @@ trunk_compact_bundle(void *arg, void *scratch_buf) "addr: %lu\n" "node range: %s-%s\n" "req range: %s-%s\n" - "key compare: %d\n", + "key compare: %d\n" + "req->node_id: %lu\n" + "node->node_id: %lu\n", node.addr, key_string(trunk_data_config(spl), trunk_min_key(spl, &node)), key_string(trunk_data_config(spl), trunk_max_key(spl, &node)), key_string(trunk_data_config(spl), key_buffer_key(&req->start_key)), key_string(trunk_data_config(spl), key_buffer_key(&req->end_key)), trunk_key_compare( - spl, trunk_max_key(spl, &node), key_buffer_key(&req->end_key))); + spl, trunk_max_key(spl, &node), key_buffer_key(&req->end_key)), + req->node_id, + node.hdr->node_id); /* * 2. The bundle may have been completely flushed, if so abort @@ -5360,7 +5369,9 @@ trunk_compact_bundle(void *arg, void *scratch_buf) /* * 11b. ...unless node is internal and bundle has been flushed */ - platform_assert(height != 0); + platform_assert(height != 0, + "impossible: bundles flushed from leaf: %lu\n", + node.addr); trunk_log_stream_if_enabled( spl, &stream, "compact_bundle discarded flushed %lu\n", node.addr); } @@ -5463,6 +5474,12 @@ trunk_needs_split(trunk_handle *spl, trunk_node *node) return trunk_num_children(spl, node) > spl->cfg.fanout; } +static inline uint64 +trunk_next_node_id(trunk_handle *spl) +{ + return __sync_fetch_and_add(&spl->next_node_id, 1); +} + void trunk_split_index(trunk_handle *spl, trunk_node *parent, @@ -5549,7 +5566,12 @@ trunk_split_index(trunk_handle *spl, trunk_log_node_if_enabled(&stream, spl, &right_node); trunk_close_log_stream_if_enabled(spl, &stream); + right_node.hdr->node_id = trunk_next_node_id(spl); + left_node->hdr->node_id = trunk_next_node_id(spl); + if (req != NULL) { + req->node_id = left_node->hdr->node_id; + trunk_compact_bundle_req *next_req = TYPED_MALLOC(spl->heap_id, next_req); memmove(next_req, req, sizeof(trunk_compact_bundle_req)); next_req->addr = right_node.addr; @@ -5558,6 +5580,8 @@ trunk_split_index(trunk_handle *spl, key_buffer_init_from_key( &next_req->end_key, spl->heap_id, trunk_max_key(spl, &right_node)); + next_req->node_id = right_node.hdr->node_id; + platform_assert(!trunk_key_compare( spl, key_buffer_key(&req->start_key), trunk_min_key(spl, left_node))); key_buffer_copy_key(&req->end_key, trunk_max_key(spl, left_node)); @@ -5890,6 +5914,8 @@ trunk_split_leaf(trunk_handle *spl, new_leaf = *leaf; } + new_leaf.hdr->node_id = trunk_next_node_id(spl); + /* Adjust max key first so that we always have ordered pivots (enforced by * trunk_set_pivot in debug mode) */ // adjust max key @@ -5956,6 +5982,7 @@ trunk_split_leaf(trunk_handle *spl, &req->start_key, spl->heap_id, trunk_min_key(spl, leaf)); key_buffer_init_from_key( &req->end_key, spl->heap_id, trunk_max_key(spl, leaf)); + req->node_id = leaf->hdr->node_id; rc = trunk_compact_bundle_enqueue(spl, "enqueue", req); platform_assert_status_ok(rc); @@ -5990,6 +6017,7 @@ trunk_split_leaf(trunk_handle *spl, &req->start_key, spl->heap_id, trunk_min_key(spl, leaf)); key_buffer_init_from_key( &req->end_key, spl->heap_id, trunk_max_key(spl, leaf)); + req->node_id = leaf->hdr->node_id; // issue compact_bundle for leaf and release rc = trunk_compact_bundle_enqueue(spl, "enqueue", req); @@ -6554,6 +6582,7 @@ trunk_compact_leaf(trunk_handle *spl, trunk_node *leaf) &req->start_key, spl->heap_id, trunk_min_key(spl, leaf)); key_buffer_init_from_key( &req->end_key, spl->heap_id, trunk_max_key(spl, leaf)); + req->node_id = leaf->hdr->node_id; rc = trunk_compact_bundle_enqueue(spl, "enqueue", req); platform_assert_status_ok(rc); @@ -7725,9 +7754,10 @@ trunk_mount(trunk_config *cfg, trunk_super_block *super = trunk_get_super_block_if_valid(spl, &super_page); if (super != NULL) { if (super->unmounted && super->timestamp > latest_timestamp) { - spl->root_addr = super->root_addr; - meta_tail = super->meta_tail; - latest_timestamp = super->timestamp; + spl->root_addr = super->root_addr; + spl->next_node_id = super->next_node_id; + meta_tail = super->meta_tail; + latest_timestamp = super->timestamp; } trunk_release_super_block(spl, super_page); } diff --git a/src/trunk.h b/src/trunk.h index 15b6ad3a..afdf8420 100644 --- a/src/trunk.h +++ b/src/trunk.h @@ -180,6 +180,7 @@ typedef struct trunk_compacted_memtable { struct trunk_handle { volatile uint64 root_addr; uint64 super_block_idx; + uint64 next_node_id; trunk_config cfg; platform_heap_id heap_id; platform_batch_rwlock trunk_root_lock;