From 4e807200d033d89d66644ed2e4fc05e9158785c8 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Mon, 7 Oct 2013 15:50:06 -0700 Subject: [PATCH 01/10] State --- .../twitter/summingbird/storm/StormLaws.scala | 193 +++++++++++------- 1 file changed, 121 insertions(+), 72 deletions(-) diff --git a/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/StormLaws.scala b/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/StormLaws.scala index 58891d91b..c284bfd48 100644 --- a/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/StormLaws.scala +++ b/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/StormLaws.scala @@ -119,6 +119,17 @@ object StormLaws extends Specification { def sample[T: Arbitrary]: T = Arbitrary.arbitrary[T].sample.get + def genStore: (String, Storm#Store[Int, Int]) = { + val id = UUID.randomUUID.toString + globalState += (id -> TestState()) + val store = MergeableStoreSupplier(() => testingStore(id), Batcher.unit) + (id, store) + } + + def genSink:() => ((Int) => Future[Unit]) = () => {x: Int => + append(x) + Future.Unit + } /** * Perform a single run of TestGraphs.singleStepJob using the * supplied list of integers and the testFn defined above. 
@@ -187,18 +198,7 @@ object StormLaws extends Specification { StormLaws.outputList.toList } - "StormPlatform matches Scala for single step jobs" in { - val original = sample[List[Int]] - val (fn, returnedState) = - runOnce(original)( - TestGraphs.singleStepJob[Storm, Int, Int, Int](_,_)(testFn) - ) - Equiv[Map[Int, Int]].equiv( - TestGraphs.singleStepInScala(original)(fn), - returnedState.store.asScala.toMap - .collect { case ((k, batchID), Some(v)) => (k, v) } - ) must beTrue - } + val nextFn = { pair: ((Int, (Int, Option[Int]))) => val (k, (v, joinedV)) = pair @@ -208,76 +208,125 @@ object StormLaws extends Specification { val serviceFn = Arbitrary.arbitrary[Int => Option[Int]].sample.get val service = StoreWrapper[Int, Int](() => ReadableStore.fromFn(serviceFn)) - "StormPlatform matches Scala for left join jobs" in { - val original = sample[List[Int]] - - val (fn, returnedState) = - runOnce(original)( - TestGraphs.leftJoinJob[Storm, Int, Int, Int, Int, Int](_, service, _)(testFn)(nextFn) - ) - Equiv[Map[Int, Int]].equiv( - TestGraphs.leftJoinInScala(original)(serviceFn) - (fn)(nextFn), - returnedState.store.asScala.toMap - .collect { case ((k, batchID), Some(v)) => (k, v) } - ) must beTrue - } - - "StormPlatform matches Scala for optionMap only jobs" in { - val original = sample[List[Int]] - val id = UUID.randomUUID.toString - - val cluster = new LocalCluster() - globalState += (id -> TestState()) + // "StormPlatform matches Scala for single step jobs" in { + // val original = sample[List[Int]] + // val (fn, returnedState) = + // runOnce(original)( + // TestGraphs.singleStepJob[Storm, Int, Int, Int](_,_)(testFn) + // ) + // Equiv[Map[Int, Int]].equiv( + // TestGraphs.singleStepInScala(original)(fn), + // returnedState.store.asScala.toMap + // .collect { case ((k, batchID), Some(v)) => (k, v) } + // ) must beTrue + // } + // "StormPlatform matches Scala for left join jobs" in { + // val original = sample[List[Int]] + + // val (fn, returnedState) = + // 
runOnce(original)( + // TestGraphs.leftJoinJob[Storm, Int, Int, Int, Int, Int](_, service, _)(testFn)(nextFn) + // ) + // Equiv[Map[Int, Int]].equiv( + // TestGraphs.leftJoinInScala(original)(serviceFn) + // (fn)(nextFn), + // returnedState.store.asScala.toMap + // .collect { case ((k, batchID), Some(v)) => (k, v) } + // ) must beTrue + // } + + // "StormPlatform matches Scala for optionMap only jobs" in { + // val original = sample[List[Int]] + // val id = UUID.randomUUID.toString + + // val cluster = new LocalCluster() + + // globalState += (id -> TestState()) + + // val producer = + // Storm.source(TraversableSpout(original)) + // .filter(_ % 2 == 0) + // .map(_ -> 10) + // .sumByKey(Storm.store(testingStore(id))) + + // val topo = storm.plan(producer) + + // Testing.completeTopology(cluster, topo, completeTopologyParam) + // // Sleep to prevent this race: https://github.com/nathanmarz/storm/pull/667 + // Thread.sleep(1000) + // cluster.shutdown + + // Equiv[Map[Int, Int]].equiv( + // MapAlgebra.sumByKey(original.filter(_ % 2 == 0).map(_ -> 10)), + // globalState(id).store.asScala + // .toMap + // .collect { case ((k, batchID), Some(v)) => (k, v) } + // ) must beTrue + // } + + // "StormPlatform matches Scala for MapOnly/NoSummer" in { + // val original = sample[List[Int]] + // val doubler = {x: Int => List(x*2)} + + // val stormOutputList = + // runWithOutSummer(original)( + // TestGraphs.mapOnlyJob[Storm, Int, Int](_, _)(doubler) + // ).sorted + // val memoryOutputList = + // memoryPlanWithoutSummer(original) (TestGraphs.mapOnlyJob[Memory, Int, Int](_, _)(doubler)).sorted + + // stormOutputList must_==(memoryOutputList) + // } + + // "StormPlatform matches Scala for MapOnly/NoSummer with dangling FM" in { + // val original = sample[List[Int]] + // val doubler = {x: Int => List(x*2)} + + // val stormOutputList = + // runWithOutSummer(original)( + // TestGraphs.mapOnlyJob[Storm, Int, Int](_, _)(doubler).map{x:Int => x*3} + // ).sorted + // val memoryOutputList = 
+ // memoryPlanWithoutSummer(original) (TestGraphs.mapOnlyJob[Memory, Int, Int](_, _)(doubler)).sorted + + // stormOutputList must_==(memoryOutputList) + // } - val producer = - Storm.source(TraversableSpout(original)) - .filter(_ % 2 == 0) - .map(_ -> 10) - .sumByKey(Storm.store(testingStore(id))) + "StormPlatform with multiple summers" in { + val original = List((1, 1), (2, 2) , (3, 3)) // sample[List[Int]] + val doubler = {(x): (Int, Int) => List((x._1 -> x._2*2))} - val topo = storm.plan(producer) + val source = Storm.source(TraversableSpout(original)) + val (summer1Id, summer1) = genStore + val (summer2Id, summer2) = genStore - Testing.completeTopology(cluster, topo, completeTopologyParam) - // Sleep to prevent this race: https://github.com/nathanmarz/storm/pull/667 - Thread.sleep(1000) - cluster.shutdown + val preAlso = source + .flatMap(doubler) - Equiv[Map[Int, Int]].equiv( - MapAlgebra.sumByKey(original.filter(_ % 2 == 0).map(_ -> 10)), - globalState(id).store.asScala - .toMap - .collect { case ((k, batchID), Some(v)) => (k, v) } - ) must beTrue - } + val task = preAlso.sumByKey(summer1).also(preAlso).flatMap(doubler) + .sumByKey(summer2) - "StormPlatform matches Scala for MapOnly/NoSummer" in { - val original = sample[List[Int]] - val doubler = {x: Int => List(x*2)} - val stormOutputList = - runWithOutSummer(original)( - TestGraphs.mapOnlyJob[Storm, Int, Int](_, _)(doubler) - ).sorted - val memoryOutputList = - memoryPlanWithoutSummer(original) (TestGraphs.mapOnlyJob[Memory, Int, Int](_, _)(doubler)).sorted - - stormOutputList must_==(memoryOutputList) - } + import com.twitter.summingbird.storm.planner._ + TopologyPlannerLaws.dumpGraph(DagBuilder(task)) + // val topo = storm.plan(task) + + // val cluster = new LocalCluster() + // Testing.completeTopology(cluster, topo, completeTopologyParam) + // // Sleep to prevent this race: https://github.com/nathanmarz/storm/pull/667 + // Thread.sleep(1000) + // cluster.shutdown - "StormPlatform matches Scala for 
MapOnly/NoSummer with dangling FM" in { - val original = sample[List[Int]] - val doubler = {x: Int => List(x*2)} - val stormOutputList = - runWithOutSummer(original)( - TestGraphs.mapOnlyJob[Storm, Int, Int](_, _)(doubler).map{x:Int => x*3} - ).sorted - val memoryOutputList = - memoryPlanWithoutSummer(original) (TestGraphs.mapOnlyJob[Memory, Int, Int](_, _)(doubler)).sorted + // val stormOutputList = + // runWithOutSummer(original)( + // TestGraphs.mapOnlyJob[Storm, Int, Int](_, _)(doubler).map{x:Int => x*3} + // ).sorted + // val memoryOutputList = + // memoryPlanWithoutSummer(original) (TestGraphs.mapOnlyJob[Memory, Int, Int](_, _)(doubler)).sorted - stormOutputList must_==(memoryOutputList) + // stormOutputList must_==(memoryOutputList) } } From 99d537e8574da27fe3e15c3ded7ff5b4dfb6b574 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Tue, 8 Oct 2013 14:50:44 -0700 Subject: [PATCH 02/10] Commented out maybe flush --- .../summingbird/storm/FlatMapOperation.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FlatMapOperation.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FlatMapOperation.scala index 1629af370..b89465ca4 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FlatMapOperation.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FlatMapOperation.scala @@ -27,6 +27,12 @@ trait FlatMapOperation[-T, +U] extends Serializable with Closeable { override def close { } + /* + * maybeFlush may be periodically called to empty any internal state + * Not used yet so commented out + */ + //def maybeFlush: Future[TraversableOnce[U]] = Future.value(Seq.empty[U]) + /** * TODO: Think about getting an implicit FutureCollector here, in * case we don't want to completely choke on large expansions (and @@ -42,6 +48,17 @@ trait FlatMapOperation[-T, +U] extends Serializable with Closeable { 
Future.collect(next).map(_.flatten) // flatten the inner } + // override def maybeFlush = { + // self.maybeFlush.flatMap{ x: TraversableOnce[U] => + // val z: Seq[Future[TraversableOnce[V]]] = x.map(fmo.apply(_)).toSeq + // val w: Future[Seq[V]] = Future.collect(z).map(_.flatten) + // for { + // ws <- w + // maybes <- fmo.maybeFlush + // maybeSeq = maybes.toSeq + // } yield ws ++ maybeSeq + // } + // } override def close { self.close; fmo.close } } } From ff3b0d6d3f8e28f6bba9ffc4d105dea512ac3135 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Tue, 8 Oct 2013 18:23:23 -0700 Subject: [PATCH 03/10] Update name generation to handle multiple tails --- .../com/twitter/summingbird/planner/Node.scala | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/summingbird-core/src/main/scala/com/twitter/summingbird/planner/Node.scala b/summingbird-core/src/main/scala/com/twitter/summingbird/planner/Node.scala index 1ac6bb031..c181c67e5 100644 --- a/summingbird-core/src/main/scala/com/twitter/summingbird/planner/Node.scala +++ b/summingbird-core/src/main/scala/com/twitter/summingbird/planner/Node.scala @@ -166,7 +166,7 @@ object Dag { def tryGetName(name: String, seen: Set[String], indxOpt: Option[Int] = None): String = { indxOpt match { - case None => if (seen.contains(name)) tryGetName(name, seen, Some(1)) else name + case None => if (seen.contains(name)) tryGetName(name, seen, Some(2)) else name case Some(indx) => if (seen.contains(name + "." + indx)) tryGetName(name, seen, Some(indx + 1)) else name + "." 
+ indx } } @@ -183,7 +183,20 @@ object Dag { } } - val (nodeToName, _) = genNames(dag.tailN, dag, Map(dag.tailN -> "Tail"), Set("Tail")) + def allTails(dag: Dag[P]): List[Node[P]] = { + dag.nodes.filter{m => dag.dependantsOf(m).size == 0 } + } + + //start with the true tail + val (nodeToName, _) = (dag.tailN :: allTails(dag)).foldLeft((Map[Node[P], String](), Set[String]())) { case ((nodeToName, usedNames), curTail) => + if(!nodeToName.contains(curTail)) { + val tailN = tryGetName("Tail", usedNames) + genNames(curTail, dag, nodeToName + (curTail -> tailN), usedNames + tailN) + } else { + (nodeToName, usedNames) + } + } + val nameToNode = nodeToName.map((t) => (t._2, t._1)) dag.copy(nodeToName = nodeToName, nameToNode = nameToNode) } From 4f587fd4845199913db47a0adea4db65f76935a0 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Tue, 8 Oct 2013 18:23:47 -0700 Subject: [PATCH 04/10] Add a TailProducer to encapsulate things which also can be called on --- .../com/twitter/summingbird/Producer.scala | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/summingbird-core/src/main/scala/com/twitter/summingbird/Producer.scala b/summingbird-core/src/main/scala/com/twitter/summingbird/Producer.scala index 709fe51fe..74b44714c 100644 --- a/summingbird-core/src/main/scala/com/twitter/summingbird/Producer.scala +++ b/summingbird-core/src/main/scala/com/twitter/summingbird/Producer.scala @@ -88,12 +88,7 @@ object Producer { * in-progress TopologyBuilder. */ sealed trait Producer[P <: Platform[P], +T] { - /** Ensure this is scheduled, but return something equivalent to the argument - * like the function `par` in Haskell. - * This can be used to combine two independent Producers in a way that ensures - * that the Platform will plan both into a single Plan. 
- */ - def also[R](that: Producer[P, R]): Producer[P, R] = AlsoProducer(this, that) + def name(id: String): Producer[P, T] = NamedProducer(this, id) def merge[U >: T](r: Producer[P, U]): Producer[P, U] = MergedProducer(this, r) @@ -114,7 +109,7 @@ sealed trait Producer[P <: Platform[P], +T] { def flatMap[U](fn: T => TraversableOnce[U]): Producer[P, U] = FlatMappedProducer[P, T, U](this, fn) - def write[U >: T](sink: P#Sink[U]): Producer[P, T] = WrittenProducer(this, sink) + def write[U >: T](sink: P#Sink[U]): TailProducer[P, T] = WrittenProducer(this, sink) def either[U](other: Producer[P, U]): Producer[P, Either[T, U]] = map(Left(_): Either[T, U]) @@ -124,11 +119,21 @@ sealed trait Producer[P <: Platform[P], +T] { case class Source[P <: Platform[P], T](source: P#Source[T]) extends Producer[P, T] + +trait TailProducer[P <: Platform[P], +T] extends Producer[P, T] { + /** Ensure this is scheduled, but return something equivalent to the argument + * like the function `par` in Haskell. + * This can be used to combine two independent Producers in a way that ensures + * that the Platform will plan both into a single Plan. + */ + def also[R](that: Producer[P, R]): Producer[P, R] = AlsoProducer(this, that) +} + /** * This is a special node that ensures that the first argument is planned, but produces values * equivalent to the result. 
*/ -case class AlsoProducer[P <: Platform[P], T, R](ensure: Producer[P, T], result: Producer[P, R]) extends Producer[P, R] +case class AlsoProducer[P <: Platform[P], T, R](ensure: TailProducer[P, T], result: Producer[P, R]) extends Producer[P, R] case class NamedProducer[P <: Platform[P], T](producer: Producer[P, T], id: String) extends Producer[P, T] @@ -143,12 +148,12 @@ case class FlatMappedProducer[P <: Platform[P], T, U](producer: Producer[P, T], case class MergedProducer[P <: Platform[P], T](left: Producer[P, T], right: Producer[P, T]) extends Producer[P, T] -case class WrittenProducer[P <: Platform[P], T, U >: T](producer: Producer[P, T], sink: P#Sink[U]) extends Producer[P, T] +case class WrittenProducer[P <: Platform[P], T, U >: T](producer: Producer[P, T], sink: P#Sink[U]) extends TailProducer[P, T] case class Summer[P <: Platform[P], K, V]( producer: KeyedProducer[P, K, V], store: P#Store[K, V], - monoid: Monoid[V]) extends KeyedProducer[P, K, V] + monoid: Monoid[V]) extends KeyedProducer[P, K, V] with TailProducer[P, (K, V)] sealed trait KeyedProducer[P <: Platform[P], K, V] extends Producer[P, (K, V)] { def leftJoin[RightV](service: P#Service[K, RightV]): KeyedProducer[P, K, (V, Option[RightV])] = From 84aca7df660b9c5bb91f24a0b646cccc8110cb8f Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Tue, 8 Oct 2013 18:25:51 -0700 Subject: [PATCH 05/10] Fixes a bad name collision, handling of splitting around a summer is more robust. 
--- .../storm/planner/StormPlanner.scala | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/planner/StormPlanner.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/planner/StormPlanner.scala index d5e8ce2c4..8d39e1204 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/planner/StormPlanner.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/planner/StormPlanner.scala @@ -37,9 +37,9 @@ object DagBuilder { // This takes an initial pass through all of the Producers, assigning them to Nodes private def buildNodesSet[P](tail: Producer[Storm, P]): List[StormNode] = { - val dep = Dependants(tail) + val depData = Dependants(tail) val forkedNodes = Producer.transitiveDependenciesOf(tail) - .filter(dep.fanOut(_).exists(_ > 1)).toSet + .filter(depData.fanOut(_).exists(_ > 1)).toSet def distinctAddToList[T](l : List[T], n : T): List[T] = if(l.contains(n)) l else (n :: l) // Add the dependentProducer to a Node along with each of its dependencies in turn. @@ -66,10 +66,18 @@ object DagBuilder { case IdentityKeyedProducer(producer) => true case OptionMappedProducer(producer, _) => true case Source(_) => true + case AlsoProducer(_, _) => true case _ => false } } + def hasSummerAsNextProducer(p: Prod[_]): Boolean = { + depData.dependantsOf(p).get.collect { case s: Summer[_, _, _] => s }.headOption match { + case Some(_) => true + case None => false + } + } + def allDepsMergeableWithSource(p: Prod[_]): Boolean = mergableWithSource(p) && Producer.dependenciesOf(p).forall(allDepsMergeableWithSource) /* @@ -85,7 +93,7 @@ object DagBuilder { case _ if (forkedNodes.contains(dep)) => true // If we are a flatmap, but there haven't been any other flatmaps yet(i.e. the registry is of size 1, the summer). 
// Then we must split to avoid a 2 level higherarchy - case _ if (currentBolt.isInstanceOf[FlatMapNode[_]] && stormRegistry.size == 1 && allDepsMergeableWithSource(dep)) => true + case _ if (currentBolt.isInstanceOf[FlatMapNode[_]] && hasSummerAsNextProducer(currentProducer) && allDepsMergeableWithSource(dep)) => true case _ if ((!mergableWithSource(currentProducer)) && allDepsMergeableWithSource(dep)) => true case _ => false } @@ -96,6 +104,8 @@ object DagBuilder { } } + + /* * This is a peek ahead when we meet a MergedProducer. We pull the directly depended on MergedProducer's into the same Node, * only if that MergedProducer is not a fan out node. @@ -120,6 +130,9 @@ object DagBuilder { case Summer(producer, _, _) => recurse(producer, updatedBolt = FlatMapNode(), updatedRegistry = distinctAddToList(stormRegistry, currentBolt.toSummer)) case IdentityKeyedProducer(producer) => maybeSplitThenRecurse(dependantProducer, producer) case NamedProducer(producer, newId) => maybeSplitThenRecurse(dependantProducer, producer) + case AlsoProducer(lProducer, rProducer) => + val (updatedReg, updatedVisited) = maybeSplitThenRecurse(dependantProducer, rProducer) + recurse(lProducer, FlatMapNode(), updatedReg, updatedVisited) case Source(spout) => (distinctAddToList(stormRegistry, currentBolt.toSource), visitedWithN) case OptionMappedProducer(producer, op) => maybeSplitThenRecurse(dependantProducer, producer) case FlatMappedProducer(producer, op) => maybeSplitThenRecurse(dependantProducer, producer) From 49dcb4832b25f8aaace0a5960cd7d9d2c7e54b99 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Tue, 8 Oct 2013 18:26:31 -0700 Subject: [PATCH 06/10] Also test --- .../twitter/summingbird/storm/StormLaws.scala | 218 +++++++++--------- 1 file changed, 110 insertions(+), 108 deletions(-) diff --git a/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/StormLaws.scala b/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/StormLaws.scala index c284bfd48..93c5f468c 
100644 --- a/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/StormLaws.scala +++ b/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/StormLaws.scala @@ -209,124 +209,126 @@ object StormLaws extends Specification { val service = StoreWrapper[Int, Int](() => ReadableStore.fromFn(serviceFn)) - // "StormPlatform matches Scala for single step jobs" in { - // val original = sample[List[Int]] - // val (fn, returnedState) = - // runOnce(original)( - // TestGraphs.singleStepJob[Storm, Int, Int, Int](_,_)(testFn) - // ) - // Equiv[Map[Int, Int]].equiv( - // TestGraphs.singleStepInScala(original)(fn), - // returnedState.store.asScala.toMap - // .collect { case ((k, batchID), Some(v)) => (k, v) } - // ) must beTrue - // } - // "StormPlatform matches Scala for left join jobs" in { - // val original = sample[List[Int]] - - // val (fn, returnedState) = - // runOnce(original)( - // TestGraphs.leftJoinJob[Storm, Int, Int, Int, Int, Int](_, service, _)(testFn)(nextFn) - // ) - // Equiv[Map[Int, Int]].equiv( - // TestGraphs.leftJoinInScala(original)(serviceFn) - // (fn)(nextFn), - // returnedState.store.asScala.toMap - // .collect { case ((k, batchID), Some(v)) => (k, v) } - // ) must beTrue - // } - - // "StormPlatform matches Scala for optionMap only jobs" in { - // val original = sample[List[Int]] - // val id = UUID.randomUUID.toString - - // val cluster = new LocalCluster() - - // globalState += (id -> TestState()) - - // val producer = - // Storm.source(TraversableSpout(original)) - // .filter(_ % 2 == 0) - // .map(_ -> 10) - // .sumByKey(Storm.store(testingStore(id))) - - // val topo = storm.plan(producer) - - // Testing.completeTopology(cluster, topo, completeTopologyParam) - // // Sleep to prevent this race: https://github.com/nathanmarz/storm/pull/667 - // Thread.sleep(1000) - // cluster.shutdown - - // Equiv[Map[Int, Int]].equiv( - // MapAlgebra.sumByKey(original.filter(_ % 2 == 0).map(_ -> 10)), - // globalState(id).store.asScala - // .toMap - 
// .collect { case ((k, batchID), Some(v)) => (k, v) } - // ) must beTrue - // } - - // "StormPlatform matches Scala for MapOnly/NoSummer" in { - // val original = sample[List[Int]] - // val doubler = {x: Int => List(x*2)} - - // val stormOutputList = - // runWithOutSummer(original)( - // TestGraphs.mapOnlyJob[Storm, Int, Int](_, _)(doubler) - // ).sorted - // val memoryOutputList = - // memoryPlanWithoutSummer(original) (TestGraphs.mapOnlyJob[Memory, Int, Int](_, _)(doubler)).sorted + "StormPlatform matches Scala for single step jobs" in { + val original = sample[List[Int]] + val (fn, returnedState) = + runOnce(original)( + TestGraphs.singleStepJob[Storm, Int, Int, Int](_,_)(testFn) + ) + Equiv[Map[Int, Int]].equiv( + TestGraphs.singleStepInScala(original)(fn), + returnedState.store.asScala.toMap + .collect { case ((k, batchID), Some(v)) => (k, v) } + ) must beTrue + } + "StormPlatform matches Scala for left join jobs" in { + val original = sample[List[Int]] + + val (fn, returnedState) = + runOnce(original)( + TestGraphs.leftJoinJob[Storm, Int, Int, Int, Int, Int](_, service, _)(testFn)(nextFn) + ) + Equiv[Map[Int, Int]].equiv( + TestGraphs.leftJoinInScala(original)(serviceFn) + (fn)(nextFn), + returnedState.store.asScala.toMap + .collect { case ((k, batchID), Some(v)) => (k, v) } + ) must beTrue + } + + "StormPlatform matches Scala for optionMap only jobs" in { + val original = sample[List[Int]] + val id = UUID.randomUUID.toString + + val cluster = new LocalCluster() + + globalState += (id -> TestState()) + + val producer = + Storm.source(TraversableSpout(original)) + .filter(_ % 2 == 0) + .map(_ -> 10) + .sumByKey(Storm.store(testingStore(id))) + + val topo = storm.plan(producer) + + Testing.completeTopology(cluster, topo, completeTopologyParam) + // Sleep to prevent this race: https://github.com/nathanmarz/storm/pull/667 + Thread.sleep(1000) + cluster.shutdown + + Equiv[Map[Int, Int]].equiv( + MapAlgebra.sumByKey(original.filter(_ % 2 == 0).map(_ -> 10)), + 
globalState(id).store.asScala + .toMap + .collect { case ((k, batchID), Some(v)) => (k, v) } + ) must beTrue + } + + "StormPlatform matches Scala for MapOnly/NoSummer" in { + val original = sample[List[Int]] + val doubler = {x: Int => List(x*2)} + + val stormOutputList = + runWithOutSummer(original)( + TestGraphs.mapOnlyJob[Storm, Int, Int](_, _)(doubler) + ).sorted + val memoryOutputList = + memoryPlanWithoutSummer(original) (TestGraphs.mapOnlyJob[Memory, Int, Int](_, _)(doubler)).sorted - // stormOutputList must_==(memoryOutputList) - // } - - // "StormPlatform matches Scala for MapOnly/NoSummer with dangling FM" in { - // val original = sample[List[Int]] - // val doubler = {x: Int => List(x*2)} - - // val stormOutputList = - // runWithOutSummer(original)( - // TestGraphs.mapOnlyJob[Storm, Int, Int](_, _)(doubler).map{x:Int => x*3} - // ).sorted - // val memoryOutputList = - // memoryPlanWithoutSummer(original) (TestGraphs.mapOnlyJob[Memory, Int, Int](_, _)(doubler)).sorted + stormOutputList must_==(memoryOutputList) + } + + "StormPlatform matches Scala for MapOnly/NoSummer with dangling FM" in { + val original = sample[List[Int]] + val doubler = {x: Int => List(x*2)} + + val stormOutputList = + runWithOutSummer(original)( + TestGraphs.mapOnlyJob[Storm, Int, Int](_, _)(doubler).map{x:Int => x*3} + ).sorted + val memoryOutputList = + memoryPlanWithoutSummer(original) (TestGraphs.mapOnlyJob[Memory, Int, Int](_, _)(doubler)).sorted - // stormOutputList must_==(memoryOutputList) - // } + stormOutputList must_==(memoryOutputList) + } "StormPlatform with multiple summers" in { - val original = List((1, 1), (2, 2) , (3, 3)) // sample[List[Int]] - val doubler = {(x): (Int, Int) => List((x._1 -> x._2*2))} + val original = List(1, 2, 3, 45) // sample[List[Int]] + val doubler = {(x): (Int) => List((x -> x*2))} + val simpleOp = {(x): (Int) => List(x * 10)} val source = Storm.source(TraversableSpout(original)) - val (summer1Id, summer1) = genStore - val (summer2Id, summer2) = 
genStore + val (store1Id, store1) = genStore + val (store2Id, store2) = genStore - val preAlso = source - .flatMap(doubler) + val tail = TestGraphs.multipleSummerJob[Storm, Int, Int, Int, Int, Int, Int](source, store1, store2)(simpleOp, doubler, doubler) - val task = preAlso.sumByKey(summer1).also(preAlso).flatMap(doubler) - .sumByKey(summer2) + val topo = storm.plan(tail) + + val cluster = new LocalCluster() + Testing.completeTopology(cluster, topo, completeTopologyParam) + // Sleep to prevent this race: https://github.com/nathanmarz/storm/pull/667 + Thread.sleep(1000) + cluster.shutdown + + val (scalaA, scalaB) = TestGraphs.multipleSummerJobInScala(original)(simpleOp, doubler, doubler) + + val store1Map = globalState(store1Id).store.asScala.toMap + val store2Map = globalState(store2Id).store.asScala.toMap + Equiv[Map[Int, Int]].equiv( + scalaA, + store1Map + .collect { case ((k, batchID), Some(v)) => (k, v) } + ) must beTrue + + Equiv[Map[Int, Int]].equiv( + scalaB, + store2Map + .collect { case ((k, batchID), Some(v)) => (k, v) } + ) must beTrue - import com.twitter.summingbird.storm.planner._ - TopologyPlannerLaws.dumpGraph(DagBuilder(task)) - // val topo = storm.plan(task) - - // val cluster = new LocalCluster() - // Testing.completeTopology(cluster, topo, completeTopologyParam) - // // Sleep to prevent this race: https://github.com/nathanmarz/storm/pull/667 - // Thread.sleep(1000) - // cluster.shutdown - - - // val stormOutputList = - // runWithOutSummer(original)( - // TestGraphs.mapOnlyJob[Storm, Int, Int](_, _)(doubler).map{x:Int => x*3} - // ).sorted - // val memoryOutputList = - // memoryPlanWithoutSummer(original) (TestGraphs.mapOnlyJob[Memory, Int, Int](_, _)(doubler)).sorted - - // stormOutputList must_==(memoryOutputList) } } From ac10f19be98e0ea539a11aa25a1dfc5b838f97f7 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Tue, 8 Oct 2013 18:40:54 -0700 Subject: [PATCH 07/10] Produce sane Also dag's including DAG output and graph output --- 
.../summingbird/TestGraphGenerators.scala | 35 ++++++++++++++----- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/summingbird-core/src/test/scala/com/twitter/summingbird/TestGraphGenerators.scala b/summingbird-core/src/test/scala/com/twitter/summingbird/TestGraphGenerators.scala index de18965bb..43354d5bb 100644 --- a/summingbird-core/src/test/scala/com/twitter/summingbird/TestGraphGenerators.scala +++ b/summingbird-core/src/test/scala/com/twitter/summingbird/TestGraphGenerators.scala @@ -48,6 +48,14 @@ object TestGraphGenerators { if(deps.size == 1) genProd2 else oneOf(deps) } + def aTailDependency[P <: Platform[P]](p: Producer[P, Any])(implicit genSource1 : Arbitrary[Producer[P, Int]], + genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], + sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]): Gen[TailProducer[P, Any]] = { + val validDeps = Producer.transitiveDependenciesOf(p).collect{case x:TailProducer[_, _] => x.asInstanceOf[TailProducer[P, Any]]} + if(validDeps.size == 0) summed else oneOf(validDeps) + } + + def genMerged2[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { _ <- Gen.choose(0,1) p1 <- genProd2 @@ -77,13 +85,15 @@ object TestGraphGenerators { name <- Gen.alphaStr } yield NamedProducer(FlatMappedProducer(in, fn), name) - def genMerged1[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { + def genMerged1[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], + testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { _ <- Gen.choose(0,1) p1 <- genProd1 p2 <- genProd1 } yield MergedProducer(p1, p2) - def 
genFlatMap12[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { + def genFlatMap12[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], + testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { fn <- arbitrary[(Int) => List[(Int, Int)]] in <- genProd1 } yield IdentityKeyedProducer(FlatMappedProducer(in, fn)) @@ -96,16 +106,18 @@ object TestGraphGenerators { } yield IdentityKeyedProducer(p1.write(sink2)) - def also1[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { + def also1[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], + testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { _ <- Gen.choose(0, 1) // avoids blowup on self recursion out <- genProd1 - ignored <- oneOf(genProd2, genProd1, oneOf(Producer.transitiveDependenciesOf(out))): Gen[Producer[P, _]] + ignored <- oneOf(summed, written, aTailDependency(out)): Gen[TailProducer[P, _]] } yield ignored.also(out) - def also2[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { + def also2[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], + testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { _ <- Gen.choose(0, 1) // avoids blowup on self recursion out <- genProd2 - ignored <- oneOf(genProd2, genProd1, oneOf(Producer.transitiveDependenciesOf(out))): Gen[Producer[P, 
_]] + ignored <- oneOf(summed, written, aTailDependency(out)): Gen[TailProducer[P, _]] } yield IdentityKeyedProducer(ignored.also(out)) @@ -116,17 +128,24 @@ object TestGraphGenerators { in <- genProd2 } yield in.sumByKey(testStore) + def written[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], + genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], + testStore: P#Store[Int, Int], sink1: P#Sink[Int], + sink2: P#Sink[(Int, Int)]): Gen[TailProducer[P, Int]] = for { + in <- genProd1 + } yield in.write(sink1) + def genProd2[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]): Gen[KeyedProducer[P, Int, Int]] = - frequency((25, genSource2.arbitrary), (3, genOptMap12), (3, genOptMap22), (4, genWrite22), (1, genMerged2), + frequency((25, genSource2.arbitrary), (3, genOptMap12), (3, genOptMap22), (4, genWrite22), (1, genMerged2), (2, also2), (0, also2), (3, genFlatMap22), (3, genFlatMap12)) def genProd1[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]): Gen[Producer[P, Int]] = - frequency((25, genSource1.arbitrary), (8, genNamedProducer11), (3, genOptMap11), (3, genOptMap21), (1, genMerged1), (3, genFlatMap11), + frequency((25, genSource1.arbitrary), (8, genNamedProducer11), (3, genOptMap11), (3, genOptMap21), (1, genMerged1), (2, also1), (3, genFlatMap11), (0, also1), (3, genFlatMap21)) } From 7a7140738d7bc913f9c7f2f7cc94eb95c91bec4f Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Tue, 8 Oct 2013 18:46:19 -0700 Subject: [PATCH 08/10] Tests and test cases for handling multiple summers --- .../com/twitter/summingbird/TestGraphs.scala | 17 ++++++++++ .../summingbird/scalding/ScaldingLaws.scala | 31 +++++++++++++++++++ .../summingbird/storm/StormPlatform.scala | 2 ++ 
.../storm/TopologyPlannerLaws.scala | 15 ++++++--- 4 files changed, 60 insertions(+), 5 deletions(-) diff --git a/summingbird-core/src/test/scala/com/twitter/summingbird/TestGraphs.scala b/summingbird-core/src/test/scala/com/twitter/summingbird/TestGraphs.scala index 7610dbaf0..cd39e19f4 100644 --- a/summingbird-core/src/test/scala/com/twitter/summingbird/TestGraphs.scala +++ b/summingbird-core/src/test/scala/com/twitter/summingbird/TestGraphs.scala @@ -87,6 +87,23 @@ object TestGraphs { .flatMap(postJoinFn) .sumByKey(store) + + def multipleSummerJobInScala[T1, T2, K1, V1: Monoid, K2, V2: Monoid] + (source: List[T1]) + (fnR: T1 => TraversableOnce[T2], fnA: T2 => TraversableOnce[(K1, V1)], fnB: T2 => TraversableOnce[(K2, V2)]) + : (Map[K1, V1], Map[K2, V2]) = { + val mapA = MapAlgebra.sumByKey(source.flatMap(fnR).flatMap(fnA)) + val mapB = MapAlgebra.sumByKey(source.flatMap(fnR).flatMap(fnB)) + (mapA, mapB) + } + + def multipleSummerJob[P <: Platform[P], T1, T2, K1, V1: Monoid, K2, V2: Monoid] + (source: Producer[P, T1], store1: P#Store[K1, V1], store2: P#Store[K2, V2]) + (fnR: T1 => TraversableOnce[T2], fnA: T2 => TraversableOnce[(K1, V1)], fnB: T2 => TraversableOnce[(K2, V2)]): Summer[P, K2, V2] = { + val combined = source.flatMap(fnR) + combined.flatMap(fnA).sumByKey(store1).also(combined).flatMap(fnB).sumByKey(store2) + } + def mapOnlyJob[P <: Platform[P], T, U]( source: Producer[P, T], sink: P#Sink[U] diff --git a/summingbird-scalding/src/test/scala/com/twitter/summingbird/scalding/ScaldingLaws.scala b/summingbird-scalding/src/test/scala/com/twitter/summingbird/scalding/ScaldingLaws.scala index 240b83033..6b06bc1f0 100644 --- a/summingbird-scalding/src/test/scala/com/twitter/summingbird/scalding/ScaldingLaws.scala +++ b/summingbird-scalding/src/test/scala/com/twitter/summingbird/scalding/ScaldingLaws.scala @@ -244,6 +244,37 @@ object ScaldingLaws extends Properties("Scalding") { Monoid.isNonZero(Group.minus(inMemory, smap)) == false } + + 
property("ScaldingPlatform matches Scala for multiple summer jobs, with everything in one Batch") = + forAll { (original: List[Int], fnA:(Int) => List[(Int)], fnB: (Int) => List[(Int, Int)], fnC: (Int) => List[(Int, Int)]) => + val (inMemoryA, inMemoryB) = TestGraphs.multipleSummerJobInScala(original)(fnA, fnB, fnC) + + // Add a time: + val inWithTime = original.zipWithIndex.map { case (item, time) => (time.toLong, item) } + val batcher = simpleBatcher + import Dsl._ // Won't be needed in scalding 0.9.0 + val testStoreA = new TestStore[Int,Int]("testA", batcher, BatchID(0), Iterable.empty, BatchID(1)) + val testStoreB = new TestStore[Int,Int]("testB", batcher, BatchID(0), Iterable.empty, BatchID(1)) + val (buffer, source) = testSource(inWithTime) + + val tail = TestGraphs.multipleSummerJob[Scalding, (Long, Int), Int, Int, Int, Int, Int](source, testStoreA, testStoreB)({t => fnA(t._2)}, fnB, fnC) + + val intr = Interval.leftClosedRightOpen(0L, original.size.toLong) + val scald = new Scalding("scalaCheckMultipleSumJob") + val ws = new LoopState(intr.mapNonDecreasing(t => new Date(t))) + val mode: Mode = TestMode(t => (testStoreA.sourceToBuffer ++ testStoreB.sourceToBuffer ++ buffer).get(t)) + + scald.run(ws, mode, scald.plan(tail)) + // Now check that each store's contents equal the matching in-memory result; both checks must hold + + val smapA = testStoreA.lastToIterable(BatchID(1)).toMap + val storeAMatches = !Monoid.isNonZero(Group.minus(inMemoryA, smapA)) + + val smapB = testStoreB.lastToIterable(BatchID(1)).toMap + storeAMatches && !Monoid.isNonZero(Group.minus(inMemoryB, smapB)) + } + + property("ScaldingPlatform matches Scala for leftJoin jobs, with everything in one Batch") = forAll { (original: List[Int], prejoinMap: (Int) => List[(Int, Int)], diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index eee1e30fe..cb024b8b7 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala
+++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -104,6 +104,7 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config case IdentityKeyedProducer(_) => acc case MergedProducer(_, _) => acc case NamedProducer(_, _) => acc + case AlsoProducer(_, _) => acc case _ => throw new Exception("Not found! : " + p) } } @@ -145,6 +146,7 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config case OptionMappedProducer(_, op) => spout.flatMap {case (time, t) => op.apply(t).map { x => (time, x) }} case NamedProducer(_, _) => spout case IdentityKeyedProducer(_) => spout + case AlsoProducer(_, _) => spout case _ => sys.error("not possible, given the above call to span.\n" + p) } }.getSpout diff --git a/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyPlannerLaws.scala b/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyPlannerLaws.scala index 1858b3797..2b5c76cf9 100644 --- a/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyPlannerLaws.scala +++ b/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyPlannerLaws.scala @@ -133,28 +133,32 @@ object TopologyPlannerLaws extends Properties("StormDag") { property("All producers are in a StormNode") = forAll { (dag: StormDag) => - val allProducers = Producer.transitiveDependenciesOf(dag.tail).toSet + dag.tail + val allProducers = Producer.entireGraphOf(dag.tail).toSet + dag.tail val numAllProducersInDag = dag.nodes.foldLeft(0){(sum, n) => sum + n.members.size} allProducers.size == numAllProducersInDag } property("Only spouts can have no incoming dependencies") = forAll { (dag: StormDag) => dag.nodes.forall{n => - n match { + val success = n match { case _: SourceNode[_] => true case _ => dag.dependenciesOf(n).size > 0 } + if(!success) dumpGraph(dag) + success } } - property("Spouts must have no incoming dependencies, and they must have dependants") = forAll { 
(dag: StormDag) => + property("Spouts must have no incoming dependencies") = forAll { (dag: StormDag) => dag.nodes.forall{n => - n match { + val success = n match { case _: SourceNode[_] => - dag.dependenciesOf(n).size == 0 && dag.dependantsOf(n).size > 0 + dag.dependenciesOf(n).size == 0 case _ => true } + if(!success) dumpGraph(dag) + success } } @@ -196,6 +200,7 @@ object TopologyPlannerLaws extends Properties("StormDag") { true } catch { case e: Throwable => + dumpGraph(dag) println(e) e.printStackTrace false From c9055d3991f3eb6135a51ccfee9f484795dbb89a Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Tue, 8 Oct 2013 18:52:11 -0700 Subject: [PATCH 09/10] Uncomment, add back in law --- .../summingbird/storm/FlatMapOperation.scala | 24 +++++++++---------- .../storm/TopologyPlannerLaws.scala | 4 ++-- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FlatMapOperation.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FlatMapOperation.scala index b89465ca4..b04fd2384 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FlatMapOperation.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FlatMapOperation.scala @@ -31,7 +31,7 @@ trait FlatMapOperation[-T, +U] extends Serializable with Closeable { * maybeFlush may be periodically called to empty any internal state * Not used yet so commented out */ - //def maybeFlush: Future[TraversableOnce[U]] = Future.value(Seq.empty[U]) + def maybeFlush: Future[TraversableOnce[U]] = Future.value(Seq.empty[U]) /** * TODO: Think about getting an implicit FutureCollector here, in @@ -48,17 +48,17 @@ trait FlatMapOperation[-T, +U] extends Serializable with Closeable { Future.collect(next).map(_.flatten) // flatten the inner } - // override def maybeFlush = { - // self.maybeFlush.flatMap{ x: TraversableOnce[U] => - // val z: Seq[Future[TraversableOnce[V]]] = x.map(fmo.apply(_)).toSeq - // val 
w: Future[Seq[V]] = Future.collect(z).map(_.flatten) - // for { - // ws <- w - // maybes <- fmo.maybeFlush - // maybeSeq = maybes.toSeq - // } yield ws ++ maybeSeq - // } - // } + override def maybeFlush = { + self.maybeFlush.flatMap{ x: TraversableOnce[U] => + val z: Seq[Future[TraversableOnce[V]]] = x.map(fmo.apply(_)).toSeq + val w: Future[Seq[V]] = Future.collect(z).map(_.flatten) + for { + ws <- w + maybes <- fmo.maybeFlush + maybeSeq = maybes.toSeq + } yield ws ++ maybeSeq + } + } override def close { self.close; fmo.close } } } diff --git a/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyPlannerLaws.scala b/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyPlannerLaws.scala index 2b5c76cf9..50aaea6ed 100644 --- a/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyPlannerLaws.scala +++ b/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyPlannerLaws.scala @@ -150,11 +150,11 @@ object TopologyPlannerLaws extends Properties("StormDag") { } - property("Spouts must have no incoming dependencies") = forAll { (dag: StormDag) => + property("Spouts must have no incoming dependencies, and they must have dependants") = forAll { (dag: StormDag) => dag.nodes.forall{n => val success = n match { case _: SourceNode[_] => - dag.dependenciesOf(n).size == 0 + dag.dependenciesOf(n).size == 0 && dag.dependantsOf(n).size > 0 case _ => true } if(!success) dumpGraph(dag) From 6f2da126a2ddf21513aad8d5b8defba70c479592 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Tue, 8 Oct 2013 19:07:37 -0700 Subject: [PATCH 10/10] Added simplification --- .../twitter/summingbird/storm/planner/StormPlanner.scala | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/planner/StormPlanner.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/planner/StormPlanner.scala index 8d39e1204..8daa5a24b 100644 --- 
a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/planner/StormPlanner.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/planner/StormPlanner.scala @@ -71,12 +71,9 @@ object DagBuilder { } } - def hasSummerAsNextProducer(p: Prod[_]): Boolean = { - depData.dependantsOf(p).get.collect { case s: Summer[_, _, _] => s }.headOption match { - case Some(_) => true - case None => false - } - } + def hasSummerAsNextProducer(p: Prod[_]): Boolean = + depData.dependantsOf(p).get.exists { case _: Summer[_, _, _] => true; case _ => false } + def allDepsMergeableWithSource(p: Prod[_]): Boolean = mergableWithSource(p) && Producer.dependenciesOf(p).forall(allDepsMergeableWithSource)