diff --git a/Cargo.lock b/Cargo.lock index e6aecf3744f89..6867d4ddf00bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -526,6 +526,7 @@ dependencies = [ "common-building", "common-datavalues", "common-meta-raft-store", + "common-tracing", "csv", "databend-meta", "databend-query", @@ -541,7 +542,6 @@ dependencies = [ "itertools", "lexical-util", "libc", - "log", "nix", "num-format", "portpicker", @@ -1103,9 +1103,9 @@ dependencies = [ name = "common-cache" version = "0.1.0" dependencies = [ + "common-tracing", "filetime", "heapsize", - "log", "ritelinked", "tempfile", "walkdir", @@ -1122,6 +1122,7 @@ dependencies = [ "chrono-tz", "combine", "common-io", + "common-tracing", "env_logger 0.9.0", "futures", "futures-core", @@ -1233,11 +1234,11 @@ dependencies = [ "common-arrow", "common-base", "common-exception", + "common-tracing", "futures", "hyper", "jwt-simple", "lazy_static", - "log", "pretty_assertions", "prost", "serde", @@ -1291,6 +1292,7 @@ dependencies = [ "bytes", "common-exception", "rand 0.8.4", + "serde", ] [[package]] @@ -1390,7 +1392,6 @@ dependencies = [ "hyper", "jwt-simple", "lazy_static", - "log", "pretty_assertions", "prost", "rand 0.8.4", @@ -2077,7 +2078,7 @@ dependencies = [ "hyper", "indexmap", "lazy_static", - "log", + "maplit", "metrics", "msql-srv", "mysql", diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 132351c987a30..754babc0bec59 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -22,6 +22,8 @@ path = "src/bin/bendctl.rs" databend-query = {path = "../query"} common-base = {path = "../common/base" } common-datavalues = { path = "../common/datavalues" } +common-tracing = {path = "../common/tracing" } + itertools = "0.10.3" databend-meta = {path = "../metasrv" } common-meta-raft-store= {path = "../common/meta/raft-store"} @@ -48,7 +50,6 @@ tar = "0.4.37" thiserror = "1.0.30" ureq = { version = "2.3.1", features = ["json"] } nix = "0.23.0" -log = "0.4.14" serde_yaml = "0.8.21" structopt = "0.3.25" structopt-toml = "0.5.0" diff --git a/cli/src/cmds/status.rs b/cli/src/cmds/status.rs index 0e16389c780fa..a572a665d977a 100644 --- a/cli/src/cmds/status.rs +++ b/cli/src/cmds/status.rs @@ -27,10 +27,10 @@ use std::time; use async_trait::async_trait; use common_base::tokio::time::Duration; +use common_tracing::tracing; use databend_meta::configs::Config as MetaConfig; use databend_query::configs::Config as QueryConfig; use libc::pid_t; -use log::info; use nix::unistd::Pid; use reqwest::Client; use serde::Deserialize; @@ -228,7 +228,7 @@ impl LocalRuntime for LocalDashboardConfig { .stdout(unsafe { Stdio::from_raw_fd(out_file.into_raw_fd()) }) .stderr(unsafe { Stdio::from_raw_fd(err_file.into_raw_fd()) }); // logging debug - info!("executing command {:?}", command); + tracing::info!("executing command {:?}", command); Ok(command) } @@ -349,7 +349,7 @@ impl LocalRuntime for LocalMetaConfig { .stdout(unsafe { Stdio::from_raw_fd(out_file.into_raw_fd()) }) .stderr(unsafe { Stdio::from_raw_fd(err_file.into_raw_fd()) }); // logging debug - info!("executing command {:?}", command); + tracing::info!("executing command {:?}", command); Ok(command) } @@ -519,7 +519,7 @@ impl LocalRuntime for LocalQueryConfig { .stdout(unsafe { Stdio::from_raw_fd(out_file.into_raw_fd()) }) .stderr(unsafe { Stdio::from_raw_fd(err_file.into_raw_fd()) }); // logging debug - info!("executing command {:?}", command); + tracing::info!("executing command {:?}", command); Ok(command) } fn set_pid(&mut self, id: pid_t) { @@ -555,7 +555,7 @@ impl LocalRuntime for LocalQueryConfig { impl Status { pub fn 
read(conf: Config) -> Result { let status_path = format!("{}/.status.json", conf.databend_dir); - log::info!("{}", status_path.as_str()); + tracing::info!("{}", status_path.as_str()); let local_config_dir = format!("{}/configs/local", conf.databend_dir); std::fs::create_dir_all(local_config_dir.as_str()) .expect("cannot create dir to store local profile"); diff --git a/common/cache/Cargo.toml b/common/cache/Cargo.toml index 889d85657066d..10e6c474629c8 100644 --- a/common/cache/Cargo.toml +++ b/common/cache/Cargo.toml @@ -15,13 +15,13 @@ heapsize = ["heapsize_"] amortized = ["ritelinked/ahash-amortized", "ritelinked/inline-more-amortized"] [dependencies] # In alphabetical order +common-tracing = {path = "../tracing" } # Workspace dependencies # Github dependencies # Crates.io dependencies filetime = "0.2.15" -log = "0.4.14" ritelinked = { version = "0.3.2", default-features = false, features = ["ahash", "inline-more"] } walkdir = "2.3.2" diff --git a/common/cache/src/disk_cache.rs b/common/cache/src/disk_cache.rs index 259b968ea4523..6d0ea67b10e5d 100644 --- a/common/cache/src/disk_cache.rs +++ b/common/cache/src/disk_cache.rs @@ -23,6 +23,7 @@ use std::io::prelude::*; use std::path::Path; use std::path::PathBuf; +use common_tracing::tracing; use filetime::set_file_times; use filetime::FileTime; use ritelinked::DefaultHashBuilder; @@ -163,14 +164,15 @@ where for (file, size) in get_all_files(&self.root) { if !self.can_store(size) { fs::remove_file(file).unwrap_or_else(|e| { - error!( + tracing::error!( "Error removing file `{}` which is too large for the cache ({} bytes)", - e, size + e, + size ) }); } else { self.add_file(AddFile::AbsPath(file), size) - .unwrap_or_else(|e| error!("Error adding file: {}", e)); + .unwrap_or_else(|e| tracing::error!("Error adding file: {}", e)); } } Ok(self) @@ -225,7 +227,7 @@ where let size = size.unwrap_or(fs::metadata(path)?.len()); self.add_file(AddFile::RelPath(rel_path), size) .map_err(|e| { - error!( + tracing::error!( "Failed to insert file `{}`: {}", rel_path.to_string_lossy(), e @@ -259,10 +261,10 @@ where let size = fs::metadata(path.as_ref())?.len(); self.insert_by(key, Some(size), |new_path| { fs::rename(path.as_ref(), new_path).or_else(|_| { - warn!("fs::rename failed, falling back to copy!"); + tracing::warn!("fs::rename failed, falling back to copy!"); fs::copy(path.as_ref(), new_path)?; fs::remove_file(path.as_ref()).unwrap_or_else(|e| { - error!("Failed to remove original file in insert_file: {}", e) + tracing::error!("Failed to remove original file in insert_file: {}", e) }); Ok(()) }) @@ -301,7 +303,7 @@ where Some(_) => { let path = self.rel_to_abs_path(key.as_ref()); fs::remove_file(&path).map_err(|e| { - error!("Error removing file from cache: `{:?}`: {}", path, e); + tracing::error!("Error removing file from cache: `{:?}`: {}", path, e); Into::into(e) }) } diff --git a/common/cache/src/lib.rs b/common/cache/src/lib.rs index a6db6b4d10aab..d220fa0e7bd4e 100644 --- a/common/cache/src/lib.rs +++ b/common/cache/src/lib.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
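Every crate touched in this patch follows the same migration recipe: drop the `log` dependency from Cargo.toml, add `common-tracing`, and route macro calls through the re-exported `tracing` module. A minimal sketch of a converted call site, assuming only the workspace's `common_tracing` re-export (the wrapper function and its argument are illustrative, not part of the patch):

use common_tracing::tracing;

// Previously: `use log::info;` followed by a bare `info!(...)` call.
// The tracing macros accept the same format arguments, so call sites
// only gain the `tracing::` prefix.
fn report_command(command: &std::process::Command) {
    tracing::info!("executing command {:?}", command);
}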
-#[macro_use] -extern crate log; #[cfg(feature = "heapsize")] #[cfg(not(target_os = "macos"))] extern crate heapsize_; diff --git a/common/clickhouse-srv/Cargo.toml b/common/clickhouse-srv/Cargo.toml index 8a13fc5ea76b0..457f7a8512b7c 100644 --- a/common/clickhouse-srv/Cargo.toml +++ b/common/clickhouse-srv/Cargo.toml @@ -18,6 +18,7 @@ tokio_io = ["tokio"] [dependencies] common-io = { path = "../io" } +common-tracing = {path = "../tracing" } lazy_static = "1.4.0" thiserror = "1.0.30" diff --git a/common/clickhouse-srv/examples/simple.rs b/common/clickhouse-srv/examples/simple.rs index a990f228871a3..d9bae60045235 100644 --- a/common/clickhouse-srv/examples/simple.rs +++ b/common/clickhouse-srv/examples/simple.rs @@ -25,12 +25,11 @@ use common_clickhouse_srv::types::Block; use common_clickhouse_srv::types::Progress; use common_clickhouse_srv::CHContext; use common_clickhouse_srv::ClickHouseServer; +use common_tracing::tracing; use futures::task::Context; use futures::task::Poll; use futures::Stream; use futures::StreamExt; -use log::debug; -use log::info; use tokio::net::TcpListener; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; @@ -46,7 +45,7 @@ async fn main() -> std::result::Result<(), Box> { // Note that this is the Tokio TcpListener, which is fully async. let listener = TcpListener::bind(host_port).await?; - info!("Server start at {}", host_port); + tracing::info!("Server start at {}", host_port); loop { // Asynchronously wait for an inbound TcpStream. @@ -76,7 +75,7 @@ struct Session { impl common_clickhouse_srv::ClickHouseSession for Session { async fn execute_query(&self, ctx: &mut CHContext, connection: &mut Connection) -> Result<()> { let query = ctx.state.query.clone(); - debug!("Receive query {}", query); + tracing::debug!("Receive query {}", query); let start = Instant::now(); @@ -125,9 +124,10 @@ impl common_clickhouse_srv::ClickHouseSession for Session { } let duration = start.elapsed(); - debug!( + tracing::debug!( "ClickHouseHandler executor cost:{:?}, statistics:{:?}", - duration, "xxx", + duration, + "xxx", ); Ok(()) } diff --git a/common/clickhouse-srv/src/lib.rs b/common/clickhouse-srv/src/lib.rs index 5b23efc3b356a..d903658eb7667 100644 --- a/common/clickhouse-srv/src/lib.rs +++ b/common/clickhouse-srv/src/lib.rs @@ -14,8 +14,8 @@ use std::sync::Arc; +use common_tracing::tracing; use errors::Result; -use log::debug; use protocols::Stage; use tokio::net::TcpStream; use tokio::sync::mpsc::Sender; @@ -145,7 +145,7 @@ impl ClickHouseServer { } async fn run(&mut self, session: Arc, stream: TcpStream) -> Result<()> { - debug!("Handle New session"); + tracing::debug!("Handle New session"); let tz = session.timezone().to_string(); let mut ctx = CHContext::new(QueryState::default()); let mut connection = Connection::new(stream, session, tz)?; @@ -164,7 +164,7 @@ impl ClickHouseServer { return Err(e); } Ok(None) => { - debug!("{:?}", "none data reset"); + tracing::debug!("{:?}", "none data reset"); ctx.state.reset(); return Ok(()); } diff --git a/common/datavalues/src/series/series_impl.rs b/common/datavalues/src/series/series_impl.rs index 592bbbfdf001f..74cc41ea1c845 100644 --- a/common/datavalues/src/series/series_impl.rs +++ b/common/datavalues/src/series/series_impl.rs @@ -258,6 +258,7 @@ impl_from!([i64], DFInt64Array, new_from_slice); impl_from!([f32], DFFloat32Array, new_from_slice); impl_from!([f64], DFFloat64Array, new_from_slice); impl_from!([Vec], DFStringArray, new_from_slice); +impl_from!([String], DFStringArray, new_from_slice); 
impl_from!([Option], DFBooleanArray, new_from_opt_slice); impl_from!([Option], DFUInt8Array, new_from_opt_slice); diff --git a/common/exception/src/exception.rs b/common/exception/src/exception.rs index be706b4b4ecd1..a706e83313f59 100644 --- a/common/exception/src/exception.rs +++ b/common/exception/src/exception.rs @@ -188,6 +188,7 @@ build_exceptions! { SHA1CheckFailed(57), UnknownColumn(58), InvalidSourceFormat(59), + StrParseError(60), // uncategorized UnexpectedResponseType(600), diff --git a/common/exception/tests/it/exception.rs b/common/exception/tests/it/exception.rs index db047472946c7..6c576814d6467 100644 --- a/common/exception/tests/it/exception.rs +++ b/common/exception/tests/it/exception.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_exception::ErrorCode; +use common_exception::SerializedError; use tonic::Code; use tonic::Status; @@ -88,6 +90,18 @@ fn test_derive_from_display() { ); } +#[test] +fn test_from_and_to_serialized_error() { + let ec = ErrorCode::UnknownDatabase("foo"); + let se: SerializedError = ec.clone().into(); + + let ec2: ErrorCode = se.into(); + assert_eq!(ec.code(), ec2.code()); + assert_eq!(ec.message(), ec2.message()); + assert_eq!(format!("{}", ec), format!("{}", ec2)); + assert_eq!(ec.backtrace_str(), ec2.backtrace_str()); +} + #[test] fn test_from_and_to_status() -> anyhow::Result<()> { use common_exception::exception::*; diff --git a/common/flight-rpc/Cargo.toml b/common/flight-rpc/Cargo.toml index e7851e3d230f9..390143d2698cc 100644 --- a/common/flight-rpc/Cargo.toml +++ b/common/flight-rpc/Cargo.toml @@ -15,13 +15,13 @@ test = false common-arrow = {path = "../arrow"} common-base = {path = "../base" } common-exception= {path = "../exception"} +common-tracing = {path = "../tracing" } # Github dependencies # Crates.io dependencies futures = "0.3.18" jwt-simple = "0.10.7" -log = "0.4.14" prost = "0.9.0" serde = { version = "1.0.131", features = ["derive"] } serde_json = "1.0.72" diff --git a/common/flight-rpc/src/dns_resolver.rs b/common/flight-rpc/src/dns_resolver.rs index 9980593bd698b..2d3acfb98b681 100644 --- a/common/flight-rpc/src/dns_resolver.rs +++ b/common/flight-rpc/src/dns_resolver.rs @@ -25,6 +25,7 @@ use common_base::tokio; use common_base::tokio::task::JoinHandle; use common_exception::ErrorCode; use common_exception::Result; +use common_tracing::tracing; use hyper::client::connect::dns::Name; use hyper::client::HttpConnector; use hyper::service::Service; @@ -157,7 +158,7 @@ impl ConnectionFactory { let builder = Channel::builder(uri.clone()); let mut endpoint = if let Some(conf) = rpc_client_config { - log::info!("tls rpc enabled"); + tracing::info!("tls rpc enabled"); let client_tls_config = Self::client_tls_config(&conf).map_err(|e| { ErrorCode::TLSConfigurationFailure(format!( "loading client tls config failure: {} ", diff --git a/common/io/Cargo.toml b/common/io/Cargo.toml index a14317e1d8aa2..1640f2c046cc6 100644 --- a/common/io/Cargo.toml +++ b/common/io/Cargo.toml @@ -15,6 +15,7 @@ test = false [dependencies] common-exception= {path = "../exception"} bytes = "1.1.0" +serde = { version = "1.0.130", features = ["derive"] } [dev-dependencies] rand = "0.8.4" diff --git a/common/io/src/lib.rs b/common/io/src/lib.rs index ac39f73bf1869..a72d862cadce3 100644 --- a/common/io/src/lib.rs +++ b/common/io/src/lib.rs @@ -20,6 +20,7 @@ mod binary_ser; mod binary_write; mod buf_read; mod marshal; +mod options_deserializer; mod stat_buffer; mod unmarshal; 
mod utils; diff --git a/common/io/src/options_deserializer.rs b/common/io/src/options_deserializer.rs new file mode 100644 index 0000000000000..8e4a13bd8f41c --- /dev/null +++ b/common/io/src/options_deserializer.rs @@ -0,0 +1,438 @@ +// Copyright 2021 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::fmt; +use std::fmt::Display; + +use serde::de; +use serde::de::DeserializeSeed; +use serde::de::EnumAccess; +use serde::de::Error; +use serde::de::MapAccess; +use serde::de::VariantAccess; +use serde::de::Visitor; +use serde::forward_to_deserialize_any; +use serde::Deserializer; + +/// This type represents errors that can occur when deserializing. +#[derive(Debug, Eq, PartialEq)] +pub struct OptionsDeserializerError(pub String); + +impl de::Error for OptionsDeserializerError { + #[inline] + fn custom(msg: T) -> Self { + OptionsDeserializerError(msg.to_string()) + } +} + +impl std::error::Error for OptionsDeserializerError { + #[inline] + fn description(&self) -> &str { + "options deserializer error" + } +} + +impl fmt::Display for OptionsDeserializerError { + #[inline] + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + OptionsDeserializerError(msg) => write!(f, "{}", msg), + } + } +} + +macro_rules! 
unsupported_type { + ($trait_fn:ident, $name:literal) => { + fn $trait_fn(self, _: V) -> Result + where V: Visitor<'de> { + Err(OptionsDeserializerError::custom(concat!( + "unsupported type: ", + $name + ))) + } + }; +} + +pub struct OptionsDeserializer<'de> { + options: &'de HashMap, +} + +impl<'de> OptionsDeserializer<'de> { + #[inline] + pub fn new(options: &'de HashMap) -> Self { + OptionsDeserializer { options } + } +} + +impl<'de> Deserializer<'de> for OptionsDeserializer<'de> { + type Error = OptionsDeserializerError; + + unsupported_type!(deserialize_any, "'any'"); + unsupported_type!(deserialize_unit, "()"); + unsupported_type!(deserialize_bytes, "bytes"); + unsupported_type!(deserialize_byte_buf, "bytes"); + unsupported_type!(deserialize_option, "Option"); + unsupported_type!(deserialize_identifier, "identifier"); + unsupported_type!(deserialize_ignored_any, "ignored_any"); + unsupported_type!(deserialize_bool, "bool"); + unsupported_type!(deserialize_i8, "i8"); + unsupported_type!(deserialize_i16, "i16"); + unsupported_type!(deserialize_i32, "i32"); + unsupported_type!(deserialize_i64, "i64"); + unsupported_type!(deserialize_u8, "i8"); + unsupported_type!(deserialize_u16, "i16"); + unsupported_type!(deserialize_u32, "i32"); + unsupported_type!(deserialize_u64, "i64"); + unsupported_type!(deserialize_f32, "f32"); + unsupported_type!(deserialize_f64, "f64"); + unsupported_type!(deserialize_char, "char"); + unsupported_type!(deserialize_str, "char"); + unsupported_type!(deserialize_string, "string"); + unsupported_type!(deserialize_seq, "seq"); + + fn deserialize_map(self, visitor: V) -> Result + where V: Visitor<'de> { + visitor.visit_map(MapDeserializer { + options: self.options.iter(), + value: None, + }) + } + + fn deserialize_struct( + self, + _name: &'static str, + _fields: &'static [&'static str], + visitor: V, + ) -> Result + where + V: Visitor<'de>, + { + self.deserialize_map(visitor) + } + + fn deserialize_unit_struct( + self, + _name: &'static str, + _visitor: V, + ) -> Result + where + V: Visitor<'de>, + { + Err(OptionsDeserializerError::custom( + "unsupported type: unit_struct", + )) + } + + fn deserialize_newtype_struct( + self, + _name: &'static str, + _visitor: V, + ) -> Result + where + V: Visitor<'de>, + { + Err(OptionsDeserializerError::custom( + "unsupported type: newtype_struct", + )) + } + + fn deserialize_tuple(self, _len: usize, _visitor: V) -> Result + where V: Visitor<'de> { + Err(OptionsDeserializerError::custom("unsupported type: tuple")) + } + + fn deserialize_tuple_struct( + self, + _name: &'static str, + _len: usize, + _visitor: V, + ) -> Result + where + V: Visitor<'de>, + { + Err(OptionsDeserializerError::custom( + "unsupported type: tuple_struct", + )) + } + + fn deserialize_enum( + self, + _name: &'static str, + _variants: &'static [&'static str], + _visitor: V, + ) -> Result + where + V: Visitor<'de>, + { + Err(OptionsDeserializerError::custom("unsupported type: enum")) + } +} + +struct MapDeserializer<'de> { + options: std::collections::hash_map::Iter<'de, String, String>, + value: Option<&'de String>, +} + +impl<'de> MapAccess<'de> for MapDeserializer<'de> { + type Error = OptionsDeserializerError; + + fn next_key_seed(&mut self, seed: K) -> Result, Self::Error> + where K: DeserializeSeed<'de> { + match self.options.next() { + Some((key, value)) => { + self.value = Some(value); + seed.deserialize(KeyDeserializer { key }).map(Some) + } + None => Ok(None), + } + } + + fn next_value_seed(&mut self, seed: V) -> Result + where V: 
DeserializeSeed<'de> { + match self.value.take() { + Some(value) => seed.deserialize(ValueDeserializer { value }), + None => Err(serde::de::Error::custom("value is missing")), + } + } +} + +struct KeyDeserializer<'de> { + key: &'de str, +} + +macro_rules! parse_key { + ($trait_fn:ident) => { + fn $trait_fn(self, visitor: V) -> Result + where V: Visitor<'de> { + visitor.visit_str(&self.key.to_lowercase()) + } + }; +} + +impl<'de> Deserializer<'de> for KeyDeserializer<'de> { + type Error = OptionsDeserializerError; + + parse_key!(deserialize_identifier); + parse_key!(deserialize_str); + parse_key!(deserialize_string); + + fn deserialize_any(self, _visitor: V) -> Result + where V: Visitor<'de> { + Err(OptionsDeserializerError::custom("Unexpected")) + } + + forward_to_deserialize_any! { + bool i8 i16 i32 i64 u8 u16 u32 u64 f32 f64 char bytes + byte_buf option unit unit_struct seq tuple + tuple_struct map newtype_struct struct enum ignored_any + } +} + +macro_rules! parse_value { + ($trait_fn:ident, $visit_fn:ident, $ty:literal) => { + fn $trait_fn(self, visitor: V) -> Result + where V: Visitor<'de> { + let v = self.value.parse().map_err(|e| { + OptionsDeserializerError::custom(format!( + "can not parse `{:?}` to a `{}`, err: {}", + self.value, $ty, e + )) + })?; + visitor.$visit_fn(v) + } + }; +} + +pub struct ValueDeserializer<'de> { + value: &'de str, +} + +impl<'de> Deserializer<'de> for ValueDeserializer<'de> { + type Error = OptionsDeserializerError; + + unsupported_type!(deserialize_any, "any"); + unsupported_type!(deserialize_seq, "seq"); + unsupported_type!(deserialize_map, "map"); + unsupported_type!(deserialize_identifier, "identifier"); + + //special case for boolean + fn deserialize_bool(self, visitor: V) -> Result + where V: Visitor<'de> { + let v = self.value.parse(); + match v { + Ok(v) => visitor.visit_bool(v), + Err(e) => match self.value { + "1" => visitor.visit_bool(true), + "0" => visitor.visit_bool(false), + _ => Err(OptionsDeserializerError::custom(format!( + "can not parse `{:?}` to a `{}`, error: {}", + self.value, "bool", e + ))), + }, + } + } + + parse_value!(deserialize_i8, visit_i8, "i8"); + parse_value!(deserialize_i16, visit_i16, "i16"); + parse_value!(deserialize_i32, visit_i32, "i16"); + parse_value!(deserialize_i64, visit_i64, "i64"); + parse_value!(deserialize_u8, visit_u8, "u8"); + parse_value!(deserialize_u16, visit_u16, "u16"); + parse_value!(deserialize_u32, visit_u32, "u32"); + parse_value!(deserialize_u64, visit_u64, "u64"); + parse_value!(deserialize_f32, visit_f32, "f32"); + parse_value!(deserialize_f64, visit_f64, "f64"); + parse_value!(deserialize_string, visit_string, "String"); + parse_value!(deserialize_byte_buf, visit_string, "String"); + parse_value!(deserialize_char, visit_char, "char"); + + fn deserialize_str(self, visitor: V) -> Result + where V: Visitor<'de> { + visitor.visit_borrowed_str(self.value) + } + + fn deserialize_bytes(self, visitor: V) -> Result + where V: Visitor<'de> { + visitor.visit_borrowed_bytes(self.value.as_bytes()) + } + + fn deserialize_option(self, visitor: V) -> Result + where V: Visitor<'de> { + visitor.visit_some(self) + } + + fn deserialize_unit(self, visitor: V) -> Result + where V: Visitor<'de> { + visitor.visit_unit() + } + + fn deserialize_unit_struct( + self, + _name: &'static str, + visitor: V, + ) -> Result + where + V: Visitor<'de>, + { + visitor.visit_unit() + } + + fn deserialize_newtype_struct( + self, + _name: &'static str, + visitor: V, + ) -> Result + where + V: Visitor<'de>, + { + 
visitor.visit_newtype_struct(self) + } + + fn deserialize_tuple(self, _len: usize, _visitor: V) -> Result + where V: Visitor<'de> { + Err(OptionsDeserializerError::custom("unsupported type: tuple")) + } + + fn deserialize_tuple_struct( + self, + _name: &'static str, + _len: usize, + _visitor: V, + ) -> Result + where + V: Visitor<'de>, + { + Err(OptionsDeserializerError::custom( + "unsupported type: tuple struct", + )) + } + + fn deserialize_struct( + self, + _name: &'static str, + _fields: &'static [&'static str], + _visitor: V, + ) -> Result + where + V: Visitor<'de>, + { + Err(OptionsDeserializerError::custom("unsupported type: struct")) + } + + fn deserialize_enum( + self, + _name: &'static str, + _variants: &'static [&'static str], + visitor: V, + ) -> Result + where + V: Visitor<'de>, + { + visitor.visit_enum(EnumDeserializer { value: self.value }) + } + + fn deserialize_ignored_any(self, visitor: V) -> Result + where V: Visitor<'de> { + visitor.visit_unit() + } +} + +struct EnumDeserializer<'de> { + value: &'de str, +} + +impl<'de> EnumAccess<'de> for EnumDeserializer<'de> { + type Error = OptionsDeserializerError; + type Variant = UnitVariant; + + fn variant_seed(self, seed: V) -> Result<(V::Value, Self::Variant), Self::Error> + where V: de::DeserializeSeed<'de> { + Ok(( + seed.deserialize(KeyDeserializer { key: self.value })?, + UnitVariant, + )) + } +} + +struct UnitVariant; + +impl<'de> VariantAccess<'de> for UnitVariant { + type Error = OptionsDeserializerError; + + fn unit_variant(self) -> Result<(), Self::Error> { + Ok(()) + } + + fn newtype_variant_seed(self, _seed: T) -> Result + where T: DeserializeSeed<'de> { + Err(OptionsDeserializerError::custom("not supported")) + } + + fn tuple_variant(self, _len: usize, _visitor: V) -> Result + where V: Visitor<'de> { + Err(OptionsDeserializerError::custom("not supported")) + } + + fn struct_variant( + self, + _fields: &'static [&'static str], + _visitor: V, + ) -> Result + where + V: Visitor<'de>, + { + Err(OptionsDeserializerError::custom("not supported")) + } +} diff --git a/common/io/src/prelude.rs b/common/io/src/prelude.rs index 3a1f337161306..e824c6f61911b 100644 --- a/common/io/src/prelude.rs +++ b/common/io/src/prelude.rs @@ -23,6 +23,8 @@ pub use crate::binary_write::BinaryWrite; pub use crate::binary_write::BinaryWriteBuf; pub use crate::buf_read::BufReadExt; pub use crate::marshal::Marshal; +pub use crate::options_deserializer::OptionsDeserializer; +pub use crate::options_deserializer::OptionsDeserializerError; pub use crate::stat_buffer::StatBuffer; pub use crate::unmarshal::Unmarshal; pub use crate::utils::*; diff --git a/common/io/tests/it/main.rs b/common/io/tests/it/main.rs index c41ca3844820f..e4c7e69d280fb 100644 --- a/common/io/tests/it/main.rs +++ b/common/io/tests/it/main.rs @@ -16,4 +16,5 @@ mod binary_read; mod binary_write; mod buf_read; mod marshal; +mod options_deserializer; mod utils; diff --git a/common/io/tests/it/options_deserializer.rs b/common/io/tests/it/options_deserializer.rs new file mode 100644 index 0000000000000..1cb226b3b4617 --- /dev/null +++ b/common/io/tests/it/options_deserializer.rs @@ -0,0 +1,115 @@ +// Copyright 2021 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use common_io::prelude::OptionsDeserializer; +use serde::Deserialize; + +#[derive(Debug, Deserialize, PartialEq)] +#[serde(rename_all = "lowercase")] +enum Format { + Csv, + Parquet, + Json, +} +impl Default for Format { + fn default() -> Self { + Format::Csv + } +} + +#[derive(Debug, Deserialize, PartialEq)] +#[serde(rename_all = "lowercase")] +enum Compression { + Auto, + Gzip, + Bz2, + Brotli, + Zstd, + Deflate, + RawDeflate, + Lzo, + Snappy, + None, +} + +impl Default for Compression { + fn default() -> Self { + Self::None + } +} + +#[derive(Debug, Deserialize, PartialEq)] +#[serde(deny_unknown_fields)] +struct FileFormat { + #[serde(default)] + pub format: Format, + #[serde(default = "default_record_delimiter")] + pub record_delimiter: String, + #[serde(default = "default_field_delimiter")] + pub field_delimiter: String, + #[serde(default = "default_csv_header")] + pub csv_header: bool, + #[serde(default = "default_compression")] + pub compression: Compression, +} + +fn default_record_delimiter() -> String { + "\n".to_string() +} + +fn default_field_delimiter() -> String { + ",".to_string() +} + +fn default_csv_header() -> bool { + false +} + +fn default_compression() -> Compression { + Compression::default() +} + +#[test] +fn test_options_de() { + let mut values = HashMap::new(); + values.insert("Format".to_string(), "Csv".to_string()); + values.insert("Field_delimiter".to_string(), "/".to_string()); + values.insert("csv_header".to_string(), "1".to_string()); + values.insert("compression".to_string(), "GZIP".to_string()); + + let fmt = FileFormat::deserialize(OptionsDeserializer::new(&values)).unwrap(); + assert_eq!(fmt, FileFormat { + format: Format::Csv, + record_delimiter: "\n".to_string(), + field_delimiter: "/".to_string(), + csv_header: true, + compression: Compression::Gzip + }); + + let fmt = FileFormat::deserialize(OptionsDeserializer::new(&HashMap::new())).unwrap(); + assert_eq!(fmt, FileFormat { + format: Format::Csv, + record_delimiter: "\n".to_string(), + field_delimiter: ",".to_string(), + csv_header: false, + compression: Compression::None + }); + + values.insert("nokey".to_string(), "Parquet".to_string()); + + let fmt = FileFormat::deserialize(OptionsDeserializer::new(&values)); + assert!(fmt.is_err()); +} diff --git a/common/management/tests/it/stage.rs b/common/management/tests/it/stage.rs index c627853bd2616..d641d79f254e8 100644 --- a/common/management/tests/it/stage.rs +++ b/common/management/tests/it/stage.rs @@ -21,6 +21,7 @@ use common_management::*; use common_meta_api::KVApi; use common_meta_embedded::MetaEmbedded; use common_meta_types::Credentials; +use common_meta_types::FileFormat; use common_meta_types::SeqV; use common_meta_types::StageParams; use common_meta_types::UserStageInfo; @@ -109,11 +110,11 @@ async fn test_unknown_stage_drop_stage() -> Result<()> { fn create_test_stage_info() -> UserStageInfo { UserStageInfo { stage_name: "mystage".to_string(), - stage_params: StageParams::new("test", Credentials::S3 { + stage_params: StageParams::new("test", Credentials { access_key_id: 
String::from("test"), secret_access_key: String::from("test"), }), - file_format: None, + file_format: FileFormat::default(), comments: "".to_string(), } } diff --git a/common/meta/api/src/meta_api_test_suite.rs b/common/meta/api/src/meta_api_test_suite.rs index b69e14db0adc5..de16e89659eec 100644 --- a/common/meta/api/src/meta_api_test_suite.rs +++ b/common/meta/api/src/meta_api_test_suite.rs @@ -490,14 +490,71 @@ impl MetaApiTestSuite { .await; tracing::debug!("get present database res: {:?}", res); let err = res.unwrap_err(); - println!("{:?}", err); assert_eq!(ErrorCode::UnknownDatabase("").code(), err.code()); - // TODO(xp): this does no pass. serialized error needs to be refined. - // assert_eq!("Code: 3, displayText = nonexistent.", err.message()); + assert_eq!("nonexistent", err.message()); + assert_eq!("Code: 3, displayText = nonexistent.", format!("{}", err)); } // TODO(xp): test drop is replicated to follower Ok(()) } + + pub async fn list_table_leader_follower( + &self, + leader: &MT, + follower: &MT, + ) -> anyhow::Result<()> { + tracing::info!("--- create db1 and tb1, tb2 on leader"); + let db_name = "db1"; + { + let req = CreateDatabaseReq { + if_not_exists: false, + db: db_name.to_string(), + engine: "github".to_string(), + options: Default::default(), + }; + let res = leader.create_database(req).await; + tracing::info!("create database res: {:?}", res); + assert!(res.is_ok()); + + let tables = vec!["tb1", "tb2"]; + let schema = Arc::new(DataSchema::new(vec![DataField::new( + "number", + DataType::UInt64, + false, + )])); + + let options = maplit::hashmap! {"opt‐1".into() => "val-1".into()}; + for tb in tables { + let req = CreateTableReq { + if_not_exists: false, + db: db_name.to_string(), + table: tb.to_string(), + table_meta: TableMeta { + schema: schema.clone(), + engine: "JSON".to_string(), + options: options.clone(), + }, + }; + let res = leader.create_table(req).await; + tracing::info!("create table res: {:?}", res); + assert!(res.is_ok()); + } + } + + tracing::info!("--- list tables from follower"); + { + let res = follower.list_tables(ListTableReq::new(db_name)).await; + tracing::debug!("get table list: {:?}", res); + let res = res?; + assert_eq!(2, res.len(), "table list len is 2"); + assert_eq!(1, res[0].ident.table_id, "tb1 id is 1"); + assert_eq!("tb1".to_string(), res[0].name, "tb1.name is tb1"); + assert_eq!(2, res[1].ident.table_id, "tb2 id is 2"); + assert_eq!("tb2".to_string(), res[1].name, "tb2.name is tb2"); + } + + Ok(()) + } } diff --git a/common/meta/flight/Cargo.toml b/common/meta/flight/Cargo.toml index 6637171d72f46..fc2e0334e6189 100644 --- a/common/meta/flight/Cargo.toml +++ b/common/meta/flight/Cargo.toml @@ -29,7 +29,6 @@ async-trait = "0.1.52" derive_more = "0.99.17" futures = "0.3.18" jwt-simple = "0.10.7" -log = "0.4.14" prost = "0.9.0" # prost-derive = "0.9.0" rand = "0.8.4" diff --git a/common/meta/types/src/lib.rs b/common/meta/types/src/lib.rs index f12c00c3ad050..8501725c85b7e 100644 --- a/common/meta/types/src/lib.rs +++ b/common/meta/types/src/lib.rs @@ -90,8 +90,4 @@ pub use user_info::UserInfo; pub use user_privilege::UserPrivilegeSet; pub use user_privilege::UserPrivilegeType; pub use user_quota::UserQuota; -pub use user_stage::Compression; -pub use user_stage::Credentials; -pub use user_stage::FileFormat; -pub use user_stage::StageParams; -pub use user_stage::UserStageInfo; +pub use user_stage::*; diff --git a/common/meta/types/src/user_stage.rs b/common/meta/types/src/user_stage.rs index 3d51b07dd3760..76a5e5eff8ae2 100644 --- 
a/common/meta/types/src/user_stage.rs +++ b/common/meta/types/src/user_stage.rs @@ -17,33 +17,95 @@ use std::str::FromStr; use common_exception::ErrorCode; use common_exception::Result; -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)] +#[derive(serde::Serialize, serde::Deserialize, Default, Clone, Debug, Eq, PartialEq)] pub struct StageParams { pub url: String, pub credentials: Credentials, } +#[derive(serde::Serialize, serde::Deserialize, Default, Clone, Debug, Eq, PartialEq)] +#[serde(deny_unknown_fields)] +#[serde(rename_all = "lowercase")] +pub struct Credentials { + #[serde(default)] + pub access_key_id: String, + #[serde(default)] + pub secret_access_key: String, +} + #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)] -pub enum Credentials { - S3 { - access_key_id: String, - secret_access_key: String, - }, +#[serde(deny_unknown_fields)] +pub struct FileFormat { + #[serde(default)] + pub format: Format, + #[serde(default = "default_record_delimiter")] + pub record_delimiter: String, + #[serde(default = "default_field_delimiter")] + pub field_delimiter: String, + #[serde(default = "default_csv_header")] + pub csv_header: bool, + #[serde(default)] + pub compression: Compression, +} + +impl Default for FileFormat { + fn default() -> Self { + Self { + format: Format::default(), + record_delimiter: default_record_delimiter(), + field_delimiter: default_field_delimiter(), + csv_header: default_csv_header(), + compression: Compression::default(), + } + } +} + +fn default_record_delimiter() -> String { + "\n".to_string() +} + +fn default_field_delimiter() -> String { + ",".to_string() +} + +fn default_csv_header() -> bool { + false } #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)] -pub enum FileFormat { - Csv { - compression: Compression, - record_delimiter: String, - }, - Parquet { - compression: Compression, - }, +#[serde(rename_all = "lowercase")] +pub enum Format { + Csv, + Parquet, Json, } +impl Default for Format { + fn default() -> Self { + Self::Csv + } +} + +impl FromStr for Format { + type Err = ErrorCode; + + fn from_str(s: &str) -> Result { + let s = s.to_lowercase(); + match s.as_str() { + "csv" => Ok(Format::Csv), + "parquet" => Ok(Format::Parquet), + "json" => Ok(Format::Json), + + other => Err(ErrorCode::StrParseError(format!( + "no match for format: {}", + other + ))), + } + } +} + #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)] +#[serde(rename_all = "lowercase")] pub enum Compression { Auto, Gzip, @@ -57,21 +119,30 @@ pub enum Compression { None, } +impl Default for Compression { + fn default() -> Self { + Self::None + } +} + impl FromStr for Compression { - type Err = &'static str; + type Err = ErrorCode; - fn from_str(s: &str) -> std::result::Result { - let s = s.to_uppercase(); + fn from_str(s: &str) -> Result { + let s = s.to_lowercase(); match s.as_str() { - "AUTO" => Ok(Compression::Auto), - "GZIP" => Ok(Compression::Gzip), - "BZ2" => Ok(Compression::Bz2), - "BROTLI" => Ok(Compression::Brotli), - "ZSTD" => Ok(Compression::Zstd), - "DEFLATE" => Ok(Compression::Deflate), - "RAW_DEFLATE" => Ok(Compression::RawDeflate), - "NONE" => Ok(Compression::None), - _ => Err("no match for compression"), + "auto" => Ok(Compression::Auto), + "gzip" => Ok(Compression::Gzip), + "bz2" => Ok(Compression::Bz2), + "brotli" => Ok(Compression::Brotli), + "zstd" => Ok(Compression::Zstd), + "deflate" => Ok(Compression::Deflate), + "raw_deflate" => Ok(Compression::RawDeflate), + 
"none" => Ok(Compression::None), + other => Err(ErrorCode::StrParseError(format!( + "no match for compression: {}", + other + ))), } } } @@ -85,15 +156,13 @@ impl StageParams { } } /// Stage for data stage location. -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)] +#[derive(serde::Serialize, serde::Deserialize, Default, Clone, Debug, Eq, PartialEq)] pub struct UserStageInfo { #[serde(default)] pub stage_name: String, pub stage_params: StageParams, - #[serde(default)] - pub file_format: Option, - #[serde(default)] + pub file_format: FileFormat, pub comments: String, } @@ -102,7 +171,7 @@ impl UserStageInfo { stage_name: &str, comments: &str, stage_params: StageParams, - file_format: Option, + file_format: FileFormat, ) -> Self { UserStageInfo { stage_name: stage_name.to_string(), diff --git a/common/meta/types/tests/it/user_stage.rs b/common/meta/types/tests/it/user_stage.rs index af4883e2f486a..7f744d82520cb 100644 --- a/common/meta/types/tests/it/user_stage.rs +++ b/common/meta/types/tests/it/user_stage.rs @@ -16,6 +16,7 @@ use common_exception::exception::Result; use common_meta_types::Compression; use common_meta_types::Credentials; use common_meta_types::FileFormat; +use common_meta_types::Format; use common_meta_types::StageParams; use common_meta_types::UserStageInfo; @@ -24,13 +25,15 @@ fn test_user_stage() -> Result<()> { let stage = UserStageInfo::new( "databend", "this is a comment", - StageParams::new("test", Credentials::S3 { + StageParams::new("test", Credentials { access_key_id: "test".to_string(), secret_access_key: "test".to_string(), }), - Some(FileFormat::Parquet { + FileFormat { compression: Compression::None, - }), + format: Format::Parquet, + ..Default::default() + }, ); let ser = serde_json::to_string(&stage)?; diff --git a/common/planners/src/lib.rs b/common/planners/src/lib.rs index 14d7e0bd048ca..89117d1595a8c 100644 --- a/common/planners/src/lib.rs +++ b/common/planners/src/lib.rs @@ -19,6 +19,7 @@ mod plan_builder; mod plan_copy; mod plan_database_create; mod plan_database_drop; +mod plan_describe_stage; mod plan_describe_table; mod plan_display; mod plan_display_indent; @@ -77,6 +78,7 @@ pub use plan_copy::CopyPlan; pub use plan_database_create::CreateDatabasePlan; pub use plan_database_create::DatabaseOptions; pub use plan_database_drop::DropDatabasePlan; +pub use plan_describe_stage::DescribeStagePlan; pub use plan_describe_table::DescribeTablePlan; pub use plan_empty::EmptyPlan; pub use plan_explain::ExplainPlan; diff --git a/common/planners/src/plan_describe_stage.rs b/common/planners/src/plan_describe_stage.rs new file mode 100644 index 0000000000000..2e1980de8ba74 --- /dev/null +++ b/common/planners/src/plan_describe_stage.rs @@ -0,0 +1,36 @@ +// Copyright 2021 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use common_datavalues::DataField; +use common_datavalues::DataSchemaRef; +use common_datavalues::DataSchemaRefExt; +use common_datavalues::DataType; + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)] +pub struct DescribeStagePlan { + pub name: String, +} + +impl DescribeStagePlan { + pub fn schema(&self) -> DataSchemaRef { + DataSchemaRefExt::create(vec![ + DataField::new("parent_properties", DataType::String, false), + DataField::new("properties", DataType::String, false), + DataField::new("property_types", DataType::String, false), + DataField::new("property_values", DataType::String, false), + DataField::new("property_defaults", DataType::String, false), + DataField::new("property_changed", DataType::Boolean, false), + ]) + } +} diff --git a/common/planners/src/plan_node.rs b/common/planners/src/plan_node.rs index c3a9d84cb898d..3a77cee32eb86 100644 --- a/common/planners/src/plan_node.rs +++ b/common/planners/src/plan_node.rs @@ -26,6 +26,7 @@ use crate::CopyPlan; use crate::CreateDatabasePlan; use crate::CreateTablePlan; use crate::CreateUserPlan; +use crate::DescribeStagePlan; use crate::DescribeTablePlan; use crate::DropDatabasePlan; use crate::DropTablePlan; @@ -78,6 +79,7 @@ pub enum PlanNode { DropDatabase(DropDatabasePlan), CreateTable(CreateTablePlan), DescribeTable(DescribeTablePlan), + DescribeStage(DescribeStagePlan), DropTable(DropTablePlan), TruncateTable(TruncateTablePlan), UseDatabase(UseDatabasePlan), @@ -120,6 +122,7 @@ impl PlanNode { PlanNode::CreateTable(v) => v.schema(), PlanNode::DropTable(v) => v.schema(), PlanNode::DescribeTable(v) => v.schema(), + PlanNode::DescribeStage(v) => v.schema(), PlanNode::TruncateTable(v) => v.schema(), PlanNode::SetVariable(v) => v.schema(), PlanNode::Sort(v) => v.schema(), @@ -161,6 +164,7 @@ impl PlanNode { PlanNode::DropDatabase(_) => "DropDatabasePlan", PlanNode::CreateTable(_) => "CreateTablePlan", PlanNode::DescribeTable(_) => "DescribeTablePlan", + PlanNode::DescribeStage(_) => "DescribeStagePlan", PlanNode::DropTable(_) => "DropTablePlan", PlanNode::TruncateTable(_) => "TruncateTablePlan", PlanNode::SetVariable(_) => "SetVariablePlan", diff --git a/common/planners/src/plan_rewriter.rs b/common/planners/src/plan_rewriter.rs index a955f9a7ece9e..7213659adec5e 100644 --- a/common/planners/src/plan_rewriter.rs +++ b/common/planners/src/plan_rewriter.rs @@ -31,6 +31,7 @@ use crate::CreateDatabasePlan; use crate::CreateTablePlan; use crate::CreateUserPlan; use crate::CreateUserStagePlan; +use crate::DescribeStagePlan; use crate::DescribeTablePlan; use crate::DropDatabasePlan; use crate::DropTablePlan; @@ -106,6 +107,7 @@ pub trait PlanRewriter { PlanNode::Having(plan) => self.rewrite_having(plan), PlanNode::Expression(plan) => self.rewrite_expression(plan), PlanNode::DescribeTable(plan) => self.rewrite_describe_table(plan), + PlanNode::DescribeStage(plan) => self.rewrite_describe_stage(plan), PlanNode::DropTable(plan) => self.rewrite_drop_table(plan), PlanNode::DropDatabase(plan) => self.rewrite_drop_database(plan), PlanNode::Insert(plan) => self.rewrite_insert_into(plan), @@ -332,6 +334,10 @@ pub trait PlanRewriter { Ok(PlanNode::DescribeTable(plan.clone())) } + fn rewrite_describe_stage(&mut self, plan: &DescribeStagePlan) -> Result { + Ok(PlanNode::DescribeStage(plan.clone())) + } + fn rewrite_drop_table(&mut self, plan: &DropTablePlan) -> Result { Ok(PlanNode::DropTable(plan.clone())) } diff --git a/common/planners/src/plan_visitor.rs b/common/planners/src/plan_visitor.rs index 
441eadbc6838e..d2015e02101aa 100644 --- a/common/planners/src/plan_visitor.rs +++ b/common/planners/src/plan_visitor.rs @@ -24,6 +24,7 @@ use crate::CreateDatabasePlan; use crate::CreateTablePlan; use crate::CreateUserPlan; use crate::CreateUserStagePlan; +use crate::DescribeStagePlan; use crate::DescribeTablePlan; use crate::DropDatabasePlan; use crate::DropTablePlan; @@ -114,6 +115,7 @@ pub trait PlanVisitor { PlanNode::CreateTable(plan) => self.visit_create_table(plan), PlanNode::DropTable(plan) => self.visit_drop_table(plan), PlanNode::DescribeTable(plan) => self.visit_describe_table(plan), + PlanNode::DescribeStage(plan) => self.visit_describe_stage(plan), PlanNode::TruncateTable(plan) => self.visit_truncate_table(plan), PlanNode::UseDatabase(plan) => self.visit_use_database(plan), PlanNode::SetVariable(plan) => self.visit_set_variable(plan), @@ -278,6 +280,10 @@ pub trait PlanVisitor { Ok(()) } + fn visit_describe_stage(&mut self, _: &DescribeStagePlan) -> Result<()> { + Ok(()) + } + fn visit_drop_table(&mut self, _: &DropTablePlan) -> Result<()> { Ok(()) } diff --git a/metasrv/src/executor/meta_handlers.rs b/metasrv/src/executor/meta_handlers.rs index b4d660bdc7605..6c4216cb0a584 100644 --- a/metasrv/src/executor/meta_handlers.rs +++ b/metasrv/src/executor/meta_handlers.rs @@ -290,8 +290,21 @@ impl RequestHandler> for ActionHandler { &self, req: FlightReq, ) -> common_exception::Result>> { - let sm = self.meta_node.get_state_machine().await; - sm.list_tables(req.req).await + let res = self + .meta_node + .handle_admin_req(AdminRequest { + forward_to_leader: true, + req: AdminRequestInner::ListTable(req.req), + }) + .await?; + + let res: Vec> = res + .try_into() + .map_err_to_code(ErrorCode::UnknownException, || { + "handling FlightReq ListTableReq".to_string() + })?; + + Ok(res) } } #[async_trait::async_trait] diff --git a/metasrv/src/meta_service/message.rs b/metasrv/src/meta_service/message.rs index 181765c550a1f..6f977f0da0b03 100644 --- a/metasrv/src/meta_service/message.rs +++ b/metasrv/src/meta_service/message.rs @@ -20,8 +20,10 @@ use async_raft::raft::VoteRequest; use common_meta_raft_store::state_machine::AppliedState; use common_meta_types::DatabaseInfo; use common_meta_types::GetDatabaseReq; +use common_meta_types::ListTableReq; use common_meta_types::LogEntry; use common_meta_types::NodeId; +use common_meta_types::TableInfo; use serde::de::DeserializeOwned; use serde::Deserialize; use serde::Serialize; @@ -40,8 +42,8 @@ pub struct JoinRequest { pub enum AdminRequestInner { Join(JoinRequest), Write(LogEntry), - GetDatabase(GetDatabaseReq), + ListTable(ListTableReq), } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] @@ -63,8 +65,8 @@ impl AdminRequest { pub enum AdminResponse { Join(()), AppliedState(AppliedState), - DatabaseInfo(Arc), + ListTable(Vec>), } impl tonic::IntoRequest for AdminRequest { diff --git a/metasrv/src/meta_service/meta_leader.rs b/metasrv/src/meta_service/meta_leader.rs index cb0edd36a8f9f..c2e220ce6bf11 100644 --- a/metasrv/src/meta_service/meta_leader.rs +++ b/metasrv/src/meta_service/meta_leader.rs @@ -59,11 +59,17 @@ impl<'a> MetaLeader<'a> { let res = self.write(entry).await?; Ok(AdminResponse::AppliedState(res)) } + AdminRequestInner::GetDatabase(req) => { let x = self.meta_node.get_state_machine().await; let res = x.get_database(req).await?; Ok(AdminResponse::DatabaseInfo(res)) } + AdminRequestInner::ListTable(req) => { + let sm = self.meta_node.get_state_machine().await; + let res = sm.list_tables(req).await?; + 
Ok(AdminResponse::ListTable(res)) + } } } diff --git a/metasrv/tests/it/flight/metasrv_flight_meta_api_leader_follower.rs b/metasrv/tests/it/flight/metasrv_flight_meta_api_leader_follower.rs index 751d37f4ff00e..d6e29d1cc6f2e 100644 --- a/metasrv/tests/it/flight/metasrv_flight_meta_api_leader_follower.rs +++ b/metasrv/tests/it/flight/metasrv_flight_meta_api_leader_follower.rs @@ -44,3 +44,28 @@ async fn test_meta_api_database_create_get_drop() -> anyhow::Result<()> { .database_create_get_drop_leader_follower(&client0, &client1) .await } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_meta_api_list_table() -> anyhow::Result<()> { + let (_log_guards, ut_span) = init_meta_ut!(); + let _ent = ut_span.enter(); + + let mut tc0 = new_metasrv_test_context(0); + let mut tc1 = new_metasrv_test_context(1); + + tc1.config.raft_config.single = false; + tc1.config.raft_config.join = vec![tc0.config.raft_config.raft_api_addr()]; + + start_metasrv_with_context(&mut tc0).await?; + start_metasrv_with_context(&mut tc1).await?; + + let addr0 = tc0.config.flight_api_address.clone(); + let addr1 = tc1.config.flight_api_address.clone(); + + let client0 = MetaFlightClient::try_create(addr0.as_str(), "root", "xxx").await?; + let client1 = MetaFlightClient::try_create(addr1.as_str(), "root", "xxx").await?; + + MetaApiTestSuite {} + .list_table_leader_follower(&client0, &client1) + .await +} diff --git a/query/Cargo.toml b/query/Cargo.toml index 083dd8b040e65..e5243662e9701 100644 --- a/query/Cargo.toml +++ b/query/Cargo.toml @@ -77,7 +77,7 @@ headers = "0.3.5" hyper = "0.14.15" indexmap = "1.7.0" lazy_static = "1.4.0" -log = "0.4.14" +maplit = "1.0.2" metrics = "0.17.0" nom = "7.1.0" num = "0.4.0" diff --git a/query/src/api/http_service.rs b/query/src/api/http_service.rs index 26249193382a7..7c4080b10db51 100644 --- a/query/src/api/http_service.rs +++ b/query/src/api/http_service.rs @@ -17,6 +17,7 @@ use std::path::Path; use std::sync::Arc; use common_exception::Result; +use common_tracing::tracing; use poem::get; use poem::listener::RustlsConfig; use poem::Endpoint; @@ -75,7 +76,7 @@ impl HttpService { } async fn start_with_tls(&mut self, listening: SocketAddr) -> Result { - log::info!("Http API TLS enabled"); + tracing::info!("Http API TLS enabled"); let tls_config = Self::build_tls(self.sessions.get_conf())?; let addr = self @@ -86,7 +87,7 @@ impl HttpService { } async fn start_without_tls(&mut self, listening: SocketAddr) -> Result { - log::warn!("Http API TLS not set"); + tracing::warn!("Http API TLS not set"); let addr = self .shutdown_handler diff --git a/query/src/api/rpc/flight_dispatcher.rs b/query/src/api/rpc/flight_dispatcher.rs index b02a7bba1731b..55378f5003427 100644 --- a/query/src/api/rpc/flight_dispatcher.rs +++ b/query/src/api/rpc/flight_dispatcher.rs @@ -26,6 +26,7 @@ use common_exception::ErrorCode; use common_exception::Result; use common_exception::ToErrorCode; use common_infallible::RwLock; +use common_tracing::tracing; use tokio_stream::StreamExt; use crate::api::rpc::flight_scatter::FlightScatter; @@ -149,7 +150,7 @@ impl DatabendQueryFlightDispatcher { Ok(mut abortable_stream) => { while let Some(item) = abortable_stream.next().await { if let Err(error) = tx.send(item).await { - log::error!( + tracing::error!( "Cannot push data when run_action_without_scatters. 
{}", error ); diff --git a/query/src/api/rpc_service.rs b/query/src/api/rpc_service.rs index 256f4f908694b..494560c0765f1 100644 --- a/query/src/api/rpc_service.rs +++ b/query/src/api/rpc_service.rs @@ -22,6 +22,7 @@ use common_base::tokio::net::TcpListener; use common_base::tokio::sync::Notify; use common_exception::ErrorCode; use common_exception::Result; +use common_tracing::tracing; use tokio_stream::wrappers::TcpListenerStream; use tonic::transport::Identity; use tonic::transport::Server; @@ -78,7 +79,7 @@ impl RpcService { let conf = self.sessions.get_conf(); let builder = Server::builder(); let mut builder = if conf.tls_rpc_server_enabled() { - log::info!("databend query tls rpc enabled"); + tracing::info!("databend query tls rpc enabled"); builder .tls_config(Self::server_tls_config(conf).await.map_err(|e| { ErrorCode::TLSConfigurationFailure(format!( diff --git a/query/src/bin/databend-benchmark.rs b/query/src/bin/databend-benchmark.rs index 57b2e5af942c0..f8b3e080b5781 100644 --- a/query/src/bin/databend-benchmark.rs +++ b/query/src/bin/databend-benchmark.rs @@ -33,6 +33,7 @@ use common_exception::Result; use common_exception::ToErrorCode; use common_infallible::RwLock; use common_macros::databend_main; +use common_tracing::tracing; use crossbeam_queue::ArrayQueue; use futures::future::try_join_all; use futures::StreamExt; @@ -166,7 +167,7 @@ async fn run(bench: BenchmarkRef) -> Result<()> { executors.push(tokio::spawn(async move { if let Err(e) = execute(b).await { b2.shutdown.store(true, Ordering::Relaxed); - log::error!("Got error in query {:?}", e); + tracing::error!("Got error in query {:?}", e); } })); } diff --git a/query/src/bin/databend-query.rs b/query/src/bin/databend-query.rs index 2a3bca1fe9e91..e20659b5c502c 100644 --- a/query/src/bin/databend-query.rs +++ b/query/src/bin/databend-query.rs @@ -20,6 +20,7 @@ use common_meta_embedded::MetaEmbedded; use common_metrics::init_default_metrics_recorder; use common_tracing::init_tracing_with_file; use common_tracing::set_panic_hook; +use common_tracing::tracing; use databend_query::api::HttpService; use databend_query::api::RpcService; use databend_query::configs::Config; @@ -30,7 +31,6 @@ use databend_query::servers::MySQLHandler; use databend_query::servers::Server; use databend_query::servers::ShutdownHandle; use databend_query::sessions::SessionManager; -use log::info; #[databend_main] async fn main(_global_tracker: Arc) -> common_exception::Result<()> { @@ -40,7 +40,7 @@ async fn main(_global_tracker: Arc) -> common_exception::Result< // If config file is not empty: -c xx.toml // Reload configs from the file. 
if !conf.config_file.is_empty() { - info!("Config reload from {:?}", conf.config_file); + tracing::info!("Config reload from {:?}", conf.config_file); conf = Config::load_from_toml(conf.config_file.as_str())?; } @@ -66,8 +66,8 @@ async fn main(_global_tracker: Arc) -> common_exception::Result< init_default_metrics_recorder(); set_panic_hook(); - info!("{:?}", conf); - info!( + tracing::info!("{:?}", conf); + tracing::info!( "DatabendQuery v-{}", *databend_query::configs::DATABEND_COMMIT_VERSION, ); @@ -83,7 +83,7 @@ async fn main(_global_tracker: Arc) -> common_exception::Result< let listening = handler.start(listening.parse()?).await?; shutdown_handle.add_service(handler); - info!( + tracing::info!( "MySQL handler listening on {}, Usage: mysql -h{} -P{}", listening, listening.ip(), @@ -100,7 +100,7 @@ async fn main(_global_tracker: Arc) -> common_exception::Result< let listening = srv.start(listening.parse()?).await?; shutdown_handle.add_service(srv); - info!( + tracing::info!( "ClickHouse handler listening on {}, Usage: clickhouse-client --host {} --port {}", listening, listening.ip(), @@ -117,9 +117,10 @@ async fn main(_global_tracker: Arc) -> common_exception::Result< shutdown_handle.add_service(srv); let http_handler_usage = HttpHandler::usage(listening); - info!( + tracing::info!( "Http handler listening on {} {}", - listening, http_handler_usage + listening, + http_handler_usage ); } @@ -129,7 +130,7 @@ async fn main(_global_tracker: Arc) -> common_exception::Result< let mut srv = MetricService::create(session_manager.clone()); let listening = srv.start(address.parse()?).await?; shutdown_handle.add_service(srv); - info!("Metric API server listening on {}", listening); + tracing::info!("Metric API server listening on {}", listening); } // HTTP API service. @@ -138,7 +139,7 @@ async fn main(_global_tracker: Arc) -> common_exception::Result< let mut srv = HttpService::create(session_manager.clone()); let listening = srv.start(address.parse()?).await?; shutdown_handle.add_service(srv); - info!("HTTP API server listening on {}", listening); + tracing::info!("HTTP API server listening on {}", listening); } // RPC API service. @@ -147,7 +148,7 @@ async fn main(_global_tracker: Arc) -> common_exception::Result< let mut srv = RpcService::create(session_manager.clone()); let listening = srv.start(address.parse()?).await?; shutdown_handle.add_service(srv); - info!("RPC API server listening on {}", listening); + tracing::info!("RPC API server listening on {}", listening); } // Cluster register. 
@@ -155,14 +156,15 @@ async fn main(_global_tracker: Arc) -> common_exception::Result< let cluster_discovery = session_manager.get_cluster_discovery(); let register_to_metastore = cluster_discovery.register_to_metastore(&conf); register_to_metastore.await?; - info!( + tracing::info!( "Databend query has been registered:{:?} to metasrv:[{:?}].", - conf.query.cluster_id, conf.meta.meta_address + conf.query.cluster_id, + conf.meta.meta_address ); } - log::info!("Ready for connections."); + tracing::info!("Ready for connections."); shutdown_handle.wait_for_termination_request().await; - log::info!("Shutdown server."); + tracing::info!("Shutdown server."); Ok(()) } diff --git a/query/src/clusters/cluster.rs b/query/src/clusters/cluster.rs index c00144948e69b..b92fac468945b 100644 --- a/query/src/clusters/cluster.rs +++ b/query/src/clusters/cluster.rs @@ -35,6 +35,7 @@ use common_management::ClusterApi; use common_management::ClusterMgr; use common_meta_api::KVApi; use common_meta_types::NodeInfo; +use common_tracing::tracing; use futures::future::select; use futures::future::Either; use futures::Future; @@ -115,7 +116,7 @@ impl ClusterDiscovery { if before_node.flight_address.eq(&node_info.flight_address) { let drop_invalid_node = self.api_provider.drop_node(before_node.id, None); if let Err(cause) = drop_invalid_node.await { - log::warn!("Drop invalid node failure: {:?}", cause); + tracing::warn!("Drop invalid node failure: {:?}", cause); } } } @@ -127,7 +128,7 @@ impl ClusterDiscovery { let mut heartbeat = self.heartbeat.lock().await; if let Err(shutdown_failure) = heartbeat.shutdown().await { - log::warn!( + tracing::warn!( "Cannot shutdown cluster heartbeat, cause {:?}", shutdown_failure ); @@ -139,7 +140,7 @@ impl ClusterDiscovery { match futures::future::select(drop_node, signal_future).await { Either::Left((drop_node_result, _)) => { if let Err(drop_node_failure) = drop_node_result { - log::warn!( + tracing::warn!( "Cannot drop cluster node(while shutdown), cause {:?}", drop_node_failure ); @@ -280,7 +281,7 @@ impl ClusterHeartbeat { shutdown_notified = new_shutdown_notified; let heartbeat = cluster_api.heartbeat(local_id.clone(), None); if let Err(failure) = heartbeat.await { - log::error!("Cluster cluster api heartbeat failure: {:?}", failure); + tracing::error!("Cluster cluster api heartbeat failure: {:?}", failure); } } } diff --git a/query/src/common/service/http_shutdown_handles.rs b/query/src/common/service/http_shutdown_handles.rs index f3c1c2ec1be85..d900ef3bb493e 100644 --- a/query/src/common/service/http_shutdown_handles.rs +++ b/query/src/common/service/http_shutdown_handles.rs @@ -18,6 +18,7 @@ use common_base::tokio::sync::oneshot; use common_base::tokio::task::JoinHandle; use common_exception::ErrorCode; use common_exception::Result; +use common_tracing::tracing; use futures::FutureExt; use poem::listener::Acceptor; use poem::listener::AcceptorExt; @@ -94,7 +95,7 @@ impl HttpShutdownHandler { } if let Some(join_handle) = self.join_handle.take() { if let Err(error) = join_handle.await { - log::error!( + tracing::error!( "Unexpected error during shutdown Http Server {}. cause {}", self.service_name, error diff --git a/query/src/interpreters/interpreter_describe_stage.rs b/query/src/interpreters/interpreter_describe_stage.rs new file mode 100644 index 0000000000000..3fd685d655d15 --- /dev/null +++ b/query/src/interpreters/interpreter_describe_stage.rs @@ -0,0 +1,143 @@ +// Copyright 2021 Datafuse Labs. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_datablocks::DataBlock; +use common_datavalues::prelude::*; +use common_datavalues::series::Series; +use common_exception::Result; +use common_meta_types::UserStageInfo; +use common_planners::DescribeStagePlan; +use common_streams::DataBlockStream; +use common_streams::SendableDataBlockStream; + +use crate::interpreters::Interpreter; +use crate::interpreters::InterpreterPtr; +use crate::sessions::QueryContext; + +pub struct DescribeStageInterpreter { + ctx: Arc, + plan: DescribeStagePlan, +} + +impl DescribeStageInterpreter { + pub fn try_create(ctx: Arc, plan: DescribeStagePlan) -> Result { + Ok(Arc::new(DescribeStageInterpreter { ctx, plan })) + } +} + +#[async_trait::async_trait] +impl Interpreter for DescribeStageInterpreter { + fn name(&self) -> &str { + "DescribeStageInterpreter" + } + + async fn execute( + &self, + _input_stream: Option, + ) -> Result { + let schema = self.plan.schema(); + let default_stage = UserStageInfo::default(); + let stage = self + .ctx + .get_sessions_manager() + .get_user_manager() + .get_stage(self.plan.name.as_str()) + .await?; + + let mut parent_properties: Vec<&str> = vec![]; + let mut properties: Vec<&str> = vec![]; + let mut property_types: Vec<&str> = vec![]; + let mut property_values: Vec = vec![]; + let mut property_defaults: Vec = vec![]; + + let params = &stage.stage_params; + + // url + parent_properties.push("stage_params"); + properties.push("url"); + property_types.push("String"); + property_values.push(params.url.clone()); + property_defaults.push(default_stage.stage_params.url.clone()); + + // credentials + parent_properties.push("credentials"); + properties.push("access_key_id"); + property_types.push("String"); + property_values.push(params.credentials.access_key_id.clone()); + property_defaults.push(default_stage.stage_params.credentials.access_key_id.clone()); + + parent_properties.push("credentials"); + properties.push("secret_access_key"); + property_types.push("String"); + property_values.push(params.credentials.secret_access_key.clone()); + property_defaults.push( + default_stage + .stage_params + .credentials + .secret_access_key + .clone(), + ); + + // format + { + parent_properties.push("file_format"); + properties.push("format"); + property_types.push("String"); + property_values.push(format!("{:?}", stage.file_format.format)); + property_defaults.push(format!("{:?}", default_stage.file_format.format)); + + parent_properties.push("file_format"); + properties.push("record_delimiter"); + property_types.push("String"); + property_values.push(stage.file_format.record_delimiter); + property_defaults.push(default_stage.file_format.record_delimiter); + + parent_properties.push("file_format"); + properties.push("field_delimiter"); + property_types.push("String"); + property_values.push(stage.file_format.field_delimiter.clone()); + property_defaults.push(default_stage.file_format.field_delimiter.clone()); + + parent_properties.push("file_format"); 
+ properties.push("csv_header"); + property_types.push("Boolean"); + property_values.push(format!("{:?}", stage.file_format.csv_header)); + property_defaults.push(format!("{:?}", default_stage.file_format.csv_header)); + + parent_properties.push("file_format"); + properties.push("compression"); + property_types.push("String"); + property_values.push(format!("{:?}", stage.file_format.compression)); + property_defaults.push(format!("{:?}", default_stage.file_format.compression)); + } + + let property_changed = property_values + .iter() + .zip(property_defaults.iter()) + .map(|(v, d)| v != d) + .collect::>(); + + let block = DataBlock::create_by_array(schema.clone(), vec![ + Series::new(parent_properties), + Series::new(properties), + Series::new(property_types), + Series::new(property_values), + Series::new(property_defaults), + Series::new(property_changed), + ]); + Ok(Box::pin(DataBlockStream::create(schema, None, vec![block]))) + } +} diff --git a/query/src/interpreters/interpreter_factory.rs b/query/src/interpreters/interpreter_factory.rs index 43b09d59a3c9e..43323cae17a77 100644 --- a/query/src/interpreters/interpreter_factory.rs +++ b/query/src/interpreters/interpreter_factory.rs @@ -18,7 +18,7 @@ use common_exception::ErrorCode; use common_exception::Result; use common_planners::PlanNode; -use crate::interpreters::interpreter_show_grants::ShowGrantsInterpreter; +use super::DescribeStageInterpreter; use crate::interpreters::AlterUserInterpreter; use crate::interpreters::CopyInterpreter; use crate::interpreters::CreatStageInterpreter; @@ -39,6 +39,7 @@ use crate::interpreters::RevokePrivilegeInterpreter; use crate::interpreters::SelectInterpreter; use crate::interpreters::SettingInterpreter; use crate::interpreters::ShowCreateTableInterpreter; +use crate::interpreters::ShowGrantsInterpreter; use crate::interpreters::TruncateTableInterpreter; use crate::interpreters::UseDatabaseInterpreter; use crate::sessions::QueryContext; @@ -70,6 +71,7 @@ impl InterpreterFactory { PlanNode::Copy(v) => CopyInterpreter::try_create(ctx_clone, v), PlanNode::CreateUserStage(v) => CreatStageInterpreter::try_create(ctx_clone, v), PlanNode::ShowGrants(v) => ShowGrantsInterpreter::try_create(ctx_clone, v), + PlanNode::DescribeStage(v) => DescribeStageInterpreter::try_create(ctx_clone, v), _ => Result::Err(ErrorCode::UnknownTypeOfQuery(format!( "Can't get the interpreter by plan:{}", plan.name() diff --git a/query/src/interpreters/interpreter_show_create_table.rs b/query/src/interpreters/interpreter_show_create_table.rs index 3ba306e3e7288..b9d76ffacedbf 100644 --- a/query/src/interpreters/interpreter_show_create_table.rs +++ b/query/src/interpreters/interpreter_show_create_table.rs @@ -24,7 +24,7 @@ use common_exception::Result; use common_planners::ShowCreateTablePlan; use common_streams::DataBlockStream; use common_streams::SendableDataBlockStream; -use log::debug; +use common_tracing::tracing; use crate::catalogs::Catalog; use crate::interpreters::Interpreter; @@ -87,7 +87,7 @@ impl Interpreter for ShowCreateTableInterpreter { Series::new(vec![name.as_bytes()]), Series::new(vec![table_info.into_bytes()]), ]); - debug!("Show create table executor result: {:?}", block); + tracing::debug!("Show create table executor result: {:?}", block); Ok(Box::pin(DataBlockStream::create(show_schema, None, vec![ block, diff --git a/query/src/interpreters/mod.rs b/query/src/interpreters/mod.rs index 7047a25f98f65..83e39e2de7833 100644 --- a/query/src/interpreters/mod.rs +++ b/query/src/interpreters/mod.rs @@ -17,6 +17,7 @@ 
mod interpreter_common; mod interpreter_copy; mod interpreter_database_create; mod interpreter_database_drop; +mod interpreter_describe_stage; mod interpreter_describe_table; mod interpreter_explain; mod interpreter_factory; @@ -48,6 +49,7 @@ pub use interpreter::InterpreterPtr; pub use interpreter_copy::CopyInterpreter; pub use interpreter_database_create::CreateDatabaseInterpreter; pub use interpreter_database_drop::DropDatabaseInterpreter; +pub use interpreter_describe_stage::DescribeStageInterpreter; pub use interpreter_describe_table::DescribeTableInterpreter; pub use interpreter_explain::ExplainInterpreter; pub use interpreter_factory::InterpreterFactory; @@ -62,6 +64,7 @@ pub use interpreter_revoke_privilege::RevokePrivilegeInterpreter; pub use interpreter_select::SelectInterpreter; pub use interpreter_setting::SettingInterpreter; pub use interpreter_show_create_table::ShowCreateTableInterpreter; +pub use interpreter_show_grants::ShowGrantsInterpreter; pub use interpreter_stage_create::CreatStageInterpreter; pub use interpreter_table_create::CreateTableInterpreter; pub use interpreter_table_drop::DropTableInterpreter; diff --git a/query/src/interpreters/plan_schedulers/plan_scheduler_error.rs b/query/src/interpreters/plan_schedulers/plan_scheduler_error.rs index f107da6f2420a..fb33bbf6133bf 100644 --- a/query/src/interpreters/plan_schedulers/plan_scheduler_error.rs +++ b/query/src/interpreters/plan_schedulers/plan_scheduler_error.rs @@ -15,6 +15,8 @@ use std::sync::Arc; +use common_tracing::tracing; + use crate::api::CancelAction; use crate::api::FlightAction; use crate::interpreters::plan_schedulers::Scheduled; @@ -28,7 +30,7 @@ pub async fn handle_error(context: &Arc, scheduled: Scheduled, tim for (_stream_name, scheduled_node) in scheduled { match cluster.create_node_conn(&scheduled_node.id, &config).await { Err(cause) => { - log::error!( + tracing::error!( "Cannot cancel action for {}, cause: {}", scheduled_node.id, cause @@ -40,7 +42,7 @@ pub async fn handle_error(context: &Arc, scheduled: Scheduled, tim }); let executing_action = flight_client.execute_action(cancel_action, timeout); if let Err(cause) = executing_action.await { - log::error!( + tracing::error!( "Cannot cancel action for {}, cause:{}", scheduled_node.id, cause diff --git a/query/src/interpreters/plan_schedulers/plan_scheduler_stream.rs b/query/src/interpreters/plan_schedulers/plan_scheduler_stream.rs index 48084667722fc..157ef19093ec4 100644 --- a/query/src/interpreters/plan_schedulers/plan_scheduler_stream.rs +++ b/query/src/interpreters/plan_schedulers/plan_scheduler_stream.rs @@ -25,6 +25,7 @@ use common_datablocks::DataBlock; use common_exception::Result; use common_meta_types::NodeInfo; use common_streams::SendableDataBlockStream; +use common_tracing::tracing; use futures::Stream; use futures::StreamExt; @@ -67,7 +68,7 @@ impl Drop for ScheduledStream { fn drop(&mut self) { if !self.is_success.load(Ordering::Relaxed) { if let Err(cause) = self.cancel_scheduled_action() { - log::error!("Cannot cancel action, cause: {:?}", cause); + tracing::error!("Cannot cancel action, cause: {:?}", cause); } } } diff --git a/query/src/pipelines/processors/processor_merge.rs b/query/src/pipelines/processors/processor_merge.rs index 692596abbbe9c..88fcb9552cd4f 100644 --- a/query/src/pipelines/processors/processor_merge.rs +++ b/query/src/pipelines/processors/processor_merge.rs @@ -21,7 +21,7 @@ use common_datablocks::DataBlock; use common_exception::ErrorCode; use common_exception::Result; use 
common_streams::SendableDataBlockStream; -use log::error; +use common_tracing::tracing; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; @@ -57,7 +57,7 @@ impl MergeProcessor { let mut stream = match processor.execute().await { Err(e) => { if let Err(error) = sender.send(Result::Err(e)).await { - error!("Merge processor cannot push data: {}", error); + tracing::error!("Merge processor cannot push data: {}", error); } return; } @@ -69,14 +69,14 @@ impl MergeProcessor { Ok(item) => { if let Err(error) = sender.send(Ok(item)).await { // Stop pulling data - error!("Merge processor cannot push data: {}", error); + tracing::error!("Merge processor cannot push data: {}", error); return; } } Err(error) => { // Stop pulling data if let Err(error) = sender.send(Err(error)).await { - error!("Merge processor cannot push data: {}", error); + tracing::error!("Merge processor cannot push data: {}", error); } return; } diff --git a/query/src/pipelines/processors/processor_mixed.rs b/query/src/pipelines/processors/processor_mixed.rs index 80775a53498f2..d33076a06353e 100644 --- a/query/src/pipelines/processors/processor_mixed.rs +++ b/query/src/pipelines/processors/processor_mixed.rs @@ -25,7 +25,7 @@ use common_exception::ErrorCode; use common_exception::Result; use common_infallible::RwLock; use common_streams::SendableDataBlockStream; -use log::error; +use common_tracing::tracing; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; @@ -66,7 +66,7 @@ impl MixedWorker { let i = index.fetch_add(1, Ordering::Relaxed) % outputs_len; // TODO: USE try_reserve when the channel is blocking if let Err(error) = senders[i].send(item).await { - error!("Mixed processor cannot push data: {}", error); + tracing::error!("Mixed processor cannot push data: {}", error); } } })?; diff --git a/query/src/servers/clickhouse/clickhouse_handler.rs b/query/src/servers/clickhouse/clickhouse_handler.rs index 4dbec7a29b259..5dabd42f944e3 100644 --- a/query/src/servers/clickhouse/clickhouse_handler.rs +++ b/query/src/servers/clickhouse/clickhouse_handler.rs @@ -22,6 +22,7 @@ use common_base::Runtime; use common_base::TrySpawn; use common_exception::ErrorCode; use common_exception::Result; +use common_tracing::tracing; use futures::future::AbortHandle; use futures::future::AbortRegistration; use futures::stream::Abortable; @@ -69,7 +70,7 @@ impl ClickHouseHandler { let sessions = sessions.clone(); async move { match accept_socket { - Err(error) => log::error!("Broken session connection: {}", error), + Err(error) => tracing::error!("Broken session connection: {}", error), Ok(socket) => ClickHouseHandler::accept_socket(sessions, executor, socket), }; } @@ -79,7 +80,7 @@ impl ClickHouseHandler { fn reject_connection(stream: TcpStream, executor: Arc, error: ErrorCode) { executor.spawn(async move { if let Err(error) = RejectCHConnection::reject(stream, error).await { - log::error!( + tracing::error!( "Unexpected error occurred during reject connection: {:?}", error ); @@ -91,9 +92,9 @@ impl ClickHouseHandler { match sessions.create_session("ClickHouseSession") { Err(error) => Self::reject_connection(socket, executor, error), Ok(session) => { - log::info!("ClickHouse connection coming: {:?}", socket.peer_addr()); + tracing::info!("ClickHouse connection coming: {:?}", socket.peer_addr()); if let Err(error) = ClickHouseConnection::run_on_stream(session, socket) { - log::error!("Unexpected error occurred during query: {:?}", error); + tracing::error!("Unexpected error occurred during query: {:?}", 
error); } } } @@ -110,7 +111,7 @@ impl Server for ClickHouseHandler { if let Some(join_handle) = self.join_handle.take() { if let Err(error) = join_handle.await { - log::error!( + tracing::error!( "Unexpected error during shutdown ClickHouseHandler. cause {}", error ); diff --git a/query/src/servers/clickhouse/clickhouse_session.rs b/query/src/servers/clickhouse/clickhouse_session.rs index 7f0ab6810abff..7044352b6c9a5 100644 --- a/query/src/servers/clickhouse/clickhouse_session.rs +++ b/query/src/servers/clickhouse/clickhouse_session.rs @@ -22,6 +22,7 @@ use common_clickhouse_srv::ClickHouseServer; use common_exception::ErrorCode; use common_exception::Result; use common_exception::ToErrorCode; +use common_tracing::tracing; use crate::servers::clickhouse::interactive_worker::InteractiveWorker; use crate::sessions::SessionRef; @@ -52,7 +53,7 @@ impl ClickHouseConnection { let blocking_stream_ref = blocking_stream.try_clone()?; session.attach(host, move || { if let Err(error) = blocking_stream_ref.shutdown(Shutdown::Both) { - log::error!("Cannot shutdown ClickHouse session io {}", error); + tracing::error!("Cannot shutdown ClickHouse session io {}", error); } }); diff --git a/query/src/servers/clickhouse/interactive_worker.rs b/query/src/servers/clickhouse/interactive_worker.rs index 1a6a91f2c63c2..d868fbd2478f7 100644 --- a/query/src/servers/clickhouse/interactive_worker.rs +++ b/query/src/servers/clickhouse/interactive_worker.rs @@ -18,6 +18,7 @@ use std::time::Instant; use common_clickhouse_srv::connection::Connection; use common_clickhouse_srv::CHContext; use common_clickhouse_srv::ClickHouseSession; +use common_tracing::tracing; use metrics::histogram; use crate::servers::clickhouse::interactive_worker_base::InteractiveWorkerBase; @@ -119,7 +120,7 @@ impl ClickHouseSession for InteractiveWorker { res } Err(failure) => { - log::error!( + tracing::error!( "ClickHouse handler authenticate failed, \ user: {}, \ client_address: {}, \ diff --git a/query/src/servers/clickhouse/writers/query_writer.rs b/query/src/servers/clickhouse/writers/query_writer.rs index c07c2e4130a83..f23ee2c23bdad 100644 --- a/query/src/servers/clickhouse/writers/query_writer.rs +++ b/query/src/servers/clickhouse/writers/query_writer.rs @@ -33,6 +33,7 @@ use common_datablocks::DataBlock; use common_datavalues::prelude::*; use common_exception::ErrorCode; use common_exception::Result; +use common_tracing::tracing; use futures::channel::mpsc::Receiver; use futures::StreamExt; @@ -79,7 +80,7 @@ impl<'a> QueryWriter<'a> { } async fn write_error(&mut self, error: ErrorCode) -> Result<()> { - log::error!("OnQuery Error: {:?}", error); + tracing::error!("OnQuery Error: {:?}", error); let clickhouse_err = to_clickhouse_err(error); match self.conn.write_error(&clickhouse_err).await { Ok(_) => Ok(()), diff --git a/query/src/servers/http/v1/http_query_handlers.rs b/query/src/servers/http/v1/http_query_handlers.rs index 20213a1001473..f2648ed809114 100644 --- a/query/src/servers/http/v1/http_query_handlers.rs +++ b/query/src/servers/http/v1/http_query_handlers.rs @@ -20,6 +20,7 @@ use std::time::Instant; use common_base::ProgressValues; use common_datavalues::DataSchemaRef; use common_exception::ErrorCode; +use common_tracing::tracing; use poem::error::Error as PoemError; use poem::error::NotFound; use poem::error::Result as PoemResult; @@ -221,7 +222,7 @@ pub(crate) async fn query_handler( Query(params): Query, Json(req): Json, ) -> PoemResult> { - log::info!("receive http query: {:?} {:?}", req, params); + 
tracing::info!("receive http query: {:?} {:?}", req, params); let session_manager = sessions_extension.0; let http_query_manager = session_manager.get_http_query_manager(); let query_id = http_query_manager.next_query_id(); diff --git a/query/src/servers/http/v1/query/result_data_manager.rs b/query/src/servers/http/v1/query/result_data_manager.rs index 0452b63b631a6..0cd501628f9ed 100644 --- a/query/src/servers/http/v1/query/result_data_manager.rs +++ b/query/src/servers/http/v1/query/result_data_manager.rs @@ -22,6 +22,7 @@ use common_datablocks::DataBlock; use common_datavalues::DataSchemaRef; use common_exception::ErrorCode; use common_exception::Result; +use common_tracing::tracing; use crate::servers::http::v1::block_to_json; use crate::servers::http::v1::JsonBlock; @@ -143,7 +144,7 @@ impl ResultDataManager { } Err(TryRecvError::Empty) => break, Err(TryRecvError::Disconnected) => { - log::debug!("no more data"); + tracing::debug!("no more data"); end = true; break; } diff --git a/query/src/servers/mysql/mysql_handler.rs b/query/src/servers/mysql/mysql_handler.rs index 291fefbd0c99b..f62d077521b69 100644 --- a/query/src/servers/mysql/mysql_handler.rs +++ b/query/src/servers/mysql/mysql_handler.rs @@ -23,6 +23,7 @@ use common_base::Runtime; use common_base::TrySpawn; use common_exception::ErrorCode; use common_exception::Result; +use common_tracing::tracing; use futures::future::AbortHandle; use futures::future::AbortRegistration; use futures::future::Abortable; @@ -71,7 +72,7 @@ impl MySQLHandler { let sessions = sessions.clone(); async move { match accept_socket { - Err(error) => log::error!("Broken session connection: {}", error), + Err(error) => tracing::error!("Broken session connection: {}", error), Ok(socket) => MySQLHandler::accept_socket(sessions, executor, socket), }; } @@ -82,9 +83,9 @@ impl MySQLHandler { match sessions.create_session("MySQL") { Err(error) => Self::reject_session(socket, executor, error), Ok(session) => { - log::info!("MySQL connection coming: {:?}", socket.peer_addr()); + tracing::info!("MySQL connection coming: {:?}", socket.peer_addr()); if let Err(error) = MySQLConnection::run_on_stream(session, socket) { - log::error!("Unexpected error occurred during query: {:?}", error); + tracing::error!("Unexpected error occurred during query: {:?}", error); }; } } @@ -100,7 +101,7 @@ impl MySQLHandler { if let Err(error) = RejectConnection::reject_mysql_connection(stream, kind, message).await { - log::error!( + tracing::error!( "Unexpected error occurred during reject connection: {:?}", error ); @@ -120,7 +121,7 @@ impl Server for MySQLHandler { if let Some(join_handle) = self.join_handle.take() { if let Err(error) = join_handle.await { - log::error!( + tracing::error!( "Unexpected error during shutdown MySQLHandler. 
cause {}", error ); diff --git a/query/src/servers/mysql/mysql_interactive_worker.rs b/query/src/servers/mysql/mysql_interactive_worker.rs index 8fc64827491f8..3b880828c75bb 100644 --- a/query/src/servers/mysql/mysql_interactive_worker.rs +++ b/query/src/servers/mysql/mysql_interactive_worker.rs @@ -340,7 +340,7 @@ impl InteractiveWorkerBase { let _ = interpreter .finish() .await - .map_err(|e| log::error!("interpreter.finish.error: {:?}", e)); + .map_err(|e| tracing::error!("interpreter.finish.error: {:?}", e)); query_result.map(|data| (data, Self::extra_info(context, instant))) } diff --git a/query/src/servers/mysql/mysql_session.rs b/query/src/servers/mysql/mysql_session.rs index 5d931dd4a93bf..bf261d9411ba7 100644 --- a/query/src/servers/mysql/mysql_session.rs +++ b/query/src/servers/mysql/mysql_session.rs @@ -20,6 +20,7 @@ use common_exception::exception::ABORT_SESSION; use common_exception::ErrorCode; use common_exception::Result; use common_exception::ToErrorCode; +use common_tracing::tracing; use msql_srv::MysqlIntermediary; use crate::servers::mysql::mysql_interactive_worker::InteractiveWorker; @@ -43,7 +44,7 @@ impl MySQLConnection { let interactive_worker = InteractiveWorker::create(session, client_addr); if let Err(error) = MysqlIntermediary::run_on_tcp(interactive_worker, blocking_stream) { if error.code() != ABORT_SESSION { - log::error!( + tracing::error!( "Unexpected error occurred during query execution: {:?}", error ); @@ -56,7 +57,7 @@ impl MySQLConnection { let blocking_stream_ref = blocking_stream.try_clone()?; session.attach(host, move || { if let Err(error) = blocking_stream_ref.shutdown(Shutdown::Both) { - log::error!("Cannot shutdown MySQL session io {}", error); + tracing::error!("Cannot shutdown MySQL session io {}", error); } }); diff --git a/query/src/servers/mysql/writers/init_result_writer.rs b/query/src/servers/mysql/writers/init_result_writer.rs index 16be1711ad9ef..c94df6a095bfb 100644 --- a/query/src/servers/mysql/writers/init_result_writer.rs +++ b/query/src/servers/mysql/writers/init_result_writer.rs @@ -14,6 +14,7 @@ use common_exception::ErrorCode; use common_exception::Result; +use common_tracing::tracing; use msql_srv::*; pub struct DFInitResultWriter<'a, W: std::io::Write> { @@ -42,7 +43,7 @@ impl<'a, W: std::io::Write> DFInitResultWriter<'a, W> { } fn err(error: &ErrorCode, writer: InitWriter<'a, W>) -> Result<()> { - log::error!("OnInit Error: {:?}", error); + tracing::error!("OnInit Error: {:?}", error); writer.error(ErrorKind::ER_UNKNOWN_ERROR, format!("{}", error).as_bytes())?; Ok(()) } diff --git a/query/src/servers/mysql/writers/query_result_writer.rs b/query/src/servers/mysql/writers/query_result_writer.rs index 8ea14655383c2..892b91515222c 100644 --- a/query/src/servers/mysql/writers/query_result_writer.rs +++ b/query/src/servers/mysql/writers/query_result_writer.rs @@ -23,6 +23,7 @@ use common_exception::exception::ABORT_QUERY; use common_exception::exception::ABORT_SESSION; use common_exception::ErrorCode; use common_exception::Result; +use common_tracing::tracing; use msql_srv::*; pub struct DFQueryResultWriter<'a, W: std::io::Write> { @@ -190,7 +191,7 @@ impl<'a, W: std::io::Write> DFQueryResultWriter<'a, W> { fn err(error: &ErrorCode, writer: QueryResultWriter<'a, W>) -> Result<()> { if error.code() != ABORT_QUERY && error.code() != ABORT_SESSION { - log::error!("OnQuery Error: {:?}", error); + tracing::error!("OnQuery Error: {:?}", error); writer.error(ErrorKind::ER_UNKNOWN_ERROR, format!("{}", error).as_bytes())?; } else { 
writer.error( diff --git a/query/src/servers/server.rs b/query/src/servers/server.rs index a642d8ef5ca9a..01fa321167909 100644 --- a/query/src/servers/server.rs +++ b/query/src/servers/server.rs @@ -22,6 +22,7 @@ use common_base::DummySignalStream; use common_base::SignalStream; use common_base::SignalType; use common_exception::Result; +use common_tracing::tracing; use futures::stream::Abortable; use futures::StreamExt; use tokio_stream::wrappers::TcpListenerStream; @@ -71,13 +72,13 @@ impl ShutdownHandle { pub async fn wait_for_termination_request(&mut self) { match signal_stream() { Err(cause) => { - log::error!("Cannot set shutdown signal handler, {:?}", cause); + tracing::error!("Cannot set shutdown signal handler, {:?}", cause); std::process::exit(1); } Ok(mut stream) => { stream.next().await; - log::info!("Received termination signal."); + tracing::info!("Received termination signal."); if let Ok(false) = self.shutdown .compare_exchange(false, true, Ordering::SeqCst, Ordering::Acquire) diff --git a/query/src/sessions/context.rs b/query/src/sessions/context.rs index aba226933ff54..d5d478034bb2d 100644 --- a/query/src/sessions/context.rs +++ b/query/src/sessions/context.rs @@ -43,6 +43,7 @@ use common_planners::ReadDataSourcePlan; use common_planners::Statistics; use common_streams::AbortStream; use common_streams::SendableDataBlockStream; +use common_tracing::tracing; use crate::catalogs::Catalog; use crate::catalogs::DatabaseCatalog; @@ -71,7 +72,7 @@ impl QueryContext { pub fn from_shared(shared: Arc) -> Arc { shared.increment_ref_count(); - log::info!("Create DatabendQueryContext"); + tracing::info!("Create DatabendQueryContext"); Arc::new(QueryContext { statistics: Arc::new(RwLock::new(Statistics::default())), @@ -324,7 +325,7 @@ impl QueryContextShared { pub(in crate::sessions) fn destroy_context_ref(&self) { if self.ref_count.fetch_sub(1, Ordering::Release) == 1 { std::sync::atomic::fence(Acquire); - log::info!("Destroy DatabendQueryContext"); + tracing::info!("Destroy DatabendQueryContext"); self.session.destroy_context_shared(); } } diff --git a/query/src/sessions/session_ref.rs b/query/src/sessions/session_ref.rs index 7c70070c6dbed..fbd4efe2037ae 100644 --- a/query/src/sessions/session_ref.rs +++ b/query/src/sessions/session_ref.rs @@ -17,6 +17,8 @@ use std::sync::atomic::Ordering; use std::sync::atomic::Ordering::Acquire; use std::sync::Arc; +use common_tracing::tracing; + use crate::sessions::Session; /// SessionRef is the ptr of session. @@ -56,7 +58,7 @@ impl Session { pub fn destroy_session_ref(self: &Arc) { if self.ref_count.fetch_sub(1, Ordering::Release) == 1 { std::sync::atomic::fence(Acquire); - log::debug!("Destroy session {}", self.id); + tracing::debug!("Destroy session {}", self.id); self.sessions.destroy_session(&self.id); } } diff --git a/query/src/sessions/sessions.rs b/query/src/sessions/sessions.rs index 015e5de235ea6..20b9b724cabc1 100644 --- a/query/src/sessions/sessions.rs +++ b/query/src/sessions/sessions.rs @@ -25,6 +25,7 @@ use common_exception::ErrorCode; use common_exception::Result; use common_infallible::RwLock; use common_metrics::label_counter; +use common_tracing::tracing; use futures::future::Either; use futures::StreamExt; @@ -171,7 +172,7 @@ impl SessionManager { ) -> impl Future { let active_sessions = self.active_sessions.clone(); async move { - log::info!( + tracing::info!( "Waiting {} secs for connections to close. 
You can press Ctrl + C again to force shutdown.", timeout_secs); let mut signal = Box::pin(signal.next()); @@ -189,7 +190,7 @@ impl SessionManager { }; } - log::info!("Will shutdown forcefully."); + tracing::info!("Will shutdown forcefully."); active_sessions .read() .values() @@ -209,7 +210,7 @@ impl SessionManager { match active_sessions { 0 => true, _ => { - log::info!("Waiting for {} connections to close.", active_sessions); + tracing::info!("Waiting for {} connections to close.", active_sessions); false } } diff --git a/query/src/sql/sql_parser.rs b/query/src/sql/sql_parser.rs index 50e6317fb95ae..d470cf1b3dd74 100644 --- a/query/src/sql/sql_parser.rs +++ b/query/src/sql/sql_parser.rs @@ -15,13 +15,13 @@ // Borrow from apache/arrow/rust/datafusion/src/sql/sql_parser // See notice.md +use std::collections::HashMap; use std::convert::TryFrom; -use std::str::FromStr; use std::time::Instant; use common_exception::ErrorCode; +use common_io::prelude::OptionsDeserializer; use common_meta_types::AuthType; -use common_meta_types::Compression; use common_meta_types::Credentials; use common_meta_types::FileFormat; use common_meta_types::StageParams; @@ -29,12 +29,12 @@ use common_meta_types::UserPrivilegeSet; use common_meta_types::UserPrivilegeType; use common_planners::ExplainType; use metrics::histogram; +use serde::Deserialize; use sqlparser::ast::BinaryOperator; use sqlparser::ast::ColumnDef; use sqlparser::ast::ColumnOptionDef; use sqlparser::ast::Expr; use sqlparser::ast::Ident; -use sqlparser::ast::SqlOption; use sqlparser::ast::Statement; use sqlparser::ast::TableConstraint; use sqlparser::ast::Value; @@ -49,6 +49,7 @@ use sqlparser::tokenizer::Tokenizer; use sqlparser::tokenizer::Whitespace; use super::statements::DfCopy; +use super::statements::DfDescribeStage; use crate::sql::statements::DfAlterUser; use crate::sql::statements::DfCompactTable; use crate::sql::statements::DfCreateDatabase; @@ -412,6 +413,7 @@ impl<'a> DfParser<'a> { /// This is a copy from sqlparser /// Parse a literal value (numbers, strings, date/time, booleans) + #[allow(dead_code)] fn parse_value(&mut self) -> Result { match self.parser.next_token() { Token::Word(w) => match w.keyword { @@ -439,6 +441,25 @@ impl<'a> DfParser<'a> { } } + fn parse_value_or_ident(&mut self) -> Result { + match self.parser.next_token() { + Token::Word(w) => match w.keyword { + Keyword::TRUE => Ok("true".to_string()), + Keyword::FALSE => Ok("false".to_string()), + Keyword::NULL => Ok("null".to_string()), + _ => Ok(w.value), + }, + // The call to n.parse() returns a bigdecimal when the + // bigdecimal feature is enabled, and is otherwise a no-op + // (i.e., it returns the input string). 
+ Token::Number(n, _) => Ok(n), + Token::SingleQuotedString(s) => Ok(s), + Token::NationalStringLiteral(s) => Ok(s), + Token::HexStringLiteral(s) => Ok(s), + unexpected => self.expected("a value", unexpected), + } + } + fn parse_column_def(&mut self) -> Result { let name = self.parser.parse_identifier()?; let data_type = self.parser.parse_data_type()?; @@ -513,16 +534,23 @@ impl<'a> DfParser<'a> { if_not_exists, name: db_name, engine: db_engine, - options: vec![], + options: HashMap::new(), }; Ok(DfStatement::CreateDatabase(create)) } fn parse_describe(&mut self) -> Result { - let table_name = self.parser.parse_object_name()?; - let desc = DfDescribeTable { name: table_name }; - Ok(DfStatement::DescribeTable(desc)) + if self.consume_token("stage") { + let obj_name = self.parser.parse_object_name()?; + let desc = DfDescribeStage { name: obj_name }; + Ok(DfStatement::DescribeStage(desc)) + } else { + self.consume_token("table"); + let table_name = self.parser.parse_object_name()?; + let desc = DfDescribeTable { name: table_name }; + Ok(DfStatement::DescribeTable(desc)) + } } /// Drop database/table. @@ -698,110 +726,37 @@ impl<'a> DfParser<'a> { } } - fn parse_stage_file_format(&mut self) -> Result, ParserError> { - let file_format = if self.consume_token("FILE_FORMAT") { + fn parse_stage_file_format(&mut self) -> Result { + let options = if self.consume_token("FILE_FORMAT") { self.parser.expect_token(&Token::Eq)?; self.parser.expect_token(&Token::LParen)?; - - let format = if self.consume_token("FORMAT") { - self.parser.expect_token(&Token::Eq)?; - self.parser.next_token().to_string() - } else { - return parser_err!("Missing FORMAT"); - }; - - let file_format = match format.to_uppercase().as_str() { - "CSV" | "PARQUET" => { - let compression = if self.consume_token("COMPRESSION") { - self.parser.expect_token(&Token::Eq)?; - //TODO:check compression value correctness - let value = self.parser.next_token().to_string(); - Compression::from_str(value.as_str()) - .map_err(|e| ParserError::ParserError(e.to_string()))? 
- } else { - Compression::None - }; - if "CSV" == format.to_uppercase().as_str() { - if self.consume_token("RECORD_DELIMITER") { - self.parser.expect_token(&Token::Eq)?; - - let record_delimiter = match self.parser.next_token() { - Token::Word(w) => match w.value.to_uppercase().as_str() { - "NONE" => String::from(""), - _ => { - return self - .expected("record delimiter NONE", Token::Word(w)) - } - }, - Token::SingleQuotedString(s) => s, - unexpected => { - return self - .expected("not supported record delimiter", unexpected) - } - }; - - Some(FileFormat::Csv { - compression, - record_delimiter, - }) - } else { - Some(FileFormat::Csv { - compression, - record_delimiter: String::from(""), - }) - } - } else { - Some(FileFormat::Parquet { compression }) - } - } - "JSON" => Some(FileFormat::Json), - unexpected => { - return parser_err!(format!( - "Expected format type {}, found: {}", - "CSV|PARQUET|JSON", unexpected - )) - } - }; + let options = self.parse_options()?; self.parser.expect_token(&Token::RParen)?; - file_format + + options } else { - None + HashMap::new() }; - + let file_format = FileFormat::deserialize(OptionsDeserializer::new(&options)) + .map_err(|e| ParserError::ParserError(format!("Invalid file format options: {}", e)))?; Ok(file_format) } - fn parse_stage_credentials(&mut self, url: String) -> Result { - if !self.consume_token("CREDENTIALS") { - return parser_err!("Missing CREDENTIALS"); - } - self.parser.expect_token(&Token::Eq)?; - self.parser.expect_token(&Token::LParen)?; - - let credentials = if url.to_uppercase().starts_with("S3") { - //TODO: current credential field order is hard code - let access_key_id = if self.consume_token("ACCESS_KEY_ID") { - self.parser.expect_token(&Token::Eq)?; - self.parser.parse_literal_string()? - } else { - return parser_err!("Missing S3 ACCESS_KEY_ID"); - }; + fn parse_stage_credentials(&mut self) -> Result { + let options = if self.consume_token("CREDENTIALS") { + self.parser.expect_token(&Token::Eq)?; + self.parser.expect_token(&Token::LParen)?; + let options = self.parse_options()?; + self.parser.expect_token(&Token::RParen)?; - let secret_access_key = if self.consume_token("SECRET_ACCESS_KEY") { - self.parser.expect_token(&Token::Eq)?; - self.parser.parse_literal_string()? 
- } else { - return parser_err!("Missing S3 SECRET_ACCESS_KEY"); - }; - Credentials::S3 { - access_key_id, - secret_access_key, - } + options } else { - return parser_err!("Not supported storage"); + HashMap::new() }; - self.parser.expect_token(&Token::RParen)?; + + let credentials = Credentials::deserialize(OptionsDeserializer::new(&options)) + .map_err(|e| ParserError::ParserError(format!("Invalid credentials options: {}", e)))?; Ok(credentials) } @@ -817,7 +772,11 @@ impl<'a> DfParser<'a> { return parser_err!("Missing URL"); }; - let credentials = self.parse_stage_credentials(url.clone())?; + if !url.to_uppercase().starts_with("S3") { + return parser_err!("Not supported storage"); + } + + let credentials = self.parse_stage_credentials()?; let stage_params = StageParams::new(url.as_str(), credentials); let file_format = self.parse_stage_file_format()?; @@ -1065,17 +1024,19 @@ impl<'a> DfParser<'a> { })) } - fn parse_options(&mut self) -> Result, ParserError> { - let mut options = vec![]; + fn parse_options(&mut self) -> Result, ParserError> { + let mut options = HashMap::new(); loop { let name = self.parser.parse_identifier(); if name.is_err() { + self.parser.prev_token(); break; } let name = name.unwrap(); self.parser.expect_token(&Token::Eq)?; - let value = self.parse_value()?; - options.push(SqlOption { name, value }); + let value = self.parse_value_or_ident()?; + + options.insert(name.to_string(), value); } Ok(options) } diff --git a/query/src/sql/sql_statement.rs b/query/src/sql/sql_statement.rs index 2d4d9d18f5676..6f16b540e918f 100644 --- a/query/src/sql/sql_statement.rs +++ b/query/src/sql/sql_statement.rs @@ -20,6 +20,7 @@ use nom::character::complete::multispace1; use nom::IResult; use super::statements::DfCopy; +use super::statements::DfDescribeStage; use crate::sql::statements::DfAlterUser; use crate::sql::statements::DfCompactTable; use crate::sql::statements::DfCreateDatabase; @@ -66,6 +67,7 @@ pub enum DfStatement { ShowCreateTable(DfShowCreateTable), CreateTable(DfCreateTable), DescribeTable(DfDescribeTable), + DescribeStage(DfDescribeStage), DropTable(DfDropTable), TruncateTable(DfTruncateTable), CompactTable(DfCompactTable), diff --git a/query/src/sql/statements/analyzer_statement.rs b/query/src/sql/statements/analyzer_statement.rs index 53e5567a540d2..8a24abbcfaee8 100644 --- a/query/src/sql/statements/analyzer_statement.rs +++ b/query/src/sql/statements/analyzer_statement.rs @@ -153,6 +153,7 @@ impl AnalyzableStatement for DfStatement { DfStatement::DropDatabase(v) => v.analyze(ctx).await, DfStatement::CreateTable(v) => v.analyze(ctx).await, DfStatement::DescribeTable(v) => v.analyze(ctx).await, + DfStatement::DescribeStage(v) => v.analyze(ctx).await, DfStatement::DropTable(v) => v.analyze(ctx).await, DfStatement::TruncateTable(v) => v.analyze(ctx).await, DfStatement::CompactTable(v) => v.analyze(ctx).await, diff --git a/query/src/sql/statements/mod.rs b/query/src/sql/statements/mod.rs index cfed3e5c3b4ad..ee33e0f171968 100644 --- a/query/src/sql/statements/mod.rs +++ b/query/src/sql/statements/mod.rs @@ -24,6 +24,7 @@ mod statement_create_database; mod statement_create_stage; mod statement_create_table; mod statement_create_user; +mod statement_describe_stage; mod statement_describe_table; mod statement_drop_database; mod statement_drop_table; @@ -59,6 +60,7 @@ pub use statement_create_database::DfCreateDatabase; pub use statement_create_stage::DfCreateStage; pub use statement_create_table::DfCreateTable; pub use statement_create_user::DfCreateUser; +pub use 
statement_describe_stage::DfDescribeStage; pub use statement_describe_table::DfDescribeTable; pub use statement_drop_database::DfDropDatabase; pub use statement_drop_table::DfDropTable; diff --git a/query/src/sql/statements/statement_copy.rs b/query/src/sql/statements/statement_copy.rs index c87bac0d154e1..c860067174a4d 100644 --- a/query/src/sql/statements/statement_copy.rs +++ b/query/src/sql/statements/statement_copy.rs @@ -21,7 +21,6 @@ use common_planners::CopyPlan; use common_planners::PlanNode; use sqlparser::ast::Ident; use sqlparser::ast::ObjectName; -use sqlparser::ast::SqlOption; use crate::sessions::QueryContext; use crate::sql::statements::AnalyzableStatement; @@ -33,7 +32,7 @@ pub struct DfCopy { pub columns: Vec, pub location: String, pub format: String, - pub options: Vec, + pub options: HashMap, } #[async_trait::async_trait] @@ -61,17 +60,6 @@ impl AnalyzableStatement for DfCopy { schema = DataSchemaRefExt::create(fields); } - let mut options = HashMap::new(); - for p in self.options.iter() { - options.insert( - p.name.value.to_lowercase(), - p.value - .to_string() - .trim_matches(|s| s == '\'' || s == '"') - .to_string(), - ); - } - let plan_node = CopyPlan { db_name, tbl_name, @@ -79,7 +67,7 @@ impl AnalyzableStatement for DfCopy { schema, location: self.location.clone(), format: self.format.clone(), - options, + options: self.options.clone(), }; Ok(AnalyzedResult::SimpleQuery(Box::new(PlanNode::Copy( diff --git a/query/src/sql/statements/statement_create_database.rs b/query/src/sql/statements/statement_create_database.rs index 0dcdbfe078327..6822f72144948 100644 --- a/query/src/sql/statements/statement_create_database.rs +++ b/query/src/sql/statements/statement_create_database.rs @@ -21,7 +21,6 @@ use common_planners::CreateDatabasePlan; use common_planners::PlanNode; use common_tracing::tracing; use sqlparser::ast::ObjectName; -use sqlparser::ast::SqlOption; use crate::sessions::QueryContext; use crate::sql::statements::AnalyzableStatement; @@ -32,7 +31,7 @@ pub struct DfCreateDatabase { pub if_not_exists: bool, pub name: ObjectName, pub engine: String, - pub options: Vec, + pub options: HashMap, } #[async_trait::async_trait] @@ -41,7 +40,7 @@ impl AnalyzableStatement for DfCreateDatabase { async fn analyze(&self, _ctx: Arc) -> Result { let db = self.database_name()?; let engine = self.database_engine()?; - let options = self.database_options(); + let options = self.options.clone(); let if_not_exists = self.if_not_exists; Ok(AnalyzedResult::SimpleQuery(Box::new( @@ -67,11 +66,4 @@ impl DfCreateDatabase { fn database_engine(&self) -> Result { Ok(self.engine.clone()) } - - fn database_options(&self) -> HashMap { - self.options - .iter() - .map(|option| (option.name.value.to_lowercase(), option.value.to_string())) - .collect() - } } diff --git a/query/src/sql/statements/statement_create_stage.rs b/query/src/sql/statements/statement_create_stage.rs index d2bd5afc0565f..d21458ec91e5e 100644 --- a/query/src/sql/statements/statement_create_stage.rs +++ b/query/src/sql/statements/statement_create_stage.rs @@ -31,7 +31,7 @@ pub struct DfCreateStage { pub if_not_exists: bool, pub stage_name: String, pub stage_params: StageParams, - pub file_format: Option, + pub file_format: FileFormat, pub comments: String, } diff --git a/query/src/sql/statements/statement_create_table.rs b/query/src/sql/statements/statement_create_table.rs index ccbd3d53a664b..aa4a957db70ed 100644 --- a/query/src/sql/statements/statement_create_table.rs +++ b/query/src/sql/statements/statement_create_table.rs 
@@ -27,7 +27,6 @@ use common_tracing::tracing; use sqlparser::ast::ColumnDef; use sqlparser::ast::ColumnOption; use sqlparser::ast::ObjectName; -use sqlparser::ast::SqlOption; use super::analyzer_expr::ExpressionAnalyzer; use crate::sessions::QueryContext; @@ -42,7 +41,7 @@ pub struct DfCreateTable { pub name: ObjectName, pub columns: Vec, pub engine: String, - pub options: Vec, + pub options: HashMap, // The table name after "create .. like" statement. pub like: Option, @@ -80,30 +79,13 @@ impl DfCreateTable { } } - fn table_options(&self) -> HashMap { - self.options - .iter() - .map(|option| { - ( - option.name.value.to_lowercase(), - option - .value - .to_string() - .trim_matches(|s| s == '\'' || s == '"') - .to_string(), - ) - }) - .collect() - } - async fn table_meta(&self, ctx: Arc) -> Result { let engine = self.engine.clone(); let schema = self.table_schema(ctx).await?; - let options = self.table_options(); Ok(TableMeta { schema, engine, - options, + options: self.options.clone(), }) } diff --git a/query/src/sql/statements/statement_describe_stage.rs b/query/src/sql/statements/statement_describe_stage.rs new file mode 100644 index 0000000000000..4936dd82c158b --- /dev/null +++ b/query/src/sql/statements/statement_describe_stage.rs @@ -0,0 +1,59 @@ +// Copyright 2021 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_exception::ErrorCode; +use common_exception::Result; +use common_planners::DescribeStagePlan; +use common_planners::PlanNode; +use common_tracing::tracing; +use sqlparser::ast::ObjectName; + +use crate::sessions::QueryContext; +use crate::sql::statements::AnalyzableStatement; +use crate::sql::statements::AnalyzedResult; + +#[derive(Debug, Clone, PartialEq)] +pub struct DfDescribeStage { + pub name: ObjectName, +} + +#[async_trait::async_trait] +impl AnalyzableStatement for DfDescribeStage { + #[tracing::instrument(level = "info", skip(self, ctx), fields(ctx.id = ctx.get_id().as_str()))] + async fn analyze(&self, ctx: Arc) -> Result { + let (_, name) = self.resolve_table(ctx)?; + Ok(AnalyzedResult::SimpleQuery(Box::new( + PlanNode::DescribeStage(DescribeStagePlan { name }), + ))) + } +} + +impl DfDescribeStage { + fn resolve_table(&self, ctx: Arc) -> Result<(String, String)> { + let DfDescribeStage { + name: ObjectName(idents), + .. 
+ } = self; + match idents.len() { + 0 => Err(ErrorCode::SyntaxException("Desc stage name is empty")), + 1 => Ok((ctx.get_current_database(), idents[0].value.clone())), + 2 => Ok((idents[0].value.clone(), idents[1].value.clone())), + _ => Err(ErrorCode::SyntaxException( + "Desc stage name must be [`db`].`table`", + )), + } + } +} diff --git a/query/src/storages/system/credits_table.rs b/query/src/storages/system/credits_table.rs index deecf67e19ee0..de50e0c892e2d 100644 --- a/query/src/storages/system/credits_table.rs +++ b/query/src/storages/system/credits_table.rs @@ -24,6 +24,7 @@ use common_meta_types::TableMeta; use common_planners::ReadDataSourcePlan; use common_streams::DataBlockStream; use common_streams::SendableDataBlockStream; +use common_tracing::tracing; use crate::sessions::QueryContext; use crate::storages::Table; @@ -75,7 +76,7 @@ impl Table for CreditsTable { match cargo_license::get_dependencies_from_cargo_lock(metadata_command, false, false) { Ok(v) => v, Err(err) => { - log::error!("{:?}", err); + tracing::error!("{:?}", err); vec![] } }; diff --git a/query/tests/it/interpreters/interpreter_describe_stage.rs b/query/tests/it/interpreters/interpreter_describe_stage.rs new file mode 100644 index 0000000000000..660b163276497 --- /dev/null +++ b/query/tests/it/interpreters/interpreter_describe_stage.rs @@ -0,0 +1,71 @@ +// Copyright 2021 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_base::tokio; +use common_exception::Result; +use common_planners::*; +use databend_query::interpreters::*; +use databend_query::sql::*; +use futures::TryStreamExt; +use pretty_assertions::assert_eq; + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_desc_stageinterpreter() -> Result<()> { + common_tracing::init_default_ut_tracing(); + + let ctx = crate::tests::create_query_context()?; + + // create stage + { + static TEST_QUERY: &str = "CREATE STAGE IF NOT EXISTS test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z') file_format=(FORMAT=CSV compression=GZIP record_delimiter='|') comments='test'"; + if let PlanNode::CreateUserStage(plan) = PlanParser::parse(TEST_QUERY, ctx.clone()).await? { + let executor = CreatStageInterpreter::try_create(ctx.clone(), plan.clone())?; + assert_eq!(executor.name(), "CreatStageInterpreter"); + let _ = executor.execute(None).await?; + } + } + + // desc stage + + { + if let PlanNode::DescribeStage(plan) = + PlanParser::parse("desc stage test_stage", ctx.clone()).await? 
+ { + let executor = DescribeStageInterpreter::try_create(ctx.clone(), plan.clone())?; + let stream = executor.execute(None).await?; + let result = stream.try_collect::>().await?; + let expected = vec![ + + "+-------------------+-------------------+----------------+------------------+-------------------+------------------+", + "| parent_properties | properties | property_types | property_values | property_defaults | property_changed |", + "+-------------------+-------------------+----------------+------------------+-------------------+------------------+", + "| stage_params | url | String | s3://load/files/ | | true |", + "| credentials | access_key_id | String | 1a2b3c | | true |", + "| credentials | secret_access_key | String | 4x5y6z | | true |", + "| file_format | format | String | Csv | Csv | false |", + "| file_format | record_delimiter | String | | | | true |", + // default record_delimiter is \n, so it breaks with an empty line + "| | | | | | |", + "| file_format | field_delimiter | String | , | , | false |", + "| file_format | csv_header | Boolean | false | false | false |", + "| file_format | compression | String | Gzip | None | true |", + "+-------------------+-------------------+----------------+------------------+-------------------+------------------+", + + ]; + common_datablocks::assert_blocks_eq(expected, result.as_slice()); + } + } + + Ok(()) +} diff --git a/query/tests/it/interpreters/interpreter_stage_create.rs b/query/tests/it/interpreters/interpreter_stage_create.rs index 684a2060568bc..1002844190dcf 100644 --- a/query/tests/it/interpreters/interpreter_stage_create.rs +++ b/query/tests/it/interpreters/interpreter_stage_create.rs @@ -16,6 +16,7 @@ use common_base::tokio; use common_exception::Result; use common_meta_types::Compression; use common_meta_types::FileFormat; +use common_meta_types::Format; use common_planners::*; use databend_query::interpreters::*; use databend_query::sql::*; @@ -28,7 +29,7 @@ async fn test_create_stage_interpreter() -> Result<()> { let ctx = crate::tests::create_query_context()?; - static TEST_QUERY: &str = "CREATE STAGE IF NOT EXISTS test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z') file_format=(FORMAT=CSV compression=GZIP record_delimiter=',') comments='test'"; + static TEST_QUERY: &str = "CREATE STAGE IF NOT EXISTS test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z') file_format=(FORMAT=CSV compression=GZIP record_delimiter='\n') comments='test'"; if let PlanNode::CreateUserStage(plan) = PlanParser::parse(TEST_QUERY, ctx.clone()).await? 
{ let executor = CreatStageInterpreter::try_create(ctx.clone(), plan.clone())?; assert_eq!(executor.name(), "CreatStageInterpreter"); @@ -40,13 +41,11 @@ async fn test_create_stage_interpreter() -> Result<()> { .get_stage("test_stage") .await?; - assert_eq!( - stage.file_format, - Some(FileFormat::Csv { - compression: Compression::Gzip, - record_delimiter: ",".to_string() - }) - ); + assert_eq!(stage.file_format, FileFormat { + format: Format::Csv, + compression: Compression::Gzip, + ..Default::default() + }); assert_eq!(stage.comments, String::from("test")) } else { panic!() @@ -63,19 +62,17 @@ async fn test_create_stage_interpreter() -> Result<()> { .get_stage("test_stage") .await?; - assert_eq!( - stage.file_format, - Some(FileFormat::Csv { - compression: Compression::Gzip, - record_delimiter: ",".to_string() - }) - ); + assert_eq!(stage.file_format, FileFormat { + format: Format::Csv, + compression: Compression::Gzip, + ..Default::default() + }); assert_eq!(stage.comments, String::from("test")) } else { panic!() } - static TEST_QUERY1: &str = "CREATE STAGE test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z') file_format=(FORMAT=CSV compression=GZIP record_delimiter=',') comments='test'"; + static TEST_QUERY1: &str = "CREATE STAGE test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z') file_format=(FORMAT=CSV compression=GZIP record_delimiter='\n') comments='test'"; if let PlanNode::CreateUserStage(plan) = PlanParser::parse(TEST_QUERY1, ctx.clone()).await? { let executor = CreatStageInterpreter::try_create(ctx.clone(), plan.clone())?; assert_eq!(executor.name(), "CreatStageInterpreter"); @@ -87,13 +84,11 @@ async fn test_create_stage_interpreter() -> Result<()> { .get_stage("test_stage") .await?; - assert_eq!( - stage.file_format, - Some(FileFormat::Csv { - compression: Compression::Gzip, - record_delimiter: ",".to_string() - }) - ); + assert_eq!(stage.file_format, FileFormat { + format: Format::Csv, + compression: Compression::Gzip, + ..Default::default() + }); assert_eq!(stage.comments, String::from("test")) } else { panic!() diff --git a/query/tests/it/interpreters/mod.rs b/query/tests/it/interpreters/mod.rs index 24015ca6a374d..eac6072cba363 100644 --- a/query/tests/it/interpreters/mod.rs +++ b/query/tests/it/interpreters/mod.rs @@ -14,6 +14,7 @@ mod interpreter_database_create; mod interpreter_database_drop; +mod interpreter_describe_stage; mod interpreter_describe_table; mod interpreter_explain; mod interpreter_grant_privilege; diff --git a/query/tests/it/sql/sql_parser.rs b/query/tests/it/sql/sql_parser.rs index 29bdb7630867a..a0b10ff05e786 100644 --- a/query/tests/it/sql/sql_parser.rs +++ b/query/tests/it/sql/sql_parser.rs @@ -12,11 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::HashMap; + use common_exception::Result; use common_meta_types::AuthType; use common_meta_types::Compression; use common_meta_types::Credentials; use common_meta_types::FileFormat; +use common_meta_types::Format; use common_meta_types::StageParams; use common_meta_types::UserPrivilegeSet; use common_meta_types::UserPrivilegeType; @@ -57,6 +60,12 @@ fn expect_parse_err(sql: &str, expected: String) -> Result<()> { Ok(()) } +fn expect_parse_err_contains(sql: &str, expected: String) -> Result<()> { + let err = DfParser::parse_sql(sql).unwrap_err(); + assert!(err.message().contains(&expected)); + Ok(()) +} + fn make_column_def(name: impl Into, data_type: DataType) -> ColumnDef { ColumnDef { name: Ident { @@ -77,7 +86,7 @@ fn create_database() -> Result<()> { if_not_exists: false, name: ObjectName(vec![Ident::new("db1")]), engine: "".to_string(), - options: vec![], + options: HashMap::new(), }); expect_parse_ok(sql, expected)?; } @@ -88,7 +97,7 @@ fn create_database() -> Result<()> { if_not_exists: false, name: ObjectName(vec![Ident::new("db1")]), engine: "github".to_string(), - options: vec![], + options: HashMap::new(), }); expect_parse_ok(sql, expected)?; } @@ -99,7 +108,7 @@ fn create_database() -> Result<()> { if_not_exists: true, name: ObjectName(vec![Ident::new("db1")]), engine: "".to_string(), - options: vec![], + options: HashMap::new(), }); expect_parse_ok(sql, expected)?; } @@ -138,10 +147,7 @@ fn create_table() -> Result<()> { name: ObjectName(vec![Ident::new("t")]), columns: vec![make_column_def("c1", DataType::Int(None))], engine: "CSV".to_string(), - options: vec![SqlOption { - name: Ident::new("location".to_string()), - value: Value::SingleQuotedString("/data/33.csv".into()), - }], + options: maplit::hashmap! {"location".into() => "/data/33.csv".into()}, like: None, }); expect_parse_ok(sql, expected)?; @@ -157,16 +163,11 @@ fn create_table() -> Result<()> { make_column_def("c3", DataType::Varchar(Some(255))), ], engine: "Parquet".to_string(), - options: vec![ - SqlOption { - name: Ident::new("location".to_string()), - value: Value::SingleQuotedString("foo.parquet".into()), - }, - SqlOption { - name: Ident::new("comment".to_string()), - value: Value::SingleQuotedString("foo".into()), - }, - ], + + options: maplit::hashmap! { + "location".into() => "foo.parquet".into(), + "comment".into() => "foo".into(), + }, like: None, }); expect_parse_ok(sql, expected)?; @@ -178,10 +179,8 @@ fn create_table() -> Result<()> { name: ObjectName(vec![Ident::new("db1"), Ident::new("test1")]), columns: vec![], engine: "Parquet".to_string(), - options: vec![SqlOption { - name: Ident::new("location".to_string()), - value: Value::SingleQuotedString("batcave".into()), - }], + + options: maplit::hashmap! {"location".into() => "batcave".into()}, like: Some(ObjectName(vec![Ident::new("db2"), Ident::new("test2")])), }); expect_parse_ok(sql, expected)?; @@ -384,15 +383,12 @@ fn copy_test() -> Result<()> { columns: vec![], location: "@my_ext_stage/tutorials/sample.csv".to_string(), format: "csv".to_string(), - options: vec![SqlOption { - name: Ident::new("csv_header".to_string()), - value: Value::Number("1".to_owned(), false), - }, - SqlOption { - name: Ident::new("field_delimitor".to_string()), - value: Value::SingleQuotedString(",".into()), - }], - }), + options: maplit::hashmap! 
{ + "csv_header".into() => "1".into(), + "field_delimitor".into() => ",".into(), + } + } + ), @@ -926,8 +922,8 @@ fn create_stage_test() -> Result<()> { DfStatement::CreateStage(DfCreateStage { if_not_exists: false, stage_name: "test_stage".to_string(), - stage_params: StageParams::new("s3://load/files/", Credentials::S3 { access_key_id: "1a2b3c".to_string(), secret_access_key: "4x5y6z".to_string() }), - file_format: None, + stage_params: StageParams::new("s3://load/files/", Credentials { access_key_id: "1a2b3c".to_string(), secret_access_key: "4x5y6z".to_string() }), + file_format: FileFormat::default(), comments: "".to_string(), }), )?; @@ -937,8 +933,8 @@ fn create_stage_test() -> Result<()> { DfStatement::CreateStage(DfCreateStage { if_not_exists: true, stage_name: "test_stage".to_string(), - stage_params: StageParams::new("s3://load/files/", Credentials::S3 { access_key_id: "1a2b3c".to_string(), secret_access_key: "4x5y6z".to_string() }), - file_format: None, + stage_params: StageParams::new("s3://load/files/", Credentials { access_key_id: "1a2b3c".to_string(), secret_access_key: "4x5y6z".to_string() }), + file_format: FileFormat::default(), comments: "".to_string(), }), )?; @@ -948,8 +944,8 @@ fn create_stage_test() -> Result<()> { DfStatement::CreateStage(DfCreateStage { if_not_exists: true, stage_name: "test_stage".to_string(), - stage_params: StageParams::new("s3://load/files/", Credentials::S3 { access_key_id: "1a2b3c".to_string(), secret_access_key: "4x5y6z".to_string() }), - file_format: Some(FileFormat::Csv { compression: Compression::Gzip, record_delimiter: ",".to_string() }), + stage_params: StageParams::new("s3://load/files/", Credentials { access_key_id: "1a2b3c".to_string(), secret_access_key: "4x5y6z".to_string() }), + file_format: FileFormat { compression: Compression::Gzip, record_delimiter: ",".to_string(),..Default::default()}, comments: "".to_string(), }), )?; @@ -959,8 +955,8 @@ fn create_stage_test() -> Result<()> { DfStatement::CreateStage(DfCreateStage { if_not_exists: true, stage_name: "test_stage".to_string(), - stage_params: StageParams::new("s3://load/files/", Credentials::S3 { access_key_id: "1a2b3c".to_string(), secret_access_key: "4x5y6z".to_string() }), - file_format: Some(FileFormat::Csv { compression: Compression::Gzip, record_delimiter: ",".to_string() }), + stage_params: StageParams::new("s3://load/files/", Credentials { access_key_id: "1a2b3c".to_string(), secret_access_key: "4x5y6z".to_string() }), + file_format: FileFormat { compression: Compression::Gzip, record_delimiter: ",".to_string(),..Default::default()}, comments: "test".to_string(), }), )?; @@ -970,19 +966,19 @@ fn create_stage_test() -> Result<()> { DfStatement::CreateStage(DfCreateStage { if_not_exists: false, stage_name: "test_stage".to_string(), - stage_params: StageParams::new("s3://load/files/", Credentials::S3 { access_key_id: "1a2b3c".to_string(), secret_access_key: "4x5y6z".to_string() }), - file_format: Some(FileFormat::Parquet { compression: Compression::Auto}), + stage_params: StageParams::new("s3://load/files/", Credentials { access_key_id: "1a2b3c".to_string(), secret_access_key: "4x5y6z".to_string() }), + file_format: FileFormat { format: Format::Parquet, compression: Compression::Auto ,..Default::default()}, comments: "test".to_string(), }), )?; expect_parse_ok( - "CREATE STAGE test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z') file_format=(FORMAT=csv compression=AUTO record_delimiter=NONE) comments='test'", + "CREATE STAGE 
test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z') file_format=(FORMAT=csv compression=AUTO) comments='test'", DfStatement::CreateStage(DfCreateStage { if_not_exists: false, stage_name: "test_stage".to_string(), - stage_params: StageParams::new("s3://load/files/", Credentials::S3 { access_key_id: "1a2b3c".to_string(), secret_access_key: "4x5y6z".to_string() }), - file_format: Some(FileFormat::Csv { compression: Compression::Auto, record_delimiter: "".to_string() }), + stage_params: StageParams::new("s3://load/files/", Credentials { access_key_id: "1a2b3c".to_string(), secret_access_key: "4x5y6z".to_string() }), + file_format: FileFormat { format: Format::Csv, compression: Compression::Auto,..Default::default()}, comments: "test".to_string(), }), )?; @@ -992,8 +988,8 @@ fn create_stage_test() -> Result<()> { DfStatement::CreateStage(DfCreateStage { if_not_exists: false, stage_name: "test_stage".to_string(), - stage_params: StageParams::new("s3://load/files/", Credentials::S3 { access_key_id: "1a2b3c".to_string(), secret_access_key: "4x5y6z".to_string() }), - file_format: Some(FileFormat::Json ), + stage_params: StageParams::new("s3://load/files/", Credentials { access_key_id: "1a2b3c".to_string(), secret_access_key: "4x5y6z".to_string() }), + file_format: FileFormat { format: Format::Json,..Default::default()}, comments: "test".to_string(), }), )?; @@ -1005,7 +1001,7 @@ fn create_stage_test() -> Result<()> { expect_parse_err( "CREATE STAGE test_stage url='s3://load/files/' password=(access_key_id='1a2b3c' secret_access_key='4x5y6z') file_format=(FORMAT=csv compression=AUTO record_delimiter=NONE) comments='test'", - String::from("sql parser error: Missing CREDENTIALS"), + String::from("sql parser error: Expected end of statement, found: password"), )?; expect_parse_err( @@ -1015,32 +1011,27 @@ fn create_stage_test() -> Result<()> { expect_parse_err( "CREATE STAGE test_stage url='s3://load/files/' credentials=(access_key='1a2b3c' secret_access_key='4x5y6z') file_format=(FORMAT=csv compression=AUTO record_delimiter=NONE) comments='test'", - String::from("sql parser error: Missing S3 ACCESS_KEY_ID"), + String::from("sql parser error: Invalid credentials options: unknown field `access_key`, expected `access_key_id` or `secret_access_key`"), )?; expect_parse_err( "CREATE STAGE test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' aecret_access_key='4x5y6z') file_format=(FORMAT=csv compression=AUTO record_delimiter=NONE) comments='test'", - String::from("sql parser error: Missing S3 SECRET_ACCESS_KEY"), + String::from("sql parser error: Invalid credentials options: unknown field `aecret_access_key`, expected `access_key_id` or `secret_access_key`"), )?; - expect_parse_err( + expect_parse_err_contains( "CREATE STAGE test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z') file_format=(type=csv compression=AUTO record_delimiter=NONE) comments='test'", - String::from("sql parser error: Missing FORMAT"), + String::from("unknown field `type`"), )?; - expect_parse_err( + expect_parse_err_contains( "CREATE STAGE test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z') file_format=(format=csv compression=AUTO1 record_delimiter=NONE) comments='test'", - String::from("sql parser error: no match for compression"), + String::from("unknown variant `auto1`"), )?; - expect_parse_err( - "CREATE STAGE test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' 
secret_access_key='4x5y6z') file_format=(format=csv compression=AUTO record_delimiter=NONE1) comments='test'", - String::from("sql parser error: Expected record delimiter NONE, found: NONE1"), - )?; - - expect_parse_err( + expect_parse_err_contains( "CREATE STAGE test_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z') file_format=(format=csv1 compression=AUTO record_delimiter=NONE) comments='test'", - String::from("sql parser error: Expected format type CSV|PARQUET|JSON, found: CSV1"), + String::from("unknown variant `csv1`"), )?; Ok(()) diff --git a/query/tests/it/users/user_stage.rs b/query/tests/it/users/user_stage.rs index ef041b2b96322..ca50e97ad761f 100644 --- a/query/tests/it/users/user_stage.rs +++ b/query/tests/it/users/user_stage.rs @@ -15,6 +15,7 @@ use common_base::tokio; use common_exception::Result; use common_meta_types::Credentials; +use common_meta_types::FileFormat; use common_meta_types::StageParams; use common_meta_types::UserStageInfo; use databend_query::configs::Config; @@ -36,11 +37,11 @@ async fn test_user_stage() -> Result<()> { let stage_info = UserStageInfo::new( stage_name1, comments, - StageParams::new("test", Credentials::S3 { + StageParams::new("test", Credentials { access_key_id: String::from("test"), secret_access_key: String::from("test"), }), - None, + FileFormat::default(), ); user_mgr.add_stage(stage_info).await?; } @@ -50,11 +51,11 @@ async fn test_user_stage() -> Result<()> { let stage_info = UserStageInfo::new( stage_name2, comments, - StageParams::new("test", Credentials::S3 { + StageParams::new("test", Credentials { access_key_id: String::from("test"), secret_access_key: String::from("test"), }), - None, + FileFormat::default(), ); user_mgr.add_stage(stage_info).await?; } diff --git a/tests/suites/0_stateless/10_0001_describe_stage.result b/tests/suites/0_stateless/10_0001_describe_stage.result new file mode 100644 index 0000000000000..87e42ae5e3d35 --- /dev/null +++ b/tests/suites/0_stateless/10_0001_describe_stage.result @@ -0,0 +1,8 @@ +stage_params url String s3://load/files/ 1 +credentials access_key_id String 1a2b3c 1 +credentials secret_access_key String 4x5y6z 1 +file_format format String Csv Csv 0 +file_format record_delimiter String , \n 1 +file_format field_delimiter String , , 0 +file_format csv_header Boolean false false 0 +file_format compression String Gzip None 1 diff --git a/tests/suites/0_stateless/10_0001_describe_stage.sql b/tests/suites/0_stateless/10_0001_describe_stage.sql new file mode 100644 index 0000000000000..aa1b4b94b281e --- /dev/null +++ b/tests/suites/0_stateless/10_0001_describe_stage.sql @@ -0,0 +1,2 @@ +CREATE STAGE IF NOT EXISTS test_desc_stage url='s3://load/files/' credentials=(access_key_id='1a2b3c' secret_access_key='4x5y6z') file_format=(FORMAT=CSV compression=GZIP record_delimiter=','); +desc stage test_desc_stage;
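
A note on the options change in the parser tests above: CREATE DATABASE/TABLE options (and the COPY options) move from a Vec of SqlOption name/value pairs to a plain HashMap<String, String>, built with the maplit::hashmap! macro that the Cargo.lock hunk adds to the query crate. A minimal standalone sketch of the pattern, reusing values from the tests (assumes maplit as a dependency):

```rust
use std::collections::HashMap;

fn main() {
    // maplit::hashmap! builds the map literally; the HashMap<String, String>
    // annotation lets `.into()` convert the &str literals into Strings.
    let options: HashMap<String, String> = maplit::hashmap! {
        "location".into() => "/data/33.csv".into(),
        "csv_header".into() => "1".into(),
    };
    assert_eq!(options.get("location").map(String::as_str), Some("/data/33.csv"));
}
```

Compared with a Vec<SqlOption>, a map makes the expected values order-independent in the tests and lets downstream code look options up by key.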
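The stage tests also reflect two type changes: Credentials goes from an enum variant (Credentials::S3 { .. }) to a plain struct, and file_format goes from Option<FileFormat> (an enum) to a FileFormat struct with a Default impl, so call sites override only the fields a statement actually specifies via struct-update syntax. The sketch below is a hypothetical mirror of those types, with field names and defaults inferred from the tests and from 10_0001_describe_stage.result; the real definitions in common_meta_types may differ.

```rust
#[derive(Debug, Clone, PartialEq)]
#[allow(dead_code)]
enum Format { Csv, Parquet, Json }

#[derive(Debug, Clone, PartialEq)]
#[allow(dead_code)]
enum Compression { None, Auto, Gzip }

#[derive(Debug, Clone, PartialEq)]
struct FileFormat {
    format: Format,
    record_delimiter: String,
    field_delimiter: String,
    csv_header: bool,
    compression: Compression,
}

// Defaults guessed from the describe-stage expectations:
// CSV, '\n' record delimiter, ',' field delimiter, no header, no compression.
impl Default for FileFormat {
    fn default() -> Self {
        FileFormat {
            format: Format::Csv,
            record_delimiter: "\n".to_string(),
            field_delimiter: ",".to_string(),
            csv_header: false,
            compression: Compression::None,
        }
    }
}

fn main() {
    // Struct-update syntax, as used throughout create_stage_test:
    // override only what the CREATE STAGE statement specified.
    let gzip_csv = FileFormat {
        compression: Compression::Gzip,
        record_delimiter: ",".to_string(),
        ..Default::default()
    };
    assert_eq!(gzip_csv.format, Format::Csv);
    assert!(!gzip_csv.csv_header);
}
```

Modelling the file format as one struct with defaults, rather than per-format enum variants, is presumably what lets the new DESC STAGE test print a value and a default for every format attribute.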
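Finally, the expected error strings switch from hand-written parser messages ("Missing S3 ACCESS_KEY_ID", "no match for compression") to serde-style diagnostics ("unknown field `access_key` ...", "unknown variant `auto1`"), and the new expect_parse_err_contains helper only asserts on a substring, so the tests stay stable if the surrounding wording changes. One plausible way such messages arise (an assumption for illustration; the PR's actual plumbing is not shown in this hunk) is deserializing the option map into a struct marked #[serde(deny_unknown_fields)]. Standalone sketch using serde and serde_json:

```rust
use std::collections::HashMap;

use serde::Deserialize;

#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
#[allow(dead_code)]
struct CredentialOptions {
    access_key_id: String,
    secret_access_key: String,
}

fn main() {
    // Reproduce the typo from the test: `access_key` instead of `access_key_id`.
    let mut opts = HashMap::new();
    opts.insert("access_key".to_string(), "1a2b3c".to_string());
    opts.insert("secret_access_key".to_string(), "4x5y6z".to_string());

    let err = serde_json::to_value(opts)
        .and_then(serde_json::from_value::<CredentialOptions>)
        .unwrap_err();

    // serde reports the unknown field together with the accepted field names,
    // which is the substring the rewritten test checks for.
    assert!(err.to_string().contains("unknown field `access_key`"));
}
```

The lowercase `auto1` / `csv1` in the expected messages also suggests option values are lowercased before being matched against the Format and Compression variants.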