From 6e3ad9661a69c6cc926915a535adc791ecbcd384 Mon Sep 17 00:00:00 2001 From: Anton Panasenko Date: Wed, 22 Feb 2017 16:16:19 -0800 Subject: [PATCH] Using MAP_OUTPUT_BYTES instead of FILE_BYTES_READ in RatioBasedEstimator --- .../scalding/reducer_estimation/Common.scala | 1 + .../RatioBasedEstimator.scala | 3 ++- .../RatioBasedEstimatorTest.scala | 9 ++++--- .../HRavenHistoryService.scala | 26 ++++++++++++++----- 4 files changed, 28 insertions(+), 11 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/Common.scala b/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/Common.scala index 3520774a9c..0d205d379c 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/Common.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/Common.scala @@ -253,6 +253,7 @@ final case class FlowStepHistory(keys: FlowStepKeys, failedReduces: Long, mapFileBytesRead: Long, mapFileBytesWritten: Long, + mapOutputBytes: Long, reduceFileBytesRead: Long, hdfsBytesRead: Long, hdfsBytesWritten: Long, diff --git a/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/RatioBasedEstimator.scala b/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/RatioBasedEstimator.scala index 9cc52ec9ff..a895257472 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/RatioBasedEstimator.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/RatioBasedEstimator.scala @@ -60,8 +60,9 @@ abstract class RatioBasedEstimator extends ReducerEstimator { } else { val ratios = for { h <- history + if h.mapOutputBytes > 0 if acceptableInputRatio(inputBytes, h.hdfsBytesRead, threshold) - } yield h.reduceFileBytesRead / h.hdfsBytesRead.toDouble + } yield h.mapOutputBytes / h.hdfsBytesRead.toDouble if (ratios.isEmpty) { LOG.warn(s"No matching history found within input ratio threshold: $threshold") diff --git 
a/scalding-hadoop-test/src/test/scala/com/twitter/scalding/reducer_estimation/RatioBasedEstimatorTest.scala b/scalding-hadoop-test/src/test/scala/com/twitter/scalding/reducer_estimation/RatioBasedEstimatorTest.scala index 5a9cadc1a8..1ee2d281f4 100644 --- a/scalding-hadoop-test/src/test/scala/com/twitter/scalding/reducer_estimation/RatioBasedEstimatorTest.scala +++ b/scalding-hadoop-test/src/test/scala/com/twitter/scalding/reducer_estimation/RatioBasedEstimatorTest.scala @@ -31,10 +31,10 @@ object ErrorHistoryService extends HistoryService { object HistoryServiceWithData { // we only care about these two input size fields for RatioBasedEstimator - def makeHistory(inputHdfsBytesRead: Long, inputHdfsReduceFileBytesRead: Long): FlowStepHistory = - makeHistory(inputHdfsBytesRead, inputHdfsReduceFileBytesRead, Seq()) + def makeHistory(inputHdfsBytesRead: Long, mapOutputBytes: Long): FlowStepHistory = + makeHistory(inputHdfsBytesRead, mapOutputBytes, Seq()) - def makeHistory(inputHdfsBytesRead: Long, inputHdfsReduceFileBytesRead: Long, taskRuntimes: Seq[Long]): FlowStepHistory = { + def makeHistory(inputHdfsBytesRead: Long, mapOutputBytes: Long, taskRuntimes: Seq[Long]): FlowStepHistory = { val random = new scala.util.Random(123) val tasks = taskRuntimes.map { time => val startTime = random.nextLong @@ -58,7 +58,8 @@ object HistoryServiceWithData { failedReduces = 0L, mapFileBytesRead = 0L, mapFileBytesWritten = 0L, - reduceFileBytesRead = inputHdfsReduceFileBytesRead, + mapOutputBytes = mapOutputBytes, + reduceFileBytesRead = 0L, hdfsBytesRead = inputHdfsBytesRead, hdfsBytesWritten = 0L, mapperTimeMillis = 0L, diff --git a/scalding-hraven/src/main/scala/com/twitter/scalding/hraven/reducer_estimation/HRavenHistoryService.scala b/scalding-hraven/src/main/scala/com/twitter/scalding/hraven/reducer_estimation/HRavenHistoryService.scala index 0e7b0ffa68..1d3678f652 100644 --- 
a/scalding-hraven/src/main/scala/com/twitter/scalding/hraven/reducer_estimation/HRavenHistoryService.scala +++ b/scalding-hraven/src/main/scala/com/twitter/scalding/hraven/reducer_estimation/HRavenHistoryService.scala @@ -1,17 +1,15 @@ package com.twitter.scalding.hraven.reducer_estimation import java.io.IOException - import cascading.flow.FlowStep -import com.twitter.hraven.{ Flow, JobDetails } +import com.twitter.hraven.{Constants, CounterMap, Flow, HistoryFileType, JobDetails} import com.twitter.hraven.rest.client.HRavenRestClient import com.twitter.scalding.reducer_estimation._ import org.apache.hadoop.mapred.JobConf import org.slf4j.LoggerFactory import scala.collection.JavaConverters._ -import com.twitter.hraven.JobDescFactory.{ JOBTRACKER_KEY, RESOURCE_MANAGER_KEY } - -import scala.util.{ Failure, Success, Try } +import com.twitter.hraven.JobDescFactory.{JOBTRACKER_KEY, RESOURCE_MANAGER_KEY} +import scala.util.{Failure, Success, Try} object HRavenClient { import HRavenHistoryService.jobConfToRichConfig @@ -46,6 +44,7 @@ object HRavenHistoryService extends HistoryService { "status", "startTime", "finishTime").asJava + private val MapOutputBytesKey = "MAP_OUTPUT_BYTES" val RequiredJobConfigs = Seq("cascading.flow.step.num") @@ -179,7 +178,7 @@ object HRavenHistoryService extends HistoryService { } yield toFlowStepHistory(keys, step, tasks) } - private def toFlowStepHistory(keys: FlowStepKeys, step: JobDetails, tasks: Seq[Task]) = + private def toFlowStepHistory(keys: FlowStepKeys, step: JobDetails, tasks: Seq[Task]) = { FlowStepHistory( keys = keys, submitTime = step.getSubmitTime, @@ -193,6 +192,7 @@ object HRavenHistoryService extends HistoryService { failedReduces = step.getFailedReduces, mapFileBytesRead = step.getMapFileBytesRead, mapFileBytesWritten = step.getMapFileBytesWritten, + mapOutputBytes = mapOutputBytes(step), reduceFileBytesRead = step.getReduceFileBytesRead, hdfsBytesRead = step.getHdfsBytesRead, hdfsBytesWritten = step.getHdfsBytesWritten, 
@@ -201,6 +201,20 @@ object HRavenHistoryService extends HistoryService { reduceShuffleBytes = step.getReduceShuffleBytes, cost = 0, tasks = tasks) + } + + private def mapOutputBytes(step: JobDetails): Long = { + if (step.getHistoryFileType == HistoryFileType.TWO) { + getCounterValueAsLong(step.getMapCounters, Constants.TASK_COUNTER_HADOOP2, MapOutputBytesKey) + } else { + getCounterValueAsLong(step.getMapCounters, Constants.TASK_COUNTER, MapOutputBytesKey) + } + } + + private def getCounterValueAsLong(counters: CounterMap, counterGroupName: String, counterName: String): Long = { + val counter = counters.getCounter(counterGroupName, counterName) + if (counter != null) counter.getValue else 0L + } } class HRavenRatioBasedEstimator extends RatioBasedEstimator {