Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

switch tie-breaker to site_id #393

Merged
merged 8 commits into from
Oct 30, 2023
29 changes: 23 additions & 6 deletions core/rs/core/src/changes_vtab_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ use crate::util::slab_rowid;
*/
fn did_cid_win(
db: *mut sqlite3,
ext_data: *mut crsql_ExtData,
insert_tbl: &str,
tbl_info: &TableInfo,
unpacked_pks: &Vec<ColumnValue>,
key: sqlite::int64,
col_name: &str,
insert_val: *mut sqlite::value,
insert_site_id: &[u8],
col_name: &str,
col_version: sqlite::int64,
errmsg: *mut *mut c_char,
) -> Result<bool, ResultCode> {
Expand Down Expand Up @@ -76,9 +78,19 @@ fn did_cid_win(
}

// versions are equal
// need to pull the current value and compare
// we could compare on site_id if we can guarantee site_id is always provided.
// would be slightly more performant..
// need to compare site ids
let ret = unsafe {
let my_site_id = core::slice::from_raw_parts((*ext_data).siteId, 16);
insert_site_id.cmp(my_site_id) as c_int
};

// site id lost.
if ret <= 0 {
return Ok(false);
}

// site id won
// last thing, compare values to ensure we're not changing state on equal values
let col_val_stmt_ref = tbl_info.get_col_value_stmt(db, col_name)?;
let col_val_stmt = col_val_stmt_ref.as_ref().ok_or(ResultCode::ERROR)?;

Expand All @@ -94,7 +106,8 @@ fn did_cid_win(
let local_value = col_val_stmt.column_value(0)?;
let ret = crsql_compare_sqlite_values(insert_val, local_value);
reset_cached_stmt(col_val_stmt.stmt)?;
return Ok(ret > 0);
// insert site id won and values differ. We should take the update.
return Ok(ret != 0);
}
_ => {
// ResultCode::DONE would happen if clock values exist but actual values are missing.
Expand All @@ -108,6 +121,8 @@ fn did_cid_win(
return Err(ResultCode::ERROR);
}
}

return Ok(ret > 0);
}

fn set_winner_clock(
Expand Down Expand Up @@ -585,12 +600,14 @@ unsafe fn merge_insert(
|| !row_exists_locally
|| did_cid_win(
db,
(*tab).pExtData,
insert_tbl,
&tbl_info,
&unpacked_pks,
key,
insert_col,
insert_val,
insert_site_id,
insert_col,
insert_col_vrsn,
errmsg,
)?;
Expand Down
29 changes: 29 additions & 0 deletions core/src/crsqlite.test.c
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,20 @@ static sqlite3_int64 getDbVersion(sqlite3 *db) {
return db2v;
}

static void *getSiteId(sqlite3 *db) {
sqlite3_stmt *pStmt = 0;
int rc = sqlite3_prepare_v2(db, "SELECT crsql_site_id()", -1, &pStmt, 0);
if (rc != SQLITE_OK) {
return 0;
}

sqlite3_step(pStmt);
void *site = sqlite3_column_blob(pStmt, 0);
sqlite3_finalize(pStmt);

return site;
}

static void testLamportCondition() {
printf("LamportCondition\n");
// syncing from A -> B, while no changes happen on B, moves up
Expand Down Expand Up @@ -511,6 +525,17 @@ static void noopsDoNotMoveClocks() {
rc += sqlite3_open(":memory:", &db1);
rc += sqlite3_open(":memory:", &db2);

void *db1SiteId = getSiteId(db1);
void *db2SiteId = getSiteId(db2);
int cmp = memcmp(db1SiteId, db2SiteId, 16);
if (cmp > 0) {
sqlite3 *temp = db1;
db1 = db2;
db2 = temp;
}

// swap dbs based on site id compare to make it a noop.

rc += sqlite3_exec(
db1, "CREATE TABLE \"hoot\" (\"a\", \"b\" primary key not null, \"c\")",
0, 0, 0);
Expand Down Expand Up @@ -545,6 +570,10 @@ static void noopsDoNotMoveClocks() {
sqlite3_int64 db1vPost = getDbVersion(db1);
sqlite3_int64 db2vPost = getDbVersion(db2);

// TODO: we still need to compare values so as not to bump the db_version
// forward on a no-difference
// this fails sometimes because site id winning.
printf("db1 pre: %lld db2 post: %lld", db1vPre, db2vPost);
assert(db1vPre == db2vPost);
assert(db1vPre == db1vPost);

Expand Down
34 changes: 31 additions & 3 deletions core/src/rows-impacted.test.c
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ static void testCreateThatDoesNotChangeAnything() {
printf("\t\e[0;32mSuccess\e[0m\n");
}

static void testValueWin() {
static void testValueWouldWinButSiteIdLoses() {
printf("ValueWin\n");
int rc = SQLITE_OK;
char *err = 0;
Expand All @@ -326,10 +326,37 @@ static void testValueWin() {
rc = sqlite3_exec(db, "BEGIN", 0, 0, 0);
rc += sqlite3_exec(db,
"INSERT INTO crsql_changes VALUES ('foo', X'010901', 'b', "
"3, 1, 1, NULL, 1, 1)",
"3, 1, 1, X'00000000000000000000000000000000', 1, 1)",
0, 0, &err);
sqlite3_prepare_v2(db, "SELECT crsql_rows_impacted()", -1, &pStmt, 0);
sqlite3_step(pStmt);
// value is greater but site id lower, a loss and now rows changed.
assert(sqlite3_column_int(pStmt, 0) == 0);
sqlite3_finalize(pStmt);
rc += sqlite3_exec(db, "COMMIT", 0, 0, 0);
assert(rc == SQLITE_OK);

crsql_close(db);
printf("\t\e[0;32mSuccess\e[0m\n");
}

static void testSiteIdWin() {
printf("SiteIdWin\n");
int rc = SQLITE_OK;
char *err = 0;
sqlite3 *db = createDb();
sqlite3_stmt *pStmt = 0;

rc = sqlite3_exec(db, "INSERT INTO foo VALUES (1, 2)", 0, 0, 0);

rc = sqlite3_exec(db, "BEGIN", 0, 0, 0);
rc += sqlite3_exec(db,
"INSERT INTO crsql_changes VALUES ('foo', X'010901', 'b', "
"3, 1, 1, X'FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF', 1, 1)",
0, 0, &err);
sqlite3_prepare_v2(db, "SELECT crsql_rows_impacted()", -1, &pStmt, 0);
sqlite3_step(pStmt);
// site id is larger, a win
assert(sqlite3_column_int(pStmt, 0) == 1);
sqlite3_finalize(pStmt);
rc += sqlite3_exec(db, "COMMIT", 0, 0, 0);
Expand Down Expand Up @@ -374,7 +401,8 @@ void rowsImpactedTestSuite() {
testUpdateThatDoesNotChangeAnything();
testDeleteThatDoesNotChangeAnything();
testCreateThatDoesNotChangeAnything();
testValueWin();
testValueWouldWinButSiteIdLoses();
testSiteIdWin();
testClockWin();
testDelete();
}
22 changes: 20 additions & 2 deletions py/correctness/tests/test_cl_merging.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ def make_simple_schema():
return c


def get_site_id(c):
return c.execute("SELECT crsql_site_id()").fetchone()[0]


def make_pko_schema():
c = connect(":memory:")
c.execute("CREATE TABLE foo (a INTEGER PRIMARY KEY NOT NULL) STRICT;")
Expand Down Expand Up @@ -812,11 +816,10 @@ def test_ordered_delta_merge_proxy(a_script, c_script):
close(b)
close(c)


# TODO: repeat above hypothesis tests with:
# 1. more tables
# 2. differing schemas (e.g., pk only tables, junction tables)


def test_larger_col_version_same_cl():
c1 = make_simple_schema()
c2 = make_simple_schema()
Expand All @@ -838,17 +841,32 @@ def test_larger_col_version_same_cl():
close(c2)


# should be a no-op.
# values do not break ties.
# site id loses on the merge
def test_larger_col_value_same_cl_and_col_version():
c1 = make_simple_schema()
c2 = make_simple_schema()

# greater site id wins so we need to swap
if get_site_id(c1) > get_site_id(c2):
temp = c1
c1 = c2
c2 = temp

c1.execute("INSERT INTO foo VALUES (1, 4)")
c1.commit()
c2.execute("INSERT INTO foo VALUES (1, 1)")
c2.commit()

sync_left_to_right(c1, c2, 0)

assert (c1.execute("SELECT * FROM foo").fetchall() !=
c2.execute("SELECT * FROM foo").fetchall())

sync_left_to_right(c2, c1, 0)

# swapping direcitons it'll merge because the other guy had the bigger site id
assert (c1.execute("SELECT * FROM foo").fetchall() ==
c2.execute("SELECT * FROM foo").fetchall())

Expand Down
Loading