From 10e2ede44f23115119a8dcadd1ff54174223c506 Mon Sep 17 00:00:00 2001
From: baojinri
Date: Thu, 28 Dec 2023 21:25:20 +0800
Subject: [PATCH] add is_analyze field

---
 Cargo.lock                                     | 32 +++++++++----------
 Cargo.toml                                     |  2 +-
 .../src/dist_sql_query/mod.rs                  |  8 ++++-
 .../src/dist_sql_query/physical_plan.rs        |  3 +-
 .../env/cluster/ddl/partition_table.result     |  4 +--
 .../src/datafusion_impl/task_context.rs        |  1 +
 server/src/grpc/remote_engine_service/mod.rs   |  7 ++--
 table_engine/src/remote/model.rs               |  4 +++
 8 files changed, 37 insertions(+), 24 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index e32493fe4d..143bd471f5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -96,7 +96,7 @@ dependencies = [
  "atomic_enum",
  "base64 0.13.1",
  "bytes_ext",
- "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)",
  "codec",
  "common_types",
  "datafusion",
@@ -1345,7 +1345,7 @@ dependencies = [
 [[package]]
 name = "ceresdbproto"
 version = "1.0.23"
-source = "git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4#d849fa44e29ea04c7d99c082a38efb8ce5200d5e"
+source = "git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886#986ff070e831d5ad5faeae3062ab79985d417886"
 dependencies = [
  "prost",
  "protoc-bin-vendored",
@@ -1528,7 +1528,7 @@ dependencies = [
  "async-trait",
  "bytes_ext",
  "catalog",
- "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)",
  "common_types",
  "etcd-client",
  "future_ext",
@@ -1606,7 +1606,7 @@ dependencies = [
  "arrow 43.0.0",
  "arrow_ext",
  "bytes_ext",
- "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)",
  "chrono",
  "datafusion",
  "hash_ext",
@@ -2362,7 +2362,7 @@ dependencies = [
  "async-recursion",
  "async-trait",
  "catalog",
- "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)",
  "common_types",
  "datafusion",
  "datafusion-proto",
@@ -3916,7 +3916,7 @@ name = "meta_client"
 version = "1.2.6-alpha"
 dependencies = [
  "async-trait",
- "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)",
  "common_types",
  "futures 0.3.28",
  "generic_error",
@@ -4441,7 +4441,7 @@ version = "1.2.6-alpha"
 dependencies = [
  "async-trait",
  "bytes",
- "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)",
  "chrono",
  "clru",
  "crc",
@@ -5318,7 +5318,7 @@ dependencies = [
  "async-trait",
  "bytes",
  "catalog",
- "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)",
  "clru",
  "cluster",
  "common_types",
@@ -5445,7 +5445,7 @@ dependencies = [
  "arrow 43.0.0",
  "async-trait",
  "catalog",
- "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)",
  "cluster",
  "codec",
  "common_types",
@@ -5756,7 +5756,7 @@ version = "1.2.6-alpha"
 dependencies = [
  "arrow_ext",
  "async-trait",
- "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)",
  "common_types",
  "futures 0.3.28",
  "generic_error",
@@ -5885,7 +5885,7 @@ name = "router"
 version = "1.2.6-alpha"
 dependencies = [
  "async-trait",
- "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)",
  "cluster",
  "common_types",
  "generic_error",
@@ -6260,7 +6260,7 @@ dependencies = [
  "async-trait",
  "bytes_ext",
  "catalog",
- "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)",
  "clru",
  "cluster",
  "common_types",
@@ -6786,7 +6786,7 @@ dependencies = [
  "async-trait",
  "bytes_ext",
  "catalog",
- "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)",
  "codec",
  "common_types",
  "futures 0.3.28",
@@ -6808,7 +6808,7 @@ dependencies = [
  "arrow_ext",
  "async-trait",
  "bytes_ext",
- "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)",
  "common_types",
  "datafusion",
  "datafusion-proto",
@@ -7011,7 +7011,7 @@ dependencies = [
 name = "time_ext"
 version = "1.2.6-alpha"
 dependencies = [
- "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)",
  "chrono",
  "common_types",
  "macros",
@@ -7663,7 +7663,7 @@ version = "1.2.6-alpha"
 dependencies = [
  "async-trait",
  "bytes_ext",
- "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)",
  "chrono",
  "codec",
  "common_types",
diff --git a/Cargo.toml b/Cargo.toml
index a01a722260..40da0b6b10 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -94,7 +94,7 @@ bytes = "1"
 bytes_ext = { path = "components/bytes_ext" }
 catalog = { path = "catalog" }
 catalog_impls = { path = "catalog_impls" }
-ceresdbproto = { git = "https://github.com/CeresDB/horaedbproto.git", rev = "d849fa4" }
+ceresdbproto = { git = "https://github.com/baojinri/ceresdbproto.git", rev = "986ff070e831d5ad5faeae3062ab79985d417886" }
 codec = { path = "components/codec" }
 chrono = "0.4"
 clap = "3.0"
diff --git a/df_engine_extensions/src/dist_sql_query/mod.rs b/df_engine_extensions/src/dist_sql_query/mod.rs
index 4bbf6b36ef..630e346600 100644
--- a/df_engine_extensions/src/dist_sql_query/mod.rs
+++ b/df_engine_extensions/src/dist_sql_query/mod.rs
@@ -64,13 +64,19 @@ type ExecutableScanBuilderRef = Box<dyn ExecutableScanBuilder>;
 pub struct RemoteTaskContext {
     pub task_ctx: Arc<TaskContext>,
     pub remote_metrics: Arc<Mutex<Option<String>>>,
+    pub is_analyze: bool,
 }

 impl RemoteTaskContext {
-    pub fn new(task_ctx: Arc<TaskContext>, remote_metrics: Arc<Mutex<Option<String>>>) -> Self {
+    pub fn new(
+        task_ctx: Arc<TaskContext>,
+        remote_metrics: Arc<Mutex<Option<String>>>,
+        is_analyze: bool,
+    ) -> Self {
         Self {
             task_ctx,
             remote_metrics,
+            is_analyze,
         }
     }
 }
diff --git a/df_engine_extensions/src/dist_sql_query/physical_plan.rs b/df_engine_extensions/src/dist_sql_query/physical_plan.rs
index 0dbaf415ac..2855a2c16c 100644
--- a/df_engine_extensions/src/dist_sql_query/physical_plan.rs
+++ b/df_engine_extensions/src/dist_sql_query/physical_plan.rs
@@ -346,7 +346,8 @@ impl ExecutionPlan for ResolvedPartitionedScan {
             remote_metrics,
         } = &self.remote_exec_ctx.plan_ctxs[partition];

-        let remote_task_ctx = RemoteTaskContext::new(context, remote_metrics.clone());
+        let remote_task_ctx =
+            RemoteTaskContext::new(context, remote_metrics.clone(), self.is_analyze);

         // Send plan for remote execution.
         let stream_future = self.remote_exec_ctx.executor.execute(
diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.result b/integration_tests/cases/env/cluster/ddl/partition_table.result
index 18e023c006..9a937acd26 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.result
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.result
@@ -83,7 +83,7 @@ UInt64(4860320137932382618),Timestamp(1651737067000),String("ceresdb9"),Int32(0)
 EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0";

 plan_type,plan,
-String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:1, metrics=[\npartition_table_t:\n __partition_table_t_1:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n\n__partition_table_t_1:\nCoalescePartitionsExec, metrics=[output_rows=1, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_1, parallelism=8, metrics=[\nPredicate { exprs:[name = Utf8(\"ceresdb0\")], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=1\n scan_memtable_1:\n=0]\n=0]\n"),
+String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:1, metrics=[\npartition_table_t:\n __partition_table_t_1:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n\n__partition_table_t_1:\nCoalescePartitionsExec, metrics=[output_rows=1, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_1, parallelism=8, metrics=[\nPredicate { exprs:[name = Utf8(\"ceresdb0\")], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=1\n scan_memtable_1, fetched_columns:[tsid,t,name,id,value]:\n=0]\n=0]\n"),


 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
@@ -92,7 +92,7 @@
 EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0", "ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4");

 plan_type,plan,
-String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:3, metrics=[\npartition_table_t:\n __partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n __partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n __partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n\n__partition_table_t_x:\nCoalescePartitionsExec, metrics=[output_rows=1, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_x, parallelism=8, metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb0\"), Utf8(\"ceresdb1\"), Utf8(\"ceresdb2\"), Utf8(\"ceresdb3\"), Utf8(\"ceresdb4\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=1\n scan_memtable_1:\n=0]\n\n__partition_table_t_x:\nCoalescePartitionsExec, metrics=[output_rows=2, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_x, parallelism=8, metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb0\"), Utf8(\"ceresdb1\"), Utf8(\"ceresdb2\"), Utf8(\"ceresdb3\"), Utf8(\"ceresdb4\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=2\n scan_memtable_1:\n=0]\n\n__partition_table_t_x:\nCoalescePartitionsExec, metrics=[output_rows=2, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_x, parallelism=8, metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb0\"), Utf8(\"ceresdb1\"), Utf8(\"ceresdb2\"), Utf8(\"ceresdb3\"), Utf8(\"ceresdb4\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=2\n scan_memtable_1:\n=0]\n=0]\n"),
+String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:3, metrics=[\npartition_table_t:\n __partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n __partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n __partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n\n__partition_table_t_x:\nCoalescePartitionsExec, metrics=[output_rows=1, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_x, parallelism=8, metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb0\"), Utf8(\"ceresdb1\"), Utf8(\"ceresdb2\"), Utf8(\"ceresdb3\"), Utf8(\"ceresdb4\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=1\n scan_memtable_1, fetched_columns:[tsid,t,name,id,value]:\n=0]\n\n__partition_table_t_x:\nCoalescePartitionsExec, metrics=[output_rows=2, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_x, parallelism=8, metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb0\"), Utf8(\"ceresdb1\"), Utf8(\"ceresdb2\"), Utf8(\"ceresdb3\"), Utf8(\"ceresdb4\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=2\n scan_memtable_1, fetched_columns:[tsid,t,name,id,value]:\n=0]\n\n__partition_table_t_x:\nCoalescePartitionsExec, metrics=[output_rows=2, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_x, parallelism=8, metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb0\"), Utf8(\"ceresdb1\"), Utf8(\"ceresdb2\"), Utf8(\"ceresdb3\"), Utf8(\"ceresdb4\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=2\n scan_memtable_1, fetched_columns:[tsid,t,name,id,value]:\n=0]\n=0]\n"),


 ALTER TABLE partition_table_t ADD COLUMN (b string);
diff --git a/query_engine/src/datafusion_impl/task_context.rs b/query_engine/src/datafusion_impl/task_context.rs
index f19c5dde35..6831b8ab50 100644
--- a/query_engine/src/datafusion_impl/task_context.rs
+++ b/query_engine/src/datafusion_impl/task_context.rs
@@ -199,6 +199,7 @@ impl RemotePhysicalPlanExecutor for RemotePhysicalPlanExecutorImpl {
             default_catalog,
             default_schema,
             query: display_plan.indent(true).to_string(),
+            is_analyze: task_context.is_analyze,
         };

         // Encode plan and schema
diff --git a/server/src/grpc/remote_engine_service/mod.rs b/server/src/grpc/remote_engine_service/mod.rs
index 1135930014..fd353f6302 100644
--- a/server/src/grpc/remote_engine_service/mod.rs
+++ b/server/src/grpc/remote_engine_service/mod.rs
@@ -691,6 +691,7 @@ impl RemoteEngineServiceImpl {
         let physical_plan = Arc::new(DataFusionPhysicalPlanAdapter::new(TypedPlan::Remote(
             encoded_plan,
         )));
+        // TODO: Use in handle_execute_plan fn to build stream with metrics
         let physical_plan_clone = physical_plan.clone();

         let stream = self
@@ -713,7 +714,7 @@ impl RemoteEngineServiceImpl {
         let stream = StreamWithMetric::new(Box::pin(stream), metric);
         Ok(RemoteExecStream::new(
             Box::pin(stream),
-            Some(physical_plan_clone),
+            ctx.is_analyze.then_some(physical_plan_clone),
         ))
     }

@@ -749,7 +750,7 @@ impl RemoteEngineServiceImpl {
         let physical_plan = Arc::new(DataFusionPhysicalPlanAdapter::new(TypedPlan::Remote(
             encoded_plan,
         )));
-
+        // TODO: Use in handle_execute_plan fn to build stream with metrics
         let physical_plan_clone = physical_plan.clone();

         let QueryDedup {
@@ -785,7 +786,7 @@ impl RemoteEngineServiceImpl {
         let stream = StreamWithMetric::new(Box::pin(ReceiverStream::new(rx)), metric);
         Ok(RemoteExecStream::new(
             Box::pin(stream),
-            Some(physical_plan_clone),
+            ctx.is_analyze.then_some(physical_plan_clone),
         ))
     }

diff --git a/table_engine/src/remote/model.rs b/table_engine/src/remote/model.rs
index bd99670375..380787ed2c 100644
--- a/table_engine/src/remote/model.rs
+++ b/table_engine/src/remote/model.rs
@@ -454,6 +454,7 @@ pub struct ExecContext {
     pub default_catalog: String,
     pub default_schema: String,
     pub query: String,
+    pub is_analyze: bool,
 }

 pub enum PhysicalPlan {
@@ -476,6 +477,7 @@ impl From<RemoteExecuteRequest> for ceresdbproto::remote_engine::ExecutePlanRequ
             timeout_ms: rest_duration_ms,
             priority: 0, // not used now
             displayable_query: value.context.query,
+            is_analyze: value.context.is_analyze,
         };

         let pb_plan = match value.physical_plan {
@@ -516,6 +518,7 @@ impl TryFrom<ceresdbproto::remote_engine::ExecutePlanRequest> for RemoteExecuteR
             default_schema,
             timeout_ms,
             displayable_query,
+            is_analyze,
             ..
         } = pb_exec_ctx;

@@ -532,6 +535,7 @@ impl TryFrom<ceresdbproto::remote_engine::ExecutePlanRequest> for RemoteExecuteR
             default_catalog,
             default_schema,
             query: displayable_query,
+            is_analyze,
         };

         // Plan
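
Note on the change: the new `is_analyze` flag is carried from `ResolvedPartitionedScan` through `RemoteTaskContext` and the `ExecContext` of the remote-engine protocol, and the remote service now retains the executed physical plan only when the flag is set (via `ctx.is_analyze.then_some(physical_plan_clone)`), apparently so its metrics can later be attached to the EXPLAIN ANALYZE output (see the TODO about handle_execute_plan). A minimal, self-contained sketch of that gating idea; the types `PhysicalPlan`, `ExecStream`, and `build_stream` below are hypothetical stand-ins, not the real `DataFusionPhysicalPlanAdapter`/`RemoteExecStream` types touched by this patch:

use std::sync::Arc;

// Hypothetical stand-in for the executed physical plan.
struct PhysicalPlan {
    desc: String,
}

// Hypothetical stand-in for the stream handed back to the gRPC layer.
// The plan is retained only when metrics must be collected afterwards.
struct ExecStream {
    physical_plan: Option<Arc<PhysicalPlan>>,
}

fn build_stream(plan: Arc<PhysicalPlan>, is_analyze: bool) -> ExecStream {
    ExecStream {
        // `bool::then_some` yields `Some(plan)` only for EXPLAIN ANALYZE queries,
        // mirroring `ctx.is_analyze.then_some(physical_plan_clone)` in the patch.
        physical_plan: is_analyze.then_some(plan),
    }
}

fn main() {
    let plan = Arc::new(PhysicalPlan { desc: "ScanTable".to_string() });
    // Analyze query: the plan is kept so metrics can be reported.
    assert_eq!(build_stream(plan.clone(), true).physical_plan.unwrap().desc, "ScanTable");
    // Ordinary query: nothing is kept.
    assert!(build_stream(plan, false).physical_plan.is_none());
}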