Skip to content

Commit

Permalink
Merge pull request #2701 from finos/sparse-tree-parallel
Browse files Browse the repository at this point in the history
Calculate `group_by` in parallel
  • Loading branch information
texodus authored Aug 4, 2024
2 parents 938c513 + b09659c commit 012c403
Showing 1 changed file with 39 additions and 30 deletions.
69 changes: 39 additions & 30 deletions cpp/perspective/src/cpp/sparse_tree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -682,9 +682,7 @@ t_stree::build_strand_table(
t_uindex npivotlike = metadata.m_npivotlike;
std::vector<const t_column*> piv_fcols(npivotlike);
std::vector<t_column*> piv_scols(npivotlike);

t_uindex insert_count = 0;

for (t_uindex pidx = 0; pidx < npivotlike; ++pidx) {
const std::string& piv = metadata.m_strand_schema.m_columns[pidx];
piv_fcols[pidx] = flattened.get_const_column(piv).get();
Expand All @@ -694,9 +692,7 @@ t_stree::build_strand_table(
t_uindex aggcolsize = metadata.m_aggschema.m_columns.size();
std::vector<const t_column*> agg_fcols(aggcolsize);
std::vector<t_column*> agg_acols(aggcolsize);

t_uindex strand_count_idx = 0;

for (t_uindex aggidx = 0; aggidx < aggcolsize; ++aggidx) {
const std::string& aggcol = metadata.m_aggschema.m_columns[aggidx];
if (aggcol == "psp_strand_count") {
Expand All @@ -710,46 +706,59 @@ t_stree::build_strand_table(
}

t_column* agg_scount = aggs->get_column("psp_strand_count").get();

t_column* spkey = strands->get_column("psp_pkey").get();

t_mask msk;

if (config.has_filters()) {
msk = filter_table_for_config(flattened, config);
}

bool has_filters = config.has_filters();
const t_uindex loop_end = flattened.size();
const t_uindex size = loop_end - msk.count();
const t_uindex ploop_end = metadata.m_pivot_like_columns.size();
parallel_for(int(aggcolsize + 1), [&](int aggidx) {
// This over-allocates for `OP_DELETE`, as it only accounts for
// filtered count.
if (aggidx > 0) {
if (aggidx - 1 != strand_count_idx) {
agg_acols[aggidx - 1]->reserve(size);
}
} else {
spkey->reserve(size);
for (t_uindex pidx = 0; pidx < ploop_end; ++pidx) {
piv_scols[pidx]->reserve(size);
}
}

for (t_uindex idx = 0, loop_end = flattened.size(); idx < loop_end; ++idx) {
bool filter = !has_filters || msk.get(idx);
t_tscalar pkey = pkey_col->get_scalar(idx);
std::uint8_t op_ = *(op_col->get_nth<std::uint8_t>(idx));
t_op op = static_cast<t_op>(op_);
for (t_uindex idx = 0; idx < loop_end; ++idx) {
bool filter = !has_filters || msk.get(idx);
if (!filter) {
continue;
}

if (!filter || op == OP_DELETE) {
// nothing to do
continue;
}
std::uint8_t op_ = *(op_col->get_nth<std::uint8_t>(idx));
t_op op = static_cast<t_op>(op_);
if (op == OP_DELETE) {
continue;
}

for (t_uindex pidx = 0,
ploop_end = metadata.m_pivot_like_columns.size();
pidx < ploop_end;
++pidx) {
piv_scols[pidx]->push_back(piv_fcols[pidx]->get_scalar(idx));
}
if (aggidx == 0) {
t_tscalar pkey = pkey_col->get_scalar(idx);
for (t_uindex pidx = 0; pidx < ploop_end; ++pidx) {
piv_scols[pidx]->push_back(piv_fcols[pidx]->get_scalar(idx)
);
}

for (t_uindex aggidx = 0; aggidx < aggcolsize; ++aggidx) {
if (aggidx != strand_count_idx) {
agg_acols[aggidx]->push_back(agg_fcols[aggidx]->get_scalar(idx)
agg_scount->push_back<std::int8_t>(1);
spkey->push_back(pkey);
++insert_count;
} else if (aggidx - 1 != strand_count_idx) {
agg_acols[aggidx - 1]->push_back(
agg_fcols[aggidx - 1]->get_scalar(idx)
);
}
}

agg_scount->push_back<std::int8_t>(1);
spkey->push_back(pkey);
++insert_count;
}
});

strands->reserve(insert_count);
strands->set_size(insert_count);
Expand Down

0 comments on commit 012c403

Please sign in to comment.