diff --git a/src/query/service/src/pipelines/builders/builder_mutation.rs b/src/query/service/src/pipelines/builders/builder_mutation.rs index 18b64ffe59f0..3cd64c367fc5 100644 --- a/src/query/service/src/pipelines/builders/builder_mutation.rs +++ b/src/query/service/src/pipelines/builders/builder_mutation.rs @@ -20,6 +20,7 @@ use databend_common_exception::Result; use databend_common_expression::BlockThresholds; use databend_common_expression::DataSchema; use databend_common_expression::DataSchemaRef; +use databend_common_pipeline_core::processors::create_resize_item; use databend_common_pipeline_core::processors::InputPort; use databend_common_pipeline_core::processors::OutputPort; use databend_common_pipeline_core::processors::ProcessorPtr; @@ -239,6 +240,18 @@ impl PipelineBuilder { ) -> Result<()> { // we should avoid too much little block write, because for s3 write, there are too many // little blocks, it will cause high latency. + let mut origin_len = transform_len; + let mut resize_len = 1; + let mut pipe_items = Vec::with_capacity(2); + if need_match { + origin_len += 1; + resize_len += 1; + pipe_items.push(create_dummy_item()); + } + pipe_items.push(create_resize_item(transform_len, 1)); + self.main_pipeline + .add_pipe(Pipe::create(origin_len, resize_len, pipe_items)); + let mut builder = self.main_pipeline.add_transform_with_specified_len( |transform_input_port, transform_output_port| { Ok(ProcessorPtr::create(AccumulatingTransformer::create( @@ -247,13 +260,21 @@ impl PipelineBuilder { BlockCompactBuilder::new(block_thresholds), ))) }, - transform_len, + 1, )?; if need_match { builder.add_items_prepend(vec![create_dummy_item()]); } self.main_pipeline.add_pipe(builder.finalize()); + let mut pipe_items = Vec::with_capacity(2); + if need_match { + pipe_items.push(create_dummy_item()); + } + pipe_items.push(create_resize_item(1, transform_len)); + self.main_pipeline + .add_pipe(Pipe::create(resize_len, origin_len, pipe_items)); + let mut builder = self.main_pipeline.add_transform_with_specified_len( |transform_input_port, transform_output_port| { Ok(ProcessorPtr::create(BlockMetaTransformer::create( diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0011_change_tracking.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0011_change_tracking.test index 2263a860cd04..6025b15c33f4 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0011_change_tracking.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0011_change_tracking.test @@ -91,25 +91,25 @@ statement ok create table t2(a int) statement ok -insert into t2 values(1),(2),(8) +insert into t2 values(0),(2),(1) statement ok set enable_experimental_merge_into = 1 query TTT -merge into t using t2 on t.a = t2.a when matched and t2.a = 1 then update set t.a = 0 when matched and t2.a = 2 then delete when not matched then insert * +merge into t using t2 on t.a = t2.a when matched and t2.a = 1 then update set t.a = 8 when matched and t2.a = 2 then delete when not matched then insert * ---- 1 1 1 query IBBII select a, _origin_version is null, _origin_block_id is null, _origin_block_row_num, _row_version from t order by a ---- -0 0 0 0 1 +0 0 0 0 0 3 0 0 1 1 5 0 0 0 0 6 0 0 0 0 7 0 0 1 0 -8 0 0 0 0 +8 0 0 0 1 statement ok create table t1(a int) change_tracking = true @@ -131,12 +131,12 @@ merge into t using t1 on t.a = t1.a when matched and t1.a = 0 then update set t. query IBBII select a, _origin_version is null, _origin_block_id is null, _origin_block_row_num, _row_version from t order by a ---- -1 0 0 0 2 +1 0 0 0 1 2 0 0 1 2 5 0 0 0 0 6 0 0 0 0 7 0 0 1 0 -8 0 0 0 0 +8 0 0 0 1 ############### # issue 14955 #