Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add withConfig api to allow running an execution with a transformed config #1489

Merged
merged 6 commits into from
Feb 9, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 26 additions & 16 deletions scalding-core/src/main/scala/com/twitter/scalding/Execution.scala
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ object Execution {
override def join[T, U](t: Execution[T], u: Execution[U]): Execution[(T, U)] = t.zip(u)
}

def withConfig[T](ex: Execution[T])(c: Config => Config): Execution[T] =
TransformedConfig(ex, c)

/**
* This is the standard semigroup on an Applicative (zip, then inside the Execution do plus)
*/
Expand All @@ -230,7 +233,7 @@ object Execution {
* as it is evaluating.
*/
private[scalding] class EvalCache {
private[this] val cache = new FutureCache[Execution[Any], (Any, ExecutionCounters)]
private[this] val cache = new FutureCache[(Config, Execution[Any]), (Any, ExecutionCounters)]

private[this] val toWriteCache = new FutureCache[ToWrite, ExecutionCounters]

Expand Down Expand Up @@ -304,19 +307,19 @@ object Execution {
def getOrLock(write: ToWrite): Either[Promise[ExecutionCounters], Future[ExecutionCounters]] =
toWriteCache.getOrPromise(write)

def getOrElseInsertWithFeedback[T](ex: Execution[T],
def getOrElseInsertWithFeedback[T](cfg: Config, ex: Execution[T],
res: => Future[(T, ExecutionCounters)]): (Boolean, Future[(T, ExecutionCounters)]) =
// This cast is safe because we always insert with match T types
cache.getOrElseUpdateIsNew(ex, res)
cache.getOrElseUpdateIsNew((cfg, ex), res)
.asInstanceOf[(Boolean, Future[(T, ExecutionCounters)])]

def getOrElseInsert[T](ex: Execution[T],
def getOrElseInsert[T](cfg: Config, ex: Execution[T],
res: => Future[(T, ExecutionCounters)]): Future[(T, ExecutionCounters)] =
getOrElseInsertWithFeedback(ex, res)._2
getOrElseInsertWithFeedback(cfg, ex, res)._2
}
private case class FutureConst[T](get: ConcurrentExecutionContext => Future[T]) extends Execution[T] {
def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) =
cache.getOrElseInsert(this,
cache.getOrElseInsert(conf, this,
for {
futt <- toFuture(Try(get(cec)))
t <- futt
Expand All @@ -327,7 +330,7 @@ object Execution {
}
private case class FlatMapped[S, T](prev: Execution[S], fn: S => Execution[T]) extends Execution[T] {
def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) =
cache.getOrElseInsert(this,
cache.getOrElseInsert(conf, this,
for {
(s, st1) <- prev.runStats(conf, mode, cache)
next = fn(s)
Expand All @@ -338,24 +341,31 @@ object Execution {

private case class Mapped[S, T](prev: Execution[S], fn: S => T) extends Execution[T] {
def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) =
cache.getOrElseInsert(this,
cache.getOrElseInsert(conf, this,
prev.runStats(conf, mode, cache)
.map { case (s, stats) => (fn(s), stats) })
}
private case class GetCounters[T](prev: Execution[T]) extends Execution[(T, ExecutionCounters)] {
def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) =
cache.getOrElseInsert(this,
cache.getOrElseInsert(conf, this,
prev.runStats(conf, mode, cache).map { case tc @ (t, c) => (tc, c) })
}
private case class ResetCounters[T](prev: Execution[T]) extends Execution[T] {
def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) =
cache.getOrElseInsert(this,
cache.getOrElseInsert(conf, this,
prev.runStats(conf, mode, cache).map { case (t, _) => (t, ExecutionCounters.empty) })
}

private case class TransformedConfig[T](prev: Execution[T], fn: Config => Config) extends Execution[T] {
def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) = {
val mutatedConfig = fn(conf)
cache.getOrElseInsert(mutatedConfig, this, prev.runStats(mutatedConfig, mode, cache))
}
}

private case class OnComplete[T](prev: Execution[T], fn: Try[T] => Unit) extends Execution[T] {
def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) =
cache.getOrElseInsert(this, {
cache.getOrElseInsert(conf, this, {
val res = prev.runStats(conf, mode, cache)
/**
* The result we give is only completed AFTER fn is run
Expand All @@ -375,7 +385,7 @@ object Execution {
}
private case class RecoverWith[T](prev: Execution[T], fn: PartialFunction[Throwable, Execution[T]]) extends Execution[T] {
def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) =
cache.getOrElseInsert(this,
cache.getOrElseInsert(conf, this,
prev.runStats(conf, mode, cache)
.recoverWith(fn.andThen(_.runStats(conf, mode, cache))))
}
Expand Down Expand Up @@ -450,7 +460,7 @@ object Execution {

private case class Zipped[S, T](one: Execution[S], two: Execution[T]) extends Execution[(S, T)] {
def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) =
cache.getOrElseInsert(this, {
cache.getOrElseInsert(conf, this, {
val f1 = one.runStats(conf, mode, cache)
val f2 = two.runStats(conf, mode, cache)
failFastZip(f1, f2)
Expand All @@ -459,7 +469,7 @@ object Execution {
}
private case class UniqueIdExecution[T](fn: UniqueID => Execution[T]) extends Execution[T] {
def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) =
cache.getOrElseInsert(this, {
cache.getOrElseInsert(conf, this, {
val (uid, nextConf) = conf.ensureUniqueId
fn(uid).runStats(nextConf, mode, cache)
})
Expand All @@ -469,7 +479,7 @@ object Execution {
*/
private case class FlowDefExecution(result: (Config, Mode) => FlowDef) extends Execution[Unit] {
def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) =
cache.getOrElseInsert(this,
cache.getOrElseInsert(conf, this,
for {
flowDef <- toFuture(Try(result(conf, mode)))
_ = FlowStateMap.validateSources(flowDef, mode)
Expand Down Expand Up @@ -544,7 +554,7 @@ object Execution {
// if so we remove them from the cache.
// Anything not already ran we run as part of a single flow def, using their combined counters for the others
def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) =
cache.getOrElseInsert(this, {
cache.getOrElseInsert(conf, this, {
val cacheLookup: List[(ToWrite, Either[Promise[ExecutionCounters], Future[ExecutionCounters]])] = (head :: tail).map{ tw => (tw, cache.getOrLock(tw)) }
val (weDoOperation, someoneElseDoesOperation) = unwrapListEither(cacheLookup)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,57 @@ class ExecutionTest extends WordSpec with Matchers {
.zip(Execution.from("1"))
.waitFor(Config.default, Local(true)).get shouldBe (1, "1")
}

"Config transformer will isolate Configs" in {
def doesNotHaveVariable(message: String) = Execution.getConfig.flatMap { cfg =>
if (cfg.get("test.cfg.variable").isDefined)
Execution.failed(new Exception(s"${message}\n: var: ${cfg.get("test.cfg.variable")}"))
else
Execution.from(())
}

val hasVariable = Execution.getConfig.flatMap { cfg =>
if (cfg.get("test.cfg.variable").isEmpty)
Execution.failed(new Exception("Should see variable inside of transform"))
else
Execution.from(())
}

def addOption(cfg: Config) = cfg.+ ("test.cfg.variable", "dummyValue")

doesNotHaveVariable("Should not see variable before we've started transforming")
.flatMap{ _ => Execution.withConfig(hasVariable)(addOption) }
.flatMap(_ => doesNotHaveVariable("Should not see variable in flatMap's after the isolation"))
.map(_ => true)
.waitFor(Config.default, Local(false)) shouldBe scala.util.Success(true)
}

"Config transformer will interact correctly with the cache" in {
var incrementIfDefined = 0
var totalEvals = 0

val incrementor = Execution.getConfig.flatMap { cfg =>
totalEvals += 1
if (cfg.get("test.cfg.variable").isDefined)
incrementIfDefined += 1
Execution.from(())
}

def addOption(cfg: Config) = cfg.+ ("test.cfg.variable", "dummyValue")

// Here we run without the option, with the option, and finally without again.
incrementor
.flatMap{ _ => Execution.withConfig(incrementor)(addOption) }
.flatMap(_ => incrementor)
.map(_ => true)
.waitFor(Config.default, Local(false)) shouldBe scala.util.Success(true)

assert(incrementIfDefined === 1)
// We should evaluate once for the default config, and once for the modified config.
assert(totalEvals === 2)
}
}

"ExecutionApp" should {
val parser = new ExecutionApp { def job = Execution.from(()) }
"parse hadoop args correctly" in {
Expand Down