Unify DataFrame and SQL (Insert Into) Write Methods (#7141)
* inner join_set

* Initial write_table implementation

* update MemSink to accept Vec<SendableRecordBatchStream>

* some cleaning up

* fmt

* allow listing table to control append method

* test fixes

* DataFrameOptions and clean up

* rename to execute streams for insertexec

* CsvReadOptions sort_order connect to ListingOptions
devinjdangelo authored Aug 5, 2023
1 parent 2354758 commit 2d91917
Showing 17 changed files with 448 additions and 102 deletions.
48 changes: 48 additions & 0 deletions datafusion/core/src/dataframe.rs
@@ -53,6 +53,33 @@ use crate::physical_plan::{collect, collect_partitioned};
use crate::physical_plan::{execute_stream, execute_stream_partitioned, ExecutionPlan};
use crate::prelude::SessionContext;

/// Contains options that control how data is
/// written out from a DataFrame
pub struct DataFrameWriteOptions {
/// Controls if existing data should be overwritten
overwrite: bool, // TODO, enable DataFrame COPY TO write without TableProvider
// settings such as LOCATION and FILETYPE can be set here
// e.g. add location: Option<Path>
}

impl DataFrameWriteOptions {
/// Create a new DataFrameWriteOptions with default values
pub fn new() -> Self {
DataFrameWriteOptions { overwrite: false }
}
/// Set the overwrite option to true or false
pub fn with_overwrite(mut self, overwrite: bool) -> Self {
self.overwrite = overwrite;
self
}
}

impl Default for DataFrameWriteOptions {
fn default() -> Self {
Self::new()
}
}

/// DataFrame represents a logical set of rows with the same named columns.
/// Similar to a [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html) or
/// [Spark DataFrame](https://spark.apache.org/docs/latest/sql-programming-guide.html)
@@ -925,6 +952,27 @@ impl DataFrame {
))
}

/// Write this DataFrame to the referenced table.
/// This method uses the same underlying implementation
/// as the SQL `INSERT INTO` statement.
/// Unlike most other DataFrame methods, this method executes
/// eagerly: it writes the data and returns the count of rows written.
pub async fn write_table(
self,
table_name: &str,
write_options: DataFrameWriteOptions,
) -> Result<Vec<RecordBatch>, DataFusionError> {
let arrow_schema = Schema::from(self.schema());
let plan = LogicalPlanBuilder::insert_into(
self.plan,
table_name.to_owned(),
&arrow_schema,
write_options.overwrite,
)?
.build()?;
DataFrame::new(self.session_state, plan).collect().await
}

/// Write a `DataFrame` to a CSV file.
pub async fn write_csv(self, path: &str) -> Result<()> {
let plan = self.session_state.create_physical_plan(&self.plan).await?;
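
To make the unified write path concrete, here is a minimal usage sketch (not code from this commit): it registers an empty in-memory target table and appends a DataFrame to it with the new write_table API, which builds the same insert_into plan that SQL INSERT INTO uses. The table names, sample schema, and the datafusion::dataframe import path for DataFrameWriteOptions are assumptions.

use std::sync::Arc;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::dataframe::DataFrameWriteOptions; // import path assumed from this diff
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

    // Target: an empty in-memory table; its MemSink is what the insert plan writes to.
    let target = MemTable::try_new(schema.clone(), vec![vec![]])?;
    ctx.register_table("target", Arc::new(target))?;

    // Source: a DataFrame backed by one small batch.
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;
    let source = MemTable::try_new(schema, vec![vec![batch]])?;
    ctx.register_table("source", Arc::new(source))?;
    let df = ctx.table("source").await?;

    // Appends by default; with_overwrite(true) would be rejected by sinks
    // that have not implemented overwrites yet (e.g. the CSV sink).
    let result = df
        .write_table("target", DataFrameWriteOptions::new())
        .await?;
    println!("{result:?}"); // a single RecordBatch reporting the row count

    // The SQL path builds the same insert_into plan as write_table.
    ctx.sql("INSERT INTO target SELECT a FROM source").await?.collect().await?;
    Ok(())
}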
116 changes: 82 additions & 34 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -53,6 +53,7 @@ use crate::execution::context::SessionState;
use crate::physical_plan::insert::{DataSink, InsertExec};
use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
use rand::distributions::{Alphanumeric, DistString};

/// The default file extension of csv files
pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
@@ -268,6 +269,11 @@ impl FileFormat for CsvFormat {
_state: &SessionState,
conf: FileSinkConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
if conf.overwrite {
return Err(DataFusionError::NotImplemented(
"Overwrites are not implemented yet for CSV".into(),
));
}
let sink_schema = conf.output_schema().clone();
let sink = Arc::new(CsvSink::new(
conf,
@@ -560,10 +566,10 @@ impl CsvSink {
impl DataSink for CsvSink {
async fn write_all(
&self,
mut data: SendableRecordBatchStream,
mut data: Vec<SendableRecordBatchStream>,
context: &Arc<TaskContext>,
) -> Result<u64> {
let num_partitions = self.config.file_groups.len();
let num_partitions = data.len();

let object_store = context
.runtime_env()
@@ -572,44 +578,86 @@ impl DataSink for CsvSink {
// Construct serializer and writer for each file group
let mut serializers = vec![];
let mut writers = vec![];
for file_group in &self.config.file_groups {
// In append mode, consider has_header flag only when file is empty (at the start).
// For other modes, use has_header flag as is.
let header = self.has_header
&& (!matches!(&self.config.writer_mode, FileWriterMode::Append)
|| file_group.object_meta.size == 0);
let builder = WriterBuilder::new().with_delimiter(self.delimiter);
let serializer = CsvSerializer::new()
.with_builder(builder)
.with_header(header);
serializers.push(serializer);

let file = file_group.clone();
let writer = self
.create_writer(file.object_meta.clone().into(), object_store.clone())
.await?;
writers.push(writer);
match self.config.writer_mode {
FileWriterMode::Append => {
for file_group in &self.config.file_groups {
// In append mode, consider has_header flag only when file is empty (at the start).
// For other modes, use has_header flag as is.
let header = self.has_header
&& (!matches!(&self.config.writer_mode, FileWriterMode::Append)
|| file_group.object_meta.size == 0);
let builder = WriterBuilder::new().with_delimiter(self.delimiter);
let serializer = CsvSerializer::new()
.with_builder(builder)
.with_header(header);
serializers.push(serializer);

let file = file_group.clone();
let writer = self
.create_writer(
file.object_meta.clone().into(),
object_store.clone(),
)
.await?;
writers.push(writer);
}
}
FileWriterMode::Put => {
return Err(DataFusionError::NotImplemented(
"Put Mode is not implemented for CSV Sink yet".into(),
))
}
FileWriterMode::PutMultipart => {
// Currently assuming only 1 partition path (i.e. not Hive-style partitioning on a column)
let base_path = &self.config.table_paths[0];
// Uniquely identify this batch of files with a random string, to prevent collisions overwriting files
let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
for part_idx in 0..num_partitions {
let header = true;
let builder = WriterBuilder::new().with_delimiter(self.delimiter);
let serializer = CsvSerializer::new()
.with_builder(builder)
.with_header(header);
serializers.push(serializer);
let file_path = base_path
.prefix()
.child(format!("/{}_{}.csv", write_id, part_idx));
let object_meta = ObjectMeta {
location: file_path,
last_modified: chrono::offset::Utc::now(),
size: 0,
e_tag: None,
};
let writer = self
.create_writer(object_meta.into(), object_store.clone())
.await?;
writers.push(writer);
}
}
}

let mut idx = 0;
let mut row_count = 0;
// Map errors to DataFusionError.
let err_converter =
|_| DataFusionError::Internal("Unexpected FileSink Error".to_string());
while let Some(maybe_batch) = data.next().await {
// Write data to files in a round robin fashion:
idx = (idx + 1) % num_partitions;
let serializer = &mut serializers[idx];
let batch = check_for_errors(maybe_batch, &mut writers).await?;
row_count += batch.num_rows();
let bytes =
check_for_errors(serializer.serialize(batch).await, &mut writers).await?;
let writer = &mut writers[idx];
check_for_errors(
writer.write_all(&bytes).await.map_err(err_converter),
&mut writers,
)
.await?;
// TODO parallelize serialization across partitions and batches within partitions
// see: https://github.com/apache/arrow-datafusion/issues/7079
for idx in 0..num_partitions {
while let Some(maybe_batch) = data[idx].next().await {
// Write each input stream's batches to its corresponding output file:
let serializer = &mut serializers[idx];
let batch = check_for_errors(maybe_batch, &mut writers).await?;
row_count += batch.num_rows();
let bytes =
check_for_errors(serializer.serialize(batch).await, &mut writers)
.await?;
let writer = &mut writers[idx];
check_for_errors(
writer.write_all(&bytes).await.map_err(err_converter),
&mut writers,
)
.await?;
}
}
// Perform cleanup:
let n_writers = writers.len();
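
For illustration, a small sketch of the PutMultipart file-naming scheme used above (not part of the commit): each write generates a random 16-character id and emits one object per output partition under the table's base path, so concurrent writes cannot overwrite each other's files. The base path and partition count below are made up.

use object_store::path::Path;
use rand::distributions::{Alphanumeric, DistString};

// One output object per input partition, all sharing a random write id.
fn partition_paths(base: &Path, num_partitions: usize) -> Vec<Path> {
    let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
    (0..num_partitions)
        .map(|part_idx| base.child(format!("{write_id}_{part_idx}.csv")))
        .collect()
}

fn main() {
    let base = Path::from("my_table"); // stands in for the table's base path
    for p in partition_paths(&base, 3) {
        println!("{p}"); // e.g. my_table/AbC123xYz0QwErTy_0.csv
    }
}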
25 changes: 22 additions & 3 deletions datafusion/core/src/datasource/file_format/options.rs
@@ -30,7 +30,7 @@ use crate::datasource::file_format::file_type::FileCompressionType;
use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::datasource::listing::ListingTableUrl;
use crate::datasource::listing::{ListingTableInsertMode, ListingTableUrl};
use crate::datasource::{
file_format::{
avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat,
@@ -39,6 +39,7 @@ use crate::datasource::{
};
use crate::error::Result;
use crate::execution::context::{SessionConfig, SessionState};
use crate::logical_expr::Expr;

/// Options that control the reading of CSV files.
///
@@ -73,6 +74,10 @@ pub struct CsvReadOptions<'a> {
pub file_compression_type: FileCompressionType,
/// Flag indicating whether this file may be unbounded (as in a FIFO file).
pub infinite: bool,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<Expr>>,
/// Controls how inserts into this file should be handled
pub insert_mode: ListingTableInsertMode,
}

impl<'a> Default for CsvReadOptions<'a> {
Expand All @@ -95,6 +100,8 @@ impl<'a> CsvReadOptions<'a> {
table_partition_cols: vec![],
file_compression_type: FileCompressionType::UNCOMPRESSED,
infinite: false,
file_sort_order: vec![],
insert_mode: ListingTableInsertMode::AppendToFile,
}
}

@@ -171,6 +178,18 @@ impl<'a> CsvReadOptions<'a> {
self.file_compression_type = file_compression_type;
self
}

/// Configure whether the file has a known sort order
pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>) -> Self {
self.file_sort_order = file_sort_order;
self
}

/// Configure how insertions to this table should be handled
pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self {
self.insert_mode = insert_mode;
self
}
}

/// Options that control the reading of Parquet files.
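
A hedged usage sketch for the two new CsvReadOptions settings (not from the commit): it declares a sort order for the file and opts into append-to-file inserts, both of which now flow into the ListingOptions built from these read options. The file name, table name, and sort column are assumptions.

use datafusion::datasource::listing::ListingTableInsertMode;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Declare that events.csv is already sorted by "ts", and that inserts
    // should append rows to the existing file rather than create new files.
    let options = CsvReadOptions::new()
        .has_header(true)
        .file_sort_order(vec![vec![col("ts").sort(true, false)]])
        .insert_mode(ListingTableInsertMode::AppendToFile);

    ctx.register_csv("events", "events.csv", options).await?;
    // INSERT INTO events ... (or DataFrame::write_table) now appends to events.csv.
    Ok(())
}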
@@ -461,9 +480,9 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
.with_file_extension(self.file_extension)
.with_target_partitions(config.target_partitions())
.with_table_partition_cols(self.table_partition_cols.clone())
// TODO: Add file sort order into CsvReadOptions and introduce here.
.with_file_sort_order(vec![])
.with_file_sort_order(self.file_sort_order.clone())
.with_infinite_source(self.infinite)
.with_insert_mode(self.insert_mode.clone())
}

async fn get_resolved_schema(
4 changes: 3 additions & 1 deletion datafusion/core/src/datasource/listing/mod.rs
@@ -31,7 +31,9 @@ use std::pin::Pin;
use std::sync::Arc;

pub use self::url::ListingTableUrl;
pub use table::{ListingOptions, ListingTable, ListingTableConfig};
pub use table::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableInsertMode,
};

/// Stream of files get listed from object store
pub type PartitionedFileStream =