Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#2342: Add axis partitions API #2515

Merged
merged 4 commits into from
Jan 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions modin/api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

from .partition_api import unwrap_partitions

__all__ = ["unwrap_partitions"]
80 changes: 80 additions & 0 deletions modin/api/partition_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.


def unwrap_partitions(api_layer_object, axis=None, bind_ip=False):
"""
Unwrap partitions of the `api_layer_object`.

Parameters
----------
api_layer_object : DataFrame or Series
The API layer object.
axis : None, 0 or 1. Default is None
The axis to unwrap partitions for (0 - row partitions, 1 - column partitions).
If axis is None, all the partitions of the API layer object are unwrapped.
bind_ip : boolean. Default is False
Whether to bind node ip address to each partition or not.

Returns
-------
list
A list of Ray.ObjectRef/Dask.Future to partitions of the `api_layer_object`
if Ray/Dask is used as an engine.

Notes
-----
In case bind_ip=True, a list containing tuples of Ray.ObjectRef/Dask.Future to node ip addresses
and partitions of the `api_layer_object`, respectively, is returned if Ray/Dask is used as an engine.
"""
if not hasattr(api_layer_object, "_query_compiler"):
raise ValueError(
f"Only API Layer objects may be passed in here, got {type(api_layer_object)} instead."
)

if axis is None:

def _unwrap_partitions(oid):
if bind_ip:
return [
(partition.ip, getattr(partition, oid))
for row in api_layer_object._query_compiler._modin_frame._partitions
for partition in row
]
else:
return [
getattr(partition, oid)
for row in api_layer_object._query_compiler._modin_frame._partitions
for partition in row
]

actual_engine = type(
api_layer_object._query_compiler._modin_frame._partitions[0][0]
).__name__
if actual_engine in ("PandasOnRayFramePartition",):
return _unwrap_partitions("oid")
elif actual_engine in ("PandasOnDaskFramePartition",):
return _unwrap_partitions("future")
raise ValueError(
f"Do not know how to unwrap '{actual_engine}' underlying partitions"
)
else:
partitions = (
api_layer_object._query_compiler._modin_frame._frame_mgr_cls.axis_partition(
api_layer_object._query_compiler._modin_frame._partitions, axis ^ 1
)
)
return [
part.coalesce(bind_ip=bind_ip).unwrap(squeeze=True, bind_ip=bind_ip)
for part in partitions
]
50 changes: 50 additions & 0 deletions modin/engines/base/frame/axis_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,56 @@ def shuffle(self, func, lengths, **kwargs):
def _wrap_partitions(self, partitions):
return [self.partition_type(obj) for obj in partitions]

def coalesce(self, bind_ip=False):
"""
Coalesce the axis partitions into a single partition.

Parameters
----------
bind_ip : boolean, default False
Whether to bind node ip address to a single partition or not.

Returns
-------
BaseFrameAxisPartition
An axis partition containing only a single coalesced partition.
"""
coalesced = self.apply(lambda x: x, num_splits=1, maintain_partitioning=False)
return type(self)(coalesced, bind_ip=bind_ip)
YarShev marked this conversation as resolved.
Show resolved Hide resolved

def unwrap(self, squeeze=False, bind_ip=False):
"""
Unwrap partitions from axis partition.

Parameters
----------
squeeze : boolean, default False
The flag used to unwrap only one partition.
bind_ip : boolean, default False
Whether to bind node ip address to each partition or not.

Returns
-------
list
List of partitions from axis partition.

Notes
-----
In case bind_ip=True, list containing tuples of Ray.ObjectRef/Dask.Future
to node ip addresses and unwrapped partitions, respectively, is returned
if Ray/Dask is used as an engine.
"""
if squeeze and len(self.list_of_blocks) == 1:
if bind_ip:
return self.list_of_ips[0], self.list_of_blocks[0]
else:
return self.list_of_blocks[0]
else:
if bind_ip:
return list(zip(self.list_of_ips, self.list_of_blocks))
else:
return self.list_of_blocks


class PandasFrameAxisPartition(BaseFrameAxisPartition):
"""An abstract class is created to simplify and consolidate the code for AxisPartitions that run pandas.
Expand Down
41 changes: 38 additions & 3 deletions modin/engines/dask/pandas_on_dask/frame/axis_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@

from distributed.client import get_client
from distributed import Future
from distributed.utils import get_ip
import pandas


class PandasOnDaskFrameAxisPartition(PandasFrameAxisPartition):
def __init__(self, list_of_blocks):
def __init__(self, list_of_blocks, bind_ip=False):
# Unwrap from BaseFramePartition object for ease of use
for obj in list_of_blocks:
obj.drain_call_queue()
self.list_of_blocks = [obj.future for obj in list_of_blocks]
if bind_ip:
self.list_of_ips = [obj.ip for obj in list_of_blocks]

partition_type = PandasOnDaskFramePartition
instance_type = Future
Expand All @@ -34,6 +38,7 @@ def deploy_axis_func(
):
client = get_client()
axis_result = client.submit(
deploy_dask_func,
PandasFrameAxisPartition.deploy_axis_func,
axis,
func,
Expand All @@ -51,7 +56,7 @@ def deploy_axis_func(
# get futures for each.
return [
client.submit(lambda l: l[i], axis_result, pure=False)
for i in range(result_num_splits)
for i in range(result_num_splits * 4)
YarShev marked this conversation as resolved.
Show resolved Hide resolved
]

@classmethod
Expand All @@ -60,6 +65,7 @@ def deploy_func_between_two_axis_partitions(
):
client = get_client()
axis_result = client.submit(
deploy_dask_func,
PandasFrameAxisPartition.deploy_func_between_two_axis_partitions,
axis,
func,
Expand All @@ -74,7 +80,13 @@ def deploy_func_between_two_axis_partitions(
# get futures for each.
return [
client.submit(lambda l: l[i], axis_result, pure=False)
for i in range(num_splits)
for i in range(num_splits * 4)
]

def _wrap_partitions(self, partitions):
return [
self.partition_type(future, length, width, ip)
for (future, length, width, ip) in zip(*[iter(partitions)] * 4)
]


Expand All @@ -94,3 +106,26 @@ class PandasOnDaskFrameRowPartition(PandasOnDaskFrameAxisPartition):
"""

axis = 1


def deploy_dask_func(func, *args):
"""
Run a function on a remote partition.

Parameters
----------
func : callable
The function to run.

Returns
-------
The result of the function `func`.
"""
result = func(*args)
ip = get_ip()
if isinstance(result, pandas.DataFrame):
return result, len(result), len(result.columns), ip
elif all(isinstance(r, pandas.DataFrame) for r in result):
return [i for r in result for i in [r, len(r), len(r.columns), ip]]
else:
return [i for r in result for i in [r, None, None, ip]]
13 changes: 9 additions & 4 deletions modin/engines/dask/pandas_on_dask/frame/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from modin.data_management.utils import length_fn_pandas, width_fn_pandas

from distributed.client import get_client
from distributed.utils import get_ip
import cloudpickle as pkl


Expand All @@ -25,7 +26,7 @@ def apply_list_of_funcs(funcs, df):
if isinstance(func, bytes):
func = pkl.loads(func)
df = func(df, **kwargs)
return df
return df, get_ip()


class PandasOnDaskFramePartition(BaseFramePartition):
Expand All @@ -40,13 +41,14 @@ class PandasOnDaskFramePartition(BaseFramePartition):
subclasses. There is no logic for updating inplace.
"""

def __init__(self, future, length=None, width=None, call_queue=None):
def __init__(self, future, length=None, width=None, ip=None, call_queue=None):
self.future = future
if call_queue is None:
call_queue = []
self.call_queue = call_queue
self._length_cache = length
self._width_cache = width
self.ip = ip

def get(self):
"""Flushes the call_queue and returns the data.
Expand Down Expand Up @@ -81,7 +83,8 @@ def apply(self, func, **kwargs):
future = get_client().submit(
apply_list_of_funcs, call_queue, self.future, pure=False
)
return PandasOnDaskFramePartition(future)
futures = [get_client().submit(lambda l: l[i], future) for i in range(2)]
return PandasOnDaskFramePartition(futures[0], ip=futures[1])

def add_to_apply_calls(self, func, **kwargs):
return PandasOnDaskFramePartition(
Expand All @@ -91,7 +94,9 @@ def add_to_apply_calls(self, func, **kwargs):
def drain_call_queue(self):
if len(self.call_queue) == 0:
return
self.future = self.apply(lambda x: x).future
new_partition = self.apply(lambda x: x)
self.future = new_partition.future
self.ip = new_partition.ip
self.call_queue = []

def mask(self, row_indices, col_indices):
Expand Down
38 changes: 24 additions & 14 deletions modin/engines/ray/pandas_on_ray/frame/axis_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@
from .partition import PandasOnRayFramePartition

import ray
from ray.services import get_node_ip_address


class PandasOnRayFrameAxisPartition(PandasFrameAxisPartition):
def __init__(self, list_of_blocks):
def __init__(self, list_of_blocks, bind_ip=False):
# Unwrap from BaseFramePartition object for ease of use
for obj in list_of_blocks:
obj.drain_call_queue()
self.list_of_blocks = [obj.oid for obj in list_of_blocks]
if bind_ip:
self.list_of_ips = [obj.ip for obj in list_of_blocks]

partition_type = PandasOnRayFramePartition
instance_type = ray.ObjectID
Expand All @@ -44,7 +47,7 @@ def deploy_axis_func(
maintain_partitioning,
)
+ tuple(partitions),
num_returns=num_splits * 3 if lengths is None else len(lengths) * 3,
num_returns=num_splits * 4 if lengths is None else len(lengths) * 4,
)

@classmethod
Expand All @@ -62,13 +65,13 @@ def deploy_func_between_two_axis_partitions(
kwargs,
)
+ tuple(partitions),
num_returns=num_splits * 3,
num_returns=num_splits * 4,
)

def _wrap_partitions(self, partitions):
return [
self.partition_type(partitions[i], partitions[i + 1], partitions[i + 2])
for i in range(0, len(partitions), 3)
self.partition_type(object_id, length, width, ip)
for (object_id, length, width, ip) in zip(*[iter(partitions)] * 4)
]


Expand All @@ -92,20 +95,27 @@ class PandasOnRayFrameRowPartition(PandasOnRayFrameAxisPartition):

@ray.remote
def deploy_ray_func(func, *args): # pragma: no cover
"""Run a function on a remote partition.

Note: Ray functions are not detected by codecov (thus pragma: no cover)
"""
Run a function on a remote partition.

Args:
func: The function to run.
Parameters
----------
func : callable
The function to run.

Returns:
Returns
-------
The result of the function `func`.

Notes
-----
Ray functions are not detected by codecov (thus pragma: no cover)
"""
result = func(*args)
ip = get_node_ip_address()
if isinstance(result, pandas.DataFrame):
return result, len(result), len(result.columns)
return result, len(result), len(result.columns), ip
elif all(isinstance(r, pandas.DataFrame) for r in result):
return [i for r in result for i in [r, len(r), len(r.columns)]]
return [i for r in result for i in [r, len(r), len(r.columns), ip]]
else:
return [i for r in result for i in [r, None, None]]
return [i for r in result for i in [r, None, None, ip]]
Loading