Improve AggregationFuzzer error reporting #12832

Merged (6 commits) on Oct 15, 2024
2 changes: 1 addition & 1 deletion datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
@@ -83,7 +83,7 @@ async fn test_basic_prim_aggr_no_group() {
.table_name("fuzz_table")
.build();

- fuzzer.run().await;
+ fuzzer.run().await
}

/// Fuzz test for `basic prim aggr(sum/sum distinct/max/min/count/avg)` + `group by single int64`
90 changes: 58 additions & 32 deletions datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs
@@ -19,6 +19,7 @@ use std::sync::Arc;

use arrow::util::pretty::pretty_format_batches;
use arrow_array::RecordBatch;
+ use datafusion_common::{DataFusionError, Result};
use rand::{thread_rng, Rng};
use tokio::task::JoinSet;

@@ -132,7 +133,20 @@ struct QueryGroup {
}

impl AggregationFuzzer {
+ /// Run the fuzzer, printing an error and panicking if any of the tasks fail
pub async fn run(&self) {
+ let res = self.run_inner().await;
+
+ if let Err(e) = res {
+ // Print the error via `Display` so that it displays nicely (the default `unwrap()`
+ // prints using `Debug` which escapes newlines, and makes multi-line messages
+ // hard to read
+ println!("{e}");
+ panic!("Error!");
+ }
+ }
+
+ async fn run_inner(&self) -> Result<()> {
let mut join_set = JoinSet::new();
let mut rng = thread_rng();

@@ -157,16 +171,20 @@ impl AggregationFuzzer {

let tasks = self.generate_fuzz_tasks(query_groups).await;
for task in tasks {
- join_set.spawn(async move {
- task.run().await;
- });
+ join_set.spawn(async move { task.run().await });
}
}

while let Some(join_handle) = join_set.join_next().await {
// propagate errors
- join_handle.unwrap();
+ join_handle.map_err(|e| {
+ DataFusionError::Internal(format!(
+ "AggregationFuzzer task error: {:?}",
+ e
+ ))
+ })??;
}
+ Ok(())
}

async fn generate_fuzz_tasks(
@@ -237,45 +255,53 @@ struct AggregationFuzzTestTask {
}

impl AggregationFuzzTestTask {
- async fn run(&self) {
+ async fn run(&self) -> Result<()> {
Review comment (Contributor Author): The key change is to return an actual DataFusionError here and then print it when looking at the task output.

let task_result = run_sql(&self.sql, &self.ctx_with_params.ctx)
.await
- .expect("should success to run sql");
- self.check_result(&task_result, &self.expected_result);
+ .map_err(|e| e.context(self.context_error_report()))?;
+ self.check_result(&task_result, &self.expected_result)
}

- // TODO: maybe we should persist the `expected_result` and `task_result`,
- // because the readability is not so good if we just print it.
- fn check_result(&self, task_result: &[RecordBatch], expected_result: &[RecordBatch]) {
- let result = check_equality_of_batches(task_result, expected_result);
- if let Err(e) = result {
+ fn check_result(
+ &self,
+ task_result: &[RecordBatch],
+ expected_result: &[RecordBatch],
+ ) -> Result<()> {
+ check_equality_of_batches(task_result, expected_result).map_err(|e| {
// If we found inconsistent result, we print the test details for reproducing at first
- println!(
- "##### AggregationFuzzer error report #####
- ### Sql:\n{}\n\
- ### Schema:\n{}\n\
- ### Session context params:\n{:?}\n\
- ### Inconsistent row:\n\
- - row_idx:{}\n\
- - task_row:{}\n\
- - expected_row:{}\n\
- ### Task total result:\n{}\n\
- ### Expected total result:\n{}\n\
- ### Input:\n{}\n\
- ",
- self.sql,
- self.dataset_ref.batches[0].schema_ref(),
- self.ctx_with_params.params,
+ let message = format!(
+ "{}\n\
+ ### Inconsistent row:\n\
+ - row_idx:{}\n\
+ - task_row:{}\n\
+ - expected_row:{}\n\
+ ### Task total result:\n{}\n\
+ ### Expected total result:\n{}\n\
+ ",
+ self.context_error_report(),
e.row_idx,
e.lhs_row,
e.rhs_row,
pretty_format_batches(task_result).unwrap(),
pretty_format_batches(expected_result).unwrap(),
- pretty_format_batches(&self.dataset_ref.batches).unwrap(),
);
+ DataFusionError::Internal(message)
+ })
+ }

- // Then we just panic
- panic!();
- }
+ /// Returns a formatted error message
+ fn context_error_report(&self) -> String {
+ format!(
+ "##### AggregationFuzzer error report #####\n\
+ ### Sql:\n{}\n\
+ ### Schema:\n{}\n\
+ ### Session context params:\n{:?}\n\
Comment on lines +296 to +298
Review comment (Contributor): Maybe we will also care about some information for reproducing, like the session context params and the SQL, when we encounter inconsistent results?
Reply (Contributor Author): I think that information is already included when inconsistent results are detected (in check_result), but perhaps I don't understand what you are proposing here.
Reply (Contributor): Oh, it is my mistake, I see it now.

+ ### Input:\n{}\n\
+ ",
+ self.sql,
+ self.dataset_ref.batches[0].schema_ref(),
+ self.ctx_with_params.params,
+ pretty_format_batches(&self.dataset_ref.batches).unwrap(),
+ )
}
}
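
Note for readers: the two ideas in this change (propagating task failures with `??`, and printing the final error with `Display` rather than `Debug`) can be tried without DataFusion or tokio. The sketch below is only an illustration under stated assumptions. `FuzzError` and `run_task` are hypothetical names standing in for `DataFusionError::Internal` and `AggregationFuzzTestTask::run`, and `std::thread` stands in for tokio's `JoinSet`.

```rust
use std::fmt;
use std::thread;

/// Hypothetical stand-in for `DataFusionError::Internal` (not the real type).
#[derive(Debug)]
struct FuzzError(String);

impl fmt::Display for FuzzError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `Display` writes the message verbatim, so newlines stay newlines.
        write!(f, "{}", self.0)
    }
}

impl std::error::Error for FuzzError {}

type Result<T> = std::result::Result<T, FuzzError>;

/// Hypothetical task that always reports a multi-line mismatch.
fn run_task(sql: &str) -> Result<()> {
    Err(FuzzError(format!(
        "##### error report #####\n### Sql:\n{sql}\n### Inconsistent row:\n- row_idx:0"
    )))
}

/// Analogue of `run_inner`: join every task and propagate the first failure.
fn run_inner() -> Result<()> {
    let handles: Vec<_> = vec!["SELECT sum(a) FROM fuzz_table"]
        .into_iter()
        .map(|sql| thread::spawn(move || run_task(sql)))
        .collect();

    for handle in handles {
        // First `?`: the thread itself panicked (join error).
        // Second `?`: the task ran to completion but returned `Err`.
        handle
            .join()
            .map_err(|e| FuzzError(format!("task panicked: {e:?}")))??;
    }
    Ok(())
}

fn main() {
    if let Err(e) = run_inner() {
        // `Display` keeps the report readable across several lines.
        println!("--- Display ---\n{e}");
        // `Debug` escapes each newline as `\n`, collapsing the report to one line.
        println!("--- Debug ---\n{e:?}");
    }
}
```

Running this prints the report twice: once as separate lines, and once as a single line with escaped newlines. The second form is what a plain `unwrap()` on the error would show in its panic message.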