Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MySQL Handler Kill Query #5448

Merged
merged 1 commit into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/doc/30-reference/30-sql/60-kill/kill-query.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Attempts to forcibly terminate the currently running queries.
## Syntax

```
KILL QUERY|CONNECTION <query_id>
KILL QUERY|CONNECTION <session_id>
```

## Examples
Expand Down
49 changes: 35 additions & 14 deletions query/src/interpreters/interpreter_kill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,25 @@ impl KillInterpreter {
pub fn try_create(ctx: Arc<QueryContext>, plan: KillPlan) -> Result<InterpreterPtr> {
Ok(Arc::new(KillInterpreter { ctx, plan }))
}

async fn execute_kill(&self, session_id: &String) -> Result<SendableDataBlockStream> {
match self.ctx.get_session_by_id(session_id).await {
None => Err(ErrorCode::UnknownSession(format!(
"Not found session id {}",
session_id
))),
Some(kill_session) if self.plan.kill_connection => {
kill_session.force_kill_session();
let schema = Arc::new(DataSchema::empty());
Ok(Box::pin(DataBlockStream::create(schema, None, vec![])))
}
Some(kill_session) => {
kill_session.force_kill_query();
let schema = Arc::new(DataSchema::empty());
Ok(Box::pin(DataBlockStream::create(schema, None, vec![])))
}
}
}
}

#[async_trait::async_trait]
Expand All @@ -54,21 +73,23 @@ impl Interpreter for KillInterpreter {
.await?;

let id = &self.plan.id;
match self.ctx.get_session_by_id(id).await {
None => Err(ErrorCode::UnknownSession(format!(
"Not found session id {}",
id
))),
Some(kill_session) if self.plan.kill_connection => {
kill_session.force_kill_session();
let schema = Arc::new(DataSchema::empty());
Ok(Box::pin(DataBlockStream::create(schema, None, vec![])))
}
Some(kill_session) => {
kill_session.force_kill_query();
let schema = Arc::new(DataSchema::empty());
Ok(Box::pin(DataBlockStream::create(schema, None, vec![])))
// If press Ctrl + C, MySQL Client will create a new session and send query
// `kill query mysql_connection_id` to server.
// the type of connection_id is u32, if parse success get session by connection_id,
// otherwise use the session_id.
// More info Link to: https://github.com/datafuselabs/databend/discussions/5405.
match id.parse::<u32>() {
Ok(mysql_conn_id) => {
let session_id = self.ctx.get_id_by_mysql_conn_id(&Some(mysql_conn_id)).await;
match session_id {
Some(get) => self.execute_kill(&get).await,
None => Err(ErrorCode::UnknownSession(format!(
"MySQL connection id {} not found session id",
mysql_conn_id
))),
}
}
Err(_) => self.execute_kill(id).await,
}
}
}
8 changes: 7 additions & 1 deletion query/src/servers/mysql/mysql_interactive_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,13 @@ impl<W: std::io::Write + Send + Sync> AsyncMysqlShim<W> for InteractiveWorker<W>
}

fn connect_id(&self) -> u32 {
u32::from_le_bytes([0x08, 0x00, 0x00, 0x00])
match self.session.get_mysql_conn_id() {
Some(conn_id) => conn_id,
None => {
//default conn id
u32::from_le_bytes([0x08, 0x00, 0x00, 0x00])
}
}
}

fn default_auth_plugin(&self) -> &str {
Expand Down
12 changes: 12 additions & 0 deletions query/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,18 @@ impl QueryContext {
.await
}

// Get session id by mysql connection id.
pub async fn get_id_by_mysql_conn_id(
self: &Arc<Self>,
conn_id: &Option<u32>,
) -> Option<String> {
self.shared
.session
.get_session_manager()
.get_id_by_mysql_conn_id(conn_id)
.await
}

// Get all the processes list info.
pub async fn get_processes_info(self: &Arc<Self>) -> Vec<ProcessInfo> {
self.shared
Expand Down
8 changes: 7 additions & 1 deletion query/src/sessions/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub struct Session {
session_settings: Settings,
#[ignore_malloc_size_of = "insignificant"]
status: Arc<RwLock<SessionStatus>>,
pub(in crate::sessions) mysql_connection_id: Option<u32>,
}

impl Session {
Expand All @@ -58,12 +59,12 @@ impl Session {
id: String,
typ: SessionType,
session_mgr: Arc<SessionManager>,
mysql_connection_id: Option<u32>,
) -> Result<Arc<Session>> {
let session_ctx = Arc::new(SessionContext::try_create(conf.clone())?);
let session_settings = Settings::try_create(&conf)?;
let ref_count = Arc::new(AtomicUsize::new(0));
let status = Arc::new(Default::default());

Ok(Arc::new(Session {
id,
typ: RwLock::new(typ),
Expand All @@ -72,9 +73,14 @@ impl Session {
session_ctx,
session_settings,
status,
mysql_connection_id,
}))
}

pub fn get_mysql_conn_id(self: &Arc<Self>) -> Option<u32> {
self.mysql_connection_id
}

pub fn get_id(self: &Arc<Self>) -> String {
self.id.clone()
}
Expand Down
2 changes: 2 additions & 0 deletions query/src/sessions/session_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub struct ProcessInfo {
pub memory_usage: i64,
pub dal_metrics: Option<DalMetrics>,
pub scan_progress_value: Option<ProgressValues>,
pub mysql_connection_id: Option<u32>,
}

impl Session {
Expand Down Expand Up @@ -67,6 +68,7 @@ impl Session {
memory_usage,
dal_metrics: Session::query_dal_metrics(status),
scan_progress_value: Session::query_scan_progress_value(status),
mysql_connection_id: self.mysql_connection_id,
}
}

Expand Down
61 changes: 52 additions & 9 deletions query/src/sessions/session_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

use std::collections::HashMap;
use std::future::Future;
use std::ops::DerefMut;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -59,6 +62,9 @@ pub struct SessionManager {
storage_operator: RwLock<Operator>,
storage_runtime: Arc<Runtime>,
_guards: Vec<WorkerGuard>,
// When typ is MySQL, insert into this map, key is id, val is MySQL connection id.
pub(crate) mysql_conn_map: Arc<RwLock<HashMap<Option<u32>, String>>>,
pub(in crate::sessions) mysql_basic_conn_id: AtomicU32,
}

impl SessionManager {
Expand Down Expand Up @@ -93,6 +99,7 @@ impl SessionManager {
} else {
(Vec::new(), None)
};
let mysql_conn_map = Arc::new(RwLock::new(HashMap::with_capacity(max_sessions)));

Ok(Arc::new(SessionManager {
conf: RwLock::new(conf),
Expand All @@ -107,6 +114,8 @@ impl SessionManager {
storage_operator: RwLock::new(storage_operator),
storage_runtime: Arc::new(storage_runtime),
_guards,
mysql_conn_map,
mysql_basic_conn_id: AtomicU32::new(9_u32.to_le() as u32),
}))
}

Expand Down Expand Up @@ -145,17 +154,34 @@ impl SessionManager {
let sessions = self.active_sessions.read();
if sessions.len() == self.max_sessions {
return Err(ErrorCode::TooManyUserConnections(
"The current accept connection has exceeded mysql_handler_thread_num config",
"The current accept connection has exceeded max_active_sessions config",
));
}
}
let session = Session::try_create(
config.clone(),
uuid::Uuid::new_v4().to_string(),
typ,
self.clone(),
)
.await?;
let id = uuid::Uuid::new_v4().to_string();
let session_typ = typ.clone();
let mut mysql_conn_id = None;
match session_typ {
SessionType::MySQL => {
let mut conn_id_session_id = self.mysql_conn_map.write();
mysql_conn_id = Some(self.mysql_basic_conn_id.fetch_add(1, Ordering::Relaxed));
if conn_id_session_id.len() < self.max_sessions {
conn_id_session_id.insert(mysql_conn_id, id.clone());
} else {
return Err(ErrorCode::TooManyUserConnections(
"The current accept connection has exceeded max_active_sessions config",
));
}
}
_ => {
tracing::debug!(
"session type is {}, mysql_conn_map no need to change.",
session_typ
);
}
}
let session =
Session::try_create(config.clone(), id, typ, self.clone(), mysql_conn_id).await?;

let mut sessions = self.active_sessions.write();
if sessions.len() < self.max_sessions {
Expand All @@ -170,7 +196,7 @@ impl SessionManager {
Ok(SessionRef::create(session))
} else {
Err(ErrorCode::TooManyUserConnections(
"The current accept connection has exceeded mysql_handler_thread_num config",
"The current accept connection has exceeded max_active_sessions config",
))
}
}
Expand All @@ -195,6 +221,7 @@ impl SessionManager {
id.clone(),
SessionType::FlightRPC,
self.clone(),
None,
)
.await?;

Expand Down Expand Up @@ -226,6 +253,15 @@ impl SessionManager {
.map(|session| SessionRef::create(session.clone()))
}

#[allow(clippy::ptr_arg)]
pub async fn get_id_by_mysql_conn_id(
self: &Arc<Self>,
mysql_conn_id: &Option<u32>,
) -> Option<String> {
let sessions = self.mysql_conn_map.read();
sessions.get(mysql_conn_id).cloned()
}

#[allow(clippy::ptr_arg)]
pub fn destroy_session(self: &Arc<Self>, session_id: &String) {
let config = self.get_conf();
Expand All @@ -237,6 +273,13 @@ impl SessionManager {

let mut sessions = self.active_sessions.write();
sessions.remove(session_id);
//also need remove mysql_conn_map
let mut mysql_conns_map = self.mysql_conn_map.write();
for (k, v) in mysql_conns_map.deref_mut().clone() {
if &v == session_id {
mysql_conns_map.remove(&k);
}
}
}

pub fn graceful_shutdown(
Expand Down
26 changes: 22 additions & 4 deletions query/src/sql/parsers/parser_kill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
// Borrow from apache/arrow/rust/datafusion/src/sql/sql_parser
// See notice.md

use sqlparser::ast::Ident;
use sqlparser::parser::ParserError;
use sqlparser::tokenizer::Token;

use crate::sql::statements::DfKillStatement;
use crate::sql::DfParser;
Expand All @@ -34,9 +36,25 @@ impl<'a> DfParser<'a> {

// Parse 'KILL statement'.
fn parse_kill<const KILL_QUERY: bool>(&mut self) -> Result<DfStatement<'a>, ParserError> {
Ok(DfStatement::KillStatement(DfKillStatement {
object_id: self.parser.parse_identifier()?,
kill_query: KILL_QUERY,
}))
let token = self.parser.next_token();
match &token {
Token::Word(w) => Ok(DfStatement::KillStatement(DfKillStatement {
object_id: w.to_ident(),
kill_query: KILL_QUERY,
})),
// Sometimes MySQL Client could send `kill query connect_id`
// and the connect_id is a number, so we parse it as a SingleQuotedString.
Token::SingleQuotedString(s) | Token::Number(s, _) => {
Ok(DfStatement::KillStatement(DfKillStatement {
object_id: Ident::with_quote('\'', s),
kill_query: KILL_QUERY,
}))
}
Token::BackQuotedString(s) => Ok(DfStatement::KillStatement(DfKillStatement {
object_id: Ident::with_quote('`', s),
kill_query: KILL_QUERY,
})),
_ => self.expected("identifier", token),
}
}
}
4 changes: 4 additions & 0 deletions query/src/storages/system/processes_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl AsyncSystemTable for ProcessesTable {
let mut processes_dal_metrics_write_bytes = Vec::with_capacity(processes_info.len());
let mut processes_scan_progress_read_rows = Vec::with_capacity(processes_info.len());
let mut processes_scan_progress_read_bytes = Vec::with_capacity(processes_info.len());
let mut processes_mysql_connection_id = Vec::with_capacity(processes_info.len());

for process_info in &processes_info {
processes_id.push(process_info.id.clone().into_bytes());
Expand All @@ -77,6 +78,7 @@ impl AsyncSystemTable for ProcessesTable {
ProcessesTable::process_scan_progress_values(&process_info.scan_progress_value);
processes_scan_progress_read_rows.push(scan_progress_read_rows);
processes_scan_progress_read_bytes.push(scan_progress_read_bytes);
processes_mysql_connection_id.push(process_info.mysql_connection_id);
}

Ok(DataBlock::create(self.table_info.schema(), vec![
Expand All @@ -92,6 +94,7 @@ impl AsyncSystemTable for ProcessesTable {
Series::from_data(processes_dal_metrics_write_bytes),
Series::from_data(processes_scan_progress_read_rows),
Series::from_data(processes_scan_progress_read_bytes),
Series::from_data(processes_mysql_connection_id),
]))
}
}
Expand All @@ -111,6 +114,7 @@ impl ProcessesTable {
DataField::new_nullable("dal_metrics_write_bytes", u64::to_data_type()),
DataField::new_nullable("scan_progress_read_rows", u64::to_data_type()),
DataField::new_nullable("scan_progress_read_bytes", u64::to_data_type()),
DataField::new_nullable("mysql_connection_id", u32::to_data_type()),
]);

let table_info = TableInfo {
Expand Down
4 changes: 2 additions & 2 deletions query/tests/it/servers/mysql/mysql_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async fn test_rejected_session_with_sequence() -> Result<()> {
Ok(_) => panic!("Expected rejected connection"),
Err(error) => {
assert_eq!(error.code(), 1067);
assert_eq!(error.message(), "Reject connection, cause: Server error: `ERROR HY000 (1815): The current accept connection has exceeded mysql_handler_thread_num config'");
assert_eq!(error.message(), "Reject connection, cause: Server error: `ERROR HY000 (1815): The current accept connection has exceeded max_active_sessions config'");
}
};

Expand Down Expand Up @@ -99,7 +99,7 @@ async fn test_rejected_session_with_parallel() -> Result<()> {
Err(error) => {
destroy_barrier.wait().await;
assert_eq!(error.code(), 1067);
assert_eq!(error.message(), "Reject connection, cause: Server error: `ERROR HY000 (1815): The current accept connection has exceeded mysql_handler_thread_num config'");
assert_eq!(error.message(), "Reject connection, cause: Server error: `ERROR HY000 (1815): The current accept connection has exceeded max_active_sessions config'");
CreateServerResult::Rejected
}
}
Expand Down
2 changes: 2 additions & 0 deletions query/tests/it/sessions/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ async fn test_session() -> Result<()> {
String::from("test-001"),
SessionType::Dummy,
session_manager,
None,
)
.await?;

Expand Down Expand Up @@ -75,6 +76,7 @@ async fn test_session_in_management_mode() -> Result<()> {
String::from("test-001"),
SessionType::Dummy,
session_manager,
None,
)
.await?;

Expand Down
Loading