Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust): Get polars to compile to wasm target #6050

Merged
merged 1 commit into from
Jan 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,10 @@ polars-ops = { version = "0.26.1", path = "./polars-ops" }
polars-sql = { version = "0.2", path = "./polars-sql", default-features = false, optional = true }
polars-time = { version = "0.26.1", path = "./polars-time", default-features = false, optional = true }

# enable js feature for getrandom to work in wasm
[target.'cfg(target_family = "wasm")'.dependencies.getrandom]
features = ["js"]

[dev-dependencies]
ahash = "0.7"
criterion = "0.3"
Expand Down
3 changes: 3 additions & 0 deletions polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ smartstring = { version = "1" }
thiserror.workspace = true
xxhash-rust.workspace = true

[target.'cfg(target_family = "wasm")'.dependencies]
wasm-timer = "0.2.5"

[dev-dependencies]
bincode = "1"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,16 @@ impl Default for SCacheInner {
/// Builds an empty cache whose `uuid` is the number of nanoseconds elapsed
/// since the Unix epoch at construction time.
///
/// The clock source is picked at compile time: `std::time` on non-wasm
/// targets and `wasm_timer` on wasm targets.
fn default() -> Self {
    #[cfg(not(target_family = "wasm"))]
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH);
    #[cfg(target_family = "wasm")]
    let since_epoch = wasm_timer::SystemTime::now().duration_since(wasm_timer::UNIX_EPOCH);

    Self {
        map: PlIdHashMap::with_capacity(HASHMAP_INIT_SIZE),
        uuid: since_epoch.unwrap().as_nanos(),
        payloads: Vec::with_capacity(HASHMAP_INIT_SIZE),
    }
}
Expand Down
4 changes: 4 additions & 0 deletions polars/polars-core/src/frame/groupby/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@ impl Drop for GroupsIdx {
let v = std::mem::take(&mut self.all);
// ~65k took approximately 1ms on local machine, so from that point we drop on other thread
// to stop query from being blocked
#[cfg(not(target_family = "wasm"))]
if v.len() > 1 << 16 {
std::thread::spawn(move || drop(v));
} else {
drop(v);
}

#[cfg(target_family = "wasm")]
drop(v);
}
}

Expand Down
4 changes: 4 additions & 0 deletions polars/polars-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub(crate) static PROCESS_ID: Lazy<u128> = Lazy::new(|| {
});

// this is re-exported in utils for polars child crates
#[cfg(not(target_family = "wasm"))] // only use this on non wasm targets
pub static POOL: Lazy<ThreadPool> = Lazy::new(|| {
ThreadPoolBuilder::new()
.num_threads(
Expand All @@ -59,5 +60,8 @@ pub static POOL: Lazy<ThreadPool> = Lazy::new(|| {
.expect("could not spawn threads")
});

/// On wasm targets `POOL` is a lazily created `polars_utils::wasm::Pool`
/// instead of the `ThreadPool` used on other targets.
#[cfg(target_family = "wasm")] // instead use this on wasm targets
pub static POOL: Lazy<polars_utils::wasm::Pool> = Lazy::new(|| polars_utils::wasm::Pool);

// utility for the tests to ensure a single thread can execute
pub static SINGLE_LOCK: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
5 changes: 4 additions & 1 deletion polars/polars-io/src/csv/read_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,9 @@ impl<'a> CoreReader<'a> {

// If the number of threads given by the user is lower than our global thread pool we create
// new one.
#[cfg(not(target_family = "wasm"))]
let owned_pool;
#[cfg(not(target_family = "wasm"))]
let pool = if POOL.current_num_threads() != n_threads {
owned_pool = Some(
ThreadPoolBuilder::new()
Expand All @@ -497,7 +499,8 @@ impl<'a> CoreReader<'a> {
} else {
&POOL
};

#[cfg(target_family = "wasm")] // use a pre-created pool for wasm
let pool = &POOL;
// An empty file with a schema should return an empty DataFrame with that schema
if bytes.is_empty() {
// TODO! add DataFrame::new_from_schema
Expand Down
3 changes: 3 additions & 0 deletions polars/polars-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,6 @@ pub use hash::HashSingle;
pub type IdxSize = u32;
#[cfg(feature = "bigidx")]
pub type IdxSize = u64;

/// Module that is only compiled when targeting the wasm family.
#[cfg(target_family = "wasm")]
pub mod wasm;
32 changes: 32 additions & 0 deletions polars/polars-utils/src/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::IdxSize;
///
/// # Safety
/// The caller must ensure that the right indexes of `&[(_, IdxSize)]` are integers ranging from `0..idx.len`
#[cfg(not(target_family = "wasm"))]
pub unsafe fn perfect_sort(pool: &ThreadPool, idx: &[(IdxSize, IdxSize)], out: &mut Vec<IdxSize>) {
let chunk_size = std::cmp::max(
idx.len() / pool.current_num_threads(),
Expand All @@ -40,3 +41,34 @@ pub unsafe fn perfect_sort(pool: &ThreadPool, idx: &[(IdxSize, IdxSize)], out: &
// all elements are written
out.set_len(idx.len());
}

/// Wasm variant of `perfect_sort`; it differs from the non-wasm version in
/// taking a `crate::wasm::Pool` as its `pool` argument.
///
/// For every `(value, location)` pair in `idx`, `value` is written to
/// position `location` of `out`; afterwards the length of `out` is set to
/// `idx.len()`.
///
/// # Safety
/// The caller must ensure that the right-hand elements of `idx` are integers
/// in the range `0..idx.len()`.
#[cfg(target_family = "wasm")]
pub unsafe fn perfect_sort(
    pool: &crate::wasm::Pool,
    idx: &[(IdxSize, IdxSize)],
    out: &mut Vec<IdxSize>,
) {
    let n_threads = pool.current_num_threads();
    let chunk_size = (idx.len() / n_threads).max(n_threads);

    out.reserve(idx.len());
    // The destination is carried into the closure as a plain address and
    // turned back into a pointer there (presumably because raw pointers are
    // not `Send`).
    let base_addr = out.as_mut_ptr() as usize;

    pool.install(|| {
        idx.par_chunks(chunk_size).for_each(|chunk| {
            let base = base_addr as *mut IdxSize;
            for &(value, location) in chunk {
                // Safety:
                // `location` is in bounds by the contract of this function
                // and capacity for at least `idx.len()` elements was reserved
                *base.add(location as usize) = value;
            }
        });
    });
    // Safety:
    // every slot in `0..idx.len()` has been written above
    out.set_len(idx.len());
}
32 changes: 32 additions & 0 deletions polars/polars-utils/src/wasm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/// Zero-sized pool handle used on wasm targets.
///
/// It exposes `current_num_threads`, `install`, `join` and `spawn` so call
/// sites can use it like a thread pool. `install` runs the closure on the
/// calling thread; the other methods forward to rayon's free functions.
#[derive(Debug, Clone, Copy, Default)]
pub struct Pool;

impl Pool {
    /// Returns the value of `rayon::current_num_threads()`.
    pub fn current_num_threads(&self) -> usize {
        rayon::current_num_threads()
    }

    /// Invokes `op` directly on the calling thread and returns its result.
    pub fn install<OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce() -> R + Send,
        R: Send,
    {
        op()
    }

    /// Runs `oper_a` and `oper_b` through `rayon::join` and returns both
    /// results as a tuple `(result_a, result_b)`.
    pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB)
    where
        A: FnOnce() -> RA + Send,
        B: FnOnce() -> RB + Send,
        RA: Send,
        RB: Send,
    {
        rayon::join(oper_a, oper_b)
    }

    /// Hands `func` to `rayon::spawn`; no handle or result is returned.
    pub fn spawn<F>(&self, func: F)
    where
        F: 'static + FnOnce() + Send,
    {
        rayon::spawn(func);
    }
}