Skip to content

Commit

Permalink
fix(ci): flaky test (databendlabs#16933)
Browse files Browse the repository at this point in the history
* flaky test

* fix

* fix test
  • Loading branch information
zhyass authored and Dousir9 committed Nov 27, 2024
1 parent 3689eea commit 556b287
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 7 deletions.
23 changes: 22 additions & 1 deletion src/query/service/src/pipelines/builders/builder_mutation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use databend_common_exception::Result;
use databend_common_expression::BlockThresholds;
use databend_common_expression::DataSchema;
use databend_common_expression::DataSchemaRef;
use databend_common_pipeline_core::processors::create_resize_item;
use databend_common_pipeline_core::processors::InputPort;
use databend_common_pipeline_core::processors::OutputPort;
use databend_common_pipeline_core::processors::ProcessorPtr;
Expand Down Expand Up @@ -239,6 +240,18 @@ impl PipelineBuilder {
) -> Result<()> {
// we should avoid too much little block write, because for s3 write, there are too many
// little blocks, it will cause high latency.
let mut origin_len = transform_len;
let mut resize_len = 1;
let mut pipe_items = Vec::with_capacity(2);
if need_match {
origin_len += 1;
resize_len += 1;
pipe_items.push(create_dummy_item());
}
pipe_items.push(create_resize_item(transform_len, 1));
self.main_pipeline
.add_pipe(Pipe::create(origin_len, resize_len, pipe_items));

let mut builder = self.main_pipeline.add_transform_with_specified_len(
|transform_input_port, transform_output_port| {
Ok(ProcessorPtr::create(AccumulatingTransformer::create(
Expand All @@ -247,13 +260,21 @@ impl PipelineBuilder {
BlockCompactBuilder::new(block_thresholds),
)))
},
transform_len,
1,
)?;
if need_match {
builder.add_items_prepend(vec![create_dummy_item()]);
}
self.main_pipeline.add_pipe(builder.finalize());

let mut pipe_items = Vec::with_capacity(2);
if need_match {
pipe_items.push(create_dummy_item());
}
pipe_items.push(create_resize_item(1, transform_len));
self.main_pipeline
.add_pipe(Pipe::create(resize_len, origin_len, pipe_items));

let mut builder = self.main_pipeline.add_transform_with_specified_len(
|transform_input_port, transform_output_port| {
Ok(ProcessorPtr::create(BlockMetaTransformer::create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,25 +91,25 @@ statement ok
create table t2(a int)

statement ok
insert into t2 values(1),(2),(8)
insert into t2 values(0),(2),(1)

statement ok
set enable_experimental_merge_into = 1

query TTT
merge into t using t2 on t.a = t2.a when matched and t2.a = 1 then update set t.a = 0 when matched and t2.a = 2 then delete when not matched then insert *
merge into t using t2 on t.a = t2.a when matched and t2.a = 1 then update set t.a = 8 when matched and t2.a = 2 then delete when not matched then insert *
----
1 1 1

query IBBII
select a, _origin_version is null, _origin_block_id is null, _origin_block_row_num, _row_version from t order by a
----
0 0 0 0 1
0 0 0 0 0
3 0 0 1 1
5 0 0 0 0
6 0 0 0 0
7 0 0 1 0
8 0 0 0 0
8 0 0 0 1

statement ok
create table t1(a int) change_tracking = true
Expand All @@ -131,12 +131,12 @@ merge into t using t1 on t.a = t1.a when matched and t1.a = 0 then update set t.
query IBBII
select a, _origin_version is null, _origin_block_id is null, _origin_block_row_num, _row_version from t order by a
----
1 0 0 0 2
1 0 0 0 1
2 0 0 1 2
5 0 0 0 0
6 0 0 0 0
7 0 0 1 0
8 0 0 0 0
8 0 0 0 1

###############
# issue 14955 #
Expand Down

0 comments on commit 556b287

Please sign in to comment.