Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cr-sqlite 0.16 #128

Merged
merged 11 commits into from
Dec 30, 2023
26 changes: 7 additions & 19 deletions crates/corro-agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,10 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
let schema = {
let mut conn = pool.write_priority().await?;
migrate(&mut conn)?;

let mut schema = init_schema(&conn)?;
schema.constrain()?;

info!("Ensuring clock table indexes for fast compaction");
let start = Instant::now();
for table in schema.tables.keys() {
conn.execute_batch(&format!("CREATE INDEX IF NOT EXISTS corro_{table}__crsql_clock_site_id_dbv ON {table}__crsql_clock (site_id, db_version);"))?;
}
info!("Ensured indexes in {:?}", start.elapsed());

schema
};

Expand Down Expand Up @@ -1259,7 +1253,6 @@ fn find_cleared_db_versions(
.query_row([actor_id], |row| row.get(0))
.optional()?
{
Some(0) => None, // this is the current crsql_site_id which is going to be NULL in clock tables
Some(ordinal) => Some(ordinal),
None => {
warn!(actor_id = %actor_id, "could not find crsql ordinal for actor");
Expand Down Expand Up @@ -1287,7 +1280,7 @@ fn find_cleared_db_versions(
.iter()
.map(|table| {
params.push(&clock_site_id);
format!("SELECT DISTINCT db_version FROM {table} WHERE site_id IS ?")
format!("SELECT DISTINCT db_version FROM {table} WHERE site_id = ?")
})
.collect::<Vec<_>>()
.join(" UNION ")
Expand Down Expand Up @@ -3208,14 +3201,9 @@ pub mod tests {
let conn = ta.agent.pool().read().await?;
let counts: HashMap<ActorId, i64> = conn
.prepare_cached(
"SELECT COALESCE(site_id, crsql_site_id()), count(*) FROM crsql_changes GROUP BY site_id;",
"SELECT site_id, count(*) FROM crsql_changes GROUP BY site_id;",
)?
.query_map([], |row| {
Ok((
row.get(0)?,
row.get(1)?,
))
})?
.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
.collect::<rusqlite::Result<_>>()?;

debug!("versions count: {counts:?}");
Expand Down Expand Up @@ -3336,8 +3324,8 @@ pub mod tests {
}

{
let mut prepped = conn.prepare("EXPLAIN QUERY PLAN SELECT DISTINCT db_version FROM foo2__crsql_clock WHERE site_id IS ? UNION SELECT DISTINCT db_version FROM foo__crsql_clock WHERE site_id IS ?;")?;
let mut rows = prepped.query([rusqlite::types::Null, rusqlite::types::Null])?;
let mut prepped = conn.prepare("EXPLAIN QUERY PLAN SELECT DISTINCT db_version FROM foo2__crsql_clock WHERE site_id = ? UNION SELECT DISTINCT db_version FROM foo__crsql_clock WHERE site_id = ?;")?;
let mut rows = prepped.query([0, 0])?;

println!("matching clock rows:");
while let Ok(Some(row)) = rows.next() {
Expand Down Expand Up @@ -3656,7 +3644,7 @@ pub mod tests {

#[test]
fn test_store_empty_changeset() -> eyre::Result<()> {
let mut conn = Connection::open_in_memory()?;
let mut conn = CrConn::init(Connection::open_in_memory()?)?;

corro_types::sqlite::setup_conn(&mut conn)?;
migrate(&mut conn)?;
Expand Down
38 changes: 10 additions & 28 deletions crates/corro-agent/src/api/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,6 @@ const ADAPT_CHUNK_SIZE_THRESHOLD: Duration = Duration::from_millis(500);
fn handle_known_version(
conn: &mut Connection,
actor_id: ActorId,
is_local: bool,
version: Version,
init_known: KnownDbVersion,
booked: &Booked,
Expand Down Expand Up @@ -393,23 +392,22 @@ fn handle_known_version(
// this is a read transaction!
let tx = conn.transaction()?;

let mut prepped = tx.prepare_cached(r#"
SELECT "table", pk, cid, val, col_version, db_version, seq, COALESCE(site_id, crsql_site_id()), cl
let mut prepped = tx.prepare_cached(
r#"
SELECT "table", pk, cid, val, col_version, db_version, seq, site_id, cl
FROM crsql_changes
WHERE site_id IS ?
WHERE site_id = ?
AND db_version = ?
AND seq >= ? AND seq <= ?
ORDER BY seq ASC
"#)?;
let site_id: Option<[u8; 16]> = (!is_local)
.then_some(actor_id)
.map(|actor_id| actor_id.to_bytes());
"#,
)?;

let start_seq = range_needed.start();
let end_seq = range_needed.end();

let rows = prepped.query_map(
params![site_id, db_version, start_seq, end_seq],
params![actor_id, db_version, start_seq, end_seq],
row_to_change,
)?;

Expand Down Expand Up @@ -499,7 +497,6 @@ fn handle_known_version(
return handle_known_version(
conn,
actor_id,
is_local,
version,
current,
booked,
Expand Down Expand Up @@ -573,7 +570,6 @@ fn handle_known_version(
async fn process_version(
pool: &SplitPool,
actor_id: ActorId,
is_local: bool,
version: Version,
known_version: KnownDbVersion,
booked: &Booked,
Expand All @@ -599,7 +595,6 @@ async fn process_version(
handle_known_version(
&mut conn,
actor_id,
is_local,
version,
known_version,
booked,
Expand Down Expand Up @@ -674,7 +669,6 @@ fn send_change_chunks<I: Iterator<Item = rusqlite::Result<Change>>>(
}

async fn process_sync(
local_actor_id: ActorId,
pool: SplitPool,
bookie: Bookie,
sender: Sender<SyncMessage>,
Expand Down Expand Up @@ -725,8 +719,6 @@ async fn process_sync(
None => continue,
};

let is_local = actor_id == local_actor_id;

let mut cleared: RangeInclusiveSet<Version> = RangeInclusiveSet::new();

{
Expand Down Expand Up @@ -787,7 +779,6 @@ async fn process_sync(
process_version(
&pool,
actor_id,
is_local,
version,
known_version,
&booked,
Expand All @@ -811,7 +802,6 @@ async fn process_sync(
process_version(
&pool,
actor_id,
is_local,
version,
known_version,
&booked,
Expand Down Expand Up @@ -1373,15 +1363,9 @@ pub async fn serve_sync(
let (tx, mut rx) = mpsc::channel::<SyncMessage>(256);

tokio::spawn(
process_sync(
agent.actor_id(),
agent.pool().clone(),
agent.bookie().clone(),
tx,
rx_need,
)
.instrument(info_span!("process_sync"))
.inspect_err(|e| error!("could not process sync request: {e}")),
process_sync(agent.pool().clone(), agent.bookie().clone(), tx, rx_need)
.instrument(info_span!("process_sync"))
.inspect_err(|e| error!("could not process sync request: {e}")),
);

let (send_res, recv_res) = tokio::join!(
Expand Down Expand Up @@ -1666,7 +1650,6 @@ mod tests {
handle_known_version(
&mut conn,
actor_id,
false,
Version(1),
known1,
&booked,
Expand Down Expand Up @@ -1696,7 +1679,6 @@ mod tests {
handle_known_version(
&mut conn,
actor_id,
false,
Version(2),
known2,
&booked,
Expand Down
31 changes: 19 additions & 12 deletions crates/corro-agent/src/api/public/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,18 @@ where
})?;

let has_changes: bool = tx
.prepare_cached(
"SELECT EXISTS(SELECT 1 FROM crsql_changes WHERE site_id IS NULL AND db_version = ?);",
).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: None})?
.query_row([db_version], |row| row.get(0)).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: None})?;
.prepare_cached("SELECT EXISTS(SELECT 1 FROM crsql_changes WHERE db_version = ?);")
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
version: None,
})?
.query_row([db_version], |row| row.get(0))
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
version: None,
})?;

if !has_changes {
tx.commit().map_err(|source| ChangeError::Rusqlite {
Expand All @@ -105,9 +113,7 @@ where
trace!("version: {version}");

let last_seq: CrsqlSeq = tx
.prepare_cached(
"SELECT MAX(seq) FROM crsql_changes WHERE site_id IS NULL AND db_version = ?",
)
.prepare_cached("SELECT MAX(seq) FROM crsql_changes WHERE db_version = ?")
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
Expand Down Expand Up @@ -174,13 +180,14 @@ where

block_in_place(|| {
// TODO: make this more generic so both sync and local changes can use it.
let mut prepped = conn.prepare_cached(r#"
SELECT "table", pk, cid, val, col_version, db_version, seq, COALESCE(site_id, crsql_site_id()), cl
let mut prepped = conn.prepare_cached(
r#"
SELECT "table", pk, cid, val, col_version, db_version, seq, site_id, cl
FROM crsql_changes
WHERE site_id IS NULL
AND db_version = ?
WHERE db_version = ?
ORDER BY seq ASC
"#)?;
"#,
)?;
let rows = prepped.query_map([db_version], row_to_change)?;
let chunked =
ChunkedChanges::new(rows, CrsqlSeq(0), last_seq, MAX_CHANGES_BYTE_SIZE);
Expand Down
19 changes: 8 additions & 11 deletions crates/corro-pg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2288,9 +2288,7 @@ fn handle_commit(agent: &Agent, conn: &Connection) -> rusqlite::Result<()> {
.query_row((), |row| row.get(0))?;

let has_changes: bool = conn
.prepare_cached(
"SELECT EXISTS(SELECT 1 FROM crsql_changes WHERE site_id IS NULL AND db_version = ?);",
)?
.prepare_cached("SELECT EXISTS(SELECT 1 FROM crsql_changes WHERE db_version = ?);")?
.query_row([db_version], |row| row.get(0))?;

if !has_changes {
Expand All @@ -2306,9 +2304,7 @@ fn handle_commit(agent: &Agent, conn: &Connection) -> rusqlite::Result<()> {
};

let last_seq: CrsqlSeq = conn
.prepare_cached(
"SELECT MAX(seq) FROM crsql_changes WHERE site_id IS NULL AND db_version = ?",
)?
.prepare_cached("SELECT MAX(seq) FROM crsql_changes WHERE db_version = ?")?
.query_row([db_version], |row| row.get(0))?;

let mut book_writer = booked.blocking_write("handle_write_tx(book_writer)");
Expand Down Expand Up @@ -2356,13 +2352,14 @@ fn handle_commit(agent: &Agent, conn: &Connection) -> rusqlite::Result<()> {

block_in_place(|| {
// TODO: make this more generic so both sync and local changes can use it.
let mut prepped = conn.prepare_cached(r#"
SELECT "table", pk, cid, val, col_version, db_version, seq, COALESCE(site_id, crsql_site_id()), cl
let mut prepped = conn.prepare_cached(
r#"
SELECT "table", pk, cid, val, col_version, db_version, seq, site_id, cl
FROM crsql_changes
WHERE site_id IS NULL
AND db_version = ?
WHERE db_version = ?
ORDER BY seq ASC
"#)?;
"#,
)?;
let rows = prepped.query_map([db_version], row_to_change)?;
let chunked =
ChunkedChanges::new(rows, CrsqlSeq(0), last_seq, MAX_CHANGES_BYTE_SIZE);
Expand Down
Binary file modified crates/corro-types/crsqlite-darwin-aarch64.dylib
100644 → 100755
Binary file not shown.
Binary file modified crates/corro-types/crsqlite-darwin-x86_64.dylib
100644 → 100755
Binary file not shown.
Binary file modified crates/corro-types/crsqlite-linux-aarch64.so
100644 → 100755
Binary file not shown.
Binary file modified crates/corro-types/crsqlite-linux-x86_64.so
100644 → 100755
Binary file not shown.
65 changes: 59 additions & 6 deletions crates/corro-types/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,10 @@ impl Agent {
pub fn migrate(conn: &mut Connection) -> rusqlite::Result<()> {
let migrations: Vec<Box<dyn Migration>> = vec![
Box::new(init_migration as fn(&Transaction) -> rusqlite::Result<()>),
Box::new(v0_2_0_migration as fn(&Transaction) -> rusqlite::Result<()>),
Box::new(v0_2_0_1_migration as fn(&Transaction) -> rusqlite::Result<()>),
Box::new(v0_2_0_2_migration as fn(&Transaction) -> rusqlite::Result<()>),
Box::new(bookkeeping_db_version_index as fn(&Transaction) -> rusqlite::Result<()>),
Box::new(create_corro_subs as fn(&Transaction) -> rusqlite::Result<()>),
Box::new(refactor_corro_members as fn(&Transaction) -> rusqlite::Result<()>),
Box::new(crsqlite_v0_16_migration as fn(&Transaction) -> rusqlite::Result<()>),
];

crate::sqlite::migrate(conn, migrations)
Expand Down Expand Up @@ -321,15 +322,15 @@ fn init_migration(tx: &Transaction) -> rusqlite::Result<()> {
Ok(())
}

fn v0_2_0_migration(tx: &Transaction) -> rusqlite::Result<()> {
fn bookkeeping_db_version_index(tx: &Transaction) -> rusqlite::Result<()> {
tx.execute_batch(
"
CREATE INDEX __corro_bookkeeping_db_version ON __corro_bookkeeping (db_version);
",
)
}

fn v0_2_0_1_migration(tx: &Transaction) -> rusqlite::Result<()> {
fn create_corro_subs(tx: &Transaction) -> rusqlite::Result<()> {
tx.execute_batch(
r#"
-- where subscriptions are stored
Expand All @@ -342,7 +343,7 @@ fn v0_2_0_1_migration(tx: &Transaction) -> rusqlite::Result<()> {
)
}

fn v0_2_0_2_migration(tx: &Transaction) -> rusqlite::Result<()> {
fn refactor_corro_members(tx: &Transaction) -> rusqlite::Result<()> {
tx.execute_batch(
r#"
-- remove state
Expand All @@ -357,6 +358,58 @@ fn v0_2_0_2_migration(tx: &Transaction) -> rusqlite::Result<()> {
)
}

// since crsqlite 0.16, site_id is NOT NULL in clock tables
// also sets the new 'merge-equal-values' config to true.
fn crsqlite_v0_16_migration(tx: &Transaction) -> rusqlite::Result<()> {
let tables: Vec<String> = tx.prepare("SELECT tbl_name FROM sqlite_master WHERE type='table' AND tbl_name LIKE '%__crsql_clock'")?.query_map([], |row| row.get(0))?.collect::<rusqlite::Result<Vec<_>>>()?;

for table in tables {
let indexes: Vec<String> = tx
.prepare(&format!(
"SELECT sql FROM sqlite_master WHERE type='index' AND name LIKE '{table}%'"
))?
.query_map([], |row| row.get(0))?
.collect::<rusqlite::Result<Vec<_>>>()?;

tx.execute_batch(
&format!(r#"
CREATE TABLE {table}_new (
key INTEGER NOT NULL,
col_name TEXT NOT NULL,
col_version INTEGER NOT NULL,
db_version INTEGER NOT NULL,
site_id INTEGER NOT NULL DEFAULT 0,
seq INTEGER NOT NULL,
PRIMARY KEY (key, col_name)
) WITHOUT ROWID, STRICT;
INSERT INTO {table}_new SELECT key, col_name, col_version, db_version, COALESCE(site_id, 0), seq FROM {table};
ALTER TABLE {table} RENAME TO {table}_old;
ALTER TABLE {table}_new RENAME TO {table};
DROP TABLE {table}_old;
CREATE INDEX IF NOT EXISTS corro_{table}_site_id_dbv ON {table} (site_id, db_version);
"#),
)?;

// recreate the indexes
for sql in indexes {
tx.execute_batch(&sql)?;
}
}

// we want this to be true or else we'll assuredly make our DB inconsistent.
let _value: i64 = tx.query_row(
"SELECT crsql_config_set('merge-equal-values', 1);",
[],
|row| row.get(0),
)?;

Ok(())
}

#[derive(Debug, Clone)]
pub struct SplitPool(Arc<SplitPoolInner>);

Expand Down
Loading