Skip to content

Commit

Permalink
chore(cubesql): Do not call async Node functions while planning
Browse files Browse the repository at this point in the history
  • Loading branch information
MazterQyou committed Dec 3, 2024
1 parent 4b85612 commit d810695
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 121 deletions.
74 changes: 69 additions & 5 deletions rust/cubesql/cubesql/src/compile/query_engine.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use crate::compile::engine::df::planner::CubeQueryPlanner;
use std::{backtrace::Backtrace, collections::HashMap, future::Future, pin::Pin, sync::Arc};
use std::{
backtrace::Backtrace, collections::HashMap, future::Future, pin::Pin, sync::Arc,
time::SystemTime,
};

use crate::{
compile::{
Expand All @@ -21,8 +24,9 @@ use crate::{
},
config::ConfigObj,
sql::{
compiler_cache::CompilerCache, statement::SensitiveDataSanitizer, SessionManager,
SessionState,
compiler_cache::{CompilerCache, CompilerCacheEntry},
statement::SensitiveDataSanitizer,
SessionManager, SessionState,
},
transport::{LoadRequestMeta, MetaContext, SpanId, TransportService},
CubeErrorCauseType,
Expand Down Expand Up @@ -78,6 +82,11 @@ pub trait QueryEngine {

fn sanitize_statement(&self, stmt: &Self::AstStatementType) -> Self::AstStatementType;

async fn get_cache_entry(
&self,
state: Arc<SessionState>,
) -> Result<Arc<CompilerCacheEntry>, CompilationError>;

async fn plan(
&self,
stmt: Self::AstStatementType,
Expand All @@ -86,6 +95,26 @@ pub trait QueryEngine {
meta: Arc<MetaContext>,
state: Arc<SessionState>,
) -> CompilationResult<(QueryPlan, Self::PlanMetadataType)> {
let cache_entry = self.get_cache_entry(state.clone()).await?;

let planning_start = SystemTime::now();
if let Some(span_id) = span_id.as_ref() {
if let Some(auth_context) = state.auth_context() {
self.transport_ref()
.log_load_state(
Some(span_id.clone()),
auth_context,
state.get_load_request_meta(),
"SQL API Query Planning".to_string(),
serde_json::json!({
"query": span_id.query_key.clone(),
}),
)
.await

Check warning on line 113 in rust/cubesql/cubesql/src/compile/query_engine.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/query_engine.rs#L113

Added line #L113 was not covered by tests
.map_err(|e| CompilationError::internal(e.to_string()))?;
}

Check warning on line 115 in rust/cubesql/cubesql/src/compile/query_engine.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/query_engine.rs#L115

Added line #L115 was not covered by tests
}

let ctx = self.create_session_ctx(state.clone())?;
let cube_ctx = self.create_cube_ctx(state.clone(), meta.clone(), ctx.clone())?;

Expand Down Expand Up @@ -144,7 +173,7 @@ pub trait QueryEngine {
let mut finalized_graph = self
.compiler_cache_ref()
.rewrite(
state.auth_context().unwrap(),
Arc::clone(&cache_entry),
cube_ctx.clone(),
converter.take_egraph(),
&query_params.unwrap(),
Expand Down Expand Up @@ -192,6 +221,7 @@ pub trait QueryEngine {
let result = rewriter
.find_best_plan(
root,
cache_entry,
state.auth_context().unwrap(),
qtrace,
span_id.clone(),
Expand Down Expand Up @@ -243,12 +273,31 @@ pub trait QueryEngine {
// TODO: We should find what optimizers will be safety to use for OLAP queries
guard.optimizer.rules = vec![];
}
if let Some(span_id) = span_id {
if let Some(span_id) = &span_id {
span_id.set_is_data_query(true).await;
}
};

log::debug!("Rewrite: {:#?}", rewrite_plan);

if let Some(span_id) = span_id.as_ref() {
if let Some(auth_context) = state.auth_context() {
self.transport_ref()
.log_load_state(
Some(span_id.clone()),
auth_context,
state.get_load_request_meta(),
"SQL API Query Planning Success".to_string(),
serde_json::json!({
"query": span_id.query_key.clone(),
"duration": planning_start.elapsed().unwrap().as_millis() as u64,
}),
)
.await

Check warning on line 296 in rust/cubesql/cubesql/src/compile/query_engine.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/query_engine.rs#L296

Added line #L296 was not covered by tests
.map_err(|e| CompilationError::internal(e.to_string()))?;
}

Check warning on line 298 in rust/cubesql/cubesql/src/compile/query_engine.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/query_engine.rs#L298

Added line #L298 was not covered by tests
}

let rewrite_plan = Self::evaluate_wrapped_sql(
self.transport_ref().clone(),
Arc::new(state.get_load_request_meta()),
Expand Down Expand Up @@ -501,6 +550,21 @@ impl QueryEngine for SqlQueryEngine {
fn sanitize_statement(&self, stmt: &Self::AstStatementType) -> Self::AstStatementType {
SensitiveDataSanitizer::new().replace(stmt.clone())
}

async fn get_cache_entry(
&self,
state: Arc<SessionState>,
) -> Result<Arc<CompilerCacheEntry>, CompilationError> {
self.compiler_cache_ref()
.get_cache_entry(
state.auth_context().ok_or_else(|| {
CompilationError::internal("Unable to get auth context".to_string())

Check warning on line 561 in rust/cubesql/cubesql/src/compile/query_engine.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/query_engine.rs#L561

Added line #L561 was not covered by tests
})?,
state.protocol.clone(),
)
.await

Check warning on line 565 in rust/cubesql/cubesql/src/compile/query_engine.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/query_engine.rs#L565

Added line #L565 was not covered by tests
.map_err(|e| CompilationError::internal(e.to_string()))
}
}

fn is_olap_query(parent: &LogicalPlan) -> Result<bool, CompilationError> {
Expand Down
17 changes: 5 additions & 12 deletions rust/cubesql/cubesql/src/compile/rewrite/rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
CubeContext,
},
config::ConfigObj,
sql::AuthContextRef,
sql::{compiler_cache::CompilerCacheEntry, AuthContextRef},
transport::{MetaContext, SpanId},
CubeError,
};
Expand Down Expand Up @@ -310,7 +310,7 @@ impl Rewriter {

pub async fn run_rewrite_to_completion(
&mut self,
auth_context: AuthContextRef,
cache_entry: Arc<CompilerCacheEntry>,
qtrace: &mut Option<Qtrace>,
) -> Result<CubeEGraph, CubeError> {
let cube_context = self.cube_context.clone();
Expand All @@ -323,11 +323,7 @@ impl Rewriter {
.sessions
.server
.compiler_cache
.rewrite_rules(
auth_context.clone(),
cube_context.session_state.protocol.clone(),
false,
)
.rewrite_rules(cache_entry, false)
.await?;

let (plan, qtrace_egraph_iterations) = tokio::task::spawn_blocking(move || {
Expand Down Expand Up @@ -392,6 +388,7 @@ impl Rewriter {
pub async fn find_best_plan(
&mut self,
root: Id,
cache_entry: Arc<CompilerCacheEntry>,
auth_context: AuthContextRef,
qtrace: &mut Option<Qtrace>,
span_id: Option<Arc<SpanId>>,
Expand All @@ -407,11 +404,7 @@ impl Rewriter {
.sessions
.server
.compiler_cache
.rewrite_rules(
auth_context.clone(),
cube_context.session_state.protocol.clone(),
true,
)
.rewrite_rules(cache_entry, true)
.await?;

let (plan, qtrace_egraph_iterations, qtrace_best_graph) =
Expand Down
48 changes: 3 additions & 45 deletions rust/cubesql/cubesql/src/compile/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::compile::{
StatusFlags,
};
use sqlparser::ast;
use std::{collections::HashMap, sync::Arc, time::SystemTime};
use std::{collections::HashMap, sync::Arc};

use crate::{
compile::{
Expand Down Expand Up @@ -61,50 +61,8 @@ impl QueryRouter {
qtrace: &mut Option<Qtrace>,
span_id: Option<Arc<SpanId>>,
) -> CompilationResult<QueryPlan> {
let planning_start = SystemTime::now();
if let Some(span_id) = span_id.as_ref() {
if let Some(auth_context) = self.state.auth_context() {
self.session_manager
.server
.transport
.log_load_state(
Some(span_id.clone()),
auth_context,
self.state.get_load_request_meta(),
"SQL API Query Planning".to_string(),
serde_json::json!({
"query": span_id.query_key.clone(),
}),
)
.await
.map_err(|e| CompilationError::internal(e.to_string()))?;
}
}
let result = self
.create_df_logical_plan(stmt.clone(), qtrace, span_id.clone())
.await?;

if let Some(span_id) = span_id.as_ref() {
if let Some(auth_context) = self.state.auth_context() {
self.session_manager
.server
.transport
.log_load_state(
Some(span_id.clone()),
auth_context,
self.state.get_load_request_meta(),
"SQL API Query Planning Success".to_string(),
serde_json::json!({
"query": span_id.query_key.clone(),
"duration": planning_start.elapsed().unwrap().as_millis() as u64,
}),
)
.await
.map_err(|e| CompilationError::internal(e.to_string()))?;
}
}

return Ok(result);
self.create_df_logical_plan(stmt.clone(), qtrace, span_id.clone())
.await
}

pub async fn plan(
Expand Down
Loading

0 comments on commit d810695

Please sign in to comment.