Skip to content

Commit

Permalink
Merge pull request apache#13 from laysakura/feat/ParDo
Browse files Browse the repository at this point in the history
feat: rename ParDo::from_*() functions and add doc comments
  • Loading branch information
laysakura authored Apr 12, 2023
2 parents 4ba3711 + 5cd5114 commit f7df217
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 10 deletions.
6 changes: 3 additions & 3 deletions sdks/rust/src/transforms/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ impl<E: ElemType> PTransform<Never, E> for Create<E> {
input
.clone()
.apply(Impulse::new())
.apply(ParDo::from_dyn_flat_map(Box::new(move |_x| -> Vec<E> {
elements.to_vec()
})))
.apply(ParDo::from_flatmap_with_context(Box::new(
move |_x| -> Vec<E> { elements.to_vec() },
)))
}
}
23 changes: 17 additions & 6 deletions sdks/rust/src/transforms/pardo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,27 @@ pub struct ParDo<T, O> {
// TODO: Is the Sync + Send stuff needed?
impl<T: 'static, O: 'static> ParDo<T, O> {
// TODO: These should correspond to methods on PCollection<T> (but not on PValue).
/// Creates a ParDo from a function that maps a single input element to a single output element.
/// The function must not have any context. See [Understanding Closures in Rust](https://medium.com/swlh/understanding-closures-in-rust-21f286ed1759) for detail.
pub fn from_map(func: fn(&T) -> O) -> Self {
Self::from_dyn_map(Box::new(func))
Self::from_map_with_context(Box::new(func))
}
pub fn from_dyn_map(func: Box<dyn Fn(&T) -> O + Send + Sync>) -> Self {
Self::from_dyn_flat_map(Box::new(move |x: &T| -> Vec<O> { vec![func(x)] }))

/// Creates a ParDo from a function that maps a single input element to a single output element.
/// The function may have an immutable context. See [Understanding Closures in Rust](https://medium.com/swlh/understanding-closures-in-rust-21f286ed1759) for detail.
pub fn from_map_with_context(func: Box<dyn Fn(&T) -> O + Send + Sync>) -> Self {
Self::from_flatmap_with_context(Box::new(move |x: &T| -> Vec<O> { vec![func(x)] }))
}
pub fn from_flat_map<I: IntoIterator<Item = O> + 'static>(func: fn(&T) -> I) -> Self {
Self::from_dyn_flat_map(Box::new(func))

/// Creates a ParDo from a function that maps a single input element to a collection of output elements.
/// The function must not have any context. See [Understanding Closures in Rust](https://medium.com/swlh/understanding-closures-in-rust-21f286ed1759) for detail.
pub fn from_flatmap<I: IntoIterator<Item = O> + 'static>(func: fn(&T) -> I) -> Self {
Self::from_flatmap_with_context(Box::new(func))
}
pub fn from_dyn_flat_map<I: IntoIterator<Item = O> + 'static>(

/// Creates a ParDo from a function that maps a single input element to a collection of output elements.
/// The function may have an immutable context. See [Understanding Closures in Rust](https://medium.com/swlh/understanding-closures-in-rust-21f286ed1759) for detail.
pub fn from_flatmap_with_context<I: IntoIterator<Item = O> + 'static>(
func: Box<dyn Fn(&T) -> I + Send + Sync>,
) -> Self {
Self {
Expand Down
2 changes: 1 addition & 1 deletion sdks/rust/src/transforms/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl<T: ElemType + PartialEq + Ord + fmt::Debug> PTransform<T, ()> for AssertEqu
KV::new("".to_string(), x.clone())
}))
.apply(GroupByKey::default())
.apply(ParDo::from_dyn_map(Box::new(
.apply(ParDo::from_map_with_context(Box::new(
move |kvs: &KV<String, Vec<Option<T>>>| {
let mut actual: Vec<T> = kvs
.as_values()
Expand Down

0 comments on commit f7df217

Please sign in to comment.