From f5250ba7985353fbf71c4ec2c5a8bd4775bbb822 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 18 Mar 2024 18:15:28 +0800 Subject: [PATCH 1/3] feat(bindings/java): avoid double dispose NativeObject Signed-off-by: tison --- .../src/main/java/org/apache/opendal/NativeObject.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/bindings/java/src/main/java/org/apache/opendal/NativeObject.java b/bindings/java/src/main/java/org/apache/opendal/NativeObject.java index c3496ea248f..bd9c926e5f0 100644 --- a/bindings/java/src/main/java/org/apache/opendal/NativeObject.java +++ b/bindings/java/src/main/java/org/apache/opendal/NativeObject.java @@ -19,6 +19,8 @@ package org.apache.opendal; +import java.util.concurrent.atomic.AtomicBoolean; + /** * NativeObject is the base-class of all OpenDAL classes that have * a pointer to a native object. @@ -54,6 +56,8 @@ public abstract class NativeObject implements AutoCloseable { NativeLibrary.loadLibrary(); } + private final AtomicBoolean disposed = new AtomicBoolean(false); + /** * An immutable reference to the value of the underneath pointer pointing * to some underlying native OpenDAL object. @@ -66,7 +70,9 @@ protected NativeObject(long nativeHandle) { @Override public void close() { - disposeInternal(nativeHandle); + if (disposed.compareAndSet(false, true)) { + disposeInternal(nativeHandle); + } } /** From b2ffc0948e38660b80a1aa29def01a069aee77db Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 18 Mar 2024 19:08:12 +0800 Subject: [PATCH 2/3] more Signed-off-by: tison --- bindings/java/src/executor.rs | 26 +++++++++-------- .../java/org/apache/opendal/NativeObject.java | 9 ++++++ bindings/java/src/operator.rs | 29 +++++++++---------- .../opendal/test/AsyncExecutorTest.java | 8 +++-- 4 files changed, 43 insertions(+), 29 deletions(-) diff --git a/bindings/java/src/executor.rs b/bindings/java/src/executor.rs index 506e86b2335..3fe67f4f7f4 100644 --- a/bindings/java/src/executor.rs +++ b/bindings/java/src/executor.rs @@ -128,25 +128,27 @@ pub(crate) fn make_tokio_executor(env: &mut JNIEnv, cores: usize) -> Result( +/// Crash if the executor is disposed. +#[inline] +pub(crate) fn executor_or_default<'a>( env: &mut JNIEnv<'a>, executor: *const Executor, -) -> &'a Executor { - if executor.is_null() { - default_executor(env) - } else { - &*executor +) -> Result<&'a Executor> { + unsafe { + if executor.is_null() { + default_executor(env) + } else { + // SAFETY: executor must be valid + Ok(&*executor) + } } } /// # Safety /// /// This function could be only when the lib is loaded. -unsafe fn default_executor<'a>(env: &mut JNIEnv<'a>) -> &'a Executor { - RUNTIME - .get_or_try_init(|| make_tokio_executor(env, num_cpus::get())) - .expect("default executor must be able to initialize") +unsafe fn default_executor<'a>(env: &mut JNIEnv<'a>) -> Result<&'a Executor> { + RUNTIME.get_or_try_init(|| make_tokio_executor(env, num_cpus::get())) } diff --git a/bindings/java/src/main/java/org/apache/opendal/NativeObject.java b/bindings/java/src/main/java/org/apache/opendal/NativeObject.java index bd9c926e5f0..37fc47f5700 100644 --- a/bindings/java/src/main/java/org/apache/opendal/NativeObject.java +++ b/bindings/java/src/main/java/org/apache/opendal/NativeObject.java @@ -75,6 +75,15 @@ public void close() { } } + /** + * Check if the object has been disposed. Useful for defensive programming. + * + * @return if the object has been disposed. + */ + public boolean isDisposed() { + return disposed.get(); + } + /** * Deletes underlying native object pointer. * diff --git a/bindings/java/src/operator.rs b/bindings/java/src/operator.rs index 30ff8b76d9d..80e55b452a8 100644 --- a/bindings/java/src/operator.rs +++ b/bindings/java/src/operator.rs @@ -66,8 +66,7 @@ fn intern_constructor( let map = jmap_to_hashmap(env, &map)?; let mut op = Operator::via_map(scheme, map)?; if !op.info().full_capability().blocking { - let layer = - unsafe { executor_or_default(env, executor) }.enter_with(BlockingLayer::create)?; + let layer = executor_or_default(env, executor)?.enter_with(BlockingLayer::create)?; op = op.layer(layer); } Ok(Box::into_raw(Box::new(op)) as jlong) @@ -129,7 +128,7 @@ fn intern_write( let path = jstring_to_string(env, &path)?; let content = env.convert_byte_array(content)?; - unsafe { executor_or_default(env, executor) }.spawn(async move { + executor_or_default(env, executor)?.spawn(async move { let result = do_write(op, path, content).await; complete_future(id, result.map(|_| JValueOwned::Void)) }); @@ -172,7 +171,7 @@ fn intern_append( let path = jstring_to_string(env, &path)?; let content = env.convert_byte_array(content)?; - unsafe { executor_or_default(env, executor) }.spawn(async move { + executor_or_default(env, executor)?.spawn(async move { let result = do_append(op, path, content).await; complete_future(id, result.map(|_| JValueOwned::Void)) }); @@ -212,7 +211,7 @@ fn intern_stat( let path = jstring_to_string(env, &path)?; - unsafe { executor_or_default(env, executor) }.spawn(async move { + executor_or_default(env, executor)?.spawn(async move { let result = do_stat(op, path).await; complete_future(id, result.map(JValueOwned::Object)) }); @@ -254,7 +253,7 @@ fn intern_read( let path = jstring_to_string(env, &path)?; - unsafe { executor_or_default(env, executor) }.spawn(async move { + executor_or_default(env, executor)?.spawn(async move { let result = do_read(op, path).await; complete_future(id, result.map(JValueOwned::Object)) }); @@ -298,7 +297,7 @@ fn intern_delete( let path = jstring_to_string(env, &path)?; - unsafe { executor_or_default(env, executor) }.spawn(async move { + executor_or_default(env, executor)?.spawn(async move { let result = do_delete(op, path).await; complete_future(id, result.map(|_| JValueOwned::Void)) }); @@ -371,7 +370,7 @@ fn intern_create_dir( let path = jstring_to_string(env, &path)?; - unsafe { executor_or_default(env, executor) }.spawn(async move { + executor_or_default(env, executor)?.spawn(async move { let result = do_create_dir(op, path).await; complete_future(id, result.map(|_| JValueOwned::Void)) }); @@ -414,7 +413,7 @@ fn intern_copy( let source_path = jstring_to_string(env, &source_path)?; let target_path = jstring_to_string(env, &target_path)?; - unsafe { executor_or_default(env, executor) }.spawn(async move { + executor_or_default(env, executor)?.spawn(async move { let result = do_copy(op, source_path, target_path).await; complete_future(id, result.map(|_| JValueOwned::Void)) }); @@ -457,7 +456,7 @@ fn intern_rename( let source_path = jstring_to_string(env, &source_path)?; let target_path = jstring_to_string(env, &target_path)?; - unsafe { executor_or_default(env, executor) }.spawn(async move { + executor_or_default(env, executor)?.spawn(async move { let result = do_rename(op, source_path, target_path).await; complete_future(id, result.map(|_| JValueOwned::Void)) }); @@ -497,7 +496,7 @@ fn intern_remove_all( let path = jstring_to_string(env, &path)?; - unsafe { executor_or_default(env, executor) }.spawn(async move { + executor_or_default(env, executor)?.spawn(async move { let result = do_remove_all(op, path).await; complete_future(id, result.map(|_| JValueOwned::Void)) }); @@ -537,7 +536,7 @@ fn intern_list( let path = jstring_to_string(env, &path)?; - unsafe { executor_or_default(env, executor) }.spawn(async move { + executor_or_default(env, executor)?.spawn(async move { let result = do_list(op, path).await; complete_future(id, result.map(JValueOwned::Object)) }); @@ -594,7 +593,7 @@ fn intern_presign_read( let path = jstring_to_string(env, &path)?; let expire = Duration::from_nanos(expire as u64); - unsafe { executor_or_default(env, executor) }.spawn(async move { + executor_or_default(env, executor)?.spawn(async move { let result = do_presign_read(op, path, expire).await; let mut env = unsafe { get_current_env() }; let result = result.and_then(|req| make_presigned_request(&mut env, req)); @@ -643,7 +642,7 @@ fn intern_presign_write( let path = jstring_to_string(env, &path)?; let expire = Duration::from_nanos(expire as u64); - unsafe { executor_or_default(env, executor) }.spawn(async move { + executor_or_default(env, executor)?.spawn(async move { let result = do_presign_write(op, path, expire).await; let mut env = unsafe { get_current_env() }; let result = result.and_then(|req| make_presigned_request(&mut env, req)); @@ -692,7 +691,7 @@ fn intern_presign_stat( let path = jstring_to_string(env, &path)?; let expire = Duration::from_nanos(expire as u64); - unsafe { executor_or_default(env, executor) }.spawn(async move { + executor_or_default(env, executor)?.spawn(async move { let result = do_presign_stat(op, path, expire).await; let mut env = unsafe { get_current_env() }; let result = result.and_then(|req| make_presigned_request(&mut env, req)); diff --git a/bindings/java/src/test/java/org/apache/opendal/test/AsyncExecutorTest.java b/bindings/java/src/test/java/org/apache/opendal/test/AsyncExecutorTest.java index 266a7c121c5..2fdbf5381c9 100644 --- a/bindings/java/src/test/java/org/apache/opendal/test/AsyncExecutorTest.java +++ b/bindings/java/src/test/java/org/apache/opendal/test/AsyncExecutorTest.java @@ -30,11 +30,11 @@ public class AsyncExecutorTest { @Test - void testOperatorWithRetryLayer() { + void testDedicatedTokioExecutor() { final Map conf = new HashMap<>(); conf.put("root", "/opendal/"); final int cores = Runtime.getRuntime().availableProcessors(); - @Cleanup final AsyncExecutor executor = AsyncExecutor.createTokioExecutor(cores); + final AsyncExecutor executor = AsyncExecutor.createTokioExecutor(cores); @Cleanup final Operator op = Operator.of("memory", conf, executor); assertThat(op.info).isNotNull(); @@ -45,5 +45,9 @@ void testOperatorWithRetryLayer() { assertThat(op.read(key).join()).isEqualTo(v0); op.write(key, v1).join(); assertThat(op.read(key).join()).isEqualTo(v1); + + assertThat(executor.isDisposed()).isFalse(); + executor.close(); + assertThat(executor.isDisposed()).isTrue(); } } From 3e2aec9fe8402f8553b7886eaaf3de0b83670dbb Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 18 Mar 2024 19:19:28 +0800 Subject: [PATCH 3/3] check more Signed-off-by: tison --- .../test/java/org/apache/opendal/test/AsyncExecutorTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/bindings/java/src/test/java/org/apache/opendal/test/AsyncExecutorTest.java b/bindings/java/src/test/java/org/apache/opendal/test/AsyncExecutorTest.java index 2fdbf5381c9..6b797435d30 100644 --- a/bindings/java/src/test/java/org/apache/opendal/test/AsyncExecutorTest.java +++ b/bindings/java/src/test/java/org/apache/opendal/test/AsyncExecutorTest.java @@ -34,7 +34,7 @@ void testDedicatedTokioExecutor() { final Map conf = new HashMap<>(); conf.put("root", "/opendal/"); final int cores = Runtime.getRuntime().availableProcessors(); - final AsyncExecutor executor = AsyncExecutor.createTokioExecutor(cores); + @Cleanup final AsyncExecutor executor = AsyncExecutor.createTokioExecutor(cores); @Cleanup final Operator op = Operator.of("memory", conf, executor); assertThat(op.info).isNotNull(); @@ -49,5 +49,7 @@ void testDedicatedTokioExecutor() { assertThat(executor.isDisposed()).isFalse(); executor.close(); assertThat(executor.isDisposed()).isTrue(); + + // @Cleanup will close executor once more, but we don't crash with the guard. } }