[SPARK-2446][SQL] Add BinaryType support to Parquet I/O.
Note that this commit changes the semantics when loading data that was created with prior versions of Spark SQL. Before, we wrote strings out as plain binary data without any annotation. As a result, columns that were StringType when written by prior versions will now load as BinaryType; users that need strings can CAST those columns to String. It was decided that while this breaks compatibility, it makes us compatible with other systems (Hive, Thrift, etc.) and adds support for binary data, so it is the right decision long term.
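For example, a minimal sketch of the CAST workaround (the file path, table name, and column name below are hypothetical, and an existing `SparkContext` named `sc` is assumed):

```scala
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
// A string column written by an older version of Spark SQL now loads as BinaryType.
sqlContext.parquetFile("old-data.parquet").registerAsTable("old_data")
// Recover the original strings by casting the column explicitly.
val strings = sqlContext.sql("SELECT CAST(name AS STRING) FROM old_data")
strings.collect().foreach(println)
```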

To support `BinaryType`, the following changes are needed:
- Make `StringType` use `OriginalType.UTF8`
- Add `BinaryType` using `PrimitiveTypeName.BINARY` without `OriginalType`
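For illustration, a sketch of the resulting Parquet schema (field names are hypothetical): both Spark SQL types map to the same Parquet primitive and differ only in the `UTF8` annotation, so `stringCol` reads back as `StringType` while the unannotated `binaryCol` reads back as `BinaryType`:

```
message example {
  optional binary stringCol (UTF8);
  optional binary binaryCol;
}
```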

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #1373 from ueshin/issues/SPARK-2446 and squashes the following commits:

ecacb92 [Takuya UESHIN] Add BinaryType support to Parquet I/O.
616e04a [Takuya UESHIN] Make StringType use OriginalType.UTF8.
ueshin authored and marmbrus committed Jul 14, 2014
1 parent 3dd8af7 commit 9fe693b
Showing 5 changed files with 57 additions and 45 deletions.
@@ -114,7 +114,7 @@ private[sql] object CatalystConverter {
         }
       }
       // All other primitive types use the default converter
-      case ctype: NativeType => { // note: need the type tag here!
+      case ctype: PrimitiveType => { // note: need the type tag here!
         new CatalystPrimitiveConverter(parent, fieldIndex)
       }
       case _ => throw new RuntimeException(
@@ -191,6 +191,8 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
           value.asInstanceOf[String].getBytes("utf-8")
         )
       )
+      case BinaryType => writer.addBinary(
+        Binary.fromByteArray(value.asInstanceOf[Array[Byte]]))
       case IntegerType => writer.addInteger(value.asInstanceOf[Int])
       case ShortType => writer.addInteger(value.asInstanceOf[Short])
       case LongType => writer.addLong(value.asInstanceOf[Long])
@@ -299,6 +301,8 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
           record(index).asInstanceOf[String].getBytes("utf-8")
         )
       )
+      case BinaryType => writer.addBinary(
+        Binary.fromByteArray(record(index).asInstanceOf[Array[Byte]]))
       case IntegerType => writer.addInteger(record.getInt(index))
       case ShortType => writer.addInteger(record.getShort(index))
       case LongType => writer.addLong(record.getLong(index))
@@ -58,7 +58,7 @@ private[sql] object ParquetTestData {
     """message myrecord {
       optional boolean myboolean;
       optional int32 myint;
-      optional binary mystring;
+      optional binary mystring (UTF8);
       optional int64 mylong;
       optional float myfloat;
       optional double mydouble;
@@ -87,7 +87,7 @@ private[sql] object ParquetTestData {
     message myrecord {
       required boolean myboolean;
       required int32 myint;
-      required binary mystring;
+      required binary mystring (UTF8);
       required int64 mylong;
       required float myfloat;
       required double mydouble;
@@ -119,14 +119,14 @@ private[sql] object ParquetTestData {
   // so that array types can be translated correctly.
   """
     message AddressBook {
-      required binary owner;
+      required binary owner (UTF8);
       optional group ownerPhoneNumbers {
-        repeated binary array;
+        repeated binary array (UTF8);
       }
       optional group contacts {
         repeated group array {
-          required binary name;
-          optional binary phoneNumber;
+          required binary name (UTF8);
+          optional binary phoneNumber (UTF8);
         }
       }
     }
@@ -181,16 +181,16 @@ private[sql] object ParquetTestData {
       required int32 x;
       optional group data1 {
         repeated group map {
-          required binary key;
+          required binary key (UTF8);
           required int32 value;
         }
       }
       required group data2 {
         repeated group map {
-          required binary key;
+          required binary key (UTF8);
           required group value {
             required int64 payload1;
-            optional binary payload2;
+            optional binary payload2 (UTF8);
           }
         }
       }
@@ -42,20 +42,22 @@ private[parquet] object ParquetTypesConverter extends Logging {
   def isPrimitiveType(ctype: DataType): Boolean =
     classOf[PrimitiveType] isAssignableFrom ctype.getClass

-  def toPrimitiveDataType(parquetType : ParquetPrimitiveTypeName): DataType = parquetType match {
-    case ParquetPrimitiveTypeName.BINARY => StringType
-    case ParquetPrimitiveTypeName.BOOLEAN => BooleanType
-    case ParquetPrimitiveTypeName.DOUBLE => DoubleType
-    case ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY => ArrayType(ByteType)
-    case ParquetPrimitiveTypeName.FLOAT => FloatType
-    case ParquetPrimitiveTypeName.INT32 => IntegerType
-    case ParquetPrimitiveTypeName.INT64 => LongType
-    case ParquetPrimitiveTypeName.INT96 =>
-      // TODO: add BigInteger type? TODO(andre) use DecimalType instead????
-      sys.error("Potential loss of precision: cannot convert INT96")
-    case _ => sys.error(
-      s"Unsupported parquet datatype $parquetType")
-  }
+  def toPrimitiveDataType(parquetType: ParquetPrimitiveType): DataType =
+    parquetType.getPrimitiveTypeName match {
+      case ParquetPrimitiveTypeName.BINARY
+        if parquetType.getOriginalType == ParquetOriginalType.UTF8 => StringType
+      case ParquetPrimitiveTypeName.BINARY => BinaryType
+      case ParquetPrimitiveTypeName.BOOLEAN => BooleanType
+      case ParquetPrimitiveTypeName.DOUBLE => DoubleType
+      case ParquetPrimitiveTypeName.FLOAT => FloatType
+      case ParquetPrimitiveTypeName.INT32 => IntegerType
+      case ParquetPrimitiveTypeName.INT64 => LongType
+      case ParquetPrimitiveTypeName.INT96 =>
+        // TODO: add BigInteger type? TODO(andre) use DecimalType instead????
+        sys.error("Potential loss of precision: cannot convert INT96")
+      case _ => sys.error(
+        s"Unsupported parquet datatype $parquetType")
+    }

   /**
    * Converts a given Parquet `Type` into the corresponding
@@ -104,7 +106,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
     }

     if (parquetType.isPrimitive) {
-      toPrimitiveDataType(parquetType.asPrimitiveType.getPrimitiveTypeName)
+      toPrimitiveDataType(parquetType.asPrimitiveType)
     } else {
       val groupType = parquetType.asGroupType()
       parquetType.getOriginalType match {
@@ -164,18 +166,17 @@ private[parquet] object ParquetTypesConverter extends Logging {
    * @return The name of the corresponding Parquet primitive type
    */
   def fromPrimitiveDataType(ctype: DataType):
-      Option[ParquetPrimitiveTypeName] = ctype match {
-    case StringType => Some(ParquetPrimitiveTypeName.BINARY)
-    case BooleanType => Some(ParquetPrimitiveTypeName.BOOLEAN)
-    case DoubleType => Some(ParquetPrimitiveTypeName.DOUBLE)
-    case ArrayType(ByteType) =>
-      Some(ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
-    case FloatType => Some(ParquetPrimitiveTypeName.FLOAT)
-    case IntegerType => Some(ParquetPrimitiveTypeName.INT32)
+      Option[(ParquetPrimitiveTypeName, Option[ParquetOriginalType])] = ctype match {
+    case StringType => Some(ParquetPrimitiveTypeName.BINARY, Some(ParquetOriginalType.UTF8))
+    case BinaryType => Some(ParquetPrimitiveTypeName.BINARY, None)
+    case BooleanType => Some(ParquetPrimitiveTypeName.BOOLEAN, None)
+    case DoubleType => Some(ParquetPrimitiveTypeName.DOUBLE, None)
+    case FloatType => Some(ParquetPrimitiveTypeName.FLOAT, None)
+    case IntegerType => Some(ParquetPrimitiveTypeName.INT32, None)
     // There is no type for Byte or Short so we promote them to INT32.
-    case ShortType => Some(ParquetPrimitiveTypeName.INT32)
-    case ByteType => Some(ParquetPrimitiveTypeName.INT32)
-    case LongType => Some(ParquetPrimitiveTypeName.INT64)
+    case ShortType => Some(ParquetPrimitiveTypeName.INT32, None)
+    case ByteType => Some(ParquetPrimitiveTypeName.INT32, None)
+    case LongType => Some(ParquetPrimitiveTypeName.INT64, None)
     case _ => None
   }

@@ -227,17 +228,18 @@ private[parquet] object ParquetTypesConverter extends Logging {
       if (nullable) Repetition.OPTIONAL else Repetition.REQUIRED
     }
     val primitiveType = fromPrimitiveDataType(ctype)
-    if (primitiveType.isDefined) {
-      new ParquetPrimitiveType(repetition, primitiveType.get, name)
-    } else {
+    primitiveType.map {
+      case (primitiveType, originalType) =>
+        new ParquetPrimitiveType(repetition, primitiveType, name, originalType.orNull)
+    }.getOrElse {
       ctype match {
         case ArrayType(elementType) => {
           val parquetElementType = fromDataType(
             elementType,
             CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
             nullable = false,
             inArray = true)
-        ConversionPatterns.listType(repetition, name, parquetElementType)
+          ConversionPatterns.listType(repetition, name, parquetElementType)
         }
         case StructType(structFields) => {
           val fields = structFields.map {
@@ -65,7 +65,8 @@ case class AllDataTypes(
     doubleField: Double,
     shortField: Short,
     byteField: Byte,
-    booleanField: Boolean)
+    booleanField: Boolean,
+    binaryField: Array[Byte])

 case class AllDataTypesWithNonPrimitiveType(
     stringField: String,
@@ -76,6 +77,7 @@ case class AllDataTypesWithNonPrimitiveType(
     shortField: Short,
     byteField: Byte,
     booleanField: Boolean,
+    binaryField: Array[Byte],
     array: Seq[Int],
     map: Map[Int, String],
     data: Data)
@@ -116,7 +118,8 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     val tempDir = getTempFilePath("parquetTest").getCanonicalPath
     val range = (0 to 255)
     TestSQLContext.sparkContext.parallelize(range)
-      .map(x => AllDataTypes(s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0))
+      .map(x => AllDataTypes(s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0,
+        (0 to x).map(_.toByte).toArray))
       .saveAsParquetFile(tempDir)
     val result = parquetFile(tempDir).collect()
     range.foreach {
@@ -129,6 +132,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
       assert(result(i).getShort(5) === i.toShort)
       assert(result(i).getByte(6) === i.toByte)
       assert(result(i).getBoolean(7) === (i % 2 == 0))
+      assert(result(i)(8) === (0 to i).map(_.toByte).toArray)
     }
   }

@@ -138,6 +142,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     TestSQLContext.sparkContext.parallelize(range)
       .map(x => AllDataTypesWithNonPrimitiveType(
         s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0,
+        (0 to x).map(_.toByte).toArray,
         (0 until x), (0 until x).map(i => i -> s"$i").toMap, Data((0 until x), Nested(x, s"$x"))))
       .saveAsParquetFile(tempDir)
     val result = parquetFile(tempDir).collect()
@@ -151,9 +156,10 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
       assert(result(i).getShort(5) === i.toShort)
       assert(result(i).getByte(6) === i.toByte)
       assert(result(i).getBoolean(7) === (i % 2 == 0))
-      assert(result(i)(8) === (0 until i))
-      assert(result(i)(9) === (0 until i).map(i => i -> s"$i").toMap)
-      assert(result(i)(10) === new GenericRow(Array[Any]((0 until i), new GenericRow(Array[Any](i, s"$i")))))
+      assert(result(i)(8) === (0 to i).map(_.toByte).toArray)
+      assert(result(i)(9) === (0 until i))
+      assert(result(i)(10) === (0 until i).map(i => i -> s"$i").toMap)
+      assert(result(i)(11) === new GenericRow(Array[Any]((0 until i), new GenericRow(Array[Any](i, s"$i")))))
     }
   }
