Skip to content

Commit

Permalink
feat: HTTP handler use cookie to pass session id to and from clients. (โ€ฆ
Browse files Browse the repository at this point in the history
โ€ฆ#16735)

* feat: HTTP handler use cookie to pass session id to and from clients.

* rename

* adjust session related handlers.

* use cookie to store last access time.

* fix need_sticky flag

* adjust session related handlers.

* update error msg.

* logic test for cookie

* rebase

* fix

* fix test

* fix taplo
  • Loading branch information
youngsofun authored Oct 31, 2024
1 parent ac55f18 commit 41c4dbb
Show file tree
Hide file tree
Showing 22 changed files with 421 additions and 128 deletions.
93 changes: 93 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ parking_lot = "0.12.1"
parquet = { version = "53", features = ["async"] }
paste = "1.0.15"
pin-project-lite = "0.2.9"
poem = { version = "3.0", features = ["openssl-tls", "multipart", "compression"] }
poem = { version = "3.0", features = ["openssl-tls", "multipart", "compression", "cookie"] }
pprof = { version = "0.13.0", features = [
"flamegraph",
"protobuf-codec",
Expand Down Expand Up @@ -342,7 +342,7 @@ tokio-stream = "0.1.11"
tonic = { version = "0.12.3", features = ["transport", "codegen", "prost", "tls-roots", "tls"] }
tonic-build = { version = "0.12.3" }
tonic-reflection = { version = "0.12.3" }
tower = "0.5.1"
tower = { version = "0.5.1", features = ["util"] }
tower-service = "0.3.3"
typetag = "0.2.3"
unicode-segmentation = "1.10.1"
Expand Down
2 changes: 0 additions & 2 deletions src/common/base/src/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

pub const HEADER_TENANT: &str = "X-DATABEND-TENANT";
pub const HEADER_QUERY_ID: &str = "X-DATABEND-QUERY-ID";
pub const HEADER_SESSION_ID: &str = "X-DATABEND-SESSION-ID";

pub const HEADER_USER: &str = "X-DATABEND-USER";

pub const HEADER_FUNCTION: &str = "X-DATABEND-FUNCTION";
Expand Down
6 changes: 0 additions & 6 deletions src/query/service/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,6 @@ pub enum Credential {
NoNeed,
}

impl Credential {
pub fn need_refresh(&self) -> bool {
matches!(self, Credential::DatabendToken { .. })
}
}

impl AuthMgr {
pub fn init(cfg: &InnerConfig) -> Result<()> {
GlobalInstance::set(AuthMgr::create(cfg));
Expand Down
15 changes: 8 additions & 7 deletions src/query/service/src/servers/http/http_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use log::info;
use poem::get;
use poem::listener::OpensslTlsConfig;
use poem::middleware::CatchPanic;
use poem::middleware::CookieJarManager;
use poem::middleware::NormalizePath;
use poem::middleware::TrailingSlash;
use poem::Endpoint;
Expand Down Expand Up @@ -92,13 +93,13 @@ impl HttpHandler {
#[allow(clippy::let_with_type_underscore)]
#[async_backtrace::framed]
async fn build_router(&self, sock: SocketAddr) -> impl Endpoint {
let ep_clickhouse =
Route::new()
.nest("/", clickhouse_router())
.with(HTTPSessionMiddleware::create(
self.kind,
EndpointKind::Clickhouse,
));
let ep_clickhouse = Route::new()
.nest("/", clickhouse_router())
.with(HTTPSessionMiddleware::create(
self.kind,
EndpointKind::Clickhouse,
))
.with(CookieJarManager::new());

let ep_usage = Route::new().at(
"/",
Expand Down
76 changes: 67 additions & 9 deletions src/query/service/src/servers/http/middleware/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::sync::Arc;
use databend_common_base::headers::HEADER_DEDUPLICATE_LABEL;
use databend_common_base::headers::HEADER_NODE_ID;
use databend_common_base::headers::HEADER_QUERY_ID;
use databend_common_base::headers::HEADER_SESSION_ID;
use databend_common_base::headers::HEADER_STICKY;
use databend_common_base::headers::HEADER_TENANT;
use databend_common_base::headers::HEADER_VERSION;
Expand All @@ -39,13 +38,15 @@ use http::HeaderMap;
use http::HeaderValue;
use http::StatusCode;
use log::error;
use log::info;
use log::warn;
use opentelemetry::baggage::BaggageExt;
use opentelemetry::propagation::Extractor;
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry_sdk::propagation::BaggagePropagator;
use poem::error::ResponseError;
use poem::error::Result as PoemResult;
use poem::web::cookie::Cookie;
use poem::web::Json;
use poem::Addr;
use poem::Endpoint;
Expand All @@ -62,18 +63,24 @@ use crate::clusters::ClusterDiscovery;
use crate::servers::http::error::HttpErrorCode;
use crate::servers::http::error::JsonErrorOnly;
use crate::servers::http::error::QueryError;
use crate::servers::http::v1::unix_ts;
use crate::servers::http::v1::ClientSessionManager;
use crate::servers::http::v1::HttpQueryContext;
use crate::servers::http::v1::SessionClaim;
use crate::servers::HttpHandlerKind;
use crate::sessions::SessionManager;
use crate::sessions::SessionType;
const USER_AGENT: &str = "User-Agent";
const TRACE_PARENT: &str = "traceparent";
const COOKIE_LAST_ACCESS_TIME: &str = "last_access_time";
const COOKIE_SESSION_ID: &str = "session_id";
const COOKIE_COOKIE_ENABLED: &str = "cookie_enabled";
#[derive(Debug, Copy, Clone)]
pub enum EndpointKind {
Login,
Logout,
Refresh,
HeartBeat,
StartQuery,
PollQuery,
Clickhouse,
Expand All @@ -91,7 +98,10 @@ impl EndpointKind {
pub fn may_need_sticky(&self) -> bool {
matches!(
self,
EndpointKind::StartQuery | EndpointKind::PollQuery | EndpointKind::Logout
EndpointKind::StartQuery
| EndpointKind::PollQuery
| EndpointKind::Logout
| EndpointKind::HeartBeat
)
}
pub fn require_databend_token_type(&self) -> Result<Option<TokenType>> {
Expand All @@ -102,6 +112,7 @@ impl EndpointKind {
| EndpointKind::PollQuery
| EndpointKind::Logout
| EndpointKind::SystemInfo
| EndpointKind::HeartBeat
| EndpointKind::UploadToStage => {
if GlobalConfig::instance().query.management_mode {
Ok(None)
Expand Down Expand Up @@ -326,10 +337,13 @@ impl<E> HTTPSessionEndpoint<E> {
session.set_current_tenant(tenant);
}

let header_client_session_id = req
.headers()
.get(HEADER_SESSION_ID)
.map(|v| v.to_str().unwrap().to_string());
// cookie_enabled is used to recognize old clients that not support cookie yet.
// for these old clients, there is no session id available, thus can not use temp table.
let cookie_enabled = req.cookie().get(COOKIE_COOKIE_ENABLED).is_some();
let cookie_session_id = req
.cookie()
.get(COOKIE_SESSION_ID)
.map(|s| s.value_str().to_string());
let (user_name, authed_client_session_id) = self
.auth_manager
.auth(
Expand All @@ -338,11 +352,55 @@ impl<E> HTTPSessionEndpoint<E> {
self.endpoint_kind.need_user_info(),
)
.await?;
let client_session_id = authed_client_session_id.or(header_client_session_id);
if let Some(id) = client_session_id.clone() {
session.set_client_session_id(id)

let client_session_id = match (&authed_client_session_id, &cookie_session_id) {
(Some(id1), Some(id2)) => {
if id1 != id2 {
return Err(ErrorCode::AuthenticateFailure(format!(
"session id in token ({}) != session id in cookie({}) ",
id1, id2
)));
}
Some(id1.clone())
}
(Some(id), None) => {
req.cookie()
.add(Cookie::new_with_str(COOKIE_SESSION_ID, id));
Some(id.clone())
}
(None, Some(id)) => Some(id.clone()),
(None, None) => {
if cookie_enabled {
let id = Uuid::new_v4().to_string();
info!("new cookie session id: {}", id);
req.cookie()
.add(Cookie::new_with_str(COOKIE_SESSION_ID, &id));
Some(id)
} else {
None
}
}
};
if let Some(id) = &client_session_id {
session.set_client_session_id(id.clone());
let last_access_time = req
.cookie()
.get(COOKIE_LAST_ACCESS_TIME)
.map(|s| s.value_str().to_string());
if let Some(ts) = &last_access_time {
let ts = ts
.parse::<u64>()
.map_err(|_| ErrorCode::BadArguments(format!("bad last_access_time {}", ts)))?;
ClientSessionManager::instance()
.refresh_state(session.get_current_tenant(), id, &user_name, ts)
.await?;
}
}

let ts = unix_ts().as_secs().to_string();
req.cookie()
.add(Cookie::new_with_str(COOKIE_LAST_ACCESS_TIME, ts));

let session = session_manager.register_session(session)?;

let deduplicate_label = req
Expand Down
Loading

0 comments on commit 41c4dbb

Please sign in to comment.