Skip to content

Commit

Permalink
feat(crdt): add local_db_version for change tracking
Browse files Browse the repository at this point in the history
Introduce local_db_version field to ColumnVersion and Change structs
to improve local change tracking. Update merge_changes and related
functions to utilize this new field for more accurate versioning.
  • Loading branch information
sinkingsugar committed Oct 2, 2024
1 parent c93d169 commit b32b460
Showing 1 changed file with 49 additions and 38 deletions.
87 changes: 49 additions & 38 deletions crdt.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ struct ColumnVersion {
uint64_t db_version;
CrdtNodeId node_id;

ColumnVersion(uint64_t c, uint64_t d, CrdtNodeId n) : col_version(c), db_version(d), node_id(n) {}
// This field is only useful locally, e.g. for get_changes_since:
// it records the local db_version at the time the change was created.
uint64_t local_db_version;

ColumnVersion(uint64_t c, uint64_t d, CrdtNodeId n, uint64_t ldb_ver = 0)
: col_version(c), db_version(d), node_id(n), local_db_version(ldb_ver) {}
};

/// Represents a record in the CRDT.
Expand Down Expand Up @@ -98,11 +103,16 @@ template <typename K, typename V> struct Change {
uint64_t db_version;
CrdtNodeId node_id;

// This field is only useful locally, e.g. for get_changes_since:
// it records the local db_version at the time the change was created.
uint64_t local_db_version;

Change() = default;

Change(K rid, std::optional<CrdtString> cname, std::optional<V> val, uint64_t cver, uint64_t dver, CrdtNodeId nid)
Change(K rid, std::optional<CrdtString> cname, std::optional<V> val, uint64_t cver, uint64_t dver, CrdtNodeId nid,
uint64_t ldb_ver = 0)
: record_id(std::move(rid)), col_name(std::move(cname)), value(std::move(val)), col_version(cver), db_version(dver),
node_id(nid) {}
node_id(nid), local_db_version(ldb_ver) {}
};

/// Comparator for sorting Changes
Expand Down Expand Up @@ -303,12 +313,13 @@ template <typename K, typename V> class CRDT : public std::enable_shared_from_th
col_it->second.node_id = node_id_;
} else {
col_version = 1;
record.column_versions.emplace(col_name, ColumnVersion(col_version, db_version, node_id_));
record.column_versions.emplace(col_name, ColumnVersion(col_version, db_version, node_id_, db_version));
}

if constexpr (ReturnChanges) {
record.fields[col_name] = value;
changes.emplace_back(Change<K, V>(record_id, std::move(col_name), std::move(value), col_version, db_version, node_id_));
changes.emplace_back(
Change<K, V>(record_id, std::move(col_name), std::move(value), col_version, db_version, node_id_, db_version));
} else {
record.fields[std::move(col_name)] = std::move(value);
}
Expand Down Expand Up @@ -349,13 +360,13 @@ template <typename K, typename V> class CRDT : public std::enable_shared_from_th

// Insert deletion clock info
CrdtMap<CrdtString, ColumnVersion> deletion_clock;
deletion_clock.emplace("__deleted__", ColumnVersion(1, db_version, node_id_));
deletion_clock.emplace("__deleted__", ColumnVersion(1, db_version, node_id_, db_version));

// Store deletion info in the data map
data_.emplace(record_id, Record<V>(CrdtMap<CrdtString, V>(), std::move(deletion_clock)));

if constexpr (ReturnChanges) {
changes.emplace_back(Change<K, V>(record_id, std::nullopt, std::nullopt, 1, db_version, node_id_));
changes.emplace_back(Change<K, V>(record_id, std::nullopt, std::nullopt, 1, db_version, node_id_, db_version));
return changes;
}
}
Expand All @@ -382,7 +393,7 @@ template <typename K, typename V> class CRDT : public std::enable_shared_from_th

for (const auto &[record_id, record] : data_) {
for (const auto &[col_name, clock_info] : record.column_versions) {
if (clock_info.db_version > last_db_version) {
if (clock_info.local_db_version > last_db_version) {
std::optional<V> value = std::nullopt;
std::optional<CrdtString> name = std::nullopt;
if (col_name != "__deleted__") {
Expand All @@ -393,7 +404,7 @@ template <typename K, typename V> class CRDT : public std::enable_shared_from_th
name = col_name;
}
changes.emplace_back(Change<K, V>(record_id, std::move(name), std::move(value), clock_info.col_version,
clock_info.db_version, clock_info.node_id));
clock_info.db_version, clock_info.node_id, clock_info.local_db_version));
}
}
}
Expand Down Expand Up @@ -424,7 +435,7 @@ template <typename K, typename V> class CRDT : public std::enable_shared_from_th
template <bool ReturnAcceptedChanges = false>
std::conditional_t<ReturnAcceptedChanges, CrdtVector<Change<K, V>>, void> merge_changes(CrdtVector<Change<K, V>> &&changes,
bool ignore_parent = false) {
CrdtVector<Change<K, V>> accepted_changes; // Will be optimized away if ReturnAcceptedChanges is false
CrdtVector<Change<K, V>> accepted_changes;

if (changes.empty()) {
if constexpr (ReturnAcceptedChanges) {
Expand All @@ -434,8 +445,6 @@ template <typename K, typename V> class CRDT : public std::enable_shared_from_th
}
}

auto new_db_version = clock_.tick();

for (auto &&change : changes) {
const K &record_id = change.record_id;
std::optional<CrdtString> col_name = std::move(change.col_name);
Expand All @@ -444,16 +453,17 @@ template <typename K, typename V> class CRDT : public std::enable_shared_from_th
CrdtNodeId remote_node_id = change.node_id;
std::optional<V> remote_value = std::move(change.value);

if (remote_db_version >= new_db_version) {
// If the remote db_version is greater than or equal to the new db_version, we need to update the local db_version
new_db_version = clock_.update(remote_db_version);
}
// Always update the logical clock to maintain causal consistency,
// prevent clock drift, and ensure accurate conflict resolution.
// This reflects the node's knowledge of global progress, even for
// non-accepted changes.
auto new_local_db_version = clock_.update(remote_db_version);

// Retrieve local column version information
auto record_ptr = get_record_ptr(record_id, ignore_parent);
ColumnVersion *local_col_info = nullptr;
if (record_ptr != nullptr) {
auto col_it = record_ptr->column_versions.find(col_name ? col_name.value() : "__deleted__");
auto col_it = record_ptr->column_versions.find(col_name ? *col_name : "__deleted__");
if (col_it != record_ptr->column_versions.end()) {
local_col_info = &col_it->second;
}
Expand Down Expand Up @@ -488,50 +498,49 @@ template <typename K, typename V> class CRDT : public std::enable_shared_from_th
}

if (should_accept) {
if (!col_name.has_value()) {
if (!col_name) {
// Handle deletion
tombstones_.emplace(record_id);
data_.erase(record_id);

// Update deletion clock info
CrdtMap<CrdtString, ColumnVersion> deletion_clock;
deletion_clock.emplace("__deleted__", ColumnVersion(remote_col_version, new_db_version, remote_node_id));
deletion_clock.emplace("__deleted__",
ColumnVersion(remote_col_version, remote_db_version, remote_node_id, new_local_db_version));

// Store deletion info in the data map
data_.emplace(record_id, Record<V>(CrdtMap<CrdtString, V>(), std::move(deletion_clock)));

if constexpr (ReturnAcceptedChanges) {
accepted_changes.emplace_back(
Change<K, V>(record_id, std::nullopt, std::nullopt, remote_col_version, new_db_version, remote_node_id));
accepted_changes.emplace_back(Change<K, V>(record_id, std::nullopt, std::nullopt, remote_col_version,
remote_db_version, remote_node_id, new_local_db_version));
}
} else if (!is_record_tombstoned(record_id, ignore_parent)) {
// Handle insertion or update
Record<V> &record = get_or_create_record_unchecked(record_id, ignore_parent);

// Update field value
if (remote_value.has_value() && col_name.has_value()) {
// move if ReturnAcceptedChanges is false, otherwise copy
if constexpr (!ReturnAcceptedChanges) {
record.fields[col_name.value()] = std::move(remote_value.value());
if (remote_value.has_value()) {
if constexpr (ReturnAcceptedChanges) {
record.fields[*col_name] = *remote_value;
} else {
record.fields[col_name.value()] = remote_value.value();
record.fields[*col_name] = std::move(*remote_value);
}
} else {
// If remote_value is std::nullopt, remove the field
record.fields.erase(col_name ? col_name.value() : "__deleted__");
record.fields.erase(*col_name);
}

// Update the column version info
if constexpr (ReturnAcceptedChanges) {
// Update the column version info
record.column_versions.insert_or_assign(col_name ? col_name.value() : "__deleted__",
ColumnVersion(remote_col_version, new_db_version, remote_node_id));

record.column_versions.insert_or_assign(
*col_name, ColumnVersion(remote_col_version, remote_db_version, remote_node_id, new_local_db_version));
accepted_changes.emplace_back(Change<K, V>(record_id, std::move(col_name), std::move(remote_value),
remote_col_version, new_db_version, remote_node_id));
remote_col_version, remote_db_version, remote_node_id,
new_local_db_version));
} else {
// Update the column version info
record.column_versions.insert_or_assign(col_name ? std::move(col_name.value()) : "__deleted__",
ColumnVersion(remote_col_version, new_db_version, remote_node_id));
record.column_versions.insert_or_assign(
std::move(*col_name), ColumnVersion(remote_col_version, remote_db_version, remote_node_id, new_local_db_version));
}
}
}
Expand Down Expand Up @@ -678,6 +687,7 @@ template <typename K, typename V> class CRDT : public std::enable_shared_from_th
uint64_t remote_col_version = change.col_version;
uint64_t remote_db_version = change.db_version;
CrdtNodeId remote_node_id = change.node_id;
uint64_t remote_local_db_version = change.local_db_version;
std::optional<V> remote_value = std::move(change.value);

if (!col_name.has_value()) {
Expand All @@ -687,7 +697,8 @@ template <typename K, typename V> class CRDT : public std::enable_shared_from_th

// Insert deletion clock info
CrdtMap<CrdtString, ColumnVersion> deletion_clock;
deletion_clock.emplace("__deleted__", ColumnVersion(remote_col_version, remote_db_version, remote_node_id));
deletion_clock.emplace("__deleted__",
ColumnVersion(remote_col_version, remote_db_version, remote_node_id, remote_local_db_version));

// Store deletion info in the data map
data_.emplace(record_id, Record<V>(CrdtMap<CrdtString, V>(), std::move(deletion_clock)));
Expand All @@ -702,8 +713,8 @@ template <typename K, typename V> class CRDT : public std::enable_shared_from_th
}

// Update the column version info
record.column_versions.insert_or_assign(std::move(*col_name),
ColumnVersion(remote_col_version, remote_db_version, remote_node_id));
record.column_versions.insert_or_assign(std::move(*col_name), ColumnVersion(remote_col_version, remote_db_version,
remote_node_id, remote_local_db_version));
}
}
}
Expand Down

0 comments on commit b32b460

Please sign in to comment.