diff --git a/.gitignore b/.gitignore
index 65d3c0f345e3..203455e4a796 100644
--- a/.gitignore
+++ b/.gitignore
@@ -103,4 +103,7 @@ datafusion/CHANGELOG.md.bak
 .githubchangeloggenerator.cache*

 # Generated tpch data
-datafusion/core/tests/sqllogictests/test_files/tpch/data/*
+datafusion/sqllogictests/test_files/tpch/data/*
+
+# Scratch temp dir for sqllogictests
+datafusion/sqllogictest/test_files/scratch*
diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index 5b1983f5678e..232bedfe0bfa 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -25,6 +25,7 @@ use arrow::compute::{cast, concat};
 use arrow::datatypes::{DataType, Field};
 use async_trait::async_trait;
 use datafusion_common::{DataFusionError, SchemaError, UnnestOptions};
+use datafusion_expr::dml::OutputFileFormat;
 use parquet::file::properties::WriterProperties;

 use datafusion_common::{Column, DFSchema, ScalarValue};
@@ -37,7 +38,6 @@ use crate::arrow::datatypes::Schema;
 use crate::arrow::datatypes::SchemaRef;
 use crate::arrow::record_batch::RecordBatch;
 use crate::arrow::util::pretty;
-use crate::datasource::physical_plan::{plan_to_csv, plan_to_json, plan_to_parquet};
 use crate::datasource::{provider_as_source, MemTable, TableProvider};
 use crate::error::Result;
 use crate::execution::{
@@ -992,28 +992,55 @@ impl DataFrame {
     }

     /// Write a `DataFrame` to a CSV file.
-    pub async fn write_csv(self, path: &str) -> Result<()> {
-        let plan = self.session_state.create_physical_plan(&self.plan).await?;
-        let task_ctx = Arc::new(self.task_ctx());
-        plan_to_csv(task_ctx, plan, path).await
+    pub async fn write_csv(
+        self,
+        path: &str,
+    ) -> Result<Vec<RecordBatch>, DataFusionError> {
+        let plan = LogicalPlanBuilder::copy_to(
+            self.plan,
+            path.into(),
+            OutputFileFormat::CSV,
+            true,
+            // TODO implement options
+            vec![],
+        )?
+        .build()?;
+        DataFrame::new(self.session_state, plan).collect().await
     }

     /// Write a `DataFrame` to a Parquet file.
     pub async fn write_parquet(
         self,
         path: &str,
-        writer_properties: Option<WriterProperties>,
-    ) -> Result<()> {
-        let plan = self.session_state.create_physical_plan(&self.plan).await?;
-        let task_ctx = Arc::new(self.task_ctx());
-        plan_to_parquet(task_ctx, plan, path, writer_properties).await
+        _writer_properties: Option<WriterProperties>,
+    ) -> Result<Vec<RecordBatch>, DataFusionError> {
+        let plan = LogicalPlanBuilder::copy_to(
+            self.plan,
+            path.into(),
+            OutputFileFormat::PARQUET,
+            true,
+            // TODO implement options
+            vec![],
+        )?
+        .build()?;
+        DataFrame::new(self.session_state, plan).collect().await
     }

     /// Executes a query and writes the results to a partitioned JSON file.
-    pub async fn write_json(self, path: impl AsRef<str>) -> Result<()> {
-        let plan = self.session_state.create_physical_plan(&self.plan).await?;
-        let task_ctx = Arc::new(self.task_ctx());
-        plan_to_json(task_ctx, plan, path).await
+    pub async fn write_json(
+        self,
+        path: &str,
+    ) -> Result<Vec<RecordBatch>, DataFusionError> {
+        let plan = LogicalPlanBuilder::copy_to(
+            self.plan,
+            path.into(),
+            OutputFileFormat::JSON,
+            true,
+            // TODO implement options
+            vec![],
+        )?
+        .build()?;
+        DataFrame::new(self.session_state, plan).collect().await
     }

     /// Add an additional column to the DataFrame.
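For context, a minimal (untested) sketch of how the reworked DataFrame write path above might be driven. The file paths, the tokio scaffolding, and the local SessionContext setup are illustrative only and are not part of this patch; the key point is that write_csv now plans a CopyTo and returns the count row produced by FileSinkExec rather than ().

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Read some input (path is hypothetical).
    let df = ctx.read_csv("input.csv", CsvReadOptions::new()).await?;
    // With this change, write_csv builds a CopyTo logical plan with
    // per_thread_output=true, so the target is treated as a directory
    // receiving one file per input partition.
    let counts = df.write_csv("output_dir").await?;
    println!("{counts:?}");
    Ok(())
}
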
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index c3ab50fd430c..59c4fedeff1f 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -47,7 +47,7 @@ use crate::datasource::physical_plan::{ }; use crate::error::Result; use crate::execution::context::SessionState; -use crate::physical_plan::insert::{DataSink, InsertExec}; +use crate::physical_plan::insert::{DataSink, FileSinkExec}; use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics}; use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; use rand::distributions::{Alphanumeric, DistString}; @@ -277,6 +277,7 @@ impl FileFormat for CsvFormat { "Inserting compressed CSV is not implemented yet.".into(), )); } + let sink_schema = conf.output_schema().clone(); let sink = Arc::new(CsvSink::new( conf, @@ -285,7 +286,7 @@ impl FileFormat for CsvFormat { self.file_compression_type, )); - Ok(Arc::new(InsertExec::new(input, sink, sink_schema)) as _) + Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _) } } @@ -505,12 +506,14 @@ impl DataSink for CsvSink { let object_store = context .runtime_env() .object_store(&self.config.object_store_url)?; - // Construct serializer and writer for each file group let mut serializers: Vec> = vec![]; let mut writers = vec![]; match self.config.writer_mode { FileWriterMode::Append => { + if !self.config.per_thread_output { + return Err(DataFusionError::NotImplemented("per_thread_output=false is not implemented for CsvSink in Append mode".into())); + } for file_group in &self.config.file_groups { // In append mode, consider has_header flag only when file is empty (at the start). // For other modes, use has_header flag as is. @@ -542,38 +545,72 @@ impl DataSink for CsvSink { FileWriterMode::PutMultipart => { // Currently assuming only 1 partition path (i.e. 
not hive-style partitioning on a column) let base_path = &self.config.table_paths[0]; - // Uniquely identify this batch of files with a random string, to prevent collisions overwriting files - let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16); - for part_idx in 0..num_partitions { - let header = self.has_header; - let builder = WriterBuilder::new().with_delimiter(self.delimiter); - let serializer = CsvSerializer::new() - .with_builder(builder) - .with_header(header); - let file_path = base_path - .prefix() - .child(format!("/{}_{}.csv", write_id, part_idx)); - let object_meta = ObjectMeta { - location: file_path, - last_modified: chrono::offset::Utc::now(), - size: 0, - e_tag: None, - }; - let writer = create_writer( - self.config.writer_mode, - self.file_compression_type, - object_meta.into(), - object_store.clone(), - ) - .await?; - - serializers.push(Box::new(serializer)); - writers.push(writer); + match self.config.per_thread_output { + true => { + // Uniquely identify this batch of files with a random string, to prevent collisions overwriting files + let write_id = + Alphanumeric.sample_string(&mut rand::thread_rng(), 16); + for part_idx in 0..num_partitions { + let header = self.has_header; + let builder = + WriterBuilder::new().with_delimiter(self.delimiter); + let serializer = CsvSerializer::new() + .with_builder(builder) + .with_header(header); + serializers.push(Box::new(serializer)); + let file_path = base_path + .prefix() + .child(format!("{}_{}.csv", write_id, part_idx)); + let object_meta = ObjectMeta { + location: file_path, + last_modified: chrono::offset::Utc::now(), + size: 0, + e_tag: None, + }; + let writer = create_writer( + self.config.writer_mode, + self.file_compression_type, + object_meta.into(), + object_store.clone(), + ) + .await?; + writers.push(writer); + } + } + false => { + let header = self.has_header; + let builder = WriterBuilder::new().with_delimiter(self.delimiter); + let serializer = CsvSerializer::new() + .with_builder(builder) + .with_header(header); + serializers.push(Box::new(serializer)); + let file_path = base_path.prefix(); + let object_meta = ObjectMeta { + location: file_path.clone(), + last_modified: chrono::offset::Utc::now(), + size: 0, + e_tag: None, + }; + let writer = create_writer( + self.config.writer_mode, + self.file_compression_type, + object_meta.into(), + object_store.clone(), + ) + .await?; + writers.push(writer); + } } } } - stateless_serialize_and_write_files(data, serializers, writers).await + stateless_serialize_and_write_files( + data, + serializers, + writers, + self.config.per_thread_output, + ) + .await } } diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index 8472f4e5c164..6870fc1b417c 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -43,7 +43,7 @@ use object_store::{GetResult, ObjectMeta, ObjectStore}; use crate::datasource::physical_plan::FileGroupDisplay; use crate::physical_plan::insert::DataSink; -use crate::physical_plan::insert::InsertExec; +use crate::physical_plan::insert::FileSinkExec; use crate::physical_plan::SendableRecordBatchStream; use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics}; @@ -187,7 +187,7 @@ impl FileFormat for JsonFormat { let sink_schema = conf.output_schema().clone(); let sink = Arc::new(JsonSink::new(conf, self.file_compression_type)); - Ok(Arc::new(InsertExec::new(input, sink, sink_schema)) as _) + 
Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _) } } @@ -280,6 +280,9 @@ impl DataSink for JsonSink { let mut writers = vec![]; match self.config.writer_mode { FileWriterMode::Append => { + if !self.config.per_thread_output { + return Err(DataFusionError::NotImplemented("per_thread_output=false is not implemented for JsonSink in Append mode".into())); + } for file_group in &self.config.file_groups { let serializer = JsonSerializer::new(); serializers.push(Box::new(serializer)); @@ -303,33 +306,63 @@ impl DataSink for JsonSink { FileWriterMode::PutMultipart => { // Currently assuming only 1 partition path (i.e. not hive-style partitioning on a column) let base_path = &self.config.table_paths[0]; - // Uniquely identify this batch of files with a random string, to prevent collisions overwriting files - let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16); - for part_idx in 0..num_partitions { - let serializer = JsonSerializer::new(); - serializers.push(Box::new(serializer)); - let file_path = base_path - .prefix() - .child(format!("/{}_{}.json", write_id, part_idx)); - let object_meta = ObjectMeta { - location: file_path, - last_modified: chrono::offset::Utc::now(), - size: 0, - e_tag: None, - }; - let writer = create_writer( - self.config.writer_mode, - self.file_compression_type, - object_meta.into(), - object_store.clone(), - ) - .await?; - writers.push(writer); + match self.config.per_thread_output { + true => { + // Uniquely identify this batch of files with a random string, to prevent collisions overwriting files + let write_id = + Alphanumeric.sample_string(&mut rand::thread_rng(), 16); + for part_idx in 0..num_partitions { + let serializer = JsonSerializer::new(); + serializers.push(Box::new(serializer)); + let file_path = base_path + .prefix() + .child(format!("{}_{}.json", write_id, part_idx)); + let object_meta = ObjectMeta { + location: file_path, + last_modified: chrono::offset::Utc::now(), + size: 0, + e_tag: None, + }; + let writer = create_writer( + self.config.writer_mode, + self.file_compression_type, + object_meta.into(), + object_store.clone(), + ) + .await?; + writers.push(writer); + } + } + false => { + let serializer = JsonSerializer::new(); + serializers.push(Box::new(serializer)); + let file_path = base_path.prefix(); + let object_meta = ObjectMeta { + location: file_path.clone(), + last_modified: chrono::offset::Utc::now(), + size: 0, + e_tag: None, + }; + let writer = create_writer( + self.config.writer_mode, + self.file_compression_type, + object_meta.into(), + object_store.clone(), + ) + .await?; + writers.push(writer); + } } } } - stateless_serialize_and_write_files(data, serializers, writers).await + stateless_serialize_and_write_files( + data, + serializers, + writers, + self.config.per_thread_output, + ) + .await } } diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index de3ec3ffb799..6688d3dd372b 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -53,11 +53,12 @@ use crate::config::ConfigOptions; use crate::datasource::physical_plan::{ FileGroupDisplay, FileMeta, FileSinkConfig, ParquetExec, SchemaAdapter, }; + use crate::datasource::{create_max_min_accs, get_col_stats}; use crate::error::Result; use crate::execution::context::SessionState; use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; -use crate::physical_plan::insert::{DataSink, InsertExec}; +use 
crate::physical_plan::insert::{DataSink, FileSinkExec}; use crate::physical_plan::{ Accumulator, DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream, Statistics, @@ -238,7 +239,7 @@ impl FileFormat for ParquetFormat { let sink_schema = conf.output_schema().clone(); let sink = Arc::new(ParquetSink::new(conf)); - Ok(Arc::new(InsertExec::new(input, sink, sink_schema)) as _) + Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _) } } @@ -604,7 +605,6 @@ impl DisplayAs for ParquetSink { } /// Parses datafusion.execution.parquet.encoding String to a parquet::basic::Encoding -/// TODO use upstream version: fn parse_encoding_string(str_setting: &str) -> Result { let str_setting_lower: &str = &str_setting.to_lowercase(); match str_setting_lower { @@ -668,7 +668,6 @@ fn require_level(codec: &str, level: Option) -> Result { } /// Parses datafusion.execution.parquet.compression String to a parquet::basic::Compression -/// TODO use upstream version: fn parse_compression_string(str_setting: &str) -> Result { let str_setting_lower: &str = &str_setting.to_lowercase(); let (codec, level) = split_compression_string(str_setting_lower)?; @@ -719,7 +718,6 @@ fn parse_compression_string(str_setting: &str) -> Result fn parse_version_string(str_setting: &str) -> Result { let str_setting_lower: &str = &str_setting.to_lowercase(); match str_setting_lower { @@ -732,7 +730,6 @@ fn parse_version_string(str_setting: &str) -> Result { } } -/// TODO use upstream version: fn parse_statistics_string(str_setting: &str) -> Result { let str_setting_lower: &str = &str_setting.to_lowercase(); match str_setting_lower { @@ -848,26 +845,48 @@ impl DataSink for ParquetSink { FileWriterMode::PutMultipart => { // Currently assuming only 1 partition path (i.e. not hive-style partitioning on a column) let base_path = &self.config.table_paths[0]; - // Uniquely identify this batch of files with a random string, to prevent collisions overwriting files - let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16); - for part_idx in 0..num_partitions { - let file_path = base_path - .prefix() - .child(format!("/{}_{}.parquet", write_id, part_idx)); - let object_meta = ObjectMeta { - location: file_path, - last_modified: chrono::offset::Utc::now(), - size: 0, - e_tag: None, - }; - let writer = self - .create_writer( - object_meta.into(), - object_store.clone(), - parquet_props.clone(), - ) - .await?; - writers.push(writer); + match self.config.per_thread_output { + true => { + // Uniquely identify this batch of files with a random string, to prevent collisions overwriting files + let write_id = + Alphanumeric.sample_string(&mut rand::thread_rng(), 16); + for part_idx in 0..num_partitions { + let file_path = base_path + .prefix() + .child(format!("{}_{}.parquet", write_id, part_idx)); + let object_meta = ObjectMeta { + location: file_path, + last_modified: chrono::offset::Utc::now(), + size: 0, + e_tag: None, + }; + let writer = self + .create_writer( + object_meta.into(), + object_store.clone(), + parquet_props.clone(), + ) + .await?; + writers.push(writer); + } + } + false => { + let file_path = base_path.prefix(); + let object_meta = ObjectMeta { + location: file_path.clone(), + last_modified: chrono::offset::Utc::now(), + size: 0, + e_tag: None, + }; + let writer = self + .create_writer( + object_meta.into(), + object_store.clone(), + parquet_props.clone(), + ) + .await?; + writers.push(writer); + } } } } @@ -875,8 +894,12 @@ impl DataSink for ParquetSink { let mut row_count = 0; // TODO parallelize 
serialization accross partitions and batches within partitions // see: https://github.com/apache/arrow-datafusion/issues/7079 - for idx in 0..num_partitions { - while let Some(batch) = data[idx].next().await.transpose()? { + for (part_idx, data_stream) in data.iter_mut().enumerate().take(num_partitions) { + let idx = match self.config.per_thread_output { + true => part_idx, + false => 0, + }; + while let Some(batch) = data_stream.next().await.transpose()? { row_count += batch.num_rows(); // TODO cleanup all multipart writes when any encounters an error writers[idx].write(&batch).await?; diff --git a/datafusion/core/src/datasource/file_format/write.rs b/datafusion/core/src/datasource/file_format/write.rs index c256c9689ab9..3c005894f6e6 100644 --- a/datafusion/core/src/datasource/file_format/write.rs +++ b/datafusion/core/src/datasource/file_format/write.rs @@ -328,16 +328,29 @@ pub(crate) async fn stateless_serialize_and_write_files( mut data: Vec, mut serializers: Vec>, mut writers: Vec>>, + per_thread_output: bool, ) -> Result { + if !per_thread_output && (serializers.len() != 1 || writers.len() != 1) { + return Err(DataFusionError::Internal( + "per_thread_output is false, but got more than 1 writer!".into(), + )); + } let num_partitions = data.len(); + if per_thread_output && (num_partitions != writers.len()) { + return Err(DataFusionError::Internal("per_thread_output is true, but did not get 1 writer for each output partition!".into())); + } let mut row_count = 0; // Map errors to DatafusionError. let err_converter = |_| DataFusionError::Internal("Unexpected FileSink Error".to_string()); // TODO parallelize serialization accross partitions and batches within partitions // see: https://github.com/apache/arrow-datafusion/issues/7079 - for idx in 0..num_partitions { - while let Some(maybe_batch) = data[idx].next().await { + for (part_idx, data_stream) in data.iter_mut().enumerate().take(num_partitions) { + let idx = match per_thread_output { + true => part_idx, + false => 0, + }; + while let Some(maybe_batch) = data_stream.next().await { // Write data to files in a round robin fashion: let serializer = &mut serializers[idx]; let batch = check_for_errors(maybe_batch, &mut writers).await?; diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index d4e2c4aafeb5..5cc31c8397c2 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -883,6 +883,8 @@ impl TableProvider for ListingTable { output_schema: self.schema(), table_partition_cols: self.options.table_partition_cols.clone(), writer_mode, + // TODO: when listing table is known to be backed by a single file, this should be false + per_thread_output: true, overwrite, }; diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs index a4940f57f8a5..0c017a517cda 100644 --- a/datafusion/core/src/datasource/listing/url.rs +++ b/datafusion/core/src/datasource/listing/url.rs @@ -107,7 +107,6 @@ impl ListingTableUrl { .map_err(|_| DataFusionError::Internal(format!("Can not open path: {s}")))?; // TODO: Currently we do not have an IO-related error variant that accepts () // or a string. Once we have such a variant, change the error type above. 
- Ok(Self::new(url, glob)) } diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs index 0441ac70585d..54ebfd5cabb6 100644 --- a/datafusion/core/src/datasource/memory.rs +++ b/datafusion/core/src/datasource/memory.rs @@ -35,7 +35,7 @@ use crate::datasource::{TableProvider, TableType}; use crate::error::Result; use crate::execution::context::SessionState; use crate::logical_expr::Expr; -use crate::physical_plan::insert::{DataSink, InsertExec}; +use crate::physical_plan::insert::{DataSink, FileSinkExec}; use crate::physical_plan::memory::MemoryExec; use crate::physical_plan::{common, SendableRecordBatchStream}; use crate::physical_plan::{repartition::RepartitionExec, Partitioning}; @@ -219,7 +219,11 @@ impl TableProvider for MemTable { )); } let sink = Arc::new(MemSink::new(self.batches.clone())); - Ok(Arc::new(InsertExec::new(input, sink, self.schema.clone()))) + Ok(Arc::new(FileSinkExec::new( + input, + sink, + self.schema.clone(), + ))) } } diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index 6bf5e3634021..e3ac81173660 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -660,7 +660,7 @@ mod tests { use futures::StreamExt; use object_store::local::LocalFileSystem; use rstest::*; - use std::fs::File; + use std::fs::{self, File}; use std::io::Write; use tempfile::TempDir; use url::Url; @@ -1191,11 +1191,32 @@ mod tests { Field::new("c2", DataType::UInt64, false), ])); + // get name of first part + let paths = fs::read_dir(&out_dir).unwrap(); + let mut part_0_name: String = "".to_owned(); + for path in paths { + let path = path.unwrap(); + let name = path + .path() + .file_name() + .expect("Should be a file name") + .to_str() + .expect("Should be a str") + .to_owned(); + if name.ends_with("_0.csv") { + part_0_name = name; + break; + } + } + + if part_0_name.is_empty() { + panic!("Did not find part_0 in csv output files!") + } // register each partition as well as the top level dir let csv_read_option = CsvReadOptions::new().schema(&schema); ctx.register_csv( "part0", - &format!("{out_dir}/part-0.csv"), + &format!("{out_dir}/{part_0_name}"), csv_read_option.clone(), ) .await?; diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index b8ad2aa0a62f..62fcc320eb45 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -331,6 +331,7 @@ mod tests { use crate::test::partitioned_file_groups; use datafusion_common::cast::{as_int32_array, as_int64_array, as_string_array}; use rstest::*; + use std::fs; use std::path::Path; use tempfile::TempDir; use url::Url; @@ -699,11 +700,33 @@ mod tests { // create a new context and verify that the results were saved to a partitioned csv file let ctx = SessionContext::new(); + // get name of first part + let paths = fs::read_dir(&out_dir).unwrap(); + let mut part_0_name: String = "".to_owned(); + for path in paths { + let name = path + .unwrap() + .path() + .file_name() + .expect("Should be a file name") + .to_str() + .expect("Should be a str") + .to_owned(); + if name.ends_with("_0.json") { + part_0_name = name; + break; + } + } + + if part_0_name.is_empty() { + panic!("Did not find part_0 in json output files!") + } + // register each partition as well as the top level dir let json_read_option = NdJsonReadOptions::default(); 
ctx.register_json( "part0", - &format!("{out_dir}/part-0.json"), + &format!("{out_dir}/{part_0_name}"), json_read_option.clone(), ) .await?; diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index b0914b081616..06c16ad7519d 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -332,6 +332,10 @@ pub struct FileSinkConfig { pub table_partition_cols: Vec<(String, DataType)>, /// A writer mode that determines how data is written to the file pub writer_mode: FileWriterMode, + /// If false, it is assumed there is a single table_path which is a file to which all data should be written + /// regardless of input partitioning. Otherwise, each table path is assumed to be a directory + /// to which each output partition is written to its own output file. + pub per_thread_output: bool, /// Controls whether existing data should be overwritten by this sink pub overwrite: bool, } diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 24243ec749dd..3ef1d13c2693 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -768,7 +768,7 @@ mod tests { use object_store::local::LocalFileSystem; use object_store::path::Path; use object_store::ObjectMeta; - use std::fs::File; + use std::fs::{self, File}; use std::io::Write; use tempfile::TempDir; use url::Url; @@ -1956,31 +1956,46 @@ mod tests { df.write_parquet(out_dir_url, None).await?; // write_parquet(&mut ctx, "SELECT c1, c2 FROM test", &out_dir, None).await?; - // create a new context and verify that the results were saved to a partitioned csv file + // create a new context and verify that the results were saved to a partitioned parquet file let ctx = SessionContext::new(); + // get write_id + let mut paths = fs::read_dir(&out_dir).unwrap(); + let path = paths.next(); + let name = path + .unwrap()? + .path() + .file_name() + .expect("Should be a file name") + .to_str() + .expect("Should be a str") + .to_owned(); + println!("{name}"); + let (parsed_id, _) = name.split_once('_').expect("File should contain _ !"); + let write_id = parsed_id.to_owned(); + // register each partition as well as the top level dir ctx.register_parquet( "part0", - &format!("{out_dir}/part-0.parquet"), + &format!("{out_dir}/{write_id}_0.parquet"), ParquetReadOptions::default(), ) .await?; ctx.register_parquet( "part1", - &format!("{out_dir}/part-1.parquet"), + &format!("{out_dir}/{write_id}_1.parquet"), ParquetReadOptions::default(), ) .await?; ctx.register_parquet( "part2", - &format!("{out_dir}/part-2.parquet"), + &format!("{out_dir}/{write_id}_2.parquet"), ParquetReadOptions::default(), ) .await?; ctx.register_parquet( "part3", - &format!("{out_dir}/part-3.parquet"), + &format!("{out_dir}/{write_id}_3.parquet"), ParquetReadOptions::default(), ) .await?; diff --git a/datafusion/core/src/datasource/provider.rs b/datafusion/core/src/datasource/provider.rs index 6a81f8969611..48965addc7eb 100644 --- a/datafusion/core/src/datasource/provider.rs +++ b/datafusion/core/src/datasource/provider.rs @@ -119,10 +119,10 @@ pub trait TableProvider: Sync + Send { /// /// # See Also /// - /// See [`InsertExec`] for the common pattern of inserting a - /// single stream of `RecordBatch`es. 
+ /// See [`FileSinkExec`] for the common pattern of inserting a + /// streams of `RecordBatch`es as files to an ObjectStore. /// - /// [`InsertExec`]: crate::physical_plan::insert::InsertExec + /// [`FileSinkExec`]: crate::physical_plan::insert::FileSinkExec async fn insert_into( &self, _state: &SessionState, diff --git a/datafusion/core/src/physical_plan/insert.rs b/datafusion/core/src/physical_plan/insert.rs index a05cb5fb15ad..acbca834f29c 100644 --- a/datafusion/core/src/physical_plan/insert.rs +++ b/datafusion/core/src/physical_plan/insert.rs @@ -64,7 +64,7 @@ pub trait DataSink: DisplayAs + Debug + Send + Sync { /// Execution plan for writing record batches to a [`DataSink`] /// /// Returns a single row with the number of values written -pub struct InsertExec { +pub struct FileSinkExec { /// Input plan that produces the record batches to be written. input: Arc, /// Sink to which to write @@ -75,13 +75,13 @@ pub struct InsertExec { count_schema: SchemaRef, } -impl fmt::Debug for InsertExec { +impl fmt::Debug for FileSinkExec { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "InsertExec schema: {:?}", self.count_schema) + write!(f, "FileSinkExec schema: {:?}", self.count_schema) } } -impl InsertExec { +impl FileSinkExec { /// Create a plan to write to `sink` pub fn new( input: Arc, @@ -149,7 +149,7 @@ impl InsertExec { } } -impl DisplayAs for InsertExec { +impl DisplayAs for FileSinkExec { fn fmt_as( &self, t: DisplayFormatType, @@ -164,7 +164,7 @@ impl DisplayAs for InsertExec { } } -impl ExecutionPlan for InsertExec { +impl ExecutionPlan for FileSinkExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { self @@ -233,7 +233,7 @@ impl ExecutionPlan for InsertExec { ) -> Result { if partition != 0 { return Err(DataFusionError::Internal( - "InsertExec can only be called on partition 0!".into(), + "FileSinkExec can only be called on partition 0!".into(), )); } let data = self.execute_all_input_streams(context.clone())?; diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 6b868b9b2424..f154c2a17364 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -17,6 +17,15 @@ //! 
Planner for [`LogicalPlan`] to [`ExecutionPlan`] +use crate::datasource::file_format::arrow::ArrowFormat; +use crate::datasource::file_format::avro::AvroFormat; +use crate::datasource::file_format::csv::CsvFormat; +use crate::datasource::file_format::json::JsonFormat; +use crate::datasource::file_format::parquet::ParquetFormat; +use crate::datasource::file_format::write::FileWriterMode; +use crate::datasource::file_format::FileFormat; +use crate::datasource::listing::ListingTableUrl; +use crate::datasource::physical_plan::FileSinkConfig; use crate::datasource::source_as_provider; use crate::execution::context::{ExecutionProps, SessionState}; use crate::logical_expr::utils::generate_sort_key; @@ -29,6 +38,8 @@ use crate::logical_expr::{ Repartition, Union, UserDefinedLogicalNode, }; use datafusion_common::display::ToStringifiedPlan; +use datafusion_expr::dml::{CopyTo, OutputFileFormat}; +use url::Url; use crate::logical_expr::{Limit, Values}; use crate::physical_expr::create_physical_expr; @@ -80,6 +91,7 @@ use itertools::{multiunzip, Itertools}; use log::{debug, trace}; use std::collections::HashMap; use std::fmt::Write; +use std::fs; use std::sync::Arc; fn create_function_physical_name( @@ -544,6 +556,67 @@ impl DefaultPhysicalPlanner { let unaliased: Vec = filters.into_iter().map(unalias).collect(); source.scan(session_state, projection.as_ref(), &unaliased, *fetch).await } + LogicalPlan::Copy(CopyTo{ + input, + output_url, + file_format, + per_thread_output, + options: _, + }) => { + let input_exec = self.create_initial_plan(input, session_state).await?; + + // Get object store for specified output_url + // if user did not pass in a url, we assume it is a local file path + // this requires some special handling as copy can create non + // existing file paths + let is_valid_url = Url::parse(output_url).is_ok(); + + // TODO: make this behavior configurable via options (should copy to create path/file as needed?) + // TODO: add additional configurable options for if existing files should be overwritten or + // appended to + let parsed_url = match is_valid_url { + true => ListingTableUrl::parse(output_url), + false => { + let path = std::path::PathBuf::from(output_url); + if !path.exists(){ + if *per_thread_output{ + fs::create_dir_all(path)?; + } else{ + fs::File::create(path)?; + } + } + ListingTableUrl::parse(output_url) + } + }?; + + let object_store_url = parsed_url.object_store(); + + let schema: Schema = (**input.schema()).clone().into(); + + // Set file sink related options + let config = FileSinkConfig { + object_store_url, + table_paths: vec![parsed_url], + file_groups: vec![], + output_schema: Arc::new(schema), + table_partition_cols: vec![], + writer_mode: FileWriterMode::PutMultipart, + per_thread_output: *per_thread_output, + overwrite: false, + }; + + // TODO: implement statement level overrides for each file type + // E.g. 
CsvFormat::from_options(options) + let sink_format: Arc = match file_format { + OutputFileFormat::CSV => Arc::new(CsvFormat::default()), + OutputFileFormat::PARQUET => Arc::new(ParquetFormat::default()), + OutputFileFormat::JSON => Arc::new(JsonFormat::default()), + OutputFileFormat::AVRO => Arc::new(AvroFormat {} ), + OutputFileFormat::ARROW => Arc::new(ArrowFormat {}), + }; + + sink_format.create_writer_physical_plan(input_exec, session_state, config).await + } LogicalPlan::Dml(DmlStatement { table_name, op: WriteOp::InsertInto, diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index f89be03f7937..29d7571d36fc 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -17,6 +17,7 @@ //! This module provides a builder for creating LogicalPlans +use crate::dml::{CopyTo, OutputFileFormat}; use crate::expr::Alias; use crate::expr_rewriter::{ coerce_plan_expr_for_schema, normalize_col, @@ -232,6 +233,23 @@ impl LogicalPlanBuilder { Self::scan_with_filters(table_name, table_source, projection, vec![]) } + /// Create a [CopyTo] for copying the contents of this builder to the specified file(s) + pub fn copy_to( + input: LogicalPlan, + output_url: String, + file_format: OutputFileFormat, + per_thread_output: bool, + options: Vec<(String, String)>, + ) -> Result { + Ok(Self::from(LogicalPlan::Copy(CopyTo { + input: Arc::new(input), + output_url, + file_format, + per_thread_output, + options, + }))) + } + /// Create a [DmlStatement] for inserting the contents of this builder into the named table pub fn insert_into( input: LogicalPlan, diff --git a/datafusion/expr/src/logical_plan/dml.rs b/datafusion/expr/src/logical_plan/dml.rs index 07f34101eb3a..ecdea7dcc652 100644 --- a/datafusion/expr/src/logical_plan/dml.rs +++ b/datafusion/expr/src/logical_plan/dml.rs @@ -17,13 +17,71 @@ use std::{ fmt::{self, Display}, + str::FromStr, sync::Arc, }; -use datafusion_common::{DFSchemaRef, OwnedTableReference}; +use datafusion_common::{DFSchemaRef, DataFusionError, OwnedTableReference}; use crate::LogicalPlan; +/// Operator that copies the contents of a database to file(s) +#[derive(Clone, PartialEq, Eq, Hash)] +pub struct CopyTo { + /// The relation that determines the tuples to write to the output file(s) + pub input: Arc, + /// The location to write the file(s) + pub output_url: String, + /// The file format to output (explicitly defined or inferred from file extension) + pub file_format: OutputFileFormat, + /// If false, it is assumed output_url is a file to which all data should be written + /// regardless of input partitioning. Otherwise, output_url is assumed to be a directory + /// to which each output partition is written to its own output file + pub per_thread_output: bool, + /// Arbitrary options as tuples + pub options: Vec<(String, String)>, +} + +/// The file formats that CopyTo can output +#[derive(Clone, PartialEq, Eq, Hash)] +pub enum OutputFileFormat { + CSV, + JSON, + PARQUET, + AVRO, + ARROW, +} + +impl FromStr for OutputFileFormat { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result { + match s { + "csv" => Ok(OutputFileFormat::CSV), + "json" => Ok(OutputFileFormat::JSON), + "parquet" => Ok(OutputFileFormat::PARQUET), + "avro" => Ok(OutputFileFormat::AVRO), + "arrow" => Ok(OutputFileFormat::ARROW), + _ => Err(DataFusionError::NotImplemented(format!( + "Unknown or not supported file format {s}!" 
+ ))), + } + } +} + +impl Display for OutputFileFormat { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let out = match self { + OutputFileFormat::CSV => "csv", + OutputFileFormat::JSON => "json", + OutputFileFormat::PARQUET => "parquet", + OutputFileFormat::AVRO => "avro", + OutputFileFormat::ARROW => "arrow", + }; + write!(f, "{}", out) + } +} + /// The operator that modifies the content of a database (adapted from /// substrait WriteRel) #[derive(Clone, PartialEq, Eq, Hash)] diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index 01862c3d5427..8316417138bd 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -18,7 +18,7 @@ pub mod builder; mod ddl; pub mod display; -mod dml; +pub mod dml; mod extension; mod plan; mod statement; diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 3557745ed346..1ee4fb810da9 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -31,6 +31,7 @@ use crate::{ build_join_schema, Expr, ExprSchemable, TableProviderFilterPushDown, TableSource, }; +use super::dml::CopyTo; use super::DdlStatement; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; @@ -120,6 +121,8 @@ pub enum LogicalPlan { Dml(DmlStatement), /// CREATE / DROP TABLES / VIEWS / SCHEMAs Ddl(DdlStatement), + /// COPY TO + Copy(CopyTo), /// Describe the schema of table DescribeTable(DescribeTable), /// Unnest a column that contains a nested list type. @@ -157,6 +160,7 @@ impl LogicalPlan { dummy_schema } LogicalPlan::Dml(DmlStatement { table_schema, .. }) => table_schema, + LogicalPlan::Copy(CopyTo { input, .. }) => input.schema(), LogicalPlan::Ddl(ddl) => ddl.schema(), LogicalPlan::Unnest(Unnest { schema, .. }) => schema, } @@ -203,6 +207,7 @@ impl LogicalPlan { | LogicalPlan::EmptyRelation(_) | LogicalPlan::Ddl(_) | LogicalPlan::Dml(_) + | LogicalPlan::Copy(_) | LogicalPlan::Values(_) | LogicalPlan::SubqueryAlias(_) | LogicalPlan::Union(_) @@ -343,6 +348,7 @@ impl LogicalPlan { | LogicalPlan::Distinct(_) | LogicalPlan::Dml(_) | LogicalPlan::Ddl(_) + | LogicalPlan::Copy(_) | LogicalPlan::DescribeTable(_) | LogicalPlan::Prepare(_) => Ok(()), } @@ -371,6 +377,7 @@ impl LogicalPlan { LogicalPlan::Explain(explain) => vec![&explain.plan], LogicalPlan::Analyze(analyze) => vec![&analyze.input], LogicalPlan::Dml(write) => vec![&write.input], + LogicalPlan::Copy(copy) => vec![©.input], LogicalPlan::Ddl(ddl) => ddl.inputs(), LogicalPlan::Unnest(Unnest { input, .. }) => vec![input], LogicalPlan::Prepare(Prepare { input, .. }) => vec![input], @@ -477,6 +484,7 @@ impl LogicalPlan { | LogicalPlan::Analyze(_) | LogicalPlan::Extension(_) | LogicalPlan::Dml(_) + | LogicalPlan::Copy(_) | LogicalPlan::Ddl(_) | LogicalPlan::DescribeTable(_) | LogicalPlan::Unnest(_) => Ok(None), @@ -640,6 +648,7 @@ impl LogicalPlan { | LogicalPlan::Explain(_) | LogicalPlan::Analyze(_) | LogicalPlan::Dml(_) + | LogicalPlan::Copy(_) | LogicalPlan::DescribeTable(_) | LogicalPlan::Prepare(_) | LogicalPlan::Statement(_) @@ -1083,6 +1092,24 @@ impl LogicalPlan { LogicalPlan::Dml(DmlStatement { table_name, op, .. 
}) => { write!(f, "Dml: op=[{op}] table=[{table_name}]") } + LogicalPlan::Copy(CopyTo { + input: _, + output_url, + file_format, + per_thread_output, + options, + }) => { + let mut op_str = String::new(); + op_str.push('('); + for (key, val) in options { + if !op_str.is_empty() { + op_str.push(','); + } + op_str.push_str(&format!("{key} {val}")); + } + op_str.push(')'); + write!(f, "CopyTo: format={file_format} output_url={output_url} per_thread_output={per_thread_output} options: {op_str}") + } LogicalPlan::Ddl(ddl) => { write!(f, "{}", ddl.display()) } diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index bffcd0669cef..90307f1491ae 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -17,6 +17,7 @@ //! Expression utilities +use crate::dml::CopyTo; use crate::expr::{Alias, Sort, WindowFunction}; use crate::logical_plan::builder::build_join_schema; use crate::logical_plan::{ @@ -745,6 +746,19 @@ pub fn from_plan( op: op.clone(), input: Arc::new(inputs[0].clone()), })), + LogicalPlan::Copy(CopyTo { + input: _, + output_url, + file_format, + per_thread_output, + options, + }) => Ok(LogicalPlan::Copy(CopyTo { + input: Arc::new(inputs[0].clone()), + output_url: output_url.clone(), + file_format: file_format.clone(), + per_thread_output: *per_thread_output, + options: options.clone(), + })), LogicalPlan::Values(Values { schema, .. }) => Ok(LogicalPlan::Values(Values { schema: schema.clone(), values: expr diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 74c4b1d36f52..08b28567fbc8 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -368,6 +368,7 @@ impl OptimizerRule for CommonSubexprEliminate { | LogicalPlan::Distinct(_) | LogicalPlan::Extension(_) | LogicalPlan::Dml(_) + | LogicalPlan::Copy(_) | LogicalPlan::Unnest(_) | LogicalPlan::Prepare(_) => { // apply the optimization to all inputs of the plan diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index d00e5e2f5908..511b28844225 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -1429,6 +1429,9 @@ impl AsLogicalPlan for LogicalPlanNode { LogicalPlan::Dml(_) => Err(proto_error( "LogicalPlan serde is not yet implemented for Dml", )), + LogicalPlan::Copy(_) => Err(proto_error( + "LogicalPlan serde is not yet implemented for Copy", + )), LogicalPlan::DescribeTable(_) => Err(proto_error( "LogicalPlan serde is not yet implemented for DescribeTable", )), diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 16036defda94..0f5dbb9ec0b7 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -16,8 +16,8 @@ // under the License. 
use crate::parser::{ - CopyToStatement, CreateExternalTable, DFParser, DescribeTableStmt, LexOrdering, - Statement as DFStatement, + CopyToSource, CopyToStatement, CreateExternalTable, DFParser, DescribeTableStmt, + LexOrdering, Statement as DFStatement, }; use crate::planner::{ object_name_to_qualifier, ContextProvider, PlannerContext, SqlToRel, @@ -31,6 +31,7 @@ use datafusion_common::{ DataFusionError, ExprSchema, OwnedTableReference, Result, SchemaReference, TableReference, ToDFSchema, }; +use datafusion_expr::dml::{CopyTo, OutputFileFormat}; use datafusion_expr::expr::Placeholder; use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check; use datafusion_expr::logical_plan::builder::project; @@ -55,6 +56,8 @@ use sqlparser::parser::ParserError::ParserError; use datafusion_common::plan_err; use std::collections::{BTreeMap, HashMap, HashSet}; +use std::path::Path; +use std::str::FromStr; use std::sync::Arc; fn ident_to_string(ident: &Ident) -> String { @@ -377,7 +380,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let _ = into; // optional keyword doesn't change behavior self.insert_to_plan(table_name, columns, source, overwrite) } - Statement::Update { table, assignments, @@ -547,11 +549,71 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { })) } - fn copy_to_plan(&self, _statement: CopyToStatement) -> Result { - // TODO: implement as part of https://github.com/apache/arrow-datafusion/issues/5654 - Err(DataFusionError::NotImplemented( - "`COPY .. TO ..` statement is not yet supported".to_string(), - )) + fn copy_to_plan(&self, statement: CopyToStatement) -> Result { + // determine if source is table or query and handle accordingly + let copy_source = statement.source; + let input = match copy_source { + CopyToSource::Relation(object_name) => { + let table_ref = + self.object_name_to_table_reference(object_name.clone())?; + let table_source = self.schema_provider.get_table_provider(table_ref)?; + LogicalPlanBuilder::scan( + object_name_to_string(&object_name), + table_source, + None, + )? + .build()? + } + CopyToSource::Query(query) => { + self.query_to_plan(query, &mut PlannerContext::new())? + } + }; + + // convert options to lowercase strings, check for explicitly set "format" option + let mut options = vec![]; + let mut explicit_format = None; + // default behavior is to assume the user is specifying a single file to which + // we should output all data regardless of input partitioning. + let mut per_thread_output: bool = false; + for (key, value) in statement.options { + let (k, v) = (key.to_lowercase(), value.to_string().to_lowercase()); + // check for options important to planning + if k == "format" { + explicit_format = Some(OutputFileFormat::from_str(&v)?); + } + if k == "per_thread_output" { + per_thread_output = match v.as_str(){ + "true" => true, + "false" => false, + _ => return Err(DataFusionError::Plan( + format!("Copy to option 'per_thread_output' must be true or false, got {value}") + )) + } + } + options.push((k, v)); + } + let format = match explicit_format { + Some(file_format) => file_format, + None => { + // try to infer file format from file extension + let extension: &str = &Path::new(&statement.target) + .extension() + .ok_or( + DataFusionError::Plan("Copy To format not explicitly set and unable to get file extension!".to_string()))? + .to_str() + .ok_or(DataFusionError::Plan("Copy to format not explicitly set and failed to parse file extension!".to_string()))? + .to_lowercase(); + + OutputFileFormat::from_str(extension)? 
+ } + }; + Ok(LogicalPlan::Copy(CopyTo { + input: Arc::new(input), + output_url: statement.target, + file_format: format, + per_thread_output, + options, + })) } fn build_order_by( diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index accb6ec9ced5..53ffccbdb4c3 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -325,6 +325,30 @@ fn plan_rollback_transaction_chained() { quick_test(sql, plan); } +#[test] +fn plan_copy_to() { + let sql = "COPY test_decimal to 'output.csv'"; + let plan = r#" +CopyTo: format=csv output_url=output.csv per_thread_output=false options: () + TableScan: test_decimal + "# + .trim(); + quick_test(sql, plan); +} + +#[test] +fn plan_copy_to_query() { + let sql = "COPY (select * from test_decimal limit 10) to 'output.csv'"; + let plan = r#" +CopyTo: format=csv output_url=output.csv per_thread_output=false options: () + Limit: skip=0, fetch=10 + Projection: test_decimal.id, test_decimal.price + TableScan: test_decimal + "# + .trim(); + quick_test(sql, plan); +} + #[test] fn plan_insert() { let sql = diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs b/datafusion/sqllogictest/bin/sqllogictests.rs index f28fdbe23c13..d097d97fb7a9 100644 --- a/datafusion/sqllogictest/bin/sqllogictests.rs +++ b/datafusion/sqllogictest/bin/sqllogictests.rs @@ -16,6 +16,7 @@ // under the License. use std::ffi::OsStr; +use std::fs; use std::path::{Path, PathBuf}; #[cfg(target_family = "windows")] use std::thread; @@ -54,10 +55,26 @@ pub async fn main() -> Result<()> { run_tests().await } +/// Sets up an empty directory at test_files/scratch +/// creating it if needed and clearing any file contents if it exists +/// This allows tests for inserting to external tables or copy to +/// to persist data to disk and have consistent state when running +/// a new test +fn setup_scratch_dir() -> Result<()> { + let path = std::path::Path::new("test_files/scratch"); + if path.exists() { + fs::remove_dir_all(path)?; + } + fs::create_dir(path)?; + Ok(()) +} + async fn run_tests() -> Result<()> { // Enable logging (e.g. set RUST_LOG=debug to see debug logs) env_logger::init(); + setup_scratch_dir()?; + let options = Options::new(); // Run all tests in parallel, reporting failures at the end diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt index e7bde89d2940..364459fa2df1 100644 --- a/datafusion/sqllogictest/test_files/copy.slt +++ b/datafusion/sqllogictest/test_files/copy.slt @@ -16,29 +16,141 @@ # under the License. # tests for copy command - statement ok create table source_table(col1 integer, col2 varchar) as values (1, 'Foo'), (2, 'Bar'); -# Copy from table -statement error DataFusion error: This feature is not implemented: `COPY \.\. 
TO \.\.` statement is not yet supported -COPY source_table to '/tmp/table.parquet'; +# Copy to directory as multiple files +query IT +COPY source_table TO 'test_files/scratch/table' (format parquet, per_thread_output true); +---- +2 + +#Explain copy queries not currently working +query error DataFusion error: This feature is not implemented: Unsupported SQL statement: Some\("COPY source_table TO 'test_files/scratch/table'"\) +EXPLAIN COPY source_table to 'test_files/scratch/table' + +query error DataFusion error: SQL error: ParserError\("Expected end of statement, found: source_table"\) +EXPLAIN COPY source_table to 'test_files/scratch/table' (format parquet, per_thread_output true) + +# Copy more files to directory via query +query IT +COPY (select * from source_table UNION ALL select * from source_table) to 'test_files/scratch/table' (format parquet, per_thread_output true); +---- +4 + +# validate multiple parquet file output +statement ok +CREATE EXTERNAL TABLE validate_parquet STORED AS PARQUET LOCATION 'test_files/scratch/table/'; + +query IT +select * from validate_parquet; +---- +1 Foo +2 Bar +1 Foo +2 Bar +1 Foo +2 Bar + +# Copy from table to single file +query IT +COPY source_table to 'test_files/scratch/table.parquet'; +---- +2 + +# validate single parquet file output +statement ok +CREATE EXTERNAL TABLE validate_parquet_single STORED AS PARQUET LOCATION 'test_files/scratch/table.parquet'; + +query IT +select * from validate_parquet_single; +---- +1 Foo +2 Bar + +# copy from table to folder of csv files +query IT +COPY source_table to 'test_files/scratch/table_csv' (format csv, per_thread_output true); +---- +2 + +# validate folder of csv files +statement ok +CREATE EXTERNAL TABLE validate_csv STORED AS csv WITH HEADER ROW LOCATION 'test_files/scratch/table_csv'; + +query IT +select * from validate_csv; +---- +1 Foo +2 Bar + +# Copy from table to single csv +query IT +COPY source_table to 'test_files/scratch/table.csv'; +---- +2 + +# Validate single csv output +statement ok +CREATE EXTERNAL TABLE validate_single_csv STORED AS csv WITH HEADER ROW LOCATION 'test_files/scratch/table.csv'; + +query IT +select * from validate_single_csv; +---- +1 Foo +2 Bar + +# Copy from table to folder of json +query IT +COPY source_table to 'test_files/scratch/table_json' (format json, per_thread_output true); +---- +2 + +# Validate json output +statement ok +CREATE EXTERNAL TABLE validate_json STORED AS json LOCATION 'test_files/scratch/table_json'; + +query IT +select * from validate_json; +---- +1 Foo +2 Bar + +# Copy from table to single json file +query IT +COPY source_table to 'test_files/scratch/table.json'; +---- +2 + +# Validate single JSON file` +statement ok +CREATE EXTERNAL TABLE validate_single_json STORED AS json LOCATION 'test_files/scratch/table_json'; + +query IT +select * from validate_single_json; +---- +1 Foo +2 Bar # Copy from table with options -statement error DataFusion error: This feature is not implemented: `COPY \.\. TO \.\.` statement is not yet supported -COPY source_table to '/tmp/table.parquet' (row_group_size 55); +query IT +COPY source_table to 'test_files/scratch/table.json' (row_group_size 55); +---- +2 # Copy from table with options (and trailing comma) -statement error DataFusion error: This feature is not implemented: `COPY \.\. 
TO \.\.` statement is not yet supported -COPY source_table to '/tmp/table.parquet' (row_group_size 55, row_group_limit_bytes 9,); +query IT +COPY source_table to 'test_files/scratch/table.json' (row_group_size 55, row_group_limit_bytes 9,); +---- +2 # Error cases: # Incomplete statement -statement error DataFusion error: SQL error: ParserError\("Expected \), found: EOF"\) +query error DataFusion error: SQL error: ParserError\("Expected \), found: EOF"\) COPY (select col2, sum(col1) from source_table # Copy from table with non literal -statement error DataFusion error: SQL error: ParserError\("Expected ',' or '\)' after option definition, found: \+"\) +query error DataFusion error: SQL error: ParserError\("Expected ',' or '\)' after option definition, found: \+"\) COPY source_table to '/tmp/table.parquet' (row_group_size 55 + 102);
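To tie the pieces together, here is a small (untested) sketch of exercising the new `COPY ... TO ...` support programmatically, mirroring the sqllogictest cases above. The table name, target paths, and option values are illustrative only:

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.sql("CREATE TABLE t(a INT, b VARCHAR) AS VALUES (1, 'x'), (2, 'y')")
        .await?
        .collect()
        .await?;
    // Single file: format is inferred from the .parquet extension and
    // per_thread_output defaults to false.
    ctx.sql("COPY t TO 'table.parquet'").await?.collect().await?;
    // Directory of one file per input partition: format and
    // per_thread_output supplied explicitly as options.
    ctx.sql("COPY t TO 'table_csv' (format csv, per_thread_output true)")
        .await?
        .collect()
        .await?;
    Ok(())
}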