diff --git a/Cargo.lock b/Cargo.lock index 271041656a..a8724e6bc8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -96,7 +96,7 @@ dependencies = [ "atomic_enum", "base64 0.13.1", "bytes_ext", - "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)", + "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)", "codec", "common_types", "datafusion", @@ -1345,7 +1345,7 @@ dependencies = [ [[package]] name = "ceresdbproto" version = "1.0.23" -source = "git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4#d849fa44e29ea04c7d99c082a38efb8ce5200d5e" +source = "git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886#986ff070e831d5ad5faeae3062ab79985d417886" dependencies = [ "prost", "protoc-bin-vendored", @@ -1528,7 +1528,7 @@ dependencies = [ "async-trait", "bytes_ext", "catalog", - "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)", + "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)", "common_types", "etcd-client", "future_ext", @@ -1606,7 +1606,7 @@ dependencies = [ "arrow 43.0.0", "arrow_ext", "bytes_ext", - "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)", + "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)", "chrono", "datafusion", "hash_ext", @@ -2362,7 +2362,7 @@ dependencies = [ "async-recursion", "async-trait", "catalog", - "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)", + "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)", "common_types", "datafusion", "datafusion-proto", @@ -3921,7 +3921,7 @@ name = "meta_client" version = "1.2.6-alpha" dependencies = [ "async-trait", - "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)", + "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)", "common_types", "futures 0.3.28", "generic_error", @@ -4446,7 +4446,7 @@ version = "1.2.6-alpha" dependencies = [ "async-trait", "bytes", - "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)", + "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)", "chrono", "clru", "crc", @@ -5323,7 +5323,7 @@ dependencies = [ "async-trait", "bytes", "catalog", - "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)", + "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)", "clru", "cluster", "common_types", @@ -5451,7 +5451,7 @@ dependencies = [ "arrow 43.0.0", "async-trait", "catalog", - "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)", + "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)", "chrono", "cluster", "codec", @@ -5765,7 +5765,7 @@ version = "1.2.6-alpha" dependencies = [ "arrow_ext", "async-trait", - "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)", + "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)", "common_types", "futures 0.3.28", "generic_error", @@ -5894,7 +5894,7 @@ name = "router" version = "1.2.6-alpha" dependencies = [ "async-trait", - "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)", + "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)", "cluster", "common_types", "generic_error", @@ -6269,7 +6269,7 @@ dependencies = [ "async-trait", "bytes_ext", "catalog", - "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)", + "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)", "clru", "cluster", "common_types", @@ -6795,7 +6795,7 @@ dependencies = [ "async-trait", "bytes_ext", "catalog", - "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)", + "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)", "codec", "common_types", "futures 0.3.28", @@ -6817,7 +6817,7 @@ dependencies = [ "arrow_ext", "async-trait", "bytes_ext", - "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)", + "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)", "common_types", "datafusion", "datafusion-proto", @@ -7020,7 +7020,7 @@ dependencies = [ name = "time_ext" version = "1.2.6-alpha" dependencies = [ - "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)", + "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)", "chrono", "common_types", "macros", @@ -7672,7 +7672,7 @@ version = "1.2.6-alpha" dependencies = [ "async-trait", "bytes_ext", - "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)", + "ceresdbproto 1.0.23 (git+https://github.com/baojinri/ceresdbproto.git?rev=986ff070e831d5ad5faeae3062ab79985d417886)", "chrono", "codec", "common_types", diff --git a/Cargo.toml b/Cargo.toml index c99bbaf56b..24cc56be06 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -94,7 +94,7 @@ bytes = "1" bytes_ext = { path = "components/bytes_ext" } catalog = { path = "catalog" } catalog_impls = { path = "catalog_impls" } -ceresdbproto = { git = "https://github.com/CeresDB/horaedbproto.git", rev = "d849fa4" } +ceresdbproto = { git = "https://github.com/baojinri/ceresdbproto.git", rev = "986ff070e831d5ad5faeae3062ab79985d417886" } codec = { path = "components/codec" } chrono = "0.4" clap = "3.0" diff --git a/df_engine_extensions/src/dist_sql_query/mod.rs b/df_engine_extensions/src/dist_sql_query/mod.rs index abfc0cca1d..c2cd825092 100644 --- a/df_engine_extensions/src/dist_sql_query/mod.rs +++ b/df_engine_extensions/src/dist_sql_query/mod.rs @@ -66,13 +66,19 @@ type ExecutableScanBuilderRef = Box; pub struct RemoteTaskContext { pub task_ctx: Arc, pub remote_metrics: Arc>>, + pub is_analyze: bool, } impl RemoteTaskContext { - pub fn new(task_ctx: Arc, remote_metrics: Arc>>) -> Self { + pub fn new( + task_ctx: Arc, + remote_metrics: Arc>>, + is_analyze: bool, + ) -> Self { Self { task_ctx, remote_metrics, + is_analyze, } } } diff --git a/df_engine_extensions/src/dist_sql_query/physical_plan.rs b/df_engine_extensions/src/dist_sql_query/physical_plan.rs index 87cd18bdcd..1ebe669fc7 100644 --- a/df_engine_extensions/src/dist_sql_query/physical_plan.rs +++ b/df_engine_extensions/src/dist_sql_query/physical_plan.rs @@ -349,7 +349,8 @@ impl ExecutionPlan for ResolvedPartitionedScan { remote_metrics, } = &self.remote_exec_ctx.plan_ctxs[partition]; - let remote_task_ctx = RemoteTaskContext::new(context, remote_metrics.clone()); + let remote_task_ctx = + RemoteTaskContext::new(context, remote_metrics.clone(), self.is_analyze); // Send plan for remote execution. let stream_future = self.remote_exec_ctx.executor.execute( diff --git a/query_engine/src/datafusion_impl/task_context.rs b/query_engine/src/datafusion_impl/task_context.rs index 8aefd563c1..5e34cdc746 100644 --- a/query_engine/src/datafusion_impl/task_context.rs +++ b/query_engine/src/datafusion_impl/task_context.rs @@ -202,6 +202,7 @@ impl RemotePhysicalPlanExecutor for RemotePhysicalPlanExecutorImpl { default_schema, query: display_plan.indent(true).to_string(), priority, + is_analyze: task_context.is_analyze, }; // Encode plan and schema diff --git a/server/src/grpc/remote_engine_service/mod.rs b/server/src/grpc/remote_engine_service/mod.rs index eda9a61ada..9ce0c13493 100644 --- a/server/src/grpc/remote_engine_service/mod.rs +++ b/server/src/grpc/remote_engine_service/mod.rs @@ -718,12 +718,13 @@ impl RemoteEngineServiceImpl { let physical_plan = Arc::new(DataFusionPhysicalPlanAdapter::new(TypedPlan::Remote( encoded_plan, ))); + // TODO: Use in handle_execute_plan fn to build stream with metrics + let physical_plan_clone = physical_plan.clone(); let rt = self .runtimes .read_runtime .choose_runtime(&query_ctx.priority); - let physical_plan_clone = physical_plan.clone(); let stream = rt .spawn(async move { handle_execute_plan(query_ctx, physical_plan, query_engine).await }) @@ -743,7 +744,7 @@ impl RemoteEngineServiceImpl { let stream = StreamWithMetric::new(Box::pin(stream), metric); Ok(RemoteExecStream::new( Box::pin(stream), - Some(physical_plan_clone), + ctx.is_analyze.then_some(physical_plan_clone), )) } @@ -781,7 +782,7 @@ impl RemoteEngineServiceImpl { let physical_plan = Arc::new(DataFusionPhysicalPlanAdapter::new(TypedPlan::Remote( encoded_plan, ))); - + // TODO: Use in handle_execute_plan fn to build stream with metrics let physical_plan_clone = physical_plan.clone(); let QueryDedup { @@ -822,7 +823,7 @@ impl RemoteEngineServiceImpl { let stream = StreamWithMetric::new(Box::pin(ReceiverStream::new(rx)), metric); Ok(RemoteExecStream::new( Box::pin(stream), - Some(physical_plan_clone), + ctx.is_analyze.then_some(physical_plan_clone), )) } diff --git a/table_engine/src/remote/model.rs b/table_engine/src/remote/model.rs index 9073e75ec6..00e3d9d96c 100644 --- a/table_engine/src/remote/model.rs +++ b/table_engine/src/remote/model.rs @@ -456,6 +456,7 @@ pub struct ExecContext { pub default_schema: String, pub query: String, pub priority: Priority, + pub is_analyze: bool, } pub enum PhysicalPlan { @@ -478,6 +479,7 @@ impl From for ceresdbproto::remote_engine::ExecutePlanRequ timeout_ms: rest_duration_ms, priority: value.context.priority.as_u8() as i32, displayable_query: value.context.query, + is_analyze: value.context.is_analyze, }; let pb_plan = match value.physical_plan { @@ -522,6 +524,7 @@ impl TryFrom for RemoteExecuteR default_schema, timeout_ms, displayable_query, + is_analyze, .. } = pb_exec_ctx; @@ -539,6 +542,7 @@ impl TryFrom for RemoteExecuteR default_schema, query: displayable_query, priority, + is_analyze, }; // Plan