diff --git a/http/src/v0/pubsub.rs b/http/src/v0/pubsub.rs index ac8699cfa..a7f051dc7 100644 --- a/http/src/v0/pubsub.rs +++ b/http/src/v0/pubsub.rs @@ -183,8 +183,13 @@ async fn inner_subscribe( }; // map recv errors into the StreamError and flatten + let mut errored = false; rx.into_stream() .map(|res| res.map_err(|_| StreamError::Recv).and_then(|res| res)) + .take_while(move |res| { + // return until the first error + !std::mem::replace(&mut errored, res.is_err()) + }) } /// Shovel task takes items from the [`SubscriptionStream`], formats them and passes them on to