[FEAT][Table-Read-Schema 1/3] Split reading tabular file formats into 2 method calls #1010

Merged · 11 commits · Jun 8, 2023
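In short, the old single call (read a file into a Table, then call `.schema()` on it) is split into a dedicated schema-inference step followed by a read that is handed the already-inferred schema. A minimal sketch of the new call pattern, assuming a local CSV path; the file name is illustrative, and omitting the optional `fs` argument and relying on option defaults are assumptions not shown in this diff:

```python
from daft.runners.partitioning import TableParseCSVOptions, TableReadOptions
from daft.table import schema_inference, table_io

csv_options = TableParseCSVOptions(delimiter=",", header_index=0)

# Step 1: infer the Schema from the file (new schema_inference module).
schema = schema_inference.from_csv(file="data.csv", csv_options=csv_options)

# Step 2: read the file into a Table, passing the inferred schema explicitly.
table = table_io.read_csv(
    file="data.csv",
    schema=schema,
    csv_options=csv_options,
    read_options=TableReadOptions(num_rows=None, column_names=None),
)
```

The same split applies to JSON and Parquet via `schema_inference.from_json`/`from_parquet` and `table_io.read_json`/`read_parquet`.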
19 changes: 7 additions & 12 deletions daft/execution/logical_op_runners.py
@@ -7,11 +7,7 @@
     StorageType,
 )
 from daft.logical.logical_plan import FileWrite, TabularFilesScan
-from daft.runners.partitioning import (
-    vPartitionParseCSVOptions,
-    vPartitionReadOptions,
-    vPartitionSchemaInferenceOptions,
-)
+from daft.runners.partitioning import TableParseCSVOptions, TableReadOptions
 from daft.table import Table, table_io


@@ -35,8 +31,7 @@ def _handle_tabular_files_scan(
         # Common options for reading vPartition
         fs = scan._fs
         schema = scan._schema
-        schema_options = vPartitionSchemaInferenceOptions(schema=schema)
-        read_options = vPartitionReadOptions(
+        read_options = TableReadOptions(
             num_rows=scan._limit_rows,
             column_names=scan._column_names,  # read only specified columns
         )
@@ -47,14 +42,12 @@
                 [
                     table_io.read_csv(
                         file=fp,
+                        schema=schema,
                         fs=fs,
-                        csv_options=vPartitionParseCSVOptions(
+                        csv_options=TableParseCSVOptions(
                             delimiter=scan._source_info.delimiter,
-                            has_headers=scan._source_info.has_headers,
-                            skip_rows_before_header=0,
-                            skip_rows_after_header=0,
+                            header_index=0 if scan._source_info.has_headers else None,
                         ),
-                        schema_options=schema_options,
                         read_options=read_options,
                     )
                     for fp in filepaths
@@ -66,6 +59,7 @@
                 [
                     table_io.read_json(
                         file=fp,
+                        schema=schema,
                         fs=fs,
                         read_options=read_options,
                     )
@@ -78,6 +72,7 @@
                 [
                     table_io.read_parquet(
                         file=fp,
+                        schema=schema,
                         fs=fs,
                         read_options=read_options,
                     )
13 changes: 4 additions & 9 deletions daft/runners/partitioning.py
@@ -18,7 +18,7 @@


 @dataclass(frozen=True)
-class vPartitionReadOptions:
+class TableReadOptions:
     """Options for reading a vPartition

     Args:
@@ -50,21 +50,16 @@ def full_schema_column_names(self) -> list[str] | None:


 @dataclass(frozen=True)
-class vPartitionParseCSVOptions:
+class TableParseCSVOptions:
     """Options for parsing CSVs

     Args:
         delimiter: The delimiter to use when parsing CSVs, defaults to ","
-        has_headers: Whether the CSV has headers, defaults to True
-        column_names: Column names to use in place of headers, defaults to None
-        skip_rows_before_header: Number of rows to skip before the header, defaults to 0
-        skip_rows_after_header: Number of rows to skip after the header, defaults to 0
+        header_index: Index of the header row, or None if no header
     """

     delimiter: str = ","
-    has_headers: bool = True
-    skip_rows_before_header: int = 0
-    skip_rows_after_header: int = 0
+    header_index: int | None = 0


 @dataclass(frozen=True)
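`header_index` replaces the old `has_headers`/`skip_rows_before_header`/`skip_rows_after_header` trio with a single value: the row index of the header, or `None` when the CSV has no header. A short sketch of how the option is constructed at the call sites touched in this PR (the delimiter values are illustrative):

```python
from daft.runners.partitioning import TableParseCSVOptions

# CSV with a header on the first row (the default, header_index=0).
with_header = TableParseCSVOptions(delimiter=",", header_index=0)

# Headerless CSV: there is no header row to parse, so column names must come
# from somewhere else (e.g. override_column_names during schema inference).
no_header = TableParseCSVOptions(delimiter="|", header_index=None)

# The translation used by the changed call sites:
has_headers = True  # e.g. scan._source_info.has_headers
opts = TableParseCSVOptions(header_index=0 if has_headers else None)
```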
34 changes: 8 additions & 26 deletions daft/runners/runner_io.py
@@ -17,11 +17,10 @@
 from daft.logical.schema import Schema
 from daft.runners.partitioning import (
     PartitionSet,
-    vPartitionParseCSVOptions,
-    vPartitionReadOptions,
+    TableParseCSVOptions,
     vPartitionSchemaInferenceOptions,
 )
-from daft.table import Table, table_io
+from daft.table import schema_inference

 PartitionT = TypeVar("PartitionT")

@@ -86,45 +85,28 @@ def sample_schema(
     if fs is None:
         fs = get_filesystem_from_path(filepath)

-    sampled_partition: Table
     if source_info.scan_type() == StorageType.CSV:
         assert isinstance(source_info, CSVSourceInfo)
-        sampled_partition = table_io.read_csv(
+        return schema_inference.from_csv(
             file=filepath,
             fs=fs,
-            csv_options=vPartitionParseCSVOptions(
+            csv_options=TableParseCSVOptions(
                 delimiter=source_info.delimiter,
-                has_headers=source_info.has_headers,
-                skip_rows_before_header=0,
-                skip_rows_after_header=0,
-            ),
-            schema_options=schema_inference_options,
-            read_options=vPartitionReadOptions(
-                num_rows=100,  # sample 100 rows for schema inference
-                column_names=None,  # read all columns
+                header_index=0 if source_info.has_headers else None,
             ),
+            override_column_names=schema_inference_options.inference_column_names,
         )
     elif source_info.scan_type() == StorageType.JSON:
         assert isinstance(source_info, JSONSourceInfo)
-        sampled_partition = table_io.read_json(
+        return schema_inference.from_json(
             file=filepath,
             fs=fs,
-            read_options=vPartitionReadOptions(
-                num_rows=100,  # sample 100 rows for schema inference
-                column_names=None,  # read all columns
-            ),
         )
     elif source_info.scan_type() == StorageType.PARQUET:
         assert isinstance(source_info, ParquetSourceInfo)
-        sampled_partition = table_io.read_parquet(
+        return schema_inference.from_parquet(
             file=filepath,
             fs=fs,
-            read_options=vPartitionReadOptions(
-                num_rows=0,  # sample 100 rows for schema inference
-                column_names=None,  # read all columns
-            ),
         )
     else:
         raise NotImplementedError(f"Schema inference for {source_info} not implemented")
-
-    return sampled_partition.schema()
102 changes: 102 additions & 0 deletions daft/table/schema_inference.py
@@ -0,0 +1,102 @@
from __future__ import annotations

import pathlib
from typing import TYPE_CHECKING

import pyarrow.csv as pacsv
import pyarrow.json as pajson
import pyarrow.parquet as papq

from daft.datatype import DataType
from daft.filesystem import _resolve_paths_and_filesystem
from daft.logical.schema import Schema
from daft.runners.partitioning import TableParseCSVOptions
from daft.table import Table
from daft.table.table_io import FileInput, _open_stream

if TYPE_CHECKING:
    import fsspec


def from_csv(
    file: FileInput,
    fs: fsspec.AbstractFileSystem | None = None,
    override_column_names: list[str] | None = None,
    csv_options: TableParseCSVOptions = TableParseCSVOptions(),
) -> Schema:
    """Infers a Schema from a CSV file

    Args:
        file (str | IO): either a file-like object or a string file path (potentially prefixed with a protocol such as "s3://")
        fs (fsspec.AbstractFileSystem): fsspec FileSystem to use for reading data.
            By default, Daft will automatically construct a FileSystem instance internally.
        override_column_names (list[str]): column names to use instead of those found in the CSV - will throw an error if its length
            does not match the actual number of columns found in the CSV
        csv_options (TableParseCSVOptions, optional): CSV-specific configs to apply when reading the file

    Returns:
        Schema: Inferred Schema from the CSV
    """

    # Have PyArrow generate the column names if the CSV has no header and no column names were provided
    pyarrow_autogenerate_column_names = (csv_options.header_index is None) and (override_column_names is None)

    # Have PyArrow skip the header row if override_column_names were provided and a header exists in the CSV
    pyarrow_skip_rows_after_names = (
        1 if override_column_names is not None and csv_options.header_index is not None else 0
    )

    with _open_stream(file, fs) as f:
        table = pacsv.read_csv(
            f,
            parse_options=pacsv.ParseOptions(
                delimiter=csv_options.delimiter,
            ),
            # First skip_rows is applied, then the header row is read if column_names is None, then skip_rows_after_names is applied
            read_options=pacsv.ReadOptions(
                autogenerate_column_names=pyarrow_autogenerate_column_names,
                column_names=override_column_names,
                skip_rows_after_names=pyarrow_skip_rows_after_names,
                skip_rows=csv_options.header_index,
            ),
        )

    return Table.from_arrow(table).schema()


def from_json(
    file: FileInput,
    fs: fsspec.AbstractFileSystem | None = None,
) -> Schema:
    """Infers a Schema from a line-delimited JSON file

    Args:
        file (FileInput): either a file-like object or a string file path (potentially prefixed with a protocol such as "s3://")
        fs (fsspec.AbstractFileSystem): fsspec FileSystem to use for reading data.
            By default, Daft will automatically construct a FileSystem instance internally.

    Returns:
        Schema: Inferred Schema from the JSON
    """
    with _open_stream(file, fs) as f:
        table = pajson.read_json(f)

    return Table.from_arrow(table).schema()


def from_parquet(
    file: FileInput,
    fs: fsspec.AbstractFileSystem | None = None,
) -> Schema:
    """Infers a Schema from a Parquet file"""
    if not isinstance(file, (str, pathlib.Path)):
        # BytesIO path.
        f = file
    else:
        paths, fs = _resolve_paths_and_filesystem(file, fs)
        assert len(paths) == 1
        path = paths[0]
        f = fs.open_input_file(path)

    pqf = papq.ParquetFile(f)
    arrow_schema = pqf.metadata.schema.to_arrow_schema()

    return Schema._from_field_name_and_types([(f.name, DataType.from_arrow_type(f.type)) for f in arrow_schema])
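For reference, the header handling in `from_csv` boils down to a small mapping from `header_index` and `override_column_names` onto PyArrow's CSV `ReadOptions`. The sketch below reproduces that mapping standalone against `pyarrow.csv`, so the two interesting cases can be checked in isolation; the in-memory CSV data is illustrative only:

```python
from __future__ import annotations

import io

import pyarrow.csv as pacsv


def csv_read_options(header_index: int | None, override_column_names: list[str] | None) -> pacsv.ReadOptions:
    # Mirrors the logic in schema_inference.from_csv:
    # - no header and no overrides -> let PyArrow autogenerate column names (f0, f1, ...)
    # - overrides provided while a header exists -> apply the names, then skip the header row
    return pacsv.ReadOptions(
        autogenerate_column_names=(header_index is None) and (override_column_names is None),
        column_names=override_column_names,
        skip_rows_after_names=1 if override_column_names is not None and header_index is not None else 0,
        skip_rows=header_index,
    )


data = b"a,b\n1,2\n3,4\n"

# Header on row 0, no overrides: names come from the file -> a: int64, b: int64
print(pacsv.read_csv(io.BytesIO(data), read_options=csv_read_options(0, None)).schema)

# Header on row 0, overrides given: the header row is skipped -> x: int64, y: int64
print(pacsv.read_csv(io.BytesIO(data), read_options=csv_read_options(0, ["x", "y"])).schema)
```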