Skip to content

Commit

Permalink
[CHORE] early terminate read parquet bulk
Browse files Browse the repository at this point in the history
  • Loading branch information
samster25 committed Aug 27, 2024
1 parent f21ca7a commit ca017b0
Showing 1 changed file with 36 additions and 23 deletions.
59 changes: 36 additions & 23 deletions src/daft-parquet/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -740,37 +740,50 @@ pub fn read_parquet_bulk(
let columns = owned_columns
.as_ref()
.map(|s| s.iter().map(AsRef::as_ref).collect::<Vec<_>>());
Ok((
i,
read_parquet_single(
&uri,
columns.as_deref(),
start_offset,
num_rows,
owned_row_group,
owned_predicate,
io_client,
io_stats,
schema_infer_options,
owned_field_id_mapping,
metadata,
delete_rows,
chunk_size,
)
.await?,
))
read_parquet_single(
&uri,
columns.as_deref(),
start_offset,
num_rows,
owned_row_group,
owned_predicate,
io_client,
io_stats,
schema_infer_options,
owned_field_id_mapping,
metadata,
delete_rows,
chunk_size,
)
.await
})
}));
let mut remaining_rows = num_rows.map(|x| x as i64);
task_stream
.buffer_unordered(num_parallel_tasks)
// Limit the number of file reads we have in flight at any given time.
.buffered(num_parallel_tasks)
// Terminate the stream if we have already reached the row limit. With the upstream buffering, we will still read up to
// num_parallel_tasks redundant files.
.try_take_while(|result| {
match (result, remaining_rows) {
// Limit has been met, early-terminate.
(_, Some(rows_left)) if rows_left <= 0 => futures::future::ready(Ok(false)),
// Limit has not yet been met, update remaining limit slack and continue.
(Ok(table), Some(rows_left)) => {
remaining_rows = Some(rows_left - table.len() as i64);
futures::future::ready(Ok(true))
}
// (1) No limit, never early-terminate.
// (2) Encountered error, propagate error to try_collect to allow it to short-circuit.
(_, None) | (Err(_), _) => futures::future::ready(Ok(true)),
}
})
.try_collect::<Vec<_>>()
.await
})
.context(JoinSnafu { path: "UNKNOWN" })?;

let mut collected = tables.into_iter().collect::<DaftResult<Vec<_>>>()?;
collected.sort_by_key(|(idx, _)| *idx);
Ok(collected.into_iter().map(|(_, v)| v).collect())
tables.into_iter().collect::<DaftResult<Vec<_>>>()
}

#[allow(clippy::too_many_arguments)]
Expand Down

0 comments on commit ca017b0

Please sign in to comment.