Implement Support for Copy To Logical and Physical plans #7283

Merged · 4 commits · Aug 16, 2023
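
This PR reroutes the DataFrame write methods through a new CopyTo logical plan. Below is a minimal sketch of the builder entry point, based only on the call sites in the diff; the exact parameter types, the meaning of the boolean flag (assumed here to toggle one-file-per-partition output), and the import paths are assumptions.

```rust
use datafusion_common::Result;
use datafusion_expr::dml::OutputFileFormat;
use datafusion_expr::{LogicalPlan, LogicalPlanBuilder};

// Sketch: wrap an existing logical plan in a CopyTo node, mirroring the
// call sites added to DataFrame::write_csv/write_json/write_parquet below.
// The boolean flag and the empty options vector follow the diff; option
// handling itself is still a TODO in this PR.
fn copy_to_csv(input: LogicalPlan, path: &str) -> Result<LogicalPlan> {
    LogicalPlanBuilder::copy_to(
        input,
        path.into(),
        OutputFileFormat::CSV,
        true,   // assumed: one output file per input partition
        vec![], // format options (not implemented yet)
    )?
    .build()
}
```

The resulting plan executes like any other LogicalPlan; as the dataframe.rs changes show, collecting it is what performs the write.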
5 changes: 4 additions & 1 deletion .gitignore
@@ -103,4 +103,7 @@ datafusion/CHANGELOG.md.bak
.githubchangeloggenerator.cache*

# Generated tpch data
datafusion/core/tests/sqllogictests/test_files/tpch/data/*
datafusion/sqllogictests/test_files/tpch/data/*

# Scratch temp dir for sqllogictests
datafusion/sqllogictest/test_files/scratch*
55 changes: 41 additions & 14 deletions datafusion/core/src/dataframe.rs
@@ -25,6 +25,7 @@ use arrow::compute::{cast, concat};
use arrow::datatypes::{DataType, Field};
use async_trait::async_trait;
use datafusion_common::{DataFusionError, SchemaError, UnnestOptions};
use datafusion_expr::dml::OutputFileFormat;
use parquet::file::properties::WriterProperties;

use datafusion_common::{Column, DFSchema, ScalarValue};
@@ -37,7 +38,6 @@ use crate::arrow::datatypes::Schema;
use crate::arrow::datatypes::SchemaRef;
use crate::arrow::record_batch::RecordBatch;
use crate::arrow::util::pretty;
use crate::datasource::physical_plan::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::datasource::{provider_as_source, MemTable, TableProvider};
use crate::error::Result;
use crate::execution::{
@@ -992,28 +992,55 @@ impl DataFrame {
}

/// Write a `DataFrame` to a CSV file.
pub async fn write_csv(self, path: &str) -> Result<()> {
let plan = self.session_state.create_physical_plan(&self.plan).await?;
let task_ctx = Arc::new(self.task_ctx());
plan_to_csv(task_ctx, plan, path).await
pub async fn write_csv(
self,
path: &str,
) -> Result<Vec<RecordBatch>, DataFusionError> {
let plan = LogicalPlanBuilder::copy_to(
self.plan,
path.into(),
OutputFileFormat::CSV,
true,
// TODO implement options
vec![],
)?
.build()?;
DataFrame::new(self.session_state, plan).collect().await
}

/// Write a `DataFrame` to a Parquet file.
pub async fn write_parquet(
self,
path: &str,
writer_properties: Option<WriterProperties>,
) -> Result<()> {
let plan = self.session_state.create_physical_plan(&self.plan).await?;
let task_ctx = Arc::new(self.task_ctx());
plan_to_parquet(task_ctx, plan, path, writer_properties).await
_writer_properties: Option<WriterProperties>,
) -> Result<Vec<RecordBatch>, DataFusionError> {
let plan = LogicalPlanBuilder::copy_to(
self.plan,
path.into(),
OutputFileFormat::PARQUET,
true,
// TODO implement options
vec![],
)?
.build()?;
DataFrame::new(self.session_state, plan).collect().await
}

/// Executes a query and writes the results to a partitioned JSON file.
pub async fn write_json(self, path: impl AsRef<str>) -> Result<()> {
let plan = self.session_state.create_physical_plan(&self.plan).await?;
let task_ctx = Arc::new(self.task_ctx());
plan_to_json(task_ctx, plan, path).await
pub async fn write_json(
self,
path: &str,
) -> Result<Vec<RecordBatch>, DataFusionError> {
let plan = LogicalPlanBuilder::copy_to(
self.plan,
path.into(),
OutputFileFormat::JSON,
true,
// TODO implement options
vec![],
)?
.build()?;
DataFrame::new(self.session_state, plan).collect().await
}

/// Add an additional column to the DataFrame.
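
At the API surface, write_csv, write_json, and write_parquet now return the record batches produced by executing the CopyTo plan instead of (), and write_parquet accepts but does not yet apply its WriterProperties (note the rename to _writer_properties). A hedged usage sketch follows; the file names and tokio setup are illustrative only.

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Hypothetical input file; any DataFrame works the same way.
    let df = ctx.read_csv("input.csv", CsvReadOptions::new()).await?;

    // After this PR the write builds and executes a CopyTo plan and
    // returns its result batches rather than ().
    let results = df.write_csv("csv_out/").await?;
    println!("write_csv returned {} result batch(es)", results.len());
    Ok(())
}
```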
99 changes: 68 additions & 31 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -47,7 +47,7 @@ use crate::datasource::physical_plan::{
};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::insert::{DataSink, InsertExec};
use crate::physical_plan::insert::{DataSink, FileSinkExec};
use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
use rand::distributions::{Alphanumeric, DistString};
@@ -277,6 +277,7 @@ impl FileFormat for CsvFormat {
"Inserting compressed CSV is not implemented yet.".into(),
));
}

let sink_schema = conf.output_schema().clone();
let sink = Arc::new(CsvSink::new(
conf,
@@ -285,7 +286,7 @@ impl FileFormat for CsvFormat {
self.file_compression_type,
));

Ok(Arc::new(InsertExec::new(input, sink, sink_schema)) as _)
Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _)
}
}

@@ -505,12 +506,14 @@ impl DataSink for CsvSink {
let object_store = context
.runtime_env()
.object_store(&self.config.object_store_url)?;

// Construct serializer and writer for each file group
let mut serializers: Vec<Box<dyn BatchSerializer>> = vec![];
let mut writers = vec![];
match self.config.writer_mode {
FileWriterMode::Append => {
if !self.config.per_thread_output {
return Err(DataFusionError::NotImplemented("per_thread_output=false is not implemented for CsvSink in Append mode".into()));
}
for file_group in &self.config.file_groups {
// In append mode, consider has_header flag only when file is empty (at the start).
// For other modes, use has_header flag as is.
@@ -542,38 +545,72 @@
FileWriterMode::PutMultipart => {
// Currently assuming only 1 partition path (i.e. not hive-style partitioning on a column)
let base_path = &self.config.table_paths[0];
// Uniquely identify this batch of files with a random string, to prevent collisions overwriting files
let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
for part_idx in 0..num_partitions {
let header = self.has_header;
let builder = WriterBuilder::new().with_delimiter(self.delimiter);
let serializer = CsvSerializer::new()
.with_builder(builder)
.with_header(header);
let file_path = base_path
.prefix()
.child(format!("/{}_{}.csv", write_id, part_idx));
let object_meta = ObjectMeta {
location: file_path,
last_modified: chrono::offset::Utc::now(),
size: 0,
e_tag: None,
};
let writer = create_writer(
self.config.writer_mode,
self.file_compression_type,
object_meta.into(),
object_store.clone(),
)
.await?;

serializers.push(Box::new(serializer));
writers.push(writer);
match self.config.per_thread_output {
true => {
// Uniquely identify this batch of files with a random string, to prevent collisions overwriting files
let write_id =
Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
for part_idx in 0..num_partitions {
let header = self.has_header;
let builder =
WriterBuilder::new().with_delimiter(self.delimiter);
let serializer = CsvSerializer::new()
.with_builder(builder)
.with_header(header);
serializers.push(Box::new(serializer));
let file_path = base_path
.prefix()
.child(format!("{}_{}.csv", write_id, part_idx));
let object_meta = ObjectMeta {
location: file_path,
last_modified: chrono::offset::Utc::now(),
size: 0,
e_tag: None,
};
let writer = create_writer(
self.config.writer_mode,
self.file_compression_type,
object_meta.into(),
object_store.clone(),
)
.await?;
writers.push(writer);
}
}
false => {
let header = self.has_header;
let builder = WriterBuilder::new().with_delimiter(self.delimiter);
let serializer = CsvSerializer::new()
.with_builder(builder)
.with_header(header);
serializers.push(Box::new(serializer));
let file_path = base_path.prefix();
let object_meta = ObjectMeta {
location: file_path.clone(),
last_modified: chrono::offset::Utc::now(),
size: 0,
e_tag: None,
};
let writer = create_writer(
self.config.writer_mode,
self.file_compression_type,
object_meta.into(),
object_store.clone(),
)
.await?;
writers.push(writer);
}
}
}
}

stateless_serialize_and_write_files(data, serializers, writers).await
stateless_serialize_and_write_files(
data,
serializers,
writers,
self.config.per_thread_output,
)
.await
}
}

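
The CsvSink gains a per_thread_output flag: when true, PutMultipart mode keeps the existing behaviour of writing one object per input partition under a random write id; when false, all partitions are serialized into a single object at the table's base path, and Append mode rejects the flag as not implemented. Below is a minimal sketch of the per-partition naming scheme; partition_paths and its prefix argument are illustrative stand-ins, not the sink's actual helpers.

```rust
use object_store::path::Path;
use rand::distributions::{Alphanumeric, DistString};

/// Sketch: destination objects for per_thread_output = true. Each execution
/// partition gets its own file, prefixed with a random write id so that
/// concurrent or repeated writes do not overwrite each other. The PR also
/// drops the stray leading '/' the old format string passed to `child`.
fn partition_paths(prefix: &Path, num_partitions: usize, extension: &str) -> Vec<Path> {
    let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
    (0..num_partitions)
        .map(|part_idx| prefix.child(format!("{}_{}.{}", write_id, part_idx, extension)))
        .collect()
}
```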
83 changes: 58 additions & 25 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -43,7 +43,7 @@ use object_store::{GetResult, ObjectMeta, ObjectStore};

use crate::datasource::physical_plan::FileGroupDisplay;
use crate::physical_plan::insert::DataSink;
use crate::physical_plan::insert::InsertExec;
use crate::physical_plan::insert::FileSinkExec;
use crate::physical_plan::SendableRecordBatchStream;
use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};

Expand Down Expand Up @@ -187,7 +187,7 @@ impl FileFormat for JsonFormat {
let sink_schema = conf.output_schema().clone();
let sink = Arc::new(JsonSink::new(conf, self.file_compression_type));

Ok(Arc::new(InsertExec::new(input, sink, sink_schema)) as _)
Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _)
}
}

@@ -280,6 +280,9 @@ impl DataSink for JsonSink {
let mut writers = vec![];
match self.config.writer_mode {
FileWriterMode::Append => {
if !self.config.per_thread_output {
return Err(DataFusionError::NotImplemented("per_thread_output=false is not implemented for JsonSink in Append mode".into()));
}
for file_group in &self.config.file_groups {
let serializer = JsonSerializer::new();
serializers.push(Box::new(serializer));
@@ -303,33 +306,63 @@
FileWriterMode::PutMultipart => {
// Currently assuming only 1 partition path (i.e. not hive-style partitioning on a column)
let base_path = &self.config.table_paths[0];
// Uniquely identify this batch of files with a random string, to prevent collisions overwriting files
let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
for part_idx in 0..num_partitions {
let serializer = JsonSerializer::new();
serializers.push(Box::new(serializer));
let file_path = base_path
.prefix()
.child(format!("/{}_{}.json", write_id, part_idx));
let object_meta = ObjectMeta {
location: file_path,
last_modified: chrono::offset::Utc::now(),
size: 0,
e_tag: None,
};
let writer = create_writer(
self.config.writer_mode,
self.file_compression_type,
object_meta.into(),
object_store.clone(),
)
.await?;
writers.push(writer);
match self.config.per_thread_output {
true => {
// Uniquely identify this batch of files with a random string, to prevent collisions overwriting files
let write_id =
Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
for part_idx in 0..num_partitions {
let serializer = JsonSerializer::new();
serializers.push(Box::new(serializer));
let file_path = base_path
.prefix()
.child(format!("{}_{}.json", write_id, part_idx));
let object_meta = ObjectMeta {
location: file_path,
last_modified: chrono::offset::Utc::now(),
size: 0,
e_tag: None,
};
let writer = create_writer(
self.config.writer_mode,
self.file_compression_type,
object_meta.into(),
object_store.clone(),
)
.await?;
writers.push(writer);
}
}
false => {
let serializer = JsonSerializer::new();
serializers.push(Box::new(serializer));
let file_path = base_path.prefix();
let object_meta = ObjectMeta {
location: file_path.clone(),
last_modified: chrono::offset::Utc::now(),
size: 0,
e_tag: None,
};
let writer = create_writer(
self.config.writer_mode,
self.file_compression_type,
object_meta.into(),
object_store.clone(),
)
.await?;
writers.push(writer);
}
}
}
}

stateless_serialize_and_write_files(data, serializers, writers).await
stateless_serialize_and_write_files(
data,
serializers,
writers,
self.config.per_thread_output,
)
.await
}
}

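
JsonSink mirrors the CsvSink changes: the same Append-mode guard, the same per_thread_output branching, and the same extra argument to stateless_serialize_and_write_files. The other change repeated across both files is the rename of InsertExec to FileSinkExec. The sketch below shows how a format's sink is wrapped into the physical plan; the constructor shape follows the diff, while the import paths and parameter types are assumptions.

```rust
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use datafusion::physical_plan::insert::{DataSink, FileSinkExec};
use datafusion::physical_plan::ExecutionPlan;

/// Sketch: a format's DataSink (CsvSink, JsonSink, ...) becomes an
/// ExecutionPlan node by wrapping it in FileSinkExec, which drives the
/// input plan's partitions into the sink.
fn wrap_sink(
    input: Arc<dyn ExecutionPlan>,
    sink: Arc<dyn DataSink>,
    sink_schema: SchemaRef,
) -> Arc<dyn ExecutionPlan> {
    Arc::new(FileSinkExec::new(input, sink, sink_schema))
}
```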