Skip to content

Commit

Permalink
FEAT-#2342: Add axis partitions API (#2515)
Browse files Browse the repository at this point in the history
Signed-off-by: Igoshev, Yaroslav <yaroslav.igoshev@intel.com>

Co-authored-by: Devin Petersohn <devin.petersohn@gmail.com>
  • Loading branch information
YarShev and devin-petersohn authored Jan 13, 2021
1 parent 477c5f6 commit 7cfc85c
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 33 deletions.
16 changes: 16 additions & 0 deletions modin/api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

from .partition_api import unwrap_partitions

__all__ = ["unwrap_partitions"]
80 changes: 80 additions & 0 deletions modin/api/partition_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.


def unwrap_partitions(api_layer_object, axis=None, bind_ip=False):
"""
Unwrap partitions of the `api_layer_object`.
Parameters
----------
api_layer_object : DataFrame or Series
The API layer object.
axis : None, 0 or 1. Default is None
The axis to unwrap partitions for (0 - row partitions, 1 - column partitions).
If axis is None, all the partitions of the API layer object are unwrapped.
bind_ip : boolean. Default is False
Whether to bind node ip address to each partition or not.
Returns
-------
list
A list of Ray.ObjectRef/Dask.Future to partitions of the `api_layer_object`
if Ray/Dask is used as an engine.
Notes
-----
In case bind_ip=True, a list containing tuples of Ray.ObjectRef/Dask.Future to node ip addresses
and partitions of the `api_layer_object`, respectively, is returned if Ray/Dask is used as an engine.
"""
if not hasattr(api_layer_object, "_query_compiler"):
raise ValueError(
f"Only API Layer objects may be passed in here, got {type(api_layer_object)} instead."
)

if axis is None:

def _unwrap_partitions(oid):
if bind_ip:
return [
(partition.ip, getattr(partition, oid))
for row in api_layer_object._query_compiler._modin_frame._partitions
for partition in row
]
else:
return [
getattr(partition, oid)
for row in api_layer_object._query_compiler._modin_frame._partitions
for partition in row
]

actual_engine = type(
api_layer_object._query_compiler._modin_frame._partitions[0][0]
).__name__
if actual_engine in ("PandasOnRayFramePartition",):
return _unwrap_partitions("oid")
elif actual_engine in ("PandasOnDaskFramePartition",):
return _unwrap_partitions("future")
raise ValueError(
f"Do not know how to unwrap '{actual_engine}' underlying partitions"
)
else:
partitions = (
api_layer_object._query_compiler._modin_frame._frame_mgr_cls.axis_partition(
api_layer_object._query_compiler._modin_frame._partitions, axis ^ 1
)
)
return [
part.coalesce(bind_ip=bind_ip).unwrap(squeeze=True, bind_ip=bind_ip)
for part in partitions
]
50 changes: 50 additions & 0 deletions modin/engines/base/frame/axis_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,56 @@ def shuffle(self, func, lengths, **kwargs):
def _wrap_partitions(self, partitions):
return [self.partition_type(obj) for obj in partitions]

def coalesce(self, bind_ip=False):
"""
Coalesce the axis partitions into a single partition.
Parameters
----------
bind_ip : boolean, default False
Whether to bind node ip address to a single partition or not.
Returns
-------
BaseFrameAxisPartition
An axis partition containing only a single coalesced partition.
"""
coalesced = self.apply(lambda x: x, num_splits=1, maintain_partitioning=False)
return type(self)(coalesced, bind_ip=bind_ip)

def unwrap(self, squeeze=False, bind_ip=False):
"""
Unwrap partitions from axis partition.
Parameters
----------
squeeze : boolean, default False
The flag used to unwrap only one partition.
bind_ip : boolean, default False
Whether to bind node ip address to each partition or not.
Returns
-------
list
List of partitions from axis partition.
Notes
-----
In case bind_ip=True, list containing tuples of Ray.ObjectRef/Dask.Future
to node ip addresses and unwrapped partitions, respectively, is returned
if Ray/Dask is used as an engine.
"""
if squeeze and len(self.list_of_blocks) == 1:
if bind_ip:
return self.list_of_ips[0], self.list_of_blocks[0]
else:
return self.list_of_blocks[0]
else:
if bind_ip:
return list(zip(self.list_of_ips, self.list_of_blocks))
else:
return self.list_of_blocks


class PandasFrameAxisPartition(BaseFrameAxisPartition):
"""An abstract class is created to simplify and consolidate the code for AxisPartitions that run pandas.
Expand Down
41 changes: 38 additions & 3 deletions modin/engines/dask/pandas_on_dask/frame/axis_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@

from distributed.client import get_client
from distributed import Future
from distributed.utils import get_ip
import pandas


class PandasOnDaskFrameAxisPartition(PandasFrameAxisPartition):
def __init__(self, list_of_blocks):
def __init__(self, list_of_blocks, bind_ip=False):
# Unwrap from BaseFramePartition object for ease of use
for obj in list_of_blocks:
obj.drain_call_queue()
self.list_of_blocks = [obj.future for obj in list_of_blocks]
if bind_ip:
self.list_of_ips = [obj.ip for obj in list_of_blocks]

partition_type = PandasOnDaskFramePartition
instance_type = Future
Expand All @@ -34,6 +38,7 @@ def deploy_axis_func(
):
client = get_client()
axis_result = client.submit(
deploy_dask_func,
PandasFrameAxisPartition.deploy_axis_func,
axis,
func,
Expand All @@ -51,7 +56,7 @@ def deploy_axis_func(
# get futures for each.
return [
client.submit(lambda l: l[i], axis_result, pure=False)
for i in range(result_num_splits)
for i in range(result_num_splits * 4)
]

@classmethod
Expand All @@ -60,6 +65,7 @@ def deploy_func_between_two_axis_partitions(
):
client = get_client()
axis_result = client.submit(
deploy_dask_func,
PandasFrameAxisPartition.deploy_func_between_two_axis_partitions,
axis,
func,
Expand All @@ -74,7 +80,13 @@ def deploy_func_between_two_axis_partitions(
# get futures for each.
return [
client.submit(lambda l: l[i], axis_result, pure=False)
for i in range(num_splits)
for i in range(num_splits * 4)
]

def _wrap_partitions(self, partitions):
return [
self.partition_type(future, length, width, ip)
for (future, length, width, ip) in zip(*[iter(partitions)] * 4)
]


Expand All @@ -94,3 +106,26 @@ class PandasOnDaskFrameRowPartition(PandasOnDaskFrameAxisPartition):
"""

axis = 1


def deploy_dask_func(func, *args):
"""
Run a function on a remote partition.
Parameters
----------
func : callable
The function to run.
Returns
-------
The result of the function `func`.
"""
result = func(*args)
ip = get_ip()
if isinstance(result, pandas.DataFrame):
return result, len(result), len(result.columns), ip
elif all(isinstance(r, pandas.DataFrame) for r in result):
return [i for r in result for i in [r, len(r), len(r.columns), ip]]
else:
return [i for r in result for i in [r, None, None, ip]]
13 changes: 9 additions & 4 deletions modin/engines/dask/pandas_on_dask/frame/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from modin.data_management.utils import length_fn_pandas, width_fn_pandas

from distributed.client import get_client
from distributed.utils import get_ip
import cloudpickle as pkl


Expand All @@ -25,7 +26,7 @@ def apply_list_of_funcs(funcs, df):
if isinstance(func, bytes):
func = pkl.loads(func)
df = func(df, **kwargs)
return df
return df, get_ip()


class PandasOnDaskFramePartition(BaseFramePartition):
Expand All @@ -40,13 +41,14 @@ class PandasOnDaskFramePartition(BaseFramePartition):
subclasses. There is no logic for updating inplace.
"""

def __init__(self, future, length=None, width=None, call_queue=None):
def __init__(self, future, length=None, width=None, ip=None, call_queue=None):
self.future = future
if call_queue is None:
call_queue = []
self.call_queue = call_queue
self._length_cache = length
self._width_cache = width
self.ip = ip

def get(self):
"""Flushes the call_queue and returns the data.
Expand Down Expand Up @@ -81,7 +83,8 @@ def apply(self, func, **kwargs):
future = get_client().submit(
apply_list_of_funcs, call_queue, self.future, pure=False
)
return PandasOnDaskFramePartition(future)
futures = [get_client().submit(lambda l: l[i], future) for i in range(2)]
return PandasOnDaskFramePartition(futures[0], ip=futures[1])

def add_to_apply_calls(self, func, **kwargs):
return PandasOnDaskFramePartition(
Expand All @@ -91,7 +94,9 @@ def add_to_apply_calls(self, func, **kwargs):
def drain_call_queue(self):
if len(self.call_queue) == 0:
return
self.future = self.apply(lambda x: x).future
new_partition = self.apply(lambda x: x)
self.future = new_partition.future
self.ip = new_partition.ip
self.call_queue = []

def mask(self, row_indices, col_indices):
Expand Down
38 changes: 24 additions & 14 deletions modin/engines/ray/pandas_on_ray/frame/axis_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@
from .partition import PandasOnRayFramePartition

import ray
from ray.services import get_node_ip_address


class PandasOnRayFrameAxisPartition(PandasFrameAxisPartition):
def __init__(self, list_of_blocks):
def __init__(self, list_of_blocks, bind_ip=False):
# Unwrap from BaseFramePartition object for ease of use
for obj in list_of_blocks:
obj.drain_call_queue()
self.list_of_blocks = [obj.oid for obj in list_of_blocks]
if bind_ip:
self.list_of_ips = [obj.ip for obj in list_of_blocks]

partition_type = PandasOnRayFramePartition
instance_type = ray.ObjectID
Expand All @@ -44,7 +47,7 @@ def deploy_axis_func(
maintain_partitioning,
)
+ tuple(partitions),
num_returns=num_splits * 3 if lengths is None else len(lengths) * 3,
num_returns=num_splits * 4 if lengths is None else len(lengths) * 4,
)

@classmethod
Expand All @@ -62,13 +65,13 @@ def deploy_func_between_two_axis_partitions(
kwargs,
)
+ tuple(partitions),
num_returns=num_splits * 3,
num_returns=num_splits * 4,
)

def _wrap_partitions(self, partitions):
return [
self.partition_type(partitions[i], partitions[i + 1], partitions[i + 2])
for i in range(0, len(partitions), 3)
self.partition_type(object_id, length, width, ip)
for (object_id, length, width, ip) in zip(*[iter(partitions)] * 4)
]


Expand All @@ -92,20 +95,27 @@ class PandasOnRayFrameRowPartition(PandasOnRayFrameAxisPartition):

@ray.remote
def deploy_ray_func(func, *args): # pragma: no cover
"""Run a function on a remote partition.
Note: Ray functions are not detected by codecov (thus pragma: no cover)
"""
Run a function on a remote partition.
Args:
func: The function to run.
Parameters
----------
func : callable
The function to run.
Returns:
Returns
-------
The result of the function `func`.
Notes
-----
Ray functions are not detected by codecov (thus pragma: no cover)
"""
result = func(*args)
ip = get_node_ip_address()
if isinstance(result, pandas.DataFrame):
return result, len(result), len(result.columns)
return result, len(result), len(result.columns), ip
elif all(isinstance(r, pandas.DataFrame) for r in result):
return [i for r in result for i in [r, len(r), len(r.columns)]]
return [i for r in result for i in [r, len(r), len(r.columns), ip]]
else:
return [i for r in result for i in [r, None, None]]
return [i for r in result for i in [r, None, None, ip]]
Loading

0 comments on commit 7cfc85c

Please sign in to comment.