ENH: reduce groupby.apply overhead by internal DataFrame manipulation.
close #535
wesm committed Dec 5, 2012
1 parent 1f41f01 commit 9e95ce2
Showing 7 changed files with 219 additions and 34 deletions.
2 changes: 2 additions & 0 deletions RELEASE.rst
@@ -85,6 +85,8 @@ pandas 0.10.0
- Add `line_terminator` option to DataFrame.to_csv (#2383)
- added implementation of str(x)/unicode(x)/bytes(x) to major pandas data
structures, which should do the right thing on both py2.x and py3.x. (#2224)
- Reduce groupby.apply overhead substantially by low-level manipulation of
internal NumPy arrays in DataFrames (#535)

**Bug fixes**

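The overhead targeted by this change is per-group object construction, which dominates when apply runs over many small groups. A rough illustration of the kind of workload that benefits (hypothetical example, not part of the commit):

import numpy as np
import pandas as pd

# Many small groups: before this change, building a fresh DataFrame for every
# group typically cost more than the applied function itself.
n_groups = 10000
df = pd.DataFrame({'key': np.repeat(np.arange(n_groups), 10),
                   'val': np.random.randn(n_groups * 10)})

result = df.groupby('key').apply(lambda g: g['val'].mean())
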
105 changes: 78 additions & 27 deletions pandas/core/groupby.py
@@ -552,40 +552,53 @@ def get_iterator(self, data, axis=0, keep_internal=True):
comp_ids, _, ngroups = self.group_info
splitter = get_splitter(data, comp_ids, ngroups, axis=axis,
keep_internal=keep_internal)
splitter = self._get_splitter(data, axis=axis,
keep_internal=keep_internal)
keys = self._get_group_keys()
for key, (i, group) in izip(keys, splitter):
yield key, group

def _get_splitter(self, data, axis=0, keep_internal=True):
comp_ids, _, ngroups = self.group_info
return get_splitter(data, comp_ids, ngroups, axis=axis,
keep_internal=keep_internal)


def _get_group_keys(self):
if len(self.groupings) == 1:
levels = self.groupings[0].group_index
for i, group in splitter:
yield levels[i], group
return self.levels[0]
else:
comp_ids, _, ngroups = self.group_info
# provide "flattened" iterator for multi-group setting
mapper = _KeyMapper(comp_ids, ngroups, self.labels, self.levels)
for i, group in splitter:
key = mapper.get_key(i)
yield key, group

def apply(self, f, data, axis=0):
result_keys = []
result_values = []
return [mapper.get_key(i) for i in range(ngroups)]

def apply(self, f, data, axis=0, keep_internal=False):
mutated = False
splitter = self.get_iterator(data, axis=axis)
splitter = self._get_splitter(data, axis=axis,
keep_internal=keep_internal)
group_keys = self._get_group_keys()

for key, group in splitter:
# oh boy
if hasattr(splitter, 'fast_apply') and axis == 0:
try:
values, mutated = splitter.fast_apply(f, group_keys)
return group_keys, values, mutated
except lib.InvalidApply:
pass

result_values = []
for key, (i, group) in izip(group_keys, splitter):
object.__setattr__(group, 'name', key)

# group might be modified
group_axes = _get_axes(group)

res = f(group)

if not _is_indexed_like(res, group_axes):
mutated = True

result_keys.append(key)
result_values.append(res)

return result_keys, result_values, mutated
return group_keys, result_values, mutated

@cache_readonly
def indices(self):
@@ -685,7 +698,7 @@ def get_group_levels(self):
recons_labels = decons_group_index(obs_ids, self.shape)

name_list = []
for ping, labels in zip(self.groupings, recons_labels):
for ping, labels in izip(self.groupings, recons_labels):
labels = com._ensure_platform_int(labels)
name_list.append(ping.group_index.take(labels))

@@ -933,13 +946,31 @@ def get_iterator(self, data, axis=0):
raise NotImplementedError

start = 0
for edge, label in zip(self.bins, self.binlabels):
for edge, label in izip(self.bins, self.binlabels):
yield label, data[start:edge]
start = edge

if edge < len(data):
yield self.binlabels[-1], data[edge:]

def apply(self, f, data, axis=0, keep_internal=False):
result_keys = []
result_values = []
mutated = False
for key, group in self.get_iterator(data, axis=axis):
object.__setattr__(group, 'name', key)

# group might be modified
group_axes = _get_axes(group)
res = f(group)
if not _is_indexed_like(res, group_axes):
mutated = True

result_keys.append(key)
result_values.append(res)

return result_keys, result_values, mutated

@cache_readonly
def ngroups(self):
return len(self.binlabels)
@@ -1190,7 +1221,7 @@ def _get_grouper(obj, key=None, axis=0, level=None, sort=True):

groupings = []
exclusions = []
for i, (gpr, level) in enumerate(zip(keys, levels)):
for i, (gpr, level) in enumerate(izip(keys, levels)):
name = None
try:
obj._data.items.get_loc(gpr)
@@ -2007,11 +2038,18 @@ def __init__(self, data, labels, ngroups, axis=0, keep_internal=False):
self.labels = com._ensure_int64(labels)
self.ngroups = ngroups

self.sort_idx = _algos.groupsort_indexer(self.labels,
self.ngroups)[0]
self.slabels = com.ndtake(self.labels, self.sort_idx)
self.axis = axis

@cache_readonly
def slabels(self):
# Sorted labels
return com.ndtake(self.labels, self.sort_idx)

@cache_readonly
def sort_idx(self):
# Counting sort indexer
return _algos.groupsort_indexer(self.labels, self.ngroups)[0]

def __iter__(self):
sdata = self._get_sorted_data()

@@ -2020,7 +2058,7 @@ def __iter__(self):

starts, ends = lib.generate_slices(self.slabels, self.ngroups)

for i, (start, end) in enumerate(zip(starts, ends)):
for i, (start, end) in enumerate(izip(starts, ends)):
# Since I'm now compressing the group ids, it's now not "possible"
# to produce empty slices because such groups would not be observed
# in the data
@@ -2054,6 +2092,19 @@ def __init__(self, data, labels, ngroups, axis=0, keep_internal=False):
DataSplitter.__init__(self, data, labels, ngroups, axis=axis,
keep_internal=keep_internal)

def fast_apply(self, f, names):
# must return keys::list, values::list, mutated::bool
try:
starts, ends = lib.generate_slices(self.slabels, self.ngroups)
except:
# fails when all -1
return [], True

sdata = self._get_sorted_data()
results, mutated = lib.apply_frame_axis0(sdata, f, names, starts, ends)

return results, mutated

def _chop(self, sdata, slice_obj):
if self.axis == 0:
return sdata[slice_obj]
@@ -2169,7 +2220,7 @@ def _lexsort_indexer(keys, orders=None):
elif orders is None:
orders = [True] * len(keys)

for key, order in zip(keys, orders):
for key, order in izip(keys, orders):
rizer = _hash.Factorizer(len(key))

if not key.dtype == np.object_:
@@ -2203,12 +2254,12 @@ def __init__(self, comp_ids, ngroups, labels, levels):
self._populate_tables()

def _populate_tables(self):
for labs, table in zip(self.labels, self.tables):
for labs, table in izip(self.labels, self.tables):
table.map(self.comp_ids, labs.astype(np.int64))

def get_key(self, comp_id):
return tuple(level[table.get_item(comp_id)]
for table, level in zip(self.tables, self.levels))
for table, level in izip(self.tables, self.levels))



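In the reworked apply, the grouper first offers the work to the splitter's Cython fast path and only falls back to the per-group Python loop when lib.InvalidApply is raised. From the caller's side, the visible difference is which functions are eligible; a hedged sketch of the two cases (illustrative only, not code from the commit):

import pandas as pd

df = pd.DataFrame({'key': ['a', 'a', 'b', 'b'], 'val': [1, 2, 3, 4]})

# Returning a new, index-compatible object lets the Cython fast path run.
fast = df.groupby('key').apply(lambda g: g['val'].sum())

# Returning the group object itself trips the `result is chunk` check in
# apply_frame_axis0, raises InvalidApply, and drops back to the slow path.
slow = df.groupby('key').apply(lambda g: g)
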
4 changes: 4 additions & 0 deletions pandas/src/numpy_helper.h
@@ -169,6 +169,10 @@ void set_array_owndata(PyArrayObject *ao) {
ao->flags |= NPY_OWNDATA;
}

void set_array_not_contiguous(PyArrayObject *ao) {
ao->flags &= ~(NPY_C_CONTIGUOUS | NPY_F_CONTIGUOUS);
}


// PANDAS_INLINE PyObject*
// get_base_ndarray(PyObject* ap) {
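set_array_not_contiguous clears the contiguity flags on block arrays whose data pointer and shape the BlockSlider in reduce.pyx rewrites in place; once a block aliases a window of its parent's memory, flags set at allocation time would be stale and could let NumPy take contiguous-memory shortcuts it must not. A Python-level analogue of the layout involved (assumption: illustrative only; the commit does this at the C level):

import numpy as np

block = np.arange(12, dtype=np.float64).reshape(3, 4)   # items x positions

# A column window shares memory with the parent block but is not C-contiguous.
# Ordinary views get their flags recomputed by NumPy; the in-place pointer
# sliding in reduce.pyx has to clear the flags explicitly instead.
window = block[:, 1:3]
print(window.flags['C_CONTIGUOUS'])   # False
print(window.base is block)           # True - no copy was made
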
137 changes: 132 additions & 5 deletions pandas/src/reduce.pyx
@@ -201,8 +201,8 @@ cdef class SeriesBinGrouper:
raise
finally:
# so we don't free the wrong memory
islider.cleanup()
vslider.cleanup()
islider.reset()
vslider.reset()

if result.dtype == np.object_:
result = maybe_convert_objects(result)
@@ -313,8 +313,8 @@ cdef class SeriesGrouper:
raise
finally:
# so we don't free the wrong memory
islider.cleanup()
vslider.cleanup()
islider.reset()
vslider.reset()

if result.dtype == np.object_:
result = maybe_convert_objects(result)
@@ -361,15 +361,142 @@ cdef class Slider:
cpdef advance(self, Py_ssize_t k):
self.buf.data = <char*> self.buf.data + self.stride * k

cdef move(self, int start, int end):
'''
For slicing
'''
self.buf.data = self.values.data + self.stride * start
self.buf.shape[0] = end - start

cpdef set_length(self, Py_ssize_t length):
self.buf.shape[0] = length

cpdef cleanup(self):
cpdef reset(self):
self.buf.shape[0] = self.orig_len
self.buf.data = self.orig_data
self.buf.strides[0] = self.orig_stride


class InvalidApply(Exception):
pass

def apply_frame_axis0(object frame, object f, object names,
ndarray[int64_t] starts, ndarray[int64_t] ends):
cdef:
BlockSlider slider
Py_ssize_t i, n = len(starts)
list results
object piece
dict item_cache

if frame.index._has_complex_internals:
raise InvalidApply('Cannot modify frame index internals')


results = []

# Need to infer if our low-level mucking is going to cause a segfault
if n > 0:
chunk = frame[starts[0]:ends[0]]
shape_before = chunk.shape
try:
result = f(chunk)
if result is chunk:
raise InvalidApply('Function unsafe for fast apply')
except:
raise InvalidApply('Let this error raise above us')

slider = BlockSlider(frame)

mutated = False
item_cache = slider.dummy._item_cache
gin = slider.dummy.index._engine # f7u12
try:
for i in range(n):
slider.move(starts[i], ends[i])

item_cache.clear() # ugh
gin.clear_mapping()

setattr(slider.dummy, 'name', names[i])
piece = f(slider.dummy)

# I'm paying the price for index-sharing, ugh
try:
if piece.index is slider.dummy.index:
piece.index = piece.index.copy()
else:
mutated = True
except AttributeError:
pass
results.append(piece)
finally:
slider.reset()

return results, mutated

cdef class BlockSlider:
'''
Only capable of sliding on axis=0
'''

cdef public:
object frame, dummy
int nblocks
Slider idx_slider
list blocks

cdef:
char **base_ptrs

def __init__(self, frame):
self.frame = frame
self.dummy = frame[:0]

self.blocks = [b.values for b in self.dummy._data.blocks]

for x in self.blocks:
util.set_array_not_contiguous(x)

self.nblocks = len(self.blocks)
self.idx_slider = Slider(self.frame.index, self.dummy.index)

self.base_ptrs = <char**> malloc(sizeof(char*) * len(self.blocks))
for i, block in enumerate(self.blocks):
self.base_ptrs[i] = (<ndarray> block).data

def __dealloc__(self):
free(self.base_ptrs)

cpdef move(self, int start, int end):
cdef:
ndarray arr

# move blocks
for i in range(self.nblocks):
arr = self.blocks[i]

# axis=1 is the frame's axis=0
arr.data = self.base_ptrs[i] + arr.strides[1] * start
arr.shape[1] = end - start

self.idx_slider.move(start, end)

cdef reset(self):
cdef:
ndarray arr

# move blocks
for i in range(self.nblocks):
arr = self.blocks[i]

# axis=1 is the frame's axis=0
arr.data = self.base_ptrs[i]
arr.shape[1] = 0

self.idx_slider.reset()


def reduce(arr, f, axis=0, dummy=None, labels=None):
if labels._has_complex_internals:
raise Exception('Cannot use shortcut')
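The structure the fast path relies on is that groupsort_indexer leaves each group occupying one contiguous row range, so apply_frame_axis0 can walk [start, end) windows over a single pre-sorted frame instead of materializing a copy per group. A plain-NumPy sketch of that algorithm shape, assuming integer labels 0..ngroups-1 with every group observed (names are made up for illustration):

import numpy as np
import pandas as pd

def apply_by_sorted_slices(df, labels, ngroups, f):
    # Stable sort so each group's rows become one contiguous block.
    order = np.argsort(labels, kind='mergesort')
    sorted_labels = labels[order]
    sdata = df.take(order)

    # Per-group slice boundaries, analogous to lib.generate_slices.
    starts = np.searchsorted(sorted_labels, np.arange(ngroups), side='left')
    ends = np.searchsorted(sorted_labels, np.arange(ngroups), side='right')

    # The Cython BlockSlider avoids even this per-group slicing cost by
    # retargeting the block pointers of one reusable dummy frame.
    return [f(sdata.iloc[s:e]) for s, e in zip(starts, ends)]
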
1 change: 1 addition & 0 deletions pandas/src/util.pxd
@@ -4,6 +4,7 @@ cimport cpython

cdef extern from "numpy_helper.h":
inline void set_array_owndata(ndarray ao)
inline void set_array_not_contiguous(ndarray ao)

inline int is_integer_object(object)
inline int is_float_object(object)
2 changes: 1 addition & 1 deletion pandas/tests/test_groupby.py
@@ -2144,5 +2144,5 @@ def testit(label_list, shape):

if __name__ == '__main__':
import nose
nose.runmodule(argv=[__file__,'-vvs','-x','--pdb', '--pdb-failure'],
nose.runmodule(argv=[__file__,'-vvs','-x','--pdb', '--pdb-failure', '-s'],
exit=False)