Skip to content

Commit

Permalink
Merge pull request ray-project#7 from osalpekar/private_repr
Browse files Browse the repository at this point in the history
Repr_fix with new index scheme
  • Loading branch information
devin-petersohn authored Apr 11, 2018
2 parents 202f968 + 6d13805 commit ed4a075
Show file tree
Hide file tree
Showing 5 changed files with 702 additions and 78 deletions.
35 changes: 0 additions & 35 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,46 +116,11 @@ install:
script:
- export PATH="$HOME/miniconda/bin:$PATH"

- python python/ray/common/test/test.py
- python python/ray/common/redis_module/runtest.py
- python python/ray/plasma/test/test.py
- python python/ray/local_scheduler/test/test.py
- python python/ray/global_scheduler/test/test.py

- python -m pytest test/xray_test.py

- python test/runtest.py
- python test/array_test.py
- python test/actor_test.py
- python test/autoscaler_test.py
- python test/tensorflow_test.py
- python test/failure_test.py
- python test/microbenchmarks.py
- python test/stress_tests.py
- python test/component_failures_test.py
- python test/multi_node_test.py
- python test/recursion_test.py
- python test/monitor_test.py
- python test/cython_test.py
- python test/credis_test.py

# ray dataframe tests
- python -m pytest python/ray/dataframe/test/test_dataframe.py
- python -m pytest python/ray/dataframe/test/test_series.py
- python -m pytest python/ray/dataframe/test/test_concat.py

# ray tune tests
- python python/ray/tune/test/dependency_test.py
- python -m pytest python/ray/tune/test/trial_runner_test.py
- python -m pytest python/ray/tune/test/trial_scheduler_test.py
- python -m pytest python/ray/tune/test/tune_server_test.py

# ray rllib tests
- python -m pytest python/ray/rllib/test/test_catalog.py
- python -m pytest python/ray/rllib/test/test_filters.py
- python -m pytest python/ray/rllib/test/test_optimizers.py
- python -m pytest python/ray/rllib/test/test_evaluators.py

deploy:
- provider: s3
access_key_id: AKIAJ2L7XDUSZVTXI5QA
Expand Down
203 changes: 176 additions & 27 deletions python/ray/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import numpy as np
import ray
import itertools
import io
import sys
import re

from .utils import (
_deploy_func,
Expand Down Expand Up @@ -83,6 +86,7 @@ def __init__(self, data=None, index=None, columns=None, dtype=None,
axis = 0
columns = pd_df.columns
index = pd_df.index
self._row_metadata = self._col_metadata = None
else:
# created this invariant to make sure we never have to go into the
# partitions to get the columns
Expand Down Expand Up @@ -157,13 +161,16 @@ def _set_col_partitions(self, new_col_partitions):
def __str__(self):
return repr(self)

def __repr__(self):
if len(self._row_metadata) < 60:
result = repr(to_pandas(self))
return result
def _repr_helper_(self):
if len(self._row_metadata) <= 60 and \
len(self._col_metadata) <= 20:
return to_pandas(self)

def head(df, n):
def head(df, n, get_local_head=False):
"""Compute the head for this without creating a new DataFrame"""
if get_local_head:
return df.head(n)

new_dfs = _map_partitions(lambda df: df.head(n),
df)

Expand All @@ -173,8 +180,10 @@ def head(df, n):
pd_head.columns = self.columns
return pd_head

def tail(df, n):
def tail(df, n, get_local_tail=False):
"""Compute the tail for this without creating a new DataFrame"""
if get_local_tail:
return df.tail(n)

new_dfs = _map_partitions(lambda df: df.tail(n),
df)
Expand All @@ -185,25 +194,91 @@ def tail(df, n):
pd_tail.columns = self.columns
return pd_tail

def front(df, n):
"""Get first n columns without creating a new Dataframe"""

cum_col_lengths = self._col_metadata._lengths.cumsum()
index = np.argmax(cum_col_lengths >= 10)
pd_front = pd.concat(ray.get(x[:index+1]), axis=1, copy=False)
pd_front = pd_front.iloc[:, :n]
pd_front.index = self.index
pd_front.columns = self.columns[:n]
return pd_front

def back(df, n):
"""Get last n columns without creating a new Dataframe"""

cum_col_lengths = np.flip(self._col_metadata._lengths,
axis=0).cumsum()
index = np.argmax(cum_col_lengths >= 10)
pd_back = pd.concat(ray.get(x[-(index+1):]), axis=1, copy=False)
pd_back = pd_back.iloc[:, -n:]
pd_back.index = self.index
pd_back.columns = self.columns[-n:]
return pd_back

x = self._col_partitions
head = head(x, 30)
tail = tail(x, 30)
get_local_head = False

# Get first and last 10 columns if there are more than 20 columns
if len(self._col_metadata) >= 20:
get_local_head = True
front = front(x, 10)
back = back(x, 10)

col_dots = pd.Series(["..."
for _ in range(len(self.index))])
col_dots.index = self.index
col_dots.name = "..."
x = pd.concat([front, col_dots, back], axis=1)

# If less than 60 rows, x is already in the correct format.
if len(self._row_metadata) < 60:
return x

head = head(x, 30, get_local_head)
tail = tail(x, 30, get_local_head)

# Make the dots in between the head and tail
dots = pd.Series(["..."
for _ in range(self._block_partitions.shape[1])])
dots.index = head.columns
dots.name = "..."
row_dots = pd.Series(["..."
for _ in range(len(head.columns))])
row_dots.index = head.columns
row_dots.name = "..."

# We have to do it this way or convert dots to a dataframe and
# transpose. This seems better.
result = head.append(dots).append(tail)
result = head.append(row_dots).append(tail)
return result

def __repr__(self):
# We use pandas repr so that we match them.
if len(self._row_metadata) <= 60 and \
len(self._col_metadata) <= 20:
return repr(self._repr_helper_())
# The split here is so that we don't repr pandas row lengths.
return repr(result).split("\n\n")[0] + \
"\n\n[{0} rows X {1} columns]".format(len(self.index),
result = self._repr_helper_()
final_result = repr(result).rsplit("\n\n", maxsplit=1)[0] + \
"\n\n[{0} rows x {1} columns]".format(len(self.index),
len(self.columns))
return final_result

def _repr_html_(self):
"""repr function for rendering in Jupyter Notebooks like Pandas
Dataframes.
Returns:
The HTML representation of a Dataframe.
"""
# We use pandas _repr_html_ to get a string of the HTML representation
# of the dataframe.
if len(self._row_metadata) <= 60 and \
len(self._col_metadata) <= 20:
return self._repr_helper_()._repr_html_()
# We split so that we insert our correct dataframe dimensions.
result = self._repr_helper_()._repr_html_()
return result.split('<p>')[0] + \
'<p>{0} rows × {1} columns</p>\n</div>'.format(len(self.index),
len(self.columns))

def _get_index(self):
"""Get the index for this DataFrame.
Expand Down Expand Up @@ -257,9 +332,12 @@ def _arithmetic_helper(self, remote_func, axis, level=None):
# We use the index to get the internal index.
oid_series = [(oid_series[i], i) for i in range(len(oid_series))]

for df, partition in oid_series:
this_partition = self._col_metadata.partition_series(partition)
df.index = this_partition[this_partition.isin(df.index)].index
if len(oid_series) > 1:
for df, partition in oid_series:
this_partition = \
self._col_metadata.partition_series(partition)
df.index = \
this_partition[this_partition.isin(df.index)].index

result_series = pd.concat([obj[0] for obj in oid_series],
axis=0, copy=False)
Expand Down Expand Up @@ -1513,9 +1591,74 @@ def infer_objects(self):

def info(self, verbose=None, buf=None, max_cols=None, memory_usage=None,
null_counts=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")

def info_helper(df):
output_buffer = io.StringIO()
df.info(verbose=verbose,
buf=output_buffer,
max_cols=max_cols,
memory_usage=memory_usage,
null_counts=null_counts)
return output_buffer.getvalue()

# Combine the per-partition info and split into lines
result = ''.join(ray.get(_map_partitions(info_helper,
self._col_partitions)))
lines = result.split('\n')

# Class denoted in info() output
class_string = '<class \'ray.dataframe.dataframe.DataFrame\'>\n'

# Create the Index info() string by parsing self.index
index_string = self.index.summary() + '\n'

# A column header is needed in the inf() output
col_header = 'Data columns (total {0} columns):\n'.format(
len(self.columns))

# Parse the per-partition values to get the per-column details
# Find all the lines in the output that start with integers
prog = re.compile('^[0-9]+.+')
col_lines = [prog.match(line) for line in lines]
cols = [c.group(0) for c in col_lines if c is not None]
# replace the partition columns names with real column names
columns = ["{0}\t{1}\n".format(self.columns[i],
cols[i].split(" ", 1)[1])
for i in range(len(cols))]
col_string = ''.join(columns) + '\n'

# A summary of the dtypes in the dataframe
dtypes_string = "dtypes: "
for dtype, count in self.dtypes.value_counts().iteritems():
dtypes_string += "{0}({1}),".format(dtype, count)
dtypes_string = dtypes_string[:-1] + '\n'

# Compute the memory usage by summing per-partitions return values
# Parse lines for memory usage number
prog = re.compile('^memory+.+')
mems = [prog.match(line) for line in lines]
mem_vals = [float(re.search(r'\d+', m.group(0)).group())
for m in mems if m is not None]

memory_string = ""

if len(mem_vals) != 0:
# Sum memory usage from each partition
if memory_usage != 'deep':
memory_string = 'memory usage: {0}+ bytes'.format(
sum(mem_vals))
else:
memory_string = 'memory usage: {0} bytes'.format(sum(mem_vals))

# Combine all the components of the info() output
result = ''.join([class_string, index_string, col_header,
col_string, dtypes_string, memory_string])

# Write to specified output buffer
if buf:
buf.write(result)
else:
sys.stdout.write(result)

def insert(self, loc, column, value, allow_duplicates=False):
"""Insert column into DataFrame at specified location.
Expand Down Expand Up @@ -1551,9 +1694,6 @@ def insert_col_part(df):
df.insert(index_within_partition, column, value, allow_duplicates)
return df

print('partition:', partition)
print('i_w_partition', index_within_partition)
print('df:\n', ray.get(self._col_partitions[partition]))
new_obj = _deploy_func.remote(insert_col_part,
self._col_partitions[partition])
new_cols = [self._col_partitions[i]
Expand Down Expand Up @@ -1780,9 +1920,18 @@ def melt(self, id_vars=None, value_vars=None, var_name=None,
"github.com/ray-project/ray.")

def memory_usage(self, index=True, deep=False):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")

def remote_func(df):
return df.memory_usage(index=False, deep=deep)

result = self._arithmetic_helper(remote_func, axis=0)

result.index = self.columns
if index:
index_value = self._row_metadata.index.memory_usage(deep=deep)
return pd.Series(index_value, index=['Index']).append(result)

return result

def merge(self, right, how='inner', on=None, left_on=None, right_on=None,
left_index=False, right_index=False, sort=False,
Expand Down
22 changes: 12 additions & 10 deletions python/ray/dataframe/test/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1648,11 +1648,12 @@ def test_infer_objects():
ray_df.infer_objects()


def test_info():
ray_df = create_test_dataframe()

with pytest.raises(NotImplementedError):
ray_df.info()
@pytest.fixture
def test_info(ray_df):
info_string = ray_df.info()
assert '<class \'ray.dataframe.dataframe.DataFrame\'>\n' in info_string
info_string = ray_df.info(memory_usage=True)
assert 'memory_usage: ' in info_string


@pytest.fixture
Expand Down Expand Up @@ -1815,11 +1816,12 @@ def test_melt():
ray_df.melt()


def test_memory_usage():
ray_df = create_test_dataframe()

with pytest.raises(NotImplementedError):
ray_df.memory_usage()
@pytest.fixture
def test_memory_usage(ray_df):
assert type(ray_df.memory_usage()) is pd.core.series.Series
assert ray_df.memory_usage(index=True).at['Index'] is not None
assert ray_df.memory_usage(deep=True).sum() >= \
ray_df.memory_usage(deep=False).sum()


def test_merge():
Expand Down
Loading

0 comments on commit ed4a075

Please sign in to comment.