From 1e863016ce11f131f4de117552a5a011acff6ebb Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 1 Nov 2023 07:49:33 +0000 Subject: [PATCH 1/4] Remove FileWriterMode Support (#7994) --- .../core/src/datasource/file_format/csv.rs | 74 +--- .../core/src/datasource/file_format/json.rs | 59 +-- .../src/datasource/file_format/options.rs | 32 +- .../src/datasource/file_format/parquet.rs | 67 +--- .../src/datasource/file_format/write/mod.rs | 204 ++--------- .../file_format/write/orchestration.rs | 111 +----- datafusion/core/src/datasource/listing/mod.rs | 4 +- .../core/src/datasource/listing/table.rs | 336 +----------------- .../src/datasource/listing_table_factory.rs | 24 +- .../core/src/datasource/physical_plan/mod.rs | 7 +- datafusion/core/src/physical_planner.rs | 2 - datafusion/core/tests/fifo.rs | 1 + datafusion/proto/proto/datafusion.proto | 9 +- datafusion/proto/src/generated/pbjson.rs | 94 ----- datafusion/proto/src/generated/prost.rs | 31 -- .../proto/src/physical_plan/from_proto.rs | 12 - .../proto/src/physical_plan/to_proto.rs | 13 - .../tests/cases/roundtrip_physical_plan.rs | 2 - datafusion/sqllogictest/test_files/array.slt | 12 +- datafusion/sqllogictest/test_files/copy.slt | 2 +- datafusion/sqllogictest/test_files/errors.slt | 2 +- .../sqllogictest/test_files/explain.slt | 2 +- datafusion/sqllogictest/test_files/insert.slt | 2 +- .../test_files/insert_to_external.slt | 10 +- datafusion/sqllogictest/test_files/joins.slt | 1 - .../sqllogictest/test_files/options.slt | 4 +- datafusion/sqllogictest/test_files/order.slt | 2 +- .../sqllogictest/test_files/predicates.slt | 1 + .../sqllogictest/test_files/set_variable.slt | 2 +- datafusion/sqllogictest/test_files/update.slt | 2 +- 30 files changed, 96 insertions(+), 1028 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 5f2084bc80a8..684f416f771a 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -34,10 +34,10 @@ use futures::stream::BoxStream; use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore}; -use super::write::orchestration::{stateless_append_all, stateless_multipart_put}; +use super::write::orchestration::stateless_multipart_put; use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD}; use crate::datasource::file_format::file_compression_type::FileCompressionType; -use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode}; +use crate::datasource::file_format::write::BatchSerializer; use crate::datasource::physical_plan::{ CsvExec, FileGroupDisplay, FileScanConfig, FileSinkConfig, }; @@ -465,11 +465,7 @@ impl DisplayAs for CsvSink { fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!( - f, - "CsvSink(writer_mode={:?}, file_groups=", - self.config.writer_mode - )?; + write!(f, "CsvSink(file_groups=",)?; FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?; write!(f, ")") } @@ -481,55 +477,6 @@ impl CsvSink { fn new(config: FileSinkConfig) -> Self { Self { config } } - - async fn append_all( - &self, - data: SendableRecordBatchStream, - context: &Arc, - ) -> Result { - if !self.config.table_partition_cols.is_empty() { - return Err(DataFusionError::NotImplemented("Inserting in append mode to hive style partitioned tables is not supported".into())); - } - let writer_options = self.config.file_type_writer_options.try_into_csv()?; - let (builder, compression) = - (&writer_options.writer_options, &writer_options.compression); - let compression = FileCompressionType::from(*compression); - - let object_store = context - .runtime_env() - .object_store(&self.config.object_store_url)?; - let file_groups = &self.config.file_groups; - - let builder_clone = builder.clone(); - let options_clone = writer_options.clone(); - let get_serializer = move |file_size| { - let inner_clone = builder_clone.clone(); - // In append mode, consider has_header flag only when file is empty (at the start). - // For other modes, use has_header flag as is. - let serializer: Box = Box::new(if file_size > 0 { - CsvSerializer::new() - .with_builder(inner_clone) - .with_header(false) - } else { - CsvSerializer::new() - .with_builder(inner_clone) - .with_header(options_clone.writer_options.header()) - }); - serializer - }; - - stateless_append_all( - data, - context, - object_store, - file_groups, - self.config.unbounded_input, - compression, - Box::new(get_serializer), - ) - .await - } - async fn multipartput_all( &self, data: SendableRecordBatchStream, @@ -577,19 +524,8 @@ impl DataSink for CsvSink { data: SendableRecordBatchStream, context: &Arc, ) -> Result { - match self.config.writer_mode { - FileWriterMode::Append => { - let total_count = self.append_all(data, context).await?; - Ok(total_count) - } - FileWriterMode::PutMultipart => { - let total_count = self.multipartput_all(data, context).await?; - Ok(total_count) - } - FileWriterMode::Put => { - return not_impl_err!("FileWriterMode::Put is not supported yet!") - } - } + let total_count = self.multipartput_all(data, context).await?; + Ok(total_count) } } diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index 8d62d0a858ac..9893a1db45de 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -45,10 +45,10 @@ use crate::physical_plan::insert::FileSinkExec; use crate::physical_plan::SendableRecordBatchStream; use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics}; -use super::write::orchestration::{stateless_append_all, stateless_multipart_put}; +use super::write::orchestration::stateless_multipart_put; use crate::datasource::file_format::file_compression_type::FileCompressionType; -use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode}; +use crate::datasource::file_format::write::BatchSerializer; use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD; use crate::datasource::physical_plan::{FileSinkConfig, NdJsonExec}; use crate::error::Result; @@ -245,11 +245,7 @@ impl DisplayAs for JsonSink { fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!( - f, - "JsonSink(writer_mode={:?}, file_groups=", - self.config.writer_mode - )?; + write!(f, "JsonSink(file_groups=",)?; FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?; write!(f, ")") } @@ -268,40 +264,6 @@ impl JsonSink { &self.config } - async fn append_all( - &self, - data: SendableRecordBatchStream, - context: &Arc, - ) -> Result { - if !self.config.table_partition_cols.is_empty() { - return Err(DataFusionError::NotImplemented("Inserting in append mode to hive style partitioned tables is not supported".into())); - } - - let writer_options = self.config.file_type_writer_options.try_into_json()?; - let compression = &writer_options.compression; - - let object_store = context - .runtime_env() - .object_store(&self.config.object_store_url)?; - let file_groups = &self.config.file_groups; - - let get_serializer = move |_| { - let serializer: Box = Box::new(JsonSerializer::new()); - serializer - }; - - stateless_append_all( - data, - context, - object_store, - file_groups, - self.config.unbounded_input, - (*compression).into(), - Box::new(get_serializer), - ) - .await - } - async fn multipartput_all( &self, data: SendableRecordBatchStream, @@ -342,19 +304,8 @@ impl DataSink for JsonSink { data: SendableRecordBatchStream, context: &Arc, ) -> Result { - match self.config.writer_mode { - FileWriterMode::Append => { - let total_count = self.append_all(data, context).await?; - Ok(total_count) - } - FileWriterMode::PutMultipart => { - let total_count = self.multipartput_all(data, context).await?; - Ok(total_count) - } - FileWriterMode::Put => { - return not_impl_err!("FileWriterMode::Put is not supported yet!") - } - } + let total_count = self.multipartput_all(data, context).await?; + Ok(total_count) } } diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index 41a70e6d2f8f..4c7557a4a9c0 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -28,7 +28,7 @@ use crate::datasource::file_format::file_compression_type::FileCompressionType; #[cfg(feature = "parquet")] use crate::datasource::file_format::parquet::ParquetFormat; use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD; -use crate::datasource::listing::{ListingTableInsertMode, ListingTableUrl}; +use crate::datasource::listing::ListingTableUrl; use crate::datasource::{ file_format::{avro::AvroFormat, csv::CsvFormat, json::JsonFormat}, listing::ListingOptions, @@ -76,8 +76,6 @@ pub struct CsvReadOptions<'a> { pub infinite: bool, /// Indicates how the file is sorted pub file_sort_order: Vec>, - /// Setting controls how inserts to this file should be handled - pub insert_mode: ListingTableInsertMode, } impl<'a> Default for CsvReadOptions<'a> { @@ -101,7 +99,6 @@ impl<'a> CsvReadOptions<'a> { file_compression_type: FileCompressionType::UNCOMPRESSED, infinite: false, file_sort_order: vec![], - insert_mode: ListingTableInsertMode::AppendToFile, } } @@ -184,12 +181,6 @@ impl<'a> CsvReadOptions<'a> { self.file_sort_order = file_sort_order; self } - - /// Configure how insertions to this table should be handled - pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self { - self.insert_mode = insert_mode; - self - } } /// Options that control the reading of Parquet files. @@ -219,8 +210,6 @@ pub struct ParquetReadOptions<'a> { pub schema: Option<&'a Schema>, /// Indicates how the file is sorted pub file_sort_order: Vec>, - /// Setting controls how inserts to this file should be handled - pub insert_mode: ListingTableInsertMode, } impl<'a> Default for ParquetReadOptions<'a> { @@ -232,7 +221,6 @@ impl<'a> Default for ParquetReadOptions<'a> { skip_metadata: None, schema: None, file_sort_order: vec![], - insert_mode: ListingTableInsertMode::AppendNewFiles, } } } @@ -272,12 +260,6 @@ impl<'a> ParquetReadOptions<'a> { self.file_sort_order = file_sort_order; self } - - /// Configure how insertions to this table should be handled - pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self { - self.insert_mode = insert_mode; - self - } } /// Options that control the reading of ARROW files. @@ -403,8 +385,6 @@ pub struct NdJsonReadOptions<'a> { pub infinite: bool, /// Indicates how the file is sorted pub file_sort_order: Vec>, - /// Setting controls how inserts to this file should be handled - pub insert_mode: ListingTableInsertMode, } impl<'a> Default for NdJsonReadOptions<'a> { @@ -417,7 +397,6 @@ impl<'a> Default for NdJsonReadOptions<'a> { file_compression_type: FileCompressionType::UNCOMPRESSED, infinite: false, file_sort_order: vec![], - insert_mode: ListingTableInsertMode::AppendToFile, } } } @@ -464,12 +443,6 @@ impl<'a> NdJsonReadOptions<'a> { self.file_sort_order = file_sort_order; self } - - /// Configure how insertions to this table should be handled - pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self { - self.insert_mode = insert_mode; - self - } } #[async_trait] @@ -528,7 +501,6 @@ impl ReadOptions<'_> for CsvReadOptions<'_> { .with_table_partition_cols(self.table_partition_cols.clone()) .with_file_sort_order(self.file_sort_order.clone()) .with_infinite_source(self.infinite) - .with_insert_mode(self.insert_mode.clone()) } async fn get_resolved_schema( @@ -555,7 +527,6 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> { .with_target_partitions(config.target_partitions()) .with_table_partition_cols(self.table_partition_cols.clone()) .with_file_sort_order(self.file_sort_order.clone()) - .with_insert_mode(self.insert_mode.clone()) } async fn get_resolved_schema( @@ -582,7 +553,6 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> { .with_table_partition_cols(self.table_partition_cols.clone()) .with_infinite_source(self.infinite) .with_file_sort_order(self.file_sort_order.clone()) - .with_insert_mode(self.insert_mode.clone()) } async fn get_resolved_schema( diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 2cba474e559e..c4d05adfc6bc 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -40,11 +40,12 @@ use crate::datasource::statistics::{create_max_min_accs, get_col_stats}; use arrow::datatypes::SchemaRef; use arrow::datatypes::{Fields, Schema}; use bytes::{BufMut, BytesMut}; -use datafusion_common::{exec_err, not_impl_err, plan_err, DataFusionError, FileType}; +use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType}; use datafusion_execution::TaskContext; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; use futures::{StreamExt, TryStreamExt}; use hashbrown::HashMap; +use object_store::path::Path; use object_store::{ObjectMeta, ObjectStore}; use parquet::arrow::{ arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter, @@ -55,7 +56,7 @@ use parquet::file::properties::WriterProperties; use parquet::file::statistics::Statistics as ParquetStatistics; use super::write::demux::start_demuxer_task; -use super::write::{create_writer, AbortableWrite, FileWriterMode}; +use super::write::{create_writer, AbortableWrite}; use super::{FileFormat, FileScanConfig}; use crate::arrow::array::{ BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, @@ -64,7 +65,7 @@ use crate::arrow::datatypes::DataType; use crate::config::ConfigOptions; use crate::datasource::physical_plan::{ - FileGroupDisplay, FileMeta, FileSinkConfig, ParquetExec, SchemaAdapter, + FileGroupDisplay, FileSinkConfig, ParquetExec, SchemaAdapter, }; use crate::error::Result; use crate::execution::context::SessionState; @@ -596,11 +597,7 @@ impl DisplayAs for ParquetSink { fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!( - f, - "ParquetSink(writer_mode={:?}, file_groups=", - self.config.writer_mode - )?; + write!(f, "ParquetSink(file_groups=",)?; FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?; write!(f, ")") } @@ -642,36 +639,23 @@ impl ParquetSink { /// AsyncArrowWriters are used when individual parquet file serialization is not parallelized async fn create_async_arrow_writer( &self, - file_meta: FileMeta, + location: &Path, object_store: Arc, parquet_props: WriterProperties, ) -> Result< AsyncArrowWriter>, > { - let object = &file_meta.object_meta; - match self.config.writer_mode { - FileWriterMode::Append => { - plan_err!( - "Appending to Parquet files is not supported by the file format!" - ) - } - FileWriterMode::Put => { - not_impl_err!("FileWriterMode::Put is not implemented for ParquetSink") - } - FileWriterMode::PutMultipart => { - let (_, multipart_writer) = object_store - .put_multipart(&object.location) - .await - .map_err(DataFusionError::ObjectStore)?; - let writer = AsyncArrowWriter::try_new( - multipart_writer, - self.get_writer_schema(), - 10485760, - Some(parquet_props), - )?; - Ok(writer) - } - } + let (_, multipart_writer) = object_store + .put_multipart(location) + .await + .map_err(DataFusionError::ObjectStore)?; + let writer = AsyncArrowWriter::try_new( + multipart_writer, + self.get_writer_schema(), + 10485760, + Some(parquet_props), + )?; + Ok(writer) } } @@ -730,13 +714,7 @@ impl DataSink for ParquetSink { if !allow_single_file_parallelism { let mut writer = self .create_async_arrow_writer( - ObjectMeta { - location: path, - last_modified: chrono::offset::Utc::now(), - size: 0, - e_tag: None, - } - .into(), + &path, object_store.clone(), parquet_props.clone(), ) @@ -752,17 +730,10 @@ impl DataSink for ParquetSink { }); } else { let writer = create_writer( - FileWriterMode::PutMultipart, // Parquet files as a whole are never compressed, since they // manage compressed blocks themselves. FileCompressionType::UNCOMPRESSED, - ObjectMeta { - location: path, - last_modified: chrono::offset::Utc::now(), - size: 0, - e_tag: None, - } - .into(), + &path, object_store.clone(), ) .await?; diff --git a/datafusion/core/src/datasource/file_format/write/mod.rs b/datafusion/core/src/datasource/file_format/write/mod.rs index 770c7a49c326..cfcdbd8c464e 100644 --- a/datafusion/core/src/datasource/file_format/write/mod.rs +++ b/datafusion/core/src/datasource/file_format/write/mod.rs @@ -19,128 +19,32 @@ //! write support for the various file formats use std::io::Error; -use std::mem; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use crate::datasource::file_format::file_compression_type::FileCompressionType; -use crate::datasource::physical_plan::FileMeta; use crate::error::Result; use arrow_array::RecordBatch; -use datafusion_common::{exec_err, DataFusionError}; +use datafusion_common::DataFusionError; use async_trait::async_trait; use bytes::Bytes; use futures::future::BoxFuture; -use futures::ready; -use futures::FutureExt; use object_store::path::Path; -use object_store::{MultipartId, ObjectMeta, ObjectStore}; +use object_store::{MultipartId, ObjectStore}; use tokio::io::AsyncWrite; pub(crate) mod demux; pub(crate) mod orchestration; -/// `AsyncPutWriter` is an object that facilitates asynchronous writing to object stores. -/// It is specifically designed for the `object_store` crate's `put` method and sends -/// whole bytes at once when the buffer is flushed. -pub struct AsyncPutWriter { - /// Object metadata - object_meta: ObjectMeta, - /// A shared reference to the object store - store: Arc, - /// A buffer that stores the bytes to be sent - current_buffer: Vec, - /// Used for async handling in flush method - inner_state: AsyncPutState, -} - -impl AsyncPutWriter { - /// Constructor for the `AsyncPutWriter` object - pub fn new(object_meta: ObjectMeta, store: Arc) -> Self { - Self { - object_meta, - store, - current_buffer: vec![], - // The writer starts out in buffering mode - inner_state: AsyncPutState::Buffer, - } - } - - /// Separate implementation function that unpins the [`AsyncPutWriter`] so - /// that partial borrows work correctly - fn poll_shutdown_inner( - &mut self, - cx: &mut Context<'_>, - ) -> Poll> { - loop { - match &mut self.inner_state { - AsyncPutState::Buffer => { - // Convert the current buffer to bytes and take ownership of it - let bytes = Bytes::from(mem::take(&mut self.current_buffer)); - // Set the inner state to Put variant with the bytes - self.inner_state = AsyncPutState::Put { bytes } - } - AsyncPutState::Put { bytes } => { - // Send the bytes to the object store's put method - return Poll::Ready( - ready!(self - .store - .put(&self.object_meta.location, bytes.clone()) - .poll_unpin(cx)) - .map_err(Error::from), - ); - } - } - } - } -} - -/// An enum that represents the inner state of AsyncPut -enum AsyncPutState { - /// Building Bytes struct in this state - Buffer, - /// Data in the buffer is being sent to the object store - Put { bytes: Bytes }, -} - -impl AsyncWrite for AsyncPutWriter { - // Define the implementation of the AsyncWrite trait for the `AsyncPutWriter` struct - fn poll_write( - mut self: Pin<&mut Self>, - _: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - // Extend the current buffer with the incoming buffer - self.current_buffer.extend_from_slice(buf); - // Return a ready poll with the length of the incoming buffer - Poll::Ready(Ok(buf.len())) - } - - fn poll_flush( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { - // Return a ready poll with an empty result - Poll::Ready(Ok(())) - } - - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - // Call the poll_shutdown_inner method to handle the actual sending of data to the object store - self.poll_shutdown_inner(cx) - } -} - /// Stores data needed during abortion of MultiPart writers +#[derive(Clone)] pub(crate) struct MultiPart { /// A shared reference to the object store store: Arc, @@ -163,45 +67,28 @@ impl MultiPart { } } -pub(crate) enum AbortMode { - Put, - Append, - MultiPart(MultiPart), -} - /// A wrapper struct with abort method and writer pub(crate) struct AbortableWrite { writer: W, - mode: AbortMode, + multipart: MultiPart, } impl AbortableWrite { /// Create a new `AbortableWrite` instance with the given writer, and write mode. - pub(crate) fn new(writer: W, mode: AbortMode) -> Self { - Self { writer, mode } + pub(crate) fn new(writer: W, multipart: MultiPart) -> Self { + Self { writer, multipart } } /// handling of abort for different write modes pub(crate) fn abort_writer(&self) -> Result>> { - match &self.mode { - AbortMode::Put => Ok(async { Ok(()) }.boxed()), - AbortMode::Append => exec_err!("Cannot abort in append mode"), - AbortMode::MultiPart(MultiPart { - store, - multipart_id, - location, - }) => { - let location = location.clone(); - let multipart_id = multipart_id.clone(); - let store = store.clone(); - Ok(Box::pin(async move { - store - .abort_multipart(&location, &multipart_id) - .await - .map_err(DataFusionError::ObjectStore) - })) - } - } + let multi = self.multipart.clone(); + Ok(Box::pin(async move { + multi + .store + .abort_multipart(&multi.location, &multi.multipart_id) + .await + .map_err(DataFusionError::ObjectStore) + })) } } @@ -229,16 +116,6 @@ impl AsyncWrite for AbortableWrite { } } -/// An enum that defines different file writer modes. -#[derive(Debug, Clone, Copy)] -pub enum FileWriterMode { - /// Data is appended to an existing file. - Append, - /// Data is written to a new file. - Put, - /// Data is written to a new file in multiple parts. - PutMultipart, -} /// A trait that defines the methods required for a RecordBatch serializer. #[async_trait] pub trait BatchSerializer: Unpin + Send { @@ -255,51 +132,16 @@ pub trait BatchSerializer: Unpin + Send { /// Returns an [`AbortableWrite`] which writes to the given object store location /// with the specified compression pub(crate) async fn create_writer( - writer_mode: FileWriterMode, file_compression_type: FileCompressionType, - file_meta: FileMeta, + location: &Path, object_store: Arc, ) -> Result>> { - let object = &file_meta.object_meta; - match writer_mode { - // If the mode is append, call the store's append method and return wrapped in - // a boxed trait object. - FileWriterMode::Append => { - let writer = object_store - .append(&object.location) - .await - .map_err(DataFusionError::ObjectStore)?; - let writer = AbortableWrite::new( - file_compression_type.convert_async_writer(writer)?, - AbortMode::Append, - ); - Ok(writer) - } - // If the mode is put, create a new AsyncPut writer and return it wrapped in - // a boxed trait object - FileWriterMode::Put => { - let writer = Box::new(AsyncPutWriter::new(object.clone(), object_store)); - let writer = AbortableWrite::new( - file_compression_type.convert_async_writer(writer)?, - AbortMode::Put, - ); - Ok(writer) - } - // If the mode is put multipart, call the store's put_multipart method and - // return the writer wrapped in a boxed trait object. - FileWriterMode::PutMultipart => { - let (multipart_id, writer) = object_store - .put_multipart(&object.location) - .await - .map_err(DataFusionError::ObjectStore)?; - Ok(AbortableWrite::new( - file_compression_type.convert_async_writer(writer)?, - AbortMode::MultiPart(MultiPart::new( - object_store, - multipart_id, - object.location.clone(), - )), - )) - } - } + let (multipart_id, writer) = object_store + .put_multipart(location) + .await + .map_err(DataFusionError::ObjectStore)?; + Ok(AbortableWrite::new( + file_compression_type.convert_async_writer(writer)?, + MultiPart::new(object_store, multipart_id, location.clone()), + )) } diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs b/datafusion/core/src/datasource/file_format/write/orchestration.rs index f84baa9ac225..2ae6b70ed1c5 100644 --- a/datafusion/core/src/datasource/file_format/write/orchestration.rs +++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs @@ -22,7 +22,6 @@ use std::sync::Arc; use crate::datasource::file_format::file_compression_type::FileCompressionType; -use crate::datasource::listing::PartitionedFile; use crate::datasource::physical_plan::FileSinkConfig; use crate::error::Result; use crate::physical_plan::SendableRecordBatchStream; @@ -34,17 +33,13 @@ use datafusion_common::DataFusionError; use bytes::Bytes; use datafusion_execution::TaskContext; -use futures::StreamExt; - -use object_store::{ObjectMeta, ObjectStore}; - use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::sync::mpsc::{self, Receiver}; use tokio::task::{JoinHandle, JoinSet}; use tokio::try_join; use super::demux::start_demuxer_task; -use super::{create_writer, AbortableWrite, BatchSerializer, FileWriterMode}; +use super::{create_writer, AbortableWrite, BatchSerializer}; type WriterType = AbortableWrite>; type SerializerType = Box; @@ -274,21 +269,9 @@ pub(crate) async fn stateless_multipart_put( stateless_serialize_and_write_files(rx_file_bundle, tx_row_cnt, unbounded_input) .await }); - while let Some((output_location, rb_stream)) = file_stream_rx.recv().await { + while let Some((location, rb_stream)) = file_stream_rx.recv().await { let serializer = get_serializer(); - let object_meta = ObjectMeta { - location: output_location, - last_modified: chrono::offset::Utc::now(), - size: 0, - e_tag: None, - }; - let writer = create_writer( - FileWriterMode::PutMultipart, - compression, - object_meta.into(), - object_store.clone(), - ) - .await?; + let writer = create_writer(compression, &location, object_store.clone()).await?; tx_file_bundle .send((rb_stream, serializer, writer)) @@ -325,91 +308,3 @@ pub(crate) async fn stateless_multipart_put( Ok(total_count) } - -/// Orchestrates append_all for any statelessly serialized file type. Appends to all files provided -/// in a round robin fashion. -pub(crate) async fn stateless_append_all( - mut data: SendableRecordBatchStream, - context: &Arc, - object_store: Arc, - file_groups: &Vec, - unbounded_input: bool, - compression: FileCompressionType, - get_serializer: Box Box + Send>, -) -> Result { - let rb_buffer_size = &context - .session_config() - .options() - .execution - .max_buffered_batches_per_output_file; - - let (tx_file_bundle, rx_file_bundle) = tokio::sync::mpsc::channel(file_groups.len()); - let mut send_channels = vec![]; - for file_group in file_groups { - let serializer = get_serializer(file_group.object_meta.size); - - let file = file_group.clone(); - let writer = create_writer( - FileWriterMode::Append, - compression, - file.object_meta.clone().into(), - object_store.clone(), - ) - .await?; - - let (tx, rx) = tokio::sync::mpsc::channel(rb_buffer_size / 2); - send_channels.push(tx); - tx_file_bundle - .send((rx, serializer, writer)) - .await - .map_err(|_| { - DataFusionError::Internal( - "Writer receive file bundle channel closed unexpectedly!".into(), - ) - })?; - } - - let (tx_row_cnt, rx_row_cnt) = tokio::sync::oneshot::channel(); - let write_coordinater_task = tokio::spawn(async move { - stateless_serialize_and_write_files(rx_file_bundle, tx_row_cnt, unbounded_input) - .await - }); - - // Append to file groups in round robin - let mut next_file_idx = 0; - while let Some(rb) = data.next().await.transpose()? { - send_channels[next_file_idx].send(rb).await.map_err(|_| { - DataFusionError::Internal( - "Recordbatch file append stream closed unexpectedly!".into(), - ) - })?; - next_file_idx = (next_file_idx + 1) % send_channels.len(); - if unbounded_input { - tokio::task::yield_now().await; - } - } - // Signal to the write coordinater that no more files are coming - drop(tx_file_bundle); - drop(send_channels); - - let total_count = rx_row_cnt.await.map_err(|_| { - DataFusionError::Internal( - "Did not receieve row count from write coordinater".into(), - ) - })?; - - match try_join!(write_coordinater_task) { - Ok(r1) => { - r1.0?; - } - Err(e) => { - if e.is_panic() { - std::panic::resume_unwind(e.into_panic()); - } else { - unreachable!(); - } - } - } - - Ok(total_count) -} diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs index 8b0f021f0277..aa2e20164b5e 100644 --- a/datafusion/core/src/datasource/listing/mod.rs +++ b/datafusion/core/src/datasource/listing/mod.rs @@ -31,9 +31,7 @@ use std::pin::Pin; use std::sync::Arc; pub use self::url::ListingTableUrl; -pub use table::{ - ListingOptions, ListingTable, ListingTableConfig, ListingTableInsertMode, -}; +pub use table::{ListingOptions, ListingTable, ListingTableConfig}; /// Stream of files get listed from object store pub type PartitionedFileStream = diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index c22eb58e88fa..84c69c833d77 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -214,33 +214,6 @@ impl ListingTableConfig { } } -#[derive(Debug, Clone)] -///controls how new data should be inserted to a ListingTable -pub enum ListingTableInsertMode { - ///Data should be appended to an existing file - AppendToFile, - ///Data is appended as new files in existing TablePaths - AppendNewFiles, - ///Throw an error if insert into is attempted on this table - Error, -} - -impl FromStr for ListingTableInsertMode { - type Err = DataFusionError; - fn from_str(s: &str) -> Result { - let s_lower = s.to_lowercase(); - match s_lower.as_str() { - "append_to_file" => Ok(ListingTableInsertMode::AppendToFile), - "append_new_files" => Ok(ListingTableInsertMode::AppendNewFiles), - "error" => Ok(ListingTableInsertMode::Error), - _ => plan_err!( - "Unknown or unsupported insert mode {s}. Supported options are \ - append_to_file, append_new_files, and error." - ), - } - } -} - /// Options for creating a [`ListingTable`] #[derive(Clone, Debug)] pub struct ListingOptions { @@ -279,8 +252,6 @@ pub struct ListingOptions { /// In order to support infinite inputs, DataFusion may adjust query /// plans (e.g. joins) to run the given query in full pipelining mode. pub infinite_source: bool, - /// This setting controls how inserts to this table should be handled - pub insert_mode: ListingTableInsertMode, /// This setting when true indicates that the table is backed by a single file. /// Any inserts to the table may only append to this existing file. pub single_file: bool, @@ -305,7 +276,6 @@ impl ListingOptions { target_partitions: 1, file_sort_order: vec![], infinite_source: false, - insert_mode: ListingTableInsertMode::AppendToFile, single_file: false, file_type_write_options: None, } @@ -476,12 +446,6 @@ impl ListingOptions { self } - /// Configure how insertions to this table should be handled. - pub fn with_insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self { - self.insert_mode = insert_mode; - self - } - /// Configure if this table is backed by a sigle file pub fn with_single_file(mut self, single_file: bool) -> Self { self.single_file = single_file; @@ -820,31 +784,6 @@ impl TableProvider for ListingTable { .await?; let file_groups = file_list_stream.try_collect::>().await?; - //if we are writing a single output_partition to a table backed by a single file - //we can append to that file. Otherwise, we can write new files into the directory - //adding new files to the listing table in order to insert to the table. - let input_partitions = input.output_partitioning().partition_count(); - let writer_mode = match self.options.insert_mode { - ListingTableInsertMode::AppendToFile => { - if input_partitions > file_groups.len() { - return plan_err!( - "Cannot append {input_partitions} partitions to {} files!", - file_groups.len() - ); - } - - crate::datasource::file_format::write::FileWriterMode::Append - } - ListingTableInsertMode::AppendNewFiles => { - crate::datasource::file_format::write::FileWriterMode::PutMultipart - } - ListingTableInsertMode::Error => { - return plan_err!( - "Invalid plan attempting write to table with TableWriteMode::Error!" - ); - } - }; - let file_format = self.options().format.as_ref(); let file_type_writer_options = match &self.options().file_type_write_options { @@ -862,7 +801,6 @@ impl TableProvider for ListingTable { file_groups, output_schema: self.schema(), table_partition_cols: self.options.table_partition_cols.clone(), - writer_mode, // A plan can produce finite number of rows even if it has unbounded sources, like LIMIT // queries. Thus, we can check if the plan is streaming to ensure file sink input is // unbounded. When `unbounded_input` flag is `true` for sink, we occasionally call `yield_now` @@ -877,14 +815,6 @@ impl TableProvider for ListingTable { let unsorted: Vec> = vec![]; let order_requirements = if self.options().file_sort_order != unsorted { - if matches!( - self.options().insert_mode, - ListingTableInsertMode::AppendToFile - ) { - return plan_err!( - "Cannot insert into a sorted ListingTable with mode append!" - ); - } // Multiple sort orders in outer vec are equivalent, so we pass only the first one let ordering = self .try_create_output_ordering()? @@ -1003,7 +933,7 @@ mod tests { use crate::prelude::*; use crate::{ assert_batches_eq, - datasource::file_format::{avro::AvroFormat, file_compression_type::FileTypeExt}, + datasource::file_format::avro::AvroFormat, execution::options::ReadOptions, logical_expr::{col, lit}, test::{columns, object_store::register_test_store}, @@ -1567,17 +1497,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_insert_into_append_to_json_file() -> Result<()> { - helper_test_insert_into_append_to_existing_files( - FileType::JSON, - FileCompressionType::UNCOMPRESSED, - None, - ) - .await?; - Ok(()) - } - #[tokio::test] async fn test_insert_into_append_new_json_files() -> Result<()> { let mut config_map: HashMap = HashMap::new(); @@ -1596,17 +1515,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_insert_into_append_to_csv_file() -> Result<()> { - helper_test_insert_into_append_to_existing_files( - FileType::CSV, - FileCompressionType::UNCOMPRESSED, - None, - ) - .await?; - Ok(()) - } - #[tokio::test] async fn test_insert_into_append_new_csv_files() -> Result<()> { let mut config_map: HashMap = HashMap::new(); @@ -1663,13 +1571,8 @@ mod tests { #[tokio::test] async fn test_insert_into_sql_csv_defaults() -> Result<()> { - helper_test_insert_into_sql( - "csv", - FileCompressionType::UNCOMPRESSED, - "OPTIONS (insert_mode 'append_new_files')", - None, - ) - .await?; + helper_test_insert_into_sql("csv", FileCompressionType::UNCOMPRESSED, "", None) + .await?; Ok(()) } @@ -1678,8 +1581,7 @@ mod tests { helper_test_insert_into_sql( "csv", FileCompressionType::UNCOMPRESSED, - "WITH HEADER ROW \ - OPTIONS (insert_mode 'append_new_files')", + "WITH HEADER ROW", None, ) .await?; @@ -1688,13 +1590,8 @@ mod tests { #[tokio::test] async fn test_insert_into_sql_json_defaults() -> Result<()> { - helper_test_insert_into_sql( - "json", - FileCompressionType::UNCOMPRESSED, - "OPTIONS (insert_mode 'append_new_files')", - None, - ) - .await?; + helper_test_insert_into_sql("json", FileCompressionType::UNCOMPRESSED, "", None) + .await?; Ok(()) } @@ -1879,211 +1776,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_insert_into_append_to_parquet_file_fails() -> Result<()> { - let maybe_err = helper_test_insert_into_append_to_existing_files( - FileType::PARQUET, - FileCompressionType::UNCOMPRESSED, - None, - ) - .await; - let _err = - maybe_err.expect_err("Appending to existing parquet file did not fail!"); - Ok(()) - } - - fn load_empty_schema_table( - schema: SchemaRef, - temp_path: &str, - insert_mode: ListingTableInsertMode, - file_format: Arc, - ) -> Result> { - File::create(temp_path)?; - let table_path = ListingTableUrl::parse(temp_path).unwrap(); - - let listing_options = - ListingOptions::new(file_format.clone()).with_insert_mode(insert_mode); - - let config = ListingTableConfig::new(table_path) - .with_listing_options(listing_options) - .with_schema(schema); - - let table = ListingTable::try_new(config)?; - Ok(Arc::new(table)) - } - - /// Logic of testing inserting into listing table by Appending to existing files - /// is the same for all formats/options which support this. This helper allows - /// passing different options to execute the same test with different settings. - async fn helper_test_insert_into_append_to_existing_files( - file_type: FileType, - file_compression_type: FileCompressionType, - session_config_map: Option>, - ) -> Result<()> { - // Create the initial context, schema, and batch. - let session_ctx = match session_config_map { - Some(cfg) => { - let config = SessionConfig::from_string_hash_map(cfg)?; - SessionContext::new_with_config(config) - } - None => SessionContext::new(), - }; - // Create a new schema with one field called "a" of type Int32 - let schema = Arc::new(Schema::new(vec![Field::new( - "column1", - DataType::Int32, - false, - )])); - - // Create a new batch of data to insert into the table - let batch = RecordBatch::try_new( - schema.clone(), - vec![Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3]))], - )?; - - // Filename with extension - let filename = format!( - "path{}", - file_type - .to_owned() - .get_ext_with_compression(file_compression_type) - .unwrap() - ); - - // Create a temporary directory and a CSV file within it. - let tmp_dir = TempDir::new()?; - let path = tmp_dir.path().join(filename); - - let file_format: Arc = match file_type { - FileType::CSV => Arc::new( - CsvFormat::default().with_file_compression_type(file_compression_type), - ), - FileType::JSON => Arc::new( - JsonFormat::default().with_file_compression_type(file_compression_type), - ), - FileType::PARQUET => Arc::new(ParquetFormat::default()), - FileType::AVRO => Arc::new(AvroFormat {}), - FileType::ARROW => Arc::new(ArrowFormat {}), - }; - - let initial_table = load_empty_schema_table( - schema.clone(), - path.to_str().unwrap(), - ListingTableInsertMode::AppendToFile, - file_format, - )?; - session_ctx.register_table("t", initial_table)?; - // Create and register the source table with the provided schema and inserted data - let source_table = Arc::new(MemTable::try_new( - schema.clone(), - vec![vec![batch.clone(), batch.clone()]], - )?); - session_ctx.register_table("source", source_table.clone())?; - // Convert the source table into a provider so that it can be used in a query - let source = provider_as_source(source_table); - // Create a table scan logical plan to read from the source table - let scan_plan = LogicalPlanBuilder::scan("source", source, None)?.build()?; - // Create an insert plan to insert the source data into the initial table - let insert_into_table = - LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, false)?.build()?; - // Create a physical plan from the insert plan - let plan = session_ctx - .state() - .create_physical_plan(&insert_into_table) - .await?; - - // Execute the physical plan and collect the results - let res = collect(plan, session_ctx.task_ctx()).await?; - // Insert returns the number of rows written, in our case this would be 6. - let expected = [ - "+-------+", - "| count |", - "+-------+", - "| 6 |", - "+-------+", - ]; - - // Assert that the batches read from the file match the expected result. - assert_batches_eq!(expected, &res); - - // Read the records in the table - let batches = session_ctx.sql("select * from t").await?.collect().await?; - - // Define the expected result as a vector of strings. - let expected = [ - "+---------+", - "| column1 |", - "+---------+", - "| 1 |", - "| 2 |", - "| 3 |", - "| 1 |", - "| 2 |", - "| 3 |", - "+---------+", - ]; - - // Assert that the batches read from the file match the expected result. - assert_batches_eq!(expected, &batches); - - // Assert that only 1 file was added to the table - let num_files = tmp_dir.path().read_dir()?.count(); - assert_eq!(num_files, 1); - - // Create a physical plan from the insert plan - let plan = session_ctx - .state() - .create_physical_plan(&insert_into_table) - .await?; - - // Again, execute the physical plan and collect the results - let res = collect(plan, session_ctx.task_ctx()).await?; - // Insert returns the number of rows written, in our case this would be 6. - let expected = [ - "+-------+", - "| count |", - "+-------+", - "| 6 |", - "+-------+", - ]; - - // Assert that the batches read from the file match the expected result. - assert_batches_eq!(expected, &res); - - // Open the CSV file, read its contents as a record batch, and collect the batches into a vector. - let batches = session_ctx.sql("select * from t").await?.collect().await?; - - // Define the expected result after the second append. - let expected = vec![ - "+---------+", - "| column1 |", - "+---------+", - "| 1 |", - "| 2 |", - "| 3 |", - "| 1 |", - "| 2 |", - "| 3 |", - "| 1 |", - "| 2 |", - "| 3 |", - "| 1 |", - "| 2 |", - "| 3 |", - "+---------+", - ]; - - // Assert that the batches read from the file after the second append match the expected result. - assert_batches_eq!(expected, &batches); - - // Assert that no additional files were added to the table - let num_files = tmp_dir.path().read_dir()?.count(); - assert_eq!(num_files, 1); - - // Return Ok if the function - Ok(()) - } - async fn helper_test_append_new_files_to_table( file_type: FileType, file_compression_type: FileCompressionType, @@ -2129,7 +1821,6 @@ mod tests { "t", tmp_dir.path().to_str().unwrap(), CsvReadOptions::new() - .insert_mode(ListingTableInsertMode::AppendNewFiles) .schema(schema.as_ref()) .file_compression_type(file_compression_type), ) @@ -2141,7 +1832,6 @@ mod tests { "t", tmp_dir.path().to_str().unwrap(), NdJsonReadOptions::default() - .insert_mode(ListingTableInsertMode::AppendNewFiles) .schema(schema.as_ref()) .file_compression_type(file_compression_type), ) @@ -2152,9 +1842,7 @@ mod tests { .register_parquet( "t", tmp_dir.path().to_str().unwrap(), - ParquetReadOptions::default() - .insert_mode(ListingTableInsertMode::AppendNewFiles) - .schema(schema.as_ref()), + ParquetReadOptions::default().schema(schema.as_ref()), ) .await?; } @@ -2163,10 +1851,7 @@ mod tests { .register_avro( "t", tmp_dir.path().to_str().unwrap(), - AvroReadOptions::default() - // TODO implement insert_mode for avro - //.insert_mode(ListingTableInsertMode::AppendNewFiles) - .schema(schema.as_ref()), + AvroReadOptions::default().schema(schema.as_ref()), ) .await?; } @@ -2175,10 +1860,7 @@ mod tests { .register_arrow( "t", tmp_dir.path().to_str().unwrap(), - ArrowReadOptions::default() - // TODO implement insert_mode for arrow - //.insert_mode(ListingTableInsertMode::AppendNewFiles) - .schema(schema.as_ref()), + ArrowReadOptions::default().schema(schema.as_ref()), ) .await?; } diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index f9a7ab04ce68..543a3a83f7c5 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -21,8 +21,6 @@ use std::path::Path; use std::str::FromStr; use std::sync::Arc; -use super::listing::ListingTableInsertMode; - #[cfg(feature = "parquet")] use crate::datasource::file_format::parquet::ParquetFormat; use crate::datasource::file_format::{ @@ -38,7 +36,7 @@ use crate::execution::context::SessionState; use arrow::datatypes::{DataType, SchemaRef}; use datafusion_common::file_options::{FileTypeWriterOptions, StatementOptions}; -use datafusion_common::{DataFusionError, FileType}; +use datafusion_common::{plan_err, DataFusionError, FileType}; use datafusion_expr::CreateExternalTable; use async_trait::async_trait; @@ -149,19 +147,12 @@ impl TableProviderFactory for ListingTableFactory { .take_bool_option("single_file")? .unwrap_or(false); - let explicit_insert_mode = statement_options.take_str_option("insert_mode"); - let insert_mode = match explicit_insert_mode { - Some(mode) => ListingTableInsertMode::from_str(mode.as_str()), - None => match file_type { - FileType::CSV => Ok(ListingTableInsertMode::AppendToFile), - #[cfg(feature = "parquet")] - FileType::PARQUET => Ok(ListingTableInsertMode::AppendNewFiles), - FileType::AVRO => Ok(ListingTableInsertMode::AppendNewFiles), - FileType::JSON => Ok(ListingTableInsertMode::AppendToFile), - FileType::ARROW => Ok(ListingTableInsertMode::AppendNewFiles), - }, - }?; - + // Backwards compatibility + if let Some(s) = statement_options.take_str_option("insert_mode") { + if !s.eq_ignore_ascii_case("append_new_files") { + return plan_err!("Unknown or unsupported insert mode {s}. Only append_to_file supported"); + } + } let file_type = file_format.file_type(); // Use remaining options and session state to build FileTypeWriterOptions @@ -214,7 +205,6 @@ impl TableProviderFactory for ListingTableFactory { .with_target_partitions(state.config().target_partitions()) .with_table_partition_cols(table_partition_cols) .with_file_sort_order(cmd.order_exprs.clone()) - .with_insert_mode(insert_mode) .with_single_file(single_file) .with_write_options(file_type_writer_options) .with_infinite_source(unbounded); diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index ea0a9698ff5c..738e70966bce 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -49,10 +49,7 @@ use std::{ use super::listing::ListingTableUrl; use crate::error::{DataFusionError, Result}; -use crate::{ - datasource::file_format::write::FileWriterMode, - physical_plan::{DisplayAs, DisplayFormatType}, -}; +use crate::physical_plan::{DisplayAs, DisplayFormatType}; use crate::{ datasource::{ listing::{FileRange, PartitionedFile}, @@ -90,8 +87,6 @@ pub struct FileSinkConfig { /// A vector of column names and their corresponding data types, /// representing the partitioning columns for the file pub table_partition_cols: Vec<(String, DataType)>, - /// A writer mode that determines how data is written to the file - pub writer_mode: FileWriterMode, /// If true, it is assumed there is a single table_path which is a file to which all data should be written /// regardless of input partitioning. Otherwise, each table path is assumed to be a directory /// to which each output partition is written to its own output file. diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index fffc51abeb67..4f146cb1be54 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -27,7 +27,6 @@ use crate::datasource::file_format::csv::CsvFormat; use crate::datasource::file_format::json::JsonFormat; #[cfg(feature = "parquet")] use crate::datasource::file_format::parquet::ParquetFormat; -use crate::datasource::file_format::write::FileWriterMode; use crate::datasource::file_format::FileFormat; use crate::datasource::listing::ListingTableUrl; use crate::datasource::physical_plan::FileSinkConfig; @@ -591,7 +590,6 @@ impl DefaultPhysicalPlanner { output_schema: Arc::new(schema), table_partition_cols: vec![], unbounded_input: false, - writer_mode: FileWriterMode::PutMultipart, single_file_output: *single_file_output, overwrite: false, file_type_writer_options diff --git a/datafusion/core/tests/fifo.rs b/datafusion/core/tests/fifo.rs index 93c7f7368065..67841ffec07e 100644 --- a/datafusion/core/tests/fifo.rs +++ b/datafusion/core/tests/fifo.rs @@ -323,6 +323,7 @@ mod unix_test { /// It tests the INSERT INTO functionality. #[tokio::test] + #[ignore] async fn test_sql_insert_into_fifo() -> Result<()> { // To make unbounded deterministic let waiting = Arc::new(AtomicBool::new(true)); diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index fa080518d50c..7bd9b7112d16 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1153,12 +1153,6 @@ message PhysicalPlanNode { } } -enum FileWriterMode { - APPEND = 0; - PUT = 1; - PUT_MULTIPART = 2; -} - enum CompressionTypeVariant { GZIP = 0; BZIP2 = 1; @@ -1183,12 +1177,13 @@ message JsonWriterOptions { } message FileSinkConfig { + reserved 6; // writer_mode + string object_store_url = 1; repeated PartitionedFile file_groups = 2; repeated string table_paths = 3; Schema output_schema = 4; repeated PartitionColumn table_partition_cols = 5; - FileWriterMode writer_mode = 6; bool single_file_output = 7; bool unbounded_input = 8; bool overwrite = 9; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 08e7413102e8..ed670cc36680 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -7471,9 +7471,6 @@ impl serde::Serialize for FileSinkConfig { if !self.table_partition_cols.is_empty() { len += 1; } - if self.writer_mode != 0 { - len += 1; - } if self.single_file_output { len += 1; } @@ -7502,11 +7499,6 @@ impl serde::Serialize for FileSinkConfig { if !self.table_partition_cols.is_empty() { struct_ser.serialize_field("tablePartitionCols", &self.table_partition_cols)?; } - if self.writer_mode != 0 { - let v = FileWriterMode::try_from(self.writer_mode) - .map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.writer_mode)))?; - struct_ser.serialize_field("writerMode", &v)?; - } if self.single_file_output { struct_ser.serialize_field("singleFileOutput", &self.single_file_output)?; } @@ -7539,8 +7531,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig { "outputSchema", "table_partition_cols", "tablePartitionCols", - "writer_mode", - "writerMode", "single_file_output", "singleFileOutput", "unbounded_input", @@ -7557,7 +7547,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig { TablePaths, OutputSchema, TablePartitionCols, - WriterMode, SingleFileOutput, UnboundedInput, Overwrite, @@ -7588,7 +7577,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig { "tablePaths" | "table_paths" => Ok(GeneratedField::TablePaths), "outputSchema" | "output_schema" => Ok(GeneratedField::OutputSchema), "tablePartitionCols" | "table_partition_cols" => Ok(GeneratedField::TablePartitionCols), - "writerMode" | "writer_mode" => Ok(GeneratedField::WriterMode), "singleFileOutput" | "single_file_output" => Ok(GeneratedField::SingleFileOutput), "unboundedInput" | "unbounded_input" => Ok(GeneratedField::UnboundedInput), "overwrite" => Ok(GeneratedField::Overwrite), @@ -7617,7 +7605,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig { let mut table_paths__ = None; let mut output_schema__ = None; let mut table_partition_cols__ = None; - let mut writer_mode__ = None; let mut single_file_output__ = None; let mut unbounded_input__ = None; let mut overwrite__ = None; @@ -7654,12 +7641,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig { } table_partition_cols__ = Some(map_.next_value()?); } - GeneratedField::WriterMode => { - if writer_mode__.is_some() { - return Err(serde::de::Error::duplicate_field("writerMode")); - } - writer_mode__ = Some(map_.next_value::()? as i32); - } GeneratedField::SingleFileOutput => { if single_file_output__.is_some() { return Err(serde::de::Error::duplicate_field("singleFileOutput")); @@ -7692,7 +7673,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig { table_paths: table_paths__.unwrap_or_default(), output_schema: output_schema__, table_partition_cols: table_partition_cols__.unwrap_or_default(), - writer_mode: writer_mode__.unwrap_or_default(), single_file_output: single_file_output__.unwrap_or_default(), unbounded_input: unbounded_input__.unwrap_or_default(), overwrite: overwrite__.unwrap_or_default(), @@ -7800,80 +7780,6 @@ impl<'de> serde::Deserialize<'de> for FileTypeWriterOptions { deserializer.deserialize_struct("datafusion.FileTypeWriterOptions", FIELDS, GeneratedVisitor) } } -impl serde::Serialize for FileWriterMode { - #[allow(deprecated)] - fn serialize(&self, serializer: S) -> std::result::Result - where - S: serde::Serializer, - { - let variant = match self { - Self::Append => "APPEND", - Self::Put => "PUT", - Self::PutMultipart => "PUT_MULTIPART", - }; - serializer.serialize_str(variant) - } -} -impl<'de> serde::Deserialize<'de> for FileWriterMode { - #[allow(deprecated)] - fn deserialize(deserializer: D) -> std::result::Result - where - D: serde::Deserializer<'de>, - { - const FIELDS: &[&str] = &[ - "APPEND", - "PUT", - "PUT_MULTIPART", - ]; - - struct GeneratedVisitor; - - impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = FileWriterMode; - - fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(formatter, "expected one of: {:?}", &FIELDS) - } - - fn visit_i64(self, v: i64) -> std::result::Result - where - E: serde::de::Error, - { - i32::try_from(v) - .ok() - .and_then(|x| x.try_into().ok()) - .ok_or_else(|| { - serde::de::Error::invalid_value(serde::de::Unexpected::Signed(v), &self) - }) - } - - fn visit_u64(self, v: u64) -> std::result::Result - where - E: serde::de::Error, - { - i32::try_from(v) - .ok() - .and_then(|x| x.try_into().ok()) - .ok_or_else(|| { - serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(v), &self) - }) - } - - fn visit_str(self, value: &str) -> std::result::Result - where - E: serde::de::Error, - { - match value { - "APPEND" => Ok(FileWriterMode::Append), - "PUT" => Ok(FileWriterMode::Put), - "PUT_MULTIPART" => Ok(FileWriterMode::PutMultipart), - _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), - } - } - } - deserializer.deserialize_any(GeneratedVisitor) - } -} impl serde::Serialize for FilterExecNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 15606488b33a..79b0658f24c0 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1613,8 +1613,6 @@ pub struct FileSinkConfig { pub output_schema: ::core::option::Option, #[prost(message, repeated, tag = "5")] pub table_partition_cols: ::prost::alloc::vec::Vec, - #[prost(enumeration = "FileWriterMode", tag = "6")] - pub writer_mode: i32, #[prost(bool, tag = "7")] pub single_file_output: bool, #[prost(bool, tag = "8")] @@ -3189,35 +3187,6 @@ impl UnionMode { } #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] -pub enum FileWriterMode { - Append = 0, - Put = 1, - PutMultipart = 2, -} -impl FileWriterMode { - /// String value of the enum field names used in the ProtoBuf definition. - /// - /// The values are not transformed in any way and thus are considered stable - /// (if the ProtoBuf definition does not change) and safe for programmatic use. - pub fn as_str_name(&self) -> &'static str { - match self { - FileWriterMode::Append => "APPEND", - FileWriterMode::Put => "PUT", - FileWriterMode::PutMultipart => "PUT_MULTIPART", - } - } - /// Creates an enum from field names used in the ProtoBuf definition. - pub fn from_str_name(value: &str) -> ::core::option::Option { - match value { - "APPEND" => Some(Self::Append), - "PUT" => Some(Self::Put), - "PUT_MULTIPART" => Some(Self::PutMultipart), - _ => None, - } - } -} -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] -#[repr(i32)] pub enum CompressionTypeVariant { Gzip = 0, Bzip2 = 1, diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 22b74db9afd2..f5771ddb155b 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -23,7 +23,6 @@ use std::sync::Arc; use arrow::compute::SortOptions; use datafusion::arrow::datatypes::Schema; use datafusion::datasource::file_format::json::JsonSink; -use datafusion::datasource::file_format::write::FileWriterMode; use datafusion::datasource::listing::{FileRange, ListingTableUrl, PartitionedFile}; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::{FileScanConfig, FileSinkConfig}; @@ -739,7 +738,6 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig { table_paths, output_schema: Arc::new(convert_required!(conf.output_schema)?), table_partition_cols, - writer_mode: conf.writer_mode().into(), single_file_output: conf.single_file_output, unbounded_input: conf.unbounded_input, overwrite: conf.overwrite, @@ -748,16 +746,6 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig { } } -impl From for FileWriterMode { - fn from(value: protobuf::FileWriterMode) -> Self { - match value { - protobuf::FileWriterMode::Append => Self::Append, - protobuf::FileWriterMode::Put => Self::Put, - protobuf::FileWriterMode::PutMultipart => Self::PutMultipart, - } - } -} - impl From for CompressionTypeVariant { fn from(value: protobuf::CompressionTypeVariant) -> Self { match value { diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index b8a590b0dc1a..44864be947d5 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -31,7 +31,6 @@ use datafusion::datasource::{ file_format::json::JsonSink, physical_plan::FileScanConfig, }; use datafusion::datasource::{ - file_format::write::FileWriterMode, listing::{FileRange, PartitionedFile}, physical_plan::FileSinkConfig, }; @@ -819,7 +818,6 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig { type Error = DataFusionError; fn try_from(conf: &FileSinkConfig) -> Result { - let writer_mode: protobuf::FileWriterMode = conf.writer_mode.into(); let file_groups = conf .file_groups .iter() @@ -847,7 +845,6 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig { table_paths, output_schema: Some(conf.output_schema.as_ref().try_into()?), table_partition_cols, - writer_mode: writer_mode.into(), single_file_output: conf.single_file_output, unbounded_input: conf.unbounded_input, overwrite: conf.overwrite, @@ -856,16 +853,6 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig { } } -impl From for protobuf::FileWriterMode { - fn from(value: FileWriterMode) -> Self { - match value { - FileWriterMode::Append => Self::Append, - FileWriterMode::Put => Self::Put, - FileWriterMode::PutMultipart => Self::PutMultipart, - } - } -} - impl From<&CompressionTypeVariant> for protobuf::CompressionTypeVariant { fn from(value: &CompressionTypeVariant) -> Self { match value { diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 076ca415810a..23b0ea43c73a 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -22,7 +22,6 @@ use datafusion::arrow::array::ArrayRef; use datafusion::arrow::compute::kernels::sort::SortOptions; use datafusion::arrow::datatypes::{DataType, Field, Fields, IntervalUnit, Schema}; use datafusion::datasource::file_format::json::JsonSink; -use datafusion::datasource::file_format::write::FileWriterMode; use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile}; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::{ @@ -732,7 +731,6 @@ fn roundtrip_json_sink() -> Result<()> { table_paths: vec![ListingTableUrl::parse("file:///")?], output_schema: schema.clone(), table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)], - writer_mode: FileWriterMode::Put, single_file_output: true, unbounded_input: false, overwrite: true, diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index 92013f37d36c..67cabb0988fd 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -702,7 +702,7 @@ select array_element(make_array(1, 2, 3, 4, 5), 0), array_element(make_array('h' NULL NULL # array_element scalar function #4 (with NULL) -query error +query error select array_element(make_array(1, 2, 3, 4, 5), NULL), array_element(make_array('h', 'e', 'l', 'l', 'o'), NULL); # array_element scalar function #5 (with negative index) @@ -871,11 +871,11 @@ select array_slice(make_array(1, 2, 3, 4, 5), 0, 4), array_slice(make_array('h', [1, 2, 3, 4] [h, e, l] # array_slice scalar function #8 (with NULL and positive number) -query error +query error select array_slice(make_array(1, 2, 3, 4, 5), NULL, 4), array_slice(make_array('h', 'e', 'l', 'l', 'o'), NULL, 3); # array_slice scalar function #9 (with positive number and NULL) -query error +query error select array_slice(make_array(1, 2, 3, 4, 5), 2, NULL), array_slice(make_array('h', 'e', 'l', 'l', 'o'), 3, NULL); # array_slice scalar function #10 (with zero-zero) @@ -885,7 +885,7 @@ select array_slice(make_array(1, 2, 3, 4, 5), 0, 0), array_slice(make_array('h', [] [] # array_slice scalar function #11 (with NULL-NULL) -query error +query error select array_slice(make_array(1, 2, 3, 4, 5), NULL), array_slice(make_array('h', 'e', 'l', 'l', 'o'), NULL); # array_slice scalar function #12 (with zero and negative number) @@ -895,11 +895,11 @@ select array_slice(make_array(1, 2, 3, 4, 5), 0, -4), array_slice(make_array('h' [1] [h, e] # array_slice scalar function #13 (with negative number and NULL) -query error +query error select array_slice(make_array(1, 2, 3, 4, 5), 2, NULL), array_slice(make_array('h', 'e', 'l', 'l', 'o'), 3, NULL); # array_slice scalar function #14 (with NULL and negative number) -query error +query error select array_slice(make_array(1, 2, 3, 4, 5), NULL, -4), array_slice(make_array('h', 'e', 'l', 'l', 'o'), NULL, -3); # array_slice scalar function #15 (with negative indexes) diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt index 6e4a711a0115..fbf1523477b1 100644 --- a/datafusion/sqllogictest/test_files/copy.slt +++ b/datafusion/sqllogictest/test_files/copy.slt @@ -32,7 +32,7 @@ logical_plan CopyTo: format=parquet output_url=test_files/scratch/copy/table single_file_output=false options: (compression 'zstd(10)') --TableScan: source_table projection=[col1, col2] physical_plan -FileSinkExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[]) +FileSinkExec: sink=ParquetSink(file_groups=[]) --MemoryExec: partitions=1, partition_sizes=[1] # Error case diff --git a/datafusion/sqllogictest/test_files/errors.slt b/datafusion/sqllogictest/test_files/errors.slt index 4aded8a576fb..e3b2610e51be 100644 --- a/datafusion/sqllogictest/test_files/errors.slt +++ b/datafusion/sqllogictest/test_files/errors.slt @@ -133,4 +133,4 @@ order by c9 statement error Inconsistent data type across values list at row 1 column 0. Was Int64 but found Utf8 -create table foo as values (1), ('foo'); \ No newline at end of file +create table foo as values (1), ('foo'); diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 9726c35a319e..ed0dd9467294 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -168,7 +168,7 @@ Dml: op=[Insert Into] table=[sink_table] ----Sort: aggregate_test_100.c1 ASC NULLS LAST ------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13] physical_plan -FileSinkExec: sink=CsvSink(writer_mode=Append, file_groups=[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]) +FileSinkExec: sink=CsvSink(file_groups=[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]) --SortExec: expr=[c1@0 ASC NULLS LAST] ----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true diff --git a/datafusion/sqllogictest/test_files/insert.slt b/datafusion/sqllogictest/test_files/insert.slt index 9860bdcae05c..a100b5ac6b85 100644 --- a/datafusion/sqllogictest/test_files/insert.slt +++ b/datafusion/sqllogictest/test_files/insert.slt @@ -289,7 +289,7 @@ insert into table_without_values values(2, NULL); ---- 1 -# insert NULL values for the missing column (field2) +# insert NULL values for the missing column (field2) query II insert into table_without_values(field1) values(3); ---- diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt index 44410362412c..204199452c42 100644 --- a/datafusion/sqllogictest/test_files/insert_to_external.slt +++ b/datafusion/sqllogictest/test_files/insert_to_external.slt @@ -100,7 +100,7 @@ Dml: op=[Insert Into] table=[ordered_insert_test] --Projection: column1 AS a, column2 AS b ----Values: (Int64(5), Int64(1)), (Int64(4), Int64(2)), (Int64(7), Int64(7)), (Int64(7), Int64(8)), (Int64(7), Int64(9))... physical_plan -FileSinkExec: sink=CsvSink(writer_mode=PutMultipart, file_groups=[]) +FileSinkExec: sink=CsvSink(file_groups=[]) --SortExec: expr=[a@0 ASC NULLS LAST,b@1 DESC] ----ProjectionExec: expr=[column1@0 as a, column2@1 as b] ------ValuesExec @@ -267,8 +267,6 @@ INSERT INTO single_file_test values (4, 5), (6, 7); query II select * from single_file_test; ---- -1 2 -3 4 4 5 6 7 @@ -315,7 +313,7 @@ Dml: op=[Insert Into] table=[table_without_values] --------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] ----------TableScan: aggregate_test_100 projection=[c1, c4, c9] physical_plan -FileSinkExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[]) +FileSinkExec: sink=ParquetSink(file_groups=[]) --ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2] ----SortPreservingMergeExec: [c1@2 ASC NULLS LAST] ------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, c1@0 as c1] @@ -378,7 +376,7 @@ Dml: op=[Insert Into] table=[table_without_values] ----WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] ------TableScan: aggregate_test_100 projection=[c1, c4, c9] physical_plan -FileSinkExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[]) +FileSinkExec: sink=ParquetSink(file_groups=[]) --CoalescePartitionsExec ----ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2] ------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted] @@ -422,7 +420,7 @@ Dml: op=[Insert Into] table=[table_without_values] ----Sort: aggregate_test_100.c1 ASC NULLS LAST ------TableScan: aggregate_test_100 projection=[c1] physical_plan -FileSinkExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[]) +FileSinkExec: sink=ParquetSink(file_groups=[]) --SortExec: expr=[c1@0 ASC NULLS LAST] ----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index fa3a6cff8c4a..c212f996cf17 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -3409,4 +3409,3 @@ set datafusion.optimizer.prefer_existing_sort = false; statement ok drop table annotated_data; - diff --git a/datafusion/sqllogictest/test_files/options.slt b/datafusion/sqllogictest/test_files/options.slt index 83fe85745ef8..9366a9b3b3c8 100644 --- a/datafusion/sqllogictest/test_files/options.slt +++ b/datafusion/sqllogictest/test_files/options.slt @@ -84,7 +84,7 @@ statement ok drop table a # test datafusion.sql_parser.parse_float_as_decimal -# +# # default option value is false query RR select 10000000000000000000.01, -10000000000000000000.01 @@ -209,5 +209,3 @@ select -123456789.0123456789012345678901234567890 # Restore option to default value statement ok set datafusion.sql_parser.parse_float_as_decimal = false; - - diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index 8148f1c4c7c9..9c5d1704f42b 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -447,7 +447,7 @@ statement ok drop table multiple_ordered_table; # Create tables having some ordered columns. In the next step, we will expect to observe that scalar -# functions, such as mathematical functions like atan(), ceil(), sqrt(), or date_time functions +# functions, such as mathematical functions like atan(), ceil(), sqrt(), or date_time functions # like date_bin() and date_trunc(), will maintain the order of its argument columns. statement ok CREATE EXTERNAL TABLE csv_with_timestamps ( diff --git a/datafusion/sqllogictest/test_files/predicates.slt b/datafusion/sqllogictest/test_files/predicates.slt index d22b2ff953b7..e992a440d0a2 100644 --- a/datafusion/sqllogictest/test_files/predicates.slt +++ b/datafusion/sqllogictest/test_files/predicates.slt @@ -495,6 +495,7 @@ set datafusion.execution.parquet.bloom_filter_enabled=true; query T SELECT * FROM data_index_bloom_encoding_stats WHERE "String" = 'foo'; +---- query T SELECT * FROM data_index_bloom_encoding_stats WHERE "String" = 'test'; diff --git a/datafusion/sqllogictest/test_files/set_variable.slt b/datafusion/sqllogictest/test_files/set_variable.slt index 714e1e995e26..440fb2c6ef2b 100644 --- a/datafusion/sqllogictest/test_files/set_variable.slt +++ b/datafusion/sqllogictest/test_files/set_variable.slt @@ -243,4 +243,4 @@ statement ok SET TIME ZONE = 'Asia/Taipei2' statement error Arrow error: Parser error: Invalid timezone "Asia/Taipei2": 'Asia/Taipei2' is not a valid timezone -SELECT '2000-01-01T00:00:00'::TIMESTAMP::TIMESTAMPTZ \ No newline at end of file +SELECT '2000-01-01T00:00:00'::TIMESTAMP::TIMESTAMPTZ diff --git a/datafusion/sqllogictest/test_files/update.slt b/datafusion/sqllogictest/test_files/update.slt index c88082fc7272..6412c3ca859e 100644 --- a/datafusion/sqllogictest/test_files/update.slt +++ b/datafusion/sqllogictest/test_files/update.slt @@ -89,4 +89,4 @@ Dml: op=[Update] table=[t1] ------CrossJoin: --------SubqueryAlias: t ----------TableScan: t1 ---------TableScan: t2 \ No newline at end of file +--------TableScan: t2 From 8e1c2d2013c71eac88c72c5ec75c7a98b0c4b2d4 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 15 Nov 2023 15:26:35 +0000 Subject: [PATCH 2/4] Don't ignore test --- datafusion/core/tests/fifo.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/core/tests/fifo.rs b/datafusion/core/tests/fifo.rs index 67841ffec07e..93c7f7368065 100644 --- a/datafusion/core/tests/fifo.rs +++ b/datafusion/core/tests/fifo.rs @@ -323,7 +323,6 @@ mod unix_test { /// It tests the INSERT INTO functionality. #[tokio::test] - #[ignore] async fn test_sql_insert_into_fifo() -> Result<()> { // To make unbounded deterministic let waiting = Arc::new(AtomicBool::new(true)); From 7a5d876356ef15ea0ebaf624766587b20819397c Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 16 Nov 2023 16:58:24 +0000 Subject: [PATCH 3/4] Error on insert to single file --- .../core/src/datasource/listing/table.rs | 7 +++++++ datafusion/core/src/datasource/listing/url.rs | 8 ++++++-- datafusion/core/src/datasource/stream.rs | 4 ++-- .../test_files/insert_to_external.slt | 18 ++++++++++++++++++ 4 files changed, 33 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 84c69c833d77..515bc8a9e612 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -770,6 +770,13 @@ impl TableProvider for ListingTable { } let table_path = &self.table_paths()[0]; + if !table_path.is_collection() { + return plan_err!( + "Inserting into a ListingTable backed by a single file is not supported, URL is possibly missing a trailing `/`. \ + To append to an existing file use StreamTable, e.g. by using CREATE UNBOUNDED EXTERNAL TABLE" + ); + } + // Get the object store for the table path. let store = state.runtime_env().object_store(table_path)?; diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs index 9197e37adbd5..ba3c3fae21e2 100644 --- a/datafusion/core/src/datasource/listing/url.rs +++ b/datafusion/core/src/datasource/listing/url.rs @@ -181,6 +181,11 @@ impl ListingTableUrl { } } + /// Returns `true` if `path` refers to a collection of objects + pub fn is_collection(&self) -> bool { + self.url.as_str().ends_with('/') + } + /// Strips the prefix of this [`ListingTableUrl`] from the provided path, returning /// an iterator of the remaining path segments pub(crate) fn strip_prefix<'a, 'b: 'a>( @@ -203,8 +208,7 @@ impl ListingTableUrl { file_extension: &'a str, ) -> Result>> { // If the prefix is a file, use a head request, otherwise list - let is_dir = self.url.as_str().ends_with('/'); - let list = match is_dir { + let list = match self.is_collection() { true => match ctx.runtime_env().cache_manager.get_list_files_cache() { None => futures::stream::once(store.list(Some(&self.prefix))) .try_flatten() diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/core/src/datasource/stream.rs index cf95dd249a7f..2ec07287bfe6 100644 --- a/datafusion/core/src/datasource/stream.rs +++ b/datafusion/core/src/datasource/stream.rs @@ -165,7 +165,7 @@ impl StreamConfig { match &self.encoding { StreamEncoding::Csv => { let header = self.header && !self.location.exists(); - let file = OpenOptions::new().write(true).open(&self.location)?; + let file = OpenOptions::new().append(true).open(&self.location)?; let writer = arrow::csv::WriterBuilder::new() .with_header(header) .build(file); @@ -173,7 +173,7 @@ impl StreamConfig { Ok(Box::new(writer)) } StreamEncoding::Json => { - let file = OpenOptions::new().write(true).open(&self.location)?; + let file = OpenOptions::new().append(true).open(&self.location)?; Ok(Box::new(arrow::json::LineDelimitedWriter::new(file))) } } diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt index 204199452c42..39323479ff74 100644 --- a/datafusion/sqllogictest/test_files/insert_to_external.slt +++ b/datafusion/sqllogictest/test_files/insert_to_external.slt @@ -254,6 +254,22 @@ create_local_path 'true', single_file 'true', ); +query error DataFusion error: Error during planning: Inserting into a ListingTable backed by a single file is not supported, URL is possibly missing a trailing `/`\. To append to an existing file use StreamTable, e\.g\. by using CREATE UNBOUNDED EXTERNAL TABLE +INSERT INTO single_file_test values (1, 2), (3, 4); + +statement ok +drop table single_file_test; + +statement ok +CREATE UNBOUNDED EXTERNAL TABLE +single_file_test(a bigint, b bigint) +STORED AS csv +LOCATION 'test_files/scratch/insert_to_external/single_csv_table.csv' +OPTIONS( +create_local_path 'true', +single_file 'true', +); + query II INSERT INTO single_file_test values (1, 2), (3, 4); ---- @@ -267,6 +283,8 @@ INSERT INTO single_file_test values (4, 5), (6, 7); query II select * from single_file_test; ---- +1 2 +3 4 4 5 6 7 From 0ae3d9e671d3c8c78328e7e1abc7b8f30f24a867 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 16 Nov 2023 17:32:26 +0000 Subject: [PATCH 4/4] Improve DisplayAs --- datafusion/core/src/datasource/stream.rs | 7 ++++++- datafusion/sqllogictest/test_files/explain.slt | 4 ++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/core/src/datasource/stream.rs index 2ec07287bfe6..7c17ae2e1369 100644 --- a/datafusion/core/src/datasource/stream.rs +++ b/datafusion/core/src/datasource/stream.rs @@ -283,7 +283,12 @@ struct StreamWrite(Arc); impl DisplayAs for StreamWrite { fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { - write!(f, "{self:?}") + f.debug_struct("StreamWrite") + .field("location", &self.0.location) + .field("batch_size", &self.0.batch_size) + .field("encoding", &self.0.encoding) + .field("header", &self.0.header) + .finish_non_exhaustive() } } diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index ed0dd9467294..129814767ca2 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -140,7 +140,7 @@ physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/te # create a sink table, path is same with aggregate_test_100 table # we do not overwrite this file, we only assert plan. statement ok -CREATE EXTERNAL TABLE sink_table ( +CREATE UNBOUNDED EXTERNAL TABLE sink_table ( c1 VARCHAR NOT NULL, c2 TINYINT NOT NULL, c3 SMALLINT NOT NULL, @@ -168,7 +168,7 @@ Dml: op=[Insert Into] table=[sink_table] ----Sort: aggregate_test_100.c1 ASC NULLS LAST ------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13] physical_plan -FileSinkExec: sink=CsvSink(file_groups=[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]) +FileSinkExec: sink=StreamWrite { location: "../../testing/data/csv/aggregate_test_100.csv", batch_size: 8192, encoding: Csv, header: true, .. } --SortExec: expr=[c1@0 ASC NULLS LAST] ----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true