Using MAP_OUTPUT_BYTES instead of FILE_BYTES_READ in RatioBasedEstimator
dieu committed Feb 24, 2017
1 parent 6f11ced commit 6e3ad96
Showing 4 changed files with 28 additions and 11 deletions.
File 1 of 4

@@ -253,6 +253,7 @@ final case class FlowStepHistory(keys: FlowStepKeys,
   failedReduces: Long,
   mapFileBytesRead: Long,
   mapFileBytesWritten: Long,
+  mapOutputBytes: Long,
   reduceFileBytesRead: Long,
   hdfsBytesRead: Long,
   hdfsBytesWritten: Long,
File 2 of 4

@@ -60,8 +60,9 @@ abstract class RatioBasedEstimator extends ReducerEstimator {
     } else {
       val ratios = for {
         h <- history
+        if h.mapOutputBytes > 0
         if acceptableInputRatio(inputBytes, h.hdfsBytesRead, threshold)
-      } yield h.reduceFileBytesRead / h.hdfsBytesRead.toDouble
+      } yield h.mapOutputBytes / h.hdfsBytesRead.toDouble

       if (ratios.isEmpty) {
         LOG.warn(s"No matching history found within input ratio threshold: $threshold")
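The yield is the heart of the change: each accepted history step now contributes mapOutputBytes / hdfsBytesRead, i.e. bytes emitted by the mappers per byte of HDFS input, instead of reduceFileBytesRead / hdfsBytesRead, whose local spill-file reads can be zero (shuffle fits in memory) or inflated by extra merge passes. The new `if h.mapOutputBytes > 0` guard drops steps whose history lacks the counter. A minimal sketch of how such ratios typically feed a reducer estimate, with hypothetical inputBytes and bytesPerReducer parameters (illustrative only, not scalding's exact estimator code):

// Sketch only: mirrors the ratio math above, not scalding's actual API.
object RatioEstimateSketch {
  // ratios: one mapOutputBytes / hdfsBytesRead value per accepted history step
  def estimateReducers(inputBytes: Long, ratios: Seq[Double], bytesPerReducer: Long): Option[Int] =
    if (ratios.isEmpty) None // no usable history: mirrors the LOG.warn branch above
    else {
      val avgRatio = ratios.sum / ratios.size         // mean map-output ratio
      val expectedReduceInput = inputBytes * avgRatio // predicted bytes entering the reduce phase
      Some(math.max(1, math.ceil(expectedReduceInput / bytesPerReducer).toInt))
    }
}

With history showing mappers emitting twice their input (avgRatio = 2.0), a 10 GB step at 1 GB per reducer would be estimated at 20 reducers.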
File 3 of 4

@@ -31,10 +31,10 @@ object ErrorHistoryService extends HistoryService {
 object HistoryServiceWithData {

   // we only care about these two input size fields for RatioBasedEstimator
-  def makeHistory(inputHdfsBytesRead: Long, inputHdfsReduceFileBytesRead: Long): FlowStepHistory =
-    makeHistory(inputHdfsBytesRead, inputHdfsReduceFileBytesRead, Seq())
+  def makeHistory(inputHdfsBytesRead: Long, mapOutputBytes: Long): FlowStepHistory =
+    makeHistory(inputHdfsBytesRead, mapOutputBytes, Seq())

-  def makeHistory(inputHdfsBytesRead: Long, inputHdfsReduceFileBytesRead: Long, taskRuntimes: Seq[Long]): FlowStepHistory = {
+  def makeHistory(inputHdfsBytesRead: Long, mapOutputBytes: Long, taskRuntimes: Seq[Long]): FlowStepHistory = {
     val random = new scala.util.Random(123)
     val tasks = taskRuntimes.map { time =>
       val startTime = random.nextLong
@@ -58,7 +58,8 @@ object HistoryServiceWithData {
       failedReduces = 0L,
       mapFileBytesRead = 0L,
       mapFileBytesWritten = 0L,
-      reduceFileBytesRead = inputHdfsReduceFileBytesRead,
+      mapOutputBytes = mapOutputBytes,
+      reduceFileBytesRead = 0L,
       hdfsBytesRead = inputHdfsBytesRead,
       hdfsBytesWritten = 0L,
       mapperTimeMillis = 0L,
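With the rename, a test can state the map-output ratio directly. A hypothetical call, values chosen only for illustration:

// Two steps whose mappers emitted 2x their HDFS input; a history like
// this should lead the estimator to infer a ratio of 2.0.
val history = Seq(
  makeHistory(inputHdfsBytesRead = 1000L, mapOutputBytes = 2000L),
  makeHistory(inputHdfsBytesRead = 4000L, mapOutputBytes = 8000L))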
File 4 of 4

@@ -1,17 +1,15 @@
 package com.twitter.scalding.hraven.reducer_estimation

 import java.io.IOException

 import cascading.flow.FlowStep
-import com.twitter.hraven.{ Flow, JobDetails }
+import com.twitter.hraven.{Constants, CounterMap, Flow, HistoryFileType, JobDetails}
 import com.twitter.hraven.rest.client.HRavenRestClient
 import com.twitter.scalding.reducer_estimation._
 import org.apache.hadoop.mapred.JobConf
 import org.slf4j.LoggerFactory
 import scala.collection.JavaConverters._
-import com.twitter.hraven.JobDescFactory.{ JOBTRACKER_KEY, RESOURCE_MANAGER_KEY }
-
-import scala.util.{ Failure, Success, Try }
+import com.twitter.hraven.JobDescFactory.{JOBTRACKER_KEY, RESOURCE_MANAGER_KEY}
+import scala.util.{Failure, Success, Try}

 object HRavenClient {
   import HRavenHistoryService.jobConfToRichConfig
@@ -46,6 +44,7 @@ object HRavenHistoryService extends HistoryService {
     "status",
     "startTime",
     "finishTime").asJava
+  private val MapOutputBytesKey = "MAP_OUTPUT_BYTES"

   val RequiredJobConfigs = Seq("cascading.flow.step.num")

@@ -179,7 +178,7 @@ object HRavenHistoryService extends HistoryService {
     } yield toFlowStepHistory(keys, step, tasks)
   }

-  private def toFlowStepHistory(keys: FlowStepKeys, step: JobDetails, tasks: Seq[Task]) =
+  private def toFlowStepHistory(keys: FlowStepKeys, step: JobDetails, tasks: Seq[Task]) = {
     FlowStepHistory(
       keys = keys,
       submitTime = step.getSubmitTime,
@@ -193,6 +192,7 @@
       failedReduces = step.getFailedReduces,
       mapFileBytesRead = step.getMapFileBytesRead,
       mapFileBytesWritten = step.getMapFileBytesWritten,
+      mapOutputBytes = mapOutputBytes(step),
       reduceFileBytesRead = step.getReduceFileBytesRead,
       hdfsBytesRead = step.getHdfsBytesRead,
       hdfsBytesWritten = step.getHdfsBytesWritten,
@@ -201,6 +201,20 @@
       reduceShuffleBytes = step.getReduceShuffleBytes,
       cost = 0,
       tasks = tasks)
+  }
+
+  private def mapOutputBytes(step: JobDetails): Long = {
+    if (step.getHistoryFileType == HistoryFileType.TWO) {
+      getCounterValueAsLong(step.getMapCounters, Constants.TASK_COUNTER_HADOOP2, MapOutputBytesKey)
+    } else {
+      getCounterValueAsLong(step.getMapCounters, Constants.TASK_COUNTER, MapOutputBytesKey)
+    }
+  }
+
+  private def getCounterValueAsLong(counters: CounterMap, counterGroupName: String, counterName: String): Long = {
+    val counter = counters.getCounter(counterGroupName, counterName)
+    if (counter != null) counter.getValue else 0L
+  }
 }

 class HRavenRatioBasedEstimator extends RatioBasedEstimator {
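getCounterValueAsLong returns 0L when the MAP_OUTPUT_BYTES counter is absent (e.g. history written before the counter was collected), and the `if h.mapOutputBytes > 0` guard in RatioBasedEstimator then drops those steps instead of letting zero ratios drag the average down. A self-contained sketch of that fallback path, using stub types in place of hraven's Counter and CounterMap (not the real hraven classes):

// Stubs standing in for com.twitter.hraven's Counter/CounterMap, only to
// exercise the null fallback.
final case class Counter(getValue: Long)
final class CounterMap(entries: Map[(String, String), Counter]) {
  def getCounter(group: String, name: String): Counter =
    entries.getOrElse((group, name), null)
}

def getCounterValueAsLong(counters: CounterMap, group: String, name: String): Long = {
  val counter = counters.getCounter(group, name)
  if (counter != null) counter.getValue else 0L
}

// A step with no MAP_OUTPUT_BYTES counter reports 0L and is later
// filtered out by `if h.mapOutputBytes > 0` in RatioBasedEstimator.
assert(getCounterValueAsLong(new CounterMap(Map.empty), "TaskCounter", "MAP_OUTPUT_BYTES") == 0L)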
