fix(ci): flaky test #16933

Merged
merged 4 commits on Nov 26, 2024
Changes from 2 commits
23 changes: 22 additions & 1 deletion src/query/service/src/pipelines/builders/builder_mutation.rs
@@ -20,6 +20,7 @@ use databend_common_exception::Result;
 use databend_common_expression::BlockThresholds;
 use databend_common_expression::DataSchema;
 use databend_common_expression::DataSchemaRef;
+use databend_common_pipeline_core::processors::create_resize_item;
 use databend_common_pipeline_core::processors::InputPort;
 use databend_common_pipeline_core::processors::OutputPort;
 use databend_common_pipeline_core::processors::ProcessorPtr;
@@ -239,6 +240,18 @@ impl PipelineBuilder {
     ) -> Result<()> {
         // we should avoid too much little block write, because for s3 write, there are too many
         // little blocks, it will cause high latency.
+        let mut origin_len = transform_len;
+        let mut resize_len = 1;
+        let mut pipe_items = Vec::with_capacity(2);
+        if need_match {
+            origin_len += 1;
+            resize_len += 1;
+            pipe_items.push(create_dummy_item());
+        }
+        pipe_items.push(create_resize_item(transform_len, 1));
+        self.main_pipeline
+            .add_pipe(Pipe::create(origin_len, resize_len, pipe_items));
+
         let mut builder = self.main_pipeline.add_transform_with_specified_len(
             |transform_input_port, transform_output_port| {
                 Ok(ProcessorPtr::create(AccumulatingTransformer::create(
@@ -247,13 +260,21 @@
                     BlockCompactBuilder::new(block_thresholds),
                 )))
             },
-            transform_len,
+            1,
         )?;
         if need_match {
             builder.add_items_prepend(vec![create_dummy_item()]);
         }
         self.main_pipeline.add_pipe(builder.finalize());

+        let mut pipe_items = Vec::with_capacity(2);
+        if need_match {
+            pipe_items.push(create_dummy_item());
+        }
+        pipe_items.push(create_resize_item(1, transform_len));
+        self.main_pipeline
+            .add_pipe(Pipe::create(resize_len, origin_len, pipe_items));
+
         let mut builder = self.main_pipeline.add_transform_with_specified_len(
             |transform_input_port, transform_output_port| {
                 Ok(ProcessorPtr::create(BlockMetaTransformer::create(
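For orientation, here is a minimal self-contained sketch (plain Rust, not the Databend API; resize_pipe_ports is a hypothetical name) of the port arithmetic behind the new pipes: the added resize item funnels the transform_len upstream streams into a single stream before BlockCompactBuilder runs, a dummy pass-through slot is kept in front of it when need_match is true, and the second new pipe mirrors the same counts to fan back out to transform_len afterwards.

// Hypothetical helper; only the port counting mirrors the diff above.
fn resize_pipe_ports(transform_len: usize, need_match: bool) -> (usize, usize) {
    let mut origin_len = transform_len; // input ports feeding the resize pipe
    let mut resize_len = 1; // output ports after funnelling down to one stream
    if need_match {
        // the matched-row branch bypasses compaction through a dummy item,
        // so both sides of the pipe gain one extra port
        origin_len += 1;
        resize_len += 1;
    }
    (origin_len, resize_len)
}

fn main() {
    // e.g. eight writer streams and no matched branch: 8 ports in, 1 port out
    assert_eq!(resize_pipe_ports(8, false), (8, 1));
    // with a matched branch: 9 in (8 + dummy), 2 out (1 + dummy)
    assert_eq!(resize_pipe_ports(8, true), (9, 2));
    // the pipe added after compaction reuses the same counts in reverse,
    // i.e. Pipe::create(resize_len, origin_len, ...) fans back out
}

Compacting in a single stream also makes the produced block layout independent of the thread count, which appears to be why the _origin_block_row_num expectations in the test below can now be pinned down.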
Changes to a sqllogictest suite:

@@ -97,7 +97,7 @@ statement ok
 set enable_experimental_merge_into = 1
 
 query TTT
-settings (max_threads = 8) merge into t using t2 on t.a = t2.a when matched and t2.a = 1 then update set t.a = 0 when matched and t2.a = 2 then delete when not matched then insert *
+merge into t using t2 on t.a = t2.a when matched and t2.a = 1 then update set t.a = 0 when matched and t2.a = 2 then delete when not matched then insert *
 ----
 1 1 1
 
@@ -109,7 +109,7 @@ select a, _origin_version is null, _origin_block_id is null, _origin_block_row_n
 5 0 0 0 0
 6 0 0 0 0
 7 0 0 1 0
-8 0 0 0 0
+8 0 0 1 0
 
 statement ok
 create table t1(a int) change_tracking = true
@@ -136,7 +136,7 @@ select a, _origin_version is null, _origin_block_id is null, _origin_block_row_n
 5 0 0 0 0
 6 0 0 0 0
 7 0 0 1 0
-8 0 0 0 0
+8 0 0 1 0
 
 ###############
 # issue 14955 #