Skip to content

Commit

Permalink
feat(rust): Get polars to compile to wasm target (pola-rs#6050)
Browse files Browse the repository at this point in the history
Co-authored-by: Kival Mahadew <kivalm@protonmail.com>
  • Loading branch information
2 people authored and zundertj committed Jan 7, 2023
1 parent 43daf81 commit 9bf4397
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 1 deletion.
4 changes: 4 additions & 0 deletions polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,10 @@ polars-ops = { version = "0.26.1", path = "./polars-ops" }
polars-sql = { version = "0.2", path = "./polars-sql", default-features = false, optional = true }
polars-time = { version = "0.26.1", path = "./polars-time", default-features = false, optional = true }

# enable js feature for getrandom to work in wasm
[target.'cfg(target_family = "wasm")'.dependencies.getrandom]
features = ["js"]

[dev-dependencies]
ahash = "0.7"
criterion = "0.3"
Expand Down
3 changes: 3 additions & 0 deletions polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ smartstring = { version = "1" }
thiserror.workspace = true
xxhash-rust.workspace = true

[target.'cfg(target_family = "wasm")'.dependencies]
wasm-timer = "0.2.5"

[dev-dependencies]
bincode = "1"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,16 @@ impl Default for SCacheInner {
fn default() -> Self {
Self {
map: PlIdHashMap::with_capacity(HASHMAP_INIT_SIZE),
#[cfg(not(target_family = "wasm"))]
uuid: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos(),
#[cfg(target_family = "wasm")]
uuid: wasm_timer::SystemTime::now()
.duration_since(wasm_timer::UNIX_EPOCH)
.unwrap()
.as_nanos(),
payloads: Vec::with_capacity(HASHMAP_INIT_SIZE),
}
}
Expand Down
4 changes: 4 additions & 0 deletions polars/polars-core/src/frame/groupby/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@ impl Drop for GroupsIdx {
let v = std::mem::take(&mut self.all);
// ~65k took approximately 1ms on local machine, so from that point we drop on other thread
// to stop query from being blocked
#[cfg(not(target_family = "wasm"))]
if v.len() > 1 << 16 {
std::thread::spawn(move || drop(v));
} else {
drop(v);
}

#[cfg(target_family = "wasm")]
drop(v);
}
}

Expand Down
4 changes: 4 additions & 0 deletions polars/polars-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub(crate) static PROCESS_ID: Lazy<u128> = Lazy::new(|| {
});

// this is re-exported in utils for polars child crates
#[cfg(not(target_family = "wasm"))] // only use this on non wasm targets
pub static POOL: Lazy<ThreadPool> = Lazy::new(|| {
ThreadPoolBuilder::new()
.num_threads(
Expand All @@ -59,5 +60,8 @@ pub static POOL: Lazy<ThreadPool> = Lazy::new(|| {
.expect("could not spawn threads")
});

#[cfg(target_family = "wasm")] // instead use this on wasm targets
pub static POOL: Lazy<polars_utils::wasm::Pool> = Lazy::new(|| polars_utils::wasm::Pool);

// utility for the tests to ensure a single thread can execute
pub static SINGLE_LOCK: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
5 changes: 4 additions & 1 deletion polars/polars-io/src/csv/read_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,9 @@ impl<'a> CoreReader<'a> {

// If the number of threads given by the user is lower than our global thread pool we create
// new one.
#[cfg(not(target_family = "wasm"))]
let owned_pool;
#[cfg(not(target_family = "wasm"))]
let pool = if POOL.current_num_threads() != n_threads {
owned_pool = Some(
ThreadPoolBuilder::new()
Expand All @@ -497,7 +499,8 @@ impl<'a> CoreReader<'a> {
} else {
&POOL
};

#[cfg(target_family = "wasm")] // use a pre-created pool for wasm
let pool = &POOL;
// An empty file with a schema should return an empty DataFrame with that schema
if bytes.is_empty() {
// TODO! add DataFrame::new_from_schema
Expand Down
3 changes: 3 additions & 0 deletions polars/polars-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,6 @@ pub use hash::HashSingle;
pub type IdxSize = u32;
#[cfg(feature = "bigidx")]
pub type IdxSize = u64;

#[cfg(target_family = "wasm")]
pub mod wasm;
32 changes: 32 additions & 0 deletions polars/polars-utils/src/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::IdxSize;
///
/// # Safety
/// The caller must ensure that the right indexes fo `&[(_, IdxSize)]` are integers ranging from `0..idx.len`
#[cfg(not(target_family = "wasm"))]
pub unsafe fn perfect_sort(pool: &ThreadPool, idx: &[(IdxSize, IdxSize)], out: &mut Vec<IdxSize>) {
let chunk_size = std::cmp::max(
idx.len() / pool.current_num_threads(),
Expand All @@ -40,3 +41,34 @@ pub unsafe fn perfect_sort(pool: &ThreadPool, idx: &[(IdxSize, IdxSize)], out: &
// all elements are written
out.set_len(idx.len());
}

// wasm alternative with different signature
#[cfg(target_family = "wasm")]
pub unsafe fn perfect_sort(
pool: &crate::wasm::Pool,
idx: &[(IdxSize, IdxSize)],
out: &mut Vec<IdxSize>,
) {
let chunk_size = std::cmp::max(
idx.len() / pool.current_num_threads(),
pool.current_num_threads(),
);

out.reserve(idx.len());
let ptr = out.as_mut_ptr() as *const IdxSize as usize;

pool.install(|| {
idx.par_chunks(chunk_size).for_each(|indices| {
let ptr = ptr as *mut IdxSize;
for (idx_val, idx_location) in indices {
// Safety:
// idx_location is in bounds by invariant of this function
// and we ensured we have at least `idx.len()` capacity
*ptr.add(*idx_location as usize) = *idx_val;
}
});
});
// Safety:
// all elements are written
out.set_len(idx.len());
}
32 changes: 32 additions & 0 deletions polars/polars-utils/src/wasm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
pub struct Pool;

impl Pool {
pub fn current_num_threads(&self) -> usize {
rayon::current_num_threads()
}

pub fn install<OP, R>(&self, op: OP) -> R
where
OP: FnOnce() -> R + Send,
R: Send,
{
op()
}

pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB)
where
A: FnOnce() -> RA + Send,
B: FnOnce() -> RB + Send,
RA: Send,
RB: Send,
{
rayon::join(oper_a, oper_b)
}

pub fn spawn<F>(&self, func: F)
where
F: 'static + FnOnce() + Send,
{
rayon::spawn(func);
}
}

0 comments on commit 9bf4397

Please sign in to comment.