apacheGH-33976: [Python] Remove usage of TableGroupBy helper in favor of pyarrow.acero.Declaration (apache#34769)

### Rationale for this change

Now that we have the pyarrow.acero building blocks (apacheGH-33976), we can construct the Declaration that `arrow::acero::TableGroupBy` creates under the hood ourselves in pyarrow.
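
For illustration, here is a minimal sketch (not part of this change) of the equivalent `Declaration` built with the public `pyarrow.acero` building blocks; the input table, column names, and the `hash_sum` aggregate are assumptions made purely for the example:

```python
import pyarrow as pa
from pyarrow.acero import (
    AggregateNodeOptions,
    Declaration,
    TableSourceNodeOptions,
)

# Hypothetical input table used only for this sketch.
table = pa.table({"key": ["a", "a", "b"], "value": [1, 2, 5]})

# Each aggregate is (target column(s), function name, FunctionOptions, output name);
# grouped aggregations use the "hash_"-prefixed compute functions.
decl = Declaration.from_sequence([
    Declaration("table_source", TableSourceNodeOptions(table)),
    Declaration("aggregate", AggregateNodeOptions(
        [("value", "hash_sum", None, "value_sum")], keys=["key"])),
])
result = decl.to_table(use_threads=True)
```

This mirrors the plan that the new `_group_by` helper in `python/pyarrow/acero.py` assembles (see the diff below).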

### Are these changes tested?

Existing tests are passing.

### Are there any user-facing changes?

No

Authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
jorisvandenbossche authored Jun 1, 2023
1 parent c1359c5 commit 0bb2d83
Showing 10 changed files with 46 additions and 185 deletions.
3 changes: 1 addition & 2 deletions cpp/src/arrow/acero/CMakeLists.txt
@@ -28,7 +28,6 @@ macro(append_acero_avx2_src SRC)
 endmacro()
 
 set(ARROW_ACERO_SRCS
-    groupby.cc
     accumulation_queue.cc
     aggregate_node.cc
     asof_join_node.cc
@@ -166,7 +165,7 @@ add_arrow_acero_test(pivot_longer_node_test SOURCES pivot_longer_node_test.cc
 add_arrow_acero_test(asof_join_node_test SOURCES asof_join_node_test.cc test_nodes.cc)
 add_arrow_acero_test(tpch_node_test SOURCES tpch_node_test.cc)
 add_arrow_acero_test(union_node_test SOURCES union_node_test.cc)
-add_arrow_acero_test(groupby_test SOURCES groupby_test.cc)
+add_arrow_acero_test(aggregate_node_test SOURCES aggregate_node_test.cc)
 add_arrow_acero_test(util_test SOURCES util_test.cc task_util_test.cc)
 add_arrow_acero_test(hash_aggregate_test SOURCES hash_aggregate_test.cc)

15 changes: 14 additions & 1 deletion cpp/src/arrow/acero/aggregate_benchmark.cc
@@ -19,7 +19,8 @@
 
 #include <vector>
 
-#include "arrow/acero/groupby.h"
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
 #include "arrow/array/array_primitive.h"
 #include "arrow/compute/api.h"
 #include "arrow/table.h"
@@ -347,6 +348,18 @@ std::shared_ptr<RecordBatch> RecordBatchFromArrays(
   return RecordBatch::Make(schema(std::move(fields)), length, std::move(all_arrays));
 }
 
+Result<std::shared_ptr<Table>> BatchGroupBy(
+    std::shared_ptr<RecordBatch> batch, std::vector<Aggregate> aggregates,
+    std::vector<FieldRef> keys, bool use_threads = false,
+    MemoryPool* memory_pool = default_memory_pool()) {
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Table> table,
+                        Table::FromRecordBatches({std::move(batch)}));
+  Declaration plan = Declaration::Sequence(
+      {{"table_source", TableSourceNodeOptions(std::move(table))},
+       {"aggregate", AggregateNodeOptions(std::move(aggregates), std::move(keys))}});
+  return DeclarationToTable(std::move(plan), use_threads, memory_pool);
+}
+
 static void BenchmarkGroupBy(benchmark::State& state, std::vector<Aggregate> aggregates,
                              const std::vector<std::shared_ptr<Array>>& arguments,
                              const std::vector<std::shared_ptr<Array>>& keys) {
14 changes: 13 additions & 1 deletion cpp/src/arrow/acero/{groupby_test.cc → aggregate_node_test.cc}
@@ -14,20 +14,32 @@
 // KIND, either express or implied. See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#include "arrow/acero/groupby.h"
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
 
 #include <gmock/gmock-matchers.h>
 #include <gtest/gtest.h>
 
 #include <memory>
 
+#include "arrow/result.h"
+#include "arrow/table.h"
 #include "arrow/testing/gtest_util.h"
 
 namespace arrow {
 
 namespace acero {
 
+Result<std::shared_ptr<Table>> TableGroupBy(
+    std::shared_ptr<Table> table, std::vector<Aggregate> aggregates,
+    std::vector<FieldRef> keys, bool use_threads = false,
+    MemoryPool* memory_pool = default_memory_pool()) {
+  Declaration plan = Declaration::Sequence(
+      {{"table_source", TableSourceNodeOptions(std::move(table))},
+       {"aggregate", AggregateNodeOptions(std::move(aggregates), std::move(keys))}});
+  return DeclarationToTable(std::move(plan), use_threads, memory_pool);
+}
 
 TEST(GroupByConvenienceFunc, Basic) {
   std::shared_ptr<Schema> in_schema =
       schema({field("key1", utf8()), field("key2", int32()), field("value", int32())});
61 changes: 0 additions & 61 deletions cpp/src/arrow/acero/groupby.cc

This file was deleted.

65 changes: 0 additions & 65 deletions cpp/src/arrow/acero/groupby.h

This file was deleted.

32 changes: 0 additions & 32 deletions python/pyarrow/_acero.pyx
@@ -527,35 +527,3 @@ cdef class Declaration(_Weakrefable):
             GetResultValue(DeclarationToReader(self.unwrap(), use_threads)).release()
         )
         return reader
-
-
-def _group_by(table, aggregates, keys):
-    cdef:
-        shared_ptr[CTable] c_table
-        vector[CAggregate] c_aggregates
-        vector[CFieldRef] c_keys
-        CAggregate c_aggr
-
-    c_table = (<Table> table).sp_table
-
-    for aggr_arg_indices, aggr_func_name, aggr_opts, aggr_name in aggregates:
-        c_aggr.function = tobytes(aggr_func_name)
-        if aggr_opts is not None:
-            c_aggr.options = (<FunctionOptions?>aggr_opts).wrapped
-        else:
-            c_aggr.options = <shared_ptr[CFunctionOptions]>nullptr
-        for field_idx in aggr_arg_indices:
-            c_aggr.target.push_back(CFieldRef(<int> field_idx))
-
-        c_aggr.name = tobytes(aggr_name)
-        c_aggregates.push_back(move(c_aggr))
-
-    for key_idx in keys:
-        c_keys.push_back(CFieldRef(<int> key_idx))
-
-    with nogil:
-        sp_table = GetResultValue(
-            CTableGroupBy(c_table, c_aggregates, c_keys)
-        )
-
-    return pyarrow_wrap_table(sp_table)
10 changes: 9 additions & 1 deletion python/pyarrow/acero.py
@@ -26,7 +26,6 @@
 from pyarrow.compute import Expression, field
 
 from pyarrow._acero import ( # noqa
-    _group_by,
     Declaration,
     ExecNodeOptions,
     TableSourceNodeOptions,
@@ -292,3 +291,12 @@ def _sort_source(table_or_dataset, sort_keys, output_type=Table, **kwargs):
         return ds.InMemoryDataset(result_table)
     else:
         raise TypeError("Unsupported output type")
+
+
+def _group_by(table, aggregates, keys):
+
+    decl = Declaration.from_sequence([
+        Declaration("table_source", TableSourceNodeOptions(table)),
+        Declaration("aggregate", AggregateNodeOptions(aggregates, keys=keys))
+    ])
+    return decl.to_table(use_threads=True)
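
For context, a small usage sketch (not part of the diff) of the public API that now routes through this helper; the table and aggregation are illustrative:

```python
import pyarrow as pa

table = pa.table({"key": ["a", "a", "b"], "value": [1, 2, 5]})

# Table.group_by(...).aggregate(...) now builds the acero Declaration shown
# above instead of going through the removed arrow::acero::TableGroupBy wrapper.
result = table.group_by("key").aggregate([("value", "sum")])
print(result.to_pydict())
# e.g. {'value_sum': [3, 5], 'key': ['a', 'b']}
```
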
6 changes: 6 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
@@ -2346,6 +2346,12 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil:
         CIndexOptions(shared_ptr[CScalar] value)
         shared_ptr[CScalar] value
 
+    cdef cppclass CAggregate "arrow::compute::Aggregate":
+        c_string function
+        shared_ptr[CFunctionOptions] options
+        vector[CFieldRef] target
+        c_string name
+
     cdef enum CMapLookupOccurrence \
             "arrow::compute::MapLookupOptions::Occurrence":
         CMapLookupOccurrence_ALL "arrow::compute::MapLookupOptions::ALL"
14 changes: 0 additions & 14 deletions python/pyarrow/includes/libarrow_acero.pxd
@@ -21,20 +21,6 @@ from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport *
 
 
-cdef extern from "arrow/acero/groupby.h" namespace \
-        "arrow::acero" nogil:
-    cdef cppclass CAggregate "arrow::compute::Aggregate":
-        c_string function
-        shared_ptr[CFunctionOptions] options
-        vector[CFieldRef] target
-        c_string name
-
-    CResult[shared_ptr[CTable]] CTableGroupBy "arrow::acero::TableGroupBy"(
-        shared_ptr[CTable] table,
-        vector[CAggregate] aggregates,
-        vector[CFieldRef] keys)
-
-
 cdef extern from "arrow/acero/options.h" namespace "arrow::acero" nogil:
     cdef enum CJoinType "arrow::acero::JoinType":
         CJoinType_LEFT_SEMI "arrow::acero::JoinType::LEFT_SEMI"
11 changes: 3 additions & 8 deletions python/pyarrow/table.pxi
@@ -5430,11 +5430,6 @@ list[tuple(str, str, FunctionOptions)]
                 aggr_name = func_nohash
             else:
                 aggr_name = "_".join(target) + "_" + func_nohash
-            # Calculate target indices by resolving field names
-            target_indices = [
-                self._table.schema.get_field_index(f) for f in target]
-            group_by_aggrs.append((target_indices, func, opt, aggr_name))
-
-        key_indices = [
-            self._table.schema.get_field_index(k) for k in self.keys]
-        return _pac()._group_by(self._table, group_by_aggrs, key_indices)
+            group_by_aggrs.append((target, func, opt, aggr_name))
+
+        return _pac()._group_by(self._table, group_by_aggrs, self.keys)
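
The simplification above: `TableGroupBy.aggregate` no longer resolves column names to indices, because `AggregateNodeOptions` accepts field names (or `pyarrow.compute.field(...)` references) and resolves them against the source schema itself. A hypothetical illustration of the spec that now reaches `_group_by` for `table.group_by("key").aggregate([("value", "sum")])`; the exact values are assumed for the example, not taken from the diff:

```python
# (target column(s), hash aggregate function, FunctionOptions, output name)
group_by_aggrs = [(["value"], "hash_sum", None, "value_sum")]
keys = ["key"]
# _group_by(table, group_by_aggrs, keys) feeds these directly into
# AggregateNodeOptions, which resolves the field references itself.
```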
