Log a debug statement about files/directories that are ignored, and throw an exception only if all of the files are filtered out
Hirobe Keiichi committed Dec 27, 2018
1 parent c0a57d9 commit 1b64ffb
Showing 4 changed files with 111 additions and 23 deletions.
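For illustration only (not part of the commit): a minimal sketch of the user-visible behavior, assuming hypothetical local paths data/cars.csv and data/_cars.csv and a made-up object name. Files whose names begin with "_" or "." are dropped by the default path filter; after this change a mixed load only logs the ignored paths at DEBUG level, while a load in which every input file is filtered out fails with an AnalysisException.

import org.apache.spark.sql.{AnalysisException, SparkSession}

object IgnoredFilesDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("ignored-files-demo")
      .getOrCreate()

    // Mixed input: cars.csv is read normally; _cars.csv is filtered out and the
    // ignored path is reported only in a DEBUG log line.
    val mixed = spark.read
      .option("header", "false")
      .csv("data/cars.csv", "data/_cars.csv")
    mixed.show()

    // Every input filtered out: the read throws an AnalysisException that
    // lists the ignored files.
    try {
      spark.read.option("header", "false").csv("data/_cars.csv")
    } catch {
      case e: AnalysisException =>
        println(s"Read failed as expected: ${e.getMessage}")
    }

    spark.stop()
  }
}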
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -24,6 +24,7 @@ import scala.language.{existentials, implicitConversions}
import scala.util.{Failure, Success, Try}

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
@@ -542,7 +543,7 @@ case class DataSource(
checkFilesExist: Boolean): Seq[Path] = {
val allPaths = caseInsensitiveOptions.get("path") ++ paths
val hadoopConf = sparkSession.sessionState.newHadoopConf()
allPaths.flatMap { path =>
val allGlobPath = allPaths.flatMap { path =>
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(hadoopConf)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
@@ -559,6 +560,31 @@
}
globPath
}.toSeq

if (checkFilesExist) {
val (allLeafPath, ignoredPath) = {
val pathFilter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
val discovered = InMemoryFileIndex.bulkListLeafFiles(
allGlobPath, hadoopConf, pathFilter, sparkSession)
val paths = discovered.map { case (_, leaf, ignored) =>
(leaf.map(_.getPath), ignored.map(_.getPath))
}
(paths.flatMap(_._1), paths.flatMap(_._2))
}
if (ignoredPath.nonEmpty) {
if (allLeafPath.isEmpty) {
throw new AnalysisException(
"All files were ignored. The following files were ignored during file scan:\n" +
s"${ignoredPath.mkString("\n ")}")
} else {
logDebug(
"The following files were ignored during file scan:\n" +
s"${ignoredPath.mkString("\n ")}")
}
}
}

allGlobPath
}
}

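For context on the FileInputFormat.getInputPathFilter(new JobConf(...)) call introduced above: Hadoop keeps a user-registered PathFilter on the job configuration, and the same accessor returns it (or null when none has been registered, which the listing code handles). The sketch below is illustrative only; the NoTmpFilter class, the object name, and the paths are assumptions, not code from this commit.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

// Hypothetical filter that rejects temporary files ending in ".tmp".
class NoTmpFilter extends PathFilter {
  override def accept(path: Path): Boolean = !path.getName.endsWith(".tmp")
}

object PathFilterDemo {
  def main(args: Array[String]): Unit = {
    val jobConf = new JobConf(new Configuration(), getClass)

    // Register the filter on the job configuration ...
    FileInputFormat.setInputPathFilter(jobConf, classOf[NoTmpFilter])

    // ... and fetch it back the way DataSource and InMemoryFileIndex do.
    val filter: PathFilter = FileInputFormat.getInputPathFilter(jobConf)
    println(filter.accept(new Path("/data/part-00000")))     // true
    println(filter.accept(new Path("/data/part-00000.tmp"))) // false
  }
}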
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -125,7 +125,7 @@ class InMemoryFileIndex(
val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
val discovered = InMemoryFileIndex.bulkListLeafFiles(
pathsToFetch, hadoopConf, filter, sparkSession)
discovered.foreach { case (path, leafFiles) =>
discovered.foreach { case (path, leafFiles, _) =>
HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
fileStatusCache.putLeafFiles(path, leafFiles.toArray)
output ++= leafFiles
@@ -160,18 +160,20 @@ object InMemoryFileIndex extends Logging {
*
* This may only be called on the driver.
*
* @return for each input path, the set of discovered files for the path
* @return for each input path, the set of discovered files and ignored
* files/directories for the path
*/
private[sql] def bulkListLeafFiles(
paths: Seq[Path],
hadoopConf: Configuration,
filter: PathFilter,
sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
sparkSession: SparkSession): Seq[(Path, Seq[FileStatus], Seq[FileStatus])] = {

// Short-circuits parallel listing when serial listing is likely to be faster.
if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
return paths.map { path =>
(path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
val listed = listLeafFiles(path, hadoopConf, filter, Some(sparkSession))
(path, listed._1, listed._2)
}
}

@@ -204,9 +206,10 @@
.mapPartitions { pathStrings =>
val hadoopConf = serializableConfiguration.value
pathStrings.map(new Path(_)).toSeq.map { path =>
(path, listLeafFiles(path, hadoopConf, filter, None))
val listed = listLeafFiles(path, hadoopConf, filter, None)
(path, listed._1, listed._2)
}.iterator
}.map { case (path, statuses) =>
}.map { case (path, statuses, ignoredStatuses) =>
val serializableStatuses = statuses.map { status =>
// Turn FileStatus into SerializableFileStatus so we can send it back to the driver
val blockLocations = status match {
@@ -233,14 +236,14 @@
status.getAccessTime,
blockLocations)
}
(path.toString, serializableStatuses)
(path.toString, serializableStatuses, ignoredStatuses)
}.collect()
} finally {
sparkContext.setJobDescription(previousJobDescription)
}

// turn SerializableFileStatus back to Status
statusMap.map { case (path, serializableStatuses) =>
statusMap.map { case (path, serializableStatuses, ignoredStatuses) =>
val statuses = serializableStatuses.map { f =>
val blockLocations = f.blockLocations.map { loc =>
new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
@@ -251,7 +254,7 @@
new Path(f.path)),
blockLocations)
}
(new Path(path), statuses)
(new Path(path), statuses, ignoredStatuses)
}
}

@@ -267,7 +270,7 @@
path: Path,
hadoopConf: Configuration,
filter: PathFilter,
sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
sessionOpt: Option[SparkSession]): (Seq[FileStatus], Seq[FileStatus]) = {
logTrace(s"Listing $path")
val fs = path.getFileSystem(hadoopConf)

@@ -280,23 +283,32 @@
Array.empty[FileStatus]
}

val filteredStatuses = statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
val (filteredStatuses, ignoredStatuses) = statuses.partition(
status => !shouldFilterOut(status.getPath.getName))

val allLeafStatuses = {
val (allLeafStatuses, allIgnoredStatuses) = {
val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
val nestedFiles: Seq[FileStatus] = sessionOpt match {
val nested: (Seq[FileStatus], Seq[FileStatus]) = sessionOpt match {
case Some(session) =>
bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
val listed = bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session)
(listed.flatMap(_._2), listed.flatMap(_._3))
case _ =>
dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
val listed = dirs.map(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
(listed.flatMap(_._1), listed.flatMap(_._2))
}
val allFiles = topLevelFiles ++ nested._1
val ignoredFiles = ignoredStatuses ++ nested._2
if (filter != null) {
val (filtered, ignored) = allFiles.partition(f => filter.accept(f.getPath))
(filtered, ignoredFiles ++ ignored)
} else {
(allFiles, ignoredFiles)
}
val allFiles = topLevelFiles ++ nestedFiles
if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
}

val missingFiles = mutable.ArrayBuffer.empty[String]
val filteredLeafStatuses = allLeafStatuses.filterNot(
status => shouldFilterOut(status.getPath.getName))
val (filteredLeafStatuses, ignoredLeafStatuses) = allLeafStatuses.partition(
status => !shouldFilterOut(status.getPath.getName))

val resolvedLeafStatuses = filteredLeafStatuses.flatMap {
case f: LocatedFileStatus =>
Some(f)
@@ -341,7 +353,7 @@
s"the following files were missing during file scan:\n ${missingFiles.mkString("\n ")}")
}

resolvedLeafStatuses
(resolvedLeafStatuses, allIgnoredStatuses ++ ignoredLeafStatuses)
}

/** Checks if we should filter out this path name. */
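The shouldFilterOut check used throughout the listing code is a name-based rule: paths starting with "_" or "." are treated as non-data files, with exceptions for Parquet metadata files. The sketch below approximates that rule and shows the partition-into-(kept, ignored) pattern this change threads through listLeafFiles and bulkListLeafFiles; it is a simplified illustration with made-up file names, not the exact Spark implementation.

object FilterSketch {
  // Rough approximation of InMemoryFileIndex.shouldFilterOut; not the exact rule.
  def shouldFilterOut(name: String): Boolean = {
    val excluded = name.startsWith("_") || name.startsWith(".")
    val metadataException =
      name.startsWith("_metadata") || name.startsWith("_common_metadata")
    excluded && !metadataException
  }

  def main(args: Array[String]): Unit = {
    val names = Seq("part-00000.csv", "_cars.csv", ".hidden.csv", "_metadata")
    // partition keeps both sides, so ignored entries remain available for the
    // debug log and the all-files-ignored error introduced in DataSource.
    val (kept, ignored) = names.partition(n => !shouldFilterOut(n))
    println(s"kept:    ${kept.mkString(", ")}")    // part-00000.csv, _metadata
    println(s"ignored: ${ignored.mkString(", ")}") // _cars.csv, .hidden.csv
  }
}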
7 changes: 7 additions & 0 deletions sql/core/src/test/resources/test-data/_cars.csv
@@ -0,0 +1,7 @@

year,make,model,comment,blank
"2012","Tesla","S","No comment",

1997,Ford,E350,"Go get one now they are going fast",
2015,Chevy,Volt

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -30,7 +30,7 @@ import scala.util.Properties
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.log4j.{AppenderSkeleton, LogManager}
import org.apache.log4j.{AppenderSkeleton, Level, LogManager}
import org.apache.log4j.spi.LoggingEvent

import org.apache.spark.{SparkException, TestUtils}
@@ -53,6 +53,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
private val carsEmptyValueFile = "test-data/cars-empty-value.csv"
private val carsBlankColName = "test-data/cars-blank-column-name.csv"
private val carsCrlf = "test-data/cars-crlf.csv"
private val carsFilteredOutFile = "test-data/_cars.csv"
private val emptyFile = "test-data/empty.csv"
private val commentsFile = "test-data/comments.csv"
private val disableCommentsFile = "test-data/disable_comments.csv"
@@ -345,6 +346,48 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
assert(result.schema.fieldNames.size === 1)
}

test("SPARK-26339 Debug statement if some of the files are filtered out") {
class TestAppender extends AppenderSkeleton {
var events = new java.util.ArrayList[LoggingEvent]
override def close(): Unit = {}
override def requiresLayout: Boolean = false
protected def append(event: LoggingEvent): Unit = events.add(event)
}

val testAppender1 = new TestAppender
val rootLogger = LogManager.getRootLogger
val origLevel = rootLogger.getLevel
rootLogger.setLevel(Level.DEBUG)
rootLogger.addAppender(testAppender1)
try {
val cars = spark
.read
.format("csv")
.option("header", "false")
.load(testFile(carsFile), testFile(carsFilteredOutFile))

verifyCars(cars, withHeader = false, checkTypes = false)
} finally {
rootLogger.setLevel(origLevel)
rootLogger.removeAppender(testAppender1)
}
assert(testAppender1.events.asScala
.exists(msg => msg.getRenderedMessage.contains(
"The following files were ignored during file scan:")))
}

test("SPARK-26339 Throw an exception only if all of the files are filtered out") {
val e = intercept[AnalysisException] {
val cars = spark
.read
.format("csv")
.option("header", "false")
.load(testFile(carsFilteredOutFile))
}.getMessage
assert(e.contains("All files were ignored. " +
"The following files were ignored during file scan:"))
}

test("DDL test with empty file") {
withView("carsTable") {
spark.sql(
