Skip to content

Commit

Permalink
feat(bindings/java): avoid double dispose NativeObject (#4377)
Browse files Browse the repository at this point in the history
* feat(bindings/java): avoid double dispose NativeObject

Signed-off-by: tison <wander4096@gmail.com>

* more

Signed-off-by: tison <wander4096@gmail.com>

* check more

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
  • Loading branch information
tisonkun committed Mar 19, 2024
1 parent 71ae6cb commit 9a9e69c
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 29 deletions.
26 changes: 14 additions & 12 deletions bindings/java/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,25 +128,27 @@ pub(crate) fn make_tokio_executor(env: &mut JNIEnv, cores: usize) -> Result<Exec
Ok(Executor::Tokio(executor))
}

/// # Safety
/// # Panic
///
/// This function could be only when the lib is loaded.
pub(crate) unsafe fn executor_or_default<'a>(
/// Crash if the executor is disposed.
#[inline]
pub(crate) fn executor_or_default<'a>(
env: &mut JNIEnv<'a>,
executor: *const Executor,
) -> &'a Executor {
if executor.is_null() {
default_executor(env)
} else {
&*executor
) -> Result<&'a Executor> {
unsafe {
if executor.is_null() {
default_executor(env)
} else {
// SAFETY: executor must be valid
Ok(&*executor)
}
}
}

/// # Safety
///
/// This function could be only when the lib is loaded.
unsafe fn default_executor<'a>(env: &mut JNIEnv<'a>) -> &'a Executor {
RUNTIME
.get_or_try_init(|| make_tokio_executor(env, num_cpus::get()))
.expect("default executor must be able to initialize")
unsafe fn default_executor<'a>(env: &mut JNIEnv<'a>) -> Result<&'a Executor> {
RUNTIME.get_or_try_init(|| make_tokio_executor(env, num_cpus::get()))
}
17 changes: 16 additions & 1 deletion bindings/java/src/main/java/org/apache/opendal/NativeObject.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.opendal;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* NativeObject is the base-class of all OpenDAL classes that have
* a pointer to a native object.
Expand Down Expand Up @@ -54,6 +56,8 @@ public abstract class NativeObject implements AutoCloseable {
NativeLibrary.loadLibrary();
}

private final AtomicBoolean disposed = new AtomicBoolean(false);

/**
* An immutable reference to the value of the underneath pointer pointing
* to some underlying native OpenDAL object.
Expand All @@ -66,7 +70,18 @@ protected NativeObject(long nativeHandle) {

@Override
public void close() {
disposeInternal(nativeHandle);
if (disposed.compareAndSet(false, true)) {
disposeInternal(nativeHandle);
}
}

/**
* Check if the object has been disposed. Useful for defensive programming.
*
* @return if the object has been disposed.
*/
public boolean isDisposed() {
return disposed.get();
}

/**
Expand Down
29 changes: 14 additions & 15 deletions bindings/java/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ fn intern_constructor(
let map = jmap_to_hashmap(env, &map)?;
let mut op = Operator::via_map(scheme, map)?;
if !op.info().full_capability().blocking {
let layer =
unsafe { executor_or_default(env, executor) }.enter_with(BlockingLayer::create)?;
let layer = executor_or_default(env, executor)?.enter_with(BlockingLayer::create)?;
op = op.layer(layer);
}
Ok(Box::into_raw(Box::new(op)) as jlong)
Expand Down Expand Up @@ -129,7 +128,7 @@ fn intern_write(
let path = jstring_to_string(env, &path)?;
let content = env.convert_byte_array(content)?;

unsafe { executor_or_default(env, executor) }.spawn(async move {
executor_or_default(env, executor)?.spawn(async move {
let result = do_write(op, path, content).await;
complete_future(id, result.map(|_| JValueOwned::Void))
});
Expand Down Expand Up @@ -172,7 +171,7 @@ fn intern_append(
let path = jstring_to_string(env, &path)?;
let content = env.convert_byte_array(content)?;

unsafe { executor_or_default(env, executor) }.spawn(async move {
executor_or_default(env, executor)?.spawn(async move {
let result = do_append(op, path, content).await;
complete_future(id, result.map(|_| JValueOwned::Void))
});
Expand Down Expand Up @@ -212,7 +211,7 @@ fn intern_stat(

let path = jstring_to_string(env, &path)?;

unsafe { executor_or_default(env, executor) }.spawn(async move {
executor_or_default(env, executor)?.spawn(async move {
let result = do_stat(op, path).await;
complete_future(id, result.map(JValueOwned::Object))
});
Expand Down Expand Up @@ -254,7 +253,7 @@ fn intern_read(

let path = jstring_to_string(env, &path)?;

unsafe { executor_or_default(env, executor) }.spawn(async move {
executor_or_default(env, executor)?.spawn(async move {
let result = do_read(op, path).await;
complete_future(id, result.map(JValueOwned::Object))
});
Expand Down Expand Up @@ -298,7 +297,7 @@ fn intern_delete(

let path = jstring_to_string(env, &path)?;

unsafe { executor_or_default(env, executor) }.spawn(async move {
executor_or_default(env, executor)?.spawn(async move {
let result = do_delete(op, path).await;
complete_future(id, result.map(|_| JValueOwned::Void))
});
Expand Down Expand Up @@ -371,7 +370,7 @@ fn intern_create_dir(

let path = jstring_to_string(env, &path)?;

unsafe { executor_or_default(env, executor) }.spawn(async move {
executor_or_default(env, executor)?.spawn(async move {
let result = do_create_dir(op, path).await;
complete_future(id, result.map(|_| JValueOwned::Void))
});
Expand Down Expand Up @@ -414,7 +413,7 @@ fn intern_copy(
let source_path = jstring_to_string(env, &source_path)?;
let target_path = jstring_to_string(env, &target_path)?;

unsafe { executor_or_default(env, executor) }.spawn(async move {
executor_or_default(env, executor)?.spawn(async move {
let result = do_copy(op, source_path, target_path).await;
complete_future(id, result.map(|_| JValueOwned::Void))
});
Expand Down Expand Up @@ -457,7 +456,7 @@ fn intern_rename(
let source_path = jstring_to_string(env, &source_path)?;
let target_path = jstring_to_string(env, &target_path)?;

unsafe { executor_or_default(env, executor) }.spawn(async move {
executor_or_default(env, executor)?.spawn(async move {
let result = do_rename(op, source_path, target_path).await;
complete_future(id, result.map(|_| JValueOwned::Void))
});
Expand Down Expand Up @@ -497,7 +496,7 @@ fn intern_remove_all(

let path = jstring_to_string(env, &path)?;

unsafe { executor_or_default(env, executor) }.spawn(async move {
executor_or_default(env, executor)?.spawn(async move {
let result = do_remove_all(op, path).await;
complete_future(id, result.map(|_| JValueOwned::Void))
});
Expand Down Expand Up @@ -537,7 +536,7 @@ fn intern_list(

let path = jstring_to_string(env, &path)?;

unsafe { executor_or_default(env, executor) }.spawn(async move {
executor_or_default(env, executor)?.spawn(async move {
let result = do_list(op, path).await;
complete_future(id, result.map(JValueOwned::Object))
});
Expand Down Expand Up @@ -594,7 +593,7 @@ fn intern_presign_read(
let path = jstring_to_string(env, &path)?;
let expire = Duration::from_nanos(expire as u64);

unsafe { executor_or_default(env, executor) }.spawn(async move {
executor_or_default(env, executor)?.spawn(async move {
let result = do_presign_read(op, path, expire).await;
let mut env = unsafe { get_current_env() };
let result = result.and_then(|req| make_presigned_request(&mut env, req));
Expand Down Expand Up @@ -643,7 +642,7 @@ fn intern_presign_write(
let path = jstring_to_string(env, &path)?;
let expire = Duration::from_nanos(expire as u64);

unsafe { executor_or_default(env, executor) }.spawn(async move {
executor_or_default(env, executor)?.spawn(async move {
let result = do_presign_write(op, path, expire).await;
let mut env = unsafe { get_current_env() };
let result = result.and_then(|req| make_presigned_request(&mut env, req));
Expand Down Expand Up @@ -692,7 +691,7 @@ fn intern_presign_stat(
let path = jstring_to_string(env, &path)?;
let expire = Duration::from_nanos(expire as u64);

unsafe { executor_or_default(env, executor) }.spawn(async move {
executor_or_default(env, executor)?.spawn(async move {
let result = do_presign_stat(op, path, expire).await;
let mut env = unsafe { get_current_env() };
let result = result.and_then(|req| make_presigned_request(&mut env, req));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

public class AsyncExecutorTest {
@Test
void testOperatorWithRetryLayer() {
void testDedicatedTokioExecutor() {
final Map<String, String> conf = new HashMap<>();
conf.put("root", "/opendal/");
final int cores = Runtime.getRuntime().availableProcessors();
Expand All @@ -45,5 +45,11 @@ void testOperatorWithRetryLayer() {
assertThat(op.read(key).join()).isEqualTo(v0);
op.write(key, v1).join();
assertThat(op.read(key).join()).isEqualTo(v1);

assertThat(executor.isDisposed()).isFalse();
executor.close();
assertThat(executor.isDisposed()).isTrue();

// @Cleanup will close executor once more, but we don't crash with the guard.
}
}

0 comments on commit 9a9e69c

Please sign in to comment.