Skip to content

Commit

Permalink
defensive handle None
Browse files Browse the repository at this point in the history
Signed-off-by: tison <wander4096@gmail.com>
  • Loading branch information
tisonkun committed Mar 19, 2024
1 parent 9a468c4 commit 315a4cc
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions src/promql/src/extension_plan/series_divide.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,20 +256,26 @@ impl Stream for SeriesDivideStream {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if let Some(batch) = self.buffer.as_ref() {
let same_length = self.find_first_diff_row(&batch) + 1;
let same_length = self.find_first_diff_row(batch) + 1;
if same_length >= batch.num_rows() {
let next_batch = match ready!(self.as_mut().fetch_next_batch(cx)) {
Some(Ok(next_batch)) => next_batch,
None => {
self.num_series += 1;
let batch = self.buffer.take().expect("must be some batch");
return Poll::Ready(Some(Ok(batch)));
return Poll::Ready(self.buffer.take().map(Ok));
}
error => return Poll::Ready(error),
};
let batch = self.buffer.take().expect("must be some batch");
let new_batch = compute::concat_batches(&batch.schema(), &[batch, next_batch])?;
self.buffer = Some(new_batch);
match self.buffer.take() {
None => {
self.buffer = Some(next_batch);
}
Some(batch) => {
let new_batch =
compute::concat_batches(&batch.schema(), &[batch, next_batch])?;
self.buffer = Some(new_batch);
}
}
continue;
} else {
let result_batch = batch.slice(0, same_length);
Expand Down

0 comments on commit 315a4cc

Please sign in to comment.