From f3d8ce08177ca6d97224fa0aecdf642c421517bd Mon Sep 17 00:00:00 2001 From: b41sh Date: Wed, 20 Nov 2024 13:44:15 +0800 Subject: [PATCH] refactor(sqlsmith): refactor sqlsmith using http client --- Cargo.lock | 152 +----------- Cargo.toml | 3 - src/common/http/Cargo.toml | 4 + src/common/http/src/http_client.rs | 231 ++++++++++++++++++ src/common/http/src/lib.rs | 5 + src/query/expression/src/row/row_converter.rs | 18 +- src/tests/sqlsmith/Cargo.toml | 7 +- src/tests/sqlsmith/src/bin/main.rs | 32 ++- src/tests/sqlsmith/src/reducer.rs | 127 +++++----- src/tests/sqlsmith/src/runner.rs | 224 +++++++++-------- src/tests/sqlsmith/src/sql_gen/expr.rs | 115 ++++++++- tests/sqllogictests/Cargo.toml | 5 +- .../src/client/global_cookie_store.rs | 62 ----- tests/sqllogictests/src/client/http_client.rs | 145 ++--------- tests/sqllogictests/src/client/mod.rs | 1 - .../sqllogictests/src/client/mysql_client.rs | 2 +- tests/sqllogictests/src/util.rs | 21 -- 17 files changed, 604 insertions(+), 550 deletions(-) create mode 100644 src/common/http/src/http_client.rs delete mode 100644 tests/sqllogictests/src/client/global_cookie_store.rs diff --git a/Cargo.lock b/Cargo.lock index cc7a1b0ff3b0a..dff23a9065c04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -708,8 +708,6 @@ dependencies = [ "memchr", "pin-project-lite", "tokio", - "zstd 0.13.2", - "zstd-safe 7.2.1", ] [[package]] @@ -2999,28 +2997,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "databend-client" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0819048a792e2eac58b455bbbcf6077c81e2780b3cc4f565a6e72e92dde56bd1" -dependencies = [ - "async-trait", - "log", - "once_cell", - "parking_lot 0.12.3", - "percent-encoding", - "reqwest", - "serde", - "serde_json", - "tokio", - "tokio-retry", - "tokio-stream", - "tokio-util", - "url", - "uuid", -] - [[package]] name = "databend-codegen" version = "0.1.0" @@ -3284,7 +3260,7 @@ dependencies = [ "backtrace", "bincode 2.0.0-rc.3", 
"databend-common-ast", - "geozero 0.14.0", + "geozero", "gimli 0.31.1", "http 1.1.0", "libc", @@ -3339,7 +3315,7 @@ dependencies = [ "ethnum", "futures", "geo", - "geozero 0.14.0", + "geozero", "goldenfile", "hex", "hyper-util", @@ -3385,7 +3361,7 @@ dependencies = [ "databend-common-settings", "databend-storages-common-blocks", "databend-storages-common-table-meta", - "geozero 0.14.0", + "geozero", "hex", "jiff", "jsonb", @@ -3426,7 +3402,7 @@ dependencies = [ "ethnum", "geo", "geohash", - "geozero 0.14.0", + "geozero", "goldenfile", "h3o", "hex", @@ -3496,6 +3472,7 @@ name = "databend-common-http" version = "0.1.0" dependencies = [ "anyerror", + "cookie", "databend-common-base", "databend-common-exception", "futures", @@ -3503,9 +3480,12 @@ dependencies = [ "log", "poem", "pretty_assertions", + "reqwest", "serde", + "serde_json", "tempfile", "thiserror", + "url", ] [[package]] @@ -3524,7 +3504,7 @@ dependencies = [ "enumflags2", "ethnum", "geo", - "geozero 0.14.0", + "geozero", "hex", "jiff", "lexical-core", @@ -4677,64 +4657,6 @@ dependencies = [ "ndarray", ] -[[package]] -name = "databend-driver" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbea10312fa203003b572b701b6121cecd8eaf260e7ec7687796b552044541c6" -dependencies = [ - "arrow", - "async-compression 0.4.12", - "async-trait", - "chrono", - "csv", - "databend-client", - "databend-driver-core", - "databend-driver-macros", - "dyn-clone", - "glob", - "log", - "once_cell", - "percent-encoding", - "serde_json", - "tokio", - "tokio-stream", - "url", -] - -[[package]] -name = "databend-driver-core" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1b8f3bf7bb87d0f2fce8a1992eea4a0b442c01165a0d68aab2727a386c945ab" -dependencies = [ - "arrow", - "chrono", - "databend-client", - "geozero 0.13.0", - "glob", - "hex", - "itertools 0.12.1", - "jsonb", - "lexical-core", - "memchr", - "roaring", - "serde", - "serde_json", - 
"tokio-stream", - "url", -] - -[[package]] -name = "databend-driver-macros" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40bed5c66f36e79baea7c95d2d0dc8433671606ca7152012562c7145e3b909a1" -dependencies = [ - "quote", - "syn 2.0.58", -] - [[package]] name = "databend-enterprise-aggregating-index" version = "0.1.0" @@ -5201,9 +5123,9 @@ version = "0.1.0" dependencies = [ "async-trait", "clap", - "cookie", "databend-common-base", "databend-common-exception", + "databend-common-http", "env_logger 0.11.5", "futures-util", "msql-srv", @@ -5218,7 +5140,6 @@ dependencies = [ "sqlparser 0.50.0", "thiserror", "tokio", - "url", "walkdir", ] @@ -5228,22 +5149,21 @@ version = "0.1.0" dependencies = [ "chrono-tz 0.8.6", "clap", - "databend-client", "databend-common-ast", + "databend-common-exception", "databend-common-expression", "databend-common-formats", "databend-common-functions", + "databend-common-http", "databend-common-io", "databend-common-sql", - "databend-driver", - "databend-driver-core", "ethnum", "itertools 0.13.0", "jiff", "jsonb", "rand", + "serde_json", "tokio", - "tokio-stream", "tracing", "tracing-subscriber", ] @@ -6899,21 +6819,6 @@ dependencies = [ "thiserror", ] -[[package]] -name = "geozero" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0cd8fb67347739a057fd607b6d8b43ba4ed93619ed84b8f429fa3296f8ae504c" -dependencies = [ - "geo-types", - "geojson", - "log", - "scroll 0.11.0", - "serde_json", - "thiserror", - "wkt 0.10.3", -] - [[package]] name = "geozero" version = "0.14.0" @@ -9731,16 +9636,6 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" -[[package]] -name = "mime_guess" -version = "2.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" 
-dependencies = [ - "mime", - "unicase", -] - [[package]] name = "minijinja" version = "0.31.0" @@ -12351,7 +12246,6 @@ dependencies = [ "js-sys", "log", "mime", - "mime_guess", "native-tls", "once_cell", "percent-encoding", @@ -14623,17 +14517,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-retry" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" -dependencies = [ - "pin-project", - "rand", - "tokio", -] - [[package]] name = "tokio-retry2" version = "0.5.6" @@ -15132,15 +15015,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "unicase" -version = "2.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89" -dependencies = [ - "version_check", -] - [[package]] name = "unicode-bidi" version = "0.3.15" diff --git a/Cargo.toml b/Cargo.toml index d8bafb180e45c..4897a40896c7e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -536,9 +536,6 @@ tracing-appender = "0.2.3" tracing-subscriber = { version = "0.3.17", features = ["env-filter", "json", "valuable"] } # Databend Integration Test -databend-client = { version = "0.22" } -databend-driver = { version = "0.22" } -databend-driver-core = { version = "0.22" } msql-srv = "0.11.0" mysql_common = "0.32.4" quickcheck = "1.0" diff --git a/src/common/http/Cargo.toml b/src/common/http/Cargo.toml index 08b910ce7ba5c..293ede66fd94b 100644 --- a/src/common/http/Cargo.toml +++ b/src/common/http/Cargo.toml @@ -15,15 +15,19 @@ memory-profiling = ["tempfile"] [dependencies] anyerror = { workspace = true } +cookie = { workspace = true } databend-common-base = { workspace = true } databend-common-exception = { workspace = true } futures = { workspace = true } http = { workspace = true } log = { workspace = true } poem = { workspace = true } +reqwest = { workspace = true, features = ["cookies"] } serde = { workspace = 
true } +serde_json = { workspace = true } tempfile = { workspace = true, optional = true } thiserror = { workspace = true } +url = { workspace = true } [dev-dependencies] pretty_assertions = { workspace = true } diff --git a/src/common/http/src/http_client.rs b/src/common/http/src/http_client.rs new file mode 100644 index 0000000000000..786dbbfc1335c --- /dev/null +++ b/src/common/http/src/http_client.rs @@ -0,0 +1,231 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+use std::collections::BTreeMap;
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::sync::RwLock;
+use std::time::Duration;
+
+use cookie::Cookie;
+use databend_common_exception::ErrorCode;
+use databend_common_exception::Result;
+use reqwest::cookie::CookieStore;
+use reqwest::header::HeaderMap;
+use reqwest::header::HeaderValue;
+use reqwest::Client;
+use reqwest::ClientBuilder;
+use serde::Deserialize;
+use serde::Serialize;
+use url::Url;
+
+struct GlobalCookieStore {
+    cookies: RwLock<HashMap<String, Cookie<'static>>>,
+}
+
+impl GlobalCookieStore {
+    pub fn new() -> Self {
+        GlobalCookieStore {
+            cookies: RwLock::new(HashMap::new()),
+        }
+    }
+}
+
+impl CookieStore for GlobalCookieStore {
+    fn set_cookies(&self, cookie_headers: &mut dyn Iterator<Item = &HeaderValue>, _url: &Url) {
+        let iter = cookie_headers
+            .filter_map(|val| std::str::from_utf8(val.as_bytes()).ok())
+            .filter_map(|kv| Cookie::parse(kv).map(|c| c.into_owned()).ok());
+
+        let mut guard = self.cookies.write().unwrap();
+        for cookie in iter {
+            guard.insert(cookie.name().to_string(), cookie);
+        }
+    }
+
+    fn cookies(&self, _url: &Url) -> Option<HeaderValue> {
+        let guard = self.cookies.read().unwrap();
+        let s: String = guard
+            .values()
+            .map(|cookie| cookie.name_value())
+            .map(|(name, value)| format!("{name}={value}"))
+            .collect::<Vec<String>>()
+            .join("; ");
+
+        if s.is_empty() {
+            return None;
+        }
+
+        HeaderValue::from_str(&s).ok()
+    }
+}
+
+#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
+pub struct ServerInfo {
+    pub id: String,
+    pub start_time: String,
+}
+
+#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
+pub struct HttpSessionConf {
+    pub database: Option<String>,
+    pub role: Option<String>,
+    pub secondary_roles: Option<Vec<String>>,
+    pub settings: Option<BTreeMap<String, String>>,
+    pub txn_state: Option<String>,
+    pub last_server_info: Option<ServerInfo>,
+    #[serde(default)]
+    pub last_query_ids: Vec<String>,
+    pub internal: Option<String>,
+}
+
+#[derive(serde::Deserialize, Debug)]
+pub struct QueryResponse {
+    pub session: Option<HttpSessionConf>,
+    pub data: Option<serde_json::Value>,
+    next_uri: Option<String>,
+
+    pub error: Option<serde_json::Value>,
+}
+
+#[derive(Deserialize)]
+struct TokenInfo {
+    session_token: String,
+}
+
+#[derive(Deserialize)]
+struct LoginResponse {
+    tokens: Option<TokenInfo>,
+}
+
+pub struct HttpClient {
+    pub host: String,
+    pub client: Client,
+    pub session_token: String,
+    pub session: Option<HttpSessionConf>,
+}
+
+impl HttpClient {
+    pub async fn create(host: String, username: String, password: String) -> Result<Self> {
+        let mut header = HeaderMap::new();
+        header.insert(
+            "Content-Type",
+            HeaderValue::from_str("application/json").unwrap(),
+        );
+        header.insert("Accept", HeaderValue::from_str("application/json").unwrap());
+        let cookie_provider = GlobalCookieStore::new();
+        let cookie = HeaderValue::from_str("cookie_enabled=true").unwrap();
+        let mut initial_cookies = [&cookie].into_iter();
+        cookie_provider.set_cookies(&mut initial_cookies, &Url::parse("https://a.com").unwrap());
+        let client = ClientBuilder::new()
+            .cookie_provider(Arc::new(cookie_provider))
+            .default_headers(header)
+            // https://github.com/hyperium/hyper/issues/2136#issuecomment-589488526
+            .http2_keep_alive_timeout(Duration::from_secs(15))
+            .pool_max_idle_per_host(0)
+            .build()?;
+
+        let url = format!("{}/v1/session/login", host);
+
+        let login_resp = client
+            .post(&url)
+            .body("{}")
+            .basic_auth(username, Some(password))
+            .send()
+            .await
+            .inspect_err(|e| {
+                println!("fail to send to {}: {:?}", url, e);
+            })?
+            .json::<LoginResponse>()
+            .await
+            .inspect_err(|e| {
+                println!("fail to decode json when call {}: {:?}", url, e);
+            })?;
+        let session_token = match login_resp.tokens {
+            Some(tokens) => tokens.session_token,
+            None => {
+                return Err(ErrorCode::AuthenticateFailure(
+                    "failed to get session token",
+                ));
+            }
+        };
+
+        Ok(Self {
+            host,
+            client,
+            session_token,
+            session: None,
+        })
+    }
+
+    pub async fn query(&mut self, sql: &str) -> Result<Vec<QueryResponse>> {
+        let url = format!("{}/v1/query", self.host);
+        let mut responses = vec![];
+        let response = self.post_query(sql, &url).await?;
+        let mut next_uri_opt = response.next_uri.clone();
+        responses.push(response);
+        while let Some(next_uri) = &next_uri_opt {
+            let url = format!("{}{}", self.host, next_uri);
+            let new_response = self.poll_query_result(&url).await?;
+            if new_response.session.is_some() {
+                self.session = new_response.session.clone();
+            }
+            next_uri_opt = new_response.next_uri.clone();
+            responses.push(new_response);
+        }
+        Ok(responses)
+    }
+
+    // Send request and get response by json format
+    async fn post_query(&self, sql: &str, url: &str) -> Result<QueryResponse> {
+        let mut query = HashMap::new();
+        query.insert("sql", serde_json::to_value(sql)?);
+        if let Some(session) = &self.session {
+            query.insert("session", serde_json::to_value(session)?);
+        }
+
+        Ok(self
+            .client
+            .post(url)
+            .json(&query)
+            .bearer_auth(&self.session_token)
+            .send()
+            .await
+            .inspect_err(|e| {
+                println!("fail to send to {}: {:?}", url, e);
+            })?
+            .json::<QueryResponse>()
+            .await
+            .inspect_err(|e| {
+                println!("fail to decode json when call {}: {:?}", url, e);
+            })?)
+    }
+
+    async fn poll_query_result(&self, url: &str) -> Result<QueryResponse> {
+        Ok(self
+            .client
+            .get(url)
+            .bearer_auth(&self.session_token)
+            .send()
+            .await
+            .inspect_err(|e| {
+                println!("fail to send to {}: {:?}", url, e);
+            })?
+            .json::<QueryResponse>()
+            .await
+            .inspect_err(|e| {
+                println!("fail to decode json when call {}: {:?}", url, e);
+            })?)
+ } +} diff --git a/src/common/http/src/lib.rs b/src/common/http/src/lib.rs index 4d90e4569b7a2..06b77961a2366 100644 --- a/src/common/http/src/lib.rs +++ b/src/common/http/src/lib.rs @@ -17,9 +17,14 @@ mod debug; mod errors; mod health; +mod http_client; mod http_shutdown_handlers; pub use debug::*; pub use errors::HttpError; pub use health::*; +pub use http_client::HttpClient; +pub use http_client::HttpSessionConf; +pub use http_client::QueryResponse; +pub use http_client::ServerInfo; pub use http_shutdown_handlers::HttpShutdownHandler; diff --git a/src/query/expression/src/row/row_converter.rs b/src/query/expression/src/row/row_converter.rs index f259df0731920..549221e8384b8 100644 --- a/src/query/expression/src/row/row_converter.rs +++ b/src/query/expression/src/row/row_converter.rs @@ -57,15 +57,17 @@ impl RowConverter { fn support_data_type(d: &DataType) -> bool { match d { - DataType::Array(_) - | DataType::EmptyArray - | DataType::EmptyMap - | DataType::Map(_) - | DataType::Bitmap - | DataType::Tuple(_) - | DataType::Generic(_) => false, + DataType::Null + | DataType::Boolean + | DataType::Number(_) + | DataType::Decimal(_) + | DataType::Timestamp + | DataType::Date + | DataType::Binary + | DataType::String + | DataType::Variant => true, DataType::Nullable(inner) => Self::support_data_type(inner.as_ref()), - _ => true, + _ => false, } } diff --git a/src/tests/sqlsmith/Cargo.toml b/src/tests/sqlsmith/Cargo.toml index ba3dade807c31..fc9b436d3dab5 100644 --- a/src/tests/sqlsmith/Cargo.toml +++ b/src/tests/sqlsmith/Cargo.toml @@ -9,22 +9,21 @@ edition = { workspace = true } [dependencies] chrono-tz = { workspace = true } clap = { workspace = true } -databend-client = { workspace = true } databend-common-ast = { workspace = true } +databend-common-exception = { workspace = true } databend-common-expression = { workspace = true } databend-common-formats = { workspace = true } databend-common-functions = { workspace = true } databend-common-io = { workspace = true 
} databend-common-sql = { workspace = true } -databend-driver = { workspace = true } -databend-driver-core = { workspace = true } +databend-common-http = { workspace = true } ethnum = { workspace = true } itertools = { workspace = true } jiff = { workspace = true } jsonb = { workspace = true } rand = { workspace = true } +serde_json = { workspace = true } tokio = { workspace = true } -tokio-stream = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/src/tests/sqlsmith/src/bin/main.rs b/src/tests/sqlsmith/src/bin/main.rs index 62ca1b5e1f058..840098bc47457 100644 --- a/src/tests/sqlsmith/src/bin/main.rs +++ b/src/tests/sqlsmith/src/bin/main.rs @@ -13,7 +13,10 @@ // limitations under the License. use clap::Parser; +use databend_common_exception::Result; use databend_sqlsmith::Runner; +use tracing::metadata::LevelFilter; +use tracing_subscriber::EnvFilter; #[derive(Clone, Debug, PartialEq, Eq, Parser)] #[clap(about, author)] @@ -27,7 +30,7 @@ pub struct Args { port: u16, /// The test database. - #[clap(long, default_value = "default")] + #[clap(long, default_value = "sqlsmith_test")] db: String, /// The username. 
@@ -48,15 +51,26 @@ pub struct Args { } #[tokio::main(flavor = "multi_thread", worker_threads = 5)] -async fn main() { - tracing_subscriber::fmt::init(); +async fn main() -> Result<()> { + let filter = EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .parse("") + .unwrap(); + tracing_subscriber::fmt().with_env_filter(filter).init(); let args = Args::parse(); + let host = format!("http://{}:{}", args.host, args.port); + let mut runner = Runner::try_new( + host, + args.user.clone(), + args.pass.clone(), + args.db.clone(), + args.count, + None, + args.timeout, + ) + .await?; + runner.run().await?; - let dsn = format!( - "databend://{}:{}@{}:{}/{}?sslmode=disable", - args.user, args.pass, args.host, args.port, args.db - ); - let runner = Runner::new(dsn, args.count, None, args.timeout); - runner.run().await; + Ok(()) } diff --git a/src/tests/sqlsmith/src/reducer.rs b/src/tests/sqlsmith/src/reducer.rs index 0dbd65e342be2..3b2eac238caad 100644 --- a/src/tests/sqlsmith/src/reducer.rs +++ b/src/tests/sqlsmith/src/reducer.rs @@ -12,82 +12,83 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use databend_client::error::Error as ClientError; use databend_common_ast::ast::Query; use databend_common_ast::ast::SetExpr; -use databend_driver::Connection; -use databend_driver::Error; -pub(crate) async fn try_reduce_query( - conn: Box, - code: u16, - mut query: Query, -) -> Query { - // reduce limit and offset - if !query.limit.is_empty() || query.offset.is_some() { - let mut reduced_query = query.clone(); - reduced_query.limit = vec![]; - reduced_query.offset = None; - if execute_reduce_query(conn.clone(), code, &reduced_query).await { - query = reduced_query; - } - } +use crate::Runner; - // reduce order by - if !query.order_by.is_empty() { - let mut reduced_query = query.clone(); - reduced_query.order_by = vec![]; - if execute_reduce_query(conn.clone(), code, &reduced_query).await { - query = reduced_query; +impl Runner { + pub(crate) async fn try_reduce_query(&mut self, code: u64, mut query: Query) -> Query { + // reduce limit and offset + if !query.limit.is_empty() || query.offset.is_some() { + let mut reduced_query = query.clone(); + reduced_query.limit = vec![]; + reduced_query.offset = None; + if self.execute_reduce_query(code, &reduced_query).await { + query = reduced_query; + } } - } - if let SetExpr::Select(select_stmt) = query.body.clone() { - let mut select_stmt = select_stmt.clone(); - let mut reduced_query = query.clone(); - // TODO: reduce expr - if select_stmt.selection.is_some() { - let mut reduced_select_stmt = select_stmt.clone(); - reduced_select_stmt.selection = None; - reduced_query.body = SetExpr::Select(reduced_select_stmt.clone()); - if execute_reduce_query(conn.clone(), code, &reduced_query).await { - select_stmt = reduced_select_stmt; + // reduce order by + if !query.order_by.is_empty() { + let mut reduced_query = query.clone(); + reduced_query.order_by = vec![]; + if self.execute_reduce_query(code, &reduced_query).await { + query = reduced_query; } } - if select_stmt.having.is_some() { - let mut reduced_select_stmt = 
select_stmt.clone(); - reduced_select_stmt.having = None; - reduced_query.body = SetExpr::Select(reduced_select_stmt.clone()); - if execute_reduce_query(conn.clone(), code, &reduced_query).await { - select_stmt = reduced_select_stmt; + + if let SetExpr::Select(select_stmt) = query.body.clone() { + let mut select_stmt = select_stmt.clone(); + let mut reduced_query = query.clone(); + // TODO: reduce expr + if select_stmt.selection.is_some() { + let mut reduced_select_stmt = select_stmt.clone(); + reduced_select_stmt.selection = None; + reduced_query.body = SetExpr::Select(reduced_select_stmt.clone()); + if self.execute_reduce_query(code, &reduced_query).await { + select_stmt = reduced_select_stmt; + } } - } - if select_stmt.window_list.is_none() && reduced_query.with.is_some() { - reduced_query.with = None; - if execute_reduce_query(conn.clone(), code, &reduced_query).await { - query = reduced_query.clone(); + if select_stmt.having.is_some() { + let mut reduced_select_stmt = select_stmt.clone(); + reduced_select_stmt.having = None; + reduced_query.body = SetExpr::Select(reduced_select_stmt.clone()); + if self.execute_reduce_query(code, &reduced_query).await { + select_stmt = reduced_select_stmt; + } } - } - let select_list = select_stmt.select_list.clone(); - let mut reduced_select_stmt = select_stmt.clone(); - for item in &select_list { - reduced_select_stmt.select_list = vec![item.clone()]; - reduced_query.body = SetExpr::Select(reduced_select_stmt.clone()); - if execute_reduce_query(conn.clone(), code, &reduced_query).await { - select_stmt = reduced_select_stmt; - break; + if select_stmt.window_list.is_none() && reduced_query.with.is_some() { + reduced_query.with = None; + if self.execute_reduce_query(code, &reduced_query).await { + query = reduced_query.clone(); + } + } + let select_list = select_stmt.select_list.clone(); + let mut reduced_select_stmt = select_stmt.clone(); + for item in &select_list { + reduced_select_stmt.select_list = vec![item.clone()]; + 
reduced_query.body = SetExpr::Select(reduced_select_stmt.clone()); + if self.execute_reduce_query(code, &reduced_query).await { + select_stmt = reduced_select_stmt; + break; + } } + query.body = SetExpr::Select(select_stmt); } - query.body = SetExpr::Select(select_stmt); - } - query -} + query + } -async fn execute_reduce_query(conn: Box, code: u16, query: &Query) -> bool { - let sql = query.to_string(); - if let Err(Error::Api(ClientError::InvalidResponse(err))) = conn.exec(&sql).await { - return err.code == code; + async fn execute_reduce_query(&mut self, err_code: u64, query: &Query) -> bool { + let query_sql = query.to_string(); + if let Ok(responses) = self.client.query(&query_sql).await { + if let Some(error) = &responses[0].error { + let value = error.as_object().unwrap(); + let code = value["code"].as_u64().unwrap(); + return err_code == code; + } + } + false } - false } diff --git a/src/tests/sqlsmith/src/runner.rs b/src/tests/sqlsmith/src/runner.rs index b224c83aceaed..4f17d3172f07f 100644 --- a/src/tests/sqlsmith/src/runner.rs +++ b/src/tests/sqlsmith/src/runner.rs @@ -15,26 +15,22 @@ use std::future::Future; use std::time::Duration; -use databend_client::error::Error as ClientError; use databend_common_ast::ast::AlterTableAction; use databend_common_ast::ast::CreateTableSource; use databend_common_ast::ast::CreateTableStmt; use databend_common_ast::ast::DropTableStmt; +use databend_common_exception::Result; use databend_common_expression::types::DataType; use databend_common_expression::types::NumberDataType; use databend_common_expression::TableField; use databend_common_expression::TableSchemaRefExt; +use databend_common_http::HttpClient as Client; +use databend_common_http::QueryResponse; use databend_common_sql::resolve_type_name; -use databend_driver::Client; -use databend_driver::Connection; -use databend_driver::Error; -use databend_driver_core::value::Value; use rand::rngs::SmallRng; use rand::Rng; use rand::SeedableRng; -use 
tokio_stream::StreamExt; -use crate::reducer::try_reduce_query; use crate::sql_gen::SqlGenerator; use crate::sql_gen::Table; @@ -74,25 +70,37 @@ const KNOWN_ERRORS: &[&str] = &[ "Expected Number, Date or Timestamp type, but got", "Unsupported data type for generate_series", "Having clause can't contain window functions", + "Cannot find common type for", + "null value in column", ]; pub struct Runner { count: usize, seed: Option, - client: Client, + pub(crate) client: Client, + db: String, timeout: u64, } impl Runner { - pub fn new(dsn: String, count: usize, seed: Option, timeout: u64) -> Self { - let client = Client::new(dsn); + pub async fn try_new( + host: String, + username: String, + password: String, + db: String, + count: usize, + seed: Option, + timeout: u64, + ) -> Result { + let client = Client::create(host, username, password).await?; - Self { + Ok(Self { count, seed, client, + db, timeout, - } + }) } fn generate_rng(seed: Option) -> impl Rng { @@ -103,20 +111,18 @@ impl Runner { } } - #[allow(clippy::borrowed_box)] async fn create_base_table( - &self, - conn: &Box, + &mut self, table_stmts: Vec<(DropTableStmt, CreateTableStmt)>, - ) -> Vec { + ) -> Result> { let mut tables = Vec::with_capacity(table_stmts.len()); for (drop_table_stmt, create_table_stmt) in table_stmts { let drop_table_sql = drop_table_stmt.to_string(); tracing::info!("drop_table_sql: {}", drop_table_sql); - Self::check_res(conn.exec(&drop_table_sql).await); + Self::check_res(self.client.query(&drop_table_sql).await); let create_table_sql = create_table_stmt.to_string(); tracing::info!("create_table_sql: {}", create_table_sql); - Self::check_res(conn.exec(&create_table_sql).await); + Self::check_res(self.client.query(&create_table_sql).await); let table_name = create_table_stmt.table.name.clone(); let mut fields = Vec::new(); @@ -132,75 +138,83 @@ impl Runner { let table = Table::new(table_name, schema); tables.push(table); } - tables + Ok(tables) } - #[allow(clippy::borrowed_box)] - 
async fn get_settings(&self, conn: &Box) -> Vec<(String, DataType)> { - let mut settings = vec![]; + async fn get_settings(&mut self) -> Result> { let show_settings = "show settings".to_string(); - let rows = conn.query_iter(&show_settings).await; - match rows { - Ok(mut rows) => { - while let Some(row) = rows.next().await { - if let Ok(row) = row { - match (row.values().first(), row.values().get(5)) { - (Some(Value::String(name)), Some(Value::String(ty))) => { - let data_type = match ty.as_str() { - "UInt64" => DataType::Number(NumberDataType::UInt64), - "String" => DataType::String, - _ => DataType::String, - }; - settings.push((name.clone(), data_type)); - } - (_, _) => { - continue; - } - } + let responses = self.client.query(&show_settings).await?; + + let mut settings = vec![]; + for response in responses { + if let Some(serde_json::Value::Array(arr)) = response.data { + for row in arr { + let name = row.get(0); + let ty = row.get(6); + if let ( + Some(serde_json::Value::String(name)), + Some(serde_json::Value::String(ty)), + ) = (name, ty) + { + let data_type = match ty.as_str() { + "UInt64" => DataType::Number(NumberDataType::UInt64), + "String" => DataType::String, + _ => DataType::String, + }; + settings.push((name.clone(), data_type)); } } } - Err(e) => { - let err = format!("show settings exec err: {}", e); - tracing::error!(err); - } } - settings + Ok(settings) } - async fn check_timeout(future: F, sql: String, sec: u64) + async fn check_timeout(future: F, sec: u64, timeout_err: &mut Option) where F: Future { if let Err(e) = tokio::time::timeout(Duration::from_secs(sec), future).await { - tracing::info!("sql: {}", sql); - let err = format!("sql timeout: {}", e); - tracing::error!(err); + *timeout_err = Some(format!("{}", e)); } } - fn check_res(res: databend_driver::Result) { - if let Err(Error::Api(ClientError::InvalidResponse(err))) = res { - if err.code == 1005 || err.code == 1065 { - return; + fn check_res(responses: Result>) { + match responses { 
+ Ok(responses) => { + if let Some(error) = &responses[0].error { + let value = error.as_object().unwrap(); + let code = value["code"].as_u64().unwrap(); + let message = value["message"].as_str().unwrap(); + if code == 1005 || code == 1065 { + return; + } + if KNOWN_ERRORS + .iter() + .any(|known_err| message.starts_with(known_err)) + { + return; + } + let err = format!("sql exec err code: {} message: {}", code, message); + tracing::error!(err); + } } - if KNOWN_ERRORS - .iter() - .any(|known_err| err.message.starts_with(known_err)) - { - return; + Err(err) => { + let err = format!("http err: {}", err); + tracing::error!(err); } - let err = format!("sql exec err: {}", err.message); - tracing::error!(err); } } - pub async fn run(&self) { - let conn = self.client.get_conn().await.unwrap(); - let settings = self.get_settings(&conn).await; + pub async fn run(&mut self) -> Result<()> { + let create_db_sql = format!("CREATE OR REPLACE database {}", self.db); + let _ = self.client.query(&create_db_sql).await?; + let use_db_sql = format!("USE {}", self.db); + let _ = self.client.query(&use_db_sql).await?; + + let settings = self.get_settings().await?; let mut rng = Self::generate_rng(self.seed); let mut generator = SqlGenerator::new(&mut rng, settings); let table_stmts = generator.gen_base_tables(); - let tables = self.create_base_table(&conn, table_stmts).await; + let tables = self.create_base_table(table_stmts).await?; let row_count = 10; let mut new_tables = tables.clone(); @@ -208,7 +222,7 @@ impl Runner { let insert_stmt = generator.gen_insert(table, row_count); let insert_sql = insert_stmt.to_string(); tracing::info!("insert_sql: {}", insert_sql); - Self::check_res(conn.exec(&insert_sql).await); + Self::check_res(self.client.query(&insert_sql).await); let alter_stmt_opt = generator.gen_alter(table, row_count); if let Some((alter_stmt, new_table, insert_stmt_opt)) = alter_stmt_opt { @@ -222,24 +236,24 @@ impl Runner { }; let drop_table_sql = 
drop_table_stmt.to_string(); tracing::info!("drop_table_sql: {}", drop_table_sql); - Self::check_res(conn.exec(&drop_table_sql).await); + Self::check_res(self.client.query(&drop_table_sql).await); } let alter_sql = alter_stmt.to_string(); tracing::info!("alter_sql: {}", alter_sql); - Self::check_res(conn.exec(&alter_sql).await); + Self::check_res(self.client.query(&alter_sql).await); // save new table schema new_tables[i] = new_table; if let Some(insert_stmt) = insert_stmt_opt { let insert_sql = insert_stmt.to_string(); tracing::info!("after alter insert_sql: {}", insert_sql); - Self::check_res(conn.exec(&insert_sql).await); + Self::check_res(self.client.query(&insert_sql).await); } } } generator.tables = new_tables; let enable_merge = "set enable_experimental_merge_into = 1".to_string(); - Self::check_res(conn.exec(&enable_merge).await); + Self::check_res(self.client.query(&enable_merge).await); // generate merge, replace, update, delete for _ in 0..20 { let sql = match generator.rng.gen_range(0..=20) { @@ -249,56 +263,74 @@ impl Runner { 20 => generator.gen_delete().to_string(), _ => unreachable!(), }; - tracing::info!("sql: {}", sql); + let mut timeout_err = None; + tracing::info!("dml sql: {}", sql); Self::check_timeout( - async { Self::check_res(conn.exec(&sql.clone()).await) }, - sql.clone(), + async { Self::check_res(self.client.query(&sql).await) }, self.timeout, + &mut timeout_err, ) .await; + if let Some(timeout_err) = timeout_err { + tracing::error!("sql timeout: {}", timeout_err); + } } // generate query for _ in 0..self.count { let query = generator.gen_query(); let query_sql = query.to_string(); + let mut timeout_err = None; + let mut is_error = false; let mut try_reduce = false; let mut err_code = 0; - let mut err = String::new(); + let mut err_message = String::new(); Self::check_timeout( async { - if let Err(e) = conn.exec(&query_sql).await { - if let Error::Api(ClientError::InvalidResponse(err)) = &e { - // TODO: handle Syntax, Semantic and 
InvalidArgument errors - if err.code == 1005 - || err.code == 1065 - || err.code == 2004 - || err.code == 1010 - { - return; - } - if KNOWN_ERRORS - .iter() - .any(|known_err| err.message.starts_with(known_err)) - { - return; + match self.client.query(&query_sql).await { + Ok(responses) => { + if let Some(error) = &responses[0].error { + let value = error.as_object().unwrap(); + let code = value["code"].as_u64().unwrap(); + let message = value["message"].as_str().unwrap(); + if code == 1005 || code == 1065 || code == 2004 || code == 1010 { + return; + } + if KNOWN_ERRORS + .iter() + .any(|known_err| message.starts_with(known_err)) + { + return; + } + is_error = true; + err_code = code; + err_message = format!("error: {}", message); + try_reduce = true; } - err_code = err.code; } - err = format!("error: {}", e); - try_reduce = true; + Err(err) => { + is_error = true; + err_message = format!("http err: {}", err); + } } }, - query_sql.clone(), self.timeout, + &mut timeout_err, ) .await; - if try_reduce { + + if let Some(timeout_err) = timeout_err { tracing::info!("query_sql: {}", query_sql); - let reduced_query = try_reduce_query(conn.clone(), err_code, query).await; - tracing::info!("reduced query_sql: {}", reduced_query.to_string()); - tracing::error!(err); + tracing::error!("sql timeout: {}", timeout_err); + } else if is_error { + tracing::info!("query_sql: {}", query_sql); + if try_reduce { + let reduced_query = self.try_reduce_query(err_code, query).await; + tracing::info!("reduced query_sql: {}", reduced_query.to_string()); + } + tracing::error!(err_message); } } + Ok(()) } } diff --git a/src/tests/sqlsmith/src/sql_gen/expr.rs b/src/tests/sqlsmith/src/sql_gen/expr.rs index eb632b259ff08..bfe1a0dab2e9e 100644 --- a/src/tests/sqlsmith/src/sql_gen/expr.rs +++ b/src/tests/sqlsmith/src/sql_gen/expr.rs @@ -267,11 +267,10 @@ impl<'a, R: Rng> SqlGenerator<'a, R> { } } DataType::Geometry => { - let x: f64 = self.rng.gen_range(-1.7e10..=1.7e10); - let y: f64 = 
self.rng.gen_range(-1.7e10..=1.7e10); + let geo = self.gen_geometry(); let arg = Expr::Literal { span: None, - value: Literal::String(format!("POINT({} {})", x, y)), + value: Literal::String(geo), }; Expr::FunctionCall { span: None, @@ -634,19 +633,18 @@ impl<'a, R: Rng> SqlGenerator<'a, R> { } } DataType::Geometry => { - let (func_name, args) = match self.rng.gen_range(0..=1) { + let (func_name, args) = match self.rng.gen_range(0..=10) { 0 => { let arg_ty = DataType::Number(NumberDataType::Float64); let x = self.gen_expr(&arg_ty); let y = self.gen_expr(&arg_ty); ("st_makegeompoint", vec![x, y]) } - 1 => { - let x: f64 = self.rng.gen_range(-1.7e10..=1.7e10); - let y: f64 = self.rng.gen_range(-1.7e10..=1.7e10); + _ => { + let geo = self.gen_geometry(); let arg0 = Expr::Literal { span: None, - value: Literal::String(format!("POINT({} {})", x, y)), + value: Literal::String(geo), }; let args = if self.rng.gen_bool(0.5) { let arg1_ty = DataType::Number(NumberDataType::Int32); @@ -657,7 +655,6 @@ impl<'a, R: Rng> SqlGenerator<'a, R> { }; ("st_geometryfromwkt", args) } - _ => unreachable!(), }; Expr::FunctionCall { span: None, @@ -701,6 +698,106 @@ impl<'a, R: Rng> SqlGenerator<'a, R> { } } + pub(crate) fn gen_point(&mut self) -> String { + let x: f64 = self.rng.gen_range(-1.7e10..=1.7e10); + let y: f64 = self.rng.gen_range(-1.7e10..=1.7e10); + format!("{} {}", x, y) + } + + pub(crate) fn gen_points(&mut self) -> String { + let mut points = vec![]; + for _ in 0..self.rng.gen_range(1..=6) { + let point = format!("{}", self.gen_point()); + points.push(point); + } + points.join(", ") + } + + pub(crate) fn gen_simple_geometry(&mut self) -> String { + match self.rng.gen_range(0..=5) { + 0 => match self.rng.gen_range(0..=10) { + 0 => "POINT EMPTY".to_string(), + _ => { + format!("POINT({})", self.gen_point()) + } + }, + 1 => match self.rng.gen_range(0..=10) { + 0 => "MULTIPOINT EMPTY".to_string(), + _ => { + let mut points = vec![]; + for _ in 0..self.rng.gen_range(1..=6) { + 
let point = format!("({})", self.gen_point()); + points.push(point); + } + format!("MULTIPOINT({})", points.join(", ")) + } + }, + 2 => match self.rng.gen_range(0..=10) { + 0 => "LINESTRING EMPTY".to_string(), + _ => { + let points = self.gen_points(); + format!("LINESTRING({})", points) + } + }, + 3 => match self.rng.gen_range(0..=10) { + 0 => "MULTILINESTRING EMPTY".to_string(), + _ => { + let mut lines = vec![]; + for _ in 0..self.rng.gen_range(1..=6) { + let points = self.gen_points(); + let line = format!("({})", points); + lines.push(line); + } + format!("MULTILINESTRING({})", lines.join(", ")) + } + }, + 4 => match self.rng.gen_range(0..=10) { + 0 => "POLYGON EMPTY".to_string(), + _ => { + let mut polygons = vec![]; + for _ in 0..self.rng.gen_range(1..=6) { + let points = self.gen_points(); + let polygon = format!("({})", points); + polygons.push(polygon); + } + format!("POLYGON({})", polygons.join(", ")) + } + }, + 5 => match self.rng.gen_range(0..=10) { + 0 => "MULTIPOLYGON EMPTY".to_string(), + _ => { + let mut polygons = vec![]; + for _ in 0..self.rng.gen_range(1..=6) { + let points = self.gen_points(); + let polygon = format!("(({}))", points); + polygons.push(polygon); + } + format!("MULTIPOLYGON({})", polygons.join(", ")) + } + }, + _ => unreachable!(), + } + } + + pub(crate) fn gen_geometry(&mut self) -> String { + let geo = match self.rng.gen_range(0..=8) { + 0 => { + let mut geos = vec![]; + for _ in 0..self.rng.gen_range(1..=4) { + geos.push(self.gen_simple_geometry()); + } + format!("GEOMETRYCOLLECTION({})", geos.join(", ")) + } + _ => self.gen_simple_geometry(), + }; + if self.rng.gen_bool(0.4) { + let srid = self.rng.gen_range(1..=10000); + format!("SRID={};{}", srid, geo) + } else { + geo + } + } + pub(crate) fn gen_binary_expr(&mut self) -> Expr { let (op, left, right) = match self.rng.gen_range(0..=3) { 0..=1 => { diff --git a/tests/sqllogictests/Cargo.toml b/tests/sqllogictests/Cargo.toml index dbc6345fbb1b9..aff7c2b7c4751 100644 --- 
a/tests/sqllogictests/Cargo.toml +++ b/tests/sqllogictests/Cargo.toml @@ -16,9 +16,9 @@ name = "databend-sqllogictests" [dependencies] async-trait = { workspace = true } clap = { workspace = true } -cookie = { workspace = true } databend-common-base = { workspace = true } databend-common-exception = { workspace = true } +databend-common-http = { workspace = true } env_logger = { workspace = true } futures-util = { workspace = true } msql-srv = { workspace = true } @@ -26,14 +26,13 @@ mysql_async = { workspace = true } mysql_common = { workspace = true } rand = { workspace = true } regex = { workspace = true } -reqwest = { workspace = true, features = ["cookies"] } +reqwest = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } sqllogictest = { workspace = true } sqlparser = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } -url = { workspace = true } walkdir = { workspace = true } [lints] diff --git a/tests/sqllogictests/src/client/global_cookie_store.rs b/tests/sqllogictests/src/client/global_cookie_store.rs deleted file mode 100644 index 498e2d82c9cd0..0000000000000 --- a/tests/sqllogictests/src/client/global_cookie_store.rs +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::collections::HashMap; -use std::sync::RwLock; - -use cookie::Cookie; -use reqwest::cookie::CookieStore; -use reqwest::header::HeaderValue; -use url::Url; - -pub(crate) struct GlobalCookieStore { - cookies: RwLock>>, -} - -impl GlobalCookieStore { - pub fn new() -> Self { - GlobalCookieStore { - cookies: RwLock::new(HashMap::new()), - } - } -} - -impl CookieStore for GlobalCookieStore { - fn set_cookies(&self, cookie_headers: &mut dyn Iterator, _url: &Url) { - let iter = cookie_headers - .filter_map(|val| std::str::from_utf8(val.as_bytes()).ok()) - .filter_map(|kv| Cookie::parse(kv).map(|c| c.into_owned()).ok()); - - let mut guard = self.cookies.write().unwrap(); - for cookie in iter { - guard.insert(cookie.name().to_string(), cookie); - } - } - - fn cookies(&self, _url: &Url) -> Option { - let guard = self.cookies.read().unwrap(); - let s: String = guard - .values() - .map(|cookie| cookie.name_value()) - .map(|(name, value)| format!("{name}={value}")) - .collect::>() - .join("; "); - - if s.is_empty() { - return None; - } - - HeaderValue::from_str(&s).ok() - } -} diff --git a/tests/sqllogictests/src/client/http_client.rs b/tests/sqllogictests/src/client/http_client.rs index 4d2b56f2be477..9fef2639f9d3a 100644 --- a/tests/sqllogictests/src/client/http_client.rs +++ b/tests/sqllogictests/src/client/http_client.rs @@ -12,44 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::HashMap; -use std::sync::Arc; -use std::time::Duration; use std::time::Instant; -use reqwest::cookie::CookieStore; -use reqwest::header::HeaderMap; -use reqwest::header::HeaderValue; -use reqwest::Client; -use reqwest::ClientBuilder; -use serde::Deserialize; +use databend_common_http::HttpClient as Client; +use databend_common_http::QueryResponse; use sqllogictest::DBOutput; use sqllogictest::DefaultColumnType; -use url::Url; -use crate::client::global_cookie_store::GlobalCookieStore; use crate::error::Result; use crate::util::parser_rows; -use crate::util::HttpSessionConf; pub struct HttpClient { pub client: Client, - pub session_token: String, pub debug: bool, - pub session: Option, -} - -#[derive(serde::Deserialize, Debug)] -struct QueryResponse { - session: Option, - data: Option, - next_uri: Option, - - error: Option, } // make error message the same with ErrorCode::display -fn format_error(value: serde_json::Value) -> String { +fn format_error(value: &serde_json::Value) -> String { let value = value.as_object().unwrap(); let detail = value.get("detail").and_then(|v| v.as_str()); let code = value["code"].as_u64().unwrap(); @@ -64,60 +43,16 @@ fn format_error(value: serde_json::Value) -> String { } } -#[derive(Deserialize)] -struct TokenInfo { - session_token: String, -} - -#[derive(Deserialize)] -struct LoginResponse { - tokens: Option, -} - impl HttpClient { pub async fn create() -> Result { - let mut header = HeaderMap::new(); - header.insert( - "Content-Type", - HeaderValue::from_str("application/json").unwrap(), - ); - header.insert("Accept", HeaderValue::from_str("application/json").unwrap()); - let cookie_provider = GlobalCookieStore::new(); - let cookie = HeaderValue::from_str("cookie_enabled=true").unwrap(); - let mut initial_cookies = [&cookie].into_iter(); - cookie_provider.set_cookies(&mut initial_cookies, &Url::parse("https://a.com").unwrap()); - let client = ClientBuilder::new() - .cookie_provider(Arc::new(cookie_provider)) 
- .default_headers(header) - // https://github.com/hyperium/hyper/issues/2136#issuecomment-589488526 - .http2_keep_alive_timeout(Duration::from_secs(15)) - .pool_max_idle_per_host(0) - .build()?; - - let url = "http://127.0.0.1:8000/v1/session/login"; + let host = "http://127.0.0.1:48000".to_string(); + let username = "root".to_string(); + let password = "".to_string(); - let session_token = client - .post(url) - .body("{}") - .basic_auth("root", Some("")) - .send() - .await - .inspect_err(|e| { - println!("fail to send to {}: {:?}", url, e); - })? - .json::() - .await - .inspect_err(|e| { - println!("fail to decode json when call {}: {:?}", url, e); - })? - .tokens - .unwrap() - .session_token; + let client = Client::create(host, username, password).await?; Ok(Self { client, - session_token, - session: None, debug: false, }) } @@ -125,23 +60,15 @@ impl HttpClient { pub async fn query(&mut self, sql: &str) -> Result> { let start = Instant::now(); - let url = "http://127.0.0.1:8000/v1/query".to_string(); - let mut parsed_rows = vec![]; - let mut response = self.post_query(sql, &url).await?; - self.handle_response(&response, &mut parsed_rows)?; - while let Some(next_uri) = &response.next_uri { - let url = format!("http://127.0.0.1:8000{next_uri}"); - let new_response = self.poll_query_result(&url).await?; - if new_response.next_uri.is_some() { - self.handle_response(&new_response, &mut parsed_rows)?; - response = new_response; - } else { - break; - } - } - if let Some(error) = response.error { + let responses = self.client.query(sql).await?; + if let Some(error) = &responses[0].error { return Err(format_error(error).into()); } + + let mut parsed_rows = vec![]; + for response in responses { + self.handle_response(&response, &mut parsed_rows)?; + } // Todo: add types to compare let mut types = vec![]; if !parsed_rows.is_empty() { @@ -166,53 +93,9 @@ impl HttpClient { response: &QueryResponse, parsed_rows: &mut Vec>, ) -> Result<()> { - if response.session.is_some() { - 
self.session = response.session.clone(); - } if let Some(data) = &response.data { parsed_rows.append(&mut parser_rows(data)?); } Ok(()) } - - // Send request and get response by json format - async fn post_query(&self, sql: &str, url: &str) -> Result { - let mut query = HashMap::new(); - query.insert("sql", serde_json::to_value(sql)?); - if let Some(session) = &self.session { - query.insert("session", serde_json::to_value(session)?); - } - Ok(self - .client - .post(url) - .json(&query) - .bearer_auth(&self.session_token) - .send() - .await - .inspect_err(|e| { - println!("fail to send to {}: {:?}", url, e); - })? - .json::() - .await - .inspect_err(|e| { - println!("fail to decode json when call {}: {:?}", url, e); - })?) - } - - async fn poll_query_result(&self, url: &str) -> Result { - Ok(self - .client - .get(url) - .bearer_auth(&self.session_token) - .send() - .await - .inspect_err(|e| { - println!("fail to send to {}: {:?}", url, e); - })? - .json::() - .await - .inspect_err(|e| { - println!("fail to decode json when call {}: {:?}", url, e); - })?) - } } diff --git a/tests/sqllogictests/src/client/mod.rs b/tests/sqllogictests/src/client/mod.rs index d4f0098437c5f..439bfb5e3ddca 100644 --- a/tests/sqllogictests/src/client/mod.rs +++ b/tests/sqllogictests/src/client/mod.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-mod global_cookie_store; mod http_client; mod mysql_client; diff --git a/tests/sqllogictests/src/client/mysql_client.rs b/tests/sqllogictests/src/client/mysql_client.rs index b821e760f3201..d8128c79512db 100644 --- a/tests/sqllogictests/src/client/mysql_client.rs +++ b/tests/sqllogictests/src/client/mysql_client.rs @@ -32,7 +32,7 @@ pub struct MySQLClient { impl MySQLClient { pub async fn create(database: &str) -> Result { - let url = format!("mysql://root:@127.0.0.1:3307/{database}"); + let url = format!("mysql://root:@127.0.0.1:43307/{database}"); let pool = Pool::new(url.as_str()); let conn = pool.get_conn().await?; Ok(Self { diff --git a/tests/sqllogictests/src/util.rs b/tests/sqllogictests/src/util.rs index 6cc3abb0ae380..d34e7bbc72f8d 100644 --- a/tests/sqllogictests/src/util.rs +++ b/tests/sqllogictests/src/util.rs @@ -12,13 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; use std::path::Path; use std::path::PathBuf; use clap::Parser; -use serde::Deserialize; -use serde::Serialize; use serde_json::Value; use walkdir::DirEntry; use walkdir::WalkDir; @@ -26,24 +23,6 @@ use walkdir::WalkDir; use crate::arg::SqlLogicTestArgs; use crate::error::DSqlLogicTestError; use crate::error::Result; -#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)] -pub struct ServerInfo { - pub id: String, - pub start_time: String, -} - -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] -pub struct HttpSessionConf { - pub database: Option, - pub role: Option, - pub secondary_roles: Option>, - pub settings: Option>, - pub txn_state: Option, - pub last_server_info: Option, - #[serde(default)] - pub last_query_ids: Vec, - pub internal: Option, -} pub fn parser_rows(rows: &Value) -> Result>> { let mut parsed_rows = Vec::new();