Change to check only filename match, not check dir recursively
Hirobe Keiichi committed Dec 29, 2018
1 parent 1b64ffb commit a95637e
Showing 3 changed files with 33 additions and 53 deletions.
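In short, the commit replaces the recursive leaf-file listing that the earlier SPARK-26339 change added to DataSource (via InMemoryFileIndex.bulkListLeafFiles plus a JobConf-based path filter) with a check on the names of the user-specified, already-globbed paths only, and bulkListLeafFiles/listLeafFiles go back to returning just the discovered files. A minimal, self-contained sketch of the new check follows; it is not Spark's actual code: the object name is made up, IllegalArgumentException and println stand in for DataSource's AnalysisException and logDebug, and shouldFilterOut is passed in as a parameter (see the approximation after the InMemoryFileIndex.scala diff below).

import org.apache.hadoop.fs.Path

object FilenameCheckSketch {
  // `allGlobPath` plays the role of DataSource's already-globbed input paths;
  // `shouldFilterOut` stands in for InMemoryFileIndex.shouldFilterOut.
  def check(allGlobPath: Seq[Path], shouldFilterOut: String => Boolean): Unit = {
    // Only the name of each specified path is inspected; nothing is listed recursively here.
    val (filtered, filteredOut) = allGlobPath.partition(path => !shouldFilterOut(path.getName))
    if (filteredOut.nonEmpty) {
      if (filtered.isEmpty) {
        // DataSource throws AnalysisException with this message.
        throw new IllegalArgumentException(
          "All path were ignored. The following path were ignored:\n " +
            filteredOut.mkString("\n "))
      } else {
        // DataSource calls logDebug here; println keeps the sketch dependency-free.
        println("The following path were ignored:\n " + filteredOut.mkString("\n "))
      }
    }
  }
}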
DataSource.scala
@@ -24,7 +24,6 @@ import scala.language.{existentials, implicitConversions}
 import scala.util.{Failure, Success, Try}

 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
@@ -562,24 +561,18 @@ case class DataSource(
     }.toSeq

     if (checkFilesExist) {
-      val (allLeafPath, ignoredPath) = {
-        val pathFilter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
-        val discovered = InMemoryFileIndex.bulkListLeafFiles(
-          allGlobPath, hadoopConf, pathFilter, sparkSession)
-        val paths = discovered.map { case (_, leaf, ignored) =>
-          (leaf.map(_.getPath), ignored.map(_.getPath))
-        }
-        (paths.flatMap(_._1), paths.flatMap(_._2))
+      val (filtered, filteredOut) = allGlobPath.partition { path =>
+        !InMemoryFileIndex.shouldFilterOut(path.getName)
       }
-      if (ignoredPath.nonEmpty) {
-        if (allLeafPath.isEmpty) {
+      if (filteredOut.nonEmpty) {
+        if (filtered.isEmpty) {
           throw new AnalysisException(
-            "All files were ignored. The following files were ignored during file scan:\n" +
-              s"${ignoredPath.mkString("\n ")}")
+            "All path were ignored. The following path were ignored:\n" +
+              s"${filteredOut.mkString("\n ")}")
         } else {
           logDebug(
-            "The following files were ignored during file scan:\n" +
-              s"${ignoredPath.mkString("\n ")}")
+            "The following path were ignored:\n" +
+              s"${filteredOut.mkString("\n ")}")
         }
       }
     }
InMemoryFileIndex.scala
@@ -125,7 +125,7 @@ class InMemoryFileIndex(
     val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
     val discovered = InMemoryFileIndex.bulkListLeafFiles(
       pathsToFetch, hadoopConf, filter, sparkSession)
-    discovered.foreach { case (path, leafFiles, _) =>
+    discovered.foreach { case (path, leafFiles) =>
       HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
       fileStatusCache.putLeafFiles(path, leafFiles.toArray)
       output ++= leafFiles
@@ -160,20 +160,18 @@ object InMemoryFileIndex extends Logging {
    *
    * This may only be called on the driver.
    *
-   * @return for each input path, the set of discovered files and ignored
-   *         files/directories for the path
+   * @return for each input path, the set of discovered files for the path
    */
   private[sql] def bulkListLeafFiles(
       paths: Seq[Path],
       hadoopConf: Configuration,
       filter: PathFilter,
-      sparkSession: SparkSession): Seq[(Path, Seq[FileStatus], Seq[FileStatus])] = {
+      sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {

     // Short-circuits parallel listing when serial listing is likely to be faster.
     if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
       return paths.map { path =>
-        val listed = listLeafFiles(path, hadoopConf, filter, Some(sparkSession))
-        (path, listed._1, listed._2)
+        (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
       }
     }

@@ -206,10 +204,9 @@
         .mapPartitions { pathStrings =>
           val hadoopConf = serializableConfiguration.value
           pathStrings.map(new Path(_)).toSeq.map { path =>
-            val listed = listLeafFiles(path, hadoopConf, filter, None)
-            (path, listed._1, listed._2)
+            (path, listLeafFiles(path, hadoopConf, filter, None))
           }.iterator
-        }.map { case (path, statuses, ignoredStatuses) =>
+        }.map { case (path, statuses) =>
           val serializableStatuses = statuses.map { status =>
             // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
             val blockLocations = status match {
@@ -236,14 +233,14 @@
             status.getAccessTime,
             blockLocations)
           }
-          (path.toString, serializableStatuses, ignoredStatuses)
+          (path.toString, serializableStatuses)
         }.collect()
     } finally {
       sparkContext.setJobDescription(previousJobDescription)
     }

     // turn SerializableFileStatus back to Status
-    statusMap.map { case (path, serializableStatuses, ignoredStatuses) =>
+    statusMap.map { case (path, serializableStatuses) =>
       val statuses = serializableStatuses.map { f =>
         val blockLocations = f.blockLocations.map { loc =>
           new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
@@ -254,7 +251,7 @@
             new Path(f.path)),
           blockLocations)
       }
-      (new Path(path), statuses, ignoredStatuses)
+      (new Path(path), statuses)
     }
   }

@@ -270,7 +267,7 @@
       path: Path,
       hadoopConf: Configuration,
       filter: PathFilter,
-      sessionOpt: Option[SparkSession]): (Seq[FileStatus], Seq[FileStatus]) = {
+      sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
     logTrace(s"Listing $path")
     val fs = path.getFileSystem(hadoopConf)

@@ -283,32 +280,23 @@
         Array.empty[FileStatus]
     }

-    val (filteredStatuses, ignoredStatuses) = statuses.partition(
-      status => !shouldFilterOut(status.getPath.getName))
+    val filteredStatuses = statuses.filterNot(status => shouldFilterOut(status.getPath.getName))

-    val (allLeafStatuses, allIgnoredStatuses) = {
+    val allLeafStatuses = {
       val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
-      val nested: (Seq[FileStatus], Seq[FileStatus]) = sessionOpt match {
+      val nestedFiles: Seq[FileStatus] = sessionOpt match {
         case Some(session) =>
-          val listed = bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session)
-          (listed.flatMap(_._2), listed.flatMap(_._3))
+          bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
         case _ =>
-          val listed = dirs.map(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
-          (listed.flatMap(_._1), listed.flatMap(_._2))
-      }
-      val allFiles = topLevelFiles ++ nested._1
-      val ignoredFiles = ignoredStatuses ++ nested._2
-      if (filter != null) {
-        val (filtered, ignored) = allFiles.partition(f => filter.accept(f.getPath))
-        (filtered, ignoredFiles ++ ignored)
-      } else {
-        (allFiles, ignoredFiles)
+          dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
       }
+      val allFiles = topLevelFiles ++ nestedFiles
+      if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
     }
-    val missingFiles = mutable.ArrayBuffer.empty[String]
-    val (filteredLeafStatuses, ignoredLeafStatuses) = allLeafStatuses.partition(
-      status => !shouldFilterOut(status.getPath.getName))

+    val missingFiles = mutable.ArrayBuffer.empty[String]
+    val filteredLeafStatuses = allLeafStatuses.filterNot(
+      status => shouldFilterOut(status.getPath.getName))
     val resolvedLeafStatuses = filteredLeafStatuses.flatMap {
       case f: LocatedFileStatus =>
         Some(f)
@@ -353,7 +341,7 @@
         s"the following files were missing during file scan:\n ${missingFiles.mkString("\n ")}")
     }

-    (resolvedLeafStatuses, allIgnoredStatuses ++ ignoredLeafStatuses)
+    resolvedLeafStatuses
   }

   /** Checks if we should filter out this path name. */
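Both the new DataSource check and listLeafFiles lean on InMemoryFileIndex.shouldFilterOut, whose body is collapsed in this view (only its doc comment appears above). As a rough, hypothetical approximation of the name-based rule it applies, not the exact Spark implementation, the idea looks like this:

object ShouldFilterOutSketch {
  // Hypothetical approximation of InMemoryFileIndex.shouldFilterOut; the real
  // implementation may differ in its exact exceptions.
  def shouldFilterOut(pathName: String): Boolean = {
    // Skip hidden/underscore-prefixed entries and in-flight copies ("._COPYING_"),
    // but keep Parquet summary metadata files.
    val excluded = pathName.startsWith("_") || pathName.startsWith(".") ||
      pathName.endsWith("._COPYING_")
    val kept = pathName.startsWith("_metadata") || pathName.startsWith("_common_metadata")
    excluded && !kept
  }

  def main(args: Array[String]): Unit = {
    // Under this approximation: "cars.csv" is read, "_cars.csv" and ".cars.csv.crc"
    // are ignored, and "_metadata" is kept.
    Seq("cars.csv", "_cars.csv", ".cars.csv.crc", "_metadata").foreach { name =>
      println(s"$name -> filtered out: ${shouldFilterOut(name)}")
    }
  }
}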
CSVSuite.scala
@@ -346,7 +346,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     assert(result.schema.fieldNames.size === 1)
   }

-  test("SPARK-26339 Debug statement if some of the files are filtered out") {
+  test("SPARK-26339 Debug statement if some of specified paths are filtered out") {
     class TestAppender extends AppenderSkeleton {
       var events = new java.util.ArrayList[LoggingEvent]
       override def close(): Unit = {}
@@ -373,19 +373,18 @@
     }
     assert(testAppender1.events.asScala
       .exists(msg => msg.getRenderedMessage.contains(
-        "The following files were ignored during file scan:")))
+        "The following path were ignored:")))
   }

-  test("SPARK-26339 Throw an exception only if all of the files are filtered out") {
+  test("SPARK-26339 Throw an exception only if all of the specified paths are filtered out") {
     val e = intercept[AnalysisException] {
       val cars = spark
         .read
         .format("csv")
         .option("header", "false")
         .load(testFile(carsFilteredOutFile))
     }.getMessage
-    assert(e.contains("All files were ignored. " +
-      "The following files were ignored during file scan:"))
+    assert(e.contains("All path were ignored. The following path were ignored:"))
   }

   test("DDL test with empty file") {
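For illustration, here is a hypothetical, self-contained run of the behaviour these tests assert; the /tmp/_cars.csv path is made up, and only the expected message text comes from the DataSource.scala diff above.

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.{AnalysisException, SparkSession}

object IgnoredPathsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]").appName("ignored-paths-demo").getOrCreate()

    // A CSV whose name starts with "_" is filtered out purely by name.
    val ignored = Paths.get("/tmp/_cars.csv")
    Files.write(ignored, "2012,Tesla,S\n".getBytes(StandardCharsets.UTF_8))

    try {
      // Only filtered-out paths are specified, so this is expected to fail with
      // "All path were ignored. The following path were ignored: ...".
      spark.read.format("csv").option("header", "false").load(ignored.toString)
    } catch {
      case e: AnalysisException => println(e.getMessage)
    } finally {
      spark.stop()
    }
  }
}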
