Skip to content

Commit

Permalink
avoid returning metrics in non-analyze sql
Browse files Browse the repository at this point in the history
  • Loading branch information
baojinri committed Dec 28, 2023
1 parent ce4044b commit cec73a9
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 23 deletions.
32 changes: 16 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ bytes = "1"
bytes_ext = { path = "components/bytes_ext" }
catalog = { path = "catalog" }
catalog_impls = { path = "catalog_impls" }
ceresdbproto = { git = "https://github.com/CeresDB/horaedbproto.git", rev = "d849fa4" }
ceresdbproto = { git = "https://github.com/baojinri/ceresdbproto.git", rev = "986ff070e831d5ad5faeae3062ab79985d417886" }
codec = { path = "components/codec" }
chrono = "0.4"
clap = "3.0"
Expand Down
8 changes: 7 additions & 1 deletion df_engine_extensions/src/dist_sql_query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,19 @@ type ExecutableScanBuilderRef = Box<dyn ExecutableScanBuilder>;
pub struct RemoteTaskContext {
pub task_ctx: Arc<TaskContext>,
pub remote_metrics: Arc<Mutex<Option<String>>>,
pub is_analyze: bool,
}

impl RemoteTaskContext {
pub fn new(task_ctx: Arc<TaskContext>, remote_metrics: Arc<Mutex<Option<String>>>) -> Self {
pub fn new(
task_ctx: Arc<TaskContext>,
remote_metrics: Arc<Mutex<Option<String>>>,
is_analyze: bool,
) -> Self {
Self {
task_ctx,
remote_metrics,
is_analyze,
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion df_engine_extensions/src/dist_sql_query/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,8 @@ impl ExecutionPlan for ResolvedPartitionedScan {
remote_metrics,
} = &self.remote_exec_ctx.plan_ctxs[partition];

let remote_task_ctx = RemoteTaskContext::new(context, remote_metrics.clone());
let remote_task_ctx =
RemoteTaskContext::new(context, remote_metrics.clone(), self.is_analyze);

// Send plan for remote execution.
let stream_future = self.remote_exec_ctx.executor.execute(
Expand Down
1 change: 1 addition & 0 deletions query_engine/src/datafusion_impl/task_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ impl RemotePhysicalPlanExecutor for RemotePhysicalPlanExecutorImpl {
default_schema,
query: display_plan.indent(true).to_string(),
priority,
is_analyze: task_context.is_analyze,
};

// Encode plan and schema
Expand Down
9 changes: 5 additions & 4 deletions server/src/grpc/remote_engine_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -718,12 +718,13 @@ impl RemoteEngineServiceImpl {
let physical_plan = Arc::new(DataFusionPhysicalPlanAdapter::new(TypedPlan::Remote(
encoded_plan,
)));
// TODO: Use in handle_execute_plan fn to build stream with metrics
let physical_plan_clone = physical_plan.clone();

let rt = self
.runtimes
.read_runtime
.choose_runtime(&query_ctx.priority);
let physical_plan_clone = physical_plan.clone();

let stream = rt
.spawn(async move { handle_execute_plan(query_ctx, physical_plan, query_engine).await })
Expand All @@ -743,7 +744,7 @@ impl RemoteEngineServiceImpl {
let stream = StreamWithMetric::new(Box::pin(stream), metric);
Ok(RemoteExecStream::new(
Box::pin(stream),
Some(physical_plan_clone),
ctx.is_analyze.then_some(physical_plan_clone),
))
}

Expand Down Expand Up @@ -781,7 +782,7 @@ impl RemoteEngineServiceImpl {
let physical_plan = Arc::new(DataFusionPhysicalPlanAdapter::new(TypedPlan::Remote(
encoded_plan,
)));

// TODO: Use in handle_execute_plan fn to build stream with metrics
let physical_plan_clone = physical_plan.clone();

let QueryDedup {
Expand Down Expand Up @@ -822,7 +823,7 @@ impl RemoteEngineServiceImpl {
let stream = StreamWithMetric::new(Box::pin(ReceiverStream::new(rx)), metric);
Ok(RemoteExecStream::new(
Box::pin(stream),
Some(physical_plan_clone),
ctx.is_analyze.then_some(physical_plan_clone),
))
}

Expand Down
4 changes: 4 additions & 0 deletions table_engine/src/remote/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ pub struct ExecContext {
pub default_schema: String,
pub query: String,
pub priority: Priority,
pub is_analyze: bool,
}

pub enum PhysicalPlan {
Expand All @@ -478,6 +479,7 @@ impl From<RemoteExecuteRequest> for ceresdbproto::remote_engine::ExecutePlanRequ
timeout_ms: rest_duration_ms,
priority: value.context.priority.as_u8() as i32,
displayable_query: value.context.query,
is_analyze: value.context.is_analyze,
};

let pb_plan = match value.physical_plan {
Expand Down Expand Up @@ -522,6 +524,7 @@ impl TryFrom<ceresdbproto::remote_engine::ExecutePlanRequest> for RemoteExecuteR
default_schema,
timeout_ms,
displayable_query,
is_analyze,
..
} = pb_exec_ctx;

Expand All @@ -539,6 +542,7 @@ impl TryFrom<ceresdbproto::remote_engine::ExecutePlanRequest> for RemoteExecuteR
default_schema,
query: displayable_query,
priority,
is_analyze,
};

// Plan
Expand Down

0 comments on commit cec73a9

Please sign in to comment.