Skip to content

Commit

Permalink
refactor(rust): Add buffers to zip heads to reduce contention (pola-r…
Browse files Browse the repository at this point in the history
  • Loading branch information
orlp authored Sep 30, 2024
1 parent 477a80e commit 4c6d501
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 3 deletions.
1 change: 1 addition & 0 deletions crates/polars-stream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ mod utils;
// TODO: experiment with these, and make them configurable through environment variables.
const DEFAULT_LINEARIZER_BUFFER_SIZE: usize = 4;
const DEFAULT_DISTRIBUTOR_BUFFER_SIZE: usize = 4;
const DEFAULT_ZIP_HEAD_BUFFER_SIZE: usize = 4;
25 changes: 22 additions & 3 deletions crates/polars-stream/src/nodes/zip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ use polars_core::functions::concat_df_horizontal;
use polars_core::schema::Schema;
use polars_core::series::Series;
use polars_error::polars_ensure;
use polars_utils::itertools::Itertools;

use super::compute_node_prelude::*;
use crate::morsel::SourceToken;
use crate::DEFAULT_ZIP_HEAD_BUFFER_SIZE;

/// The head of an input stream.
#[derive(Debug)]
Expand Down Expand Up @@ -211,7 +213,24 @@ impl ComputeNode for ZipNode {
assert!(send.len() == 1);
assert!(!recv.is_empty());
let mut sender = send[0].take().unwrap().serial();
let mut receivers: Vec<_> = recv.iter_mut().map(|r| Some(r.take()?.serial())).collect();

let mut receivers = recv
.iter_mut()
.map(|r| {
// Add buffering to each receiver to reduce contention between input heads.
let mut serial_recv = r.take()?.serial();
let (buf_send, buf_recv) = tokio::sync::mpsc::channel(DEFAULT_ZIP_HEAD_BUFFER_SIZE);
join_handles.push(scope.spawn_task(TaskPriority::High, async move {
while let Ok(morsel) = serial_recv.recv().await {
if buf_send.send(morsel).await.is_err() {
break;
}
}
Ok(())
}));
Some(buf_recv)
})
.collect_vec();

join_handles.push(scope.spawn_task(TaskPriority::High, async move {
let mut out = Vec::new();
Expand All @@ -227,7 +246,7 @@ impl ComputeNode for ZipNode {
for (recv_idx, opt_recv) in receivers.iter_mut().enumerate() {
if let Some(recv) = opt_recv {
while !self.input_heads[recv_idx].ready_to_send() {
if let Ok(morsel) = recv.recv().await {
if let Some(morsel) = recv.recv().await {
self.input_heads[recv_idx].add_morsel(morsel);
} else {
break;
Expand Down Expand Up @@ -283,7 +302,7 @@ impl ComputeNode for ZipNode {

for (recv_idx, opt_recv) in receivers.iter_mut().enumerate() {
if let Some(recv) = opt_recv {
while let Ok(mut morsel) = recv.recv().await {
while let Some(mut morsel) = recv.recv().await {
morsel.source_token().stop();
drop(morsel.take_consume_token());
self.input_heads[recv_idx].add_morsel(morsel);
Expand Down

0 comments on commit 4c6d501

Please sign in to comment.