Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Python] C callback with data #306

Merged
merged 4 commits into from
Aug 24, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 128 additions & 8 deletions native-io/lakesoul-io-c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,21 @@
extern crate core;

use core::ffi::{c_ptrdiff_t, c_size_t};
use std::ffi::{c_char, CStr, CString};
use std::ffi::{c_char, c_void, CStr, CString};
use std::ptr::NonNull;
use std::slice;
use std::sync::Arc;

pub use arrow::array::StructArray;
use arrow::array::{Array, ArrayData};
use arrow::datatypes::Schema;
pub use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
use arrow::ffi::ArrowArray;
pub use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};

use lakesoul_io::lakesoul_io_config::{LakeSoulIOConfig, LakeSoulIOConfigBuilder};
use tokio::runtime::{Builder, Runtime};

use lakesoul_io::lakesoul_reader::{
Result, LakeSoulReader, RecordBatch, SyncSendableMutableLakeSoulReader,
};
use lakesoul_io::lakesoul_reader::{LakeSoulReader, RecordBatch, Result, SyncSendableMutableLakeSoulReader};
use lakesoul_io::lakesoul_writer::SyncSendableMutableLakeSoulWriter;

#[repr(C)]
Expand Down Expand Up @@ -286,7 +284,9 @@ pub extern "C" fn lakesoul_config_builder_set_default_column_value(
unsafe {
let field = CStr::from_ptr(field).to_str().unwrap().to_string();
let value = CStr::from_ptr(value).to_str().unwrap().to_string();
convert_to_opaque(from_opaque::<IOConfigBuilder, LakeSoulIOConfigBuilder>(builder).with_default_column_value(field, value))
convert_to_opaque(
from_opaque::<IOConfigBuilder, LakeSoulIOConfigBuilder>(builder).with_default_column_value(field, value),
)
}
}

Expand Down Expand Up @@ -323,9 +323,26 @@ pub extern "C" fn check_reader_created(reader: NonNull<CResult<Reader>>) -> *con
}

pub type ResultCallback = extern "C" fn(bool, *const c_char);
pub type DataResultCallback = extern "C" fn(bool, *const c_char, *const c_void);

fn call_result_callback(callback: ResultCallback, status: bool, err: *const c_char) {
callback(status, err);
// release error string
if !err.is_null() {
unsafe {
let _ = CString::from_raw(err as *mut c_char);
}
}
}

fn call_data_result_callback(
callback: DataResultCallback,
status: bool,
err: *const c_char,
data: Cvoid,
) {
// release error string
callback(status, err, data.data);
if !err.is_null() {
unsafe {
let _ = CString::from_raw(err as *mut c_char);
Expand All @@ -334,9 +351,26 @@ fn call_result_callback(callback: ResultCallback, status: bool, err: *const c_ch
}

pub type I32ResultCallback = extern "C" fn(i32, *const c_char);
pub type I32DataResultCallback = extern "C" fn(i32, *const c_char, *const c_void);

fn call_i32_result_callback(callback: I32ResultCallback, status: i32, err: *const c_char) {
callback(status, err);
// release error string
if !err.is_null() {
unsafe {
let _ = CString::from_raw(err as *mut c_char);
}
}
}

fn call_i32_data_result_callback(
callback: I32DataResultCallback,
status: i32,
err: *const c_char,
data: Cvoid,
) {
callback(status, err, data.data);
// release error string
if !err.is_null() {
unsafe {
let _ = CString::from_raw(err as *mut c_char);
Expand All @@ -360,6 +394,28 @@ pub extern "C" fn start_reader(reader: NonNull<CResult<Reader>>, callback: Resul
}
}

#[no_mangle]
pub extern "C" fn start_reader_with_data(
reader: NonNull<CResult<Reader>>,
data: *const c_void,
callback: DataResultCallback,
) {
unsafe {
let mut reader = NonNull::new_unchecked(reader.as_ref().ptr as *mut SyncSendableMutableLakeSoulReader);
let data = Cvoid {data};
let result = reader.as_mut().start_blocked();
match result {
Ok(_) => call_data_result_callback(callback, true, std::ptr::null(), data),
Err(e) => call_data_result_callback(
callback,
false,
CString::new(format!("{}", e).as_str()).unwrap().into_raw(),
data,
),
}
}
}

#[no_mangle]
pub extern "C" fn next_record_batch(
reader: NonNull<CResult<Reader>>,
Expand All @@ -371,7 +427,7 @@ pub extern "C" fn next_record_batch(
let reader = NonNull::new_unchecked(reader.as_ref().ptr as *mut SyncSendableMutableLakeSoulReader);
let f = move |rb: Option<Result<RecordBatch>>| match rb {
None => {
call_i32_result_callback(callback, -1, std::ptr::null());
call_i32_result_callback(callback, 0, std::ptr::null());
}
Some(rb_result) => match rb_result {
Err(e) => {
Expand Down Expand Up @@ -409,11 +465,75 @@ pub extern "C" fn next_record_batch(
}
}

// accept a callback with arbitrary user data pointer

struct Cvoid {
data: *const c_void,
}
unsafe impl Send for Cvoid {}
unsafe impl Sync for Cvoid {}

#[no_mangle]
pub extern "C" fn next_record_batch_with_data(
reader: NonNull<CResult<Reader>>,
schema_addr: c_ptrdiff_t,
array_addr: c_ptrdiff_t,
data: *const c_void,
callback: I32DataResultCallback,
) {
unsafe {
let reader = NonNull::new_unchecked(reader.as_ref().ptr as *mut SyncSendableMutableLakeSoulReader);
let data = Cvoid {data};
let f = move |rb: Option<Result<RecordBatch>>| match rb {
None => {
call_i32_data_result_callback(callback, 0, std::ptr::null(), data);
}
Some(rb_result) => match rb_result {
Err(e) => {
call_i32_data_result_callback(
callback,
-1,
CString::new(format!("{}", e).as_str()).unwrap().into_raw(),
data,
);
}
Ok(rb) => {
let rows = rb.num_rows() as i32;
let batch: Arc<StructArray> = Arc::new(rb.into());
let ffi_array = FFI_ArrowArray::new(&batch.to_data());
(&ffi_array as *const FFI_ArrowArray).copy_to(array_addr as *mut FFI_ArrowArray, 1);
std::mem::forget(ffi_array);
let schema_result = FFI_ArrowSchema::try_from(batch.data_type());
match schema_result {
Ok(schema) => {
(&schema as *const FFI_ArrowSchema).copy_to(schema_addr as *mut FFI_ArrowSchema, 1);
std::mem::forget(schema);
call_i32_data_result_callback(callback, rows, std::ptr::null(), data);
}
Err(e) => {
call_i32_data_result_callback(
callback,
-1,
CString::new(format!("{}", e).as_str()).unwrap().into_raw(),
data,
);
}
}
}
},
};
reader.as_ref().next_rb_callback(Box::new(f));
}
}

#[no_mangle]
pub extern "C" fn lakesoul_reader_get_schema(reader: NonNull<CResult<Reader>>, schema_addr: c_ptrdiff_t) {
unsafe {
let reader = NonNull::new_unchecked(reader.as_ref().ptr as *mut SyncSendableMutableLakeSoulReader);
let schema = reader.as_ref().get_schema().unwrap_or_else(|| Arc::new(Schema::empty()));
let schema = reader
.as_ref()
.get_schema()
.unwrap_or_else(|| Arc::new(Schema::empty()));
let schema_addr = schema_addr as *mut FFI_ArrowSchema;
let _ = FFI_ArrowSchema::try_from(schema.as_ref()).map(|s| {
std::ptr::write_unaligned(schema_addr, s);
Expand Down