Skip to content

Commit

Permalink
[feature](pipelineX) control exchange sink by memory usage (apache#28814)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange authored Dec 25, 2023
1 parent d42fd68 commit e326ebb
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 7 deletions.
7 changes: 6 additions & 1 deletion be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,17 +199,22 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
_wait_channel_timer.resize(local_size);
auto deps_for_channels = AndDependency::create_shared(
_parent->operator_id(), _parent->node_id(), state->get_query_ctx());
for (auto channel : channels) {
auto deps_for_channels_mem_limit = AndDependency::create_shared(
_parent->operator_id(), _parent->node_id(), state->get_query_ctx());
for (auto* channel : channels) {
if (channel->is_local()) {
_local_channels_dependency[dep_id] = channel->get_local_channel_dependency();
DCHECK(_local_channels_dependency[dep_id] != nullptr);
deps_for_channels->add_child(_local_channels_dependency[dep_id]);
_wait_channel_timer[dep_id] = ADD_CHILD_TIMER(
_profile, fmt::format("WaitForLocalExchangeBuffer{}", dep_id), timer_name);
auto local_recvr = channel->local_recvr();
deps_for_channels_mem_limit->add_child(local_recvr->get_mem_limit_dependency());
dep_id++;
}
}
_exchange_sink_dependency->add_child(deps_for_channels);
_exchange_sink_dependency->add_child(deps_for_channels_mem_limit);
}
if (p._part_type == TPartitionType::HASH_PARTITIONED) {
_partition_count = channels.size();
Expand Down
7 changes: 6 additions & 1 deletion be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,14 @@ class LocalExchangeChannelDependency final : public Dependency {
LocalExchangeChannelDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "LocalExchangeChannelDependency", true, query_ctx) {}
~LocalExchangeChannelDependency() override = default;
// TODO(gabriel): blocked by memory
};

// Dependency used to throttle an exchange sink that writes to a local receiver.
// VDataStreamRecvr::update_blocks_memory_usage() calls block() on it when the
// receiver's tracked block bytes exceed config::exchg_node_buffer_size_bytes and
// set_ready() otherwise; ExchangeSinkLocalState::init() adds one per local channel
// as a child of the sink's dependency.
class LocalExchangeMemLimitDependency final : public Dependency {
ENABLE_FACTORY_CREATOR(LocalExchangeMemLimitDependency);
// id, node_id and query_ctx are forwarded unchanged to the Dependency base,
// together with the fixed name "LocalExchangeMemLimitDependency".
// NOTE(review): the meaning of the `true` argument is defined by Dependency's
// constructor, which is not visible here — confirm.
LocalExchangeMemLimitDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "LocalExchangeMemLimitDependency", true, query_ctx) {}
~LocalExchangeMemLimitDependency() override = default;
};
class ExchangeSinkLocalState final : public PipelineXSinkLocalState<AndDependency> {
ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState);
using Base = PipelineXSinkLocalState<AndDependency>;
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/exchange_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
state, p.input_row_desc(), state->fragment_instance_id(), p.node_id(), p.num_senders(),
profile(), p.is_merging(), p.sub_plan_query_statistics_recvr());
stream_recvr->create_mem_limit_dependency(p.operator_id(), p.node_id(), state->get_query_ctx());
auto* source_dependency = _dependency;
const auto& queues = stream_recvr->sender_queues();
deps.resize(queues.size());
Expand Down
16 changes: 12 additions & 4 deletions be/src/vec/runtime/vdata_stream_recvr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,7 @@ VDataStreamRecvr::VDataStreamRecvr(
_profile(profile),
_peak_memory_usage_counter(nullptr),
_sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr),
_enable_pipeline(state->enable_pipeline_exec()),
_mem_available(std::make_shared<bool>(true)) {
_enable_pipeline(state->enable_pipeline_exec()) {
// DataStreamRecvr may be destructed after the instance execution thread ends.
_mem_tracker =
std::make_unique<MemTracker>("VDataStreamRecvr:" + print_id(_fragment_instance_id));
Expand Down Expand Up @@ -506,12 +505,21 @@ void VDataStreamRecvr::update_blocks_memory_usage(int64_t size) {
_blocks_memory_usage->add(size);
auto val = _blocks_memory_usage_current_value.fetch_add(size);
if (val + size > config::exchg_node_buffer_size_bytes) {
*_mem_available = false;
if (_exchange_sink_mem_limit_dependency) {
_exchange_sink_mem_limit_dependency->block();
}
} else {
*_mem_available = true;
if (_exchange_sink_mem_limit_dependency) {
_exchange_sink_mem_limit_dependency->set_ready();
}
}
}

void VDataStreamRecvr::create_mem_limit_dependency(int id, int node_id, QueryContext* query_ctx) {
_exchange_sink_mem_limit_dependency =
pipeline::LocalExchangeMemLimitDependency::create_shared(id, node_id, query_ctx);
}

void VDataStreamRecvr::close() {
if (_is_closed) {
return;
Expand Down
9 changes: 8 additions & 1 deletion be/src/vec/runtime/vdata_stream_recvr.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "common/object_pool.h"
#include "common/status.h"
#include "runtime/descriptors.h"
#include "runtime/query_context.h"
#include "runtime/query_statistics.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
Expand All @@ -61,6 +62,7 @@ class RuntimeState;
namespace pipeline {
struct ExchangeDataDependency;
class LocalExchangeChannelDependency;
class LocalExchangeMemLimitDependency;
class ExchangeLocalState;
} // namespace pipeline

Expand Down Expand Up @@ -130,6 +132,10 @@ class VDataStreamRecvr {
std::shared_ptr<pipeline::LocalExchangeChannelDependency> get_local_channel_dependency(
int sender_id);

// Creates the LocalExchangeMemLimitDependency for this receiver and stores it in
// _exchange_sink_mem_limit_dependency. The arguments are forwarded to the
// dependency's factory.
void create_mem_limit_dependency(int id, int node_id, QueryContext* query_ctx);

// Returns (by value) the currently stored _exchange_sink_mem_limit_dependency.
// NOTE(review): presumably empty until create_mem_limit_dependency() has been
// called — update_blocks_memory_usage() null-checks the member; callers should
// confirm before dereferencing.
auto get_mem_limit_dependency() { return _exchange_sink_mem_limit_dependency; }

private:
void update_blocks_memory_usage(int64_t size);
class PipSenderQueue;
Expand Down Expand Up @@ -189,7 +195,8 @@ class VDataStreamRecvr {
std::vector<std::shared_ptr<pipeline::LocalExchangeChannelDependency>>
_sender_to_local_channel_dependency;

std::shared_ptr<bool> _mem_available;
// Used to limit sink writes; blocked / set ready in update_blocks_memory_usage().
std::shared_ptr<pipeline::LocalExchangeMemLimitDependency> _exchange_sink_mem_limit_dependency;
};

class ThreadClosure : public google::protobuf::Closure {
Expand Down
5 changes: 5 additions & 0 deletions be/src/vec/sink/vdata_stream_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,11 @@ class Channel {

void set_receiver_eof(Status st) { _receiver_status = st; }

// Returns this channel's _local_recvr.
// Intended for local channels only: is_local() is asserted via DCHECK before
// the member is returned.
// NOTE(review): the return type is deduced by value, so this presumably hands
// back a copy of the receiver handle — confirm against _local_recvr's declaration.
auto local_recvr() {
DCHECK(is_local());
return _local_recvr;
}

protected:
bool _recvr_is_valid() {
if (_local_recvr && !_local_recvr->is_closed()) {
Expand Down

0 comments on commit e326ebb

Please sign in to comment.