Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

REFACTOR-#2496: Change internal reader names to dispatcher #2497

Merged
merged 1 commit into from
Dec 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions modin/backends/pandas/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from pandas.io.common import infer_compression
import warnings

from modin.engines.base.io import FileReader
from modin.engines.base.io import FileDispatcher
from modin.data_management.utils import split_result_of_axis_func_pandas
from modin.error_message import ErrorMessage

Expand Down Expand Up @@ -99,7 +99,9 @@ def parse(fname, **kwargs):
index_col = kwargs.get("index_col", None)
if start is not None and end is not None:
# pop "compression" from kwargs because bio is uncompressed
bio = FileReader.file_open(fname, "rb", kwargs.pop("compression", "infer"))
bio = FileDispatcher.file_open(
fname, "rb", kwargs.pop("compression", "infer")
)
if kwargs.get("encoding", None) is not None:
header = b"" + bio.readline()
else:
Expand Down Expand Up @@ -131,7 +133,9 @@ def parse(fname, **kwargs):
index_col = kwargs.get("index_col", None)
if start is not None and end is not None:
# pop "compression" from kwargs because bio is uncompressed
bio = FileReader.file_open(fname, "rb", kwargs.pop("compression", "infer"))
bio = FileDispatcher.file_open(
fname, "rb", kwargs.pop("compression", "infer")
)
if kwargs.get("encoding", None) is not None:
header = b"" + bio.readline()
else:
Expand Down Expand Up @@ -331,7 +335,9 @@ def parse(fname, **kwargs):
end = kwargs.pop("end", None)
if start is not None and end is not None:
# pop "compression" from kwargs because bio is uncompressed
bio = FileReader.file_open(fname, "rb", kwargs.pop("compression", "infer"))
bio = FileDispatcher.file_open(
fname, "rb", kwargs.pop("compression", "infer")
)
bio.seek(start)
to_read = b"" + bio.read(end - start)
bio.close()
Expand Down
40 changes: 20 additions & 20 deletions modin/engines/base/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,27 @@
# governing permissions and limitations under the License.

from modin.engines.base.io.io import BaseIO
from modin.engines.base.io.text.csv_reader import CSVReader
from modin.engines.base.io.text.fwf_reader import FWFReader
from modin.engines.base.io.text.json_reader import JSONReader
from modin.engines.base.io.text.excel_reader import ExcelReader
from modin.engines.base.io.file_reader import FileReader
from modin.engines.base.io.text.text_file_reader import TextFileReader
from modin.engines.base.io.column_stores.parquet_reader import ParquetReader
from modin.engines.base.io.column_stores.hdf_reader import HDFReader
from modin.engines.base.io.column_stores.feather_reader import FeatherReader
from modin.engines.base.io.sql.sql_reader import SQLReader
from modin.engines.base.io.text.csv_dispatcher import CSVDispatcher
from modin.engines.base.io.text.fwf_dispatcher import FWFDispatcher
from modin.engines.base.io.text.json_dispatcher import JSONDispatcher
from modin.engines.base.io.text.excel_dispatcher import ExcelDispatcher
from modin.engines.base.io.file_dispatcher import FileDispatcher
from modin.engines.base.io.text.text_file_dispatcher import TextFileDispatcher
from modin.engines.base.io.column_stores.parquet_dispatcher import ParquetDispatcher
from modin.engines.base.io.column_stores.hdf_dispatcher import HDFDispatcher
from modin.engines.base.io.column_stores.feather_dispatcher import FeatherDispatcher
from modin.engines.base.io.sql.sql_dispatcher import SQLDispatcher

__all__ = [
"BaseIO",
"CSVReader",
"FWFReader",
"JSONReader",
"FileReader",
"TextFileReader",
"ParquetReader",
"HDFReader",
"FeatherReader",
"SQLReader",
"ExcelReader",
"CSVDispatcher",
"FWFDispatcher",
"JSONDispatcher",
"FileDispatcher",
"TextFileDispatcher",
"ParquetDispatcher",
"HDFDispatcher",
"FeatherDispatcher",
"SQLDispatcher",
"ExcelDispatcher",
]
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
import pandas

from modin.data_management.utils import compute_chunksize
from modin.engines.base.io.file_reader import FileReader
from modin.engines.base.io.file_dispatcher import FileDispatcher


class ColumnStoreReader(FileReader):
class ColumnStoreDispatcher(FileDispatcher):
@classmethod
def call_deploy(cls, fname, col_partitions, **kwargs):
from modin.pandas import DEFAULT_NPARTITIONS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

from modin.engines.base.io.column_stores.column_store_reader import ColumnStoreReader
from modin.engines.base.io.column_stores.column_store_dispatcher import (
ColumnStoreDispatcher,
)


class FeatherReader(ColumnStoreReader):
class FeatherDispatcher(ColumnStoreDispatcher):
@classmethod
def _read(cls, path, columns=None, **kwargs):
"""Read data from the file path, returning a Modin DataFrame.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@

import pandas

from modin.engines.base.io.column_stores.column_store_reader import ColumnStoreReader
from modin.engines.base.io.column_stores.column_store_dispatcher import (
ColumnStoreDispatcher,
)
from modin.error_message import ErrorMessage


class HDFReader(ColumnStoreReader): # pragma: no cover
class HDFDispatcher(ColumnStoreDispatcher): # pragma: no cover
@classmethod
def _validate_hdf_format(cls, path_or_buf):
s = pandas.HDFStore(path_or_buf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@

import os

from modin.engines.base.io.column_stores.column_store_reader import ColumnStoreReader
from modin.engines.base.io.column_stores.column_store_dispatcher import (
ColumnStoreDispatcher,
)
from modin.error_message import ErrorMessage


class ParquetReader(ColumnStoreReader):
class ParquetDispatcher(ColumnStoreDispatcher):
@classmethod
def _read(cls, path, engine, columns, **kwargs):
"""Load a parquet object from the file path, returning a Modin DataFrame.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
NOT_IMPLEMENTED_MESSAGE = "Implement in children classes!"


class FileReader:
class FileDispatcher:
frame_cls = None
frame_partition_cls = None
query_compiler_cls = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
import pandas
import warnings

from modin.engines.base.io.file_reader import FileReader
from modin.engines.base.io.file_dispatcher import FileDispatcher


class SQLReader(FileReader):
class SQLDispatcher(FileDispatcher):
@classmethod
def _read(cls, sql, con, index_col=None, **kwargs):
"""Reads a SQL query or database table into a DataFrame.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

from modin.engines.base.io.text.text_file_reader import TextFileReader
from modin.engines.base.io.text.text_file_dispatcher import TextFileDispatcher
from modin.data_management.utils import compute_chunksize
from pandas.io.parsers import _validate_usecols_arg
import pandas
import csv
import sys


class CSVReader(TextFileReader):
class CSVDispatcher(TextFileDispatcher):
@classmethod
def _read(cls, filepath_or_buffer, **kwargs):
if isinstance(filepath_or_buffer, str):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
import warnings

from modin.data_management.utils import compute_chunksize
from modin.engines.base.io.text.text_file_reader import TextFileReader
from modin.engines.base.io.text.text_file_dispatcher import TextFileDispatcher


EXCEL_READ_BLOCK_SIZE = 4096


class ExcelReader(TextFileReader):
class ExcelDispatcher(TextFileDispatcher):
@classmethod
def _read(cls, io, **kwargs):
if (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

from modin.engines.base.io.text.text_file_reader import TextFileReader
from modin.engines.base.io.text.text_file_dispatcher import TextFileDispatcher
from modin.data_management.utils import compute_chunksize
from pandas.io.parsers import _validate_usecols_arg
import pandas
from csv import QUOTE_NONE
import sys


class FWFReader(TextFileReader):
class FWFDispatcher(TextFileDispatcher):
@classmethod
def read(cls, filepath_or_buffer, **kwargs):
if isinstance(filepath_or_buffer, str):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

from modin.engines.base.io.text.text_file_reader import TextFileReader
from modin.engines.base.io.text.text_file_dispatcher import TextFileDispatcher
from modin.data_management.utils import compute_chunksize
from io import BytesIO
import pandas
import numpy as np
from csv import QUOTE_NONE


class JSONReader(TextFileReader):
class JSONDispatcher(TextFileDispatcher):
@classmethod
def _read(cls, path_or_buf, **kwargs):
if isinstance(path_or_buf, str):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

from modin.engines.base.io.file_reader import FileReader
from modin.engines.base.io.file_dispatcher import FileDispatcher
import numpy as np
import warnings
import os


class TextFileReader(FileReader):
class TextFileDispatcher(FileDispatcher):
@classmethod
def build_partition(cls, partition_ids, row_lengths, column_widths):
return np.array(
Expand Down
26 changes: 14 additions & 12 deletions modin/engines/dask/pandas_on_dask/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
from modin.engines.dask.pandas_on_dask.frame.data import PandasOnDaskFrame
from modin.engines.dask.pandas_on_dask.frame.partition import PandasOnDaskFramePartition
from modin.engines.base.io import (
CSVReader,
JSONReader,
ParquetReader,
FeatherReader,
SQLReader,
ExcelReader,
CSVDispatcher,
JSONDispatcher,
ParquetDispatcher,
FeatherDispatcher,
SQLDispatcher,
ExcelDispatcher,
)
from modin.backends.pandas.parsers import (
PandasCSVParser,
Expand All @@ -44,15 +44,17 @@ class PandasOnDaskIO(BaseIO):
query_compiler_cls=PandasQueryCompiler,
)

read_csv = type("", (DaskTask, PandasCSVParser, CSVReader), build_args).read
read_json = type("", (DaskTask, PandasJSONParser, JSONReader), build_args).read
read_csv = type("", (DaskTask, PandasCSVParser, CSVDispatcher), build_args).read
read_json = type("", (DaskTask, PandasJSONParser, JSONDispatcher), build_args).read
read_parquet = type(
"", (DaskTask, PandasParquetParser, ParquetReader), build_args
"", (DaskTask, PandasParquetParser, ParquetDispatcher), build_args
).read
# Blocked on pandas-dev/pandas#12236. It is faster to default to pandas.
# read_hdf = type("", (DaskTask, PandasHDFParser, HDFDispatcher), build_args).read
read_feather = type(
"", (DaskTask, PandasFeatherParser, FeatherReader), build_args
"", (DaskTask, PandasFeatherParser, FeatherDispatcher), build_args
).read
read_sql = type("", (DaskTask, PandasSQLParser, SQLDispatcher), build_args).read
read_excel = type(
"", (DaskTask, PandasExcelParser, ExcelDispatcher), build_args
).read
read_sql = type("", (DaskTask, PandasSQLParser, SQLReader), build_args).read
read_excel = type("", (DaskTask, PandasExcelParser, ExcelReader), build_args).read
30 changes: 16 additions & 14 deletions modin/engines/ray/pandas_on_ray/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
from modin.backends.pandas.query_compiler import PandasQueryCompiler
from modin.engines.ray.generic.io import RayIO
from modin.engines.base.io import (
CSVReader,
FWFReader,
JSONReader,
ParquetReader,
FeatherReader,
SQLReader,
ExcelReader,
CSVDispatcher,
FWFDispatcher,
JSONDispatcher,
ParquetDispatcher,
FeatherDispatcher,
SQLDispatcher,
ExcelDispatcher,
)
from modin.backends.pandas.parsers import (
PandasCSVParser,
Expand All @@ -45,16 +45,18 @@ class PandasOnRayIO(RayIO):
query_compiler_cls=PandasQueryCompiler,
frame_cls=PandasOnRayFrame,
)
read_csv = type("", (RayTask, PandasCSVParser, CSVReader), build_args).read
read_fwf = type("", (RayTask, PandasFWFParser, FWFReader), build_args).read
read_json = type("", (RayTask, PandasJSONParser, JSONReader), build_args).read
read_csv = type("", (RayTask, PandasCSVParser, CSVDispatcher), build_args).read
read_fwf = type("", (RayTask, PandasFWFParser, FWFDispatcher), build_args).read
read_json = type("", (RayTask, PandasJSONParser, JSONDispatcher), build_args).read
read_parquet = type(
"", (RayTask, PandasParquetParser, ParquetReader), build_args
"", (RayTask, PandasParquetParser, ParquetDispatcher), build_args
).read
# Blocked on pandas-dev/pandas#12236. It is faster to default to pandas.
# read_hdf = type("", (RayTask, PandasHDFParser, HDFDispatcher), build_args).read
read_feather = type(
"", (RayTask, PandasFeatherParser, FeatherReader), build_args
"", (RayTask, PandasFeatherParser, FeatherDispatcher), build_args
).read
read_sql = type("", (RayTask, PandasSQLParser, SQLDispatcher), build_args).read
read_excel = type(
"", (RayTask, PandasExcelParser, ExcelDispatcher), build_args
).read
read_sql = type("", (RayTask, PandasSQLParser, SQLReader), build_args).read
read_excel = type("", (RayTask, PandasExcelParser, ExcelReader), build_args).read
6 changes: 3 additions & 3 deletions modin/experimental/engines/pyarrow_on_ray/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
)
from modin.backends.pyarrow.parsers import PyarrowCSVParser
from modin.engines.ray.task_wrapper import RayTask
from modin.engines.base.io import CSVReader
from modin.engines.base.io import CSVDispatcher


class PyarrowOnRayCSVReader(RayTask, PyarrowCSVParser, CSVReader):
class PyarrowOnRayCSVDispatcher(RayTask, PyarrowCSVParser, CSVDispatcher):
frame_cls = PyarrowOnRayFrame
frame_partition_cls = PyarrowOnRayFramePartition
query_compiler_cls = PyarrowQueryCompiler
Expand All @@ -32,7 +32,7 @@ class PyarrowOnRayIO(RayIO):
frame_cls = PyarrowOnRayFrame
frame_partition_cls = PyarrowOnRayFramePartition
query_compiler_cls = PyarrowQueryCompiler
csv_reader = PyarrowOnRayCSVReader
csv_reader = PyarrowOnRayCSVDispatcher

read_parquet_remote_task = None
read_hdf_remote_task = None
Expand Down