Skip to content

Commit

Permalink
Merge pull request #4454 from youngsofun/http
Browse files Browse the repository at this point in the history
query add http api /v1/status
  • Loading branch information
mergify[bot] authored Mar 15, 2022
2 parents 908faae + b2f5874 commit c36243d
Show file tree
Hide file tree
Showing 25 changed files with 353 additions and 35 deletions.
3 changes: 2 additions & 1 deletion query/benches/suites/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use criterion::Criterion;
use databend_query::configs::Config;
use databend_query::interpreters::SelectInterpreter;
use databend_query::sessions::SessionManager;
use databend_query::sessions::SessionType;
use databend_query::sql::PlanParser;
use futures::StreamExt;

Expand All @@ -29,7 +30,7 @@ pub mod bench_sort_query_sql;

pub async fn select_executor(sql: &str) -> Result<()> {
let sessions = SessionManager::from_conf(Config::default()).await?;
let executor_session = sessions.create_session("Benches").await?;
let executor_session = sessions.create_session(SessionType::Test).await?;
let ctx = executor_session.create_query_context().await?;

if let PlanNode::Select(plan) = PlanParser::parse(ctx.clone(), sql).await? {
Expand Down
5 changes: 4 additions & 1 deletion query/src/api/http/v1/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use poem::web::IntoResponse;
use poem::web::Json;

use crate::sessions::SessionManager;
use crate::sessions::SessionType;

// GET /v1/cluster/list
// list all nodes in current databend-query cluster
Expand All @@ -42,7 +43,9 @@ pub async fn cluster_list_handler(
}

async fn list_nodes(sessions: &Arc<SessionManager>) -> Result<Vec<Arc<NodeInfo>>> {
let watch_cluster_session = sessions.create_session("WatchCluster").await?;
let watch_cluster_session = sessions
.create_session(SessionType::HTTPAPI("WatchCluster".to_string()))
.await?;
let watch_cluster_context = watch_cluster_session.create_query_context().await?;
Ok(watch_cluster_context.get_cluster().get_nodes())
}
6 changes: 5 additions & 1 deletion query/src/api/http/v1/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use tokio_stream::StreamExt;

use crate::sessions::QueryContext;
use crate::sessions::SessionManager;
use crate::sessions::SessionType;
use crate::storages::ToReadDataSourcePlan;

// read log files from cfg.log.log_dir
Expand All @@ -42,8 +43,11 @@ pub async fn logs_handler(
}

async fn select_table(sessions: &Arc<SessionManager>) -> Result<Body> {
let session = sessions.create_session("WatchLogs").await?;
let session = sessions
.create_session(SessionType::HTTPAPI("GetStatus".to_string()))
.await?;
let query_context = session.create_query_context().await?;

let mut tracing_table_stream = execute_query(query_context).await?;

let stream = async_stream::try_stream! {
Expand Down
1 change: 1 addition & 0 deletions query/src/api/http/v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ pub mod cluster;
pub mod config;
pub mod health;
pub mod logs;
pub mod status;
60 changes: 60 additions & 0 deletions query/src/api/http/v1/status.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::time::SystemTime;

use poem::web::Data;
use poem::web::Json;
use poem::IntoResponse;
use serde::Deserialize;
use serde::Serialize;

use crate::sessions::SessionManager;

#[derive(Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct Status {
pub running_queries_count: u64,
// secs since epoch
pub last_query_started_at: Option<u64>,
// secs since epoch
pub last_query_finished_at: Option<u64>,
// secs since epoch
instance_started_at: u64,
}

fn secs_since_epoch(t: SystemTime) -> u64 {
t.duration_since(SystemTime::UNIX_EPOCH)
.expect("Time went backwards")
.as_secs()
}

// lightweight way to get status
// return Status in json
#[poem::handler]
pub async fn status_handler(
sessions_extension: Data<&Arc<SessionManager>>,
) -> poem::Result<impl IntoResponse> {
let status = {
let status = sessions_extension.0.status.read();
status.clone()
};
let status = Status {
running_queries_count: status.running_queries_count,
last_query_started_at: status.last_query_started_at.map(secs_since_epoch),
last_query_finished_at: status.last_query_finished_at.map(secs_since_epoch),
instance_started_at: secs_since_epoch(status.instance_started_at),
};
Ok(Json(status))
}
1 change: 1 addition & 0 deletions query/src/api/http_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ impl HttpService {
.at("/v1/health", get(super::http::v1::health::health_handler))
.at("/v1/config", get(super::http::v1::config::config_handler))
.at("/v1/logs", get(super::http::v1::logs::logs_handler))
.at("/v1/status", get(super::http::v1::status::status_handler))
.at(
"/v1/cluster/list",
get(super::http::v1::cluster::cluster_list_handler),
Expand Down
23 changes: 21 additions & 2 deletions query/src/interpreters/interpreter_factory_interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::sync::Arc;
use std::time::SystemTime;

use common_exception::Result;
use common_planners::PlanNode;
Expand Down Expand Up @@ -65,10 +66,28 @@ impl Interpreter for InterceptorInterpreter {
}

async fn start(&self) -> Result<()> {
self.query_log.log_start().await
let session = self.ctx.get_current_session();
let now = SystemTime::now();
if session.get_type().is_user_session() {
session
.get_session_manager()
.status
.write()
.query_start(now);
}
self.query_log.log_start(now).await
}

async fn finish(&self) -> Result<()> {
self.query_log.log_finish().await
let session = self.ctx.get_current_session();
let now = SystemTime::now();
if session.get_type().is_user_session() {
session
.get_session_manager()
.status
.write()
.query_finish(now)
}
self.query_log.log_finish(now).await
}
}
10 changes: 4 additions & 6 deletions query/src/interpreters/interpreter_query_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,9 @@ impl InterpreterQueryLog {
Ok(())
}

pub async fn log_start(&self) -> Result<()> {
pub async fn log_start(&self, now: SystemTime) -> Result<()> {
// User.
let handler_type = self.ctx.get_current_session().get_type();
let handler_type = self.ctx.get_current_session().get_type().to_string();
let tenant_id = self.ctx.get_tenant();
let cluster_id = self.ctx.get_config().query.cluster_id;
let user = self.ctx.get_current_user()?;
Expand All @@ -192,7 +192,6 @@ impl InterpreterQueryLog {
let current_database = self.ctx.get_current_database();

// Stats.
let now = SystemTime::now();
let event_time = now
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
Expand Down Expand Up @@ -275,9 +274,9 @@ impl InterpreterQueryLog {
self.write_log(&log_event).await
}

pub async fn log_finish(&self) -> Result<()> {
pub async fn log_finish(&self, now: SystemTime) -> Result<()> {
// User.
let handler_type = self.ctx.get_current_session().get_type();
let handler_type = self.ctx.get_current_session().get_type().to_string();
let tenant_id = self.ctx.get_config().query.tenant_id;
let cluster_id = self.ctx.get_config().query.cluster_id;
let user = self.ctx.get_current_user()?;
Expand All @@ -291,7 +290,6 @@ impl InterpreterQueryLog {
let query_text = self.ctx.get_query_str();

// Stats.
let now = SystemTime::now();
let event_time = now
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
Expand Down
3 changes: 2 additions & 1 deletion query/src/servers/clickhouse/clickhouse_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::servers::clickhouse::reject_connection::RejectCHConnection;
use crate::servers::server::ListeningStream;
use crate::servers::server::Server;
use crate::sessions::SessionManager;
use crate::sessions::SessionType;

pub struct ClickHouseHandler {
sessions: Arc<SessionManager>,
Expand Down Expand Up @@ -88,7 +89,7 @@ impl ClickHouseHandler {

fn accept_socket(sessions: Arc<SessionManager>, executor: Arc<Runtime>, socket: TcpStream) {
executor.spawn(async move {
match sessions.create_session("ClickHouseSession").await {
match sessions.create_session(SessionType::Clickhouse).await {
Err(error) => Self::reject_connection(socket, error).await,
Ok(session) => {
tracing::info!("ClickHouse connection coming: {:?}", socket.peer_addr());
Expand Down
3 changes: 2 additions & 1 deletion query/src/servers/http/v1/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use serde::Serialize;

use crate::interpreters::InterpreterFactory;
use crate::sessions::SessionManager;
use crate::sessions::SessionType;
use crate::sql::PlanParser;

#[derive(Serialize, Deserialize, Debug)]
Expand All @@ -55,7 +56,7 @@ pub async fn streaming_load(
) -> PoemResult<Json<LoadResponse>> {
let session_manager = sessions_extension.0;
let session = session_manager
.create_session("Streaming load")
.create_session(SessionType::HTTPStreamingLoad)
.await
.map_err(InternalServerError)?;

Expand Down
5 changes: 4 additions & 1 deletion query/src/servers/http/v1/query/execute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::interpreters::InterpreterFactory;
use crate::sessions::QueryContext;
use crate::sessions::SessionManager;
use crate::sessions::SessionRef;
use crate::sessions::SessionType;
use crate::sql::PlanParser;

#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq)]
Expand Down Expand Up @@ -142,7 +143,9 @@ impl ExecuteState {
) -> Result<(Arc<RwLock<Executor>>, DataSchemaRef)> {
let sql = &request.sql;
let start_time = Instant::now();
let session = session_manager.create_session("http-statement").await?;
let session = session_manager
.create_session(SessionType::HTTPQuery)
.await?;
let ctx = session.create_query_context().await?;
if let Some(db) = &request.session.database {
ctx.set_current_database(db.clone()).await?;
Expand Down
3 changes: 2 additions & 1 deletion query/src/servers/mysql/mysql_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::servers::mysql::reject_connection::RejectConnection;
use crate::servers::server::ListeningStream;
use crate::servers::server::Server;
use crate::sessions::SessionManager;
use crate::sessions::SessionType;

pub struct MySQLHandler {
sessions: Arc<SessionManager>,
Expand Down Expand Up @@ -81,7 +82,7 @@ impl MySQLHandler {

fn accept_socket(sessions: Arc<SessionManager>, executor: Arc<Runtime>, socket: TcpStream) {
executor.spawn(async move {
match sessions.create_session("MySQL").await {
match sessions.create_session(SessionType::MySQL).await {
Err(error) => Self::reject_session(socket, error).await,
Ok(session) => {
tracing::info!("MySQL connection coming: {:?}", socket.peer_addr());
Expand Down
4 changes: 4 additions & 0 deletions query/src/sessions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ mod session_info;
mod session_mgr;
mod session_ref;
mod session_settings;
mod session_type;
mod status;

pub use query_ctx::QueryContext;
pub use query_ctx_shared::QueryContextShared;
Expand All @@ -31,3 +33,5 @@ pub use session_info::ProcessInfo;
pub use session_mgr::SessionManager;
pub use session_ref::SessionRef;
pub use session_settings::Settings;
pub use session_type::SessionType;
pub use status::Status;
8 changes: 5 additions & 3 deletions query/src/sessions/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ use crate::sessions::QueryContext;
use crate::sessions::QueryContextShared;
use crate::sessions::SessionContext;
use crate::sessions::SessionManager;
use crate::sessions::SessionType;
use crate::sessions::Settings;

#[derive(Clone, MallocSizeOf)]
pub struct Session {
pub(in crate::sessions) id: String,
pub(in crate::sessions) typ: String,
#[ignore_malloc_size_of = "insignificant"]
pub(in crate::sessions) typ: SessionType,
#[ignore_malloc_size_of = "insignificant"]
pub(in crate::sessions) session_mgr: Arc<SessionManager>,
pub(in crate::sessions) ref_count: Arc<AtomicUsize>,
Expand All @@ -50,7 +52,7 @@ impl Session {
pub async fn try_create(
conf: Config,
id: String,
typ: String,
typ: SessionType,
session_mgr: Arc<SessionManager>,
) -> Result<Arc<Session>> {
let session_ctx = Arc::new(SessionContext::try_create(conf.clone())?);
Expand All @@ -72,7 +74,7 @@ impl Session {
self.id.clone()
}

pub fn get_type(self: &Arc<Self>) -> String {
pub fn get_type(self: &Arc<Self>) -> SessionType {
self.typ.clone()
}

Expand Down
7 changes: 4 additions & 3 deletions query/src/sessions/session_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use common_meta_types::UserInfo;

use crate::sessions::Session;
use crate::sessions::SessionContext;
use crate::sessions::SessionType;
use crate::sessions::Settings;

pub struct ProcessInfo {
Expand Down Expand Up @@ -56,7 +57,7 @@ impl Session {

ProcessInfo {
id: self.id.clone(),
typ: self.typ.clone(),
typ: self.typ.clone().to_string(),
state: self.process_state(status),
database: status.get_current_database(),
user: status.get_current_user(),
Expand All @@ -78,8 +79,8 @@ impl Session {
}

fn process_extra_info(self: &Arc<Self>, status: &SessionContext) -> Option<String> {
match self.typ.as_str() {
"RPCSession" => Session::rpc_extra_info(status),
match self.typ {
SessionType::FlightRPC => Session::rpc_extra_info(status),
_ => Session::query_extra_info(status),
}
}
Expand Down
Loading

0 comments on commit c36243d

Please sign in to comment.