Skip to content

Commit

Permalink
Merge pull request #1440 from MansurAshraf/mashraf/scalding_viz
Browse files Browse the repository at this point in the history
Scalding viz options
  • Loading branch information
mav911 committed Sep 23, 2015
2 parents e24bb67 + 899b645 commit 2b29fce
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 12 deletions.
3 changes: 2 additions & 1 deletion project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ object ScaldingBuild extends Build {
val avroVersion = "1.7.4"
val bijectionVersion = "0.8.1"
val cascadingAvroVersion = "2.1.2"
val chillVersion = "0.7.0"
val chillVersion = "0.7.1"
val dfsDatastoresVersion = "1.3.4"
val elephantbirdVersion = "4.8"
val hadoopLzoVersion = "0.4.19"
Expand Down Expand Up @@ -301,6 +301,7 @@ object ScaldingBuild extends Build {
"cascading" % "cascading-local" % cascadingVersion,
"com.twitter" % "chill-hadoop" % chillVersion,
"com.twitter" % "chill-java" % chillVersion,
"com.twitter" %% "chill-bijection" % chillVersion,
"com.twitter" %% "algebird-core" % algebirdVersion,
"com.twitter" %% "algebird-test" % algebirdVersion % "test",
"com.twitter" %% "bijection-core" % bijectionVersion,
Expand Down
67 changes: 65 additions & 2 deletions scalding-core/src/main/scala/com/twitter/scalding/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ limitations under the License.
package com.twitter.scalding

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io.serializer.{ Serialization => HSerialization }
import com.twitter.chill.KryoInstantiator
import com.twitter.chill.{ ExternalizerCodec, ExternalizerInjection, Externalizer, KryoInstantiator }
import com.twitter.chill.config.{ ScalaMapConfig, ConfiguredInstantiator }
import com.twitter.bijection.{ Base64String, Injection }

import cascading.pipe.assembly.AggregateBy
import cascading.flow.FlowProps
import cascading.flow.{ FlowListener, FlowStepListener, FlowProps, FlowStepStrategy }
import cascading.property.AppProps
import cascading.tuple.collect.SpillableProps

Expand Down Expand Up @@ -298,6 +300,57 @@ trait Config extends Serializable {
def setReducerEstimators(clsList: String): Config = {
  // Store the given string under the Config.ReducerEstimators key and
  // return the resulting Config.
  val entry = (Config.ReducerEstimators, clsList)
  this + entry
}

/**
 * Registers a flow listener provider for observability.
 *
 * The provider is serialized to a string and placed in front of whatever
 * is already stored under Config.FlowListeners, separated by a comma.
 *
 * @param flowListenerProvider function that builds the FlowListener from a Mode and a Config
 * @return the Config with the provider recorded
 */
def addFlowListener(flowListenerProvider: (Mode, Config) => FlowListener): Config = {
  val encoded = flowListenerSerializer(flowListenerProvider)
  val (_, updated) = update(Config.FlowListeners) { existing =>
    // First entry is stored alone; later entries are prepended to the list.
    val merged = existing.fold(encoded)(previous => s"$encoded,$previous")
    (Some(merged), ())
  }
  updated
}

/**
 * Reads the value stored under Config.FlowListeners, splits it on commas
 * and attempts to decode every piece back into a listener provider.
 * Each element is a Try, so a decode failure is reported per entry.
 * Returns an empty list when the key is not set.
 */
def getFlowListeners: List[Try[(Mode, Config) => FlowListener]] =
  for {
    joined <- get(Config.FlowListeners).toList
    encoded <- StringUtility.fastSplit(joined, ",")
  } yield flowListenerSerializer.invert(encoded)

/**
 * Registers a flow step listener provider.
 *
 * The provider is serialized to a string and placed in front of whatever
 * is already stored under Config.FlowStepListeners, separated by a comma.
 *
 * @param flowListenerProvider function that builds the FlowStepListener from a Mode and a Config
 * @return the Config with the provider recorded
 */
def addFlowStepListener(flowListenerProvider: (Mode, Config) => FlowStepListener): Config = {
  val encoded = flowStepListenerSerializer(flowListenerProvider)
  val (_, updated) = update(Config.FlowStepListeners) { existing =>
    // First entry is stored alone; later entries are prepended to the list.
    val merged = existing.fold(encoded)(previous => s"$encoded,$previous")
    (Some(merged), ())
  }
  updated
}

/**
 * Reads the value stored under Config.FlowStepListeners, splits it on
 * commas and attempts to decode every piece back into a step listener
 * provider. Each element is a Try, so a decode failure is reported per
 * entry. Returns an empty list when the key is not set.
 */
def getFlowStepListeners: List[Try[(Mode, Config) => FlowStepListener]] =
  for {
    joined <- get(Config.FlowStepListeners).toList
    encoded <- StringUtility.fastSplit(joined, ",")
  } yield flowStepListenerSerializer.invert(encoded)

/**
 * Registers a flow step strategy provider.
 *
 * The provider is serialized to a string and placed in front of whatever
 * is already stored under Config.FlowStepStrategies, separated by a comma.
 *
 * @param flowStrategyProvider function that builds the FlowStepStrategy from a Mode and a Config
 * @return the Config with the provider recorded
 */
def addFlowStepStrategy(flowStrategyProvider: (Mode, Config) => FlowStepStrategy[JobConf]): Config = {
  val encodedStrategy = flowStepStrategiesSerializer(flowStrategyProvider)
  val (_, updated) = update(Config.FlowStepStrategies) { existing =>
    // First entry is stored alone; later entries are prepended to the list.
    val merged = existing.fold(encodedStrategy)(previous => s"$encodedStrategy,$previous")
    (Some(merged), ())
  }
  updated
}

/** Returns a Config with the Config.FlowStepStrategies key removed. */
def clearFlowStepStrategies: Config = {
  val strategiesKey = Config.FlowStepStrategies
  this - strategiesKey
}

/**
 * Reads the value stored under Config.FlowStepStrategies, splits it on
 * commas and attempts to decode every piece back into a strategy provider.
 * Each element is a Try, so a decode failure is reported per entry.
 * Returns an empty list when the key is not set.
 */
def getFlowStepStrategies: List[Try[(Mode, Config) => FlowStepStrategy[JobConf]]] =
  for {
    joined <- get(Config.FlowStepStrategies).toList
    encoded <- StringUtility.fastSplit(joined, ",")
  } yield flowStepStrategiesSerializer.invert(encoded)

/**
 * Get the number of reducers (this is the parameter Hadoop will use).
 * Returns None when the key is unset; the stored string is converted with toInt.
 */
def getNumReducers: Option[Int] =
  for (raw <- get(Config.HadoopNumReducers)) yield raw.toInt
def setNumReducers(n: Int): Config = {
  // The value is stored as its decimal string under Config.HadoopNumReducers.
  val reducerCount = n.toString
  this + (Config.HadoopNumReducers -> reducerCount)
}
Expand Down Expand Up @@ -326,6 +379,9 @@ object Config {
val ScaldingVersion: String = "scalding.version"
val HRavenHistoryUserName: String = "hraven.history.user.name"
val ScaldingRequireOrderedSerialization: String = "scalding.require.orderedserialization"
// Key under which the comma-separated, serialized flow listener providers are stored (see addFlowListener).
val FlowListeners: String = "scalding.observability.flowlisteners"
// Key under which the comma-separated, serialized flow step listener providers are stored (see addFlowStepListener).
val FlowStepListeners: String = "scalding.observability.flowsteplisteners"
// Key under which the comma-separated, serialized flow step strategy providers are stored (see addFlowStepStrategy).
val FlowStepStrategies: String = "scalding.strategies.flowstepstrategies"

/**
* Parameter that actually controls the number of reduce tasks.
Expand Down Expand Up @@ -476,4 +532,11 @@ object Config {
is.close()
md5Hex(bytes)
}

/**
 * Builds an Injection from T to String by connecting the chain
 * T -> Externalizer[T] -> Array[Byte] -> Base64String -> String.
 *
 * The context bounds require an ExternalizerInjection and an
 * ExternalizerCodec for T to be implicitly available.
 * NOTE(review): the remaining steps are presumably resolved from
 * bijection's implicit injections -- confirm.
 */
private[this] def buildInj[T: ExternalizerInjection: ExternalizerCodec]: Injection[T, String] =
Injection.connect[T, Externalizer[T], Array[Byte], Base64String, String]

// String serializers (built with buildInj) for the provider functions stored in the Config:
// applying one encodes a provider to a String, and invert attempts to decode it back.
// One serializer per provider type: flow step listeners, flow listeners, flow step strategies.
@transient private[scalding] lazy val flowStepListenerSerializer = buildInj[(Mode, Config) => FlowStepListener]
@transient private[scalding] lazy val flowListenerSerializer = buildInj[(Mode, Config) => FlowListener]
@transient private[scalding] lazy val flowStepStrategiesSerializer = buildInj[(Mode, Config) => FlowStepStrategy[JobConf]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ limitations under the License.
package com.twitter.scalding

import cascading.flow.hadoop.HadoopFlow
import cascading.flow.{ FlowDef, Flow }
import cascading.flow.{ Flow, FlowDef, FlowListener, FlowStepListener, FlowStepStrategy }
import cascading.flow.planner.BaseFlowStep
import cascading.pipe.Pipe
import com.twitter.scalding.reducer_estimation.ReducerEstimatorStepStrategy
Expand All @@ -25,6 +25,7 @@ import org.apache.hadoop.mapred.JobConf
import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.util.{ Failure, Success, Try }
import org.slf4j.{ Logger, LoggerFactory }

/*
* This has all the state needed to build a single flow
Expand All @@ -36,6 +37,8 @@ trait ExecutionContext {
def flowDef: FlowDef
def mode: Mode

import ExecutionContext._

// Joins the distinct descriptions with ", " (first-occurrence order is kept);
// yields None when no descriptions are given.
private def getIdentifierOpt(descriptions: Seq[String]): Option[String] =
  Some(descriptions)
    .filter(_.nonEmpty)
    .map(_.distinct.mkString(", "))
Expand Down Expand Up @@ -67,8 +70,8 @@ trait ExecutionContext {
name.foreach(flowDef.setName)

// identify the flowDef
val withId = config.addUniqueId(UniqueID.getIDFor(flowDef))
val flow = mode.newFlowConnector(withId).connect(flowDef)
val configWithId = config.addUniqueId(UniqueID.getIDFor(flowDef))
val flow = mode.newFlowConnector(configWithId).connect(flowDef)
if (config.getRequireOrderedSerialization) {
// This will throw, but be caught by the outer try if
// we have groupby/cogroupby not using OrderedSerializations
Expand All @@ -89,8 +92,29 @@ trait ExecutionContext {
// which instantiates and runs them
mode match {
case _: HadoopMode =>
config.get(Config.ReducerEstimators)
.foreach(_ => flow.setFlowStepStrategy(ReducerEstimatorStepStrategy))
val reducerEstimatorStrategy: Seq[FlowStepStrategy[JobConf]] = config.get(Config.ReducerEstimators).toList.map(_ => ReducerEstimatorStepStrategy)

val otherStrategies: Seq[FlowStepStrategy[JobConf]] = config.getFlowStepStrategies.map {
case Success(fn) => fn(mode, configWithId)
case Failure(e) => throw new Exception("Failed to decode flow step strategy when submitting job", e)
}

val optionalFinalStrategy = FlowStepStrategies().sumOption(reducerEstimatorStrategy ++ otherStrategies)

optionalFinalStrategy.foreach { strategy =>
flow.setFlowStepStrategy(strategy)
}

config.getFlowListeners.foreach {
case Success(fn) => flow.addListener(fn(mode, configWithId))
case Failure(e) => throw new Exception("Failed to decode flow listener", e)
}

// Attach every configured flow step listener to the flow. A provider that
// cannot be decoded must abort submission: the exception has to be thrown,
// not merely constructed, otherwise the failure is silently discarded.
config.getFlowStepListeners.foreach {
  case Success(fn) => flow.addStepListener(fn(mode, configWithId))
  case Failure(e) => throw new Exception("Failed to decode flow step listener when submitting job", e)
}

case _ => ()
}

Expand Down Expand Up @@ -124,6 +148,8 @@ trait ExecutionContext {
* modeFromImplicit, etc... below.
*/
object ExecutionContext {
private val LOG: Logger = LoggerFactory.getLogger(ExecutionContext.getClass)

private[scalding] def getDesc[T](baseFlowStep: BaseFlowStep[T]): Seq[String] = {
baseFlowStep.getGraph.vertexSet.asScala.toSeq.flatMap(_ match {
case pipe: Pipe => RichPipe.getPipeDescriptions(pipe)
Expand Down
8 changes: 4 additions & 4 deletions scalding-core/src/main/scala/com/twitter/scalding/Job.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ limitations under the License.
package com.twitter.scalding

import com.twitter.algebird.monad.Reader

import com.twitter.algebird.Semigroup
import cascading.flow.{ Flow, FlowDef, FlowListener, FlowStep, FlowStepListener, FlowSkipStrategy, FlowStepStrategy }
import cascading.pipe.Pipe
import cascading.property.AppProps
Expand Down Expand Up @@ -233,7 +233,7 @@ class Job(val args: Args) extends FieldConversions with java.io.Serializable {
if (existing == null)
strategy
else
FlowStepStrategies.plus(
FlowStepStrategies[Any].plus(
existing.asInstanceOf[FlowStepStrategy[Any]],
strategy.asInstanceOf[FlowStepStrategy[Any]])
flow.setFlowStepStrategy(composed)
Expand Down Expand Up @@ -521,11 +521,11 @@ trait CounterVerification extends Job {
}
}

private[scalding] object FlowStepStrategies {
private[scalding] case class FlowStepStrategies[A]() extends Semigroup[FlowStepStrategy[A]] {
/**
* Returns a new FlowStepStrategy that runs both strategies in sequence.
*/
def plus[A](l: FlowStepStrategy[A], r: FlowStepStrategy[A]): FlowStepStrategy[A] =
def plus(l: FlowStepStrategy[A], r: FlowStepStrategy[A]): FlowStepStrategy[A] =
new FlowStepStrategy[A] {
override def apply(
flow: Flow[A],
Expand Down

0 comments on commit 2b29fce

Please sign in to comment.