Skip to content

Commit

Permalink
address CR.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Mar 16, 2023
1 parent 6506ffa commit eb189a0
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 14 deletions.
3 changes: 1 addition & 2 deletions sql/src/influxql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ use influxql_logical_planner::planner::InfluxQLToLogicalPlan;
use influxql_parser::statement::Statement as InfluxqlStatement;
use snafu::ResultExt;

use super::provider::InfluxSchemaProviderImpl;
use crate::{
influxql::error::*,
influxql::{error::*, provider::InfluxSchemaProviderImpl},
plan::{Plan, QueryPlan},
provider::{ContextProviderAdapter, MetaProvider},
};
Expand Down
32 changes: 21 additions & 11 deletions sql/src/influxql/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::{
/// Influx schema used for build logical plan
pub struct InfluxSchemaImpl {
columns: Vec<InfluxColumnSchema>,
time_column_idx: usize,
}

impl InfluxSchemaImpl {
Expand Down Expand Up @@ -54,8 +55,17 @@ impl InfluxSchemaImpl {
})
.collect::<Result<Vec<_>>>()?;

// Schema is ensured to have timestamp key.
let time_column_idx = influx_columns
.iter()
.enumerate()
.find(|(_, column)| matches!(column.influx_type, InfluxColumnType::Timestamp))
.map(|(idx, _)| idx)
.unwrap();

Ok(Self {
columns: influx_columns,
time_column_idx,
})
}
}
Expand Down Expand Up @@ -96,11 +106,7 @@ impl InfluxSchema for InfluxSchemaImpl {

fn time(&self) -> &ArrowField {
// Time column must exist, has checked it when building.
let time_column = self
.columns
.iter()
.find(|column| matches!(column.influx_type, InfluxColumnType::Timestamp))
.unwrap();
let time_column = &self.columns[self.time_column_idx];

&time_column.arrow_field
}
Expand All @@ -125,14 +131,17 @@ fn map_column_to_influx_column(
is_timestamp_key: bool,
) -> Result<InfluxColumnType> {
if is_timestamp_key {
map_column_to_influx_time_column(column)
} else if column.is_tag {
map_column_to_influx_tag_column(column)
} else {
map_column_to_influx_field_column(column)
return map_column_to_influx_time_column(column);
}

if column.is_tag {
return map_column_to_influx_tag_column(column);
}

map_column_to_influx_field_column(column)
}

// TODO: don't restrict the time column name.
fn map_column_to_influx_time_column(column: &ColumnSchema) -> Result<InfluxColumnType> {
if column.name == "time" && !column.is_nullable {
Ok(InfluxColumnType::Timestamp)
Expand All @@ -144,8 +153,8 @@ fn map_column_to_influx_time_column(column: &ColumnSchema) -> Result<InfluxColum
}
}

// TODO: support more tag types.
fn map_column_to_influx_tag_column(column: &ColumnSchema) -> Result<InfluxColumnType> {
// Tag column
if matches!(column.data_type, DatumKind::String) && column.is_nullable {
Ok(InfluxColumnType::Tag)
} else {
Expand All @@ -156,6 +165,7 @@ fn map_column_to_influx_tag_column(column: &ColumnSchema) -> Result<InfluxColumn
}
}

// TODO: support more field types.
fn map_column_to_influx_field_column(column: &ColumnSchema) -> Result<InfluxColumnType> {
if column.is_nullable {
match column.data_type {
Expand Down
1 change: 0 additions & 1 deletion sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,6 @@ impl<'a, P: MetaProvider> Planner<'a, P> {

pub fn influxql_stmt_to_plan(&self, statement: InfluxqlStatement) -> Result<Plan> {
let adapter = ContextProviderAdapter::new(self.provider, self.read_parallelism);
// let planner = PlannerDelegate::new(adapter);

let influxql_planner = crate::influxql::planner::Planner::new(adapter);
influxql_planner
Expand Down

0 comments on commit eb189a0

Please sign in to comment.