chore: Refactor Arrow Array and Schema allocation in ColumnReader and MetadataColumnReader #1047

Merged (2 commits, Nov 2, 2024). Changes shown from 1 commit.
83 changes: 46 additions & 37 deletions common/src/main/java/org/apache/comet/parquet/ColumnReader.java
@@ -28,6 +28,8 @@
 import org.apache.arrow.c.ArrowArray;
 import org.apache.arrow.c.ArrowSchema;
 import org.apache.arrow.c.CometSchemaImporter;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.dictionary.Dictionary;
 import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
@@ -50,6 +52,7 @@

 public class ColumnReader extends AbstractColumnReader {
   protected static final Logger LOG = LoggerFactory.getLogger(ColumnReader.class);
+  protected final BufferAllocator ALLOCATOR = new RootAllocator();

   /**
    * The current Comet vector holding all the values read by this column reader. Owned by this
@@ -87,6 +90,9 @@ public class ColumnReader extends AbstractColumnReader {

   private final CometSchemaImporter importer;

+  private ArrowArray array = null;
+  private ArrowSchema schema = null;
+
   public ColumnReader(
       DataType type,
       ColumnDescriptor descriptor,
@@ -201,53 +207,56 @@ public CometDecodedVector loadVector() {
     boolean isUuid =
         logicalTypeAnnotation instanceof LogicalTypeAnnotation.UUIDLogicalTypeAnnotation;

-    long[] addresses = Native.currentBatch(nativeHandle);
-
-    try (ArrowArray array = ArrowArray.wrap(addresses[0]);
-        ArrowSchema schema = ArrowSchema.wrap(addresses[1])) {
-      FieldVector vector = importer.importVector(array, schema);
-
-      DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary();
-
-      CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128);
-
-      // Update whether the current vector contains any null values. This is used in the following
-      // batch(s) to determine whether we can skip loading the native vector.
-      hadNull = cometVector.hasNull();
-
-      if (dictionaryEncoding == null) {
-        if (dictionary != null) {
-          // This means the column was using dictionary encoding but now has fall-back to plain
-          // encoding, on the native side. Setting 'dictionary' to null here, so we can use it as
-          // a condition to check if we can re-use vector later.
-          dictionary = null;
-        }
-        // Either the column is not dictionary encoded, or it was using dictionary encoding but
-        // a new data page has switched back to use plain encoding. For both cases we should
-        // return plain vector.
-        currentVector = cometVector;
-        return currentVector;
-      }
-
-      // We should already re-initiate `CometDictionary` here because `Data.importVector` API will
-      // release the previous dictionary vector and create a new one.
-      Dictionary arrowDictionary = importer.getProvider().lookup(dictionaryEncoding.getId());
-      CometPlainVector dictionaryVector =
-          new CometPlainVector(arrowDictionary.getVector(), useDecimal128, isUuid);
-      if (dictionary != null) {
-        dictionary.setDictionaryVector(dictionaryVector);
-      } else {
-        dictionary = new CometDictionary(dictionaryVector);
-      }
-
-      currentVector =
-          new CometDictionaryVector(
-              cometVector, dictionary, importer.getProvider(), useDecimal128, false, isUuid);
-
-      currentVector =
-          new CometDictionaryVector(cometVector, dictionary, importer.getProvider(), useDecimal128);
-      return currentVector;
-    }
+    array = ArrowArray.allocateNew(ALLOCATOR);
+    schema = ArrowSchema.allocateNew(ALLOCATOR);
+
+    long arrayAddr = array.memoryAddress();
+    long schemaAddr = schema.memoryAddress();
+
+    Native.currentBatch(nativeHandle, arrayAddr, schemaAddr);
+
+    FieldVector vector = importer.importVector(array, schema);
+
+    DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary();
+
+    CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128);
+
+    // Update whether the current vector contains any null values. This is used in the following
+    // batch(s) to determine whether we can skip loading the native vector.
+    hadNull = cometVector.hasNull();
+
+    if (dictionaryEncoding == null) {
+      if (dictionary != null) {
+        // This means the column was using dictionary encoding but now has fall-back to plain
+        // encoding, on the native side. Setting 'dictionary' to null here, so we can use it as
+        // a condition to check if we can re-use vector later.
+        dictionary = null;
+      }
+      // Either the column is not dictionary encoded, or it was using dictionary encoding but
+      // a new data page has switched back to use plain encoding. For both cases we should
+      // return plain vector.
+      currentVector = cometVector;
+      return currentVector;
+    }
+
+    // We should already re-initiate `CometDictionary` here because `Data.importVector` API will
+    // release the previous dictionary vector and create a new one.
+    Dictionary arrowDictionary = importer.getProvider().lookup(dictionaryEncoding.getId());
+    CometPlainVector dictionaryVector =
+        new CometPlainVector(arrowDictionary.getVector(), useDecimal128, isUuid);
+    if (dictionary != null) {
+      dictionary.setDictionaryVector(dictionaryVector);
+    } else {
+      dictionary = new CometDictionary(dictionaryVector);
+    }
+
+    currentVector =
+        new CometDictionaryVector(
+            cometVector, dictionary, importer.getProvider(), useDecimal128, false, isUuid);
+
+    currentVector =
+        new CometDictionaryVector(cometVector, dictionary, importer.getProvider(), useDecimal128);

Review comment (Contributor): Unrelated, but we should not need to call CometDictionaryVector twice.

+    return currentVector;
   }

   protected void readPage() {
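Taken together, these ColumnReader changes swap a producer-allocated handoff for the standard consumer-allocated Arrow C Data Interface pattern: the JVM allocates empty ArrowArray/ArrowSchema structs, the native side exports the current batch into them, and the importer turns them into a FieldVector. Below is a minimal sketch of that flow, condensed from the diff above; the `LoadPathSketch` wrapper is ours, and dictionary handling and error paths are omitted.

```java
import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.CometSchemaImporter;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.comet.parquet.Native;

class LoadPathSketch {
  // Condensed sketch of the new load path; not the PR's exact code.
  static FieldVector loadBatch(long nativeHandle, CometSchemaImporter importer) {
    BufferAllocator allocator = new RootAllocator();

    // The consumer (JVM) allocates empty C Data Interface structs.
    ArrowArray array = ArrowArray.allocateNew(allocator);
    ArrowSchema schema = ArrowSchema.allocateNew(allocator);

    // The native producer exports the current batch into the structs,
    // addressed by their raw memory locations.
    Native.currentBatch(nativeHandle, array.memoryAddress(), schema.memoryAddress());

    // Importing moves ownership of the exported buffers to the JVM side.
    return importer.importVector(array, schema);
  }
}
```

Note that, unlike the old `ArrowArray.wrap(...)` path, the structs are no longer closed in a try-with-resources block; the reader keeps them in fields, which makes their lifetime explicit on the consumer side.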
common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java

@@ -34,8 +34,12 @@
 /** A metadata column reader that can be extended by {@link RowIndexColumnReader} etc. */
 public class MetadataColumnReader extends AbstractColumnReader {
   private final BufferAllocator allocator = new RootAllocator();

   private CometVector vector;

+  private ArrowArray array = null;
+  private ArrowSchema schema = null;
+
   public MetadataColumnReader(DataType type, ColumnDescriptor descriptor, boolean useDecimal128) {
     // TODO: should we handle legacy dates & timestamps for metadata columns?
     super(type, descriptor, useDecimal128, false);

@@ -50,13 +54,17 @@ public void setBatchSize(int batchSize) {
   @Override
   public void readBatch(int total) {
     if (vector == null) {
-      long[] addresses = Native.currentBatch(nativeHandle);
-      try (ArrowArray array = ArrowArray.wrap(addresses[0]);
-          ArrowSchema schema = ArrowSchema.wrap(addresses[1])) {
-        FieldVector fieldVector = Data.importVector(allocator, array, schema, null);
-        vector = new CometPlainVector(fieldVector, useDecimal128);
-      }
+      array = ArrowArray.allocateNew(allocator);
+      schema = ArrowSchema.allocateNew(allocator);
+
+      long arrayAddr = array.memoryAddress();
+      long schemaAddr = schema.memoryAddress();
+
+      Native.currentBatch(nativeHandle, arrayAddr, schemaAddr);
+      FieldVector fieldVector = Data.importVector(allocator, array, schema, null);
+      vector = new CometPlainVector(fieldVector, useDecimal128);
     }

     vector.setNumValues(total);
   }

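MetadataColumnReader follows the same pattern, allocating the structs once on the first readBatch call. Since the reader now owns them, it is also responsible for releasing them. Here is a hedged sketch of what that cleanup could look like; the PR's actual close() wiring is not part of this diff, so the method below is illustrative only.

```java
// Illustrative only: assumes the reader keeps `vector`, `array`, and `schema`
// as fields, as in the diff above; not the PR's actual close() implementation.
@Override
public void close() {
  if (vector != null) {
    vector.close(); // triggers the producer's release callback on the imported buffers
    vector = null;
  }
  if (array != null) {
    array.close(); // frees the consumer-allocated ArrowArray struct
    array = null;
  }
  if (schema != null) {
    schema.close(); // frees the consumer-allocated ArrowSchema struct
    schema = null;
  }
}
```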
6 changes: 3 additions & 3 deletions common/src/main/java/org/apache/comet/parquet/Native.java
@@ -192,10 +192,10 @@ public static native void setPageV2(
    * Returns the current batch constructed via 'readBatch'
    *
    * @param handle the handle to the native Parquet column reader
-   * @return a long array with 2 elements, the first is the address to native Arrow array, and the
-   *     second is the address to the Arrow schema.
+   * @param arrayAddr the memory address to the ArrowArray struct
+   * @param schemaAddr the memory address to the ArrowSchema struct
    */
-  public static native long[] currentBatch(long handle);
+  public static native void currentBatch(long handle, long arrayAddr, long schemaAddr);

/** Set methods to set a constant value for the reader, so it'll return constant vectors */
public static native void setNull(long handle);
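The signature change reverses the direction of the handoff: previously the caller received a `long[]` of producer-allocated addresses and wrapped them with `ArrowArray.wrap` / `ArrowSchema.wrap`; now the caller allocates the structs and passes their addresses down. A minimal caller-side sketch of the new contract follows; the `SignatureChangeSketch` wrapper is hypothetical.

```java
import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.comet.parquet.Native;

class SignatureChangeSketch {
  // New contract: the caller supplies the memory, the native side fills it.
  static void fillBatch(long handle, BufferAllocator allocator) {
    ArrowArray array = ArrowArray.allocateNew(allocator);
    ArrowSchema schema = ArrowSchema.allocateNew(allocator);
    Native.currentBatch(handle, array.memoryAddress(), schema.memoryAddress());
    // `array` and `schema` now hold the exported batch, ready to be imported.
  }
}
```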
24 changes: 7 additions & 17 deletions native/core/src/parquet/mod.rs
@@ -27,7 +27,7 @@ use std::{boxed::Box, ptr::NonNull, sync::Arc};

 use crate::errors::{try_unwrap_or_throw, CometError};

-use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
+use arrow::ffi::FFI_ArrowArray;

 /// JNI exposed methods
 use jni::JNIEnv;
@@ -52,7 +52,6 @@ const STR_CLASS_NAME: &str = "java/lang/String";
 /// Parquet read context maintained across multiple JNI calls.
 struct Context {
     pub column_reader: ColumnReader,
-    pub arrays: Option<(Arc<FFI_ArrowArray>, Arc<FFI_ArrowSchema>)>,

Review comment (Member, author): This refactoring simplifies the context and the logic. We don't need to keep the array and schema pointers on the producer side.

Review comment (Member, author): It is also easier to reason about the array/schema release logic after this refactoring.

     last_data_page: Option<GlobalRef>,
 }

@@ -110,7 +109,6 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_initColumnReader(
             use_decimal_128 != 0,
             use_legacy_date_timestamp != 0,
         ),
-        arrays: None,
         last_data_page: None,
     };
     let res = Box::new(ctx);
@@ -539,24 +537,16 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_currentBatch(
     e: JNIEnv,
     _jclass: JClass,
     handle: jlong,
-) -> jlongArray {
-    try_unwrap_or_throw(&e, |env| {
+    array_addr: jlong,
+    schema_addr: jlong,
+) {
+    try_unwrap_or_throw(&e, |_env| {
         let ctx = get_context(handle)?;
         let reader = &mut ctx.column_reader;
         let data = reader.current_batch();
-        let (array, schema) = data.to_spark()?;
-
-        unsafe {
-            let arrow_array = Arc::from_raw(array as *const FFI_ArrowArray);
-            let arrow_schema = Arc::from_raw(schema as *const FFI_ArrowSchema);
-            ctx.arrays = Some((arrow_array, arrow_schema));
-
-            let res = env.new_long_array(2)?;
-            let buf: [i64; 2] = [array, schema];
-            env.set_long_array_region(&res, 0, &buf)
-                .expect("set long array region failed");
-            Ok(res.into_raw())
-        }
+        data.move_to_spark(array_addr, schema_addr)?;
+
+        Ok(())
     })
 }
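The author's comments above capture the payoff of the Rust-side change: ownership now travels with the C Data Interface release callbacks instead of with `Arc`'d pointers parked in the producer's Context. From the JVM side, the whole lifecycle can then be reasoned about locally, as in this hedged end-to-end sketch (class and method names are illustrative):

```java
import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.comet.parquet.Native;

class OwnershipSketch {
  static void readOnce(long handle) throws Exception {
    try (BufferAllocator allocator = new RootAllocator();
        ArrowArray array = ArrowArray.allocateNew(allocator);
        ArrowSchema schema = ArrowSchema.allocateNew(allocator)) {
      // The producer exports into the caller's structs and keeps no state;
      // ownership travels with the release callbacks it installs.
      Native.currentBatch(handle, array.memoryAddress(), schema.memoryAddress());
      try (FieldVector vector = Data.importVector(allocator, array, schema, null)) {
        // Closing the vector runs the producer's release callback, freeing
        // the native buffers without any producer-side bookkeeping.
      }
    }
  }
}
```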
