From e5bef51826dc2ff4020879e35ae7eb9019aa7fcd Mon Sep 17 00:00:00 2001
From: Tin Hang To
Date: Thu, 13 Aug 2020 03:48:33 +0000
Subject: [PATCH] [SPARK-31703][SQL] Parquet RLE float/double are read
 incorrectly on big endian platforms

### What changes were proposed in this pull request?

(back-porting from https://github.com/apache/spark/commit/9a3811dbf5f1234c1587337a3d74823d1d163b53)

This PR fixes an issue introduced by SPARK-26985. SPARK-26985 changed the `putDoubles()` and `putFloats()` methods to respect the platform's endianness. However, that causes the RLE paths in `VectorizedRleValuesReader.java` to read the RLE entries in parquet as BIG_ENDIAN on big endian platforms (i.e., as is), even though parquet data is always little endian. The comments in `WritableColumnVector.java` say those methods are for "ieee formatted doubles in platform native endian" (or floats), but since parquet data is always little endian, using them here is inappropriate.

To demonstrate the problem with spark-shell:

```scala
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

var data = Seq(
  (1.0, 0.1),
  (2.0, 0.2),
  (0.3, 3.0),
  (4.0, 4.0),
  (5.0, 5.0))

var df = spark.createDataFrame(data).write.mode(SaveMode.Overwrite).parquet("/tmp/data.parquet2")
var df2 = spark.read.parquet("/tmp/data.parquet2")
df2.show()
```

result:

```scala
+--------------------+--------------------+
|                  _1|                  _2|
+--------------------+--------------------+
|           3.16E-322|-1.54234871366845...|
|         2.0553E-320|         2.0553E-320|
|          2.561E-320|          2.561E-320|
|4.66726145843124E-62|         1.0435E-320|
|        3.03865E-319|-1.54234871366757...|
+--------------------+--------------------+
```

Tests in ParquetIOSuite that involve float/double data would also fail, e.g.:
- basic data types (without binary)
- read raw Parquet file

/examples/src/main/python/mllib/isotonic_regression_example.py would fail as well.
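For intuition, here is a minimal, self-contained illustration (not part of the patch; the class name `EndianMisreadDemo` is hypothetical) of how the garbage values arise: parquet's little endian bytes, reinterpreted in big endian order, turn ordinary doubles into subnormals.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class EndianMisreadDemo {
  public static void main(String[] args) {
    // Parquet always stores doubles little endian; 1.0 becomes
    // the byte sequence 00 00 00 00 00 00 F0 3F.
    ByteBuffer buf = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
    buf.putDouble(0, 1.0);

    // A big endian platform consuming those bytes "as is" sees the bit
    // pattern 0x000000000000F03F, a tiny subnormal value.
    double misread = buf.order(ByteOrder.BIG_ENDIAN).getDouble(0);
    System.out.println(misread);  // prints 3.03865E-319, as in the table above
  }
}
```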
The proposed code change adds `putDoublesLittleEndian()` and `putFloatsLittleEndian()` methods for parquet to invoke, just like the existing `putIntsLittleEndian()` and `putLongsLittleEndian()`. On little endian platforms they simply delegate to `putDoubles()` and `putFloats()`; on big endian platforms they read the entries as little endian, as the code did before SPARK-26985. The pattern is sketched below.
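A condensed, self-contained sketch of that dispatch (the class name is illustrative, `bigEndianPlatform` and `doubleData` mirror the on-heap vector in the diff below, and the `putDoubles` body here is only a behavioral stand-in for the real native-endian copy):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

class LittleEndianDispatchSketch {
  static final boolean bigEndianPlatform =
      ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
  static final double[] doubleData = new double[1024];  // stand-in backing store

  static void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
    // Stand-in for the existing method: interprets src in platform byte order.
    ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.nativeOrder());
    for (int i = 0; i < count; ++i) {
      doubleData[rowId + i] = bb.getDouble(srcIndex + 8 * i);
    }
  }

  static void putDoublesLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
    if (!bigEndianPlatform) {
      // Native order already matches parquet's layout: reuse the fast path.
      putDoubles(rowId, count, src, srcIndex);
    } else {
      // Big endian platform: decode each 8-byte value as little endian first.
      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
      for (int i = 0; i < count; ++i) {
        doubleData[rowId + i] = bb.getDouble(srcIndex + 8 * i);
      }
    }
  }
}
```

`VectorizedPlainValuesReader` then calls the little endian variants instead of `putFloats()`/`putDoubles()`, mirroring the existing `putIntsLittleEndian()` call sites, as the first hunk below shows.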
Aside from the new little endian cases in `ColumnarBatchSuite`, no new unit tests are introduced, as the existing ones are sufficient.

### Why are the changes needed?

RLE float/double data in parquet files is not read back correctly on big endian platforms.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

All unit tests (mvn test) were run and passed.

Closes #29419 from tinhto-000/SPARK-31703-2.4.

Lead-authored-by: Tin Hang To
Co-authored-by: angerszhu
Signed-off-by: Wenchen Fan
---
 .../parquet/VectorizedPlainValuesReader.java |   4 +-
 .../vectorized/OffHeapColumnVector.java      |  26 ++++
 .../vectorized/OnHeapColumnVector.java       |  24 ++++
 .../vectorized/WritableColumnVector.java     |  12 ++
 .../vectorized/ColumnarBatchSuite.scala      | 116 +++++++++++++++---
 5 files changed, 166 insertions(+), 16 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index c62dc3d86386e..fd228eefe2533 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -103,7 +103,7 @@ public final void readFloats(int total, WritableColumnVector c, int rowId) {
 
     if (buffer.hasArray()) {
       int offset = buffer.arrayOffset() + buffer.position();
-      c.putFloats(rowId, total, buffer.array(), offset);
+      c.putFloatsLittleEndian(rowId, total, buffer.array(), offset);
     } else {
       for (int i = 0; i < total; i += 1) {
         c.putFloat(rowId + i, buffer.getFloat());
@@ -118,7 +118,7 @@ public final void readDoubles(int total, WritableColumnVector c, int rowId) {
 
     if (buffer.hasArray()) {
       int offset = buffer.arrayOffset() + buffer.position();
-      c.putDoubles(rowId, total, buffer.array(), offset);
+      c.putDoublesLittleEndian(rowId, total, buffer.array(), offset);
     } else {
       for (int i = 0; i < total; i += 1) {
         c.putDouble(rowId + i, buffer.getDouble());
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 3b919c7d471f4..982a26c0ef775 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -425,6 +425,19 @@ public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
     }
   }
 
+  @Override
+  public void putFloatsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+    if (!bigEndianPlatform) {
+      putFloats(rowId, count, src, srcIndex);
+    } else {
+      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+      long offset = data + 4L * rowId;
+      for (int i = 0; i < count; ++i, offset += 4) {
+        Platform.putFloat(null, offset, bb.getFloat(srcIndex + (4 * i)));
+      }
+    }
+  }
+
   @Override
   public float getFloat(int rowId) {
     if (dictionary == null) {
@@ -480,6 +493,19 @@ public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
     }
   }
 
+  @Override
+  public void putDoublesLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+    if (!bigEndianPlatform) {
+      putDoubles(rowId, count, src, srcIndex);
+    } else {
+      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+      long offset = data + 8L * rowId;
+      for (int i = 0; i < count; ++i, offset += 8) {
+        Platform.putDouble(null, offset, bb.getDouble(srcIndex + (8 * i)));
+      }
+    }
+  }
+
   @Override
   public double getDouble(int rowId) {
     if (dictionary == null) {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 1bf3126664177..625b78be4de94 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -403,6 +403,18 @@ public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
     }
   }
 
+  @Override
+  public void putFloatsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+    if (!bigEndianPlatform) {
+      putFloats(rowId, count, src, srcIndex);
+    } else {
+      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+      for (int i = 0; i < count; ++i) {
+        floatData[i + rowId] = bb.getFloat(srcIndex + (4 * i));
+      }
+    }
+  }
+
   @Override
   public float getFloat(int rowId) {
     if (dictionary == null) {
@@ -452,6 +464,18 @@ public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
     }
   }
 
+  @Override
+  public void putDoublesLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+    if (!bigEndianPlatform) {
+      putDoubles(rowId, count, src, srcIndex);
+    } else {
+      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+      for (int i = 0; i < count; ++i) {
+        doubleData[i + rowId] = bb.getDouble(srcIndex + (8 * i));
+      }
+    }
+  }
+
   @Override
   public double getDouble(int rowId) {
     if (dictionary == null) {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index b0e119d658cb4..05e81524165a3 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -310,6 +310,12 @@ public WritableColumnVector reserveDictionaryIds(int capacity) {
    */
   public abstract void putFloats(int rowId, int count, byte[] src, int srcIndex);
 
+  /**
+   * Sets values from [src[srcIndex], src[srcIndex + count * 4]) to [rowId, rowId + count)
+   * The data in src must be ieee formatted floats in little endian.
+   */
+  public abstract void putFloatsLittleEndian(int rowId, int count, byte[] src, int srcIndex);
+
   /**
    * Sets `value` to the value at rowId.
    */
@@ -331,6 +337,12 @@ public WritableColumnVector reserveDictionaryIds(int capacity) {
    */
   public abstract void putDoubles(int rowId, int count, byte[] src, int srcIndex);
 
+  /**
+   * Sets values from [src[srcIndex], src[srcIndex + count * 8]) to [rowId, rowId + count)
+   * The data in src must be ieee formatted doubles in little endian.
+   */
+  public abstract void putDoublesLittleEndian(int rowId, int count, byte[] src, int srcIndex);
+
   /**
    * Puts a byte array that already exists in this column.
    */
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index f57f07b498261..086d88b0b2138 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -444,13 +444,6 @@ class ColumnarBatchSuite extends SparkFunSuite {
       Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET, 2.234f)
       Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET + 4, 1.123f)
 
-      if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
-        // Ensure array contains Little Endian floats
-        val bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN)
-        Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET, bb.getFloat(0))
-        Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET + 4, bb.getFloat(4))
-      }
-
       column.putFloats(idx, 1, buffer, 4)
       column.putFloats(idx + 1, 1, buffer, 0)
       reference += 1.123f
@@ -488,6 +481,57 @@ class ColumnarBatchSuite extends SparkFunSuite {
       }
     }
 
+  testVector("[SPARK-31703] Float API - Little Endian", 1024, FloatType) {
+    column =>
+      val seed = System.currentTimeMillis()
+      val random = new Random(seed)
+      val reference = mutable.ArrayBuffer.empty[Float]
+
+      var idx = 0
+
+      val littleEndian = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN)
+      littleEndian.putFloat(0, 1.357f)
+      littleEndian.putFloat(4, 2.468f)
+      val arr = new Array[Byte](littleEndian.remaining)
+      littleEndian.get(arr)
+
+      column.putFloatsLittleEndian(idx, 1, arr, 4)
+      column.putFloatsLittleEndian(idx + 1, 1, arr, 0)
+      reference += 2.468f
+      reference += 1.357f
+      idx += 2
+
+      column.putFloatsLittleEndian(idx, 2, arr, 0)
+      reference += 1.357f
+      reference += 2.468f
+      idx += 2
+
+      while (idx < column.capacity) {
+        val single = random.nextBoolean()
+        if (single) {
+          val v = random.nextFloat()
+          column.putFloat(idx, v)
+          reference += v
+          idx += 1
+        } else {
+          val n = math.min(random.nextInt(column.capacity / 20), column.capacity - idx)
+          val v = random.nextFloat()
+          column.putFloats(idx, n, v)
+          var i = 0
+          while (i < n) {
+            reference += v
+            i += 1
+          }
+          idx += n
+        }
+      }
+
+      reference.zipWithIndex.foreach { v =>
+        assert(v._1 == column.getFloat(v._2),
+          "Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
+      }
+  }
+
   testVector("Double APIs", 1024, DoubleType) {
     column =>
       val seed = System.currentTimeMillis()
@@ -528,13 +572,6 @@ class ColumnarBatchSuite extends SparkFunSuite {
       Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET, 2.234)
       Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, 1.123)
 
-      if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
-        // Ensure array contains Little Endian doubles
-        val bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN)
-        Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET, bb.getDouble(0))
-        Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, bb.getDouble(8))
-      }
-
       column.putDoubles(idx, 1, buffer, 8)
       column.putDoubles(idx + 1, 1, buffer, 0)
       reference += 1.123
@@ -572,6 +609,57 @@ class ColumnarBatchSuite extends SparkFunSuite {
       }
     }
 
+  testVector("[SPARK-31703] Double API - Little Endian", 1024, DoubleType) {
+    column =>
+      val seed = System.currentTimeMillis()
+      val random = new Random(seed)
+      val reference = mutable.ArrayBuffer.empty[Double]
+
+      var idx = 0
+
+      val littleEndian = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN)
+      littleEndian.putDouble(0, 1.357)
+      littleEndian.putDouble(8, 2.468)
+      val arr = new Array[Byte](littleEndian.remaining)
+      littleEndian.get(arr)
+
+      column.putDoublesLittleEndian(idx, 1, arr, 8)
+      column.putDoublesLittleEndian(idx + 1, 1, arr, 0)
+      reference += 2.468
+      reference += 1.357
+      idx += 2
+
+      column.putDoublesLittleEndian(idx, 2, arr, 0)
+      reference += 1.357
+      reference += 2.468
+      idx += 2
+
+      while (idx < column.capacity) {
+        val single = random.nextBoolean()
+        if (single) {
+          val v = random.nextDouble()
+          column.putDouble(idx, v)
+          reference += v
+          idx += 1
+        } else {
+          val n = math.min(random.nextInt(column.capacity / 20), column.capacity - idx)
+          val v = random.nextDouble()
+          column.putDoubles(idx, n, v)
+          var i = 0
+          while (i < n) {
+            reference += v
+            i += 1
+          }
+          idx += n
+        }
+      }
+
+      reference.zipWithIndex.foreach { v =>
+        assert(v._1 == column.getDouble(v._2),
+          "Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
+      }
+  }
+
   testVector("String APIs", 7, StringType) {
     column =>
       val reference = mutable.ArrayBuffer.empty[String]