From a35de81113906f199cfbebe4ae48aad36513f0b9 Mon Sep 17 00:00:00 2001 From: HHoflittlefish777 <77738092+HHoflittlefish777@users.noreply.github.com> Date: Wed, 1 Nov 2023 19:48:09 +0800 Subject: [PATCH] [branch-2.0][fix](multi-table) fix unknown source slot descriptor when load multi table (#25762) (#26223) --- be/src/pipeline/pipeline_fragment_context.cpp | 14 ++- be/src/pipeline/pipeline_fragment_context.h | 2 + be/src/runtime/plan_fragment_executor.cpp | 14 +-- be/src/runtime/plan_fragment_executor.h | 2 + .../kafka/scripts/multi_table_csv1.csv | 3 + .../kafka/scripts/multi_table_json1.json | 3 + .../thirdparties/run-thirdparties-docker.sh | 4 +- .../routine_load/test_routine_load.out | 14 ++- .../routine_load/test_routine_load.groovy | 89 +++++++++++++++++++ 9 files changed, 131 insertions(+), 14 deletions(-) create mode 100644 docker/thirdparties/docker-compose/kafka/scripts/multi_table_csv1.csv create mode 100644 docker/thirdparties/docker-compose/kafka/scripts/multi_table_json1.json diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index cd529ee835d299..87578fbc6658a9 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -246,13 +246,19 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re fragment_context->set_is_report_success(request.query_options.is_report_success); } - auto* desc_tbl = _query_ctx->desc_tbl; - _runtime_state->set_desc_tbl(desc_tbl); + if (request.is_simplified_param) { + _desc_tbl = _query_ctx->desc_tbl; + } else { + DCHECK(request.__isset.desc_tbl); + RETURN_IF_ERROR( + DescriptorTbl::create(_runtime_state->obj_pool(), request.desc_tbl, &_desc_tbl)); + } + _runtime_state->set_desc_tbl(_desc_tbl); // 2. Create ExecNode to build pipeline with PipelineFragmentContext RETURN_IF_ERROR_OR_CATCH_EXCEPTION( ExecNode::create_tree(_runtime_state.get(), _runtime_state->obj_pool(), - request.fragment.plan, *desc_tbl, &_root_plan)); + request.fragment.plan, *_desc_tbl, &_root_plan)); // Set senders of exchange nodes before pipeline build std::vector exch_nodes; @@ -308,7 +314,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink( _runtime_state->obj_pool(), request.fragment.output_sink, request.fragment.output_exprs, request, idx, _root_plan->row_desc(), - _runtime_state.get(), &_sink, *desc_tbl)); + _runtime_state.get(), &_sink, *_desc_tbl)); } _root_pipeline = fragment_context->add_pipeline(); diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 287fac9e40a956..ed79800ec60f9c 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -200,6 +200,8 @@ class PipelineFragmentContext : public std::enable_shared_from_thisdesc_tbl; + if (request.is_simplified_param) { + _desc_tbl = query_ctx->desc_tbl; } else { DCHECK(request.__isset.desc_tbl); - RETURN_IF_ERROR(DescriptorTbl::create(obj_pool(), request.desc_tbl, &desc_tbl)); + RETURN_IF_ERROR( + DescriptorTbl::create(_runtime_state->obj_pool(), request.desc_tbl, &_desc_tbl)); } - _runtime_state->set_desc_tbl(desc_tbl); + _runtime_state->set_desc_tbl(_desc_tbl); // set up plan DCHECK(request.__isset.fragment); RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ExecNode::create_tree( - _runtime_state.get(), obj_pool(), request.fragment.plan, *desc_tbl, &_plan)); + _runtime_state.get(), obj_pool(), request.fragment.plan, *_desc_tbl, &_plan)); // set #senders of exchange nodes before calling Prepare() std::vector exch_nodes; @@ -212,7 +212,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request, if (request.fragment.__isset.output_sink) { RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink( obj_pool(), request.fragment.output_sink, request.fragment.output_exprs, params, - row_desc(), runtime_state(), &_sink, *desc_tbl)); + row_desc(), runtime_state(), &_sink, *_desc_tbl)); RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_sink->prepare(runtime_state())); RuntimeProfile* sink_profile = _sink->profile(); diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h index c95ddc75c17c03..f2a9f10d9a3a3f 100644 --- a/be/src/runtime/plan_fragment_executor.h +++ b/be/src/runtime/plan_fragment_executor.h @@ -208,6 +208,8 @@ class PlanFragmentExecutor { OpentelemetrySpan _span; + DescriptorTbl* _desc_tbl; + ObjectPool* obj_pool() { return _runtime_state->obj_pool(); } // typedef for TPlanFragmentExecParams.per_node_scan_ranges diff --git a/docker/thirdparties/docker-compose/kafka/scripts/multi_table_csv1.csv b/docker/thirdparties/docker-compose/kafka/scripts/multi_table_csv1.csv new file mode 100644 index 00000000000000..1df0d787733b07 --- /dev/null +++ b/docker/thirdparties/docker-compose/kafka/scripts/multi_table_csv1.csv @@ -0,0 +1,3 @@ +routine_load_dup_tbl_basic|49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01 00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true, "name": "Alice", "grade": 9, "subjects": ["math", "science", "history"]}|true|1|2|3|4|5|6.0|7.0|888888888|999999999|2023-08-24|2023-08-24 12:00:00|2023-08-24|2023-08-24 12:00:00|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|{} +routine_load_uniq_tbl_basic|49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01 00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true, "name": "Alice", "grade": 9, "subjects": ["math", "science", "history"]}|true|1|2|3|4|5|6.0|7.0|888888888|999999999|2023-08-24|2023-08-24 12:00:00|2023-08-24|2023-08-24 12:00:00|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|{} +routine_load_mow_tbl_basic|49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01 00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true, "name": "Alice", "grade": 9, "subjects": ["math", "science", "history"]}|true|1|2|3|4|5|6.0|7.0|888888888|999999999|2023-08-24|2023-08-24 12:00:00|2023-08-24|2023-08-24 12:00:00|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|{} \ No newline at end of file diff --git a/docker/thirdparties/docker-compose/kafka/scripts/multi_table_json1.json b/docker/thirdparties/docker-compose/kafka/scripts/multi_table_json1.json new file mode 100644 index 00000000000000..0099b0b5432501 --- /dev/null +++ b/docker/thirdparties/docker-compose/kafka/scripts/multi_table_json1.json @@ -0,0 +1,3 @@ +routine_load_dup_tbl_basic|{"k00": "8", "k01": "2023-08-14", "k02": "1", "k03": "109", "k04": "-31573", "k05": "-1362465190", "k06": "3990845741226497177", "k07": "2732763251146840270", "k08": "-25698.553", "k09": "1312831962.5678179", "k10": "\\N", "k11": "\\N", "k12": "2023-03-07 14:13:19", "k13": "2022-10-18", "k14": "2023-07-16 05:03:13", "k15": "D", "k16": "", "k17": "PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme", "k18": "\\N"} +routine_load_uniq_tbl_basic|{"k00": "8", "k01": "2023-08-14", "k02": "1", "k03": "109", "k04": "-31573", "k05": "-1362465190", "k06": "3990845741226497177", "k07": "2732763251146840270", "k08": "-25698.553", "k09": "1312831962.5678179", "k10": "\\N", "k11": "\\N", "k12": "2023-03-07 14:13:19", "k13": "2022-10-18", "k14": "2023-07-16 05:03:13", "k15": "D", "k16": "", "k17": "PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme", "k18": "\\N"} +routine_load_mow_tbl_basic|{"k00": "8", "k01": "2023-08-14", "k02": "1", "k03": "109", "k04": "-31573", "k05": "-1362465190", "k06": "3990845741226497177", "k07": "2732763251146840270", "k08": "-25698.553", "k09": "1312831962.5678179", "k10": "\\N", "k11": "\\N", "k12": "2023-03-07 14:13:19", "k13": "2022-10-18", "k14": "2023-07-16 05:03:13", "k15": "D", "k16": "", "k17": "PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme", "k18": "\\N"} \ No newline at end of file diff --git a/docker/thirdparties/run-thirdparties-docker.sh b/docker/thirdparties/run-thirdparties-docker.sh index 241ac6acc7ba83..e94ddbca9f83c9 100755 --- a/docker/thirdparties/run-thirdparties-docker.sh +++ b/docker/thirdparties/run-thirdparties-docker.sh @@ -257,7 +257,7 @@ if [[ "${RUN_KAFKA}" -eq 1 ]]; then local container_id="$1" local ip_host="$2" - declare -a topics=("basic_data" "basic_array_data" "basic_data_with_errors" "basic_array_data_with_errors" "basic_data_timezone" "basic_array_data_timezone" "multi_table_csv") + declare -a topics=("basic_data" "basic_array_data" "basic_data_with_errors" "basic_array_data_with_errors" "basic_data_timezone" "basic_array_data_timezone" "multi_table_csv" "multi_table_csv1") for topic in "${topics[@]}"; do while IFS= read -r line; do @@ -265,7 +265,7 @@ if [[ "${RUN_KAFKA}" -eq 1 ]]; then done < "${ROOT}/docker-compose/kafka/scripts/${topic}.csv" done - declare -a json_topics=("basic_data_json" "basic_array_data_json" "basic_array_data_json_by_line" "basic_data_json_by_line" "multi_table_json") + declare -a json_topics=("basic_data_json" "basic_array_data_json" "basic_array_data_json_by_line" "basic_data_json_by_line" "multi_table_json" "multi_table_json1") for json_topic in "${json_topics[@]}"; do echo ${json_topics} diff --git a/regression-test/data/load_p0/routine_load/test_routine_load.out b/regression-test/data/load_p0/routine_load/test_routine_load.out index 161af660b4741d..4288223ca02476 100644 --- a/regression-test/data/load_p0/routine_load/test_routine_load.out +++ b/regression-test/data/load_p0/routine_load/test_routine_load.out @@ -986,4 +986,16 @@ 49 2023-08-08 false \N 16275 -2144851675 -2303421957908954634 -46526938720058765 -13141.143 -6.866322332302E8 99999999.9 -99999999.9 2022-09-01T00:16:01 2023-03-25 2022-09-07T14:59:03 s yvuILR2iNxfe8RRml {"student":true,"name":"Alice","grade":9,"subjects":["math","science","history"]} -- !sql_multi_table_one_data -- -8 2023-08-14 true 109 -31573 -1362465190 3990845741226497177 2732763251146840270 -25698.553 1.312831962567818E9 \N \N 2023-03-07T14:13:19 2022-10-18 2023-07-16T05:03:13 D PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme \N \ No newline at end of file +8 2023-08-14 true 109 -31573 -1362465190 3990845741226497177 2732763251146840270 -25698.553 1.312831962567818E9 \N \N 2023-03-07T14:13:19 2022-10-18 2023-07-16T05:03:13 D PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme \N + +-- !sql_multi_table -- +49 2023-08-08 false \N 16275 -2144851675 -2303421957908954634 -46526938720058765 -13141.143 -6.866322332302E8 99999999.9 -99999999.9 2022-09-01T00:16:01 2023-03-25 2022-09-07T14:59:03 s yvuILR2iNxfe8RRml {"student":true,"name":"Alice","grade":9,"subjects":["math","science","history"]} true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 {} + +-- !sql_multi_table -- +49 2023-08-08 false \N 16275 -2144851675 -2303421957908954634 -46526938720058765 -13141.143 -6.866322332302E8 99999999.9 -99999999.9 2022-09-01T00:16:01 2023-03-25 2022-09-07T14:59:03 s yvuILR2iNxfe8RRml {"student":true,"name":"Alice","grade":9,"subjects":["math","science","history"]} true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 {} + +-- !sql_multi_table -- +8 2023-08-14 true 109 -31573 -1362465190 3990845741226497177 2732763251146840270 -25698.553 1.312831962567818E9 \N \N 2023-03-07T14:13:19 2022-10-18 2023-07-16T05:03:13 D PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme \N true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N + +-- !sql_multi_table -- +8 2023-08-14 true 109 -31573 -1362465190 3990845741226497177 2732763251146840270 -25698.553 1.312831962567818E9 \N \N 2023-03-07T14:13:19 2022-10-18 2023-07-16T05:03:13 D PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme \N true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N \ No newline at end of file diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load.groovy index 4b659df4eff984..d9560e312c56d0 100644 --- a/regression-test/suites/load_p0/routine_load/test_routine_load.groovy +++ b/regression-test/suites/load_p0/routine_load/test_routine_load.groovy @@ -31,6 +31,11 @@ suite("test_routine_load","p0") { "dup_tbl_basic_multi_table", ] + def multiTables1 = [ + "dup_tbl_basic", + "uniq_tbl_basic", + ] + def jobs = [ "dup_tbl_basic_job", "uniq_tbl_basic_job", @@ -127,6 +132,11 @@ suite("test_routine_load","p0") { "multi_table_json", ] + def multiTableJobName1 = [ + "multi_table_csv1", + "multi_table_json1", + ] + def formats = [ "csv", "json", @@ -980,4 +990,83 @@ suite("test_routine_load","p0") { j++ } } + + if (enabled != null && enabled.equalsIgnoreCase("true")) { + def j = 0 + for (String jobName in multiTableJobName1) { + try { + for (String tableName in multiTables1) { + sql new File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text + sql new File("""${context.file.parent}/ddl/${tableName}_create.sql""").text + } + + sql """ + CREATE ROUTINE LOAD ${jobName} + COLUMNS TERMINATED BY "|" + PROPERTIES + ( + "max_batch_interval" = "5", + "format" = "${formats[j]}", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${externalEnvIp}:${kafka_port}", + "kafka_topic" = "${jobName}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + sql "sync" + + i = 0 + for (String tableName in multiTables1) { + while (true) { + sleep(1000) + def res = sql "show routine load for ${jobName}" + def state = res[0][8].toString() + if (state == "NEED_SCHEDULE") { + continue; + } + assertEquals(res[0][8].toString(), "RUNNING") + break; + } + + def count = 0 + def tableName1 = "routine_load_" + tableName + while (true) { + def res = sql "select count(*) from ${tableName1}" + def state = sql "show routine load for ${jobName}" + log.info("routine load state: ${state[0][8].toString()}".toString()) + log.info("routine load statistic: ${state[0][14].toString()}".toString()) + log.info("reason of state changed: ${state[0][17].toString()}".toString()) + if (res[0][0] > 0) { + break + } + if (count >= 120) { + log.error("routine load can not visible for long time") + assertEquals(20, res[0][0]) + break + } + sleep(5000) + count++ + } + + if (i <= 3) { + qt_sql_multi_table "select * from ${tableName1} order by k00,k01" + } else { + qt_sql_multi_table "select * from ${tableName1} order by k00" + } + + i++ + } + } finally { + sql "stop routine load for ${jobName}" + for (String tableName in multiTables1) { + sql new File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text + } + } + j++ + } + } }