Skip to content

Commit

Permalink
feat(crdt): add custom merge rule support
Browse files Browse the repository at this point in the history
Introduce MergeRule concept and DefaultMergeRule. Refactor CRDT
class to use custom merge rules, improving flexibility and
extensibility of the merge process.
  • Loading branch information
sinkingsugar committed Oct 7, 2024
1 parent eb33c15 commit 402bbb8
Showing 1 changed file with 59 additions and 44 deletions.
103 changes: 59 additions & 44 deletions crdt.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,54 @@ using CrdtNodeId = uint64_t;
#include <iostream>
#include <optional>
#include <memory>
#include <type_traits>
#include <concepts>

/// Represents a single change in the CRDT.
template <typename K, typename V> struct Change {
K record_id;
std::optional<CrdtString> col_name; // std::nullopt represents tombstone of the record
std::optional<V> value; // note std::nullopt represents deletion of the column, not the record
uint64_t col_version;
uint64_t db_version;
CrdtNodeId node_id;

// this field is useful only locally when doing things like get_changes_since
// we record the local db_version when the change was created
uint64_t local_db_version;

Change() = default;

Change(K rid, std::optional<CrdtString> cname, std::optional<V> val, uint64_t cver, uint64_t dver, CrdtNodeId nid,
uint64_t ldb_ver = 0)
: record_id(std::move(rid)), col_name(std::move(cname)), value(std::move(val)), col_version(cver), db_version(dver),
node_id(nid), local_db_version(ldb_ver) {}
};

// Define a concept for a custom merge rule
template <typename Rule, typename K, typename V>
concept MergeRule = requires(Rule r, const Change<K, V> &local, const Change<K, V> &remote) {
{ r(local, remote) } -> std::convertible_to<bool>;
};

// Default merge rule (current behavior)
template <typename K, typename V> struct DefaultMergeRule {
bool operator()(const Change<K, V> &local, const Change<K, V> &remote) const {
if (remote.col_version > local.col_version) {
return true;
} else if (remote.col_version < local.col_version) {
return false;
} else {
if (remote.db_version > local.db_version) {
return true;
} else if (remote.db_version < local.db_version) {
return false;
} else {
return (remote.node_id > local.node_id);
}
}
}
};

/// Represents a logical clock for maintaining causality.
class LogicalClock {
Expand Down Expand Up @@ -94,27 +142,6 @@ template <typename V> bool operator==(const Record<V> &lhs, const Record<V> &rhs
return true;
}

/// Represents a single change in the CRDT.
template <typename K, typename V> struct Change {
K record_id;
std::optional<CrdtString> col_name; // std::nullopt represents tombstone of the record
std::optional<V> value; // note std::nullopt represents deletion of the column, not the record
uint64_t col_version;
uint64_t db_version;
CrdtNodeId node_id;

// this field is useful only locally when doing things like get_changes_since
// we record the local db_version when the change was created
uint64_t local_db_version;

Change() = default;

Change(K rid, std::optional<CrdtString> cname, std::optional<V> val, uint64_t cver, uint64_t dver, CrdtNodeId nid,
uint64_t ldb_ver = 0)
: record_id(std::move(rid)), col_name(std::move(cname)), value(std::move(val)), col_version(cver), db_version(dver),
node_id(nid), local_db_version(ldb_ver) {}
};

/// Comparator for sorting Changes
template <typename K, typename V> struct ChangeComparator {
bool operator()(const Change<K, V> &a, const Change<K, V> &b) const {
Expand All @@ -135,12 +162,14 @@ template <typename K, typename V> struct ChangeComparator {
};

/// Represents the CRDT structure, generic over key (`K`) and value (`V`) types.
template <typename K, typename V> class CRDT : public std::enable_shared_from_this<CRDT<K, V>> {
template <typename K, typename V, typename MergeRuleType = DefaultMergeRule<K, V>>
class CRDT : public std::enable_shared_from_this<CRDT<K, V, MergeRuleType>> {
public:
// Create a new empty CRDT
// Complexity: O(1)
CRDT(CrdtNodeId node_id, std::shared_ptr<CRDT<K, V>> parent = nullptr)
: node_id_(node_id), clock_(), data_(), tombstones_(), parent_(parent) {
CRDT(CrdtNodeId node_id, std::shared_ptr<CRDT<K, V, MergeRuleType>> parent = nullptr,
MergeRuleType merge_rule = MergeRuleType())
: node_id_(node_id), clock_(), data_(), tombstones_(), parent_(parent), merge_rule_(std::move(merge_rule)) {
if (parent_) {
// Set clock to parent's clock
clock_ = parent_->clock_;
Expand Down Expand Up @@ -477,24 +506,9 @@ template <typename K, typename V> class CRDT : public std::enable_shared_from_th
should_accept = true;
} else {
const ColumnVersion &local = *local_col_info;

if (remote_col_version > local.col_version) {
// Remote change is newer; accept it
should_accept = true;
} else if (remote_col_version < local.col_version) {
// Remote change is older; reject it
should_accept = false;
} else {
// col_version is equal; use db_version as the next tiebreaker
if (remote_db_version > local.db_version) {
should_accept = true;
} else if (remote_db_version < local.db_version) {
should_accept = false;
} else {
// db_version is equal; use node_id for final tiebreaking
should_accept = (remote_node_id > local.node_id);
}
}
Change<K, V> local_change(record_id, col_name ? *col_name : "__deleted__", std::nullopt, local.col_version,
local.db_version, local.node_id);
should_accept = merge_rule_(local_change, change);
}

if (should_accept) {
Expand Down Expand Up @@ -662,7 +676,7 @@ template <typename K, typename V> class CRDT : public std::enable_shared_from_th
/// A pointer to the Record<V> if found, or nullptr if not found.
///
/// Complexity: O(1) average case for hash table lookup
const Record<V>* get_record(const K& record_id, bool ignore_parent = false) const {
const Record<V> *get_record(const K &record_id, bool ignore_parent = false) const {
auto it = data_.find(record_id);
if (it != data_.end()) {
return &(it->second);
Expand Down Expand Up @@ -699,8 +713,9 @@ template <typename K, typename V> class CRDT : public std::enable_shared_from_th

// our clock won't be shared with the parent
// we optionally allow to merge from the parent or push to the parent
std::shared_ptr<CRDT<K, V>> parent_;
std::shared_ptr<CRDT<K, V, MergeRuleType>> parent_;
uint64_t base_version_; // Tracks the parent’s db_version at the time of child creation
MergeRuleType merge_rule_;

/// Applies a list of changes to reconstruct the CRDT state.
///
Expand Down

0 comments on commit 402bbb8

Please sign in to comment.