diff --git a/Cargo.lock b/Cargo.lock index 2ea7665272..ce7382a7d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3082,6 +3082,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json_pretty" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c9fe3f290d2cb8660e3e051352ea55a404788d213a106a33ec8802447c4a762" + [[package]] name = "lazy_static" version = "1.4.0" @@ -5583,6 +5589,7 @@ dependencies = [ "http", "influxdb_line_protocol", "interpreters", + "json_pretty", "lazy_static", "log", "logger", diff --git a/integration_tests/cases/env/local/influxql/basic.result b/integration_tests/cases/env/local/influxql/basic.result index fb1abe4ae7..10871c1cc3 100644 --- a/integration_tests/cases/env/local/influxql/basic.result +++ b/integration_tests/cases/env/local/influxql/basic.result @@ -27,12 +27,12 @@ affected_rows: 6 -- SQLNESS ARG protocol=influxql SELECT * FROM "h2o_feet"; -{"rows":[{"ceresdb::measurement":"h2o_feet","time":1439827200000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.064},{"ceresdb::measurement":"h2o_feet","time":1439827200000,"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":8.12},{"ceresdb::measurement":"h2o_feet","time":1439827560000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.116},{"ceresdb::measurement":"h2o_feet","time":1439827560000,"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":8.005},{"ceresdb::measurement":"h2o_feet","time":1439827620000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.028},{"ceresdb::measurement":"h2o_feet","time":1439827620000,"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":7.887}]} +{"results":[{"statement_id":0,"series":[{"name":"h2o_feet","columns":["time","level_description","location","water_level"],"values":[[1439827200000,"below 3 feet","santa_monica",2.064],[1439827200000,"between 6 and 9 feet","coyote_creek",8.12],[1439827560000,"below 3 feet","santa_monica",2.116],[1439827560000,"between 6 and 9 feet","coyote_creek",8.005],[1439827620000,"below 3 feet","santa_monica",2.028],[1439827620000,"between 6 and 9 feet","coyote_creek",7.887]]}]}]} -- SQLNESS ARG protocol=influxql SELECT "level_description", location, water_level FROM "h2o_feet" where location = 'santa_monica'; -{"rows":[{"ceresdb::measurement":"h2o_feet","time":1439827200000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.064},{"ceresdb::measurement":"h2o_feet","time":1439827560000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.116},{"ceresdb::measurement":"h2o_feet","time":1439827620000,"level_description":"below 3 feet","location":"santa_monica","water_level":2.028}]} +{"results":[{"statement_id":0,"series":[{"name":"h2o_feet","columns":["time","level_description","location","water_level"],"values":[[1439827200000,"below 3 feet","santa_monica",2.064],[1439827560000,"below 3 feet","santa_monica",2.116],[1439827620000,"below 3 feet","santa_monica",2.028]]}]}]} DROP TABLE IF EXISTS `h2o_feet`; diff --git a/server/Cargo.toml b/server/Cargo.toml index 97e2c82baa..024488107d 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -55,4 +55,5 @@ warp = "0.3" zstd = { workspace = true } [dev-dependencies] +json_pretty = "0.1.2" sql = { workspace = true, features = ["test"] } diff --git a/server/src/handlers/error.rs b/server/src/handlers/error.rs index 16c2355a53..a75def41c0 
100644 --- a/server/src/handlers/error.rs +++ b/server/src/handlers/error.rs @@ -74,7 +74,10 @@ pub enum Error { }, #[snafu(display("InfluxDb handler failed, msg:{}, source:{}", msg, source))] - InfluxDbHandler { msg: String, source: GenericError }, + InfluxDbHandlerWithCause { msg: String, source: GenericError }, + + #[snafu(display("InfluxDb handler failed, msg:{}.\nBacktrace:\n{}", msg, backtrace))] + InfluxDbHandlerNoCause { msg: String, backtrace: Backtrace }, } define_result!(Error); diff --git a/server/src/handlers/influxdb.rs b/server/src/handlers/influxdb.rs index cb10fd7b4c..49e2f097e4 100644 --- a/server/src/handlers/influxdb.rs +++ b/server/src/handlers/influxdb.rs @@ -4,22 +4,32 @@ //! [1]: https://docs.influxdata.com/influxdb/v1.8/tools/api/#write-http-endpoint //! [2]: https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-http-endpoint -use std::{collections::HashMap, sync::Arc, time::Instant}; +use std::{ + collections::{BTreeMap, HashMap}, + sync::Arc, + time::Instant, +}; use bytes::Bytes; use ceresdbproto::storage::{ value, Field, FieldGroup, Tag, Value, WriteSeriesEntry, WriteTableRequest, }; -use common_types::{request_id::RequestId, time::Timestamp}; +use common_types::{ + column_schema::ColumnSchema, datum::Datum, record_batch::RecordBatch, request_id::RequestId, + schema::RecordSchema, time::Timestamp, +}; use common_util::error::BoxError; use handlers::{ - error::{InfluxDbHandler, Result}, + error::{InfluxDbHandlerNoCause, InfluxDbHandlerWithCause, Result}, query::QueryRequest, }; use influxdb_line_protocol::FieldValue; +use interpreters::interpreter::Output; use log::debug; use query_engine::executor::Executor as QueryExecutor; -use snafu::ResultExt; +use serde::Serialize; +use snafu::{ensure, ResultExt}; +use sql::influxql::planner::CERESDB_MEASUREMENT_COLUMN_NAME; use warp::{reject, reply, Rejection, Reply}; use crate::{ @@ -71,6 +81,266 @@ impl From for WriteRequest { pub type WriteResponse = (); +/// Influxql response organized in the same way with influxdb. 
+/// +/// The basic example: +/// ```json +/// {"results":[{"statement_id":0,"series":[{"name":"mymeas", +/// "columns":["time","myfield","mytag1","mytag2"], +/// "values":[["2017-03-01T00:16:18Z",33.1,null,null], +/// ["2017-03-01T00:17:18Z",12.4,"12","14"]]}]}]} +/// ``` +/// More details refer to: +/// https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-data-with-a-select-statement +#[derive(Debug, Serialize)] +pub struct InfluxqlResponse { + pub results: Vec, +} + +#[derive(Debug, Serialize)] +pub struct OneInfluxqlResult { + statement_id: u32, + #[serde(skip_serializing_if = "Option::is_none")] + series: Option>, +} + +#[derive(Debug, Serialize)] +struct OneSeries { + name: String, + #[serde(skip_serializing_if = "Option::is_none")] + tags: Option>, + columns: Vec, + values: Vec>, +} + +/// [InfluxqlResult] builder +#[derive(Default)] +pub struct InfluxqlResultBuilder { + /// Query id for [multiple queries](https://docs.influxdata.com/influxdb/v1.8/tools/api/#request-multiple-queries) + statement_id: u32, + + /// Schema of influxql query result + /// + /// Its format is like: + /// measurement | + /// tag_1..tag_n(columns in `group by`) | + /// time | + /// column_1..column_n(column in `projection` but not in `group by`) + column_schemas: Vec, + + /// Tags part in schema + group_by_tag_col_idxs: Vec, + + /// Columns part in schema(include `time`) + value_col_idxs: Vec, + + /// Mapping group key(`measurement` + `tag values`) to column values, + /// + /// NOTE: because tag keys in `group by` clause are same in each sub result, + /// we just use the `measurement` + `tag values` to distinguish them. + group_key_to_idx: HashMap, + + /// Column values grouped by [GroupKey] + value_groups: Vec, +} + +type Row = Vec; +type RowGroup = Vec; + +impl InfluxqlResultBuilder { + pub fn new(record_schema: &RecordSchema, statement_id: u32) -> Result { + let column_schemas = record_schema.columns().to_owned(); + ensure!( + !column_schemas.is_empty(), + InfluxDbHandlerNoCause { + msg: "empty schema", + } + ); + + // Find the tags part and columns part from schema. + let mut group_by_col_idxs = Vec::new(); + let mut value_col_idxs = Vec::new(); + + // The following index searching logic is derived from the fixed format + // described when introducing `column_schemas`. + let mut col_iter = column_schemas.iter().enumerate(); + // The first column may be measurement column in normal. + ensure!(col_iter.next().unwrap().1.name == CERESDB_MEASUREMENT_COLUMN_NAME, InfluxDbHandlerNoCause { + msg: format!("invalid schema whose first column is not measurement column, schema:{column_schemas:?}"), + }); + + // The group by tags will be placed after measurement and before time column. + let mut searching_group_by_tags = true; + for (idx, col) in col_iter { + if col.data_type.is_timestamp() { + searching_group_by_tags = false; + } + + if searching_group_by_tags { + group_by_col_idxs.push(idx); + } else { + value_col_idxs.push(idx); + } + } + + Ok(Self { + statement_id, + column_schemas, + group_by_tag_col_idxs: group_by_col_idxs, + value_col_idxs, + group_key_to_idx: HashMap::new(), + value_groups: Vec::new(), + }) + } + + pub fn add_record_batch(&mut self, record_batch: RecordBatch) -> Result<()> { + // Check schema's compatibility. 
+ ensure!( + record_batch.schema().columns() == self.column_schemas, + InfluxDbHandlerNoCause { + msg: format!( + "conflict schema, origin:{:?}, new:{:?}", + self.column_schemas, + record_batch.schema().columns() + ), + } + ); + + let row_num = record_batch.num_rows(); + for row_idx in 0..row_num { + // Get measurement + group by tags. + let group_key = self.extract_group_key(&record_batch, row_idx)?; + let value_group = self.extract_value_group(&record_batch, row_idx)?; + + let value_groups = if let Some(idx) = self.group_key_to_idx.get(&group_key) { + self.value_groups.get_mut(*idx).unwrap() + } else { + self.value_groups.push(Vec::new()); + self.group_key_to_idx + .insert(group_key, self.value_groups.len() - 1); + self.value_groups.last_mut().unwrap() + }; + + value_groups.push(value_group); + } + + Ok(()) + } + + pub fn build(self) -> OneInfluxqlResult { + let ordered_group_keys = { + let mut ordered_pairs = self.group_key_to_idx.into_iter().collect::>(); + ordered_pairs.sort_by(|a, b| a.1.cmp(&b.1)); + ordered_pairs + .into_iter() + .map(|(key, _)| key) + .collect::>() + }; + + let series = ordered_group_keys + .into_iter() + .zip(self.value_groups.into_iter()) + .map(|(group_key, value_group)| { + let name = group_key.measurement; + let tags = if group_key.group_by_tag_values.is_empty() { + None + } else { + let tags = group_key + .group_by_tag_values + .into_iter() + .enumerate() + .map(|(tagk_idx, tagv)| { + let tagk_col_idx = self.group_by_tag_col_idxs[tagk_idx]; + let tagk = self.column_schemas[tagk_col_idx].name.clone(); + + (tagk, tagv) + }) + .collect::>(); + + Some(tags) + }; + + let columns = self + .value_col_idxs + .iter() + .map(|idx| self.column_schemas[*idx].name.clone()) + .collect::>(); + + OneSeries { + name, + tags, + columns, + values: value_group, + } + }) + .collect(); + + OneInfluxqlResult { + series: Some(series), + statement_id: self.statement_id, + } + } + + fn extract_group_key(&self, record_batch: &RecordBatch, row_idx: usize) -> Result { + let mut group_by_tag_values = Vec::with_capacity(self.group_by_tag_col_idxs.len()); + let measurement = { + let measurement = record_batch.column(0).datum(row_idx); + match measurement { + Datum::String(m) => m.to_string(), + other => { + return InfluxDbHandlerNoCause { + msg: format!("invalid measurement column, column:{other:?}"), + } + .fail() + } + } + }; + + for col_idx in &self.group_by_tag_col_idxs { + let tag = { + let tag_datum = record_batch.column(*col_idx).datum(row_idx); + match tag_datum { + Datum::Null => "".to_string(), + Datum::String(tag) => tag.to_string(), + other => { + return InfluxDbHandlerNoCause { + msg: format!("invalid tag column, column:{other:?}"), + } + .fail() + } + } + }; + group_by_tag_values.push(tag); + } + + Ok(GroupKey { + measurement, + group_by_tag_values, + }) + } + + fn extract_value_group( + &self, + record_batch: &RecordBatch, + row_idx: usize, + ) -> Result> { + let mut value_group = Vec::with_capacity(self.value_col_idxs.len()); + for col_idx in &self.value_col_idxs { + let value = record_batch.column(*col_idx).datum(row_idx); + + value_group.push(value); + } + + Ok(value_group) + } +} + +#[derive(Hash, PartialEq, Eq, Clone)] +struct GroupKey { + measurement: String, + group_by_tag_values: Vec, +} + impl InfluxDb { pub fn new(instance: InstanceRef, schema_config_provider: SchemaConfigProviderRef) -> Self { Self { @@ -79,14 +349,15 @@ impl InfluxDb { } } - async fn query( - &self, - ctx: RequestContext, - req: QueryRequest, - ) -> Result { - handlers::query::handle_query(&ctx, 
self.instance.clone(), req) + async fn query(&self, ctx: RequestContext, req: QueryRequest) -> Result { + let output = handlers::query::handle_query(&ctx, self.instance.clone(), req) .await - .map(handlers::query::convert_output) + .box_err() + .context(InfluxDbHandlerWithCause { + msg: "failed to query by influxql", + })?; + + convert_influxql_output(output) } async fn write(&self, ctx: RequestContext, req: WriteRequest) -> Result { @@ -99,7 +370,7 @@ impl InfluxDb { .schema_config_provider .schema_config(schema) .box_err() - .with_context(|| InfluxDbHandler { + .with_context(|| InfluxDbHandlerWithCause { msg: format!("get schema config failed, schema:{schema}"), })?; @@ -114,7 +385,7 @@ impl InfluxDb { ) .await .box_err() - .with_context(|| InfluxDbHandler { + .with_context(|| InfluxDbHandlerWithCause { msg: "write request to insert plan", })?; @@ -130,7 +401,7 @@ impl InfluxDb { ) .await .box_err() - .with_context(|| InfluxDbHandler { + .with_context(|| InfluxDbHandlerWithCause { msg: "execute plan", })?; } @@ -147,7 +418,7 @@ fn convert_write_request(req: WriteRequest) -> Result> { let mut req_by_measurement = HashMap::new(); let default_ts = Timestamp::now().as_i64(); for line in influxdb_line_protocol::parse_lines(&req.lines) { - let mut line = line.box_err().with_context(|| InfluxDbHandler { + let mut line = line.box_err().with_context(|| InfluxDbHandlerWithCause { msg: "invalid line", })?; @@ -229,6 +500,39 @@ fn convert_influx_value(field_value: FieldValue) -> Value { Value { value: Some(v) } } +fn convert_influxql_output(output: Output) -> Result { + // TODO: now, we just support one influxql in each query. + let records = match output { + Output::Records(records) => records, + Output::AffectedRows(_) => { + return InfluxDbHandlerNoCause { + msg: "output in influxql should not be affected rows", + } + .fail() + } + }; + + let influxql_result = if records.is_empty() { + OneInfluxqlResult { + statement_id: 0, + series: None, + } + } else { + // All record schemas in one query result should be same. + let record_schema = records.first().unwrap().schema(); + let mut builder = InfluxqlResultBuilder::new(record_schema, 0)?; + for record in records { + builder.add_record_batch(record)?; + } + + builder.build() + }; + + Ok(InfluxqlResponse { + results: vec![influxql_result], + }) +} + // TODO: Request and response type don't match influxdb's API now. 
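
A note on the empty-result branch of `convert_influxql_output` above: because `series` is marked `skip_serializing_if = "Option::is_none"`, an empty query serializes to a result object carrying only the statement id. A minimal sketch of the shape this produces, assuming it runs inside this module (the fields of `OneInfluxqlResult` are private) and using the `serde_json` crate that the tests below already rely on:

```rust
// Sketch only: `InfluxqlResponse` / `OneInfluxqlResult` come from this module.
let empty = InfluxqlResponse {
    results: vec![OneInfluxqlResult {
        statement_id: 0,
        series: None, // omitted from the JSON entirely
    }],
};
assert_eq!(
    serde_json::to_string(&empty).unwrap(),
    r#"{"results":[{"statement_id":0}]}"#
);
```
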
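`InfluxqlResultBuilder::add_record_batch` groups rows by `(measurement, group-by tag values)` while keeping groups in first-appearance order, by pairing a `HashMap` from key to index with a `Vec` of groups; `build` then recovers that order by sorting the map entries by index. A self-contained sketch of the same pattern with plain `String` keys (the function and types here are illustrative, not the actual ones):

```rust
use std::collections::HashMap;

/// Groups `(key, row)` pairs, keeping groups in the order keys first appear.
fn group_rows(rows: Vec<(String, Vec<String>)>) -> Vec<(String, Vec<Vec<String>>)> {
    let mut key_to_idx: HashMap<String, usize> = HashMap::new();
    let mut groups: Vec<(String, Vec<Vec<String>>)> = Vec::new();

    for (key, row) in rows {
        // Existing key: reuse its slot; new key: append a slot and remember it.
        let idx = *key_to_idx.entry(key.clone()).or_insert_with(|| {
            groups.push((key.clone(), Vec::new()));
            groups.len() - 1
        });
        groups[idx].1.push(row);
    }
    groups
}
```

The builder keeps the keys and the row groups in two parallel structures instead of one `Vec` of pairs, but the ordering guarantee is the same.
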
pub async fn query( ctx: RequestContext, @@ -255,16 +559,26 @@ pub async fn write( #[cfg(test)] mod tests { + use arrow::datatypes::{Field as ArrowField, Schema as ArrowSchema}; + use common_types::{ + column::{ColumnBlock, ColumnBlockBuilder}, + column_schema, + datum::DatumKind, + schema, + string::StringBytes, + }; + use json_pretty::PrettyFormatter; + use super::*; #[test] fn test_convert_influxdb_write_req() { let lines = r#" -demo,tag1=t1,tag2=t2 field1=90,field2=100 1678675992000 -demo,tag1=t1,tag2=t2 field1=91,field2=101 1678675993000 -demo,tag1=t11,tag2=t22 field1=900,field2=1000 1678675992000 -demo,tag1=t11,tag2=t22 field1=901,field2=1001 1678675993000 -"# + demo,tag1=t1,tag2=t2 field1=90,field2=100 1678675992000 + demo,tag1=t1,tag2=t2 field1=91,field2=101 1678675993000 + demo,tag1=t11,tag2=t22 field1=900,field2=1000 1678675992000 + demo,tag1=t11,tag2=t22 field1=901,field2=1001 1678675993000 + "# .to_string(); let req = WriteRequest { lines, @@ -366,4 +680,163 @@ demo,tag1=t11,tag2=t22 field1=901,field2=1001 1678675993000 } ); } + + #[test] + fn test_influxql_result() { + let record_schema = build_test_record_schema(); + let column_blocks = build_test_column_blocks(); + let record_batch = RecordBatch::new(record_schema, column_blocks).unwrap(); + + let mut builder = InfluxqlResultBuilder::new(record_batch.schema(), 0).unwrap(); + builder.add_record_batch(record_batch).unwrap(); + let iql_results = vec![builder.build()]; + let iql_response = InfluxqlResponse { + results: iql_results, + }; + let iql_result_json = + PrettyFormatter::from_str(&serde_json::to_string(&iql_response).unwrap()).pretty(); + let expected = PrettyFormatter::from_str(r#"{"results":[{"statement_id":0,"series":[{"name":"m1","tags":{"tag":"tv1"}, + "columns":["time","field1","field2"],"values":[[10001,"fv1",1]]}, + {"name":"m1","tags":{"tag":"tv2"},"columns":["time","field1","field2"],"values":[[100002,"fv2",2]]}, + {"name":"m1","tags":{"tag":"tv3"},"columns":["time","field1","field2"],"values":[[10003,"fv3",3]]}, + {"name":"m1","tags":{"tag":""},"columns":["time","field1","field2"],"values":[[10007,null,null]]}, + {"name":"m2","tags":{"tag":"tv4"},"columns":["time","field1","field2"],"values":[[10004,"fv4",4]]}, + {"name":"m2","tags":{"tag":"tv5"},"columns":["time","field1","field2"],"values":[[100005,"fv5",5]]}, + {"name":"m2","tags":{"tag":"tv6"},"columns":["time","field1","field2"],"values":[[10006,"fv6",6]]}]}]}"#).pretty(); + assert_eq!(expected, iql_result_json); + } + + fn build_test_record_schema() -> RecordSchema { + let schema = schema::Builder::new() + .auto_increment_column_id(true) + .add_key_column( + column_schema::Builder::new("time".to_string(), DatumKind::Timestamp) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("tag".to_string(), DatumKind::String) + .is_tag(true) + .is_nullable(true) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("field1".to_string(), DatumKind::String) + .is_nullable(true) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + // The data type of column is `UInt32`, and the type of default value expr is + // `Int64`. So we use this column to cover the test, which has + // different type. 
+ column_schema::Builder::new("field2".to_string(), DatumKind::UInt64) + .is_nullable(true) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .build() + .unwrap(); + + // Record schema + let arrow_schema = schema.to_arrow_schema_ref(); + let fields = arrow_schema.fields.to_owned(); + let measurement_field = ArrowField::new( + "ceresdb::measurement".to_string(), + schema::DataType::Utf8, + false, + ); + let project_fields = vec![ + measurement_field, + fields[1].clone(), + fields[0].clone(), + fields[2].clone(), + fields[3].clone(), + ]; + let project_arrow_schema = Arc::new(ArrowSchema::new_with_metadata( + project_fields, + arrow_schema.metadata().clone(), + )); + + RecordSchema::try_from(project_arrow_schema).unwrap() + } + + fn build_test_column_blocks() -> Vec { + let mut measurement_builder = ColumnBlockBuilder::with_capacity(&DatumKind::String, 3); + let mut tag_builder = ColumnBlockBuilder::with_capacity(&DatumKind::String, 3); + let mut time_builder = ColumnBlockBuilder::with_capacity(&DatumKind::Timestamp, 3); + let mut field_builder1 = ColumnBlockBuilder::with_capacity(&DatumKind::String, 3); + let mut field_builder2 = ColumnBlockBuilder::with_capacity(&DatumKind::UInt64, 3); + + // Data in measurement1 + let measurement1 = Datum::String(StringBytes::copy_from_str("m1")); + let tags1 = vec!["tv1".to_string(), "tv2".to_string(), "tv3".to_string()] + .into_iter() + .map(|v| Datum::String(StringBytes::copy_from_str(v.as_str()))) + .collect::>(); + let times1 = vec![10001_i64, 100002, 10003] + .into_iter() + .map(|v| Datum::Timestamp(v.into())) + .collect::>(); + let fields1 = vec!["fv1".to_string(), "fv2".to_string(), "fv3".to_string()] + .into_iter() + .map(|v| Datum::String(StringBytes::copy_from_str(v.as_str()))) + .collect::>(); + let fields2 = vec![1_u64, 2, 3] + .into_iter() + .map(Datum::UInt64) + .collect::>(); + + let measurement2 = Datum::String(StringBytes::copy_from_str("m2")); + let tags2 = vec!["tv4".to_string(), "tv5".to_string(), "tv6".to_string()] + .into_iter() + .map(|v| Datum::String(StringBytes::copy_from_str(v.as_str()))) + .collect::>(); + let times2 = vec![10004_i64, 100005, 10006] + .into_iter() + .map(|v| Datum::Timestamp(v.into())) + .collect::>(); + let fields3 = vec!["fv4".to_string(), "fv5".to_string(), "fv6".to_string()] + .into_iter() + .map(|v| Datum::String(StringBytes::copy_from_str(v.as_str()))) + .collect::>(); + let fields4 = vec![4_u64, 5, 6] + .into_iter() + .map(Datum::UInt64) + .collect::>(); + + for idx in 0..3 { + measurement_builder.append(measurement1.clone()).unwrap(); + tag_builder.append(tags1[idx].clone()).unwrap(); + time_builder.append(times1[idx].clone()).unwrap(); + field_builder1.append(fields1[idx].clone()).unwrap(); + field_builder2.append(fields2[idx].clone()).unwrap(); + } + measurement_builder.append(measurement1).unwrap(); + tag_builder.append(Datum::Null).unwrap(); + time_builder.append(Datum::Timestamp(10007.into())).unwrap(); + field_builder1.append(Datum::Null).unwrap(); + field_builder2.append(Datum::Null).unwrap(); + + for idx in 0..3 { + measurement_builder.append(measurement2.clone()).unwrap(); + tag_builder.append(tags2[idx].clone()).unwrap(); + time_builder.append(times2[idx].clone()).unwrap(); + field_builder1.append(fields3[idx].clone()).unwrap(); + field_builder2.append(fields4[idx].clone()).unwrap(); + } + + vec![ + measurement_builder.build(), + tag_builder.build(), + time_builder.build(), + field_builder1.build(), + field_builder2.build(), + ] + } } diff --git 
a/server/src/handlers/query.rs b/server/src/handlers/query.rs
index cfdb7a644f..049cf3eec3 100644
--- a/server/src/handlers/query.rs
+++ b/server/src/handlers/query.rs
@@ -113,7 +113,7 @@ pub enum QueryRequest {
     Influxql(Request),
 }
 impl QueryRequest {
-    fn query(&self) -> &str {
+    pub fn query(&self) -> &str {
         match self {
             QueryRequest::Sql(request) => request.query.as_str(),
             QueryRequest::Influxql(request) => request.query.as_str(),
diff --git a/sql/src/influxql/planner.rs b/sql/src/influxql/planner.rs
index f4dcbc73ce..7ce67347dd 100644
--- a/sql/src/influxql/planner.rs
+++ b/sql/src/influxql/planner.rs
@@ -19,7 +19,7 @@ use crate::{
     provider::{ContextProviderAdapter, MetaProvider},
 };
 
-const CERESDB_MEASUREMENT_COLUMN_NAME: &str = "ceresdb::measurement";
+pub const CERESDB_MEASUREMENT_COLUMN_NAME: &str = "ceresdb::measurement";
 
 pub(crate) struct Planner<'a, P: MetaProvider> {
     context_provider: ContextProviderAdapter<'a, P>,
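
The new `json_pretty` dev-dependency is used only by `test_influxql_result` to normalize both the serialized response and the hand-written expectation before comparing them, so the expected literal can be wrapped across several source lines. A reduced sketch of that comparison, assuming `PrettyFormatter` normalizes insignificant whitespace (which is what the test relies on):

```rust
use json_pretty::PrettyFormatter;

// Compact output (as produced by `serde_json::to_string`) vs. a wrapped,
// hand-written expectation.
let actual = r#"{"results":[{"statement_id":0}]}"#;
let expected = r#"{
    "results": [ { "statement_id": 0 } ]
}"#;

// Pretty-printing both sides makes the assertion whitespace-insensitive.
assert_eq!(
    PrettyFormatter::from_str(expected).pretty(),
    PrettyFormatter::from_str(actual).pretty()
);
```
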
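Splitting `InfluxDbHandler` into `InfluxDbHandlerWithCause` and `InfluxDbHandlerNoCause` follows the usual snafu pattern: the first wraps an underlying boxed error attached via `.context(..)`, the second is raised directly with `ensure!` or `.fail()` and captures a backtrace. A condensed, self-contained sketch of the two construction styles, assuming the snafu version used in this repo (where a variant's context selector shares the variant's name); `parse_port` is purely illustrative:

```rust
use snafu::{ensure, Backtrace, ResultExt, Snafu};

type GenericError = Box<dyn std::error::Error + Send + Sync>;

// Stand-ins for the two variants added to `handlers/error.rs`.
#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("InfluxDb handler failed, msg:{}, source:{}", msg, source))]
    InfluxDbHandlerWithCause { msg: String, source: GenericError },

    #[snafu(display("InfluxDb handler failed, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
    InfluxDbHandlerNoCause { msg: String, backtrace: Backtrace },
}

fn parse_port(raw: &str) -> Result<u16, Error> {
    // No underlying error to wrap: `ensure!` builds the no-cause variant.
    ensure!(!raw.is_empty(), InfluxDbHandlerNoCause { msg: "empty port" });

    // An underlying error exists: box it and attach it as the cause.
    raw.parse::<u16>()
        .map_err(|e| Box::new(e) as GenericError)
        .context(InfluxDbHandlerWithCause {
            msg: format!("invalid port:{raw}"),
        })
}
```
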
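Making `QueryRequest::query` and `CERESDB_MEASUREMENT_COLUMN_NAME` public is what lets the server-side handler verify that the first column of the planner's projection is the synthetic measurement column. A trivial sketch of the cross-crate use this enables (the helper is illustrative; the import path is the one already used in `handlers/influxdb.rs`):

```rust
use sql::influxql::planner::CERESDB_MEASUREMENT_COLUMN_NAME;

/// Illustrative helper: `InfluxqlResultBuilder::new` performs this check on
/// the first column of the InfluxQL projection schema.
fn is_measurement_column(column_name: &str) -> bool {
    column_name == CERESDB_MEASUREMENT_COLUMN_NAME
}
```
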