Skip to content

Commit

Permalink
A mostly naive implementation of select, because of absence for selec…
Browse files Browse the repository at this point in the history
…ting among mpsc channels: rust-lang/rust#27800.
  • Loading branch information
astonbitecode committed Sep 16, 2020
1 parent 1446ea5 commit d8b6256
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,8 @@
import org.astonbitecode.j4rs.api.invocation.NativeCallbackToRustChannelSupport;

public class J4rsEventHandler<T extends Event> extends NativeCallbackToRustChannelSupport implements EventHandler<T> {
public J4rsEventHandler() {
System.out.println("------NEW---");
}

@Override
public void handle(T event) {
System.out.println("---------");
doCallback(event);
}
}
Binary file modified rust/jassets/j4rs-0.12.1-SNAPSHOT-jar-with-dependencies.jar
Binary file not shown.
104 changes: 97 additions & 7 deletions rust/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{fs, mem};
use std::{fs, mem, thread, time};
use std::any::Any;
use std::convert::TryFrom;
use std::env;
Expand Down Expand Up @@ -1013,6 +1013,44 @@ impl Jvm {
}
}
}

/// Returns the first `Instance` that is available from the passed `InstanceReceiver`s,
/// along with the index of the receiver that was selected and actually returned the instance.
///
/// This is a mostly naive implementation of select, because of [absence for selecting among mpsc channels](https://github.com/rust-lang/rust/issues/27800).
pub fn select(instance_receivers: &[&InstanceReceiver]) -> errors::Result<(usize, Instance)> {
loop {
for (index, ir) in instance_receivers.iter().enumerate() {
let res = ir.rx.try_recv();
if res.is_ok() {
return Ok((index, res.unwrap()));
}
}
thread::yield_now();
}
}

/// Returns the first `Instance` that is available from the passed `InstanceReceiver`s,
/// along with the index of the receiver that was selected and actually returned the instance.
///
/// If there are no instances returned for the duration defined in timeout argument, an error is returned.
///
/// This is a mostly naive implementation of select, because of [absence for selecting among mpsc channels](https://github.com/rust-lang/rust/issues/27800).
pub fn select_timeout(instance_receivers: &[&InstanceReceiver], timeout: &time::Duration) -> errors::Result<(usize, Instance)> {
let start = time::Instant::now();
loop {
for (index, ir) in instance_receivers.iter().enumerate() {
let res = ir.rx.try_recv();
if res.is_ok() {
return Ok((index, res.unwrap()));
}
}
if &start.elapsed() > timeout {
return Err(errors::J4RsError::Timeout);
}
thread::yield_now();
}
}
}

impl Drop for Jvm {
Expand Down Expand Up @@ -1739,11 +1777,13 @@ impl InstanceReceiver {

impl Drop for InstanceReceiver {
fn drop(&mut self) {
debug("Dropping an InstanceReceiver");
let p = self.tx_address as *mut Sender<Instance>;
unsafe {
let tx = Box::from_raw(p);
mem::drop(tx);
if self.tx_address > 0 {
debug("Dropping an InstanceReceiver");
let p = self.tx_address as *mut Sender<Instance>;
unsafe {
let tx = Box::from_raw(p);
mem::drop(tx);
}
}
}
}
Expand Down Expand Up @@ -1867,7 +1907,7 @@ impl<'a> ChainableInstance<'a> {

fn new_with_instance_ref(instance: &Instance, jvm: &'a Jvm) -> errors::Result<ChainableInstance<'a>> {
let cloned = jvm.clone_instance(&instance)?;
Ok(ChainableInstance { instance: cloned, jvm })
Ok(ChainableInstance { instance: cloned, jvm })
}

pub fn collect(self) -> Instance {
Expand Down Expand Up @@ -2020,6 +2060,56 @@ mod api_unit_tests {
let _ = fs_extra::remove_items(&vec![newdir]);
}

#[test]
fn test_select() {
let (tx1, rx1) = channel();
let ir1 = InstanceReceiver::new(rx1, 0);
let (_tx2, rx2) = channel();
let ir2 = InstanceReceiver::new(rx2, 0);
let (tx3, rx3) = channel();
let ir3 = InstanceReceiver::new(rx3, 0);

thread::spawn(move || {
let _ = tx3.send(Instance::new(ptr::null_mut(), CLASS_STRING).unwrap());
// Block the thread as sending does not block the current thread
thread::sleep(time::Duration::from_millis(10));
let _ = tx1.send(Instance::new(ptr::null_mut(), CLASS_STRING).unwrap());
thread::sleep(time::Duration::from_millis(10));
let _ = tx3.send(Instance::new(ptr::null_mut(), CLASS_STRING).unwrap());
});

let (index1, _) = Jvm::select(&[&ir1, &ir2, &ir3]).unwrap();
let (index2, _) = Jvm::select(&[&ir1, &ir2, &ir3]).unwrap();
let (index3, _) = Jvm::select(&[&ir1, &ir2, &ir3]).unwrap();
assert!(index1 == 2);
assert!(index2 == 0);
assert!(index3 == 2);
}

#[test]
fn test_select_timeout() {
let (tx1, rx1) = channel();
let ir1 = InstanceReceiver::new(rx1, 0);
let (tx2, rx2) = channel();
let ir2 = InstanceReceiver::new(rx2, 0);

thread::spawn(move || {
let _ = tx1.send(Instance::new(ptr::null_mut(), CLASS_STRING).unwrap());
// Block the thread as sending does not block the current thread
thread::sleep(time::Duration::from_millis(10));
let _ = tx2.send(Instance::new(ptr::null_mut(), CLASS_STRING).unwrap());
});

let d = time::Duration::from_millis(500);
let (index1, _) = Jvm::select_timeout(&[&ir1, &ir2], &d).unwrap();
let (index2, _) = Jvm::select_timeout(&[&ir1, &ir2], &d).unwrap();
assert!(Jvm::select_timeout(&[&ir1, &ir2], &d).is_err());
dbg!(index1);
dbg!(index2);
assert!(index1 == 0);
assert!(index2 == 1);
}

fn validate_type(ia: InvocationArg, class: &str) {
let b = match ia {
_s @ InvocationArg::Java { .. } => false,
Expand Down
5 changes: 4 additions & 1 deletion rust/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{fmt, result};
use std::convert::Infallible;
use std::error::Error;
use std::ffi::NulError;
use std::io;
use std::sync::{PoisonError, TryLockError};
use std::{fmt, result};

use fs_extra;
use serde_json;
Expand Down Expand Up @@ -46,6 +46,7 @@ pub enum J4RsError {
JniError(String),
RustError(String),
ParseError(String),
Timeout,
}

impl fmt::Display for J4RsError {
Expand All @@ -56,6 +57,7 @@ impl fmt::Display for J4RsError {
&J4RsError::JniError(ref message) => write!(f, "{}", message),
&J4RsError::RustError(ref message) => write!(f, "{}", message),
&J4RsError::ParseError(ref message) => write!(f, "{}", message),
&J4RsError::Timeout => write!(f, "Timeout"),
}
}
}
Expand All @@ -68,6 +70,7 @@ impl Error for J4RsError {
J4RsError::JniError(_) => "A JNI error occured",
J4RsError::RustError(_) => "An error coming from Rust occured",
J4RsError::ParseError(_) => "A parsing error occured",
J4RsError::Timeout => "Timeout",
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions rust/src/jfx.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::convert::TryFrom;
// Copyright 2020 astonbitecode
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -12,9 +13,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::env;
use std::convert::TryFrom;

use crate::{InstanceReceiver, Jvm, MavenArtifact, Instance, InvocationArg};
use crate::{Instance, InstanceReceiver, InvocationArg, Jvm, MavenArtifact};
use crate::errors;
use crate::errors::J4RsError;

Expand All @@ -39,9 +39,8 @@ impl JavaFxSupport for Jvm {
let fx_callback = self.create_instance(
"org.astonbitecode.j4rs.api.jfx.FxApplicationStartCallback",
&[])?;
let cb = self.init_callback_channel(&fx_callback)?;
self.invoke(&fx_callback, "setCallbackToApplicationAndLaunch", &[])?;
Ok(cb)

self.invoke_to_channel(&fx_callback, "setCallbackToApplicationAndLaunch", &[])
}

fn set_javafx_event_receiver(&self, instance: &Instance, method: &str) -> errors::Result<InstanceReceiver> {
Expand Down Expand Up @@ -88,9 +87,10 @@ fn maven(s: &str, jvm: &Jvm) {

#[cfg(test)]
mod api_unit_tests {
use super::*;
use crate::JvmBuilder;

use super::*;

#[test]
#[should_panic]
fn test_deploy_javafx_dependencies() {
Expand Down
24 changes: 24 additions & 0 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,30 @@ mod lib_unit_tests {
}
}

// #[test]
// #[ignore]
fn _memory_leaks_invoke_instances_to_channel() {
let jvm: Jvm = super::new_jvm(Vec::new(), Vec::new()).unwrap();
match jvm.create_instance("org.astonbitecode.j4rs.tests.MySecondTest", Vec::new().as_ref()) {
Ok(instance) => {
for i in 0..100000000 {
let instance_receiver = jvm.invoke_to_channel(&instance, "performCallback", &[]).unwrap();
let thousand_millis = time::Duration::from_millis(1000);
let res = instance_receiver.rx().recv_timeout(thousand_millis);
if i % 100000 == 0 {
println!("{}: {}", i, res.is_ok());
}
}
}
Err(error) => {
panic!("ERROR when creating Instance: {:?}", error);
}
}

let thousand_millis = time::Duration::from_millis(1000);
thread::sleep(thousand_millis);
}

#[test]
fn clone_instance() {
let jvm: Jvm = super::new_jvm(vec![ClasspathEntry::new("onemore.jar")], Vec::new()).unwrap();
Expand Down

0 comments on commit d8b6256

Please sign in to comment.