Skip to content

Commit

Permalink
fix: drop channel if partition receiver finished (#17037)
Browse files Browse the repository at this point in the history
* fix: partition receiver should implement on_finish

* fix: partition receiver should implement on_finish

---------

Co-authored-by: Winter Zhang <coswde@gmail.com>
  • Loading branch information
dqhl76 and zhang2014 authored Dec 12, 2024
1 parent b5365e3 commit d46605e
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 15 deletions.
20 changes: 19 additions & 1 deletion src/query/pipeline/sources/src/async_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ pub trait AsyncSource: Send {
fn un_reacted(&self) -> Result<()> {
Ok(())
}

#[async_backtrace::framed]
async fn on_finish(&mut self) -> Result<()> {
Ok(())
}
}

// TODO: This can be refactored using proc macros
Expand All @@ -50,6 +55,7 @@ pub struct AsyncSourcer<T: 'static + AsyncSource> {
output: Arc<OutputPort>,
scan_progress: Arc<Progress>,
generated_data: Option<DataBlock>,
called_on_finish: bool,
}

impl<T: 'static + AsyncSource> AsyncSourcer<T> {
Expand All @@ -65,6 +71,7 @@ impl<T: 'static + AsyncSource> AsyncSourcer<T> {
scan_progress,
is_finish: false,
generated_data: None,
called_on_finish: false,
})))
}
}
Expand All @@ -81,12 +88,16 @@ impl<T: 'static + AsyncSource> Processor for AsyncSourcer<T> {

fn event(&mut self) -> Result<Event> {
if self.is_finish {
if !self.called_on_finish {
return Ok(Event::Async);
}
self.output.finish();
return Ok(Event::Finished);
}

if self.output.is_finished() {
return Ok(Event::Finished);
self.is_finish = true;
return Ok(Event::Async);
}

if !self.output.can_push() {
Expand All @@ -112,6 +123,13 @@ impl<T: 'static + AsyncSource> Processor for AsyncSourcer<T> {

#[async_backtrace::framed]
async fn async_process(&mut self) -> Result<()> {
if self.is_finish {
if !self.called_on_finish {
self.called_on_finish = true;
self.inner.on_finish().await?;
}
return Ok(());
}
match self.inner.generate().await? {
None => self.is_finish = true,
Some(data_block) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use databend_common_pipeline_sources::AsyncSourcer;
use crate::operations::read::block_partition_meta::BlockPartitionMeta;

pub struct BlockPartitionReceiverSource {
pub meta_receiver: Receiver<Result<PartInfoPtr>>,
pub meta_receiver: Option<Receiver<Result<PartInfoPtr>>>,
}

impl BlockPartitionReceiverSource {
Expand All @@ -37,7 +37,7 @@ impl BlockPartitionReceiverSource {
output_port: Arc<OutputPort>,
) -> Result<ProcessorPtr> {
AsyncSourcer::create(ctx, output_port, Self {
meta_receiver: receiver,
meta_receiver: Some(receiver),
})
}
}
Expand All @@ -49,18 +49,28 @@ impl AsyncSource for BlockPartitionReceiverSource {

#[async_backtrace::framed]
async fn generate(&mut self) -> Result<Option<DataBlock>> {
match self.meta_receiver.recv().await {
Ok(Ok(part)) => Ok(Some(DataBlock::empty_with_meta(
BlockPartitionMeta::create(vec![part]),
))),
Ok(Err(e)) => Err(
// The error is occurred in pruning process
e,
),
Err(_) => {
// The channel is closed, we should return None to stop generating
Ok(None)
if let Some(rx) = &self.meta_receiver {
match rx.recv().await {
Ok(Ok(part)) => Ok(Some(DataBlock::empty_with_meta(
BlockPartitionMeta::create(vec![part]),
))),
Ok(Err(e)) => Err(
// The error is occurred in pruning process
e,
),
Err(_) => {
// The channel is closed, we should return None to stop generating
Ok(None)
}
}
} else {
Ok(None)
}
}

#[async_backtrace::framed]
async fn on_finish(&mut self) -> Result<()> {
drop(self.meta_receiver.take());
Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,9 @@ impl AsyncSink for SendPartInfoSink {

for info in info_ptr {
if let Some(sender) = &self.sender {
let _ = sender.send(Ok(info)).await;
if let Err(_e) = sender.send(Ok(info)).await {
break;
}
}
}

Expand Down

0 comments on commit d46605e

Please sign in to comment.