Skip to content

Commit

Permalink
Merge pull request apache#2 from laysakura/refactor/rm-warnings
Browse files Browse the repository at this point in the history
refactor: resolve some warnings
  • Loading branch information
laysakura authored Apr 6, 2023
2 parents 60933bb + 9e06ddc commit 4bfdd26
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 49 deletions.
4 changes: 2 additions & 2 deletions sdks/rust/src/internals/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub struct Pipeline {
coder_proto_counter: Mutex<usize>,
}

impl<'a> Pipeline {
impl Pipeline {
pub fn new(component_prefix: String) -> Self {
let proto = proto_pipeline::Pipeline {
components: Some(proto_pipeline::Components {
Expand Down Expand Up @@ -257,7 +257,7 @@ impl<'a> Pipeline {
.as_mut()
.unwrap()
.transforms
.insert(transform_id.clone(), transform_proto.clone());
.insert(transform_id, transform_proto.clone());
drop(pipeline_proto);

// TODO: ensure this happens even if an error takes place above
Expand Down
2 changes: 1 addition & 1 deletion sdks/rust/src/internals/pvalue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ where
},
PType::PValueArr => {
// TODO: Remove this hack, PValues can have multiple ids.
for (i, id) in pvalue.get_id().split(",").enumerate() {
for (i, id) in pvalue.get_id().split(',').enumerate() {
result.insert(i.to_string(), id.to_string());
}
}
Expand Down
22 changes: 10 additions & 12 deletions sdks/rust/src/internals/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ pub fn deserialize_fn<T: Any + Sync + Send>(name: &String) -> Option<&'static T>
None => None,
};

unsafe {
return std::mem::transmute::<Option<&T>, Option<&'static T>>(typed);
}
unsafe { std::mem::transmute::<Option<&T>, Option<&'static T>>(typed) }
}

// ******* DoFn Wrappers, perhaps move elsewhere? *******
Expand All @@ -37,7 +35,7 @@ pub type GenericDoFn =
Box<dyn Fn(&dyn Any) -> Box<dyn Iterator<Item = Box<dyn Any>>> + Send + Sync>;

struct GenericDoFnWrapper {
func: GenericDoFn,
_func: GenericDoFn,
}

unsafe impl std::marker::Send for GenericDoFnWrapper {}
Expand All @@ -51,9 +49,9 @@ impl<O: Any, I: IntoIterator<Item = O>> Iterator for BoxedIter<O, I> {

fn next(&mut self) -> Option<Box<dyn Any>> {
if let Some(x) = self.typed_iter.next() {
return Some(Box::new(x));
Some(Box::new(x))
} else {
return None;
None
}
}
}
Expand Down Expand Up @@ -88,7 +86,7 @@ pub trait KeyExtractor: Sync + Send {
fn extract(&self, kv: &dyn Any) -> (String, Box<dyn Any + Sync + Send>);
fn recombine(
&self,
key: &String,
key: &str,
values: &Box<Vec<Box<dyn Any + Sync + Send>>>,
) -> Box<dyn Any + Sync + Send>;
}
Expand All @@ -97,8 +95,8 @@ pub struct TypedKeyExtractor<V: Clone + Sync + Send + 'static> {
phantom_data: PhantomData<V>,
}

impl<V: Clone + Sync + Send + 'static> TypedKeyExtractor<V> {
pub fn default() -> Self {
impl<V: Clone + Sync + Send + 'static> Default for TypedKeyExtractor<V> {
fn default() -> Self {
Self {
phantom_data: PhantomData,
}
Expand All @@ -108,17 +106,17 @@ impl<V: Clone + Sync + Send + 'static> TypedKeyExtractor<V> {
impl<V: Clone + Sync + Send + 'static> KeyExtractor for TypedKeyExtractor<V> {
fn extract(&self, kv: &dyn Any) -> (String, Box<dyn Any + Sync + Send>) {
let typed_kv = kv.downcast_ref::<(String, V)>().unwrap();
return (typed_kv.0.clone(), Box::new(typed_kv.1.clone()));
(typed_kv.0.clone(), Box::new(typed_kv.1.clone()))
}
fn recombine(
&self,
key: &String,
key: &str,
values: &Box<Vec<Box<dyn Any + Sync + Send>>>,
) -> Box<dyn Any + Sync + Send> {
let mut typed_values: Vec<V> = Vec::new();
for untyped_value in values.iter() {
typed_values.push(untyped_value.downcast_ref::<V>().unwrap().clone());
}
return Box::new((key.clone(), typed_values));
Box::new((key.to_string(), typed_values))
}
}
9 changes: 5 additions & 4 deletions sdks/rust/src/tests/primitives_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ mod tests {
.await;
}

//#[tokio::test]
#[tokio::test]
#[ignore]
#[should_panic]
async fn ensure_assert_fails_on_empty() {
DirectRunner::new()
Expand Down Expand Up @@ -84,7 +85,7 @@ mod tests {
KV::new("a".to_string(), 2),
KV::new("b".to_string(), 3),
]))
.apply(GroupByKey::new())
.apply(GroupByKey::default())
.apply(AssertEqualUnordered::new(&[
KV::new("a".to_string(), vec![1, 2]),
KV::new("b".to_string(), vec![3]),
Expand All @@ -99,7 +100,7 @@ mod tests {
.run(|root| {
let first = root.clone().apply(Create::new(&[1, 2, 3]));
let second = root.apply(Create::new(&[100, 200]));
PValue::new_array(&vec![first, second])
PValue::new_array(&[first, second])
.apply(Flatten::new())
.apply(AssertEqualUnordered::new(&[1, 2, 3, 100, 200]))
})
Expand All @@ -109,7 +110,7 @@ mod tests {
#[tokio::test]
async fn run_impulse_expansion() {
let p = Arc::new(Pipeline::default());
let root = PValue::new_root(p.clone());
let root = PValue::new_root(p);

let pcoll = root.apply(Impulse::new());

Expand Down
2 changes: 1 addition & 1 deletion sdks/rust/src/transforms/flatten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl Flatten {
impl<E: ElemType> PTransform<E, E> for Flatten {
fn expand_internal(
&self,
input: &PValue<E>,
_input: &PValue<E>,
pipeline: Arc<Pipeline>,
transform_proto: &mut proto_pipeline::PTransform,
) -> PValue<E> {
Expand Down
6 changes: 3 additions & 3 deletions sdks/rust/src/transforms/group_by_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
use std::marker::PhantomData;
use std::sync::Arc;

use crate::elem_types::ElemType;
use crate::elem_types::kv::KV;
use crate::elem_types::ElemType;
use crate::internals::pipeline::Pipeline;
use crate::internals::pvalue::{PTransform, PValue};
use crate::internals::serialize;
Expand All @@ -35,8 +35,8 @@ pub struct GroupByKey<K, V> {
}

// TODO: Use coders to allow arbitrary keys.
impl<V: ElemType> GroupByKey<String, V> {
pub fn new() -> Self {
impl<V: ElemType> Default for GroupByKey<String, V> {
fn default() -> Self {
Self {
payload: serialize::serialize_fn::<Box<dyn serialize::KeyExtractor>>(Box::new(
Box::new(serialize::TypedKeyExtractor::<V>::default()),
Expand Down
4 changes: 2 additions & 2 deletions sdks/rust/src/transforms/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ impl<T: ElemType + PartialEq + Ord + fmt::Debug> PTransform<T, ()> for AssertEqu
.clone()
.apply(ParDo::from_map(|x: &T| -> Option<T> { Some(x.clone()) }));
let expected = self.expected_sorted.clone();
PValue::new_array(&vec![singleton, actual])
PValue::new_array(&[singleton, actual])
.apply(ParDo::from_map(|x: &Option<T>| -> KV<String, Option<T>> {
KV::new("".to_string(), x.clone())
}))
.apply(GroupByKey::new())
.apply(GroupByKey::default())
.apply(ParDo::from_dyn_map(Box::new(
move |kvs: &KV<String, Vec<Option<T>>>| {
let mut actual: Vec<T> = kvs
Expand Down
48 changes: 24 additions & 24 deletions sdks/rust/src/worker/operators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,19 @@ static OPERATORS_BY_URN: Lazy<Mutex<OperatorMap>> = Lazy::new(|| {
// Test operators
(urns::CREATE_URN, OperatorDiscriminants::Create),
(urns::RECORDING_URN, OperatorDiscriminants::Recording),
(urns::PARTITION_URN, OperatorDiscriminants::Partitioning),
(urns::PARTITION_URN, OperatorDiscriminants::_Partitioning),
(urns::IMPULSE_URN, OperatorDiscriminants::Impulse),
(urns::GROUP_BY_KEY_URN, OperatorDiscriminants::GroupByKey),
// Production operators
(urns::DATA_INPUT_URN, OperatorDiscriminants::DataSource),
(urns::DATA_INPUT_URN, OperatorDiscriminants::_DataSource),
(urns::PAR_DO_URN, OperatorDiscriminants::ParDo),
(urns::FLATTEN_URN, OperatorDiscriminants::Flatten),
]);

Mutex::new(m)
});

pub trait OperatorI {
pub(crate) trait OperatorI {
fn new(
transform_id: Arc<String>,
transform: Arc<PTransform>,
Expand All @@ -75,16 +75,16 @@ pub trait OperatorI {
}

#[derive(fmt::Debug, EnumDiscriminants)]
pub enum Operator {
pub(crate) enum Operator {
// Test operators
Create(CreateOperator),
Recording(RecordingOperator),
Partitioning,
_Partitioning,
GroupByKey(GroupByKeyWithinBundleOperator),
Impulse(ImpulsePerBundleOperator),

// Production operators
DataSource,
_DataSource,
ParDo(ParDoOperator),
Flatten(FlattenOperator),
}
Expand Down Expand Up @@ -156,7 +156,7 @@ impl OperatorI for Operator {
}
}

pub fn create_operator(transform_id: &str, context: Arc<OperatorContext>) -> Operator {
pub(crate) fn create_operator(transform_id: &str, context: Arc<OperatorContext>) -> Operator {
let descriptor: &ProcessBundleDescriptor = context.descriptor.as_ref();

let transform = descriptor
Expand Down Expand Up @@ -228,7 +228,7 @@ pub struct Receiver {
}

impl Receiver {
pub fn new(operators: Vec<Arc<Operator>>) -> Self {
pub(crate) fn new(operators: Vec<Arc<Operator>>) -> Self {
Receiver { operators }
}

Expand Down Expand Up @@ -427,10 +427,10 @@ pub struct ImpulsePerBundleOperator {

impl OperatorI for ImpulsePerBundleOperator {
fn new(
transform_id: Arc<String>,
_transform_id: Arc<String>,
transform: Arc<PTransform>,
context: Arc<OperatorContext>,
operator_discriminant: OperatorDiscriminants,
_operator_discriminant: OperatorDiscriminants,
) -> Self {
let receivers = transform
.outputs
Expand All @@ -451,12 +451,12 @@ impl OperatorI for ImpulsePerBundleOperator {
}
}

fn process(&self, value: &WindowedValue) {}
fn process(&self, _value: &WindowedValue) {}

fn finish_bundle(&self) {}
}

struct GroupByKeyWithinBundleOperator {
pub(crate) struct GroupByKeyWithinBundleOperator {
receivers: Vec<Arc<Receiver>>,
key_extractor: &'static Box<dyn serialize::KeyExtractor>,
// TODO: Operator requiring locking for structures only ever manipulated in
Expand All @@ -466,10 +466,10 @@ struct GroupByKeyWithinBundleOperator {

impl OperatorI for GroupByKeyWithinBundleOperator {
fn new(
transform_id: Arc<String>,
_transform_id: Arc<String>,
transform_proto: Arc<PTransform>,
context: Arc<OperatorContext>,
operator_discriminant: OperatorDiscriminants,
_operator_discriminant: OperatorDiscriminants,
) -> Self {
// TODO: Shared by all operators, move up?
let receivers = transform_proto
Expand Down Expand Up @@ -530,10 +530,10 @@ impl std::fmt::Debug for GroupByKeyWithinBundleOperator {
// ******* Production Operator definitions *******

pub struct ParDoOperator {
transform_id: Arc<String>,
transform: Arc<PTransform>,
context: Arc<OperatorContext>,
operator_discriminant: OperatorDiscriminants,
_transform_id: Arc<String>,
_transform: Arc<PTransform>,
_context: Arc<OperatorContext>,
_operator_discriminant: OperatorDiscriminants,

receivers: Vec<Arc<Receiver>>,
dofn: &'static serialize::GenericDoFn,
Expand Down Expand Up @@ -562,10 +562,10 @@ impl OperatorI for ParDoOperator {
.unwrap();

Self {
transform_id,
transform: transform_proto,
context,
operator_discriminant,
_transform_id: transform_id,
_transform: transform_proto,
_context: context,
_operator_discriminant: operator_discriminant,
receivers,
dofn,
}
Expand Down Expand Up @@ -599,10 +599,10 @@ pub struct FlattenOperator {

impl OperatorI for FlattenOperator {
fn new(
transform_id: Arc<String>,
_transform_id: Arc<String>,
transform: Arc<PTransform>,
context: Arc<OperatorContext>,
operator_discriminant: OperatorDiscriminants,
_operator_discriminant: OperatorDiscriminants,
) -> Self {
let receivers = transform
.outputs
Expand Down

0 comments on commit 4bfdd26

Please sign in to comment.