Add more tests for reading corner cases
Jay Chia committed Jun 8, 2023
1 parent 50707ef commit cfa288a
Showing 4 changed files with 202 additions and 15 deletions.
18 changes: 10 additions & 8 deletions daft/table/table_io.py
@@ -100,20 +100,22 @@ def read_parquet(
     Returns:
         Table: Parsed Table from Parquet
     """
     f: IO
     if not isinstance(file, (str, pathlib.Path)):
-        # BytesIO path.
-        return Table.from_arrow(papq.read_table(file, columns=read_options.column_names))
-
-    paths, fs = _resolve_paths_and_filesystem(file, fs)
-    assert len(paths) == 1
-    path = paths[0]
-    f = fs.open_input_file(path)
-    pqf = papq.ParquetFile(f)
+        f = file
+    else:
+        paths, fs = _resolve_paths_and_filesystem(file, fs)
+        assert len(paths) == 1
+        path = paths[0]
+        f = fs.open_input_file(path)
 
     # If no rows required, we manually construct an empty table with the right schema
     if read_options.num_rows == 0:
+        pqf = papq.ParquetFile(f)
         arrow_schema = pqf.metadata.schema.to_arrow_schema()
         table = pa.Table.from_arrays([pa.array([], type=field.type) for field in arrow_schema], schema=arrow_schema)
     elif read_options.num_rows is not None:
+        pqf = papq.ParquetFile(f)
         # Only read the required row groups.
         rows_needed = read_options.num_rows
         for i in range(pqf.metadata.num_row_groups):
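The body of that row-group loop is collapsed in this view. A minimal sketch of the general technique it points at (read only as many row groups as needed, then trim), assuming PyArrow's ParquetFile API and the `pqf`, `rows_needed`, and `read_options` names from the hunk above; this is an illustration, not the committed code:

# Sketch only: read just enough row groups to satisfy num_rows, then slice to the exact count.
# Assumes the file has at least one row group and rows_needed > 0.
tables = []
for i in range(pqf.metadata.num_row_groups):
    if rows_needed <= 0:
        break
    rg = pqf.read_row_group(i, columns=read_options.column_names)
    tables.append(rg)
    rows_needed -= rg.num_rows
table = pa.concat_tables(tables).slice(0, read_options.num_rows)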
113 changes: 108 additions & 5 deletions tests/table/table_io/test_csv.py
@@ -65,6 +65,35 @@ def test_csv_infer_schema(data, expected_dtype):
     assert schema == Schema._from_field_name_and_types([("id", DataType.int64()), ("data", expected_dtype)])
 
 
+def test_csv_infer_schema_custom_delimiter():
+    f = _csv_write_helper(
+        header=["id", "data"],
+        data=[
+            ["1", "1"],
+            ["2", "2"],
+            ["3", None],
+        ],
+        delimiter="|",
+    )
+
+    schema = schema_inference.from_csv(f, csv_options=TableParseCSVOptions(delimiter="|"))
+    assert schema == Schema._from_field_name_and_types([("id", DataType.int64()), ("data", DataType.int64())])
+
+
+def test_csv_infer_schema_no_header():
+    f = _csv_write_helper(
+        header=None,
+        data=[
+            ["1", "1"],
+            ["2", "2"],
+            ["3", None],
+        ],
+    )
+
+    schema = schema_inference.from_csv(f, csv_options=TableParseCSVOptions(header_index=None))
+    assert schema == Schema._from_field_name_and_types([("f0", DataType.int64()), ("f1", DataType.int64())])
+
+
 @pytest.mark.parametrize(
     ["data", "expected_data_series"],
     [
@@ -96,27 +125,101 @@ def test_csv_read_data(data, expected_data_series):
     assert table.to_arrow() == expected.to_arrow(), f"Expected:\n{expected}\n\nReceived:\n{table}"
 
 
-def test_csv_read_data_csv_options():
+def test_csv_read_data_csv_limit_rows():
     f = _csv_write_helper(
-        header=None,
+        header=["id", "data"],
         data=[
             ["1", "1"],
             ["2", "2"],
             ["3", None],
         ],
-        delimiter="|",
     )
 
     schema = Schema._from_field_name_and_types([("id", DataType.int64()), ("data", DataType.int64())])
     expected = Table.from_pydict(
         {
+            "id": [1, 2],
             "data": [1, 2],
         }
     )
     table = table_io.read_csv(
         f,
         schema,
-        csv_options=TableParseCSVOptions(header_index=None, delimiter="|"),
-        read_options=TableReadOptions(num_rows=2, column_names=["data"]),
+        read_options=TableReadOptions(num_rows=2),
     )
     assert table.to_arrow() == expected.to_arrow(), f"Expected:\n{expected}\n\nReceived:\n{table}"
+
+
+def test_csv_read_data_csv_select_columns():
+    f = _csv_write_helper(
+        header=["id", "data"],
+        data=[
+            ["1", "1"],
+            ["2", "2"],
+            ["3", None],
+        ],
+    )
+
+    schema = Schema._from_field_name_and_types([("id", DataType.int64()), ("data", DataType.int64())])
+    expected = Table.from_pydict(
+        {
+            "data": [1, 2, None],
+        }
+    )
+    table = table_io.read_csv(
+        f,
+        schema,
+        read_options=TableReadOptions(column_names=["data"]),
+    )
+    assert table.to_arrow() == expected.to_arrow(), f"Expected:\n{expected}\n\nReceived:\n{table}"
+
+
+def test_csv_read_data_csv_custom_delimiter():
+    f = _csv_write_helper(
+        header=["id", "data"],
+        data=[
+            ["1", "1"],
+            ["2", "2"],
+            ["3", None],
+        ],
+        delimiter="|",
+    )
+
+    schema = Schema._from_field_name_and_types([("id", DataType.int64()), ("data", DataType.int64())])
+    expected = Table.from_pydict(
+        {
+            "id": [1, 2, 3],
+            "data": [1, 2, None],
+        }
+    )
+    table = table_io.read_csv(
+        f,
+        schema,
+        csv_options=TableParseCSVOptions(delimiter="|"),
+    )
+    assert table.to_arrow() == expected.to_arrow(), f"Expected:\n{expected}\n\nReceived:\n{table}"
+
+
+def test_csv_read_data_csv_no_header():
+    f = _csv_write_helper(
+        header=None,
+        data=[
+            ["1", "1"],
+            ["2", "2"],
+            ["3", None],
+        ],
+    )
+
+    schema = Schema._from_field_name_and_types([("id", DataType.int64()), ("data", DataType.int64())])
+    expected = Table.from_pydict(
+        {
+            "id": [1, 2, 3],
+            "data": [1, 2, None],
+        }
+    )
+    table = table_io.read_csv(
+        f,
+        schema,
+        csv_options=TableParseCSVOptions(header_index=None),
+    )
+    assert table.to_arrow() == expected.to_arrow(), f"Expected:\n{expected}\n\nReceived:\n{table}"
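These CSV tests rely on a `_csv_write_helper` fixture that sits outside this diff. A hypothetical sketch of what such a helper could look like, assuming it mirrors the in-memory-buffer pattern of `_parquet_write_helper` in test_parquet.py below; the name `_csv_write_helper_sketch`, its signature, and its body are illustrative, not the repository's actual helper:

# Hypothetical stand-in for the _csv_write_helper used above (not the real implementation).
import csv
import io

def _csv_write_helper_sketch(header, data, delimiter=","):
    f = io.StringIO()
    writer = csv.writer(f, delimiter=delimiter)
    if header is not None:
        writer.writerow(header)
    writer.writerows(data)  # csv.writer renders None cells as empty fields
    f.seek(0)
    return f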
38 changes: 38 additions & 0 deletions tests/table/table_io/test_json.py
@@ -10,6 +10,7 @@
 import daft
 from daft.datatype import DataType
 from daft.logical.schema import Schema
+from daft.runners.partitioning import TableReadOptions
 from daft.table import Table, schema_inference, table_io
 
 
@@ -101,3 +102,40 @@ def test_json_read_data(data, expected_data_series):
     )
     table = table_io.read_json(f, schema)
     assert table.to_arrow() == expected.to_arrow(), f"Expected:\n{expected}\n\nReceived:\n{table}"
+
+
+def test_json_read_data_limit_rows():
+    f = _json_write_helper(
+        {
+            "id": [1, 2, 3],
+            "data": [1, 2, None],
+        }
+    )
+
+    schema = Schema._from_field_name_and_types([("id", DataType.int64()), ("data", DataType.int64())])
+    expected = Table.from_pydict(
+        {
+            "id": [1, 2],
+            "data": [1, 2],
+        }
+    )
+    table = table_io.read_json(f, schema, read_options=TableReadOptions(num_rows=2))
+    assert table.to_arrow() == expected.to_arrow(), f"Expected:\n{expected}\n\nReceived:\n{table}"
+
+
+def test_json_read_data_select_columns():
+    f = _json_write_helper(
+        {
+            "id": [1, 2, 3],
+            "data": [1, 2, None],
+        }
+    )
+
+    schema = Schema._from_field_name_and_types([("id", DataType.int64()), ("data", DataType.int64())])
+    expected = Table.from_pydict(
+        {
+            "data": [1, 2, None],
+        }
+    )
+    table = table_io.read_json(f, schema, read_options=TableReadOptions(column_names=["data"]))
+    assert table.to_arrow() == expected.to_arrow(), f"Expected:\n{expected}\n\nReceived:\n{table}"
48 changes: 46 additions & 2 deletions tests/table/table_io/test_parquet.py
@@ -11,6 +11,7 @@
 import daft
 from daft.datatype import DataType
 from daft.logical.schema import Schema
+from daft.runners.partitioning import TableReadOptions
 from daft.table import Table, schema_inference, table_io
 
 
@@ -30,9 +31,9 @@ def test_read_input(tmpdir):
     assert table_io.read_parquet(f, schema=schema).to_arrow() == data
 
 
-def _parquet_write_helper(data: pa.Table):
+def _parquet_write_helper(data: pa.Table, row_group_size: int = None):
     f = io.BytesIO()
-    papq.write_table(data, f)
+    papq.write_table(data, f, row_group_size=row_group_size)
     f.seek(0)
     return f
 
@@ -103,3 +104,46 @@ def test_parquet_read_data(data, expected_data_series):
     )
     table = table_io.read_parquet(f, schema)
     assert table.to_arrow() == expected.to_arrow(), f"Expected:\n{expected}\n\nReceived:\n{table}"
+
+
+@pytest.mark.parametrize("row_group_size", [None, 1, 3])
+def test_parquet_read_data_limit_rows(row_group_size):
+    f = _parquet_write_helper(
+        pa.Table.from_pydict(
+            {
+                "id": [1, 2, 3],
+                "data": [1, 2, None],
+            }
+        ),
+        row_group_size=row_group_size,
+    )
+
+    schema = Schema._from_field_name_and_types([("id", DataType.int64()), ("data", DataType.int64())])
+    expected = Table.from_pydict(
+        {
+            "id": [1, 2],
+            "data": [1, 2],
+        }
+    )
+    table = table_io.read_parquet(f, schema, read_options=TableReadOptions(num_rows=2))
+    assert table.to_arrow() == expected.to_arrow(), f"Expected:\n{expected}\n\nReceived:\n{table}"
+
+
+def test_parquet_read_data_select_columns():
+    f = _parquet_write_helper(
+        pa.Table.from_pydict(
+            {
+                "id": [1, 2, 3],
+                "data": [1, 2, None],
+            }
+        )
+    )
+
+    schema = Schema._from_field_name_and_types([("id", DataType.int64()), ("data", DataType.int64())])
+    expected = Table.from_pydict(
+        {
+            "data": [1, 2, None],
+        }
+    )
+    table = table_io.read_parquet(f, schema, read_options=TableReadOptions(column_names=["data"]))
+    assert table.to_arrow() == expected.to_arrow(), f"Expected:\n{expected}\n\nReceived:\n{table}"
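The row_group_size parametrization matters because a num_rows limit has to stop partway through a file's row groups when each group holds only one row. A small standalone sketch (plain PyArrow, not part of the commit) showing how row_group_size controls the number of row groups the tests read back:

# Quick check of how row_group_size shapes the in-memory parquet file used by these tests.
import io
import pyarrow as pa
import pyarrow.parquet as papq

data = pa.Table.from_pydict({"id": [1, 2, 3], "data": [1, 2, None]})
for row_group_size in (None, 1, 3):
    f = io.BytesIO()
    papq.write_table(data, f, row_group_size=row_group_size)
    f.seek(0)
    print(row_group_size, papq.ParquetFile(f).metadata.num_row_groups)
# Expected: None -> 1 row group, 1 -> 3 row groups, 3 -> 1 row group.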
