diff --git a/sql/src/influxql/mod.rs b/sql/src/influxql/mod.rs index 47c98b1602..abbfcdf2c7 100644 --- a/sql/src/influxql/mod.rs +++ b/sql/src/influxql/mod.rs @@ -6,17 +6,21 @@ pub mod planner; pub(crate) mod provider; pub mod error { + use common_util::error::GenericError; use snafu::{Backtrace, Snafu}; #[derive(Debug, Snafu)] #[snafu(visibility = "pub")] pub enum Error { #[snafu(display( - "Failed to build influxdb schema, msg: {}.\nBacktrace:{}", + "Failed to build influxdb schema, msg:{}.\nBacktrace:{}", msg, backtrace ))] - BuildInfluxSchema { msg: String, backtrace: Backtrace }, + BuildSchema { msg: String, backtrace: Backtrace }, + + #[snafu(display("Failed to build influxql plan, msg:{}, err:{}", msg, source))] + BuildPlan { msg: String, source: GenericError }, } define_result!(Error); } diff --git a/sql/src/influxql/planner.rs b/sql/src/influxql/planner.rs index d693da75b9..896cfbfbd0 100644 --- a/sql/src/influxql/planner.rs +++ b/sql/src/influxql/planner.rs @@ -4,8 +4,10 @@ use std::sync::Arc; +use common_util::error::BoxError; use influxql_logical_planner::planner::InfluxQLToLogicalPlan; use influxql_parser::statement::Statement as InfluxqlStatement; +use snafu::ResultExt; use super::provider::InfluxSchemaProviderImpl; use crate::{ @@ -50,8 +52,20 @@ impl<'a, P: MetaProvider> Planner<'a, P> { }; let influxql_logical_planner = InfluxQLToLogicalPlan::new(&influx_schema_provider); - let df_plan = influxql_logical_planner.statement_to_plan(stmt).unwrap(); - let tables = Arc::new(self.context_provider.try_into_container().unwrap()); + let df_plan = influxql_logical_planner + .statement_to_plan(stmt) + .box_err() + .context(BuildPlan { + msg: "build df plan for influxql select statement", + })?; + let tables = Arc::new( + self.context_provider + .try_into_container() + .box_err() + .context(BuildPlan { + msg: "get tables from df plan of select", + })?, + ); Ok(Plan::Query(QueryPlan { df_plan, tables })) } diff --git a/sql/src/influxql/provider.rs b/sql/src/influxql/provider.rs index a7947c8543..77091f4905 100644 --- a/sql/src/influxql/provider.rs +++ b/sql/src/influxql/provider.rs @@ -126,7 +126,7 @@ fn map_column_type_to_influx_column_type( if column.name == "time" && !column.is_nullable { Ok(InfluxColumnType::Timestamp) } else { - BuildInfluxSchema { + BuildSchema { msg: format!("invalid time column, column:{column:?}"), } .fail() @@ -136,7 +136,7 @@ fn map_column_type_to_influx_column_type( if matches!(column.data_type, DatumKind::String) && column.is_nullable { Ok(InfluxColumnType::Tag) } else { - BuildInfluxSchema { + BuildSchema { msg: format!("invalid tag column, column:{column:?}"), } .fail() @@ -166,13 +166,13 @@ fn map_field_type_to_influx_field_type(column: &ColumnSchema) -> Result BuildInfluxSchema { + | DatumKind::Time => BuildSchema { msg: format!("invalid field column, column:{column:?}"), } .fail(), } } else { - BuildInfluxSchema { + BuildSchema { msg: format!("invalid field column, column:{column:?}"), } .fail() @@ -226,3 +226,216 @@ impl<'a, P: MetaProvider> SchemaProvider for InfluxSchemaProviderImpl<'a, P> { Ok(self.table_schema(name)?.is_some()) } } + +#[cfg(test)] +mod test { + use arrow::datatypes::{DataType, TimeUnit}; + use common_types::{ + column_schema, + datum::DatumKind, + schema::{self, Schema, TSID_COLUMN}, + }; + use influxql_logical_planner::provider::{ + InfluxColumnType, InfluxFieldType, Schema as InfluxSchema, + }; + + use super::InfluxSchemaImpl; + + #[test] + fn test_build_influx_schema() { + let cases = vec![ + Case::Compatible, + Case::TimeNameInvalid, + Case::TagNotNull, + Case::FieldNotNull, + Case::TagTypeInvalid, + Case::FieldTypeInvalid, + ]; + + for case in cases { + let schema = build_test_schema(case); + let influx_schema = InfluxSchemaImpl::new(&schema); + match case { + Case::Compatible => { + let influx_schema = influx_schema.unwrap(); + let columns = influx_schema.columns(); + for column in columns { + match column { + (InfluxColumnType::Timestamp, field) => { + assert_eq!(field.name(), "time"); + assert_eq!( + field.data_type(), + &DataType::Timestamp(TimeUnit::Millisecond, None) + ); + assert!(!field.is_nullable()); + } + (InfluxColumnType::Tag, field) => { + assert_eq!(field.name(), "tag"); + assert_eq!(field.data_type(), &DataType::Utf8); + assert!(field.is_nullable()); + } + (InfluxColumnType::Field(InfluxFieldType::Integer), field) => { + assert_eq!(field.name(), "int_field"); + assert_eq!(field.data_type(), &DataType::Int64); + assert!(field.is_nullable()); + } + (InfluxColumnType::Field(InfluxFieldType::UInteger), field) => { + assert_eq!(field.name(), "uint_field"); + assert_eq!(field.data_type(), &DataType::UInt64); + assert!(field.is_nullable()); + } + (InfluxColumnType::Field(InfluxFieldType::Float), field) => { + assert_eq!(field.name(), "float_field"); + assert_eq!(field.data_type(), &DataType::Float64); + assert!(field.is_nullable()); + } + (InfluxColumnType::Field(InfluxFieldType::String), field) => { + assert_eq!(field.name(), "str_field"); + assert_eq!(field.data_type(), &DataType::Utf8); + assert!(field.is_nullable()); + } + (InfluxColumnType::Field(InfluxFieldType::Boolean), field) => { + assert_eq!(field.name(), "bool_field"); + assert_eq!(field.data_type(), &DataType::Boolean); + assert!(field.is_nullable()); + } + } + } + } + Case::TimeNameInvalid => { + assert!(influx_schema.is_err()); + } + Case::TagNotNull => { + assert!(influx_schema.is_err()); + } + Case::FieldNotNull => { + assert!(influx_schema.is_err()); + } + Case::TagTypeInvalid => { + assert!(influx_schema.is_err()); + } + Case::FieldTypeInvalid => { + assert!(influx_schema.is_err()); + } + } + } + } + + #[derive(Clone, Copy)] + enum Case { + Compatible, + TimeNameInvalid, + TagNotNull, + FieldNotNull, + TagTypeInvalid, + FieldTypeInvalid, + } + + fn build_test_schema(case: Case) -> Schema { + let time_column_name = if matches!(case, Case::TimeNameInvalid) { + "not_time" + } else { + "time" + }; + + let base_schema_builder = schema::Builder::new() + .auto_increment_column_id(true) + .add_key_column( + column_schema::Builder::new(time_column_name.to_string(), DatumKind::Timestamp) + .is_nullable(false) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_key_column( + column_schema::Builder::new(TSID_COLUMN.to_owned(), DatumKind::UInt64) + .is_nullable(false) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("tag".to_string(), DatumKind::String) + .is_nullable(true) + .is_tag(true) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("str_field".to_string(), DatumKind::String) + .is_nullable(true) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("int_field".to_string(), DatumKind::Int64) + .is_nullable(true) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("uint_field".to_string(), DatumKind::UInt64) + .is_nullable(true) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("float_field".to_string(), DatumKind::Double) + .is_nullable(true) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("bool_field".to_string(), DatumKind::Boolean) + .is_nullable(true) + .build() + .expect("should succeed build column schema"), + ) + .unwrap(); + + let schema_builder = match case { + Case::TagNotNull => base_schema_builder + .add_normal_column( + column_schema::Builder::new("tag_not_null".to_string(), DatumKind::String) + .is_nullable(false) + .is_tag(true) + .build() + .expect("should succeed build column schema"), + ) + .unwrap(), + Case::FieldNotNull => base_schema_builder + .add_normal_column( + column_schema::Builder::new("field_not_null".to_string(), DatumKind::Int64) + .is_nullable(false) + .build() + .expect("should succeed build column schema"), + ) + .unwrap(), + Case::TagTypeInvalid => base_schema_builder + .add_normal_column( + column_schema::Builder::new("tag_invaild".to_string(), DatumKind::Varbinary) + .is_nullable(true) + .is_tag(true) + .build() + .expect("should succeed build column schema"), + ) + .unwrap(), + Case::FieldTypeInvalid => base_schema_builder + .add_normal_column( + column_schema::Builder::new("field_invalid".to_string(), DatumKind::Varbinary) + .is_nullable(true) + .build() + .expect("should succeed build column schema"), + ) + .unwrap(), + _ => base_schema_builder, + }; + + schema_builder.build().expect("should succeed build schema") + } +}