Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add runtime class #6

Merged
merged 1 commit into from
Oct 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion_jni/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ edition = "2018"

[dependencies]
jni = "0.19.0"
tokio = "1.12.0"
datafusion = "^5.0.0"

[lib]
Expand Down
34 changes: 28 additions & 6 deletions datafusion_jni/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,44 @@
// This is the interface to the JVM that we'll
// call the majority of our methods on.
use jni::JNIEnv;

// These objects are what you should use as arguments to your native function.
// They carry extra lifetime information to prevent them escaping this context
// and getting used after being GC'd.
use jni::objects::{GlobalRef, JClass, JObject, JString};

// This is just a pointer. We'll be returning it from our function.
// We can't return one of the objects with lifetime information because the
// lifetime checker won't let us.
use jni::sys::{jbyteArray, jint, jlong, jstring};

use std::{sync::mpsc, thread, time::Duration};

use datafusion::dataframe::DataFrame;
use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
use jni::sys::{jbyteArray, jint, jlong, jstring};
use std::sync::Arc;
use std::{sync::mpsc, thread, time::Duration};
use tokio::runtime::Runtime;

#[no_mangle]
pub extern "system" fn Java_org_apache_arrow_datafusion_TokioRuntime_createTokioRuntime(
_env: JNIEnv,
_class: JClass,
) -> jlong {
if let Ok(runtime) = Runtime::new() {
println!("successfully created tokio runtime");
Box::into_raw(Box::new(runtime)) as jlong
} else {
// TODO error handling
-1
}
}

#[no_mangle]
pub extern "system" fn Java_org_apache_arrow_datafusion_TokioRuntime_destroyTokioRuntime(
_env: JNIEnv,
_class: JClass,
pointer: jlong,
) {
let runtime = unsafe { Box::from_raw(pointer as *mut Runtime) };
runtime.shutdown_timeout(Duration::from_millis(100));
println!("successfully shutdown tokio runtime");
}

#[no_mangle]
pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultExecutionContext_querySql(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.apache.arrow.datafusion;

class DataFrames {
final class DataFrames {

private DataFrames() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ public DataFrame sql(String sql) {
}
}

@Override
public Runtime getRuntime() {
return runtime;
}

private RuntimeException getErrorForInvocation(long invocationId) {
String errorMessage = errorMessageInbox.remove(invocationId);
assert errorMessage != null : "onErrorMessage was not properly called from JNI";
Expand All @@ -44,17 +49,20 @@ public void close() throws Exception {
for (DefaultDataFrame frame : dataFrames.values()) {
frame.close();
}
runtime.close();
logger.printf(Level.INFO, "closing %x", pointer);
ExecutionContexts.destroyExecutionContext(pointer);
}

private final long pointer;
private final Runtime runtime;
private final ConcurrentMap<Long, String> errorMessageInbox;
private final ConcurrentMap<Long, DefaultDataFrame> dataFrames;

DefaultExecutionContext(long pointer) {
logger.printf(Level.INFO, "obtaining %x", pointer);
this.pointer = pointer;
this.runtime = Runtime.create();
this.errorMessageInbox = new ConcurrentHashMap<>();
this.dataFrames = new ConcurrentHashMap<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@
public interface ExecutionContext extends AutoCloseable {

DataFrame sql(String sql);

Runtime getRuntime();
}
12 changes: 12 additions & 0 deletions lib/src/main/java/org/apache/arrow/datafusion/Runtime.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.apache.arrow.datafusion;

public interface Runtime extends AutoCloseable {

static Runtime create() {
long pointer = TokioRuntime.createTokioRuntime();
if (pointer <= 0) {
throw new IllegalStateException("failed to create runtime");
}
return new TokioRuntime(pointer);
}
}
27 changes: 27 additions & 0 deletions lib/src/main/java/org/apache/arrow/datafusion/TokioRuntime.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.apache.arrow.datafusion;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

final class TokioRuntime implements Runtime {

TokioRuntime(long pointer) {
logger.printf(Level.INFO, "obtaining %x", pointer);
this.pointer = pointer;
}

static native long createTokioRuntime();

static native void destroyTokioRuntime(long pointer);

private static final Logger logger = LogManager.getLogger(TokioRuntime.class);

private final long pointer;

@Override
public void close() throws Exception {
logger.printf(Level.INFO, "closing %x", pointer);
destroyTokioRuntime(pointer);
}
}