From 9a9e69ce8ce356cf011eb45872c2d7bc4d9f01f2 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 19 Mar 2024 18:39:09 +0800 Subject: [PATCH] feat(bindings/java): avoid double dispose NativeObject (#4377) * feat(bindings/java): avoid double dispose NativeObject Signed-off-by: tison * more Signed-off-by: tison * check more Signed-off-by: tison --------- Signed-off-by: tison --- bindings/java/src/executor.rs | 26 +++++++++-------- .../java/org/apache/opendal/NativeObject.java | 17 ++++++++++- bindings/java/src/operator.rs | 29 +++++++++---------- .../opendal/test/AsyncExecutorTest.java | 8 ++++- 4 files changed, 51 insertions(+), 29 deletions(-) diff --git a/bindings/java/src/executor.rs b/bindings/java/src/executor.rs index 506e86b2335..3fe67f4f7f4 100644 --- a/bindings/java/src/executor.rs +++ b/bindings/java/src/executor.rs @@ -128,25 +128,27 @@ pub(crate) fn make_tokio_executor(env: &mut JNIEnv, cores: usize) -> Result( +/// Crash if the executor is disposed. +#[inline] +pub(crate) fn executor_or_default<'a>( env: &mut JNIEnv<'a>, executor: *const Executor, -) -> &'a Executor { - if executor.is_null() { - default_executor(env) - } else { - &*executor +) -> Result<&'a Executor> { + unsafe { + if executor.is_null() { + default_executor(env) + } else { + // SAFETY: executor must be valid + Ok(&*executor) + } } } /// # Safety /// /// This function could be only when the lib is loaded. -unsafe fn default_executor<'a>(env: &mut JNIEnv<'a>) -> &'a Executor { - RUNTIME - .get_or_try_init(|| make_tokio_executor(env, num_cpus::get())) - .expect("default executor must be able to initialize") +unsafe fn default_executor<'a>(env: &mut JNIEnv<'a>) -> Result<&'a Executor> { + RUNTIME.get_or_try_init(|| make_tokio_executor(env, num_cpus::get())) } diff --git a/bindings/java/src/main/java/org/apache/opendal/NativeObject.java b/bindings/java/src/main/java/org/apache/opendal/NativeObject.java index c3496ea248f..37fc47f5700 100644 --- a/bindings/java/src/main/java/org/apache/opendal/NativeObject.java +++ b/bindings/java/src/main/java/org/apache/opendal/NativeObject.java @@ -19,6 +19,8 @@ package org.apache.opendal; +import java.util.concurrent.atomic.AtomicBoolean; + /** * NativeObject is the base-class of all OpenDAL classes that have * a pointer to a native object. @@ -54,6 +56,8 @@ public abstract class NativeObject implements AutoCloseable { NativeLibrary.loadLibrary(); } + private final AtomicBoolean disposed = new AtomicBoolean(false); + /** * An immutable reference to the value of the underneath pointer pointing * to some underlying native OpenDAL object. @@ -66,7 +70,18 @@ protected NativeObject(long nativeHandle) { @Override public void close() { - disposeInternal(nativeHandle); + if (disposed.compareAndSet(false, true)) { + disposeInternal(nativeHandle); + } + } + + /** + * Check if the object has been disposed. Useful for defensive programming. + * + * @return if the object has been disposed. + */ + public boolean isDisposed() { + return disposed.get(); } /** diff --git a/bindings/java/src/operator.rs b/bindings/java/src/operator.rs index 30ff8b76d9d..80e55b452a8 100644 --- a/bindings/java/src/operator.rs +++ b/bindings/java/src/operator.rs @@ -66,8 +66,7 @@ fn intern_constructor( let map = jmap_to_hashmap(env, &map)?; let mut op = Operator::via_map(scheme, map)?; if !op.info().full_capability().blocking { - let layer = - unsafe { executor_or_default(env, executor) }.enter_with(BlockingLayer::create)?; + let layer = executor_or_default(env, executor)?.enter_with(BlockingLayer::create)?; op = op.layer(layer); } Ok(Box::into_raw(Box::new(op)) as jlong) @@ -129,7 +128,7 @@ fn intern_write( let path = jstring_to_string(env, &path)?; let content = env.convert_byte_array(content)?; - unsafe { executor_or_default(env, executor) }.spawn(async move { + executor_or_default(env, executor)?.spawn(async move { let result = do_write(op, path, content).await; complete_future(id, result.map(|_| JValueOwned::Void)) }); @@ -172,7 +171,7 @@ fn intern_append( let path = jstring_to_string(env, &path)?; let content = env.convert_byte_array(content)?; - unsafe { executor_or_default(env, executor) }.spawn(async move { + executor_or_default(env, executor)?.spawn(async move { let result = do_append(op, path, content).await; complete_future(id, result.map(|_| JValueOwned::Void)) }); @@ -212,7 +211,7 @@ fn intern_stat( let path = jstring_to_string(env, &path)?; - unsafe { executor_or_default(env, executor) }.spawn(async move { + executor_or_default(env, executor)?.spawn(async move { let result = do_stat(op, path).await; complete_future(id, result.map(JValueOwned::Object)) }); @@ -254,7 +253,7 @@ fn intern_read( let path = jstring_to_string(env, &path)?; - unsafe { executor_or_default(env, executor) }.spawn(async move { + executor_or_default(env, executor)?.spawn(async move { let result = do_read(op, path).await; complete_future(id, result.map(JValueOwned::Object)) }); @@ -298,7 +297,7 @@ fn intern_delete( let path = jstring_to_string(env, &path)?; - unsafe { executor_or_default(env, executor) }.spawn(async move { + executor_or_default(env, executor)?.spawn(async move { let result = do_delete(op, path).await; complete_future(id, result.map(|_| JValueOwned::Void)) }); @@ -371,7 +370,7 @@ fn intern_create_dir( let path = jstring_to_string(env, &path)?; - unsafe { executor_or_default(env, executor) }.spawn(async move { + executor_or_default(env, executor)?.spawn(async move { let result = do_create_dir(op, path).await; complete_future(id, result.map(|_| JValueOwned::Void)) }); @@ -414,7 +413,7 @@ fn intern_copy( let source_path = jstring_to_string(env, &source_path)?; let target_path = jstring_to_string(env, &target_path)?; - unsafe { executor_or_default(env, executor) }.spawn(async move { + executor_or_default(env, executor)?.spawn(async move { let result = do_copy(op, source_path, target_path).await; complete_future(id, result.map(|_| JValueOwned::Void)) }); @@ -457,7 +456,7 @@ fn intern_rename( let source_path = jstring_to_string(env, &source_path)?; let target_path = jstring_to_string(env, &target_path)?; - unsafe { executor_or_default(env, executor) }.spawn(async move { + executor_or_default(env, executor)?.spawn(async move { let result = do_rename(op, source_path, target_path).await; complete_future(id, result.map(|_| JValueOwned::Void)) }); @@ -497,7 +496,7 @@ fn intern_remove_all( let path = jstring_to_string(env, &path)?; - unsafe { executor_or_default(env, executor) }.spawn(async move { + executor_or_default(env, executor)?.spawn(async move { let result = do_remove_all(op, path).await; complete_future(id, result.map(|_| JValueOwned::Void)) }); @@ -537,7 +536,7 @@ fn intern_list( let path = jstring_to_string(env, &path)?; - unsafe { executor_or_default(env, executor) }.spawn(async move { + executor_or_default(env, executor)?.spawn(async move { let result = do_list(op, path).await; complete_future(id, result.map(JValueOwned::Object)) }); @@ -594,7 +593,7 @@ fn intern_presign_read( let path = jstring_to_string(env, &path)?; let expire = Duration::from_nanos(expire as u64); - unsafe { executor_or_default(env, executor) }.spawn(async move { + executor_or_default(env, executor)?.spawn(async move { let result = do_presign_read(op, path, expire).await; let mut env = unsafe { get_current_env() }; let result = result.and_then(|req| make_presigned_request(&mut env, req)); @@ -643,7 +642,7 @@ fn intern_presign_write( let path = jstring_to_string(env, &path)?; let expire = Duration::from_nanos(expire as u64); - unsafe { executor_or_default(env, executor) }.spawn(async move { + executor_or_default(env, executor)?.spawn(async move { let result = do_presign_write(op, path, expire).await; let mut env = unsafe { get_current_env() }; let result = result.and_then(|req| make_presigned_request(&mut env, req)); @@ -692,7 +691,7 @@ fn intern_presign_stat( let path = jstring_to_string(env, &path)?; let expire = Duration::from_nanos(expire as u64); - unsafe { executor_or_default(env, executor) }.spawn(async move { + executor_or_default(env, executor)?.spawn(async move { let result = do_presign_stat(op, path, expire).await; let mut env = unsafe { get_current_env() }; let result = result.and_then(|req| make_presigned_request(&mut env, req)); diff --git a/bindings/java/src/test/java/org/apache/opendal/test/AsyncExecutorTest.java b/bindings/java/src/test/java/org/apache/opendal/test/AsyncExecutorTest.java index 266a7c121c5..6b797435d30 100644 --- a/bindings/java/src/test/java/org/apache/opendal/test/AsyncExecutorTest.java +++ b/bindings/java/src/test/java/org/apache/opendal/test/AsyncExecutorTest.java @@ -30,7 +30,7 @@ public class AsyncExecutorTest { @Test - void testOperatorWithRetryLayer() { + void testDedicatedTokioExecutor() { final Map conf = new HashMap<>(); conf.put("root", "/opendal/"); final int cores = Runtime.getRuntime().availableProcessors(); @@ -45,5 +45,11 @@ void testOperatorWithRetryLayer() { assertThat(op.read(key).join()).isEqualTo(v0); op.write(key, v1).join(); assertThat(op.read(key).join()).isEqualTo(v1); + + assertThat(executor.isDisposed()).isFalse(); + executor.close(); + assertThat(executor.isDisposed()).isTrue(); + + // @Cleanup will close executor once more, but we don't crash with the guard. } }