diff --git a/src/option/from_stream.rs b/src/option/from_stream.rs index 867911433..de929ca94 100644 --- a/src/option/from_stream.rs +++ b/src/option/from_stream.rs @@ -2,6 +2,7 @@ use std::pin::Pin; use crate::prelude::*; use crate::stream::{FromStream, IntoStream}; +use std::convert::identity; impl FromStream> for Option where @@ -17,24 +18,22 @@ where let stream = stream.into_stream(); Box::pin(async move { - // Using `scan` here because it is able to stop the stream early + // Using `take_while` here because it is able to stop the stream early // if a failure occurs - let mut found_error = false; + let mut found_none = false; let out: V = stream - .scan((), |_, elem| { - match elem { - Some(elem) => Some(elem), - None => { - found_error = true; - // Stop processing the stream on error - None - } + .take_while(|elem| { + elem.is_some() || { + found_none = true; + // Stop processing the stream on `None` + false } }) + .filter_map(identity) .collect() .await; - if found_error { None } else { Some(out) } + if found_none { None } else { Some(out) } }) } } diff --git a/src/option/product.rs b/src/option/product.rs index abaab73ed..b446c1ffe 100644 --- a/src/option/product.rs +++ b/src/option/product.rs @@ -1,7 +1,8 @@ use std::pin::Pin; use crate::prelude::*; -use crate::stream::{Stream, Product}; +use crate::stream::{Product, Stream}; +use std::convert::identity; impl Product> for Option where @@ -36,29 +37,27 @@ where ``` "#] fn product<'a, S>(stream: S) -> Pin> + 'a>> - where S: Stream> + 'a + where + S: Stream> + 'a, { Box::pin(async move { - // Using `scan` here because it is able to stop the stream early + // Using `take_while` here because it is able to stop the stream early // if a failure occurs let mut found_none = false; - let out = >::product(stream - .scan((), |_, elem| { - match elem { - Some(elem) => Some(elem), - None => { + let out = >::product( + stream + .take_while(|elem| { + elem.is_some() || { found_none = true; - // Stop processing the stream on error - None + // Stop processing the stream on `None` + false } - } - })).await; + }) + .filter_map(identity), + ) + .await; - if found_none { - None - } else { - Some(out) - } + if found_none { None } else { Some(out) } }) } } diff --git a/src/option/sum.rs b/src/option/sum.rs index d2e44830f..de404f42d 100644 --- a/src/option/sum.rs +++ b/src/option/sum.rs @@ -2,6 +2,7 @@ use std::pin::Pin; use crate::prelude::*; use crate::stream::{Stream, Sum}; +use std::convert::identity; impl Sum> for Option where @@ -31,29 +32,27 @@ where ``` "#] fn sum<'a, S>(stream: S) -> Pin> + 'a>> - where S: Stream> + 'a + where + S: Stream> + 'a, { Box::pin(async move { - // Using `scan` here because it is able to stop the stream early + // Using `take_while` here because it is able to stop the stream early // if a failure occurs let mut found_none = false; - let out = >::sum(stream - .scan((), |_, elem| { - match elem { - Some(elem) => Some(elem), - None => { + let out = >::sum( + stream + .take_while(|elem| { + elem.is_some() || { found_none = true; - // Stop processing the stream on error - None + // Stop processing the stream on `None` + false } - } - })).await; + }) + .filter_map(identity), + ) + .await; - if found_none { - None - } else { - Some(out) - } + if found_none { None } else { Some(out) } }) } } diff --git a/src/result/from_stream.rs b/src/result/from_stream.rs index a8490d691..8ce4b8534 100644 --- a/src/result/from_stream.rs +++ b/src/result/from_stream.rs @@ -17,26 +17,34 @@ where let stream = stream.into_stream(); Box::pin(async move { - // Using `scan` here because it is able to stop the stream early + // Using `take_while` here because it is able to stop the stream early // if a failure occurs + let mut is_error = false; let mut found_error = None; let out: V = stream - .scan((), |_, elem| { - match elem { - Ok(elem) => Some(elem), - Err(err) => { - found_error = Some(err); - // Stop processing the stream on error - None - } + .take_while(|elem| { + // Stop processing the stream on `Err` + !is_error + && (elem.is_ok() || { + is_error = true; + // Capture first `Err` + true + }) + }) + .filter_map(|elem| match elem { + Ok(value) => Some(value), + Err(err) => { + found_error = Some(err); + None } }) .collect() .await; - match found_error { - Some(err) => Err(err), - None => Ok(out), + if is_error { + Err(found_error.unwrap()) + } else { + Ok(out) } }) } diff --git a/src/result/product.rs b/src/result/product.rs index ec9d94a80..45782ff70 100644 --- a/src/result/product.rs +++ b/src/result/product.rs @@ -1,7 +1,7 @@ use std::pin::Pin; use crate::prelude::*; -use crate::stream::{Stream, Product}; +use crate::stream::{Product, Stream}; impl Product> for Result where @@ -36,26 +36,39 @@ where ``` "#] fn product<'a, S>(stream: S) -> Pin> + 'a>> - where S: Stream> + 'a + where + S: Stream> + 'a, { Box::pin(async move { - // Using `scan` here because it is able to stop the stream early + // Using `take_while` here because it is able to stop the stream early // if a failure occurs + let mut is_error = false; let mut found_error = None; - let out = >::product(stream - .scan((), |_, elem| { - match elem { - Ok(elem) => Some(elem), + let out = >::product( + stream + .take_while(|elem| { + // Stop processing the stream on `Err` + !is_error + && (elem.is_ok() || { + is_error = true; + // Capture first `Err` + true + }) + }) + .filter_map(|elem| match elem { + Ok(value) => Some(value), Err(err) => { found_error = Some(err); - // Stop processing the stream on error None } - } - })).await; - match found_error { - Some(err) => Err(err), - None => Ok(out) + }), + ) + .await; + + if is_error { + Err(found_error.unwrap()) + } else { + Ok(out) } }) } diff --git a/src/result/sum.rs b/src/result/sum.rs index ccc4240e0..b6d84a0c4 100644 --- a/src/result/sum.rs +++ b/src/result/sum.rs @@ -36,26 +36,39 @@ where ``` "#] fn sum<'a, S>(stream: S) -> Pin> + 'a>> - where S: Stream> + 'a + where + S: Stream> + 'a, { Box::pin(async move { - // Using `scan` here because it is able to stop the stream early + // Using `take_while` here because it is able to stop the stream early // if a failure occurs + let mut is_error = false; let mut found_error = None; - let out = >::sum(stream - .scan((), |_, elem| { - match elem { - Ok(elem) => Some(elem), + let out = >::sum( + stream + .take_while(|elem| { + // Stop processing the stream on `Err` + !is_error + && (elem.is_ok() || { + is_error = true; + // Capture first `Err` + true + }) + }) + .filter_map(|elem| match elem { + Ok(value) => Some(value), Err(err) => { found_error = Some(err); - // Stop processing the stream on error None } - } - })).await; - match found_error { - Some(err) => Err(err), - None => Ok(out) + }), + ) + .await; + + if is_error { + Err(found_error.unwrap()) + } else { + Ok(out) } }) } diff --git a/src/unit/from_stream.rs b/src/unit/from_stream.rs index da216e22c..7b784383d 100644 --- a/src/unit/from_stream.rs +++ b/src/unit/from_stream.rs @@ -8,6 +8,6 @@ impl FromStream<()> for () { fn from_stream<'a, S: IntoStream + 'a>( stream: S, ) -> Pin + 'a>> { - Box::pin(stream.into_stream().for_each(|_| ())) + Box::pin(stream.into_stream().for_each(drop)) } }