From 7afeb8b8078018a167e7bce6421024b95517b234 Mon Sep 17 00:00:00 2001 From: Asura7969 <1402357969@qq.com> Date: Wed, 8 Nov 2023 17:04:53 +0800 Subject: [PATCH 1/3] Minor: Improve the document format of JoinHashMap --- .../src/joins/hash_join_utils.rs | 115 ++++++++++-------- 1 file changed, 62 insertions(+), 53 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join_utils.rs b/datafusion/physical-plan/src/joins/hash_join_utils.rs index 3a2a85c72722..3ea0331ab4fe 100644 --- a/datafusion/physical-plan/src/joins/hash_join_utils.rs +++ b/datafusion/physical-plan/src/joins/hash_join_utils.rs @@ -40,59 +40,68 @@ use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; use hashbrown::raw::RawTable; use hashbrown::HashSet; -// Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value. -// By allocating a `HashMap` with capacity for *at least* the number of rows for entries at the build side, -// we make sure that we don't have to re-hash the hashmap, which needs access to the key (the hash in this case) value. -// E.g. 1 -> [3, 6, 8] indicates that the column values map to rows 3, 6 and 8 for hash value 1 -// As the key is a hash value, we need to check possible hash collisions in the probe stage -// During this stage it might be the case that a row is contained the same hashmap value, -// but the values don't match. Those are checked in the [equal_rows] macro -// The indices (values) are stored in a separate chained list stored in the `Vec`. -// The first value (+1) is stored in the hashmap, whereas the next value is stored in array at the position value. -// The chain can be followed until the value "0" has been reached, meaning the end of the list. -// Also see chapter 5.3 of [Balancing vectorized query execution with bandwidth-optimized storage](https://dare.uva.nl/search?identifier=5ccbb60a-38b8-4eeb-858a-e7735dd37487) -// See the example below: -// Insert (1,1) -// map: -// --------- -// | 1 | 2 | -// --------- -// next: -// --------------------- -// | 0 | 0 | 0 | 0 | 0 | -// --------------------- -// Insert (2,2) -// map: -// --------- -// | 1 | 2 | -// | 2 | 3 | -// --------- -// next: -// --------------------- -// | 0 | 0 | 0 | 0 | 0 | -// --------------------- -// Insert (1,3) -// map: -// --------- -// | 1 | 4 | -// | 2 | 3 | -// --------- -// next: -// --------------------- -// | 0 | 0 | 0 | 2 | 0 | <--- hash value 1 maps to 4,2 (which means indices values 3,1) -// --------------------- -// Insert (1,4) -// map: -// --------- -// | 1 | 5 | -// | 2 | 3 | -// --------- -// next: -// --------------------- -// | 0 | 0 | 0 | 2 | 4 | <--- hash value 1 maps to 5,4,2 (which means indices values 4,3,1) -// --------------------- -// TODO: speed up collision checks -// https://github.com/apache/arrow-datafusion/issues/50 +/// Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value. +/// +/// By allocating a `HashMap` with capacity for *at least* the number of rows for entries at the build side, +/// we make sure that we don't have to re-hash the hashmap, which needs access to the key (the hash in this case) value. +/// +/// E.g. 1 -> [3, 6, 8] indicates that the column values map to rows 3, 6 and 8 for hash value 1 +/// As the key is a hash value, we need to check possible hash collisions in the probe stage +/// During this stage it might be the case that a row is contained the same hashmap value, +/// but the values don't match. Those are checked in the [equal_rows] macro +/// The indices (values) are stored in a separate chained list stored in the `Vec`. +/// +/// The first value (+1) is stored in the hashmap, whereas the next value is stored in array at the position value. +/// +/// The chain can be followed until the value "0" has been reached, meaning the end of the list. +/// Also see chapter 5.3 of [Balancing vectorized query execution with bandwidth-optimized storage](https://dare.uva.nl/search?identifier=5ccbb60a-38b8-4eeb-858a-e7735dd37487) +/// +/// # Example +/// +/// ``` text +/// See the example below: +/// Insert (1,1) +/// map: +/// --------- +/// | 1 | 2 | +/// --------- +/// next: +/// --------------------- +/// | 0 | 0 | 0 | 0 | 0 | +/// --------------------- +/// Insert (2,2) +/// map: +/// --------- +/// | 1 | 2 | +/// | 2 | 3 | +/// --------- +/// next: +/// --------------------- +/// | 0 | 0 | 0 | 0 | 0 | +/// --------------------- +/// Insert (1,3) +/// map: +/// --------- +/// | 1 | 4 | +/// | 2 | 3 | +/// --------- +/// next: +/// --------------------- +/// | 0 | 0 | 0 | 2 | 0 | <--- hash value 1 maps to 4,2 (which means indices values 3,1) +/// --------------------- +/// Insert (1,4) +/// map: +/// --------- +/// | 1 | 5 | +/// | 2 | 3 | +/// --------- +/// next: +/// --------------------- +/// | 0 | 0 | 0 | 2 | 4 | <--- hash value 1 maps to 5,4,2 (which means indices values 4,3,1) +/// --------------------- +/// ``` +/// +///TODO: [speed up collision checks](https://github.com/apache/arrow-datafusion/issues/50) pub struct JoinHashMap { // Stores hash value to last row index pub map: RawTable<(u64, u64)>, From dbba31730826a7ca888cdb0b8d29dd2d0c6c184d Mon Sep 17 00:00:00 2001 From: Asura7969 <1402357969@qq.com> Date: Wed, 29 Nov 2023 10:55:09 +0800 Subject: [PATCH 2/3] sql csv_with_quote_escape --- .../common/src/file_options/csv_writer.rs | 5 ++ .../src/datasource/listing_table_factory.rs | 23 +++++-- datafusion/core/tests/data/escape.csv | 11 ++++ datafusion/core/tests/data/quote.csv | 11 ++++ .../sqllogictest/test_files/csv_files.slt | 65 +++++++++++++++++++ 5 files changed, 109 insertions(+), 6 deletions(-) create mode 100644 datafusion/core/tests/data/escape.csv create mode 100644 datafusion/core/tests/data/quote.csv create mode 100644 datafusion/sqllogictest/test_files/csv_files.slt diff --git a/datafusion/common/src/file_options/csv_writer.rs b/datafusion/common/src/file_options/csv_writer.rs index fef4a1d21b4b..37231e1012e4 100644 --- a/datafusion/common/src/file_options/csv_writer.rs +++ b/datafusion/common/src/file_options/csv_writer.rs @@ -91,6 +91,11 @@ impl TryFrom<(&ConfigOptions, &StatementOptions)> for CsvWriterOptions { ) })?) }, + "quote" | "escape" => { + // These two attributes are only available when reading csv files. + // To avoid error + builder + }, _ => return Err(DataFusionError::Configuration(format!("Found unsupported option {option} with value {value} for CSV format!"))) } } diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 543a3a83f7c5..dbdccf9f5bac 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -67,12 +67,23 @@ impl TableProviderFactory for ListingTableFactory { let file_extension = get_extension(cmd.location.as_str()); let file_format: Arc = match file_type { - FileType::CSV => Arc::new( - CsvFormat::default() - .with_has_header(cmd.has_header) - .with_delimiter(cmd.delimiter as u8) - .with_file_compression_type(file_compression_type), - ), + FileType::CSV => { + let mut statement_options = StatementOptions::from(&cmd.options); + let quote = statement_options + .take_str_option("quote") + .map_or(b'"', |x| x.as_bytes()[0]); + let escape = statement_options + .take_str_option("escape") + .map(|x| x.as_bytes()[0]); + Arc::new( + CsvFormat::default() + .with_has_header(cmd.has_header) + .with_delimiter(cmd.delimiter as u8) + .with_quote(quote) + .with_escape(escape) + .with_file_compression_type(file_compression_type), + ) + } #[cfg(feature = "parquet")] FileType::PARQUET => Arc::new(ParquetFormat::default()), FileType::AVRO => Arc::new(AvroFormat), diff --git a/datafusion/core/tests/data/escape.csv b/datafusion/core/tests/data/escape.csv new file mode 100644 index 000000000000..331a1e697329 --- /dev/null +++ b/datafusion/core/tests/data/escape.csv @@ -0,0 +1,11 @@ +c1,c2 +"id0","value\"0" +"id1","value\"1" +"id2","value\"2" +"id3","value\"3" +"id4","value\"4" +"id5","value\"5" +"id6","value\"6" +"id7","value\"7" +"id8","value\"8" +"id9","value\"9" diff --git a/datafusion/core/tests/data/quote.csv b/datafusion/core/tests/data/quote.csv new file mode 100644 index 000000000000..d81488436409 --- /dev/null +++ b/datafusion/core/tests/data/quote.csv @@ -0,0 +1,11 @@ +c1,c2 +~id0~,~value0~ +~id1~,~value1~ +~id2~,~value2~ +~id3~,~value3~ +~id4~,~value4~ +~id5~,~value5~ +~id6~,~value6~ +~id7~,~value7~ +~id8~,~value8~ +~id9~,~value9~ diff --git a/datafusion/sqllogictest/test_files/csv_files.slt b/datafusion/sqllogictest/test_files/csv_files.slt new file mode 100644 index 000000000000..9facb064bf32 --- /dev/null +++ b/datafusion/sqllogictest/test_files/csv_files.slt @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# create_external_table_with_quote_escape +statement ok +CREATE EXTERNAL TABLE csv_with_quote ( +c1 VARCHAR, +c2 VARCHAR +) STORED AS CSV +WITH HEADER ROW +DELIMITER ',' +OPTIONS ('quote' '~') +LOCATION '../core/tests/data/quote.csv'; + +statement ok +CREATE EXTERNAL TABLE csv_with_escape ( +c1 VARCHAR, +c2 VARCHAR +) STORED AS CSV +WITH HEADER ROW +DELIMITER ',' +OPTIONS ('escape' '\"') +LOCATION '../core/tests/data/escape.csv'; + +query TT +select * from csv_with_quote; +---- +id0 value0 +id1 value1 +id2 value2 +id3 value3 +id4 value4 +id5 value5 +id6 value6 +id7 value7 +id8 value8 +id9 value9 + +query TT +select * from csv_with_escape; +---- +id0 value"0 +id1 value"1 +id2 value"2 +id3 value"3 +id4 value"4 +id5 value"5 +id6 value"6 +id7 value"7 +id8 value"8 +id9 value"9 From dbba0c96fd0e55d6952532d26f1cf3071fed1830 Mon Sep 17 00:00:00 2001 From: asura7969 <1402357969@qq.com> Date: Wed, 29 Nov 2023 21:48:44 +0800 Subject: [PATCH 3/3] fix --- .../common/src/file_options/csv_writer.rs | 1 + .../src/datasource/listing_table_factory.rs | 25 ++++++++----------- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/datafusion/common/src/file_options/csv_writer.rs b/datafusion/common/src/file_options/csv_writer.rs index 37231e1012e4..d6046f0219dd 100644 --- a/datafusion/common/src/file_options/csv_writer.rs +++ b/datafusion/common/src/file_options/csv_writer.rs @@ -92,6 +92,7 @@ impl TryFrom<(&ConfigOptions, &StatementOptions)> for CsvWriterOptions { })?) }, "quote" | "escape" => { + // https://github.com/apache/arrow-rs/issues/5146 // These two attributes are only available when reading csv files. // To avoid error builder diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index dbdccf9f5bac..f70a82035108 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -69,20 +69,17 @@ impl TableProviderFactory for ListingTableFactory { let file_format: Arc = match file_type { FileType::CSV => { let mut statement_options = StatementOptions::from(&cmd.options); - let quote = statement_options - .take_str_option("quote") - .map_or(b'"', |x| x.as_bytes()[0]); - let escape = statement_options - .take_str_option("escape") - .map(|x| x.as_bytes()[0]); - Arc::new( - CsvFormat::default() - .with_has_header(cmd.has_header) - .with_delimiter(cmd.delimiter as u8) - .with_quote(quote) - .with_escape(escape) - .with_file_compression_type(file_compression_type), - ) + let mut csv_format = CsvFormat::default() + .with_has_header(cmd.has_header) + .with_delimiter(cmd.delimiter as u8) + .with_file_compression_type(file_compression_type); + if let Some(quote) = statement_options.take_str_option("quote") { + csv_format = csv_format.with_quote(quote.as_bytes()[0]) + } + if let Some(escape) = statement_options.take_str_option("escape") { + csv_format = csv_format.with_escape(Some(escape.as_bytes()[0])) + } + Arc::new(csv_format) } #[cfg(feature = "parquet")] FileType::PARQUET => Arc::new(ParquetFormat::default()),