Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: break stream by max bytes param #3435

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions rust/lance-datafusion/src/chunker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,23 +108,37 @@ struct BreakStreamState {
max_rows: usize,
rows_seen: usize,
rows_remaining: usize,
max_bytes: usize,
bytes_seen: usize,
bytes_remaining: usize,
batch: Option<RecordBatch>,
}

impl BreakStreamState {
fn next(mut self) -> Option<(Result<RecordBatch>, Self)> {
if self.rows_remaining == 0 {
if self.rows_remaining == 0 || self.bytes_remaining == 0 {
return None;
}
if self.rows_remaining + self.rows_seen <= self.max_rows {
if self.rows_remaining + self.rows_seen <= self.max_rows
&& self.bytes_remaining + self.bytes_seen <= self.max_bytes
{
self.rows_seen = (self.rows_seen + self.rows_remaining) % self.max_rows;
self.rows_remaining = 0;
self.bytes_seen = (self.bytes_seen + self.bytes_remaining) % self.max_bytes;
self.bytes_remaining = 0;
let next = self.batch.take().unwrap();
Some((Ok(next), self))
} else {
let rows_to_emit = self.max_rows - self.rows_seen;
let avg_bytes_row = self.bytes_remaining as f64 / self.rows_remaining as f64;
let rows_to_emit = if self.rows_remaining + self.rows_seen > self.max_rows {
self.max_rows - self.rows_seen
} else {
((self.max_bytes - self.bytes_seen) as f64 / avg_bytes_row) as usize
};
self.rows_seen = 0;
self.rows_remaining -= rows_to_emit;
self.bytes_seen = 0;
self.bytes_remaining -= (rows_to_emit as f64 * avg_bytes_row) as usize;
let batch = self.batch.as_mut().unwrap();
let next = batch.slice(0, rows_to_emit);
*batch = batch.slice(rows_to_emit, batch.num_rows() - rows_to_emit);
Expand All @@ -142,18 +156,24 @@ impl BreakStreamState {
// output batches will be [3, 5, 2 (break inserted), 6, 3, 1 (break inserted), 4]
pub fn break_stream(
stream: SendableRecordBatchStream,
max_chunk_rows: usize,
max_chunk_size: usize,
) -> Pin<Box<dyn Stream<Item = Result<RecordBatch>> + Send>> {
let mut rows_already_seen = 0;
let mut bytes_already_seen = 0;
stream
.map_ok(move |batch| {
let state = BreakStreamState {
rows_remaining: batch.num_rows(),
max_rows: max_chunk_size,
max_rows: max_chunk_rows,
rows_seen: rows_already_seen,
bytes_remaining: batch.get_array_memory_size(),
max_bytes: max_chunk_size,
bytes_seen: bytes_already_seen,
batch: Some(batch),
};
rows_already_seen = (state.rows_seen + state.rows_remaining) % state.max_rows;
bytes_already_seen = (state.bytes_seen + state.bytes_remaining) % state.max_bytes;

futures::stream::unfold(state, move |state| std::future::ready(state.next())).boxed()
})
Expand Down Expand Up @@ -255,7 +275,7 @@ mod tests {
assert_eq!(chunked[1].num_rows(), 10);
assert_eq!(chunked[2].num_rows(), 8);

let chunked = super::break_stream(make_stream(), 10)
let chunked = super::break_stream(make_stream(), 10, 10000000)
.try_collect::<Vec<_>>()
.await
.unwrap();
Expand Down
5 changes: 3 additions & 2 deletions rust/lance/src/dataset/fragment/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,10 @@ impl<'a> FragmentCreateBuilder<'a> {

progress.begin(&fragment).await?;

let break_limit = (128 * 1024).min(params.max_rows_per_file);
let break_row_limit = (128 * 1024).min(params.max_rows_per_file);
let break_bytes_limit = (1024 * 1024 * 1024).min(params.max_bytes_per_file);

let mut broken_stream = break_stream(stream, break_limit)
let mut broken_stream = break_stream(stream, break_row_limit, break_bytes_limit)
.map_ok(|batch| vec![batch])
.boxed();
while let Some(batched_chunk) = broken_stream.next().await {
Expand Down
22 changes: 8 additions & 14 deletions rust/lance/src/dataset/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,14 +244,15 @@ pub async fn do_write_fragments(
} else {
// In v2 we don't care about group size but we do want to break
// the stream on file boundaries
break_stream(data, params.max_rows_per_file)
break_stream(data, params.max_rows_per_file, params.max_bytes_per_file)
.map_ok(|batch| vec![batch])
.boxed()
};

let writer_generator = WriterGenerator::new(object_store, base_dir, schema, storage_version);
let mut writer: Option<Box<dyn GenericWriter>> = None;
let mut num_rows_in_current_file = 0;
let mut num_bytes_in_current_file = 0;
let mut fragments = Vec::new();
while let Some(batch_chunk) = buffered_reader.next().await {
let batch_chunk = batch_chunk?;
Expand All @@ -266,10 +267,11 @@ pub async fn do_write_fragments(
writer.as_mut().unwrap().write(&batch_chunk).await?;
for batch in batch_chunk {
num_rows_in_current_file += batch.num_rows() as u32;
num_bytes_in_current_file += batch.get_array_memory_size() as u64;
}

if num_rows_in_current_file >= params.max_rows_per_file as u32
|| writer.as_mut().unwrap().tell().await? >= params.max_bytes_per_file as u64
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This `tell()` always returns 0 because the writer has not yet called `finish()`.

|| num_bytes_in_current_file >= params.max_bytes_per_file as u64
{
let (num_rows, data_file) = writer.take().unwrap().finish().await?;
debug_assert_eq!(num_rows, num_rows_in_current_file);
Expand All @@ -278,6 +280,7 @@ pub async fn do_write_fragments(
last_fragment.physical_rows = Some(num_rows as usize);
last_fragment.files.push(data_file);
num_rows_in_current_file = 0;
num_bytes_in_current_file = 0;
}
}

Expand Down Expand Up @@ -434,11 +437,6 @@ pub async fn write_fragments_internal(
pub trait GenericWriter: Send {
/// Write the given batches to the file
async fn write(&mut self, batches: &[RecordBatch]) -> Result<()>;
/// Get the current position in the file
///
/// We use this to know when the file is too large and we need to start
/// a new file
async fn tell(&mut self) -> Result<u64>;
/// Finish writing the file (flush the remaining data and write footer)
async fn finish(&mut self) -> Result<(u32, DataFile)>;
}
Expand All @@ -448,9 +446,7 @@ impl<M: ManifestProvider + Send + Sync> GenericWriter for (FileWriter<M>, String
async fn write(&mut self, batches: &[RecordBatch]) -> Result<()> {
self.0.write(batches).await
}
async fn tell(&mut self) -> Result<u64> {
Ok(self.0.tell().await? as u64)
}

async fn finish(&mut self) -> Result<(u32, DataFile)> {
Ok((
self.0.finish().await? as u32,
Expand All @@ -472,9 +468,7 @@ impl GenericWriter for V2WriterAdapter {
}
Ok(())
}
async fn tell(&mut self) -> Result<u64> {
Ok(self.writer.tell().await?)
}

async fn finish(&mut self) -> Result<(u32, DataFile)> {
let field_ids = self
.writer
Expand Down Expand Up @@ -718,7 +712,7 @@ mod tests {

let write_params = WriteParams {
max_rows_per_file: 1024 * 1024, // Won't be limited by this
max_bytes_per_file: 2 * 1024,
max_bytes_per_file: 15 * 1024 * 1024,
mode: WriteMode::Create,
..Default::default()
};
Expand Down
2 changes: 1 addition & 1 deletion rust/lance/src/io/exec/scalar_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ impl ExecutionPlan for MaterializeIndexExec {
MATERIALIZE_INDEX_SCHEMA.clone(),
stream,
));
let stream = break_stream(stream, 64 * 1024);
let stream = break_stream(stream, 64 * 1024, 1024 * 1024 * 1024);
Ok(Box::pin(RecordBatchStreamAdapter::new(
MATERIALIZE_INDEX_SCHEMA.clone(),
stream.map_err(|err| err.into()),
Expand Down
Loading