Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using INSERT INTO query to append a file #5130

Closed
metesynnada opened this issue Jan 31, 2023 · 22 comments
Closed

Using INSERT INTO query to append a file #5130

metesynnada opened this issue Jan 31, 2023 · 22 comments
Labels
enhancement New feature or request

Comments

@metesynnada
Copy link
Contributor

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
I want to create a table from a CSV(JSON or AVRO) file, execute a query and write the results into a CSV file.

Describe the solution you'd like
If we support this on a logical plan level, this might be executed. It may also require enhancing catalog management.

  • Create table
CREATE EXTERNAL TABLE source_table(c1 int) STORED AS CSV LOCATION 'foo.csv'
  • Sink table
CREATE EXTERNAL TABLE sink_table(c1 int) LOCATION 'bar.csv'
  • Insert the sink table
INSERT INTO sink_table
SELECT * FROM t
WHERE c1 > 10;

Describe alternatives you've considered
NA

Additional context
NA

@metesynnada metesynnada added the enhancement New feature or request label Jan 31, 2023
@alamb
Copy link
Contributor

alamb commented Feb 1, 2023

👍 I would very much like a feature like this too.

The use of SELECT is also compelling to me because you can then filter and order, for example.

Some ideal features (doesn't have to be in the first implementation, but it would be nice if the API supported this):

  1. Support multiple different types of format (CSV as well as parquet, for example)
  2. support potenteially writing to a directory (and writing out multiple smaller files)
  3. Have some way to control the options for the output writer (e.g. for CSV the format of timestamps, for parquet row group size, etc)

@andygrove and @Dandandan do you have thoughts on what this feature might look like?

@alamb
Copy link
Contributor

alamb commented Feb 1, 2023

BTW the sqllogictest driver has some version of INSERT INTO AS SELECT support (not as full featured as this proposal): https://github.com/apache/arrow-datafusion/blob/bd645279874df8e70f1cd891ecea89577733b788/datafusion/core/tests/sqllogictests/src/engines/datafusion/insert.rs#L30

@metesynnada
Copy link
Contributor Author

We will start working on this feature quite soon, so any help would be welcomed. Our team can provide initial design as well.

@alamb
Copy link
Contributor

alamb commented Feb 14, 2023

We will start working on this feature quite soon, so any help would be welcomed. Our team can provide initial design as well.

I am certainly interested in this feature and will standby to review designs and code

I terms of the API to support I would like to recommend we follow some other implementation's API than inventing our own. For example, perhaps we can use the COPY orders TO 'orders' (FORMAT PARQUET, PARTITION_BY (year, month)); style command described by DuckDB: https://duckdb.org/2023/02/13/announcing-duckdb-070.html

Using create external table seems like it may make implementations more complicated for some reasons:

  1. It requires two commands to write files rather than just one
  2. What happens if you try to write data to a table that already has a backing file
  3. What happens if you issue two INSERT INTO commands in a row

For a format like CSV or NDJSON appending new data might be straightforward, but parquet doesn't really support append well.

@metesynnada
Copy link
Contributor Author

Thanks a lot for your help! I believe we can consolidate the usages into a Google doc soon, which will also include a literature survey on DDLs & DMLs. Once we have everything in one place, we can review it together and decide on the best options for the use cases.

@alamb
Copy link
Contributor

alamb commented Feb 15, 2023

FYI I wrote up some thought related to catalog manipulations here #5291

@metesynnada metesynnada changed the title Using INSERT INTO query to write a file Using INSERT INTO query to append a file Feb 19, 2023
@metesynnada
Copy link
Contributor Author

metesynnada commented Feb 21, 2023

Hi @alamb, I added a design document. cc @Dandandan & @mingmwang if you are interested as well.
https://docs.google.com/document/d/19IhOklwkhIm09-3XPpZHf75frtxoQtMa0g-1QG8Da1c/edit?usp=sharing

@alamb
Copy link
Contributor

alamb commented Feb 21, 2023

Thanks @metesynnada -- https://docs.google.com/document/d/19IhOklwkhIm09-3XPpZHf75frtxoQtMa0g-1QG8Da1c/edit?usp=sharing looks great

I can't wait to help with this feature and use DataFusion to write resorted parquet files.

My major feedback is:

  1. Use a method on TableProvider to support writing
  2. Ensure other users can opt in / out of modifiable external table support

cc @avantgardnerio who has also been thinking about ways to support writing to tables. Not sure who else might be interested in this proposal too but it might be worth a note to the mailing list dev@arrow.apache.org and the ASF slack channel pointing at this ticket and asking for feedback.

@aplunk
Copy link

aplunk commented Feb 23, 2023

Hello @alamb, thanks for all your work on this project. I've noticed that the current TableProvider API does not seem to support getting a mutable reference to a downcasted table via the as_any method. Is this expected, and if not do we expect the API to change to support mutability when this issue is complete?

@alamb
Copy link
Contributor

alamb commented Feb 23, 2023

Is this expected, and if not do we expect the API to change to support mutability when this issue is complete?

TLDR is I don't expect this to change. The TableProviders are wrapped in Arcs and potentially shared across multiple SessionContexts

To support mutability the TableProvider implementation would need to implement "interior mutability" (for example have some sort of Mutex or RwLock to control access)

@tustvold
Copy link
Contributor

tustvold commented Mar 1, 2023

To support mutability the TableProvider implementation would need to implement "interior mutability"

I think this touches on a key point, there needs to be some sort of consistency/atomicity story here. Most users would likely expect that INSERT INTO is atomic, i.e. a query sees all the inserted data or none of the inserted data. They may additionally have expectations with respect to transaction isolation / serializability.

Blindly appending to a CSV / JSON file without any external coordination will result in queries seeing partial or potentially corrupted data

One common approach is for new data to always be written to a new file, thus ensuring atomicity.

This basic approach can then be optionally extended with things like:

  • A Write-Ahead Log (local or distributed) and MemTable to reduce file churn
  • Catalog functionality, such as provided by deltalake or lakehouse, to support in-place, atomic rewrites, transactions, etc...
  • Compaction functionality (deltalake calls this bin-packing) to coalesce small files into larger ones

I think adding some pieces of functionality for this to DataFusion would be amazing, and may even be of interest to the delta-rs folks (FYI @roeap), but may benefit from having a more fleshed out catalog story first (#5291)

@metesynnada
Copy link
Contributor Author

metesynnada commented Mar 1, 2023

I agree with the atomicity. Aborted queries might result corrupted files.

In which cases do we use abort_multipart for put_multipart? Do you have a use case in another library for these APIs? Maybe I can iterate a design for such functionality in Datafusion.

@tustvold
Copy link
Contributor

tustvold commented Mar 1, 2023

In which cases do we use abort_multipart for put_multipart

In an ideal world, every time you wish to abandon a multipart upload... In practice Azure and GCP automatically cleanup incomplete multipart uploads after a given time, and S3 can be configured to have similar behaviour.

Do you have a use case in another library for these APIs

I believe delta-rs is making use of them. FWIW in the case of object stores the minimum PUT payload is 5MB, so if uploading files smaller than this there is no benefit over a regular PUT request.

@tustvold
Copy link
Contributor

tustvold commented Mar 1, 2023

Aborted queries might result corrupted files.

Aborted queries definitely could, but also the append operation itself is likely not atomic, as it may take multiple calls to the underlying write primitive to completely flush the payload. In the intervening time between these writes any reader may see incomplete data, and any parallel writer would interleave its writes resulting in data corruption

@ozankabak
Copy link
Contributor

Let's talk about all these in the sync-meeting. Some of the concerns I see here are definitely valid/important (e.g. atomicity of appends), but I also think some higher-level concerns are bleeding into the discussion of a lower-level mechanism. It is great to think ahead, but we should also be mindful of separation of concern/complexity.

@alamb
Copy link
Contributor

alamb commented Mar 2, 2023

Here is what I heard at our meeting (documenting here in case others are following along). Please correct me if I am wrong

  1. DataFusion should have some sort of Trait (separate from TableProvider) for "writing to a table / sink. This trait will allow other systems to implement whatever semantics they may way
  2. The trait should support both "streaming" (incremental writing and appending) as well as writing complete files
  3. There will be some sort of implementation in DataFusion that allows writing to the existing file formats (e.g. parquet, csv, etc) that may or may not support appending (or may support appending when there is some external coordination) but this implementation will remain simple

The INSERT INTO <...> and COPY ... functionality will be implemented in terms of the trait.

@avantgardnerio
Copy link
Contributor

The trait should support both "streaming"

I missed the call, but this would be great for our use-case! Thanks guys for thinking of us.

@metesynnada
Copy link
Contributor Author

Here is what I heard at our meeting (documenting here in case others are following along). Please correct me if I am wrong

  1. DataFusion should have some sort of Trait (separate from TableProvider) for "writing to a table / sink. This trait will allow other systems to implement whatever semantics they may way
  2. The trait should support both "streaming" (incremental writing and appending) as well as writing complete files
  3. There will be some sort of implementation in DataFusion that allows writing to the existing file formats (e.g. parquet, csv, etc) that may or may not support appending (or may support appending when there is some external coordination) but this implementation will remain simple

The INSERT INTO <...> and COPY ... functionality will be implemented in terms of the trait.

I think this summarizes the meeting well. I made a POC asap on trait implementation.

@alamb
Copy link
Contributor

alamb commented Mar 2, 2023

Related discussion: apache/arrow-rs#3791

@roeap
Copy link
Contributor

roeap commented Mar 2, 2023

I think adding some pieces of functionality for this to DataFusion would be amazing, and may even be of interest to the delta-rs folks (FYI @roeap)

This would be amazing indeed. In fact I am currently working on the full implementation of deltas optimistic commit protocol. And we'd be more then happy to adopt these new APIs as well as contribute to their implementation here. We also have users asking to support more catalogs, and adopting datafusion traits for this is always a great option.

Do you have a use case in another library for these APIs
I believe delta-rs is making use of them.

Indeed we do use the multipart APIs.

@ozankabak
Copy link
Contributor

Fantastic! We will have an initial implementation ready sometime next week.

@alamb
Copy link
Contributor

alamb commented Aug 17, 2023

I believe this is now complete, given @devinjdangelo 's recent work and the work from @metesynnada and @ozankabak earlier. Additional work is tracked in #6569

@alamb alamb closed this as completed Aug 17, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

7 participants