Allow Setting Minimum Parallelism with RowCount Based Demuxer #7841

Merged
merged 6 commits into from
Oct 21, 2023
6 changes: 6 additions & 0 deletions datafusion/common/src/config.rs
@@ -255,6 +255,12 @@ config_namespace! {
/// Number of files to read in parallel when inferring schema and statistics
pub meta_fetch_concurrency: usize, default = 32

+ /// Guarantees a minimum level of output files running in parallel.
+ /// RecordBatches will be distributed in round robin fashion to each
+ /// parallel writer. Each writer is closed and a new file opened once
+ /// soft_max_rows_per_output_file is reached.
+ pub minimum_parallel_output_files: usize, default = 4
Contributor:

What do you think about defaulting to the number of cores (perhaps when this is set to 0)?

Contributor Author:

The returns from additional cores seem to decline very quickly beyond 4 tasks in my testing. I believe this is because, at ~4 parallel serialization tasks, serialization no longer bottlenecks the end-to-end execution plan. Going beyond 4 tasks mostly gives higher memory usage and smaller output files for little benefit.

My testing is mostly on a 32-core system. I have not tested enough different configurations to know whether core_count/8 or a static 4 tasks is the more reasonable default.

It will also depend a lot on the actual execution plan. If you are writing a pre-cached, in-memory dataset, then you definitely want 1 task/output file per core.
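
For illustration only, the two candidate defaults discussed here (a static value versus one derived from the core count) can be compared with a small sketch. Everything in it is hypothetical: `resolve_parallel_output_files` and the treat-0-as-auto convention are not part of this PR, which ships a static default of 4, and the `cores / 8` ratio is only the untested candidate mentioned in the comment.

```rust
use std::thread;

/// Hypothetical helper (not part of this PR): resolves the number of
/// parallel output files from a configured value, treating 0 as "auto".
fn resolve_parallel_output_files(configured: usize, cores: usize) -> usize {
    if configured > 0 {
        // An explicit setting always wins.
        configured
    } else {
        // Assumed heuristic: one writer per 8 cores, but never fewer than 1.
        (cores / 8).max(1)
    }
}

/// Number of cores visible to this process, falling back to 1 if unknown.
fn detected_cores() -> usize {
    thread::available_parallelism().map(|n| n.get()).unwrap_or(1)
}
```

On a 32-core machine, `resolve_parallel_output_files(0, 32)` gives 4, which matches the static default chosen in the PR; `resolve_parallel_output_files(0, detected_cores())` applies the same rule to the current machine.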

Contributor Author:

I plan to work on a statement level option soon, so you could easily do:

copy my_in_memory_table to my_dir (format parquet, output_files 32);

to boost the parallelism for specific plans that benefit from it.

Contributor:

Makes sense to me


/// Target number of rows in output files when writing multiple.
/// This is a soft max, so it can be exceeded slightly. There also
/// will be one file smaller than the limit if the total
65 changes: 46 additions & 19 deletions datafusion/core/src/datasource/file_format/write/demux.rs
@@ -122,33 +122,49 @@ async fn row_count_demuxer(
single_file_output: bool,
) -> Result<()> {
let exec_options = &context.session_config().options().execution;

let max_rows_per_file = exec_options.soft_max_rows_per_output_file;
let max_buffered_batches = exec_options.max_buffered_batches_per_output_file;
- let mut total_rows_current_file = 0;
+ let minimum_parallel_files = exec_options.minimum_parallel_output_files;
let mut part_idx = 0;
let write_id =
rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16);

- let mut tx_file = create_new_file_stream(
- &base_output_path,
- &write_id,
- part_idx,
- &file_extension,
- single_file_output,
- max_buffered_batches,
- &mut tx,
- )?;
- part_idx += 1;
+ let mut open_file_streams = Vec::with_capacity(minimum_parallel_files);
+
+ let mut next_send_steam = 0;
+ let mut row_counts = Vec::with_capacity(minimum_parallel_files);
+
+ // Overrides if single_file_output is set
+ let minimum_parallel_files = if single_file_output {
+ 1
+ } else {
+ minimum_parallel_files
+ };
+
+ let max_rows_per_file = if single_file_output {
+ usize::MAX
+ } else {
+ max_rows_per_file
+ };
+
while let Some(rb) = input.next().await.transpose()? {
- total_rows_current_file += rb.num_rows();
- tx_file.send(rb).await.map_err(|_| {
- DataFusionError::Execution("Error sending RecordBatch to file stream!".into())
- })?;
-
- if total_rows_current_file >= max_rows_per_file && !single_file_output {
- total_rows_current_file = 0;
- tx_file = create_new_file_stream(
+ // ensure we have at least minimum_parallel_files open
+ if open_file_streams.len() < minimum_parallel_files {
+ open_file_streams.push(create_new_file_stream(
+ &base_output_path,
+ &write_id,
+ part_idx,
+ &file_extension,
+ single_file_output,
+ max_buffered_batches,
+ &mut tx,
+ )?);
+ row_counts.push(0);
+ part_idx += 1;
+ } else if row_counts[next_send_steam] >= max_rows_per_file {
+ row_counts[next_send_steam] = 0;
+ open_file_streams[next_send_steam] = create_new_file_stream(
&base_output_path,
&write_id,
part_idx,
@@ -159,6 +175,17 @@ async fn row_count_demuxer(
)?;
part_idx += 1;
}
+ row_counts[next_send_steam] += rb.num_rows();
+ open_file_streams[next_send_steam]
+ .send(rb)
+ .await
+ .map_err(|_| {
+ DataFusionError::Execution(
+ "Error sending RecordBatch to file stream!".into(),
+ )
+ })?;
+
+ next_send_steam = (next_send_steam + 1) % minimum_parallel_files;
}
Ok(())
}
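
To see how the new loop distributes batches, here is a minimal, std-only simulation that tracks row counts instead of sending `RecordBatch`es to real writers. `simulate_demux` and its return value (rows per opened file, in opening order) are illustrative names rather than DataFusion APIs, and the sketch assumes `minimum_parallel_files >= 1`.

```rust
/// Simulates the round-robin, row-count-based demuxer using batch sizes only.
/// Returns the number of rows written to each file, in the order files were opened.
/// Assumption of this sketch: minimum_parallel_files >= 1.
fn simulate_demux(
    batch_rows: &[usize],
    minimum_parallel_files: usize,
    max_rows_per_file: usize,
    single_file_output: bool,
) -> Vec<usize> {
    // Same overrides as in the diff: a single output file means one writer
    // and no row limit.
    let minimum_parallel_files = if single_file_output { 1 } else { minimum_parallel_files };
    let max_rows_per_file = if single_file_output { usize::MAX } else { max_rows_per_file };

    let mut files: Vec<usize> = Vec::new(); // rows per file ever opened
    let mut open: Vec<usize> = Vec::new(); // writer slot -> index into `files`
    let mut next = 0; // slot that receives the next batch

    for &rows in batch_rows {
        if open.len() < minimum_parallel_files {
            // Warm-up: open a new file until enough writers exist.
            files.push(0);
            open.push(files.len() - 1);
        } else if files[open[next]] >= max_rows_per_file {
            // The file in this slot reached the soft limit: roll over to a new one.
            files.push(0);
            open[next] = files.len() - 1;
        }
        files[open[next]] += rows;
        next = (next + 1) % minimum_parallel_files;
    }
    files
}
```

For example, ten batches of five rows with two writers and a soft limit of ten rows, `simulate_demux(&[5; 10], 2, 10, false)`, yield `[10, 10, 10, 10, 5, 5]`. The limit is only checked when the round robin returns to a writer, which is why it is a soft maximum.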
58 changes: 42 additions & 16 deletions datafusion/core/src/datasource/listing/table.rs
@@ -1608,15 +1608,16 @@ mod tests {
#[tokio::test]
async fn test_insert_into_append_new_json_files() -> Result<()> {
let mut config_map: HashMap<String, String> = HashMap::new();
- config_map.insert("datafusion.execution.batch_size".into(), "1".into());
+ config_map.insert("datafusion.execution.batch_size".into(), "10".into());
config_map.insert(
"datafusion.execution.soft_max_rows_per_output_file".into(),
- "1".into(),
+ "10".into(),
);
helper_test_append_new_files_to_table(
FileType::JSON,
FileCompressionType::UNCOMPRESSED,
Some(config_map),
+ 2,
)
.await?;
Ok(())
@@ -1636,32 +1637,52 @@ mod tests {
#[tokio::test]
async fn test_insert_into_append_new_csv_files() -> Result<()> {
let mut config_map: HashMap<String, String> = HashMap::new();
- config_map.insert("datafusion.execution.batch_size".into(), "1".into());
+ config_map.insert("datafusion.execution.batch_size".into(), "10".into());
config_map.insert(
"datafusion.execution.soft_max_rows_per_output_file".into(),
- "1".into(),
+ "10".into(),
);
helper_test_append_new_files_to_table(
FileType::CSV,
FileCompressionType::UNCOMPRESSED,
Some(config_map),
+ 2,
)
.await?;
Ok(())
}

#[tokio::test]
- async fn test_insert_into_append_new_parquet_files_defaults() -> Result<()> {
+ async fn test_insert_into_append_2_new_parquet_files_defaults() -> Result<()> {
let mut config_map: HashMap<String, String> = HashMap::new();
- config_map.insert("datafusion.execution.batch_size".into(), "1".into());
+ config_map.insert("datafusion.execution.batch_size".into(), "10".into());
+ config_map.insert(
+ "datafusion.execution.soft_max_rows_per_output_file".into(),
+ "10".into(),
+ );
+ helper_test_append_new_files_to_table(
+ FileType::PARQUET,
+ FileCompressionType::UNCOMPRESSED,
+ Some(config_map),
+ 2,
+ )
+ .await?;
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_insert_into_append_1_new_parquet_files_defaults() -> Result<()> {
+ let mut config_map: HashMap<String, String> = HashMap::new();
+ config_map.insert("datafusion.execution.batch_size".into(), "20".into());
config_map.insert(
"datafusion.execution.soft_max_rows_per_output_file".into(),
- "1".into(),
+ "20".into(),
);
helper_test_append_new_files_to_table(
FileType::PARQUET,
FileCompressionType::UNCOMPRESSED,
Some(config_map),
+ 1,
)
.await?;
Ok(())
@@ -1788,10 +1809,10 @@ mod tests {
#[tokio::test]
async fn test_insert_into_append_new_parquet_files_session_overrides() -> Result<()> {
let mut config_map: HashMap<String, String> = HashMap::new();
- config_map.insert("datafusion.execution.batch_size".into(), "1".into());
+ config_map.insert("datafusion.execution.batch_size".into(), "10".into());
config_map.insert(
"datafusion.execution.soft_max_rows_per_output_file".into(),
- "1".into(),
+ "10".into(),
);
config_map.insert(
"datafusion.execution.parquet.compression".into(),
@@ -1858,6 +1879,7 @@ mod tests {
FileType::PARQUET,
FileCompressionType::UNCOMPRESSED,
Some(config_map),
+ 2,
)
.await?;
Ok(())
@@ -1875,6 +1897,7 @@ mod tests {
FileType::PARQUET,
FileCompressionType::UNCOMPRESSED,
Some(config_map),
+ 2,
)
.await
.expect_err("Example should fail!");
@@ -2092,6 +2115,7 @@ mod tests {
file_type: FileType,
file_compression_type: FileCompressionType,
session_config_map: Option<HashMap<String, String>>,
+ expected_n_files_per_insert: usize,
) -> Result<()> {
// Create the initial context, schema, and batch.
let session_ctx = match session_config_map {
@@ -2118,7 +2142,9 @@ mod tests {
// Create a new batch of data to insert into the table
let batch = RecordBatch::try_new(
schema.clone(),
- vec![Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3]))],
+ vec![Arc::new(arrow_array::Int32Array::from(vec![
+ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
+ ]))],
)?;

// Register appropriate table depending on file_type we want to test
@@ -2214,7 +2240,7 @@ mod tests {
"+-------+",
"| count |",
"+-------+",
- "| 6 |",
+ "| 20 |",
"+-------+",
];

@@ -2231,7 +2257,7 @@ mod tests {
"+-------+",
"| count |",
"+-------+",
- "| 6 |",
+ "| 20 |",
"+-------+",
];

@@ -2240,7 +2266,7 @@ mod tests {

// Assert that `target_partition_number` many files were added to the table.
let num_files = tmp_dir.path().read_dir()?.count();
- assert_eq!(num_files, 3);
+ assert_eq!(num_files, expected_n_files_per_insert);

// Create a physical plan from the insert plan
let plan = session_ctx
@@ -2255,7 +2281,7 @@ mod tests {
"+-------+",
"| count |",
"+-------+",
- "| 6 |",
+ "| 20 |",
"+-------+",
];

@@ -2274,7 +2300,7 @@ mod tests {
"+-------+",
"| count |",
"+-------+",
- "| 12 |",
+ "| 40 |",
"+-------+",
];

@@ -2283,7 +2309,7 @@ mod tests {

// Assert that another `target_partition_number` many files were added to the table.
let num_files = tmp_dir.path().read_dir()?.count();
- assert_eq!(num_files, 6);
+ assert_eq!(num_files, expected_n_files_per_insert * 2);

// Return Ok if the function
Ok(())
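
The `expected_n_files_per_insert` values passed by these tests can be sanity-checked with a little arithmetic. This sketch rests on assumptions that the diff does not state outright: each insert delivers 20 rows to the writer (as the counts of 20 and 40 suggest), the rows arrive in batches of `batch_size` rows, and no writer is revisited after reaching the soft row limit. `expected_files` is a hypothetical helper, not part of the test suite.

```rust
/// Hypothetical helper: number of files the round-robin demuxer opens when
/// no writer is revisited after reaching the soft row limit.
fn expected_files(
    total_rows: usize,
    batch_size: usize,
    minimum_parallel_files: usize,
) -> usize {
    // Number of batches, rounding up for a final partial batch.
    let batches = (total_rows + batch_size - 1) / batch_size;
    // Each batch opens one new file until minimum_parallel_files writers exist.
    batches.min(minimum_parallel_files)
}
```

With the default of 4 parallel files, `expected_files(20, 10, 4)` is 2 and `expected_files(20, 20, 4)` is 1, in line with the values of 2 and 1 that the tests pass to the helper.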
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
@@ -152,6 +152,7 @@ datafusion.execution.coalesce_batches true
datafusion.execution.collect_statistics false
datafusion.execution.max_buffered_batches_per_output_file 2
datafusion.execution.meta_fetch_concurrency 32
+ datafusion.execution.minimum_parallel_output_files 4
datafusion.execution.parquet.allow_single_file_parallelism false
datafusion.execution.parquet.bloom_filter_enabled false
datafusion.execution.parquet.bloom_filter_fpp NULL
@@ -221,6 +222,7 @@ datafusion.execution.coalesce_batches true When set to true, record batches will
datafusion.execution.collect_statistics false Should DataFusion collect statistics after listing files
datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption
datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics
+ datafusion.execution.minimum_parallel_output_files 4 Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached.
datafusion.execution.parquet.allow_single_file_parallelism false Controls whether DataFusion will attempt to speed up writing large parquet files by first writing multiple smaller files and then stitching them together into a single large file. This will result in faster write speeds, but higher memory usage. Also currently unsupported are bloom filters and column indexes when single_file_parallelism is enabled.
datafusion.execution.parquet.bloom_filter_enabled false Sets if bloom filter is enabled for any column
datafusion.execution.parquet.bloom_filter_fpp NULL Sets bloom filter false positive probability. If NULL, uses default parquet writer setting
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
@@ -77,6 +77,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). |
| datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. |
| datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics |
| datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. |
| datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max |
| datafusion.execution.max_buffered_batches_per_output_file | 2 | This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption |
| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores |
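
As a usage sketch, the new option can be set through the same kind of string-keyed map that the tests in this PR build. The two keys are taken from the table above; `writer_config` is a hypothetical helper, and any values passed to it are arbitrary examples rather than recommendations.

```rust
use std::collections::HashMap;

/// Hypothetical helper: builds a string-keyed config map, in the style of
/// the tests in this PR, that sets the two writer-related options.
fn writer_config(
    parallel_files: usize,
    max_rows_per_file: usize,
) -> HashMap<String, String> {
    let mut config_map: HashMap<String, String> = HashMap::new();
    // Minimum number of output files written in parallel (PR default: 4).
    config_map.insert(
        "datafusion.execution.minimum_parallel_output_files".into(),
        parallel_files.to_string(),
    );
    // Soft row limit after which a writer rolls over to a new file.
    config_map.insert(
        "datafusion.execution.soft_max_rows_per_output_file".into(),
        max_rows_per_file.to_string(),
    );
    config_map
}
```

In the tests, a map of this shape is passed as `Some(config_map)` to `helper_test_append_new_files_to_table`; how the helper turns it into a session configuration is not shown in this diff.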