From ab2ad168049eaf391f96c1b6ad952de47ec60619 Mon Sep 17 00:00:00 2001 From: Jiting Xu <126802425+jitingxu1@users.noreply.github.com> Date: Wed, 31 Jul 2024 17:18:36 -0700 Subject: [PATCH 01/20] support read_parquet for backend with no native support --- ibis/backends/__init__.py | 31 ++++++++++++++++++++++++++++ ibis/backends/tests/test_register.py | 16 -------------- 2 files changed, 31 insertions(+), 16 deletions(-) diff --git a/ibis/backends/__init__.py b/ibis/backends/__init__.py index b3c6a6e30107..6830bd7f7af9 100644 --- a/ibis/backends/__init__.py +++ b/ibis/backends/__init__.py @@ -1212,6 +1212,37 @@ def has_operation(cls, operation: type[ops.Value]) -> bool: f"{cls.name} backend has not implemented `has_operation` API" ) + def read_parquet( + self, path: str | Path, table_name: str | None = None, **kwargs: Any + ) -> ir.Table: + """Register a parquet file as a table in the current backend. + + Parameters + ---------- + path + The data source. May be a path to a file, an iterable of files, + or directory of parquet files. + table_name + An optional name to use for the created table. This defaults to + a sequentially generated name. + **kwargs + Additional keyword arguments passed to the pyarrow loading function. + See https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html + for more information. + + Returns + ------- + ir.Table + The just-registered table + + """ + import pyarrow.parquet as pq + + table = pq.read_table(path, **kwargs) + table_name = table_name or util.gen_name("read_parquet") + self.create_table(table_name, table) + return self.table(table_name) + def _cached(self, expr: ir.Table): """Cache the provided expression. diff --git a/ibis/backends/tests/test_register.py b/ibis/backends/tests/test_register.py index 64d8b23792b9..4b32a2358957 100644 --- a/ibis/backends/tests/test_register.py +++ b/ibis/backends/tests/test_register.py @@ -421,9 +421,6 @@ def test_register_garbage(con, monkeypatch): ("functional_alltypes.parquet", "funk_all"), ], ) -@pytest.mark.notyet( - ["flink", "impala", "mssql", "mysql", "postgres", "risingwave", "sqlite", "trino"] -) def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name): pq = pytest.importorskip("pyarrow.parquet") @@ -452,19 +449,6 @@ def ft_data(data_dir): return table.slice(0, nrows) -@pytest.mark.notyet( - [ - "flink", - "impala", - "mssql", - "mysql", - "pandas", - "postgres", - "risingwave", - "sqlite", - "trino", - ] -) def test_read_parquet_glob(con, tmp_path, ft_data): pq = pytest.importorskip("pyarrow.parquet") From 661f50d335119b8ac9f1484854dc0c046870de73 Mon Sep 17 00:00:00 2001 From: Jiting Xu <126802425+jitingxu1@users.noreply.github.com> Date: Thu, 1 Aug 2024 17:46:41 -0700 Subject: [PATCH 02/20] fix unit tests --- ibis/backends/tests/test_register.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ibis/backends/tests/test_register.py b/ibis/backends/tests/test_register.py index 4b32a2358957..37c958f50138 100644 --- a/ibis/backends/tests/test_register.py +++ b/ibis/backends/tests/test_register.py @@ -421,6 +421,7 @@ def test_register_garbage(con, monkeypatch): ("functional_alltypes.parquet", "funk_all"), ], ) +@pytest.mark.notyet(["flink"]) def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name): pq = pytest.importorskip("pyarrow.parquet") @@ -449,6 +450,7 @@ def ft_data(data_dir): return table.slice(0, nrows) +@pytest.mark.notyet(["flink"]) def test_read_parquet_glob(con, tmp_path, ft_data): pq = pytest.importorskip("pyarrow.parquet") @@ -460,7 
+462,7 @@ def test_read_parquet_glob(con, tmp_path, ft_data): for fname in fnames: pq.write_table(ft_data, tmp_path / fname) - table = con.read_parquet(tmp_path / f"*.{ext}") + table = con.read_parquet(tmp_path) assert table.count().execute() == nrows * ntables From e16f1bb93de0b72aa8336690acfff4dd6b6290df Mon Sep 17 00:00:00 2001 From: Jiting Xu <126802425+jitingxu1@users.noreply.github.com> Date: Thu, 1 Aug 2024 18:50:46 -0700 Subject: [PATCH 03/20] resolve Xpass and clickhouse tests --- ibis/backends/tests/test_register.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/ibis/backends/tests/test_register.py b/ibis/backends/tests/test_register.py index 37c958f50138..df6b8c23980f 100644 --- a/ibis/backends/tests/test_register.py +++ b/ibis/backends/tests/test_register.py @@ -425,6 +425,9 @@ def test_register_garbage(con, monkeypatch): def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name): pq = pytest.importorskip("pyarrow.parquet") + if con.name in ["oracle", "exasol"]: + pytest.skip("Skip Exasol and Oracle because of the global pytestmark") + fname = Path(fname) fname = Path(data_dir) / "parquet" / fname.name table = pq.read_table(fname) @@ -462,7 +465,11 @@ def test_read_parquet_glob(con, tmp_path, ft_data): for fname in fnames: pq.write_table(ft_data, tmp_path / fname) - table = con.read_parquet(tmp_path) + if con.name == "clickhouse": + # clickhouse does not support read directory + table = con.read_parquet(tmp_path / f"*.{ext}") + else: + table = con.read_parquet(tmp_path) assert table.count().execute() == nrows * ntables From eaec7a2ccfe040aca9a0f71b0affc238096c4eac Mon Sep 17 00:00:00 2001 From: Jiting Xu <126802425+jitingxu1@users.noreply.github.com> Date: Mon, 5 Aug 2024 16:44:27 -0700 Subject: [PATCH 04/20] handle different inputs --- ibis/backends/__init__.py | 31 +++++++++++++++++-- ibis/backends/tests/test_register.py | 30 ++++++++++-------- ibis/util.py | 46 ++++++++++++++++++++++++++++ 3 files changed, 92 insertions(+), 15 deletions(-) diff --git a/ibis/backends/__init__.py b/ibis/backends/__init__.py index 6830bd7f7af9..13df578f836f 100644 --- a/ibis/backends/__init__.py +++ b/ibis/backends/__init__.py @@ -3,10 +3,13 @@ import abc import collections.abc import functools +import glob import importlib.metadata import keyword import re import urllib.parse +import urllib.request +from io import BytesIO from pathlib import Path from typing import TYPE_CHECKING, Any, ClassVar @@ -1236,13 +1239,37 @@ def read_parquet( The just-registered table """ - import pyarrow.parquet as pq - table = pq.read_table(path, **kwargs) + table = self._get_pyarrow_table_from_path(path, **kwargs) table_name = table_name or util.gen_name("read_parquet") self.create_table(table_name, table) return self.table(table_name) + def _get_pyarrow_table_from_path(self, path: str | Path, **kwargs) -> pa.Table: + pq = util.import_object("pyarrow.parquet") + + path = str(path) + # handle url + if util.is_url(path): + headers = kwargs.pop("headers", {}) + req_info = urllib.request.Request(path, headers=headers) # noqa: S310 + with urllib.request.urlopen(req_info) as req: # noqa: S310 + with BytesIO(req.read()) as reader: + return pq.read_table(reader) + + # handle fsspec compatible url + if util.is_fsspec_url(path): + return pq.read_table(path, **kwargs) + + # Handle local file paths or patterns + paths = glob.glob(path) + if not paths: + raise ValueError(f"No files found matching pattern: {path!r}") + elif len(paths) == 1: + paths = paths[0] + + return pq.read_table(paths, 
**kwargs) + def _cached(self, expr: ir.Table): """Cache the provided expression. diff --git a/ibis/backends/tests/test_register.py b/ibis/backends/tests/test_register.py index df6b8c23980f..4c51c8f0c6d2 100644 --- a/ibis/backends/tests/test_register.py +++ b/ibis/backends/tests/test_register.py @@ -18,8 +18,6 @@ import pyarrow as pa -pytestmark = pytest.mark.notimpl(["druid", "exasol", "oracle"]) - @contextlib.contextmanager def pushd(new_dir): @@ -98,6 +96,7 @@ def gzip_csv(data_dir, tmp_path): "trino", ] ) +@pytest.mark.notimpl(["druid", "exasol", "oracle"]) def test_register_csv(con, data_dir, fname, in_table_name, out_table_name): with pushd(data_dir / "csv"): with pytest.warns(FutureWarning, match="v9.1"): @@ -109,7 +108,7 @@ def test_register_csv(con, data_dir, fname, in_table_name, out_table_name): # TODO: rewrite or delete test when register api is removed -@pytest.mark.notimpl(["datafusion"]) +@pytest.mark.notimpl(["datafusion", "druid", "exasol", "oracle"]) @pytest.mark.notyet( [ "bigquery", @@ -153,6 +152,7 @@ def test_register_csv_gz(con, data_dir, gzip_csv): "trino", ] ) +@pytest.mark.notimpl(["druid", "exasol", "oracle"]) def test_register_with_dotted_name(con, data_dir, tmp_path): basename = "foo.bar.baz/diamonds.csv" f = tmp_path.joinpath(basename) @@ -212,6 +212,7 @@ def read_table(path: Path) -> Iterator[tuple[str, pa.Table]]: "trino", ] ) +@pytest.mark.notimpl(["druid", "exasol", "oracle"]) def test_register_parquet( con, tmp_path, data_dir, fname, in_table_name, out_table_name ): @@ -252,6 +253,7 @@ def test_register_parquet( "trino", ] ) +@pytest.mark.notimpl(["druid", "exasol", "oracle"]) def test_register_iterator_parquet( con, tmp_path, @@ -278,7 +280,7 @@ def test_register_iterator_parquet( # TODO: modify test to use read_in_memory when implemented xref: 8858 -@pytest.mark.notimpl(["datafusion"]) +@pytest.mark.notimpl(["datafusion", "druid", "exasol", "oracle"]) @pytest.mark.notyet( [ "bigquery", @@ -312,7 +314,7 @@ def test_register_pandas(con): # TODO: modify test to use read_in_memory when implemented xref: 8858 -@pytest.mark.notimpl(["datafusion", "polars"]) +@pytest.mark.notimpl(["datafusion", "polars", "druid", "exasol", "oracle"]) @pytest.mark.notyet( [ "bigquery", @@ -357,6 +359,7 @@ def test_register_pyarrow_tables(con): "trino", ] ) +@pytest.mark.notimpl(["druid", "exasol", "oracle"]) def test_csv_reregister_schema(con, tmp_path): foo = tmp_path.joinpath("foo.csv") with foo.open("w", newline="") as csvfile: @@ -386,10 +389,13 @@ def test_csv_reregister_schema(con, tmp_path): "clickhouse", "dask", "datafusion", + "druid", + "exasol", "flink", "impala", "mysql", "mssql", + "oracle", "pandas", "polars", "postgres", @@ -422,12 +428,10 @@ def test_register_garbage(con, monkeypatch): ], ) @pytest.mark.notyet(["flink"]) +@pytest.mark.notimpl(["druid"]) def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name): pq = pytest.importorskip("pyarrow.parquet") - if con.name in ["oracle", "exasol"]: - pytest.skip("Skip Exasol and Oracle because of the global pytestmark") - fname = Path(fname) fname = Path(data_dir) / "parquet" / fname.name table = pq.read_table(fname) @@ -454,6 +458,7 @@ def ft_data(data_dir): @pytest.mark.notyet(["flink"]) +@pytest.mark.notimpl(["druid"]) def test_read_parquet_glob(con, tmp_path, ft_data): pq = pytest.importorskip("pyarrow.parquet") @@ -465,11 +470,7 @@ def test_read_parquet_glob(con, tmp_path, ft_data): for fname in fnames: pq.write_table(ft_data, tmp_path / fname) - if con.name == "clickhouse": - # clickhouse does not 
support read directory - table = con.read_parquet(tmp_path / f"*.{ext}") - else: - table = con.read_parquet(tmp_path) + table = con.read_parquet(tmp_path / f"*.{ext}") assert table.count().execute() == nrows * ntables @@ -487,6 +488,7 @@ def test_read_parquet_glob(con, tmp_path, ft_data): "trino", ] ) +@pytest.mark.notimpl(["druid", "exasol", "oracle"]) def test_read_csv_glob(con, tmp_path, ft_data): pc = pytest.importorskip("pyarrow.csv") @@ -523,6 +525,7 @@ def test_read_csv_glob(con, tmp_path, ft_data): raises=ValueError, reason="read_json() missing required argument: 'schema'", ) +@pytest.mark.notimpl(["druid", "exasol", "oracle"]) def test_read_json_glob(con, tmp_path, ft_data): nrows = len(ft_data) ntables = 2 @@ -569,6 +572,7 @@ def num_diamonds(data_dir): @pytest.mark.notyet( ["flink", "impala", "mssql", "mysql", "postgres", "risingwave", "sqlite", "trino"] ) +@pytest.mark.notimpl(["druid", "exasol", "oracle"]) def test_read_csv(con, data_dir, in_table_name, num_diamonds): fname = "diamonds.csv" with pushd(data_dir / "csv"): diff --git a/ibis/util.py b/ibis/util.py index d127f141061b..95f7ff11717d 100644 --- a/ibis/util.py +++ b/ibis/util.py @@ -18,6 +18,12 @@ import warnings from types import ModuleType from typing import TYPE_CHECKING, Any, Generic, TypeVar +from urllib.parse import ( + urlparse, + uses_netloc, + uses_params, + uses_relative, +) from uuid import uuid4 import toolz @@ -42,6 +48,10 @@ # https://www.compart.com/en/unicode/U+2026 HORIZONTAL_ELLIPSIS = "\u2026" +_VALID_URLS = set(uses_relative + uses_netloc + uses_params) +_VALID_URLS.discard("") +_RFC_3986_PATTERN = re.compile(r"^[A-Za-z][A-Za-z0-9+\-+.]*://") + def guid() -> str: """Return a uuid4 hexadecimal value.""" @@ -485,6 +495,42 @@ def import_object(qualname: str) -> Any: raise ImportError(f"cannot import name {name!r} from {mod_name!r}") from None +def is_url(url: str) -> bool: + """Check to see if a URL has a valid protocol. + + Parameters + ---------- + url : str + The URL to be checked. + + Returns + ------- + bool + True if the URL has a valid protocol, False otherwise + + """ + + return urlparse(url).scheme in _VALID_URLS + + +def is_fsspec_url(url: str) -> bool: + """Check if the given URL looks like something fsspec can handle. + + Parameters + ---------- + url : str + The URL string to be checked. + + Returns + ------- + bool + True if the URL is likely compatible with fsspec, False otherwise. 
+    """
+    return bool(_RFC_3986_PATTERN.match(url)) and not url.startswith(
+        ("http://", "https://")
+    )
+
+
 def normalize_filename(source: str | Path) -> str:
     source = str(source)
     for prefix in (

From 27d7a08efeb48fd59ce207591d3b170742f85183 Mon Sep 17 00:00:00 2001
From: Jiting Xu <126802425+jitingxu1@users.noreply.github.com>
Date: Mon, 5 Aug 2024 19:49:35 -0700
Subject: [PATCH 05/20] pandas not supporting glob pattern

---
 ibis/backends/tests/test_register.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/ibis/backends/tests/test_register.py b/ibis/backends/tests/test_register.py
index faf55d9aa196..5f8b5a06ea71 100644
--- a/ibis/backends/tests/test_register.py
+++ b/ibis/backends/tests/test_register.py
@@ -315,7 +315,6 @@ def test_register_pandas(con):
     assert t.x.sum().execute() == 6
 
 
-
 # TODO: remove entirely when `register` is removed
 # This same functionality is implemented across all backends
 # via `create_table` and tested in `test_client.py`
@@ -462,7 +461,7 @@ def ft_data(data_dir):
     return table.slice(0, nrows)
 
 
-@pytest.mark.notyet(["flink"])
+@pytest.mark.notyet(["flink", "pandas"])
 @pytest.mark.notimpl(["druid"])
 def test_read_parquet_glob(con, tmp_path, ft_data):
     pq = pytest.importorskip("pyarrow.parquet")

From 3ce96745d55cc8a479bf5fe953dda69d453f76f2 Mon Sep 17 00:00:00 2001
From: Jiting Xu <126802425+jitingxu1@users.noreply.github.com>
Date: Sun, 18 Aug 2024 14:12:16 -0700
Subject: [PATCH 06/20] tests for url and fsspec url

---
 ibis/backends/tests/test_register.py | 98 ++++++++++++++++++++++++++++
 1 file changed, 98 insertions(+)

diff --git a/ibis/backends/tests/test_register.py b/ibis/backends/tests/test_register.py
index 5f8b5a06ea71..97f64a061825 100644
--- a/ibis/backends/tests/test_register.py
+++ b/ibis/backends/tests/test_register.py
@@ -4,8 +4,10 @@
 import csv
 import gzip
 import os
+import urllib
 from pathlib import Path
 from typing import TYPE_CHECKING
+from unittest import mock
 
 import pytest
 from pytest import param
@@ -453,6 +455,102 @@ def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name):
     assert table.count().execute()
 
 
+# test reading a Parquet file from a URL request for backends using pyarrow
+# that do not have their own implementation
+@pytest.mark.parametrize(
+    ("url", "in_table_name"),
+    [
+        ("http://example.com/functional_alltypes.parquet", "http_table"),
+        ("sftp://example.com/path/to/functional_alltypes.parquet", "sftp_table"),
+    ],
+)
+@pytest.mark.notimpl(
+    [
+        "druid",
+        "flink",
+        "duckdb",
+        "pandas",
+        "polars",
+        "bigquery",
+        "dask",
+        "clickhouse",
+        "datafusion",
+        "snowflake",
+    ]
+)
+def test_read_parquet_url_request(con, url, data_dir, in_table_name, monkeypatch):
+    pytest.importorskip("pyarrow.parquet")
+
+    headers = {"User-Agent": "test-agent"}
+    fname = Path("functional_alltypes.parquet")
+    fname = Path(data_dir) / "parquet" / fname.name
+    mock_calls = []
+
+    mock_request = mock.create_autospec(urllib.request.Request)
+
+    def mock_urlopen(request, *args, **kwargs):
+        mock_calls.append((request, args, kwargs))
+        return open(fname, "rb")  # noqa: SIM115
+
+    monkeypatch.setattr("urllib.request.Request", mock_request)
+    monkeypatch.setattr("urllib.request.urlopen", mock_urlopen)
+
+    table = con.read_parquet(url, in_table_name, headers=headers)
+
+    mock_request.assert_called_once_with(url, headers=headers)
+    called_url = mock_request.call_args[0][0]
+    called_headers = mock_request.call_args[1]
+    assert url == called_url
+    assert called_headers["headers"] == headers
+    assert len(mock_calls) == 1
+    assert
table.count().execute() + + if in_table_name is not None: + assert table.op().name == in_table_name + + +# test read_parquet from a fsspec url for backends using pyarrow +# that do not have their own implementation +@pytest.mark.notimpl( + [ + "druid", + "flink", + "duckdb", + "pandas", + "polars", + "bigquery", + "dask", + "clickhouse", + "datafusion", + "snowflake", + ] +) +@pytest.mark.parametrize( + "fsspec_url", + [ + "s3://data-bucket/datasets/sample.parquet", + "gs://data-bucket/datasets/sample.parquet", + ], +) +def test_read_parquet_fsspec_url(con, fsspec_url, data_dir, monkeypatch): + pq = pytest.importorskip("pyarrow.parquet") + + mock_calls = [] + fname = Path("functional_alltypes.parquet") + fname = Path(data_dir) / "parquet" / fname.name + + def mock_read_table(args, **kwargs): + mock_calls.append((args, kwargs)) + parquet_file = pq.ParquetFile(fname) + return parquet_file.read() + + monkeypatch.setattr(pq, "read_table", mock_read_table) + table = con.read_parquet(fsspec_url) + + assert len(mock_calls) == 1 + assert table.count().execute() + + @pytest.fixture(scope="module") def ft_data(data_dir): pq = pytest.importorskip("pyarrow.parquet") From 24530cabc852014846b9c252c7e61f812245f4ee Mon Sep 17 00:00:00 2001 From: Jiting Xu <126802425+jitingxu1@users.noreply.github.com> Date: Mon, 19 Aug 2024 15:40:34 -0700 Subject: [PATCH 07/20] resolve pandas use pyarrow as default --- ibis/backends/tests/test_register.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ibis/backends/tests/test_register.py b/ibis/backends/tests/test_register.py index 97f64a061825..aafe8ede5a60 100644 --- a/ibis/backends/tests/test_register.py +++ b/ibis/backends/tests/test_register.py @@ -476,6 +476,7 @@ def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name): "clickhouse", "datafusion", "snowflake", + "pyspark", ] ) def test_read_parquet_url_request(con, url, data_dir, in_table_name, monkeypatch): @@ -516,13 +517,13 @@ def mock_urlopen(request, *args, **kwargs): "druid", "flink", "duckdb", - "pandas", "polars", "bigquery", "dask", "clickhouse", "datafusion", "snowflake", + "pyspark", ] ) @pytest.mark.parametrize( From bb238affce3e9e073483ee78ddbd160168163b5d Mon Sep 17 00:00:00 2001 From: Jiting Xu <126802425+jitingxu1@users.noreply.github.com> Date: Tue, 20 Aug 2024 23:22:48 -0700 Subject: [PATCH 08/20] add test for is_url and is_fsspec_url --- ibis/backends/tests/test_register.py | 42 ----------------- ibis/tests/test_util.py | 67 +++++++++++++++++++++++++++- 2 files changed, 66 insertions(+), 43 deletions(-) diff --git a/ibis/backends/tests/test_register.py b/ibis/backends/tests/test_register.py index aafe8ede5a60..d910d7199252 100644 --- a/ibis/backends/tests/test_register.py +++ b/ibis/backends/tests/test_register.py @@ -510,48 +510,6 @@ def mock_urlopen(request, *args, **kwargs): assert table.op().name == in_table_name -# test read_parquet from a fsspec url for backends using pyarrow -# that do not have their own implementation -@pytest.mark.notimpl( - [ - "druid", - "flink", - "duckdb", - "polars", - "bigquery", - "dask", - "clickhouse", - "datafusion", - "snowflake", - "pyspark", - ] -) -@pytest.mark.parametrize( - "fsspec_url", - [ - "s3://data-bucket/datasets/sample.parquet", - "gs://data-bucket/datasets/sample.parquet", - ], -) -def test_read_parquet_fsspec_url(con, fsspec_url, data_dir, monkeypatch): - pq = pytest.importorskip("pyarrow.parquet") - - mock_calls = [] - fname = Path("functional_alltypes.parquet") - fname = Path(data_dir) / "parquet" / fname.name - 
-    def mock_read_table(args, **kwargs):
-        mock_calls.append((args, kwargs))
-        parquet_file = pq.ParquetFile(fname)
-        return parquet_file.read()
-
-    monkeypatch.setattr(pq, "read_table", mock_read_table)
-    table = con.read_parquet(fsspec_url)
-
-    assert len(mock_calls) == 1
-    assert table.count().execute()
-
-
 @pytest.fixture(scope="module")
 def ft_data(data_dir):
     pq = pytest.importorskip("pyarrow.parquet")
diff --git a/ibis/tests/test_util.py b/ibis/tests/test_util.py
index 9b0872f77407..fbbfc9eab376 100644
--- a/ibis/tests/test_util.py
+++ b/ibis/tests/test_util.py
@@ -4,7 +4,13 @@
 
 import pytest
 
-from ibis.util import PseudoHashable, flatten_iterable, import_object
+from ibis.util import (
+    PseudoHashable,
+    flatten_iterable,
+    import_object,
+    is_fsspec_url,
+    is_url,
+)
 
 
 @pytest.mark.parametrize(
@@ -51,6 +57,65 @@ def test_import_object():
         import_object("collections.this_attribute_doesnt_exist")
 
 
+@pytest.mark.parametrize(
+    ("url", "expected"),
+    [
+        ("http://example.com", True),  # Valid http URL
+        ("https://example.com", True),  # Valid https URL
+        ("ftp://example.com", True),  # Valid ftp URL
+        ("sftp://example.com", True),  # Valid sftp URL
+        ("ws://example.com", True),  # Valid WebSocket URL
+        ("wss://example.com", True),  # Valid WebSocket Secure URL
+        ("file:///home/user/file.txt", True),  # Valid file URL
+        ("mailto:example@example.com", False),  # Invalid URL with non-supported scheme
+        ("http://localhost:8000", True),  # Valid URL with port
+        ("ftp://192.168.1.1", True),  # Valid URL with IP address
+        ("https://example.com/path/to/resource", True),  # Valid URL with path
+        ("http://user:pass@example.com", True),  # Valid URL with credentials
+        ("ftp://example.com/resource", True),  # Valid FTP URL with resource
+        ("telnet://example.com", True),  # Valid Telnet URL
+        ("git://example.com/repo.git", True),  # Valid Git URL
+        ("sip://example.com", True),  # Valid SIP URL
+        ("sips://example.com", True),  # Valid SIPS URL
+        ("invalid://example.com", False),  # Invalid URL with unknown scheme
+    ],
+)
+def test_is_url(url, expected):
+    assert is_url(url) == expected
+
+
+@pytest.mark.parametrize(
+    ("url", "expected"),
+    [
+        ("s3://bucket/path/to/file", True),  # Valid fsspec URL
+        ("ftp://example.com/file.txt", True),  # Valid fsspec URL
+        ("gs://bucket/path/to/file", True),  # Valid fsspec URL
+        ("http://example.com/file.txt", False),  # Invalid URL (HTTP)
+        ("https://example.com/file.txt", False),  # Invalid URL (HTTPS)
+        ("file://localhost/path/to/file", True),  # Valid fsspec URL
+        ("mailto:user@example.com", False),  # Invalid URL
+        (
+            "ftp://user:pass@example.com/path/to/file",
+            True,
+        ),  # Valid fsspec URL with credentials
+        ("ftp://example.com", True),  # Valid fsspec URL without file
+        ("", False),  # Empty string (invalid URL)
+        ("invalid://path/to/file", True),  # Invalid scheme but valid format
+        ("http://localhost:8000", False),  # Invalid URL (HTTP with port)
+        (
+            "https://192.168.1.1/path/to/file",
+            False,
+        ),  # Invalid URL (HTTPS with IP address)
+        (
+            "file:/path/to/file",
+            False,
+        ),  # Invalid URL (missing double slashes after file:)
+    ],
+)
+def test_is_fsspec_url(url, expected):
+    assert is_fsspec_url(url) == expected
+
+
 # TODO(kszucs): add tests for promote_list and promote_tuple

From 12cfc7d3389c84ab18f3bdb1f4df2153e7325192 Mon Sep 17 00:00:00 2001
From: Jiting Xu <126802425+jitingxu1@users.noreply.github.com>
Date: Thu, 22 Aug 2024 17:13:56 -0700
Subject: [PATCH 09/20] change to fsspec and add examples

---
 ibis/backends/__init__.py            | 72 ++++++++++++++++++++++++++++---------
ibis/backends/tests/test_register.py | 32 ++++++------- 2 files changed, 81 insertions(+), 23 deletions(-) diff --git a/ibis/backends/__init__.py b/ibis/backends/__init__.py index f5aa97a24c7b..3e836c3abe2e 100644 --- a/ibis/backends/__init__.py +++ b/ibis/backends/__init__.py @@ -1220,11 +1220,72 @@ def read_parquet( See https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html for more information. + When reading data from cloud storage (such as Amazon S3 or Google Cloud Storage), + credentials can be provided via the `filesystem` argument by creating an appropriate + filesystem object (e.g., `pyarrow.fs.S3FileSystem`). + + For URLs with credentials, `fsspec` is used to handle authentication and file access. + Pass the credentials using the `credentials` keyword argument. `fsspec` will use these + credentials to manage access to the remote files. + Returns ------- ir.Table The just-registered table + Examples + -------- + Connect to a SQLite database: + + >>> con = ibis.sqlite.connect() + + Read a single parquet file: + + >>> table = con.read_parquet("path/to/file.parquet") + + Read all parquet files in a directory: + + >>> table = con.read_parquet("path/to/parquet_directory/") + + Read parquet files with a glob pattern + + >>> table = con.read_parquet("path/to/parquet_directory/data_*.parquet") + + Read from Amazon S3 + + >>> table = con.read_parquet("s3://bucket-name/path/to/file.parquet") + + Read from Google Cloud Storage + + >>> table = con.read_parquet("gs://bucket-name/path/to/file.parquet") + + Read from HTTPS URL + + >>> table = con.read_parquet("https://example.com/data/file.parquet") + + Read with a custom table name + + >>> table = con.read_parquet("s3://bucket/data.parquet", table_name="my_table") + + Read with additional pyarrow options + + >>> table = con.read_parquet("gs://bucket/data.parquet", columns=["col1", "col2"]) + + Read from Amazon S3 with secret info + + >>> from pyarrow import fs + >>> s3_fs = fs.S3FileSystem( + ... access_key="YOUR_ACCESS_KEY", secret_key="YOUR_SECRET_KEY", region="YOUR_AWS_REGION" + ... ) + >>> table = con.read_parquet("s3://bucket/data.parquet", filesystem=s3_fs) + + Read from HTTPS URL with authentication tokens + + >>> table = con.read_parquet( + ... "https://example.com/data/file.parquet", + ... credentials={"headers": {"Authorization": "Bearer YOUR_TOKEN"}}, + ... 
) + """ table = self._get_pyarrow_table_from_path(path, **kwargs) @@ -1233,15 +1294,16 @@ def read_parquet( return self.table(table_name) def _get_pyarrow_table_from_path(self, path: str | Path, **kwargs) -> pa.Table: - pq = util.import_object("pyarrow.parquet") + import pyarrow.parquet as pq path = str(path) # handle url if util.is_url(path): - headers = kwargs.pop("headers", {}) - req_info = urllib.request.Request(path, headers=headers) # noqa: S310 - with urllib.request.urlopen(req_info) as req: # noqa: S310 - with BytesIO(req.read()) as reader: + import fsspec + + credentials = kwargs.pop("credentials", {}) + with fsspec.open(path, **credentials) as f: + with BytesIO(f.read()) as reader: return pq.read_table(reader) # handle fsspec compatible url diff --git a/ibis/backends/tests/test_register.py b/ibis/backends/tests/test_register.py index d910d7199252..fd00bbfd887a 100644 --- a/ibis/backends/tests/test_register.py +++ b/ibis/backends/tests/test_register.py @@ -4,10 +4,8 @@ import csv import gzip import os -import urllib from pathlib import Path from typing import TYPE_CHECKING -from unittest import mock import pytest from pytest import param @@ -464,10 +462,8 @@ def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name): ("sftp://example.com/path/to/functional_alltypes.parquet", "sftp_table"), ], ) -@pytest.mark.notimpl( +@pytest.mark.never( [ - "druid", - "flink", "duckdb", "pandas", "polars", @@ -476,33 +472,33 @@ def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name): "clickhouse", "datafusion", "snowflake", + ] +) +@pytest.mark.notyet(["flink"]) +@pytest.mark.notimpl( + [ + "druid", "pyspark", ] ) def test_read_parquet_url_request(con, url, data_dir, in_table_name, monkeypatch): pytest.importorskip("pyarrow.parquet") + import fsspec - headers = {"User-Agent": "test-agent"} fname = Path("functional_alltypes.parquet") fname = Path(data_dir) / "parquet" / fname.name mock_calls = [] - mock_request = mock.create_autospec(urllib.request.Request) + original_fsspec_open = fsspec.open - def mock_urlopen(request, *args, **kwargs): - mock_calls.append((request, args, kwargs)) - return open(fname, "rb") # noqa: SIM115 + def mock_fsspec_open(path, *args, **kwargs): + mock_calls.append((path, args, kwargs)) + return original_fsspec_open(fname, "rb") - monkeypatch.setattr("urllib.request.Request", mock_request) - monkeypatch.setattr("urllib.request.urlopen", mock_urlopen) + monkeypatch.setattr("fsspec.open", mock_fsspec_open) - table = con.read_parquet(url, in_table_name, headers=headers) + table = con.read_parquet(url, in_table_name) - mock_request.assert_called_once_with(url, headers=headers) - called_url = mock_request.call_args[0][0] - called_headers = mock_request.call_args[1] - assert url == called_url - assert called_headers["headers"] == headers assert len(mock_calls) == 1 assert table.count().execute() From 2cf597a9948536917481c53d8c893c8b8cc35535 Mon Sep 17 00:00:00 2001 From: Jiting Xu <126802425+jitingxu1@users.noreply.github.com> Date: Thu, 22 Aug 2024 17:37:59 -0700 Subject: [PATCH 10/20] add reason for mark.never --- ibis/backends/tests/test_register.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/ibis/backends/tests/test_register.py b/ibis/backends/tests/test_register.py index fd00bbfd887a..650f54a39b6d 100644 --- a/ibis/backends/tests/test_register.py +++ b/ibis/backends/tests/test_register.py @@ -472,7 +472,8 @@ def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name): "clickhouse", "datafusion", "snowflake", - ] + 
],
+    reason="backend implements its own read_parquet",
 )
 @pytest.mark.notyet(["flink"])
 @pytest.mark.notimpl(
     [
         "druid",
         "pyspark",
     ]
 )
 def test_read_parquet_url_request(con, url, data_dir, in_table_name, monkeypatch):
     pytest.importorskip("pyarrow.parquet")
-    import fsspec
+    fsspec = pytest.importorskip("fsspec")
 
     fname = Path("functional_alltypes.parquet")
     fname = Path(data_dir) / "parquet" / fname.name
     mock_calls = []

From b4cf0ea00c06a0922e34e1571098a08678ff9ae5 Mon Sep 17 00:00:00 2001
From: jitingxu1
Date: Thu, 22 Aug 2024 21:28:33 -0700
Subject: [PATCH 11/20] re run workflow

From 24bfe389f07eabc87088b74474f99d51fbaf03db Mon Sep 17 00:00:00 2001
From: jitingxu1
Date: Sun, 15 Sep 2024 11:59:16 -0700
Subject: [PATCH 12/20] lint

---
 ibis/backends/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ibis/backends/__init__.py b/ibis/backends/__init__.py
index 7dc9cf996970..08775b507ba8 100644
--- a/ibis/backends/__init__.py
+++ b/ibis/backends/__init__.py
@@ -11,8 +11,8 @@
 import sys
 import urllib.parse
 import urllib.request
-from io import BytesIO
 import weakref
+from io import BytesIO
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, ClassVar, NamedTuple

From 4579bff0e1ca7b13ead4ee209252697398eac354 Mon Sep 17 00:00:00 2001
From: jitingxu1
Date: Sun, 15 Sep 2024 12:45:11 -0700
Subject: [PATCH 13/20] remove pandas

---
 ibis/backends/tests/test_register.py | 15 +--------------
 1 file changed, 1 insertion(+), 14 deletions(-)

diff --git a/ibis/backends/tests/test_register.py b/ibis/backends/tests/test_register.py
index ff7c1ebfa066..e9e883182b3e 100644
--- a/ibis/backends/tests/test_register.py
+++ b/ibis/backends/tests/test_register.py
@@ -447,10 +447,8 @@ def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name):
 @pytest.mark.never(
     [
         "duckdb",
-        "pandas",
         "polars",
         "bigquery",
-        "dask",
         "clickhouse",
         "datafusion",
         "snowflake",
@@ -497,19 +495,8 @@ def ft_data(data_dir):
     return table.slice(0, nrows)
 
 
+@pytest.mark.notyet(["flink"])
 @pytest.mark.notimpl(["druid"])
-@pytest.mark.notyet(
-    [
-        "flink",
-        "impala",
-        "mssql",
-        "mysql",
-        "postgres",
-        "risingwave",
-        "sqlite",
-        "trino",
-    ]
-)
 def test_read_parquet_glob(con, tmp_path, ft_data):
     pq = pytest.importorskip("pyarrow.parquet")

From c3fba4408f8c1994f9953c46ca706360ebe30909 Mon Sep 17 00:00:00 2001
From: jitingxu1
Date: Thu, 19 Sep 2024 10:11:48 -0700
Subject: [PATCH 14/20] reconcile code

---
 ibis/backends/__init__.py | 38 --------------------------------------
 1 file changed, 38 deletions(-)

diff --git a/ibis/backends/__init__.py b/ibis/backends/__init__.py
index a8d638b995ce..694ff1a4896a 100644
--- a/ibis/backends/__init__.py
+++ b/ibis/backends/__init__.py
@@ -1427,44 +1427,6 @@ def _get_pyarrow_table_from_path(self, path: str | Path, **kwargs) -> pa.Table:
 
         return pq.read_table(paths, **kwargs)
 
-    def _cached(self, expr: ir.Table):
-        """Cache the provided expression.
-
-        All subsequent operations on the returned expression will be performed on the cached data.
-
-        Parameters
-        ----------
-        expr
-            Table expression to cache
-
-        Returns
-        -------
-        Expr
-            Cached table
-
-        """
-        op = expr.op()
-        if (result := self._query_cache.get(op)) is None:
-            result = self._query_cache.store(expr)
-        return ir.CachedTable(result)
-
-    def _release_cached(self, expr: ir.CachedTable) -> None:
-        """Releases the provided cached expression.
- - Parameters - ---------- - expr - Cached expression to release - - """ - self._query_cache.release(expr.op().name) - - def _load_into_cache(self, name, expr): - raise NotImplementedError(self.name) - - def _clean_up_cached_table(self, name): - raise NotImplementedError(self.name) - def _transpile_sql(self, query: str, *, dialect: str | None = None) -> str: # only transpile if dialect was passed if dialect is None: From 0d55190d8a9c75144b083ce0f19984fc2697d3db Mon Sep 17 00:00:00 2001 From: jitingxu1 Date: Fri, 20 Sep 2024 12:20:12 -0700 Subject: [PATCH 15/20] skip trino and impala --- ibis/backends/__init__.py | 6 +++++- ibis/backends/tests/test_register.py | 12 ++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/ibis/backends/__init__.py b/ibis/backends/__init__.py index c30507abd0aa..4ac18ab95d87 100644 --- a/ibis/backends/__init__.py +++ b/ibis/backends/__init__.py @@ -10,7 +10,6 @@ import re import sys import urllib.parse -import urllib.request import weakref from io import BytesIO from pathlib import Path @@ -1272,11 +1271,16 @@ def has_operation(cls, operation: type[ops.Value]) -> bool: f"{cls.name} backend has not implemented `has_operation` API" ) + @util.experimental def read_parquet( self, path: str | Path, table_name: str | None = None, **kwargs: Any ) -> ir.Table: """Register a parquet file as a table in the current backend. + This function reads a Parquet file and registers it as a table in the current + backend. Note that for Impala and Trino backends, the performance + may be suboptimal. + Parameters ---------- path diff --git a/ibis/backends/tests/test_register.py b/ibis/backends/tests/test_register.py index e9e883182b3e..3fbd47005e78 100644 --- a/ibis/backends/tests/test_register.py +++ b/ibis/backends/tests/test_register.py @@ -418,6 +418,12 @@ def test_register_garbage(con, monkeypatch): def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name): pq = pytest.importorskip("pyarrow.parquet") + if con.name in ("trino", "impala"): + # TODO: remove after trino and impala have efficient insertion + pytest.skip( + "Both Impala and Trino lack efficient data insertion methods from Python." + ) + fname = Path(fname) fname = Path(data_dir) / "parquet" / fname.name table = pq.read_table(fname) @@ -466,6 +472,12 @@ def test_read_parquet_url_request(con, url, data_dir, in_table_name, monkeypatch pytest.importorskip("pyarrow.parquet") fsspec = pytest.importorskip("fsspec") + if con.name in ("trino", "impala"): + # TODO: remove after trino and impala have efficient insertion + pytest.skip( + "Both Impala and Trino lack efficient data insertion methods from Python." 
+        )
+
     fname = Path("functional_alltypes.parquet")
     fname = Path(data_dir) / "parquet" / fname.name
     mock_calls = []

From fda549339e858b9f0473996371cc485f80d9f5ff Mon Sep 17 00:00:00 2001
From: jitingxu1
Date: Fri, 20 Sep 2024 13:19:21 -0700
Subject: [PATCH 16/20] Trigger CI

From 71ebb8e3b134343843a05295069096df24d35f74 Mon Sep 17 00:00:00 2001
From: jitingxu1
Date: Fri, 20 Sep 2024 17:29:03 -0700
Subject: [PATCH 17/20] chore: trigger CI

From 2473c028f7ce56f14c162c837a0b47110d8096c9 Mon Sep 17 00:00:00 2001
From: Gil Forsyth <gil@forsyth.dev>
Date: Mon, 23 Sep 2024 13:56:07 -0400
Subject: [PATCH 18/20] chore(test): skip test for backends with own parquet readers

---
 ibis/backends/tests/test_register.py | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/ibis/backends/tests/test_register.py b/ibis/backends/tests/test_register.py
index 3fbd47005e78..2b7d35bf5f82 100644
--- a/ibis/backends/tests/test_register.py
+++ b/ibis/backends/tests/test_register.py
@@ -478,6 +478,18 @@ def test_read_parquet_url_request(con, url, data_dir, in_table_name, monkeypatch
             "Both Impala and Trino lack efficient data insertion methods from Python."
         )
 
+    if con.name in (
+        "bigquery",
+        "clickhouse",
+        "datafusion",
+        "duckdb",
+        "polars",
+        "snowflake",
+    ):
+        # Hard skipping this as it may cause weird transaction errors in some
+        # backends (DuckDB) in certain scenarios.
+        pytest.skip(f"{con.name} implements its own `read_parquet`")
+
     fname = Path("functional_alltypes.parquet")
     fname = Path(data_dir) / "parquet" / fname.name
     mock_calls = []

From 3ab60a8363c7fad8221bd5c62abd0f4c76db58ec Mon Sep 17 00:00:00 2001
From: jitingxu1
Date: Wed, 25 Sep 2024 07:07:23 -0700
Subject: [PATCH 19/20] chore: simplify the logic

---
 ibis/backends/__init__.py            | 68 +++++++-------------
 ibis/backends/tests/test_register.py | 95 ++++++++--------------------
 ibis/tests/test_util.py              | 59 -----------------
 ibis/util.py                         | 46 --------------
 4 files changed, 47 insertions(+), 221 deletions(-)

diff --git a/ibis/backends/__init__.py b/ibis/backends/__init__.py
index 4ac18ab95d87..83854ab78ef0 100644
--- a/ibis/backends/__init__.py
+++ b/ibis/backends/__init__.py
@@ -11,7 +11,6 @@
 import sys
 import urllib.parse
 import weakref
-from io import BytesIO
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, ClassVar, NamedTuple
 
@@ -24,6 +23,7 @@
 if TYPE_CHECKING:
     from collections.abc import Iterable, Iterator, Mapping, MutableMapping
+    from io import BytesIO
     from urllib.parse import ParseResult
 
     import pandas as pd
@@ -1273,7 +1273,7 @@ def has_operation(cls, operation: type[ops.Value]) -> bool:
 
     @util.experimental
     def read_parquet(
-        self, path: str | Path, table_name: str | None = None, **kwargs: Any
+        self, path: str | Path | BytesIO, table_name: str | None = None, **kwargs: Any
     ) -> ir.Table:
         """Register a parquet file as a table in the current backend.
 
         This function reads a Parquet file and registers it as a table in the current
         backend. Note that for Impala and Trino backends, the performance
         may be suboptimal.
 
         Parameters
         ----------
         path
-            The data source. May be a path to a file, an iterable of files,
-            or directory of parquet files.
+            The data source. May be a path to a file, a glob pattern to match Parquet files,
+            a directory of parquet files, or a BytesIO object.
         table_name
             An optional name to use for the created table. This defaults to
             a sequentially generated name.
         **kwargs
             Additional keyword arguments passed to the pyarrow loading function.
             See https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html
             for more information.
- When reading data from cloud storage (such as Amazon S3 or Google Cloud Storage), - credentials can be provided via the `filesystem` argument by creating an appropriate - filesystem object (e.g., `pyarrow.fs.S3FileSystem`). - - For URLs with credentials, `fsspec` is used to handle authentication and file access. - Pass the credentials using the `credentials` keyword argument. `fsspec` will use these - credentials to manage access to the remote files. - Returns ------- ir.Table @@ -1353,46 +1345,30 @@ def read_parquet( ... ) >>> table = con.read_parquet("s3://bucket/data.parquet", filesystem=s3_fs) - Read from HTTPS URL with authentication tokens - - >>> table = con.read_parquet( - ... "https://example.com/data/file.parquet", - ... credentials={"headers": {"Authorization": "Bearer YOUR_TOKEN"}}, - ... ) - + Read from URL + + >>> import fsspec + >>> from io import BytesIO + >>> url = "https://https://{storage_service}/{path_to_file}/xxx.parquet" + >>> credentials = {} + >>> f = fsspec.open(url, **credentials).open() + >>> reader = BytesIO(f.read()) + >>> table = con.read_parquet(reader) + >>> reader.close() + >>> f.close() """ + import pyarrow.parquet as pq - table = self._get_pyarrow_table_from_path(path, **kwargs) table_name = table_name or util.gen_name("read_parquet") + paths = list(glob.glob(str(path))) + if paths: + table = pq.read_table(paths, **kwargs) + else: + table = pq.read_table(path, **kwargs) + self.create_table(table_name, table) return self.table(table_name) - def _get_pyarrow_table_from_path(self, path: str | Path, **kwargs) -> pa.Table: - import pyarrow.parquet as pq - - path = str(path) - # handle url - if util.is_url(path): - import fsspec - - credentials = kwargs.pop("credentials", {}) - with fsspec.open(path, **credentials) as f: - with BytesIO(f.read()) as reader: - return pq.read_table(reader) - - # handle fsspec compatible url - if util.is_fsspec_url(path): - return pq.read_table(path, **kwargs) - - # Handle local file paths or patterns - paths = glob.glob(path) - if not paths: - raise ValueError(f"No files found matching pattern: {path!r}") - elif len(paths) == 1: - paths = paths[0] - - return pq.read_table(paths, **kwargs) - def _transpile_sql(self, query: str, *, dialect: str | None = None) -> str: # only transpile if dialect was passed if dialect is None: diff --git a/ibis/backends/tests/test_register.py b/ibis/backends/tests/test_register.py index 2b7d35bf5f82..7266f18245fa 100644 --- a/ibis/backends/tests/test_register.py +++ b/ibis/backends/tests/test_register.py @@ -4,6 +4,7 @@ import csv import gzip import os +from io import BytesIO from pathlib import Path from typing import TYPE_CHECKING @@ -441,76 +442,6 @@ def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name): assert table.count().execute() -# test reading a Parquet file from a URL request for backends using pyarrow -# that do not have their own implementation -@pytest.mark.parametrize( - ("url", "in_table_name"), - [ - ("http://example.com/functional_alltypes.parquet", "http_table"), - ("sftp://example.com/path/to/functional_alltypes.parquet", "sftp_table"), - ], -) -@pytest.mark.never( - [ - "duckdb", - "polars", - "bigquery", - "clickhouse", - "datafusion", - "snowflake", - ], - reason="backend implements its own read_parquet", -) -@pytest.mark.notyet(["flink"]) -@pytest.mark.notimpl( - [ - "druid", - "pyspark", - ] -) -def test_read_parquet_url_request(con, url, data_dir, in_table_name, monkeypatch): - pytest.importorskip("pyarrow.parquet") - fsspec = pytest.importorskip("fsspec") - 
- if con.name in ("trino", "impala"): - # TODO: remove after trino and impala have efficient insertion - pytest.skip( - "Both Impala and Trino lack efficient data insertion methods from Python." - ) - - if con.name in ( - "bigquery", - "clickhouse", - "datafusion", - "duckdb", - "polars", - "snowflake", - ): - # Hard skipping this as it may cause weird transaction errors in some - # backends (DuckDB) in certain scenarios. - pytest.skip(f"{con.name} implements its own `read_parquet`") - - fname = Path("functional_alltypes.parquet") - fname = Path(data_dir) / "parquet" / fname.name - mock_calls = [] - - original_fsspec_open = fsspec.open - - def mock_fsspec_open(path, *args, **kwargs): - mock_calls.append((path, args, kwargs)) - return original_fsspec_open(fname, "rb") - - monkeypatch.setattr("fsspec.open", mock_fsspec_open) - - table = con.read_parquet(url, in_table_name) - - assert len(mock_calls) == 1 - assert table.count().execute() - - if in_table_name is not None: - assert table.op().name == in_table_name - - @pytest.fixture(scope="module") def ft_data(data_dir): pq = pytest.importorskip("pyarrow.parquet") @@ -537,6 +468,30 @@ def test_read_parquet_glob(con, tmp_path, ft_data): assert table.count().execute() == nrows * ntables +@pytest.mark.notyet(["flink"]) +@pytest.mark.notimpl(["druid"]) +@pytest.mark.never( + [ + "duckdb", + "polars", + "bigquery", + "clickhouse", + "datafusion", + "snowflake", + "pyspark", + ], + reason="backend implements its own read_parquet", +) +def test_read_parquet_bytesio(con, ft_data): + pq = pytest.importorskip("pyarrow.parquet") + + bytes_io = BytesIO() + pq.write_table(ft_data, bytes_io) + bytes_io.seek(0) + table = con.read_parquet(bytes_io) + assert table.count().execute() == ft_data.num_rows + + @pytest.mark.notyet( [ "flink", diff --git a/ibis/tests/test_util.py b/ibis/tests/test_util.py index fbbfc9eab376..7ab696e1e599 100644 --- a/ibis/tests/test_util.py +++ b/ibis/tests/test_util.py @@ -8,8 +8,6 @@ PseudoHashable, flatten_iterable, import_object, - is_fsspec_url, - is_url, ) @@ -57,63 +55,6 @@ def test_import_object(): import_object("collections.this_attribute_doesnt_exist") -@pytest.mark.parametrize( - ("url", "expected"), - [ - ("http://example.com", True), # Valid http URL - ("https://example.com", True), # Valid https URL - ("ftp://example.com", True), # Valid ftp URL - ("sftp://example.com", True), # Valid sftp URL - ("ws://example.com", True), # Valid WebSocket URL - ("wss://example.com", True), # Valid WebSocket Secure URL - ("file:///home/user/file.txt", True), # Valid file URL - ("mailto:example@example.com", False), # Invalid URL with non-supported scheme - ("http://localhost:8000", True), # Valid URL with port - ("ftp://192.168.1.1", True), # Valid URL with IP address - ("https://example.com/path/to/resource", True), # Valid URL with path - ("http://user:pass@example.com", True), # Valid URL with credentials - ("ftp://example.com/resource", True), # Valid FTP URL with resource - ("telnet://example.com", True), # Valid Telnet URL - ("git://example.com/repo.git", True), # Valid Git URL - ("sip://example.com", True), # Valid SIP URL - ("sips://example.com", True), # Valid SIPS URL - ("invalid://example.com", False), # Invalid URL with unknown scheme - ], -) -def test_is_url(url, expected): - assert is_url(url) == expected - - -@pytest.mark.parametrize( - ("url", "expected"), - [ - ("s3://bucket/path/to/file", True), # Valid fsspec URL - ("ftp://example.com/file.txt", True), # Valid fsspec URL - ("gs://bucket/path/to/file", True), # Valid 
fsspec URL - ("http://example.com/file.txt", False), # Invalid URL (HTTP) - ("https://example.com/file.txt", False), # Invalid URL (HTTPS) - ("file://localhost/path/to/file", True), # Valid fsspec URL - ("mailto:user@example.com", False), # Invalid URL - ( - "ftp://user:pass@example.com/path/to/file", - True, - ), # Valid fsspec URL with credentials - ("ftp://example.com", True), # Valid fsspec URL without file - ("", False), # Empty string (invalid URL) - ("invalid://path/to/file", True), # Invalid scheme but valid format - ("http://localhost:8000", False), # Invalid URL (HTTP with port) - ( - "https://192.168.1.1/path/to/file", - False, - ), # Invalid URL (HTTPS with IP address) - ( - "file:/path/to/file", - False, - ), # Invalid URL (missing double slashes after file:) - ], -) -def test_is_fsspec_url(url, expected): - assert is_fsspec_url(url) == expected # TODO(kszucs): add tests for promote_list and promote_tuple diff --git a/ibis/util.py b/ibis/util.py index 214ff82d8889..993e539dfa88 100644 --- a/ibis/util.py +++ b/ibis/util.py @@ -18,12 +18,6 @@ import warnings from types import ModuleType from typing import TYPE_CHECKING, Any, Generic, TypeVar -from urllib.parse import ( - urlparse, - uses_netloc, - uses_params, - uses_relative, -) from uuid import uuid4 import toolz @@ -49,10 +43,6 @@ # https://www.compart.com/en/unicode/U+2026 HORIZONTAL_ELLIPSIS = "\u2026" -_VALID_URLS = set(uses_relative + uses_netloc + uses_params) -_VALID_URLS.discard("") -_RFC_3986_PATTERN = re.compile(r"^[A-Za-z][A-Za-z0-9+\-+.]*://") - def guid() -> str: """Return a uuid4 hexadecimal value.""" @@ -496,42 +486,6 @@ def import_object(qualname: str) -> Any: raise ImportError(f"cannot import name {name!r} from {mod_name!r}") from None -def is_url(url: str) -> bool: - """Check to see if a URL has a valid protocol. - - Parameters - ---------- - url : str - The URL to be checked. - - Returns - ------- - bool - True if the URL has a valid protocol, False otherwise - - """ - - return urlparse(url).scheme in _VALID_URLS - - -def is_fsspec_url(url: str) -> bool: - """Check if the given URL looks like something fsspec can handle. - - Parameters - ---------- - url : str - The URL string to be checked. - - Returns - ------- - bool - True if the URL is likely compatible with fsspec, False otherwise. - """ - return bool(_RFC_3986_PATTERN.match(url)) and not url.startswith( - ("http://", "https://") - ) - - def normalize_filename(source: str | Path) -> str: source = str(source) for prefix in ( From c0c1fd1604a52fbb08ea344e7b32fcf2314f2408 Mon Sep 17 00:00:00 2001 From: jitingxu1 Date: Wed, 25 Sep 2024 07:30:00 -0700 Subject: [PATCH 20/20] chore: lint --- ibis/backends/__init__.py | 8 ++------ ibis/backends/tests/test_register.py | 1 + ibis/tests/test_util.py | 8 +------- 3 files changed, 4 insertions(+), 13 deletions(-) diff --git a/ibis/backends/__init__.py b/ibis/backends/__init__.py index 83854ab78ef0..3e85111f26bb 100644 --- a/ibis/backends/__init__.py +++ b/ibis/backends/__init__.py @@ -1325,10 +1325,6 @@ def read_parquet( >>> table = con.read_parquet("gs://bucket-name/path/to/file.parquet") - Read from HTTPS URL - - >>> table = con.read_parquet("https://example.com/data/file.parquet") - Read with a custom table name >>> table = con.read_parquet("s3://bucket/data.parquet", table_name="my_table") @@ -1345,11 +1341,11 @@ def read_parquet( ... 
) >>> table = con.read_parquet("s3://bucket/data.parquet", filesystem=s3_fs) - Read from URL + Read from HTTPS URL >>> import fsspec >>> from io import BytesIO - >>> url = "https://https://{storage_service}/{path_to_file}/xxx.parquet" + >>> url = "https://example.com/data/file.parquet" >>> credentials = {} >>> f = fsspec.open(url, **credentials).open() >>> reader = BytesIO(f.read()) diff --git a/ibis/backends/tests/test_register.py b/ibis/backends/tests/test_register.py index 9ebbcea78178..e655e81dcdd3 100644 --- a/ibis/backends/tests/test_register.py +++ b/ibis/backends/tests/test_register.py @@ -27,6 +27,7 @@ ), ] + @contextlib.contextmanager def pushd(new_dir): previous_dir = os.getcwd() diff --git a/ibis/tests/test_util.py b/ibis/tests/test_util.py index 7ab696e1e599..9b0872f77407 100644 --- a/ibis/tests/test_util.py +++ b/ibis/tests/test_util.py @@ -4,11 +4,7 @@ import pytest -from ibis.util import ( - PseudoHashable, - flatten_iterable, - import_object, -) +from ibis.util import PseudoHashable, flatten_iterable, import_object @pytest.mark.parametrize( @@ -55,8 +51,6 @@ def test_import_object(): import_object("collections.this_attribute_doesnt_exist") - - # TODO(kszucs): add tests for promote_list and promote_tuple
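
Editor's usage sketch (illustration only, not part of the patch series): the API these patches converge on reads local paths, glob patterns, or a BytesIO via pyarrow and materializes the result with create_table(). SQLite is used below only as an example of a backend with no native parquet reader; the file names and URL are hypothetical.

    import pyarrow as pa
    import pyarrow.parquet as pq

    import ibis

    # A backend without its own read_parquet() falls back to the base-class
    # implementation added in patch 01 and simplified in patch 19: pyarrow
    # loads the data, then create_table() materializes it.
    con = ibis.sqlite.connect()  # in-memory SQLite database

    # Write a small parquet file so the sketch is self-contained.
    pq.write_table(pa.table({"x": [1, 2, 3]}), "example.parquet")

    t = con.read_parquet("example.parquet", table_name="example")
    assert t.count().execute() == 3

    # Glob patterns are expanded with glob.glob() before pyarrow sees them.
    t_all = con.read_parquet("*.parquet")

    # Remote file (hypothetical URL): per the final docstring, open it with
    # fsspec and hand read_parquet() a BytesIO.
    # import fsspec
    # from io import BytesIO
    # with fsspec.open("https://example.com/data/file.parquet") as f:
    #     t_remote = con.read_parquet(BytesIO(f.read()))

The sketch relies only on behavior visible in the diffs above; backends with their own read_parquet (DuckDB, Polars, DataFusion, etc.) use their native implementation instead.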