[SPARK-46990][SQL] Fix loading empty Avro files emitted by event-hubs
### What changes were proposed in this pull request?

This PR fixes a regression introduced by [SPARK-46633](https://issues.apache.org/jira/browse/SPARK-46633) (commit 3a6b9ad): reading an empty Avro file left the reader stuck in an infinite loop.

I reverted the reader code to the pre-SPARK-46633 version and updated the handling of empty blocks. When an Avro file contains an empty block, `blockRemaining` can still read as 0: a call to `hasNext` loads the next block but still returns false because of its final check, `blockRemaining != 0`. Calling the method a second time picks up the next non-empty block, which seems to fix the issue.
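
To illustrate the behaviour described above, here is a minimal toy model. It is a sketch only: `ToyBlockReader` and its fields are hypothetical names, not Avro's actual reader classes, and it models only the assumption that `hasNext` loads the next block when `blockRemaining` is 0 and then returns `blockRemaining != 0`.

```scala
// Toy model (hypothetical, NOT Avro's real reader): each element of
// `blockSizes` is the number of records in one block of the file.
final class ToyBlockReader(blockSizes: List[Long]) {
  private var pending: List[Long] = blockSizes
  private var blockRemaining: Long = 0L

  // Loads the next block if the current one is exhausted, then reports
  // whether the block that is now current still has records.
  def hasNext: Boolean = {
    if (blockRemaining == 0 && pending.nonEmpty) {
      blockRemaining = pending.head
      pending = pending.tail
    }
    blockRemaining != 0
  }

  // Consumes one record from the current block.
  def next(): Unit = {
    require(hasNext, "no more records")
    blockRemaining -= 1
  }
}

object ToyBlockReaderDemo {
  def main(args: Array[String]): Unit = {
    // An empty block followed by a block holding two records.
    val reader = new ToyBlockReader(List(0L, 2L))
    val first = reader.hasNext  // loads the empty block, so this is false
    val second = reader.hasNext // loads the next block, so this is true
    println(s"first=$first, second=$second")

    // A file whose only block is empty: both calls return false.
    val onlyEmpty = new ToyBlockReader(List(0L))
    println(onlyEmpty.hasNext || onlyEmpty.hasNext)
  }
}
```

In this model a single `hasNext` call would wrongly report the end of data when it lands on an empty block, which is why the patched condition evaluates `hasNext || hasNext` before checking `pastSync`.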

### Why are the changes needed?

Fixes a regression introduced in [SPARK-46633](https://issues.apache.org/jira/browse/SPARK-46633).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I added a unit test to verify that empty files can be read correctly.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #45578 from sadikovi/SPARK-46990.

Authored-by: Ivan Sadikov <ivan.sadikov@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
sadikovi authored and cloud-fan committed Mar 20, 2024
1 parent 8762e25 commit 7626080
Showing 3 changed files with 18 additions and 6 deletions.
@@ -197,19 +197,21 @@ private[sql] object AvroUtils extends Logging {

     def hasNextRow: Boolean = {
       while (!completed && currentRow.isEmpty) {
-        if (fileReader.pastSync(stopPosition)) {
+        // In the case of empty blocks in an Avro file, `blockRemaining` could still read as 0 so
+        // `fileReader.hasNext()` returns false but advances the cursor to the next block, so we
+        // need to call `fileReader.hasNext()` again to correctly report if the next record
+        // exists.
+        val moreData =
+          (fileReader.hasNext || fileReader.hasNext) && !fileReader.pastSync(stopPosition)
+        if (!moreData) {
           fileReader.close()
           completed = true
           currentRow = None
-        } else if (fileReader.hasNext()) {
+        } else {
           val record = fileReader.next()
           // the row must be deserialized in hasNextRow, because AvroDeserializer#deserialize
           // potentially filters rows
           currentRow = deserializer.deserialize(record).asInstanceOf[Option[InternalRow]]
-        } else {
-          // In this case, `fileReader.hasNext()` returns false but we are not past sync point yet.
-          // This means empty blocks, we need to continue reading the file in case there are non
-          // empty blocks or we are past sync point.
         }
       }
       currentRow.isDefined
Binary file added connector/avro/src/test/resources/empty_file.avro
Binary file not shown.
@@ -2744,6 +2744,16 @@ abstract class AvroSuite
       }
     }
   }
+
+  test("SPARK-46990: read an empty file where pastSync returns false at EOF") {
+    for (maxPartitionBytes <- Seq(100, 100000, 100000000)) {
+      withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> s"$maxPartitionBytes") {
+        val file = getResourceAvroFilePath("empty_file.avro")
+        val df = spark.read.format("avro").load(file)
+        assert(df.count() == 0)
+      }
+    }
+  }
 }

 class AvroV1Suite extends AvroSuite {