Skip to content

Commit

Permalink
Various fixes (ray-project#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
devin-petersohn authored and kunalgosar committed Mar 20, 2018
1 parent fa840f7 commit a1bcbf4
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 22 deletions.
130 changes: 110 additions & 20 deletions python/ray/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
from pandas._libs import lib
from pandas.core.dtypes.cast import maybe_upcast_putmask
from pandas.compat import lzip
import pandas.core.common as com
from pandas.core.dtypes.common import (
is_bool_dtype,
is_numeric_dtype,
is_timedelta64_dtype)
from pandas.core.indexing import convert_to_index_sliceable
import warnings
import numpy as np
import ray
Expand Down Expand Up @@ -2874,12 +2876,72 @@ def __getitem__(self, key):
Returns:
A Pandas Series representing the value for the column.
"""
partition_id = self.get_col_partition(key)
index = self.get_col_index_within_partition(key)
res = ray.get(_deploy_func.remote(lambda df: df.__getitem__(index),
self._col_partitions[partition_id]))
res.name = key
return res
key = com._apply_if_callable(key, self)

# shortcut if we are an actual column
is_mi_columns = isinstance(self.columns, pd.MultiIndex)
try:
if key in self.columns and not is_mi_columns:
return self._getitem_column(key)
except:
pass

# see if we can slice the rows
indexer = convert_to_index_sliceable(self._row_index, key)
if indexer is not None:
raise NotImplementedError("To contribute to Pandas on Ray, please"
"visit github.com/ray-project/ray.")
# return self._getitem_slice(indexer)

if isinstance(key, (pd.Series, np.ndarray, pd.Index, list)):
return self._getitem_array(key)
elif isinstance(key, DataFrame):
raise NotImplementedError("To contribute to Pandas on Ray, please"
"visit github.com/ray-project/ray.")
# return self._getitem_frame(key)
elif is_mi_columns:
raise NotImplementedError("To contribute to Pandas on Ray, please"
"visit github.com/ray-project/ray.")
# return self._getitem_multilevel(key)
else:
return self._getitem_column(key)

def _getitem_column(self, key):
partition = self._get_col_locations(key).loc['partition']
result = ray.get(self._getitem_indiv_col(key, partition))
result.name = key
result.index = self.index
return result

def _getitem_array(self, array_key):
partitions = \
self._get_col_locations(array_key)['partition'].unique()

new_col_parts = [self._getitem_indiv_col(array_key, part)
for part in partitions]

# Pandas doesn't allow Index.get_loc for lists, so we have to do this.
isin = self.columns.isin(array_key)
indices_for_rows = [i for i in range(len(isin)) if isin[i]]

new_row_parts = [_deploy_func.remote(
lambda df: df.__getitem__(indices_for_rows),
part) for part in self._row_partitions]

return DataFrame(col_partitions=new_col_parts,
row_partitions=new_row_parts,
columns=array_key,
index=self.index)

def _getitem_indiv_col(self, key, partition):
loc = self._col_index.loc[key]
if isinstance(loc, pd.Series):
index = loc[loc['partition'] == partition]
else:
index = loc[loc['partition'] == partition]['index_within_partition']
return _deploy_func.remote(
lambda df: df.__getitem__(index),
self._col_partitions[partition])

def __setitem__(self, key, value):
raise NotImplementedError(
Expand Down Expand Up @@ -2981,9 +3043,7 @@ def __delitem__(self, key):
key: key to delete
"""
# Create helper method for deleting column(s) in row partition.
to_delete = self.columns.get_loc(key)

def del_helper(df):
def del_helper(df, to_delete):
cols = df.columns[to_delete] # either int or an array of ints

if isinstance(cols, int):
Expand All @@ -2992,23 +3052,41 @@ def del_helper(df):
for col in cols:
df.__delitem__(col)

# Reset the column index to conserve space
df.columns = pd.RangeIndex(0, len(df.columns))
return df

to_delete = self.columns.get_loc(key)
self._row_partitions = _map_partitions(
del_helper, self._row_partitions)
del_helper, self._row_partitions, to_delete)

# This structure is used to get the correct index inside the partition.
del_df = self._col_index.loc[key]

# We need to standardize between multiple and single occurrences in the
# columns. Putting single occurrences in a pd.DataFrame and transposing
# results in the same structure as multiple with 'loc'.
if isinstance(del_df, pd.Series):
del_df = pd.DataFrame(del_df).T

# Cast cols as pd.Series as duplicate columns mean result may be
# np.int64 or pd.Series
col_parts_to_del = pd.Series(
self._col_index.loc[key, 'partition']).unique()
self._col_index = self._col_index.drop(key)
self._col_index.drop(key, inplace=True)
for i in col_parts_to_del:
# Compute the correct index inside the partition to delete.
to_delete_in_partition = \
del_df[del_df['partition'] == i]['index_within_partition']

self._col_partitions[i] = _deploy_func.remote(
del_helper, self._col_partitions[i])
del_helper, self._col_partitions[i], to_delete_in_partition)

partition_mask = (self._col_index['partition'] == i)

# Since we are replacing columns with RangeIndex inside the
# partition, we have to make sure that our reference to it is
# updated as well.
try:
self._col_index.loc[partition_mask,
'index_within_partition'] = [
Expand Down Expand Up @@ -3258,14 +3336,26 @@ def iloc(self):
from .indexing import _iLoc_Indexer
return _iLoc_Indexer(self)

def get_col_partition(self, col):
return self._col_index['partition'][col]
def _get_col_locations(self, col):
"""Gets the location(s) from the column index DataFrame.
def get_col_index_within_partition(self, col):
return self._col_index['index_within_partition'][col]
Args:
col: The column name.
def get_row_partition(self, row):
return self._row_index['partition'][row]
Returns:
The index(es) of _col_partitions and the local index(es) where
columns with this name exist.
"""
return self._col_index.loc[col]

def _get_row_locations(self, row):
"""Gets the location(s) from the row index DataFrame.
Args:
row: The index name.
def get_row_index_within_partition(self, row):
return self._row_index['index_within_partition'][row]
Returns:
The index(es) of _row_partitions and the local index(es) where rows
with this name exist.
"""
return self._row_index.loc[row]
10 changes: 8 additions & 2 deletions python/ray/dataframe/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,11 @@ def _rebuild_rows(col_partitions, index, columns):
"""
n_rows = min(max(get_npartitions(), len(col_partitions)), len(index))
partition_assignments = assign_partitions.remote(index, n_rows)
shufflers = [ShuffleActor.remote(x, partition_axis=1, shuffle_axis=0)
for x in col_partitions]
shufflers = [ShuffleActor.remote(
col_partitions[i] if i < len(col_partitions) else pd.DataFrame(),
partition_axis=1,
shuffle_axis=0)
for i in range(n_rows)]

shufflers_done = \
[shufflers[i].shuffle.remote(
Expand Down Expand Up @@ -291,6 +294,9 @@ def _map_partitions(func, partitions, *argslists):
assert(callable(func))
if argslists is None:
return [_deploy_func.remote(func, part) for part in partitions]
elif len(argslists) == 1:
return [_deploy_func.remote(func, part, argslists[0])
for part in partitions]
else:
assert(all([len(args) == len(partitions) for args in argslists]))
return [_deploy_func.remote(func, part, *args) for part, *args in zip(partitions, *argslists)]
Expand Down

0 comments on commit a1bcbf4

Please sign in to comment.