Commit 10e2ede

add is_analyze field

baojinri committed Dec 28, 2023
1 parent b02bac2 commit 10e2ede

Showing 8 changed files with 37 additions and 24 deletions.
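
In short, this commit threads an is_analyze flag from the node that plans the query down to the remote nodes that execute sub-plans: the flag is stored on RemoteTaskContext, copied into the ExecContext of the remote-execution request, carried over the wire in the protobuf ExecutePlanRequest, and finally used by the remote engine service to decide whether to keep the physical plan around for metrics collection. Hedged, stand-in sketches of the new constructor, the then_some gating, and the protobuf round trip follow the relevant file sections below.
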
32 changes: 16 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -94,7 +94,7 @@ bytes = "1"
bytes_ext = { path = "components/bytes_ext" }
catalog = { path = "catalog" }
catalog_impls = { path = "catalog_impls" }
ceresdbproto = { git = "https://github.com/CeresDB/horaedbproto.git", rev = "d849fa4" }
ceresdbproto = { git = "https://github.com/baojinri/ceresdbproto.git", rev = "986ff070e831d5ad5faeae3062ab79985d417886" }
codec = { path = "components/codec" }
chrono = "0.4"
clap = "3.0"
8 changes: 7 additions & 1 deletion df_engine_extensions/src/dist_sql_query/mod.rs
@@ -64,13 +64,19 @@ type ExecutableScanBuilderRef = Box<dyn ExecutableScanBuilder>;
pub struct RemoteTaskContext {
pub task_ctx: Arc<TaskContext>,
pub remote_metrics: Arc<Mutex<Option<String>>>,
pub is_analyze: bool,
}

impl RemoteTaskContext {
pub fn new(task_ctx: Arc<TaskContext>, remote_metrics: Arc<Mutex<Option<String>>>) -> Self {
pub fn new(
task_ctx: Arc<TaskContext>,
remote_metrics: Arc<Mutex<Option<String>>>,
is_analyze: bool,
) -> Self {
Self {
task_ctx,
remote_metrics,
is_analyze,
}
}
}
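
A minimal usage sketch of the widened constructor. It assumes RemoteTaskContext is reachable at df_engine_extensions::dist_sql_query::RemoteTaskContext and that the Mutex in its signature is std::sync::Mutex; neither assumption is confirmed by this diff.

use std::sync::{Arc, Mutex};

use datafusion::prelude::SessionContext;
use df_engine_extensions::dist_sql_query::RemoteTaskContext;

// Callers now state explicitly whether the query is an EXPLAIN ANALYZE,
// instead of the remote side always behaving as if it were.
fn remote_ctx_for(is_analyze: bool) -> RemoteTaskContext {
    // Any Arc<TaskContext> works here; a fresh SessionContext is just the
    // simplest way to obtain one in an isolated example.
    let task_ctx = SessionContext::new().task_ctx();
    let remote_metrics = Arc::new(Mutex::new(None));
    RemoteTaskContext::new(task_ctx, remote_metrics, is_analyze)
}
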
3 changes: 2 additions & 1 deletion df_engine_extensions/src/dist_sql_query/physical_plan.rs
@@ -346,7 +346,8 @@ impl ExecutionPlan for ResolvedPartitionedScan {
remote_metrics,
} = &self.remote_exec_ctx.plan_ctxs[partition];

let remote_task_ctx = RemoteTaskContext::new(context, remote_metrics.clone());
let remote_task_ctx =
RemoteTaskContext::new(context, remote_metrics.clone(), self.is_analyze);

// Send plan for remote execution.
let stream_future = self.remote_exec_ctx.executor.execute(
@@ -83,7 +83,7 @@ UInt64(4860320137932382618),Timestamp(1651737067000),String("ceresdb9"),Int32(0)
EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0";

plan_type,plan,
String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:1, metrics=[\npartition_table_t:\n __partition_table_t_1:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n\n__partition_table_t_1:\nCoalescePartitionsExec, metrics=[output_rows=1, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_1, parallelism=8, metrics=[\nPredicate { exprs:[name = Utf8(\"ceresdb0\")], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=1\n scan_memtable_1:\n=0]\n=0]\n"),
String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:1, metrics=[\npartition_table_t:\n __partition_table_t_1:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n\n__partition_table_t_1:\nCoalescePartitionsExec, metrics=[output_rows=1, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_1, parallelism=8, metrics=[\nPredicate { exprs:[name = Utf8(\"ceresdb0\")], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=1\n scan_memtable_1, fetched_columns:[tsid,t,name,id,value]:\n=0]\n=0]\n"),


-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
@@ -92,7 +92,7 @@ String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:f
EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0", "ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4");

plan_type,plan,
String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:3, metrics=[\npartition_table_t:\n __partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n __partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n __partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n\n__partition_table_t_x:\nCoalescePartitionsExec, metrics=[output_rows=1, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_x, parallelism=8, metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb0\"), Utf8(\"ceresdb1\"), Utf8(\"ceresdb2\"), Utf8(\"ceresdb3\"), Utf8(\"ceresdb4\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=1\n scan_memtable_1:\n=0]\n\n__partition_table_t_x:\nCoalescePartitionsExec, metrics=[output_rows=2, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_x, parallelism=8, metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb0\"), Utf8(\"ceresdb1\"), Utf8(\"ceresdb2\"), Utf8(\"ceresdb3\"), Utf8(\"ceresdb4\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=2\n scan_memtable_1:\n=0]\n\n__partition_table_t_x:\nCoalescePartitionsExec, metrics=[output_rows=2, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_x, parallelism=8, metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb0\"), Utf8(\"ceresdb1\"), Utf8(\"ceresdb2\"), Utf8(\"ceresdb3\"), Utf8(\"ceresdb4\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=2\n scan_memtable_1:\n=0]\n=0]\n"),
String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:3, metrics=[\npartition_table_t:\n __partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n __partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n __partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n\n__partition_table_t_x:\nCoalescePartitionsExec, metrics=[output_rows=1, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_x, parallelism=8, metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb0\"), Utf8(\"ceresdb1\"), Utf8(\"ceresdb2\"), Utf8(\"ceresdb3\"), Utf8(\"ceresdb4\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=1\n scan_memtable_1, fetched_columns:[tsid,t,name,id,value]:\n=0]\n\n__partition_table_t_x:\nCoalescePartitionsExec, metrics=[output_rows=2, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_x, parallelism=8, metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb0\"), Utf8(\"ceresdb1\"), Utf8(\"ceresdb2\"), Utf8(\"ceresdb3\"), Utf8(\"ceresdb4\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=2\n scan_memtable_1, fetched_columns:[tsid,t,name,id,value]:\n=0]\n\n__partition_table_t_x:\nCoalescePartitionsExec, metrics=[output_rows=2, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_x, parallelism=8, metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb0\"), Utf8(\"ceresdb1\"), Utf8(\"ceresdb2\"), Utf8(\"ceresdb3\"), Utf8(\"ceresdb4\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=2\n scan_memtable_1, fetched_columns:[tsid,t,name,id,value]:\n=0]\n=0]\n"),


ALTER TABLE partition_table_t ADD COLUMN (b string);
1 change: 1 addition & 0 deletions query_engine/src/datafusion_impl/task_context.rs
@@ -199,6 +199,7 @@ impl RemotePhysicalPlanExecutor for RemotePhysicalPlanExecutorImpl {
default_catalog,
default_schema,
query: display_plan.indent(true).to_string(),
is_analyze: task_context.is_analyze,
};

// Encode plan and schema
7 changes: 4 additions & 3 deletions server/src/grpc/remote_engine_service/mod.rs
@@ -691,6 +691,7 @@ impl RemoteEngineServiceImpl {
let physical_plan = Arc::new(DataFusionPhysicalPlanAdapter::new(TypedPlan::Remote(
encoded_plan,
)));
// TODO: Use in handle_execute_plan fn to build stream with metrics
let physical_plan_clone = physical_plan.clone();

let stream = self
@@ -713,7 +714,7 @@ impl RemoteEngineServiceImpl {
let stream = StreamWithMetric::new(Box::pin(stream), metric);
Ok(RemoteExecStream::new(
Box::pin(stream),
Some(physical_plan_clone),
ctx.is_analyze.then_some(physical_plan_clone),
))
}

@@ -749,7 +750,7 @@ impl RemoteEngineServiceImpl {
let physical_plan = Arc::new(DataFusionPhysicalPlanAdapter::new(TypedPlan::Remote(
encoded_plan,
)));

// TODO: Use in handle_execute_plan fn to build stream with metrics
let physical_plan_clone = physical_plan.clone();

let QueryDedup {
@@ -785,7 +786,7 @@ impl RemoteEngineServiceImpl {
let stream = StreamWithMetric::new(Box::pin(ReceiverStream::new(rx)), metric);
Ok(RemoteExecStream::new(
Box::pin(stream),
Some(physical_plan_clone),
ctx.is_analyze.then_some(physical_plan_clone),
))
}

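
The two call sites above switch from Some(physical_plan_clone) to ctx.is_analyze.then_some(physical_plan_clone). A self-contained sketch of that gating pattern, with stand-in types in place of the real adapter and request context:

use std::sync::Arc;

// Stand-ins for DataFusionPhysicalPlanAdapter and the request's ExecContext.
struct PhysicalPlanAdapter;
struct ExecContext { is_analyze: bool }

// The stream only carries the physical plan (needed later to report metrics)
// when the request came from EXPLAIN ANALYZE; `then_some` replaces the
// unconditional Some(plan) used before this commit.
fn plan_for_metrics(
    ctx: &ExecContext,
    plan: Arc<PhysicalPlanAdapter>,
) -> Option<Arc<PhysicalPlanAdapter>> {
    ctx.is_analyze.then_some(plan)
}

fn main() {
    let plan = Arc::new(PhysicalPlanAdapter);
    assert!(plan_for_metrics(&ExecContext { is_analyze: true }, plan.clone()).is_some());
    assert!(plan_for_metrics(&ExecContext { is_analyze: false }, plan).is_none());
}
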
4 changes: 4 additions & 0 deletions table_engine/src/remote/model.rs
@@ -454,6 +454,7 @@ pub struct ExecContext {
pub default_catalog: String,
pub default_schema: String,
pub query: String,
pub is_analyze: bool,
}

pub enum PhysicalPlan {
@@ -476,6 +477,7 @@ impl From<RemoteExecuteRequest> for ceresdbproto::remote_engine::ExecutePlanRequ
timeout_ms: rest_duration_ms,
priority: 0, // not used now
displayable_query: value.context.query,
is_analyze: value.context.is_analyze,
};

let pb_plan = match value.physical_plan {
Expand Down Expand Up @@ -516,6 +518,7 @@ impl TryFrom<ceresdbproto::remote_engine::ExecutePlanRequest> for RemoteExecuteR
default_schema,
timeout_ms,
displayable_query,
is_analyze,
..
} = pb_exec_ctx;

@@ -532,6 +535,7 @@ impl TryFrom<ceresdbproto::remote_engine::ExecutePlanRequest> for RemoteExecuteR
default_catalog,
default_schema,
query: displayable_query,
is_analyze,
};

// Plan
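
A stand-in sketch of the round trip added in model.rs, with simplified structs in place of the real ExecContext and the generated ceresdbproto message (which also carry catalog, schema, timeout and priority): the flag is copied into the wire-level context on the way out and restored on the remote side.

// Simplified stand-ins; the real conversions live in the From/TryFrom impls above.
struct ExecContext { query: String, is_analyze: bool }
struct PbExecContext { displayable_query: String, is_analyze: bool }

impl From<ExecContext> for PbExecContext {
    fn from(ctx: ExecContext) -> Self {
        Self { displayable_query: ctx.query, is_analyze: ctx.is_analyze }
    }
}

impl From<PbExecContext> for ExecContext {
    fn from(pb: PbExecContext) -> Self {
        Self { query: pb.displayable_query, is_analyze: pb.is_analyze }
    }
}

fn main() {
    let ctx = ExecContext { query: "EXPLAIN ANALYZE SELECT 1".to_string(), is_analyze: true };
    let back: ExecContext = PbExecContext::from(ctx).into();
    assert!(back.is_analyze); // the flag survives the encode/decode round trip
}
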
