Commit

Revert "Cherry-pick memory estimator changes to 0.17.x branch (twitte…
Browse files Browse the repository at this point in the history
…r#1700)"

This reverts commit 9b8ea00.
Piyush Narang committed Jul 12, 2017
1 parent 135fc82 commit a96b1cf
Showing 33 changed files with 480 additions and 5,066 deletions.
8 changes: 0 additions & 8 deletions .travis.yml
@@ -59,14 +59,6 @@ matrix:
env: BUILD="base" TEST_TARGET="scalding-hadoop-test"
script: "scripts/run_test.sh"

- scala: 2.11.8
env: BUILD="base" TEST_TARGET="scalding-estimators-test"
script: "scripts/run_test.sh"

- scala: 2.12.1
env: BUILD="base" TEST_TARGET="scalding-estimators-test"
script: "scripts/run_test.sh"

- scala: 2.11.8
env: BUILD="base" TEST_TARGET="scalding-serialization"
script: "scripts/run_test.sh"
20 changes: 1 addition & 19 deletions build.sbt
Expand Up @@ -25,7 +25,7 @@ val elephantbirdVersion = "4.15"
val hadoopLzoVersion = "0.4.19"
val hadoopVersion = "2.5.0"
val hbaseVersion = "0.94.10"
val hravenVersion = "1.0.1"
val hravenVersion = "0.9.17.t05"
val jacksonVersion = "2.8.7"
val json4SVersion = "3.5.0"
val paradiseVersion = "2.1.0"
@@ -223,7 +223,6 @@ lazy val scalding = Project(
scaldingJson,
scaldingJdbc,
scaldingHadoopTest,
scaldingEstimatorsTest,
scaldingDb,
maple,
executionTutorial,
@@ -527,23 +526,6 @@ lazy val scaldingHadoopTest = module("hadoop-test").settings(
)
).dependsOn(scaldingCore, scaldingSerialization)

lazy val scaldingEstimatorsTest = module("estimators-test").settings(
libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-client" % hadoopVersion,
"org.apache.hadoop" % "hadoop-minicluster" % hadoopVersion,
"org.apache.hadoop" % "hadoop-yarn-server-tests" % hadoopVersion classifier "tests",
"org.apache.hadoop" % "hadoop-yarn-server" % hadoopVersion,
"org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion classifier "tests",
"org.apache.hadoop" % "hadoop-common" % hadoopVersion classifier "tests",
"org.apache.hadoop" % "hadoop-mapreduce-client-jobclient" % hadoopVersion classifier "tests",
"com.twitter" %% "chill-algebird" % chillVersion,
"org.slf4j" % "slf4j-api" % slf4jVersion,
"org.slf4j" % "slf4j-log4j12" % slf4jVersion,
"org.scalacheck" %% "scalacheck" % scalaCheckVersion,
"org.scalatest" %% "scalatest" % scalaTestVersion
)
).dependsOn(scaldingHadoopTest % "test")

// This one uses a different naming convention
lazy val maple = Project(
id = "maple",
15 changes: 0 additions & 15 deletions scalding-core/src/main/scala/com/twitter/scalding/Config.scala
@@ -443,21 +443,6 @@ object Config {
/** Whether the number of reducers has been set explicitly using a `withReducers` */
val WithReducersSetExplicitly = "scalding.with.reducers.set.explicitly"

/** Name of parameter to specify which class to use as the default estimator. */
val MemoryEstimators = "scalding.memory.estimator.classes"

/** Hadoop map memory */
val MapMemory = "mapreduce.map.memory.mb"

/** Hadoop map java opts */
val MapJavaOpts = "mapreduce.map.java.opts"

/** Hadoop reduce java opts */
val ReduceJavaOpts = "mapreduce.reduce.java.opts"

/** Hadoop reduce memory */
val ReduceMemory = "mapreduce.reduce.memory.mb"

/** Manual description for use in .dot and MR step names set using a `withDescription`. */
val PipeDescriptions = "scalding.pipe.descriptions"
val StepDescriptions = "scalding.step.descriptions"
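For reference, the five constants removed above name plain Hadoop/Scalding configuration keys. A minimal sketch of the corresponding key/value entries (the estimator class name below is a hypothetical placeholder, not a class from this repository):

// Sketch only: raw config keys behind the constants deleted in the hunk above.
// "com.example.MyMemoryEstimator" is a hypothetical placeholder class.
val memoryEstimatorConf: Map[String, String] = Map(
  "scalding.memory.estimator.classes" -> "com.example.MyMemoryEstimator",
  "mapreduce.map.memory.mb" -> "2048",         // map container size in MB
  "mapreduce.map.java.opts" -> "-Xmx1638m",    // map task JVM options
  "mapreduce.reduce.memory.mb" -> "4096",      // reduce container size in MB
  "mapreduce.reduce.java.opts" -> "-Xmx3276m") // reduce task JVM options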
scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala
@@ -16,17 +16,16 @@ limitations under the License.
package com.twitter.scalding

import cascading.flow.hadoop.HadoopFlow
import cascading.flow.{ Flow, FlowDef, FlowListener, FlowStepListener, FlowStepStrategy }
import cascading.flow.planner.BaseFlowStep
import cascading.flow.{ Flow, FlowDef, FlowStepStrategy }
import cascading.pipe.Pipe
import com.twitter.scalding.estimation.memory.MemoryEstimatorStepStrategy
import com.twitter.scalding.reducer_estimation.ReducerEstimatorStepStrategy
import com.twitter.scalding.serialization.CascadingBinaryComparator
import org.apache.hadoop.mapred.JobConf
import org.slf4j.{ Logger, LoggerFactory }
import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.util.{ Failure, Success, Try }
import org.slf4j.{ Logger, LoggerFactory }

/*
* This has all the state needed to build a single flow
@@ -94,14 +93,13 @@ trait ExecutionContext {
mode match {
case _: HadoopMode =>
val reducerEstimatorStrategy: Seq[FlowStepStrategy[JobConf]] = config.get(Config.ReducerEstimators).toList.map(_ => ReducerEstimatorStepStrategy)
val memoryEstimatorStrategy: Seq[FlowStepStrategy[JobConf]] = config.get(Config.MemoryEstimators).toList.map(_ => MemoryEstimatorStepStrategy)

val otherStrategies: Seq[FlowStepStrategy[JobConf]] = config.getFlowStepStrategies.map {
case Success(fn) => fn(mode, configWithId)
case Failure(e) => throw new Exception("Failed to decode flow step strategy when submitting job", e)
}

val optionalFinalStrategy = FlowStepStrategies().sumOption(reducerEstimatorStrategy ++ memoryEstimatorStrategy ++ otherStrategies)
val optionalFinalStrategy = FlowStepStrategies().sumOption(reducerEstimatorStrategy ++ otherStrategies)

optionalFinalStrategy.foreach { strategy =>
flow.setFlowStepStrategy(strategy)
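For context on the ExecutionContext change: before the revert, the memory estimator strategy was appended to the reducer estimator and user-supplied strategies, and the whole sequence was reduced into at most one composite strategy attached to the flow. A minimal sketch of that combination pattern, assuming an illustrative StepStrategy type rather than the real cascading FlowStepStrategy API:

// Sketch only: a made-up StepStrategy type standing in for cascading's FlowStepStrategy.
// combine collapses a sequence of strategies into Some(composite) when non-empty,
// or None when there is nothing to attach to the flow.
trait StepStrategy { def apply(stepConf: Map[String, String]): Unit }

def combine(strategies: Seq[StepStrategy]): Option[StepStrategy] =
  strategies.reduceOption { (a, b) =>
    new StepStrategy {
      def apply(stepConf: Map[String, String]): Unit = { a(stepConf); b(stepConf) }
    }
  }

After this revert, only reducerEstimatorStrategy and otherStrategies feed into that reduction.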

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

