
stackoverflow when doing diagonal concat of 17k CSVs #13915

Closed
mdavis-xyz opened this issue Jan 22, 2024 · 2 comments · Fixed by #13930
Assignees
ritchie46
Labels
A-streaming (Related to the streaming engine), accepted (Ready for implementation), bug (Something isn't working), P-medium (Priority: medium), python (Related to Python Polars)

Comments

@mdavis-xyz
Contributor

Checks

  • I have checked that this issue has not already been reported.
  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

import os

import polars as pl

source_dir = '/home/matthew/data/debug/segfault/data'
dest_path = '/home/matthew/data/debug/segfault/test.parquet'

schema = {
    'a': pl.String(),
    'b': pl.String(),
}

csv_data = """a,b
A,B"""

# generate the data
os.makedirs(source_dir, exist_ok=True)
csv_paths = [os.path.join(source_dir, f"{i}.csv") for i in range(17000)]
for csv_path in csv_paths:
    with open(csv_path, 'w') as f:
        f.write(csv_data)
print("Data generated")

# create a LazyFrame per file
datasets = []
for csv_path in csv_paths:
    ds = pl.scan_csv(csv_path, dtypes=schema)
    datasets.append(ds)

ds = pl.concat(datasets, how='diagonal')
# ds = pl.concat(datasets, how='vertical')
# ds = pl.scan_csv(csv_paths)

ds.sink_parquet(dest_path)

(Change source_dir and dest_path accordingly.)

Log output

> RUN STREAMING PIPELINE
csv -> fast_projection -> pass -> parquet_sink
RefCell { value: [csv -> fast_projection -> pass -> parquet_sink, csv -> fast_projection -> pass -> parquet_sink, ... (repeated thousands of times) ...  fast_projection -> pass -> parquet_sink] }
> STREAMING CHUNK SIZE: 25000 rows
> STREAMING CHUNK SIZE: 25000 rows
> ...
> STREAMING CHUNK SIZE: 25000 rows
> STREAMING CHUNK SIZE: 25000 rows
> STREAMING CHUNK SIZE: 25000 rows
> Erreur de segmentation (core dumped)  [French locale: "Segmentation fault (core dumped)"]

Issue description

My context:

  • I have 100,000 small CSV files which I want to convert into one deduplicated parquet file.
  • The CSV files are hive-partitioned. I want the x=1/y=2 style directory path segments to end up as columns in the final dataframe, which is why I'm not just passing a list of CSVs to scan_csv.
  • In my real dataset, some files are missing some columns. That's why I need diagonal concat. (I'm not sure if there's some way to tell scan_csv what columns I expect, such that it returns them present but all-null; a rough sketch of one way to handle both points follows this list.)
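
For illustration, a rough sketch of one way to do this per file (the helper name and the regex are illustrative, hive values are kept as plain strings, and the expected schema is only used to backfill missing columns with nulls):

import os
import re

import polars as pl

def scan_with_hive_columns(csv_path: str, expected_schema: dict) -> pl.LazyFrame:
    # illustrative helper: pull x=1/y=2 style segments out of the directory
    # path and attach them as literal (string) columns on the per-file scan
    lf = pl.scan_csv(csv_path)
    for key, value in re.findall(r"([^/=]+)=([^/]+)", os.path.dirname(csv_path)):
        lf = lf.with_columns(pl.lit(value).alias(key))
    # backfill any expected-but-missing columns as all-null columns so every
    # per-file LazyFrame ends up with the same schema before concatenation
    missing = [c for c in expected_schema if c not in lf.columns]
    if missing:
        lf = lf.with_columns(
            [pl.lit(None, dtype=expected_schema[c]).alias(c) for c in missing]
        )
    return lf

# with this, a plain vertical concat of the per-file frames should suffice:
# ds = pl.concat([scan_with_hive_columns(p, schema) for p in csv_paths], how='vertical')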

Note that when using a vertical concat instead of diagonal, or using scan_csv(csv_paths), I instead get:

thread '<unnamed>' panicked at crates/polars-pipe/src/executors/sources/csv.rs:63:14:
called `Result::unwrap()` on an `Err` value: Io(Custom { kind: Uncategorized, error: "Too many open files (os error 24): /home/matthew/data/debug/segfault/data/1020.csv" })
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Traceback (most recent call last):
  File "/home/matthew/Documents/TSE/AppliedEconometrics/repo/segfault2.py", line 35, in <module>
    ds.sink_parquet(dest_path)
  File "/home/matthew/Documents/TSE/AppliedEconometrics/repo/venv/lib/python3.11/site-packages/polars/lazyframe/frame.py", line 1971, in sink_parquet
    return lf.sink_parquet(
           ^^^^^^^^^^^^^^^^
pyo3_runtime.PanicException: called `Result::unwrap()` on an `Err` value: Io(Custom { kind: Uncategorized, error: "Too many open files (os error 24): /home/matthew/data/debug/segfault/data/1020.csv" })

If I halve the number of files, I don't get this error.
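
For what it's worth, on Linux the soft file-descriptor limit can be raised from within the process before sinking. This is just a sketch of a workaround for os error 24, not a fix for the underlying behaviour:

import resource

# raise the soft limit on open file descriptors up to the hard limit;
# this only postpones "Too many open files", it does not stop every
# scan from keeping its own file handle open
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))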

Expected behavior

Polars should be smart enough to only open a reasonable number of files at a time. (Closing and re-opening them if required.)

Even if this is not possible, I expect a clear error, not a seg fault. (I thought it was theoretically impossible to get a seg fault with Rust.)

Installed versions

--------Version info---------
Polars:               0.20.5
Index type:           UInt32
Platform:             Linux-6.2.0-39-generic-x86_64-with-glibc2.37
Python:               3.11.4 (main, Dec  7 2023, 15:43:41) [GCC 12.3.0]

----Optional dependencies----
adbc_driver_manager:  <not installed>
cloudpickle:          <not installed>
connectorx:           <not installed>
deltalake:            <not installed>
fsspec:               2023.12.2
gevent:               <not installed>
hvplot:               <not installed>
matplotlib:           <not installed>
numpy:                1.26.3
openpyxl:             3.1.2
pandas:               2.1.4
pyarrow:              14.0.2
pydantic:             <not installed>
pyiceberg:            <not installed>
pyxlsb:               <not installed>
sqlalchemy:           <not installed>
xlsx2csv:             <not installed>
xlsxwriter:           <not installed>

I'm on Ubuntu.

@mdavis-xyz added the bug (Something isn't working), needs triage (Awaiting prioritization by a maintainer), and python (Related to Python Polars) labels on Jan 22, 2024
@stinodego added the A-io-csv (Area: reading/writing CSV files) label on Jan 23, 2024
@ritchie46 self-assigned this on Jan 23, 2024
@ritchie46
Member

ritchie46 commented Jan 23, 2024

@stinodego the CSV is a red herring. This is a stack overflow in the streaming engine. The parquet sink shares the append union, which is currently implemented recursively per pipeline: 17k pipelines in this case.

Even if this is not possible, I expect a clear error, not a seg fault. (I thought it was theoretically impossible to get a seg fault with Rust.)

No, it is not. And a stack overflow also isn't handled gracefully, which is why you don't get a nice error.
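
(To make the recursion point concrete, here is a toy Python illustration, not Polars' actual code: finalizing a union with one recursive call per input pipeline needs one stack frame per pipeline, so 17k pipelines exhaust the stack. Python's guard surfaces this as a RecursionError; unguarded native code overflows the stack and the OS reports a segfault.)

# toy illustration only, not Polars code
def finish_union(pipelines):
    if not pipelines:
        return 0
    return 1 + finish_union(pipelines[1:])  # one stack frame per pipeline

try:
    finish_union(list(range(17_000)))  # far deeper than the default ~1000-frame limit
except RecursionError as exc:
    print("recursion depth exceeded:", exc)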

@ritchie46 added the P-medium (Priority: medium) label and removed the A-io-csv (Area: reading/writing CSV files) and needs triage (Awaiting prioritization by a maintainer) labels on Jan 23, 2024
@stinodego added the A-streaming (Related to the streaming engine) label on Jan 23, 2024
@mdavis-xyz
Contributor Author

Ah ok, that makes sense.

My workaround for now is to process the files in batches: 17k CSVs into 17 intermediate parquet files (17 batches of 1000), then a second step to combine the 17 parquet files into one. But for other datasets the limit is lower than 17k.
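
Roughly like this, reusing csv_paths, schema and dest_path from the reproducible example above (the intermediate paths and the batch size of 1000 are illustrative):

import polars as pl

# first pass: sink each batch of CSVs to its own intermediate parquet file
batch_size = 1000
intermediate_paths = []
for i in range(0, len(csv_paths), batch_size):
    batch = csv_paths[i:i + batch_size]
    out_path = f"/tmp/batch_{i // batch_size}.parquet"
    lf = pl.concat([pl.scan_csv(p, dtypes=schema) for p in batch], how='diagonal')
    lf.sink_parquet(out_path)
    intermediate_paths.append(out_path)

# second pass: combine the intermediate parquet files into one
pl.concat(
    [pl.scan_parquet(p) for p in intermediate_paths], how='diagonal'
).sink_parquet(dest_path)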

@c-peters added the accepted (Ready for implementation) label on Jan 29, 2024
@ritchie46 changed the title from "Seg fault when doing diagonal concat of 17k CSVs" to "stackoverflow when doing diagonal concat of 17k CSVs" on Feb 2, 2024