Skip to content

Commit

Permalink
refactor: http handler plan sql async. (#14908)
Browse files Browse the repository at this point in the history
* refactor: extract fn try_set_txn.

* refactor: http handler plan sql async.
  • Loading branch information
youngsofun authored Mar 14, 2024
1 parent 91d32cd commit 9b2e39c
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 120 deletions.
5 changes: 2 additions & 3 deletions src/query/service/src/servers/http/v1/http_query_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub struct QueryResponseField {
}

impl QueryResponseField {
fn from_schema(schema: DataSchemaRef) -> Vec<Self> {
pub fn from_schema(schema: DataSchemaRef) -> Vec<Self> {
schema
.fields()
.iter()
Expand Down Expand Up @@ -172,7 +172,6 @@ impl QueryResponse {
metrics_incr_http_response_errors_count(err.name(), err.code());
}

let schema = data.schema().clone();
let session_id = r.session_id.clone();
let stats = QueryStats {
progresses: state.progresses.clone(),
Expand All @@ -183,7 +182,7 @@ impl QueryResponse {
Json(QueryResponse {
data: data.into(),
state: state.state,
schema: QueryResponseField::from_schema(schema),
schema: state.schema.clone(),
session_id: Some(session_id),
node_id: r.node_id,
session: r.session,
Expand Down
18 changes: 3 additions & 15 deletions src/query/service/src/servers/http/v1/json_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,13 @@ use std::sync::Arc;
use databend_common_exception::Result;
use databend_common_expression::Column;
use databend_common_expression::DataBlock;
use databend_common_expression::DataSchema;
use databend_common_expression::DataSchemaRef;
use databend_common_formats::field_encoder::FieldEncoderValues;
use databend_common_io::prelude::FormatSettings;
use serde_json::Value as JsonValue;

#[derive(Debug, Clone)]
pub struct JsonBlock {
pub(crate) data: Vec<Vec<JsonValue>>,
pub(crate) schema: DataSchemaRef,
}

pub type JsonBlockRef = Arc<JsonBlock>;
Expand Down Expand Up @@ -63,27 +60,22 @@ pub fn block_to_json_value(

impl JsonBlock {
pub fn empty() -> Self {
Self {
data: vec![],
schema: Arc::new(DataSchema::empty()),
}
Self { data: vec![] }
}

pub fn new(schema: DataSchemaRef, block: &DataBlock, format: &FormatSettings) -> Result<Self> {
pub fn new(block: &DataBlock, format: &FormatSettings) -> Result<Self> {
Ok(JsonBlock {
data: block_to_json_value(block, format)?,
schema,
})
}

pub fn concat(blocks: Vec<JsonBlock>) -> Self {
if blocks.is_empty() {
return Self::empty();
}
let schema = blocks[0].schema.clone();
let results = blocks.into_iter().map(|b| b.data).collect::<Vec<_>>();
let data = results.concat();
Self { data, schema }
Self { data }
}

pub fn num_rows(&self) -> usize {
Expand All @@ -97,10 +89,6 @@ impl JsonBlock {
pub fn data(&self) -> &Vec<Vec<JsonValue>> {
&self.data
}

pub fn schema(&self) -> &DataSchemaRef {
&self.schema
}
}

impl From<JsonBlock> for Vec<Vec<JsonValue>> {
Expand Down
38 changes: 35 additions & 3 deletions src/query/service/src/servers/http/v1/query/execute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use databend_common_expression::BlockEntry;
use databend_common_expression::DataBlock;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::Scalar;
use databend_common_io::prelude::FormatSettings;
use databend_common_settings::Settings;
use databend_common_sql::plans::Plan;
use databend_common_sql::PlanExtras;
Expand All @@ -41,9 +42,13 @@ use ExecuteState::*;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterFactory;
use crate::interpreters::InterpreterQueryLog;
use crate::servers::http::v1::http_query_handlers::QueryResponseField;
use crate::servers::http::v1::query::sized_spsc::SizedChannelSender;
use crate::sessions::AcquireQueueGuard;
use crate::sessions::QueriesQueueManager;
use crate::sessions::QueryAffect;
use crate::sessions::QueryContext;
use crate::sessions::QueryEntry;
use crate::sessions::Session;
use crate::sessions::TableContext;

Expand Down Expand Up @@ -106,9 +111,13 @@ pub struct ExecuteRunning {
session: Arc<Session>,
// mainly used to get progress for now
ctx: Arc<QueryContext>,
schema: Vec<QueryResponseField>,
#[allow(dead_code)]
queue_guard: AcquireQueueGuard,
}

pub struct ExecuteStopped {
pub schema: Vec<QueryResponseField>,
pub stats: Progresses,
pub affect: Option<QueryAffect>,
pub reason: Result<()>,
Expand Down Expand Up @@ -147,6 +156,13 @@ impl ExecutorSessionState {
}

impl Executor {
pub fn get_schema(&self) -> Vec<QueryResponseField> {
match &self.state {
Starting(_) => Default::default(),
Running(r) => r.schema.clone(),
Stopped(f) => f.schema.clone(),
}
}
pub fn get_progress(&self) -> Progresses {
match &self.state {
Starting(_) => Default::default(),
Expand Down Expand Up @@ -230,6 +246,7 @@ impl Executor {
}
guard.state = Stopped(Box::new(ExecuteStopped {
stats: Default::default(),
schema: vec![],
reason,
session_state: ExecutorSessionState::new(s.ctx.get_current_session()),
query_duration_ms: s.ctx.get_query_duration_ms(),
Expand All @@ -254,6 +271,7 @@ impl Executor {

guard.state = Stopped(Box::new(ExecuteStopped {
stats: Progresses::from_context(&r.ctx),
schema: r.schema.clone(),
reason,
session_state: ExecutorSessionState::new(r.ctx.get_current_session()),
query_duration_ms: r.ctx.get_query_duration_ms(),
Expand Down Expand Up @@ -281,17 +299,31 @@ impl ExecuteState {
#[async_backtrace::framed]
pub(crate) async fn try_start_query(
executor: Arc<RwLock<Executor>>,
plan: Plan,
extras: PlanExtras,
sql: String,
session: Arc<Session>,
ctx: Arc<QueryContext>,
block_sender: SizedChannelSender<DataBlock>,
format_settings: Arc<parking_lot::RwLock<Option<FormatSettings>>>,
) -> Result<()> {
ctx.attach_query_str(plan.kind(), extras.statement.to_mask_sql());
let entry = QueryEntry::create(&ctx)?;
let queue_guard = QueriesQueueManager::instance().acquire(entry).await?;

let (plan, plan_extras) = ExecuteState::plan_sql(&sql, ctx.clone())
.await
.map_err(|err| err.display_with_sql(&sql))?;

ctx.attach_query_str(plan.kind(), plan_extras.statement.to_mask_sql());
{
// set_var may change settings
let mut guard = format_settings.write();
*guard = Some(ctx.get_format_settings()?);
}
let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?;
let running_state = ExecuteRunning {
session,
ctx: ctx.clone(),
queue_guard,
schema: QueryResponseField::from_schema(plan.schema()),
};
info!("{}: http query change state to Running", &ctx.get_id());
Executor::start_to_running(&executor, Running(running_state)).await;
Expand Down
Loading

0 comments on commit 9b2e39c

Please sign in to comment.