ENH: support non default indexes in writing to Parquet #18629
@@ -3,7 +3,8 @@
 from warnings import catch_warnings
 from distutils.version import LooseVersion
 from pandas import DataFrame, RangeIndex, Int64Index, get_option
-from pandas.compat import range
+from pandas.compat import string_types
+from pandas.core.common import AbstractMethodError
 from pandas.io.common import get_filepath_or_buffer
@@ -34,39 +35,72 @@ def get_engine(engine):
     return FastParquetImpl()


-class PyArrowImpl(object):
+class BaseImpl(object):
+
+    api = None  # module
+
+    @staticmethod
+    def validate_dataframe(df):
+        if not isinstance(df, DataFrame):
+            raise ValueError("to_parquet only supports IO with DataFrames")
+        # must have value column names (strings only)
+        if df.columns.inferred_type not in {'string', 'unicode'}:
+            raise ValueError("parquet must have string column names")
Review comment: We should also test the index level names to be strings (if not None, and if written)
+        # index level names must be strings
+        valid_names = all(
+            isinstance(name, string_types)
+            for name in df.index.names
+            if name is not None
+        )
+        if not valid_names:
+            raise ValueError("Index level names must be strings")
+
+    def write(self, df, path, compression, **kwargs):
+        raise AbstractMethodError(self)
+
+    def read(self, path, columns=None, **kwargs):
+        raise AbstractMethodError(self)
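As an aside, a minimal sketch of how this validation could be exercised through the public API, in the spirit of the review comment above (the pytest usage and file path are illustrative, not part of this PR):

import pytest
import pandas as pd

def test_index_level_names_must_be_strings():
    # a non-string index level name should be rejected before any IO happens
    df = pd.DataFrame({"a": [1, 2]})
    df.index.name = 0  # not a string
    with pytest.raises(ValueError, match="Index level names must be strings"):
        df.to_parquet("out.parquet")  # hypothetical path; never written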
+
+
+class PyArrowImpl(BaseImpl):

     def __init__(self):
         # since pandas is a dependency of pyarrow
         # we need to import on first use
         try:
             import pyarrow
             import pyarrow.parquet
         except ImportError:
-            raise ImportError("pyarrow is required for parquet support\n\n"
-                              "you can install via conda\n"
-                              "conda install pyarrow -c conda-forge\n"
-                              "\nor via pip\n"
-                              "pip install -U pyarrow\n")
-
-        if LooseVersion(pyarrow.__version__) < LooseVersion('0.4.1'):
-            raise ImportError("pyarrow >= 0.4.1 is required for parquet"
-                              "support\n\n"
-                              "you can install via conda\n"
-                              "conda install pyarrow -c conda-forge\n"
-                              "\nor via pip\n"
-                              "pip install -U pyarrow\n")
-
-        self._pyarrow_lt_050 = (LooseVersion(pyarrow.__version__) <
-                                LooseVersion('0.5.0'))
-        self._pyarrow_lt_060 = (LooseVersion(pyarrow.__version__) <
-                                LooseVersion('0.6.0'))
+            raise ImportError(
+                "pyarrow is required for parquet support\n\n"
+                "you can install via conda\n"
+                "conda install pyarrow -c conda-forge\n"
+                "\nor via pip\n"
+                "pip install -U pyarrow\n"
+            )
+        if LooseVersion(pyarrow.__version__) < '0.4.1':
+            raise ImportError(
+                "pyarrow >= 0.4.1 is required for parquet support\n\n"
+                "you can install via conda\n"
+                "conda install pyarrow -c conda-forge\n"
+                "\nor via pip\n"
+                "pip install -U pyarrow\n"
+            )
+
+        self._pyarrow_lt_060 = (
+            LooseVersion(pyarrow.__version__) < LooseVersion('0.6.0'))
+        self._pyarrow_lt_070 = (
+            LooseVersion(pyarrow.__version__) < LooseVersion('0.7.0'))

         self.api = pyarrow

     def write(self, df, path, compression='snappy',
               coerce_timestamps='ms', **kwargs):
+        self.validate_dataframe(df)
+        if self._pyarrow_lt_070:
+            self._validate_write_lt_070(df)
         path, _, _ = get_filepath_or_buffer(path)

         if self._pyarrow_lt_060:
             table = self.api.Table.from_pandas(df, timestamps_to_ms=True)
             self.api.parquet.write_table(
@@ -80,36 +114,75 @@ def write(self, df, path, compression='snappy',

     def read(self, path, columns=None, **kwargs):
         path, _, _ = get_filepath_or_buffer(path)
+        if self._pyarrow_lt_070:
+            return self.api.parquet.read_pandas(path, columns=columns,
+                                                **kwargs).to_pandas()
+        kwargs['use_pandas_metadata'] = True
         return self.api.parquet.read_table(path, columns=columns,
                                            **kwargs).to_pandas()
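For context, a minimal round trip that the use_pandas_metadata path enables with pyarrow >= 0.7.0 (the file name is illustrative, not from this PR):

import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3]},
                  index=pd.Index([10, 20, 30], name="idx"))
df.to_parquet("roundtrip.parquet", engine="pyarrow")  # hypothetical path
result = pd.read_parquet("roundtrip.parquet", engine="pyarrow")
assert result.index.name == "idx"  # non-default index survives the round trip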


-class FastParquetImpl(object):
+    def _validate_write_lt_070(self, df):
+        # Compatibility shim for pyarrow < 0.7.0
+        # TODO: Remove in pandas 0.22.0
+        from pandas.core.indexes.multi import MultiIndex
+        if isinstance(df.index, MultiIndex):
+            msg = (
+                "Multi-index DataFrames are only supported "
+                "with pyarrow >= 0.7.0"
+            )
+            raise ValueError(msg)
+        # Validate index
+        if not isinstance(df.index, Int64Index):
+            msg = (
+                "pyarrow < 0.7.0 does not support serializing {} for the "
+                "index; you can .reset_index() to make the index into "
+                "column(s), or install the latest version of pyarrow or "
+                "fastparquet."
+            )
+            raise ValueError(msg.format(type(df.index)))
+        if not df.index.equals(RangeIndex(len(df))):
+            raise ValueError(
+                "pyarrow < 0.7.0 does not support serializing a non-default "
+                "index; you can .reset_index() to make the index into "
+                "column(s), or install the latest version of pyarrow or "
+                "fastparquet."
+            )
+        if df.index.name is not None:
+            raise ValueError(
+                "pyarrow < 0.7.0 does not serialize indexes with a name; you "
+                "can set the index.name to None or install the latest version "
+                "of pyarrow or fastparquet."
+            )
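Put differently, under pyarrow < 0.7.0 only a default, unnamed RangeIndex passes this shim; a sketch of the failure mode (path hypothetical):

import pandas as pd

df = pd.DataFrame({"a": [1, 2]}, index=pd.Index(["x", "y"]))
# with pyarrow < 0.7.0 installed this raises ValueError, suggesting
# df.reset_index() or upgrading pyarrow/fastparquet
df.to_parquet("out.parquet", engine="pyarrow")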
+
+
+class FastParquetImpl(BaseImpl):

     def __init__(self):
         # since pandas is a dependency of fastparquet
         # we need to import on first use
         try:
             import fastparquet
         except ImportError:
-            raise ImportError("fastparquet is required for parquet support\n\n"
-                              "you can install via conda\n"
-                              "conda install fastparquet -c conda-forge\n"
-                              "\nor via pip\n"
-                              "pip install -U fastparquet")
-
-        if LooseVersion(fastparquet.__version__) < LooseVersion('0.1.0'):
-            raise ImportError("fastparquet >= 0.1.0 is required for parquet "
-                              "support\n\n"
-                              "you can install via conda\n"
-                              "conda install fastparquet -c conda-forge\n"
-                              "\nor via pip\n"
-                              "pip install -U fastparquet")
-
+            raise ImportError(
+                "fastparquet is required for parquet support\n\n"
+                "you can install via conda\n"
+                "conda install fastparquet -c conda-forge\n"
+                "\nor via pip\n"
+                "pip install -U fastparquet"
+            )
+        if LooseVersion(fastparquet.__version__) < '0.1.0':
+            raise ImportError(
+                "fastparquet >= 0.1.0 is required for parquet "
+                "support\n\n"
+                "you can install via conda\n"
+                "conda install fastparquet -c conda-forge\n"
+                "\nor via pip\n"
+                "pip install -U fastparquet"
+            )
         self.api = fastparquet

     def write(self, df, path, compression='snappy', **kwargs):
+        self.validate_dataframe(df)
         # thriftpy/protocol/compact.py:339:
         # DeprecationWarning: tostring() is deprecated.
         # Use tobytes() instead.
@@ -120,7 +193,8 @@ def write(self, df, path, compression='snappy', **kwargs):

     def read(self, path, columns=None, **kwargs):
         path, _, _ = get_filepath_or_buffer(path)
-        return self.api.ParquetFile(path).to_pandas(columns=columns, **kwargs)
+        parquet_file = self.api.ParquetFile(path)
+        return parquet_file.to_pandas(columns=columns, **kwargs)
Review comment: For fastparquet I haven't bothered reading the metadata to pull out the index columns since even if you do you still hit the …
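The read path now goes through an explicit ParquetFile handle; a minimal usage sketch against fastparquet's public API (the file path is hypothetical):

import fastparquet

pf = fastparquet.ParquetFile("data.parquet")  # hypothetical existing file
df = pf.to_pandas(columns=["a"])  # mirrors the columns= keyword above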


 def to_parquet(df, path, engine='auto', compression='snappy', **kwargs):

@@ -141,43 +215,7 @@ def to_parquet(df, path, engine='auto', compression='snappy', **kwargs):
     kwargs
         Additional keyword arguments passed to the engine
     """
-
     impl = get_engine(engine)
-
-    if not isinstance(df, DataFrame):
-        raise ValueError("to_parquet only support IO with DataFrames")
-
-    valid_types = {'string', 'unicode'}
-
-    # validate index
-    # --------------
-
-    # validate that we have only a default index
-    # raise on anything else as we don't serialize the index
-
-    if not isinstance(df.index, Int64Index):
-        raise ValueError("parquet does not support serializing {} "
-                         "for the index; you can .reset_index()"
-                         "to make the index into column(s)".format(
-                             type(df.index)))
-
-    if not df.index.equals(RangeIndex.from_range(range(len(df)))):
-        raise ValueError("parquet does not support serializing a "
-                         "non-default index for the index; you "
-                         "can .reset_index() to make the index "
-                         "into column(s)")
-
-    if df.index.name is not None:
-        raise ValueError("parquet does not serialize index meta-data on a "
-                         "default index")
-
-    # validate columns
-    # ----------------
-
-    # must have value column names (strings only)
-    if df.columns.inferred_type not in valid_types:
-        raise ValueError("parquet must have string column names")
-
     return impl.write(df, path, compression=compression, **kwargs)
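With validation moved into BaseImpl, to_parquet itself reduces to engine dispatch; typical usage under these changes (path illustrative):

import pandas as pd
from pandas.io.parquet import to_parquet

df = pd.DataFrame({"a": [1, 2, 3]})
to_parquet(df, "example.parquet", engine="auto", compression="snappy")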
Review comment: can you add a blank line before comments, easier to read