From 61f27b4b8a67ac5294c3f257cbddc23047671d90 Mon Sep 17 00:00:00 2001 From: baishen Date: Sat, 7 Oct 2023 16:30:57 +0800 Subject: [PATCH] feat: FlightSQL support extension types --- bindings/nodejs/src/lib.rs | 6 +- cli/tests/00-base.result | 7 ++- cli/tests/00-base.sql | 12 ++-- cli/tests/01-put.sh | 0 driver/src/flight_sql.rs | 6 +- sql/Cargo.toml | 3 + sql/src/schema.rs | 120 +++++++++++++++++++++++-------------- sql/src/value.rs | 74 ++++++++++++++++++++++- 8 files changed, 168 insertions(+), 60 deletions(-) mode change 100644 => 100755 cli/tests/01-put.sh diff --git a/bindings/nodejs/src/lib.rs b/bindings/nodejs/src/lib.rs index 6909d90d9..cebd14023 100644 --- a/bindings/nodejs/src/lib.rs +++ b/bindings/nodejs/src/lib.rs @@ -67,6 +67,8 @@ impl ToNapiValue for Value { unsafe fn to_napi_value(env: sys::napi_env, val: Self) -> Result { match val.0 { databend_driver::Value::Null => Null::to_napi_value(env, Null), + databend_driver::Value::EmptyArray => String::to_napi_value(env, "[]".to_string()), + databend_driver::Value::EmptyMap => String::to_napi_value(env, "{}".to_string()), databend_driver::Value::Boolean(b) => bool::to_napi_value(env, b), databend_driver::Value::String(s) => String::to_napi_value(env, s), databend_driver::Value::Number(n) => NumberValue::to_napi_value(env, NumberValue(n)), @@ -81,6 +83,8 @@ impl ToNapiValue for Value { NaiveDateTime::new(v, NaiveTime::from_hms_opt(0, 0, 0).unwrap()), ) } + databend_driver::Value::Bitmap(s) => String::to_napi_value(env, s), + databend_driver::Value::Variant(s) => String::to_napi_value(env, s), } } } @@ -317,7 +321,7 @@ impl Connection { self.0 .query_iter(&sql) .await - .map(|iter| RowIterator(iter)) + .map(RowIterator) .map_err(format_napi_error) } diff --git a/cli/tests/00-base.result b/cli/tests/00-base.result index b1f857f39..4da57d370 100644 --- a/cli/tests/00-base.result +++ b/cli/tests/00-base.result @@ -1,9 +1,10 @@ -b 2 false -a 1 true -0 99999 100000 +b 2 false "{""k"":""v""}" +a 1 true [1,2] +0 99999 99999 100000 1 2 3 +[] {} with comment 3.00 3.00 Asia/Shanghai diff --git a/cli/tests/00-base.sql b/cli/tests/00-base.sql index d4900b2ca..1f5acdbeb 100644 --- a/cli/tests/00-base.sql +++ b/cli/tests/00-base.sql @@ -1,15 +1,17 @@ drop table if exists test; -create table test(a string, b int, d boolean); -insert into test values('a', 1, true); -insert into test values('b', 2, false); +create table test(a string, b int, c boolean, d variant); +insert into test values('a', 1, true, '[1,2]'); +insert into test values('b', 2, false, '{"k":"v"}'); select * from test order by a desc; truncate table test; -insert into test select to_string(number), number, false from numbers(100000); -select min(a), max(b), count() from test; +insert into test select to_string(number), number, false, number from numbers(100000); +select min(a), max(b), max(d), count() from test; select '1';select 2; select 1+2; +select [], {}; + -- ignore this line select /* ignore this block */ 'with comment'; diff --git a/cli/tests/01-put.sh b/cli/tests/01-put.sh old mode 100644 new mode 100755 diff --git a/driver/src/flight_sql.rs b/driver/src/flight_sql.rs index 9db430d8d..69c65fdf1 100644 --- a/driver/src/flight_sql.rs +++ b/driver/src/flight_sql.rs @@ -101,9 +101,9 @@ impl Connection for FlightSQLConnection { let row = self.query_row(&sql).await?.ok_or(Error::InvalidResponse( "Empty response from server for presigned request".to_string(), ))?; - let (method, _, url): (String, String, String) = row.try_into().map_err(Error::Parsing)?; - // FIXME: headers is variant, not handled by driver yet - let headers: BTreeMap = BTreeMap::new(); + let (method, headers, url): (String, String, String) = + row.try_into().map_err(Error::Parsing)?; + let headers: BTreeMap = serde_json::from_str(&headers)?; Ok(PresignedResponse { method, headers, diff --git a/sql/Cargo.toml b/sql/Cargo.toml index e8b0147fe..66336ffeb 100644 --- a/sql/Cargo.toml +++ b/sql/Cargo.toml @@ -18,6 +18,9 @@ databend-client = { workspace = true } chrono = { version = "0.4", default-features = false } glob = "0.3" +itertools = "0.10" +jsonb = { git = "https://github.com/datafuselabs/jsonb", rev = "3a3c6ef" } +roaring = { version = "0.10.1", features = ["serde"] } serde = { version = "1.0", default-features = false, features = ["derive"] } serde_json = { version = "1.0", default-features = false, features = ["std"] } tokio-stream = "0.1" diff --git a/sql/src/schema.rs b/sql/src/schema.rs index 147dd939e..42b77e732 100644 --- a/sql/src/schema.rs +++ b/sql/src/schema.rs @@ -21,6 +21,13 @@ use databend_client::response::SchemaField as APISchemaField; use crate::error::{Error, Result}; +// Extension types defined by Databend +pub(crate) const EXTENSION_KEY: &str = "Extension"; +pub(crate) const ARROW_EXT_TYPE_EMPTY_ARRAY: &str = "EmptyArray"; +pub(crate) const ARROW_EXT_TYPE_EMPTY_MAP: &str = "EmptyMap"; +pub(crate) const ARROW_EXT_TYPE_VARIANT: &str = "Variant"; +pub(crate) const ARROW_EXT_TYPE_BITMAP: &str = "Bitmap"; + #[derive(Debug, Clone, PartialEq, Eq)] pub enum NumberDataType { UInt8, @@ -204,18 +211,26 @@ impl TryFrom<&TypeDesc<'_>> for DataType { "Array type must have one argument".to_string(), )); } - let inner = Self::try_from(&desc.args[0])?; - DataType::Array(Box::new(inner)) + if desc.args[0].name == "Nothing" { + DataType::EmptyArray + } else { + let inner = Self::try_from(&desc.args[0])?; + DataType::Array(Box::new(inner)) + } } "Map" => { - if desc.args.len() != 2 { - return Err(Error::Parsing( - "Map type must have two argument".to_string(), - )); + if desc.args.len() == 1 && desc.args[0].name == "Nothing" { + DataType::EmptyMap + } else { + if desc.args.len() != 2 { + return Err(Error::Parsing( + "Map type must have two arguments".to_string(), + )); + } + let key_ty = Self::try_from(&desc.args[0])?; + let val_ty = Self::try_from(&desc.args[1])?; + DataType::Map(Box::new(DataType::Tuple(vec![key_ty, val_ty]))) } - let key_ty = Self::try_from(&desc.args[0])?; - let val_ty = Self::try_from(&desc.args[1])?; - DataType::Map(Box::new(DataType::Tuple(vec![key_ty, val_ty]))) } "Tuple" => { let mut inner = vec![]; @@ -263,43 +278,58 @@ impl TryFrom<&Arc> for Field { type Error = Error; fn try_from(f: &Arc) -> Result { - let mut dt = match f.data_type() { - ArrowDataType::Null => DataType::Null, - ArrowDataType::Boolean => DataType::Boolean, - ArrowDataType::Int8 => DataType::Number(NumberDataType::Int8), - ArrowDataType::Int16 => DataType::Number(NumberDataType::Int16), - ArrowDataType::Int32 => DataType::Number(NumberDataType::Int32), - ArrowDataType::Int64 => DataType::Number(NumberDataType::Int64), - ArrowDataType::UInt8 => DataType::Number(NumberDataType::UInt8), - ArrowDataType::UInt16 => DataType::Number(NumberDataType::UInt16), - ArrowDataType::UInt32 => DataType::Number(NumberDataType::UInt32), - ArrowDataType::UInt64 => DataType::Number(NumberDataType::UInt64), - ArrowDataType::Float32 => DataType::Number(NumberDataType::Float32), - ArrowDataType::Float64 => DataType::Number(NumberDataType::Float64), - ArrowDataType::Utf8 - | ArrowDataType::Binary - | ArrowDataType::LargeUtf8 - | ArrowDataType::LargeBinary - | ArrowDataType::FixedSizeBinary(_) => DataType::String, - ArrowDataType::Timestamp(_, _) => DataType::Timestamp, - ArrowDataType::Date32 => DataType::Date, - ArrowDataType::Decimal128(p, s) => { - DataType::Decimal(DecimalDataType::Decimal128(DecimalSize { - precision: *p, - scale: *s as u8, - })) - } - ArrowDataType::Decimal256(p, s) => { - DataType::Decimal(DecimalDataType::Decimal256(DecimalSize { - precision: *p, - scale: *s as u8, - })) + let mut dt = if let Some(extend_type) = f.metadata().get(EXTENSION_KEY) { + match extend_type.as_str() { + ARROW_EXT_TYPE_EMPTY_ARRAY => DataType::EmptyArray, + ARROW_EXT_TYPE_EMPTY_MAP => DataType::EmptyMap, + ARROW_EXT_TYPE_VARIANT => DataType::Variant, + ARROW_EXT_TYPE_BITMAP => DataType::Bitmap, + _ => { + return Err(Error::Parsing(format!( + "Unsupported extension datatype for arrow field: {:?}", + f + ))) + } } - _ => { - return Err(Error::Parsing(format!( - "Unsupported datatype for arrow field: {:?}", - f - ))) + } else { + match f.data_type() { + ArrowDataType::Null => DataType::Null, + ArrowDataType::Boolean => DataType::Boolean, + ArrowDataType::Int8 => DataType::Number(NumberDataType::Int8), + ArrowDataType::Int16 => DataType::Number(NumberDataType::Int16), + ArrowDataType::Int32 => DataType::Number(NumberDataType::Int32), + ArrowDataType::Int64 => DataType::Number(NumberDataType::Int64), + ArrowDataType::UInt8 => DataType::Number(NumberDataType::UInt8), + ArrowDataType::UInt16 => DataType::Number(NumberDataType::UInt16), + ArrowDataType::UInt32 => DataType::Number(NumberDataType::UInt32), + ArrowDataType::UInt64 => DataType::Number(NumberDataType::UInt64), + ArrowDataType::Float32 => DataType::Number(NumberDataType::Float32), + ArrowDataType::Float64 => DataType::Number(NumberDataType::Float64), + ArrowDataType::Utf8 + | ArrowDataType::Binary + | ArrowDataType::LargeUtf8 + | ArrowDataType::LargeBinary + | ArrowDataType::FixedSizeBinary(_) => DataType::String, + ArrowDataType::Timestamp(_, _) => DataType::Timestamp, + ArrowDataType::Date32 => DataType::Date, + ArrowDataType::Decimal128(p, s) => { + DataType::Decimal(DecimalDataType::Decimal128(DecimalSize { + precision: *p, + scale: *s as u8, + })) + } + ArrowDataType::Decimal256(p, s) => { + DataType::Decimal(DecimalDataType::Decimal256(DecimalSize { + precision: *p, + scale: *s as u8, + })) + } + _ => { + return Err(Error::Parsing(format!( + "Unsupported datatype for arrow field: {:?}", + f + ))) + } } }; if f.is_nullable() && !matches!(dt, DataType::Null) { diff --git a/sql/src/value.rs b/sql/src/value.rs index e639ea580..5222a9c37 100644 --- a/sql/src/value.rs +++ b/sql/src/value.rs @@ -19,8 +19,16 @@ use crate::{ error::{ConvertError, Error, Result}, schema::{DecimalDataType, DecimalSize}, }; +use itertools::join; +use roaring::RoaringTreemap; use std::fmt::Write; +use crate::schema::ARROW_EXT_TYPE_BITMAP; +use crate::schema::ARROW_EXT_TYPE_EMPTY_ARRAY; +use crate::schema::ARROW_EXT_TYPE_EMPTY_MAP; +use crate::schema::ARROW_EXT_TYPE_VARIANT; +use crate::schema::EXTENSION_KEY; + // Thu 1970-01-01 is R.D. 719163 const DAYS_FROM_CE: i32 = 719_163; const NULL_VALUE: &str = "NULL"; @@ -58,6 +66,8 @@ pub enum NumberValue { #[derive(Clone, Debug, PartialEq)] pub enum Value { Null, + EmptyArray, + EmptyMap, Boolean(bool), String(String), Number(NumberValue), @@ -67,14 +77,16 @@ pub enum Value { // Array(Vec), // Map(Vec<(Value, Value)>), // Tuple(Vec), - // Variant, - // Generic(usize, Vec), + Bitmap(String), + Variant(String), } impl Value { pub fn get_type(&self) -> DataType { match self { Self::Null => DataType::Null, + Self::EmptyArray => DataType::EmptyArray, + Self::EmptyMap => DataType::EmptyMap, Self::Boolean(_) => DataType::Boolean, Self::String(_) => DataType::String, Self::Number(n) => match n { @@ -98,7 +110,8 @@ impl Value { // Self::Array(v) => DataType::Array(Box::new(v[0].get_type())), // Self::Map(_) => DataType::Map(Box::new(DataType::Null)), // Self::Tuple(_) => DataType::Tuple(vec![]), - // Self::Variant => DataType::Variant, + Self::Bitmap(_) => DataType::Bitmap, + Self::Variant(_) => DataType::Variant, } } } @@ -109,6 +122,8 @@ impl TryFrom<(&DataType, &str)> for Value { fn try_from((t, v): (&DataType, &str)) -> Result { match t { DataType::Null => Ok(Self::Null), + DataType::EmptyArray => Ok(Self::EmptyArray), + DataType::EmptyMap => Ok(Self::EmptyMap), DataType::Boolean => Ok(Self::Boolean(v == "1")), DataType::String => Ok(Self::String(v.to_string())), @@ -159,6 +174,8 @@ impl TryFrom<(&DataType, &str)> for Value { DataType::Date => Ok(Self::Date( chrono::NaiveDate::parse_from_str(v, "%Y-%m-%d")?.num_days_from_ce() - DAYS_FROM_CE, )), + DataType::Bitmap => Ok(Self::Bitmap(v.to_string())), + DataType::Variant => Ok(Self::Variant(v.to_string())), DataType::Nullable(inner) => { if v == NULL_VALUE { @@ -180,6 +197,51 @@ impl TryFrom<(&ArrowField, &Arc, usize)> for Value { fn try_from( (field, array, seq): (&ArrowField, &Arc, usize), ) -> std::result::Result { + if let Some(extend_type) = field.metadata().get(EXTENSION_KEY) { + match extend_type.as_str() { + ARROW_EXT_TYPE_EMPTY_ARRAY => { + return Ok(Value::EmptyArray); + } + ARROW_EXT_TYPE_EMPTY_MAP => { + return Ok(Value::EmptyMap); + } + ARROW_EXT_TYPE_VARIANT => { + if field.is_nullable() && array.is_null(seq) { + return Ok(Value::Null); + } + return match array.as_any().downcast_ref::() { + Some(array) => Ok(Value::Variant(jsonb::to_string(array.value(seq)))), + None => Err(ConvertError::new("variant", format!("{:?}", array)).into()), + }; + } + ARROW_EXT_TYPE_BITMAP => { + if field.is_nullable() && array.is_null(seq) { + return Ok(Value::Null); + } + return match array.as_any().downcast_ref::() { + Some(array) => { + let rb = RoaringTreemap::deserialize_from(array.value(seq)) + .expect("failed to deserialize bitmap"); + let raw = rb.into_iter().collect::>(); + let s = join(raw.iter(), ","); + Ok(Value::Bitmap(s)) + } + None => Err(ConvertError::new("bitmap", format!("{:?}", array)).into()), + }; + } + _ => { + return Err(ConvertError::new( + "extension", + format!( + "Unsupported extension datatype for arrow field: {:?}", + field + ), + ) + .into()); + } + } + } + if field.is_nullable() && array.is_null(seq) { return Ok(Value::Null); } @@ -325,6 +387,8 @@ impl TryFrom for String { fn try_from(val: Value) -> Result { match val { Value::String(s) => Ok(s), + Value::Bitmap(s) => Ok(s), + Value::Variant(s) => Ok(s), _ => Err(ConvertError::new("string", format!("{:?}", val)).into()), } } @@ -474,6 +538,8 @@ impl std::fmt::Display for Value { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Value::Null => write!(f, "NULL"), + Value::EmptyArray => write!(f, "[]"), + Value::EmptyMap => write!(f, "{{}}"), Value::Boolean(b) => write!(f, "{}", b), Value::Number(n) => write!(f, "{}", n), Value::String(s) => write!(f, "{}", s), @@ -488,6 +554,8 @@ impl std::fmt::Display for Value { let d = NaiveDate::from_num_days_from_ce_opt(days).unwrap_or_default(); write!(f, "{}", d) } + Value::Bitmap(s) => write!(f, "{}", s), + Value::Variant(s) => write!(f, "{}", s), } } }