From 9b2e39ca3720426fd48b54196a4a3aa9451dafb0 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Thu, 14 Mar 2024 09:11:56 +0800 Subject: [PATCH] refactor: http handler plan sql async. (#14908) * refactor: extract fn try_set_txn. * refactor: http handler plan sql async. --- .../servers/http/v1/http_query_handlers.rs | 5 +- .../service/src/servers/http/v1/json_block.rs | 18 +-- .../servers/http/v1/query/execute_state.rs | 38 +++++- .../src/servers/http/v1/query/http_query.rs | 125 +++++++++--------- .../src/servers/http/v1/query/page_manager.rs | 22 ++- src/query/service/src/sessions/query_ctx.rs | 2 +- .../it/servers/http/http_query_handlers.rs | 5 +- .../tests/it/servers/http/json_block.rs | 25 +--- 8 files changed, 120 insertions(+), 120 deletions(-) diff --git a/src/query/service/src/servers/http/v1/http_query_handlers.rs b/src/query/service/src/servers/http/v1/http_query_handlers.rs index baf8ad3946c18..61cf0f4696601 100644 --- a/src/query/service/src/servers/http/v1/http_query_handlers.rs +++ b/src/query/service/src/servers/http/v1/http_query_handlers.rs @@ -100,7 +100,7 @@ pub struct QueryResponseField { } impl QueryResponseField { - fn from_schema(schema: DataSchemaRef) -> Vec { + pub fn from_schema(schema: DataSchemaRef) -> Vec { schema .fields() .iter() @@ -172,7 +172,6 @@ impl QueryResponse { metrics_incr_http_response_errors_count(err.name(), err.code()); } - let schema = data.schema().clone(); let session_id = r.session_id.clone(); let stats = QueryStats { progresses: state.progresses.clone(), @@ -183,7 +182,7 @@ impl QueryResponse { Json(QueryResponse { data: data.into(), state: state.state, - schema: QueryResponseField::from_schema(schema), + schema: state.schema.clone(), session_id: Some(session_id), node_id: r.node_id, session: r.session, diff --git a/src/query/service/src/servers/http/v1/json_block.rs b/src/query/service/src/servers/http/v1/json_block.rs index 14bde8e551ea5..443ec9cb9dc8e 100644 --- a/src/query/service/src/servers/http/v1/json_block.rs +++ b/src/query/service/src/servers/http/v1/json_block.rs @@ -17,8 +17,6 @@ use std::sync::Arc; use databend_common_exception::Result; use databend_common_expression::Column; use databend_common_expression::DataBlock; -use databend_common_expression::DataSchema; -use databend_common_expression::DataSchemaRef; use databend_common_formats::field_encoder::FieldEncoderValues; use databend_common_io::prelude::FormatSettings; use serde_json::Value as JsonValue; @@ -26,7 +24,6 @@ use serde_json::Value as JsonValue; #[derive(Debug, Clone)] pub struct JsonBlock { pub(crate) data: Vec>, - pub(crate) schema: DataSchemaRef, } pub type JsonBlockRef = Arc; @@ -63,16 +60,12 @@ pub fn block_to_json_value( impl JsonBlock { pub fn empty() -> Self { - Self { - data: vec![], - schema: Arc::new(DataSchema::empty()), - } + Self { data: vec![] } } - pub fn new(schema: DataSchemaRef, block: &DataBlock, format: &FormatSettings) -> Result { + pub fn new(block: &DataBlock, format: &FormatSettings) -> Result { Ok(JsonBlock { data: block_to_json_value(block, format)?, - schema, }) } @@ -80,10 +73,9 @@ impl JsonBlock { if blocks.is_empty() { return Self::empty(); } - let schema = blocks[0].schema.clone(); let results = blocks.into_iter().map(|b| b.data).collect::>(); let data = results.concat(); - Self { data, schema } + Self { data } } pub fn num_rows(&self) -> usize { @@ -97,10 +89,6 @@ impl JsonBlock { pub fn data(&self) -> &Vec> { &self.data } - - pub fn schema(&self) -> &DataSchemaRef { - &self.schema - } } impl From for Vec> { diff --git a/src/query/service/src/servers/http/v1/query/execute_state.rs b/src/query/service/src/servers/http/v1/query/execute_state.rs index f7cf0e9064300..5bf8435b18456 100644 --- a/src/query/service/src/servers/http/v1/query/execute_state.rs +++ b/src/query/service/src/servers/http/v1/query/execute_state.rs @@ -25,6 +25,7 @@ use databend_common_expression::BlockEntry; use databend_common_expression::DataBlock; use databend_common_expression::DataSchemaRef; use databend_common_expression::Scalar; +use databend_common_io::prelude::FormatSettings; use databend_common_settings::Settings; use databend_common_sql::plans::Plan; use databend_common_sql::PlanExtras; @@ -41,9 +42,13 @@ use ExecuteState::*; use crate::interpreters::Interpreter; use crate::interpreters::InterpreterFactory; use crate::interpreters::InterpreterQueryLog; +use crate::servers::http::v1::http_query_handlers::QueryResponseField; use crate::servers::http::v1::query::sized_spsc::SizedChannelSender; +use crate::sessions::AcquireQueueGuard; +use crate::sessions::QueriesQueueManager; use crate::sessions::QueryAffect; use crate::sessions::QueryContext; +use crate::sessions::QueryEntry; use crate::sessions::Session; use crate::sessions::TableContext; @@ -106,9 +111,13 @@ pub struct ExecuteRunning { session: Arc, // mainly used to get progress for now ctx: Arc, + schema: Vec, + #[allow(dead_code)] + queue_guard: AcquireQueueGuard, } pub struct ExecuteStopped { + pub schema: Vec, pub stats: Progresses, pub affect: Option, pub reason: Result<()>, @@ -147,6 +156,13 @@ impl ExecutorSessionState { } impl Executor { + pub fn get_schema(&self) -> Vec { + match &self.state { + Starting(_) => Default::default(), + Running(r) => r.schema.clone(), + Stopped(f) => f.schema.clone(), + } + } pub fn get_progress(&self) -> Progresses { match &self.state { Starting(_) => Default::default(), @@ -230,6 +246,7 @@ impl Executor { } guard.state = Stopped(Box::new(ExecuteStopped { stats: Default::default(), + schema: vec![], reason, session_state: ExecutorSessionState::new(s.ctx.get_current_session()), query_duration_ms: s.ctx.get_query_duration_ms(), @@ -254,6 +271,7 @@ impl Executor { guard.state = Stopped(Box::new(ExecuteStopped { stats: Progresses::from_context(&r.ctx), + schema: r.schema.clone(), reason, session_state: ExecutorSessionState::new(r.ctx.get_current_session()), query_duration_ms: r.ctx.get_query_duration_ms(), @@ -281,17 +299,31 @@ impl ExecuteState { #[async_backtrace::framed] pub(crate) async fn try_start_query( executor: Arc>, - plan: Plan, - extras: PlanExtras, + sql: String, session: Arc, ctx: Arc, block_sender: SizedChannelSender, + format_settings: Arc>>, ) -> Result<()> { - ctx.attach_query_str(plan.kind(), extras.statement.to_mask_sql()); + let entry = QueryEntry::create(&ctx)?; + let queue_guard = QueriesQueueManager::instance().acquire(entry).await?; + + let (plan, plan_extras) = ExecuteState::plan_sql(&sql, ctx.clone()) + .await + .map_err(|err| err.display_with_sql(&sql))?; + + ctx.attach_query_str(plan.kind(), plan_extras.statement.to_mask_sql()); + { + // set_var may change settings + let mut guard = format_settings.write(); + *guard = Some(ctx.get_format_settings()?); + } let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?; let running_state = ExecuteRunning { session, ctx: ctx.clone(), + queue_guard, + schema: QueryResponseField::from_schema(plan.schema()), }; info!("{}: http query change state to Running", &ctx.get_id()); Executor::start_to_running(&executor, Running(running_state)).await; diff --git a/src/query/service/src/servers/http/v1/query/http_query.rs b/src/query/service/src/servers/http/v1/query/http_query.rs index e479d7961a5be..b16bf3de9196c 100644 --- a/src/query/service/src/servers/http/v1/query/http_query.rs +++ b/src/query/service/src/servers/http/v1/query/http_query.rs @@ -28,6 +28,7 @@ use databend_common_base::runtime::TrySpawn; use databend_common_catalog::table_context::StageAttachment; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_io::prelude::FormatSettings; use databend_common_metrics::http::metrics_incr_http_response_errors_count; use databend_common_settings::ScopeLevel; use databend_storages_common_txn::TxnState; @@ -42,6 +43,7 @@ use serde::Serialize; use super::HttpQueryContext; use super::RemoveReason; use crate::interpreters::InterpreterQueryLog; +use crate::servers::http::v1::http_query_handlers::QueryResponseField; use crate::servers::http::v1::query::execute_state::ExecuteStarting; use crate::servers::http::v1::query::execute_state::ExecuteStopped; use crate::servers::http::v1::query::execute_state::ExecutorSessionState; @@ -60,10 +62,8 @@ use crate::servers::http::v1::QueryError; use crate::servers::http::v1::QueryResponse; use crate::servers::http::v1::QueryStats; use crate::sessions::short_sql; -use crate::sessions::AcquireQueueGuard; -use crate::sessions::QueriesQueueManager; use crate::sessions::QueryAffect; -use crate::sessions::QueryEntry; +use crate::sessions::Session; use crate::sessions::SessionType; use crate::sessions::TableContext; @@ -218,6 +218,7 @@ pub struct StageAttachmentConf { #[derive(Debug, Clone)] pub struct ResponseState { + pub schema: Vec, pub running_time_ms: i64, pub progresses: Progresses, pub state: ExecuteStateKind, @@ -259,8 +260,58 @@ pub struct HttpQuery { /// exceed this result_timeout_secs. pub(crate) result_timeout_secs: u64, pub(crate) is_txn_mgr_saved: AtomicBool, - #[allow(dead_code)] - guard: AcquireQueueGuard, +} + +fn try_set_txn( + query_id: &str, + session: &Arc, + session_conf: &HttpSessionConf, + http_query_manager: &Arc, +) -> Result<()> { + match &session_conf.txn_state { + Some(TxnState::Active) => { + if let Some(ServerInfo { id, start_time }) = &session_conf.last_server_info { + if http_query_manager.server_info.id != *id { + return Err(ErrorCode::InvalidSessionState(format!( + "transaction is active, but the request routed to the wrong server: current server is {}, the last is {}.", + http_query_manager.server_info.id, id + ))); + } + if http_query_manager.server_info.start_time != *start_time { + return Err(ErrorCode::CurrentTransactionIsAborted(format!( + "transaction is aborted because server restarted at {}.", + start_time + ))); + } + } else { + return Err(ErrorCode::InvalidSessionState( + "transaction is active but missing server_info".to_string(), + )); + } + + let last_query_id = session_conf.last_query_ids.first().ok_or_else(|| { + ErrorCode::InvalidSessionState( + "transaction is active but last_query_ids is empty".to_string(), + ) + })?; + if let Some(txn_mgr) = http_query_manager.get_txn(last_query_id) { + session.set_txn_mgr(txn_mgr); + info!( + "{}: continue transaction from last query {}", + query_id, last_query_id + ); + } else { + // the returned TxnState should be Fail + return Err(ErrorCode::TransactionTimeout(format!( + "transaction timeout: last_query_id {} not found", + last_query_id + ))); + } + } + Some(TxnState::Fail) => session.txn_mgr().lock().force_set_fail(), + _ => {} + } + Ok(()) } impl HttpQuery { @@ -339,49 +390,8 @@ impl HttpQuery { })?; } } - match &session_conf.txn_state { - Some(TxnState::Active) => { - if let Some(ServerInfo { id, start_time }) = &session_conf.last_server_info { - if http_query_manager.server_info.id != *id { - return Err(ErrorCode::InvalidSessionState(format!( - "transaction is active, but the request routed to the wrong server: current server is {}, the last is {}.", - http_query_manager.server_info.id, id - ))); - } - if http_query_manager.server_info.start_time != *start_time { - return Err(ErrorCode::CurrentTransactionIsAborted(format!( - "transaction is aborted because server restarted at {}.", - start_time - ))); - } - } else { - return Err(ErrorCode::InvalidSessionState( - "transaction is active but missing server_info".to_string(), - )); - } + try_set_txn(&ctx.query_id, &session, session_conf, &http_query_manager)?; - let last_query_id = session_conf.last_query_ids.first().ok_or_else(|| { - ErrorCode::InvalidSessionState( - "transaction is active but last_query_ids is empty".to_string(), - ) - })?; - if let Some(txn_mgr) = http_query_manager.get_txn(last_query_id) { - session.set_txn_mgr(txn_mgr); - info!( - "{}: continue transaction from last query {}", - &ctx.query_id, last_query_id - ); - } else { - // the returned TxnState should be Fail - return Err(ErrorCode::TransactionTimeout(format!( - "transaction timeout: last_query_id {} not found", - last_query_id - ))); - } - } - Some(TxnState::Fail) => session.txn_mgr().lock().force_set_fail(), - _ => {} - } if let Some(secs) = session_conf.keep_server_session_secs { if secs > 0 && request.session_id.is_none() { http_query_manager @@ -436,6 +446,7 @@ impl HttpQuery { }; let (block_sender, block_receiver) = sized_spsc(request.pagination.max_rows_in_buffer); + let state = Arc::new(RwLock::new(Executor { query_id: query_id.clone(), state: ExecuteState::Starting(ExecuteStarting { ctx: ctx.clone() }), @@ -446,37 +457,33 @@ impl HttpQuery { let sql = request.sql.clone(); let query_id_clone = query_id.clone(); - let entry = QueryEntry::create(&ctx)?; - let guard = QueriesQueueManager::instance().acquire(entry).await?; - - let (plan, plan_extras) = ExecuteState::plan_sql(&sql, ctx.clone()).await?; - let schema = plan.schema(); - + let http_query_runtime_instance = GlobalQueryRuntime::instance(); let span = if let Some(parent) = SpanContext::current_local_parent() { Span::root(std::any::type_name::(), parent) .with_properties(|| http_ctx.to_minitrace_properties()) } else { Span::noop() }; - - let http_query_runtime_instance = GlobalQueryRuntime::instance(); + let format_settings: Arc>> = Default::default(); + let format_settings_clone = format_settings.clone(); http_query_runtime_instance.runtime().try_spawn( ctx.get_id(), async move { let state = state_clone.clone(); if let Err(e) = ExecuteState::try_start_query( state, - plan, - plan_extras, + sql, session, ctx_clone.clone(), block_sender, + format_settings_clone, ) .await { InterpreterQueryLog::fail_to_start(ctx_clone.clone(), e.clone()); let state = ExecuteStopped { stats: Progresses::default(), + schema: vec![], reason: Err(e.clone()), session_state: ExecutorSessionState::new(ctx_clone.get_current_session()), query_duration_ms: ctx_clone.get_query_duration_ms(), @@ -495,12 +502,10 @@ impl HttpQuery { .in_span(span), )?; - let format_settings = ctx.get_format_settings()?; let data = Arc::new(TokioMutex::new(PageManager::new( query_id.clone(), request.pagination.max_rows_per_page, block_receiver, - schema, format_settings, ))); @@ -510,7 +515,6 @@ impl HttpQuery { node_id, request, state, - guard, page_manager: data, result_timeout_secs, expire_state: Arc::new(TokioMutex::new(ExpireState::Working)), @@ -561,6 +565,7 @@ impl HttpQuery { error: err, warnings: state.get_warnings(), affect: state.get_affect(), + schema: state.get_schema(), } } diff --git a/src/query/service/src/servers/http/v1/query/page_manager.rs b/src/query/service/src/servers/http/v1/query/page_manager.rs index 11f2ddecc3df8..9f39d79f8ed87 100644 --- a/src/query/service/src/servers/http/v1/query/page_manager.rs +++ b/src/query/service/src/servers/http/v1/query/page_manager.rs @@ -13,16 +13,17 @@ // limitations under the License. use std::collections::VecDeque; +use std::sync::Arc; use std::time::Instant; use databend_common_base::base::tokio; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::DataBlock; -use databend_common_expression::DataSchemaRef; use databend_common_io::prelude::FormatSettings; use log::debug; use log::info; +use parking_lot::RwLock; use serde_json::Value as JsonValue; use crate::servers::http::v1::json_block::block_to_json_value; @@ -53,11 +54,10 @@ pub struct PageManager { total_pages: usize, end: bool, block_end: bool, - schema: DataSchemaRef, last_page: Option, row_buffer: VecDeque>, block_receiver: SizedChannelReceiver, - format_settings: FormatSettings, + format_settings: Arc>>, } impl PageManager { @@ -65,8 +65,7 @@ impl PageManager { query_id: String, max_rows_per_page: usize, block_receiver: SizedChannelReceiver, - schema: DataSchemaRef, - format_settings: FormatSettings, + format_settings: Arc>>, ) -> PageManager { PageManager { query_id, @@ -76,7 +75,6 @@ impl PageManager { end: false, block_end: false, row_buffer: Default::default(), - schema, block_receiver, max_rows_per_page, format_settings, @@ -136,8 +134,11 @@ impl PageManager { block: DataBlock, remain: usize, ) -> Result<()> { - let format_settings = &self.format_settings; - let mut iter = block_to_json_value(&block, format_settings)? + let format_settings = { + let guard = self.format_settings.read(); + guard.as_ref().unwrap().clone() + }; + let mut iter = block_to_json_value(&block, &format_settings)? .into_iter() .peekable(); let chunk: Vec<_> = iter.by_ref().take(remain).collect(); @@ -192,10 +193,7 @@ impl PageManager { } } - let block = JsonBlock { - schema: self.schema.clone(), - data: res, - }; + let block = JsonBlock { data: res }; // try to report 'no more data' earlier to client to avoid unnecessary http call if !self.block_end { diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index c473fdd2be4ee..9aa0f288466d4 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -584,7 +584,7 @@ impl TableContext for QueryContext { } fn get_format_settings(&self) -> Result { - let tz = self.query_settings.get_timezone()?; + let tz = self.get_settings().get_timezone()?; let timezone = tz.parse::().map_err(|_| { ErrorCode::InvalidTimezone("Timezone has been checked and should be valid") })?; diff --git a/src/query/service/tests/it/servers/http/http_query_handlers.rs b/src/query/service/tests/it/servers/http/http_query_handlers.rs index a5fea8d41e8e0..6a6e657dca108 100644 --- a/src/query/service/tests/it/servers/http/http_query_handlers.rs +++ b/src/query/service/tests/it/servers/http/http_query_handlers.rs @@ -287,7 +287,7 @@ async fn test_simple_sql() -> Result<()> { assert!(result.error.is_none(), "{:?}", result); assert_eq!(result.data.len(), 0, "{:?}", result); assert_eq!(result.next_uri, Some(final_uri.clone()), "{:?}", result); - assert!(result.schema.is_empty(), "{:?}", result); + // assert!(result.schema.is_empty(), "{:?}", result); assert_eq!(result.state, ExecuteStateKind::Succeeded, "{:?}", result); // get page, support retry @@ -491,7 +491,6 @@ async fn test_wait_time_secs() -> Result<()> { assert!(result.error.is_none(), "{:?}", result); assert_eq!(result.data.len(), 0, "{:?}", result); assert_eq!(result.next_uri, Some(next_uri.clone()), "{:?}", result); - assert!(!result.schema.is_empty(), "{:?}", result); let mut uri = make_page_uri(query_id, 0); let mut num_row = 0; @@ -515,7 +514,7 @@ async fn test_wait_time_secs() -> Result<()> { } None => { assert_eq!(result.state, ExecuteStateKind::Succeeded, "{:?}", result); - assert!(result.schema.is_empty(), "{:?}", result); + // assert!(result.schema.is_empty(), "{:?}", result); assert_eq!(num_row, 1, "{:?}", result); return Ok(()); diff --git a/src/query/service/tests/it/servers/http/json_block.rs b/src/query/service/tests/it/servers/http/json_block.rs index e855a8aeb25f6..134ee5d591aa9 100644 --- a/src/query/service/tests/it/servers/http/json_block.rs +++ b/src/query/service/tests/it/servers/http/json_block.rs @@ -18,14 +18,10 @@ use databend_common_expression::types::nullable::NullableColumn; use databend_common_expression::types::number::Float64Type; use databend_common_expression::types::number::Int32Type; use databend_common_expression::types::BooleanType; -use databend_common_expression::types::DataType; use databend_common_expression::types::DateType; -use databend_common_expression::types::NumberDataType; use databend_common_expression::types::StringType; use databend_common_expression::Column; use databend_common_expression::DataBlock; -use databend_common_expression::DataField; -use databend_common_expression::DataSchemaRefExt; use databend_common_expression::FromData; use databend_common_io::prelude::FormatSettings; use databend_query::servers::http::v1::json_block::JsonBlock; @@ -40,23 +36,6 @@ where T: Serialize { } fn test_data_block(is_nullable: bool) -> Result<()> { - let schema = match is_nullable { - false => DataSchemaRefExt::create(vec![ - DataField::new("c1", DataType::Number(NumberDataType::Int32)), - DataField::new("c2", DataType::String), - DataField::new("c3", DataType::Boolean), - DataField::new("c4", DataType::Number(NumberDataType::Float64)), - DataField::new("c5", DataType::Date), - ]), - true => DataSchemaRefExt::create(vec![ - DataField::new_nullable("c1", DataType::Number(NumberDataType::Int32)), - DataField::new_nullable("c2", DataType::String), - DataField::new_nullable("c3", DataType::Boolean), - DataField::new_nullable("c4", DataType::Number(NumberDataType::Float64)), - DataField::new_nullable("c5", DataType::Date), - ]), - }; - let mut columns = vec![ Int32Type::from_data(vec![1, 2, 3]), StringType::from_data(vec!["a", "b", "c"]), @@ -80,7 +59,7 @@ fn test_data_block(is_nullable: bool) -> Result<()> { let block = DataBlock::new_from_columns(columns); let format = FormatSettings::default(); - let json_block = JsonBlock::new(schema, &block, &format)?; + let json_block = JsonBlock::new(&block, &format)?; let expect = vec![ vec![val("1"), val("a"), val("1"), val("1.1"), val("1970-01-02")], vec![val("2"), val("b"), val("1"), val("2.2"), val("1970-01-03")], @@ -105,7 +84,7 @@ fn test_data_block_not_nullable() -> Result<()> { fn test_empty_block() -> Result<()> { let block = DataBlock::empty(); let format = FormatSettings::default(); - let json_block = JsonBlock::new(DataSchemaRefExt::create(vec![]), &block, &format)?; + let json_block = JsonBlock::new(&block, &format)?; assert!(json_block.is_empty()); Ok(()) }