From f6db40d66619626ea99fcfe8593df0f05e957903 Mon Sep 17 00:00:00 2001
From: Ivan Sadikov
Date: Wed, 20 Mar 2024 23:40:41 +0800
Subject: [PATCH] [SPARK-46990][SQL] Fix loading empty Avro files emitted by event-hubs

### What changes were proposed in this pull request?

This PR fixes a regression introduced by [SPARK-46633](https://issues.apache.org/jira/browse/SPARK-46633) (commit https://github.com/apache/spark/commit/3a6b9adc21c25b01746cc31f3b75fe061a63204c), where one could not read an empty Avro file because the reader would get stuck in an infinite loop.

I reverted the reader code to the pre-SPARK-46633 version and updated the handling of empty blocks. When reading empty blocks in Avro, `blockRemaining` could still be read as 0. A call to `hasNext` would load the next block but would still return false because of the final check `blockRemaining != 0`. Calling the method again picks up the next non-empty block and fixes the issue.

### Why are the changes needed?

Fixes a regression introduced in [SPARK-46633](https://issues.apache.org/jira/browse/SPARK-46633).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I added a unit test to verify that empty files can be read correctly.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #45578 from sadikovi/SPARK-46990.

Authored-by: Ivan Sadikov
Signed-off-by: Wenchen Fan
---
 .../org/apache/spark/sql/avro/AvroUtils.scala      | 14 ++++++++------
 connector/avro/src/test/resources/empty_file.avro  | Bin 0 -> 508 bytes
 .../org/apache/spark/sql/avro/AvroSuite.scala      | 10 ++++++++++
 3 files changed, 18 insertions(+), 6 deletions(-)
 create mode 100644 connector/avro/src/test/resources/empty_file.avro

diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
index c1365d1b5ae1c..269a8b80c2b77 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
@@ -197,19 +197,21 @@ private[sql] object AvroUtils extends Logging {
 
     def hasNextRow: Boolean = {
       while (!completed && currentRow.isEmpty) {
-        if (fileReader.pastSync(stopPosition)) {
+        // In the case of empty blocks in an Avro file, `blockRemaining` could still read as 0 so
+        // `fileReader.hasNext()` returns false but advances the cursor to the next block, so we
+        // need to call `fileReader.hasNext()` again to correctly report if the next record
+        // exists.
+        val moreData =
+          (fileReader.hasNext || fileReader.hasNext) && !fileReader.pastSync(stopPosition)
+        if (!moreData) {
           fileReader.close()
           completed = true
           currentRow = None
-        } else if (fileReader.hasNext()) {
+        } else {
           val record = fileReader.next()
           // the row must be deserialized in hasNextRow, because AvroDeserializer#deserialize
           // potentially filters rows
           currentRow = deserializer.deserialize(record).asInstanceOf[Option[InternalRow]]
-        } else {
-          // In this case, `fileReader.hasNext()` returns false but we are not past sync point yet.
-          // This means empty blocks, we need to continue reading the file in case there are non
-          // empty blocks or we are past sync point.
         }
       }
       currentRow.isDefined
diff --git a/connector/avro/src/test/resources/empty_file.avro b/connector/avro/src/test/resources/empty_file.avro
new file mode 100644
index 0000000000000000000000000000000000000000..9c7e15352149b4fdd1039ed98f8e6fe81949ef36
GIT binary patch
literal 508
[base85-encoded payload of empty_file.avro was garbled during extraction and is omitted]

diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
[the index line, hunk header, and the opening lines of the new test (its name, the `for` loop over `maxPartitionBytes` values, and the start of the `withSQLConf(...)` call) were garbled during extraction; the recovered tail of the hunk follows, with "..." marking the lost beginning of a line]
+      ... s"$maxPartitionBytes") {
+        val file = getResourceAvroFilePath("empty_file.avro")
+        val df = spark.read.format("avro").load(file)
+        assert(df.count() == 0)
+      }
+    }
+  }
 }
 
 class AvroV1Suite extends AvroSuite {
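
For reference, the double `hasNext` pattern this patch relies on can be reproduced outside Spark with Avro's own `DataFileReader`. The sketch below is illustrative only and is not part of this patch: the object name and the input path `/tmp/empty_blocks.avro` are hypothetical, and it assumes a file that, like the event-hubs output described above, contains empty blocks.

```scala
import java.io.File

import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

object EmptyBlockReadSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical input: an Avro file that may contain empty blocks.
    val reader = DataFileReader.openReader(
      new File("/tmp/empty_blocks.avro"), new GenericDatumReader[GenericRecord]())
    var count = 0L
    try {
      // A single hasNext() can return false on an empty block even though more data follows,
      // because loading the block still leaves blockRemaining == 0; calling hasNext() a second
      // time moves past the empty block, which is the behaviour the fixed hasNextRow relies on.
      while (reader.hasNext || reader.hasNext) {
        reader.next()
        count += 1
      }
    } finally {
      reader.close()
    }
    println(s"records read: $count")
  }
}
```

Unlike the Spark reader, this sketch reads the whole file; the `pastSync(stopPosition)` check in `hasNextRow`, which bounds reading to the current file split, is omitted here for simplicity.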