From 5e41f8997ccd3b2c5c2985bff32452198780f1d2 Mon Sep 17 00:00:00 2001
From: Anton Panasenko
Date: Wed, 22 Feb 2017 13:34:02 -0800
Subject: [PATCH] Checking size in tap first and then calculate by ourselves

---
 .../twitter/scalding/commons/tap/VersionedTap.java | 14 ++++++++++++++
 .../scalding/reducer_estimation/Common.scala       | 16 ++++++++++++----
 2 files changed, 26 insertions(+), 4 deletions(-)

diff --git a/scalding-commons/src/main/java/com/twitter/scalding/commons/tap/VersionedTap.java b/scalding-commons/src/main/java/com/twitter/scalding/commons/tap/VersionedTap.java
index 63fad4d53c..6c13b6e925 100644
--- a/scalding-commons/src/main/java/com/twitter/scalding/commons/tap/VersionedTap.java
+++ b/scalding-commons/src/main/java/com/twitter/scalding/commons/tap/VersionedTap.java
@@ -4,6 +4,7 @@
 
 import com.twitter.scalding.commons.datastores.VersionedStore;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileInputFormat;
@@ -90,6 +91,19 @@ public String getSinkPath(JobConf conf) {
     }
   }
 
+  /**
+   * Reports this tap's size as the content-summary length of the path
+   * returned by {@code getSourcePath(conf)}.
+   */
+  @Override
+  public long getSize(JobConf conf) throws IOException {
+    Path path = new Path(getSourcePath(conf));
+
+    return getFileSystem(conf)
+        .getContentSummary(path)
+        .getLength();
+  }
+
   @Override
   public void sourceConfInit(FlowProcess process, JobConf conf) {
     super.sourceConfInit(process, conf);
diff --git a/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/Common.scala b/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/Common.scala
index 3520774a9c..ff309facf7 100644
--- a/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/Common.scala
+++ b/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/Common.scala
@@ -59,10 +59,18 @@ object Common {
    * pattern in its path, so we must be ready to handle that case.
    */
   def size(f: Hfs, conf: JobConf): Long = {
-    val fs = f.getPath.getFileSystem(conf)
-    fs.globStatus(f.getPath)
-      .map{ s => fs.getContentSummary(s.getPath).getLength }
-      .sum
+    // Ask the tap for its own size first; when it reports nothing useful
+    // (<= 0), compute the size ourselves from the (possibly globbed) path.
+    val sizeInBytes = f.getSize(conf)
+    if (sizeInBytes > 0) {
+      sizeInBytes
+    } else {
+      val fs = f.getPath.getFileSystem(conf)
+      // globStatus may return null (not an empty array) for a non-glob path
+      // that does not exist, so wrap it and count a missing path as 0 bytes.
+      Option(fs.globStatus(f.getPath))
+        .fold(0L)(_.map { s => fs.getContentSummary(s.getPath).getLength }.sum)
+    }
   }
 
   def inputSizes(step: FlowStep[JobConf]): Seq[(String, Long)] = {