diff --git a/docs/doc/30-reference/30-sql/60-kill/kill-query.md b/docs/doc/30-reference/30-sql/60-kill/kill-query.md index 6be95613fadd2..c3278f9124f2f 100644 --- a/docs/doc/30-reference/30-sql/60-kill/kill-query.md +++ b/docs/doc/30-reference/30-sql/60-kill/kill-query.md @@ -7,7 +7,7 @@ Attempts to forcibly terminate the currently running queries. ## Syntax ``` -KILL QUERY|CONNECTION +KILL QUERY|CONNECTION ``` ## Examples diff --git a/query/src/interpreters/interpreter_kill.rs b/query/src/interpreters/interpreter_kill.rs index d4de47ce7c01f..daa5a629781f9 100644 --- a/query/src/interpreters/interpreter_kill.rs +++ b/query/src/interpreters/interpreter_kill.rs @@ -36,6 +36,25 @@ impl KillInterpreter { pub fn try_create(ctx: Arc, plan: KillPlan) -> Result { Ok(Arc::new(KillInterpreter { ctx, plan })) } + + async fn execute_kill(&self, session_id: &String) -> Result { + match self.ctx.get_session_by_id(session_id).await { + None => Err(ErrorCode::UnknownSession(format!( + "Not found session id {}", + session_id + ))), + Some(kill_session) if self.plan.kill_connection => { + kill_session.force_kill_session(); + let schema = Arc::new(DataSchema::empty()); + Ok(Box::pin(DataBlockStream::create(schema, None, vec![]))) + } + Some(kill_session) => { + kill_session.force_kill_query(); + let schema = Arc::new(DataSchema::empty()); + Ok(Box::pin(DataBlockStream::create(schema, None, vec![]))) + } + } + } } #[async_trait::async_trait] @@ -54,21 +73,23 @@ impl Interpreter for KillInterpreter { .await?; let id = &self.plan.id; - match self.ctx.get_session_by_id(id).await { - None => Err(ErrorCode::UnknownSession(format!( - "Not found session id {}", - id - ))), - Some(kill_session) if self.plan.kill_connection => { - kill_session.force_kill_session(); - let schema = Arc::new(DataSchema::empty()); - Ok(Box::pin(DataBlockStream::create(schema, None, vec![]))) - } - Some(kill_session) => { - kill_session.force_kill_query(); - let schema = Arc::new(DataSchema::empty()); - Ok(Box::pin(DataBlockStream::create(schema, None, vec![]))) + // If press Ctrl + C, MySQL Client will create a new session and send query + // `kill query mysql_connection_id` to server. + // the type of connection_id is u32, if parse success get session by connection_id, + // otherwise use the session_id. + // More info Link to: https://github.com/datafuselabs/databend/discussions/5405. + match id.parse::() { + Ok(mysql_conn_id) => { + let session_id = self.ctx.get_id_by_mysql_conn_id(&Some(mysql_conn_id)).await; + match session_id { + Some(get) => self.execute_kill(&get).await, + None => Err(ErrorCode::UnknownSession(format!( + "MySQL connection id {} not found session id", + mysql_conn_id + ))), + } } + Err(_) => self.execute_kill(id).await, } } } diff --git a/query/src/servers/mysql/mysql_interactive_worker.rs b/query/src/servers/mysql/mysql_interactive_worker.rs index 370964f7cfe20..d30ca0d6ca51c 100644 --- a/query/src/servers/mysql/mysql_interactive_worker.rs +++ b/query/src/servers/mysql/mysql_interactive_worker.rs @@ -71,7 +71,13 @@ impl AsyncMysqlShim for InteractiveWorker } fn connect_id(&self) -> u32 { - u32::from_le_bytes([0x08, 0x00, 0x00, 0x00]) + match self.session.get_mysql_conn_id() { + Some(conn_id) => conn_id, + None => { + //default conn id + u32::from_le_bytes([0x08, 0x00, 0x00, 0x00]) + } + } } fn default_auth_plugin(&self) -> &str { diff --git a/query/src/sessions/query_ctx.rs b/query/src/sessions/query_ctx.rs index a4d1c2a9b281c..0f633235b962b 100644 --- a/query/src/sessions/query_ctx.rs +++ b/query/src/sessions/query_ctx.rs @@ -351,6 +351,18 @@ impl QueryContext { .await } + // Get session id by mysql connection id. + pub async fn get_id_by_mysql_conn_id( + self: &Arc, + conn_id: &Option, + ) -> Option { + self.shared + .session + .get_session_manager() + .get_id_by_mysql_conn_id(conn_id) + .await + } + // Get all the processes list info. pub async fn get_processes_info(self: &Arc) -> Vec { self.shared diff --git a/query/src/sessions/session.rs b/query/src/sessions/session.rs index 80b56e2546cdc..dba8c2275f019 100644 --- a/query/src/sessions/session.rs +++ b/query/src/sessions/session.rs @@ -50,6 +50,7 @@ pub struct Session { session_settings: Settings, #[ignore_malloc_size_of = "insignificant"] status: Arc>, + pub(in crate::sessions) mysql_connection_id: Option, } impl Session { @@ -58,12 +59,12 @@ impl Session { id: String, typ: SessionType, session_mgr: Arc, + mysql_connection_id: Option, ) -> Result> { let session_ctx = Arc::new(SessionContext::try_create(conf.clone())?); let session_settings = Settings::try_create(&conf)?; let ref_count = Arc::new(AtomicUsize::new(0)); let status = Arc::new(Default::default()); - Ok(Arc::new(Session { id, typ: RwLock::new(typ), @@ -72,9 +73,14 @@ impl Session { session_ctx, session_settings, status, + mysql_connection_id, })) } + pub fn get_mysql_conn_id(self: &Arc) -> Option { + self.mysql_connection_id + } + pub fn get_id(self: &Arc) -> String { self.id.clone() } diff --git a/query/src/sessions/session_info.rs b/query/src/sessions/session_info.rs index 2b85e71d33a0f..8c76e1839c198 100644 --- a/query/src/sessions/session_info.rs +++ b/query/src/sessions/session_info.rs @@ -36,6 +36,7 @@ pub struct ProcessInfo { pub memory_usage: i64, pub dal_metrics: Option, pub scan_progress_value: Option, + pub mysql_connection_id: Option, } impl Session { @@ -67,6 +68,7 @@ impl Session { memory_usage, dal_metrics: Session::query_dal_metrics(status), scan_progress_value: Session::query_scan_progress_value(status), + mysql_connection_id: self.mysql_connection_id, } } diff --git a/query/src/sessions/session_mgr.rs b/query/src/sessions/session_mgr.rs index 7fc2edcd5c4ba..ca868369b922a 100644 --- a/query/src/sessions/session_mgr.rs +++ b/query/src/sessions/session_mgr.rs @@ -14,6 +14,9 @@ use std::collections::HashMap; use std::future::Future; +use std::ops::DerefMut; +use std::sync::atomic::AtomicU32; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; @@ -59,6 +62,9 @@ pub struct SessionManager { storage_operator: RwLock, storage_runtime: Arc, _guards: Vec, + // When typ is MySQL, insert into this map, key is id, val is MySQL connection id. + pub(crate) mysql_conn_map: Arc, String>>>, + pub(in crate::sessions) mysql_basic_conn_id: AtomicU32, } impl SessionManager { @@ -93,6 +99,7 @@ impl SessionManager { } else { (Vec::new(), None) }; + let mysql_conn_map = Arc::new(RwLock::new(HashMap::with_capacity(max_sessions))); Ok(Arc::new(SessionManager { conf: RwLock::new(conf), @@ -107,6 +114,8 @@ impl SessionManager { storage_operator: RwLock::new(storage_operator), storage_runtime: Arc::new(storage_runtime), _guards, + mysql_conn_map, + mysql_basic_conn_id: AtomicU32::new(9_u32.to_le() as u32), })) } @@ -145,17 +154,34 @@ impl SessionManager { let sessions = self.active_sessions.read(); if sessions.len() == self.max_sessions { return Err(ErrorCode::TooManyUserConnections( - "The current accept connection has exceeded mysql_handler_thread_num config", + "The current accept connection has exceeded max_active_sessions config", )); } } - let session = Session::try_create( - config.clone(), - uuid::Uuid::new_v4().to_string(), - typ, - self.clone(), - ) - .await?; + let id = uuid::Uuid::new_v4().to_string(); + let session_typ = typ.clone(); + let mut mysql_conn_id = None; + match session_typ { + SessionType::MySQL => { + let mut conn_id_session_id = self.mysql_conn_map.write(); + mysql_conn_id = Some(self.mysql_basic_conn_id.fetch_add(1, Ordering::Relaxed)); + if conn_id_session_id.len() < self.max_sessions { + conn_id_session_id.insert(mysql_conn_id, id.clone()); + } else { + return Err(ErrorCode::TooManyUserConnections( + "The current accept connection has exceeded max_active_sessions config", + )); + } + } + _ => { + tracing::debug!( + "session type is {}, mysql_conn_map no need to change.", + session_typ + ); + } + } + let session = + Session::try_create(config.clone(), id, typ, self.clone(), mysql_conn_id).await?; let mut sessions = self.active_sessions.write(); if sessions.len() < self.max_sessions { @@ -170,7 +196,7 @@ impl SessionManager { Ok(SessionRef::create(session)) } else { Err(ErrorCode::TooManyUserConnections( - "The current accept connection has exceeded mysql_handler_thread_num config", + "The current accept connection has exceeded max_active_sessions config", )) } } @@ -195,6 +221,7 @@ impl SessionManager { id.clone(), SessionType::FlightRPC, self.clone(), + None, ) .await?; @@ -226,6 +253,15 @@ impl SessionManager { .map(|session| SessionRef::create(session.clone())) } + #[allow(clippy::ptr_arg)] + pub async fn get_id_by_mysql_conn_id( + self: &Arc, + mysql_conn_id: &Option, + ) -> Option { + let sessions = self.mysql_conn_map.read(); + sessions.get(mysql_conn_id).cloned() + } + #[allow(clippy::ptr_arg)] pub fn destroy_session(self: &Arc, session_id: &String) { let config = self.get_conf(); @@ -237,6 +273,13 @@ impl SessionManager { let mut sessions = self.active_sessions.write(); sessions.remove(session_id); + //also need remove mysql_conn_map + let mut mysql_conns_map = self.mysql_conn_map.write(); + for (k, v) in mysql_conns_map.deref_mut().clone() { + if &v == session_id { + mysql_conns_map.remove(&k); + } + } } pub fn graceful_shutdown( diff --git a/query/src/sql/parsers/parser_kill.rs b/query/src/sql/parsers/parser_kill.rs index 18ed16feabaf6..c819eedf5cd2e 100644 --- a/query/src/sql/parsers/parser_kill.rs +++ b/query/src/sql/parsers/parser_kill.rs @@ -15,7 +15,9 @@ // Borrow from apache/arrow/rust/datafusion/src/sql/sql_parser // See notice.md +use sqlparser::ast::Ident; use sqlparser::parser::ParserError; +use sqlparser::tokenizer::Token; use crate::sql::statements::DfKillStatement; use crate::sql::DfParser; @@ -34,9 +36,25 @@ impl<'a> DfParser<'a> { // Parse 'KILL statement'. fn parse_kill(&mut self) -> Result, ParserError> { - Ok(DfStatement::KillStatement(DfKillStatement { - object_id: self.parser.parse_identifier()?, - kill_query: KILL_QUERY, - })) + let token = self.parser.next_token(); + match &token { + Token::Word(w) => Ok(DfStatement::KillStatement(DfKillStatement { + object_id: w.to_ident(), + kill_query: KILL_QUERY, + })), + // Sometimes MySQL Client could send `kill query connect_id` + // and the connect_id is a number, so we parse it as a SingleQuotedString. + Token::SingleQuotedString(s) | Token::Number(s, _) => { + Ok(DfStatement::KillStatement(DfKillStatement { + object_id: Ident::with_quote('\'', s), + kill_query: KILL_QUERY, + })) + } + Token::BackQuotedString(s) => Ok(DfStatement::KillStatement(DfKillStatement { + object_id: Ident::with_quote('`', s), + kill_query: KILL_QUERY, + })), + _ => self.expected("identifier", token), + } } } diff --git a/query/src/storages/system/processes_table.rs b/query/src/storages/system/processes_table.rs index f85bfba2d185d..e8639d1e04dd9 100644 --- a/query/src/storages/system/processes_table.rs +++ b/query/src/storages/system/processes_table.rs @@ -57,6 +57,7 @@ impl AsyncSystemTable for ProcessesTable { let mut processes_dal_metrics_write_bytes = Vec::with_capacity(processes_info.len()); let mut processes_scan_progress_read_rows = Vec::with_capacity(processes_info.len()); let mut processes_scan_progress_read_bytes = Vec::with_capacity(processes_info.len()); + let mut processes_mysql_connection_id = Vec::with_capacity(processes_info.len()); for process_info in &processes_info { processes_id.push(process_info.id.clone().into_bytes()); @@ -77,6 +78,7 @@ impl AsyncSystemTable for ProcessesTable { ProcessesTable::process_scan_progress_values(&process_info.scan_progress_value); processes_scan_progress_read_rows.push(scan_progress_read_rows); processes_scan_progress_read_bytes.push(scan_progress_read_bytes); + processes_mysql_connection_id.push(process_info.mysql_connection_id); } Ok(DataBlock::create(self.table_info.schema(), vec![ @@ -92,6 +94,7 @@ impl AsyncSystemTable for ProcessesTable { Series::from_data(processes_dal_metrics_write_bytes), Series::from_data(processes_scan_progress_read_rows), Series::from_data(processes_scan_progress_read_bytes), + Series::from_data(processes_mysql_connection_id), ])) } } @@ -111,6 +114,7 @@ impl ProcessesTable { DataField::new_nullable("dal_metrics_write_bytes", u64::to_data_type()), DataField::new_nullable("scan_progress_read_rows", u64::to_data_type()), DataField::new_nullable("scan_progress_read_bytes", u64::to_data_type()), + DataField::new_nullable("mysql_connection_id", u32::to_data_type()), ]); let table_info = TableInfo { diff --git a/query/tests/it/servers/mysql/mysql_handler.rs b/query/tests/it/servers/mysql/mysql_handler.rs index e82e2f45d7139..d00b4b841c152 100644 --- a/query/tests/it/servers/mysql/mysql_handler.rs +++ b/query/tests/it/servers/mysql/mysql_handler.rs @@ -62,7 +62,7 @@ async fn test_rejected_session_with_sequence() -> Result<()> { Ok(_) => panic!("Expected rejected connection"), Err(error) => { assert_eq!(error.code(), 1067); - assert_eq!(error.message(), "Reject connection, cause: Server error: `ERROR HY000 (1815): The current accept connection has exceeded mysql_handler_thread_num config'"); + assert_eq!(error.message(), "Reject connection, cause: Server error: `ERROR HY000 (1815): The current accept connection has exceeded max_active_sessions config'"); } }; @@ -99,7 +99,7 @@ async fn test_rejected_session_with_parallel() -> Result<()> { Err(error) => { destroy_barrier.wait().await; assert_eq!(error.code(), 1067); - assert_eq!(error.message(), "Reject connection, cause: Server error: `ERROR HY000 (1815): The current accept connection has exceeded mysql_handler_thread_num config'"); + assert_eq!(error.message(), "Reject connection, cause: Server error: `ERROR HY000 (1815): The current accept connection has exceeded max_active_sessions config'"); CreateServerResult::Rejected } } diff --git a/query/tests/it/sessions/session.rs b/query/tests/it/sessions/session.rs index 3a1f38f5d7835..aa4fcd2f38251 100644 --- a/query/tests/it/sessions/session.rs +++ b/query/tests/it/sessions/session.rs @@ -30,6 +30,7 @@ async fn test_session() -> Result<()> { String::from("test-001"), SessionType::Dummy, session_manager, + None, ) .await?; @@ -75,6 +76,7 @@ async fn test_session_in_management_mode() -> Result<()> { String::from("test-001"), SessionType::Dummy, session_manager, + None, ) .await?; diff --git a/query/tests/it/sessions/session_setting.rs b/query/tests/it/sessions/session_setting.rs index be26c7be0ee79..4a6d1c977ec5e 100644 --- a/query/tests/it/sessions/session_setting.rs +++ b/query/tests/it/sessions/session_setting.rs @@ -29,6 +29,7 @@ async fn test_session_setting() -> Result<()> { String::from("test-001"), SessionType::Dummy, session_manager, + None, ) .await?; diff --git a/scripts/ci/ci-run-stateful-tests-standalone-s3.sh b/scripts/ci/ci-run-stateful-tests-standalone-s3.sh index 801f73095c915..536a0caf7674a 100755 --- a/scripts/ci/ci-run-stateful-tests-standalone-s3.sh +++ b/scripts/ci/ci-run-stateful-tests-standalone-s3.sh @@ -15,6 +15,9 @@ export STORAGE_S3_ENDPOINT_URL=http://127.0.0.1:9900 export STORAGE_S3_ACCESS_KEY_ID=minioadmin export STORAGE_S3_SECRET_ACCESS_KEY=minioadmin +echo "Install dependence" +python3 -m pip install --quiet mysql-connector-python + echo "calling test suite" echo "Starting standalone DatabendQuery(debug)" ./scripts/ci/deploy/databend-query-standalone.sh diff --git a/tests/suites/1_stateful/02_query/02_0000_kill_query.py b/tests/suites/1_stateful/02_query/02_0000_kill_query.py new file mode 100755 index 0000000000000..1afd61ab6c689 --- /dev/null +++ b/tests/suites/1_stateful/02_query/02_0000_kill_query.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 + +import os +import time +import mysql.connector +import sys +import signal + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, '../../../helpers')) + +from native_client import NativeClient +from native_client import prompt + +log = None + +with NativeClient(name='client1>') as client1: + client1.expect(prompt) + client1.send('SET enable_new_processor_framework=0;') + client1.expect('') + client1.send('SELECT * FROM numbers(999999999);') + client1.expect(prompt) + +with NativeClient(name='client2>') as client2: + client2.expect(prompt) + # client1 send long query, client select the long query and kill it + # Use to mock in MySQL Client press Ctrl C to intercept a long query. + mydb = mysql.connector.connect(host="127.0.0.1", + user="root", + passwd="root", + port="3307") + mycursor = mydb.cursor() + mycursor.execute("SELECT mysql_connection_id FROM system.processes WHERE extra_info LIKE '%select * from numbers(999999999)%'") + res = mycursor.fetchone() + kill_query = 'kill query ' + str(res[0]) + ';' + client2.send(kill_query) + client2.expect(prompt) + time.sleep(5) + mycursor.execute("SELECT * FROM system.processes WHERE extra_info LIKE '%select * from numbers(999999999)%' AND extra_info NOT LIKE '%system.processes%'") + res = mycursor.fetchone() + assert res is None diff --git a/tests/suites/1_stateful/02_query/02_0000_kill_query.result b/tests/suites/1_stateful/02_query/02_0000_kill_query.result new file mode 100755 index 0000000000000..e69de29bb2d1d