Skip to content

Commit

Permalink
feat: FlightSQL support extension types
Browse files Browse the repository at this point in the history
  • Loading branch information
b41sh committed Oct 9, 2023
1 parent 7830e99 commit 61f27b4
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 60 deletions.
6 changes: 5 additions & 1 deletion bindings/nodejs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ impl ToNapiValue for Value {
unsafe fn to_napi_value(env: sys::napi_env, val: Self) -> Result<sys::napi_value> {
match val.0 {
databend_driver::Value::Null => Null::to_napi_value(env, Null),
databend_driver::Value::EmptyArray => String::to_napi_value(env, "[]".to_string()),
databend_driver::Value::EmptyMap => String::to_napi_value(env, "{}".to_string()),
databend_driver::Value::Boolean(b) => bool::to_napi_value(env, b),
databend_driver::Value::String(s) => String::to_napi_value(env, s),
databend_driver::Value::Number(n) => NumberValue::to_napi_value(env, NumberValue(n)),
Expand All @@ -81,6 +83,8 @@ impl ToNapiValue for Value {
NaiveDateTime::new(v, NaiveTime::from_hms_opt(0, 0, 0).unwrap()),
)
}
databend_driver::Value::Bitmap(s) => String::to_napi_value(env, s),
databend_driver::Value::Variant(s) => String::to_napi_value(env, s),
}
}
}
Expand Down Expand Up @@ -317,7 +321,7 @@ impl Connection {
self.0
.query_iter(&sql)
.await
.map(|iter| RowIterator(iter))
.map(RowIterator)
.map_err(format_napi_error)
}

Expand Down
7 changes: 4 additions & 3 deletions cli/tests/00-base.result
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
b 2 false
a 1 true
0 99999 100000
b 2 false "{""k"":""v""}"
a 1 true [1,2]
0 99999 99999 100000
1
2
3
[] {}
with comment
3.00 3.00
Asia/Shanghai
Expand Down
12 changes: 7 additions & 5 deletions cli/tests/00-base.sql
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
drop table if exists test;
create table test(a string, b int, d boolean);
insert into test values('a', 1, true);
insert into test values('b', 2, false);
create table test(a string, b int, c boolean, d variant);
insert into test values('a', 1, true, '[1,2]');
insert into test values('b', 2, false, '{"k":"v"}');
select * from test order by a desc;

truncate table test;
insert into test select to_string(number), number, false from numbers(100000);
select min(a), max(b), count() from test;
insert into test select to_string(number), number, false, number from numbers(100000);
select min(a), max(b), max(d), count() from test;

select '1';select 2; select 1+2;

select [], {};

-- ignore this line

select /* ignore this block */ 'with comment';
Expand Down
Empty file modified cli/tests/01-put.sh
100644 → 100755
Empty file.
6 changes: 3 additions & 3 deletions driver/src/flight_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ impl Connection for FlightSQLConnection {
let row = self.query_row(&sql).await?.ok_or(Error::InvalidResponse(
"Empty response from server for presigned request".to_string(),
))?;
let (method, _, url): (String, String, String) = row.try_into().map_err(Error::Parsing)?;
// FIXME: headers is variant, not handled by driver yet
let headers: BTreeMap<String, String> = BTreeMap::new();
let (method, headers, url): (String, String, String) =
row.try_into().map_err(Error::Parsing)?;
let headers: BTreeMap<String, String> = serde_json::from_str(&headers)?;
Ok(PresignedResponse {
method,
headers,
Expand Down
3 changes: 3 additions & 0 deletions sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ databend-client = { workspace = true }

chrono = { version = "0.4", default-features = false }
glob = "0.3"
itertools = "0.10"
jsonb = { git = "https://github.com/datafuselabs/jsonb", rev = "3a3c6ef" }
roaring = { version = "0.10.1", features = ["serde"] }
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = { version = "1.0", default-features = false, features = ["std"] }
tokio-stream = "0.1"
Expand Down
120 changes: 75 additions & 45 deletions sql/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ use databend_client::response::SchemaField as APISchemaField;

use crate::error::{Error, Result};

// Extension types defined by Databend
pub(crate) const EXTENSION_KEY: &str = "Extension";
pub(crate) const ARROW_EXT_TYPE_EMPTY_ARRAY: &str = "EmptyArray";
pub(crate) const ARROW_EXT_TYPE_EMPTY_MAP: &str = "EmptyMap";
pub(crate) const ARROW_EXT_TYPE_VARIANT: &str = "Variant";
pub(crate) const ARROW_EXT_TYPE_BITMAP: &str = "Bitmap";

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NumberDataType {
UInt8,
Expand Down Expand Up @@ -204,18 +211,26 @@ impl TryFrom<&TypeDesc<'_>> for DataType {
"Array type must have one argument".to_string(),
));
}
let inner = Self::try_from(&desc.args[0])?;
DataType::Array(Box::new(inner))
if desc.args[0].name == "Nothing" {
DataType::EmptyArray
} else {
let inner = Self::try_from(&desc.args[0])?;
DataType::Array(Box::new(inner))
}
}
"Map" => {
if desc.args.len() != 2 {
return Err(Error::Parsing(
"Map type must have two argument".to_string(),
));
if desc.args.len() == 1 && desc.args[0].name == "Nothing" {
DataType::EmptyMap
} else {
if desc.args.len() != 2 {
return Err(Error::Parsing(
"Map type must have two arguments".to_string(),
));
}
let key_ty = Self::try_from(&desc.args[0])?;
let val_ty = Self::try_from(&desc.args[1])?;
DataType::Map(Box::new(DataType::Tuple(vec![key_ty, val_ty])))
}
let key_ty = Self::try_from(&desc.args[0])?;
let val_ty = Self::try_from(&desc.args[1])?;
DataType::Map(Box::new(DataType::Tuple(vec![key_ty, val_ty])))
}
"Tuple" => {
let mut inner = vec![];
Expand Down Expand Up @@ -263,43 +278,58 @@ impl TryFrom<&Arc<ArrowField>> for Field {
type Error = Error;

fn try_from(f: &Arc<ArrowField>) -> Result<Self> {
let mut dt = match f.data_type() {
ArrowDataType::Null => DataType::Null,
ArrowDataType::Boolean => DataType::Boolean,
ArrowDataType::Int8 => DataType::Number(NumberDataType::Int8),
ArrowDataType::Int16 => DataType::Number(NumberDataType::Int16),
ArrowDataType::Int32 => DataType::Number(NumberDataType::Int32),
ArrowDataType::Int64 => DataType::Number(NumberDataType::Int64),
ArrowDataType::UInt8 => DataType::Number(NumberDataType::UInt8),
ArrowDataType::UInt16 => DataType::Number(NumberDataType::UInt16),
ArrowDataType::UInt32 => DataType::Number(NumberDataType::UInt32),
ArrowDataType::UInt64 => DataType::Number(NumberDataType::UInt64),
ArrowDataType::Float32 => DataType::Number(NumberDataType::Float32),
ArrowDataType::Float64 => DataType::Number(NumberDataType::Float64),
ArrowDataType::Utf8
| ArrowDataType::Binary
| ArrowDataType::LargeUtf8
| ArrowDataType::LargeBinary
| ArrowDataType::FixedSizeBinary(_) => DataType::String,
ArrowDataType::Timestamp(_, _) => DataType::Timestamp,
ArrowDataType::Date32 => DataType::Date,
ArrowDataType::Decimal128(p, s) => {
DataType::Decimal(DecimalDataType::Decimal128(DecimalSize {
precision: *p,
scale: *s as u8,
}))
}
ArrowDataType::Decimal256(p, s) => {
DataType::Decimal(DecimalDataType::Decimal256(DecimalSize {
precision: *p,
scale: *s as u8,
}))
let mut dt = if let Some(extend_type) = f.metadata().get(EXTENSION_KEY) {
match extend_type.as_str() {
ARROW_EXT_TYPE_EMPTY_ARRAY => DataType::EmptyArray,
ARROW_EXT_TYPE_EMPTY_MAP => DataType::EmptyMap,
ARROW_EXT_TYPE_VARIANT => DataType::Variant,
ARROW_EXT_TYPE_BITMAP => DataType::Bitmap,
_ => {
return Err(Error::Parsing(format!(
"Unsupported extension datatype for arrow field: {:?}",
f
)))
}
}
_ => {
return Err(Error::Parsing(format!(
"Unsupported datatype for arrow field: {:?}",
f
)))
} else {
match f.data_type() {
ArrowDataType::Null => DataType::Null,
ArrowDataType::Boolean => DataType::Boolean,
ArrowDataType::Int8 => DataType::Number(NumberDataType::Int8),
ArrowDataType::Int16 => DataType::Number(NumberDataType::Int16),
ArrowDataType::Int32 => DataType::Number(NumberDataType::Int32),
ArrowDataType::Int64 => DataType::Number(NumberDataType::Int64),
ArrowDataType::UInt8 => DataType::Number(NumberDataType::UInt8),
ArrowDataType::UInt16 => DataType::Number(NumberDataType::UInt16),
ArrowDataType::UInt32 => DataType::Number(NumberDataType::UInt32),
ArrowDataType::UInt64 => DataType::Number(NumberDataType::UInt64),
ArrowDataType::Float32 => DataType::Number(NumberDataType::Float32),
ArrowDataType::Float64 => DataType::Number(NumberDataType::Float64),
ArrowDataType::Utf8
| ArrowDataType::Binary
| ArrowDataType::LargeUtf8
| ArrowDataType::LargeBinary
| ArrowDataType::FixedSizeBinary(_) => DataType::String,
ArrowDataType::Timestamp(_, _) => DataType::Timestamp,
ArrowDataType::Date32 => DataType::Date,
ArrowDataType::Decimal128(p, s) => {
DataType::Decimal(DecimalDataType::Decimal128(DecimalSize {
precision: *p,
scale: *s as u8,
}))
}
ArrowDataType::Decimal256(p, s) => {
DataType::Decimal(DecimalDataType::Decimal256(DecimalSize {
precision: *p,
scale: *s as u8,
}))
}
_ => {
return Err(Error::Parsing(format!(
"Unsupported datatype for arrow field: {:?}",
f
)))
}
}
};
if f.is_nullable() && !matches!(dt, DataType::Null) {
Expand Down
74 changes: 71 additions & 3 deletions sql/src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,16 @@ use crate::{
error::{ConvertError, Error, Result},
schema::{DecimalDataType, DecimalSize},
};
use itertools::join;
use roaring::RoaringTreemap;
use std::fmt::Write;

use crate::schema::ARROW_EXT_TYPE_BITMAP;
use crate::schema::ARROW_EXT_TYPE_EMPTY_ARRAY;
use crate::schema::ARROW_EXT_TYPE_EMPTY_MAP;
use crate::schema::ARROW_EXT_TYPE_VARIANT;
use crate::schema::EXTENSION_KEY;

// Thu 1970-01-01 is R.D. 719163
const DAYS_FROM_CE: i32 = 719_163;
const NULL_VALUE: &str = "NULL";
Expand Down Expand Up @@ -58,6 +66,8 @@ pub enum NumberValue {
#[derive(Clone, Debug, PartialEq)]
pub enum Value {
Null,
EmptyArray,
EmptyMap,
Boolean(bool),
String(String),
Number(NumberValue),
Expand All @@ -67,14 +77,16 @@ pub enum Value {
// Array(Vec<Value>),
// Map(Vec<(Value, Value)>),
// Tuple(Vec<Value>),
// Variant,
// Generic(usize, Vec<u8>),
Bitmap(String),
Variant(String),
}

impl Value {
pub fn get_type(&self) -> DataType {
match self {
Self::Null => DataType::Null,
Self::EmptyArray => DataType::EmptyArray,
Self::EmptyMap => DataType::EmptyMap,
Self::Boolean(_) => DataType::Boolean,
Self::String(_) => DataType::String,
Self::Number(n) => match n {
Expand All @@ -98,7 +110,8 @@ impl Value {
// Self::Array(v) => DataType::Array(Box::new(v[0].get_type())),
// Self::Map(_) => DataType::Map(Box::new(DataType::Null)),
// Self::Tuple(_) => DataType::Tuple(vec![]),
// Self::Variant => DataType::Variant,
Self::Bitmap(_) => DataType::Bitmap,
Self::Variant(_) => DataType::Variant,
}
}
}
Expand All @@ -109,6 +122,8 @@ impl TryFrom<(&DataType, &str)> for Value {
fn try_from((t, v): (&DataType, &str)) -> Result<Self> {
match t {
DataType::Null => Ok(Self::Null),
DataType::EmptyArray => Ok(Self::EmptyArray),
DataType::EmptyMap => Ok(Self::EmptyMap),
DataType::Boolean => Ok(Self::Boolean(v == "1")),
DataType::String => Ok(Self::String(v.to_string())),

Expand Down Expand Up @@ -159,6 +174,8 @@ impl TryFrom<(&DataType, &str)> for Value {
DataType::Date => Ok(Self::Date(
chrono::NaiveDate::parse_from_str(v, "%Y-%m-%d")?.num_days_from_ce() - DAYS_FROM_CE,
)),
DataType::Bitmap => Ok(Self::Bitmap(v.to_string())),
DataType::Variant => Ok(Self::Variant(v.to_string())),

DataType::Nullable(inner) => {
if v == NULL_VALUE {
Expand All @@ -180,6 +197,51 @@ impl TryFrom<(&ArrowField, &Arc<dyn ArrowArray>, usize)> for Value {
fn try_from(
(field, array, seq): (&ArrowField, &Arc<dyn ArrowArray>, usize),
) -> std::result::Result<Self, Self::Error> {
if let Some(extend_type) = field.metadata().get(EXTENSION_KEY) {
match extend_type.as_str() {
ARROW_EXT_TYPE_EMPTY_ARRAY => {
return Ok(Value::EmptyArray);
}
ARROW_EXT_TYPE_EMPTY_MAP => {
return Ok(Value::EmptyMap);
}
ARROW_EXT_TYPE_VARIANT => {
if field.is_nullable() && array.is_null(seq) {
return Ok(Value::Null);
}
return match array.as_any().downcast_ref::<LargeBinaryArray>() {
Some(array) => Ok(Value::Variant(jsonb::to_string(array.value(seq)))),
None => Err(ConvertError::new("variant", format!("{:?}", array)).into()),
};
}
ARROW_EXT_TYPE_BITMAP => {
if field.is_nullable() && array.is_null(seq) {
return Ok(Value::Null);
}
return match array.as_any().downcast_ref::<LargeBinaryArray>() {
Some(array) => {
let rb = RoaringTreemap::deserialize_from(array.value(seq))
.expect("failed to deserialize bitmap");
let raw = rb.into_iter().collect::<Vec<_>>();
let s = join(raw.iter(), ",");
Ok(Value::Bitmap(s))
}
None => Err(ConvertError::new("bitmap", format!("{:?}", array)).into()),
};
}
_ => {
return Err(ConvertError::new(
"extension",
format!(
"Unsupported extension datatype for arrow field: {:?}",
field
),
)
.into());
}
}
}

if field.is_nullable() && array.is_null(seq) {
return Ok(Value::Null);
}
Expand Down Expand Up @@ -325,6 +387,8 @@ impl TryFrom<Value> for String {
fn try_from(val: Value) -> Result<Self> {
match val {
Value::String(s) => Ok(s),
Value::Bitmap(s) => Ok(s),
Value::Variant(s) => Ok(s),
_ => Err(ConvertError::new("string", format!("{:?}", val)).into()),
}
}
Expand Down Expand Up @@ -474,6 +538,8 @@ impl std::fmt::Display for Value {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Value::Null => write!(f, "NULL"),
Value::EmptyArray => write!(f, "[]"),
Value::EmptyMap => write!(f, "{{}}"),
Value::Boolean(b) => write!(f, "{}", b),
Value::Number(n) => write!(f, "{}", n),
Value::String(s) => write!(f, "{}", s),
Expand All @@ -488,6 +554,8 @@ impl std::fmt::Display for Value {
let d = NaiveDate::from_num_days_from_ce_opt(days).unwrap_or_default();
write!(f, "{}", d)
}
Value::Bitmap(s) => write!(f, "{}", s),
Value::Variant(s) => write!(f, "{}", s),
}
}
}
Expand Down

0 comments on commit 61f27b4

Please sign in to comment.