Skip to content

Commit

Permalink
add tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Mar 14, 2023
1 parent ea0547b commit cb476be
Show file tree
Hide file tree
Showing 3 changed files with 239 additions and 8 deletions.
8 changes: 6 additions & 2 deletions sql/src/influxql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,21 @@ pub mod planner;
pub(crate) mod provider;

pub mod error {
use common_util::error::GenericError;
use snafu::{Backtrace, Snafu};

#[derive(Debug, Snafu)]
#[snafu(visibility = "pub")]
pub enum Error {
#[snafu(display(
"Failed to build influxdb schema, msg: {}.\nBacktrace:{}",
"Failed to build influxdb schema, msg:{}.\nBacktrace:{}",
msg,
backtrace
))]
BuildInfluxSchema { msg: String, backtrace: Backtrace },
BuildSchema { msg: String, backtrace: Backtrace },

#[snafu(display("Failed to build influxql plan, msg:{}, err:{}", msg, source))]
BuildPlan { msg: String, source: GenericError },
}
define_result!(Error);
}
18 changes: 16 additions & 2 deletions sql/src/influxql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@

use std::sync::Arc;

use common_util::error::BoxError;
use influxql_logical_planner::planner::InfluxQLToLogicalPlan;
use influxql_parser::statement::Statement as InfluxqlStatement;
use snafu::ResultExt;

use super::provider::InfluxSchemaProviderImpl;
use crate::{
Expand Down Expand Up @@ -50,8 +52,20 @@ impl<'a, P: MetaProvider> Planner<'a, P> {
};
let influxql_logical_planner = InfluxQLToLogicalPlan::new(&influx_schema_provider);

let df_plan = influxql_logical_planner.statement_to_plan(stmt).unwrap();
let tables = Arc::new(self.context_provider.try_into_container().unwrap());
let df_plan = influxql_logical_planner
.statement_to_plan(stmt)
.box_err()
.context(BuildPlan {
msg: "build df plan for influxql select statement",
})?;
let tables = Arc::new(
self.context_provider
.try_into_container()
.box_err()
.context(BuildPlan {
msg: "get tables from df plan of select",
})?,
);

Ok(Plan::Query(QueryPlan { df_plan, tables }))
}
Expand Down
221 changes: 217 additions & 4 deletions sql/src/influxql/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ fn map_column_type_to_influx_column_type(
if column.name == "time" && !column.is_nullable {
Ok(InfluxColumnType::Timestamp)
} else {
BuildInfluxSchema {
BuildSchema {
msg: format!("invalid time column, column:{column:?}"),
}
.fail()
Expand All @@ -136,7 +136,7 @@ fn map_column_type_to_influx_column_type(
if matches!(column.data_type, DatumKind::String) && column.is_nullable {
Ok(InfluxColumnType::Tag)
} else {
BuildInfluxSchema {
BuildSchema {
msg: format!("invalid tag column, column:{column:?}"),
}
.fail()
Expand Down Expand Up @@ -166,13 +166,13 @@ fn map_field_type_to_influx_field_type(column: &ColumnSchema) -> Result<InfluxFi
| DatumKind::Int16
| DatumKind::Int8
| DatumKind::Date
| DatumKind::Time => BuildInfluxSchema {
| DatumKind::Time => BuildSchema {
msg: format!("invalid field column, column:{column:?}"),
}
.fail(),
}
} else {
BuildInfluxSchema {
BuildSchema {
msg: format!("invalid field column, column:{column:?}"),
}
.fail()
Expand Down Expand Up @@ -226,3 +226,216 @@ impl<'a, P: MetaProvider> SchemaProvider for InfluxSchemaProviderImpl<'a, P> {
Ok(self.table_schema(name)?.is_some())
}
}

#[cfg(test)]
mod test {
use arrow::datatypes::{DataType, TimeUnit};
use common_types::{
column_schema,
datum::DatumKind,
schema::{self, Schema, TSID_COLUMN},
};
use influxql_logical_planner::provider::{
InfluxColumnType, InfluxFieldType, Schema as InfluxSchema,
};

use super::InfluxSchemaImpl;

#[test]
fn test_build_influx_schema() {
let cases = vec![
Case::Compatible,
Case::TimeNameInvalid,
Case::TagNotNull,
Case::FieldNotNull,
Case::TagTypeInvalid,
Case::FieldTypeInvalid,
];

for case in cases {
let schema = build_test_schema(case);
let influx_schema = InfluxSchemaImpl::new(&schema);
match case {
Case::Compatible => {
let influx_schema = influx_schema.unwrap();
let columns = influx_schema.columns();
for column in columns {
match column {
(InfluxColumnType::Timestamp, field) => {
assert_eq!(field.name(), "time");
assert_eq!(
field.data_type(),
&DataType::Timestamp(TimeUnit::Millisecond, None)
);
assert!(!field.is_nullable());
}
(InfluxColumnType::Tag, field) => {
assert_eq!(field.name(), "tag");
assert_eq!(field.data_type(), &DataType::Utf8);
assert!(field.is_nullable());
}
(InfluxColumnType::Field(InfluxFieldType::Integer), field) => {
assert_eq!(field.name(), "int_field");
assert_eq!(field.data_type(), &DataType::Int64);
assert!(field.is_nullable());
}
(InfluxColumnType::Field(InfluxFieldType::UInteger), field) => {
assert_eq!(field.name(), "uint_field");
assert_eq!(field.data_type(), &DataType::UInt64);
assert!(field.is_nullable());
}
(InfluxColumnType::Field(InfluxFieldType::Float), field) => {
assert_eq!(field.name(), "float_field");
assert_eq!(field.data_type(), &DataType::Float64);
assert!(field.is_nullable());
}
(InfluxColumnType::Field(InfluxFieldType::String), field) => {
assert_eq!(field.name(), "str_field");
assert_eq!(field.data_type(), &DataType::Utf8);
assert!(field.is_nullable());
}
(InfluxColumnType::Field(InfluxFieldType::Boolean), field) => {
assert_eq!(field.name(), "bool_field");
assert_eq!(field.data_type(), &DataType::Boolean);
assert!(field.is_nullable());
}
}
}
}
Case::TimeNameInvalid => {
assert!(influx_schema.is_err());
}
Case::TagNotNull => {
assert!(influx_schema.is_err());
}
Case::FieldNotNull => {
assert!(influx_schema.is_err());
}
Case::TagTypeInvalid => {
assert!(influx_schema.is_err());
}
Case::FieldTypeInvalid => {
assert!(influx_schema.is_err());
}
}
}
}

#[derive(Clone, Copy)]
enum Case {
Compatible,
TimeNameInvalid,
TagNotNull,
FieldNotNull,
TagTypeInvalid,
FieldTypeInvalid,
}

fn build_test_schema(case: Case) -> Schema {
let time_column_name = if matches!(case, Case::TimeNameInvalid) {
"not_time"
} else {
"time"
};

let base_schema_builder = schema::Builder::new()
.auto_increment_column_id(true)
.add_key_column(
column_schema::Builder::new(time_column_name.to_string(), DatumKind::Timestamp)
.is_nullable(false)
.build()
.expect("should succeed build column schema"),
)
.unwrap()
.add_key_column(
column_schema::Builder::new(TSID_COLUMN.to_owned(), DatumKind::UInt64)
.is_nullable(false)
.build()
.expect("should succeed build column schema"),
)
.unwrap()
.add_normal_column(
column_schema::Builder::new("tag".to_string(), DatumKind::String)
.is_nullable(true)
.is_tag(true)
.build()
.expect("should succeed build column schema"),
)
.unwrap()
.add_normal_column(
column_schema::Builder::new("str_field".to_string(), DatumKind::String)
.is_nullable(true)
.build()
.expect("should succeed build column schema"),
)
.unwrap()
.add_normal_column(
column_schema::Builder::new("int_field".to_string(), DatumKind::Int64)
.is_nullable(true)
.build()
.expect("should succeed build column schema"),
)
.unwrap()
.add_normal_column(
column_schema::Builder::new("uint_field".to_string(), DatumKind::UInt64)
.is_nullable(true)
.build()
.expect("should succeed build column schema"),
)
.unwrap()
.add_normal_column(
column_schema::Builder::new("float_field".to_string(), DatumKind::Double)
.is_nullable(true)
.build()
.expect("should succeed build column schema"),
)
.unwrap()
.add_normal_column(
column_schema::Builder::new("bool_field".to_string(), DatumKind::Boolean)
.is_nullable(true)
.build()
.expect("should succeed build column schema"),
)
.unwrap();

let schema_builder = match case {
Case::TagNotNull => base_schema_builder
.add_normal_column(
column_schema::Builder::new("tag_not_null".to_string(), DatumKind::String)
.is_nullable(false)
.is_tag(true)
.build()
.expect("should succeed build column schema"),
)
.unwrap(),
Case::FieldNotNull => base_schema_builder
.add_normal_column(
column_schema::Builder::new("field_not_null".to_string(), DatumKind::Int64)
.is_nullable(false)
.build()
.expect("should succeed build column schema"),
)
.unwrap(),
Case::TagTypeInvalid => base_schema_builder
.add_normal_column(
column_schema::Builder::new("tag_invaild".to_string(), DatumKind::Varbinary)
.is_nullable(true)
.is_tag(true)
.build()
.expect("should succeed build column schema"),
)
.unwrap(),
Case::FieldTypeInvalid => base_schema_builder
.add_normal_column(
column_schema::Builder::new("field_invalid".to_string(), DatumKind::Varbinary)
.is_nullable(true)
.build()
.expect("should succeed build column schema"),
)
.unwrap(),
_ => base_schema_builder,
};

schema_builder.build().expect("should succeed build schema")
}
}

0 comments on commit cb476be

Please sign in to comment.