Skip to content

Commit

Permalink
Merge pull request apache#7 from dahlbaek/bug/test-failures
Browse files Browse the repository at this point in the history
Issue apache#3: Fix test failures
  • Loading branch information
laysakura authored Apr 9, 2023
2 parents 4bfdd26 + 9f2d985 commit 28b838e
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 13 deletions.
4 changes: 2 additions & 2 deletions sdks/rust/src/elem_types/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use std::{cmp, fmt};

#[derive(Clone, PartialEq, Eq, Debug)]
pub struct KV<K, V> {
k: K,
v: V,
pub k: K,
pub v: V,
}

impl<K, V> KV<K, V>
Expand Down
23 changes: 16 additions & 7 deletions sdks/rust/src/internals/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ use std::sync::Mutex;

use once_cell::sync::Lazy;

use crate::elem_types::kv::KV;

static SERIALIZED_FNS: Lazy<Mutex<HashMap<String, Box<dyn Any + Sync + Send>>>> =
Lazy::new(|| Mutex::new(HashMap::new()));

pub fn serialize_fn<T: Any + Sync + Send>(obj: Box<T>) -> String {
let name = format!("object{}", SERIALIZED_FNS.lock().unwrap().len());
SERIALIZED_FNS.lock().unwrap().insert(name.to_string(), obj);
let mut serialized_fns = SERIALIZED_FNS.lock().unwrap();
let name = format!("object{}", serialized_fns.len());
serialized_fns.insert(name.to_string(), obj);
name
}

Expand Down Expand Up @@ -83,7 +86,7 @@ pub fn to_generic_dofn_dyn<T: Any, O: Any, I: IntoIterator<Item = O> + 'static>(
}

pub trait KeyExtractor: Sync + Send {
fn extract(&self, kv: &dyn Any) -> (String, Box<dyn Any + Sync + Send>);
fn extract(&self, kv: &dyn Any) -> KV<String, Box<dyn Any + Sync + Send>>;
fn recombine(
&self,
key: &str,
Expand All @@ -104,9 +107,12 @@ impl<V: Clone + Sync + Send + 'static> Default for TypedKeyExtractor<V> {
}

impl<V: Clone + Sync + Send + 'static> KeyExtractor for TypedKeyExtractor<V> {
fn extract(&self, kv: &dyn Any) -> (String, Box<dyn Any + Sync + Send>) {
let typed_kv = kv.downcast_ref::<(String, V)>().unwrap();
(typed_kv.0.clone(), Box::new(typed_kv.1.clone()))
fn extract(&self, kv: &dyn Any) -> KV<String, Box<dyn Any + Sync + Send>> {
let typed_kv = kv.downcast_ref::<KV<String, V>>().unwrap();
KV {
k: typed_kv.k.clone(),
v: Box::new(typed_kv.v.clone()),
}
}
fn recombine(
&self,
Expand All @@ -117,6 +123,9 @@ impl<V: Clone + Sync + Send + 'static> KeyExtractor for TypedKeyExtractor<V> {
for untyped_value in values.iter() {
typed_values.push(untyped_value.downcast_ref::<V>().unwrap().clone());
}
Box::new((key.to_string(), typed_values))
Box::new(KV {
k: key.to_string(),
v: typed_values,
})
}
}
3 changes: 0 additions & 3 deletions sdks/rust/src/tests/primitives_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ mod tests {
.await;
}

// TODO: Enabling these tests seem to cause random failures in other
// tests in this file.
#[tokio::test]
#[should_panic]
// This tests that AssertEqualUnordered is actually doing its job.
Expand All @@ -54,7 +52,6 @@ mod tests {
}

#[tokio::test]
#[ignore]
#[should_panic]
async fn ensure_assert_fails_on_empty() {
DirectRunner::new()
Expand Down
3 changes: 2 additions & 1 deletion sdks/rust/src/worker/operators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use std::sync::{Arc, Mutex};
use once_cell::sync::Lazy;
use serde_json;

use crate::elem_types::kv::KV;
use crate::internals::serialize;
use crate::internals::urns;
use crate::proto::beam_api::fn_execution::ProcessBundleDescriptor;
Expand Down Expand Up @@ -500,7 +501,7 @@ impl OperatorI for GroupByKeyWithinBundleOperator {
fn process(&self, element: &WindowedValue) {
// TODO: assumes global window
let untyped_value: &dyn Any = &*element.value;
let (key, value) = self.key_extractor.extract(untyped_value);
let KV { k: key, v: value } = self.key_extractor.extract(untyped_value);
let mut grouped_values = self.grouped_values.lock().unwrap();
if !grouped_values.contains_key(&key) {
grouped_values.insert(key.clone(), Box::default());
Expand Down

0 comments on commit 28b838e

Please sign in to comment.