diff --git a/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/Common.scala b/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/Common.scala
index 3520774a9c..dab3fddd9d 100644
--- a/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/Common.scala
+++ b/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/Common.scala
@@ -4,8 +4,8 @@ import cascading.flow.{ FlowStep, Flow, FlowStepStrategy }
 import com.twitter.algebird.Monoid
 import com.twitter.scalding.{ StringUtility, Config }
 import cascading.tap.{ Tap, CompositeTap }
-import cascading.tap.hadoop.Hfs
-import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 import org.slf4j.LoggerFactory
 
 import java.util.{ List => JList }
@@ -54,23 +54,12 @@ object Common {
   def unrollTaps(step: FlowStep[JobConf]): Seq[Tap[_, _, _]] =
     unrollTaps(step.getSources.asScala.toSeq)
 
-  /**
-   * Get the total size of the file(s) specified by the Hfs, which may contain a glob
-   * pattern in its path, so we must be ready to handle that case.
-   */
-  def size(f: Hfs, conf: JobConf): Long = {
-    val fs = f.getPath.getFileSystem(conf)
-    fs.globStatus(f.getPath)
-      .map{ s => fs.getContentSummary(s.getPath).getLength }
-      .sum
-  }
-
-  def inputSizes(step: FlowStep[JobConf]): Seq[(String, Long)] = {
+  def inputSizes(step: FlowStep[JobConf]): Seq[(Path, Long)] = {
     val conf = step.getConfig
-    unrollTaps(step).flatMap {
-      case tap: Hfs => Some(tap.toString -> size(tap, conf))
-      case _ => None
-    }
+
+    FileInputFormat
+      .getInputPaths(conf)
+      .map { path => path -> path.getFileSystem(conf).getContentSummary(path).getLength }
   }
 
   def totalInputSize(step: FlowStep[JobConf]): Long = inputSizes(step).map(_._2).sum
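
For reference, a minimal standalone sketch of the size computation the new inputSizes performs, assuming a JobConf whose input paths have already been configured (for example via FileInputFormat.setInputPaths). The object name InputSizeSketch is illustrative only and not part of the patch:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapred.{ FileInputFormat, JobConf }

    object InputSizeSketch {
      // Pair each configured input path with the total on-disk bytes beneath it.
      // getContentSummary recursively totals all files under a directory, so
      // directory inputs are measured without enumerating them by hand.
      def inputSizes(conf: JobConf): Seq[(Path, Long)] =
        FileInputFormat
          .getInputPaths(conf)
          .toSeq
          .map { path =>
            val fs = path.getFileSystem(conf)
            path -> fs.getContentSummary(path).getLength
          }

      // Total input bytes across all configured paths.
      def totalInputSize(conf: JobConf): Long = inputSizes(conf).map(_._2).sum
    }

One behavioral difference visible in the diff: the removed size(f: Hfs, conf) helper expanded glob patterns via fs.globStatus before measuring each match, whereas the new code passes the configured paths straight to getContentSummary, and it reads those paths from the job configuration rather than from the step's Hfs taps.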