[SPARK-13537][SQL] Fix readBytes in VectorizedPlainValuesReader
JIRA: https://issues.apache.org/jira/browse/SPARK-13537

## What changes were proposed in this pull request?

In `readBytes` of `VectorizedPlainValuesReader`, the byte at the current position is read with `buffer[offset]`. This is incorrect: `offset` has `Platform.BYTE_ARRAY_OFFSET` added to it at initialization, so it is an unsafe memory offset rather than an array index. The read should go through `Platform.getByte(buffer, offset)` instead.
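To illustrate the off-by-base-offset bug, here is a standalone sketch (not Spark code): the constant below is a hypothetical stand-in for the real `Platform.BYTE_ARRAY_OFFSET`, which comes from `Unsafe.arrayBaseOffset(byte[].class)` and is typically 16 on HotSpot.

```java
public class OffsetSketch {
    // Hypothetical stand-in for Platform.BYTE_ARRAY_OFFSET: the position of
    // element 0 relative to the start of the array object (header included).
    static final int BYTE_ARRAY_OFFSET = 16;

    // Buggy pattern: treats the unsafe offset as a plain array index, so it
    // reads the wrong element or throws ArrayIndexOutOfBoundsException.
    static byte readBuggy(byte[] buffer, int offset) {
        return buffer[offset];
    }

    // Fixed pattern: interpret offset relative to the array base, which is
    // what Platform.getByte(buffer, offset) does via Unsafe.
    static byte readFixed(byte[] buffer, int offset) {
        return buffer[offset - BYTE_ARRAY_OFFSET];
    }

    public static void main(String[] args) {
        byte[] buffer = {42, 0, 0, 0, 7, 0, 0, 0}; // two 4-byte little-endian ints
        int offset = BYTE_ARRAY_OFFSET;            // points at the first logical byte
        System.out.println(readFixed(buffer, offset));     // 42
        System.out.println(readFixed(buffer, offset + 4)); // 7
        // readBuggy(buffer, offset) would index position 16 in this 8-byte
        // array and throw ArrayIndexOutOfBoundsException, as in the report.
    }
}
```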

## How was this patch tested?

`ParquetHadoopFsRelationSuite` sometimes [fails](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/52136/consoleFull) because of this bug, depending on the randomly generated data. With this patch applied, the test passes.

I added a test to `ParquetHadoopFsRelationSuite` with data that fails without this patch.

The resulting exception:

    [info] ParquetHadoopFsRelationSuite:
    [info] - test all data types - StringType (440 milliseconds)
    [info] - test all data types - BinaryType (434 milliseconds)
    [info] - test all data types - BooleanType (406 milliseconds)
    20:59:38.618 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 2597.0 (TID 67966)
    java.lang.ArrayIndexOutOfBoundsException: 46
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReader.readBytes(VectorizedPlainValuesReader.java:88)

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes apache#11418 from viirya/fix-readbytes.
viirya authored and roygao94 committed Mar 22, 2016
1 parent d0c255d commit 9784be6
Showing 2 changed files with 34 additions and 1 deletion.
`VectorizedPlainValuesReader.java`:

    @@ -85,7 +85,7 @@ public final void readBytes(int total, ColumnVector c, int rowId) {
         for (int i = 0; i < total; i++) {
           // Bytes are stored as a 4-byte little endian int. Just read the first byte.
           // TODO: consider pushing this in ColumnVector by adding a readBytes with a stride.
    -      c.putByte(rowId + i, buffer[offset]);
    +      c.putByte(rowId + i, Platform.getByte(buffer, offset));
           offset += 4;
         }
       }
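For context on the stride in that loop: the byte values in this page were written widened to 4-byte little-endian ints, so the reader takes the first (low-order) byte of each group and skips four. A minimal standalone sketch of that decode, using plain array indexing rather than Unsafe:

```java
public class PlainByteDecode {
    // Decode `total` byte values that were stored as 4-byte little-endian
    // ints: the low-order byte comes first, so read it and skip the stride.
    static byte[] decode(byte[] buffer, int start, int total) {
        byte[] out = new byte[total];
        int offset = start;
        for (int i = 0; i < total; i++) {
            out[i] = buffer[offset]; // first byte of the little-endian int
            offset += 4;
        }
        return out;
    }

    public static void main(String[] args) {
        // Three values (42, 7, -1) each widened to 4 bytes, little-endian.
        byte[] buffer = {42, 0, 0, 0, 7, 0, 0, 0,
                         (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF};
        byte[] out = decode(buffer, 0, 3);
        System.out.println(out[0] + " " + out[1] + " " + out[2]); // 42 7 -1
    }
}
```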
`ParquetHadoopFsRelationSuite.scala`:

    @@ -175,4 +175,37 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
           }
         }
       }
    +
    +  test(s"SPARK-13537: Fix readBytes in VectorizedPlainValuesReader") {
    +    withTempPath { file =>
    +      val path = file.getCanonicalPath
    +
    +      val schema = new StructType()
    +        .add("index", IntegerType, nullable = false)
    +        .add("col", ByteType, nullable = true)
    +
    +      val data = Seq(Row(1, -33.toByte), Row(2, 0.toByte), Row(3, -55.toByte), Row(4, 56.toByte),
    +        Row(5, 127.toByte), Row(6, -44.toByte), Row(7, 23.toByte), Row(8, -95.toByte),
    +        Row(9, 127.toByte), Row(10, 13.toByte))
    +
    +      val rdd = sqlContext.sparkContext.parallelize(data)
    +      val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
    +
    +      df.write
    +        .mode("overwrite")
    +        .format(dataSourceName)
    +        .option("dataSchema", df.schema.json)
    +        .save(path)
    +
    +      val loadedDF = sqlContext
    +        .read
    +        .format(dataSourceName)
    +        .option("dataSchema", df.schema.json)
    +        .schema(df.schema)
    +        .load(path)
    +        .orderBy("index")
    +
    +      checkAnswer(loadedDF, df)
    +    }
    +  }
     }
