Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Feature/mutliple summer #274

Merged
merged 11 commits into from
Oct 9, 2013
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,7 @@ object Producer {
* in-progress TopologyBuilder.
*/
sealed trait Producer[P <: Platform[P], +T] {
/** Ensure this is scheduled, but return something equivalent to the argument
* like the function `par` in Haskell.
* This can be used to combine two independent Producers in a way that ensures
* that the Platform will plan both into a single Plan.
*/
def also[R](that: Producer[P, R]): Producer[P, R] = AlsoProducer(this, that)

def name(id: String): Producer[P, T] = NamedProducer(this, id)
def merge[U >: T](r: Producer[P, U]): Producer[P, U] = MergedProducer(this, r)

Expand All @@ -114,7 +109,7 @@ sealed trait Producer[P <: Platform[P], +T] {
def flatMap[U](fn: T => TraversableOnce[U]): Producer[P, U] =
FlatMappedProducer[P, T, U](this, fn)

def write[U >: T](sink: P#Sink[U]): Producer[P, T] = WrittenProducer(this, sink)
def write[U >: T](sink: P#Sink[U]): TailProducer[P, T] = WrittenProducer(this, sink)

def either[U](other: Producer[P, U]): Producer[P, Either[T, U]] =
map(Left(_): Either[T, U])
Expand All @@ -124,11 +119,21 @@ sealed trait Producer[P <: Platform[P], +T] {
case class Source[P <: Platform[P], T](source: P#Source[T])
extends Producer[P, T]


trait TailProducer[P <: Platform[P], +T] extends Producer[P, T] {
/** Ensure this is scheduled, but return something equivalent to the argument
* like the function `par` in Haskell.
* This can be used to combine two independent Producers in a way that ensures
* that the Platform will plan both into a single Plan.
*/
def also[R](that: Producer[P, R]): Producer[P, R] = AlsoProducer(this, that)
}

/**
* This is a special node that ensures that the first argument is planned, but produces values
* equivalent to the result.
*/
case class AlsoProducer[P <: Platform[P], T, R](ensure: Producer[P, T], result: Producer[P, R]) extends Producer[P, R]
case class AlsoProducer[P <: Platform[P], T, R](ensure: TailProducer[P, T], result: Producer[P, R]) extends Producer[P, R]

case class NamedProducer[P <: Platform[P], T](producer: Producer[P, T], id: String) extends Producer[P, T]

Expand All @@ -143,12 +148,12 @@ case class FlatMappedProducer[P <: Platform[P], T, U](producer: Producer[P, T],

case class MergedProducer[P <: Platform[P], T](left: Producer[P, T], right: Producer[P, T]) extends Producer[P, T]

case class WrittenProducer[P <: Platform[P], T, U >: T](producer: Producer[P, T], sink: P#Sink[U]) extends Producer[P, T]
case class WrittenProducer[P <: Platform[P], T, U >: T](producer: Producer[P, T], sink: P#Sink[U]) extends TailProducer[P, T]

case class Summer[P <: Platform[P], K, V](
producer: KeyedProducer[P, K, V],
store: P#Store[K, V],
monoid: Monoid[V]) extends KeyedProducer[P, K, V]
monoid: Monoid[V]) extends KeyedProducer[P, K, V] with TailProducer[P, (K, V)]

sealed trait KeyedProducer[P <: Platform[P], K, V] extends Producer[P, (K, V)] {
def leftJoin[RightV](service: P#Service[K, RightV]): KeyedProducer[P, K, (V, Option[RightV])] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ object Dag {

def tryGetName(name: String, seen: Set[String], indxOpt: Option[Int] = None): String = {
indxOpt match {
case None => if (seen.contains(name)) tryGetName(name, seen, Some(1)) else name
case None => if (seen.contains(name)) tryGetName(name, seen, Some(2)) else name
case Some(indx) => if (seen.contains(name + "." + indx)) tryGetName(name, seen, Some(indx + 1)) else name + "." + indx
}
}
Expand All @@ -183,7 +183,20 @@ object Dag {
}
}

val (nodeToName, _) = genNames(dag.tailN, dag, Map(dag.tailN -> "Tail"), Set("Tail"))
def allTails(dag: Dag[P]): List[Node[P]] = {
dag.nodes.filter{m => dag.dependantsOf(m).size == 0 }
}

//start with the true tail
val (nodeToName, _) = (dag.tailN :: allTails(dag)).foldLeft((Map[Node[P], String](), Set[String]())) { case ((nodeToName, usedNames), curTail) =>
if(!nodeToName.contains(curTail)) {
val tailN = tryGetName("Tail", usedNames)
genNames(curTail, dag, nodeToName + (curTail -> tailN), usedNames + tailN)
} else {
(nodeToName, usedNames)
}
}

val nameToNode = nodeToName.map((t) => (t._2, t._1))
dag.copy(nodeToName = nodeToName, nameToNode = nameToNode)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ object TestGraphGenerators {
if(deps.size == 1) genProd2 else oneOf(deps)
}

def aTailDependency[P <: Platform[P]](p: Producer[P, Any])(implicit genSource1 : Arbitrary[Producer[P, Int]],
genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int],
sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]): Gen[TailProducer[P, Any]] = {
val validDeps = Producer.transitiveDependenciesOf(p).collect{case x:TailProducer[_, _] => x.asInstanceOf[TailProducer[P, Any]]}
if(validDeps.size == 0) summed else oneOf(validDeps)
}


def genMerged2[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for {
_ <- Gen.choose(0,1)
p1 <- genProd2
Expand Down Expand Up @@ -77,13 +85,15 @@ object TestGraphGenerators {
name <- Gen.alphaStr
} yield NamedProducer(FlatMappedProducer(in, fn), name)

def genMerged1[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for {
def genMerged1[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]],
testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for {
_ <- Gen.choose(0,1)
p1 <- genProd1
p2 <- genProd1
} yield MergedProducer(p1, p2)

def genFlatMap12[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for {
def genFlatMap12[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]],
testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for {
fn <- arbitrary[(Int) => List[(Int, Int)]]
in <- genProd1
} yield IdentityKeyedProducer(FlatMappedProducer(in, fn))
Expand All @@ -96,16 +106,18 @@ object TestGraphGenerators {
} yield IdentityKeyedProducer(p1.write(sink2))


def also1[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for {
def also1[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]],
testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for {
_ <- Gen.choose(0, 1) // avoids blowup on self recursion
out <- genProd1
ignored <- oneOf(genProd2, genProd1, oneOf(Producer.transitiveDependenciesOf(out))): Gen[Producer[P, _]]
ignored <- oneOf(summed, written, aTailDependency(out)): Gen[TailProducer[P, _]]
} yield ignored.also(out)

def also2[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for {
def also2[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]],
testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for {
_ <- Gen.choose(0, 1) // avoids blowup on self recursion
out <- genProd2
ignored <- oneOf(genProd2, genProd1, oneOf(Producer.transitiveDependenciesOf(out))): Gen[Producer[P, _]]
ignored <- oneOf(summed, written, aTailDependency(out)): Gen[TailProducer[P, _]]
} yield IdentityKeyedProducer(ignored.also(out))


Expand All @@ -116,17 +128,24 @@ object TestGraphGenerators {
in <- genProd2
} yield in.sumByKey(testStore)

def written[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]],
genSource2 : Arbitrary[KeyedProducer[P, Int, Int]],
testStore: P#Store[Int, Int], sink1: P#Sink[Int],
sink2: P#Sink[(Int, Int)]): Gen[TailProducer[P, Int]] = for {
in <- genProd1
} yield in.write(sink1)

def genProd2[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]],
genSource2 : Arbitrary[KeyedProducer[P, Int, Int]],
testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]): Gen[KeyedProducer[P, Int, Int]] =
frequency((25, genSource2.arbitrary), (3, genOptMap12), (3, genOptMap22), (4, genWrite22), (1, genMerged2),
frequency((25, genSource2.arbitrary), (3, genOptMap12), (3, genOptMap22), (4, genWrite22), (1, genMerged2), (2, also2),
(0, also2), (3, genFlatMap22), (3, genFlatMap12))


def genProd1[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]],
genSource2 : Arbitrary[KeyedProducer[P, Int, Int]],
testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]): Gen[Producer[P, Int]] =
frequency((25, genSource1.arbitrary), (8, genNamedProducer11), (3, genOptMap11), (3, genOptMap21), (1, genMerged1), (3, genFlatMap11),
frequency((25, genSource1.arbitrary), (8, genNamedProducer11), (3, genOptMap11), (3, genOptMap21), (1, genMerged1), (2, also1), (3, genFlatMap11),
(0, also1), (3, genFlatMap21))

}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,23 @@ object TestGraphs {
.flatMap(postJoinFn)
.sumByKey(store)


def multipleSummerJobInScala[T1, T2, K1, V1: Monoid, K2, V2: Monoid]
(source: List[T1])
(fnR: T1 => TraversableOnce[T2], fnA: T2 => TraversableOnce[(K1, V1)], fnB: T2 => TraversableOnce[(K2, V2)])
: (Map[K1, V1], Map[K2, V2]) = {
val mapA = MapAlgebra.sumByKey(source.flatMap(fnR).flatMap(fnA))
val mapB = MapAlgebra.sumByKey(source.flatMap(fnR).flatMap(fnB))
(mapA, mapB)
}

def multipleSummerJob[P <: Platform[P], T1, T2, K1, V1: Monoid, K2, V2: Monoid]
(source: Producer[P, T1], store1: P#Store[K1, V1], store2: P#Store[K2, V2])
(fnR: T1 => TraversableOnce[T2], fnA: T2 => TraversableOnce[(K1, V1)], fnB: T2 => TraversableOnce[(K2, V2)]): Summer[P, K2, V2] = {
val combined = source.flatMap(fnR)
combined.flatMap(fnA).sumByKey(store1).also(combined).flatMap(fnB).sumByKey(store2)
}

def mapOnlyJob[P <: Platform[P], T, U](
source: Producer[P, T],
sink: P#Sink[U]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,37 @@ object ScaldingLaws extends Properties("Scalding") {
Monoid.isNonZero(Group.minus(inMemory, smap)) == false
}


property("ScaldingPlatform matches Scala for multiple summer jobs, with everything in one Batch") =
forAll { (original: List[Int], fnA:(Int) => List[(Int)], fnB: (Int) => List[(Int, Int)], fnC: (Int) => List[(Int, Int)]) =>
val (inMemoryA, inMemoryB) = TestGraphs.multipleSummerJobInScala(original)(fnA, fnB, fnC)

// Add a time:
val inWithTime = original.zipWithIndex.map { case (item, time) => (time.toLong, item) }
val batcher = simpleBatcher
import Dsl._ // Won't be needed in scalding 0.9.0
val testStoreA = new TestStore[Int,Int]("testA", batcher, BatchID(0), Iterable.empty, BatchID(1))
val testStoreB = new TestStore[Int,Int]("testB", batcher, BatchID(0), Iterable.empty, BatchID(1))
val (buffer, source) = testSource(inWithTime)

val tail = TestGraphs.multipleSummerJob[Scalding, (Long, Int), Int, Int, Int, Int, Int](source, testStoreA, testStoreB)({t => fnA(t._2)}, fnB, fnC)

val intr = Interval.leftClosedRightOpen(0L, original.size.toLong)
val scald = new Scalding("scalaCheckMultipleSumJob")
val ws = new LoopState(intr.mapNonDecreasing(t => new Date(t)))
val mode: Mode = TestMode(t => (testStoreA.sourceToBuffer ++ testStoreB.sourceToBuffer ++ buffer).get(t))

scald.run(ws, mode, scald.plan(tail))
// Now check that the inMemory ==

val smapA = testStoreA.lastToIterable(BatchID(1)).toMap
Monoid.isNonZero(Group.minus(inMemoryB, smapA)) == false

val smapB = testStoreB.lastToIterable(BatchID(1)).toMap
Monoid.isNonZero(Group.minus(inMemoryB, smapB)) == false
}


property("ScaldingPlatform matches Scala for leftJoin jobs, with everything in one Batch") =
forAll { (original: List[Int],
prejoinMap: (Int) => List[(Int, Int)],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ trait FlatMapOperation[-T, +U] extends Serializable with Closeable {

override def close { }

/*
* maybeFlush may be periodically called to empty any internal state
* Not used yet so commented out
*/
def maybeFlush: Future[TraversableOnce[U]] = Future.value(Seq.empty[U])

/**
* TODO: Think about getting an implicit FutureCollector here, in
* case we don't want to completely choke on large expansions (and
Expand All @@ -42,6 +48,17 @@ trait FlatMapOperation[-T, +U] extends Serializable with Closeable {
Future.collect(next).map(_.flatten) // flatten the inner
}

override def maybeFlush = {
self.maybeFlush.flatMap{ x: TraversableOnce[U] =>
val z: Seq[Future[TraversableOnce[V]]] = x.map(fmo.apply(_)).toSeq
val w: Future[Seq[V]] = Future.collect(z).map(_.flatten)
for {
ws <- w
maybes <- fmo.maybeFlush
maybeSeq = maybes.toSeq
} yield ws ++ maybeSeq
}
}
override def close { self.close; fmo.close }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config
case IdentityKeyedProducer(_) => acc
case MergedProducer(_, _) => acc
case NamedProducer(_, _) => acc
case AlsoProducer(_, _) => acc
case _ => throw new Exception("Not found! : " + p)
}
}
Expand Down Expand Up @@ -145,6 +146,7 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config
case OptionMappedProducer(_, op) => spout.flatMap {case (time, t) => op.apply(t).map { x => (time, x) }}
case NamedProducer(_, _) => spout
case IdentityKeyedProducer(_) => spout
case AlsoProducer(_, _) => spout
case _ => sys.error("not possible, given the above call to span.\n" + p)
}
}.getSpout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ object DagBuilder {

// This takes an initial pass through all of the Producers, assigning them to Nodes
private def buildNodesSet[P](tail: Producer[Storm, P]): List[StormNode] = {
val dep = Dependants(tail)
val depData = Dependants(tail)
val forkedNodes = Producer.transitiveDependenciesOf(tail)
.filter(dep.fanOut(_).exists(_ > 1)).toSet
.filter(depData.fanOut(_).exists(_ > 1)).toSet
def distinctAddToList[T](l : List[T], n : T): List[T] = if(l.contains(n)) l else (n :: l)

// Add the dependentProducer to a Node along with each of its dependencies in turn.
Expand All @@ -66,10 +66,15 @@ object DagBuilder {
case IdentityKeyedProducer(producer) => true
case OptionMappedProducer(producer, _) => true
case Source(_) => true
case AlsoProducer(_, _) => true
case _ => false
}
}

def hasSummerAsNextProducer(p: Prod[_]): Boolean =
depData.dependantsOf(p).get.collect { case s: Summer[_, _, _] => s }.headOption.isDefined


def allDepsMergeableWithSource(p: Prod[_]): Boolean = mergableWithSource(p) && Producer.dependenciesOf(p).forall(allDepsMergeableWithSource)

/*
Expand All @@ -85,7 +90,7 @@ object DagBuilder {
case _ if (forkedNodes.contains(dep)) => true
// If we are a flatmap, but there haven't been any other flatmaps yet(i.e. the registry is of size 1, the summer).
// Then we must split to avoid a 2 level higherarchy
case _ if (currentBolt.isInstanceOf[FlatMapNode[_]] && stormRegistry.size == 1 && allDepsMergeableWithSource(dep)) => true
case _ if (currentBolt.isInstanceOf[FlatMapNode[_]] && hasSummerAsNextProducer(currentProducer) && allDepsMergeableWithSource(dep)) => true
case _ if ((!mergableWithSource(currentProducer)) && allDepsMergeableWithSource(dep)) => true
case _ => false
}
Expand All @@ -96,6 +101,8 @@ object DagBuilder {
}
}



/*
* This is a peek ahead when we meet a MergedProducer. We pull the directly depended on MergedProducer's into the same Node,
* only if that MergedProducer is not a fan out node.
Expand All @@ -120,6 +127,9 @@ object DagBuilder {
case Summer(producer, _, _) => recurse(producer, updatedBolt = FlatMapNode(), updatedRegistry = distinctAddToList(stormRegistry, currentBolt.toSummer))
case IdentityKeyedProducer(producer) => maybeSplitThenRecurse(dependantProducer, producer)
case NamedProducer(producer, newId) => maybeSplitThenRecurse(dependantProducer, producer)
case AlsoProducer(lProducer, rProducer) =>
val (updatedReg, updatedVisited) = maybeSplitThenRecurse(dependantProducer, rProducer)
recurse(lProducer, FlatMapNode(), updatedReg, updatedVisited)
case Source(spout) => (distinctAddToList(stormRegistry, currentBolt.toSource), visitedWithN)
case OptionMappedProducer(producer, op) => maybeSplitThenRecurse(dependantProducer, producer)
case FlatMappedProducer(producer, op) => maybeSplitThenRecurse(dependantProducer, producer)
Expand Down
Loading