From 9816d2a08b262da7e167df9344947e74a9ad5b8e Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 19 Mar 2024 21:46:55 +0800 Subject: [PATCH] fix: clone data instead of moving it - homemade future is dangerous (#3542) * fix: clone data instead of moving it - homemade future is dangerous Signed-off-by: Ruihang Xia * add comment --------- Signed-off-by: Ruihang Xia --- src/promql/src/extension_plan/series_divide.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/promql/src/extension_plan/series_divide.rs b/src/promql/src/extension_plan/series_divide.rs index 73fffbdeeb10..be41a2eed872 100644 --- a/src/promql/src/extension_plan/series_divide.rs +++ b/src/promql/src/extension_plan/series_divide.rs @@ -255,18 +255,21 @@ impl Stream for SeriesDivideStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { - if let Some(batch) = self.buffer.take() { + // It has to be cloned here, otherwise the later ready! will mess things up + if let Some(batch) = self.buffer.clone() { let same_length = self.find_first_diff_row(&batch) + 1; if same_length >= batch.num_rows() { let next_batch = match ready!(self.as_mut().fetch_next_batch(cx)) { - Some(Ok(next_batch)) => next_batch, + Some(Ok(batch)) => batch, None => { + self.buffer = None; self.num_series += 1; return Poll::Ready(Some(Ok(batch))); } error => return Poll::Ready(error), }; - let new_batch = compute::concat_batches(&batch.schema(), &[batch, next_batch])?; + let new_batch = + compute::concat_batches(&batch.schema(), &[batch.clone(), next_batch])?; self.buffer = Some(new_batch); continue; } else {