Skip to content

Commit

Permalink
Enable custom aggregate functions (take 2) (#529)
Browse files Browse the repository at this point in the history
* initial commit.

* documentation

* remove no-longer-valid type

* close over state initialization for performance

* link documentation in comment

* more testing

* run tests if they're main

* accept a single arg

* this kind of works but I'm abandoning this branch

Basically it seems that the sqlite extension pattern of 'allocate
a struct and stick it in the context pointer' is not going to work
for us here. I wonder if using the id of the pointer returned by
sqlite3_aggregate_context would be enough? Since no two functions
could use the same pointer, per https://www.sqlite.org/c3ref/aggregate_context.html ?

* a middle road sqlite3_agg_context solution

* try out auto-updating state

* improve quantile test, add multiple agg test

* add a null to the test

* acorn fails to parse ||=, whatever

* make eslint happy

* make initial_value an argument

* test step and finalize exceptions

* add memory leak test

* update docs to current interface

* delete state in exception handlers

* remove null state

* return init function and document object

* more tests and update back to init function

* update redefinition test for new interface

* update README to match fixed signature

* more consistent test formatting

* Update README.md

Co-authored-by: Ophir LOJKINE <contact@ophir.dev>

* clarify what exactly the result will contain

* Update README.md

* Update README.md

* Update README.md

* Update README.md

* Update README.md

* Improve documentation and type annotations

* ignore documentation in eslintrc

* reduce code size

Thanks a lot, @llimllib !
 
Co-authored-by: dogquery <>
Co-authored-by: Ophir LOJKINE <contact@ophir.dev>
  • Loading branch information
llimllib and lovasoa authored Sep 8, 2022
1 parent 577056b commit 5e5b063
Show file tree
Hide file tree
Showing 6 changed files with 451 additions and 62 deletions.
1 change: 1 addition & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ module.exports = {
ignorePatterns: [
"/dist/",
"/examples/",
"/documentation/",
"/node_modules/",
"/out/",
"/src/shell-post.js",
Expand Down
27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,33 @@ db.create_function("add_js", add);
// Run a query in which the function is used
db.run("INSERT INTO hello VALUES (add_js(7, 3), add_js('Hello ', 'world'));"); // Inserts 10 and 'Hello world'

// You can create custom aggregation functions, by passing a name
// and a set of functions to `db.create_aggregate`:
//
// - an `init` function. This function receives no argument and returns
// the initial value for the state of the aggregate function.
// - a `step` function. This function takes two arguments
// - the current state of the aggregation
// - a new value to aggregate to the state
// It should return a new value for the state.
// - a `finalize` function. This function receives a state object, and
// returns the final value of the aggregate. It can be omitted, in which case
// the final value of the state will be returned directly by the aggregate function.
//
// Here is an example aggregation function, `json_agg`, which will collect all
// input values and return them as a JSON array:
db.create_aggregate(
"json_agg",
{
init: () => [],
step: (state, val) => [...state, val],
finalize: (state) => JSON.stringify(state),
}
);

db.exec("SELECT json_agg(column1) FROM (VALUES ('hello'), ('world'))");
// -> The result of the query is the string '["hello","world"]'

// Export the database to a Uint8Array containing the SQLite database file
const binaryArray = db.export();
```
Expand Down
272 changes: 210 additions & 62 deletions src/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,14 @@ Module["onRuntimeInitialized"] = function onRuntimeInitialized() {
"",
["number", "string", "number"]
);

// https://www.sqlite.org/c3ref/aggregate_context.html
// void *sqlite3_aggregate_context(sqlite3_context*, int nBytes)
//
// JavaScript binding for the C function above, created with cwrap:
// both C arguments (the context pointer and the byte count) are passed
// as numbers, and the returned pointer is surfaced as a number as well.
var sqlite3_aggregate_context = cwrap(
"sqlite3_aggregate_context",
"number",
["number", "number"]
);
var registerExtensionFunctions = cwrap(
"RegisterExtensionFunctions",
"number",
Expand Down Expand Up @@ -1131,81 +1139,90 @@ Module["onRuntimeInitialized"] = function onRuntimeInitialized() {
return sqlite3_changes(this.db);
};

/** Register a custom function with SQLite
@example Register a simple function
db.create_function("addOne", function (x) {return x+1;})
db.exec("SELECT addOne(1)") // = 2
/**
 * Copy the bytes of a BLOB value out of the heap into a new Uint8Array.
 *
 * The length comes from sqlite3_value_bytes and the start address from
 * sqlite3_value_blob; each byte is then read from HEAP8 one at a time.
 *
 * @param {number} ptr pointer to the value whose blob should be copied
 * @return {Uint8Array} a freshly allocated copy of the blob's bytes
 */
var extract_blob = function extract_blob(ptr) {
    var byteCount = sqlite3_value_bytes(ptr);
    var start = sqlite3_value_blob(ptr);
    var copy = new Uint8Array(byteCount);
    var offset = 0;
    while (offset < byteCount) {
        // HEAP8 holds signed bytes; storing into a Uint8Array wraps them
        // into the 0..255 range.
        copy[offset] = HEAP8[start + offset];
        offset += 1;
    }
    return copy;
};

@param {string} name the name of the function as referenced in
SQL statements.
@param {function} func the actual function to be executed.
@return {Database} The database object. Useful for method chaining
*/
/**
 * Convert the arguments SQLite hands to a custom function into an array
 * of JavaScript values.
 *
 * Conversion rules, by value type:
 *  - SQLITE_INTEGER and SQLITE_FLOAT: read with sqlite3_value_double
 *  - SQLITE_TEXT: read with sqlite3_value_text
 *  - SQLITE_BLOB: copied with extract_blob
 *  - anything else: null
 *
 * @param {number} argc number of arguments to read
 * @param {number} argv address of the first argument pointer
 * @return {Array} the converted arguments, in call order
 */
var parseFunctionArguments = function parseFunctionArguments(argc, argv) {
    var converted = [];
    var index = 0;
    while (index < argc) {
        // argv is walked as a sequence of 4-byte ("i32") pointers.
        var valuePtr = getValue(argv + (4 * index), "i32");
        var jsValue;
        switch (sqlite3_value_type(valuePtr)) {
            case SQLITE_INTEGER:
            case SQLITE_FLOAT:
                jsValue = sqlite3_value_double(valuePtr);
                break;
            case SQLITE_TEXT:
                jsValue = sqlite3_value_text(valuePtr);
                break;
            case SQLITE_BLOB:
                jsValue = extract_blob(valuePtr);
                break;
            default:
                jsValue = null;
        }
        converted.push(jsValue);
        index += 1;
    }
    return converted;
};
/**
 * Report the JavaScript return value of a custom function to SQLite.
 *
 * Mapping from JavaScript value to result call:
 *  - boolean: sqlite3_result_int with 1 or 0
 *  - number: sqlite3_result_double
 *  - string: sqlite3_result_text
 *  - null: sqlite3_result_null
 *  - object with a non-null `length`: copied with allocate() and handed
 *    to sqlite3_result_blob, then the temporary copy is freed
 *  - any other object: sqlite3_result_error ("Wrong API use ...")
 *  - every other type (undefined, function, ...): sqlite3_result_null
 *
 * @param {number} cx the context the result is reported to
 * @param {*} result the value returned by the user's function
 */
var setFunctionResult = function setFunctionResult(cx, result) {
    var kind = typeof result;

    if (kind === "boolean") {
        sqlite3_result_int(cx, result ? 1 : 0);
        return;
    }
    if (kind === "number") {
        sqlite3_result_double(cx, result);
        return;
    }
    if (kind === "string") {
        sqlite3_result_text(cx, result, -1, -1);
        return;
    }
    // Non-object types and null itself are all reported as NULL.
    if (kind !== "object" || result === null) {
        sqlite3_result_null(cx);
        return;
    }
    // Anything exposing a length is treated as a sequence of bytes.
    if (result.length != null) {
        var blobptr = allocate(result, ALLOC_NORMAL);
        sqlite3_result_blob(cx, blobptr, result.length, -1);
        _free(blobptr);
        return;
    }
    sqlite3_result_error(cx, (
        "Wrong API use : tried to return a value "
        + "of an unknown type (" + result + ")."
    ), -1);
};

/** Register a custom function with SQLite
@example <caption>Register a simple function</caption>
db.create_function("addOne", function (x) {return x+1;})
db.exec("SELECT addOne(1)") // = 2
@param {string} name the name of the function as referenced in
SQL statements.
@param {function} func the actual function to be executed.
@return {Database} The database object. Useful for method chaining
*/
Database.prototype["create_function"] = function create_function(
name,
func
) {
function wrapped_func(cx, argc, argv) {
var args = parseFunctionArguments(argc, argv);
var result;
function extract_blob(ptr) {
var size = sqlite3_value_bytes(ptr);
var blob_ptr = sqlite3_value_blob(ptr);
var blob_arg = new Uint8Array(size);
for (var j = 0; j < size; j += 1) {
blob_arg[j] = HEAP8[blob_ptr + j];
}
return blob_arg;
}
var args = [];
for (var i = 0; i < argc; i += 1) {
var value_ptr = getValue(argv + (4 * i), "i32");
var value_type = sqlite3_value_type(value_ptr);
var arg;
if (
value_type === SQLITE_INTEGER
|| value_type === SQLITE_FLOAT
) {
arg = sqlite3_value_double(value_ptr);
} else if (value_type === SQLITE_TEXT) {
arg = sqlite3_value_text(value_ptr);
} else if (value_type === SQLITE_BLOB) {
arg = extract_blob(value_ptr);
} else arg = null;
args.push(arg);
}
try {
result = func.apply(null, args);
} catch (error) {
sqlite3_result_error(cx, error, -1);
return;
}
switch (typeof result) {
case "boolean":
sqlite3_result_int(cx, result ? 1 : 0);
break;
case "number":
sqlite3_result_double(cx, result);
break;
case "string":
sqlite3_result_text(cx, result, -1, -1);
break;
case "object":
if (result === null) {
sqlite3_result_null(cx);
} else if (result.length != null) {
var blobptr = allocate(result, ALLOC_NORMAL);
sqlite3_result_blob(cx, blobptr, result.length, -1);
_free(blobptr);
} else {
sqlite3_result_error(cx, (
"Wrong API use : tried to return a value "
+ "of an unknown type (" + result + ")."
), -1);
}
break;
default:
sqlite3_result_null(cx);
}
setFunctionResult(cx, result);
}
if (Object.prototype.hasOwnProperty.call(this.functions, name)) {
removeFunction(this.functions[name]);
Expand All @@ -1229,6 +1246,137 @@ Module["onRuntimeInitialized"] = function onRuntimeInitialized() {
return this;
};

/** Register a custom aggregate with SQLite
  @example <caption>Register a custom sum function</caption>
    db.create_aggregate("js_sum", {
        init: () => 0,
        step: (state, value) => state + value,
        finalize: state => state
    });
    db.exec("SELECT js_sum(column1) FROM (VALUES (1), (2))"); // = 3
  @param {string} name the name of the aggregate as referenced in
  SQL statements.
  @param {object} aggregateFunctions
                  object containing at least a step function.
  @param {function(): T} [aggregateFunctions.init = ()=>null]
        a function receiving no arguments and returning an initial
        value for the aggregate function. The initial value will be
        null if this key is omitted.
  @param {function(T, any) : T} aggregateFunctions.step
        a function receiving the current state and a value to aggregate
        and returning a new state.
        Will receive the value from init for the first step.
        The number of SQL arguments the aggregate accepts is derived
        from the declared arity of this function (step.length - 1).
  @param {function(T): any} [aggregateFunctions.finalize = (state)=>state]
        a function returning the result of the aggregate function
        given its final state.
        If omitted, the value returned by the last step
        will be used as the final value.
        When no row was aggregated, it receives the value from init.
  @return {Database} The database object. Useful for method chaining
  @throws {string} if aggregateFunctions has no step function
  @template T
 */
Database.prototype["create_aggregate"] = function create_aggregate(
    name,
    aggregateFunctions
) {
    // Default initializer and finalizer
    var init = aggregateFunctions["init"]
        || function init() { return null; };
    var finalize = aggregateFunctions["finalize"]
        || function finalize(state) { return state; };
    var step = aggregateFunctions["step"];

    if (!step) {
        throw "An aggregate function must have a step function in " + name;
    }

    // state is a state object; we'll use the pointer p to serve as the
    // key for where we hold our state so that multiple invocations of
    // this function never step on each other
    var state = {};

    function wrapped_step(cx, argc, argv) {
        // > The first time the sqlite3_aggregate_context(C,N) routine is
        // > called for a particular aggregate function, SQLite allocates N
        // > bytes of memory, zeroes out that memory, and returns a pointer
        // > to the new memory.
        //
        // We're going to use that pointer as a key to our state array,
        // since using sqlite3_aggregate_context as it's meant to be used
        // through webassembly seems to be very difficult. Just allocate
        // one byte.
        var p = sqlite3_aggregate_context(cx, 1);

        try {
            // If this is the first invocation of wrapped_step, call `init`.
            // It runs inside the try block so that an exception thrown by
            // the initializer is reported as an SQL error, exactly like an
            // exception thrown by `step`, instead of escaping the callback.
            if (!Object.prototype.hasOwnProperty.call(state, p)) {
                state[p] = init();
            }
            var args = parseFunctionArguments(argc, argv);
            var mergedArgs = [state[p]].concat(args);
            state[p] = step.apply(null, mergedArgs);
        } catch (error) {
            // Every failing path must drop state[p] so that we don't leak
            // memory or stomp the init value of a future invocation.
            delete state[p];
            sqlite3_result_error(cx, error, -1);
        }
    }

    function wrapped_finalize(cx) {
        var result;
        // > Within the xFinal callback, it is customary to set N=0 in
        // > calls to sqlite3_aggregate_context(C,N) so that no pointless
        // > memory allocations occur.
        // - https://www.sqlite.org/c3ref/aggregate_context.html
        var p = sqlite3_aggregate_context(cx, 0);
        try {
            // When no row was aggregated, wrapped_step never ran and there
            // is no stored state: start from the initial value so that
            // `finalize` always receives a state produced by `init`/`step`.
            result = finalize(
                Object.prototype.hasOwnProperty.call(state, p)
                    ? state[p]
                    : init()
            );
        } catch (error) {
            sqlite3_result_error(cx, error, -1);
            return;
        } finally {
            // Release the state on every path, before the result is
            // converted, so that a failure there cannot leak it either.
            delete state[p];
        }
        setFunctionResult(cx, result);
    }

    if (Object.prototype.hasOwnProperty.call(this.functions, name)) {
        removeFunction(this.functions[name]);
        delete this.functions[name];
    }
    var finalize_name = name + "__finalize";
    if (Object.prototype.hasOwnProperty.call(this.functions, finalize_name)) {
        removeFunction(this.functions[finalize_name]);
        delete this.functions[finalize_name];
    }
    // The signature of the wrapped function is :
    // void wrapped(sqlite3_context *db, int argc, sqlite3_value **argv)
    var step_ptr = addFunction(wrapped_step, "viii");

    // The signature of the wrapped function is :
    // void wrapped(sqlite3_context *db)
    var finalize_ptr = addFunction(wrapped_finalize, "vi");
    this.functions[name] = step_ptr;
    this.functions[finalize_name] = finalize_ptr;

    // passing null to the sixth parameter defines this as an aggregate
    // function
    //
    // > An aggregate SQL function requires an implementation of xStep and
    // > xFinal and NULL pointer must be passed for xFunc.
    // - http://www.sqlite.org/c3ref/create_function.html
    this.handleError(sqlite3_create_function_v2(
        this.db,
        name,
        // the first parameter of `step` is the state, not an SQL argument
        step.length - 1,
        SQLITE_UTF8,
        0,
        0,
        step_ptr,
        finalize_ptr,
        0
    ));
    return this;
};

// export Database to Module
Module.Database = Database;
};
1 change: 1 addition & 0 deletions src/exported_functions.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,6 @@
"_sqlite3_result_int",
"_sqlite3_result_int64",
"_sqlite3_result_error",
"_sqlite3_aggregate_context",
"_RegisterExtensionFunctions"
]
Loading

0 comments on commit 5e5b063

Please sign in to comment.