Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Support for Copy To Logical and Physical plans #7283

Merged
merged 4 commits into from
Aug 16, 2023

Conversation

devinjdangelo
Copy link
Contributor

@devinjdangelo devinjdangelo commented Aug 14, 2023

Which issue does this PR close?

closes #5076
closes #6539
Part of #5654

Rationale for this change

In many cases, we want to be able to export data to file(s) in an ObjectStore without first registering an external table. This is possible with COPY ... TO ... statements. We can leverage the FileSinks created to support inserting to ListingTables for part of the implementation for this.

What changes are included in this PR?

  • Implement a logical plan for Copy To statements
  • Generalize name of InsertExec to FileSinkExec
  • Implement a physical plan for Copy To statements relying on FileSinkExec
  • Expand sqllogictests in copy.slt, add support for automatically cleared directory in sqllogictests for writing files fresh
  • Reimplement DataFrame::write_* methods to use Copy To
  • Add support for per_thread_output setting in FileSinks and Copy To so user can specify if they want only one file output or possibly many is ok

Note that this PR does not add support for most statement level settings / overrides yet. That will be important to implement before closing out #5654.

This graphic shows how all of the write related code is wired up after this PR:
datafusion_writes

Are these changes tested?

Yes, see expanded copy.slt for new tests.

I also have plans to expand insert.slt to improve testing of recent additions of insert into support.

Are there any user-facing changes?

Copy To statements (less most statement level overrides) are supported now.

DataFrame write_* APIs have some small changes will need more changes as support for statement level overrides is added for copy to

@github-actions github-actions bot added sql SQL Planner logical-expr Logical plan and expressions optimizer Optimizer rules core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) labels Aug 14, 2023
@alamb
Copy link
Contributor

alamb commented Aug 15, 2023

Thank you @devinjdangelo -- this looks epic -- I plan to review it tomorrow

----
2

#Explain copy queries not currently working
Copy link
Contributor Author

@devinjdangelo devinjdangelo Aug 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that EXPLAIN <copy statement> currently does not work. When prefixed by EXPLAIN the subsequent COPY token is being parsed into the COPY statement defined in the sqlparser crate. We haven't implemented a logical plan for that and our expected syntax for COPY is different from sqlparser, so this leads to various errors.

When the COPY token starts the statement, it is correctly parsed as a DFStatement defined within DataFusion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is where the parsing diverges. We might need to add some special case for parsing explain copy, but I'm not sure if there is a better way.

https://github.com/apache/arrow-datafusion/blob/6ad79165f6554a66aa5ed4c5d432401c2c162f69/datafusion/sql/src/parser.rs#L296-L331

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a special case for explain copy is probably the right thing

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW I coded up support for EXPLAIN COPY in #7291

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🏆 work @devinjdangelo -- it is amazing to see COPY finally working in DataFusion.

I tried this out locally and it worked great 👍 -- thank you so much!

In terms of what to do with this PR, I have a few comments, but nothing that I think is required prior to merge, so therefore I am approving it.

Here are some of the tickets:

  • Add documentation for copy statement
  • Implement additional parameters
  • Explain support

I am happy to file these if you would like. Just let me know.

Example

DataFusion CLI v29.0.0
❯ copy (values (1), (2)) to '/tmp/foo.parquet';
+-------+
| count |
+-------+
| 2     |
+-------+
1 row in set. Query took 0.050 seconds.

❯
\q
(arrow_dev) alamb@MacBook-Pro-8:~/Software/arrow-datafusion2/datafusion-cli$ file /tmp/foo.parquet
/tmp/foo.parquet: Apache Parquet
(arrow_dev) alamb@MacBook-Pro-8:~/Software/arrow-datafusion2/datafusion-cli$ datafusion-cli -c "select * from '/tmp/foo.parquet'";
DataFusion CLI v28.0.0
+---------+
| column1 |
+---------+
| 1       |
| 2       |
+---------+
2 rows in set. Query took 0.041 seconds.

This is so cool

copy (select * from '/Users/alamb/Software/clickbench_hits_compatible/hits.parquet' limit 100) to '/tmp/hits.10.parquet';
+-------+
| count |
+-------+
| 100   |
+-------+
1 row in set. Query took 2.809 seconds.

.gitignore Outdated
@@ -104,3 +104,6 @@ datafusion/CHANGELOG.md.bak

# Generated tpch data
datafusion/core/tests/sqllogictests/test_files/tpch/data/*

# Scratch temp dir for sqllogictests
datafusion/core/tests/sqllogictests/test_files/scratch*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will likely have a logical conflict with #7284, FYI

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I rebased locally and then updated these relative paths throughout the pr to fix the pipeline.

};

// TODO: implement statement level overrides for each file type
// E.g. CsvFormat::from_options(options)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@@ -58,10 +59,26 @@ pub async fn main() -> Result<()> {
run_tests().await
}

/// Sets up an empty directory at tests/sqllogictests/test_files/scratch/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️ -- we should also probably add a note about scratch to https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/tests/sqllogictests/README.md eventually too

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In #7312

----
2

#Explain copy queries not currently working
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a special case for explain copy is probably the right thing

statement error DataFusion error: This feature is not implemented: `COPY \.\. TO \.\.` statement is not yet supported
COPY source_table to '/tmp/table.parquet' (row_group_size 55);
query IT
COPY source_table to 'tests/sqllogictests/test_files/scratch/table.json' (row_group_size 55);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

row_group_size for json is somewhat surprising to me as I expect it is a parquet thing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, setting that as JSON with row_group_size is a mistake. It doesn't cause any issue because options are ignored right now.

This raises an interesting question about the desired behavior in this scenario. If the options specify an irrelevant setting (row_group_size for a json setting), should DataFusion:

  1. Ignore the irrelevant setting (current behavior)
  2. Ignore the irrelevant setting but emit a warning
  3. Raise an error and refuse to execute the query entirely

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think raising an error and refusing to execute the query entirely is the most sensible thing -- I left a comment on #7298 (comment)

input: LogicalPlan,
output_url: String,
file_format: OutputFileFormat,
per_thread_output: bool,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some more documentation about what each of the parameters mean (specifically per_thread_output)

Also, do you envision other options here (like overwrite vs append)? If so maybe it makes sense to make a struct lik

CopyOptions {
  output_url: String,
  file_format: OutputFileFormat, 
  per_thread_output: bool,
  other: Vec<(String, String)
}

Making a config struct like that would not only allow additional options to be easily added without an API change, it would also provide a natural location to document the options and what they meant

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is documentation for these parameters here:
https://github.com/devinjdangelo/arrow-datafusion/blob/27e062a880d8960a9aef4657bc7416d98c9a744c/datafusion/expr/src/logical_plan/dml.rs#L30-L43

I envision most additional options being passed in optionally via the options Vec<>. I think the list supported by DuckDB would be a good starting point.

It might make sense to implement a CopyOptions struct as you show with a CopyOptions::from(Vec<String,String>) method. Then, for the DataFrame::write_* API the user could construct CopyOptions directly rather than passing a Vec<String,String>, which is an awkward interface in comparison for use directly from rust code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed #7322 to track this idea

pub options: Vec<(String, String)>,
}

/// The file formats that CopyTo can output
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks very similar to the existing FileType enum: https://docs.rs/datafusion/latest/datafusion/datasource/file_format/file_type/enum.FileType.html

Perhaps we could move FileType into datafusion_common so it could be used by both the logical plan and datasource?

Comment on lines +1102 to +1110
let mut op_str = String::new();
op_str.push('(');
for (key, val) in options {
if !op_str.is_empty() {
op_str.push(',');
}
op_str.push_str(&format!("{key} {val}"));
}
op_str.push(')');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you could use join() here to be more conscise: https://doc.rust-lang.org/std/primitive.slice.html#method.join

But then you would probably have to make the (key val) pairs in a separate Vec resulting in another copy

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a .map + .join to do this more elegantly? I take another look at it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Took a stab at making this more concise in #7294

@alamb
Copy link
Contributor

alamb commented Aug 15, 2023

I took the liberty of merging up from main to resolve conflicts on this PR

@alamb
Copy link
Contributor

alamb commented Aug 16, 2023

Thanks @devinjdangelo -- I am merging this PR in and will file follow on PRs / tickets for the remaining items we have identified. Thanks again 🙏

@timrobertson100
Copy link

Thanks @devinjdangelo !

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate logical-expr Logical plan and expressions optimizer Optimizer rules sql SQL Planner sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Re-implement DataFrame.write_* to use LogicalPlan::Write
3 participants