-
Notifications
You must be signed in to change notification settings - Fork 266
Feature/fix merge planner bug #417
Changes from 4 commits
0743cc8
a854163
b2b72f4
33f3a2c
d98ea2c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,61 +17,53 @@ | |
package com.twitter.summingbird.viz | ||
|
||
import com.twitter.summingbird._ | ||
import scala.collection.mutable.{Map => MMap} | ||
|
||
case class ProducerViz[P <: Platform[P]](tail: Producer[P, _]) { | ||
private val dependantState = Dependants(tail) | ||
private type NameLookupTable = (Map[Producer[P, _], String], Map[String, Int]) | ||
private val emptyNameLookupTable = (Map[Producer[P, _], String](), Map[String, Int]()) | ||
private val nodeLookupTable = MMap[Producer[P, _], String]() | ||
private val nameLookupTable = MMap[String, Int]() | ||
|
||
@annotation.tailrec | ||
private def recurseGetNode(n :Producer[P, _], nameOpt: Option[String] = None): (String, Producer[P, _], List[Producer[P, _]]) = { | ||
val children: List[Producer[P, _]] = dependantState.dependantsOf(n).getOrElse(List[Producer[P, _]]()) | ||
val name = nameOpt.getOrElse(n.getClass.getName.replaceFirst("com.twitter.summingbird.", "")) | ||
children.headOption match { | ||
case Some(child: NamedProducer[_, _]) => | ||
recurseGetNode(child, Some(child.id)) | ||
case _ => | ||
(name, n, children) | ||
def getName(node: Producer[P, _]): String = { | ||
val preferredName = node match { | ||
case NamedProducer(parent, name) => "NamedProducer(%s)".format(name) | ||
case _ => node.getClass.getName.replaceFirst("com.twitter.summingbird.", "") | ||
} | ||
} | ||
|
||
def getName(curLookupTable: NameLookupTable, node: Producer[P, _], preferredName: String): (NameLookupTable, String) = { | ||
val (nodeLookupTable, nameLookupTable) = curLookupTable | ||
|
||
nodeLookupTable.get(node) match { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. looks like .getOrElseUpdate. Can you use that if so, makes it easier to reason about what is happening. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. actually, I guess it is not, because you add two keys. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe comment to that effect? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Take that back, you only add one thing to each map. So, I think getOrElseUpdate will work. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are two tables being updated in the else, would it not make it maybe more confusing to have one be as the return value and the other inline to the code? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sure. That's fine. |
||
case Some(name) => (curLookupTable, name) | ||
case None => | ||
case Some(name) => name | ||
case None => | ||
nameLookupTable.get(preferredName) match { | ||
case Some(count) => { | ||
val newNum = count + 1 | ||
val newName = preferredName + "[" + newNum + "]" | ||
(((nodeLookupTable + (node -> newName)), (nameLookupTable + (preferredName -> newNum))), newName) | ||
nodeLookupTable += (node -> newName) | ||
nameLookupTable += (preferredName -> newNum) | ||
newName | ||
} | ||
case None => (((nodeLookupTable + (node -> preferredName)), (nameLookupTable + (preferredName -> 1))), preferredName) | ||
case None => | ||
nodeLookupTable += (node -> preferredName) | ||
nameLookupTable += (preferredName -> 1) | ||
preferredName | ||
} | ||
} | ||
} | ||
|
||
override def toString() : String = { | ||
val base = "digraph summingbirdGraph {\n" | ||
val (graphStr, _) = dependantState.nodes.foldLeft(("", emptyNameLookupTable)) { case ((runningStr, nameLookupTable), nextNode) => | ||
nextNode match { | ||
case NamedProducer(parent, name) => (runningStr, nameLookupTable) | ||
case _ => | ||
// Compute the lines and new names for the nextNode | ||
val (rawNodeName, evalNode, children) = recurseGetNode(nextNode) | ||
val (updatedLookupTable, nodeName) = getName(nameLookupTable, evalNode, rawNodeName) | ||
println(dependantState.nodes.size) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. log or remove? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removing |
||
val graphStr = dependantState.nodes.foldLeft("") { case (runningStr, nextNode) => | ||
val evalNode = nextNode | ||
val children = dependantState.dependantsOf(evalNode).getOrElse(List[Producer[P, _]]()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you get None here, it is because we don't know about the node, isn't that right? Shouldn't we error if that ever happens? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Was copied from the older code, but your right that looks like the case. I'll change it to an error |
||
val nodeName = getName(evalNode) | ||
val new_str = children.foldLeft(""){ case (innerRunningStr, c) => | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think, children.map { c =>
// build some lines from c
}.mkString("") // this is faster and clearer. |
||
val innerNewStr = "\"" + nodeName + "\" -> \"" | ||
val pChildName = getName(c) | ||
|
||
val (new_str, innerNameLookupTable) = children.foldLeft(("", updatedLookupTable)){ case ((innerRunningStr, innerNameLookupTable), c) => | ||
val (childName, childNode, _) = recurseGetNode(c) | ||
|
||
val innerNewStr = "\"" + nodeName + "\" -> \"" | ||
val (updatedLookupTable2, pChildName) = getName(innerNameLookupTable, childNode, childName) | ||
|
||
val innerNewStr2 = pChildName + "\"\n" | ||
(innerRunningStr + innerNewStr + innerNewStr2, updatedLookupTable2) | ||
} | ||
(runningStr + new_str, innerNameLookupTable) | ||
val innerNewStr2 = pChildName + "\"\n" | ||
innerRunningStr + innerNewStr + innerNewStr2 | ||
} | ||
runningStr + new_str | ||
} | ||
base + graphStr + "\n}" | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -121,9 +121,9 @@ class OnlinePlan[P <: Platform[P], V](tail: Producer[P, V]) { | |
* on which these nodes depends (the producers passing data into these MergedProducer). | ||
*/ | ||
|
||
def mergeCollapse[A](p: Prod[A]): (List[Prod[A]], List[Prod[A]]) = { | ||
def mergeCollapse[A](p: Prod[A], rootMerge: Boolean = false): (List[Prod[A]], List[Prod[A]]) = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. when is this ever called with rootMerge == false ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Its recursively called, so the default parameters will kick in to make it false then There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. got it. Lines 129, 130 are the cases with root = false. |
||
p match { | ||
case MergedProducer(subL, subR) if !forkedNodes.contains(p) => | ||
case MergedProducer(subL, subR) if (!forkedNodes.contains(p) || rootMerge) => | ||
// TODO support de-duping self merges https://github.com/twitter/summingbird/issues/237 | ||
if(subL == subR) throw new Exception("Online Planner doesn't support both the left and right sides of a join being the same node.") | ||
val (lMergeNodes, lSiblings) = mergeCollapse(subL) | ||
|
@@ -149,7 +149,7 @@ class OnlinePlan[P <: Platform[P], V](tail: Producer[P, V]) { | |
case MergedProducer(l, r) => | ||
// TODO support de-duping self merges https://github.com/twitter/summingbird/issues/237 | ||
if(l == r) throw new Exception("Online Planner doesn't support both the left and right sides of a join being the same node.") | ||
val (otherMergeNodes, dependencies) = mergeCollapse(dependantProducer) | ||
val (otherMergeNodes, dependencies) = mergeCollapse(dependantProducer, rootMerge=true) | ||
val newCurrentBolt = otherMergeNodes.foldLeft(currentBolt)(_.add(_)) | ||
val visitedWithOther = otherMergeNodes.foldLeft(visitedWithN){ (visited, n) => visited + n } | ||
|
||
|
@@ -162,6 +162,8 @@ class OnlinePlan[P <: Platform[P], V](tail: Producer[P, V]) { | |
} | ||
|
||
val (nodeSet, _) = addWithDependencies(tail, FlatMapNode(), List[CNode](), Set()) | ||
require(nodeSet.collect{case n@SourceNode(_) => n}.size > 0, "Valid nodeSet should have at least one source node") | ||
|
||
} | ||
|
||
object OnlinePlan { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,183 @@ | ||
/* | ||
Copyright 2013 Twitter, Inc. | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package com.twitter.summingbird.online | ||
|
||
import com.twitter.algebird.{MapAlgebra, Semigroup} | ||
import com.twitter.storehaus.{ ReadableStore, JMapStore } | ||
import com.twitter.storehaus.algebra.MergeableStore | ||
import com.twitter.summingbird._ | ||
import com.twitter.summingbird.batch.{BatchID, Batcher} | ||
import com.twitter.summingbird.memory._ | ||
import com.twitter.summingbird.planner._ | ||
import com.twitter.util.Future | ||
import org.specs2.mutable._ | ||
import scala.collection.JavaConverters._ | ||
import scala.collection.mutable.{Map => MMap} | ||
import org.scalacheck._ | ||
import Gen._ | ||
import Arbitrary._ | ||
import org.scalacheck.Prop._ | ||
import scala.util.{Try, Success, Failure} | ||
|
||
/** | ||
* Tests for Summingbird's Storm planner. | ||
*/ | ||
|
||
object PlannerSpec extends Specification { | ||
implicit def extractor[T]: TimeExtractor[T] = TimeExtractor(_ => 0L) | ||
private type MemoryDag = Dag[Memory] | ||
def sample[T: Arbitrary]: T = Arbitrary.arbitrary[T].sample.get | ||
|
||
import TestGraphGenerators._ | ||
|
||
implicit def sink1: Memory#Sink[Int] = sample[Int => Unit] | ||
implicit def sink2: Memory#Sink[(Int, Int)] = sample[((Int, Int)) => Unit] | ||
|
||
implicit def testStore: Memory#Store[Int, Int] = MMap[Int, Int]() | ||
|
||
implicit val arbIntSource: Arbitrary[Producer[Memory, Int]] = | ||
Arbitrary(Gen.listOfN(100, Arbitrary.arbitrary[Int]).map{ | ||
x: List[Int] => | ||
Memory.toSource(x)}) | ||
implicit val arbTupleSource: Arbitrary[KeyedProducer[Memory, Int, Int]] = | ||
Arbitrary(Gen.listOfN(100, Arbitrary.arbitrary[(Int, Int)]).map{ | ||
x: List[(Int, Int)] => | ||
IdentityKeyedProducer(Memory.toSource(x))}) | ||
|
||
|
||
def arbSource1 = sample[Producer[Memory, Int]] | ||
def arbSource2 = sample[KeyedProducer[Memory, Int, Int]] | ||
|
||
"Must be able to plan user supplied Job A" in { | ||
val store1 = testStore | ||
val store2 = testStore | ||
val store3 = testStore | ||
|
||
val h = arbSource1.name("name1") | ||
.flatMap{ i: Int => | ||
List(i, i) | ||
} | ||
.name("name1PostFM") | ||
val h2 = arbSource2.name("name2") | ||
.flatMap{ tup : (Int, Int) => | ||
List(tup._1, tup._2) | ||
}.name("name2PostFM") | ||
|
||
val combined = h2.merge(h) | ||
|
||
val s1 = combined.name("combinedPipes") | ||
.map{ i: Int => | ||
(i, i * 2) | ||
} | ||
|
||
val s2 = combined.map{i : Int => | ||
(i, i * 3) | ||
} | ||
|
||
val tail = s1.sumByKey(store1) | ||
.name("Store one writter") | ||
.also(s2) | ||
.sumByKey(store2) | ||
|
||
val planned = Try(OnlinePlan(tail)) | ||
val path = TopologyPlannerLaws.dumpGraph(tail) | ||
|
||
planned match { | ||
case Success(graph) => true must beTrue | ||
case Failure(error) => | ||
val path = TopologyPlannerLaws.dumpGraph(tail) | ||
error.printStackTrace | ||
println("Dumped failing graph to: " + path) | ||
true must beFalse | ||
} | ||
} | ||
|
||
"Must be able to plan user supplied Job B" in { | ||
val store1 = testStore | ||
val store2 = testStore | ||
val store3 = testStore | ||
|
||
val h = arbSource1.name("name1") | ||
.flatMap{ i: Int => | ||
List(i, i) | ||
} | ||
.name("name1PostFM") | ||
val h2 = arbSource2.name("name2") | ||
.flatMap{ tup : (Int, Int) => | ||
List(tup._1, tup._2) | ||
}.name("name2PostFM") | ||
|
||
val combined = h2.merge(h) | ||
|
||
val s1 = combined.name("combinedPipes") | ||
.map{ i: Int => | ||
(i, i * 2) | ||
} | ||
|
||
val s2 = combined.map{i : Int => | ||
(i, i * 3) | ||
} | ||
|
||
val s3 = combined.map{i : Int => | ||
(i, i * 4) | ||
} | ||
|
||
val tail = s1.sumByKey(store1) | ||
.name("Store one writter") | ||
.also(s2) | ||
.sumByKey(store2) | ||
.name("Store two writer") | ||
.also(s3) | ||
.sumByKey(store3) | ||
.name("Store three writer") | ||
|
||
val planned = Try(OnlinePlan(tail)) | ||
planned match { | ||
case Success(graph) => true must beTrue | ||
case Failure(error) => | ||
val path = TopologyPlannerLaws.dumpGraph(tail) | ||
error.printStackTrace | ||
println("Dumped failing graph to: " + path) | ||
true must beFalse | ||
} | ||
} | ||
|
||
"Must be able to plan user supplied Job C" in { | ||
val store1 = testStore | ||
|
||
val src = arbSource1 | ||
val h = src | ||
|
||
val h2 = src.map(_ * 3) | ||
|
||
val combined = h2.merge(h) | ||
|
||
val c1 = combined.map{i: Int => i * 4} | ||
val c2 = combined.map{i: Int => i * 8} | ||
val tail = c1.merge(c2).map{i: Int => (i, i)}.sumByKey(store1) | ||
|
||
val planned = Try(OnlinePlan(tail)) | ||
planned match { | ||
case Success(graph) => true must beTrue | ||
case Failure(error) => | ||
val path = TopologyPlannerLaws.dumpGraph(tail) | ||
error.printStackTrace | ||
println("Dumped failing graph to: " + path) | ||
true must beFalse | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these look like caches, so despite being mutable, it is still safe. Is that correct? Can you add a comment to it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, will add comment.