From 1978f0c2a4ac09b20a4415225ef3afe5d934ec0c Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 6 Sep 2024 13:07:35 -0700 Subject: [PATCH] chore: Address reviews --- .../org/apache/comet/vector/NativeUtil.scala | 13 ------------- native/core/src/execution/jni_api.rs | 12 +++++++----- native/core/src/execution/utils.rs | 18 +++++++++++------- 3 files changed, 18 insertions(+), 25 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala index 4b113d89a..33af8662f 100644 --- a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala +++ b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala @@ -19,13 +19,9 @@ package org.apache.comet.vector -import java.nio.ByteOrder - import scala.collection.mutable import org.apache.arrow.c.{ArrowArray, ArrowImporter, ArrowSchema, CDataDictionaryProvider, Data} -import org.apache.arrow.c.NativeUtil.NULL -import org.apache.arrow.memory.util.MemoryUtil import org.apache.arrow.vector.VectorSchemaRoot import org.apache.arrow.vector.dictionary.DictionaryProvider import org.apache.spark.SparkException @@ -74,15 +70,6 @@ class NativeUtil { (0 until numCols).foreach { index => val arrowSchema = ArrowSchema.allocateNew(allocator) - - // Manually fill NULL to `release` slot of ArrowSchema because ArrowSchema doesn't provide - // `markReleased`. - // The total size of ArrowSchema is 72 bytes. - // The `release` slot is at offset 56 in the ArrowSchema struct. - val buffer = - MemoryUtil.directBuffer(arrowSchema.memoryAddress(), 72).order(ByteOrder.nativeOrder) - buffer.putLong(56, NULL); - val arrowArray = ArrowArray.allocateNew(allocator) arrays(index) = arrowArray schemas(index) = arrowSchema diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index c1cc78455..e45897a92 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -259,21 +259,23 @@ fn parse_bool(conf: &HashMap, name: &str) -> CometResult { } /// Prepares arrow arrays for output. -unsafe fn prepare_output( +fn prepare_output( env: &mut JNIEnv, array_addrs: jlongArray, schema_addrs: jlongArray, output_batch: RecordBatch, exec_context: &mut ExecutionContext, ) -> CometResult { - let array_address_array = JLongArray::from_raw(array_addrs); + let array_address_array = unsafe { JLongArray::from_raw(array_addrs) }; let num_cols = env.get_array_length(&array_address_array)? as usize; - let array_addrs = env.get_array_elements(&array_address_array, ReleaseMode::NoCopyBack)?; + let array_addrs = + unsafe { env.get_array_elements(&array_address_array, ReleaseMode::NoCopyBack)? }; let array_addrs = &*array_addrs; - let schema_address_array = JLongArray::from_raw(schema_addrs); - let schema_addrs = env.get_array_elements(&schema_address_array, ReleaseMode::NoCopyBack)?; + let schema_address_array = unsafe { JLongArray::from_raw(schema_addrs) }; + let schema_addrs = + unsafe { env.get_array_elements(&schema_address_array, ReleaseMode::NoCopyBack)? }; let schema_addrs = &*schema_addrs; let results = output_batch.columns(); diff --git a/native/core/src/execution/utils.rs b/native/core/src/execution/utils.rs index bd760bc3d..553d42606 100644 --- a/native/core/src/execution/utils.rs +++ b/native/core/src/execution/utils.rs @@ -108,16 +108,20 @@ impl SparkArrowConvert for ArrayData { let array_align = std::mem::align_of::(); let schema_align = std::mem::align_of::(); - // Check if the pointer alignment is correct for `replace`. + // Check if the pointer alignment is correct. if array_ptr.align_offset(array_align) != 0 || schema_ptr.align_offset(schema_align) != 0 { - return Err(ExecutionError::ArrowError( - "Pointer alignment is not correct".to_string(), - )); + unsafe { + std::ptr::write_unaligned(array_ptr, FFI_ArrowArray::new(self)); + std::ptr::write_unaligned(schema_ptr, FFI_ArrowSchema::try_from(self.data_type())?); + } + } else { + // SAFETY: `array_ptr` and `schema_ptr` are aligned correctly. + unsafe { + std::ptr::write(array_ptr, FFI_ArrowArray::new(self)); + std::ptr::write(schema_ptr, FFI_ArrowSchema::try_from(self.data_type())?); + } } - unsafe { std::ptr::replace(array_ptr, FFI_ArrowArray::new(self)) }; - unsafe { std::ptr::replace(schema_ptr, FFI_ArrowSchema::try_from(self.data_type())?) }; - Ok(()) } }