
feat: support read_parquet for backend with no native support #9744

Open · wants to merge 31 commits into main

Conversation

jitingxu1 (Contributor)

Description of changes

Support read_parquet for backends that lack their own native implementation (DuckDB, for example, already has one). This implementation leverages the PyArrow read_table function.

If a backend does not have its own version, it will fall back on this pyarrow implementation.
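
Roughly, the fallback behaves like the sketch below (a minimal sketch for illustration, not the exact code in the PR; create_table, table, and util.gen_name are existing backend helpers):

import pyarrow.parquet as pq

from ibis import util

@util.experimental
def read_parquet(self, path, table_name=None, **kwargs):
    # Read the Parquet data into a pyarrow.Table, then register it with the backend.
    table = pq.read_table(path, **kwargs)
    table_name = table_name or util.gen_name("read_parquet")
    self.create_table(table_name, table)
    return self.table(table_name)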

Issues closed

This addresses part of issue #9448; the remaining tasks for that issue will be completed and submitted separately.

cpcloud (Member) commented Aug 1, 2024:

@jitingxu1 This PR has a lot of failures. Can you take a look so we can decide how to move forward?

table = con.read_parquet(tmp_path / f"*.{ext}")
if con.name == "clickhouse":
    # clickhouse does not support read directory
    table = con.read_parquet(tmp_path / f"*.{ext}")
Member:

This doesn't seem like the right approach. You're changing what's being tested. Why can't you leave this code unchanged here?

jitingxu1 (Contributor Author) commented Aug 2, 2024:

The pyarrow read_table cannot read things like tmp_path/*.parquet, and clickhouse cannot read a directory.

We have three kinds of read_parquet:

  • Backends using pyarrow read_table can read a single file or a directory, but not a glob pattern without specifying the filesystem.
  • DuckDB and some others accept all three formats.
  • ClickHouse does not accept a directory.

Maybe we could add a step before read_table to convert path/*.parquet --> path, so then it accepts all three.
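
For instance, something along these lines before calling read_table (a hypothetical pre-processing step, not code from the PR):

from pathlib import Path

# Hypothetical: collapse a glob like path/*.parquet back to its parent directory,
# which pyarrow's read_table can read without a filesystem object.
if "*" in path:
    path = str(Path(path).parent)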

Member:

The test is whether the backend can read a glob of parquet files; the answer to that seems to be "no", so it should be marked as notyet.
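
For reference, marking it might look something like this (a hedged sketch; pytest.mark.notyet is the marker used in the Ibis test suite, and the test name and reason text here are illustrative):

import pytest

@pytest.mark.notyet(
    ["clickhouse"],
    reason="clickhouse cannot read a glob of parquet files",
)
def test_read_parquet_glob(con, tmp_path, ext):
    table = con.read_parquet(tmp_path / f"*.{ext}")
    assert table.count().execute() > 0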

ibis/backends/tests/test_register.py (outdated review thread, resolved)
jitingxu1 (Contributor Author):

Rewrote it to support URLs as input:

  • regular URLs: https, ftp, and so on
  • fsspec-compatible URLs: s3, gcs
  • local files
    • supports: single file, directory, glob patterns

@cpcloud, ready for review again. Thanks!

ibis/backends/__init__.py (outdated review thread, resolved)
jitingxu1 requested a review from cpcloud, August 21, 2024 16:43
jitingxu1 (Contributor Author):

Increased the test coverage. @cpcloud

ibis/backends/__init__.py (outdated review thread, resolved)
        self.create_table(table_name, table)
        return self.table(table_name)

    def _get_pyarrow_table_from_path(self, path: str | Path, **kwargs) -> pa.Table:
cpcloud (Member) commented Aug 23, 2024:

Why can't the implementation of this just be:

return pq.read_table(path, **kwargs)

Did you try that already?

jitingxu1 (Contributor Author):

I tried that in my first commit; it cannot handle all the cases, such as glob patterns and Parquet files hosted on some URIs (e.g. HTTPS, SFTP).

PyArrow natively implements the following filesystem subclasses:

  • Local FS (LocalFileSystem)
  • S3 (S3FileSystem)
  • Google Cloud Storage File System (GcsFileSystem)
  • Hadoop Distributed File System (HDFS) (HadoopFileSystem)

jitingxu1 (Contributor Author):

@cpcloud does this make sense to you?

jitingxu1 requested a review from cpcloud, August 27, 2024 00:09
ibis/backends/__init__.py (outdated review thread, resolved)
jitingxu1 (Contributor Author) commented Sep 19, 2024:

Hi @cpcloud,

I got several timeout errors in the CI for this PR. Is there something we need to fix in another PR?

FAILED ibis/backends/tests/tpc/ds/test_queries.py::test_74[trino]@tpcds - Failed: Timeout >90.0s
FAILED ibis/backends/tests/tpc/ds/test_queries.py::test_72[trino]@tpcds - Failed: Timeout >90.0s
FAILED ibis/backends/tests/tpc/ds/test_queries.py::test_94[trino]@tpcds - Failed: Timeout >90.0s
FAILED ibis/backends/tests/tpc/ds/test_queries.py::test_92[trino]@tpcds - Failed: Timeout >90.0s
FAILED ibis/backends/tests/tpc/ds/test_queries.py::test_07[trino]@tpcds - Failed: Timeout >90.0s

Is this related to the Trino setup in the CI?

github-actions bot added the tests label (Issues or PRs related to tests), Sep 20, 2024
jitingxu1 (Contributor Author):

Hi @cpcloud

In this PR, the Trino/Impala read_parquet test reads about 7300 rows from functional_alltypes.parquet (it seems that inserting more than 10k rows causes a performance issue). I suspect this impacts Trino database performance and causes the timeout errors from my previous comment in other tests, since test_read_parquet reads a large Parquet file. Does this make sense?

Should I skip Trino and Impala in this test too, or do you have a better way to handle this?


gforsyth (Member):

I'm going to try something here to see if I can isolate which test is leaving us in a (sometimes) broken state only on the nix osx runs.

gforsyth (Member):

Ok, that's one pass for the nix mac osx job. I'm going to cycle it a few times to make sure.

gforsyth (Member) left a comment:

Most of my comments are inline -- I think we should start off offering this with a simplified implementation. It's nice to offer users options, but I think we should be measured in how much we enable directly.

Comment on lines 1297 to 1299
When reading data from cloud storage (such as Amazon S3 or Google Cloud Storage),
credentials can be provided via the `filesystem` argument by creating an appropriate
filesystem object (e.g., `pyarrow.fs.S3FileSystem`).
Member:

This is true; additionally, pq.read_table also supports the standard AWS auth patterns (environment variables, AWS SSO credentials, or instance credentials).
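
For example (a sketch using the public pyarrow.fs API; the bucket, key, and credential values are placeholders):

import pyarrow.fs as pafs
import pyarrow.parquet as pq

# Explicit credentials via a filesystem object passed to read_table.
s3 = pafs.S3FileSystem(access_key="...", secret_key="...", region="us-east-1")
table = pq.read_table("my-bucket/path/to/data.parquet", filesystem=s3)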

Comment on lines 1371 to 1395
import pyarrow.parquet as pq

path = str(path)
# handle url
if util.is_url(path):
    import fsspec

    credentials = kwargs.pop("credentials", {})
    with fsspec.open(path, **credentials) as f:
        with BytesIO(f.read()) as reader:
            return pq.read_table(reader)

# handle fsspec compatible url
if util.is_fsspec_url(path):
    return pq.read_table(path, **kwargs)

# Handle local file paths or patterns
paths = glob.glob(path)
if not paths:
    raise ValueError(f"No files found matching pattern: {path!r}")
elif len(paths) == 1:
    paths = paths[0]

return pq.read_table(paths, **kwargs)

Member:

I think we should reconsider handling all of these cases -- this sort of branching logic means that when a user reports an error, we'll have any number of possible culprits to consider, and it makes it harder to debug for everyone.

I think (and I could be wrong) that nearly all of these cases are covered by pq.read_table by itself, and that's much easier to document and debug.

read_table also has support for being passed an fsspec object, so if someone needs to read from a hypertext url, they can use fsspec as a shim for that. (This is something we can add a note about in the docstring).
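
Something along these lines works as the shim (illustrative only; the URL is a placeholder):

import fsspec
import pyarrow.parquet as pq

# fsspec handles protocols pyarrow's filesystems don't cover natively (e.g. HTTP),
# and read_table accepts the resulting file-like object.
with fsspec.open("https://example.com/data.parquet") as f:
    table = pq.read_table(f)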

jitingxu1 (Contributor Author):

pq.read_table can handle most of the cases. I will simplify the logic and see how many cases are covered. Thanks for your suggestion.

path = str(path)
# handle url
if util.is_url(path):
    import fsspec
Member:

fsspec is not a dependency of Ibis (it's in our test suite) so this would need extra import handling if we leave it in (but see my other comments)

gforsyth (Member):


Ok, skipping the mocked URL test on DuckDB seems to have resolved the nested transaction failures on the nix osx CI job

jitingxu1 (Contributor Author):


Thank you so much.

    @util.experimental
    def read_parquet(
        self, path: str | Path | BytesIO, table_name: str | None = None, **kwargs: Any
    ) -> ir.Table:
jitingxu1 (Contributor Author):

Instead of BytesIO, I could pass the fsspec object; it could be an HTTPFile if we pass an HTTP URL. I'm not sure what the best way is to handle the type of path.

@gforsyth, any suggestion?

Member:

I think fsspec is a good option.

"""Register a parquet file as a table in the current backend.

This function reads a Parquet file and registers it as a table in the current
backend. Note that for Impala and Trino backends, the performance
Member:

Suggested change:
- backend. Note that for Impala and Trino backends, the performance
+ backend. Note that for the Impala and Trino backends, the performance


table_name = table_name or util.gen_name("read_parquet")
paths = list(glob.glob(str(path)))
if paths:
Member:

I would add a comment here indicating that this is to help with reading from remote file locations

else:
    table = pq.read_table(path, **kwargs)

self.create_table(table_name, table)
Member:

Similar to the read_csv PR, this should probably be a memtable so we don't create a persistent table by default
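
For instance, the tail of the method could become something like this (a sketch of that suggestion; it assumes the path, kwargs, and table_name variables from the snippet above):

import ibis

table = pq.read_table(path, **kwargs)
# Return an in-memory table expression instead of persisting a table in the backend.
return ibis.memtable(table, name=table_name)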

Labels: tests (Issues or PRs related to tests)
Projects: none yet
4 participants