Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

REFACTOR-#3853: interacting with Dask interface through 'DaskWrapper' class #3854

Merged
merged 22 commits into from
Mar 17, 2022
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ When reading data from a CSV file, for example, the :py:class:`~modin.core.execu
the user query to the :meth:`~modin.core.io.text.CSVDispatcher._read` method of :py:class:`~modin.core.io.text.CSVDispatcher`, where the query's parameters are preprocessed
to check if they are supported by the execution (defaulting to pandas if they are not) and computes some metadata
common for all partitions to be read. Then, the file is split into row chunks, and this data is used to launch remote tasks on the Dask workers
via the :meth:`~modin.core.execution.dask.common.task_wrapper.DaskTask.deploy` method of :py:class:`~modin.core.execution.dask.common.task_wrapper.DaskTask`.
via the :meth:`~modin.core.execution.dask.common.dask_wrapper.DaskWrapper.deploy` method of :py:class:`~modin.core.execution.dask.common.dask_wrapper.DaskWrapper`.
anmyachev marked this conversation as resolved.
Show resolved Hide resolved
On each Dask worker, the :py:class:`~modin.core.storage_formats.pandas.parsers.PandasCSVParser` parses data.
After the remote tasks are finished, additional result postprocessing is performed,
and a new query compiler with the data read is returned.
Expand Down
2 changes: 1 addition & 1 deletion docs/img/pandas_on_dask_data_ingress.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,25 @@
from distributed.client import default_client


class DaskTask:
class DaskWrapper:
"""The class responsible for execution of remote operations."""

@classmethod
def deploy(cls, func, num_returns, kwargs):
def deploy(cls, func, *args, num_returns=1, pure=None, **kwargs):
"""
Deploy a function in a worker process.

Parameters
----------
func : callable
Function to be deployed in a worker process.
num_returns : int
*args : list
Additional positional arguments to be passed in `func`.
num_returns : int, default: 1
The number of returned objects.
kwargs : dict
pure : bool, optional
Whether or not `func` is pure. See `Client.submit` for details.
**kwargs : dict
Additional keyword arguments to be passed in ``func``.

Returns
Expand All @@ -39,11 +43,13 @@ def deploy(cls, func, num_returns, kwargs):
The result of ``func`` split into parts in accordance with ``num_returns``.
"""
client = default_client()
remote_task_future = client.submit(func, **kwargs)
return [
client.submit(lambda l, i: l[i], remote_task_future, i)
for i in range(num_returns)
]
remote_task_future = client.submit(func, *args, pure=pure, **kwargs)
if num_returns != 1:
return [
client.submit(lambda l, i: l[i], remote_task_future, i)
for i in range(num_returns)
]
return remote_task_future

@classmethod
def materialize(cls, future):
Expand All @@ -62,3 +68,22 @@ def materialize(cls, future):
"""
client = default_client()
return client.gather(future)

@classmethod
def put(cls, data, **kwargs):
    """
    Scatter `data` into the distributed memory of the Dask cluster.

    Parameters
    ----------
    data : list, dict, or object
        Data to scatter out to workers. Output type matches input type.
    **kwargs : dict
        Additional keyword arguments to be passed in `Client.scatter`.

    Returns
    -------
    List, dict, iterator, or queue of futures matching the type of input.
    """
    # Delegate directly to the active Dask client's scatter, mirroring
    # how `preprocess_func` places objects into distributed memory.
    return default_client().scatter(data, **kwargs)
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
from modin.core.dataframe.pandas.dataframe.dataframe import PandasDataframe
from ..partitioning.partition_manager import PandasOnDaskDataframePartitionManager

from distributed.client import default_client


class PandasOnDaskDataframe(PandasDataframe):
"""
Expand Down Expand Up @@ -53,10 +51,11 @@ def _row_lengths(self):
list
A list of row partitions lengths.
"""
client = default_client()
if self._row_lengths_cache is None:
self._row_lengths_cache = client.gather(
[obj.apply(lambda df: len(df)).future for obj in self._partitions.T[0]]
self._row_lengths_cache = (
self._partition_mgr_cls.get_objects_from_partitions(
[obj.apply(lambda df: len(df)) for obj in self._partitions.T[0]]
)
)
return self._row_lengths_cache

Expand All @@ -70,12 +69,13 @@ def _column_widths(self):
list
A list of column partitions widths.
"""
client = default_client()
if self._column_widths_cache is None:
self._column_widths_cache = client.gather(
[
obj.apply(lambda df: len(df.columns)).future
for obj in self._partitions[0]
]
self._column_widths_cache = (
self._partition_mgr_cls.get_objects_from_partitions(
[
obj.apply(lambda df: len(df.columns))
for obj in self._partitions[0]
]
)
)
return self._column_widths_cache
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
PandasSQLParser,
PandasExcelParser,
)
from modin.core.execution.dask.common.task_wrapper import DaskTask
from modin.core.execution.dask.common.engine_wrapper import DaskWrapper


class PandasOnDaskIO(BaseIO):
Expand All @@ -51,17 +51,19 @@ class PandasOnDaskIO(BaseIO):
query_compiler_cls=PandasQueryCompiler,
)

read_csv = type("", (DaskTask, PandasCSVParser, CSVDispatcher), build_args).read
read_json = type("", (DaskTask, PandasJSONParser, JSONDispatcher), build_args).read
read_csv = type("", (DaskWrapper, PandasCSVParser, CSVDispatcher), build_args).read
read_json = type(
"", (DaskWrapper, PandasJSONParser, JSONDispatcher), build_args
).read
read_parquet = type(
"", (DaskTask, PandasParquetParser, ParquetDispatcher), build_args
"", (DaskWrapper, PandasParquetParser, ParquetDispatcher), build_args
).read
# Blocked on pandas-dev/pandas#12236. It is faster to default to pandas.
# read_hdf = type("", (DaskTask, PandasHDFParser, HDFReader), build_args).read
# read_hdf = type("", (DaskWrapper, PandasHDFParser, HDFReader), build_args).read
read_feather = type(
"", (DaskTask, PandasFeatherParser, FeatherDispatcher), build_args
"", (DaskWrapper, PandasFeatherParser, FeatherDispatcher), build_args
).read
read_sql = type("", (DaskTask, PandasSQLParser, SQLDispatcher), build_args).read
read_sql = type("", (DaskWrapper, PandasSQLParser, SQLDispatcher), build_args).read
read_excel = type(
"", (DaskTask, PandasExcelParser, ExcelDispatcher), build_args
"", (DaskWrapper, PandasExcelParser, ExcelDispatcher), build_args
).read
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,13 @@

"""Module houses class that wraps data (block partition) and its metadata."""

import pandas

from modin.core.dataframe.pandas.partitioning.partition import PandasDataframePartition

from distributed.client import default_client
from distributed import Future
from distributed.utils import get_ip
from dask.distributed import wait

from modin.core.dataframe.pandas.partitioning.partition import PandasDataframePartition
from modin.pandas.indexing import compute_sliced_len
from modin.core.execution.dask.common.engine_wrapper import DaskWrapper


class PandasOnDaskDataframePartition(PandasDataframePartition):
Expand Down Expand Up @@ -63,10 +60,7 @@ def get(self):
The object from the distributed memory.
"""
self.drain_call_queue()
# blocking operation
if isinstance(self.future, pandas.DataFrame):
return self.future
return self.future.result()
return DaskWrapper.materialize(self.future)

def apply(self, func, *args, **kwargs):
"""
Expand All @@ -90,20 +84,24 @@ def apply(self, func, *args, **kwargs):
-----
The keyword arguments are sent as a dictionary.
"""
client = default_client()
call_queue = self.call_queue + [[func, args, kwargs]]
if len(call_queue) > 1:
future = client.submit(
apply_list_of_funcs, call_queue, self.future, pure=False
futures = DaskWrapper.deploy(
apply_list_of_funcs, call_queue, self.future, num_returns=2, pure=False
)
else:
# We handle `len(call_queue) == 1` in a different way because
# this improves performance a bit.
func, args, kwargs = call_queue[0]
future = client.submit(apply_func, self.future, func, *args, **kwargs)
futures = [
client.submit(lambda l, i: l[i], future, i, pure=False) for i in range(2)
]
futures = DaskWrapper.deploy(
apply_func,
self.future,
func,
*args,
num_returns=2,
pure=False,
**kwargs,
)
return PandasOnDaskDataframePartition(futures[0], ip=futures[1])

def add_to_apply_calls(self, func, *args, **kwargs):
Expand Down Expand Up @@ -137,19 +135,23 @@ def drain_call_queue(self):
if len(self.call_queue) == 0:
return
call_queue = self.call_queue
client = default_client()
if len(call_queue) > 1:
future = client.submit(
apply_list_of_funcs, call_queue, self.future, pure=False
futures = DaskWrapper.deploy(
apply_list_of_funcs, call_queue, self.future, num_returns=2, pure=False
)
else:
# We handle `len(call_queue) == 1` in a different way because
# this improves performance a bit.
func, args, kwargs = call_queue[0]
future = client.submit(apply_func, self.future, func, *args, **kwargs)
futures = [
client.submit(lambda l, i: l[i], future, i, pure=False) for i in range(2)
]
futures = DaskWrapper.deploy(
apply_func,
self.future,
func,
*args,
num_returns=2,
pure=False,
**kwargs,
)
self.future = futures[0]
self._ip_cache = futures[1]
self.call_queue = []
Expand All @@ -176,13 +178,12 @@ def mask(self, row_labels, col_labels):
A new ``PandasOnDaskDataframePartition`` object.
"""
new_obj = super().mask(row_labels, col_labels)
client = default_client()
if isinstance(row_labels, slice) and isinstance(self._length_cache, Future):
new_obj._length_cache = client.submit(
new_obj._length_cache = DaskWrapper.deploy(
compute_sliced_len, row_labels, self._length_cache
)
if isinstance(col_labels, slice) and isinstance(self._width_cache, Future):
new_obj._width_cache = client.submit(
new_obj._width_cache = DaskWrapper.deploy(
compute_sliced_len, col_labels, self._width_cache
)
return new_obj
Expand Down Expand Up @@ -219,8 +220,7 @@ def put(cls, obj):
PandasOnDaskDataframePartition
A new ``PandasOnDaskDataframePartition`` object.
"""
client = default_client()
return cls(client.scatter(obj, hash=False))
return cls(DaskWrapper.put(obj, hash=False))

@classmethod
def preprocess_func(cls, func):
Expand All @@ -237,7 +237,7 @@ def preprocess_func(cls, func):
callable
An object that can be accepted by ``apply``.
"""
return default_client().scatter(func, hash=False, broadcast=True)
return DaskWrapper.put(func, hash=False, broadcast=True)

def length(self):
"""
Expand All @@ -251,7 +251,7 @@ def length(self):
if self._length_cache is None:
self._length_cache = self.apply(lambda df: len(df)).future
if isinstance(self._length_cache, Future):
self._length_cache = self._length_cache.result()
self._length_cache = DaskWrapper.materialize(self._length_cache)
return self._length_cache

def width(self):
Expand All @@ -266,7 +266,7 @@ def width(self):
if self._width_cache is None:
self._width_cache = self.apply(lambda df: len(df.columns)).future
if isinstance(self._width_cache, Future):
self._width_cache = self._width_cache.result()
self._width_cache = DaskWrapper.materialize(self._width_cache)
return self._width_cache

def ip(self):
Expand All @@ -281,7 +281,7 @@ def ip(self):
if self._ip_cache is None:
self._ip_cache = self.apply(lambda df: df)._ip_cache
if isinstance(self._ip_cache, Future):
self._ip_cache = self._ip_cache.result()
self._ip_cache = DaskWrapper.materialize(self._ip_cache)
YarShev marked this conversation as resolved.
Show resolved Hide resolved
return self._ip_cache


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@

"""Module houses class that implements ``PandasDataframePartitionManager``."""

from distributed.client import default_client

from modin.core.dataframe.pandas.partitioning.partition_manager import (
PandasDataframePartitionManager,
)
from modin.core.execution.dask.common.engine_wrapper import DaskWrapper
from .virtual_partition import (
PandasOnDaskDataframeColumnPartition,
PandasOnDaskDataframeRowPartition,
Expand Down Expand Up @@ -48,5 +47,4 @@ def get_objects_from_partitions(cls, partitions):
list
The objects wrapped by `partitions`.
"""
client = default_client()
return client.gather([partition.future for partition in partitions])
return DaskWrapper.materialize([partition.future for partition in partitions])
Loading