Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#2606: Support creating DataFrame from remote partitions #2613

Merged
merged 7 commits into from
Jan 28, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ jobs:
run: python -m pytest modin/test/backends/pandas/test_internals.py
- shell: bash -l {0}
run: python -m pytest modin/test/test_envvar_npartitions.py
- shell: bash -l {0}
run: python -m pytest modin/test/test_partition_api.py

test-defaults:
needs: [lint-commit, lint-flake8, lint-black, test-api, test-headers]
Expand Down Expand Up @@ -285,7 +287,7 @@ jobs:
run: pytest modin/experimental/engines/omnisci_on_ray/test/test_dataframe.py
- shell: bash -l {0}
run: bash <(curl -s https://codecov.io/bash)

test-asv-benchmarks:
needs: [lint-commit, lint-flake8, lint-black, test-api, test-headers]
runs-on: ubuntu-latest
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ jobs:
run: python -m pytest modin/test/backends/pandas/test_internals.py
- shell: bash -l {0}
run: python -m pytest modin/test/test_envvar_npartitions.py
- shell: bash -l {0}
run: python -m pytest modin/test/test_partition_api.py

test-defaults:
runs-on: ubuntu-latest
Expand Down
4 changes: 2 additions & 2 deletions modin/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

from .partition_api import unwrap_partitions
from .partition_api import unwrap_partitions, create_df_from_partitions

__all__ = ["unwrap_partitions"]
__all__ = ["unwrap_partitions", "create_df_from_partitions"]
85 changes: 81 additions & 4 deletions modin/api/partition_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

import numpy as np

from modin.pandas.dataframe import DataFrame
from modin.backends.pandas.query_compiler import PandasQueryCompiler


def unwrap_partitions(api_layer_object, axis=None, bind_ip=False):
"""
Expand Down Expand Up @@ -47,15 +52,13 @@ def unwrap_partitions(api_layer_object, axis=None, bind_ip=False):
def _unwrap_partitions(oid):
if bind_ip:
return [
(partition.ip, getattr(partition, oid))
[(partition.ip, getattr(partition, oid)) for partition in row]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this a bug in _unwrap_partitions? How is this change needed for create_df_from_partitions?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we hold all partitions as a 2D np.array, `unwrap_partitions` now returns a 2D list instead of a 1D list when the passed `axis` is None. That is what I described at the top of the PR. This change is needed for `create_df_from_partitions` to create a DataFrame from a 2D list of partitions when the passed `axis` is None.

for row in api_layer_object._query_compiler._modin_frame._partitions
for partition in row
]
else:
return [
getattr(partition, oid)
[getattr(partition, oid) for partition in row]
for row in api_layer_object._query_compiler._modin_frame._partitions
for partition in row
]

actual_engine = type(
Expand All @@ -78,3 +81,77 @@ def _unwrap_partitions(oid):
part.coalesce(bind_ip=bind_ip).unwrap(squeeze=True, bind_ip=bind_ip)
for part in partitions
]


def create_df_from_partitions(partitions, axis):
    """
    Create a DataFrame from remote partitions.

    Parameters
    ----------
    partitions : list
        List of Ray.ObjectRef/Dask.Future objects referencing partitions,
        depending on the engine used. Alternatively, a list of
        ``(ip, partition)`` tuples where both elements are
        Ray.ObjectRef/Dask.Future objects referencing the partition's ip
        address and the partition itself.
    axis : None, 0 or 1
        Identifies the layout of the passed partitions:
        - 0: ``partitions`` is a flat list of row partitions.
        - 1: ``partitions`` is a flat list of column partitions.
        - None: ``partitions`` is a 2D list of partitions.

    Returns
    -------
    DataFrame
        DataFrame instance created from the remote partitions.
    """
    # Imported lazily to avoid a circular import at module load time.
    from modin.data_management.factories.dispatcher import EngineDispatcher

    factory = EngineDispatcher.get_engine()

    frame_cls = factory.io_cls.frame_cls
    frame_mgr_cls = frame_cls._frame_mgr_cls
    partition_cls = frame_mgr_cls._partition_class

    # Normalize every accepted layout into a 2D (row-major) list of raw
    # partition references before wrapping them in partition objects.
    if axis is None:
        rows = partitions
    elif axis == 0:
        # Each element is a full row partition -> one partition per grid row.
        rows = [[part] for part in partitions]
    elif axis == 1:
        # Each element is a full column partition -> a single grid row.
        rows = [list(partitions)]
    else:
        raise ValueError(
            f"Got unacceptable value of axis {axis}. Possible values are {0}, {1} or {None}."
        )

    # The first element determines whether ip addresses were bundled in;
    # the input is assumed to be homogeneous in this respect.
    if isinstance(rows[0][0], tuple):
        parts = np.array(
            [[partition_cls(ref, ip=ip) for ip, ref in row] for row in rows]
        )
    else:
        parts = np.array([[partition_cls(ref) for ref in row] for row in rows])

    index = frame_mgr_cls.get_indices(0, parts, lambda df: df.axes[0])
    columns = frame_mgr_cls.get_indices(1, parts, lambda df: df.axes[1])
    return DataFrame(
        query_compiler=PandasQueryCompiler(frame_cls(parts, index, columns))
    )
98 changes: 98 additions & 0 deletions modin/test/test_partition_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

import numpy as np
import pandas
import pytest

import modin.pandas as pd
from modin.api import unwrap_partitions, create_df_from_partitions
from modin.config import Engine, NPartitions
from modin.pandas.test.utils import df_equals


# Engine-specific imports: only the backend actually configured is imported,
# since the other one may not be installed in the test environment.
if Engine.get() == "Ray":
    import ray
if Engine.get() == "Dask":
    from distributed.client import get_client

# Pin the partitioning grid to 4x4 so partition counts in the tests below
# do not depend on the machine's CPU count.
NPartitions.put(4)


@pytest.mark.parametrize("axis", [None, 0, 1])
def test_unwrap_partitions(axis):
    """
    Check that ``unwrap_partitions`` exposes the frame's internal partitions.

    With ``axis=None`` the raw 2D partition grid is expected back; with
    ``axis`` 0 or 1 partitions are first coalesced into full-axis partitions,
    so their materialized contents are compared instead of their identities.
    """
    data = np.random.randint(0, 100, size=(2 ** 16, 2 ** 8))
    df = pd.DataFrame(data)

    if axis is None:
        expected_partitions = df._query_compiler._modin_frame._partitions
        actual_partitions = np.array(unwrap_partitions(df, axis=axis))
        # BUG FIX: the original asserted
        # `expected_partitions.shape[1] == expected_partitions.shape[1]`,
        # comparing a value with itself; compare the two grids' shapes.
        assert expected_partitions.shape == actual_partitions.shape
        for row_idx in range(expected_partitions.shape[0]):
            for col_idx in range(expected_partitions.shape[1]):
                # The unwrapped object must be the very same remote reference
                # held by the internal partition, not a copy.
                if Engine.get() == "Ray":
                    assert (
                        expected_partitions[row_idx][col_idx].oid
                        == actual_partitions[row_idx][col_idx]
                    )
                if Engine.get() == "Dask":
                    assert (
                        expected_partitions[row_idx][col_idx].future
                        == actual_partitions[row_idx][col_idx]
                    )
    else:
        # Build reference full-axis partitions straight from the frame
        # internals; `axis ^ 1` flips the axis to match axis_partition's
        # orientation convention.
        expected_axis_partitions = (
            df._query_compiler._modin_frame._frame_mgr_cls.axis_partition(
                df._query_compiler._modin_frame._partitions, axis ^ 1
            )
        )
        expected_axis_partitions = [
            axis_partition.coalesce().unwrap(squeeze=True)
            for axis_partition in expected_axis_partitions
        ]
        actual_axis_partitions = unwrap_partitions(df, axis=axis)
        assert len(expected_axis_partitions) == len(actual_axis_partitions)
        for item_idx in range(len(expected_axis_partitions)):
            # Coalesced partitions are fresh remote objects, so compare
            # their materialized pandas frames rather than their identities.
            if Engine.get() == "Ray":
                df_equals(
                    ray.get(expected_axis_partitions[item_idx]),
                    ray.get(actual_axis_partitions[item_idx]),
                )
            if Engine.get() == "Dask":
                df_equals(
                    expected_axis_partitions[item_idx].result(),
                    actual_axis_partitions[item_idx].result(),
                )


@pytest.mark.parametrize("axis", [None, 0, 1])
def test_create_df_from_partitions(axis):
    """
    Round-trip check: scatter two identical pandas frames as remote
    partitions, rebuild a Modin DataFrame from them, and compare against
    the equivalent pandas concatenation.
    """
    data = np.random.randint(0, 100, size=(2 ** 16, 2 ** 8))
    df1, df2 = pandas.DataFrame(data), pandas.DataFrame(data)
    # With axis=None the two partitions form a single row of the 2D grid,
    # so they are laid out column-wise, the same as axis=1.
    expected_df = pandas.concat([df1, df2], axis=1 if axis is None else axis)
    if Engine.get() == "Ray":
        if axis is None:
            futures = [[ray.put(df1), ray.put(df2)]]
        else:
            futures = [ray.put(df1), ray.put(df2)]
    elif Engine.get() == "Dask":
        client = get_client()
        if axis is None:
            futures = [client.scatter([df1, df2], hash=False)]
        else:
            futures = client.scatter([df1, df2], hash=False)
    else:
        # BUG FIX: previously `futures` was left unbound on any other engine,
        # making the test fail with a confusing NameError instead of a skip.
        pytest.skip(f"create_df_from_partitions is not supported on {Engine.get()}")
    actual_df = create_df_from_partitions(futures, axis)
    df_equals(expected_df, actual_df)