From abc509e12d9162bad048dce40fef2f8865ff2194 Mon Sep 17 00:00:00 2001
From: Yann Byron
Date: Sun, 25 Sep 2022 23:42:44 +0800
Subject: [PATCH] [HUDI-4915] improve avro serializer/deserializer (#6788)

---
 .../spark/sql/avro/AvroDeserializer.scala | 20 +++++++------------
 .../spark/sql/avro/AvroSerializer.scala   | 17 ++++++----------
 .../spark/sql/avro/AvroDeserializer.scala | 20 +++++++------------
 .../spark/sql/avro/AvroSerializer.scala   | 19 +++++++-----------
 .../spark/sql/avro/AvroDeserializer.scala | 20 +++++++------------
 .../spark/sql/avro/AvroSerializer.scala   | 19 +++++++-----------
 .../spark/sql/avro/AvroDeserializer.scala | 20 +++++++------------
 .../spark/sql/avro/AvroSerializer.scala   | 19 +++++++-----------
 8 files changed, 55 insertions(+), 99 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 9725fb63f555..921e6deb58f1 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -49,33 +49,27 @@ import scala.collection.mutable.ArrayBuffer
 class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
   private lazy val decimalConversions = new DecimalConversion()
 
-  private val converter: Any => Any = rootCatalystType match {
+  def deserialize(data: Any): Any = rootCatalystType match {
     // A shortcut for empty schema.
     case st: StructType if st.isEmpty =>
-      (data: Any) => InternalRow.empty
+      InternalRow.empty
 
     case st: StructType =>
       val resultRow = new SpecificInternalRow(st.map(_.dataType))
       val fieldUpdater = new RowUpdater(resultRow)
       val writer = getRecordWriter(rootAvroType, st, Nil)
-      (data: Any) => {
-        val record = data.asInstanceOf[GenericRecord]
-        writer(fieldUpdater, record)
-        resultRow
-      }
+      val record = data.asInstanceOf[GenericRecord]
+      writer(fieldUpdater, record)
+      resultRow
 
     case _ =>
       val tmpRow = new SpecificInternalRow(Seq(rootCatalystType))
       val fieldUpdater = new RowUpdater(tmpRow)
       val writer = newWriter(rootAvroType, rootCatalystType, Nil)
-      (data: Any) => {
-        writer(fieldUpdater, 0, data)
-        tmpRow.get(0, rootCatalystType)
-      }
+      writer(fieldUpdater, 0, data)
+      tmpRow.get(0, rootCatalystType)
   }
 
-  def deserialize(data: Any): Any = converter(data)
-
   /**
    * Creates a writer to write avro values to Catalyst values at the given ordinal with the given
    * updater.
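Note on the pattern: the hunk above inlines the cached `converter` closure into `deserialize` itself, so the dispatch on the root Catalyst type now happens on every call rather than once at construction. A minimal standalone sketch of the before/after shape, using simplified stand-in types instead of Spark's internal classes:

    // Before: the dispatch runs once, at construction time, and the
    // resulting closure is reused for every record.
    final class Before(rootIsEmptyStruct: Boolean) {
      private val converter: Any => Any =
        if (rootIsEmptyStruct) (_: Any) => "empty-row"
        else (data: Any) => s"converted($data)"

      def deserialize(data: Any): Any = converter(data)
    }

    // After (the shape introduced by this patch): deserialize performs
    // the dispatch on each call.
    final class After(rootIsEmptyStruct: Boolean) {
      def deserialize(data: Any): Any =
        if (rootIsEmptyStruct) "empty-row"
        else s"converted($data)"
    }

    object RefactorDemo {
      def main(args: Array[String]): Unit = {
        println(new Before(rootIsEmptyStruct = false).deserialize(42)) // converted(42)
        println(new After(rootIsEmptyStruct = false).deserialize(42))  // converted(42)
      }
    }

Both shapes return the same values for the same input; the visible trade-off is that the match, and the writer construction inside it, now run per deserialize call.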
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 2b88be81656f..e0c734413870 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -47,10 +47,6 @@ import org.apache.spark.sql.types._
 class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) {
 
   def serialize(catalystData: Any): Any = {
-    converter.apply(catalystData)
-  }
-
-  private val converter: Any => Any = {
     val actualAvroType = resolveNullableType(rootAvroType, nullable)
     val baseConverter = rootCatalystType match {
       case st: StructType =>
@@ -63,14 +59,13 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
       converter.apply(tmpRow, 0)
     }
     if (nullable) {
-      (data: Any) =>
-        if (data == null) {
-          null
-        } else {
-          baseConverter.apply(data)
-        }
+      if (catalystData == null) {
+        null
+      } else {
+        baseConverter.apply(catalystData)
+      }
     } else {
-      baseConverter
+      baseConverter.apply(catalystData)
     }
   }

diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 5fb6d907bdc8..61482ab96f3f 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -69,34 +69,28 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
   private val timestampRebaseFunc = createTimestampRebaseFuncInRead(
     datetimeRebaseMode, "Avro")
 
-  private val converter: Any => Option[Any] = rootCatalystType match {
+  def deserialize(data: Any): Option[Any] = rootCatalystType match {
     // A shortcut for empty schema.
     case st: StructType if st.isEmpty =>
-      (data: Any) => Some(InternalRow.empty)
+      Some(InternalRow.empty)
 
     case st: StructType =>
       val resultRow = new SpecificInternalRow(st.map(_.dataType))
       val fieldUpdater = new RowUpdater(resultRow)
       val applyFilters = filters.skipRow(resultRow, _)
       val writer = getRecordWriter(rootAvroType, st, Nil, applyFilters)
-      (data: Any) => {
-        val record = data.asInstanceOf[GenericRecord]
-        val skipRow = writer(fieldUpdater, record)
-        if (skipRow) None else Some(resultRow)
-      }
+      val record = data.asInstanceOf[GenericRecord]
+      val skipRow = writer(fieldUpdater, record)
+      if (skipRow) None else Some(resultRow)
 
     case _ =>
       val tmpRow = new SpecificInternalRow(Seq(rootCatalystType))
       val fieldUpdater = new RowUpdater(tmpRow)
       val writer = newWriter(rootAvroType, rootCatalystType, Nil)
-      (data: Any) => {
-        writer(fieldUpdater, 0, data)
-        Some(tmpRow.get(0, rootCatalystType))
-      }
+      writer(fieldUpdater, 0, data)
+      Some(tmpRow.get(0, rootCatalystType))
   }
 
-  def deserialize(data: Any): Option[Any] = converter(data)
-
   /**
    * Creates a writer to write avro values to Catalyst values at the given ordinal with the given
    * updater.
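Note on null handling: in the serializer hunk above, the `nullable` wrapper lambda is replaced by a direct per-call check in `serialize`. A rough sketch of the resulting control flow, with a stand-in `baseConverter` in place of the real Catalyst-to-Avro converter:

    // Only the null-handling shape is faithful to the patch; the
    // conversion itself is mocked.
    final class NullableSerializeSketch(nullable: Boolean) {
      private def baseConverter(data: Any): Any = s"avro($data)"

      def serialize(catalystData: Any): Any =
        if (nullable) {
          // Nullable schema: a null Catalyst value passes through as Avro null.
          if (catalystData == null) null else baseConverter(catalystData)
        } else {
          // Non-nullable schema: convert unconditionally.
          baseConverter(catalystData)
        }
    }

    object NullableDemo {
      def main(args: Array[String]): Unit = {
        val s = new NullableSerializeSketch(nullable = true)
        println(s.serialize(null)) // null
        println(s.serialize("x"))  // avro(x)
      }
    }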
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 36d86c1e01f0..2397186a17fe 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -57,17 +57,13 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
       SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE)))
   }
 
-  def serialize(catalystData: Any): Any = {
-    converter.apply(catalystData)
-  }
-
   private val dateRebaseFunc = createDateRebaseFuncInWrite(
     datetimeRebaseMode, "Avro")
 
   private val timestampRebaseFunc = createTimestampRebaseFuncInWrite(
     datetimeRebaseMode, "Avro")
 
-  private val converter: Any => Any = {
+  def serialize(catalystData: Any): Any = {
     val actualAvroType = resolveNullableType(rootAvroType, nullable)
     val baseConverter = rootCatalystType match {
       case st: StructType =>
@@ -80,14 +76,13 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
       converter.apply(tmpRow, 0)
     }
     if (nullable) {
-      (data: Any) =>
-        if (data == null) {
-          null
-        } else {
-          baseConverter.apply(data)
-        }
+      if (catalystData == null) {
+        null
+      } else {
+        baseConverter.apply(catalystData)
+      }
     } else {
-      baseConverter
+      baseConverter.apply(catalystData)
     }
   }
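Note: each serializer variant starts by calling `resolveNullableType(rootAvroType, nullable)` before dispatching on the Catalyst type. Conceptually this picks the non-null branch out of Avro's `["null", T]` union for nullable fields; the sketch below only illustrates that idea with the Avro Java API and is not Hudi's implementation:

    import org.apache.avro.Schema
    import scala.collection.JavaConverters._

    object ResolveNullableSketch {
      // Illustrative stand-in for resolveNullableType: for a nullable field,
      // return the first non-null branch of the union, otherwise the schema
      // unchanged.
      def resolveNullableType(avroType: Schema, nullable: Boolean): Schema =
        if (nullable && avroType.getType == Schema.Type.UNION) {
          avroType.getTypes.asScala
            .find(_.getType != Schema.Type.NULL)
            .getOrElse(avroType)
        } else {
          avroType
        }

      def main(args: Array[String]): Unit = {
        val union = Schema.createUnion(
          List(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)).asJava)
        println(resolveNullableType(union, nullable = true)) // "string"
      }
    }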
diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 0b609330756e..9f3b60b8c30c 100644
--- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -72,39 +72,33 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
   private val timestampRebaseFunc = createTimestampRebaseFuncInRead(datetimeRebaseSpec, "Avro")
 
-  private val converter: Any => Option[Any] = try {
+  def deserialize(data: Any): Option[Any] = try {
     rootCatalystType match {
       // A shortcut for empty schema.
       case st: StructType if st.isEmpty =>
-        (_: Any) => Some(InternalRow.empty)
+        Some(InternalRow.empty)
 
       case st: StructType =>
         val resultRow = new SpecificInternalRow(st.map(_.dataType))
         val fieldUpdater = new RowUpdater(resultRow)
         val applyFilters = filters.skipRow(resultRow, _)
         val writer = getRecordWriter(rootAvroType, st, Nil, Nil, applyFilters)
-        (data: Any) => {
-          val record = data.asInstanceOf[GenericRecord]
-          val skipRow = writer(fieldUpdater, record)
-          if (skipRow) None else Some(resultRow)
-        }
+        val record = data.asInstanceOf[GenericRecord]
+        val skipRow = writer(fieldUpdater, record)
+        if (skipRow) None else Some(resultRow)
 
       case _ =>
         val tmpRow = new SpecificInternalRow(Seq(rootCatalystType))
         val fieldUpdater = new RowUpdater(tmpRow)
         val writer = newWriter(rootAvroType, rootCatalystType, Nil, Nil)
-        (data: Any) => {
-          writer(fieldUpdater, 0, data)
-          Some(tmpRow.get(0, rootCatalystType))
-        }
+        writer(fieldUpdater, 0, data)
+        Some(tmpRow.get(0, rootCatalystType))
     }
   } catch {
     case ise: IncompatibleSchemaException => throw new IncompatibleSchemaException(
       s"Cannot convert Avro type $rootAvroType to SQL type ${rootCatalystType.sql}.", ise)
   }
 
-  def deserialize(data: Any): Option[Any] = converter(data)
-
   /**
    * Creates a writer to write avro values to Catalyst values at the given ordinal with the given
    * updater.

diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index ba9812b02674..0f9b60c70163 100644
--- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -66,17 +66,13 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
       LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_WRITE)))
   }
 
-  def serialize(catalystData: Any): Any = {
-    converter.apply(catalystData)
-  }
-
   private val dateRebaseFunc = createDateRebaseFuncInWrite(
     datetimeRebaseMode, "Avro")
 
   private val timestampRebaseFunc = createTimestampRebaseFuncInWrite(
     datetimeRebaseMode, "Avro")
 
-  private val converter: Any => Any = {
+  def serialize(catalystData: Any): Any = {
     val actualAvroType = resolveNullableType(rootAvroType, nullable)
     val baseConverter = try {
       rootCatalystType match {
@@ -94,14 +90,13 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
         s"Cannot convert SQL type ${rootCatalystType.sql} to Avro type $rootAvroType.", ise)
     }
     if (nullable) {
-      (data: Any) =>
-        if (data == null) {
-          null
-        } else {
-          baseConverter.apply(data)
-        }
+      if (catalystData == null) {
+        null
+      } else {
+        baseConverter.apply(catalystData)
+      }
     } else {
-      baseConverter
+      baseConverter.apply(catalystData)
     }
   }
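Note on the return type: in the Spark 3.1.x/3.2.x/3.3.x variants, `deserialize` keeps its `Option[Any]` contract: `None` signals that the pushed-down filters skipped the row. A caller-side sketch with a stand-in deserializer, not Hudi's class:

    object OptionContractSketch {
      // Stand-in: returns Some(row) for a converted record, None when the
      // row is filtered out, mirroring the contract in the hunks above.
      final class Deserializer(skip: Any => Boolean) {
        def deserialize(data: Any): Option[Any] =
          if (skip(data)) None else Some(s"row($data)")
      }

      def main(args: Array[String]): Unit = {
        val deser = new Deserializer(skip = { case i: Int => i < 0; case _ => false })
        val records = Seq(1, -2, 3)
        // flatMap drops the filtered (None) rows, which is how a reader
        // would typically consume this API.
        println(records.flatMap(deser.deserialize)) // List(row(1), row(3))
      }
    }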
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 5e7bab3e51fb..0b00b6d1ab03 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -71,39 +71,33 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
   private val timestampRebaseFunc = createTimestampRebaseFuncInRead(datetimeRebaseSpec, "Avro")
 
-  private val converter: Any => Option[Any] = try {
+  def deserialize(data: Any): Option[Any] = try {
     rootCatalystType match {
       // A shortcut for empty schema.
       case st: StructType if st.isEmpty =>
-        (_: Any) => Some(InternalRow.empty)
+        Some(InternalRow.empty)
 
       case st: StructType =>
         val resultRow = new SpecificInternalRow(st.map(_.dataType))
         val fieldUpdater = new RowUpdater(resultRow)
         val applyFilters = filters.skipRow(resultRow, _)
         val writer = getRecordWriter(rootAvroType, st, Nil, Nil, applyFilters)
-        (data: Any) => {
-          val record = data.asInstanceOf[GenericRecord]
-          val skipRow = writer(fieldUpdater, record)
-          if (skipRow) None else Some(resultRow)
-        }
+        val record = data.asInstanceOf[GenericRecord]
+        val skipRow = writer(fieldUpdater, record)
+        if (skipRow) None else Some(resultRow)
 
      case _ =>
         val tmpRow = new SpecificInternalRow(Seq(rootCatalystType))
         val fieldUpdater = new RowUpdater(tmpRow)
         val writer = newWriter(rootAvroType, rootCatalystType, Nil, Nil)
-        (data: Any) => {
-          writer(fieldUpdater, 0, data)
-          Some(tmpRow.get(0, rootCatalystType))
-        }
+        writer(fieldUpdater, 0, data)
+        Some(tmpRow.get(0, rootCatalystType))
     }
   } catch {
     case ise: IncompatibleSchemaException => throw new IncompatibleSchemaException(
       s"Cannot convert Avro type $rootAvroType to SQL type ${rootCatalystType.sql}.", ise)
   }
 
-  def deserialize(data: Any): Option[Any] = converter(data)
-
   /**
    * Creates a writer to write avro values to Catalyst values at the given ordinal with the given
    * updater.

diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 450d9d73465c..dfa970f57314 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -65,17 +65,13 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
       LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_WRITE)))
   }
 
-  def serialize(catalystData: Any): Any = {
-    converter.apply(catalystData)
-  }
-
   private val dateRebaseFunc = createDateRebaseFuncInWrite(
     datetimeRebaseMode, "Avro")
 
   private val timestampRebaseFunc = createTimestampRebaseFuncInWrite(
     datetimeRebaseMode, "Avro")
 
-  private val converter: Any => Any = {
+  def serialize(catalystData: Any): Any = {
     val actualAvroType = resolveNullableType(rootAvroType, nullable)
     val baseConverter = try {
       rootCatalystType match {
@@ -93,14 +89,13 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
         s"Cannot convert SQL type ${rootCatalystType.sql} to Avro type $rootAvroType.", ise)
     }
     if (nullable) {
-      (data: Any) =>
-        if (data == null) {
-          null
-        } else {
-          baseConverter.apply(data)
-        }
+      if (catalystData == null) {
+        null
+      } else {
+        baseConverter.apply(catalystData)
+      }
     } else {
-      baseConverter
+      baseConverter.apply(catalystData)
     }
   }
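Note on error handling: the 3.2.x/3.3.x deserializer variants keep their try/catch, so after this patch the schema-incompatibility wrapping also runs inside each `deserialize` call. The shape of that wrapping, sketched with a local stand-in for the exception type used in the diff:

    object ErrorWrappingSketch {
      // Local stand-in for the IncompatibleSchemaException referenced above.
      final class IncompatibleSchemaException(msg: String, cause: Throwable = null)
        extends RuntimeException(msg, cause)

      // Wraps a conversion body and rethrows schema mismatches with the root
      // Avro and SQL types in the message, as the hunks above do.
      def deserialize(rootAvroType: String, rootCatalystSql: String)(
          body: => Option[Any]): Option[Any] =
        try body catch {
          case ise: IncompatibleSchemaException =>
            throw new IncompatibleSchemaException(
              s"Cannot convert Avro type $rootAvroType to SQL type $rootCatalystSql.", ise)
        }
    }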