Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write Multiple Parquet Files in Parallel #7483

Merged
merged 1 commit into from
Sep 7, 2023

Conversation

devinjdangelo
Copy link
Contributor

@devinjdangelo devinjdangelo commented Sep 5, 2023

Which issue does this PR close?

Closes #7079

Rationale for this change

When writing out multiple partitions to multiple parquet files, we can speed up the operation by writing all parquet files in parallel on multiple cpu cores.

Parallelizing the serialization of a single parquet file (similar to as done for csv/json in #7452) is more complex and will need upstream changes in arrow-rs. There is already an issue open for this here apache/arrow-rs#1718.

What changes are included in this PR?

Spawn a tokio task for each parquet file being written.

Are these changes tested?

Yes by existing tests.

Are there any user-facing changes?

No other than better write performance when writing multiple parquet files on a system with multiple cpu cores.

@github-actions github-actions bot added the core Core DataFusion crate label Sep 5, 2023
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @devinjdangelo

My local tests suggest this goes slightly faster:

This branch: 20.310647042s
main: 57.579146358s

I tested this with the following setup:

 cargo run --release
[package]
name = "perf_test"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
env_logger = "0.10.0"
parquet = "46.0.0"
serde = "1.0.163"
serde_json = "1.0.96"
datafusion = { path = '/Users/alamb/Software/arrow-datafusion/datafusion/core', default-features = false }
object_store = "0.7.0"
tokio = "1.0"
chrono = "0.4.26"
url = "2.4.1"
use chrono;
use datafusion::{
    dataframe::DataFrameWriteOptions, datasource::listing::ListingTableInsertMode,
    error::DataFusionError, prelude::*,
};
use object_store::local::LocalFileSystem;
use std::{sync::Arc, time::Instant};
use url::Url;

const FILENAME: &str =
    "/Users/alamb/Software/arrow-datafusion/benchmarks/data/tpch_sf10/lineitem/part-0.parquet";

#[tokio::main]
async fn main() -> Result<(), DataFusionError> {
    let _ctx = SessionContext::new();
    let local = Arc::new(LocalFileSystem::new());
    let local_url = Url::parse("file://local").unwrap();
    _ctx.runtime_env().register_object_store(&local_url, local);

    let _read_options = ParquetReadOptions {
        file_extension: ".parquet",
        table_partition_cols: vec![],
        parquet_pruning: None,
        skip_metadata: None,
        schema: None,
        file_sort_order: vec![],
        insert_mode: ListingTableInsertMode::AppendNewFiles,
    };
    let df = _ctx
        .read_parquet(FILENAME, _read_options)
        .await
        .unwrap()
        .repartition(Partitioning::RoundRobinBatch(16))
        .unwrap();

    let start = Instant::now();
    println!("datafusion start -> {:?}", chrono::offset::Local::now());

    let props = None;
    let write_options = DataFrameWriteOptions::new().with_single_file_output(false);

    df.write_parquet("file:///tmp/test_out/", write_options, props)
        .await?;
    let elapsed = Instant::now() - start;
    println!(
        "datafusion end -> {:?} {elapsed:?}",
        chrono::offset::Local::now()
    );
    Ok(())
}

@alamb
Copy link
Contributor

alamb commented Sep 7, 2023

Note that I had to explicitly set the distribution

@alamb alamb merged commit 3a52ee1 into apache:main Sep 7, 2023
@alamb
Copy link
Contributor

alamb commented Sep 7, 2023

Thanks again @devinjdangelo

@andygrove andygrove added the enhancement New feature or request label Sep 8, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[DataFrame] Parallel Write out of dataframe
3 participants