Cherry-pick memory estimator changes to 0.17.x branch #1700

Merged: 3 commits, Jun 26, 2017
2 changes: 1 addition & 1 deletion build.sbt
@@ -25,7 +25,7 @@ val elephantbirdVersion = "4.15"
val hadoopLzoVersion = "0.4.19"
val hadoopVersion = "2.5.0"
val hbaseVersion = "0.94.10"
val hravenVersion = "0.9.17.t05"
val hravenVersion = "1.0.1"
val jacksonVersion = "2.8.7"
val json4SVersion = "3.5.0"
val paradiseVersion = "2.1.0"
15 changes: 15 additions & 0 deletions scalding-core/src/main/scala/com/twitter/scalding/Config.scala
@@ -443,6 +443,21 @@ object Config {
/** Whether the number of reducers has been set explicitly using a `withReducers` */
val WithReducersSetExplicitly = "scalding.with.reducers.set.explicitly"

/** Name of the parameter specifying which classes to use as memory estimators (comma-separated). */
val MemoryEstimators = "scalding.memory.estimator.classes"

/** Hadoop map memory */
val MapMemory = "mapreduce.map.memory.mb"

/** Hadoop map java opts */
val MapJavaOpts = "mapreduce.map.java.opts"

/** Hadoop reduce java opts */
val ReduceJavaOpts = "mapreduce.reduce.java.opts"

/** Hadoop reduce memory */
val ReduceMemory = "mapreduce.reduce.memory.mb"

/** Manual description for use in .dot and MR step names set using a `withDescription`. */
val PipeDescriptions = "scalding.pipe.descriptions"
val StepDescriptions = "scalding.step.descriptions"
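
As a usage sketch (not part of this diff): memory estimation is switched on per job by setting the new key to a comma-separated list of estimator class names. The object and the class name below are hypothetical placeholders.

import com.twitter.scalding.Config
import org.apache.hadoop.mapred.JobConf

object EnableMemoryEstimation {
  def configure(conf: JobConf): Unit =
    // Comma-separated Estimator[MemoryEstimate] class names; this one is made up.
    conf.set(Config.MemoryEstimators, "com.example.MyMemoryEstimator")
}
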
@@ -16,16 +16,17 @@ limitations under the License.
package com.twitter.scalding

import cascading.flow.hadoop.HadoopFlow
import cascading.flow.{ Flow, FlowDef, FlowListener, FlowStepListener, FlowStepStrategy }
import cascading.flow.planner.BaseFlowStep
import cascading.flow.{ Flow, FlowDef, FlowStepStrategy }
import cascading.pipe.Pipe
import com.twitter.scalding.estimation.memory.MemoryEstimatorStepStrategy
import com.twitter.scalding.reducer_estimation.ReducerEstimatorStepStrategy
import com.twitter.scalding.serialization.CascadingBinaryComparator
import org.apache.hadoop.mapred.JobConf
import org.slf4j.{ Logger, LoggerFactory }
import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.util.{ Failure, Success, Try }
import org.slf4j.{ Logger, LoggerFactory }

/*
* This has all the state needed to build a single flow
@@ -93,13 +94,14 @@ trait ExecutionContext {
mode match {
case _: HadoopMode =>
val reducerEstimatorStrategy: Seq[FlowStepStrategy[JobConf]] = config.get(Config.ReducerEstimators).toList.map(_ => ReducerEstimatorStepStrategy)
val memoryEstimatorStrategy: Seq[FlowStepStrategy[JobConf]] = config.get(Config.MemoryEstimators).toList.map(_ => MemoryEstimatorStepStrategy)

val otherStrategies: Seq[FlowStepStrategy[JobConf]] = config.getFlowStepStrategies.map {
case Success(fn) => fn(mode, configWithId)
case Failure(e) => throw new Exception("Failed to decode flow step strategy when submitting job", e)
}

val optionalFinalStrategy = FlowStepStrategies().sumOption(reducerEstimatorStrategy ++ otherStrategies)
val optionalFinalStrategy = FlowStepStrategies().sumOption(reducerEstimatorStrategy ++ memoryEstimatorStrategy ++ otherStrategies)

optionalFinalStrategy.foreach { strategy =>
flow.setFlowStepStrategy(strategy)
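
For context, a minimal sketch (not part of this diff) of the FlowStepStrategy[JobConf] shape that the estimator strategies implement and that sumOption combines; LoggingStepStrategy is a made-up example that only logs the step name.

import cascading.flow.{ Flow, FlowStep, FlowStepStrategy }
import java.util.{ List => JList }
import org.apache.hadoop.mapred.JobConf

object LoggingStepStrategy extends FlowStepStrategy[JobConf] {
  override def apply(
    flow: Flow[JobConf],
    predecessorSteps: JList[FlowStep[JobConf]],
    step: FlowStep[JobConf]): Unit =
    println(s"About to submit step: ${step.getName}")
}
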
@@ -0,0 +1,36 @@
package com.twitter.scalding.estimation

import cascading.flow.FlowStep
import cascading.tap.hadoop.Hfs
import cascading.tap.{ CompositeTap, Tap }
import com.twitter.scalding.tap.GlobHfs
import org.apache.hadoop.mapred.JobConf
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._

object Common {
private[this] val LOG = LoggerFactory.getLogger(this.getClass)

private def unrollTaps(taps: Seq[Tap[_, _, _]]): Seq[Tap[_, _, _]] =
taps.flatMap {
case multi: CompositeTap[_] =>
unrollTaps(multi.getChildTaps.asScala.toSeq)
case t => Seq(t)
}

def unrollTaps(step: FlowStep[JobConf]): Seq[Tap[_, _, _]] =
unrollTaps(step.getSources.asScala.toSeq)

def inputSizes(step: FlowStep[JobConf]): Seq[(String, Long)] = {
val conf = step.getConfig
unrollTaps(step).flatMap {
case tap: GlobHfs => Some(tap.toString -> tap.getSize(conf))
case tap: Hfs => Some(tap.toString -> GlobHfs.getSize(tap.getPath, conf))
case tap =>
LOG.warn("InputSizeReducerEstimator unable to calculate size: " + tap)
None
}
}

def totalInputSize(step: FlowStep[JobConf]): Long = inputSizes(step).map(_._2).sum
}
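
A hedged usage sketch (not part of this diff) of the helpers above; InputSizeDebug is a made-up object that prints the per-tap and total input sizes of a step.

import cascading.flow.FlowStep
import com.twitter.scalding.estimation.Common
import org.apache.hadoop.mapred.JobConf

object InputSizeDebug {
  def logInputSizes(step: FlowStep[JobConf]): Unit = {
    Common.inputSizes(step).foreach {
      case (tap, bytes) => println(s"$tap -> $bytes bytes")
    }
    println(s"total input: ${Common.totalInputSize(step)} bytes")
  }
}
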
@@ -0,0 +1,67 @@
package com.twitter.scalding.estimation

import cascading.flow.{ Flow, FlowStep }
import com.twitter.algebird.Monoid
import org.apache.hadoop.mapred.JobConf
import org.slf4j.LoggerFactory
import scala.util.{ Failure, Success }

case class FlowStrategyInfo(
flow: Flow[JobConf],
predecessorSteps: Seq[FlowStep[JobConf]],
step: FlowStep[JobConf])

/**
* Trait for estimating some parameters of a job.
* @tparam T return type of estimation
*/
trait Estimator[T] {
def estimate(info: FlowStrategyInfo): Option[T]
}

case class FallbackEstimator[T](first: Estimator[T], fallback: Estimator[T]) extends Estimator[T] {
private val LOG = LoggerFactory.getLogger(this.getClass)

override def estimate(info: FlowStrategyInfo): Option[T] = {
first.estimate(info).orElse {
LOG.warn(s"$first estimator failed. Falling back to $fallback.")
fallback.estimate(info)
}
}
}

class FallbackEstimatorMonoid[T] extends Monoid[Estimator[T]] {
override def zero: Estimator[T] = new Estimator[T] {
override def estimate(info: FlowStrategyInfo): Option[T] = None
}

override def plus(l: Estimator[T], r: Estimator[T]): Estimator[T] = FallbackEstimator(l, r)
}

trait HistoryEstimator[T] extends Estimator[T] {
private val LOG = LoggerFactory.getLogger(this.getClass)

def maxHistoryItems(conf: JobConf): Int

def historyService: HistoryService

override def estimate(info: FlowStrategyInfo): Option[T] = {
val conf = info.step.getConfig

historyService.fetchHistory(info, maxHistoryItems(conf)) match {
case Success(history) if history.isEmpty =>
LOG.warn(s"No matching history found for $info")
None
case Success(history) =>
LOG.info(s"${history.length} history entries found for $info")
val estimation = estimate(info, conf, history)
LOG.info(s"$getClass estimate: $estimation")
estimation
case Failure(f) =>
LOG.warn(s"Unable to fetch history in $getClass", f)
None
}
}

protected def estimate(info: FlowStrategyInfo, conf: JobConf, history: Seq[FlowStepHistory]): Option[T]
}
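
To illustrate the fallback semantics (a sketch, not part of this diff): summing estimators with the monoid consults each one in order until a value is produced. FallbackExample and its two toy estimators are made up.

import com.twitter.algebird.Monoid
import com.twitter.scalding.estimation.{ Estimator, FallbackEstimatorMonoid, FlowStrategyInfo }

object FallbackExample {
  val declines: Estimator[Long] = new Estimator[Long] {
    override def estimate(info: FlowStrategyInfo): Option[Long] = None
  }
  val fixed: Estimator[Long] = new Estimator[Long] {
    override def estimate(info: FlowStrategyInfo): Option[Long] = Some(2048L)
  }

  implicit val monoid: Monoid[Estimator[Long]] = new FallbackEstimatorMonoid[Long]

  // Equivalent to FallbackEstimator(declines, fixed): `fixed` is consulted only because
  // `declines` returns None, so the combined estimate is Some(2048L).
  val combined: Estimator[Long] = Monoid.sum(Seq(declines, fixed))
}
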
@@ -0,0 +1,50 @@
package com.twitter.scalding.estimation

import scala.util.Try

/**
* Info about a prior FlowStep, provided by implementers of HistoryService
*/
final case class FlowStepHistory(
keys: FlowStepKeys,
submitTimeMillis: Long,
launchTimeMillis: Long,
finishTimeMillis: Long,
totalMaps: Long,
totalReduces: Long,
finishedMaps: Long,
finishedReduces: Long,
failedMaps: Long,
failedReduces: Long,
mapFileBytesRead: Long,
mapFileBytesWritten: Long,
mapOutputBytes: Long,
reduceFileBytesRead: Long,
hdfsBytesRead: Long,
hdfsBytesWritten: Long,
mapperTimeMillis: Long,
reducerTimeMillis: Long,
reduceShuffleBytes: Long,
cost: Double,
tasks: Seq[Task])

final case class FlowStepKeys(
jobName: String,
user: String,
priority: String,
status: String,
version: String,
queue: String)

final case class Task(details: Map[String, Any], counters: Map[String, Long]) {
def taskType: Option[String] = details.get(Task.TaskType).map(_.asInstanceOf[String])
}

object Task {
val TaskType = "taskType"
}

trait HistoryService {
def fetchHistory(info: FlowStrategyInfo, maxHistory: Int): Try[Seq[FlowStepHistory]]
}
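
As a sketch (not part of this diff), a trivial HistoryService that returns no history; EmptyHistoryService is made up, and paired with a HistoryEstimator it simply declines to estimate.

import com.twitter.scalding.estimation.{ FlowStepHistory, FlowStrategyInfo, HistoryService }
import scala.util.{ Success, Try }

object EmptyHistoryService extends HistoryService {
  // No matching history: a HistoryEstimator logs a warning and returns None.
  override def fetchHistory(info: FlowStrategyInfo, maxHistory: Int): Try[Seq[FlowStepHistory]] =
    Success(Seq.empty)
}
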

@@ -0,0 +1,47 @@
package com.twitter.scalding.estimation.memory

import org.apache.hadoop.mapred.JobConf

object MemoryEstimatorConfig {
/** Output param: what the original job map memory was. */
val originalMapMemory = "scalding.map.memory.estimator.original"

/** Output param: what the original job reduce memory was. */
val originalReduceMemory = "scalding.reduce.memory.estimator.original"

/**
* Value of alpha for exponential smoothing.
Lower values give more smoothing and less weight to newer data;
higher values give less smoothing and more weight to newer data.
*/
val alphaKey = "scalding.memory.estimator.alpha"

/** Indicates how much to scale the memory estimate after it's calculated */
val memoryScaleFactor = "scalding.memory.estimator.scale.factor"

val XmxToMemoryScaleFactorKey = "scalding.memory.estimator.xmx.scale.factor"

val maxContainerMemoryKey = "scalding.memory.estimator.container.max"

val minContainerMemoryKey = "scalding.memory.estimator.container.min"

/** YARN allocates memory in increments, so we round the container request up to the nearest increment. */
val yarnSchedulerIncrementAllocationMB = "yarn.scheduler.increment-allocation-mb"

/** Maximum number of history items to use for memory estimation. */
val maxHistoryKey = "scalding.memory.estimator.max.history"

def getMaxContainerMemory(conf: JobConf): Long = conf.getLong(maxContainerMemoryKey, 8 * 1024)

def getMinContainerMemory(conf: JobConf): Long = conf.getLong(minContainerMemoryKey, 1 * 1024)

def getAlpha(conf: JobConf): Double = conf.getDouble(alphaKey, 1.0)

def getScaleFactor(conf: JobConf): Double = conf.getDouble(memoryScaleFactor, 1.2)

def getXmxScaleFactor(conf: JobConf): Double = conf.getDouble(XmxToMemoryScaleFactorKey, 1.25)

def getYarnSchedulerIncrement(conf: JobConf): Int = conf.getInt(yarnSchedulerIncrementAllocationMB, 512)

def getMaxHistory(conf: JobConf): Int = conf.getInt(maxHistoryKey, 5)
}
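
A small usage sketch (not part of this diff) showing how the knobs above resolve against a JobConf with their defaults; MemoryEstimatorDefaults and the 0.5 alpha are made up for illustration.

import com.twitter.scalding.estimation.memory.MemoryEstimatorConfig
import org.apache.hadoop.mapred.JobConf

object MemoryEstimatorDefaults {
  def show(): Unit = {
    val conf = new JobConf()
    conf.setDouble(MemoryEstimatorConfig.alphaKey, 0.5) // weight history and new observations equally
    println(MemoryEstimatorConfig.getAlpha(conf)) // 0.5 (default is 1.0)
    println(MemoryEstimatorConfig.getMaxContainerMemory(conf)) // 8192 MB cap by default
    println(MemoryEstimatorConfig.getMinContainerMemory(conf)) // 1024 MB floor by default
    println(MemoryEstimatorConfig.getYarnSchedulerIncrement(conf)) // 512 MB rounding increment by default
  }
}
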
@@ -0,0 +1,92 @@
package com.twitter.scalding.estimation.memory

import cascading.flow.{ Flow, FlowStep, FlowStepStrategy }
import com.twitter.algebird.Monoid
import com.twitter.scalding.estimation.{ Estimator, FallbackEstimatorMonoid, FlowStrategyInfo }
import com.twitter.scalding.{ Config, StringUtility }
import java.util.{ List => JList }
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._

object MemoryEstimatorStepStrategy extends FlowStepStrategy[JobConf] {

private val LOG = LoggerFactory.getLogger(this.getClass)

implicit val estimatorMonoid = new FallbackEstimatorMonoid[MemoryEstimate]

/**
* Make memory estimate, possibly overriding explicitly-set memory settings,
* and save useful info (such as the original & estimated values of memory settings)
* in JobConf for later consumption.
*
* Called by Cascading at the start of each job step.
*/
final override def apply(
flow: Flow[JobConf],
preds: JList[FlowStep[JobConf]],
step: FlowStep[JobConf]): Unit = {

if (skipMemoryEstimation(step)) {
LOG.info(s"Skipping memory estimation as ${Config.MemoryEstimators} is not set ")
} else {
estimate(flow, preds.asScala, step)
}
}

private[estimation] def skipMemoryEstimation(step: FlowStep[JobConf]): Boolean =
step.getConfig.get(Config.MemoryEstimators, "").isEmpty

private[estimation] def estimate(
flow: Flow[JobConf],
preds: Seq[FlowStep[JobConf]],
step: FlowStep[JobConf]): Unit = {
val conf = step.getConfig

Option(conf.get(Config.MemoryEstimators)).foreach { clsNames =>

val clsLoader = Thread.currentThread.getContextClassLoader

val estimators = StringUtility.fastSplit(clsNames, ",")
.map(clsLoader.loadClass(_).newInstance.asInstanceOf[Estimator[MemoryEstimate]])
val combinedEstimator = Monoid.sum(estimators)

val info = FlowStrategyInfo(flow, preds, step)

// get memory estimate
val memoryEstimate: Option[MemoryEstimate] = combinedEstimator.estimate(info)

memoryEstimate match {
case Some(MemoryEstimate(Some(mapMem), Some(reduceMem))) =>
LOG.info(s"Overriding map memory to: $mapMem in Mb and reduce memory to: $reduceMem in Mb")
setMemory(mapMem, (Config.MapJavaOpts, Config.MapMemory), conf)
setMemory(reduceMem, (Config.ReduceJavaOpts, Config.ReduceMemory), conf)
case Some(MemoryEstimate(Some(mapMem), _)) =>
LOG.info(s"Overriding only map memory to: $mapMem in Mb")
setMemory(mapMem, (Config.MapJavaOpts, Config.MapMemory), conf)
case Some(MemoryEstimate(_, Some(reduceMem))) =>
LOG.info(s"Overriding only reduce memory to: $reduceMem in Mb")
setMemory(reduceMem, (Config.ReduceJavaOpts, Config.ReduceMemory), conf)
case _ => LOG.info("Memory estimators didn't calculate any value. Skipping setting memory overrides")
}
}
}

private[estimation] def setMemory(memorySettings: (Long, Long), keys: (String, String), conf: JobConf): Unit = {
val (xmxMemory, containerMemory) = memorySettings
val (xmxKey, containerKey) = keys

conf.setLong(containerKey, containerMemory)

setXmxMemory(xmxKey, xmxMemory, conf)
}

private[estimation] def setXmxMemory(xmxKey: String, xmxMemory: Long, conf: JobConf): Unit = {
val xmxOpts = conf.get(xmxKey, "")
//remove existing xmx / xms
val xmxOptsWithoutXm = xmxOpts.split(" ").filterNot(s => s.startsWith("-Xmx") || s.startsWith("-Xms")).mkString(" ")

conf.set(xmxKey, xmxOptsWithoutXm + s" -Xmx${xmxMemory}m")
}
}
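
A worked sketch (not part of this diff) of what setMemory does to the map-side settings. SetMemoryExample, the starting java opts value, and the (2048, 2560) estimate are made up; the object lives in the same package because setMemory is private[estimation].

package com.twitter.scalding.estimation.memory

import com.twitter.scalding.Config
import org.apache.hadoop.mapred.JobConf

object SetMemoryExample {
  def main(args: Array[String]): Unit = {
    val conf = new JobConf()
    conf.set(Config.MapJavaOpts, "-Xmx3g -Djava.net.preferIPv4Stack=true")
    // Estimated xmx = 2048 MB inside a 2560 MB container for the map side.
    MemoryEstimatorStepStrategy.setMemory((2048L, 2560L), (Config.MapJavaOpts, Config.MapMemory), conf)
    println(conf.get(Config.MapMemory)) // 2560
    println(conf.get(Config.MapJavaOpts)) // -Djava.net.preferIPv4Stack=true -Xmx2048m
  }
}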