diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index c62dc3d86386e..fd228eefe2533 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -103,7 +103,7 @@ public final void readFloats(int total, WritableColumnVector c, int rowId) {
 
     if (buffer.hasArray()) {
       int offset = buffer.arrayOffset() + buffer.position();
-      c.putFloats(rowId, total, buffer.array(), offset);
+      c.putFloatsLittleEndian(rowId, total, buffer.array(), offset);
     } else {
       for (int i = 0; i < total; i += 1) {
         c.putFloat(rowId + i, buffer.getFloat());
@@ -118,7 +118,7 @@ public final void readDoubles(int total, WritableColumnVector c, int rowId) {
 
     if (buffer.hasArray()) {
       int offset = buffer.arrayOffset() + buffer.position();
-      c.putDoubles(rowId, total, buffer.array(), offset);
+      c.putDoublesLittleEndian(rowId, total, buffer.array(), offset);
     } else {
       for (int i = 0; i < total; i += 1) {
         c.putDouble(rowId + i, buffer.getDouble());
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 3b919c7d471f4..982a26c0ef775 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -425,6 +425,19 @@ public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
     }
   }
 
+  @Override
+  public void putFloatsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+    if (!bigEndianPlatform) {
+      putFloats(rowId, count, src, srcIndex);
+    } else {
+      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+      long offset = data + 4L * rowId;
+      for (int i = 0; i < count; ++i, offset += 4) {
+        Platform.putFloat(null, offset, bb.getFloat(srcIndex + (4 * i)));
+      }
+    }
+  }
+
   @Override
   public float getFloat(int rowId) {
     if (dictionary == null) {
@@ -480,6 +493,19 @@ public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
     }
   }
 
+  @Override
+  public void putDoublesLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+    if (!bigEndianPlatform) {
+      putDoubles(rowId, count, src, srcIndex);
+    } else {
+      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+      long offset = data + 8L * rowId;
+      for (int i = 0; i < count; ++i, offset += 8) {
+        Platform.putDouble(null, offset, bb.getDouble(srcIndex + (8 * i)));
+      }
+    }
+  }
+
   @Override
   public double getDouble(int rowId) {
     if (dictionary == null) {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 1bf3126664177..625b78be4de94 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -403,6 +403,18 @@ public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
     }
   }
 
+  @Override
+  public void putFloatsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+    if (!bigEndianPlatform) {
+      putFloats(rowId, count, src, srcIndex);
+    } else {
+      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+      for (int i = 0; i < count; ++i) {
+        floatData[i + rowId] = bb.getFloat(srcIndex + (4 * i));
+      }
+    }
+  }
+
   @Override
   public float getFloat(int rowId) {
     if (dictionary == null) {
@@ -452,6 +464,18 @@ public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
     }
   }
 
+  @Override
+  public void putDoublesLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+    if (!bigEndianPlatform) {
+      putDoubles(rowId, count, src, srcIndex);
+    } else {
+      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+      for (int i = 0; i < count; ++i) {
+        doubleData[i + rowId] = bb.getDouble(srcIndex + (8 * i));
+      }
+    }
+  }
+
   @Override
   public double getDouble(int rowId) {
     if (dictionary == null) {
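The on-heap and off-heap implementations above share one technique: on a big-endian JVM, wrap the source array in a `ByteBuffer` forced to `ByteOrder.LITTLE_ENDIAN`, so that `getFloat`/`getDouble` byte-swap during the read; on a little-endian JVM the existing bulk copy is already correct, so both delegate to `putFloats`/`putDoubles`. A minimal standalone sketch of that technique, outside Spark (the class and method names here are illustrative, not part of the patch):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Illustrative only: decodes IEEE 754 little-endian floats from a byte array,
// mirroring the ByteBuffer-based fallback the patch uses on big-endian JVMs.
public class LittleEndianDecodeSketch {
  static float[] decodeFloats(byte[] src, int srcIndex, int count) {
    // Wrapping with LITTLE_ENDIAN order makes getFloat() byte-swap as needed,
    // so the same code is correct on both little- and big-endian platforms.
    ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
    float[] out = new float[count];
    for (int i = 0; i < count; i++) {
      out[i] = bb.getFloat(srcIndex + 4 * i);
    }
    return out;
  }

  public static void main(String[] args) {
    // 1.0f has bits 0x3F800000; in little-endian byte order: 00 00 80 3F
    byte[] src = {0x00, 0x00, (byte) 0x80, 0x3F};
    System.out.println(decodeFloats(src, 0, 1)[0]); // prints 1.0
  }
}
```

On a little-endian host the wrapped reads degenerate into a plain copy, which is why the fast `!bigEndianPlatform` path above can keep the unconverted bulk-copy behavior.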
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index b0e119d658cb4..05e81524165a3 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -310,6 +310,12 @@ public WritableColumnVector reserveDictionaryIds(int capacity) {
    */
   public abstract void putFloats(int rowId, int count, byte[] src, int srcIndex);
 
+  /**
+   * Sets values from [src[srcIndex], src[srcIndex + count * 4]) to [rowId, rowId + count).
+   * The data in src must be IEEE 754 formatted floats in little-endian byte order.
+   */
+  public abstract void putFloatsLittleEndian(int rowId, int count, byte[] src, int srcIndex);
+
   /**
    * Sets `value` to the value at rowId.
    */
@@ -331,6 +337,12 @@ public WritableColumnVector reserveDictionaryIds(int capacity) {
    */
   public abstract void putDoubles(int rowId, int count, byte[] src, int srcIndex);
 
+  /**
+   * Sets values from [src[srcIndex], src[srcIndex + count * 8]) to [rowId, rowId + count).
+   * The data in src must be IEEE 754 formatted doubles in little-endian byte order.
+   */
+  public abstract void putDoublesLittleEndian(int rowId, int count, byte[] src, int srcIndex);
+
   /**
    * Puts a byte array that already exists in this column.
   */
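A hedged usage sketch of the new abstract API, exercised through the on-heap implementation. The constructor and `DataTypes.FloatType` are Spark's public classes, but treat the demo class itself as illustrative rather than canonical:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.types.DataTypes;

// Illustrative usage: write two little-endian-encoded floats into a column
// vector via the new API and read them back. The values are arbitrary.
public class PutFloatsLittleEndianDemo {
  public static void main(String[] args) {
    // Build a byte array holding 1.5f and 2.5f in little-endian order,
    // the layout Parquet's plain encoding produces.
    byte[] src = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN)
        .putFloat(0, 1.5f)
        .putFloat(4, 2.5f)
        .array();
    OnHeapColumnVector col = new OnHeapColumnVector(2, DataTypes.FloatType);
    col.putFloatsLittleEndian(0, 2, src, 0);
    System.out.println(col.getFloat(0) + " " + col.getFloat(1)); // 1.5 2.5
    col.close();
  }
}
```

On little-endian hosts this call delegates to the raw-copy `putFloats`; on big-endian hosts it byte-swaps, so the printed result is the same either way.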
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index f57f07b498261..086d88b0b2138 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -444,13 +444,6 @@ class ColumnarBatchSuite extends SparkFunSuite {
       Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET, 2.234f)
       Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET + 4, 1.123f)
 
-      if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
-        // Ensure array contains Little Endian floats
-        val bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN)
-        Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET, bb.getFloat(0))
-        Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET + 4, bb.getFloat(4))
-      }
-
       column.putFloats(idx, 1, buffer, 4)
       column.putFloats(idx + 1, 1, buffer, 0)
       reference += 1.123f
@@ -488,6 +481,57 @@ class ColumnarBatchSuite extends SparkFunSuite {
       }
   }
 
+  testVector("[SPARK-31703] Float API - Little Endian", 1024, FloatType) {
+    column =>
+      val seed = System.currentTimeMillis()
+      val random = new Random(seed)
+      val reference = mutable.ArrayBuffer.empty[Float]
+
+      var idx = 0
+
+      val littleEndian = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN)
+      littleEndian.putFloat(0, 1.357f)
+      littleEndian.putFloat(4, 2.468f)
+      val arr = new Array[Byte](littleEndian.remaining)
+      littleEndian.get(arr)
+
+      column.putFloatsLittleEndian(idx, 1, arr, 4)
+      column.putFloatsLittleEndian(idx + 1, 1, arr, 0)
+      reference += 2.468f
+      reference += 1.357f
+      idx += 2
+
+      column.putFloatsLittleEndian(idx, 2, arr, 0)
+      reference += 1.357f
+      reference += 2.468f
+      idx += 2
+
+      while (idx < column.capacity) {
+        val single = random.nextBoolean()
+        if (single) {
+          val v = random.nextFloat()
+          column.putFloat(idx, v)
+          reference += v
+          idx += 1
+        } else {
+          val n = math.min(random.nextInt(column.capacity / 20), column.capacity - idx)
+          val v = random.nextFloat()
+          column.putFloats(idx, n, v)
+          var i = 0
+          while (i < n) {
+            reference += v
+            i += 1
+          }
+          idx += n
+        }
+      }
+
+      reference.zipWithIndex.foreach { v =>
+        assert(v._1 == column.getFloat(v._2),
+          "Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
+      }
+  }
+
   testVector("Double APIs", 1024, DoubleType) {
     column =>
       val seed = System.currentTimeMillis()
@@ -528,13 +572,6 @@ class ColumnarBatchSuite extends SparkFunSuite {
       Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET, 2.234)
       Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, 1.123)
 
-      if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
-        // Ensure array contains Little Endian doubles
-        val bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN)
-        Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET, bb.getDouble(0))
-        Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, bb.getDouble(8))
-      }
-
       column.putDoubles(idx, 1, buffer, 8)
       column.putDoubles(idx + 1, 1, buffer, 0)
       reference += 1.123
@@ -572,6 +609,57 @@ class ColumnarBatchSuite extends SparkFunSuite {
       }
   }
 
+  testVector("[SPARK-31703] Double API - Little Endian", 1024, DoubleType) {
+    column =>
+      val seed = System.currentTimeMillis()
+      val random = new Random(seed)
+      val reference = mutable.ArrayBuffer.empty[Double]
+
+      var idx = 0
+
+      val littleEndian = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN)
+      littleEndian.putDouble(0, 1.357)
+      littleEndian.putDouble(8, 2.468)
+      val arr = new Array[Byte](littleEndian.remaining)
+      littleEndian.get(arr)
+
+      column.putDoublesLittleEndian(idx, 1, arr, 8)
+      column.putDoublesLittleEndian(idx + 1, 1, arr, 0)
+      reference += 2.468
+      reference += 1.357
+      idx += 2
+
+      column.putDoublesLittleEndian(idx, 2, arr, 0)
+      reference += 1.357
+      reference += 2.468
+      idx += 2
+
+      while (idx < column.capacity) {
+        val single = random.nextBoolean()
+        if (single) {
+          val v = random.nextDouble()
+          column.putDouble(idx, v)
+          reference += v
+          idx += 1
+        } else {
+          val n = math.min(random.nextInt(column.capacity / 20), column.capacity - idx)
+          val v = random.nextDouble()
+          column.putDoubles(idx, n, v)
+          var i = 0
+          while (i < n) {
+            reference += v
+            i += 1
+          }
+          idx += n
+        }
+      }
+
+      reference.zipWithIndex.foreach { v =>
+        assert(v._1 == column.getDouble(v._2),
+          "Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
+      }
+  }
+
   testVector("String APIs", 7, StringType) {
     column =>
       val reference = mutable.ArrayBuffer.empty[String]
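For context, and not part of the patch: the failure mode these tests pin down is that Parquet's plain encoding stores IEEE 754 values in little-endian order, so interpreting those bytes in native order on a big-endian platform decodes a different value. A small illustrative check of that pitfall (class name hypothetical):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Illustrative only: the same 8 bytes decode to different doubles depending
// on the assumed byte order, which is the bug the endian-aware puts avoid.
public class ByteOrderPitfallDemo {
  public static void main(String[] args) {
    byte[] bytes = ByteBuffer.allocate(8)
        .order(ByteOrder.LITTLE_ENDIAN)
        .putDouble(0, 2.468)
        .array();
    double le = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getDouble(0);
    double be = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN).getDouble(0);
    System.out.println(le); // 2.468 - the value Parquet actually encoded
    System.out.println(be); // a garbled value - what a raw native-order copy
                            // would yield on big-endian hardware
  }
}
```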