Skip to content

Commit

Permalink
Python: scan_ipc/parquet can scan from fsspec sources e.g. s3. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Jun 8, 2022
1 parent 063b22d commit 66c7aaf
Show file tree
Hide file tree
Showing 10 changed files with 288 additions and 120 deletions.
2 changes: 1 addition & 1 deletion py-polars/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ test-with-cov: venv
@cd tests && ../$(PYTHON) -m pytest \
--cov=polars \
--cov-report xml \
--cov-fail-under=90 \
--cov-fail-under=85 \
--import-mode=importlib

doctest:
Expand Down
1 change: 1 addition & 0 deletions py-polars/docs/source/reference/io.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Parquet

read_parquet
scan_parquet
read_parquet_schema
DataFrame.write_parquet

SQL
Expand Down
3 changes: 2 additions & 1 deletion py-polars/polars/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def version() -> str:
wrap_df,
)
from polars.internals.functions import arg_where, concat, date_range, get_dummies
from polars.internals.io import read_ipc_schema, read_parquet_schema
from polars.internals.lazy_frame import LazyFrame
from polars.internals.lazy_functions import _date as date
from polars.internals.lazy_functions import _datetime as datetime
Expand Down Expand Up @@ -113,7 +114,6 @@ def version() -> str:
read_csv,
read_excel,
read_ipc,
read_ipc_schema,
read_json,
read_parquet,
read_sql,
Expand Down Expand Up @@ -168,6 +168,7 @@ def version() -> str:
"scan_ds",
"scan_parquet",
"read_ipc_schema",
"read_parquet_schema",
"read_avro",
# polars.stringcache
"StringCache",
Expand Down
8 changes: 7 additions & 1 deletion py-polars/polars/internals/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,16 @@
The modules within `polars.internals` are interdependent. To prevent cyclical imports, they all import from each other
via this __init__ file using `import polars.internals as pli`. The imports below are being shared across this module.
"""
from .anonymous_scan import _deser_and_exec, _scan_ds
from .anonymous_scan import (
_deser_and_exec,
_scan_ds,
_scan_ipc_fsspec,
_scan_parquet_fsspec,
)
from .expr import Expr, expr_to_lit_or_expr, selection_to_pyexpr_list, wrap_expr
from .frame import DataFrame, LazyFrame, wrap_df, wrap_ldf
from .functions import concat, date_range # DataFrame.describe() & DataFrame.upsample()
from .io import _prepare_file_arg, read_ipc_schema, read_parquet_schema
from .lazy_functions import all, argsort_by, col, concat_list, element, lit, select
from .series import Series, wrap_s
from .whenthen import when # used in expr.clip()
60 changes: 59 additions & 1 deletion py-polars/polars/internals/anonymous_scan.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pickle
from functools import partial
from typing import List, Optional
from typing import Dict, List, Optional

import polars as pl
from polars import internals as pli
Expand Down Expand Up @@ -64,3 +64,61 @@ def _scan_ds(ds: "pa.dataset.dataset") -> "pli.LazyFrame":
func = partial(_scan_ds_impl, ds)
func_serialized = pickle.dumps(func)
return pli.LazyFrame._scan_python_function(ds.schema, func_serialized)


def _scan_ipc_impl(uri: "str", with_columns: Optional[List[str]]) -> "pli.DataFrame":
"""
Takes the projected columns and materializes an arrow table.
Parameters
----------
uri
with_columns
"""
import polars as pl

return pl.read_ipc(uri, with_columns)


def _scan_ipc_fsspec(
file: str,
storage_options: Optional[Dict] = None,
) -> "pli.LazyFrame":
func = partial(_scan_ipc_impl, file)
func_serialized = pickle.dumps(func)

storage_options = storage_options or {}
with pli._prepare_file_arg(file, **storage_options) as data:
schema = pli.read_ipc_schema(data)

return pli.LazyFrame._scan_python_function(schema, func_serialized)


def _scan_parquet_impl(
uri: "str", with_columns: Optional[List[str]]
) -> "pli.DataFrame":
"""
Takes the projected columns and materializes an arrow table.
Parameters
----------
uri
with_columns
"""
import polars as pl

return pl.read_parquet(uri, with_columns)


def _scan_parquet_fsspec(
file: str,
storage_options: Optional[Dict] = None,
) -> "pli.LazyFrame":
func = partial(_scan_parquet_impl, file)
func_serialized = pickle.dumps(func)

storage_options = storage_options or {}
with pli._prepare_file_arg(file, **storage_options) as data:
schema = pli.read_parquet_schema(data)

return pli.LazyFrame._scan_python_function(schema, func_serialized)
147 changes: 147 additions & 0 deletions py-polars/polars/internals/io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
from contextlib import contextmanager
from io import BytesIO, StringIO
from pathlib import Path
from typing import (
Any,
BinaryIO,
ContextManager,
Dict,
Iterator,
List,
TextIO,
Type,
Union,
overload,
)
from urllib.request import urlopen

from polars.datatypes import DataType
from polars.utils import format_path

try:
import fsspec
from fsspec.utils import infer_storage_options

_WITH_FSSPEC = True
except ImportError:
_WITH_FSSPEC = False

try:
from polars.polars import ipc_schema as _ipc_schema
from polars.polars import parquet_schema as _parquet_schema
except ImportError: # pragma: no cover
pass


def _process_http_file(path: str) -> BytesIO:
with urlopen(path) as f:
return BytesIO(f.read())


@overload
def _prepare_file_arg(
file: Union[str, List[str], Path, BinaryIO, bytes], **kwargs: Any
) -> ContextManager[Union[str, BinaryIO]]:
...


@overload
def _prepare_file_arg(
file: Union[str, TextIO, Path, BinaryIO, bytes], **kwargs: Any
) -> ContextManager[Union[str, BinaryIO]]:
...


@overload
def _prepare_file_arg(
file: Union[str, List[str], TextIO, Path, BinaryIO, bytes], **kwargs: Any
) -> ContextManager[Union[str, List[str], BinaryIO, List[BinaryIO]]]:
...


def _prepare_file_arg(
file: Union[str, List[str], TextIO, Path, BinaryIO, bytes], **kwargs: Any
) -> ContextManager[Union[str, BinaryIO, List[str], List[BinaryIO]]]:
"""
Utility for read_[csv, parquet]. (not to be used by scan_[csv, parquet]).
Returned value is always usable as a context.
A `StringIO`, `BytesIO` file is returned as a `BytesIO`.
A local path is returned as a string.
An http URL is read into a buffer and returned as a `BytesIO`.
When fsspec is installed, remote file(s) is (are) opened with
`fsspec.open(file, **kwargs)` or `fsspec.open_files(file, **kwargs)`.
"""

# Small helper to use a variable as context
@contextmanager
def managed_file(file: Any) -> Iterator[Any]:
try:
yield file
finally:
pass

if isinstance(file, StringIO):
return BytesIO(file.read().encode("utf8"))
if isinstance(file, BytesIO):
return managed_file(file)
if isinstance(file, Path):
return managed_file(format_path(file))
if isinstance(file, str):
if _WITH_FSSPEC:
if infer_storage_options(file)["protocol"] == "file":
return managed_file(format_path(file))
return fsspec.open(file, **kwargs)
if file.startswith("http"):
return _process_http_file(file)
if isinstance(file, list) and bool(file) and all(isinstance(f, str) for f in file):
if _WITH_FSSPEC:
if all(infer_storage_options(f)["protocol"] == "file" for f in file):
return managed_file([format_path(f) for f in file])
return fsspec.open_files(file, **kwargs)
if isinstance(file, str):
file = format_path(file)
return managed_file(file)


def read_ipc_schema(
file: Union[str, BinaryIO, Path, bytes]
) -> Dict[str, Type[DataType]]:
"""
Get a schema of the IPC file without reading data.
Parameters
----------
file
Path to a file or a file-like object.
Returns
-------
Dictionary mapping column names to datatypes
"""
if isinstance(file, (str, Path)):
file = format_path(file)

return _ipc_schema(file)


def read_parquet_schema(
file: Union[str, BinaryIO, Path, bytes]
) -> Dict[str, Type[DataType]]:
"""
Get a schema of the Parquet file without reading data.
Parameters
----------
file
Path to a file or a file-like object.
Returns
-------
Dictionary mapping column names to datatypes
"""
if isinstance(file, (str, Path)):
file = format_path(file)

return _parquet_schema(file)
13 changes: 11 additions & 2 deletions py-polars/polars/internals/lazy_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,9 +309,18 @@ def write_json(
return None

@classmethod
def _scan_python_function(cls, schema: "pa.schema", scan_fn: bytes) -> "LazyFrame":
def _scan_python_function(
cls, schema: Union["pa.schema", Dict[str, Type[DataType]]], scan_fn: bytes
) -> "LazyFrame":
self = cls.__new__(cls)
self._ldf = PyLazyFrame.scan_from_python_function(list(schema), scan_fn)
if isinstance(schema, dict):
self._ldf = PyLazyFrame.scan_from_python_function_pl_schema(
[(name, dt) for name, dt in schema.items()], scan_fn
)
else:
self._ldf = PyLazyFrame.scan_from_python_function_arrow_schema(
list(schema), scan_fn
)
return self

def __getitem__(self, item: Union[int, range, slice]) -> None:
Expand Down
Loading

0 comments on commit 66c7aaf

Please sign in to comment.