diff --git a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java index 9e594804f..1cc42f62e 100644 --- a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java +++ b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java @@ -28,6 +28,8 @@ import org.apache.arrow.c.ArrowArray; import org.apache.arrow.c.ArrowSchema; import org.apache.arrow.c.CometSchemaImporter; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.dictionary.Dictionary; import org.apache.arrow.vector.types.pojo.DictionaryEncoding; @@ -50,6 +52,7 @@ public class ColumnReader extends AbstractColumnReader { protected static final Logger LOG = LoggerFactory.getLogger(ColumnReader.class); + protected final BufferAllocator ALLOCATOR = new RootAllocator(); /** * The current Comet vector holding all the values read by this column reader. Owned by this @@ -87,6 +90,9 @@ public class ColumnReader extends AbstractColumnReader { private final CometSchemaImporter importer; + private ArrowArray array = null; + private ArrowSchema schema = null; + public ColumnReader( DataType type, ColumnDescriptor descriptor, @@ -201,53 +207,56 @@ public CometDecodedVector loadVector() { boolean isUuid = logicalTypeAnnotation instanceof LogicalTypeAnnotation.UUIDLogicalTypeAnnotation; - long[] addresses = Native.currentBatch(nativeHandle); + array = ArrowArray.allocateNew(ALLOCATOR); + schema = ArrowSchema.allocateNew(ALLOCATOR); - try (ArrowArray array = ArrowArray.wrap(addresses[0]); - ArrowSchema schema = ArrowSchema.wrap(addresses[1])) { - FieldVector vector = importer.importVector(array, schema); + long arrayAddr = array.memoryAddress(); + long schemaAddr = schema.memoryAddress(); - DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary(); + Native.currentBatch(nativeHandle, arrayAddr, schemaAddr); - CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128); + FieldVector vector = importer.importVector(array, schema); - // Update whether the current vector contains any null values. This is used in the following - // batch(s) to determine whether we can skip loading the native vector. - hadNull = cometVector.hasNull(); + DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary(); - if (dictionaryEncoding == null) { - if (dictionary != null) { - // This means the column was using dictionary encoding but now has fall-back to plain - // encoding, on the native side. Setting 'dictionary' to null here, so we can use it as - // a condition to check if we can re-use vector later. - dictionary = null; - } - // Either the column is not dictionary encoded, or it was using dictionary encoding but - // a new data page has switched back to use plain encoding. For both cases we should - // return plain vector. - currentVector = cometVector; - return currentVector; - } + CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128); + + // Update whether the current vector contains any null values. This is used in the following + // batch(s) to determine whether we can skip loading the native vector. + hadNull = cometVector.hasNull(); - // We should already re-initiate `CometDictionary` here because `Data.importVector` API will - // release the previous dictionary vector and create a new one. - Dictionary arrowDictionary = importer.getProvider().lookup(dictionaryEncoding.getId()); - CometPlainVector dictionaryVector = - new CometPlainVector(arrowDictionary.getVector(), useDecimal128, isUuid); + if (dictionaryEncoding == null) { if (dictionary != null) { - dictionary.setDictionaryVector(dictionaryVector); - } else { - dictionary = new CometDictionary(dictionaryVector); + // This means the column was using dictionary encoding but now has fall-back to plain + // encoding, on the native side. Setting 'dictionary' to null here, so we can use it as + // a condition to check if we can re-use vector later. + dictionary = null; } - - currentVector = - new CometDictionaryVector( - cometVector, dictionary, importer.getProvider(), useDecimal128, false, isUuid); - - currentVector = - new CometDictionaryVector(cometVector, dictionary, importer.getProvider(), useDecimal128); + // Either the column is not dictionary encoded, or it was using dictionary encoding but + // a new data page has switched back to use plain encoding. For both cases we should + // return plain vector. + currentVector = cometVector; return currentVector; } + + // We should already re-initiate `CometDictionary` here because `Data.importVector` API will + // release the previous dictionary vector and create a new one. + Dictionary arrowDictionary = importer.getProvider().lookup(dictionaryEncoding.getId()); + CometPlainVector dictionaryVector = + new CometPlainVector(arrowDictionary.getVector(), useDecimal128, isUuid); + if (dictionary != null) { + dictionary.setDictionaryVector(dictionaryVector); + } else { + dictionary = new CometDictionary(dictionaryVector); + } + + currentVector = + new CometDictionaryVector( + cometVector, dictionary, importer.getProvider(), useDecimal128, false, isUuid); + + currentVector = + new CometDictionaryVector(cometVector, dictionary, importer.getProvider(), useDecimal128); + return currentVector; } protected void readPage() { diff --git a/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java b/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java index b8722ca78..13b90e256 100644 --- a/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java +++ b/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java @@ -34,8 +34,12 @@ /** A metadata column reader that can be extended by {@link RowIndexColumnReader} etc. */ public class MetadataColumnReader extends AbstractColumnReader { private final BufferAllocator allocator = new RootAllocator(); + private CometVector vector; + private ArrowArray array = null; + private ArrowSchema schema = null; + public MetadataColumnReader(DataType type, ColumnDescriptor descriptor, boolean useDecimal128) { // TODO: should we handle legacy dates & timestamps for metadata columns? super(type, descriptor, useDecimal128, false); @@ -50,13 +54,17 @@ public void setBatchSize(int batchSize) { @Override public void readBatch(int total) { if (vector == null) { - long[] addresses = Native.currentBatch(nativeHandle); - try (ArrowArray array = ArrowArray.wrap(addresses[0]); - ArrowSchema schema = ArrowSchema.wrap(addresses[1])) { - FieldVector fieldVector = Data.importVector(allocator, array, schema, null); - vector = new CometPlainVector(fieldVector, useDecimal128); - } + array = ArrowArray.allocateNew(allocator); + schema = ArrowSchema.allocateNew(allocator); + + long arrayAddr = array.memoryAddress(); + long schemaAddr = schema.memoryAddress(); + + Native.currentBatch(nativeHandle, arrayAddr, schemaAddr); + FieldVector fieldVector = Data.importVector(allocator, array, schema, null); + vector = new CometPlainVector(fieldVector, useDecimal128); } + vector.setNumValues(total); } diff --git a/common/src/main/java/org/apache/comet/parquet/Native.java b/common/src/main/java/org/apache/comet/parquet/Native.java index f4820fedf..1e666652e 100644 --- a/common/src/main/java/org/apache/comet/parquet/Native.java +++ b/common/src/main/java/org/apache/comet/parquet/Native.java @@ -192,10 +192,10 @@ public static native void setPageV2( * Returns the current batch constructed via 'readBatch' * * @param handle the handle to the native Parquet column reader - * @return a long array with 2 elements, the first is the address to native Arrow array, and the - * second is the address to the Arrow schema. + * @param arrayAddr the memory address to the ArrowArray struct + * @param schemaAddr the memory address to the ArrowSchema struct */ - public static native long[] currentBatch(long handle); + public static native void currentBatch(long handle, long arrayAddr, long schemaAddr); /** Set methods to set a constant value for the reader, so it'll return constant vectors */ public static native void setNull(long handle); diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs index c523f843f..455f19929 100644 --- a/native/core/src/parquet/mod.rs +++ b/native/core/src/parquet/mod.rs @@ -27,7 +27,7 @@ use std::{boxed::Box, ptr::NonNull, sync::Arc}; use crate::errors::{try_unwrap_or_throw, CometError}; -use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; +use arrow::ffi::FFI_ArrowArray; /// JNI exposed methods use jni::JNIEnv; @@ -52,7 +52,6 @@ const STR_CLASS_NAME: &str = "java/lang/String"; /// Parquet read context maintained across multiple JNI calls. struct Context { pub column_reader: ColumnReader, - pub arrays: Option<(Arc, Arc)>, last_data_page: Option, } @@ -110,7 +109,6 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_initColumnReader( use_decimal_128 != 0, use_legacy_date_timestamp != 0, ), - arrays: None, last_data_page: None, }; let res = Box::new(ctx); @@ -539,24 +537,15 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_currentBatch( e: JNIEnv, _jclass: JClass, handle: jlong, -) -> jlongArray { - try_unwrap_or_throw(&e, |env| { + array_addr: jlong, + schema_addr: jlong, +) { + try_unwrap_or_throw(&e, |_env| { let ctx = get_context(handle)?; let reader = &mut ctx.column_reader; let data = reader.current_batch(); - let (array, schema) = data.to_spark()?; - - unsafe { - let arrow_array = Arc::from_raw(array as *const FFI_ArrowArray); - let arrow_schema = Arc::from_raw(schema as *const FFI_ArrowSchema); - ctx.arrays = Some((arrow_array, arrow_schema)); - - let res = env.new_long_array(2)?; - let buf: [i64; 2] = [array, schema]; - env.set_long_array_region(&res, 0, &buf) - .expect("set long array region failed"); - Ok(res.into_raw()) - } + data.move_to_spark(array_addr, schema_addr) + .map_err(|e| e.into()) }) }