Skip to content

Commit

Permalink
fix: add sqlness tests for some promql function (#1838)
Browse files Browse the repository at this point in the history
* correct range manipulate exec fmt text

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix partition requirement

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix udf signature

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* finilise

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* ignore unstable ordered result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add nan value test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
  • Loading branch information
waynexia authored Jun 27, 2023
1 parent 99f0479 commit b737a24
Show file tree
Hide file tree
Showing 12 changed files with 388 additions and 28 deletions.
2 changes: 1 addition & 1 deletion src/promql/src/extension_plan/range_manipulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ impl ExecutionPlan for RangeManipulateExec {
DisplayFormatType::Default => {
write!(
f,
"PromInstantManipulateExec: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}]",
"PromRangeManipulateExec: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}]",
self.start, self.end, self.interval, self.range, self.time_index_column
)
}
Expand Down
19 changes: 15 additions & 4 deletions src/promql/src/extension_plan/series_divide.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogi
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
Statistics,
DisplayFormatType, Distribution, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
use datatypes::arrow::compute;
use futures::{ready, Stream, StreamExt};
Expand Down Expand Up @@ -129,9 +129,15 @@ impl ExecutionPlan for SeriesDivideExec {
}

fn output_partitioning(&self) -> Partitioning {
self.input.output_partitioning()
Partitioning::UnknownPartitioning(1)
}

fn required_input_distribution(&self) -> Vec<Distribution> {
vec![Distribution::SinglePartition]
}

// TODO(ruihang): specify required input ordering

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
self.input.output_ordering()
}
Expand Down Expand Up @@ -229,7 +235,7 @@ impl Stream for SeriesDivideStream {
loop {
if let Some(batch) = self.buffer.clone() {
let same_length = self.find_first_diff_row(&batch) + 1;
if same_length == batch.num_rows() {
if same_length >= batch.num_rows() {
let next_batch = match ready!(self.as_mut().fetch_next_batch(cx)) {
Some(Ok(batch)) => batch,
None => {
Expand Down Expand Up @@ -277,6 +283,11 @@ impl SeriesDivideStream {
}

fn find_first_diff_row(&self, batch: &RecordBatch) -> usize {
// fast path: no tag columns means all data belongs to the same series.
if self.tag_indices.is_empty() {
return batch.num_rows();
}

let num_rows = batch.num_rows();
let mut result = num_rows;

Expand Down
3 changes: 1 addition & 2 deletions src/promql/src/functions/quantile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ impl QuantileOverTime {

fn calc(&self, input: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
// construct matrix from input.
// The third one is quantile param, which is included in fields.
assert_eq!(input.len(), 3);
assert_eq!(input.len(), 2);
let ts_array = extract_array(&input[0])?;
let value_array = extract_array(&input[1])?;

Expand Down
27 changes: 14 additions & 13 deletions src/promql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeSet, HashSet};
use std::collections::{BTreeSet, HashSet, VecDeque};
use std::str::FromStr;
use std::sync::Arc;
use std::time::UNIX_EPOCH;
Expand Down Expand Up @@ -833,9 +833,10 @@ impl PromPlanner {
fn create_function_expr(
&mut self,
func: &Function,
mut other_input_exprs: Vec<DfExpr>,
other_input_exprs: Vec<DfExpr>,
) -> Result<Vec<DfExpr>> {
// TODO(ruihang): check function args list
let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();

// TODO(ruihang): set this according to in-param list
let field_column_pos = 0;
Expand Down Expand Up @@ -865,8 +866,8 @@ impl PromPlanner {
"stddev_over_time" => ScalarFunc::Udf(StddevOverTime::scalar_udf()),
"stdvar_over_time" => ScalarFunc::Udf(StdvarOverTime::scalar_udf()),
"quantile_over_time" => {
let quantile_expr = match other_input_exprs.get(0) {
Some(DfExpr::Literal(ScalarValue::Float64(Some(quantile)))) => *quantile,
let quantile_expr = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Float64(Some(quantile)))) => quantile,
other => UnexpectedPlanExprSnafu {
desc: format!("expect f64 literal as quantile, but found {:?}", other),
}
Expand All @@ -875,8 +876,8 @@ impl PromPlanner {
ScalarFunc::Udf(QuantileOverTime::scalar_udf(quantile_expr))
}
"predict_linear" => {
let t_expr = match other_input_exprs.get(0) {
Some(DfExpr::Literal(ScalarValue::Time64Microsecond(Some(t)))) => *t,
let t_expr = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Time64Microsecond(Some(t)))) => t,
other => UnexpectedPlanExprSnafu {
desc: format!("expect i64 literal as t, but found {:?}", other),
}
Expand All @@ -885,8 +886,8 @@ impl PromPlanner {
ScalarFunc::Udf(PredictLinear::scalar_udf(t_expr))
}
"holt_winters" => {
let sf_exp = match other_input_exprs.get(0) {
Some(DfExpr::Literal(ScalarValue::Float64(Some(sf)))) => *sf,
let sf_exp = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Float64(Some(sf)))) => sf,
other => UnexpectedPlanExprSnafu {
desc: format!(
"expect f64 literal as smoothing factor, but found {:?}",
Expand All @@ -895,8 +896,8 @@ impl PromPlanner {
}
.fail()?,
};
let tf_exp = match other_input_exprs.get(1) {
Some(DfExpr::Literal(ScalarValue::Float64(Some(tf)))) => *tf,
let tf_exp = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Float64(Some(tf)))) => tf,
other => UnexpectedPlanExprSnafu {
desc: format!("expect f64 literal as trend factor, but found {:?}", other),
}
Expand Down Expand Up @@ -924,7 +925,7 @@ impl PromPlanner {
other_input_exprs.insert(field_column_pos, col_expr);
let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
fun,
args: other_input_exprs.clone(),
args: other_input_exprs.clone().into(),
});
exprs.push(fn_expr);
let _ = other_input_exprs.remove(field_column_pos);
Expand All @@ -939,7 +940,7 @@ impl PromPlanner {
other_input_exprs.insert(field_column_pos + 1, col_expr);
let fn_expr = DfExpr::ScalarUDF(ScalarUDF {
fun: Arc::new(fun),
args: other_input_exprs.clone(),
args: other_input_exprs.clone().into(),
});
exprs.push(fn_expr);
let _ = other_input_exprs.remove(field_column_pos + 1);
Expand All @@ -957,7 +958,7 @@ impl PromPlanner {
.insert(field_column_pos + 2, self.create_time_index_column_expr()?);
let fn_expr = DfExpr::ScalarUDF(ScalarUDF {
fun: Arc::new(fun),
args: other_input_exprs.clone(),
args: other_input_exprs.clone().into(),
});
exprs.push(fn_expr);
let _ = other_input_exprs.remove(field_column_pos + 2);
Expand Down
2 changes: 1 addition & 1 deletion tests/cases/distributed/tql-explain-analyze/analyze.result
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ TQL ANALYZE (0, 10, '5s') test;
| Plan with Metrics | CoalescePartitionsExec, REDACTED
|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j], REDACTED
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED
|_|_PromSeriesDivideExec: tags=["k"], REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_PromSeriesDivideExec: tags=["k"], REDACTED
|_|_MergeScanExec: peers=[REDACTED
|_|_|
+-+-+
Expand Down
8 changes: 4 additions & 4 deletions tests/cases/distributed/tql-explain-analyze/explain.result
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ TQL EXPLAIN (0, 10, '5s') test;
| | TableScan: test projection=[i, j, k], partial_filters=[j >= TimestampMillisecond(-300000, None), j <= TimestampMillisecond(300000, None)] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["k"] |
| | RepartitionExec: partitioning=REDACTED
| | RepartitionExec: partitioning=REDACTED
| | PromSeriesDivideExec: tags=["k"] |
| | MergeScanExec: peers=[REDACTED
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+
Expand Down Expand Up @@ -58,8 +58,8 @@ TQL EXPLAIN host_load1{__field__="val"};
| | TableScan: host_load1 projection=[ts, collector, host, val], partial_filters=[ts >= TimestampMillisecond(-300000, None), ts <= TimestampMillisecond(300000, None)] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[ts] |
| | PromSeriesNormalizeExec: offset=[0], time index=[ts], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["collector", "host"] |
| | RepartitionExec: partitioning=REDACTED
| | RepartitionExec: partitioning=REDACTED
| | PromSeriesDivideExec: tags=["collector", "host"] |
| | ProjectionExec: expr=[val@3 as val, collector@1 as collector, host@2 as host, ts@0 as ts] |
| | MergeScanExec: peers=[REDACTED
| | |
Expand Down
31 changes: 31 additions & 0 deletions tests/cases/standalone/common/insert/special_value.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
create table data (ts timestamp(3) time index, val double);

Affected Rows: 0

insert into data values
(0, 'infinity'::double),
(1, '-infinity'::double),
(2, 'nan'::double),
(3, 'NaN'::double);

Affected Rows: 4

select * from data;

+-------------------------+------+
| ts | val |
+-------------------------+------+
| 1970-01-01T00:00:00 | inf |
| 1970-01-01T00:00:00.001 | -inf |
| 1970-01-01T00:00:00.002 | NaN |
| 1970-01-01T00:00:00.003 | NaN |
+-------------------------+------+

insert into data values (4, 'infinityyyy'::double);

Error: 3001(EngineExecuteQuery), Cast error: Cannot cast string 'infinityyyy' to value of Float64 type

drop table data;

Affected Rows: 1

13 changes: 13 additions & 0 deletions tests/cases/standalone/common/insert/special_value.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
create table data (ts timestamp(3) time index, val double);

insert into data values
(0, 'infinity'::double),
(1, '-infinity'::double),
(2, 'nan'::double),
(3, 'NaN'::double);

select * from data;

insert into data values (4, 'infinityyyy'::double);

drop table data;
132 changes: 132 additions & 0 deletions tests/cases/standalone/common/tql/aggr_over_time.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
create table metric (ts timestamp(3) time index, val double);

Affected Rows: 0

insert into metric values
(0,0),
(10000,8),
(20000,8),
(30000,2),
(40000,3);

Affected Rows: 5

select * from metric;

+---------------------+-----+
| ts | val |
+---------------------+-----+
| 1970-01-01T00:00:00 | 0.0 |
| 1970-01-01T00:00:10 | 8.0 |
| 1970-01-01T00:00:20 | 8.0 |
| 1970-01-01T00:00:30 | 2.0 |
| 1970-01-01T00:00:40 | 3.0 |
+---------------------+-----+

tql eval (60, 61, '10s') stdvar_over_time(metric[1m]);

+---------------------+-------------------------------------+
| ts | prom_stdvar_over_time(ts_range,val) |
+---------------------+-------------------------------------+
| 1970-01-01T00:01:00 | 10.559999999999999 |
+---------------------+-------------------------------------+

tql eval (60, 60, '1s') stddev_over_time(metric[1m]);

+---------------------+-------------------------------------+
| ts | prom_stddev_over_time(ts_range,val) |
+---------------------+-------------------------------------+
| 1970-01-01T00:01:00 | 3.249615361854384 |
+---------------------+-------------------------------------+

tql eval (60, 60, '1s') stddev_over_time((metric[1m]));

+---------------------+-------------------------------------+
| ts | prom_stddev_over_time(ts_range,val) |
+---------------------+-------------------------------------+
| 1970-01-01T00:01:00 | 3.249615361854384 |
+---------------------+-------------------------------------+

drop table metric;

Affected Rows: 1

create table metric (ts timestamp(3) time index, val double);

Affected Rows: 0

insert into metric values
(0,0),
(10000,1.5990505637277868),
(20000,1.5990505637277868),
(30000,1.5990505637277868);

Affected Rows: 4

tql eval (60, 60, '1s') stdvar_over_time(metric[1m]);

+---------------------+-------------------------------------+
| ts | prom_stdvar_over_time(ts_range,val) |
+---------------------+-------------------------------------+
| 1970-01-01T00:01:00 | 0.47943050725465364 |
+---------------------+-------------------------------------+

tql eval (60, 60, '1s') stddev_over_time(metric[1m]);

+---------------------+-------------------------------------+
| ts | prom_stddev_over_time(ts_range,val) |
+---------------------+-------------------------------------+
| 1970-01-01T00:01:00 | 0.6924092050620454 |
+---------------------+-------------------------------------+

drop table metric;

Affected Rows: 1

create table data (ts timestamp(3) time index, val double, test string primary key);

Affected Rows: 0

insert into data values
(0, 0, "two samples"),
(10000, 1, "two samples"),
(0, 0, "three samples"),
(10000, 1, "three samples"),
(20000, 2, "three samples"),
(0, 0, "uneven samples"),
(10000, 1, "uneven samples"),
(20000, 4, "uneven samples");

Affected Rows: 8

drop table data;

Affected Rows: 1

create table data (ts timestamp(3) time index, val double, ty string primary key);

Affected Rows: 0

insert into data values
(0, 2::double, 'numbers'),
(10000, 0::double, 'numbers'),
(20000, 3::double, 'numbers'),
(0, 2::double, 'some_nan'),
(10000, 0::double, 'some_nan'),
(20000, 'NaN'::double, 'some_nan'),
(0, 2::double, 'some_nan2'),
(10000, 'NaN'::double, 'some_nan2'),
(20000, 1::double, 'some_nan2'),
(0, 'NaN'::double, 'some_nan3'),
(10000, 0::double, 'some_nan3'),
(20000, 1::double, 'some_nan3'),
(0, 'NaN'::double, 'only_nan'),
(10000, 'NaN'::double, 'only_nan'),
(20000, 'NaN'::double, 'only_nan');

Affected Rows: 15

drop table data;

Affected Rows: 1

Loading

0 comments on commit b737a24

Please sign in to comment.