
[SPARK-23390][SQL] Flaky Test Suite: FileBasedDataSourceSuite in Spark 2.3/hadoop 2.7

## What changes were proposed in this pull request?

This test only fails with sbt on Hadoop 2.7. I can't reproduce it locally, but here is my speculation from reading the code:
1. `FileSystem.delete` doesn't delete the directory entirely; somehow we can still open the file as a 0-length empty file (just speculation).
2. ORC intentionally allows empty files, and the reader fails during reading without closing the file stream.

This PR improves the test to make sure all files are deleted and can't be opened.
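
As a rough sketch of that hardening (illustration only; the actual change is in the diff below), the idea is to delete every data file individually and then prove each one can no longer be opened. The helper name `assertFilesGone` is made up here, and the sketch assumes you already have a Hadoop `FileSystem` and a directory populated with data files:

```scala
import java.io.FileNotFoundException

import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper: delete each data file, then the directory, and verify
// that every file is really gone by attempting to reopen it.
def assertFilesGone(fs: FileSystem, dir: Path): Unit = {
  // Snapshot the data files before deleting anything.
  val files = fs.listStatus(dir).filter(_.isFile).map(_.getPath)
  // Delete each file non-recursively, then remove the directory recursively.
  files.foreach(f => fs.delete(f, false))
  assert(fs.delete(dir, true), s"failed to delete $dir")
  // Opening a deleted file must fail; a lingering 0-length file would be caught here.
  files.foreach { f =>
    val stillReadable =
      try { fs.open(f).close(); true }
      catch { case _: FileNotFoundException => false }
    assert(!stillReadable, s"expected $f to be deleted, but it can still be opened")
  }
}
```

Asserting on each individual file, rather than only on the recursive directory delete, is what rules out speculation (1) above.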

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20584 from cloud-fan/flaky-test.
cloud-fan authored and sameeragarwal committed Feb 12, 2018
1 parent c0c902a commit 6efd5d1
Showing 1 changed file with 13 additions and 1 deletion.
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import java.io.FileNotFoundException
+
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
@@ -102,17 +104,27 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext {
   def testIgnoreMissingFiles(): Unit = {
     withTempDir { dir =>
       val basePath = dir.getCanonicalPath
+
       Seq("0").toDF("a").write.format(format).save(new Path(basePath, "first").toString)
       Seq("1").toDF("a").write.format(format).save(new Path(basePath, "second").toString)
+
       val thirdPath = new Path(basePath, "third")
+      val fs = thirdPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
       Seq("2").toDF("a").write.format(format).save(thirdPath.toString)
+      val files = fs.listStatus(thirdPath).filter(_.isFile).map(_.getPath)
+
       val df = spark.read.format(format).load(
         new Path(basePath, "first").toString,
         new Path(basePath, "second").toString,
         new Path(basePath, "third").toString)
 
-      val fs = thirdPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
+      // Make sure all data files are deleted and can't be opened.
+      files.foreach(f => fs.delete(f, false))
       assert(fs.delete(thirdPath, true))
+      for (f <- files) {
+        intercept[FileNotFoundException](fs.open(f))
+      }
+
       checkAnswer(df, Seq(Row("0"), Row("1")))
     }
   }
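
A note on the assertion style used in the new code: `intercept` comes from ScalaTest (available in Spark's test suites through `SparkFunSuite`); it fails the test if the block does not throw the expected exception type, and otherwise returns the caught exception. A minimal standalone sketch, with a made-up suite name:

```scala
import java.io.FileNotFoundException

import org.scalatest.FunSuite

// Hypothetical suite showing intercept's behavior: it runs the block, fails the
// test if nothing (or the wrong exception type) is thrown, and otherwise
// returns the caught exception for further assertions.
class InterceptSketchSuite extends FunSuite {
  test("intercept returns the expected exception") {
    val e = intercept[FileNotFoundException] {
      throw new FileNotFoundException("no such file")
    }
    assert(e.getMessage.contains("no such file"))
  }
}
```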
