
[DataFrame] Parallel Write out of dataframe #7079

Closed
devinjdangelo opened this issue Jul 24, 2023 · 3 comments · Fixed by #7483
Labels: enhancement (New feature or request)

devinjdangelo commented Jul 24, 2023

Is your feature request related to a problem or challenge?

Related to #6983. I noticed the same performance issue when writing a single large partition/file from a DataFrame: only a single core is used, and the write can take quite a long time. When a small number of large partitions is being written, it would be ideal to leverage multiple cores, especially now that writes use multipart ObjectStore uploads (#6987).

Describe the solution you'd like

This part of the write methods needs to process the RecordBatch stream in parallel (perhaps with try_for_each_concurrent):

while let Some(next_batch) = stream.next().await {
    let batch = next_batch?;
    writer.write(&batch).await?;
}

This could be nontrivial for stateful writers like AsyncArrowWriter. It also isn't immediately clear to me how the multipart upload context could be shared safely across concurrently running tasks.
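
For illustration, a minimal sketch of that try_for_each_concurrent shape could look like the following. This is a hypothetical helper, not DataFusion's implementation; the JSON encoding stands in for whatever format-specific serialization is needed, and a shared writer behind a tokio Mutex is just one possible way to share the multipart context. Note that try_for_each_concurrent only interleaves futures on the current task, so CPU-heavy encoding may still need to be spawned onto separate tasks to actually use multiple cores:

use std::sync::Arc;

use datafusion::arrow::json::LineDelimitedWriter;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::DataFusionError;
use futures::TryStreamExt;
use tokio::io::AsyncWriteExt;
use tokio::sync::Mutex;

// Hypothetical helper: drain a stream of RecordBatches with bounded concurrency,
// sharing one async writer (e.g. a multipart upload) behind a Mutex.
async fn write_concurrently<S, W>(
    stream: S,
    writer: Arc<Mutex<W>>,
    concurrency: usize,
) -> Result<(), DataFusionError>
where
    S: futures::stream::TryStream<Ok = RecordBatch, Error = DataFusionError>,
    W: tokio::io::AsyncWrite + Unpin,
{
    stream
        .try_for_each_concurrent(concurrency, |batch| {
            let writer = Arc::clone(&writer);
            async move {
                // Encode the batch first, then hold the lock only while copying bytes out.
                let mut buf: Vec<u8> = Vec::with_capacity(1024);
                {
                    let mut json_writer = LineDelimitedWriter::new(&mut buf);
                    json_writer.write(&batch)?;
                }
                writer.lock().await.write_all(&buf).await?;
                Ok(())
            }
        })
        .await
}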

Describe alternatives you've considered

You can repartition your DataFrame into more partitions and write out smaller files, but sometimes you really do want large files to be written.
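
As a concrete sketch of that workaround (the input path, output directory, and partition count below are placeholders, not from this issue), round-robin repartitioning before the write spreads the rows across several part files, each produced by its own partition:

use datafusion::logical_expr::Partitioning;
use datafusion::prelude::*;

// Sketch of the repartition workaround; paths and the partition count are arbitrary.
async fn write_many_smaller_files(ctx: &SessionContext) -> datafusion::error::Result<()> {
    let df = ctx
        .read_parquet("input.parquet", ParquetReadOptions::default())
        .await?
        .repartition(Partitioning::RoundRobinBatch(16))?;
    // Each partition is written as its own part-N.parquet file under the target directory.
    df.write_parquet("/tmp/test_out/", None).await?;
    Ok(())
}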

Additional context

To reproduce this (adapted from @alamb's example in #6983):

cd datafusion/benchmarks
./bench.sh data tpch10
use std::{io::Error, time::Instant, sync::Arc};
use datafusion::prelude::*;
use chrono;
use datafusion_common::DataFusionError;
use object_store::local::LocalFileSystem;
use url::Url;

const FILENAME: &str = "/home/dev/arrow-datafusion/benchmarks/data/tpch_sf10/lineitem/part-0.parquet";

#[tokio::main]
async fn main() -> Result<(), DataFusionError> {
    let _ctx = SessionContext::new();
    let local = Arc::new(LocalFileSystem::new());
    let local_url = Url::parse("file://local").unwrap();
    _ctx.runtime_env().register_object_store(&local_url, local);

    let _read_options = ParquetReadOptions { file_extension: ".parquet", table_partition_cols: vec!(), parquet_pruning: None, skip_metadata: None };
    let _df = _ctx.read_parquet(FILENAME, _read_options).await.unwrap();

    let start = Instant::now();
    println!("datafusion start -> {:?}", chrono::offset::Local::now());

    _df.write_parquet("file://local/home/dev/arrow-datafusion/test_out/", None).await?;
    let elapsed = Instant::now() - start;
    println!("datafusion end -> {:?} {elapsed:?}", chrono::offset::Local::now());
    Ok(())
}

This took 379s on my machine.

alamb commented Jul 26, 2023

Thank you for this writeup @devinjdangelo

I think the best architectural decision would be to move DataFrame to use the same (partially implemented) code that handles writing into files (e.g. for INSERT INTO ...) -- https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.DmlStatement.html

We could then handle all the details of multi part writes, parallelization, etc there.

devinjdangelo commented
Thanks @alamb for the pointer. I will take a look at that other write code!

I was able to develop a POC implementation on the existing write_json method to prove out the potential for a significant speedup. I changed the above test code to separate out the read/write timings:

use std::{io::Error, time::Instant, sync::Arc};
use datafusion::prelude::*;
use chrono;
use datafusion_common::DataFusionError;
use object_store::local::LocalFileSystem;
use url::Url;

const FILENAME: &str = "/home/dev/arrow-datafusion/benchmarks/data/tpch_sf10/lineitem/part-0.parquet";

#[tokio::main]
async fn main() -> Result<(), DataFusionError> {
    let _ctx = SessionContext::new();
    let local = Arc::new(LocalFileSystem::new());
    let local_url = Url::parse("file://local").unwrap();
    _ctx.runtime_env().register_object_store(&local_url, local);

    let _read_options = ParquetReadOptions { file_extension: ".parquet", table_partition_cols: vec!(), parquet_pruning: None, skip_metadata: None };

    let start = Instant::now();
    let _df = _ctx.read_parquet(FILENAME, _read_options).await.unwrap()
        //select a few columns with types compatible with write_json method
        .select_columns(&["l_orderkey", "l_partkey", "l_receiptdate"])?.cache().await?;
    let elapsed = Instant::now() - start;
    println!("read parquet to memory took -> {elapsed:?}");

    let start2 = Instant::now();
    _df.write_json("file://local/home/dev/arrow-datafusion/test_out/").await?;
    let elapsed2 = Instant::now() - start2;
    println!("write as json to disk took -> {elapsed2:?}");
    Ok(())
}

As a baseline, the current write_json on main results in the following timings:

read parquet to memory took -> 10.985273516s
write as json to disk took -> 191.64463431s

I modified the plan_to_json method as follows:

pub async fn plan_to_json(
    task_ctx: Arc<TaskContext>,
    plan: Arc<dyn ExecutionPlan>,
    path: impl AsRef<str>,
) -> Result<()> {
    let path = path.as_ref();
    let parsed = ListingTableUrl::parse(path)?;
    let object_store_url = parsed.object_store();
    let store = task_ctx.runtime_env().object_store(&object_store_url)?;
    let mut join_set = JoinSet::new();
    for i in 0..plan.output_partitioning().partition_count() {
        let storeref = store.clone();
        let plan: Arc<dyn ExecutionPlan> = plan.clone();
        let filename = format!("{}/part-{i}.json", parsed.prefix());
        let file = object_store::path::Path::parse(filename)?;

        let mut stream = plan.execute(i, task_ctx.clone())?;
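        // Spawn one task per output partition; each task drives its own stream and its own multipart upload.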
        join_set.spawn(async move {
            let (_, mut multipart_writer) = storeref.put_multipart(&file).await?;
            
            let mut inner_join_set = JoinSet::new();
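            // Serialize each RecordBatch to JSON bytes on its own task so encoding can use multiple cores.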
            while let Some(batch) = stream.try_next().await? {
                inner_join_set.spawn(async move {
                    let buffer = Vec::with_capacity(1024);
                    let mut writer = json::LineDelimitedWriter::new(buffer);
                    writer.write(&batch)?;
                    let r: Result<Vec<u8>, DataFusionError> = Ok(writer.into_inner());
                    r
                });
            }

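            // Gather encoded buffers as their tasks complete and append them to the multipart upload.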
            while let Some(result) = inner_join_set.join_next().await {
                match result {
                    Ok(r) => {
                        let batch = r?;
                        multipart_writer.write_all(&batch).await?;
                    }
                    Err(e) => {
                        if e.is_panic() {
                            std::panic::resume_unwind(e.into_panic());
                        } else {
                            unreachable!();
                        }
                    }
                }
            }

            multipart_writer
                .shutdown()
                .await
                .map_err(DataFusionError::from)
        });
    }

    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(res) => res?, // propagate DataFusion error
            Err(e) => {
                if e.is_panic() {
                    std::panic::resume_unwind(e.into_panic());
                } else {
                    unreachable!();
                }
            }
        }
    }

    Ok(())
}

Now the timings are:

read parquet to memory took -> 10.990137608s
write as json to disk took -> 18.332761845s

@alamb
Copy link
Contributor

alamb commented Jul 27, 2023

That looks awesome @devinjdangelo

I agree there is significant room for improvement when writing data in parallel. Can't wait to see it get implemented ❤️
