Skip to content

Commit

Permalink
fix.
Browse files Browse the repository at this point in the history
  • Loading branch information
youngsofun committed Sep 13, 2024
1 parent cb5610f commit a06fec5
Show file tree
Hide file tree
Showing 11 changed files with 207 additions and 121 deletions.
6 changes: 6 additions & 0 deletions src/query/service/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ pub enum Credential {
NoNeed,
}

impl Credential {
pub fn need_refresh(&self) -> bool {
matches!(self, Credential::DatabendToken { .. })
}
}

impl AuthMgr {
pub fn init(cfg: &InnerConfig) -> Result<()> {
GlobalInstance::set(AuthMgr::create(cfg));
Expand Down
7 changes: 0 additions & 7 deletions src/query/service/src/interpreters/interpreter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,8 @@ use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::executor::PipelinePullingExecutor;
use crate::pipelines::PipelineBuildResult;
use crate::schedulers::ServiceQueryExecutor;
use crate::servers::http::v1::ClientSessionManager;
use crate::sessions::QueryContext;
use crate::sessions::SessionManager;
use crate::sessions::SessionType;
use crate::stream::DataBlockStream;
use crate::stream::ProgressStream;
use crate::stream::PullingExecutorStream;
Expand Down Expand Up @@ -191,11 +189,6 @@ fn log_query_finished(ctx: &QueryContext, error: Option<ErrorCode>, has_profiles
let typ = session.get_type();
if typ.is_user_session() {
SessionManager::instance().status.write().query_finish(now);
if typ == SessionType::HTTPQuery {
if let Some(cid) = session.get_client_session_id() {
ClientSessionManager::instance().on_query_finish(&cid, &session)
}
}
}

if let Err(error) = InterpreterQueryLog::log_finish(ctx, now, error, has_profiles) {
Expand Down
9 changes: 9 additions & 0 deletions src/query/service/src/servers/http/v1/http_query_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,15 @@ async fn query_page_handler(
let http_query_manager = HttpQueryManager::instance();
match http_query_manager.get_query(&query_id) {
Some(query) => {
if query.user_name != ctx.user_name {
return Err(poem::error::Error::from_string(
format!(
"wrong user, query {} expect {}, got {}",
query_id, query.user_name, ctx.user_name
),
StatusCode::UNAUTHORIZED,
));
}
query.check_client_session_id(&ctx.client_session_id)?;
if let Some(reason) = query.check_removed() {
Err(query_id_removed(&query_id, reason))
Expand Down
56 changes: 27 additions & 29 deletions src/query/service/src/servers/http/v1/query/execute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ use crate::interpreters::InterpreterQueryLog;
use crate::servers::http::v1::http_query_handlers::QueryResponseField;
use crate::servers::http::v1::query::http_query::ResponseState;
use crate::servers::http::v1::query::sized_spsc::SizedChannelSender;
use crate::servers::http::v1::ClientSessionManager;
use crate::sessions::AcquireQueueGuard;
use crate::sessions::QueriesQueueManager;
use crate::sessions::QueryAffect;
Expand Down Expand Up @@ -261,26 +260,14 @@ impl Executor {
#[async_backtrace::framed]
pub async fn stop<C>(this: &Arc<RwLock<Executor>>, reason: Result<(), C>) {
let reason = reason.with_context(|| "execution stopped");
let mut guard = this.write().await;

{
let guard = this.read().await;
if let Stopped(s) = &guard.state {
debug!(
"{}: http query already stopped, reason {:?}, new reason {:?}",
&guard.query_id, s.reason, reason
);
return;
} else {
let state = match &guard.state {
Starting(s) => {
info!(
"{}: http query change state to Stopped, reason {:?}",
"{}: http query begin changing state from Staring to Stopped, reason {:?}",
&guard.query_id, reason
);
}
}

let mut guard = this.write().await;
match &guard.state {
Starting(s) => {
if let Err(e) = &reason {
InterpreterQueryLog::log_finish(
&s.ctx,
Expand All @@ -295,38 +282,52 @@ impl Executor {
s.ctx.get_current_session().txn_mgr().lock().set_fail();
}
}
guard.state = Stopped(Box::new(ExecuteStopped {
ExecuteStopped {
stats: Default::default(),
schema: vec![],
has_result_set: None,
reason,
reason: reason.clone(),
session_state: ExecutorSessionState::new(s.ctx.get_current_session()),
query_duration_ms: s.ctx.get_query_duration_ms(),
warnings: s.ctx.pop_warnings(),
affect: Default::default(),
}))
}
}
Running(r) => {
info!(
"{}: http query changing state from Running to Stopped, reason {:?}",
&guard.query_id, reason
);
if let Err(e) = &reason {
if e.code() != ErrorCode::CLOSED_QUERY {
r.session.txn_mgr().lock().set_fail();
}
r.session.force_kill_query(e.clone());
}

guard.state = Stopped(Box::new(ExecuteStopped {
ExecuteStopped {
stats: Progresses::from_context(&r.ctx),
schema: r.schema.clone(),
has_result_set: Some(r.has_result_set),
reason,
reason: reason.clone(),
session_state: ExecutorSessionState::new(r.ctx.get_current_session()),
query_duration_ms: r.ctx.get_query_duration_ms(),
warnings: r.ctx.pop_warnings(),
affect: r.ctx.get_affect(),
}))
}
}
Stopped(_) => {}
}
Stopped(s) => {
debug!(
"{}: http query already stopped, reason {:?}, new reason {:?}",
&guard.query_id, s.reason, reason
);
return;
}
};
info!(
"{}: http query has change state to Stopped, reason {:?}",
&guard.query_id, reason
);
guard.state = Stopped(Box::new(state));
}
}

Expand All @@ -343,9 +344,6 @@ impl ExecuteState {
let make_error = || format!("failed to start query: {sql}");

info!("http query prepare to plan sql");
if let Some(cid) = session.get_client_session_id() {
ClientSessionManager::instance().on_query_start(&cid, &session)
}

// Use interpreter_plan_sql, we can write the query log if an error occurs.
let (plan, extras) = interpreter_plan_sql(ctx.clone(), &sql)
Expand Down
142 changes: 84 additions & 58 deletions src/query/service/src/servers/http/v1/query/http_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use fastrace::prelude::*;
use http::StatusCode;
use log::info;
use log::warn;
use parking_lot::Mutex;
use poem::web::Json;
use poem::IntoResponse;
use serde::Deserialize;
Expand Down Expand Up @@ -355,7 +356,12 @@ pub struct HttpQuery {
/// should fetch the paginated result in a timely manner, and the interval should not
/// exceed this result_timeout_secs.
pub(crate) result_timeout_secs: u64,

pub(crate) need_refresh: bool,
pub(crate) is_txn_mgr_saved: AtomicBool,

pub(crate) has_temp_table_before_run: bool,
pub(crate) has_temp_table_after_run: Mutex<Option<bool>>,
pub(crate) is_session_handle_refreshed: AtomicBool,
}

Expand All @@ -367,25 +373,7 @@ fn try_set_txn(
) -> Result<()> {
match &session_conf.txn_state {
Some(TxnState::Active) => {
if let Some(ServerInfo { id, start_time }) = &session_conf.last_server_info {
if http_query_manager.server_info.id != *id {
return Err(ErrorCode::InvalidSessionState(format!(
"transaction is active, but the request routed to the wrong server: current server is {}, the last is {}.",
http_query_manager.server_info.id, id
)));
}
if http_query_manager.server_info.start_time != *start_time {
return Err(ErrorCode::CurrentTransactionIsAborted(format!(
"transaction is aborted because server restarted at {}.",
start_time
)));
}
} else {
return Err(ErrorCode::InvalidSessionState(
"transaction is active but missing server_info".to_string(),
));
}

http_query_manager.check_sticky_for_txn(&session_conf.last_server_info)?;
let last_query_id = session_conf.last_query_ids.first().ok_or_else(|| {
ErrorCode::InvalidSessionState(
"transaction is active but last_query_ids is empty".to_string(),
Expand Down Expand Up @@ -419,7 +407,7 @@ impl HttpQuery {
request: HttpQueryRequest,
) -> Result<Arc<HttpQuery>> {
let http_query_manager = HttpQueryManager::instance();

let need_refresh = ctx.credential.need_refresh();
let session = ctx
.upgrade_session(SessionType::HTTPQuery)
.map_err(|err| ErrorCode::Internal(format!("{err}")))?;
Expand Down Expand Up @@ -463,6 +451,11 @@ impl HttpQuery {
}
}
try_set_txn(&ctx.query_id, &session, session_conf, &http_query_manager)?;
if session_conf.need_sticky
&& matches!(session_conf.txn_state, None | Some(TxnState::AutoCommit))
{
http_query_manager.check_sticky_for_temp_table(&session_conf.last_server_info)?;
}
};

let settings = session.get_settings();
Expand Down Expand Up @@ -534,6 +527,13 @@ impl HttpQuery {
let format_settings_clone = format_settings.clone();
let tenant = session.get_current_tenant();
let user_name = session.get_current_user()?.name;

let has_temp_table_before_run = if let Some(cid) = session.get_client_session_id() {
ClientSessionManager::instance().on_query_start(&cid, &session);
true
} else {
false
};
http_query_runtime_instance.runtime().try_spawn(
async move {
let state = state_clone.clone();
Expand Down Expand Up @@ -585,9 +585,15 @@ impl HttpQuery {
state,
page_manager: data,
result_timeout_secs,
expire_state: Arc::new(parking_lot::Mutex::new(ExpireState::Working)),
is_txn_mgr_saved: AtomicBool::new(false),
is_session_handle_refreshed: AtomicBool::new(false),

expire_state: Arc::new(Mutex::new(ExpireState::Working)),

need_refresh,
has_temp_table_before_run,

is_txn_mgr_saved: Default::default(),
has_temp_table_after_run: Default::default(),
is_session_handle_refreshed: Default::default(),
};

Ok(Arc::new(query))
Expand All @@ -612,14 +618,13 @@ impl HttpQuery {
#[async_backtrace::framed]
pub async fn get_response_state_only(&self) -> Result<HttpQueryResponseInternal> {
let state = self.get_state().await;
let session = self.get_response_session().await?;

Ok(HttpQueryResponseInternal {
data: None,
session_id: self.session_id.clone(),
node_id: self.node_id.clone(),
state,
session: Some(session),
session: None,
})
}

Expand All @@ -631,13 +636,6 @@ impl HttpQuery {

#[async_backtrace::framed]
async fn get_response_session(&self) -> Result<HttpSessionConf> {
let keep_server_session_secs = self
.request
.session
.clone()
.map(|v| v.keep_server_session_secs)
.unwrap_or(None);

// reply the updated session state, includes:
// - current_database: updated by USE XXX;
// - role: updated by SET ROLE;
Expand All @@ -662,43 +660,71 @@ impl HttpQuery {
} else {
None
};
let mut need_sticky = false;
let (need_refresh, just_changed) = session_state.temp_tbl_mgr.lock().is_empty();
if let Some(cid) = &self.client_session_id {
if just_changed

if matches!(executor.state, ExecuteState::Stopped(_)) {
if let Some(cid) = &self.client_session_id {
let (has_temp_table_after_run, just_changed) = {
let mut guard = self.has_temp_table_after_run.lock();
match *guard {
None => {
let not_empty = !session_state.temp_tbl_mgr.lock().is_empty().0;
*guard = Some(not_empty);
ClientSessionManager::instance().on_query_finish(
cid,
session_state.temp_tbl_mgr,
!not_empty,
not_empty != self.has_temp_table_before_run,
);
(not_empty, true)
}
Some(v) => (v, false),
}
};

if !self.has_temp_table_before_run
&& has_temp_table_after_run
&& just_changed
&& self
.is_session_handle_refreshed
.compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
.is_ok()
{
ClientSessionManager::instance()
.refresh_session_handle(
self.tenant.clone(),
self.user_name.to_string(),
cid,
)
.await?;
}
}

if txn_state != TxnState::AutoCommit
&& !self.is_txn_mgr_saved.load(Ordering::Relaxed)
&& self
.is_session_handle_refreshed
.is_txn_mgr_saved
.compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
.is_ok()
{
ClientSessionManager::instance()
.refresh_session_handle(self.tenant.clone(), self.user_name.to_string(), cid)
.await?;
let timeout = session_state
.settings
.get_idle_transaction_timeout_secs()
.unwrap();
HttpQueryManager::instance()
.add_txn(self.id.clone(), session_state.txn_manager.clone(), timeout)
.await;
}
}
let has_temp_table = (*self.has_temp_table_after_run.lock()).unwrap_or(false);

let need_sticky = txn_state != TxnState::AutoCommit || has_temp_table;
let need_refresh = self.need_refresh || has_temp_table;

if txn_state != TxnState::AutoCommit
&& !self.is_txn_mgr_saved.load(Ordering::Relaxed)
&& matches!(executor.state, ExecuteState::Stopped(_))
&& self
.is_txn_mgr_saved
.compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
.is_ok()
{
need_sticky = true;
let timeout = session_state
.settings
.get_idle_transaction_timeout_secs()
.unwrap();
HttpQueryManager::instance()
.add_txn(self.id.clone(), session_state.txn_manager.clone(), timeout)
.await;
}
Ok(HttpSessionConf {
database: Some(database),
role,
secondary_roles,
keep_server_session_secs,
keep_server_session_secs: None,
settings: Some(settings),
txn_state: Some(txn_state),
need_sticky,
Expand Down
Loading

0 comments on commit a06fec5

Please sign in to comment.