From 3dfd24de6df16c4c433edc97a32beed2fb5dc3c6 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 21 Oct 2023 14:51:17 -0400 Subject: [PATCH 1/4] Add multi-column topk tests --- .../core/tests/fuzz_cases/limit_fuzz.rs | 334 ++++++++++++++++++ datafusion/core/tests/fuzz_cases/mod.rs | 2 + datafusion/core/tests/fuzz_cases/sort_fuzz.rs | 221 +----------- test-utils/src/lib.rs | 2 +- 4 files changed, 340 insertions(+), 219 deletions(-) create mode 100644 datafusion/core/tests/fuzz_cases/limit_fuzz.rs diff --git a/datafusion/core/tests/fuzz_cases/limit_fuzz.rs b/datafusion/core/tests/fuzz_cases/limit_fuzz.rs new file mode 100644 index 000000000000..09c529147aac --- /dev/null +++ b/datafusion/core/tests/fuzz_cases/limit_fuzz.rs @@ -0,0 +1,334 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Fuzz Test for Sort + Fetch/Limit (TopK!) + +use arrow::util::pretty::pretty_format_batches; +use arrow::{array::Int32Array, record_batch::RecordBatch}; +use arrow_array::{Float64Array, Int64Array, StringArray}; +use arrow_schema::SchemaRef; +use datafusion::datasource::MemTable; +use datafusion::prelude::SessionContext; +use datafusion_common::assert_contains; +use rand::{thread_rng, Rng}; +use std::sync::Arc; +use test_utils::stagger_batch; + +#[tokio::test] +async fn test_sort_topk_i32() { + run_limit_fuzz_test(|size| SortedData::new_i32(size)).await +} + +#[tokio::test] +async fn test_sort_topk_f64() { + run_limit_fuzz_test(|size| SortedData::new_f64(size)).await +} + +#[tokio::test] +async fn test_sort_topk_str() { + run_limit_fuzz_test(|size| SortedData::new_str(size)).await +} + +#[tokio::test] +async fn test_sort_topk_i64str() { + run_limit_fuzz_test(|size| SortedData::new_i64str(size)).await +} + +/// Run TopK fuzz tests the specified input data with different +/// different test functions so they can run in parallel) +async fn run_limit_fuzz_test(make_data: F) +where + F: Fn(usize) -> SortedData, +{ + //let mut rng = thread_rng(); + for size in [10, 1_0000, 10_000, 100_000] { + let data = make_data(size); + // test various limits + // TODO add back in large limits + for limit in [1, 3, 7, 17 /*10000, rng.gen_range(1..size)*/] { + // limit can be larger than the number of rows in the input + run_limit_test(limit, &data).await; + } + } +} + +/// The type of column(s) to use for the sort test +#[derive(Debug)] +enum SortedData { + // single Int32 column + I32 { + batches: Vec, + sorted: Vec>, + }, + /// Single Float64 column + F64 { + batches: Vec, + sorted: Vec>, + }, + /// Single sorted String column + Str { + batches: Vec, + sorted: Vec>, + }, + /// (i64, string) columns + I64Str { + batches: Vec, + sorted: Vec<(Option, Option)>, + }, +} + +impl SortedData { + /// Create an i32 column of random values, with the specified number of + /// rows, sorted the default + fn new_i32(size: usize) -> Self { + let mut rng = thread_rng(); + // have some repeats (approximately 1/3 of the values are the same) + let max = size as i32 / 3; + let data: Vec> = (0..size) + .map(|_| { + // no nulls for now + Some(rng.gen_range(0..max)) + }) + .collect(); + + let batches = stagger_batch(int32_batch(data.iter().cloned())); + + let mut sorted = data; + sorted.sort_unstable(); + + Self::I32 { batches, sorted } + } + + /// Create an f64 column of random values, with the specified number of + /// rows, sorted the default + fn new_f64(size: usize) -> Self { + let mut rng = thread_rng(); + let mut data: Vec> = (0..size / 3) + .map(|_| { + // no nulls for now + Some(rng.gen_range(0.0..1.0f64)) + }) + .collect(); + + // have some repeats (approximately 1/3 of the values are the same) + while data.len() < size { + data.push(data[rng.gen_range(0..data.len())]); + } + + let batches = stagger_batch(f64_batch(data.iter().cloned())); + + let mut sorted = data; + sorted.sort_by(|a, b| a.partial_cmp(b).unwrap()); + + Self::F64 { batches, sorted } + } + + /// Create an string column of random values, with the specified number of + /// rows, sorted the default + fn new_str(size: usize) -> Self { + let mut rng = thread_rng(); + let mut data: Vec> = (0..size / 3) + .map(|_| { + // no nulls for now + Some(get_random_string(16)) + }) + .collect(); + + // have some repeats (approximately 1/3 of the values are the same) + while data.len() < size { + data.push(data[rng.gen_range(0..data.len())].clone()); + } + + let batches = stagger_batch(string_batch(data.iter())); + + let mut sorted = data; + sorted.sort_unstable(); + + Self::Str { batches, sorted } + } + + /// Create two columns of random values (int64, string), with the specified number of + /// rows, sorted the default + fn new_i64str(size: usize) -> Self { + let mut rng = thread_rng(); + + // 100 distinct values + let strings: Vec> = (0..100) + .map(|_| { + // no nulls for now + Some(get_random_string(16)) + }) + .collect(); + + // form inputs, with only 10 distinct integer values , to force collision checks + let data = (0..size) + .map(|_| { + ( + Some(rng.gen_range(0..10)), + strings[rng.gen_range(0..strings.len())].clone(), + ) + }) + .collect::>(); + + let batches = stagger_batch(i64string_batch(data.iter())); + + let mut sorted = data; + sorted.sort_unstable(); + + Self::I64Str { batches, sorted } + } + + /// Return top top `limit` values as a RecordBatch + fn topk_values(&self, limit: usize) -> RecordBatch { + match self { + Self::I32 { sorted, .. } => int32_batch(sorted.iter().take(limit).cloned()), + Self::F64 { sorted, .. } => f64_batch(sorted.iter().take(limit).cloned()), + Self::Str { sorted, .. } => string_batch(sorted.iter().take(limit)), + Self::I64Str { sorted, .. } => i64string_batch(sorted.iter().take(limit)), + } + } + + /// Return the input data to sort + fn batches(&self) -> Vec { + match self { + Self::I32 { batches, .. } => batches.clone(), + Self::F64 { batches, .. } => batches.clone(), + Self::Str { batches, .. } => batches.clone(), + Self::I64Str { batches, .. } => batches.clone(), + } + } + + /// Return the schema of the input data + fn schema(&self) -> SchemaRef { + match self { + Self::I32 { batches, .. } => batches[0].schema(), + Self::F64 { batches, .. } => batches[0].schema(), + Self::Str { batches, .. } => batches[0].schema(), + Self::I64Str { batches, .. } => batches[0].schema(), + } + } + + /// Return the sort expression to use for this data, depending on the type + fn sort_expr(&self) -> Vec { + match self { + Self::I32 { .. } | Self::F64 { .. } | Self::Str { .. } => { + vec![datafusion_expr::col("x").sort(true, true)] + } + Self::I64Str { .. } => { + vec![ + datafusion_expr::col("x").sort(true, true), + datafusion_expr::col("y").sort(true, true), + ] + } + } + } +} + +/// Create a record batch with a single column of type `Int32` named "x" +fn int32_batch(values: impl IntoIterator>) -> RecordBatch { + RecordBatch::try_from_iter(vec![( + "x", + Arc::new(Int32Array::from_iter(values.into_iter())) as _, + )]) + .unwrap() +} + +/// Create a record batch with a single column of type `Float64` named "x" +fn f64_batch(values: impl IntoIterator>) -> RecordBatch { + RecordBatch::try_from_iter(vec![( + "x", + Arc::new(Float64Array::from_iter(values.into_iter())) as _, + )]) + .unwrap() +} + +/// Create a record batch with a single column of type `StringArray` named "x" +fn string_batch<'a>(values: impl IntoIterator>) -> RecordBatch { + RecordBatch::try_from_iter(vec![( + "x", + Arc::new(StringArray::from_iter(values.into_iter())) as _, + )]) + .unwrap() +} + +/// Create a record batch with i64 column "x" and utf8 column "y" +fn i64string_batch<'a>( + values: impl IntoIterator, Option)> + Clone, +) -> RecordBatch { + let ints = values.clone().into_iter().map(|(i, _)| *i); + let strings = values.into_iter().map(|(_, s)| s); + RecordBatch::try_from_iter(vec![ + ("x", Arc::new(Int64Array::from_iter(ints)) as _), + ("y", Arc::new(StringArray::from_iter(strings)) as _), + ]) + .unwrap() +} + +/// Run the TopK test, sorting the input batches with the specified ftch +/// (limit) and compares the results to the expected values. +async fn run_limit_test(fetch: usize, data: &SortedData) { + let input = data.batches(); + let schema = data.schema(); + + let table = MemTable::try_new(schema, vec![input]).unwrap(); + + let ctx = SessionContext::new(); + let df = ctx + .read_table(Arc::new(table)) + .unwrap() + .sort(data.sort_expr()) + .unwrap() + .limit(0, Some(fetch)) + .unwrap(); + + // Verify the plan contains a TopK node + { + let explain = df + .clone() + .explain(false, false) + .unwrap() + .collect() + .await + .unwrap(); + let plan_text = pretty_format_batches(&explain).unwrap().to_string(); + let expected = format!("TopK(fetch={fetch})"); + assert_contains!(plan_text, expected); + } + + let results = df.collect().await.unwrap(); + let expected = vec![data.topk_values(fetch)]; + + assert_eq!(expected, results, "TopK mismatch fetch {fetch} {} expected rows, {} actual rows.\n\nExpected:\n{}\n\nActual:\n{}", + row_count(&expected), + row_count(&results), + pretty_format_batches(&expected).unwrap(), + pretty_format_batches(&results).unwrap(), + ); +} + +fn row_count(batches: &[RecordBatch]) -> usize { + batches.iter().map(|b| b.num_rows()).sum() +} + +/// Return random ASCII String with len +fn get_random_string(len: usize) -> String { + rand::thread_rng() + .sample_iter(rand::distributions::Alphanumeric) + .take(len) + .map(char::from) + .collect() +} diff --git a/datafusion/core/tests/fuzz_cases/mod.rs b/datafusion/core/tests/fuzz_cases/mod.rs index 140cf7e5c75b..83ec928ae229 100644 --- a/datafusion/core/tests/fuzz_cases/mod.rs +++ b/datafusion/core/tests/fuzz_cases/mod.rs @@ -19,5 +19,7 @@ mod aggregate_fuzz; mod join_fuzz; mod merge_fuzz; mod sort_fuzz; + +mod limit_fuzz; mod sort_preserving_repartition_fuzz; mod window_fuzz; diff --git a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs index 2615abfd3c0d..f4b4f16aa160 100644 --- a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs @@ -22,25 +22,17 @@ use arrow::{ compute::SortOptions, record_batch::RecordBatch, }; -use arrow_array::{Float64Array, StringArray}; +use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::physical_plan::expressions::PhysicalSortExpr; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::{collect, ExecutionPlan}; use datafusion::prelude::{SessionConfig, SessionContext}; -use datafusion::{ - datasource::MemTable, - execution::runtime_env::{RuntimeConfig, RuntimeEnv}, -}; -use datafusion_common::{ - cast::{as_float64_array, as_string_array}, - TableReference, -}; use datafusion_execution::memory_pool::GreedyMemoryPool; use datafusion_physical_expr::expressions::col; -use rand::{rngs::StdRng, Rng, SeedableRng}; +use rand::Rng; use std::sync::Arc; -use test_utils::{batches_to_vec, partitions_to_sorted_vec, stagger_batch}; +use test_utils::{batches_to_vec, partitions_to_sorted_vec}; const KB: usize = 1 << 10; #[tokio::test] @@ -80,44 +72,6 @@ async fn test_sort_unlimited_mem() { .await; } } - -#[tokio::test] -async fn test_sort_topk() { - for size in [10, 100, 1000, 10000, 1000000] { - let mut topk_scenario = TopKScenario::new() - .with_limit(10) - .with_table_name("t") - .with_col_name("x"); - - // test topk with i32 - let collected_i32 = SortTest::new() - .with_input(topk_scenario.batches(size, ColType::I32)) - .run_with_limit(&topk_scenario) - .await; - let actual = batches_to_vec(&collected_i32); - let excepted_i32 = topk_scenario.excepted_i32(); - assert_eq!(actual, excepted_i32); - - // test topk with f64 - let collected_f64 = SortTest::new() - .with_input(topk_scenario.batches(size, ColType::F64)) - .run_with_limit(&topk_scenario) - .await; - let actual: Vec> = batches_to_f64_vec(&collected_f64); - let excepted_f64 = topk_scenario.excepted_f64(); - assert_eq!(actual, excepted_f64); - - // test topk with str - let collected_str = SortTest::new() - .with_input(topk_scenario.batches(size, ColType::Str)) - .run_with_limit(&topk_scenario) - .await; - let actual: Vec> = batches_to_str_vec(&collected_str); - let excepted_str = topk_scenario.excepted_str(); - assert_eq!(actual, excepted_str); - } -} - #[derive(Debug, Default)] struct SortTest { input: Vec>, @@ -132,11 +86,6 @@ impl SortTest { Default::default() } - fn with_input(mut self, batches: Vec>) -> Self { - self.input = batches.clone(); - self - } - /// Create batches of int32 values of rows fn with_int32_batches(mut self, rows: usize) -> Self { self.input = vec![make_staggered_i32_batches(rows)]; @@ -154,44 +103,6 @@ impl SortTest { self } - async fn run_with_limit<'a>( - &self, - topk_scenario: &TopKScenario<'a>, - ) -> Vec { - let input = self.input.clone(); - let schema = input - .iter() - .flat_map(|p| p.iter()) - .next() - .expect("at least one batch") - .schema(); - - let table = MemTable::try_new(schema, input.clone()).unwrap(); - - let ctx = SessionContext::new(); - - ctx.register_table( - TableReference::Bare { - table: topk_scenario.table_name.into(), - }, - Arc::new(table), - ) - .unwrap(); - - let df = ctx - .table(topk_scenario.table_name) - .await - .unwrap() - .sort(vec![ - datafusion_expr::col(topk_scenario.col_name).sort(true, true) - ]) - .unwrap() - .limit(0, Some(topk_scenario.limit)) - .unwrap(); - - df.collect().await.unwrap() - } - /// Sort the input using SortExec and ensure the results are /// correct according to `Vec::sort` both with and without spilling async fn run(&self) { @@ -262,109 +173,6 @@ impl SortTest { } } -enum ColType { - I32, - F64, - Str, -} - -struct TopKScenario<'a> { - limit: usize, - batches: Vec>, - table_name: &'a str, - col_name: &'a str, -} - -impl<'a> TopKScenario<'a> { - fn new() -> Self { - TopKScenario { - limit: 0, - batches: vec![], - table_name: "", - col_name: "", - } - } - - fn with_limit(mut self, limit: usize) -> Self { - self.limit = limit; - self - } - - fn with_table_name(mut self, table_name: &'a str) -> Self { - self.table_name = table_name; - self - } - - fn with_col_name(mut self, col_name: &'a str) -> Self { - self.col_name = col_name; - self - } - - fn batches(&mut self, len: usize, t: ColType) -> Vec> { - let batches = match t { - ColType::I32 => make_staggered_i32_batches(len), - ColType::F64 => make_staggered_f64_batches(len), - ColType::Str => make_staggered_str_batches(len), - }; - self.batches = vec![batches]; - self.batches.clone() - } - - fn excepted_i32(&self) -> Vec> { - let excepted = partitions_to_sorted_vec(&self.batches); - excepted[0..self.limit].into() - } - - fn excepted_f64(&self) -> Vec> { - let mut excepted: Vec> = self - .batches - .iter() - .flat_map(|batches| batches_to_f64_vec(batches).into_iter()) - .collect(); - excepted.sort_by(|a, b| a.partial_cmp(b).unwrap()); - excepted[0..self.limit].into() - } - - fn excepted_str(&self) -> Vec> { - let mut excepted: Vec> = self - .batches - .iter() - .flat_map(|batches| batches_to_str_vec(batches).into_iter()) - .collect(); - excepted.sort_unstable(); - excepted[0..self.limit].into() - } -} - -impl Default for TopKScenario<'_> { - fn default() -> Self { - Self::new() - } -} - -fn make_staggered_f64_batches(len: usize) -> Vec { - let mut rng = StdRng::seed_from_u64(100); - let remainder = RecordBatch::try_from_iter(vec![( - "x", - Arc::new(Float64Array::from_iter_values( - (0..len).map(|_| rng.gen_range(0.0..1000.7)), - )) as ArrayRef, - )]) - .unwrap(); - stagger_batch(remainder) -} - -fn make_staggered_str_batches(len: usize) -> Vec { - let remainder = RecordBatch::try_from_iter(vec![( - "x", - Arc::new(StringArray::from_iter_values( - (0..len).map(|_| get_random_string(6)), - )) as ArrayRef, - )]) - .unwrap(); - stagger_batch(remainder) -} - /// Return randomly sized record batches in a field named 'x' of type `Int32` /// with randomized i32 content fn make_staggered_i32_batches(len: usize) -> Vec { @@ -389,26 +197,3 @@ fn make_staggered_i32_batches(len: usize) -> Vec { } batches } - -/// Return random ASCII String with len -fn get_random_string(len: usize) -> String { - rand::thread_rng() - .sample_iter(rand::distributions::Alphanumeric) - .take(len) - .map(char::from) - .collect() -} - -fn batches_to_f64_vec(batches: &[RecordBatch]) -> Vec> { - batches - .iter() - .flat_map(|batch| as_float64_array(batch.column(0)).unwrap().iter()) - .collect() -} - -fn batches_to_str_vec(batches: &[RecordBatch]) -> Vec> { - batches - .iter() - .flat_map(|batch| as_string_array(batch.column(0)).unwrap().iter()) - .collect() -} diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index e3c96d16eeb9..0c3668d2f8c0 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -38,7 +38,7 @@ pub fn batches_to_vec(batches: &[RecordBatch]) -> Vec> { .collect() } -/// extract values from batches and sort them +/// extract i32 values from batches and sort them pub fn partitions_to_sorted_vec(partitions: &[Vec]) -> Vec> { let mut values: Vec<_> = partitions .iter() From 71e4e62d46041475a9555693c0b2140dfe5b4eb3 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 21 Oct 2023 15:52:50 -0400 Subject: [PATCH 2/4] clippy --- datafusion/core/tests/fuzz_cases/limit_fuzz.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/limit_fuzz.rs b/datafusion/core/tests/fuzz_cases/limit_fuzz.rs index 09c529147aac..e422328c89a2 100644 --- a/datafusion/core/tests/fuzz_cases/limit_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/limit_fuzz.rs @@ -30,22 +30,22 @@ use test_utils::stagger_batch; #[tokio::test] async fn test_sort_topk_i32() { - run_limit_fuzz_test(|size| SortedData::new_i32(size)).await + run_limit_fuzz_test(SortedData::new_i32).await } #[tokio::test] async fn test_sort_topk_f64() { - run_limit_fuzz_test(|size| SortedData::new_f64(size)).await + run_limit_fuzz_test(SortedData::new_f64).await } #[tokio::test] async fn test_sort_topk_str() { - run_limit_fuzz_test(|size| SortedData::new_str(size)).await + run_limit_fuzz_test(SortedData::new_str).await } #[tokio::test] async fn test_sort_topk_i64str() { - run_limit_fuzz_test(|size| SortedData::new_i64str(size)).await + run_limit_fuzz_test(SortedData::new_i64str).await } /// Run TopK fuzz tests the specified input data with different From d84486c9fd8d04b9c02148b21aec6991f6baa27a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 21 Oct 2023 16:05:10 -0400 Subject: [PATCH 3/4] fix validation --- .../core/tests/fuzz_cases/limit_fuzz.rs | 40 ++++++++++++------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/limit_fuzz.rs b/datafusion/core/tests/fuzz_cases/limit_fuzz.rs index e422328c89a2..cc39a7fec927 100644 --- a/datafusion/core/tests/fuzz_cases/limit_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/limit_fuzz.rs @@ -17,6 +17,7 @@ //! Fuzz Test for Sort + Fetch/Limit (TopK!) +use arrow::compute::concat_batches; use arrow::util::pretty::pretty_format_batches; use arrow::{array::Int32Array, record_batch::RecordBatch}; use arrow_array::{Float64Array, Int64Array, StringArray}; @@ -54,12 +55,11 @@ async fn run_limit_fuzz_test(make_data: F) where F: Fn(usize) -> SortedData, { - //let mut rng = thread_rng(); + let mut rng = thread_rng(); for size in [10, 1_0000, 10_000, 100_000] { let data = make_data(size); - // test various limits - // TODO add back in large limits - for limit in [1, 3, 7, 17 /*10000, rng.gen_range(1..size)*/] { + // test various limits including some random ones + for limit in [1, 3, 7, 17, 10000, rng.gen_range(1..size * 2)] { // limit can be larger than the number of rows in the input run_limit_test(limit, &data).await; } @@ -310,18 +310,30 @@ async fn run_limit_test(fetch: usize, data: &SortedData) { } let results = df.collect().await.unwrap(); - let expected = vec![data.topk_values(fetch)]; + let expected = data.topk_values(fetch); - assert_eq!(expected, results, "TopK mismatch fetch {fetch} {} expected rows, {} actual rows.\n\nExpected:\n{}\n\nActual:\n{}", - row_count(&expected), - row_count(&results), - pretty_format_batches(&expected).unwrap(), - pretty_format_batches(&results).unwrap(), - ); -} + // Verify that all output batches conform to the specified batch size + let max_batch_size = ctx.copied_config().batch_size(); + for batch in &results { + assert!(batch.num_rows() <= max_batch_size); + } -fn row_count(batches: &[RecordBatch]) -> usize { - batches.iter().map(|b| b.num_rows()).sum() + let results = concat_batches(&results[0].schema(), &results).unwrap(); + + let results = [results]; + let expected = [expected]; + + assert_eq!( + &expected, + &results, + "TopK mismatch fetch {fetch} \n\ + expected rows {}, actual rows {}.\ + \n\nExpected:\n{}\n\nActual:\n{}", + expected[0].num_rows(), + results[0].num_rows(), + pretty_format_batches(&expected).unwrap(), + pretty_format_batches(&results).unwrap(), + ); } /// Return random ASCII String with len From be4373a1b172c07dcfa59cdb104d8a3061f4a72f Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 21 Oct 2023 16:08:23 -0400 Subject: [PATCH 4/4] Update docs --- datafusion/core/tests/fuzz_cases/limit_fuzz.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/datafusion/core/tests/fuzz_cases/limit_fuzz.rs b/datafusion/core/tests/fuzz_cases/limit_fuzz.rs index cc39a7fec927..9889ce2ae562 100644 --- a/datafusion/core/tests/fuzz_cases/limit_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/limit_fuzz.rs @@ -66,7 +66,10 @@ where } } -/// The type of column(s) to use for the sort test +/// The data column(s) to use for the TopK test +/// +/// Each variants stores the input batches and the expected sorted values +/// compute the expected output for a given fetch (limit) value. #[derive(Debug)] enum SortedData { // single Int32 column