[SPARK-31703][SQL] Parquet RLE float/double are read incorrectly on big endian platforms

### What changes were proposed in this pull request?
(back-porting from 9a3811d)

This PR fixes an issue introduced by SPARK-26985.

SPARK-26985 changed the `putDoubles()` and `putFloats()` methods to respect the platform's endianness. However, that causes the RLE paths in `VectorizedRleValuesReader.java` to read the RLE entries in Parquet as BIG_ENDIAN on big-endian platforms (i.e., as is), even though Parquet data is always little-endian.

The comments in `WritableColumnVector.java` say those methods are used for "ieee formatted doubles in platform native endian" (or floats), but since Parquet data is always little-endian, using those methods for Parquet reads is inappropriate.
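
To see why this corrupts values: interpreting little-endian bytes in big-endian order is equivalent to a byte swap, which scrambles an IEEE-754 double into denormal garbage. A minimal, JDK-only illustration (not part of the patch; the class name is just for the demo):

```java
public class EndianDemo {
  public static void main(String[] args) {
    // Bit pattern of 1.0 (0x3FF0000000000000), as Parquet would store it little-endian.
    long bits = Double.doubleToLongBits(1.0);
    // Reading those little-endian bytes as big-endian is equivalent to a byte swap.
    long swapped = Long.reverseBytes(bits);
    // Prints roughly 3.038653E-319 -- the kind of value seen in the broken output below.
    System.out.println(Double.longBitsToDouble(swapped));
  }
}
```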

To demonstrate the problem in spark-shell on a big-endian platform:

```scala
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val data = Seq(
  (1.0, 0.1),
  (2.0, 0.2),
  (0.3, 3.0),
  (4.0, 4.0),
  (5.0, 5.0))

spark.createDataFrame(data).write.mode(SaveMode.Overwrite).parquet("/tmp/data.parquet2")
val df2 = spark.read.parquet("/tmp/data.parquet2")
df2.show()
```

Result:

```
+--------------------+--------------------+
|                  _1|                  _2|
+--------------------+--------------------+
|           3.16E-322|-1.54234871366845...|
|         2.0553E-320|         2.0553E-320|
|          2.561E-320|          2.561E-320|
|4.66726145843124E-62|         1.0435E-320|
|        3.03865E-319|-1.54234871366757...|
+--------------------+--------------------+
```
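
For reference, on a little-endian platform (or with this fix applied) the read returns the original values. Assuming the row order of this small single-partition write is preserved, `df2.show()` prints:

```
+---+---+
| _1| _2|
+---+---+
|1.0|0.1|
|2.0|0.2|
|0.3|3.0|
|4.0|4.0|
|5.0|5.0|
+---+---+
```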

Tests in `ParquetIOSuite` that involve float/double data also fail, e.g.:

- basic data types (without binary)
- read raw Parquet file

`examples/src/main/python/mllib/isotonic_regression_example.py` fails as well.

The proposed change adds `putDoublesLittleEndian()` and `putFloatsLittleEndian()` methods for Parquet to invoke, just like the existing `putIntsLittleEndian()` and `putLongsLittleEndian()`. On little-endian platforms they simply delegate to `putDoubles()` and `putFloats()`; on big-endian platforms they read the entries as little-endian, as before SPARK-26985.
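
A condensed sketch of that dispatch, mirroring the diff below (illustrative only: the `dst` array stands in for the column vector's backing storage, and the real methods live on the `WritableColumnVector` implementations):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Illustrative only: store `count` little-endian doubles from `src` into `dst`.
static void putDoublesLittleEndian(double[] dst, int rowId, int count, byte[] src, int srcIndex) {
  if (ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN)) {
    // Platform order already matches the file: bulk-reinterpret the bytes.
    ByteBuffer.wrap(src, srcIndex, count * 8).order(ByteOrder.LITTLE_ENDIAN)
        .asDoubleBuffer().get(dst, rowId, count);
  } else {
    // Big-endian platform: decode each value explicitly as little-endian.
    ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
    for (int i = 0; i < count; ++i) {
      dst[rowId + i] = bb.getDouble(srcIndex + 8 * i);
    }
  }
}
```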

The existing Parquet unit tests already exercise the read path; in addition, this patch adds `[SPARK-31703]` little-endian float/double API tests to `ColumnarBatchSuite` (see below).

### Why are the changes needed?
Without this fix, RLE float/double data in Parquet files is read back incorrectly on big-endian platforms.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
All unit tests (`mvn test`) were run and passed.

Closes #29419 from tinhto-000/SPARK-31703-2.4.

Lead-authored-by: Tin Hang To <tinto@us.ibm.com>
Co-authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2 people authored and cloud-fan committed Aug 13, 2020
1 parent b9595c3 commit e5bef51
Showing 5 changed files with 166 additions and 16 deletions.
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -103,7 +103,7 @@ public final void readFloats(int total, WritableColumnVector c, int rowId) {

    if (buffer.hasArray()) {
      int offset = buffer.arrayOffset() + buffer.position();
-      c.putFloats(rowId, total, buffer.array(), offset);
+      c.putFloatsLittleEndian(rowId, total, buffer.array(), offset);
    } else {
      for (int i = 0; i < total; i += 1) {
        c.putFloat(rowId + i, buffer.getFloat());
@@ -118,7 +118,7 @@ public final void readDoubles(int total, WritableColumnVector c, int rowId) {

    if (buffer.hasArray()) {
      int offset = buffer.arrayOffset() + buffer.position();
-      c.putDoubles(rowId, total, buffer.array(), offset);
+      c.putDoublesLittleEndian(rowId, total, buffer.array(), offset);
    } else {
      for (int i = 0; i < total; i += 1) {
        c.putDouble(rowId + i, buffer.getDouble());
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -425,6 +425,19 @@ public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
    }
  }

  @Override
  public void putFloatsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
    if (!bigEndianPlatform) {
      putFloats(rowId, count, src, srcIndex);
    } else {
      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
      long offset = data + 4L * rowId;
      for (int i = 0; i < count; ++i, offset += 4) {
        Platform.putFloat(null, offset, bb.getFloat(srcIndex + (4 * i)));
      }
    }
  }

  @Override
  public float getFloat(int rowId) {
    if (dictionary == null) {
@@ -480,6 +493,19 @@ public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
    }
  }

  @Override
  public void putDoublesLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
    if (!bigEndianPlatform) {
      putDoubles(rowId, count, src, srcIndex);
    } else {
      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
      long offset = data + 8L * rowId;
      for (int i = 0; i < count; ++i, offset += 8) {
        Platform.putDouble(null, offset, bb.getDouble(srcIndex + (8 * i)));
      }
    }
  }

  @Override
  public double getDouble(int rowId) {
    if (dictionary == null) {
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -403,6 +403,18 @@ public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
    }
  }

  @Override
  public void putFloatsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
    if (!bigEndianPlatform) {
      putFloats(rowId, count, src, srcIndex);
    } else {
      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
      for (int i = 0; i < count; ++i) {
        floatData[i + rowId] = bb.getFloat(srcIndex + (4 * i));
      }
    }
  }

  @Override
  public float getFloat(int rowId) {
    if (dictionary == null) {
@@ -452,6 +464,18 @@ public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
    }
  }

  @Override
  public void putDoublesLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
    if (!bigEndianPlatform) {
      putDoubles(rowId, count, src, srcIndex);
    } else {
      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
      for (int i = 0; i < count; ++i) {
        doubleData[i + rowId] = bb.getDouble(srcIndex + (8 * i));
      }
    }
  }

  @Override
  public double getDouble(int rowId) {
    if (dictionary == null) {
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -310,6 +310,12 @@ public WritableColumnVector reserveDictionaryIds(int capacity) {
   */
  public abstract void putFloats(int rowId, int count, byte[] src, int srcIndex);

  /**
   * Sets values from [src[srcIndex], src[srcIndex + count * 4]) to [rowId, rowId + count)
   * The data in src must be ieee formatted floats in little endian.
   */
  public abstract void putFloatsLittleEndian(int rowId, int count, byte[] src, int srcIndex);

  /**
   * Sets `value` to the value at rowId.
   */
@@ -331,6 +337,12 @@ public WritableColumnVector reserveDictionaryIds(int capacity) {
   */
  public abstract void putDoubles(int rowId, int count, byte[] src, int srcIndex);

  /**
   * Sets values from [src[srcIndex], src[srcIndex + count * 8]) to [rowId, rowId + count)
   * The data in src must be ieee formatted doubles in little endian.
   */
  public abstract void putDoublesLittleEndian(int rowId, int count, byte[] src, int srcIndex);

  /**
   * Puts a byte array that already exists in this column.
   */
sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -444,13 +444,6 @@ class ColumnarBatchSuite extends SparkFunSuite {
      Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET, 2.234f)
      Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET + 4, 1.123f)

-      if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
-        // Ensure array contains Little Endian floats
-        val bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN)
-        Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET, bb.getFloat(0))
-        Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET + 4, bb.getFloat(4))
-      }

      column.putFloats(idx, 1, buffer, 4)
      column.putFloats(idx + 1, 1, buffer, 0)
      reference += 1.123f
@@ -488,6 +481,57 @@
      }
  }

testVector("[SPARK-31703] Float API - Little Endian", 1024, FloatType) {
column =>
val seed = System.currentTimeMillis()
val random = new Random(seed)
val reference = mutable.ArrayBuffer.empty[Float]

var idx = 0

val littleEndian = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN)
littleEndian.putFloat(0, 1.357f)
littleEndian.putFloat(4, 2.468f)
val arr = new Array[Byte](littleEndian.remaining)
littleEndian.get(arr)

column.putFloatsLittleEndian(idx, 1, arr, 4)
column.putFloatsLittleEndian(idx + 1, 1, arr, 0)
reference += 2.468f
reference += 1.357f
idx += 2

column.putFloatsLittleEndian(idx, 2, arr, 0)
reference += 1.357f
reference += 2.468f
idx += 2

while (idx < column.capacity) {
val single = random.nextBoolean()
if (single) {
val v = random.nextFloat()
column.putFloat(idx, v)
reference += v
idx += 1
} else {
val n = math.min(random.nextInt(column.capacity / 20), column.capacity - idx)
val v = random.nextFloat()
column.putFloats(idx, n, v)
var i = 0
while (i < n) {
reference += v
i += 1
}
idx += n
}
}

reference.zipWithIndex.foreach { v =>
assert(v._1 == column.getFloat(v._2),
"Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
}
}

testVector("Double APIs", 1024, DoubleType) {
column =>
val seed = System.currentTimeMillis()
@@ -528,13 +572,6 @@
      Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET, 2.234)
      Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, 1.123)

-      if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
-        // Ensure array contains Little Endian doubles
-        val bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN)
-        Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET, bb.getDouble(0))
-        Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, bb.getDouble(8))
-      }

      column.putDoubles(idx, 1, buffer, 8)
      column.putDoubles(idx + 1, 1, buffer, 0)
      reference += 1.123
@@ -572,6 +609,57 @@
      }
  }

testVector("[SPARK-31703] Double API - Little Endian", 1024, DoubleType) {
column =>
val seed = System.currentTimeMillis()
val random = new Random(seed)
val reference = mutable.ArrayBuffer.empty[Double]

var idx = 0

val littleEndian = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN)
littleEndian.putDouble(0, 1.357)
littleEndian.putDouble(8, 2.468)
val arr = new Array[Byte](littleEndian.remaining)
littleEndian.get(arr)

column.putDoublesLittleEndian(idx, 1, arr, 8)
column.putDoublesLittleEndian(idx + 1, 1, arr, 0)
reference += 2.468
reference += 1.357
idx += 2

column.putDoublesLittleEndian(idx, 2, arr, 0)
reference += 1.357
reference += 2.468
idx += 2

while (idx < column.capacity) {
val single = random.nextBoolean()
if (single) {
val v = random.nextDouble()
column.putDouble(idx, v)
reference += v
idx += 1
} else {
val n = math.min(random.nextInt(column.capacity / 20), column.capacity - idx)
val v = random.nextDouble()
column.putDoubles(idx, n, v)
var i = 0
while (i < n) {
reference += v
i += 1
}
idx += n
}
}

reference.zipWithIndex.foreach { v =>
assert(v._1 == column.getDouble(v._2),
"Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
}
}

testVector("String APIs", 7, StringType) {
column =>
val reference = mutable.ArrayBuffer.empty[String]
