
Commit

use crossbeam instead of mpsc
ritchie46 committed Jan 17, 2023
1 parent a7d0393 commit 082f2c7
Showing 5 changed files with 35 additions and 9 deletions.
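The commit swaps std::sync::mpsc's sync_channel for crossbeam-channel's bounded channel throughout the streaming sinks. As a minimal sketch of how the two APIs line up (illustrative only, not code from this commit):

```rust
// Standard library: bounded channel with a dedicated SyncSender type.
use std::sync::mpsc::{sync_channel, Receiver as StdReceiver, SyncSender};

// crossbeam-channel: bounded() hands back a plain Sender; both ends are
// Clone, so the channel can also be used MPMC rather than only MPSC.
use crossbeam_channel::{bounded, Receiver, Sender};

fn main() {
    // std: capacity-bounded channel; send blocks once 2 messages are queued.
    let (std_tx, std_rx): (SyncSender<i32>, StdReceiver<i32>) = sync_channel(2);
    std_tx.send(1).unwrap();
    assert_eq!(std_rx.recv().unwrap(), 1);

    // crossbeam: same backpressure semantics, close to a drop-in replacement.
    let (tx, rx): (Sender<i32>, Receiver<i32>) = bounded(2);
    tx.send(2).unwrap();
    assert_eq!(rx.recv().unwrap(), 2);
}
```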
1 change: 1 addition & 0 deletions polars/polars-lazy/polars-pipe/Cargo.toml
@@ -9,6 +9,7 @@ description = "Lazy query engine for the Polars DataFrame library"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
+crossbeam-channel = "0.5"
 enum_dispatch = "0.3"
 hashbrown.workspace = true
 num.workspace = true
@@ -1,8 +1,9 @@
 use std::any::Any;
 use std::path::Path;
-use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
+// use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
 use std::thread::JoinHandle;
 
+use crossbeam_channel::{bounded, Receiver, Sender};
 use polars_core::prelude::*;
 use polars_io::parquet::{BatchedWriter, ParquetWriter};
 use polars_plan::prelude::ParquetWriteOptions;
@@ -57,7 +58,7 @@ fn init_writer_thread(
 // Ensure the data is return in the order it was streamed
 #[derive(Clone)]
 pub struct ParquetSink {
-    sender: SyncSender<Option<DataChunk>>,
+    sender: Sender<Option<DataChunk>>,
     io_thread_handle: Arc<Option<JoinHandle<()>>>,
 }
 
@@ -73,7 +74,7 @@ impl ParquetSink {
 
         let morsels_per_sink = morsels_per_sink();
         let backpressure = morsels_per_sink * 2;
-        let (sender, receiver) = sync_channel(backpressure);
+        let (sender, receiver) = bounded(backpressure);
 
         let io_thread_handle = Arc::new(Some(init_writer_thread(
             receiver,
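The hunks above keep the existing sink pattern: a Clone-able sink whose chunks flow over a bounded channel to a dedicated writer thread, with None used as the "finished" signal. A simplified sketch of that pattern, assuming an illustrative Chunk type in place of the real DataChunk/BatchedWriter machinery:

```rust
use std::thread::{self, JoinHandle};

use crossbeam_channel::{bounded, Receiver, Sender};

// Stand-in for the DataChunk that the real sink sends.
struct Chunk(Vec<u8>);

fn init_writer_thread(receiver: Receiver<Option<Chunk>>) -> JoinHandle<()> {
    thread::spawn(move || {
        // Drain until the sink sends `None` to signal it is finished.
        while let Ok(Some(chunk)) = receiver.recv() {
            // ... hand `chunk` to the batched file writer here ...
            let _ = chunk;
        }
        // finalize / flush the output file here
    })
}

fn main() {
    // The capacity bounds how many chunks may be queued; producers block
    // once the writer falls behind, which is the backpressure the commit keeps.
    let backpressure = 4; // the real code uses morsels_per_sink * 2
    let (sender, receiver): (Sender<Option<Chunk>>, _) = bounded(backpressure);
    let handle = init_writer_thread(receiver);

    for i in 0..8u8 {
        sender.send(Some(Chunk(vec![i]))).unwrap();
    }
    sender.send(None).unwrap(); // finish
    handle.join().unwrap();
}
```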
7 changes: 3 additions & 4 deletions polars/polars-lazy/polars-pipe/src/executors/sinks/sort/io.rs
@@ -1,9 +1,9 @@
 use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::mpsc::SyncSender;
 use std::sync::Arc;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
+use crossbeam_channel::{bounded, Sender};
 use polars_core::prelude::*;
 use polars_core::{POOL, PROCESS_ID};
 use polars_io::prelude::*;
@@ -13,7 +13,7 @@ pub(super) type DfIter = Box<dyn ExactSizeIterator<Item = DataFrame> + Sync + Se
 type Payload = (Option<IdxCa>, DfIter);
 
 pub(super) struct IOThread {
-    sender: SyncSender<Payload>,
+    sender: Sender<Payload>,
     pub(super) dir: PathBuf,
     pub(super) sent: Arc<AtomicUsize>,
     pub(super) total: Arc<AtomicUsize>,
@@ -53,8 +53,7 @@ impl IOThread {
         std::fs::create_dir_all(&dir)?;
 
         // we need some pushback otherwise we still could go OOM.
-        let (sender, receiver) =
-            std::sync::mpsc::sync_channel::<Payload>(POOL.current_num_threads() * 2);
+        let (sender, receiver) = bounded::<Payload>(POOL.current_num_threads() * 2);
 
         let sent: Arc<AtomicUsize> = Default::default();
         let total: Arc<AtomicUsize> = Default::default();
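Here the bounded capacity of POOL.current_num_threads() * 2 is the "pushback" the comment mentions: producers block once the spill queue is full instead of letting memory grow without bound. A rough, self-contained illustration with made-up payloads and thread counts:

```rust
use std::thread;

use crossbeam_channel::bounded;

fn main() {
    // Capacity proportional to the worker count, as in the commit.
    let workers = 4;
    let (sender, receiver) = bounded::<Vec<u8>>(workers * 2);

    // crossbeam's Sender is Clone, so every worker gets its own handle.
    let producers: Vec<_> = (0..workers)
        .map(|id| {
            let tx = sender.clone();
            thread::spawn(move || {
                for i in 0..10u8 {
                    // Blocks when the channel is full: bounded memory use.
                    tx.send(vec![id as u8, i]).unwrap();
                }
            })
        })
        .collect();
    drop(sender); // channel closes once all producer clones are done

    // Single consumer draining the queue (stands in for the spill/IO thread).
    let consumer = thread::spawn(move || receiver.iter().count());

    for p in producers {
        p.join().unwrap();
    }
    assert_eq!(consumer.join().unwrap(), workers * 10);
}
```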
25 changes: 25 additions & 0 deletions py-polars/Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions py-polars/Cargo.toml
@@ -164,8 +164,8 @@ name = "polars"
 crate-type = ["cdylib"]
 
 [profile.release]
-codegen-units = 1
-lto = "fat"
+#codegen-units = 1
+#lto = "fat"
 
 # This is ignored here; would be set in .cargo/config.toml.
 # Should not be used when packaging
