From 742b75aefe0c158adaeecbd27cff76c84b7bdb92 Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Wed, 5 Apr 2023 14:41:50 +0900 Subject: [PATCH 01/12] refactor: resolve needless_return --- sdks/rust/src/internals/serialize.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/sdks/rust/src/internals/serialize.rs b/sdks/rust/src/internals/serialize.rs index 3415c26b65448..94ea4bdc03490 100644 --- a/sdks/rust/src/internals/serialize.rs +++ b/sdks/rust/src/internals/serialize.rs @@ -25,9 +25,7 @@ pub fn deserialize_fn(name: &String) -> Option<&'static T> None => None, }; - unsafe { - return std::mem::transmute::, Option<&'static T>>(typed); - } + unsafe { std::mem::transmute::, Option<&'static T>>(typed) } } // ******* DoFn Wrappers, perhaps move elsewhere? ******* @@ -51,9 +49,9 @@ impl> Iterator for BoxedIter { fn next(&mut self) -> Option> { if let Some(x) = self.typed_iter.next() { - return Some(Box::new(x)); + Some(Box::new(x)) } else { - return None; + None } } } @@ -108,7 +106,7 @@ impl TypedKeyExtractor { impl KeyExtractor for TypedKeyExtractor { fn extract(&self, kv: &dyn Any) -> (String, Box) { let typed_kv = kv.downcast_ref::<(String, V)>().unwrap(); - return (typed_kv.0.clone(), Box::new(typed_kv.1.clone())); + (typed_kv.0.clone(), Box::new(typed_kv.1.clone())) } fn recombine( &self, @@ -119,6 +117,6 @@ impl KeyExtractor for TypedKeyExtractor { for untyped_value in values.iter() { typed_values.push(untyped_value.downcast_ref::().unwrap().clone()); } - return Box::new((key.clone(), typed_values)); + Box::new((key.clone(), typed_values)) } } From 71409dcd1d92fb1be3dbf44eb668a76f23e804e4 Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Wed, 5 Apr 2023 14:42:23 +0900 Subject: [PATCH 02/12] refactor: resolve extra_unused_lifetimes --- sdks/rust/src/internals/pipeline.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/rust/src/internals/pipeline.rs b/sdks/rust/src/internals/pipeline.rs index 36cbaa3ad666d..d861310a1bc34 100644 --- a/sdks/rust/src/internals/pipeline.rs +++ b/sdks/rust/src/internals/pipeline.rs @@ -67,7 +67,7 @@ pub struct Pipeline { coder_proto_counter: Mutex, } -impl<'a> Pipeline { +impl Pipeline { pub fn new(component_prefix: String) -> Self { let proto = proto_pipeline::Pipeline { components: Some(proto_pipeline::Components { From 6de133b10e88bb34eb6980473b4a23031b4fcc05 Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Wed, 5 Apr 2023 14:42:48 +0900 Subject: [PATCH 03/12] refactor: resolve redundant_clone --- sdks/rust/src/internals/pipeline.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/rust/src/internals/pipeline.rs b/sdks/rust/src/internals/pipeline.rs index d861310a1bc34..7a3df7e7b15bd 100644 --- a/sdks/rust/src/internals/pipeline.rs +++ b/sdks/rust/src/internals/pipeline.rs @@ -257,7 +257,7 @@ impl Pipeline { .as_mut() .unwrap() .transforms - .insert(transform_id.clone(), transform_proto.clone()); + .insert(transform_id, transform_proto.clone()); drop(pipeline_proto); // TODO: ensure this happens even if an error takes place above From c14893340260d4022668ad1273c6c625aa868c3b Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Wed, 5 Apr 2023 14:44:24 +0900 Subject: [PATCH 04/12] refactor: resolve dead_code --- sdks/rust/src/internals/serialize.rs | 2 +- sdks/rust/src/tests/primitives_test.rs | 3 ++- sdks/rust/src/worker/operators.rs | 24 ++++++++++++------------ 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/sdks/rust/src/internals/serialize.rs b/sdks/rust/src/internals/serialize.rs index 94ea4bdc03490..4ebe883523d13 100644 --- a/sdks/rust/src/internals/serialize.rs +++ b/sdks/rust/src/internals/serialize.rs @@ -35,7 +35,7 @@ pub type GenericDoFn = Box Box>> + Send + Sync>; struct GenericDoFnWrapper { - func: GenericDoFn, + _func: GenericDoFn, } unsafe impl std::marker::Send for GenericDoFnWrapper {} diff --git a/sdks/rust/src/tests/primitives_test.rs b/sdks/rust/src/tests/primitives_test.rs index 088c775cd8d87..f8429d41eccdc 100644 --- a/sdks/rust/src/tests/primitives_test.rs +++ b/sdks/rust/src/tests/primitives_test.rs @@ -53,7 +53,8 @@ mod tests { .await; } - //#[tokio::test] + #[tokio::test] + #[ignore] #[should_panic] async fn ensure_assert_fails_on_empty() { DirectRunner::new() diff --git a/sdks/rust/src/worker/operators.rs b/sdks/rust/src/worker/operators.rs index 0520287b7cc14..940c207bd1789 100644 --- a/sdks/rust/src/worker/operators.rs +++ b/sdks/rust/src/worker/operators.rs @@ -43,11 +43,11 @@ static OPERATORS_BY_URN: Lazy> = Lazy::new(|| { // Test operators (urns::CREATE_URN, OperatorDiscriminants::Create), (urns::RECORDING_URN, OperatorDiscriminants::Recording), - (urns::PARTITION_URN, OperatorDiscriminants::Partitioning), + (urns::PARTITION_URN, OperatorDiscriminants::_Partitioning), (urns::IMPULSE_URN, OperatorDiscriminants::Impulse), (urns::GROUP_BY_KEY_URN, OperatorDiscriminants::GroupByKey), // Production operators - (urns::DATA_INPUT_URN, OperatorDiscriminants::DataSource), + (urns::DATA_INPUT_URN, OperatorDiscriminants::_DataSource), (urns::PAR_DO_URN, OperatorDiscriminants::ParDo), (urns::FLATTEN_URN, OperatorDiscriminants::Flatten), ]); @@ -79,12 +79,12 @@ pub enum Operator { // Test operators Create(CreateOperator), Recording(RecordingOperator), - Partitioning, + _Partitioning, GroupByKey(GroupByKeyWithinBundleOperator), Impulse(ImpulsePerBundleOperator), // Production operators - DataSource, + _DataSource, ParDo(ParDoOperator), Flatten(FlattenOperator), } @@ -530,10 +530,10 @@ impl std::fmt::Debug for GroupByKeyWithinBundleOperator { // ******* Production Operator definitions ******* pub struct ParDoOperator { - transform_id: Arc, - transform: Arc, - context: Arc, - operator_discriminant: OperatorDiscriminants, + _transform_id: Arc, + _transform: Arc, + _context: Arc, + _operator_discriminant: OperatorDiscriminants, receivers: Vec>, dofn: &'static serialize::GenericDoFn, @@ -562,10 +562,10 @@ impl OperatorI for ParDoOperator { .unwrap(); Self { - transform_id, - transform: transform_proto, - context, - operator_discriminant, + _transform_id: transform_id, + _transform: transform_proto, + _context: context, + _operator_discriminant: operator_discriminant, receivers, dofn, } From 35272b89c3bbba9acb7d1b556fdbf67a0a957e7f Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Wed, 5 Apr 2023 14:45:07 +0900 Subject: [PATCH 05/12] refactor: resolve single_char_pattern --- sdks/rust/src/internals/pvalue.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/rust/src/internals/pvalue.rs b/sdks/rust/src/internals/pvalue.rs index d0f6a144134bb..de979e79c8dd3 100644 --- a/sdks/rust/src/internals/pvalue.rs +++ b/sdks/rust/src/internals/pvalue.rs @@ -122,7 +122,7 @@ where }, PType::PValueArr => { // TODO: Remove this hack, PValues can have multiple ids. - for (i, id) in pvalue.get_id().split(",").enumerate() { + for (i, id) in pvalue.get_id().split(',').enumerate() { result.insert(i.to_string(), id.to_string()); } } From f5ca4718f8bce7fd0d698b3922429fabd7369e3c Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Wed, 5 Apr 2023 14:48:26 +0900 Subject: [PATCH 06/12] refactor: &String -> &str --- sdks/rust/src/internals/serialize.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/rust/src/internals/serialize.rs b/sdks/rust/src/internals/serialize.rs index 4ebe883523d13..e2ae2e0e2841a 100644 --- a/sdks/rust/src/internals/serialize.rs +++ b/sdks/rust/src/internals/serialize.rs @@ -86,7 +86,7 @@ pub trait KeyExtractor: Sync + Send { fn extract(&self, kv: &dyn Any) -> (String, Box); fn recombine( &self, - key: &String, + key: &str, values: &Box>>, ) -> Box; } @@ -110,13 +110,13 @@ impl KeyExtractor for TypedKeyExtractor { } fn recombine( &self, - key: &String, + key: &str, values: &Box>>, ) -> Box { let mut typed_values: Vec = Vec::new(); for untyped_value in values.iter() { typed_values.push(untyped_value.downcast_ref::().unwrap().clone()); } - Box::new((key.clone(), typed_values)) + Box::new((key.to_string(), typed_values)) } } From eebc134c07b81039d0fa3425774539ea9f780195 Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Wed, 5 Apr 2023 15:10:35 +0900 Subject: [PATCH 07/12] refactor: resolve should_implement_trait --- sdks/rust/src/internals/serialize.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/rust/src/internals/serialize.rs b/sdks/rust/src/internals/serialize.rs index e2ae2e0e2841a..e62152402294d 100644 --- a/sdks/rust/src/internals/serialize.rs +++ b/sdks/rust/src/internals/serialize.rs @@ -95,8 +95,8 @@ pub struct TypedKeyExtractor { phantom_data: PhantomData, } -impl TypedKeyExtractor { - pub fn default() -> Self { +impl Default for TypedKeyExtractor { + fn default() -> Self { Self { phantom_data: PhantomData, } From aed30f7689dab78c45ac48727e8f7872eee5a9e7 Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Wed, 5 Apr 2023 15:12:27 +0900 Subject: [PATCH 08/12] refactor: resolve useless_vec --- sdks/rust/src/tests/primitives_test.rs | 2 +- sdks/rust/src/transforms/testing.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/rust/src/tests/primitives_test.rs b/sdks/rust/src/tests/primitives_test.rs index f8429d41eccdc..d872d47eefe21 100644 --- a/sdks/rust/src/tests/primitives_test.rs +++ b/sdks/rust/src/tests/primitives_test.rs @@ -100,7 +100,7 @@ mod tests { .run(|root| { let first = root.clone().apply(Create::new(&[1, 2, 3])); let second = root.apply(Create::new(&[100, 200])); - PValue::new_array(&vec![first, second]) + PValue::new_array(&[first, second]) .apply(Flatten::new()) .apply(AssertEqualUnordered::new(&[1, 2, 3, 100, 200])) }) diff --git a/sdks/rust/src/transforms/testing.rs b/sdks/rust/src/transforms/testing.rs index 6b7ceb464cffe..57aab56575128 100644 --- a/sdks/rust/src/transforms/testing.rs +++ b/sdks/rust/src/transforms/testing.rs @@ -50,7 +50,7 @@ impl PTransform for AssertEqu .clone() .apply(ParDo::from_map(|x: &T| -> Option { Some(x.clone()) })); let expected = self.expected_sorted.clone(); - PValue::new_array(&vec![singleton, actual]) + PValue::new_array(&[singleton, actual]) .apply(ParDo::from_map(|x: &Option| -> KV> { KV::new("".to_string(), x.clone()) })) From e73218d3fde2b362a97549f3b755f16d1780b51d Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Wed, 5 Apr 2023 15:13:18 +0900 Subject: [PATCH 09/12] refactor: resolve redundant_clone --- sdks/rust/src/tests/primitives_test.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/rust/src/tests/primitives_test.rs b/sdks/rust/src/tests/primitives_test.rs index d872d47eefe21..bd4afe236f9ca 100644 --- a/sdks/rust/src/tests/primitives_test.rs +++ b/sdks/rust/src/tests/primitives_test.rs @@ -110,7 +110,7 @@ mod tests { #[tokio::test] async fn run_impulse_expansion() { let p = Arc::new(Pipeline::default()); - let root = PValue::new_root(p.clone()); + let root = PValue::new_root(p); let pcoll = root.apply(Impulse::new()); From 397c8d081720546d749118366591b2657d079e38 Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Wed, 5 Apr 2023 16:17:45 +0900 Subject: [PATCH 10/12] refactor: resolve unused_variables --- sdks/rust/src/transforms/flatten.rs | 2 +- sdks/rust/src/worker/operators.rs | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sdks/rust/src/transforms/flatten.rs b/sdks/rust/src/transforms/flatten.rs index bc335ada1beac..ce47bccf047b5 100644 --- a/sdks/rust/src/transforms/flatten.rs +++ b/sdks/rust/src/transforms/flatten.rs @@ -36,7 +36,7 @@ impl Flatten { impl PTransform for Flatten { fn expand_internal( &self, - input: &PValue, + _input: &PValue, pipeline: Arc, transform_proto: &mut proto_pipeline::PTransform, ) -> PValue { diff --git a/sdks/rust/src/worker/operators.rs b/sdks/rust/src/worker/operators.rs index 940c207bd1789..13dc606f5708f 100644 --- a/sdks/rust/src/worker/operators.rs +++ b/sdks/rust/src/worker/operators.rs @@ -427,10 +427,10 @@ pub struct ImpulsePerBundleOperator { impl OperatorI for ImpulsePerBundleOperator { fn new( - transform_id: Arc, + _transform_id: Arc, transform: Arc, context: Arc, - operator_discriminant: OperatorDiscriminants, + _operator_discriminant: OperatorDiscriminants, ) -> Self { let receivers = transform .outputs @@ -451,7 +451,7 @@ impl OperatorI for ImpulsePerBundleOperator { } } - fn process(&self, value: &WindowedValue) {} + fn process(&self, _value: &WindowedValue) {} fn finish_bundle(&self) {} } @@ -466,10 +466,10 @@ struct GroupByKeyWithinBundleOperator { impl OperatorI for GroupByKeyWithinBundleOperator { fn new( - transform_id: Arc, + _transform_id: Arc, transform_proto: Arc, context: Arc, - operator_discriminant: OperatorDiscriminants, + _operator_discriminant: OperatorDiscriminants, ) -> Self { // TODO: Shared by all operators, move up? let receivers = transform_proto @@ -599,10 +599,10 @@ pub struct FlattenOperator { impl OperatorI for FlattenOperator { fn new( - transform_id: Arc, + _transform_id: Arc, transform: Arc, context: Arc, - operator_discriminant: OperatorDiscriminants, + _operator_discriminant: OperatorDiscriminants, ) -> Self { let receivers = transform .outputs From 15b7c93bc2548181f64e0d5bb4fd81a47ffc65af Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Wed, 5 Apr 2023 16:19:17 +0900 Subject: [PATCH 11/12] refactor: new -> default --- sdks/rust/src/tests/primitives_test.rs | 2 +- sdks/rust/src/transforms/group_by_key.rs | 6 +++--- sdks/rust/src/transforms/testing.rs | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sdks/rust/src/tests/primitives_test.rs b/sdks/rust/src/tests/primitives_test.rs index bd4afe236f9ca..1acf3f736427b 100644 --- a/sdks/rust/src/tests/primitives_test.rs +++ b/sdks/rust/src/tests/primitives_test.rs @@ -85,7 +85,7 @@ mod tests { KV::new("a".to_string(), 2), KV::new("b".to_string(), 3), ])) - .apply(GroupByKey::new()) + .apply(GroupByKey::default()) .apply(AssertEqualUnordered::new(&[ KV::new("a".to_string(), vec![1, 2]), KV::new("b".to_string(), vec![3]), diff --git a/sdks/rust/src/transforms/group_by_key.rs b/sdks/rust/src/transforms/group_by_key.rs index 7a6af0e07a519..7a46875f1531e 100644 --- a/sdks/rust/src/transforms/group_by_key.rs +++ b/sdks/rust/src/transforms/group_by_key.rs @@ -19,8 +19,8 @@ use std::marker::PhantomData; use std::sync::Arc; -use crate::elem_types::ElemType; use crate::elem_types::kv::KV; +use crate::elem_types::ElemType; use crate::internals::pipeline::Pipeline; use crate::internals::pvalue::{PTransform, PValue}; use crate::internals::serialize; @@ -35,8 +35,8 @@ pub struct GroupByKey { } // TODO: Use coders to allow arbitrary keys. -impl GroupByKey { - pub fn new() -> Self { +impl Default for GroupByKey { + fn default() -> Self { Self { payload: serialize::serialize_fn::>(Box::new( Box::new(serialize::TypedKeyExtractor::::default()), diff --git a/sdks/rust/src/transforms/testing.rs b/sdks/rust/src/transforms/testing.rs index 57aab56575128..eddd4390a59a5 100644 --- a/sdks/rust/src/transforms/testing.rs +++ b/sdks/rust/src/transforms/testing.rs @@ -54,7 +54,7 @@ impl PTransform for AssertEqu .apply(ParDo::from_map(|x: &Option| -> KV> { KV::new("".to_string(), x.clone()) })) - .apply(GroupByKey::new()) + .apply(GroupByKey::default()) .apply(ParDo::from_dyn_map(Box::new( move |kvs: &KV>>| { let mut actual: Vec = kvs From 9e06ddc81e34d096cef9cb8bf07a3147fd0afc86 Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Fri, 7 Apr 2023 05:14:16 +0900 Subject: [PATCH 12/12] refactor: publicity --- sdks/rust/src/worker/operators.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdks/rust/src/worker/operators.rs b/sdks/rust/src/worker/operators.rs index 13dc606f5708f..a0fe4a4a6ef34 100644 --- a/sdks/rust/src/worker/operators.rs +++ b/sdks/rust/src/worker/operators.rs @@ -55,7 +55,7 @@ static OPERATORS_BY_URN: Lazy> = Lazy::new(|| { Mutex::new(m) }); -pub trait OperatorI { +pub(crate) trait OperatorI { fn new( transform_id: Arc, transform: Arc, @@ -75,7 +75,7 @@ pub trait OperatorI { } #[derive(fmt::Debug, EnumDiscriminants)] -pub enum Operator { +pub(crate) enum Operator { // Test operators Create(CreateOperator), Recording(RecordingOperator), @@ -156,7 +156,7 @@ impl OperatorI for Operator { } } -pub fn create_operator(transform_id: &str, context: Arc) -> Operator { +pub(crate) fn create_operator(transform_id: &str, context: Arc) -> Operator { let descriptor: &ProcessBundleDescriptor = context.descriptor.as_ref(); let transform = descriptor @@ -228,7 +228,7 @@ pub struct Receiver { } impl Receiver { - pub fn new(operators: Vec>) -> Self { + pub(crate) fn new(operators: Vec>) -> Self { Receiver { operators } } @@ -456,7 +456,7 @@ impl OperatorI for ImpulsePerBundleOperator { fn finish_bundle(&self) {} } -struct GroupByKeyWithinBundleOperator { +pub(crate) struct GroupByKeyWithinBundleOperator { receivers: Vec>, key_extractor: &'static Box, // TODO: Operator requiring locking for structures only ever manipulated in