Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX-#3417: Fix read_csv with skiprows and header parameters #3419

Merged
merged 17 commits into from
Feb 3, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion modin/core/io/text/fwf_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import pandas
from pandas._typing import FilePathOrBuffer
from typing import Union, Sequence

from modin.core.io.text.text_file_dispatcher import TextFileDispatcher

Expand All @@ -29,6 +30,8 @@ def check_parameters_support(
cls,
filepath_or_buffer: FilePathOrBuffer,
read_kwargs: dict,
skiprows_md: Union[Sequence, callable, int],
header_size: int,
):
"""
Check support of parameters of `read_fwf` function.
Expand All @@ -39,6 +42,10 @@ def check_parameters_support(
`filepath_or_buffer` parameter of `read_fwf` function.
read_kwargs : dict
Parameters of `read_fwf` function.
skiprows_md : int, array or callable
`skiprows` parameter modified for easier handling by Modin.
header_size : int
Number of rows that are used by header.

Returns
-------
Expand All @@ -49,4 +56,6 @@ def check_parameters_support(
# If infer_nrows is a significant portion of the number of rows, pandas may be
# faster.
return False
return super().check_parameters_support(filepath_or_buffer, read_kwargs)
return super().check_parameters_support(
filepath_or_buffer, read_kwargs, skiprows_md, header_size
)
39 changes: 34 additions & 5 deletions modin/core/io/text/text_file_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from modin.utils import _inherit_docstrings
from modin.core.io.text.utils import CustomNewlineIterator
from modin.config import NPartitions
from modin.error_message import ErrorMessage

ColumnNamesTypes = Tuple[Union[pandas.Index, pandas.MultiIndex, pandas.Int64Index]]
IndexColType = Union[int, str, bool, Sequence[int], Sequence[str], None]
Expand Down Expand Up @@ -616,6 +617,8 @@ def check_parameters_support(
cls,
filepath_or_buffer: FilePathOrBuffer,
read_kwargs: dict,
skiprows_md: Union[Sequence, callable, int],
header_size: int,
) -> bool:
"""
Check support of only general parameters of `read_*` function.
Expand All @@ -626,12 +629,17 @@ def check_parameters_support(
`filepath_or_buffer` parameter of `read_*` function.
read_kwargs : dict
Parameters of `read_*` function.
skiprows_md : int, array or callable
`skiprows` parameter modified for easier handling by Modin.
header_size : int
Number of rows that are used by header.

Returns
-------
bool
Whether passed parameters are supported or not.
"""
skiprows = read_kwargs.get("skiprows")
if isinstance(filepath_or_buffer, str):
if not cls.file_exists(filepath_or_buffer):
return False
Expand All @@ -641,6 +649,24 @@ def check_parameters_support(
if read_kwargs["chunksize"] is not None:
return False

skiprows_unsupported = (
is_list_like(skiprows_md) and skiprows_md[0] < header_size
) or (
callable(skiprows)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you using both the `skiprows` and `skiprows_md` variables at the same time?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is done this way because, in the case of list-like `skiprows`, we need an already sorted array to get the smallest element (the first element of the `skiprows_md` array), but in the case of callable `skiprows` we can't use `skiprows_md`, since its value is already shifted by `header_size` and we can't get intersection information from it (so we need the unshifted `skiprows` callable).

and any(
map(
bool,
cls._get_skip_mask(pandas.RangeIndex(header_size), skiprows),
)
)
)
amyskov marked this conversation as resolved.
Show resolved Hide resolved
if skiprows_unsupported:
ErrorMessage.single_warning(
"Values of `header` and `skiprows` parameters have intersections. "
"This case doesn't supported by Modin, so pandas implementation will be used!"
amyskov marked this conversation as resolved.
Show resolved Hide resolved
)
return False

return True

@classmethod
Expand Down Expand Up @@ -759,14 +785,19 @@ def _manage_skiprows_parameter(
)
skiprows_partitioning = len(skiprows_md)
skiprows_md = 0
else:
elif skiprows_md[0] > header_size:
skiprows_md = skiprows_md - header_size

amyskov marked this conversation as resolved.
Show resolved Hide resolved
elif callable(skiprows):

def skiprows_func(x):
return skiprows(x + header_size)

skiprows_md = skiprows_func
elif skiprows is not None:
raise TypeError(
f"Not acceptable type of `skiprows` parameter: {type(skiprows)}"
amyskov marked this conversation as resolved.
Show resolved Hide resolved
)

return skiprows_md, pre_reading, skiprows_partitioning

Expand Down Expand Up @@ -894,10 +925,6 @@ def _get_new_qc(
skip_mask = skip_mask.to_numpy("bool")
view_idx = index_range[~skip_mask]
new_query_compiler = new_query_compiler.view(index=view_idx)
else:
raise TypeError(
f"Not acceptable type of `skiprows` parameter: {type(skiprows_md)}"
)

if not isinstance(new_query_compiler.index, pandas.MultiIndex):
new_query_compiler = new_query_compiler.reset_index(drop=True)
Expand Down Expand Up @@ -964,6 +991,8 @@ def _read(cls, filepath_or_buffer: FilePathOrBuffer, **kwargs):
use_modin_impl = cls.check_parameters_support(
filepath_or_buffer,
kwargs,
skiprows_md,
header_size,
)
if not use_modin_impl:
return cls.single_worker_read(
Expand Down
11 changes: 10 additions & 1 deletion modin/pandas/test/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -1135,7 +1135,13 @@ def test_read_csv_empty_frame(self):
@pytest.mark.parametrize(
"skiprows",
[
[x for x in range(10)],
[x + 5 for x in range(15)],
[x for x in range(10) if x % 2 == 0],
[x + 5 for x in range(15) if x % 2 == 0],
lambda x: x % 2,
lambda x: x > 20,
lambda x: x < 20,
lambda x: True,
lambda x: x in [10, 20],
pytest.param(
Expand All @@ -1147,13 +1153,16 @@ def test_read_csv_empty_frame(self):
),
],
)
def test_read_csv_skiprows_corner_cases(self, skiprows):
@pytest.mark.parametrize("header", ["infer", None, 0, 1, 150])
def test_read_csv_skiprows_corner_cases(self, skiprows, header):
eval_io(
fn_name="read_csv",
check_kwargs_callable=not callable(skiprows),
# read_csv kwargs
filepath_or_buffer=pytest.csvs_names["test_read_csv_regular"],
skiprows=skiprows,
header=header,
dtype="str", # to avoid issues with heterogeneous data
)


Expand Down