Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Feature/fix merge planner bug #417

Merged
merged 5 commits into from
Jan 17, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ case class Dag[P <: Platform[P]](originalTail: TailProducer[P, _], producerToPri
}

def locateOpt(p: Producer[P, _]): Option[Node[P]] = producerToNode.get(p)
def locate(p: Producer[P, _]): Node[P] = locateOpt(p).get
def locate(p: Producer[P, _]): Node[P] = locateOpt(p).getOrElse{ sys.error("Unexpected node missing when looking for %s".format(p))}
def connect(src: Producer[P, _], dest: Producer[P, _]): Dag[P] = connect(locate(src), locate(dest))

def getNodeName(n: Node[P]): String = nodeToName(n)
Expand Down Expand Up @@ -144,6 +144,8 @@ object Dag {
registry: List[Node[P]],
sanitizeName: String => String): Dag[P] = {

require(registry.collect{case n@SourceNode(_) => n}.size > 0, "Valid registries should have at least one source node")

def buildProducerToNodeLookUp(stormNodeSet: List[Node[P]]): Map[Producer[P, _], Node[P]] = {
stormNodeSet.foldLeft(Map[Producer[P, _], Node[P]]()) { (curRegistry, stormNode) =>
stormNode.members.foldLeft(curRegistry) { (innerRegistry, producer) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,62 +17,50 @@
package com.twitter.summingbird.viz

import com.twitter.summingbird._
import scala.collection.mutable.{Map => MMap}

case class ProducerViz[P <: Platform[P]](tail: Producer[P, _]) {
private val dependantState = Dependants(tail)
private type NameLookupTable = (Map[Producer[P, _], String], Map[String, Int])
private val emptyNameLookupTable = (Map[Producer[P, _], String](), Map[String, Int]())
// These are caches that are only kept/used for a short period
// its single threaded and mutation is tightly controlled.
// Used here instead of an immutable for simplification of code.
private val nodeLookupTable = MMap[Producer[P, _], String]()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these look like caches, so despite being mutable, it is still safe. Is that correct? Can you add a comment to it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, will add comment.

private val nameLookupTable = MMap[String, Int]()

@annotation.tailrec
private def recurseGetNode(n :Producer[P, _], nameOpt: Option[String] = None): (String, Producer[P, _], List[Producer[P, _]]) = {
val children: List[Producer[P, _]] = dependantState.dependantsOf(n).getOrElse(List[Producer[P, _]]())
val name = nameOpt.getOrElse(n.getClass.getName.replaceFirst("com.twitter.summingbird.", ""))
children.headOption match {
case Some(child: NamedProducer[_, _]) =>
recurseGetNode(child, Some(child.id))
case _ =>
(name, n, children)
def getName(node: Producer[P, _]): String = {
val preferredName = node match {
case NamedProducer(parent, name) => "NamedProducer(%s)".format(name)
case _ => node.getClass.getName.replaceFirst("com.twitter.summingbird.", "")
}
}

def getName(curLookupTable: NameLookupTable, node: Producer[P, _], preferredName: String): (NameLookupTable, String) = {
val (nodeLookupTable, nameLookupTable) = curLookupTable

nodeLookupTable.get(node) match {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like .getOrElseUpdate. Can you use that if so, makes it easier to reason about what is happening.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, I guess it is not, because you add two keys.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe comment to that effect?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take that back, you only add one thing to each map. So, I think getOrElseUpdate will work.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two tables being updated in the else, would it not make it maybe more confusing to have one be as the return value and the other inline to the code?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure. That's fine.

case Some(name) => (curLookupTable, name)
case None =>
case Some(name) => name
case None =>
nameLookupTable.get(preferredName) match {
case Some(count) => {
val newNum = count + 1
val newName = preferredName + "[" + newNum + "]"
(((nodeLookupTable + (node -> newName)), (nameLookupTable + (preferredName -> newNum))), newName)
nodeLookupTable += (node -> newName)
nameLookupTable += (preferredName -> newNum)
newName
}
case None => (((nodeLookupTable + (node -> preferredName)), (nameLookupTable + (preferredName -> 1))), preferredName)
case None =>
nodeLookupTable += (node -> preferredName)
nameLookupTable += (preferredName -> 1)
preferredName
}
}
}

override def toString() : String = {
val base = "digraph summingbirdGraph {\n"
val (graphStr, _) = dependantState.nodes.foldLeft(("", emptyNameLookupTable)) { case ((runningStr, nameLookupTable), nextNode) =>
nextNode match {
case NamedProducer(parent, name) => (runningStr, nameLookupTable)
case _ =>
// Compute the lines and new names for the nextNode
val (rawNodeName, evalNode, children) = recurseGetNode(nextNode)
val (updatedLookupTable, nodeName) = getName(nameLookupTable, evalNode, rawNodeName)

val (new_str, innerNameLookupTable) = children.foldLeft(("", updatedLookupTable)){ case ((innerRunningStr, innerNameLookupTable), c) =>
val (childName, childNode, _) = recurseGetNode(c)

val innerNewStr = "\"" + nodeName + "\" -> \""
val (updatedLookupTable2, pChildName) = getName(innerNameLookupTable, childNode, childName)

val innerNewStr2 = pChildName + "\"\n"
(innerRunningStr + innerNewStr + innerNewStr2, updatedLookupTable2)
}
(runningStr + new_str, innerNameLookupTable)
val graphStr = dependantState.nodes.flatMap { evalNode =>
val children = dependantState.dependantsOf(evalNode).getOrElse(sys.error("Invalid node: %s, unable to find dependants".format(evalNode)))
val nodeName = getName(evalNode)
children.map{ c =>
"\"%s\" -> \"%s\"\n".format(nodeName, getName(c))
}
}
}.mkString("")
base + graphStr + "\n}"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ class OnlinePlan[P <: Platform[P], V](tail: Producer[P, V]) {
* on which these nodes depends (the producers passing data into these MergedProducer).
*/

def mergeCollapse[A](p: Prod[A]): (List[Prod[A]], List[Prod[A]]) = {
def mergeCollapse[A](p: Prod[A], rootMerge: Boolean = false): (List[Prod[A]], List[Prod[A]]) = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when is this ever called with rootMerge == false ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its recursively called, so the default parameters will kick in to make it false then

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it. Lines 129, 130 are the cases with root = false.

p match {
case MergedProducer(subL, subR) if !forkedNodes.contains(p) =>
case MergedProducer(subL, subR) if (!forkedNodes.contains(p) || rootMerge) =>
// TODO support de-duping self merges https://github.com/twitter/summingbird/issues/237
if(subL == subR) throw new Exception("Online Planner doesn't support both the left and right sides of a join being the same node.")
val (lMergeNodes, lSiblings) = mergeCollapse(subL)
Expand All @@ -149,7 +149,7 @@ class OnlinePlan[P <: Platform[P], V](tail: Producer[P, V]) {
case MergedProducer(l, r) =>
// TODO support de-duping self merges https://github.com/twitter/summingbird/issues/237
if(l == r) throw new Exception("Online Planner doesn't support both the left and right sides of a join being the same node.")
val (otherMergeNodes, dependencies) = mergeCollapse(dependantProducer)
val (otherMergeNodes, dependencies) = mergeCollapse(dependantProducer, rootMerge=true)
val newCurrentBolt = otherMergeNodes.foldLeft(currentBolt)(_.add(_))
val visitedWithOther = otherMergeNodes.foldLeft(visitedWithN){ (visited, n) => visited + n }

Expand All @@ -162,6 +162,8 @@ class OnlinePlan[P <: Platform[P], V](tail: Producer[P, V]) {
}

val (nodeSet, _) = addWithDependencies(tail, FlatMapNode(), List[CNode](), Set())
require(nodeSet.collect{case n@SourceNode(_) => n}.size > 0, "Valid nodeSet should have at least one source node")

}

object OnlinePlan {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ object StripNamedNode {

def apply[P <: Platform[P], T](tail: TailProducer[P, T]): (Map[Producer[P, Any], List[String]], TailProducer[P, T]) = {
val dependants = Dependants(tail)
val (map, newTail) = stripNamedNodes(tail)
(map.mapValues(n => getName(dependants, n)), newTail.asInstanceOf[TailProducer[P, T]])
val (oldProducerToNewMap, newTail) = stripNamedNodes(tail)
(oldProducerToNewMap.mapValues(n => getName(dependants, n)), newTail.asInstanceOf[TailProducer[P, T]])
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
Copyright 2013 Twitter, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.twitter.summingbird.online

import com.twitter.algebird.{MapAlgebra, Semigroup}
import com.twitter.storehaus.{ ReadableStore, JMapStore }
import com.twitter.storehaus.algebra.MergeableStore
import com.twitter.summingbird._
import com.twitter.summingbird.batch.{BatchID, Batcher}
import com.twitter.summingbird.memory._
import com.twitter.summingbird.planner._
import com.twitter.util.Future
import org.specs2.mutable._
import scala.collection.JavaConverters._
import scala.collection.mutable.{Map => MMap}
import org.scalacheck._
import Gen._
import Arbitrary._
import org.scalacheck.Prop._
import scala.util.{Try, Success, Failure}

/**
* Tests for Summingbird's Storm planner.
*/

object PlannerSpec extends Specification {
implicit def extractor[T]: TimeExtractor[T] = TimeExtractor(_ => 0L)
private type MemoryDag = Dag[Memory]
def sample[T: Arbitrary]: T = Arbitrary.arbitrary[T].sample.get

import TestGraphGenerators._

implicit def sink1: Memory#Sink[Int] = sample[Int => Unit]
implicit def sink2: Memory#Sink[(Int, Int)] = sample[((Int, Int)) => Unit]

implicit def testStore: Memory#Store[Int, Int] = MMap[Int, Int]()

implicit val arbIntSource: Arbitrary[Producer[Memory, Int]] =
Arbitrary(Gen.listOfN(100, Arbitrary.arbitrary[Int]).map{
x: List[Int] =>
Memory.toSource(x)})
implicit val arbTupleSource: Arbitrary[KeyedProducer[Memory, Int, Int]] =
Arbitrary(Gen.listOfN(100, Arbitrary.arbitrary[(Int, Int)]).map{
x: List[(Int, Int)] =>
IdentityKeyedProducer(Memory.toSource(x))})


def arbSource1 = sample[Producer[Memory, Int]]
def arbSource2 = sample[KeyedProducer[Memory, Int, Int]]

"Must be able to plan user supplied Job A" in {
val store1 = testStore
val store2 = testStore
val store3 = testStore

val h = arbSource1.name("name1")
.flatMap{ i: Int =>
List(i, i)
}
.name("name1PostFM")
val h2 = arbSource2.name("name2")
.flatMap{ tup : (Int, Int) =>
List(tup._1, tup._2)
}.name("name2PostFM")

val combined = h2.merge(h)

val s1 = combined.name("combinedPipes")
.map{ i: Int =>
(i, i * 2)
}

val s2 = combined.map{i : Int =>
(i, i * 3)
}

val tail = s1.sumByKey(store1)
.name("Store one writter")
.also(s2)
.sumByKey(store2)

val planned = Try(OnlinePlan(tail))
val path = TopologyPlannerLaws.dumpGraph(tail)

planned match {
case Success(graph) => true must beTrue
case Failure(error) =>
val path = TopologyPlannerLaws.dumpGraph(tail)
error.printStackTrace
println("Dumped failing graph to: " + path)
true must beFalse
}
}

"Must be able to plan user supplied Job B" in {
val store1 = testStore
val store2 = testStore
val store3 = testStore

val h = arbSource1.name("name1")
.flatMap{ i: Int =>
List(i, i)
}
.name("name1PostFM")
val h2 = arbSource2.name("name2")
.flatMap{ tup : (Int, Int) =>
List(tup._1, tup._2)
}.name("name2PostFM")

val combined = h2.merge(h)

val s1 = combined.name("combinedPipes")
.map{ i: Int =>
(i, i * 2)
}

val s2 = combined.map{i : Int =>
(i, i * 3)
}

val s3 = combined.map{i : Int =>
(i, i * 4)
}

val tail = s1.sumByKey(store1)
.name("Store one writter")
.also(s2)
.sumByKey(store2)
.name("Store two writer")
.also(s3)
.sumByKey(store3)
.name("Store three writer")

val planned = Try(OnlinePlan(tail))
planned match {
case Success(graph) => true must beTrue
case Failure(error) =>
val path = TopologyPlannerLaws.dumpGraph(tail)
error.printStackTrace
println("Dumped failing graph to: " + path)
true must beFalse
}
}

"Must be able to plan user supplied Job C" in {
val store1 = testStore

val src = arbSource1
val h = src

val h2 = src.map(_ * 3)

val combined = h2.merge(h)

val c1 = combined.map{i: Int => i * 4}
val c2 = combined.map{i: Int => i * 8}
val tail = c1.merge(c2).map{i: Int => (i, i)}.sumByKey(store1)

val planned = Try(OnlinePlan(tail))
planned match {
case Success(graph) => true must beTrue
case Failure(error) =>
val path = TopologyPlannerLaws.dumpGraph(tail)
error.printStackTrace
println("Dumped failing graph to: " + path)
true must beFalse
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,22 +61,26 @@ object TopologyPlannerLaws extends Properties("Online Dag") {
def sample[T: Arbitrary]: T = Arbitrary.arbitrary[T].sample.get

var dumpNumber = 1
def dumpGraph(dag: MemoryDag) = {
def dumpGraph(dag: MemoryDag): String = {
import java.io._
import com.twitter.summingbird.viz.VizGraph
val writer2 = new PrintWriter(new File("/tmp/failingGraph" + dumpNumber + ".dot"))
val targetPath = "/tmp/failingGraph" + dumpNumber + ".dot"
val writer2 = new PrintWriter(new File(targetPath))
VizGraph(dag, writer2)
writer2.close()
dumpNumber = dumpNumber + 1
targetPath
}

def dumpGraph(tail: Producer[Memory, Any]) = {
def dumpGraph(tail: Producer[Memory, Any]): String = {
import java.io._
import com.twitter.summingbird.viz.VizGraph
val writer2 = new PrintWriter(new File("/tmp/failingProducerGraph" + dumpNumber + ".dot"))
val targetPath = "/tmp/failingProducerGraph" + dumpNumber + ".dot"
val writer2 = new PrintWriter(new File(targetPath))
VizGraph(tail, writer2)
writer2.close()
dumpNumber = dumpNumber + 1
targetPath
}

property("Dag Nodes must be unique") = forAll { (dag: MemoryDag) =>
Expand Down