Skip to content

Commit

Permalink
[refactor](profile&names) using dst_id in pipelinex profile to be sam…
Browse files Browse the repository at this point in the history
…e as non pipeline; rename some function names (#28626)

Co-authored-by: yiguolei <yiguolei@gmail.com>
  • Loading branch information
yiguolei and Doris-Extras authored Dec 19, 2023
1 parent b99ac97 commit c72191e
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 7 deletions.
7 changes: 4 additions & 3 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) {

_sink_buffer->set_query_statistics(_sink->query_statistics());
RETURN_IF_ERROR(DataSinkOperator::prepare(state));
_sink->registe_channels(_sink_buffer.get());
_sink->register_pipeline_channels(_sink_buffer.get());
return Status::OK();
}

Expand Down Expand Up @@ -249,7 +249,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
std::string ExchangeSinkLocalState::name_suffix() {
std::string name = " (id=" + std::to_string(_parent->node_id());
auto& p = _parent->cast<ExchangeSinkOperatorX>();
name += ",dest_id=" + std::to_string(p._dest_node_id);
name += ",dst_id=" + std::to_string(p._dest_node_id);
name += ")";
return name;
}
Expand Down Expand Up @@ -450,7 +450,8 @@ Status ExchangeSinkOperatorX::serialize_block(ExchangeSinkLocalState& state, vec
void ExchangeSinkLocalState::register_channels(
pipeline::ExchangeSinkBuffer<ExchangeSinkLocalState>* buffer) {
for (auto channel : channels) {
((vectorized::PipChannel<ExchangeSinkLocalState>*)channel)->registe(buffer);
((vectorized::PipChannel<ExchangeSinkLocalState>*)channel)
->register_exchange_buffer(buffer);
}
}

Expand Down
5 changes: 3 additions & 2 deletions be/src/vec/sink/vdata_stream_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -791,9 +791,10 @@ Status VDataStreamSender::_get_next_available_buffer(BroadcastPBlockHolder** hol
return Status::OK();
}

void VDataStreamSender::registe_channels(pipeline::ExchangeSinkBuffer<VDataStreamSender>* buffer) {
void VDataStreamSender::register_pipeline_channels(
pipeline::ExchangeSinkBuffer<VDataStreamSender>* buffer) {
for (auto channel : _channels) {
((PipChannel<VDataStreamSender>*)channel)->registe(buffer);
((PipChannel<VDataStreamSender>*)channel)->register_exchange_buffer(buffer);
}
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/sink/vdata_stream_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class VDataStreamSender : public DataSink {

RuntimeState* state() { return _state; }

void registe_channels(pipeline::ExchangeSinkBuffer<VDataStreamSender>* buffer);
void register_pipeline_channels(pipeline::ExchangeSinkBuffer<VDataStreamSender>* buffer);

bool channel_all_can_write();

Expand Down Expand Up @@ -530,7 +530,7 @@ class PipChannel final : public Channel<Parent> {
return Status::OK();
}

void registe(pipeline::ExchangeSinkBuffer<Parent>* buffer) {
void register_exchange_buffer(pipeline::ExchangeSinkBuffer<Parent>* buffer) {
_buffer = buffer;
_buffer->register_sink(Channel<Parent>::_fragment_instance_id);
}
Expand Down

0 comments on commit c72191e

Please sign in to comment.