Skip to content

Commit

Permalink
fix: clone data instead of moving it - homemade future is dangerous (#…
Browse files Browse the repository at this point in the history
…3542)

* fix: clone data instead of moving it - homemade future is dangerous

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add comment

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
  • Loading branch information
waynexia authored Mar 19, 2024
1 parent a99d6eb commit 9816d2a
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions src/promql/src/extension_plan/series_divide.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,18 +255,21 @@ impl Stream for SeriesDivideStream {

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if let Some(batch) = self.buffer.take() {
// It has to be cloned here, otherwise the later ready! will mess things up
if let Some(batch) = self.buffer.clone() {
let same_length = self.find_first_diff_row(&batch) + 1;
if same_length >= batch.num_rows() {
let next_batch = match ready!(self.as_mut().fetch_next_batch(cx)) {
Some(Ok(next_batch)) => next_batch,
Some(Ok(batch)) => batch,
None => {
self.buffer = None;
self.num_series += 1;
return Poll::Ready(Some(Ok(batch)));
}
error => return Poll::Ready(error),
};
let new_batch = compute::concat_batches(&batch.schema(), &[batch, next_batch])?;
let new_batch =
compute::concat_batches(&batch.schema(), &[batch.clone(), next_batch])?;
self.buffer = Some(new_batch);
continue;
} else {
Expand Down

0 comments on commit 9816d2a

Please sign in to comment.