Skip to content

Commit

Permalink
Revert "[HUDI-4915] improve avro serializer/deserializer (#6788)"
Browse files Browse the repository at this point in the history
This reverts commit 79b3e2b.
  • Loading branch information
YannByron committed Sep 27, 2022
1 parent 28e3db1 commit 0c2ec8d
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,27 +49,33 @@ import scala.collection.mutable.ArrayBuffer
class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
private lazy val decimalConversions = new DecimalConversion()

def deserialize(data: Any): Any = rootCatalystType match {
private val converter: Any => Any = rootCatalystType match {
// A shortcut for empty schema.
case st: StructType if st.isEmpty =>
InternalRow.empty
(data: Any) => InternalRow.empty

case st: StructType =>
val resultRow = new SpecificInternalRow(st.map(_.dataType))
val fieldUpdater = new RowUpdater(resultRow)
val writer = getRecordWriter(rootAvroType, st, Nil)
val record = data.asInstanceOf[GenericRecord]
writer(fieldUpdater, record)
resultRow
(data: Any) => {
val record = data.asInstanceOf[GenericRecord]
writer(fieldUpdater, record)
resultRow
}

case _ =>
val tmpRow = new SpecificInternalRow(Seq(rootCatalystType))
val fieldUpdater = new RowUpdater(tmpRow)
val writer = newWriter(rootAvroType, rootCatalystType, Nil)
writer(fieldUpdater, 0, data)
tmpRow.get(0, rootCatalystType)
(data: Any) => {
writer(fieldUpdater, 0, data)
tmpRow.get(0, rootCatalystType)
}
}

def deserialize(data: Any): Any = converter(data)

/**
* Creates a writer to write avro values to Catalyst values at the given ordinal with the given
* updater.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ import org.apache.spark.sql.types._
class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) {

def serialize(catalystData: Any): Any = {
converter.apply(catalystData)
}

private val converter: Any => Any = {
val actualAvroType = resolveNullableType(rootAvroType, nullable)
val baseConverter = rootCatalystType match {
case st: StructType =>
Expand All @@ -59,13 +63,14 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
converter.apply(tmpRow, 0)
}
if (nullable) {
if (catalystData == null) {
null
} else {
baseConverter.apply(catalystData)
}
(data: Any) =>
if (data == null) {
null
} else {
baseConverter.apply(data)
}
} else {
baseConverter.apply(catalystData)
baseConverter
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,28 +69,34 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
private val timestampRebaseFunc = createTimestampRebaseFuncInRead(
datetimeRebaseMode, "Avro")

def deserialize(data: Any): Option[Any] = rootCatalystType match {
private val converter: Any => Option[Any] = rootCatalystType match {
// A shortcut for empty schema.
case st: StructType if st.isEmpty =>
Some(InternalRow.empty)
(data: Any) => Some(InternalRow.empty)

case st: StructType =>
val resultRow = new SpecificInternalRow(st.map(_.dataType))
val fieldUpdater = new RowUpdater(resultRow)
val applyFilters = filters.skipRow(resultRow, _)
val writer = getRecordWriter(rootAvroType, st, Nil, applyFilters)
val record = data.asInstanceOf[GenericRecord]
val skipRow = writer(fieldUpdater, record)
if (skipRow) None else Some(resultRow)
(data: Any) => {
val record = data.asInstanceOf[GenericRecord]
val skipRow = writer(fieldUpdater, record)
if (skipRow) None else Some(resultRow)
}

case _ =>
val tmpRow = new SpecificInternalRow(Seq(rootCatalystType))
val fieldUpdater = new RowUpdater(tmpRow)
val writer = newWriter(rootAvroType, rootCatalystType, Nil)
writer(fieldUpdater, 0, data)
Some(tmpRow.get(0, rootCatalystType))
(data: Any) => {
writer(fieldUpdater, 0, data)
Some(tmpRow.get(0, rootCatalystType))
}
}

def deserialize(data: Any): Option[Any] = converter(data)

/**
* Creates a writer to write avro values to Catalyst values at the given ordinal with the given
* updater.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,17 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE)))
}

def serialize(catalystData: Any): Any = {
converter.apply(catalystData)
}

private val dateRebaseFunc = createDateRebaseFuncInWrite(
datetimeRebaseMode, "Avro")

private val timestampRebaseFunc = createTimestampRebaseFuncInWrite(
datetimeRebaseMode, "Avro")

def serialize(catalystData: Any): Any = {
private val converter: Any => Any = {
val actualAvroType = resolveNullableType(rootAvroType, nullable)
val baseConverter = rootCatalystType match {
case st: StructType =>
Expand All @@ -76,13 +80,14 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
converter.apply(tmpRow, 0)
}
if (nullable) {
if (catalystData == null) {
null
} else {
baseConverter.apply(catalystData)
}
(data: Any) =>
if (data == null) {
null
} else {
baseConverter.apply(data)
}
} else {
baseConverter.apply(catalystData)
baseConverter
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,33 +72,39 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,

private val timestampRebaseFunc = createTimestampRebaseFuncInRead(datetimeRebaseSpec, "Avro")

def deserialize(data: Any): Option[Any] = try {
private val converter: Any => Option[Any] = try {
rootCatalystType match {
// A shortcut for empty schema.
case st: StructType if st.isEmpty =>
Some(InternalRow.empty)
(_: Any) => Some(InternalRow.empty)

case st: StructType =>
val resultRow = new SpecificInternalRow(st.map(_.dataType))
val fieldUpdater = new RowUpdater(resultRow)
val applyFilters = filters.skipRow(resultRow, _)
val writer = getRecordWriter(rootAvroType, st, Nil, Nil, applyFilters)
val record = data.asInstanceOf[GenericRecord]
val skipRow = writer(fieldUpdater, record)
if (skipRow) None else Some(resultRow)
(data: Any) => {
val record = data.asInstanceOf[GenericRecord]
val skipRow = writer(fieldUpdater, record)
if (skipRow) None else Some(resultRow)
}

case _ =>
val tmpRow = new SpecificInternalRow(Seq(rootCatalystType))
val fieldUpdater = new RowUpdater(tmpRow)
val writer = newWriter(rootAvroType, rootCatalystType, Nil, Nil)
writer(fieldUpdater, 0, data)
Some(tmpRow.get(0, rootCatalystType))
(data: Any) => {
writer(fieldUpdater, 0, data)
Some(tmpRow.get(0, rootCatalystType))
}
}
} catch {
case ise: IncompatibleSchemaException => throw new IncompatibleSchemaException(
s"Cannot convert Avro type $rootAvroType to SQL type ${rootCatalystType.sql}.", ise)
}

def deserialize(data: Any): Option[Any] = converter(data)

/**
* Creates a writer to write avro values to Catalyst values at the given ordinal with the given
* updater.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,17 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_WRITE)))
}

def serialize(catalystData: Any): Any = {
converter.apply(catalystData)
}

private val dateRebaseFunc = createDateRebaseFuncInWrite(
datetimeRebaseMode, "Avro")

private val timestampRebaseFunc = createTimestampRebaseFuncInWrite(
datetimeRebaseMode, "Avro")

def serialize(catalystData: Any): Any = {
private val converter: Any => Any = {
val actualAvroType = resolveNullableType(rootAvroType, nullable)
val baseConverter = try {
rootCatalystType match {
Expand All @@ -90,13 +94,14 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
s"Cannot convert SQL type ${rootCatalystType.sql} to Avro type $rootAvroType.", ise)
}
if (nullable) {
if (catalystData == null) {
null
} else {
baseConverter.apply(catalystData)
}
(data: Any) =>
if (data == null) {
null
} else {
baseConverter.apply(data)
}
} else {
baseConverter.apply(catalystData)
baseConverter
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,33 +71,39 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,

private val timestampRebaseFunc = createTimestampRebaseFuncInRead(datetimeRebaseSpec, "Avro")

def deserialize(data: Any): Option[Any] = try {
private val converter: Any => Option[Any] = try {
rootCatalystType match {
// A shortcut for empty schema.
case st: StructType if st.isEmpty =>
Some(InternalRow.empty)
(_: Any) => Some(InternalRow.empty)

case st: StructType =>
val resultRow = new SpecificInternalRow(st.map(_.dataType))
val fieldUpdater = new RowUpdater(resultRow)
val applyFilters = filters.skipRow(resultRow, _)
val writer = getRecordWriter(rootAvroType, st, Nil, Nil, applyFilters)
val record = data.asInstanceOf[GenericRecord]
val skipRow = writer(fieldUpdater, record)
if (skipRow) None else Some(resultRow)
(data: Any) => {
val record = data.asInstanceOf[GenericRecord]
val skipRow = writer(fieldUpdater, record)
if (skipRow) None else Some(resultRow)
}

case _ =>
val tmpRow = new SpecificInternalRow(Seq(rootCatalystType))
val fieldUpdater = new RowUpdater(tmpRow)
val writer = newWriter(rootAvroType, rootCatalystType, Nil, Nil)
writer(fieldUpdater, 0, data)
Some(tmpRow.get(0, rootCatalystType))
(data: Any) => {
writer(fieldUpdater, 0, data)
Some(tmpRow.get(0, rootCatalystType))
}
}
} catch {
case ise: IncompatibleSchemaException => throw new IncompatibleSchemaException(
s"Cannot convert Avro type $rootAvroType to SQL type ${rootCatalystType.sql}.", ise)
}

def deserialize(data: Any): Option[Any] = converter(data)

/**
* Creates a writer to write avro values to Catalyst values at the given ordinal with the given
* updater.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,17 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_WRITE)))
}

def serialize(catalystData: Any): Any = {
converter.apply(catalystData)
}

private val dateRebaseFunc = createDateRebaseFuncInWrite(
datetimeRebaseMode, "Avro")

private val timestampRebaseFunc = createTimestampRebaseFuncInWrite(
datetimeRebaseMode, "Avro")

def serialize(catalystData: Any): Any = {
private val converter: Any => Any = {
val actualAvroType = resolveNullableType(rootAvroType, nullable)
val baseConverter = try {
rootCatalystType match {
Expand All @@ -89,13 +93,14 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
s"Cannot convert SQL type ${rootCatalystType.sql} to Avro type $rootAvroType.", ise)
}
if (nullable) {
if (catalystData == null) {
null
} else {
baseConverter.apply(catalystData)
}
(data: Any) =>
if (data == null) {
null
} else {
baseConverter.apply(data)
}
} else {
baseConverter.apply(catalystData)
baseConverter
}
}

Expand Down

0 comments on commit 0c2ec8d

Please sign in to comment.