Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scalding-core: merge flow step strategies to allow reducer estimation combined with other strategies #1094

Merged
merged 2 commits into from
Nov 14, 2014
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions scalding-core/src/main/scala/com/twitter/scalding/Job.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import com.twitter.algebird.monad.Reader
import com.twitter.chill.config.{ ScalaAnyRefMapConfig, ConfiguredInstantiator }

import cascading.pipe.assembly.AggregateBy
import cascading.flow.{ Flow, FlowDef, FlowProps, FlowListener, FlowStepListener, FlowSkipStrategy, FlowStepStrategy }
import cascading.flow.{ Flow, FlowDef, FlowProps, FlowListener, FlowStep, FlowStepListener, FlowSkipStrategy, FlowStepStrategy }
import cascading.pipe.Pipe
import cascading.property.AppProps
import cascading.tuple.collect.SpillableProps
Expand All @@ -36,7 +36,7 @@ import scala.concurrent.{ Future, Promise }
import scala.util.Try

import java.io.{ BufferedWriter, File, FileOutputStream, OutputStreamWriter }
import java.util.{ Calendar, UUID }
import java.util.{ Calendar, UUID, List => JList }

import java.util.concurrent.{ Executors, TimeUnit, ThreadFactory, Callable, TimeoutException }
import java.util.concurrent.atomic.AtomicInteger
Expand Down Expand Up @@ -231,7 +231,17 @@ class Job(val args: Args) extends FieldConversions with java.io.Serializable {
listeners.foreach { flow.addListener(_) }
stepListeners.foreach { flow.addStepListener(_) }
skipStrategy.foreach { flow.setFlowSkipStrategy(_) }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we do the same for skipStrategy?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bias to laziness

stepStrategy.foreach { flow.setFlowStepStrategy(_) }
stepStrategy.foreach { strategy =>
val existing = flow.getFlowStepStrategy
val composed =
if (existing == null)
strategy
else
FlowStepStrategies.plus(
existing.asInstanceOf[FlowStepStrategy[Any]],
strategy.asInstanceOf[FlowStepStrategy[Any]])
flow.setFlowStepStrategy(composed)
}
flow
}
.get
Expand Down Expand Up @@ -526,3 +536,19 @@ class ScriptJob(cmds: Iterable[String]) extends Job(Args("")) {
}
}
}

private[scalding] object FlowStepStrategies {
/**
* Returns a new FlowStepStrategy that runs both strategies in sequence.
*/
def plus[A](l: FlowStepStrategy[A], r: FlowStepStrategy[A]): FlowStepStrategy[A] =
new FlowStepStrategy[A] {
override def apply(
flow: Flow[A],
predecessorSteps: JList[FlowStep[A]],
flowStep: FlowStep[A]): Unit = {
l.apply(flow, predecessorSteps, flowStep)
r.apply(flow, predecessorSteps, flowStep)
}
}
}