Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improved syntax for CRR / CRDT creation #324

Merged
merged 12 commits into from
Aug 15, 2023
2 changes: 1 addition & 1 deletion core/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -242,4 +242,4 @@ $(TARGET_FUZZ): $(prefix) $(TARGET_SQLITE3_EXTRA_C) src/fuzzer.cc $(ext_files)
valgrind \
ubsan analyzer fuzz asan static

FORCE: ;
FORCE: ;
21 changes: 17 additions & 4 deletions core/rs/core/src/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ pub fn backfill_table(
pk_cols: Vec<&str>,
non_pk_cols: Vec<&str>,
is_commit_alter: bool,
no_tx: bool,
) -> Result<ResultCode, ResultCode> {
db.exec_safe("SAVEPOINT backfill")?;
if !no_tx {
db.exec_safe("SAVEPOINT backfill")?;
}

let sql = format!(
"SELECT {pk_cols} FROM \"{table}\" AS t1
Expand Down Expand Up @@ -46,16 +49,26 @@ pub fn backfill_table(
};

if let Err(e) = result {
db.exec_safe("ROLLBACK TO backfill")?;
if !no_tx {
db.exec_safe("ROLLBACK TO backfill")?;
}

return Err(e);
}

if let Err(e) = backfill_missing_columns(db, table, &pk_cols, &non_pk_cols, is_commit_alter) {
db.exec_safe("ROLLBACK TO backfill")?;
if !no_tx {
db.exec_safe("ROLLBACK TO backfill")?;
}

return Err(e);
}

db.exec_safe("RELEASE backfill")
if !no_tx {
db.exec_safe("RELEASE backfill")
} else {
Ok(ResultCode::OK)
}
}

/**
Expand Down
8 changes: 8 additions & 0 deletions core/rs/core/src/c.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ extern "C" {
ext_data: *mut crsql_ExtData,
err_msg: *mut *mut c_char,
) -> c_int;
pub fn crsql_createCrr(
db: *mut sqlite::sqlite3,
schemaName: *const c_char,
tblName: *const c_char,
isCommitAlter: c_int,
noTx: c_int,
err: *mut *mut c_char,
) -> c_int;
}

#[test]
Expand Down
263 changes: 263 additions & 0 deletions core/rs/core/src/create_cl_set_vtab.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
extern crate alloc;

use core::ffi::{c_char, c_int, c_void};

use crate::alloc::borrow::ToOwned;
use crate::c::crsql_createCrr;
use alloc::boxed::Box;
use alloc::ffi::CString;
use alloc::format;
use alloc::string::String;
use sqlite::{convert_rc, sqlite3, Connection, CursorRef, StrRef, VTabArgs, VTabRef};
use sqlite_nostd as sqlite;
use sqlite_nostd::ResultCode;

// Virtual table definition to create a causal length set backed table.

#[repr(C)]
struct CLSetTab {
base: sqlite::vtab,
base_table_name: String,
db_name: String,
db: *mut sqlite3,
}

// used in response to `create virtual table ... using clset`
extern "C" fn create(
db: *mut sqlite::sqlite3,
_aux: *mut c_void,
argc: c_int,
argv: *const *const c_char,
vtab: *mut *mut sqlite::vtab,
err: *mut *mut c_char,
) -> c_int {
match create_impl(db, argc, argv, vtab, err) {
Ok(rc) => rc as c_int,
Err(rc) => {
// deallocate the vtab on error.
unsafe {
if *vtab != core::ptr::null_mut() {
let tab = Box::from_raw((*vtab).cast::<CLSetTab>());
drop(tab);
*vtab = core::ptr::null_mut();
}
}
rc as c_int
}
}
}

fn create_impl(
db: *mut sqlite::sqlite3,
argc: c_int,
argv: *const *const c_char,
vtab: *mut *mut sqlite::vtab,
err: *mut *mut c_char,
) -> Result<ResultCode, ResultCode> {
// This is the schema component
let vtab_args = sqlite::parse_vtab_args(argc, argv)?;
connect_create_shared(db, vtab, &vtab_args)?;

// We can't wrap this in a savepoint for some reason. I guess because the `CREATE VIRTUAL TABLE..`
// statement is processing? 🤷‍♂️
create_clset_storage(db, &vtab_args, err)?;
let db_name_c = CString::new(vtab_args.database_name)?;
let table_name_c = CString::new(base_name_from_virtual_name(vtab_args.table_name))?;

// TODO: move `createCrr` to Rust
let rc = unsafe { crsql_createCrr(db, db_name_c.as_ptr(), table_name_c.as_ptr(), 0, 1, err) };
convert_rc(rc)
}

fn create_clset_storage(
db: *mut sqlite::sqlite3,
args: &VTabArgs,
err: *mut *mut c_char,
) -> Result<ResultCode, ResultCode> {
// Is the _last_ arg all the args? Or is it comma separated in some way?
// What about index definitions...
// Let the user later create them against the base table? Or via insertions into our vtab schema?
let table_def = args.arguments.join(",");
if !args.table_name.ends_with("_schema") {
err.set("CLSet virtual table names must end with `_schema`");
return Err(ResultCode::MISUSE);
}

db.exec_safe(&format!(
"CREATE TABLE \"{db_name}\".\"{table_name}\" ({table_def})",
db_name = crate::util::escape_ident(args.database_name),
table_name = crate::util::escape_ident(base_name_from_virtual_name(args.table_name)),
table_def = table_def
))
}

fn base_name_from_virtual_name(virtual_name: &str) -> &str {
&virtual_name[0..(virtual_name.len() - "_schema".len())]
}

// connect to an existing virtual table previously created by `create virtual table`
extern "C" fn connect(
db: *mut sqlite::sqlite3,
_aux: *mut c_void,
argc: c_int,
argv: *const *const c_char,
vtab: *mut *mut sqlite::vtab,
_err: *mut *mut c_char,
) -> c_int {
let vtab_args = sqlite::parse_vtab_args(argc, argv);
match vtab_args {
Ok(vtab_args) => match connect_create_shared(db, vtab, &vtab_args) {
Ok(rc) | Err(rc) => rc as c_int,
},
Err(_e) => {
// free the tab if it was allocated
unsafe {
if *vtab != core::ptr::null_mut() {
let tab = Box::from_raw((*vtab).cast::<CLSetTab>());
drop(tab);
*vtab = core::ptr::null_mut();
}
};
ResultCode::FORMAT as c_int
}
}
}

fn connect_create_shared(
db: *mut sqlite::sqlite3,
vtab: *mut *mut sqlite::vtab,
args: &VTabArgs,
) -> Result<ResultCode, ResultCode> {
sqlite::declare_vtab(
db,
"CREATE TABLE x(alteration TEXT HIDDEN, schema TEXT HIDDEN);",
)?;
let tab = Box::new(CLSetTab {
base: sqlite::vtab {
nRef: 0,
pModule: core::ptr::null(),
zErrMsg: core::ptr::null_mut(),
},
base_table_name: base_name_from_virtual_name(args.table_name).to_owned(),
db_name: args.database_name.to_owned(),
db: db,
});
vtab.set(tab);
Ok(ResultCode::OK)
}

extern "C" fn best_index(_vtab: *mut sqlite::vtab, _index_info: *mut sqlite::index_info) -> c_int {
ResultCode::OK as c_int
}

extern "C" fn disconnect(vtab: *mut sqlite::vtab) -> c_int {
if vtab != core::ptr::null_mut() {
let tab = unsafe { Box::from_raw(vtab.cast::<CLSetTab>()) };
drop(tab);
}
ResultCode::OK as c_int
}

extern "C" fn destroy(vtab: *mut sqlite::vtab) -> c_int {
let tab = unsafe { Box::from_raw(vtab.cast::<CLSetTab>()) };
let ret = tab.db.exec_safe(&format!(
"DROP TABLE \"{db_name}\".\"{table_name}\";
DROP TABLE \"{db_name}\".\"{table_name}__crsql_clock\";",
table_name = crate::util::escape_ident(&tab.base_table_name),
db_name = crate::util::escape_ident(&tab.db_name)
));
match ret {
Err(rc) | Ok(rc) => rc as c_int,
}
}

extern "C" fn open(_vtab: *mut sqlite::vtab, cursor: *mut *mut sqlite::vtab_cursor) -> c_int {
cursor.set(Box::new(sqlite::vtab_cursor {
pVtab: core::ptr::null_mut(),
}));
ResultCode::OK as c_int
}

extern "C" fn close(cursor: *mut sqlite::vtab_cursor) -> c_int {
unsafe {
drop(Box::from_raw(cursor));
}
ResultCode::OK as c_int
}

extern "C" fn filter(
_cursor: *mut sqlite::vtab_cursor,
_idx_num: c_int,
_idx_str: *const c_char,
_argc: c_int,
_argv: *mut *mut sqlite::value,
) -> c_int {
ResultCode::OK as c_int
}

extern "C" fn next(_cursor: *mut sqlite::vtab_cursor) -> c_int {
ResultCode::OK as c_int
}

extern "C" fn eof(_cursor: *mut sqlite::vtab_cursor) -> c_int {
ResultCode::OK as c_int
}

extern "C" fn column(
_cursor: *mut sqlite::vtab_cursor,
_ctx: *mut sqlite::context,
_col_num: c_int,
) -> c_int {
ResultCode::OK as c_int
}

extern "C" fn rowid(_cursor: *mut sqlite::vtab_cursor, _row_id: *mut sqlite::int64) -> c_int {
ResultCode::OK as c_int
}

extern "C" fn begin(_vtab: *mut sqlite::vtab) -> c_int {
ResultCode::OK as c_int
}

extern "C" fn commit(_vtab: *mut sqlite::vtab) -> c_int {
ResultCode::OK as c_int
}

extern "C" fn rollback(_vtab: *mut sqlite::vtab) -> c_int {
ResultCode::OK as c_int
}

static MODULE: sqlite_nostd::module = sqlite_nostd::module {
iVersion: 0,
xCreate: Some(create),
xConnect: Some(connect),
xBestIndex: Some(best_index),
xDisconnect: Some(disconnect),
xDestroy: Some(destroy),
xOpen: Some(open),
xClose: Some(close),
xFilter: Some(filter),
xNext: Some(next),
xEof: Some(eof),
xColumn: Some(column),
xRowid: Some(rowid),
xUpdate: None,
xBegin: Some(begin),
xSync: None,
xCommit: Some(commit),
xRollback: Some(rollback),
xFindFunction: None,
xRename: None,
xSavepoint: None,
xRelease: None,
xRollbackTo: None,
xShadowName: None,
};

pub fn create_module(db: *mut sqlite::sqlite3) -> Result<ResultCode, ResultCode> {
db.create_module_v2("clset", &MODULE, None, None)?;

// xCreate(|x| 0);

Ok(ResultCode::OK)
}
22 changes: 16 additions & 6 deletions core/rs/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod changes_vtab_read;
mod changes_vtab_write;
mod compare_values;
mod consts;
mod create_cl_set_vtab;
mod is_crr;
mod pack_columns;
mod stmt_cache;
Expand All @@ -31,7 +32,7 @@ pub use pack_columns::unpack_columns;
pub use pack_columns::ColumnValue;
use sqlite::ResultCode;
use sqlite_nostd as sqlite;
use sqlite_nostd::{context, Connection, Context, Value};
use sqlite_nostd::{Connection, Context, Value};
pub use teardown::*;

pub extern "C" fn crsql_as_table(
Expand Down Expand Up @@ -123,18 +124,23 @@ pub extern "C" fn sqlite3_crsqlcore_init(
}

let rc = unpack_columns_vtab::create_module(db).unwrap_or(sqlite::ResultCode::ERROR);
if rc != ResultCode::OK {
return rc as c_int;
}
let rc = create_cl_set_vtab::create_module(db).unwrap_or(ResultCode::ERROR);
return rc as c_int;
}

#[no_mangle]
pub extern "C" fn crsql_backfill_table(
context: *mut context,
db: *mut sqlite::sqlite3,
table: *const c_char,
pk_cols: *const *const c_char,
pk_cols_len: c_int,
non_pk_cols: *const *const c_char,
non_pk_cols_len: c_int,
is_commit_alter: c_int,
no_tx: c_int,
) -> c_int {
let table = unsafe { CStr::from_ptr(table).to_str() };
let pk_cols = unsafe {
Expand All @@ -153,10 +159,14 @@ pub extern "C" fn crsql_backfill_table(
};

let result = match (table, pk_cols, non_pk_cols) {
(Ok(table), Ok(pk_cols), Ok(non_pk_cols)) => {
let db = context.db_handle();
backfill_table(db, table, pk_cols, non_pk_cols, is_commit_alter != 0)
}
(Ok(table), Ok(pk_cols), Ok(non_pk_cols)) => backfill_table(
db,
table,
pk_cols,
non_pk_cols,
is_commit_alter != 0,
no_tx != 0,
),
_ => Err(ResultCode::ERROR),
};

Expand Down
Loading
Loading