diff --git a/src/query/storages/fuse/src/operations/read_data.rs b/src/query/storages/fuse/src/operations/read_data.rs index 2077e667b8b3..c384849ac011 100644 --- a/src/query/storages/fuse/src/operations/read_data.rs +++ b/src/query/storages/fuse/src/operations/read_data.rs @@ -224,23 +224,26 @@ impl FuseTable { let table = self.clone(); let table_schema = self.schema_with_stream(); let push_downs = plan.push_downs.clone(); - self.runtime.spawn(async move { - match table - .prune_snapshot_blocks(ctx, push_downs, table_schema, lazy_init_segments, 0) - .await - { - Ok((_, partitions)) => { - for part in partitions.partitions { - // ignore the error, the sql may be killed or early stop - let _ = sender.send(Ok(part)).await; + pipeline.set_on_init(move || { + table.runtime.clone().spawn(async move { + match table + .prune_snapshot_blocks(ctx, push_downs, table_schema, lazy_init_segments, 0) + .await + { + Ok((_, partitions)) => { + for part in partitions.partitions { + // ignore the error, the sql may be killed or early stop + let _ = sender.send(Ok(part)).await; + } + } + Err(err) => { + let _ = sender.send(Err(err)).await; } } - Err(err) => { - let _ = sender.send(Err(err)).await; - } - } - Ok::<_, ErrorCode>(()) - }); + Ok::<_, ErrorCode>(()) + }); + Ok(()) + }) } Ok(())