From 2d91917bb66542a44c31b7a306512bb4e09b5298 Mon Sep 17 00:00:00 2001 From: devinjdangelo Date: Sat, 5 Aug 2023 07:21:54 -0400 Subject: [PATCH] Unify DataFrame and SQL (Insert Into) Write Methods (#7141) * inner join_set * Initial write_table implementation * update MemSink to accept Vec * some cleaning up * fmt * allow listing table to control append method * test fixes * DataFrameOptions and clean up * rename to execute streams for insertexec * CsvReadOptions sort_order connect to ListingOptions --- datafusion/core/src/dataframe.rs | 48 +++++ .../core/src/datasource/file_format/csv.rs | 116 +++++++--- .../src/datasource/file_format/options.rs | 25 ++- datafusion/core/src/datasource/listing/mod.rs | 4 +- .../core/src/datasource/listing/table.rs | 199 +++++++++++++++++- datafusion/core/src/datasource/memory.rs | 22 +- .../core/src/datasource/physical_plan/json.rs | 2 + .../core/src/datasource/physical_plan/mod.rs | 6 +- datafusion/core/src/datasource/provider.rs | 3 +- datafusion/core/src/physical_plan/insert.rs | 41 ++-- datafusion/core/src/physical_planner.rs | 21 +- .../sqllogictests/test_files/explain.slt | 2 +- .../tests/sqllogictests/test_files/insert.slt | 23 +- datafusion/expr/src/logical_plan/builder.rs | 10 +- datafusion/expr/src/logical_plan/dml.rs | 6 +- datafusion/sql/src/statement.rs | 14 +- datafusion/sql/tests/sql_integration.rs | 8 +- 17 files changed, 448 insertions(+), 102 deletions(-) diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index ae933974517b..8d4ad6cc504d 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -53,6 +53,33 @@ use crate::physical_plan::{collect, collect_partitioned}; use crate::physical_plan::{execute_stream, execute_stream_partitioned, ExecutionPlan}; use crate::prelude::SessionContext; +/// Contains options that control how data is +/// written out from a DataFrame +pub struct DataFrameWriteOptions { + /// Controls if existing data should be overwritten + overwrite: bool, // TODO, enable DataFrame COPY TO write without TableProvider + // settings such as LOCATION and FILETYPE can be set here + // e.g. add location: Option +} + +impl DataFrameWriteOptions { + /// Create a new DataFrameWriteOptions with default values + pub fn new() -> Self { + DataFrameWriteOptions { overwrite: false } + } + /// Set the overwrite option to true or false + pub fn with_overwrite(mut self, overwrite: bool) -> Self { + self.overwrite = overwrite; + self + } +} + +impl Default for DataFrameWriteOptions { + fn default() -> Self { + Self::new() + } +} + /// DataFrame represents a logical set of rows with the same named columns. /// Similar to a [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html) or /// [Spark DataFrame](https://spark.apache.org/docs/latest/sql-programming-guide.html) @@ -925,6 +952,27 @@ impl DataFrame { )) } + /// Write this DataFrame to the referenced table + /// This method uses on the same underlying implementation + /// as the SQL Insert Into statement. + /// Unlike most other DataFrame methods, this method executes + /// eagerly, writing data, and returning the count of rows written. + pub async fn write_table( + self, + table_name: &str, + write_options: DataFrameWriteOptions, + ) -> Result, DataFusionError> { + let arrow_schema = Schema::from(self.schema()); + let plan = LogicalPlanBuilder::insert_into( + self.plan, + table_name.to_owned(), + &arrow_schema, + write_options.overwrite, + )? 
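+        // `insert_into` produces the same logical plan as a SQL `INSERT INTO`
+        // (or `INSERT OVERWRITE` when `overwrite` is set), so DataFrame writes
+        // and SQL inserts share a single execution path.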
+ .build()?; + DataFrame::new(self.session_state, plan).collect().await + } + /// Write a `DataFrame` to a CSV file. pub async fn write_csv(self, path: &str) -> Result<()> { let plan = self.session_state.create_physical_plan(&self.plan).await?; diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 8df9a86b1e79..cbcdc2f112b0 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -53,6 +53,7 @@ use crate::execution::context::SessionState; use crate::physical_plan::insert::{DataSink, InsertExec}; use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics}; use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; +use rand::distributions::{Alphanumeric, DistString}; /// The default file extension of csv files pub const DEFAULT_CSV_EXTENSION: &str = ".csv"; @@ -268,6 +269,11 @@ impl FileFormat for CsvFormat { _state: &SessionState, conf: FileSinkConfig, ) -> Result> { + if conf.overwrite { + return Err(DataFusionError::NotImplemented( + "Overwrites are not implemented yet for CSV".into(), + )); + } let sink_schema = conf.output_schema().clone(); let sink = Arc::new(CsvSink::new( conf, @@ -560,10 +566,10 @@ impl CsvSink { impl DataSink for CsvSink { async fn write_all( &self, - mut data: SendableRecordBatchStream, + mut data: Vec, context: &Arc, ) -> Result { - let num_partitions = self.config.file_groups.len(); + let num_partitions = data.len(); let object_store = context .runtime_env() @@ -572,44 +578,86 @@ impl DataSink for CsvSink { // Construct serializer and writer for each file group let mut serializers = vec![]; let mut writers = vec![]; - for file_group in &self.config.file_groups { - // In append mode, consider has_header flag only when file is empty (at the start). - // For other modes, use has_header flag as is. - let header = self.has_header - && (!matches!(&self.config.writer_mode, FileWriterMode::Append) - || file_group.object_meta.size == 0); - let builder = WriterBuilder::new().with_delimiter(self.delimiter); - let serializer = CsvSerializer::new() - .with_builder(builder) - .with_header(header); - serializers.push(serializer); - - let file = file_group.clone(); - let writer = self - .create_writer(file.object_meta.clone().into(), object_store.clone()) - .await?; - writers.push(writer); + match self.config.writer_mode { + FileWriterMode::Append => { + for file_group in &self.config.file_groups { + // In append mode, consider has_header flag only when file is empty (at the start). + // For other modes, use has_header flag as is. + let header = self.has_header + && (!matches!(&self.config.writer_mode, FileWriterMode::Append) + || file_group.object_meta.size == 0); + let builder = WriterBuilder::new().with_delimiter(self.delimiter); + let serializer = CsvSerializer::new() + .with_builder(builder) + .with_header(header); + serializers.push(serializer); + + let file = file_group.clone(); + let writer = self + .create_writer( + file.object_meta.clone().into(), + object_store.clone(), + ) + .await?; + writers.push(writer); + } + } + FileWriterMode::Put => { + return Err(DataFusionError::NotImplemented( + "Put Mode is not implemented for CSV Sink yet".into(), + )) + } + FileWriterMode::PutMultipart => { + //currently assuming only 1 partition path (i.e. 
not hive style partitioning on a column) + let base_path = &self.config.table_paths[0]; + //uniquely identify this batch of files with a random string, to prevent collisions overwriting files + let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16); + for part_idx in 0..num_partitions { + let header = true; + let builder = WriterBuilder::new().with_delimiter(self.delimiter); + let serializer = CsvSerializer::new() + .with_builder(builder) + .with_header(header); + serializers.push(serializer); + let file_path = base_path + .prefix() + .child(format!("/{}_{}.csv", write_id, part_idx)); + let object_meta = ObjectMeta { + location: file_path, + last_modified: chrono::offset::Utc::now(), + size: 0, + e_tag: None, + }; + let writer = self + .create_writer(object_meta.into(), object_store.clone()) + .await?; + writers.push(writer); + } + } } - let mut idx = 0; let mut row_count = 0; // Map errors to DatafusionError. let err_converter = |_| DataFusionError::Internal("Unexpected FileSink Error".to_string()); - while let Some(maybe_batch) = data.next().await { - // Write data to files in a round robin fashion: - idx = (idx + 1) % num_partitions; - let serializer = &mut serializers[idx]; - let batch = check_for_errors(maybe_batch, &mut writers).await?; - row_count += batch.num_rows(); - let bytes = - check_for_errors(serializer.serialize(batch).await, &mut writers).await?; - let writer = &mut writers[idx]; - check_for_errors( - writer.write_all(&bytes).await.map_err(err_converter), - &mut writers, - ) - .await?; + // TODO parallelize serialization accross partitions and batches within partitions + // see: https://github.com/apache/arrow-datafusion/issues/7079 + for idx in 0..num_partitions { + while let Some(maybe_batch) = data[idx].next().await { + // Write data to files in a round robin fashion: + let serializer = &mut serializers[idx]; + let batch = check_for_errors(maybe_batch, &mut writers).await?; + row_count += batch.num_rows(); + let bytes = + check_for_errors(serializer.serialize(batch).await, &mut writers) + .await?; + let writer = &mut writers[idx]; + check_for_errors( + writer.write_all(&bytes).await.map_err(err_converter), + &mut writers, + ) + .await?; + } } // Perform cleanup: let n_writers = writers.len(); diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index a50066fd97c8..b8499065bd69 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -30,7 +30,7 @@ use crate::datasource::file_format::file_type::FileCompressionType; use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION; use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION; use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD; -use crate::datasource::listing::ListingTableUrl; +use crate::datasource::listing::{ListingTableInsertMode, ListingTableUrl}; use crate::datasource::{ file_format::{ avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat, @@ -39,6 +39,7 @@ use crate::datasource::{ }; use crate::error::Result; use crate::execution::context::{SessionConfig, SessionState}; +use crate::logical_expr::Expr; /// Options that control the reading of CSV files. /// @@ -73,6 +74,10 @@ pub struct CsvReadOptions<'a> { pub file_compression_type: FileCompressionType, /// Flag indicating whether this file may be unbounded (as in a FIFO file). 
pub infinite: bool, + /// Indicates how the file is sorted + pub file_sort_order: Vec>, + /// Setting controls how inserts to this file should be handled + pub insert_mode: ListingTableInsertMode, } impl<'a> Default for CsvReadOptions<'a> { @@ -95,6 +100,8 @@ impl<'a> CsvReadOptions<'a> { table_partition_cols: vec![], file_compression_type: FileCompressionType::UNCOMPRESSED, infinite: false, + file_sort_order: vec![], + insert_mode: ListingTableInsertMode::AppendToFile, } } @@ -171,6 +178,18 @@ impl<'a> CsvReadOptions<'a> { self.file_compression_type = file_compression_type; self } + + /// Configure if file has known sort order + pub fn file_sort_order(mut self, file_sort_order: Vec>) -> Self { + self.file_sort_order = file_sort_order; + self + } + + /// Configure how insertions to this table should be handled + pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self { + self.insert_mode = insert_mode; + self + } } /// Options that control the reading of Parquet files. @@ -461,9 +480,9 @@ impl ReadOptions<'_> for CsvReadOptions<'_> { .with_file_extension(self.file_extension) .with_target_partitions(config.target_partitions()) .with_table_partition_cols(self.table_partition_cols.clone()) - // TODO: Add file sort order into CsvReadOptions and introduce here. - .with_file_sort_order(vec![]) + .with_file_sort_order(self.file_sort_order.clone()) .with_infinite_source(self.infinite) + .with_insert_mode(self.insert_mode.clone()) } async fn get_resolved_schema( diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs index aa2e20164b5e..8b0f021f0277 100644 --- a/datafusion/core/src/datasource/listing/mod.rs +++ b/datafusion/core/src/datasource/listing/mod.rs @@ -31,7 +31,9 @@ use std::pin::Pin; use std::sync::Arc; pub use self::url::ListingTableUrl; -pub use table::{ListingOptions, ListingTable, ListingTableConfig}; +pub use table::{ + ListingOptions, ListingTable, ListingTableConfig, ListingTableInsertMode, +}; /// Stream of files get listed from object store pub type PartitionedFileStream = diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 2d8ba9956362..4085dac48404 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -207,6 +207,16 @@ impl ListingTableConfig { } } +#[derive(Debug, Clone)] +///controls how new data should be inserted to a ListingTable +pub enum ListingTableInsertMode { + ///Data should be appended to an existing file + AppendToFile, + ///Data is appended as new files in existing TablePaths + AppendNewFiles, + ///Throw an error if insert into is attempted on this table + Error, +} /// Options for creating a [`ListingTable`] #[derive(Clone, Debug)] pub struct ListingOptions { @@ -245,6 +255,8 @@ pub struct ListingOptions { /// In order to support infinite inputs, DataFusion may adjust query /// plans (e.g. joins) to run the given query in full pipelining mode. pub infinite_source: bool, + ///This setting controls how inserts to this table should be handled + pub insert_mode: ListingTableInsertMode, } impl ListingOptions { @@ -263,6 +275,7 @@ impl ListingOptions { target_partitions: 1, file_sort_order: vec![], infinite_source: false, + insert_mode: ListingTableInsertMode::AppendToFile, } } @@ -431,6 +444,12 @@ impl ListingOptions { self } + /// Configure how insertions to this table should be handled. 
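+    /// `ListingTableInsertMode::AppendToFile` appends to an existing file,
+    /// `AppendNewFiles` writes each insert as new files under the table path,
+    /// and `Error` rejects `INSERT INTO` for this table.
+    /// Defaults to `AppendToFile`.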
+ pub fn with_insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self { + self.insert_mode = insert_mode; + self + } + /// Infer the schema of the files at the given path on the provided object store. /// The inferred schema does not include the partitioning columns. /// @@ -770,6 +789,7 @@ impl TableProvider for ListingTable { &self, state: &SessionState, input: Arc, + overwrite: bool, ) -> Result> { // Check that the schema of the plan matches the schema of this table. if !self.schema().equivalent_names_and_types(&input.schema()) { @@ -781,7 +801,7 @@ impl TableProvider for ListingTable { if self.table_paths().len() > 1 { return plan_err!( - "Writing to a table backed by multiple files is not supported yet" + "Writing to a table backed by multiple partitions is not supported yet" ); } @@ -799,20 +819,41 @@ impl TableProvider for ListingTable { .await?; let file_groups = file_list_stream.try_collect::>().await?; - - if file_groups.len() > 1 { - return plan_err!( - "Datafusion currently supports tables from single partition and/or file." - ); + let writer_mode; + //if we are writing a single output_partition to a table backed by a single file + //we can append to that file. Otherwise, we can write new files into the directory + //adding new files to the listing table in order to insert to the table. + let input_partitions = input.output_partitioning().partition_count(); + match self.options.insert_mode { + ListingTableInsertMode::AppendToFile => { + if input_partitions > file_groups.len() { + return Err(DataFusionError::Plan(format!( + "Cannot append {input_partitions} partitions to {} files!", + file_groups.len() + ))); + } + writer_mode = crate::datasource::file_format::FileWriterMode::Append; + } + ListingTableInsertMode::AppendNewFiles => { + writer_mode = crate::datasource::file_format::FileWriterMode::PutMultipart + } + ListingTableInsertMode::Error => { + return Err(DataFusionError::Plan( + "Invalid plan attempting write to table with TableWriteMode::Error!" 
+ .into(), + )) + } } // Sink related option, apart from format let config = FileSinkConfig { object_store_url: self.table_paths()[0].object_store(), + table_paths: self.table_paths().clone(), file_groups, output_schema: self.schema(), table_partition_cols: self.options.table_partition_cols.clone(), - writer_mode: crate::datasource::file_format::FileWriterMode::Append, + writer_mode, + overwrite, }; self.options() @@ -1396,12 +1437,14 @@ mod tests { fn load_empty_schema_csv_table( schema: SchemaRef, temp_path: &str, + insert_mode: ListingTableInsertMode, ) -> Result> { File::create(temp_path)?; let table_path = ListingTableUrl::parse(temp_path).unwrap(); let file_format = CsvFormat::default(); - let listing_options = ListingOptions::new(Arc::new(file_format)); + let listing_options = + ListingOptions::new(Arc::new(file_format)).with_insert_mode(insert_mode); let config = ListingTableConfig::new(table_path) .with_listing_options(listing_options) @@ -1551,8 +1594,11 @@ mod tests { let tmp_dir = TempDir::new()?; let path = tmp_dir.path().join(filename); - let initial_table = - load_empty_schema_csv_table(schema.clone(), path.to_str().unwrap())?; + let initial_table = load_empty_schema_csv_table( + schema.clone(), + path.to_str().unwrap(), + ListingTableInsertMode::AppendToFile, + )?; session_ctx.register_table("t", initial_table)?; // Create and register the source table with the provided schema and inserted data let source_table = Arc::new(MemTable::try_new( @@ -1566,7 +1612,7 @@ mod tests { let scan_plan = LogicalPlanBuilder::scan("source", source, None)?.build()?; // Create an insert plan to insert the source data into the initial table let insert_into_table = - LogicalPlanBuilder::insert_into(scan_plan, "t", &schema)?.build()?; + LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, false)?.build()?; // Create a physical plan from the insert plan let plan = session_ctx .state() @@ -1677,4 +1723,135 @@ mod tests { // Return Ok if the function Ok(()) } + + #[tokio::test] + async fn test_append_new_files_to_csv_table() -> Result<()> { + // Create the initial context, schema, and batch. + let session_ctx = SessionContext::new(); + // Create a new schema with one field called "a" of type Int32 + let schema = Arc::new(Schema::new(vec![Field::new( + "column1", + DataType::Int32, + false, + )])); + + // Create a new batch of data to insert into the table + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3]))], + )?; + + // Create a temporary directory and a CSV file within it. + let tmp_dir = TempDir::new()?; + session_ctx + .register_csv( + "t", + tmp_dir.path().to_str().unwrap(), + CsvReadOptions::new() + .insert_mode(ListingTableInsertMode::AppendNewFiles) + .schema(schema.as_ref()), + ) + .await?; + // Create and register the source table with the provided schema and inserted data + let source_table = Arc::new(MemTable::try_new( + schema.clone(), + vec![vec![batch.clone(), batch.clone()]], + )?); + session_ctx.register_table("source", source_table.clone())?; + // Convert the source table into a provider so that it can be used in a query + let source = provider_as_source(source_table); + // Create a table scan logical plan to read from the source table + let scan_plan = LogicalPlanBuilder::scan("source", source, None)? + .repartition(Partitioning::Hash(vec![Expr::Column("column1".into())], 6))? 
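+            // Hash-repartitioning into 6 partitions means the insert below writes
+            // 6 new files, since the table uses ListingTableInsertMode::AppendNewFiles.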
+ .build()?; + // Create an insert plan to insert the source data into the initial table + let insert_into_table = + LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, false)?.build()?; + // Create a physical plan from the insert plan + let plan = session_ctx + .state() + .create_physical_plan(&insert_into_table) + .await?; + + // Execute the physical plan and collect the results + let res = collect(plan, session_ctx.task_ctx()).await?; + // Insert returns the number of rows written, in our case this would be 6. + let expected = vec![ + "+-------+", + "| count |", + "+-------+", + "| 6 |", + "+-------+", + ]; + + // Assert that the batches read from the file match the expected result. + assert_batches_eq!(expected, &res); + + // Read the records in the table + let batches = session_ctx + .sql("select count(*) from t") + .await? + .collect() + .await?; + let expected = vec![ + "+----------+", + "| COUNT(*) |", + "+----------+", + "| 6 |", + "+----------+", + ]; + + // Assert that the batches read from the file match the expected result. + assert_batches_eq!(expected, &batches); + + //asert that 6 files were added to the table + let num_files = tmp_dir.path().read_dir()?.count(); + assert_eq!(num_files, 6); + + // Create a physical plan from the insert plan + let plan = session_ctx + .state() + .create_physical_plan(&insert_into_table) + .await?; + + // Again, execute the physical plan and collect the results + let res = collect(plan, session_ctx.task_ctx()).await?; + // Insert returns the number of rows written, in our case this would be 6. + let expected = vec![ + "+-------+", + "| count |", + "+-------+", + "| 6 |", + "+-------+", + ]; + + // Assert that the batches read from the file match the expected result. + assert_batches_eq!(expected, &res); + + // Read the contents of the table + let batches = session_ctx + .sql("select count(*) from t") + .await? + .collect() + .await?; + + // Define the expected result after the second append. + let expected = vec![ + "+----------+", + "| COUNT(*) |", + "+----------+", + "| 12 |", + "+----------+", + ]; + + // Assert that the batches read from the file after the second append match the expected result. + assert_batches_eq!(expected, &batches); + + // Assert that another 6 files were added to the table + let num_files = tmp_dir.path().read_dir()?.count(); + assert_eq!(num_files, 12); + + // Return Ok if the function + Ok(()) + } } diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs index 5bac551cf61e..0441ac70585d 100644 --- a/datafusion/core/src/datasource/memory.rs +++ b/datafusion/core/src/datasource/memory.rs @@ -204,6 +204,7 @@ impl TableProvider for MemTable { &self, _state: &SessionState, input: Arc, + overwrite: bool, ) -> Result> { // Create a physical plan from the logical plan. // Check that the schema of the plan matches the schema of this table. @@ -212,6 +213,11 @@ impl TableProvider for MemTable { "Inserting query must have the same schema with the table." 
); } + if overwrite { + return Err(DataFusionError::NotImplemented( + "Overwrite not implemented for MemoryTable yet".into(), + )); + } let sink = Arc::new(MemSink::new(self.batches.clone())); Ok(Arc::new(InsertExec::new(input, sink, self.schema.clone()))) } @@ -252,7 +258,7 @@ impl MemSink { impl DataSink for MemSink { async fn write_all( &self, - mut data: SendableRecordBatchStream, + mut data: Vec, _context: &Arc, ) -> Result { let num_partitions = self.batches.len(); @@ -262,10 +268,14 @@ impl DataSink for MemSink { let mut new_batches = vec![vec![]; num_partitions]; let mut i = 0; let mut row_count = 0; - while let Some(batch) = data.next().await.transpose()? { - row_count += batch.num_rows(); - new_batches[i].push(batch); - i = (i + 1) % num_partitions; + let num_parts = data.len(); + // TODO parallelize outer and inner loops + for data_part in data.iter_mut().take(num_parts) { + while let Some(batch) = data_part.next().await.transpose()? { + row_count += batch.num_rows(); + new_batches[i].push(batch); + i = (i + 1) % num_partitions; + } } // write the outputs into the batches @@ -542,7 +552,7 @@ mod tests { let scan_plan = LogicalPlanBuilder::scan("source", source, None)?.build()?; // Create an insert plan to insert the source data into the initial table let insert_into_table = - LogicalPlanBuilder::insert_into(scan_plan, "t", &schema)?.build()?; + LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, false)?.build()?; // Create a physical plan from the insert plan let plan = session_ctx .state() diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index cbae85f6c8be..141cac037004 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -277,6 +277,7 @@ pub async fn plan_to_json( let mut stream = plan.execute(i, task_ctx.clone())?; join_set.spawn(async move { let (_, mut multipart_writer) = storeref.put_multipart(&file).await?; + let mut buffer = Vec::with_capacity(1024); while let Some(batch) = stream.next().await.transpose()? { let mut writer = json::LineDelimitedWriter::new(buffer); @@ -285,6 +286,7 @@ pub async fn plan_to_json( multipart_writer.write_all(&buffer).await?; buffer.clear(); } + multipart_writer .shutdown() .await diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index ac660770b126..a9ca6fc90a6b 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -82,7 +82,7 @@ use std::{ vec, }; -use super::{ColumnStatistics, Statistics}; +use super::{listing::ListingTableUrl, ColumnStatistics, Statistics}; /// Convert type to a type suitable for use as a [`ListingTable`] /// partition column. 
Returns `Dictionary(UInt16, val_type)`, which is @@ -323,6 +323,8 @@ pub struct FileSinkConfig { pub object_store_url: ObjectStoreUrl, /// A vector of [`PartitionedFile`] structs, each representing a file partition pub file_groups: Vec, + /// Vector of partition paths + pub table_paths: Vec, /// The schema of the output file pub output_schema: SchemaRef, /// A vector of column names and their corresponding data types, @@ -330,6 +332,8 @@ pub struct FileSinkConfig { pub table_partition_cols: Vec<(String, DataType)>, /// A writer mode that determines how data is written to the file pub writer_mode: FileWriterMode, + /// Controls whether existing data should be overwritten by this sink + pub overwrite: bool, } impl FileSinkConfig { diff --git a/datafusion/core/src/datasource/provider.rs b/datafusion/core/src/datasource/provider.rs index 9c9793510589..6a81f8969611 100644 --- a/datafusion/core/src/datasource/provider.rs +++ b/datafusion/core/src/datasource/provider.rs @@ -127,8 +127,9 @@ pub trait TableProvider: Sync + Send { &self, _state: &SessionState, _input: Arc, + _overwrite: bool, ) -> Result> { - let msg = "Insertion not implemented for this table".to_owned(); + let msg = "Insert into not implemented for this table".to_owned(); Err(DataFusionError::NotImplemented(msg)) } } diff --git a/datafusion/core/src/physical_plan/insert.rs b/datafusion/core/src/physical_plan/insert.rs index 8766b62e9a9e..622e33b117fd 100644 --- a/datafusion/core/src/physical_plan/insert.rs +++ b/datafusion/core/src/physical_plan/insert.rs @@ -36,7 +36,6 @@ use std::fmt::Debug; use std::sync::Arc; use crate::physical_plan::stream::RecordBatchStreamAdapter; -use crate::physical_plan::Distribution; use datafusion_common::DataFusionError; use datafusion_execution::TaskContext; @@ -57,7 +56,7 @@ pub trait DataSink: DisplayAs + Debug + Send + Sync { /// or rollback required. async fn write_all( &self, - data: SendableRecordBatchStream, + data: Vec, context: &Arc, ) -> Result; } @@ -97,7 +96,7 @@ impl InsertExec { } } - fn make_input_stream( + fn execute_input_stream( &self, partition: usize, context: Arc, @@ -136,6 +135,18 @@ impl InsertExec { ))) } } + + fn execute_all_input_streams( + &self, + context: Arc, + ) -> Result> { + let n_input_parts = self.input.output_partitioning().partition_count(); + let mut streams = Vec::with_capacity(n_input_parts); + for part in 0..n_input_parts { + streams.push(self.execute_input_stream(part, context.clone())?); + } + Ok(streams) + } } impl DisplayAs for InsertExec { @@ -172,8 +183,12 @@ impl ExecutionPlan for InsertExec { None } - fn required_input_distribution(&self) -> Vec { - vec![Distribution::SinglePartition] + fn benefits_from_input_partitioning(&self) -> bool { + // Incoming number of partitions is taken to be the + // number of files the query is required to write out. + // The optimizer should not change this number. + // Parrallelism is handled within the appropriate DataSink + false } fn required_input_ordering(&self) -> Vec>> { @@ -218,20 +233,10 @@ impl ExecutionPlan for InsertExec { ) -> Result { if partition != 0 { return Err(DataFusionError::Internal( - format!("Invalid requested partition {partition}. InsertExec requires a single input partition." 
- ))); + "InsertExec can only be called on partition 0!".into(), + )); } - - // Execute each of our own input's partitions and pass them to the sink - let input_partition_count = self.input.output_partitioning().partition_count(); - if input_partition_count != 1 { - return Err(DataFusionError::Internal(format!( - "Invalid input partition count {input_partition_count}. \ - InsertExec needs only a single partition." - ))); - } - - let data = self.make_input_stream(0, context.clone())?; + let data = self.execute_all_input_streams(context.clone())?; let count_schema = self.count_schema.clone(); let sink = self.sink.clone(); diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 509dab5474a0..e0dba6fe14bb 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -551,7 +551,7 @@ impl DefaultPhysicalPlanner { } LogicalPlan::Dml(DmlStatement { table_name, - op: WriteOp::Insert, + op: WriteOp::InsertInto, input, .. }) => { @@ -559,7 +559,24 @@ impl DefaultPhysicalPlanner { let schema = session_state.schema_for_ref(table_name)?; if let Some(provider) = schema.table(name).await { let input_exec = self.create_initial_plan(input, session_state).await?; - provider.insert_into(session_state, input_exec).await + provider.insert_into(session_state, input_exec, false).await + } else { + return Err(DataFusionError::Execution(format!( + "Table '{table_name}' does not exist" + ))); + } + } + LogicalPlan::Dml(DmlStatement { + table_name, + op: WriteOp::InsertOverwrite, + input, + .. + }) => { + let name = table_name.table(); + let schema = session_state.schema_for_ref(table_name)?; + if let Some(provider) = schema.table(name).await { + let input_exec = self.create_initial_plan(input, session_state).await?; + provider.insert_into(session_state, input_exec, true).await } else { return Err(DataFusionError::Execution(format!( "Table '{table_name}' does not exist" diff --git a/datafusion/core/tests/sqllogictests/test_files/explain.slt b/datafusion/core/tests/sqllogictests/test_files/explain.slt index bd3513550a4e..aa560961d2f3 100644 --- a/datafusion/core/tests/sqllogictests/test_files/explain.slt +++ b/datafusion/core/tests/sqllogictests/test_files/explain.slt @@ -159,7 +159,7 @@ query TT EXPLAIN INSERT INTO sink_table SELECT * FROM aggregate_test_100 ORDER by c1 ---- logical_plan -Dml: op=[Insert] table=[sink_table] +Dml: op=[Insert Into] table=[sink_table] --Projection: aggregate_test_100.c1 AS c1, aggregate_test_100.c2 AS c2, aggregate_test_100.c3 AS c3, aggregate_test_100.c4 AS c4, aggregate_test_100.c5 AS c5, aggregate_test_100.c6 AS c6, aggregate_test_100.c7 AS c7, aggregate_test_100.c8 AS c8, aggregate_test_100.c9 AS c9, aggregate_test_100.c10 AS c10, aggregate_test_100.c11 AS c11, aggregate_test_100.c12 AS c12, aggregate_test_100.c13 AS c13 ----Sort: aggregate_test_100.c1 ASC NULLS LAST ------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13] diff --git a/datafusion/core/tests/sqllogictests/test_files/insert.slt b/datafusion/core/tests/sqllogictests/test_files/insert.slt index 90a33bd1c5f7..e42d2ef0592d 100644 --- a/datafusion/core/tests/sqllogictests/test_files/insert.slt +++ b/datafusion/core/tests/sqllogictests/test_files/insert.slt @@ -57,7 +57,7 @@ FROM aggregate_test_100 ORDER by c1 ---- logical_plan -Dml: op=[Insert] table=[table_without_values] +Dml: op=[Insert Into] table=[table_without_values] --Projection: SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2 ----Sort: aggregate_test_100.c1 ASC NULLS LAST ------Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, aggregate_test_100.c1 @@ -120,20 +120,19 @@ COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWI FROM aggregate_test_100 ---- logical_plan -Dml: op=[Insert] table=[table_without_values] +Dml: op=[Insert Into] table=[table_without_values] --Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2 ----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] ------TableScan: aggregate_test_100 projection=[c1, c4, c9] physical_plan InsertExec: sink=MemoryTable (partitions=1) ---CoalescePartitionsExec -----ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2] -------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted] ---------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST] -----------CoalesceBatchesExec: target_batch_size=8192 -------------RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8 ---------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 -----------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c4, c9], has_header=true +--ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2] +----BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted] +------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST] +--------CoalesceBatchesExec: target_batch_size=8192 +----------RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8 +------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c4, c9], has_header=true @@ -168,7 +167,7 @@ FROM aggregate_test_100 ORDER BY c1 ---- logical_plan -Dml: op=[Insert] table=[table_without_values] +Dml: op=[Insert Into] table=[table_without_values] --Projection: a1 AS a1, a2 AS a2 ----Sort: aggregate_test_100.c1 ASC NULLS LAST ------Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS a1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS a2, aggregate_test_100.c1 @@ -212,7 +211,7 @@ query TT explain insert into table_without_values select c1 from aggregate_test_100 order by c1; ---- logical_plan -Dml: op=[Insert] table=[table_without_values] +Dml: op=[Insert Into] table=[table_without_values] --Projection: aggregate_test_100.c1 AS c1 ----Sort: aggregate_test_100.c1 ASC NULLS LAST ------TableScan: aggregate_test_100 projection=[c1] diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index a121553ea501..176459db149a 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -236,12 +236,20 @@ impl LogicalPlanBuilder { input: LogicalPlan, table_name: impl Into, table_schema: &Schema, + overwrite: bool, ) -> Result { let table_schema = table_schema.clone().to_dfschema_ref()?; + + let op = if overwrite { + WriteOp::InsertOverwrite + } else { + WriteOp::InsertInto + }; + Ok(Self::from(LogicalPlan::Dml(DmlStatement { table_name: table_name.into(), table_schema, - op: WriteOp::Insert, + op, input: 
Arc::new(input), }))) } diff --git a/datafusion/expr/src/logical_plan/dml.rs b/datafusion/expr/src/logical_plan/dml.rs index 117a42cda970..07f34101eb3a 100644 --- a/datafusion/expr/src/logical_plan/dml.rs +++ b/datafusion/expr/src/logical_plan/dml.rs @@ -40,7 +40,8 @@ pub struct DmlStatement { #[derive(Clone, PartialEq, Eq, Hash)] pub enum WriteOp { - Insert, + InsertOverwrite, + InsertInto, Delete, Update, Ctas, @@ -49,7 +50,8 @@ pub enum WriteOp { impl Display for WriteOp { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - WriteOp::Insert => write!(f, "Insert"), + WriteOp::InsertOverwrite => write!(f, "Insert Overwrite"), + WriteOp::InsertInto => write!(f, "Insert Into"), WriteOp::Delete => write!(f, "Delete"), WriteOp::Update => write!(f, "Update"), WriteOp::Ctas => write!(f, "Ctas"), diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index cb4a7cb52779..ad66640efa14 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -359,9 +359,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { if or.is_some() { plan_err!("Inserts with or clauses not supported")?; } - if overwrite { - plan_err!("Insert overwrite is not supported")?; - } if partitioned.is_some() { plan_err!("Partitioned inserts not yet supported")?; } @@ -378,7 +375,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { plan_err!("Insert-returning clause not supported")?; } let _ = into; // optional keyword doesn't change behavior - self.insert_to_plan(table_name, columns, source) + self.insert_to_plan(table_name, columns, source, overwrite) } Statement::Update { @@ -934,6 +931,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { table_name: ObjectName, columns: Vec, source: Box, + overwrite: bool, ) -> Result { // Do a table lookup to verify the table exists let table_name = self.object_name_to_table_reference(table_name)?; @@ -1027,10 +1025,16 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .collect::>>()?; let source = project(source, exprs)?; + let op = if overwrite { + WriteOp::InsertOverwrite + } else { + WriteOp::InsertInto + }; + let plan = LogicalPlan::Dml(DmlStatement { table_name, table_schema: Arc::new(table_schema), - op: WriteOp::Insert, + op, input: Arc::new(source), }); Ok(plan) diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index f19ddfc05312..eef9093947fb 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -330,7 +330,7 @@ fn plan_insert() { let sql = "insert into person (id, first_name, last_name) values (1, 'Alan', 'Turing')"; let plan = r#" -Dml: op=[Insert] table=[person] +Dml: op=[Insert Into] table=[person] Projection: CAST(column1 AS UInt32) AS id, column2 AS first_name, column3 AS last_name Values: (Int64(1), Utf8("Alan"), Utf8("Turing")) "# @@ -342,7 +342,7 @@ Dml: op=[Insert] table=[person] fn plan_insert_no_target_columns() { let sql = "INSERT INTO test_decimal VALUES (1, 2), (3, 4)"; let plan = r#" -Dml: op=[Insert] table=[test_decimal] +Dml: op=[Insert Into] table=[test_decimal] Projection: CAST(column1 AS Int32) AS id, CAST(column2 AS Decimal128(10, 2)) AS price Values: (Int64(1), Int64(2)), (Int64(3), Int64(4)) "# @@ -3880,7 +3880,7 @@ fn test_prepare_statement_insert_infer() { let sql = "insert into person (id, first_name, last_name) values ($1, $2, $3)"; let expected_plan = r#" -Dml: op=[Insert] table=[person] +Dml: op=[Insert Into] table=[person] Projection: column1 AS id, column2 AS first_name, column3 AS last_name 
Values: ($1, $2, $3) "# @@ -3904,7 +3904,7 @@ Dml: op=[Insert] table=[person] ScalarValue::Utf8(Some("Turing".to_string())), ]; let expected_plan = r#" -Dml: op=[Insert] table=[person] +Dml: op=[Insert Into] table=[person] Projection: column1 AS id, column2 AS first_name, column3 AS last_name Values: (UInt32(1), Utf8("Alan"), Utf8("Turing")) "#
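
Usage sketch (not part of the patch): the example below shows how the pieces added here are intended to fit together, namely `DataFrame::write_table` with `DataFrameWriteOptions` and a CSV listing table registered with `ListingTableInsertMode::AppendNewFiles`. The table names, file paths, and single-column schema are made up for illustration, and the snippet assumes the crate layout in this PR (DataFusion ~28); treat it as a sketch rather than a tested program.

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::datasource::listing::ListingTableInsertMode;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Hypothetical single-column schema shared by the source and sink tables.
    let schema = Schema::new(vec![Field::new("column1", DataType::Int32, false)]);

    // Register a CSV-backed listing table that accepts inserts by writing new
    // files into its directory instead of appending to a single file.
    ctx.register_csv(
        "sink",
        "/tmp/sink_table/",
        CsvReadOptions::new()
            .schema(&schema)
            .insert_mode(ListingTableInsertMode::AppendNewFiles),
    )
    .await?;

    // Register a source table (path is hypothetical) and read it as a DataFrame.
    ctx.register_csv(
        "source",
        "/tmp/source.csv",
        CsvReadOptions::new().schema(&schema),
    )
    .await?;
    let df = ctx.table("source").await?;

    // Eagerly write the DataFrame into the sink table. This goes through the same
    // InsertExec / DataSink path as `INSERT INTO sink SELECT * FROM source` and
    // returns a single-row batch containing the count of rows written.
    let written = df.write_table("sink", DataFrameWriteOptions::new()).await?;
    println!("{written:?}");

    Ok(())
}

Setting `DataFrameWriteOptions::new().with_overwrite(true)` would plan a `WriteOp::InsertOverwrite` instead, but as of this patch the CSV and in-memory sinks return `NotImplemented` for overwrites.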