use more than one core/thread when querying CSV #5205

Closed
kmitchener opened this issue Feb 6, 2023 · 6 comments
Labels
enhancement New feature or request

Comments

@kmitchener
Contributor

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

When querying a CSV file on local disk only 1 thread is used, leaving the system mostly idle. For example, I have a 9 GB gzip-compressed CSV file that I'd like to query and convert to Parquet format. However, when running queries against the CSV, it can take 5 minutes or more for simple counts or group bys on single columns. 5 minutes per query isn't too unexpected (when uncompressed, it's a 100 GB file, 45 million records x 242 columns). But it's nowhere near utilizing all the machine's resources. It seems like there should be a way to move more of the work to other threads and querying/conversion to parquet could be substantially faster.

When profiled, it looks like this (green is the only busy thread):
[Image: profiler output; one thread (green) is busy]

and if you zoom into what that thread is doing, it's following a pattern of:

  • read data from disk
  • then for about 10 cycles, it
    • decompresses a chunk
    • reads csv records (csv_core) from decompressed chunk
  • read again from disk and repeat the cycle

[Image: zoomed-in profile of the busy thread]

Describe the solution you'd like

Better utilization of the machine, work done in more than one thread and sustained reads from disk.

Describe alternatives you've considered

Is it possible to offload the decompression and processing of bytes -> csv records to other threads? I had an idea to try to implement some kind of fan-out/fan-in somewhere under CsvExec so that one thread is just reading from disk and sending the raw bytes to other threads to decompress/csv_read/convert to recordbatches (CPU-heavy work is in the CSV reading) -> fan back in to stream the recordbatches from CsvExec as expected. Is there a better way to do it?
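For illustration, the fan-out/fan-in idea might look roughly like the sketch below. It uses only the Rust standard library and is not DataFusion code: `split_on_newlines`, `parse_chunk`, and `parse_parallel` are hypothetical names, the input is assumed to be already decompressed and in memory, and it assumes no field contains an embedded newline.

```rust
use std::thread;

/// Splits `data` into roughly `parts` slices, each ending on a newline.
/// Assumption: no field contains an embedded newline.
fn split_on_newlines(data: &[u8], parts: usize) -> Vec<&[u8]> {
    let target = (data.len() / parts.max(1)).max(1);
    let mut chunks = Vec::new();
    let mut start = 0;
    while start < data.len() {
        // Jump ahead by the target size, then extend to the next newline.
        let probe = (start + target - 1).min(data.len());
        let end = data[probe..]
            .iter()
            .position(|&b| b == b'\n')
            .map(|i| probe + i + 1)
            .unwrap_or(data.len());
        chunks.push(&data[start..end]);
        start = end;
    }
    chunks
}

/// Hypothetical stand-in for "bytes -> RecordBatch": splits rows into fields.
fn parse_chunk(chunk: &[u8]) -> Vec<Vec<String>> {
    String::from_utf8_lossy(chunk)
        .lines()
        .map(|line| line.split(',').map(str::to_string).collect())
        .collect()
}

/// Fans out one worker per chunk, then fans the results back in, in file order.
fn parse_parallel(data: &[u8], workers: usize) -> Vec<Vec<String>> {
    let chunks = split_on_newlines(data, workers);
    thread::scope(|s| {
        let handles: Vec<_> = chunks
            .iter()
            .map(|&chunk| s.spawn(move || parse_chunk(chunk)))
            .collect();
        // Joining in spawn order preserves the original row order.
        handles
            .into_iter()
            .flat_map(|h| h.join().expect("worker panicked"))
            .collect()
    })
}
```

Calling `parse_parallel(bytes, n)` from a `main` function returns the rows in file order. A real implementation would stream chunks instead of holding the whole file in memory, and would also have to deal with decompression, since a gzip stream cannot be split at arbitrary offsets.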


@kmitchener kmitchener added the enhancement New feature or request label Feb 6, 2023
@tustvold
Contributor

tustvold commented Feb 6, 2023

> Is it possible to offload the decompression and processing of bytes

I definitely think a first step would be to interleave the fetching of the next set of bytes and decoding. This might be as simple as adding buffered(1) to the stream returned by object_store and polling it every call to poll_fn. I'd be happy to get a PR up to do this, if you'd be willing to help test it out (I don't have any realistic CSV workloads)
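As a rough illustration of the interleaving idea (an analog built on the Rust standard library, not the `object_store` or `futures` API itself): a background thread fetches chunks and hands them over through a bounded channel of capacity 1, so the next chunk is fetched while the current one is decoded. `fetch_chunks` and `decode` are hypothetical stand-ins for the real reader and CSV decoder.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical stand-in for reading raw bytes from disk or an object store.
fn fetch_chunks() -> Vec<Vec<u8>> {
    vec![
        b"a,1\nb,2\n".to_vec(),
        b"c,3\n".to_vec(),
        b"d,4\ne,5\n".to_vec(),
    ]
}

/// Hypothetical stand-in for decoding: counts newline-terminated records.
fn decode(chunk: &[u8]) -> usize {
    chunk.iter().filter(|&&b| b == b'\n').count()
}

/// Counts records while a background thread prefetches upcoming chunks.
fn count_records_with_prefetch() -> usize {
    // Capacity 1: at most one fetched chunk waits in the channel.
    let (tx, rx) = sync_channel::<Vec<u8>>(1);

    let producer = thread::spawn(move || {
        for chunk in fetch_chunks() {
            // Blocks while the channel is full.
            if tx.send(chunk).is_err() {
                break; // the consumer hung up
            }
        }
    });

    // Decode the current chunk while the producer fetches the next one.
    // The iterator ends once the producer finishes and drops `tx`.
    let total: usize = rx.iter().map(|chunk| decode(&chunk)).sum();
    producer.join().expect("producer thread panicked");
    total
}
```

The bounded channel keeps memory use flat: the producer stalls instead of reading the whole file ahead of the decoder.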

> I had an idea to try to implement some kind of fan-out/fan-in somewhere under CsvExec so that one thread is just reading from disk and sending the raw bytes to other threads to decompress/csv_read/convert to recordbatches

Doing this correctly is quite tricky, as CSV permits unescaped newlines inside quoted fields, which makes it impossible to know where a given record starts without having read all prior records. You can't just seek to the next newline character, as that might be partway through a record.

That being said, a trick that I believe duckdb uses is to search for the next newline, and then test the first record against the expected schema, if it matches you have likely found a newline from which you can start reading.
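A minimal sketch of that kind of heuristic follows. It is not DuckDB's or DataFusion's implementation: `field_count` and `find_record_start` are hypothetical names, "matches the expected schema" is reduced to "has the expected number of comma-separated fields", and RFC 4180-style double quotes are assumed.

```rust
/// Counts the fields in one line, or returns None if the line ends inside
/// a quoted field (odd number of quotes). Assumes RFC 4180-style quoting.
fn field_count(line: &[u8]) -> Option<usize> {
    let mut in_quotes = false;
    let mut fields = 1;
    for &b in line {
        match b {
            b'"' => in_quotes = !in_quotes,
            b',' if !in_quotes => fields += 1,
            _ => {}
        }
    }
    if in_quotes { None } else { Some(fields) }
}

/// Returns the offset of the first likely record start after `from`.
fn find_record_start(data: &[u8], from: usize, expected_fields: usize) -> Option<usize> {
    let mut pos = from;
    loop {
        // Candidate start: the byte after the next newline.
        let nl = data.get(pos..)?.iter().position(|&b| b == b'\n')?;
        let start = pos + nl + 1;
        let rest = data.get(start..)?;
        if rest.is_empty() {
            return None;
        }
        let end = rest.iter().position(|&b| b == b'\n').unwrap_or(rest.len());
        // Accept only if the candidate line has the expected field count.
        if field_count(&rest[..end]) == Some(expected_fields) {
            return Some(start);
        }
        pos = start;
    }
}
```

The check is only probabilistic. A line inside a quoted field that happens to have balanced quotes and the right number of commas is accepted as a record start, and a genuine record that itself spans several lines is skipped, so the reader of the preceding chunk has to keep going until the offset returned here.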

> 5 minutes per query isn't too unexpected (when uncompressed, it's a 100 GB file, 45 million records x 242 columns)

Not to blow my own trumpet, but I'm pretty chuffed by this, 2.7Gbps on a single core is pretty sweet 😁 We can and should make it even faster, but I think this is already pretty cool.

@kmitchener
Contributor Author

> I definitely think a first step would be to interleave the fetching of the next set of bytes and decoding. This might be as simple as adding buffered(1) to the stream returned by object_store and polling it every call to poll_fn. I'd be happy to get a PR up to do this, if you'd be willing to help test it out (I don't have any realistic CSV workloads)

Sure, of course, I'll be happy to test out any patches.

> I had an idea to try to implement some kind of fan-out/fan-in somewhere under CsvExec so that one thread is just reading from disk and sending the raw bytes to other threads to decompress/csv_read/convert to recordbatches

> Doing this correctly is quite tricky, as CSV permits unescaped newlines inside quoted fields, which makes it impossible to know where a given record starts without having read all prior records. You can't just seek to the next newline character, as that might be partway through a record.

> That being said, a trick that I believe duckdb uses is to search for the next newline, and then test the first record against the expected schema, if it matches you have likely found a newline from which you can start reading.

Ah, yes. I was thinking of using the function you already made for something similar: newline_delimited_stream. It could at least help move the CSV-parsing -> recordbatch to other threads, if not decompression. CSV-parsing is the largest part of the work. But after playing with it for a while, I'm pretty sure this is beyond my capabilities.

> 5 minutes per query isn't too unexpected (when uncompressed, it's a 100 GB file, 45 million records x 242 columns)

> Not to blow my own trumpet, but I'm pretty chuffed by this, 2.7Gbps on a single core is pretty sweet 😁 We can and should make it even faster, but I think this is already pretty cool.

Yeah, it's pretty great :) For a simple select count(*) .. query, current DF is about twice as fast as duckdb against my large test file.

@alamb
Contributor

alamb commented Feb 7, 2023

DuckDB does parallel csv reading, FWIW. duckdb/duckdb#5194

It would be great to implement this feature in DataFusion

Regarding the "csv escaping means you can't always know when a newline is a record delimiter" I suggest:

  1. Default to parsing using multi cores with a newline heuristic, and document that it may be incorrect in some cases
  2. Have a config setting that switches back to the slower but "handles newlines correctly" behavior

A small variation of 1 would be for DataFusion to detect when the parallel split did not work and produce an error that mentions the config setting.
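A sketch of how the two modes and the "detect and error" variation could fit together is shown below. Everything here is hypothetical: the setting name `csv.newline_safe_parsing`, the `CsvScanMode` enum, and both functions are invented for illustration and are not actual DataFusion options or APIs.

```rust
/// Hypothetical setting name; not an actual DataFusion option.
const SETTING: &str = "csv.newline_safe_parsing";

#[derive(Debug, Clone, Copy, PartialEq)]
enum CsvScanMode {
    /// Split on newlines and parse chunks on several cores (may mis-split).
    ParallelHeuristic,
    /// Single-threaded scan that handles quoted newlines correctly.
    Sequential,
}

/// Option 1 is the default; option 2 is opt-in through the setting.
fn scan_mode(newline_safe: bool) -> CsvScanMode {
    if newline_safe {
        CsvScanMode::Sequential
    } else {
        CsvScanMode::ParallelHeuristic
    }
}

/// Variation of option 1: check each row produced by a parallel worker and
/// fail with a message that points at the setting instead of returning
/// silently wrong data.
fn validate_parallel_row(fields: usize, expected: usize) -> Result<(), String> {
    if fields != expected {
        return Err(format!(
            "parallel CSV split produced a row with {} fields, expected {}; \
             set `{}` to true to parse sequentially",
            fields, expected, SETTING
        ));
    }
    Ok(())
}
```

A field-count check only catches splits that change the number of columns, so it narrows the failure mode without removing it.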

@alamb
Contributor

alamb commented Feb 7, 2023

(nice work @tustvold on CSV reading). Maybe we can put this data into the blog post too apache/arrow-site#305

@alamb
Contributor

alamb commented May 10, 2023

I filed #6325 to track a feature that might enable this to work

@alamb
Contributor

alamb commented Jul 17, 2023

Closed by #6801

@alamb alamb closed this as completed Dec 11, 2023