
use ObjectStore for dataframe writes #6987

Merged · 11 commits · Jul 24, 2023
Conversation

devinjdangelo (Contributor) commented Jul 16, 2023

Which issue does this PR close?

Related to #1777

Rationale for this change

Many ETL and other workflows need to persist data to an ObjectStore. Currently, DataFrame write_parquet, write_json, and write_csv only support writing to a local file system.

What changes are included in this PR?

DataFrame write methods (write_parquet, write_json, and write_csv) now expect an ObjectStore URL registered in the current context. Unit tests are updated to accommodate this change, and an example of writing to S3 is added.

Are these changes tested?

Existing unit tests are updated to cover this change.

Are there any user-facing changes?

Yes, users will now need to pass an ObjectStore URL to write methods rather than a local path.
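
For illustration, here is a minimal sketch of the new calling pattern, modeled on the S3 example added in this PR. The bucket name, region, and credentials are placeholders, and the exact AmazonS3Builder / register_object_store calls are assumptions about the object_store and DataFusion APIs at the time, not code copied from the PR.

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::prelude::*;
use object_store::aws::AmazonS3Builder;
use url::Url;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Placeholder bucket and credentials -- substitute real values.
    let bucket_name = "my-bucket";
    let s3 = AmazonS3Builder::new()
        .with_bucket_name(bucket_name)
        .with_region("us-east-1")
        .with_access_key_id("<access key id>")
        .with_secret_access_key("<secret access key>")
        .build()?;

    // Register the store so that s3:// URLs resolve in this context.
    let s3_url = Url::parse(&format!("s3://{bucket_name}")).unwrap();
    ctx.runtime_env().register_object_store(&s3_url, Arc::new(s3));

    // The write methods now take an ObjectStore URL rather than a local path.
    let df = ctx.sql("SELECT 1 AS a").await?;
    let out_path = format!("s3://{bucket_name}/test_write/");
    df.write_parquet(&out_path, None).await?;
    Ok(())
}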

github-actions bot added the core (Core DataFusion crate) label on Jul 16, 2023
alamb (Contributor) commented Jul 16, 2023

Thank you @devinjdangelo

alamb (Contributor) commented Jul 16, 2023

I plan to review this PR tomorrow if no one else beats me to it

devinjdangelo (Contributor, Author)

Based on the failing checks, it looks like I didn't handle temp directories in the unit tests correctly for all environments. I'll look into fixing that tonight.

alamb (Contributor) left a comment

Thank you for the contribution @devinjdangelo -- this is looking very close. I think we should consider using multi-part uploads to stream the files to the object store, but other than that and the CI failures, this PR looks good to me 👌

It would also be awesome, longer term, to unify these "write_to_json" type methods into DataSinks that are object store enabled -- like https://github.com/apache/arrow-datafusion/blob/9338880d8f64f8143e348a60beee8af2789fa8ae/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs#L390-L408

let out_path = format!("s3://{bucket_name}/test_write/");
df.write_parquet(&out_path, None).await?;

//write as JSON to s3 instead
alamb (Contributor):

Did you mean to leave this commented out?

devinjdangelo (Contributor, Author):

Yes. The example shows how you would write Parquet to S3, but I added the comments to call out that the code for writing any other file type to S3 is nearly identical. I could uncomment and clone the df so the example writes out all file types instead, if that is preferred.

let plan: Arc<dyn ExecutionPlan> = plan.clone();
let filename = format!("{}/part-{i}.csv", parsed.prefix());
let file = object_store::path::Path::parse(filename)?;
buffer = Vec::new();
alamb (Contributor):

Do I understand correctly that this code effectively buffers the entire output in memory prior to writing to the remote store?

What do you think about using the multi-part put API instead? https://docs.rs/object_store/0.6.1/object_store/#multipart-put-object

devinjdangelo (Contributor, Author):

Yes, everything is buffered into memory and uploaded in a single put, which will be very memory inefficient if you are writing large files.

If you are writing many small files, multipart upload could add unnecessary overhead in creating and finalizing uploads that may contain only a single part.

If we must choose only one or the other, I would also favor multipart upload, since large files could fail in the current implementation, whereas small files would at worst be slower in a multipart implementation. I will work on a multipart implementation of this!

This is probably too ambitious for this PR, but we could in the future automatically select the optimal choice between put and multipart-put based on information in the ExecutionPlan (maybe this?). I.e. if you anticipate files on average >threshold_bytes, do a streaming multipart-put, otherwise a single put.
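
For reference, a minimal sketch of the object_store multipart API mentioned above, as it looks in object_store 0.6; the path and payload here are hypothetical, and the actual change streams encoded RecordBatches rather than a fixed byte slice:

use object_store::{memory::InMemory, path::Path, ObjectStore};
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store = InMemory::new();
    let location = Path::from("test_write/part-0.csv"); // hypothetical key

    // put_multipart returns an upload id plus an AsyncWrite that uploads in parts.
    let (_id, mut writer) = store.put_multipart(&location).await?;
    writer.write_all(b"a,b\n1,2\n").await?; // stream encoded bytes here
    writer.shutdown().await?; // completes (finalizes) the multipart upload
    Ok(())
}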

alamb (Contributor) commented Jul 21, 2023

> If we must choose only one or the other, I would also favor multipart upload, since large files could fail in the current implementation, whereas small files would at worst be slower in a multipart implementation. I will work on a multipart implementation of this!

Thank you -- I think if the overhead of doing a multi-part upload is too big, it would make sense to add (optional) buffering in the object_store crate (if it doesn't have it already) so that multiple small multi-part writes are coalesced into a smaller number of larger writes.
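
One way to picture that coalescing idea, purely as an illustration layered on tokio's BufWriter rather than anything the object_store crate provides:

use tokio::io::{AsyncWrite, AsyncWriteExt, BufWriter};

// Illustration only: wrap the AsyncWrite returned by put_multipart so that many
// small writes are coalesced in memory into fewer, larger parts.
async fn write_coalesced<W: AsyncWrite + Unpin>(
    multipart_writer: W,
    chunks: &[&[u8]],
) -> std::io::Result<()> {
    let mut buffered = BufWriter::with_capacity(10 * 1024 * 1024, multipart_writer);
    for chunk in chunks {
        buffered.write_all(chunk).await?; // buffered until capacity is reached
    }
    buffered.shutdown().await // flush the remainder, then finalize the upload
}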

devinjdangelo (Contributor, Author) commented Jul 17, 2023

Thanks for the feedback @alamb! I'll work on a multipart upload version of this and fixing the failing checks.

I'm not sure if this is what you meant by unifying these methods into DataSinks, but I did experiment a little with creating a generic write_to_file method that accepts the appropriate RecordBatch writer as an argument. The issue I ran into is that I couldn't find an appropriate trait that provides a method like fn write(&mut self, batch: &RecordBatch) -> Result<()>. I may have just missed it.

Edit: found the trait here. Could refactor with this first so I don't have to replicate the changes in all 3 methods.
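
A small sketch of that generic idea, assuming arrow's RecordBatchWriter trait is the one referenced (the import path varies by arrow version, and the helper name here is hypothetical):

// Trait path may differ by arrow version (e.g. arrow_array::RecordBatchWriter).
use arrow::error::ArrowError;
use arrow::record_batch::{RecordBatch, RecordBatchWriter};

/// Hypothetical helper: drain a set of batches into any RecordBatchWriter.
fn write_all_batches<W: RecordBatchWriter>(
    mut writer: W,
    batches: &[RecordBatch],
) -> Result<(), ArrowError> {
    for batch in batches {
        writer.write(batch)?;
    }
    writer.close() // finish the underlying format (e.g. footers), consuming the writer
}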

alamb (Contributor) commented Jul 20, 2023

I am sorry -- I ran out of time again today but this PR is at the top of my list for tomorrow

devinjdangelo (Contributor, Author)

No worries, @alamb. Today is better anyway since I just pushed up changes to support multipart incremental uploads instead of always buffering the entire file. For parquet, it was relatively straightforward to use AsyncArrowWriter and pass it the appropriate async multipart writer. For JSON lines and CSV, I initially tried an implementation that relied on passing ownership of a buffer around and reinitializing the writer as needed. I.e. something roughly like:

let mut buffer = Vec::new();
while let Some(next_batch) = stream.next().await {
    let batch = next_batch?;
    let mut writer = csv::Writer::new(buffer);
    writer.write(&batch)?;
    buffer = writer.into_inner();
    multipart_writer.write_all(&buffer).await?;
    buffer.clear();
}

This approach mostly worked, except that recreating a RecordBatchWriter can in general have side effects, such as writing CSV headers to the file multiple times. To get around that issue, I followed an approach similar to AsyncArrowWriter, but generic so it could work with any RecordBatchWriter.
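
Roughly the shape of that approach, as a hedged sketch rather than the PR's exact code: wrap a shared in-memory buffer in a Write impl so that a single RecordBatchWriter can be constructed once (writing its CSV header only once), while the accumulated bytes are periodically drained into the multipart upload between batches.

use std::io::Write;
use std::sync::{Arc, Mutex};

/// Hypothetical shared buffer: the RecordBatchWriter holds one clone and keeps
/// appending to it; the upload loop holds another clone and drains it after each batch.
#[derive(Clone, Default)]
struct SharedBuffer(Arc<Mutex<Vec<u8>>>);

impl SharedBuffer {
    /// Take whatever has been written so far, leaving the buffer empty.
    fn drain(&self) -> Vec<u8> {
        std::mem::take(&mut *self.0.lock().unwrap())
    }
}

impl Write for SharedBuffer {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.0.lock().unwrap().extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}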

alamb (Contributor) left a comment

This is looking very cool @devinjdangelo -- I think the code looks good to me.

I will see if @tustvold has time to review this prior to merge.

tustvold (Contributor) left a comment

Had a brief look; I think this is perfectly valid. My one thought is that the writing logic for CSV and JSON could be made significantly simpler by not keeping the writer around. Something like:

let mut buffer = Vec::with_capacity(1024);
let mut out = store.put_multipart(...).await?;
while let Some(batch) = stream.next().await.transpose()? {
    let mut writer = make_writer(&mut buffer).await?;
    writer.write(&batch)?;
    out.write_all(&buffer).await?;
    buffer.clear();
}

The only wrinkle is that for CSV it will need to enable headers on WriterBuilder only for the first write; however, this should still be simpler than the futures locking dance.
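
For that CSV wrinkle, a sketch of how the header toggle could look with arrow's csv::WriterBuilder, reusing the names from the sketch above (the has_headers method name is an assumption about the arrow version in use):

// Reusing `stream`, `buffer`, and `out` from the sketch above.
let mut first = true;
while let Some(batch) = stream.next().await.transpose()? {
    {
        // Write the header only for the first batch; later writers skip it.
        let mut writer = arrow::csv::WriterBuilder::new()
            .has_headers(first)
            .build(&mut buffer);
        writer.write(&batch)?;
    } // writer dropped here so `buffer` can be read below
    first = false;
    out.write_all(&buffer).await?;
    buffer.clear();
}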

datafusion-examples/examples/dataframe-to-s3.rs (review thread resolved)
devinjdangelo (Contributor, Author)

Oh! Thanks @tustvold. I agree that approach is simpler, and I had tried it. I did not realize I could use csv::WriterBuilder directly to prevent writing headers after the first iteration.

I just pushed a commit to revert back to that approach.

alamb (Contributor) commented Jul 22, 2023

Thanks @devinjdangelo -- once the CI passes I plan to merge this PR.
