Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Commit

Permalink
Merge pull request #417 from twitter/feature/fix_merge_planner_bug
Browse files Browse the repository at this point in the history
Feature/fix merge planner bug
  • Loading branch information
johnynek committed Jan 17, 2014
2 parents c6df389 + d98ea2c commit d3298e7
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ case class Dag[P <: Platform[P]](originalTail: TailProducer[P, _], producerToPri
}

def locateOpt(p: Producer[P, _]): Option[Node[P]] = producerToNode.get(p)
def locate(p: Producer[P, _]): Node[P] = locateOpt(p).get
def locate(p: Producer[P, _]): Node[P] = locateOpt(p).getOrElse{ sys.error("Unexpected node missing when looking for %s".format(p))}
def connect(src: Producer[P, _], dest: Producer[P, _]): Dag[P] = connect(locate(src), locate(dest))

def getNodeName(n: Node[P]): String = nodeToName(n)
Expand Down Expand Up @@ -144,6 +144,8 @@ object Dag {
registry: List[Node[P]],
sanitizeName: String => String): Dag[P] = {

require(registry.collect{case n@SourceNode(_) => n}.size > 0, "Valid registries should have at least one source node")

def buildProducerToNodeLookUp(stormNodeSet: List[Node[P]]): Map[Producer[P, _], Node[P]] = {
stormNodeSet.foldLeft(Map[Producer[P, _], Node[P]]()) { (curRegistry, stormNode) =>
stormNode.members.foldLeft(curRegistry) { (innerRegistry, producer) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,62 +17,50 @@
package com.twitter.summingbird.viz

import com.twitter.summingbird._
import scala.collection.mutable.{Map => MMap}

case class ProducerViz[P <: Platform[P]](tail: Producer[P, _]) {
private val dependantState = Dependants(tail)
private type NameLookupTable = (Map[Producer[P, _], String], Map[String, Int])
private val emptyNameLookupTable = (Map[Producer[P, _], String](), Map[String, Int]())
// These are caches that are only kept/used for a short period.
// Access is single-threaded and mutation is tightly controlled.
// Mutable maps are used here instead of immutable ones to simplify the code.
private val nodeLookupTable = MMap[Producer[P, _], String]()
private val nameLookupTable = MMap[String, Int]()

@annotation.tailrec
private def recurseGetNode(n :Producer[P, _], nameOpt: Option[String] = None): (String, Producer[P, _], List[Producer[P, _]]) = {
val children: List[Producer[P, _]] = dependantState.dependantsOf(n).getOrElse(List[Producer[P, _]]())
val name = nameOpt.getOrElse(n.getClass.getName.replaceFirst("com.twitter.summingbird.", ""))
children.headOption match {
case Some(child: NamedProducer[_, _]) =>
recurseGetNode(child, Some(child.id))
case _ =>
(name, n, children)
def getName(node: Producer[P, _]): String = {
val preferredName = node match {
case NamedProducer(parent, name) => "NamedProducer(%s)".format(name)
case _ => node.getClass.getName.replaceFirst("com.twitter.summingbird.", "")
}
}

def getName(curLookupTable: NameLookupTable, node: Producer[P, _], preferredName: String): (NameLookupTable, String) = {
val (nodeLookupTable, nameLookupTable) = curLookupTable

nodeLookupTable.get(node) match {
case Some(name) => (curLookupTable, name)
case None =>
case Some(name) => name
case None =>
nameLookupTable.get(preferredName) match {
case Some(count) => {
val newNum = count + 1
val newName = preferredName + "[" + newNum + "]"
(((nodeLookupTable + (node -> newName)), (nameLookupTable + (preferredName -> newNum))), newName)
nodeLookupTable += (node -> newName)
nameLookupTable += (preferredName -> newNum)
newName
}
case None => (((nodeLookupTable + (node -> preferredName)), (nameLookupTable + (preferredName -> 1))), preferredName)
case None =>
nodeLookupTable += (node -> preferredName)
nameLookupTable += (preferredName -> 1)
preferredName
}
}
}

override def toString() : String = {
val base = "digraph summingbirdGraph {\n"
val (graphStr, _) = dependantState.nodes.foldLeft(("", emptyNameLookupTable)) { case ((runningStr, nameLookupTable), nextNode) =>
nextNode match {
case NamedProducer(parent, name) => (runningStr, nameLookupTable)
case _ =>
// Compute the lines and new names for the nextNode
val (rawNodeName, evalNode, children) = recurseGetNode(nextNode)
val (updatedLookupTable, nodeName) = getName(nameLookupTable, evalNode, rawNodeName)

val (new_str, innerNameLookupTable) = children.foldLeft(("", updatedLookupTable)){ case ((innerRunningStr, innerNameLookupTable), c) =>
val (childName, childNode, _) = recurseGetNode(c)

val innerNewStr = "\"" + nodeName + "\" -> \""
val (updatedLookupTable2, pChildName) = getName(innerNameLookupTable, childNode, childName)

val innerNewStr2 = pChildName + "\"\n"
(innerRunningStr + innerNewStr + innerNewStr2, updatedLookupTable2)
}
(runningStr + new_str, innerNameLookupTable)
val graphStr = dependantState.nodes.flatMap { evalNode =>
val children = dependantState.dependantsOf(evalNode).getOrElse(sys.error("Invalid node: %s, unable to find dependants".format(evalNode)))
val nodeName = getName(evalNode)
children.map{ c =>
"\"%s\" -> \"%s\"\n".format(nodeName, getName(c))
}
}
}.mkString("")
base + graphStr + "\n}"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ class OnlinePlan[P <: Platform[P], V](tail: Producer[P, V]) {
* on which these nodes depends (the producers passing data into these MergedProducer).
*/

def mergeCollapse[A](p: Prod[A]): (List[Prod[A]], List[Prod[A]]) = {
def mergeCollapse[A](p: Prod[A], rootMerge: Boolean = false): (List[Prod[A]], List[Prod[A]]) = {
p match {
case MergedProducer(subL, subR) if !forkedNodes.contains(p) =>
case MergedProducer(subL, subR) if (!forkedNodes.contains(p) || rootMerge) =>
// TODO support de-duping self merges https://github.com/twitter/summingbird/issues/237
if(subL == subR) throw new Exception("Online Planner doesn't support both the left and right sides of a join being the same node.")
val (lMergeNodes, lSiblings) = mergeCollapse(subL)
Expand All @@ -149,7 +149,7 @@ class OnlinePlan[P <: Platform[P], V](tail: Producer[P, V]) {
case MergedProducer(l, r) =>
// TODO support de-duping self merges https://github.com/twitter/summingbird/issues/237
if(l == r) throw new Exception("Online Planner doesn't support both the left and right sides of a join being the same node.")
val (otherMergeNodes, dependencies) = mergeCollapse(dependantProducer)
val (otherMergeNodes, dependencies) = mergeCollapse(dependantProducer, rootMerge=true)
val newCurrentBolt = otherMergeNodes.foldLeft(currentBolt)(_.add(_))
val visitedWithOther = otherMergeNodes.foldLeft(visitedWithN){ (visited, n) => visited + n }

Expand All @@ -162,6 +162,8 @@ class OnlinePlan[P <: Platform[P], V](tail: Producer[P, V]) {
}

val (nodeSet, _) = addWithDependencies(tail, FlatMapNode(), List[CNode](), Set())
require(nodeSet.collect{case n@SourceNode(_) => n}.size > 0, "Valid nodeSet should have at least one source node")

}

object OnlinePlan {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ object StripNamedNode {

def apply[P <: Platform[P], T](tail: TailProducer[P, T]): (Map[Producer[P, Any], List[String]], TailProducer[P, T]) = {
val dependants = Dependants(tail)
val (map, newTail) = stripNamedNodes(tail)
(map.mapValues(n => getName(dependants, n)), newTail.asInstanceOf[TailProducer[P, T]])
val (oldProducerToNewMap, newTail) = stripNamedNodes(tail)
(oldProducerToNewMap.mapValues(n => getName(dependants, n)), newTail.asInstanceOf[TailProducer[P, T]])
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
Copyright 2013 Twitter, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.twitter.summingbird.online

import com.twitter.algebird.{MapAlgebra, Semigroup}
import com.twitter.storehaus.{ ReadableStore, JMapStore }
import com.twitter.storehaus.algebra.MergeableStore
import com.twitter.summingbird._
import com.twitter.summingbird.batch.{BatchID, Batcher}
import com.twitter.summingbird.memory._
import com.twitter.summingbird.planner._
import com.twitter.util.Future
import org.specs2.mutable._
import scala.collection.JavaConverters._
import scala.collection.mutable.{Map => MMap}
import org.scalacheck._
import Gen._
import Arbitrary._
import org.scalacheck.Prop._
import scala.util.{Try, Success, Failure}

/**
 * Tests for Summingbird's online planner.
 *
 * Each example assembles a user-style job on the Memory platform and checks that
 * OnlinePlan can plan it without throwing. When planning fails, the offending
 * producer graph is dumped to disk so it can be inspected.
 */

object PlannerSpec extends Specification {
  implicit def extractor[T]: TimeExtractor[T] = TimeExtractor(_ => 0L)
  private type MemoryDag = Dag[Memory]

  /** Draws one value from the Arbitrary for T, failing with a readable message if none is produced. */
  def sample[T: Arbitrary]: T =
    Arbitrary.arbitrary[T].sample.getOrElse(sys.error("ScalaCheck generator failed to produce a sample"))

  import TestGraphGenerators._

  implicit def sink1: Memory#Sink[Int] = sample[Int => Unit]
  implicit def sink2: Memory#Sink[(Int, Int)] = sample[((Int, Int)) => Unit]

  // A def, not a val: every call hands back a fresh, empty store.
  implicit def testStore: Memory#Store[Int, Int] = MMap[Int, Int]()

  implicit val arbIntSource: Arbitrary[Producer[Memory, Int]] =
    Arbitrary(Gen.listOfN(100, Arbitrary.arbitrary[Int]).map {
      x: List[Int] =>
        Memory.toSource(x)
    })
  implicit val arbTupleSource: Arbitrary[KeyedProducer[Memory, Int, Int]] =
    Arbitrary(Gen.listOfN(100, Arbitrary.arbitrary[(Int, Int)]).map {
      x: List[(Int, Int)] =>
        IdentityKeyedProducer(Memory.toSource(x))
    })

  def arbSource1 = sample[Producer[Memory, Int]]
  def arbSource2 = sample[KeyedProducer[Memory, Int, Int]]

  /**
   * Turns the outcome of a planning attempt into a spec result.
   *
   * On success the example passes. On failure the stack trace is printed, the
   * producer graph is dumped via TopologyPlannerLaws.dumpGraph (only then, so
   * passing runs write no files) and the example fails.
   *
   * @param tail    the job that was handed to the planner
   * @param planned the captured result of planning `tail`
   */
  private def checkPlanned(tail: Producer[Memory, Any], planned: Try[Any]) =
    planned match {
      case Success(_) => true must beTrue
      case Failure(error) =>
        val path = TopologyPlannerLaws.dumpGraph(tail)
        error.printStackTrace()
        println("Dumped failing graph to: " + path)
        true must beFalse
    }

  "Must be able to plan user supplied Job A" in {
    val store1 = testStore
    val store2 = testStore

    val h = arbSource1.name("name1")
      .flatMap { i: Int =>
        List(i, i)
      }
      .name("name1PostFM")
    val h2 = arbSource2.name("name2")
      .flatMap { tup: (Int, Int) =>
        List(tup._1, tup._2)
      }.name("name2PostFM")

    // The merged stream is consumed twice below (s1 and s2).
    val combined = h2.merge(h)

    val s1 = combined.name("combinedPipes")
      .map { i: Int =>
        (i, i * 2)
      }

    val s2 = combined.map { i: Int =>
      (i, i * 3)
    }

    val tail = s1.sumByKey(store1)
      .name("Store one writter")
      .also(s2)
      .sumByKey(store2)

    checkPlanned(tail, Try(OnlinePlan(tail)))
  }

  "Must be able to plan user supplied Job B" in {
    val store1 = testStore
    val store2 = testStore
    val store3 = testStore

    val h = arbSource1.name("name1")
      .flatMap { i: Int =>
        List(i, i)
      }
      .name("name1PostFM")
    val h2 = arbSource2.name("name2")
      .flatMap { tup: (Int, Int) =>
        List(tup._1, tup._2)
      }.name("name2PostFM")

    // The merged stream is consumed three times below (s1, s2 and s3).
    val combined = h2.merge(h)

    val s1 = combined.name("combinedPipes")
      .map { i: Int =>
        (i, i * 2)
      }

    val s2 = combined.map { i: Int =>
      (i, i * 3)
    }

    val s3 = combined.map { i: Int =>
      (i, i * 4)
    }

    val tail = s1.sumByKey(store1)
      .name("Store one writter")
      .also(s2)
      .sumByKey(store2)
      .name("Store two writer")
      .also(s3)
      .sumByKey(store3)
      .name("Store three writer")

    checkPlanned(tail, Try(OnlinePlan(tail)))
  }

  "Must be able to plan user supplied Job C" in {
    val store1 = testStore

    val src = arbSource1
    val h = src

    val h2 = src.map(_ * 3)

    // Merge of a source with a mapped copy of itself, then consumed twice and merged again.
    val combined = h2.merge(h)

    val c1 = combined.map { i: Int => i * 4 }
    val c2 = combined.map { i: Int => i * 8 }
    val tail = c1.merge(c2).map { i: Int => (i, i) }.sumByKey(store1)

    checkPlanned(tail, Try(OnlinePlan(tail)))
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,22 +61,26 @@ object TopologyPlannerLaws extends Properties("Online Dag") {
def sample[T: Arbitrary]: T = Arbitrary.arbitrary[T].sample.get

var dumpNumber = 1
def dumpGraph(dag: MemoryDag) = {
def dumpGraph(dag: MemoryDag): String = {
import java.io._
import com.twitter.summingbird.viz.VizGraph
val writer2 = new PrintWriter(new File("/tmp/failingGraph" + dumpNumber + ".dot"))
val targetPath = "/tmp/failingGraph" + dumpNumber + ".dot"
val writer2 = new PrintWriter(new File(targetPath))
VizGraph(dag, writer2)
writer2.close()
dumpNumber = dumpNumber + 1
targetPath
}

def dumpGraph(tail: Producer[Memory, Any]) = {
def dumpGraph(tail: Producer[Memory, Any]): String = {
import java.io._
import com.twitter.summingbird.viz.VizGraph
val writer2 = new PrintWriter(new File("/tmp/failingProducerGraph" + dumpNumber + ".dot"))
val targetPath = "/tmp/failingProducerGraph" + dumpNumber + ".dot"
val writer2 = new PrintWriter(new File(targetPath))
VizGraph(tail, writer2)
writer2.close()
dumpNumber = dumpNumber + 1
targetPath
}

property("Dag Nodes must be unique") = forAll { (dag: MemoryDag) =>
Expand Down

0 comments on commit d3298e7

Please sign in to comment.