Skip to content

Commit

Permalink
Addressed PR comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Nov 24, 2014
1 parent 513b608 commit c19dd8a
Showing 1 changed file with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils}
* This class represents an input stream that monitors a Hadoop-compatible filesystem for new
* files and creates a stream out of them. The way it works as follows.
*
* At each batch interval, the file system is queries for files in the given directory and
* At each batch interval, the file system is queried for files in the given directory and
* detected new files are selected for that batch. In this case "new" means files that
* became visible to readers during that time period. Some extra care is needed to deal
* with the fact that files may become visible after they are created. For this purpose, this
Expand All @@ -50,7 +50,7 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils}
*
* The trailing end of the window is the "ignore threshold" and all files whose mod times
* are less than this threshold are assumed to have already been selected and are therefore
* ignored. Files whose mode times are within the "remember window" are checked against files
* ignored. Files whose mod times are within the "remember window" are checked against files
* that have already been selected. At a high level, this is how new files are identified in
* each batch - files whose mod times are greater than the ignore threshold and
* have not been considered within the remember window. See the documentation on the method
Expand All @@ -61,8 +61,8 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils}
* the streaming app.
* - If a file is to be visible in the directory listings, it must be visible within a certain
* duration of the mod time of the file. This duration is the "remember window", which is set to
* 1 minute (see `FileInputDStream.REMEMBER_DURATION`). Otherwise, the file will not be
* selected as the mod time will be less than the ignore threshold when it become visible.
* 1 minute (see `FileInputDStream.MIN_REMEMBER_DURATION`). Otherwise, the file will never be
* selected as the mod time will be less than the ignore threshold when it becomes visible.
* - Once a file is visible, the mod time cannot change. If it does due to appends, then the
* processing semantics are undefined.
*/
Expand Down Expand Up @@ -316,15 +316,15 @@ object FileInputDStream {
* older than this "window" of remembering will be ignored. So if new files are visible
* within this window, then the file will get selected in the next batch.
*/
private val REMEMBER_DURATION = Minutes(1)
private val MIN_REMEMBER_DURATION = Minutes(1)

def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")

/**
* Calculate the number of last batches to remember, such that all the files selected in
* at least last REMEMBER_DURATION duration can be remembered.
* at least last MIN_REMEMBER_DURATION duration can be remembered.
*/
def calculateNumBatchesToRemember(batchDuration: Duration): Int = {
math.ceil(REMEMBER_DURATION.milliseconds.toDouble / batchDuration.milliseconds).toInt
math.ceil(MIN_REMEMBER_DURATION.milliseconds.toDouble / batchDuration.milliseconds).toInt
}
}

0 comments on commit c19dd8a

Please sign in to comment.