Skip to content

Commit

Permalink
auto pass through hashagg (#9167)
Browse files Browse the repository at this point in the history
ref #9196

Signed-off-by: guo-shaoge <shaoge1994@163.com>
Signed-off-by: Lloyd-Pottiger <yan1579196623@gmail.com>

Co-authored-by: Lloyd-Pottiger <60744015+Lloyd-Pottiger@users.noreply.github.com>
  • Loading branch information
guo-shaoge and Lloyd-Pottiger authored Aug 7, 2024
1 parent 16fb1d9 commit 1289b43
Show file tree
Hide file tree
Showing 51 changed files with 2,982 additions and 119 deletions.
2 changes: 1 addition & 1 deletion contrib/tipb
2 changes: 1 addition & 1 deletion dbms/src/Common/HashTable/HashTableKeyHolder.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* make a persistent copy of the key in each of the following cases:
* 1) the aggregation method doesn't use temporary keys, so they're persistent
* from the start;
* 1) the key is already present in the hash table;
* 2) the key is already present in the hash table;
* 3) that particular key is stored by value, e.g. a short StringRef key in
* StringHashMap.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/AutoPassThroughAggregatingBlockInputStream.h>

namespace DB
{

template <bool force_streaming>
Block AutoPassThroughAggregatingBlockInputStream<force_streaming>::readImpl()
{
while (!build_done)
{
Block block = children[0]->read();
if (block)
{
auto_pass_through_context->onBlock<force_streaming>(block);
}
else
{
build_done = true;
break;
}

if (auto res = auto_pass_through_context->tryGetDataInAdvance())
return res;
}

assert(build_done);

if (auto res = auto_pass_through_context->tryGetDataInAdvance())
return res;

return auto_pass_through_context->getDataFromHashTable();
}

} // namespace DB
60 changes: 60 additions & 0 deletions dbms/src/DataStreams/AutoPassThroughAggregatingBlockInputStream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <DataStreams/IProfilingBlockInputStream.h>
#include <Interpreters/Aggregator.h>
#include <Operators/AutoPassThroughHashAggContext.h>

namespace DB
{
static constexpr std::string_view autoPassThroughAggregatingExtraInfo = "auto pass through";

template <bool force_streaming>
class AutoPassThroughAggregatingBlockInputStream : public IProfilingBlockInputStream
{
static constexpr auto NAME = "Aggregating";

public:
AutoPassThroughAggregatingBlockInputStream(
const BlockInputStreamPtr & input_,
const Aggregator::Params & params_,
const String & req_id,
UInt64 row_limit_unit)
{
children.push_back(input_);
auto_pass_through_context = std::make_unique<AutoPassThroughHashAggContext>(
children[0]->getHeader(),
params_,
[&]() { return this->isCancelled(); },
req_id,
row_limit_unit);
}

String getName() const override { return NAME; }

Block getHeader() const override { return auto_pass_through_context->getHeader(); }

protected:
Block readImpl() override;

private:
AutoPassThroughHashAggContextPtr auto_pass_through_context;
bool build_done = false;
};

template class AutoPassThroughAggregatingBlockInputStream<true>;
template class AutoPassThroughAggregatingBlockInputStream<false>;
} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/DataStreams/ExchangeSenderBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class ExchangeSenderBlockInputStream : public IProfilingBlockInputStream
String getName() const override { return name; }
Block getHeader() const override { return children.back()->getHeader(); }

bool canHandleSelectiveBlock() const override { return true; }

protected:
Block readImpl() override;
void readPrefixImpl() override { writer->prepare(getHeader()); }
Expand Down
17 changes: 16 additions & 1 deletion dbms/src/DataStreams/ExpressionBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,22 @@ Block ExpressionBlockInputStream::readImpl()
Block res = children.back()->read();
if (!res)
return res;
expression->execute(res);

if (res.info.selective)
{
const auto ori_rows = res.rows();
auto ori_info = res.info;
expression->execute(res);
res.info = ori_info;
// When block.info.selective is not null, the expression action should be cast/project.
// So the rows should not change.
RUNTIME_CHECK(ori_rows == res.rows());
}
else
{
expression->execute(res);
}
return res;
}

} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/DataStreams/ExpressionBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class ExpressionBlockInputStream : public IProfilingBlockInputStream
String getName() const override { return NAME; }
Block getHeader() const override;

bool canHandleSelectiveBlock() const override { return true; }

protected:
Block readImpl() override;

Expand Down
10 changes: 10 additions & 0 deletions dbms/src/DataStreams/IBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,13 @@ class IBlockInputStream : private boost::noncopyable

virtual void appendInfo(FmtBuffer & /*buffer*/) const {};

virtual bool canHandleSelectiveBlock() const { return false; }

void setParent(const IBlockInputStream * parent_) { parent = parent_; }
const IBlockInputStream * getParent() const { return parent; }

BlockInputStreams getChildren() { return children; }

protected:
virtual uint64_t collectCPUTimeNsImpl(bool /*is_thread_runner*/) { return 0; }

Expand Down Expand Up @@ -224,6 +231,9 @@ class IBlockInputStream : private boost::noncopyable
bool thread_cnt_collected = false;
bool cpu_time_ns_collected = false;

// To avoid cyclic references, shared_ptr was not used.
const IBlockInputStream * parent = nullptr;

private:
TableLockHolders table_locks;

Expand Down
7 changes: 6 additions & 1 deletion dbms/src/DataStreams/IProfilingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ Block IProfilingBlockInputStream::read(FilterPtr & res_filter, bool return_filte

if (quota != nullptr)
checkQuota(res);

RUNTIME_CHECK_MSG(
!parent || parent->canHandleSelectiveBlock() || !res.info.selective,
"{} cannot handle selective block",
parent->getName());
}
else
{
Expand Down Expand Up @@ -228,7 +233,7 @@ static bool handleOverflowMode(OverflowMode mode, const String & message, int co
default:
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
};
}


bool IProfilingBlockInputStream::checkTimeLimit() const
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ void ParallelAggregatingBlockInputStream::cancel(bool kill)
processor.cancel(kill);
}


Block ParallelAggregatingBlockInputStream::readImpl()
{
if (!executed)
Expand Down
14 changes: 11 additions & 3 deletions dbms/src/Debug/MockExecutor/AggregationBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ bool AggregationBinder::toTiPBExecutor(
tipb_executor->set_executor_id(name);
tipb_executor->set_fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count);
auto * agg = tipb_executor->mutable_aggregation();
if (switcher)
agg->set_pre_agg_mode(switcher->mode);
buildAggExpr(agg, collator_id, context);
buildGroupBy(agg, collator_id, context);
auto * child_executor = agg->mutable_child();
Expand Down Expand Up @@ -87,6 +89,7 @@ void AggregationBinder::toMPPSubPlan(
// todo support avg
if (has_uniq_raw_res)
throw Exception("uniq raw res not supported in mpp query");
// Partial agg cannot be fine grained shuffle. So set fine_grained_shuffle_stream_count as 0.
std::shared_ptr<AggregationBinder> partial_agg = std::make_shared<AggregationBinder>(
executor_index,
output_schema_for_partial_agg,
Expand All @@ -95,7 +98,8 @@ void AggregationBinder::toMPPSubPlan(
std::move(agg_exprs),
std::move(gby_exprs),
false,
fine_grained_shuffle_stream_count);
/*fine_grained_shuffle_stream_count*/ 0,
switcher);
partial_agg->children.push_back(children[0]);
std::vector<size_t> partition_keys;
size_t agg_func_num = partial_agg->agg_exprs.size();
Expand Down Expand Up @@ -136,6 +140,8 @@ void AggregationBinder::toMPPSubPlan(
gby_exprs.push_back(std::make_shared<ASTIdentifier>(output_schema_for_partial_agg[agg_func_num + i].first));
}
children[0] = exchange_receiver;
// Because this aggregation is 2nd agg, so reset auto_pass_through flag.
switcher = nullptr;
}

bool AggregationBinder::needAppendProject() const
Expand Down Expand Up @@ -235,7 +241,8 @@ ExecutorBinderPtr compileAggregation(
size_t & executor_index,
ASTPtr agg_funcs,
ASTPtr group_by_exprs,
uint64_t fine_grained_shuffle_stream_count)
uint64_t fine_grained_shuffle_stream_count,
std::shared_ptr<AutoPassThroughSwitcher> switcher)
{
std::vector<ASTPtr> agg_exprs;
std::vector<ASTPtr> gby_exprs;
Expand Down Expand Up @@ -308,7 +315,8 @@ ExecutorBinderPtr compileAggregation(
std::move(agg_exprs),
std::move(gby_exprs),
true,
fine_grained_shuffle_stream_count);
fine_grained_shuffle_stream_count,
switcher);
aggregation->children.push_back(input);
return aggregation;
}
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Debug/MockExecutor/AggregationBinder.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <Debug/MockExecutor/ExecutorBinder.h>
#include <Operators/AutoPassThroughHashAggContext.h>
#include <Parsers/ASTFunction.h>

namespace DB::mock
Expand All @@ -33,14 +34,16 @@ class AggregationBinder : public ExecutorBinder
ASTs && agg_exprs_,
ASTs && gby_exprs_,
bool is_final_mode_,
uint64_t fine_grained_shuffle_stream_count_)
uint64_t fine_grained_shuffle_stream_count_,
std::shared_ptr<AutoPassThroughSwitcher> switcher_)
: ExecutorBinder(index_, "aggregation_" + std::to_string(index_), output_schema_)
, has_uniq_raw_res(has_uniq_raw_res_)
, need_append_project(need_append_project_)
, agg_exprs(std::move(agg_exprs_))
, gby_exprs(std::move(gby_exprs_))
, is_final_mode(is_final_mode_)
, fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_)
, switcher(switcher_)
{}

bool toTiPBExecutor(
Expand Down Expand Up @@ -73,6 +76,7 @@ class AggregationBinder : public ExecutorBinder
bool is_final_mode;
DAGSchema output_schema_for_partial_agg;
uint64_t fine_grained_shuffle_stream_count;
std::shared_ptr<AutoPassThroughSwitcher> switcher;

private:
void buildGroupBy(tipb::Aggregation * agg, int32_t collator_id, const Context & context) const;
Expand All @@ -85,6 +89,7 @@ ExecutorBinderPtr compileAggregation(
size_t & executor_index,
ASTPtr agg_funcs,
ASTPtr group_by_exprs,
uint64_t fine_grained_shuffle_stream_count = 0);
uint64_t fine_grained_shuffle_stream_count = 0,
std::shared_ptr<AutoPassThroughSwitcher> switcher = nullptr);

} // namespace DB::mock
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/FineGrainedShuffle.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ static constexpr std::string_view enableFineGrainedShuffleExtraInfo = "enable fi

static constexpr size_t maxFineGrainedStreamCount = 1024;

inline bool enableFineGrainedShuffle(uint64_t stream_count)
inline bool fineGrainedShuffleEnabled(uint64_t stream_count)
{
return stream_count > 0;
}
Expand All @@ -40,7 +40,7 @@ struct FineGrainedShuffle
, batch_size(executor ? executor->fine_grained_shuffle_batch_size() : 0)
{}

bool enable() const { return enableFineGrainedShuffle(stream_count); }
bool enabled() const { return fineGrainedShuffleEnabled(stream_count); }

const UInt64 stream_count;
const UInt64 batch_size;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ ExchangeReceiverBase<RPCContext>::ExchangeReceiverBase(
: exc_log(Logger::get(req_id, executor_id))
, rpc_context(std::move(rpc_context_))
, source_num(source_num_)
, enable_fine_grained_shuffle_flag(enableFineGrainedShuffle(fine_grained_shuffle_stream_count_))
, enable_fine_grained_shuffle_flag(fineGrainedShuffleEnabled(fine_grained_shuffle_stream_count_))
, output_stream_count(
enable_fine_grained_shuffle_flag ? std::min(max_streams_, fine_grained_shuffle_stream_count_) : max_streams_)
, max_buffer_size(getMaxBufferSize(source_num, settings.recv_queue_size))
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ void Pipeline::addPlanNode(const PhysicalPlanNodePtr & plan_node)
{
assert(plan_node);
/// For fine grained mode, all plan node should enable fine grained shuffle.
if (!plan_node->getFineGrainedShuffle().enable())
if (!plan_node->getFineGrainedShuffle().enabled())
is_fine_grained_mode = false;
plan_nodes.push_back(plan_node);
}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Planner/PhysicalPlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ void PhysicalPlan::build(const tipb::Executor * executor)
{
RUNTIME_CHECK(executor);
RUNTIME_CHECK(executor->has_executor_id());

const auto & executor_id = executor->executor_id();
switch (executor->tp())
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Planner/PhysicalPlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ void PhysicalPlanNode::buildPipeline(

EventPtr PhysicalPlanNode::sinkComplete(PipelineExecutorContext & exec_context)
{
if (getFineGrainedShuffle().enable())
if (getFineGrainedShuffle().enabled())
return nullptr;
return doSinkComplete(exec_context);
}
Expand Down
13 changes: 13 additions & 0 deletions dbms/src/Flash/Planner/Planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,24 @@ Planner::Planner(Context & context_)
, log(Logger::get(dagContext().log ? dagContext().log->identifier() : ""))
{}

void recursiveSetBlockInputStreamParent(BlockInputStreamPtr self, const IBlockInputStream * parent)
{
if (self->getParent() != nullptr)
return;

for (auto & child : self->getChildren())
{
recursiveSetBlockInputStreamParent(child, self.get());
}
self->setParent(parent);
}

BlockInputStreamPtr Planner::execute()
{
DAGPipeline pipeline;
executeImpl(pipeline);
executeCreatingSets(pipeline, context, max_streams, log);
pipeline.transform([](auto & stream) { recursiveSetBlockInputStreamParent(stream, nullptr); });
return pipeline.firstStream();
}

Expand Down
Loading

0 comments on commit 1289b43

Please sign in to comment.