Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: preserve column case when building plan #901

Merged
merged 3 commits into from
May 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions integration_tests/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,21 @@ build-ceresdb:
build-test:
cargo build

build: build-ceresdb build-test build-meta
build: build-ceresdb build-test
Rachelint marked this conversation as resolved.
Show resolved Hide resolved

kill-old-process:
killall ceresdb-server | true
killall ceresmeta | true

prepare: clean build kill-old-process

run: prepare
run: prepare build-meta
$(CERESDB_TEST_BINARY)

run-local: prepare
CERESDB_ENV_FILTER=local $(CERESDB_TEST_BINARY)

run-cluster: prepare
run-cluster: prepare build-meta
CERESDB_ENV_FILTER=cluster $(CERESDB_TEST_BINARY)

run-java:
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/build_meta.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env bash

set -exo
set -e

SRC=/tmp/ceresmeta-src
TARGET=$(pwd)/ceresmeta
Expand Down
28 changes: 24 additions & 4 deletions integration_tests/prom/remote-query.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def now():
return int(time.time()) * 1000

table = 'prom_remote_query_test' + str(now())
table2 = 'PROM_REMOTE_QUERY_TEST' + str(now())

def execute_sql(sql):
r = requests.post('{}/sql'.format(api_root), json={'query': sql}, headers=headers)
Expand All @@ -25,25 +26,34 @@ def execute_pql(pql):
return r.json()

def prepare_data(ts):
execute_sql("""
for t in [table, table2]:
execute_sql("""
CREATE TABLE if not exists `{}` (
`t` timestamp NOT NULL,
`tag1` string TAG,
`tag2` string TAG,
`value` double NOT NULL,
`value2` double NOT NULL,
`VALUE2` double NOT NULL,
timestamp KEY (t)
);
""".format(table))
""".format(t))

execute_sql("""
insert into {}(t, tag1, tag2, value, value2)
insert into {}(t, tag1, tag2, value, VALUE2)
values
({}, "v1", "v2", 1, 2),
({}, "v1", "v2", 11, 22)
;
""".format(table, ts-5000, ts))

execute_sql("""
insert into {}(t, tag1, tag2, value, VALUE2)
values
({}, "v1", "v2", 10, 20),
({}, "v1", "v2", 110, 220)
;
""".format(table2, ts-5000, ts))


def remote_query(ts):
ts = ts/1000 # prom return seconds
Expand All @@ -64,6 +74,16 @@ def remote_query(ts):
result = r['data']['result']
assert result == []

# uppercase field
r = execute_pql(table + '{tag1="v1",__ceresdb_field__="VALUE2"}[5m]')
result = r['data']['result']
assert result == [{'metric': {'__name__': table, 'tag1': 'v1', 'tag2': 'v2'}, 'values': [[ts-5, '2'], [ts, '22']]}]

# uppercase table
r = execute_pql(table2 + '{tag1="v1"}[5m]')
result = r['data']['result']
assert result == [{'metric': {'__name__': table2, 'tag1': 'v1', 'tag2': 'v2'}, 'values': [[ts-5, '10'], [ts, '110']]}]

def main():
ts = now()
prepare_data(ts)
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/prom/run-tests.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env bash

VERSION=prometheus-2.43.0.linux-amd64
wget "https://github.com/prometheus/prometheus/releases/download/v2.43.0/${VERSION}.tar.gz"
wget -q "https://github.com/prometheus/prometheus/releases/download/v2.43.0/${VERSION}.tar.gz"

tar xvf prometheus*.tar.gz
nohup ./${VERSION}/prometheus --config.file ./prometheus.yml &
Expand Down
38 changes: 21 additions & 17 deletions query_frontend/src/promql/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ use common_types::{
};
use datafusion::{
logical_expr::{
avg, col, count, lit,
avg, count, lit,
logical_plan::{Extension, LogicalPlan, LogicalPlanBuilder},
max, min, sum, Expr as DataFusionExpr,
},
optimizer::utils::conjunction,
prelude::ident,
sql::planner::ContextProvider,
};
use snafu::{ensure, OptionExt, ResultExt};
Expand Down Expand Up @@ -286,9 +287,12 @@ impl Expr {
};
let aggr_expr =
Self::aggr_op_expr(&op, &column_name.field, column_name.field.clone())?;
let tag_exprs = groupby_columns.iter().map(|v| col(*v)).collect::<Vec<_>>();
let tag_exprs = groupby_columns
.iter()
.map(|v| ident(*v))
.collect::<Vec<_>>();
let udf_args = tag_exprs.clone();
let mut groupby_expr = vec![col(&column_name.timestamp)];
let mut groupby_expr = vec![ident(&column_name.timestamp)];
groupby_expr.extend(udf_args);
let unique_id_expr =
// TSID is lost after aggregate, but PromAlignNode need a unique id, so
Expand All @@ -302,16 +306,16 @@ impl Expr {
);
let mut projection = tag_exprs.clone();
projection.extend(vec![
col(&column_name.timestamp),
col(&column_name.field),
ident(&column_name.timestamp),
ident(&column_name.field),
unique_id_expr.clone(),
]);
let sort_exprs = if tag_exprs.is_empty() {
vec![col(&column_name.timestamp).sort(true, true)]
vec![ident(&column_name.timestamp).sort(true, true)]
} else {
vec![
unique_id_expr.sort(true, true),
col(&column_name.timestamp).sort(true, true),
ident(&column_name.timestamp).sort(true, true),
]
};
let builder = LogicalPlanBuilder::from(sub_plan);
Expand All @@ -333,11 +337,11 @@ impl Expr {

fn aggr_op_expr(aggr_op: &str, field: &str, alias: String) -> Result<DataFusionExpr> {
let expr = match aggr_op {
"sum" => sum(col(field)),
"max" => max(col(field)),
"min" => min(col(field)),
"count" => count(col(field)),
"avg" => avg(col(field)),
"sum" => sum(ident(field)),
"max" => max(ident(field)),
"min" => min(ident(field)),
"count" => count(ident(field)),
"avg" => avg(ident(field)),
_ => {
return InvalidExpr {
msg: format!("aggr {aggr_op} not supported now"),
Expand Down Expand Up @@ -473,7 +477,7 @@ pub struct Filter {

impl From<Filter> for DataFusionExpr {
fn from(mut f: Filter) -> DataFusionExpr {
let tag_key = col(&f.tag_key);
let tag_key = ident(&f.tag_key);
// TODO(chenxiang): only compare first op now
let mut first_op = f.operators.remove(0);
match first_op.typ {
Expand Down Expand Up @@ -608,21 +612,21 @@ impl Selector {
.filter_map(|column| {
if column.is_tag {
tag_keys.push(column.name.clone());
Some(col(&column.name))
Some(ident(&column.name))
} else {
None
}
})
.collect::<Vec<_>>();

let timestamp_expr = col(&schema.column(schema.timestamp_index()).name);
let timestamp_expr = ident(&schema.column(schema.timestamp_index()).name);
let tsid_expr = schema
.tsid_column()
.map(|c| col(&c.name))
.map(|c| ident(&c.name))
.context(InvalidExpr {
msg: format!("{TSID_COLUMN} not found"),
})?;
let field_expr = col(field);
let field_expr = ident(field);
projection.extend(vec![timestamp_expr, tsid_expr, field_expr]);

Ok((projection, tag_keys))
Expand Down
9 changes: 1 addition & 8 deletions query_frontend/src/promql/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,8 @@ pub fn remote_query_to_plan<P: MetaProvider>(
) -> Result<RemoteQueryPlan> {
let (metric, field, mut filters) = normalize_matchers(query.matchers)?;

// get table schema
let table_ref = meta_provider
.table(TableReference::parse_str(&metric))
.context(MetaProviderError {
msg: format!("Failed to find table, name:{metric}"),
})?
.context(TableNotFound { name: &metric })?;
let table_provider = meta_provider
.get_table_provider(table_ref.name().into())
.get_table_provider(TableReference::bare(&metric))
.context(TableProviderNotFound { name: &metric })?;
let schema = Schema::try_from(table_provider.schema()).context(BuildTableSchema)?;
let timestamp_col_name = schema.timestamp_name();
Expand Down