Skip to content

Commit

Permalink
generate mysql connection id
Browse files Browse the repository at this point in the history
  • Loading branch information
TCeason committed May 18, 2022
1 parent 7a39bda commit cac7f4c
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 37 deletions.
82 changes: 57 additions & 25 deletions query/src/interpreters/interpreter_kill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,33 +55,65 @@ impl Interpreter for KillInterpreter {
.await?;

let id = &self.plan.id;
//TODO: TCeason this code need delete on final pr.
let conn_id = self.ctx.get_current_session().get_mysql_conn_id();
let get_id = self.ctx.get_id_by_mysql_conn_id(&conn_id).await;
match get_id {
Some(get) => {
tracing::info!("=== test get id is {}, plan id is {}", get, id);
}
None => {
tracing::info!("=== test get id is None, plan id is {}", id);
}
}
match id.parse::<u32>() {
Ok(connid) => {
//TODO: TCeason this code need delete on final pr.
tracing::info!("***************");
let map = self
.ctx
.get_current_session()
.get_session_manager()
.mysql_conn_map
.read()
.clone();
for (k, v) in map {
tracing::info!("In kill the map is ({},{})", k.unwrap(), v);
}
tracing::info!("%%%%%%%%%%%%%%%");

match self.ctx.get_session_by_id(id).await {
None => Err(ErrorCode::UnknownSession(format!(
"Not found session id {}",
id
))),
Some(kill_session) if self.plan.kill_connection => {
kill_session.force_kill_session();
let schema = Arc::new(DataSchema::empty());
Ok(Box::pin(DataBlockStream::create(schema, None, vec![])))
}
Some(kill_session) => {
kill_session.force_kill_query();
let schema = Arc::new(DataSchema::empty());
Ok(Box::pin(DataBlockStream::create(schema, None, vec![])))
let get_id = self.ctx.get_id_by_mysql_conn_id(&Some(connid)).await;
match get_id {
Some(get) => {
tracing::info!("=== test get id is {}, plan id is {}", get, connid);
match self.ctx.get_session_by_id(&get).await {
None => Err(ErrorCode::UnknownSession(format!(
"Not found session id {}",
get
))),
Some(kill_session) if self.plan.kill_connection => {
kill_session.force_kill_session();
let schema = Arc::new(DataSchema::empty());
Ok(Box::pin(DataBlockStream::create(schema, None, vec![])))
}
Some(kill_session) => {
kill_session.force_kill_query();
let schema = Arc::new(DataSchema::empty());
Ok(Box::pin(DataBlockStream::create(schema, None, vec![])))
}
}
}
None => Err(ErrorCode::UnknownSession(format!(
"MySQL connection id {} not found session id",
connid
))),
}
}
Err(_) => match self.ctx.get_session_by_id(id).await {
None => Err(ErrorCode::UnknownSession(format!(
"Not found session id {}",
id
))),
Some(kill_session) if self.plan.kill_connection => {
kill_session.force_kill_session();
let schema = Arc::new(DataSchema::empty());
Ok(Box::pin(DataBlockStream::create(schema, None, vec![])))
}
Some(kill_session) => {
kill_session.force_kill_query();
let schema = Arc::new(DataSchema::empty());
Ok(Box::pin(DataBlockStream::create(schema, None, vec![])))
}
},
}
}
}
33 changes: 22 additions & 11 deletions query/src/sessions/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

use common_base::infallible::RwLock;
use common_base::mem_allocator::malloc_size;
Expand All @@ -24,6 +26,7 @@ use common_macros::MallocSizeOf;
use common_meta_types::GrantObject;
use common_meta_types::UserInfo;
use common_meta_types::UserPrivilegeType;
use common_tracing::tracing;
use futures::channel::*;
use opendal::Operator;

Expand Down Expand Up @@ -64,18 +67,26 @@ impl Session {
let session_settings = Settings::try_create(&conf)?;
let ref_count = Arc::new(AtomicUsize::new(0));
let status = Arc::new(Default::default());

match typ {
SessionType::MySQL => Ok(Arc::new(Session {
id,
typ,
session_mgr,
ref_count,
session_ctx,
session_settings,
status,
mysql_connection_id: Some(u32::from_le_bytes([0x08, 0x00, 0x00, 0x00])), //MySQL type need generate rand mysql connection id
})),
SessionType::MySQL => {
let start = SystemTime::now();
let since_the_epoch = start
.duration_since(UNIX_EPOCH)
.expect("Time went backwards");
// TODO:TCeason use nanos as conn id but can not sure the id is unique.
let conn_id = since_the_epoch.subsec_nanos().to_le();
tracing::info!("=== test the conn_id is {}", conn_id);
Ok(Arc::new(Session {
id,
typ,
session_mgr,
ref_count,
session_ctx,
session_settings,
status,
mysql_connection_id: Some(conn_id), //MySQL type need generate rand mysql connection id
}))
}
_ => Ok(Arc::new(Session {
id,
typ,
Expand Down
10 changes: 9 additions & 1 deletion query/src/sessions/session_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub struct SessionManager {
storage_runtime: Arc<Runtime>,
_guards: Vec<WorkerGuard>,
// When typ is MySQL, insert into this map, the id is key, MySQL connection id is val.
pub(in crate::sessions) mysql_conn_map: Arc<RwLock<HashMap<Option<u32>, String>>>,
pub(in crate) mysql_conn_map: Arc<RwLock<HashMap<Option<u32>, String>>>,
}

impl SessionManager {
Expand Down Expand Up @@ -185,9 +185,17 @@ impl SessionManager {
SessionType::MySQL => {
let mut conn_id_query_id = self.mysql_conn_map.write();
if conn_id_query_id.len() < self.max_sessions {
// tracing::info!("+++++++");
// for (key, value) in conn_id_query_id.clone() {
// tracing::info!("before insert : ({},{})", key.unwrap(), value);
// }
// force insert
// one session can generate various query id, but the generate is seq.
conn_id_query_id.insert(session.get_mysql_conn_id(), session.get_id());
// for (key, value) in conn_id_query_id.clone() {
// tracing::info!("after insert : ({},{})", key.unwrap(), value);
// }
// tracing::info!("=======");
} else {
return Err(ErrorCode::TooManyUserConnections(
"The current accept connection has exceeded mysql_handler_thread_num config",
Expand Down

0 comments on commit cac7f4c

Please sign in to comment.