Skip to content

Commit

Permalink
parquet and ipc scanning from fsspec
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jun 8, 2022
1 parent 1939873 commit 774643b
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 6 deletions.
2 changes: 1 addition & 1 deletion py-polars/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ test-with-cov: venv
@cd tests && ../$(PYTHON) -m pytest \
--cov=polars \
--cov-report xml \
--cov-fail-under=90 \
--cov-fail-under=85 \
--import-mode=importlib

doctest:
Expand Down
1 change: 1 addition & 0 deletions py-polars/docs/source/reference/io.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Parquet

read_parquet
scan_parquet
read_parquet_schema
DataFrame.write_parquet

SQL
Expand Down
3 changes: 2 additions & 1 deletion py-polars/polars/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def version() -> str:
wrap_df,
)
from polars.internals.functions import arg_where, concat, date_range, get_dummies
from polars.internals.io import read_ipc_schema
from polars.internals.io import read_ipc_schema, read_parquet_schema
from polars.internals.lazy_frame import LazyFrame
from polars.internals.lazy_functions import _date as date
from polars.internals.lazy_functions import _datetime as datetime
Expand Down Expand Up @@ -168,6 +168,7 @@ def version() -> str:
"scan_ds",
"scan_parquet",
"read_ipc_schema",
"read_parquet_schema",
"read_avro",
# polars.stringcache
"StringCache",
Expand Down
9 changes: 7 additions & 2 deletions py-polars/polars/internals/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@
The modules within `polars.internals` are interdependent. To prevent cyclical imports, they all import from each other
via this __init__ file using `import polars.internals as pli`. The imports below are being shared across this module.
"""
from .anonymous_scan import _deser_and_exec, _scan_ds, _scan_ipc_fsspec
from .anonymous_scan import (
_deser_and_exec,
_scan_ds,
_scan_ipc_fsspec,
_scan_parquet_fsspec,
)
from .expr import Expr, expr_to_lit_or_expr, selection_to_pyexpr_list, wrap_expr
from .frame import DataFrame, LazyFrame, wrap_df, wrap_ldf
from .functions import concat, date_range # DataFrame.describe() & DataFrame.upsample()
from .io import _prepare_file_arg, read_ipc_schema
from .io import _prepare_file_arg, read_ipc_schema, read_parquet_schema
from .lazy_functions import all, argsort_by, col, concat_list, element, lit, select
from .series import Series, wrap_s
from .whenthen import when # used in expr.clip()
30 changes: 30 additions & 0 deletions py-polars/polars/internals/anonymous_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,33 @@ def _scan_ipc_fsspec(
schema = pli.read_ipc_schema(data)

return pli.LazyFrame._scan_python_function(schema, func_serialized)


def _scan_parquet_impl(
    uri: str, with_columns: Optional[List[str]]
) -> "pli.DataFrame":
    """
    Take the projected columns and materialize an arrow table.

    This is the deferred callback pickled by ``_scan_parquet_fsspec``; it is
    invoked by the query engine at collect time.

    Parameters
    ----------
    uri
        Path or fsspec-style URI of the Parquet file.
    with_columns
        Columns projected by the optimizer; ``None`` reads all columns.
    """
    # Imported locally to avoid a circular import at module load time.
    import polars as pl

    return pl.read_parquet(uri, with_columns)


def _scan_parquet_fsspec(
    file: str,
    storage_options: Optional[Dict] = None,
) -> "pli.LazyFrame":
    """
    Build a LazyFrame over a Parquet file reachable via fsspec.

    Only the schema is read eagerly; the actual scan is deferred to a
    pickled callback executed when the query is collected.
    """
    # Serialize the deferred scan so the engine can run it later with the
    # projected columns.
    func_serialized = pickle.dumps(partial(_scan_parquet_impl, file))

    with pli._prepare_file_arg(file, **(storage_options or {})) as data:
        schema = pli.read_parquet_schema(data)

    return pli.LazyFrame._scan_python_function(schema, func_serialized)
22 changes: 22 additions & 0 deletions py-polars/polars/internals/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

try:
from polars.polars import ipc_schema as _ipc_schema
from polars.polars import parquet_schema as _parquet_schema
except ImportError: # pragma: no cover
pass

Expand Down Expand Up @@ -123,3 +124,24 @@ def read_ipc_schema(
file = format_path(file)

return _ipc_schema(file)


def read_parquet_schema(
    file: Union[str, BinaryIO, Path, bytes]
) -> Dict[str, Type[DataType]]:
    """
    Get a schema of the Parquet file without reading data.

    Parameters
    ----------
    file
        Path to a file or a file-like object.

    Returns
    -------
    Dictionary mapping column names to datatypes
    """
    # Normalize Path objects / "~"-style paths to a plain string path;
    # file-like objects and raw bytes are handed to the native reader as-is.
    if isinstance(file, (str, Path)):
        file = format_path(file)

    return _parquet_schema(file)
21 changes: 20 additions & 1 deletion py-polars/polars/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,13 @@

from polars.convert import from_arrow
from polars.datatypes import DataType
from polars.internals import DataFrame, LazyFrame, _scan_ds, _scan_ipc_fsspec
from polars.internals import (
DataFrame,
LazyFrame,
_scan_ds,
_scan_ipc_fsspec,
_scan_parquet_fsspec,
)
from polars.internals.io import _prepare_file_arg

try:
Expand Down Expand Up @@ -558,6 +564,10 @@ def scan_ipc(
If not None, this will insert a row count column with the given name into the DataFrame
row_count_offset
Offset to start the row_count column (only use if the name is set)
storage_options
Extra options that make sense for ``fsspec.open()`` or a
particular storage connection.
e.g. host, port, username, password, etc.
"""

# Map legacy arguments to current ones and remove them from kwargs.
Expand Down Expand Up @@ -588,6 +598,7 @@ def scan_parquet(
rechunk: bool = True,
row_count_name: Optional[str] = None,
row_count_offset: int = 0,
storage_options: Optional[Dict] = None,
**kwargs: Any,
) -> LazyFrame:
"""
Expand All @@ -612,6 +623,10 @@ def scan_parquet(
If not None, this will insert a row count column with the given name into the DataFrame
row_count_offset
Offset to start the row_count column (only use if the name is set)
storage_options
Extra options that make sense for ``fsspec.open()`` or a
particular storage connection.
e.g. host, port, username, password, etc.
"""

# Map legacy arguments to current ones and remove them from kwargs.
Expand All @@ -620,6 +635,10 @@ def scan_parquet(
if isinstance(file, (str, Path)):
file = format_path(file)

# try fsspec scanner
if not os.path.exists((file)):
return _scan_parquet_fsspec(file, storage_options)

return LazyFrame.scan_parquet(
file=file,
n_rows=n_rows,
Expand Down
21 changes: 20 additions & 1 deletion py-polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ use mimalloc::MiMalloc;
use polars::functions::{diag_concat_df, hor_concat_df};
use polars::prelude::Null;
use polars_core::datatypes::TimeUnit;
use polars_core::export::arrow::io::ipc::read::read_file_metadata;
use polars_core::prelude::IntoSeries;
use pyo3::panic::PanicException;
use pyo3::types::{PyBool, PyDict, PyFloat, PyInt, PyString};
Expand Down Expand Up @@ -351,6 +350,7 @@ fn concat_series(series: &PyAny) -> PyResult<PySeries> {

#[pyfunction]
fn ipc_schema(py: Python, py_f: PyObject) -> PyResult<PyObject> {
use polars_core::export::arrow::io::ipc::read::read_file_metadata;
let metadata = match get_either_file(py_f, false)? {
EitherRustPythonFile::Rust(mut r) => {
read_file_metadata(&mut r).map_err(PyPolarsErr::from)?
Expand All @@ -366,6 +366,24 @@ fn ipc_schema(py: Python, py_f: PyObject) -> PyResult<PyObject> {
Ok(dict.to_object(py))
}

/// Read the schema of a Parquet file without materializing any data.
///
/// Returns a Python dict mapping column name -> polars datatype.
#[pyfunction]
fn parquet_schema(py: Python, py_f: PyObject) -> PyResult<PyObject> {
    use polars_core::export::arrow::io::parquet::read::{infer_schema, read_metadata};

    // Accept either a Rust-native file or a Python file-like object; in both
    // cases only the Parquet footer metadata is parsed, not the data pages.
    let metadata = match get_either_file(py_f, false)? {
        EitherRustPythonFile::Rust(mut r) => read_metadata(&mut r).map_err(PyPolarsErr::from)?,
        EitherRustPythonFile::Py(mut r) => read_metadata(&mut r).map_err(PyPolarsErr::from)?,
    };
    // Derive an Arrow schema from the Parquet metadata to get logical types.
    let arrow_schema = infer_schema(&metadata).map_err(PyPolarsErr::from)?;

    // Expose the schema to Python as {column_name: datatype}.
    let dict = PyDict::new(py);
    for field in arrow_schema.fields {
        let dt: Wrap<DataType> = Wrap((&field.data_type).into());
        dict.set_item(field.name, dt.to_object(py))?;
    }
    Ok(dict.to_object(py))
}

#[pyfunction]
fn collect_all(lfs: Vec<PyLazyFrame>, py: Python) -> PyResult<Vec<PyDataFrame>> {
use polars_core::utils::rayon::prelude::*;
Expand Down Expand Up @@ -474,6 +492,7 @@ fn polars(py: Python, m: &PyModule) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(concat_lf)).unwrap();
m.add_wrapped(wrap_pyfunction!(concat_series)).unwrap();
m.add_wrapped(wrap_pyfunction!(ipc_schema)).unwrap();
m.add_wrapped(wrap_pyfunction!(parquet_schema)).unwrap();
m.add_wrapped(wrap_pyfunction!(collect_all)).unwrap();
m.add_wrapped(wrap_pyfunction!(spearman_rank_corr)).unwrap();
m.add_wrapped(wrap_pyfunction!(map_mul)).unwrap();
Expand Down

0 comments on commit 774643b

Please sign in to comment.