diff --git a/src/query/src/range_select/plan.rs b/src/query/src/range_select/plan.rs index e2af9b654d9e..c32164790b90 100644 --- a/src/query/src/range_select/plan.rs +++ b/src/query/src/range_select/plan.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::any::Any; +use std::cmp::Ordering; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::fmt::Display; @@ -21,8 +23,8 @@ use std::task::{Context, Poll}; use std::time::Duration; use ahash::RandomState; -use arrow::compute::{self, cast_with_options, CastOptions}; -use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit}; +use arrow::compute::{self, cast_with_options, CastOptions, SortColumn}; +use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions, TimeUnit}; use common_query::DfPhysicalPlan; use common_recordbatch::DfSendableRecordBatchStream; use datafusion::common::{Result as DataFusionResult, Statistics}; @@ -35,10 +37,14 @@ use datafusion::physical_plan::{ SendableRecordBatchStream, }; use datafusion::physical_planner::create_physical_sort_expr; -use datafusion_common::utils::get_arrayref_at_indices; +use datafusion_common::utils::{get_arrayref_at_indices, get_row_at_idx}; use datafusion_common::{DFField, DFSchema, DFSchemaRef, DataFusionError, ScalarValue}; -use datafusion_expr::utils::exprlist_to_fields; -use datafusion_expr::{Accumulator, Expr, ExprSchemable, LogicalPlan, UserDefinedLogicalNodeCore}; +use datafusion_expr::utils::{exprlist_to_fields, COUNT_STAR_EXPANSION}; +use datafusion_expr::{ + lit, Accumulator, AggregateFunction, Expr, ExprSchemable, LogicalPlan, + UserDefinedLogicalNodeCore, +}; +use datafusion_physical_expr::aggregate::utils::down_cast_any_ref; use datafusion_physical_expr::expressions::create_aggregate_expr as create_aggr_expr; use datafusion_physical_expr::hash_utils::create_hashes; use datafusion_physical_expr::{ @@ -58,6 +64,140 @@ use crate::error::{DataFusionSnafu, RangeQuerySnafu, Result}; type Millisecond = ::Native; +/// Implementation of `first_value`/`last_value` +/// aggregate function adapted to range query +#[derive(Debug)] +struct RangeFirstListValue { + /// calculate expr + expr: Arc, + order_bys: Vec, +} + +impl RangeFirstListValue { + pub fn new_aggregate_expr( + expr: Arc, + order_bys: Vec, + ) -> Arc { + Arc::new(Self { expr, order_bys }) + } +} + +impl PartialEq for RangeFirstListValue { + fn eq(&self, other: &dyn Any) -> bool { + down_cast_any_ref(other) + .downcast_ref::() + .map(|x| self.expr.eq(&x.expr) && self.order_bys.iter().eq(x.order_bys.iter())) + .unwrap_or(false) + } +} + +impl AggregateExpr for RangeFirstListValue { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn create_accumulator(&self) -> DataFusionResult> { + Ok(Box::new(RangeFirstListValueAcc::new( + self.order_bys.iter().map(|order| order.options).collect(), + ))) + } + + fn expressions(&self) -> Vec> { + let mut exprs: Vec<_> = self + .order_bys + .iter() + .map(|order| order.expr.clone()) + .collect(); + exprs.push(self.expr.clone()); + exprs + } + + fn field(&self) -> DataFusionResult { + unreachable!("AggregateExpr::field will not be used in range query") + } + + fn state_fields(&self) -> DataFusionResult> { + unreachable!("AggregateExpr::state_fields will not be used in range query") + } +} + +#[derive(Debug)] +pub struct RangeFirstListValueAcc { + pub sort_options: Vec, + pub sort_columns: Vec, + pub data: Option, +} + +impl RangeFirstListValueAcc { + pub fn new(sort_options: Vec) -> Self { + Self { + sort_options, + sort_columns: vec![], + data: None, + } + } +} + +impl Accumulator for RangeFirstListValueAcc { + fn update_batch(&mut self, values: &[ArrayRef]) -> DataFusionResult<()> { + let columns: Vec<_> = values + .iter() + .zip(self.sort_options.iter()) + .map(|(v, s)| SortColumn { + values: v.clone(), + options: Some(*s), + }) + .collect(); + // finding the Top1 problem with complexity O(n) + let idx = compute::lexsort_to_indices(&columns, Some(1))?.value(0); + let vs = get_row_at_idx(values, idx as usize)?; + let need_update = self.data.is_none() + || vs + .iter() + .zip(self.sort_columns.iter()) + .zip(self.sort_options.iter()) + .find_map(|((new_value, old_value), sort_option)| { + if new_value.is_null() && old_value.is_null() { + None + } else if sort_option.nulls_first + && (new_value.is_null() || old_value.is_null()) + { + Some(new_value.is_null()) + } else { + new_value.partial_cmp(old_value).map(|x| { + (x == Ordering::Greater && sort_option.descending) + || (x == Ordering::Less && !sort_option.descending) + }) + } + }) + .unwrap_or(false); + if need_update { + self.sort_columns = vs; + self.data = Some(ScalarValue::try_from_array( + &values[self.sort_options.len()], + idx as usize, + )?); + } + Ok(()) + } + + fn evaluate(&self) -> DataFusionResult { + Ok(self.data.clone().unwrap_or(ScalarValue::Null)) + } + + fn size(&self) -> usize { + std::mem::size_of_val(self) + } + + fn state(&self) -> DataFusionResult> { + unreachable!("Accumulator::state will not be used in range query") + } + + fn merge_batch(&mut self, _states: &[ArrayRef]) -> DataFusionResult<()> { + unreachable!("Accumulator::merge_batch will not be used in range query") + } +} + #[derive(PartialEq, Eq, Debug, Hash, Clone)] pub enum Fill { Null, @@ -271,6 +411,7 @@ pub struct RangeSelect { pub align: Duration, pub align_to: i64, pub time_index: String, + pub time_expr: Expr, pub by: Vec, pub schema: DFSchemaRef, pub by_schema: DFSchemaRef, @@ -382,6 +523,7 @@ impl RangeSelect { align, align_to, time_index: time_index_name, + time_expr: time_index, schema, by_schema, by, @@ -440,6 +582,7 @@ impl UserDefinedLogicalNodeCore for RangeSelect { range_expr: self.range_expr.clone(), input: Arc::new(inputs[0].clone()), time_index: self.time_index.clone(), + time_expr: self.time_expr.clone(), schema: self.schema.clone(), by: self.by.clone(), by_schema: self.by_schema.clone(), @@ -452,6 +595,7 @@ impl UserDefinedLogicalNodeCore for RangeSelect { impl RangeSelect { fn create_physical_expr_list( &self, + is_count_aggr: bool, exprs: &[Expr], df_schema: &Arc, schema: &Schema, @@ -459,7 +603,20 @@ impl RangeSelect { ) -> DfResult>> { exprs .iter() - .map(|by| create_physical_expr(by, df_schema, schema, session_state.execution_props())) + .map(|e| match e { + // `count(*)` will be rewritten by `CountWildcardRule` into `count(1)` when optimizing logical plan. + // The modification occurs after range plan rewrite. + // At this time, aggregate plan has been replaced by a custom range plan, + // so `CountWildcardRule` has not been applied. + // We manually modify it when creating the physical plan. + Expr::Wildcard if is_count_aggr => create_physical_expr( + &lit(COUNT_STAR_EXPANSION), + df_schema, + schema, + session_state.execution_props(), + ), + _ => create_physical_expr(e, df_schema, schema, session_state.execution_props()), + }) .collect::>>() } @@ -488,6 +645,72 @@ impl RangeSelect { .iter() .map(|range_fn| { let expr = match &range_fn.expr { + Expr::AggregateFunction(aggr) + if aggr.fun == AggregateFunction::FirstValue + || aggr.fun == AggregateFunction::LastValue => + { + // Because we only need to find the first_value/last_value, + // the complexity of sorting the entire batch is O(nlogn). + // We can sort the batch with limit 1. + // In this case, the algorithm degenerates into finding the Top1 problem with complexity O(n). + // We need reverse the sort order of last_value to correctly apply limit 1 when sorting. + let order_by = if let Some(exprs) = &aggr.order_by { + exprs + .iter() + .map(|x| { + create_physical_sort_expr( + x, + input_dfschema, + &input_schema, + session_state.execution_props(), + ) + .map(|expr| { + // reverse the last_value sort + if aggr.fun == AggregateFunction::LastValue { + PhysicalSortExpr { + expr: expr.expr, + options: SortOptions { + descending: !expr.options.descending, + nulls_first: !expr.options.nulls_first, + }, + } + } else { + expr + } + }) + }) + .collect::>>()? + } else { + // if user not assign order by, time index is needed as default ordering + let time_index = create_physical_expr( + &self.time_expr, + input_dfschema, + &input_schema, + session_state.execution_props(), + )?; + vec![PhysicalSortExpr { + expr: time_index, + options: SortOptions { + descending: aggr.fun == AggregateFunction::LastValue, + nulls_first: false, + }, + }] + }; + let arg = self.create_physical_expr_list( + false, + &aggr.args, + input_dfschema, + &input_schema, + session_state, + )?; + // first_value/last_value has only one param. + // The param have been checked by datafusion in logical plan stage. + // We can safely assume that there is only one element here. + Ok(RangeFirstListValue::new_aggregate_expr( + arg[0].clone(), + order_by, + )) + } Expr::AggregateFunction(aggr) => { let order_by = if let Some(exprs) = &aggr.order_by { exprs @@ -508,6 +731,7 @@ impl RangeSelect { &aggr.fun, false, &self.create_physical_expr_list( + aggr.fun == AggregateFunction::Count, &aggr.args, input_dfschema, &input_schema, @@ -523,6 +747,7 @@ impl RangeSelect { let expr = create_aggr_udf_expr( &aggr_udf.fun, &self.create_physical_expr_list( + false, &aggr_udf.args, input_dfschema, &input_schema, @@ -564,6 +789,7 @@ impl RangeSelect { align: self.align.as_millis() as Millisecond, align_to: self.align_to, by: self.create_physical_expr_list( + false, &self.by, input_dfschema, &input_schema, @@ -1447,4 +1673,44 @@ mod test { Fill::Linear.apply_fill_strategy(&ts, &mut test1).unwrap(); assert_eq!(test, test1); } + + #[test] + fn test_fist_last_accumulator() { + let mut acc = RangeFirstListValueAcc::new(vec![ + SortOptions { + descending: true, + nulls_first: false, + }, + SortOptions { + descending: false, + nulls_first: true, + }, + ]); + let batch1: Vec> = vec![ + Arc::new(nullable_array!(Float64; + 0.0, null, 0.0, null, 1.0 + )), + Arc::new(nullable_array!(Float64; + 5.0, null, 4.0, null, 3.0 + )), + Arc::new(nullable_array!(Int64; + 1, 2, 3, 4, 5 + )), + ]; + let batch2: Vec> = vec![ + Arc::new(nullable_array!(Float64; + 3.0, 3.0, 3.0, 3.0, 3.0 + )), + Arc::new(nullable_array!(Float64; + null,3.0, 3.0, 3.0, 3.0 + )), + Arc::new(nullable_array!(Int64; + 6, 7, 8, 9, 10 + )), + ]; + acc.update_batch(&batch1).unwrap(); + assert_eq!(acc.evaluate().unwrap(), ScalarValue::Int64(Some(5))); + acc.update_batch(&batch2).unwrap(); + assert_eq!(acc.evaluate().unwrap(), ScalarValue::Int64(Some(6))); + } } diff --git a/tests/cases/standalone/common/range/special_aggr.result b/tests/cases/standalone/common/range/special_aggr.result new file mode 100644 index 000000000000..b3eff8f5a9f8 --- /dev/null +++ b/tests/cases/standalone/common/range/special_aggr.result @@ -0,0 +1,253 @@ +CREATE TABLE host ( + ts timestamp(3) time index, + host STRING PRIMARY KEY, + val BIGINT, + addon BIGINT, +); + +Affected Rows: 0 + +INSERT INTO TABLE host VALUES + (0, 'host1', 0, 1), + (1000, 'host1', 1, 2), + (2000, 'host1', 2, 3), + (5000, 'host1', null, 4), + (6000, 'host1', null, 5), + (7000, 'host1', null, 6), + (10000, 'host1', null, 7), + (11000, 'host1', 4, 8), + (12000, 'host1', 5, 9), + (15000, 'host1', 6, 10), + (16000, 'host1', null, 11), + (17000, 'host1', 7, 12), + (20000, 'host1', 8, 13), + (21000, 'host1', 9, 14), + (22000, 'host1', null, 15), + (0, 'host2', 0, 16), + (1000, 'host2', 1, 17), + (2000, 'host2', 2, 18), + (5000, 'host2', null, 19), + (6000, 'host2', null, 20), + (7000, 'host2', null, 21), + (10000, 'host2', null, 22), + (11000, 'host2', 4, 23), + (12000, 'host2', 5, 24), + (15000, 'host2', 6, 25), + (16000, 'host2', null, 26), + (17000, 'host2', 7, 27), + (20000, 'host2', 8, 28), + (21000, 'host2', 9, 29), + (22000, 'host2', null, 30); + +Affected Rows: 30 + +SELECT ts, host, first_value(val) RANGE '5s', last_value(val) RANGE '5s' FROM host ALIGN '5s' ORDER BY host, ts; + ++---------------------+-------+------------------------------------------+-----------------------------------------+ +| ts | host | FIRST_VALUE(host.val) RANGE 5s FILL NULL | LAST_VALUE(host.val) RANGE 5s FILL NULL | ++---------------------+-------+------------------------------------------+-----------------------------------------+ +| 1970-01-01T00:00:00 | host1 | 0 | 2 | +| 1970-01-01T00:00:05 | host1 | | | +| 1970-01-01T00:00:10 | host1 | | 5 | +| 1970-01-01T00:00:15 | host1 | 6 | 7 | +| 1970-01-01T00:00:20 | host1 | 8 | | +| 1970-01-01T00:00:00 | host2 | 0 | 2 | +| 1970-01-01T00:00:05 | host2 | | | +| 1970-01-01T00:00:10 | host2 | | 5 | +| 1970-01-01T00:00:15 | host2 | 6 | 7 | +| 1970-01-01T00:00:20 | host2 | 8 | | ++---------------------+-------+------------------------------------------+-----------------------------------------+ + +SELECT ts, host, first_value(addon ORDER BY val DESC) RANGE '5s', last_value(addon ORDER BY val DESC) RANGE '5s' FROM host ALIGN '5s' ORDER BY host, ts; + ++---------------------+-------+---------------------------------------------------------------------------------+--------------------------------------------------------------------------------+ +| ts | host | FIRST_VALUE(host.addon) ORDER BY [host.val DESC NULLS FIRST] RANGE 5s FILL NULL | LAST_VALUE(host.addon) ORDER BY [host.val DESC NULLS FIRST] RANGE 5s FILL NULL | ++---------------------+-------+---------------------------------------------------------------------------------+--------------------------------------------------------------------------------+ +| 1970-01-01T00:00:00 | host1 | 3 | 1 | +| 1970-01-01T00:00:05 | host1 | 4 | 4 | +| 1970-01-01T00:00:10 | host1 | 7 | 8 | +| 1970-01-01T00:00:15 | host1 | 11 | 10 | +| 1970-01-01T00:00:20 | host1 | 15 | 13 | +| 1970-01-01T00:00:00 | host2 | 18 | 16 | +| 1970-01-01T00:00:05 | host2 | 19 | 19 | +| 1970-01-01T00:00:10 | host2 | 22 | 23 | +| 1970-01-01T00:00:15 | host2 | 26 | 25 | +| 1970-01-01T00:00:20 | host2 | 30 | 28 | ++---------------------+-------+---------------------------------------------------------------------------------+--------------------------------------------------------------------------------+ + +SELECT ts, host, first_value(addon ORDER BY val DESC NULLS LAST) RANGE '5s', last_value(addon ORDER BY val DESC NULLS LAST) RANGE '5s' FROM host ALIGN '5s' ORDER BY host, ts; + ++---------------------+-------+--------------------------------------------------------------------------------+-------------------------------------------------------------------------------+ +| ts | host | FIRST_VALUE(host.addon) ORDER BY [host.val DESC NULLS LAST] RANGE 5s FILL NULL | LAST_VALUE(host.addon) ORDER BY [host.val DESC NULLS LAST] RANGE 5s FILL NULL | ++---------------------+-------+--------------------------------------------------------------------------------+-------------------------------------------------------------------------------+ +| 1970-01-01T00:00:00 | host1 | 3 | 1 | +| 1970-01-01T00:00:05 | host1 | 4 | 4 | +| 1970-01-01T00:00:10 | host1 | 9 | 7 | +| 1970-01-01T00:00:15 | host1 | 12 | 11 | +| 1970-01-01T00:00:20 | host1 | 14 | 15 | +| 1970-01-01T00:00:00 | host2 | 18 | 16 | +| 1970-01-01T00:00:05 | host2 | 19 | 19 | +| 1970-01-01T00:00:10 | host2 | 24 | 22 | +| 1970-01-01T00:00:15 | host2 | 27 | 26 | +| 1970-01-01T00:00:20 | host2 | 29 | 30 | ++---------------------+-------+--------------------------------------------------------------------------------+-------------------------------------------------------------------------------+ + +SELECT ts, host, first_value(addon ORDER BY val ASC) RANGE '5s', last_value(addon ORDER BY val ASC) RANGE '5s' FROM host ALIGN '5s' ORDER BY host, ts; + ++---------------------+-------+-------------------------------------------------------------------------------+------------------------------------------------------------------------------+ +| ts | host | FIRST_VALUE(host.addon) ORDER BY [host.val ASC NULLS LAST] RANGE 5s FILL NULL | LAST_VALUE(host.addon) ORDER BY [host.val ASC NULLS LAST] RANGE 5s FILL NULL | ++---------------------+-------+-------------------------------------------------------------------------------+------------------------------------------------------------------------------+ +| 1970-01-01T00:00:00 | host1 | 1 | 3 | +| 1970-01-01T00:00:05 | host1 | 4 | 4 | +| 1970-01-01T00:00:10 | host1 | 8 | 7 | +| 1970-01-01T00:00:15 | host1 | 10 | 11 | +| 1970-01-01T00:00:20 | host1 | 13 | 15 | +| 1970-01-01T00:00:00 | host2 | 16 | 18 | +| 1970-01-01T00:00:05 | host2 | 19 | 19 | +| 1970-01-01T00:00:10 | host2 | 23 | 22 | +| 1970-01-01T00:00:15 | host2 | 25 | 26 | +| 1970-01-01T00:00:20 | host2 | 28 | 30 | ++---------------------+-------+-------------------------------------------------------------------------------+------------------------------------------------------------------------------+ + +SELECT ts, host, first_value(addon ORDER BY val ASC NULLS FIRST) RANGE '5s', last_value(addon ORDER BY val ASC NULLS FIRST) RANGE '5s' FROM host ALIGN '5s' ORDER BY host, ts; + ++---------------------+-------+--------------------------------------------------------------------------------+-------------------------------------------------------------------------------+ +| ts | host | FIRST_VALUE(host.addon) ORDER BY [host.val ASC NULLS FIRST] RANGE 5s FILL NULL | LAST_VALUE(host.addon) ORDER BY [host.val ASC NULLS FIRST] RANGE 5s FILL NULL | ++---------------------+-------+--------------------------------------------------------------------------------+-------------------------------------------------------------------------------+ +| 1970-01-01T00:00:00 | host1 | 1 | 3 | +| 1970-01-01T00:00:05 | host1 | 4 | 4 | +| 1970-01-01T00:00:10 | host1 | 7 | 9 | +| 1970-01-01T00:00:15 | host1 | 11 | 12 | +| 1970-01-01T00:00:20 | host1 | 15 | 14 | +| 1970-01-01T00:00:00 | host2 | 16 | 18 | +| 1970-01-01T00:00:05 | host2 | 19 | 19 | +| 1970-01-01T00:00:10 | host2 | 22 | 24 | +| 1970-01-01T00:00:15 | host2 | 26 | 27 | +| 1970-01-01T00:00:20 | host2 | 30 | 29 | ++---------------------+-------+--------------------------------------------------------------------------------+-------------------------------------------------------------------------------+ + +SELECT ts, host, first_value(addon ORDER BY val ASC, ts ASC) RANGE '5s', last_value(addon ORDER BY val ASC, ts ASC) RANGE '5s' FROM host ALIGN '5s' ORDER BY host, ts; + ++---------------------+-------+-------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------+ +| ts | host | FIRST_VALUE(host.addon) ORDER BY [host.val ASC NULLS LAST, host.ts ASC NULLS LAST] RANGE 5s FILL NULL | LAST_VALUE(host.addon) ORDER BY [host.val ASC NULLS LAST, host.ts ASC NULLS LAST] RANGE 5s FILL NULL | ++---------------------+-------+-------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------+ +| 1970-01-01T00:00:00 | host1 | 1 | 3 | +| 1970-01-01T00:00:05 | host1 | 4 | 6 | +| 1970-01-01T00:00:10 | host1 | 8 | 7 | +| 1970-01-01T00:00:15 | host1 | 10 | 11 | +| 1970-01-01T00:00:20 | host1 | 13 | 15 | +| 1970-01-01T00:00:00 | host2 | 16 | 18 | +| 1970-01-01T00:00:05 | host2 | 19 | 21 | +| 1970-01-01T00:00:10 | host2 | 23 | 22 | +| 1970-01-01T00:00:15 | host2 | 25 | 26 | +| 1970-01-01T00:00:20 | host2 | 28 | 30 | ++---------------------+-------+-------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------+ + +SELECT ts, host, count(val) RANGE '5s'FROM host ALIGN '5s' ORDER BY host, ts; + ++---------------------+-------+------------------------------------+ +| ts | host | COUNT(host.val) RANGE 5s FILL NULL | ++---------------------+-------+------------------------------------+ +| 1970-01-01T00:00:00 | host1 | 3 | +| 1970-01-01T00:00:05 | host1 | 0 | +| 1970-01-01T00:00:10 | host1 | 2 | +| 1970-01-01T00:00:15 | host1 | 2 | +| 1970-01-01T00:00:20 | host1 | 2 | +| 1970-01-01T00:00:00 | host2 | 3 | +| 1970-01-01T00:00:05 | host2 | 0 | +| 1970-01-01T00:00:10 | host2 | 2 | +| 1970-01-01T00:00:15 | host2 | 2 | +| 1970-01-01T00:00:20 | host2 | 2 | ++---------------------+-------+------------------------------------+ + +SELECT ts, host, count(distinct val) RANGE '5s'FROM host ALIGN '5s' ORDER BY host, ts; + ++---------------------+-------+---------------------------------------------+ +| ts | host | COUNT(DISTINCT host.val) RANGE 5s FILL NULL | ++---------------------+-------+---------------------------------------------+ +| 1970-01-01T00:00:00 | host1 | 3 | +| 1970-01-01T00:00:05 | host1 | 0 | +| 1970-01-01T00:00:10 | host1 | 2 | +| 1970-01-01T00:00:15 | host1 | 2 | +| 1970-01-01T00:00:20 | host1 | 2 | +| 1970-01-01T00:00:00 | host2 | 3 | +| 1970-01-01T00:00:05 | host2 | 0 | +| 1970-01-01T00:00:10 | host2 | 2 | +| 1970-01-01T00:00:15 | host2 | 2 | +| 1970-01-01T00:00:20 | host2 | 2 | ++---------------------+-------+---------------------------------------------+ + +SELECT ts, host, count(*) RANGE '5s'FROM host ALIGN '5s' ORDER BY host, ts; + ++---------------------+-------+-----------------------------+ +| ts | host | COUNT(*) RANGE 5s FILL NULL | ++---------------------+-------+-----------------------------+ +| 1970-01-01T00:00:00 | host1 | 3 | +| 1970-01-01T00:00:05 | host1 | 3 | +| 1970-01-01T00:00:10 | host1 | 3 | +| 1970-01-01T00:00:15 | host1 | 3 | +| 1970-01-01T00:00:20 | host1 | 3 | +| 1970-01-01T00:00:00 | host2 | 3 | +| 1970-01-01T00:00:05 | host2 | 3 | +| 1970-01-01T00:00:10 | host2 | 3 | +| 1970-01-01T00:00:15 | host2 | 3 | +| 1970-01-01T00:00:20 | host2 | 3 | ++---------------------+-------+-----------------------------+ + +SELECT ts, host, count(distinct *) RANGE '5s'FROM host ALIGN '5s' ORDER BY host, ts; + ++---------------------+-------+--------------------------------------+ +| ts | host | COUNT(DISTINCT *) RANGE 5s FILL NULL | ++---------------------+-------+--------------------------------------+ +| 1970-01-01T00:00:00 | host1 | 3 | +| 1970-01-01T00:00:05 | host1 | 3 | +| 1970-01-01T00:00:10 | host1 | 3 | +| 1970-01-01T00:00:15 | host1 | 3 | +| 1970-01-01T00:00:20 | host1 | 3 | +| 1970-01-01T00:00:00 | host2 | 3 | +| 1970-01-01T00:00:05 | host2 | 3 | +| 1970-01-01T00:00:10 | host2 | 3 | +| 1970-01-01T00:00:15 | host2 | 3 | +| 1970-01-01T00:00:20 | host2 | 3 | ++---------------------+-------+--------------------------------------+ + +-- Test error first_value/last_value +SELECT ts, host, first_value(val, val) RANGE '5s' FROM host ALIGN '5s' ORDER BY host, ts; + +Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: No function matches the given name and argument types 'FIRST_VALUE(Int64, Int64)'. You might need to add explicit type casts. + Candidate functions: + FIRST_VALUE(Int8/Int16/Int32/Int64/UInt8/UInt16/UInt32/UInt64/Float32/Float64) + +DROP TABLE host; + +Affected Rows: 0 + +-- Test first_value/last_value will execute sort +CREATE TABLE host ( + ts timestamp(3) time index, + host STRING PRIMARY KEY, + val BIGINT, + addon BIGINT, +); + +Affected Rows: 0 + +INSERT INTO TABLE host VALUES + (0, 'host1', 0, 3), + (1000, 'host1', 1, 2), + (2000, 'host1', 2, 1); + +Affected Rows: 3 + +SELECT ts, first_value(val ORDER BY addon ASC) RANGE '5s', last_value(val ORDER BY addon ASC) RANGE '5s' FROM host ALIGN '5s'; + ++---------------------+-------------------------------------------------------------------------------+------------------------------------------------------------------------------+ +| ts | FIRST_VALUE(host.val) ORDER BY [host.addon ASC NULLS LAST] RANGE 5s FILL NULL | LAST_VALUE(host.val) ORDER BY [host.addon ASC NULLS LAST] RANGE 5s FILL NULL | ++---------------------+-------------------------------------------------------------------------------+------------------------------------------------------------------------------+ +| 1970-01-01T00:00:00 | 2 | 0 | ++---------------------+-------------------------------------------------------------------------------+------------------------------------------------------------------------------+ + +DROP TABLE host; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/range/special_aggr.sql b/tests/cases/standalone/common/range/special_aggr.sql new file mode 100644 index 000000000000..bf3cd9e29c6d --- /dev/null +++ b/tests/cases/standalone/common/range/special_aggr.sql @@ -0,0 +1,91 @@ +CREATE TABLE host ( + ts timestamp(3) time index, + host STRING PRIMARY KEY, + val BIGINT, + addon BIGINT, +); + +INSERT INTO TABLE host VALUES + (0, 'host1', 0, 1), + (1000, 'host1', 1, 2), + (2000, 'host1', 2, 3), + + (5000, 'host1', null, 4), + (6000, 'host1', null, 5), + (7000, 'host1', null, 6), + + (10000, 'host1', null, 7), + (11000, 'host1', 4, 8), + (12000, 'host1', 5, 9), + + (15000, 'host1', 6, 10), + (16000, 'host1', null, 11), + (17000, 'host1', 7, 12), + + (20000, 'host1', 8, 13), + (21000, 'host1', 9, 14), + (22000, 'host1', null, 15), + + (0, 'host2', 0, 16), + (1000, 'host2', 1, 17), + (2000, 'host2', 2, 18), + + (5000, 'host2', null, 19), + (6000, 'host2', null, 20), + (7000, 'host2', null, 21), + + (10000, 'host2', null, 22), + (11000, 'host2', 4, 23), + (12000, 'host2', 5, 24), + + (15000, 'host2', 6, 25), + (16000, 'host2', null, 26), + (17000, 'host2', 7, 27), + + (20000, 'host2', 8, 28), + (21000, 'host2', 9, 29), + (22000, 'host2', null, 30); + +SELECT ts, host, first_value(val) RANGE '5s', last_value(val) RANGE '5s' FROM host ALIGN '5s' ORDER BY host, ts; + +SELECT ts, host, first_value(addon ORDER BY val DESC) RANGE '5s', last_value(addon ORDER BY val DESC) RANGE '5s' FROM host ALIGN '5s' ORDER BY host, ts; + +SELECT ts, host, first_value(addon ORDER BY val DESC NULLS LAST) RANGE '5s', last_value(addon ORDER BY val DESC NULLS LAST) RANGE '5s' FROM host ALIGN '5s' ORDER BY host, ts; + +SELECT ts, host, first_value(addon ORDER BY val ASC) RANGE '5s', last_value(addon ORDER BY val ASC) RANGE '5s' FROM host ALIGN '5s' ORDER BY host, ts; + +SELECT ts, host, first_value(addon ORDER BY val ASC NULLS FIRST) RANGE '5s', last_value(addon ORDER BY val ASC NULLS FIRST) RANGE '5s' FROM host ALIGN '5s' ORDER BY host, ts; + +SELECT ts, host, first_value(addon ORDER BY val ASC, ts ASC) RANGE '5s', last_value(addon ORDER BY val ASC, ts ASC) RANGE '5s' FROM host ALIGN '5s' ORDER BY host, ts; + +SELECT ts, host, count(val) RANGE '5s'FROM host ALIGN '5s' ORDER BY host, ts; + +SELECT ts, host, count(distinct val) RANGE '5s'FROM host ALIGN '5s' ORDER BY host, ts; + +SELECT ts, host, count(*) RANGE '5s'FROM host ALIGN '5s' ORDER BY host, ts; + +SELECT ts, host, count(distinct *) RANGE '5s'FROM host ALIGN '5s' ORDER BY host, ts; + +-- Test error first_value/last_value + +SELECT ts, host, first_value(val, val) RANGE '5s' FROM host ALIGN '5s' ORDER BY host, ts; + +DROP TABLE host; + +-- Test first_value/last_value will execute sort + +CREATE TABLE host ( + ts timestamp(3) time index, + host STRING PRIMARY KEY, + val BIGINT, + addon BIGINT, +); + +INSERT INTO TABLE host VALUES + (0, 'host1', 0, 3), + (1000, 'host1', 1, 2), + (2000, 'host1', 2, 1); + +SELECT ts, first_value(val ORDER BY addon ASC) RANGE '5s', last_value(val ORDER BY addon ASC) RANGE '5s' FROM host ALIGN '5s'; + +DROP TABLE host;