Skip to content

Commit

Permalink
feat(crdt): add custom node_id operations
Browse files Browse the repository at this point in the history
Implement insert_or_update and delete methods with custom node_id
for more flexible CRDT operations. This allows for specifying
a unique node identifier when performing record modifications.
  • Loading branch information
sinkingsugar committed Oct 15, 2024
1 parent 8632597 commit a909d53
Showing 1 changed file with 124 additions and 0 deletions.
124 changes: 124 additions & 0 deletions crdt.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,64 @@ class CRDT : public std::enable_shared_from_this<CRDT<K, V, MergeRuleType, Chang
return *this;
}

// Add these new method declarations in the public section of the CRDT class:

/// Inserts a new record or updates an existing record in the CRDT with a custom node_id.
///
/// # Arguments
///
/// * `record_id` - The unique identifier for the record.
/// * `node_id` - The custom node_id to use for this operation.
/// * `fields` - A variadic list of field name-value pairs.
///
/// Complexity: O(n), where n is the number of fields in the input
template <typename... Pairs>
constexpr void insert_or_update_with_node_id(const K &record_id, CrdtNodeId node_id, Pairs &&...pairs) {
insert_or_update_impl<CrdtVector<Change<K, V>>>(record_id, nullptr, node_id, std::forward<Pairs>(pairs)...);
}

/// Inserts a new record or updates an existing record in the CRDT with a custom node_id, and stores changes.
///
/// # Arguments
///
/// * `record_id` - The unique identifier for the record.
/// * `node_id` - The custom node_id to use for this operation.
/// * `changes` - A reference to a container to store the changes.
/// * `fields` - A variadic list of field name-value pairs.
///
/// Complexity: O(n), where n is the number of fields in the input
template <typename ChangeContainer, typename... Pairs>
constexpr void insert_or_update_with_node_id(const K &record_id, CrdtNodeId node_id, ChangeContainer &changes,
Pairs &&...pairs) {
insert_or_update_impl(record_id, &changes, node_id, std::forward<Pairs>(pairs)...);
}

/// Deletes a record by marking it as tombstoned with a custom node_id.
///
/// # Arguments
///
/// * `record_id` - The unique identifier for the record.
/// * `node_id` - The custom node_id to use for this operation.
///
/// Complexity: O(1)
void delete_record_with_node_id(const K &record_id, CrdtNodeId node_id) {
delete_record_impl<CrdtVector<Change<K, V>>>(record_id, nullptr, node_id);
}

/// Deletes a record by marking it as tombstoned with a custom node_id, and stores the change.
///
/// # Arguments
///
/// * `record_id` - The unique identifier for the record.
/// * `node_id` - The custom node_id to use for this operation.
/// * `changes` - A reference to a container to store the change.
///
/// Complexity: O(1)
template <typename ChangeContainer>
void delete_record_with_node_id(const K &record_id, CrdtNodeId node_id, ChangeContainer &changes) {
delete_record_impl(record_id, &changes, node_id);
}

private:
CrdtNodeId node_id_;
LogicalClock clock_;
Expand Down Expand Up @@ -1033,6 +1091,72 @@ class CRDT : public std::enable_shared_from_this<CRDT<K, V, MergeRuleType, Chang
add_to_container(*changes, Change<K, V>(record_id, std::nullopt, std::nullopt, 1, db_version, node_id_, db_version));
}
}

/// Implementation of insert_or_update with custom node_id
template <typename ChangeContainer, typename... Pairs>
constexpr void insert_or_update_impl(const K &record_id, ChangeContainer *changes, CrdtNodeId node_id, Pairs &&...pairs) {
uint64_t db_version = clock_.tick();

// Check if the record is tombstoned
if (is_record_tombstoned(record_id)) {
return;
}

Record<V> &record = get_or_create_record_unchecked(record_id);

// Helper function to process each pair
auto process_pair = [&](const auto &pair) {
const auto &col_name = pair.first;
const auto &value = pair.second;

uint64_t col_version;
auto col_it = record.column_versions.find(col_name);
if (col_it != record.column_versions.end()) {
col_version = ++col_it->second.col_version;
col_it->second.db_version = db_version;
col_it->second.node_id = node_id;
col_it->second.local_db_version = db_version;
} else {
col_version = 1;
record.column_versions.emplace(col_name, ColumnVersion(col_version, db_version, node_id, db_version));
}

if (changes) {
record.fields[col_name] = value;
add_to_container(*changes, Change<K, V>(record_id, std::move(col_name), std::move(value), col_version, db_version,
node_id, db_version));
} else {
record.fields[std::move(col_name)] = std::move(value);
}
};

// Process all pairs
(process_pair(std::forward<Pairs>(pairs)), ...);
}

/// Implementation of delete_record with custom node_id
template <typename ChangeContainer> void delete_record_impl(const K &record_id, ChangeContainer *changes, CrdtNodeId node_id) {
if (is_record_tombstoned(record_id)) {
return;
}

uint64_t db_version = clock_.tick();

// Mark as tombstone and remove data
tombstones_.emplace(record_id);
data_.erase(record_id);

// Insert deletion clock info
CrdtMap<CrdtString, ColumnVersion> deletion_clock;
deletion_clock.emplace("__deleted__", ColumnVersion(1, db_version, node_id, db_version));

// Store deletion info in the data map
data_.emplace(record_id, Record<V>(CrdtMap<CrdtString, V>(), std::move(deletion_clock)));

if (changes) {
add_to_container(*changes, Change<K, V>(record_id, std::nullopt, std::nullopt, 1, db_version, node_id, db_version));
}
}
};

/// Synchronizes two CRDT nodes.
Expand Down

0 comments on commit a909d53

Please sign in to comment.