Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support InsertInto Sorted ListingTable #7743

Merged
merged 4 commits into from
Oct 8, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use arrow::{self, datatypes::SchemaRef};
use arrow_array::RecordBatch;
use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};

use async_trait::async_trait;
use bytes::{Buf, Bytes};
Expand Down Expand Up @@ -263,6 +263,7 @@ impl FileFormat for CsvFormat {
input: Arc<dyn ExecutionPlan>,
_state: &SessionState,
conf: FileSinkConfig,
order_requirements: Option<Vec<Option<Vec<PhysicalSortRequirement>>>>,
) -> Result<Arc<dyn ExecutionPlan>> {
if conf.overwrite {
return not_impl_err!("Overwrites are not implemented yet for CSV");
Expand All @@ -275,7 +276,12 @@ impl FileFormat for CsvFormat {
let sink_schema = conf.output_schema().clone();
let sink = Arc::new(CsvSink::new(conf));

Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _)
Ok(Arc::new(FileSinkExec::new(
input,
sink,
sink_schema,
order_requirements,
)) as _)
}

fn file_type(&self) -> FileType {
Expand Down
9 changes: 8 additions & 1 deletion datafusion/core/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use datafusion_common::not_impl_err;
use datafusion_common::DataFusionError;
use datafusion_common::FileType;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalSortRequirement;
use rand::distributions::Alphanumeric;
use rand::distributions::DistString;
use std::fmt;
Expand Down Expand Up @@ -173,6 +174,7 @@ impl FileFormat for JsonFormat {
input: Arc<dyn ExecutionPlan>,
_state: &SessionState,
conf: FileSinkConfig,
order_requirements: Option<Vec<Option<Vec<PhysicalSortRequirement>>>>,
) -> Result<Arc<dyn ExecutionPlan>> {
if conf.overwrite {
return not_impl_err!("Overwrites are not implemented yet for Json");
Expand All @@ -184,7 +186,12 @@ impl FileFormat for JsonFormat {
let sink_schema = conf.output_schema().clone();
let sink = Arc::new(JsonSink::new(conf, self.file_compression_type));

Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _)
Ok(Arc::new(FileSinkExec::new(
input,
sink,
sink_schema,
order_requirements,
)) as _)
}

fn file_type(&self) -> FileType {
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::execution::context::SessionState;
use crate::physical_plan::{ExecutionPlan, Statistics};

use datafusion_common::{not_impl_err, DataFusionError, FileType};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};

use async_trait::async_trait;
use object_store::{ObjectMeta, ObjectStore};
Expand Down Expand Up @@ -99,6 +99,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
_input: Arc<dyn ExecutionPlan>,
_state: &SessionState,
_conf: FileSinkConfig,
_order_requirements: Option<Vec<Option<Vec<PhysicalSortRequirement>>>>,
) -> Result<Arc<dyn ExecutionPlan>> {
not_impl_err!("Writer not implemented for this format")
}
Expand Down
10 changes: 8 additions & 2 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use datafusion_common::{exec_err, not_impl_err, plan_err, DataFusionError, FileType};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use futures::{StreamExt, TryStreamExt};
use hashbrown::HashMap;
use object_store::{ObjectMeta, ObjectStore};
Expand Down Expand Up @@ -229,6 +229,7 @@ impl FileFormat for ParquetFormat {
input: Arc<dyn ExecutionPlan>,
_state: &SessionState,
conf: FileSinkConfig,
order_requirements: Option<Vec<Option<Vec<PhysicalSortRequirement>>>>,
) -> Result<Arc<dyn ExecutionPlan>> {
if conf.overwrite {
return not_impl_err!("Overwrites are not implemented yet for Parquet");
Expand All @@ -237,7 +238,12 @@ impl FileFormat for ParquetFormat {
let sink_schema = conf.output_schema().clone();
let sink = Arc::new(ParquetSink::new(conf));

Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _)
Ok(Arc::new(FileSinkExec::new(
input,
sink,
sink_schema,
order_requirements,
)) as _)
}

fn file_type(&self) -> FileType {
Expand Down
46 changes: 31 additions & 15 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ use datafusion_execution::cache::cache_manager::FileStatisticsCache;
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
use datafusion_expr::expr::Sort;
use datafusion_optimizer::utils::conjunction;
use datafusion_physical_expr::{create_physical_expr, LexOrdering, PhysicalSortExpr};
use datafusion_physical_expr::{
create_physical_expr, LexOrdering, PhysicalSortExpr, PhysicalSortRequirement,
};

use async_trait::async_trait;
use futures::{future, stream, StreamExt, TryStreamExt};
Expand Down Expand Up @@ -826,19 +828,6 @@ impl TableProvider for ListingTable {
);
}

// TODO support inserts to sorted tables which preserve sort_order
// Inserts currently make no effort to preserve sort_order. This could lead to
// incorrect query results on the table after inserting incorrectly sorted data.
let unsorted: Vec<Vec<Expr>> = vec![];
if self.options.file_sort_order != unsorted {
return Err(
DataFusionError::NotImplemented(
"Writing to a sorted listing table via insert into is not supported yet. \
To write to this table in the meantime, register an equivalent table with \
file_sort_order = vec![]".into())
);
}

let table_path = &self.table_paths()[0];
// Get the object store for the table path.
let store = state.runtime_env().object_store(table_path)?;
Expand Down Expand Up @@ -908,9 +897,36 @@ impl TableProvider for ListingTable {
file_type_writer_options,
};

let unsorted: Vec<Vec<Expr>> = vec![];
let order_requirements = if self.options().file_sort_order != unsorted {
if matches!(
self.options().insert_mode,
ListingTableInsertMode::AppendToFile
) {
return Err(DataFusionError::Plan(
"Cannot insert into a sorted ListingTable with mode append!".into(),
));
}
// Converts Vec<Vec<SortExpr>> into type required by execution plan to specify its required input ordering
Some(
self.try_create_output_ordering()?
.into_iter()
.map(|v| {
Some(
v.into_iter()
.map(PhysicalSortRequirement::from)
.collect::<Vec<_>>(),
)
})
.collect::<Vec<_>>(),
)
} else {
None
};

self.options()
.format
.create_writer_physical_plan(input, state, config)
.create_writer_physical_plan(input, state, config, order_requirements)
.await
}
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ impl TableProvider for MemTable {
input,
sink,
self.schema.clone(),
None,
)))
}
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ impl DefaultPhysicalPlanner {
FileType::ARROW => Arc::new(ArrowFormat {}),
};

sink_format.create_writer_physical_plan(input_exec, session_state, config).await
sink_format.create_writer_physical_plan(input_exec, session_state, config, None).await
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}
LogicalPlan::Dml(DmlStatement {
table_name,
Expand Down
19 changes: 14 additions & 5 deletions datafusion/physical-plan/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ pub struct FileSinkExec {
sink_schema: SchemaRef,
/// Schema describing the structure of the output data.
count_schema: SchemaRef,
/// Optional required sort order for output data.
sort_order: Option<Vec<Option<Vec<PhysicalSortRequirement>>>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since FileSink can have only a single input, I think it only needs a single sort order per required_input_order

In other words, I think this could be simplified to

Suggested change
sort_order: Option<Vec<Option<Vec<PhysicalSortRequirement>>>>,
sort_order: Option<Vec<PhysicalSortRequirement>>,

And then adjust required_input_order appropriately

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense. Will do!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just pushed an update with this change

}

impl fmt::Debug for FileSinkExec {
Expand All @@ -87,12 +89,14 @@ impl FileSinkExec {
input: Arc<dyn ExecutionPlan>,
sink: Arc<dyn DataSink>,
sink_schema: SchemaRef,
sort_order: Option<Vec<Option<Vec<PhysicalSortRequirement>>>>,
) -> Self {
Self {
input,
sink,
sink_schema,
count_schema: make_count_schema(),
sort_order,
}
}

Expand Down Expand Up @@ -192,16 +196,20 @@ impl ExecutionPlan for FileSinkExec {
}

fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
// Require that the InsertExec gets the data in the order the
// The input order is either explicitly set (such as by a ListingTable),
// or require that the [FileSinkExec] gets the data in the order the
// input produced it (otherwise the optimizer may choose to reorder
// the input which could result in unintended / poor UX)
//
// More rationale:
// https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178
vec![self
.input
.output_ordering()
.map(PhysicalSortRequirement::from_sort_exprs)]
match &self.sort_order {
Some(requirements) => requirements.clone(),
None => vec![self
.input
.output_ordering()
.map(PhysicalSortRequirement::from_sort_exprs)],
}
}

fn maintains_input_order(&self) -> Vec<bool> {
Expand All @@ -221,6 +229,7 @@ impl ExecutionPlan for FileSinkExec {
sink: self.sink.clone(),
sink_schema: self.sink_schema.clone(),
count_schema: self.count_schema.clone(),
sort_order: self.sort_order.clone(),
}))
}

Expand Down
29 changes: 29 additions & 0 deletions datafusion/sqllogictest/test_files/insert_to_external.slt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,35 @@ LOCATION '../../testing/data/csv/aggregate_test_100.csv'
statement ok
set datafusion.execution.target_partitions = 8;

statement ok
CREATE EXTERNAL TABLE
ordered_insert_test(a bigint, b bigint)
STORED AS csv
LOCATION 'test_files/scratch/insert_to_external/insert_to_ordered/'
WITH ORDER (a ASC, B DESC)
OPTIONS(
create_local_path 'true',
insert_mode 'append_new_files',
);

query II
INSERT INTO ordered_insert_test values (5, 1), (4, 2), (7,7), (7,8), (7,9), (7,10), (3, 3), (2, 4), (1, 5);
----
9
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please also add an EXPLAIN INSERT INTO ordered_insert_test values (5, 1), (4, 2) to verify that the plan has a `SortExec` in it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just pushed an update with explain test.


query II
SELECT * from ordered_insert_test;
----
1 5
2 4
3 3
4 2
5 1
7 10
7 9
7 8
7 7

statement ok
CREATE EXTERNAL TABLE
single_file_test(a bigint, b bigint)
Expand Down