Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#2451: Read multiple csv files simultaneously via glob paths #2662

Merged
merged 30 commits into from
Feb 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
2d334cb
FEAT-#2451: Be able to discover multiple files
williamma12 Jan 29, 2021
d44beda
FEAT-#2451: Calculate splits and read multiple splits
williamma12 Jan 29, 2021
91999a7
FEAT-#2451: Fix syntax bugs
williamma12 Jan 29, 2021
f9cd502
FEAT-#2451: Added tests
williamma12 Jan 29, 2021
6a8e71f
FEAT-#2451: Fix dependencies
williamma12 Jan 29, 2021
c6566e8
FEAT-#2451: Fix docstring spacing
williamma12 Jan 30, 2021
fef2512
FEAT-#2451: Linting
williamma12 Jan 30, 2021
48bcee7
FEAT-#2451: Fix skip_header reference bug.
williamma12 Jan 30, 2021
cd58793
FEAT-#2451: Fix defaulting to pandas bug
williamma12 Jan 30, 2021
4fd9abb
FEAT-#2451: Run test only when not python engine.
williamma12 Jan 30, 2021
9f7a1cd
FEAT-#2451: Fix docstring formatting
williamma12 Jan 30, 2021
76999a1
FEAT-#2451: Fix read nrows bug
williamma12 Jan 31, 2021
9b5ed7e
FEAT-#2451: Fixed some file objects (s3 and compressed) not having a…
williamma12 Jan 31, 2021
77f9ebc
FEAT-#2451: linting
williamma12 Jan 31, 2021
ebb2565
FEAT-#2451: Remove wrong TODO.
williamma12 Jan 31, 2021
f37dbe0
FEAT-#2451: Fix glob file ordering in tests
williamma12 Jan 31, 2021
c6f0e22
FEAT-#2451: Linting
williamma12 Jan 31, 2021
8a85891
FEAT-#2451: Move to experimental
williamma12 Jan 31, 2021
e7dbd14
FEAT-#2451: Update test to only run for ray.
williamma12 Jan 31, 2021
7406629
FEAT-#2451: Linting
williamma12 Jan 31, 2021
780fb59
FEAT-#2451: Passing tests
williamma12 Feb 1, 2021
0cc367e
FEAT-#2451: More linting
williamma12 Feb 1, 2021
bebe6f2
FEAT-#2451: Document partition packing logic.
williamma12 Feb 1, 2021
04f042f
FEAT-#2451: Reduce PR size
williamma12 Feb 2, 2021
ce22ac3
FEAT-#2451: Further reduce PR size
williamma12 Feb 2, 2021
5a1054f
FEAT-#2451: Add type hints
williamma12 Feb 2, 2021
a912f9d
FEAT-#2451: Resolve comments
williamma12 Feb 2, 2021
70ec23c
FEAT-#2451: Only read_csv_glob for experimental pandas on ray.
williamma12 Feb 2, 2021
1713bc5
FEAT-#2451: Make top-level api
williamma12 Feb 3, 2021
975a80d
FEAT-#2451: Fix outdated documentation in tests
williamma12 Feb 3, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions modin/backends/pandas/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,52 @@ def parse(fname, **kwargs):
]


class PandasCSVGlobParser(PandasCSVParser):
    """Parse byte ranges taken from one or more CSV files into a single partition."""

    @staticmethod
    def parse(chunks, **kwargs):
        """
        Read every chunk with ``pandas.read_csv`` and combine the results.

        Parameters
        ----------
        chunks : iterable of (fname, start, end)
            Each entry names a file and the byte range ``[start, end)`` to read
            from it. If ``start`` or ``end`` is None, the whole file ``fname``
            is read and returned immediately.
        **kwargs : dict
            Keyword arguments forwarded to ``pandas.read_csv``. ``num_splits``
            is consumed here and passed to ``_split_result_for_readers``.
            ``compression`` (default ``"infer"``) is used to open the files and
            is not forwarded when parsing byte ranges, because the bytes read
            from the opened file are already uncompressed.

        Returns
        -------
        list or pandas.DataFrame
            Normally the output of ``_split_result_for_readers`` followed by
            the index (or the row count when ``index_col`` is None) and the
            dtypes of the combined frame. When a chunk has no byte range, the
            plain ``pandas.DataFrame`` read from that file is returned instead.
        """
        warnings.filterwarnings("ignore")
        num_splits = kwargs.pop("num_splits", None)
        index_col = kwargs.get("index_col", None)
        # Resolve "compression" once, before the loop. Popping it inside the
        # loop would apply the caller's value to the first chunk only and
        # silently fall back to "infer" for every later chunk.
        compression = kwargs.pop("compression", "infer")

        pandas_dfs = []
        for fname, start, end in chunks:
            if start is None or end is None:
                # This only happens when we are reading with only one worker (Default)
                return pandas.read_csv(fname, compression=compression, **kwargs)

            bio = FileDispatcher.file_open(fname, "rb", compression)
            try:
                # When an encoding is given, the first line of the file is
                # prepended to the chunk bytes.
                # NOTE(review): presumably so the parser sees the file's
                # leading bytes (e.g. a BOM) - confirm against PandasCSVParser.
                if kwargs.get("encoding", None) is not None:
                    header = b"" + bio.readline()
                else:
                    header = b""
                bio.seek(start)
                to_read = header + bio.read(end - start)
            finally:
                # Always release the handle, even if reading fails.
                bio.close()
            pandas_dfs.append(pandas.read_csv(BytesIO(to_read), **kwargs))

        # Combine read in data.
        if len(pandas_dfs) > 1:
            pandas_df = pandas.concat(pandas_dfs)
        elif len(pandas_dfs) > 0:
            pandas_df = pandas_dfs[0]
        else:
            pandas_df = pandas.DataFrame()

        # Set internal index.
        if index_col is not None:
            index = pandas_df.index
        else:
            # The lengths will become the RangeIndex
            index = len(pandas_df)
        return _split_result_for_readers(1, num_splits, pandas_df) + [
            index,
            pandas_df.dtypes,
        ]


class PandasFWFParser(PandasParser):
@staticmethod
def parse(fname, **kwargs):
Expand Down
4 changes: 4 additions & 0 deletions modin/data_management/factories/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ def read_parquet(cls, **kwargs):
def read_csv(cls, **kwargs):
    """Delegate to the engine's ``_read_csv``, forwarding all keyword arguments."""
    engine = cls.__engine
    return engine._read_csv(**kwargs)

@classmethod
def read_csv_glob(cls, **kwargs):
    """Delegate to the engine's ``_read_csv_glob``, forwarding all keyword arguments."""
    engine = cls.__engine
    return engine._read_csv_glob(**kwargs)

@classmethod
def read_json(cls, **kwargs):
    """Delegate to the engine's ``_read_json``, forwarding all keyword arguments."""
    engine = cls.__engine
    return engine._read_json(**kwargs)
Expand Down
4 changes: 4 additions & 0 deletions modin/data_management/factories/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ def prepare(cls):

cls.io_cls = ExperimentalPandasOnRayIO

@classmethod
def _read_csv_glob(cls, **kwargs):
    """Delegate to ``read_csv_glob`` on this factory's ``io_cls``, forwarding all keyword arguments."""
    io_class = cls.io_cls
    return io_class.read_csv_glob(**kwargs)


class ExperimentalPandasOnPythonFactory(ExperimentalBaseFactory, PandasOnPythonFactory):
    """
    Factory combining ``ExperimentalBaseFactory`` and ``PandasOnPythonFactory``.

    Defines no members of its own; all behavior comes from the two bases.
    """
Expand Down
2 changes: 2 additions & 0 deletions modin/engines/base/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from modin.engines.base.io.io import BaseIO
from modin.engines.base.io.text.csv_dispatcher import CSVDispatcher
from modin.engines.base.io.text.csv_glob_dispatcher import CSVGlobDispatcher
from modin.engines.base.io.text.fwf_dispatcher import FWFDispatcher
from modin.engines.base.io.text.json_dispatcher import JSONDispatcher
from modin.engines.base.io.text.excel_dispatcher import ExcelDispatcher
Expand All @@ -26,6 +27,7 @@
__all__ = [
"BaseIO",
"CSVDispatcher",
"CSVGlobDispatcher",
"FWFDispatcher",
"JSONDispatcher",
"FileDispatcher",
Expand Down
Loading