From 26a26b35a2819912a4e16bd3ff23829cc9c44217 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Mon, 3 Jun 2024 15:11:12 +0800 Subject: [PATCH 1/2] 1 --- be/src/pipeline/exec/result_sink_operator.cpp | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index 7b355bc77b1b16..38d0d5c4917b13 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -25,8 +25,10 @@ #include "runtime/buffer_control_block.h" #include "runtime/exec_env.h" #include "runtime/result_buffer_mgr.h" +#include "util/arrow/row_batch.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" +#include "vec/sink/varrow_flight_result_writer.h" #include "vec/sink/vmysql_result_writer.h" namespace doris::pipeline { @@ -61,7 +63,7 @@ Status ResultSinkLocalState::open(RuntimeState* state) { } // create writer based on sink type switch (p._sink_type) { - case TResultSinkType::MYSQL_PROTOCAL: + case TResultSinkType::MYSQL_PROTOCAL: { if (state->mysql_row_binary_format()) { _writer.reset(new (std::nothrow) vectorized::VMysqlResultWriter( _sender.get(), _output_vexpr_ctxs, _profile)); @@ -70,6 +72,16 @@ Status ResultSinkLocalState::open(RuntimeState* state) { _sender.get(), _output_vexpr_ctxs, _profile)); } break; + } + case TResultSinkType::ARROW_FLIGHT_PROTOCAL: { + std::shared_ptr arrow_schema; + RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema)); + state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(), + arrow_schema); + _writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter(_sender.get(), _output_vexpr_ctxs, + _profile, arrow_schema)); + break; + } default: return Status::InternalError("Unknown result sink type"); } From 1193ae28a1d0874e47417527bace6f5533e7bc71 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Mon, 3 Jun 2024 16:51:52 +0800 Subject: [PATCH 2/2] [fix](arrow-flight-sql) Fix pipelineX Unknown result sink type --- be/src/pipeline/exec/result_sink_operator.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index 38d0d5c4917b13..09a4f2d94b1cbf 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -78,8 +78,8 @@ Status ResultSinkLocalState::open(RuntimeState* state) { RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema)); state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(), arrow_schema); - _writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter(_sender.get(), _output_vexpr_ctxs, - _profile, arrow_schema)); + _writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter( + _sender.get(), _output_vexpr_ctxs, _profile, arrow_schema)); break; } default: