diff --git a/integration_tests/Makefile b/integration_tests/Makefile
index 0ce460ecfa..49ff4ba9c6 100644
--- a/integration_tests/Makefile
+++ b/integration_tests/Makefile
@@ -37,7 +37,7 @@ build-ceresdb:
 build-test:
 	cargo build
 
-build: build-ceresdb build-test build-meta
+build: build-ceresdb build-test
 
 kill-old-process:
 	killall ceresdb-server | true
@@ -45,13 +45,13 @@ kill-old-process:
 
 prepare: clean build kill-old-process
 
-run: prepare
+run: prepare build-meta
 	$(CERESDB_TEST_BINARY)
 
 run-local: prepare
 	CERESDB_ENV_FILTER=local $(CERESDB_TEST_BINARY)
 
-run-cluster: prepare
+run-cluster: prepare build-meta
 	CERESDB_ENV_FILTER=cluster $(CERESDB_TEST_BINARY)
 
 run-java:
diff --git a/integration_tests/build_meta.sh b/integration_tests/build_meta.sh
index 48ac0c594f..ea0cc18783 100755
--- a/integration_tests/build_meta.sh
+++ b/integration_tests/build_meta.sh
@@ -1,6 +1,6 @@
 #!/usr/bin/env bash
 
-set -exo
+set -e
 
 SRC=/tmp/ceresmeta-src
 TARGET=$(pwd)/ceresmeta
diff --git a/integration_tests/prom/remote-query.py b/integration_tests/prom/remote-query.py
index 046d370600..8b2872dfd5 100755
--- a/integration_tests/prom/remote-query.py
+++ b/integration_tests/prom/remote-query.py
@@ -14,6 +14,7 @@ def now():
     return int(time.time()) * 1000
 
 table = 'prom_remote_query_test' + str(now())
+table2 = 'PROM_REMOTE_QUERY_TEST' + str(now())
 
 def execute_sql(sql):
     r = requests.post('{}/sql'.format(api_root), json={'query': sql}, headers=headers)
@@ -25,25 +26,34 @@ def execute_pql(pql):
     return r.json()
 
 def prepare_data(ts):
-    execute_sql("""
+    for t in [table, table2]:
+        execute_sql("""
 CREATE TABLE if not exists `{}` (
     `t` timestamp NOT NULL,
     `tag1` string TAG,
     `tag2` string TAG,
     `value` double NOT NULL,
-    `value2` double NOT NULL,
+    `VALUE2` double NOT NULL,
     timestamp KEY (t)
     );
-    """.format(table))
+    """.format(t))
 
     execute_sql("""
-insert into {}(t, tag1, tag2, value, value2)
+insert into {}(t, tag1, tag2, value, VALUE2)
 values
 ({}, "v1", "v2", 1, 2),
 ({}, "v1", "v2", 11, 22)
     ;
     """.format(table, ts-5000, ts))
 
+    execute_sql("""
+insert into {}(t, tag1, tag2, value, VALUE2)
+values
+({}, "v1", "v2", 10, 20),
+({}, "v1", "v2", 110, 220)
+    ;
+    """.format(table2, ts-5000, ts))
+
 
 def remote_query(ts):
     ts = ts/1000 # prom return seconds
@@ -64,6 +74,16 @@ def remote_query(ts):
     result = r['data']['result']
     assert result == []
 
+    # uppercase field
+    r = execute_pql(table + '{tag1="v1",__ceresdb_field__="VALUE2"}[5m]')
+    result = r['data']['result']
+    assert result == [{'metric': {'__name__': table, 'tag1': 'v1', 'tag2': 'v2'}, 'values': [[ts-5, '2'], [ts, '22']]}]
+
+    # uppercase table
+    r = execute_pql(table2 + '{tag1="v1"}[5m]')
+    result = r['data']['result']
+    assert result == [{'metric': {'__name__': table2, 'tag1': 'v1', 'tag2': 'v2'}, 'values': [[ts-5, '10'], [ts, '110']]}]
+
 def main():
     ts = now()
     prepare_data(ts)
diff --git a/integration_tests/prom/run-tests.sh b/integration_tests/prom/run-tests.sh
index c3cf6a2830..289185424d 100755
--- a/integration_tests/prom/run-tests.sh
+++ b/integration_tests/prom/run-tests.sh
@@ -1,7 +1,7 @@
 #!/usr/bin/env bash
 
 VERSION=prometheus-2.43.0.linux-amd64
-wget "https://github.com/prometheus/prometheus/releases/download/v2.43.0/${VERSION}.tar.gz"
+wget -q "https://github.com/prometheus/prometheus/releases/download/v2.43.0/${VERSION}.tar.gz"
 tar xvf prometheus*.tar.gz
 
 nohup ./${VERSION}/prometheus --config.file ./prometheus.yml &
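Note on the test above: `table2` exercises an all-uppercase table name, and both tables carry an uppercase `VALUE2` field that the queries select through the `__ceresdb_field__` matcher. For orientation, here is a minimal sketch of the matcher normalization these queries pass through. Only the `(metric, field, filters)` return shape is taken from `normalize_matchers` in `remote.rs` below; the `Matcher` type, the constants, and the body are illustrative guesses, not the actual CeresDB source.

```rust
/// Illustrative matcher type; the real one comes from the Prometheus
/// remote-read protobuf types.
struct Matcher {
    name: String,
    value: String,
}

const NAME_MATCHER: &str = "__name__";
const FIELD_MATCHER: &str = "__ceresdb_field__";
const DEFAULT_FIELD: &str = "value";

/// Split matchers into (table, field, remaining tag filters).
fn normalize_matchers(matchers: Vec<Matcher>) -> Result<(String, String, Vec<Matcher>), String> {
    let mut metric = None;
    let mut field = DEFAULT_FIELD.to_string();
    let mut filters = Vec::new();

    for m in matchers {
        match m.name.as_str() {
            // `__name__` selects the table; its case must be preserved for
            // PROM_REMOTE_QUERY_TEST-style uppercase tables.
            NAME_MATCHER => metric = Some(m.value),
            // `__ceresdb_field__` selects the value column verbatim,
            // e.g. the uppercase `VALUE2` used by the test above.
            FIELD_MATCHER => field = m.value,
            // Everything else stays an ordinary tag filter (tag1="v1", ...).
            _ => filters.push(m),
        }
    }

    let metric = metric.ok_or_else(|| "no __name__ matcher".to_string())?;
    Ok((metric, field, filters))
}

fn main() {
    let matchers = vec![
        Matcher { name: "__name__".into(), value: "PROM_REMOTE_QUERY_TEST".into() },
        Matcher { name: "__ceresdb_field__".into(), value: "VALUE2".into() },
        Matcher { name: "tag1".into(), value: "v1".into() },
    ];
    let (metric, field, filters) = normalize_matchers(matchers).unwrap();
    assert_eq!(
        (metric.as_str(), field.as_str(), filters.len()),
        ("PROM_REMOTE_QUERY_TEST", "VALUE2", 1)
    );
}
```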
diff --git a/query_frontend/src/promql/convert.rs b/query_frontend/src/promql/convert.rs
index cc1601f6d1..6d8231a045 100644
--- a/query_frontend/src/promql/convert.rs
+++ b/query_frontend/src/promql/convert.rs
@@ -12,11 +12,12 @@ use common_types::{
 };
 use datafusion::{
     logical_expr::{
-        avg, col, count, lit,
+        avg, count, lit,
         logical_plan::{Extension, LogicalPlan, LogicalPlanBuilder},
         max, min, sum, Expr as DataFusionExpr,
     },
     optimizer::utils::conjunction,
+    prelude::ident,
     sql::planner::ContextProvider,
 };
 use snafu::{ensure, OptionExt, ResultExt};
@@ -286,9 +287,12 @@ impl Expr {
         };
         let aggr_expr =
             Self::aggr_op_expr(&op, &column_name.field, column_name.field.clone())?;
-        let tag_exprs = groupby_columns.iter().map(|v| col(*v)).collect::<Vec<_>>();
+        let tag_exprs = groupby_columns
+            .iter()
+            .map(|v| ident(*v))
+            .collect::<Vec<_>>();
         let udf_args = tag_exprs.clone();
-        let mut groupby_expr = vec![col(&column_name.timestamp)];
+        let mut groupby_expr = vec![ident(&column_name.timestamp)];
         groupby_expr.extend(udf_args);
         let unique_id_expr =
             // TSID is lost after aggregate, but PromAlignNode need a unique id, so
@@ -302,16 +306,16 @@ impl Expr {
             );
         let mut projection = tag_exprs.clone();
         projection.extend(vec![
-            col(&column_name.timestamp),
-            col(&column_name.field),
+            ident(&column_name.timestamp),
+            ident(&column_name.field),
             unique_id_expr.clone(),
         ]);
         let sort_exprs = if tag_exprs.is_empty() {
-            vec![col(&column_name.timestamp).sort(true, true)]
+            vec![ident(&column_name.timestamp).sort(true, true)]
         } else {
             vec![
                 unique_id_expr.sort(true, true),
-                col(&column_name.timestamp).sort(true, true),
+                ident(&column_name.timestamp).sort(true, true),
             ]
         };
         let builder = LogicalPlanBuilder::from(sub_plan);
@@ -333,11 +337,11 @@ impl Expr {
 
     fn aggr_op_expr(aggr_op: &str, field: &str, alias: String) -> Result<DataFusionExpr> {
         let expr = match aggr_op {
-            "sum" => sum(col(field)),
-            "max" => max(col(field)),
-            "min" => min(col(field)),
-            "count" => count(col(field)),
-            "avg" => avg(col(field)),
+            "sum" => sum(ident(field)),
+            "max" => max(ident(field)),
+            "min" => min(ident(field)),
+            "count" => count(ident(field)),
+            "avg" => avg(ident(field)),
             _ => {
                 return InvalidExpr {
                     msg: format!("aggr {aggr_op} not supported now"),
@@ -473,7 +477,7 @@ pub struct Filter {
 
 impl From<Filter> for DataFusionExpr {
     fn from(mut f: Filter) -> DataFusionExpr {
-        let tag_key = col(&f.tag_key);
+        let tag_key = ident(&f.tag_key);
         // TODO(chenxiang): only compare first op now
         let mut first_op = f.operators.remove(0);
         match first_op.typ {
@@ -608,21 +612,21 @@ impl Selector {
             .filter_map(|column| {
                 if column.is_tag {
                     tag_keys.push(column.name.clone());
-                    Some(col(&column.name))
+                    Some(ident(&column.name))
                 } else {
                     None
                 }
             })
             .collect::<Vec<_>>();
 
-        let timestamp_expr = col(&schema.column(schema.timestamp_index()).name);
+        let timestamp_expr = ident(&schema.column(schema.timestamp_index()).name);
         let tsid_expr = schema
             .tsid_column()
-            .map(|c| col(&c.name))
+            .map(|c| ident(&c.name))
             .context(InvalidExpr {
                 msg: format!("{TSID_COLUMN} not found"),
             })?;
-        let field_expr = col(field);
+        let field_expr = ident(field);
 
         projection.extend(vec![timestamp_expr, tsid_expr, field_expr]);
         Ok((projection, tag_keys))
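The convert.rs change is a mechanical swap of DataFusion's `col` for `ident`, and the difference is exactly case handling: `col` parses its argument as a (possibly qualified) SQL identifier and normalizes unquoted parts to lowercase, while `ident` takes the string verbatim as a single unqualified column. A minimal standalone illustration, assuming the pinned DataFusion exposes both in its prelude (the import hunk above indicates it does):

```rust
use datafusion::prelude::{col, ident};

fn main() {
    // col() parses its argument as a SQL identifier; an unquoted name is
    // normalized to lowercase, so this expression refers to column "value2".
    let normalized = col("VALUE2");

    // ident() takes the string verbatim as one unqualified column name,
    // so the uppercase field name survives planning untouched.
    let verbatim = ident("VALUE2");

    // The Debug output shows the two different column names.
    println!("{normalized:?} vs {verbatim:?}");
}
```

With `col`, the test's uppercase `VALUE2` column would be looked up as `value2` and planning would fail; `ident` keeps the schema's exact spelling everywhere the planner builds projections, filters, sorts, and aggregates.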
diff --git a/query_frontend/src/promql/remote.rs b/query_frontend/src/promql/remote.rs
index fa0553e92e..fad2816b4e 100644
--- a/query_frontend/src/promql/remote.rs
+++ b/query_frontend/src/promql/remote.rs
@@ -46,15 +46,8 @@ pub fn remote_query_to_plan(
 ) -> Result<RemoteQueryPlan> {
     let (metric, field, mut filters) = normalize_matchers(query.matchers)?;
 
-    // get table schema
-    let table_ref = meta_provider
-        .table(TableReference::parse_str(&metric))
-        .context(MetaProviderError {
-            msg: format!("Failed to find table, name:{metric}"),
-        })?
-        .context(TableNotFound { name: &metric })?;
     let table_provider = meta_provider
-        .get_table_provider(table_ref.name().into())
+        .get_table_provider(TableReference::bare(&metric))
        .context(TableProviderNotFound { name: &metric })?;
     let schema = Schema::try_from(table_provider.schema()).context(BuildTableSchema)?;
     let timestamp_col_name = schema.timestamp_name();
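remote.rs previously resolved the metric through an extra `meta_provider.table(...)` lookup and then rebuilt the provider reference from `table_ref.name()`; routing the raw metric string through `TableReference::bare` removes the duplicate lookup and avoids any SQL-style parsing of the name along the way. A small sketch of the two constructors, assuming the `TableReference` API of the DataFusion version pinned by this PR:

```rust
use datafusion::sql::TableReference;

fn main() {
    let metric = "PROM_REMOTE_QUERY_TEST";

    // bare() takes the whole string verbatim as one table name:
    // no dot-splitting, no quote handling, case preserved.
    let by_name = TableReference::bare(metric);

    // parse_str(), used by the removed lookup path, treats its input as
    // SQL text: "a.b" is split into schema "a" plus table "b".
    let parsed = TableReference::parse_str("a.b");

    println!("{by_name:?} vs {parsed:?}");
}
```

Because `bare` never splits or re-quotes, an uppercase metric such as the test's `PROM_REMOTE_QUERY_TEST...` table reaches the meta provider unchanged.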