Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify DataFrame and SQL (Insert Into) Write Methods #7141

Merged
merged 10 commits into from
Aug 5, 2023

Conversation

devinjdangelo
Copy link
Contributor

@devinjdangelo devinjdangelo commented Jul 30, 2023

Which issue does this PR close?

None, but progresses towards the goals of #5076 and #7079

Rationale for this change

The goal of this PR is to enable DataFrame write methods to leverage a common implementation with SQL Insert Into statements, so that common logic related to writing via ObjectStore and parallelization or other optimizations can be made in one place (such as those discussed in #7079).

What changes are included in this PR?

The following changes are completed/planned:

  • Implement DataFrame.write_table method which creates an insert_into LogicalPlanand executes eagerly
  • Extend InsertExec / DataSink / ListingTable.insert_into to support writing multiple files from multiple partitions
  • Extend CsvSink to support writing multiple partitions to multiple files
  • Implement a way for a user to pass DataFrameWriteOptions
  • Implement ListingTableInsertMode controlling whether insert into appends to existing files or writes new files
  • Create a unit test for writing multiple CSV files to a listing table via insert into logical plan
  • Add support for insert overwrite in logical plan and via DataFrame.write_table

The following work is planned for follow up PRs

  • Create JsonSink supporting writing multiple partitions to multiple files
  • Create ParquetSink supporting writing multiple partitions to multiple files
  • Update existing write_json, write_csv, and write_parquet to create temporary tables and to call DataFrame.write_table
  • Parallelize file serialization
  • Support insert into for listing tables with hive style partitioning / multiple table paths
  • Implement insert overwrite type execution plans

Are these changes tested?

Yes, a test case is added for using insert into to append new files to a listing table.

Are there any user-facing changes?

The write_table method is a new public method to expose the functionality of the insert_into logical plan for DataFrames.

@github-actions github-actions bot added the core Core DataFusion crate label Jul 30, 2023
//we can append to that file. Otherwise, we can write new files into the directory
//adding new files to the listing table in order to insert to the table.
let input_partitions = input.output_partitioning().partition_count();
if file_groups.len() == 1 && input_partitions == 1 {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic works but feels inflexible to me. There may be a more explicit way for a user to express their intention, similar to Spark's Save Modes and PartitionBy methods.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I agree - I think passing in a write_options: ListingTableWriteOptions type structure would be the best API -- and that write_options could contain additional information like desired save mode, partition by and ordering

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you envisioningListingTableWriteOptions as part of the ListingOptions struct (i.e. a property of the registered table itself)? So the user would do something like:

ctx.register_csv("csv_table_sink", out_path, CsvReadOptions::default(), CsvWriteOptions::default().write_mode(WriteMode::Append))
        .await?;

Subsequent writes to this table would then respect the declared settings on that table. PartitionBy information is actually already included in CsvReadOptions.

I had been thinking about how to pass this option at write time:

df.write_table("csv_table_sink", WriteMode::Append)

but I was not sure how to propagate that additional argument from the LogicalPlan through to InsertExec (or if that even makes sense to do).

Copy link
Contributor

@alamb alamb Aug 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you envisioningListingTableWriteOptions as part of the ListingOptions struct (i.e. a property of the registered table itself)? So the user would do something like:

I guess I was thinking there could be a way to register the initil table with WriteOptions for when it was written to via INSERT ... type queries.

However, I think the more interesting usecase i my mind is passing the options at write time, as you show

df.write_table("table", WriteOptions::new()...)

I am not quite sure how the code would look and how to make a nice API that separates the two concerns (the actual act of writing / passing the write options) and the registered table.

It seems like there should also be a way to write directly to a target output without having to register a table provider. I am sure there is a way we just need to come up with a clever API to do so

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like there should also be a way to write directly to a target output without having to register a table provider. I am sure there is a way we just need to come up with a clever API to do so

I definitely agree. I am thinking for that we could use the COPY TO SQL syntax to dump a table to files. We will need to implement a logical plan for copy to, but it looks like that is already on your radar in #5654. The execution plan for CopyTo is ultimately the same as InsertExec which is currently created by each FileSink. Probably want to rename InsertExec to something more generic like WriteFileSinkExec at that point.

When that is done we can add options to DataFrame.write_table or create additional methods that allow hooking into a LogicalPlanBuilder::copy_to method to enable writing to files without the need for a registered table.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

. The execution plan for CopyTo is ultimately the same as InsertExec which is currently created by each FileSink. Probably want to rename InsertExec to something more generic like WriteFileSinkExec at that point.

Yes that sounds ideal

@devinjdangelo
Copy link
Contributor Author

I've got a ways to go with this, but curious if this approach is looking on the right track with what you are thinking @alamb.

@alamb
Copy link
Contributor

alamb commented Jul 31, 2023

Thanks @devinjdangelo -- I'll check this out later today. Thank you for pushing on this

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @devinjdangelo -- this looks great.

Maybe for a test we can start small with a test that writes a dataframe to 2 CSV files?

It also seems to me we could incrementally implement this feature (e.g we could incrementally add support for json / parquet as follow on PRs)

Thanks again for pushing this forward

//we can append to that file. Otherwise, we can write new files into the directory
//adding new files to the listing table in order to insert to the table.
let input_partitions = input.output_partitioning().partition_count();
if file_groups.len() == 1 && input_partitions == 1 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I agree - I think passing in a write_options: ListingTableWriteOptions type structure would be the best API -- and that write_options could contain additional information like desired save mode, partition by and ordering

@@ -254,7 +254,7 @@ impl MemSink {
impl DataSink for MemSink {
async fn write_all(
&self,
mut data: SendableRecordBatchStream,
mut data: Vec<SendableRecordBatchStream>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

@@ -330,6 +330,8 @@ pub struct FileSinkConfig {
pub object_store_url: ObjectStoreUrl,
/// A vector of [`PartitionedFile`] structs, each representing a file partition
pub file_groups: Vec<PartitionedFile>,
/// Vector of partition paths
pub table_paths: Vec<ListingTableUrl>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 is this logically different than the file_groups

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that file_groups represents every individual file in every path contained in the ListingTable, whereas table_paths is just a list of the paths themselves. The language in existing comments/variable names is a bit confusing as it seems we refer to both files and directories of files as "partitions" sometimes.

I could compute the table_path based on the prefix of individual files in the table. I'll need to think about this more and do some testing. What if the listing table does not contain any files yet? Is file_groups empty?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure -- the notion of tables that are modifed by DataFusion wasn't really in the initial design -- which was focused on read.

@metesynnada has started the process to add the ability to write to some tables, but I think there is still a ways to go.

Ideally in my mind most of the code to write data to a sink (CSV, JSON, etc) would not be tied to a TableProvider. Each table provider could provide some adapter that made the appropriate sink for updating the state if that made sense

Thus I would suggest focusing on how to make writing to a sink unconnected to a table provider working well, and then we'll go wire that API up into the relevant TableProviders if desired

datafusion/core/src/physical_plan/insert.rs Outdated Show resolved Hide resolved
@github-actions github-actions bot added sql SQL Planner logical-expr Logical plan and expressions sqllogictest SQL Logic Tests (.slt) labels Aug 4, 2023
@devinjdangelo devinjdangelo marked this pull request as ready for review August 4, 2023 13:08
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really nice @devinjdangelo -- the structure is beautiful and the code is clean and well commented and tested. Thank you

cc @mustafasrepo or @ozankabak and @metegenez in case you have extra comments

@@ -560,10 +566,10 @@ impl CsvSink {
impl DataSink for CsvSink {
async fn write_all(
&self,
mut data: SendableRecordBatchStream,
mut data: Vec<SendableRecordBatchStream>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

@@ -73,6 +74,10 @@ pub struct CsvReadOptions<'a> {
pub file_compression_type: FileCompressionType,
/// Flag indicating whether this file may be unbounded (as in a FIFO file).
pub infinite: bool,
/// Indicates how the file is sorted
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

@@ -464,6 +483,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
// TODO: Add file sort order into CsvReadOptions and introduce here.
.with_file_sort_order(vec![])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be self.file_sort_order?

@@ -207,6 +207,16 @@ impl ListingTableConfig {
}
}

#[derive(Debug, Clone)]
///controls how new data should be inserted to a ListingTable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a very nice abstraction ❤️

// Incoming number of partitions is taken to be the
// number of files the query is required to write out.
// The optimizer should not change this number.
// Parrallelism is handled within the appropriate DataSink
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this makes sense to me

&mut writers,
)
.await?;
// TODO parallelize serialization accross partitions and batches within partitions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this todo can probably be done with some fancy async stream stuff -- however, I got a little hung on on how to handle the abort case. I'll try and think on it some more

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nested join sets / task spawning seems to work well based on a quick test which I described here. I haven't thoroughly thought through or tested how aborts are handled with this code yet though. Parquet is also going to be more challenging to parallelize in this way since the serializer is stateful.

This will be the fun part to experiment with!

@alamb
Copy link
Contributor

alamb commented Aug 5, 2023

I am merging this so we can continue the work on main. Thank you @devinjdangelo for the work so far. I can't wait to see what you come up with for the next steps - it is not that often that one can get an order of magnitude, but I think this is one of those times.

Very nice 👌

@alamb alamb merged commit 2d91917 into apache:main Aug 5, 2023
@metesynnada
Copy link
Contributor

I have carefully examined the code, and I must say that the logic is executed flawlessly. Your dedication and hard work reflect in your impressive PR. Best regards, @devinjdangelo.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate logical-expr Logical plan and expressions sql SQL Planner sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants