
feat(parquet): Target parquet writes by size bytes instead of rows #3457

Merged (18 commits into main on Dec 6, 2024)

Conversation

@colin-ho (Contributor) commented Dec 1, 2024

Addresses #3443

  1. The write operator in swordfish unnecessarily buffers data into morsel_size rows, even though the subsequent writers (particularly the parquet writer, when writing row groups) do their own buffering.
  2. Parquet row group / file sizes can be estimated better by looking at the size_bytes of each micropartition instead of estimating the row size from the schema. Schema-based estimates can be far off, so we could unintentionally write very large row groups and consume a lot of memory doing so (see the sketch after this list).
  3. The inflation factors can be updated adaptively, based on written row groups / rows.
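
As a rough illustration of point 2, here is a minimal sketch (hypothetical names, not the actual Daft code) of deriving a per-row-group row target from the observed size_bytes of incoming data rather than from a schema-based row-size estimate:

```rust
// Illustrative sketch only -- not the actual Daft implementation.
struct RowGroupSizer {
    /// In-memory byte budget per row group, e.g. the on-disk target
    /// multiplied by an inflation factor.
    target_in_memory_bytes: usize,
}

impl RowGroupSizer {
    /// Given a batch's actual in-memory size and row count, compute how
    /// many rows of similar data fit in one target-sized row group.
    fn target_rows(&self, batch_size_bytes: usize, batch_rows: usize) -> usize {
        if batch_rows == 0 || batch_size_bytes == 0 {
            return 1;
        }
        let bytes_per_row = batch_size_bytes as f64 / batch_rows as f64;
        ((self.target_in_memory_bytes as f64 / bytes_per_row) as usize).max(1)
    }
}

fn main() {
    // Wide tensor rows at ~1 MiB each yield a far smaller row target for a
    // 128 MiB row group budget than a schema-based guess typically would.
    let sizer = RowGroupSizer { target_in_memory_bytes: 128 * 1024 * 1024 };
    assert_eq!(sizer.target_rows(1024 * 1024 * 1024, 1024), 128);
}
```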

Below is a before and after of the tensor[uint8] example from the tagged issue with these fixes implemented (running swordfish on a 128-CPU machine).

[Before and after screenshots]

github-actions bot added the enhancement (New feature or request) label on Dec 1, 2024

codspeed-hq bot commented Dec 1, 2024

CodSpeed Performance Report

Merging #3457 will not alter performance

Comparing colin/swordfish-parquet-size-based-writes (a89724a) with main (8009155)

Summary

✅ 17 untouched benchmarks


codecov bot commented Dec 1, 2024

Codecov Report

Attention: Patch coverage is 89.21833% with 40 lines in your changes missing coverage. Please review.

Project coverage is 77.60%. Comparing base (6d30e30) to head (a89724a).
Report is 17 commits behind head on main.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| daft/io/writer.py | 0.00% | 26 Missing ⚠️ |
| src/daft-writers/src/partition.rs | 50.00% | 6 Missing ⚠️ |
| src/daft-writers/src/file.rs | 96.38% | 3 Missing ⚠️ |
| src/daft-writers/src/lance.rs | 70.00% | 3 Missing ⚠️ |
| src/daft-writers/src/batch.rs | 99.25% | 1 Missing ⚠️ |
| src/daft-writers/src/lib.rs | 98.64% | 1 Missing ⚠️ |
Additional details and impacted files


@@            Coverage Diff             @@
##             main    #3457      +/-   ##
==========================================
+ Coverage   77.00%   77.60%   +0.59%     
==========================================
  Files         696      706      +10     
  Lines       86039    86225     +186     
==========================================
+ Hits        66256    66916     +660     
+ Misses      19783    19309     -474     
| Files with missing lines | Coverage Δ |
| --- | --- |
| src/daft-local-execution/src/pipeline.rs | 93.76% <100.00%> (-0.70%) ⬇️ |
| src/daft-local-execution/src/sinks/write.rs | 100.00% <100.00%> (ø) |
| src/daft-writers/src/pyarrow.rs | 99.41% <100.00%> (+0.03%) ⬆️ |
| src/daft-writers/src/test.rs | 96.36% <100.00%> (+0.44%) ⬆️ |
| src/daft-writers/src/batch.rs | 99.10% <99.25%> (-0.11%) ⬇️ |
| src/daft-writers/src/lib.rs | 98.33% <98.64%> (+2.25%) ⬆️ |
| src/daft-writers/src/file.rs | 97.80% <96.38%> (-1.46%) ⬇️ |
| src/daft-writers/src/lance.rs | 95.16% <70.00%> (-4.84%) ⬇️ |
| src/daft-writers/src/partition.rs | 86.79% <50.00%> (-5.13%) ⬇️ |
| daft/io/writer.py | 0.00% <0.00%> (ø) |

... and 99 files with indirect coverage changes

@colin-ho requested a review from samster25 on December 2, 2024
.expect("Micropartitions in target batch writer must be loaded");

if let Some(leftovers) = self.leftovers.take() {
    input = MicroPartition::concat([leftovers, input])?.into();

Member: if we had a bunch of small morsels, we can potentially perform this concat every iteration. Ideally we can just keep a buffer and keep track of the number of bytes / rows that we have.

Author: Makes sense, implemented a simple buffer for this.
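
For illustration, a minimal sketch of such a buffer (hypothetical types; the real code, presumably in src/daft-writers/src/batch.rs, operates on MicroPartition): morsels are stashed and counted, and a single concat happens only once a row or byte threshold is crossed.

```rust
// Hypothetical sketch of a size-tracking write buffer.
struct Batch {
    rows: usize,
    size_bytes: usize,
}

struct WriteBuffer {
    parts: Vec<Batch>,
    buffered_rows: usize,
    buffered_bytes: usize,
    target_rows: usize,
    target_bytes: usize,
}

impl WriteBuffer {
    /// Stash a morsel and report whether enough rows / bytes have
    /// accumulated to justify one concat + write.
    fn push(&mut self, part: Batch) -> bool {
        self.buffered_rows += part.rows;
        self.buffered_bytes += part.size_bytes;
        self.parts.push(part);
        self.buffered_rows >= self.target_rows || self.buffered_bytes >= self.target_bytes
    }

    /// Hand back all buffered morsels for a single concatenation and
    /// reset the counters.
    fn drain(&mut self) -> Vec<Batch> {
        self.buffered_rows = 0;
        self.buffered_bytes = 0;
        std::mem::take(&mut self.parts)
    }
}
```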

cfg.parquet_target_row_group_size as f64,
cfg.parquet_inflation_factor,
);
let (target_in_memory_file_size, target_in_memory_row_group_size) =

Member: discussed offline, but we can expose a tell method on the writer trait and update these values as we write.

Author: Nice suggestion, implemented. Works for S3 as well.
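
Roughly, the idea (a hypothetical trait shape, not the exact Daft signature): the writer reports its current byte position, so the actual compressed bytes written so far can feed back into the inflation-factor estimate; an S3 sink can report the bytes it has handed to its uploader.

```rust
// Hypothetical sketch; the real trait lives in src/daft-writers and
// uses DaftResult rather than std::io::Result.
trait FileWriter {
    type Result;

    fn write(&mut self, data: &[u8]) -> std::io::Result<()>;
    fn close(&mut self) -> std::io::Result<Self::Result>;

    /// Current byte position of the underlying file, if the sink can
    /// report one (e.g. a local file offset, or bytes submitted to an
    /// S3 multipart upload). None if no position is available.
    fn tell(&self) -> std::io::Result<Option<usize>>;
}
```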

@colin-ho commented Dec 3, 2024

Results of the adaptive inflation factor calculations when writing a scale factor 1 lineitem table to S3:

Native runner: 2 files. File 1 (371 MB): 1 row group of 79 MB and 4 row groups of 241 MB. File 2 (320 MB): 1 row group of 122 MB and 2 row groups of 242 MB.

PyRunner: 8 files (due to scan task splitting), each with 2 row groups of 90 MB, except the last, which has 1 row group of 12 MB. When doing into_partitions(1) and then writing, the result is 2 files, each with four 150 MB row groups and one 55 MB row group.

3 resolved review threads on src/daft-writers/src/batch.rs (outdated)

}
}
// Else, we need to split the table
else {

Member: intent is weird

Author: Could you clarify a bit? Is it this else case or the whole loop logic?

Member (@samster25, Dec 5, 2024): whoops, made a typo! I meant to say "indent is weird". The else case is on a new line rather than on the same line as the if's closing bracket.
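
For reference, a small illustration of the requested layout, with else on the same line as the if block's closing bracket:

```rust
// Illustrative only: idiomatic Rust brace placement.
fn write_or_split(remaining_rows: usize, target_rows: usize) {
    if remaining_rows <= target_rows {
        // write the whole table
    } else {
        // else, we need to split the table
    }
}
```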

Resolved review threads (outdated): src/daft-writers/src/file.rs (2), src/daft-writers/src/pyarrow.rs (1)

estimate_in_memory_size_bytes as f64 / actual_on_disk_size_bytes as f64;
let new_num_samples = self.num_samples.fetch_add(1, Ordering::Relaxed) + 1;

let current_factor =

Member: this is pretty bootleg and will probably cause bugs in the future, especially if folks try to use current_factor directly and get some huge number from it.

You are also using 2 atomics, which makes the current factor update non-atomic.

I would just recommend using a Mutex<f32> and doing a mul_add on that. You're not going to be holding it for long anyway, and we're bottlenecked by the GIL mutex anyway.

Author: f32 or f64? The default inflation factors in the execution config come as f64.

Member: either is fine. We don't need the precision of f64, but it doesn't hurt.
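
A minimal sketch of the suggested approach (illustrative; the merged code may differ): keep the factor and sample count behind a single Mutex so the running average is updated as one unit and readers never observe a torn pair.

```rust
use std::sync::Mutex;

// Illustrative sketch of a Mutex-guarded running-average inflation factor.
struct InflationFactor {
    state: Mutex<(f64, u64)>, // (current factor, number of samples)
}

impl InflationFactor {
    /// Fold one observation (estimated in-memory bytes / actual on-disk
    /// bytes) into the running mean under a single lock.
    fn record(&self, estimated_in_memory: u64, actual_on_disk: u64) {
        if actual_on_disk == 0 {
            return; // nothing written yet; avoid dividing by zero
        }
        let sample = estimated_in_memory as f64 / actual_on_disk as f64;
        let mut guard = self.state.lock().unwrap();
        let (factor, n) = *guard;
        let new_n = n + 1;
        // Incremental mean: m' = m + (x - m) / n'.
        *guard = (factor + (sample - factor) / new_n as f64, new_n);
    }

    fn current(&self) -> f64 {
        self.state.lock().unwrap().0
    }
}
```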

@colin-ho requested a review from samster25 on December 5, 2024

/// Close the file and return the result. The caller should NOT write to the file after calling this method.
fn close(&mut self) -> DaftResult<Self::Result>;

/// Return the current position of the file, if applicable.

Member: update this comment

@colin-ho changed the title from "[FEAT] Target parquet writes by size bytes instead of rows" to "feat(parquet): Target parquet writes by size bytes instead of rows" on Dec 6, 2024
github-actions bot added the feat label on Dec 6, 2024
@colin-ho merged commit 528b797 into main on Dec 6, 2024 (43 of 44 checks passed)
@colin-ho deleted the colin/swordfish-parquet-size-based-writes branch on December 6, 2024