fix(query): sort spilling may hang #16672

Merged (6 commits) on Oct 25, 2024

Changes from all commits

1 change: 1 addition & 0 deletions src/query/pipeline/transforms/src/lib.rs
@@ -20,3 +20,4 @@
#![feature(iter_map_windows)]

pub mod processors;
pub use processors::*;
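The new crate-root re-export lets downstream code drop the `processors::` path segment when importing transform items. A minimal sketch of the effect, using a toy module rather than the real crate contents:

```rust
// Toy crate layout mirroring the change (names are illustrative):
mod processors {
    pub fn transform() -> &'static str {
        "transformed"
    }
}
pub use processors::*; // re-export everything at the crate root

fn main() {
    // Callers can now reach `transform` without the `processors::` prefix.
    assert_eq!(transform(), "transformed");
}
```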
2 changes: 1 addition & 1 deletion src/query/service/src/pipelines/processors/mod.rs
@@ -13,7 +13,7 @@
// limitations under the License.

pub use databend_common_pipeline_core::processors::*;
pub(crate) mod transforms;
pub mod transforms;

pub use transforms::HashJoinBuildState;
pub use transforms::HashJoinDesc;
@@ -67,6 +67,10 @@ impl<T: HashMethodBounds, V: Send + Sync + 'static> HashTableCell<T, V> {
self.hashtable.len()
}

pub fn is_empty(&self) -> bool {
self.len() == 0
}

pub fn allocated_bytes(&self) -> usize {
self.hashtable.bytes_len(false)
+ self.arena.allocated_bytes()
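Pairing `len` with an `is_empty` that delegates to `len() == 0` is the standard Rust convention (it is what clippy's `len_without_is_empty` lint asks for). A minimal sketch with a toy type, not the real `HashTableCell`:

```rust
// Toy type illustrating the `len`/`is_empty` pairing.
struct Cell {
    items: Vec<u64>,
}

impl Cell {
    fn len(&self) -> usize {
        self.items.len()
    }

    // Clippy's `len_without_is_empty` lint expects this companion method.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

fn main() {
    let cell = Cell { items: vec![] };
    assert!(cell.is_empty());
}
```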
@@ -163,7 +163,7 @@ impl<Method: HashMethodBounds> BlockMetaTransform<ExchangeShuffleMeta>
&self.location_prefix,
payload,
)?,
false => agg_spilling_aggregate_payload::<Method>(
false => agg_spilling_aggregate_payload(
self.ctx.clone(),
self.operator.clone(),
&self.location_prefix,
@@ -239,7 +239,7 @@ impl<Method: HashMethodBounds> BlockMetaTransform<ExchangeShuffleMeta>
}
}

fn agg_spilling_aggregate_payload<Method: HashMethodBounds>(
fn agg_spilling_aggregate_payload(
ctx: Arc<QueryContext>,
operator: Operator,
location_prefix: &str,
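The call site above drops the turbofish because `agg_spilling_aggregate_payload` (and its group-by twin below) no longer declares a `Method` type parameter that its body never used. A minimal sketch of the same cleanup, with illustrative names:

```rust
// `M` is declared but never used, so every caller must pick a type anyway:
fn spill_len_generic<M>(payload: &[u8]) -> usize {
    payload.len()
}

// Dropping the unused parameter simplifies the definition and the call sites:
fn spill_len(payload: &[u8]) -> usize {
    payload.len()
}

fn main() {
    // Before: spill_len_generic::<SomeHashMethod>(...); after: spill_len(...).
    assert_eq!(spill_len_generic::<()>(b"abc"), spill_len(b"abc"));
}
```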
@@ -214,7 +214,7 @@ impl<Method: HashMethodBounds> BlockMetaTransform<ExchangeShuffleMeta>
&self.location_prefix,
payload,
)?,
false => agg_spilling_group_by_payload::<Method>(
false => agg_spilling_group_by_payload(
self.ctx.clone(),
self.operator.clone(),
&self.location_prefix,
@@ -292,7 +292,7 @@ fn get_columns(data_block: DataBlock) -> Vec<BlockEntry> {
data_block.columns().to_vec()
}

fn agg_spilling_group_by_payload<Method: HashMethodBounds>(
fn agg_spilling_group_by_payload(
ctx: Arc<QueryContext>,
operator: Operator,
location_prefix: &str,
@@ -40,7 +40,7 @@ pub struct TransformAsyncFunction {
}

impl TransformAsyncFunction {
pub fn new(
pub(crate) fn new(
ctx: Arc<QueryContext>,
async_func_descs: Vec<AsyncFunctionDesc>,
operators: BTreeMap<usize, Arc<DictionaryOperator>>,
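Since `mod transforms` in the service crate is now `pub` (see the mod.rs change above), narrowing this constructor to `pub(crate)` keeps it callable everywhere inside the crate while keeping it out of the newly widened public surface. A toy sketch of that visibility pairing, assuming nothing about the real types:

```rust
pub mod transforms {
    pub struct TransformAsyncFunction;

    impl TransformAsyncFunction {
        // Reachable anywhere in this crate, invisible to dependent crates.
        pub(crate) fn new() -> Self {
            TransformAsyncFunction
        }
    }
}

fn main() {
    // Same-crate code can still construct the transform.
    let _t = transforms::TransformAsyncFunction::new();
}
```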
@@ -129,13 +129,13 @@ where
}

if let Some(block) = self.output_data.take() {
debug_assert!(matches!(self.state, State::MergeFinal | State::Finish));
assert!(matches!(self.state, State::MergeFinal | State::Finish));
self.output_block(block);
return Ok(Event::NeedConsume);
}

if matches!(self.state, State::Finish) {
debug_assert!(self.input.is_finished());
assert!(self.input.is_finished());
self.output.finish();
return Ok(Event::Finished);
}
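Promoting the two `debug_assert!`s above to `assert!`s keeps these state-machine invariants checked in release builds, where a violation would otherwise pass silently and likely surface as a hang. A small self-contained sketch of the difference:

```rust
fn check_invariant(input_finished: bool) {
    // Compiled out when debug assertions are off (e.g. --release), so a
    // violation here goes unnoticed and the bug shows up later.
    debug_assert!(input_finished, "only checked in debug builds");
    // Always evaluated: a violation fails fast with a clear panic.
    assert!(input_finished, "checked in every build profile");
}

fn main() {
    check_invariant(true);
    println!("debug assertions enabled: {}", cfg!(debug_assertions));
}
```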
@@ -179,6 +179,7 @@ where
if meta.is_none() {
// It means we get the last block.
// We can launch external merge sort now.
self.input.finish();
self.state = State::Merging;
}
self.input_data = Some(block);
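The added `self.input.finish()` is the heart of the fix: after the last block arrives the transform switches to merging and never pulls from its input again, so without an explicit finish the upstream processors can stay blocked on backpressure and the query hangs. A simplified sketch of the repaired transition, using toy types rather than the real databend pipeline API:

```rust
// Toy stand-ins for the pipeline port and state machine (the real ones
// live in databend_common_pipeline_core and the sort transform).
struct InputPort {
    finished: bool,
}

impl InputPort {
    fn finish(&mut self) {
        self.finished = true;
    }
}

enum State {
    Collecting,
    Merging,
}

struct SortSpillTransform {
    state: State,
    input: InputPort,
}

impl SortSpillTransform {
    fn on_last_block(&mut self) {
        // The fix: tell upstream we will never ask for more data, so it
        // can shut down instead of waiting on backpressure forever.
        self.input.finish();
        self.state = State::Merging;
    }
}

fn main() {
    let mut t = SortSpillTransform {
        state: State::Collecting,
        input: InputPort { finished: false },
    };
    t.on_last_block();
    assert!(t.input.finished);
    assert!(matches!(t.state, State::Merging));
}
```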
186 changes: 20 additions & 166 deletions src/query/service/tests/it/pipelines/transforms/sort.rs
@@ -19,7 +19,6 @@ use databend_common_base::base::tokio;
use databend_common_base::base::tokio::sync::mpsc::channel;
use databend_common_base::base::tokio::sync::mpsc::Receiver;
use databend_common_exception::Result;
use databend_common_expression::block_debug::pretty_format_blocks;
use databend_common_expression::types::Int32Type;
use databend_common_expression::DataBlock;
use databend_common_expression::DataField;
@@ -34,130 +33,16 @@ use databend_common_pipeline_core::PipeItem;
use databend_common_pipeline_core::Pipeline;
use databend_common_pipeline_sinks::SyncSenderSink;
use databend_common_pipeline_sources::BlocksSource;
use databend_common_pipeline_transforms::processors::add_k_way_merge_sort;
use databend_query::pipelines::executor::ExecutorSettings;
use databend_query::pipelines::executor::QueryPipelineExecutor;
use databend_query::sessions::QueryContext;
use databend_query::test_kits::TestFixture;
use itertools::Itertools;
use parking_lot::Mutex;
use rand::rngs::ThreadRng;
use rand::Rng;

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_k_way_merge_sort() -> Result<()> {
let fixture = TestFixture::setup().await?;
let ctx = fixture.new_query_ctx().await?;

let worker = 3;
let block_size = 4;
let limit = None;
let (data, expected) = basic_test_data(None);
let (executor, mut rx) = create_pipeline(ctx, data, worker, block_size, limit)?;

executor.execute()?;

let mut got: Vec<DataBlock> = Vec::new();
while !rx.is_empty() {
got.push(rx.recv().await.unwrap()?);
}

check_result(got, expected);

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_k_way_merge_sort_fuzz() -> Result<()> {
let mut rng = rand::thread_rng();
let fixture = TestFixture::setup().await?;

for _ in 0..10 {
let ctx = fixture.new_query_ctx().await?;
run_fuzz(ctx, &mut rng, false).await?;
}

for _ in 0..10 {
let ctx = fixture.new_query_ctx().await?;
run_fuzz(ctx, &mut rng, true).await?;
}
Ok(())
}

async fn run_fuzz(ctx: Arc<QueryContext>, rng: &mut ThreadRng, with_limit: bool) -> Result<()> {
let worker = rng.gen_range(1..=5);
let block_size = rng.gen_range(1..=20);
let (data, expected, limit) = random_test_data(rng, with_limit);

// println!("\nwith_limit {with_limit}");
// for (input, blocks) in data.iter().enumerate() {
// println!("intput {input}");
// for b in blocks {
// println!("{:?}", b.columns()[0].value);
// }
// }

let (executor, mut rx) = create_pipeline(ctx, data, worker, block_size, limit)?;
executor.execute()?;

let mut got: Vec<DataBlock> = Vec::new();
while !rx.is_empty() {
got.push(rx.recv().await.unwrap()?);
}

check_result(got, expected);

Ok(())
}

fn create_pipeline(
ctx: Arc<QueryContext>,
data: Vec<Vec<DataBlock>>,
worker: usize,
block_size: usize,
limit: Option<usize>,
) -> Result<(Arc<QueryPipelineExecutor>, Receiver<Result<DataBlock>>)> {
let mut pipeline = Pipeline::create();

let data_type = data[0][0].get_by_offset(0).data_type.clone();
let source_pipe = create_source_pipe(ctx, data)?;
pipeline.add_pipe(source_pipe);

let schema = DataSchemaRefExt::create(vec![DataField::new("a", data_type)]);
let sort_desc = Arc::new(vec![SortColumnDescription {
offset: 0,
asc: true,
nulls_first: true,
is_nullable: false,
}]);
add_k_way_merge_sort(
&mut pipeline,
schema,
worker,
block_size,
limit,
sort_desc,
false,
true,
)?;

let (mut rx, sink_pipe) = create_sink_pipe(1)?;
let rx = rx.pop().unwrap();
pipeline.add_pipe(sink_pipe);
pipeline.set_max_threads(3);

let settings = ExecutorSettings {
query_id: Arc::new("".to_string()),
max_execute_time_in_seconds: Default::default(),
enable_queries_executor: false,
max_threads: 8,
executor_node_id: "".to_string(),
};
let executor = QueryPipelineExecutor::create(pipeline, settings)?;
Ok((executor, rx))
}

fn create_source_pipe(ctx: Arc<QueryContext>, data: Vec<Vec<DataBlock>>) -> Result<Pipe> {
use parking_lot::Mutex;

let size = data.len();
let mut items = Vec::with_capacity(size);

@@ -179,7 +64,7 @@ fn create_source_pipe(ctx: Arc<QueryContext>, data: Vec<Vec<DataBlock>>) -> Result<Pipe> {
fn create_sink_pipe(size: usize) -> Result<(Vec<Receiver<Result<DataBlock>>>, Pipe)> {
let mut rxs = Vec::with_capacity(size);
let mut items = Vec::with_capacity(size);
for _index in 0..size {
for _ in 0..size {
let input = InputPort::create();
let (tx, rx) = channel(1000);
rxs.push(rx);
@@ -193,21 +78,11 @@ fn create_sink_pipe(size: usize) -> Result<(Vec<Receiver<Result<DataBlock>>>, Pipe)> {
Ok((rxs, Pipe::create(size, 0, items)))
}

/// Returns (input, expected)
pub fn basic_test_data(limit: Option<usize>) -> (Vec<Vec<DataBlock>>, DataBlock) {
let data = vec![
vec![vec![1, 2, 3, 4], vec![4, 5, 6, 7]],
vec![vec![1, 1, 1, 1], vec![1, 10, 100, 2000]],
vec![vec![0, 2, 4, 5]],
];

prepare_input_and_result(data, limit)
}

fn prepare_input_and_result(
fn prepare_multi_input_and_result(
data: Vec<Vec<Vec<i32>>>,
limit: Option<usize>,
) -> (Vec<Vec<DataBlock>>, DataBlock) {
use itertools::Itertools;
let input = data
.clone()
.into_iter()
@@ -229,7 +104,17 @@ fn prepare_input_and_result(
(input, result)
}

fn prepare_single_input_and_result(
data: Vec<Vec<i32>>,
limit: Option<usize>,
) -> (Vec<DataBlock>, DataBlock) {
let (mut input, expected) = prepare_multi_input_and_result(vec![data], limit);
(input.remove(0), expected)
}
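`prepare_single_input_and_result` is a thin wrapper for tests that feed one stream: it reuses the multi-stream helper with a single entry and unwraps it. A hypothetical call, with made-up data:

```rust
// Hypothetical usage (illustrative data, not from the test suite):
// one input stream split into two sorted blocks, no LIMIT applied.
let (blocks, expected) = prepare_single_input_and_result(
    vec![vec![1, 2, 3], vec![4, 5, 6]],
    None,
);
assert_eq!(blocks.len(), 2);
assert_eq!(expected.num_rows(), 6);
```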

fn check_result(result: Vec<DataBlock>, expected: DataBlock) {
use databend_common_expression::block_debug::pretty_format_blocks;

if expected.is_empty() {
if !result.is_empty() && !DataBlock::concat(&result).unwrap().is_empty() {
panic!(
@@ -240,46 +125,15 @@ fn check_result(result: Vec<DataBlock>, expected: DataBlock) {
return;
}

let result_rows: usize = result.iter().map(|v| v.num_rows()).sum();
let result = pretty_format_blocks(&result).unwrap();
let expected_rows = expected.num_rows();
let expected = pretty_format_blocks(&[expected]).unwrap();
let result_rows: usize = result.iter().map(|v| v.num_rows()).sum();
let result = pretty_format_blocks(&result).unwrap();
assert_eq!(
expected, result,
"\nexpected (num_rows = {}):\n{}\nactual (num_rows = {}):\n{}",
expected_rows, expected, result_rows, result
"\nexpected (num_rows = {expected_rows}):\n{expected}\nactual (num_rows = {result_rows}):\n{result}",
);
}

fn random_test_data(
rng: &mut ThreadRng,
with_limit: bool,
) -> (Vec<Vec<DataBlock>>, DataBlock, Option<usize>) {
let random_batch_size = rng.gen_range(1..=10);
let random_num_streams = rng.gen_range(5..=10);

let random_data = (0..random_num_streams)
.map(|_| {
let random_num_blocks = rng.gen_range(1..=10);
let mut data = (0..random_batch_size * random_num_blocks)
.map(|_| rng.gen_range(0..=1000))
.collect::<Vec<_>>();
data.sort();
data.chunks(random_batch_size)
.map(|v| v.to_vec())
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();

let num_rows = random_data
.iter()
.map(|v| v.iter().map(|v| v.len()).sum::<usize>())
.sum::<usize>();
let limit = if with_limit {
Some(rng.gen_range(0..=num_rows))
} else {
None
};
let (input, expected) = prepare_input_and_result(random_data, limit);
(input, expected, limit)
}
mod k_way;
mod spill;