Skip to content

Commit

Permalink
chore: apply suggestions from CR
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Apr 1, 2024
1 parent 1e0aa40 commit d5e077f
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 18 deletions.
5 changes: 2 additions & 3 deletions src/common/meta/src/state_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_procedure::error::{DeleteStatesSnafu, ListStateSnafu, PutStateSnafu};
use common_procedure::store::state_store::{KeySet, KeyValueStream, StateStore};
use common_procedure::store::util::{multiple_values_collector, MultipleValuesStream};
use common_procedure::store::util::MultipleValuesStream;
use common_procedure::Result as ProcedureResult;
use futures::future::try_join_all;
use futures::StreamExt;
Expand Down Expand Up @@ -166,8 +166,7 @@ impl StateStore for KvStateStore {
.with_context(|_| ListStateSnafu { path })
});

let stream =
MultipleValuesStream::new(Box::pin(stream), Box::new(multiple_values_collector));
let stream = MultipleValuesStream::new(Box::pin(stream));

Ok(Box::pin(stream))
}
Expand Down
23 changes: 8 additions & 15 deletions src/common/procedure/src/store/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ fn parse_segments(segments: Vec<(String, Vec<u8>)>, prefix: &str) -> Result<Vec<
/// Returns an error if:
/// - Part values are lost.
/// - Failed to parse the key of segment.
pub fn multiple_values_collector(
fn multiple_values_collector(
CollectingState { mut pairs }: CollectingState,
) -> Result<(KeySet, Vec<u8>)> {
if pairs.len() == 1 {
Expand Down Expand Up @@ -106,24 +106,21 @@ enum MultipleValuesStreamState {
End,
}

pub type Collector = dyn Fn(CollectingState) -> Result<(KeySet, Vec<u8>)> + Send;
pub type Upstream = dyn Stream<Item = Result<(String, Vec<u8>)>> + Send;

/// A stream collects multiple values into a single key-value pair.
pub struct MultipleValuesStream {
upstream: Pin<Box<Upstream>>,
state: MultipleValuesStreamState,
collecting: Option<CollectingState>,
collector: Box<Collector>,
}

impl MultipleValuesStream {
pub fn new(upstream: Pin<Box<Upstream>>, collector: Box<Collector>) -> Self {
pub fn new(upstream: Pin<Box<Upstream>>) -> Self {
Self {
upstream,
state: MultipleValuesStreamState::Idle,
collecting: None,
collector,
}
}
}
Expand Down Expand Up @@ -162,15 +159,15 @@ impl Stream for MultipleValuesStream {
self.collecting = Some(CollectingState::new(key, value));
self.state = MultipleValuesStreamState::Collecting;
// Yields the result.
return Poll::Ready(Some((self.collector)(collecting)));
return Poll::Ready(Some(multiple_values_collector(collecting)));
}
}
Ok(None) => {
let collecting =
self.collecting.take().expect("The `collecting` must exist");

self.state = MultipleValuesStreamState::Idle;
return Poll::Ready(Some((self.collector)(collecting)));
return Poll::Ready(Some(multiple_values_collector(collecting)));
}
Err(err) => {
self.state = MultipleValuesStreamState::Idle;
Expand Down Expand Up @@ -216,8 +213,7 @@ mod tests {
Ok(("baz/0001".to_string(), vec![4, 5])),
Ok(("baz/0002".to_string(), vec![6, 7])),
]);
let mut stream =
MultipleValuesStream::new(Box::pin(upstream), Box::new(multiple_values_collector));
let mut stream = MultipleValuesStream::new(Box::pin(upstream));
let (key, value) = stream.try_next().await.unwrap().unwrap();
let keys = key.keys();
assert_eq!(keys[0], "foo");
Expand All @@ -241,8 +237,7 @@ mod tests {
#[tokio::test]
async fn test_empty_upstream() {
let upstream = stream::iter(vec![]);
let mut stream =
MultipleValuesStream::new(Box::pin(upstream), Box::new(multiple_values_collector));
let mut stream = MultipleValuesStream::new(Box::pin(upstream));
assert!(stream.try_next().await.unwrap().is_none());
// Call again
assert!(stream.try_next().await.unwrap().is_none());
Expand All @@ -255,8 +250,7 @@ mod tests {
Ok(("foo".to_string(), vec![0, 1, 2, 3])),
Ok(("foo/0001".to_string(), vec![4, 5])),
]);
let mut stream =
MultipleValuesStream::new(Box::pin(upstream), Box::new(multiple_values_collector));
let mut stream = MultipleValuesStream::new(Box::pin(upstream));
let err = stream.try_next().await.unwrap_err();
assert_matches!(err, error::Error::Unexpected { .. });

Expand All @@ -265,8 +259,7 @@ mod tests {
Ok(("foo/0001".to_string(), vec![4, 5])),
Err(error::UnexpectedSnafu { err_msg: "mock" }.build()),
]);
let mut stream =
MultipleValuesStream::new(Box::pin(upstream), Box::new(multiple_values_collector));
let mut stream = MultipleValuesStream::new(Box::pin(upstream));
let err = stream.try_next().await.unwrap_err();
assert_matches!(err, error::Error::Unexpected { .. });
}
Expand Down

0 comments on commit d5e077f

Please sign in to comment.