Skip to content

Commit

Permalink
[fix](arrow-flight-sql) Fix pipelineX Unknown result sink type (apach…
Browse files Browse the repository at this point in the history
…e#35804)

Fix meet error status: [INTERNAL_ERROR]Unknown result sink type

```
W20240422 14:52:07.509462 40713 status.h:380] meet error status: [INTERNAL_ERROR]Unknown result sink type

        0#  doris::pipeline::ResultSinkLocalState::open(doris::RuntimeState*) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:0
        1#  doris::pipeline::PipelineXTask::_open() at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:449
        2#  doris::pipeline::PipelineXTask::execute(bool*) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:449
        3#  doris::pipeline::TaskScheduler::_do_work(unsigned long) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:345
        4#  doris::ThreadPool::dispatch_thread() at /home/zcp/repo_center/doris_release/doris/be/src/util/threadpool.cpp:0
        5#  doris::Thread::supervise_thread(void*) at /var/local/ldb_toolchain/bin/../usr/include/pthread.h:562
        6#  start_thread
        7#  clone
```
  • Loading branch information
xinyiZzz committed Jul 9, 2024
1 parent 19eb9de commit c3eff02
Showing 1 changed file with 13 additions and 1 deletion.
14 changes: 13 additions & 1 deletion be/src/pipeline/exec/result_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
#include "runtime/buffer_control_block.h"
#include "runtime/exec_env.h"
#include "runtime/result_buffer_mgr.h"
#include "util/arrow/row_batch.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/sink/varrow_flight_result_writer.h"
#include "vec/sink/vmysql_result_writer.h"
#include "vec/sink/vresult_sink.h"

Expand Down Expand Up @@ -80,7 +82,7 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
}
// create writer based on sink type
switch (p._sink_type) {
case TResultSinkType::MYSQL_PROTOCAL:
case TResultSinkType::MYSQL_PROTOCAL: {
if (state->mysql_row_binary_format()) {
_writer.reset(new (std::nothrow) vectorized::VMysqlResultWriter<true>(
_sender.get(), _output_vexpr_ctxs, _profile));
Expand All @@ -89,6 +91,16 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
_sender.get(), _output_vexpr_ctxs, _profile));
}
break;
}
case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
std::shared_ptr<arrow::Schema> arrow_schema;
RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema));
state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(),
arrow_schema);
_writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter(
_sender.get(), _output_vexpr_ctxs, _profile, arrow_schema));
break;
}
default:
return Status::InternalError("Unknown result sink type");
}
Expand Down

0 comments on commit c3eff02

Please sign in to comment.