Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CSV/JSON Writes Not Guarenteed to Preserve Expected Ordering #7536

Closed
devinjdangelo opened this issue Sep 13, 2023 · 2 comments
Closed

CSV/JSON Writes Not Guarenteed to Preserve Expected Ordering #7536

devinjdangelo opened this issue Sep 13, 2023 · 2 comments
Labels
bug Something isn't working

Comments

@devinjdangelo
Copy link
Contributor

Describe the bug

Initial implementation of #7452 intended to preserve the ordering of rows in CSV/JSON files in case a user runs a query like:

COPY (select * from my_table order by my_col)
TO my_file.csv

It is reasonable to expect that the CSV should be ordered by my_col. When this function: https://github.com/apache/arrow-datafusion/blob/561e0d7e87825aba224bf2eb9c3b8b5e1b725597/datafusion/core/src/datasource/file_format/write.rs#L310-L393

was updated to include the mpsc::channel I believe we lost the guarantee of preserving expected file order. The channel is nice since it introduces backpressure and ensures memory requirements do not grow without bound in case ObjectStore writes are falling behind, but I am not sure how to preserve the ordering of serialized RecordBatches in the channel construct.

To Reproduce

I have not yet verified a specific case where order is not preserved (TODO), but I don't see any reason why it is guarenteed since the channel does not preserve ordering (it will depend on how tokio schedules the tasks).

Expected behavior

We should guarantee that file ordering is preserved regardless of parallelization.

Additional context

No response

@devinjdangelo devinjdangelo added the bug Something isn't working label Sep 13, 2023
@metesynnada
Copy link
Contributor

metesynnada commented Sep 13, 2023

It would be helpful if you could provide a reproducible example.

In the channel, the order-preserving mechanism depends on the serialization task handles that are awaited one by one. Since the handles are created in the order of the data stream, it will preserve the order.

while let Some(handle) = rx.recv().await { 
         match handle.await { 
               ....
         }
}

We are not using a JoinSet or similar mechanism for serialization task handles.

@devinjdangelo
Copy link
Contributor Author

@metesynnada it looks like you are right! I tested writing out large (1GB+) sorted CSV files and they came out sorted correctly over many runs. So it does seem that the tasks are scheduled and placed in the queue in a consistent order. This is somewhat surprising to me, but I suppose it does make sense that tokio tasks are scheduled more consistently vs. threads.

Closing this issue now unless anyone else sees a situation in which sort order is not preserved.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

2 participants