Skip to content

Commit

Permalink
use rayon to spawn threads in the global pool
Browse files Browse the repository at this point in the history
  • Loading branch information
zh217 committed Dec 8, 2023
1 parent ea5f3f2 commit 303ab1f
Show file tree
Hide file tree
Showing 8 changed files with 20 additions and 20 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion cozo-bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,5 @@ async-stream = "0.3.3"
futures = "0.3.25"
crossbeam = "0.8.2"
eventsource-client = "0.12.0"
tower-http = { version = "0.5.0", features = ["full"] }
tower-http = { version = "0.5.0", features = ["full"] }
rayon = "1.8.0"
5 changes: 1 addition & 4 deletions cozo-bin/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,8 @@ use std::collections::BTreeMap;
use std::convert::Infallible;
use std::net::{Ipv6Addr, SocketAddr};
use std::str::FromStr;
// use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicU32, Ordering};
use std::thread;
// use std::thread;

use axum::body::Body;
use axum::extract::{DefaultBodyLimit, Path, Query, State};
Expand Down Expand Up @@ -583,7 +580,7 @@ async fn register_rule(
let rule_senders = st.rule_senders.clone();
let rule_counter = st.rule_counter.clone();

thread::spawn(move || {
rayon::spawn(move || {
for (inputs, options, sender) in task_receiver {
let id = rule_counter.fetch_add(1, Ordering::AcqRel);
let inputs: serde_json::Value =
Expand Down
3 changes: 1 addition & 2 deletions cozo-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

use std::collections::BTreeMap;
use std::path::Path;
use std::thread;
#[allow(unused_imports)]
use std::time::Instant;

Expand Down Expand Up @@ -506,7 +505,7 @@ impl DbInstance {
let (app2db_send, app2db_recv) = bounded(1);
let (db2app_send, db2app_recv) = bounded(1);
let db = self.clone();
thread::spawn(move || db.run_multi_transaction(write, app2db_recv, db2app_send));
rayon::spawn(move || db.run_multi_transaction(write, app2db_recv, db2app_send));
MultiTransaction {
sender: app2db_send,
receiver: db2app_recv,
Expand Down
1 change: 1 addition & 0 deletions cozo-lib-nodejs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ lazy_static = "1.4.0"
crossbeam = "0.8.2"
miette = "5.5.0"
serde_json = "1.0.96"
rayon = "1.8.0"

[dependencies.neon]
version = "0.10"
Expand Down
19 changes: 9 additions & 10 deletions cozo-lib-nodejs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

use crossbeam::channel::Sender;
use lazy_static::lazy_static;
Expand Down Expand Up @@ -410,7 +409,7 @@ fn query_db(mut cx: FunctionContext) -> JsResult<JsUndefined> {

let channel = cx.channel();

thread::spawn(move || {
rayon::spawn(move || {
let result = db.run_script(
&query,
params,
Expand Down Expand Up @@ -457,7 +456,7 @@ fn query_tx(mut cx: FunctionContext) -> JsResult<JsUndefined> {
.send(TransactionPayload::Query((query.clone(), params)))
{
Ok(_) => {
thread::spawn(move || {
rayon::spawn(move || {
let result = tx.receiver.recv();
channel.send(move |mut cx| {
let callback = callback.into_inner(&mut cx);
Expand Down Expand Up @@ -497,7 +496,7 @@ fn backup_db(mut cx: FunctionContext) -> JsResult<JsUndefined> {
let callback = cx.argument::<JsFunction>(2)?.root(&mut cx);
let channel = cx.channel();

thread::spawn(move || {
rayon::spawn(move || {
let result = db.backup_db(&path);
channel.send(move |mut cx| {
let callback = callback.into_inner(&mut cx);
Expand All @@ -523,7 +522,7 @@ fn restore_db(mut cx: FunctionContext) -> JsResult<JsUndefined> {
let callback = cx.argument::<JsFunction>(2)?.root(&mut cx);
let channel = cx.channel();

thread::spawn(move || {
rayon::spawn(move || {
let result = db.restore_backup(&path);
channel.send(move |mut cx| {
let callback = callback.into_inner(&mut cx);
Expand Down Expand Up @@ -553,7 +552,7 @@ fn export_relations(mut cx: FunctionContext) -> JsResult<JsUndefined> {
let callback = cx.argument::<JsFunction>(2)?.root(&mut cx);
let channel = cx.channel();

thread::spawn(move || {
rayon::spawn(move || {
let result = db.export_relations(relations.iter());
channel.send(move |mut cx| {
let callback = callback.into_inner(&mut cx);
Expand Down Expand Up @@ -599,7 +598,7 @@ fn import_relations(mut cx: FunctionContext) -> JsResult<JsUndefined> {
rels.insert(name, nr);
}

thread::spawn(move || {
rayon::spawn(move || {
let result = db.import_relations(rels);
channel.send(move |mut cx| {
let callback = callback.into_inner(&mut cx);
Expand Down Expand Up @@ -632,7 +631,7 @@ fn import_from_backup(mut cx: FunctionContext) -> JsResult<JsUndefined> {
let callback = cx.argument::<JsFunction>(3)?.root(&mut cx);
let channel = cx.channel();

thread::spawn(move || {
rayon::spawn(move || {
let result = db.import_from_backup(path, &relations);
channel.send(move |mut cx| {
let callback = callback.into_inner(&mut cx);
Expand Down Expand Up @@ -665,7 +664,7 @@ fn register_callback(mut cx: FunctionContext) -> JsResult<JsNumber> {
let channel = cx.channel();

let (rid, recv) = db.register_callback(&name, capacity);
thread::spawn(move || {
rayon::spawn(move || {
for (op, new, old) in recv {
let cb = callback.clone();
channel.send(move |mut cx| {
Expand Down Expand Up @@ -701,7 +700,7 @@ fn register_named_rule(mut cx: FunctionContext) -> JsResult<JsUndefined> {
let msg = cx.string(err.to_string());
return cx.throw(msg);
}
thread::spawn(move || {
rayon::spawn(move || {
for (inputs, options, sender) in recv {
let id = HANDLES.cb_idx.fetch_add(1, Ordering::AcqRel);
{
Expand Down
3 changes: 2 additions & 1 deletion cozo-lib-python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,5 @@ io-uring = ["cozo/io-uring"]
cozo = { version = "0.7.6", path = "../cozo-core", default-features = false }
pyo3 = { version = "0.20.0", features = ["extension-module", "abi3", "abi3-py37"] }
miette = "5.5.0"
serde_json = "1.0.96"
serde_json = "1.0.96"
rayon = "1.8.0"
3 changes: 1 addition & 2 deletions cozo-lib-python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
*/

use std::collections::{BTreeMap, BTreeSet};
use std::thread;

use miette::{IntoDiagnostic, Report, Result};
use pyo3::exceptions::PyException;
Expand Down Expand Up @@ -258,7 +257,7 @@ impl CozoDbPy {
if let Some(db) = &self.db {
let cb: Py<PyAny> = callback.into();
let (id, ch) = db.register_callback(rel, None);
thread::spawn(move || {
rayon::spawn(move || {
for (op, new, old) in ch {
Python::with_gil(|py| {
let op = PyString::new(py, op.as_str()).into();
Expand Down

0 comments on commit 303ab1f

Please sign in to comment.