feat(api): add ibis.read top level API function

cpcloud authored and gforsyth committed Nov 8, 2022
1 parent fc617e2 commit e67132c
Showing 4 changed files with 146 additions and 63 deletions.
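
At a glance, the new entry point infers the file format from the path and defers to the default backend's `register` method. A minimal usage sketch (file paths are hypothetical):

import ibis

# format is inferred from the extension; keyword args are forwarded to DuckDB
t = ibis.read("data/diamonds.csv")
print(t.count().execute())

# explicit prefixes also work, e.g. parquet:// or csv://
ft = ibis.read("parquet://data/functional_alltypes.parquet")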
1 change: 1 addition & 0 deletions docs/api/expressions/top_level.md
@@ -31,6 +31,7 @@ These methods and objects are available directly in the `ibis` module.
::: ibis.to_sql
::: ibis.random
::: ibis.range_window
::: ibis.read
::: ibis.row_number
::: ibis.schema
::: ibis.struct
122 changes: 76 additions & 46 deletions ibis/backends/duckdb/__init__.py
@@ -34,9 +34,9 @@
_gen_table_names = (f"registered_table{i:d}" for i in itertools.count())


def _name_from_path(path: Path) -> str:
    base, *_ = path.name.partition(os.extsep)
    return base.replace("-", "_")


def _name_from_path(path: str) -> str:
    # https://github.com/duckdb/duckdb/issues/5203
    return path.replace(".", "_")
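
A small sketch of the new behavior (inputs illustrative; the linked issue is apparently about dots in view names colliding with DuckDB's schema.table parsing, so every dot is replaced):

# the whole name has its dots replaced, so the extension becomes part of
# the generated table name
assert _name_from_path("diamonds.csv") == "diamonds_csv"
assert _name_from_path("functional_alltypes.parquet") == "functional_alltypes_parquet"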


def _name_from_dataset(dataset: pa.dataset.FileSystemDataset) -> str:
@@ -47,49 +47,67 @@ def _quote(name: str):
return _dialect.identifier_preparer.quote(name)


@_generate_view_code.register(r"parquet://(?P<path>.+)", priority=10)
def _parquet(_, path, table_name=None, **kwargs):
path = Path(path).absolute()
table_name = table_name or _name_from_path(path)
quoted_table_name = _quote(table_name)
args = [f"'{str(path)}'"]
if kwargs:
args.extend([f"{k}={v}" for k, v in kwargs.items()])
def _get_scheme(scheme):
if scheme is None or scheme == "file://":
return ""
return scheme


def _format_kwargs(kwargs):
return (
f"CREATE OR REPLACE VIEW {quoted_table_name} as SELECT * "
f"from read_parquet({', '.join(args)})",
table_name,
f"{k}='{v}'" if isinstance(v, str) else f"{k}={v!r}" for k, v in kwargs.items()
)
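
For reference, a quick sketch of what these helpers produce (values illustrative):

assert list(_format_kwargs({"delim": "|", "sample_size": 2})) == ["delim='|'", "sample_size=2"]

assert _get_scheme(None) == ""        # local path
assert _get_scheme("file://") == ""   # also treated as local
assert _get_scheme("https://") == "https://"  # remote: requires the httpfs extension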


@_generate_view_code.register(r"csv(?:\.gz)?://(?P<path>.+)", priority=10)
def _csv(_, path, table_name=None, **kwargs):
path = Path(path).absolute()
table_name = table_name or _name_from_path(path)
@_generate_view_code.register(r"parquet://(?P<path>.+)", priority=13)
def _parquet(_, path, table_name=None, scheme=None, **kwargs):
scheme = _get_scheme(scheme)
if not scheme:
path = os.path.abspath(path)
if not table_name:
table_name = _name_from_path(path)
quoted_table_name = _quote(table_name)
# AUTO_DETECT and COLUMNS collide, so we set AUTO_DETECT=True
args = [f"'{scheme}{path}'", *_format_kwargs(kwargs)]
code = f"""\
CREATE OR REPLACE VIEW {quoted_table_name} AS
SELECT * FROM read_parquet({', '.join(args)})"""
return code, table_name, ["parquet"] + ["httpfs"] if scheme else []


@_generate_view_code.register(r"(c|t)sv://(?P<path>.+)", priority=13)
def _csv(_, path, table_name=None, scheme=None, **kwargs):
scheme = _get_scheme(scheme)
if not scheme:
path = os.path.abspath(path)
if not table_name:
table_name = _name_from_path(path)
quoted_table_name = _quote(table_name)
# auto_detect and columns collide, so we set auto_detect=True
# unless COLUMNS has been specified
args = [f"'{str(path)}'"]
args.extend(
[
f"AUTO_DETECT="
f"{kwargs.pop('AUTO_DETECT', False if 'COLUMNS' in kwargs else True)}"
]
)
if kwargs:
args.extend([f"{k}={v}" for k, v in kwargs.items()])
return (
f"CREATE OR REPLACE VIEW {quoted_table_name} as SELECT * "
f"from read_csv({', '.join(args)})",
table_name,
)
args = [
f"'{scheme}{path}'",
f"auto_detect={kwargs.pop('auto_detect', 'columns' not in kwargs)}",
*_format_kwargs(kwargs),
]
code = f"""\
CREATE OR REPLACE VIEW {quoted_table_name} AS
SELECT * FROM read_csv({', '.join(args)})"""
return code, table_name, ["httpfs"] if scheme else []


@_generate_view_code.register(r"(?:file://)?(?P<path>.+)", priority=9)
def _file(_, path, table_name=None, **kwargs):
    num_sep_chars = len(os.extsep)
    extension = "".join(Path(path).suffixes)[num_sep_chars:]
    return _generate_view_code(f"{extension}://{path}", table_name=table_name, **kwargs)


@_generate_view_code.register(
    r"(?P<scheme>(?:file|https?)://)?(?P<path>.+?\.((?:c|t)sv|txt)(?:\.gz)?)",
    priority=12,
)
def _csv_file_or_url(_, path, table_name=None, **kwargs):
    return _csv(f"csv://{path}", path=path, table_name=table_name, **kwargs)


@_generate_view_code.register(
    r"(?P<scheme>(?:file|https?)://)?(?P<path>.+?\.parquet)", priority=12
)
def _parquet_file_or_url(_, path, table_name=None, **kwargs):
    return _parquet(f"parquet://{path}", path=path, table_name=table_name, **kwargs)


@_generate_view_code.register(r"s3://.+", priority=10)
@@ -100,17 +118,16 @@ def _s3(full_path, table_name=None):
dataset = ds.dataset(full_path)
table_name = table_name or _name_from_dataset(dataset)
quoted_table_name = _quote(table_name)
return quoted_table_name, dataset
return quoted_table_name, dataset, ()


@_generate_view_code.register(r".+", priority=1)
def _default(_, **kwargs):
def _default(path, **kwargs):
raise ValueError(
"""
Unrecognized filetype or extension.
Valid prefixes are parquet://, csv://, s3://, or file://
f"""Unrecognized file type or extension: {path}.
Supported filetypes are parquet, csv, and csv.gz
Valid prefixes are parquet://, csv://, tsv://, s3://, or file://
Supported file extensions are parquet, csv, tsv, txt, csv.gz, tsv.gz, and txt.gz
"""
)

@@ -180,6 +197,15 @@ def do_connect(
)
)
self._meta = sa.MetaData(bind=self.con)
self._extensions = set()

    def _load_extensions(self, extensions):
        for extension in extensions:
            if extension not in self._extensions:
                with self.con.connect() as con:
                    con.execute(f"INSTALL '{extension}'")
                    con.execute(f"LOAD '{extension}'")
                self._extensions.add(extension)
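
The intent, sketched below, is that each extension is installed and loaded at most once per connection (extension name illustrative):

con = ibis.duckdb.connect()
con._load_extensions(["httpfs"])  # executes INSTALL 'httpfs' then LOAD 'httpfs'
con._load_extensions(["httpfs"])  # no-op the second time: cached in _extensions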

def register(
self,
@@ -210,7 +236,10 @@ def register(
The just-registered table
"""
if isinstance(source, str) and source.startswith("s3://"):
table_name, dataset = _generate_view_code(source, table_name=table_name)
table_name, dataset, extensions_required = _generate_view_code(
source, table_name=table_name
)
self._load_extensions(extensions_required)
# We don't create a view since DuckDB special cases Arrow Datasets
# so if we also create a view we end up with both a "lazy table"
# and a view with the same name
@@ -221,9 +250,10 @@
# explicitly.
cursor.cursor.c.register(table_name, dataset)
elif isinstance(source, (str, Path)):
sql, table_name = _generate_view_code(
sql, table_name, extensions_required = _generate_view_code(
str(source), table_name=table_name, **kwargs
)
self._load_extensions(extensions_required)
self.con.execute(sql)
else:
if table_name is None:
52 changes: 36 additions & 16 deletions ibis/backends/duckdb/tests/test_register.py
@@ -2,6 +2,7 @@
import csv
import gzip
import os
import re
from pathlib import Path

import pytest
@@ -32,9 +33,9 @@ def gzip_csv(data_directory, tmp_path):


@pytest.mark.parametrize(
"fname, in_table_name, out_table_name",
("fname", "in_table_name", "out_table_name"),
[
param("diamonds.csv", None, "diamonds", id="default"),
param("diamonds.csv", None, None, id="default"),
param("csv://diamonds.csv", "Diamonds", "Diamonds", id="csv_name"),
param(
"file://diamonds.csv",
@@ -55,14 +56,17 @@ def test_register_csv(
data_directory, fname, in_table_name, out_table_name, ext, gzip_csv
):
con = ibis.duckdb.connect()
if ext is not None:
if ext:
fname = gzip_csv
with pushd(data_directory):
con.register(fname, table_name=in_table_name)
table = con.register(fname, table_name=in_table_name)

assert out_table_name in con.list_tables()
if out_table_name is not None:
out_table_name += (os.extsep * bool(ext) + (ext or "")) * (
in_table_name is None
)
assert out_table_name in con.list_tables()

table = con.table(out_table_name)
assert table.count().execute()


@@ -73,18 +77,17 @@ def test_register_with_dotted_name(data_directory, tmp_path):
f.parent.mkdir()
data = data_directory.joinpath("diamonds.csv").read_bytes()
f.write_bytes(data)
con.register(str(f.absolute()))
table = con.table("diamonds")
table = con.register(str(f.absolute()))
assert table.count().execute()


@pytest.mark.parametrize(
"fname, in_table_name, out_table_name",
("fname", "in_table_name", "out_table_name"),
[
(
pytest.param(
"parquet://functional_alltypes.parquet",
None,
"functional_alltypes",
"functional_alltypes_parquet",
),
("functional_alltypes.parquet", "funk_all", "funk_all"),
("parquet://functional_alltypes.parq", "funk_all", "funk_all"),
@@ -103,11 +106,10 @@ def test_register_parquet(

con = ibis.duckdb.connect()
with pushd(tmp_path):
con.register(f"parquet://{fname.name}", table_name=in_table_name)
table = con.register(f"parquet://{fname.name}", table_name=in_table_name)

assert out_table_name in con.list_tables()
assert any(out_table_name in t for t in con.list_tables())

table = con.table(out_table_name)
assert table.count().execute()


@@ -137,10 +139,10 @@ def test_register_pyarrow_tables():

@pytest.mark.parametrize(
"kwargs, expected_snippet",
[({}, "AUTO_DETECT=True"), ({"COLUMNS": {"foo": "int8"}}, "AUTO_DETECT=False")],
[({}, "auto_detect=True"), ({"columns": {"foo": "int8"}}, "auto_detect=False")],
)
def test_csv_register_kwargs(kwargs, expected_snippet):
view_str, _ = _generate_view_code("bork.csv", **kwargs)
view_str, _, _ = _generate_view_code("bork.csv", **kwargs)
assert expected_snippet in view_str


@@ -167,3 +169,21 @@
foo_table = con.register(foo, SAMPLE_SIZE=2)
exp_schema = ibis.schema(dict(cola="int32", colb="int32", colc="int32"))
assert foo_table.schema() == exp_schema


def test_read_csv(data_directory):
t = ibis.read(data_directory / "functional_alltypes.csv")
assert t.count().execute()


def test_read_parquet(data_directory):
t = ibis.read(data_directory / "functional_alltypes.parquet")
assert t.count().execute()


@pytest.mark.parametrize("basename", ["functional_alltypes.*", "df.xlsx"])
def test_read_invalid(data_directory, basename):
path = data_directory / basename
msg = f"^Unrecognized file type or extension: {re.escape(str(path))}"
with pytest.raises(ValueError, match=msg):
ibis.read(path)
34 changes: 33 additions & 1 deletion ibis/expr/api.py
@@ -6,7 +6,8 @@
import functools
import itertools
import operator
from typing import Iterable, Literal, Mapping, Sequence
from pathlib import Path
from typing import Any, Iterable, Literal, Mapping, Sequence
from typing import Tuple as _Tuple
from typing import TypeVar
from typing import Union as _Union
@@ -198,6 +199,7 @@
'pi',
'random',
'range_window',
'read',
'row_number',
'rows_with_max_lookback',
'schema',
@@ -871,6 +873,36 @@ def row_number() -> ir.IntegerColumn:
return ops.RowNumber().to_expr()


def read(path: str | Path, **kwargs: Any) -> ir.Table:
    """Lazily load a data source located at `path`.

    Parameters
    ----------
    path
        A filesystem path or URL. Supports CSV, TSV, and Parquet files.
    kwargs
        DuckDB-specific keyword arguments for the file type.

        * CSV/TSV: https://duckdb.org/docs/data/csv#parameters
        * Parquet: https://duckdb.org/docs/data/parquet

    Returns
    -------
    ir.Table
        Table expression representing a file

    Examples
    --------
    >>> batting = ibis.read("ci/ibis-testing-data/batting.csv")
    >>> diamonds = ibis.read("ci/ibis-testing-data/parquet/diamonds/diamonds.parquet")
    >>> ft = ibis.read("parquet://ci/ibis-testing-data/parquet/functional_alltypes/*")
    """
    from ibis.config import _default_backend

    con = _default_backend()
    return con.register(str(path), **kwargs)
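
Since `read` forwards its kwargs untouched to the backend's `register`, DuckDB reader options pass straight through; a hedged sketch (file path hypothetical, `delim` per the DuckDB CSV docs linked above):

t = ibis.read("data/measurements.csv", delim="|")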


e = ops.E().to_expr()

pi = ops.Pi().to_expr()
