From 402bbb8f0557b80354b1441a495fc35b8fd75b91 Mon Sep 17 00:00:00 2001 From: Giovanni Petrantoni <7008900+sinkingsugar@users.noreply.github.com> Date: Mon, 7 Oct 2024 08:13:30 +0800 Subject: [PATCH] feat(crdt): add custom merge rule support Introduce MergeRule concept and DefaultMergeRule. Refactor CRDT class to use custom merge rules, improving flexibility and extensibility of the merge process. --- crdt.hpp | 103 +++++++++++++++++++++++++++++++------------------------ 1 file changed, 59 insertions(+), 44 deletions(-) diff --git a/crdt.hpp b/crdt.hpp index 64fa662..712b3be 100644 --- a/crdt.hpp +++ b/crdt.hpp @@ -28,6 +28,54 @@ using CrdtNodeId = uint64_t; #include #include #include +#include +#include + +/// Represents a single change in the CRDT. +template struct Change { + K record_id; + std::optional col_name; // std::nullopt represents tombstone of the record + std::optional value; // note std::nullopt represents deletion of the column, not the record + uint64_t col_version; + uint64_t db_version; + CrdtNodeId node_id; + + // this field is useful only locally when doing things like get_changes_since + // we record the local db_version when the change was created + uint64_t local_db_version; + + Change() = default; + + Change(K rid, std::optional cname, std::optional val, uint64_t cver, uint64_t dver, CrdtNodeId nid, + uint64_t ldb_ver = 0) + : record_id(std::move(rid)), col_name(std::move(cname)), value(std::move(val)), col_version(cver), db_version(dver), + node_id(nid), local_db_version(ldb_ver) {} +}; + +// Define a concept for a custom merge rule +template +concept MergeRule = requires(Rule r, const Change &local, const Change &remote) { + { r(local, remote) } -> std::convertible_to; +}; + +// Default merge rule (current behavior) +template struct DefaultMergeRule { + bool operator()(const Change &local, const Change &remote) const { + if (remote.col_version > local.col_version) { + return true; + } else if (remote.col_version < local.col_version) { + return false; + } else { + if (remote.db_version > local.db_version) { + return true; + } else if (remote.db_version < local.db_version) { + return false; + } else { + return (remote.node_id > local.node_id); + } + } + } +}; /// Represents a logical clock for maintaining causality. class LogicalClock { @@ -94,27 +142,6 @@ template bool operator==(const Record &lhs, const Record &rhs return true; } -/// Represents a single change in the CRDT. -template struct Change { - K record_id; - std::optional col_name; // std::nullopt represents tombstone of the record - std::optional value; // note std::nullopt represents deletion of the column, not the record - uint64_t col_version; - uint64_t db_version; - CrdtNodeId node_id; - - // this field is useful only locally when doing things like get_changes_since - // we record the local db_version when the change was created - uint64_t local_db_version; - - Change() = default; - - Change(K rid, std::optional cname, std::optional val, uint64_t cver, uint64_t dver, CrdtNodeId nid, - uint64_t ldb_ver = 0) - : record_id(std::move(rid)), col_name(std::move(cname)), value(std::move(val)), col_version(cver), db_version(dver), - node_id(nid), local_db_version(ldb_ver) {} -}; - /// Comparator for sorting Changes template struct ChangeComparator { bool operator()(const Change &a, const Change &b) const { @@ -135,12 +162,14 @@ template struct ChangeComparator { }; /// Represents the CRDT structure, generic over key (`K`) and value (`V`) types. -template class CRDT : public std::enable_shared_from_this> { +template > +class CRDT : public std::enable_shared_from_this> { public: // Create a new empty CRDT // Complexity: O(1) - CRDT(CrdtNodeId node_id, std::shared_ptr> parent = nullptr) - : node_id_(node_id), clock_(), data_(), tombstones_(), parent_(parent) { + CRDT(CrdtNodeId node_id, std::shared_ptr> parent = nullptr, + MergeRuleType merge_rule = MergeRuleType()) + : node_id_(node_id), clock_(), data_(), tombstones_(), parent_(parent), merge_rule_(std::move(merge_rule)) { if (parent_) { // Set clock to parent's clock clock_ = parent_->clock_; @@ -477,24 +506,9 @@ template class CRDT : public std::enable_shared_from_th should_accept = true; } else { const ColumnVersion &local = *local_col_info; - - if (remote_col_version > local.col_version) { - // Remote change is newer; accept it - should_accept = true; - } else if (remote_col_version < local.col_version) { - // Remote change is older; reject it - should_accept = false; - } else { - // col_version is equal; use db_version as the next tiebreaker - if (remote_db_version > local.db_version) { - should_accept = true; - } else if (remote_db_version < local.db_version) { - should_accept = false; - } else { - // db_version is equal; use node_id for final tiebreaking - should_accept = (remote_node_id > local.node_id); - } - } + Change local_change(record_id, col_name ? *col_name : "__deleted__", std::nullopt, local.col_version, + local.db_version, local.node_id); + should_accept = merge_rule_(local_change, change); } if (should_accept) { @@ -662,7 +676,7 @@ template class CRDT : public std::enable_shared_from_th /// A pointer to the Record if found, or nullptr if not found. /// /// Complexity: O(1) average case for hash table lookup - const Record* get_record(const K& record_id, bool ignore_parent = false) const { + const Record *get_record(const K &record_id, bool ignore_parent = false) const { auto it = data_.find(record_id); if (it != data_.end()) { return &(it->second); @@ -699,8 +713,9 @@ template class CRDT : public std::enable_shared_from_th // our clock won't be shared with the parent // we optionally allow to merge from the parent or push to the parent - std::shared_ptr> parent_; + std::shared_ptr> parent_; uint64_t base_version_; // Tracks the parent’s db_version at the time of child creation + MergeRuleType merge_rule_; /// Applies a list of changes to reconstruct the CRDT state. ///