From f7242e90bfd2811435f012fb1f85aa8fbaffc192 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Mon, 1 Feb 2016 14:32:57 -0800 Subject: [PATCH 1/5] Add a flatMapWithConfigTransform helper --- .../com/twitter/scalding/ExecutionTest.scala | 24 +++++++++++++++++++ .../com/twitter/scalding/Execution.scala | 14 ++++++++--- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/scalding-commons/src/test/scala/com/twitter/scalding/ExecutionTest.scala b/scalding-commons/src/test/scala/com/twitter/scalding/ExecutionTest.scala index 342093c92a..2edf7fa483 100644 --- a/scalding-commons/src/test/scala/com/twitter/scalding/ExecutionTest.scala +++ b/scalding-commons/src/test/scala/com/twitter/scalding/ExecutionTest.scala @@ -107,6 +107,30 @@ class ExecutionTest extends WordSpec with Matchers { .zip(Execution.from("1")) .waitFor(Config.default, Local(true)).get shouldBe (1, "1") } + + "Config transformer will isolate Configs" in { + def doesNotHaveVariable(message: String) = Execution.getConfig.flatMap { cfg => + if (cfg.get("test.cfg.variable").isDefined) + Execution.failed(new Exception(s"${message}\n: var: ${cfg.get("test.cfg.variable")}")) + else + Execution.from(()) + } + + val hasVariable = Execution.getConfig.flatMap { cfg => + if (cfg.get("test.cfg.variable").isEmpty) + Execution.failed(new Exception("Should see variable inside of transform")) + else + Execution.from(()) + } + + def addOption(cfg: Config) = cfg.+ ("test.cfg.variable", "dummyValue") + + doesNotHaveVariable("Should not see variable before we've started transforming") + .flatMapWithConfigTransform(addOption)(_ => hasVariable) + .flatMap(_ => doesNotHaveVariable("Should not see variable in flatMap's after the isolation")) + .map(_ => true) + .waitFor(Config.default, Local(false)) shouldBe scala.util.Success(true) + } } "Execution K-means" should { "find the correct clusters for trivial cases" in { diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala b/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala index 5481e0a3aa..1005a26b1d 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala @@ -62,7 +62,15 @@ sealed trait Execution[+T] extends java.io.Serializable { * of the function */ def flatMap[U](fn: T => Execution[U]): Execution[U] = - FlatMapped(this, fn) + FlatMapped(this, fn, identity) + + /** + * The passed in Execution will be run inside of a modified configuration given by the + * passed transform function. This allows hadoop options to be changed for subsets of a flow. + * However these subsets can never be zipped into one Cascading flow as a result. + */ + def flatMapWithConfigTransform[U](transform: Config => Config)(fn: T => Execution[U]): Execution[U] = + FlatMapped(this, fn, transform) /** * This is the same as flatMap(identity) @@ -316,13 +324,13 @@ object Execution { // Note that unit is not optimized away, since Futures are often used with side-effects, so, // we ensure that get is always called in contrast to Mapped, which assumes that fn is pure. } - private case class FlatMapped[S, T](prev: Execution[S], fn: S => Execution[T]) extends Execution[T] { + private case class FlatMapped[S, T](prev: Execution[S], fn: S => Execution[T], cfgTransform: Config => Config) extends Execution[T] { def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) = cache.getOrElseInsert(this, for { (s, st1) <- prev.runStats(conf, mode, cache) next = fn(s) - fut2 = next.runStats(conf, mode, cache) + fut2 = next.runStats(cfgTransform(conf), mode, cache) (t, st2) <- fut2 } yield (t, Monoid.plus(st1, st2))) } From 37f7d989da7235086d425315c7a0c2d153fb8dc6 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Mon, 8 Feb 2016 13:31:31 -0800 Subject: [PATCH 2/5] Adds a failing test to guarantee good cache interaction --- .../com/twitter/scalding/ExecutionTest.scala | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/scalding-core/src/test/scala/com/twitter/scalding/ExecutionTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/ExecutionTest.scala index 6214a4016b..e68d62d23f 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/ExecutionTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/ExecutionTest.scala @@ -151,7 +151,33 @@ class ExecutionTest extends WordSpec with Matchers { .map(_ => true) .waitFor(Config.default, Local(false)) shouldBe scala.util.Success(true) } + + "Config transformer will interact correctly with the cache" in { + var incrementIfDefined = 0 + var totalEvals = 0 + + val incrementor = Execution.getConfig.flatMap { cfg => + totalEvals += 1 + if (cfg.get("test.cfg.variable").isDefined) + incrementIfDefined += 1 + Execution.from(()) + } + + def addOption(cfg: Config) = cfg.+ ("test.cfg.variable", "dummyValue") + + // Here we run without the option, with the option, and finally without again. + incrementor + .flatMapWithConfigTransform(addOption)(_ => incrementor) + .flatMap(_ => incrementor) + .map(_ => true) + .waitFor(Config.default, Local(false)) shouldBe scala.util.Success(true) + + assert(incrementIfDefined === 1) + // We should evaluate once for the default config, and once for the modified config. + assert(totalEvals === 2) + } } + "ExecutionApp" should { val parser = new ExecutionApp { def job = Execution.from(()) } "parse hadoop args correctly" in { From 65e697eb2d9cd456747bf831f02fd198bbacda67 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Mon, 8 Feb 2016 13:31:43 -0800 Subject: [PATCH 3/5] Adds support to the cache to be Config aware --- .../com/twitter/scalding/Execution.scala | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala b/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala index f6124fa334..ff71db4f7d 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala @@ -238,7 +238,7 @@ object Execution { * as it is evaluating. */ private[scalding] class EvalCache { - private[this] val cache = new FutureCache[Execution[Any], (Any, ExecutionCounters)] + private[this] val cache = new FutureCache[(Config, Execution[Any]), (Any, ExecutionCounters)] private[this] val toWriteCache = new FutureCache[ToWrite, ExecutionCounters] @@ -312,19 +312,19 @@ object Execution { def getOrLock(write: ToWrite): Either[Promise[ExecutionCounters], Future[ExecutionCounters]] = toWriteCache.getOrPromise(write) - def getOrElseInsertWithFeedback[T](ex: Execution[T], + def getOrElseInsertWithFeedback[T](cfg: Config, ex: Execution[T], res: => Future[(T, ExecutionCounters)]): (Boolean, Future[(T, ExecutionCounters)]) = // This cast is safe because we always insert with match T types - cache.getOrElseUpdateIsNew(ex, res) + cache.getOrElseUpdateIsNew((cfg, ex), res) .asInstanceOf[(Boolean, Future[(T, ExecutionCounters)])] - def getOrElseInsert[T](ex: Execution[T], + def getOrElseInsert[T](cfg: Config, ex: Execution[T], res: => Future[(T, ExecutionCounters)]): Future[(T, ExecutionCounters)] = - getOrElseInsertWithFeedback(ex, res)._2 + getOrElseInsertWithFeedback(cfg, ex, res)._2 } private case class FutureConst[T](get: ConcurrentExecutionContext => Future[T]) extends Execution[T] { def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) = - cache.getOrElseInsert(this, + cache.getOrElseInsert(conf, this, for { futt <- toFuture(Try(get(cec))) t <- futt @@ -335,7 +335,7 @@ object Execution { } private case class FlatMapped[S, T](prev: Execution[S], fn: S => Execution[T], cfgTransform: Config => Config) extends Execution[T] { def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) = - cache.getOrElseInsert(this, + cache.getOrElseInsert(conf, this, for { (s, st1) <- prev.runStats(conf, mode, cache) next = fn(s) @@ -346,24 +346,24 @@ object Execution { private case class Mapped[S, T](prev: Execution[S], fn: S => T) extends Execution[T] { def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) = - cache.getOrElseInsert(this, + cache.getOrElseInsert(conf, this, prev.runStats(conf, mode, cache) .map { case (s, stats) => (fn(s), stats) }) } private case class GetCounters[T](prev: Execution[T]) extends Execution[(T, ExecutionCounters)] { def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) = - cache.getOrElseInsert(this, + cache.getOrElseInsert(conf, this, prev.runStats(conf, mode, cache).map { case tc @ (t, c) => (tc, c) }) } private case class ResetCounters[T](prev: Execution[T]) extends Execution[T] { def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) = - cache.getOrElseInsert(this, + cache.getOrElseInsert(conf, this, prev.runStats(conf, mode, cache).map { case (t, _) => (t, ExecutionCounters.empty) }) } private case class OnComplete[T](prev: Execution[T], fn: Try[T] => Unit) extends Execution[T] { def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) = - cache.getOrElseInsert(this, { + cache.getOrElseInsert(conf, this, { val res = prev.runStats(conf, mode, cache) /** * The result we give is only completed AFTER fn is run @@ -383,7 +383,7 @@ object Execution { } private case class RecoverWith[T](prev: Execution[T], fn: PartialFunction[Throwable, Execution[T]]) extends Execution[T] { def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) = - cache.getOrElseInsert(this, + cache.getOrElseInsert(conf, this, prev.runStats(conf, mode, cache) .recoverWith(fn.andThen(_.runStats(conf, mode, cache)))) } @@ -458,7 +458,7 @@ object Execution { private case class Zipped[S, T](one: Execution[S], two: Execution[T]) extends Execution[(S, T)] { def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) = - cache.getOrElseInsert(this, { + cache.getOrElseInsert(conf, this, { val f1 = one.runStats(conf, mode, cache) val f2 = two.runStats(conf, mode, cache) failFastZip(f1, f2) @@ -467,7 +467,7 @@ object Execution { } private case class UniqueIdExecution[T](fn: UniqueID => Execution[T]) extends Execution[T] { def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) = - cache.getOrElseInsert(this, { + cache.getOrElseInsert(conf, this, { val (uid, nextConf) = conf.ensureUniqueId fn(uid).runStats(nextConf, mode, cache) }) @@ -477,7 +477,7 @@ object Execution { */ private case class FlowDefExecution(result: (Config, Mode) => FlowDef) extends Execution[Unit] { def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) = - cache.getOrElseInsert(this, + cache.getOrElseInsert(conf, this, for { flowDef <- toFuture(Try(result(conf, mode))) _ = FlowStateMap.validateSources(flowDef, mode) @@ -552,7 +552,7 @@ object Execution { // if so we remove them from the cache. // Anything not already ran we run as part of a single flow def, using their combined counters for the others def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) = - cache.getOrElseInsert(this, { + cache.getOrElseInsert(conf, this, { val cacheLookup: List[(ToWrite, Either[Promise[ExecutionCounters], Future[ExecutionCounters]])] = (head :: tail).map{ tw => (tw, cache.getOrLock(tw)) } val (weDoOperation, someoneElseDoesOperation) = unwrapListEither(cacheLookup) From 9f7f2ddaac9537f28540d4c4cdfe13e61579a83f Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Mon, 8 Feb 2016 15:30:14 -0800 Subject: [PATCH 4/5] New API discussion --- .../com/twitter/scalding/Execution.scala | 24 ++++++++++--------- .../com/twitter/scalding/ExecutionTest.scala | 4 ++-- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala b/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala index ff71db4f7d..bc80e595da 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala @@ -71,15 +71,7 @@ sealed trait Execution[+T] extends java.io.Serializable { * of the function */ def flatMap[U](fn: T => Execution[U]): Execution[U] = - FlatMapped(this, fn, identity) - - /** - * The passed in Execution will be run inside of a modified configuration given by the - * passed transform function. This allows hadoop options to be changed for subsets of a flow. - * However these subsets can never be zipped into one Cascading flow as a result. - */ - def flatMapWithConfigTransform[U](transform: Config => Config)(fn: T => Execution[U]): Execution[U] = - FlatMapped(this, fn, transform) + FlatMapped(this, fn) /** * This is the same as flatMap(identity) @@ -217,6 +209,9 @@ object Execution { override def join[T, U](t: Execution[T], u: Execution[U]): Execution[(T, U)] = t.zip(u) } + def withConfig[T](ex: => Execution[T])(c: Config => Config): Execution[T] = + TransformedConfig(ex, c) + /** * This is the standard semigroup on an Applicative (zip, then inside the Execution do plus) */ @@ -333,13 +328,13 @@ object Execution { // Note that unit is not optimized away, since Futures are often used with side-effects, so, // we ensure that get is always called in contrast to Mapped, which assumes that fn is pure. } - private case class FlatMapped[S, T](prev: Execution[S], fn: S => Execution[T], cfgTransform: Config => Config) extends Execution[T] { + private case class FlatMapped[S, T](prev: Execution[S], fn: S => Execution[T]) extends Execution[T] { def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) = cache.getOrElseInsert(conf, this, for { (s, st1) <- prev.runStats(conf, mode, cache) next = fn(s) - fut2 = next.runStats(cfgTransform(conf), mode, cache) + fut2 = next.runStats(conf, mode, cache) (t, st2) <- fut2 } yield (t, Monoid.plus(st1, st2))) } @@ -361,6 +356,13 @@ object Execution { prev.runStats(conf, mode, cache).map { case (t, _) => (t, ExecutionCounters.empty) }) } + private case class TransformedConfig[T](prev: Execution[T], fn: Config => Config) extends Execution[T] { + def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) = { + val mutatedConfig = fn(conf) + cache.getOrElseInsert(mutatedConfig, this, prev.runStats(mutatedConfig, mode, cache)) + } + } + private case class OnComplete[T](prev: Execution[T], fn: Try[T] => Unit) extends Execution[T] { def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) = cache.getOrElseInsert(conf, this, { diff --git a/scalding-core/src/test/scala/com/twitter/scalding/ExecutionTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/ExecutionTest.scala index e68d62d23f..0f20948829 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/ExecutionTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/ExecutionTest.scala @@ -146,7 +146,7 @@ class ExecutionTest extends WordSpec with Matchers { def addOption(cfg: Config) = cfg.+ ("test.cfg.variable", "dummyValue") doesNotHaveVariable("Should not see variable before we've started transforming") - .flatMapWithConfigTransform(addOption)(_ => hasVariable) + .flatMap{ _ => Execution.withConfig(hasVariable)(addOption) } .flatMap(_ => doesNotHaveVariable("Should not see variable in flatMap's after the isolation")) .map(_ => true) .waitFor(Config.default, Local(false)) shouldBe scala.util.Success(true) @@ -167,7 +167,7 @@ class ExecutionTest extends WordSpec with Matchers { // Here we run without the option, with the option, and finally without again. incrementor - .flatMapWithConfigTransform(addOption)(_ => incrementor) + .flatMap{ _ => Execution.withConfig(incrementor)(addOption) } .flatMap(_ => incrementor) .map(_ => true) .waitFor(Config.default, Local(false)) shouldBe scala.util.Success(true) From 91a6e6bbd15dd3778da22cb3248efdf45a390680 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Mon, 8 Feb 2016 15:41:59 -0800 Subject: [PATCH 5/5] Make arg not lazy --- .../src/main/scala/com/twitter/scalding/Execution.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala b/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala index bc80e595da..406726bd4e 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala @@ -209,7 +209,7 @@ object Execution { override def join[T, U](t: Execution[T], u: Execution[U]): Execution[(T, U)] = t.zip(u) } - def withConfig[T](ex: => Execution[T])(c: Config => Config): Execution[T] = + def withConfig[T](ex: Execution[T])(c: Config => Config): Execution[T] = TransformedConfig(ex, c) /**