diff --git a/sdks/rust/src/internals/pipeline.rs b/sdks/rust/src/internals/pipeline.rs index 36cbaa3ad666d..7a3df7e7b15bd 100644 --- a/sdks/rust/src/internals/pipeline.rs +++ b/sdks/rust/src/internals/pipeline.rs @@ -67,7 +67,7 @@ pub struct Pipeline { coder_proto_counter: Mutex, } -impl<'a> Pipeline { +impl Pipeline { pub fn new(component_prefix: String) -> Self { let proto = proto_pipeline::Pipeline { components: Some(proto_pipeline::Components { @@ -257,7 +257,7 @@ impl<'a> Pipeline { .as_mut() .unwrap() .transforms - .insert(transform_id.clone(), transform_proto.clone()); + .insert(transform_id, transform_proto.clone()); drop(pipeline_proto); // TODO: ensure this happens even if an error takes place above diff --git a/sdks/rust/src/internals/pvalue.rs b/sdks/rust/src/internals/pvalue.rs index d0f6a144134bb..de979e79c8dd3 100644 --- a/sdks/rust/src/internals/pvalue.rs +++ b/sdks/rust/src/internals/pvalue.rs @@ -122,7 +122,7 @@ where }, PType::PValueArr => { // TODO: Remove this hack, PValues can have multiple ids. - for (i, id) in pvalue.get_id().split(",").enumerate() { + for (i, id) in pvalue.get_id().split(',').enumerate() { result.insert(i.to_string(), id.to_string()); } } diff --git a/sdks/rust/src/internals/serialize.rs b/sdks/rust/src/internals/serialize.rs index 3415c26b65448..e62152402294d 100644 --- a/sdks/rust/src/internals/serialize.rs +++ b/sdks/rust/src/internals/serialize.rs @@ -25,9 +25,7 @@ pub fn deserialize_fn(name: &String) -> Option<&'static T> None => None, }; - unsafe { - return std::mem::transmute::, Option<&'static T>>(typed); - } + unsafe { std::mem::transmute::, Option<&'static T>>(typed) } } // ******* DoFn Wrappers, perhaps move elsewhere? ******* @@ -37,7 +35,7 @@ pub type GenericDoFn = Box Box>> + Send + Sync>; struct GenericDoFnWrapper { - func: GenericDoFn, + _func: GenericDoFn, } unsafe impl std::marker::Send for GenericDoFnWrapper {} @@ -51,9 +49,9 @@ impl> Iterator for BoxedIter { fn next(&mut self) -> Option> { if let Some(x) = self.typed_iter.next() { - return Some(Box::new(x)); + Some(Box::new(x)) } else { - return None; + None } } } @@ -88,7 +86,7 @@ pub trait KeyExtractor: Sync + Send { fn extract(&self, kv: &dyn Any) -> (String, Box); fn recombine( &self, - key: &String, + key: &str, values: &Box>>, ) -> Box; } @@ -97,8 +95,8 @@ pub struct TypedKeyExtractor { phantom_data: PhantomData, } -impl TypedKeyExtractor { - pub fn default() -> Self { +impl Default for TypedKeyExtractor { + fn default() -> Self { Self { phantom_data: PhantomData, } @@ -108,17 +106,17 @@ impl TypedKeyExtractor { impl KeyExtractor for TypedKeyExtractor { fn extract(&self, kv: &dyn Any) -> (String, Box) { let typed_kv = kv.downcast_ref::<(String, V)>().unwrap(); - return (typed_kv.0.clone(), Box::new(typed_kv.1.clone())); + (typed_kv.0.clone(), Box::new(typed_kv.1.clone())) } fn recombine( &self, - key: &String, + key: &str, values: &Box>>, ) -> Box { let mut typed_values: Vec = Vec::new(); for untyped_value in values.iter() { typed_values.push(untyped_value.downcast_ref::().unwrap().clone()); } - return Box::new((key.clone(), typed_values)); + Box::new((key.to_string(), typed_values)) } } diff --git a/sdks/rust/src/tests/primitives_test.rs b/sdks/rust/src/tests/primitives_test.rs index 088c775cd8d87..1acf3f736427b 100644 --- a/sdks/rust/src/tests/primitives_test.rs +++ b/sdks/rust/src/tests/primitives_test.rs @@ -53,7 +53,8 @@ mod tests { .await; } - //#[tokio::test] + #[tokio::test] + #[ignore] #[should_panic] async fn ensure_assert_fails_on_empty() { DirectRunner::new() @@ -84,7 +85,7 @@ mod tests { KV::new("a".to_string(), 2), KV::new("b".to_string(), 3), ])) - .apply(GroupByKey::new()) + .apply(GroupByKey::default()) .apply(AssertEqualUnordered::new(&[ KV::new("a".to_string(), vec![1, 2]), KV::new("b".to_string(), vec![3]), @@ -99,7 +100,7 @@ mod tests { .run(|root| { let first = root.clone().apply(Create::new(&[1, 2, 3])); let second = root.apply(Create::new(&[100, 200])); - PValue::new_array(&vec![first, second]) + PValue::new_array(&[first, second]) .apply(Flatten::new()) .apply(AssertEqualUnordered::new(&[1, 2, 3, 100, 200])) }) @@ -109,7 +110,7 @@ mod tests { #[tokio::test] async fn run_impulse_expansion() { let p = Arc::new(Pipeline::default()); - let root = PValue::new_root(p.clone()); + let root = PValue::new_root(p); let pcoll = root.apply(Impulse::new()); diff --git a/sdks/rust/src/transforms/flatten.rs b/sdks/rust/src/transforms/flatten.rs index bc335ada1beac..ce47bccf047b5 100644 --- a/sdks/rust/src/transforms/flatten.rs +++ b/sdks/rust/src/transforms/flatten.rs @@ -36,7 +36,7 @@ impl Flatten { impl PTransform for Flatten { fn expand_internal( &self, - input: &PValue, + _input: &PValue, pipeline: Arc, transform_proto: &mut proto_pipeline::PTransform, ) -> PValue { diff --git a/sdks/rust/src/transforms/group_by_key.rs b/sdks/rust/src/transforms/group_by_key.rs index 7a6af0e07a519..7a46875f1531e 100644 --- a/sdks/rust/src/transforms/group_by_key.rs +++ b/sdks/rust/src/transforms/group_by_key.rs @@ -19,8 +19,8 @@ use std::marker::PhantomData; use std::sync::Arc; -use crate::elem_types::ElemType; use crate::elem_types::kv::KV; +use crate::elem_types::ElemType; use crate::internals::pipeline::Pipeline; use crate::internals::pvalue::{PTransform, PValue}; use crate::internals::serialize; @@ -35,8 +35,8 @@ pub struct GroupByKey { } // TODO: Use coders to allow arbitrary keys. -impl GroupByKey { - pub fn new() -> Self { +impl Default for GroupByKey { + fn default() -> Self { Self { payload: serialize::serialize_fn::>(Box::new( Box::new(serialize::TypedKeyExtractor::::default()), diff --git a/sdks/rust/src/transforms/testing.rs b/sdks/rust/src/transforms/testing.rs index 6b7ceb464cffe..eddd4390a59a5 100644 --- a/sdks/rust/src/transforms/testing.rs +++ b/sdks/rust/src/transforms/testing.rs @@ -50,11 +50,11 @@ impl PTransform for AssertEqu .clone() .apply(ParDo::from_map(|x: &T| -> Option { Some(x.clone()) })); let expected = self.expected_sorted.clone(); - PValue::new_array(&vec![singleton, actual]) + PValue::new_array(&[singleton, actual]) .apply(ParDo::from_map(|x: &Option| -> KV> { KV::new("".to_string(), x.clone()) })) - .apply(GroupByKey::new()) + .apply(GroupByKey::default()) .apply(ParDo::from_dyn_map(Box::new( move |kvs: &KV>>| { let mut actual: Vec = kvs diff --git a/sdks/rust/src/worker/operators.rs b/sdks/rust/src/worker/operators.rs index 0520287b7cc14..a0fe4a4a6ef34 100644 --- a/sdks/rust/src/worker/operators.rs +++ b/sdks/rust/src/worker/operators.rs @@ -43,11 +43,11 @@ static OPERATORS_BY_URN: Lazy> = Lazy::new(|| { // Test operators (urns::CREATE_URN, OperatorDiscriminants::Create), (urns::RECORDING_URN, OperatorDiscriminants::Recording), - (urns::PARTITION_URN, OperatorDiscriminants::Partitioning), + (urns::PARTITION_URN, OperatorDiscriminants::_Partitioning), (urns::IMPULSE_URN, OperatorDiscriminants::Impulse), (urns::GROUP_BY_KEY_URN, OperatorDiscriminants::GroupByKey), // Production operators - (urns::DATA_INPUT_URN, OperatorDiscriminants::DataSource), + (urns::DATA_INPUT_URN, OperatorDiscriminants::_DataSource), (urns::PAR_DO_URN, OperatorDiscriminants::ParDo), (urns::FLATTEN_URN, OperatorDiscriminants::Flatten), ]); @@ -55,7 +55,7 @@ static OPERATORS_BY_URN: Lazy> = Lazy::new(|| { Mutex::new(m) }); -pub trait OperatorI { +pub(crate) trait OperatorI { fn new( transform_id: Arc, transform: Arc, @@ -75,16 +75,16 @@ pub trait OperatorI { } #[derive(fmt::Debug, EnumDiscriminants)] -pub enum Operator { +pub(crate) enum Operator { // Test operators Create(CreateOperator), Recording(RecordingOperator), - Partitioning, + _Partitioning, GroupByKey(GroupByKeyWithinBundleOperator), Impulse(ImpulsePerBundleOperator), // Production operators - DataSource, + _DataSource, ParDo(ParDoOperator), Flatten(FlattenOperator), } @@ -156,7 +156,7 @@ impl OperatorI for Operator { } } -pub fn create_operator(transform_id: &str, context: Arc) -> Operator { +pub(crate) fn create_operator(transform_id: &str, context: Arc) -> Operator { let descriptor: &ProcessBundleDescriptor = context.descriptor.as_ref(); let transform = descriptor @@ -228,7 +228,7 @@ pub struct Receiver { } impl Receiver { - pub fn new(operators: Vec>) -> Self { + pub(crate) fn new(operators: Vec>) -> Self { Receiver { operators } } @@ -427,10 +427,10 @@ pub struct ImpulsePerBundleOperator { impl OperatorI for ImpulsePerBundleOperator { fn new( - transform_id: Arc, + _transform_id: Arc, transform: Arc, context: Arc, - operator_discriminant: OperatorDiscriminants, + _operator_discriminant: OperatorDiscriminants, ) -> Self { let receivers = transform .outputs @@ -451,12 +451,12 @@ impl OperatorI for ImpulsePerBundleOperator { } } - fn process(&self, value: &WindowedValue) {} + fn process(&self, _value: &WindowedValue) {} fn finish_bundle(&self) {} } -struct GroupByKeyWithinBundleOperator { +pub(crate) struct GroupByKeyWithinBundleOperator { receivers: Vec>, key_extractor: &'static Box, // TODO: Operator requiring locking for structures only ever manipulated in @@ -466,10 +466,10 @@ struct GroupByKeyWithinBundleOperator { impl OperatorI for GroupByKeyWithinBundleOperator { fn new( - transform_id: Arc, + _transform_id: Arc, transform_proto: Arc, context: Arc, - operator_discriminant: OperatorDiscriminants, + _operator_discriminant: OperatorDiscriminants, ) -> Self { // TODO: Shared by all operators, move up? let receivers = transform_proto @@ -530,10 +530,10 @@ impl std::fmt::Debug for GroupByKeyWithinBundleOperator { // ******* Production Operator definitions ******* pub struct ParDoOperator { - transform_id: Arc, - transform: Arc, - context: Arc, - operator_discriminant: OperatorDiscriminants, + _transform_id: Arc, + _transform: Arc, + _context: Arc, + _operator_discriminant: OperatorDiscriminants, receivers: Vec>, dofn: &'static serialize::GenericDoFn, @@ -562,10 +562,10 @@ impl OperatorI for ParDoOperator { .unwrap(); Self { - transform_id, - transform: transform_proto, - context, - operator_discriminant, + _transform_id: transform_id, + _transform: transform_proto, + _context: context, + _operator_discriminant: operator_discriminant, receivers, dofn, } @@ -599,10 +599,10 @@ pub struct FlattenOperator { impl OperatorI for FlattenOperator { fn new( - transform_id: Arc, + _transform_id: Arc, transform: Arc, context: Arc, - operator_discriminant: OperatorDiscriminants, + _operator_discriminant: OperatorDiscriminants, ) -> Self { let receivers = transform .outputs