Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Commit

Permalink
Merge pull request #610 from twitter/revert-587-removestripnames
Browse files Browse the repository at this point in the history
Revert "Remove StripNameNodes"
  • Loading branch information
mav911 committed Mar 17, 2015
2 parents c6e8222 + 1604ed9 commit aa9b8c1
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 133 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.twitter.summingbird

import com.twitter.algebird.Semigroup
import com.twitter.summingbird.planner.FunctionContainer

object Producer {

Expand Down Expand Up @@ -88,28 +87,6 @@ object Producer {
case MergedProducer(l, r) => List(l, r)
}

/**
 * Lists the irreducible, user-supplied pieces attached directly to the given node.
 * Only the node itself is inspected; its parents are not visited.
 */
def irreduciblesOf[P <: Platform[P]](p: Producer[P, Any]): List[Irreducible[P]] = {
  // A function may have been composed out of several user functions by the
  // DagOptimizer, so it is always flattened back into its parts.
  def functionsIn(fn: Function1[_, _]): List[Irreducible[P]] =
    FunctionContainer.flatten(fn).map(Irreducible.Function[P](_)).toList

  p match {
    // Nodes that carry exactly the user-supplied object(s) they were built with
    case Source(s) => List(Irreducible.Source(s))
    case WrittenProducer(_, sink) => List(Irreducible.Sink(sink))
    case LeftJoinedProducer(_, srv) => List(Irreducible.Service(srv))
    case Summer(_, store, sg) => List(Irreducible.Store(store), Irreducible.Semigroup(sg))
    // Nodes that carry a (possibly composed) user function
    case OptionMappedProducer(_, fn) => functionsIn(fn)
    case FlatMappedProducer(_, fn) => functionsIn(fn)
    case KeyFlatMappedProducer(_, fn) => functionsIn(fn)
    case ValueFlatMappedProducer(_, fn) => functionsIn(fn)
    // Purely structural nodes hold nothing irreducible
    case AlsoProducer(_, _) | NamedProducer(_, _) | IdentityKeyedProducer(_) | MergedProducer(_, _) => Nil
  }
}

/**
* Returns true if this node does not directly change the data (does not apply any transformation)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,32 @@

package com.twitter.summingbird.planner

/**
 * This is a marker trait that indicates that
 * the subclass holds one or more irreducible
 * items passed by the user. These may need to
 * be accessed at some planning stage after
 * optimization in order to attach the correct
 * options
 */
trait IrreducibleContainer {
  /** The user-supplied items held by this container. */
  def irreducibles: Iterable[Any]
}

object IrreducibleContainer {
  /**
   * Returns the irreducibles held by `item` when it is an [[IrreducibleContainer]],
   * otherwise `item` itself as a single-element collection.
   */
  def flatten(item: Any): Iterable[Any] = item match {
    case container: IrreducibleContainer => container.irreducibles
    case plain => Seq(plain)
  }

  /**
   * Flattens both arguments and concatenates the results, always listing the
   * pieces of `left` before the pieces of `right`.
   *
   * Previously the (container, plain) case listed `right` first, unlike the
   * other three cases; delegating to the single-argument overload keeps the
   * ordering consistent.
   */
  def flatten(left: Any, right: Any): Iterable[Any] =
    flatten(left) ++ flatten(right)
}

/**
* When optimizing and composing flatMaps, this class can be used
* so that we can recover the parts if needed. For instance,
Expand All @@ -24,41 +50,41 @@ package com.twitter.summingbird.planner
* irreducibles even after optimization
*/
case class ComposedFlatMap[A, B, C](first: A => TraversableOnce[B],
second: B => TraversableOnce[C]) extends (A => TraversableOnce[C]) with FunctionContainer {
second: B => TraversableOnce[C]) extends (A => TraversableOnce[C]) with IrreducibleContainer {

// Note we don't allocate a new Function in the apply call,
// we reuse the instances above.
def apply(a: A) = first(a).flatMap(second)
def innerFunctions = FunctionContainer.flatten(first, second)
def irreducibles = IrreducibleContainer.flatten(first, second)
}

/**
* Composing optionMaps
*/
case class ComposedOptionMap[A, B, C](first: A => Option[B],
second: B => Option[C]) extends (A => Option[C]) with FunctionContainer {
second: B => Option[C]) extends (A => Option[C]) with IrreducibleContainer {

// Note we don't allocate a new Function in the apply call,
// we reuse the instances above.
def apply(a: A) = first(a).flatMap(second)
def innerFunctions = FunctionContainer.flatten(first, second)
def irreducibles = IrreducibleContainer.flatten(first, second)
}

case class ComposedOptionFlat[A, B, C](first: A => Option[B],
second: B => TraversableOnce[C]) extends (A => TraversableOnce[C]) with FunctionContainer {
second: B => TraversableOnce[C]) extends (A => TraversableOnce[C]) with IrreducibleContainer {

// Note we don't allocate a new Function in the apply call,
// we reuse the instances above.
def apply(a: A) = first(a).map(second).getOrElse(Iterator.empty)
def innerFunctions = FunctionContainer.flatten(first, second)
def irreducibles = IrreducibleContainer.flatten(first, second)
}

case class OptionToFlat[A, B](optionMap: A => Option[B])
extends (A => TraversableOnce[B]) with FunctionContainer {
extends (A => TraversableOnce[B]) with IrreducibleContainer {
// Note we don't allocate a new Function in the apply call,
// we reuse the instances above.
def apply(a: A) = optionMap(a).toIterator
def innerFunctions = FunctionContainer.flatten(optionMap)
def irreducibles = IrreducibleContainer.flatten(optionMap)
}

/*
Expand All @@ -69,18 +95,18 @@ case class OptionToFlat[A, B](optionMap: A => Option[B])
* This may be useful in Storm where we want to filter before serializing out of
* the spouts (or other similar fixed-source partition systems)
*/
case class FlatAsFilter[A](useAsFilter: A => TraversableOnce[Nothing]) extends (A => Option[A]) with FunctionContainer {
case class FlatAsFilter[A](useAsFilter: A => TraversableOnce[Nothing]) extends (A => Option[A]) with IrreducibleContainer {
def apply(a: A) = if (useAsFilter(a).isEmpty) None else Some(a)
def innerFunctions = FunctionContainer.flatten(useAsFilter)
def irreducibles = IrreducibleContainer.flatten(useAsFilter)
}

/**
* (a.flatMap(f1) ++ a.flatMap(f2)) == a.flatMap { i => f1(i) ++ f2(i) }
*/
case class MergeResults[A, B](left: A => TraversableOnce[B], right: A => TraversableOnce[B])
extends (A => TraversableOnce[B]) with FunctionContainer {
extends (A => TraversableOnce[B]) with IrreducibleContainer {
// TODO it is not totally clear the fastest way to merge two TraversableOnce instances
// If they are iterators or iterables, this should be fast
def apply(a: A) = (left(a).toIterator) ++ (right(a).toIterator)
def innerFunctions = FunctionContainer.flatten(left, right)
def irreducibles = IrreducibleContainer.flatten(left, right)
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,153 @@ package com.twitter.summingbird.planner

import com.twitter.summingbird._

/**
 * A node of the original graph in "functional" form, so that it can be
 * rebuilt on top of replacement parents.
 *
 * @param oldSources the parents of the node in the original graph
 * @param oldRef the original node itself; used as the key when recording
 *               what the node was rebuilt into
 * @param f rebuilds the node from its new parents, given in the same
 *          order as `oldSources`
 */
case class ProducerF[P <: Platform[P]](oldSources: List[Producer[P, Any]],
oldRef: Producer[P, Any],
f: List[Producer[P, Any]] => Producer[P, Any])

object StripNamedNode {
def apply[P <: Platform[P], T](tail: TailProducer[P, T]): (Map[Producer[P, Any], List[String]], TailProducer[P, T]) = {
val opt = new DagOptimizer[P] {}
val newTail = opt.optimize(tail, opt.RemoveNames).asInstanceOf[TailProducer[P, T]]
val dependants = Dependants(tail)
// Where is each irreducible in the original graph:
val irrMap: Map[Irreducible[P], List[Producer[P, Any]]] = (for {
prod <- dependants.nodes
irr <- Producer.irreduciblesOf(prod)
} yield irr -> prod)
.groupBy(_._1)
.map { case (irr, it) => irr -> it.map(_._2).toList }

def originalName(p: Producer[P, Any]): List[String] =
dependants.namesOf(p).map(_.id)
// Unchecked cast: the caller must already know that `node` is a TailProducer.
def castTail[P <: Platform[P]](node: Producer[P, Any]): TailProducer[P, Any] = node.asInstanceOf[TailProducer[P, Any]]
// Unchecked cast: the caller must already know that `node` produces key/value pairs.
def castToPair[P <: Platform[P]](node: Producer[P, Any]): Producer[P, (Any, Any)] = node.asInstanceOf[Producer[P, (Any, Any)]]

/**
 * Rebuilds every node in `l` on top of the already rebuilt parents found in `m`.
 *
 * After a node is rebuilt, `op` (when defined for it) may substitute another
 * producer (`Some`) or drop the node in favour of its first new parent (`None`).
 *
 * @return the last producer that was rebuilt (or `optLast` when `l` is empty),
 *         together with `m` extended by one original -> rebuilt entry per node
 */
def processLevel[P <: Platform[P]](optLast: Option[Producer[P, Any]],
  l: TraversableOnce[ProducerF[P]],
  m: Map[Producer[P, Any], Producer[P, Any]],
  op: PartialFunction[Producer[P, Any], Option[Producer[P, Any]]]): (Option[Producer[P, Any]], Map[Producer[P, Any], Producer[P, Any]]) = {
  val seed = (optLast, m)
  l.foldLeft(seed) { (acc, nodeF) =>
    val (_, rebuiltSoFar) = acc
    // Parents are resolved against the map handed to this call, not the accumulator.
    val newSources = nodeF.oldSources.map(old => m(old))
    val rebuilt = nodeF.f(newSources)
    val replacement =
      if (!op.isDefinedAt(rebuilt)) rebuilt
      else op(rebuilt).getOrElse(newSources(0))

    (Some(replacement), rebuiltSoFar + (nodeF.oldRef -> replacement))
  }
}

def functionize[P <: Platform[P]](node: Producer[P, Any]): ProducerF[P] = {
node match {
// This case is special/different since AlsoTailProducer needs the full class maintained (unlike TailNamedProducer),
// but it is not a case class. It inherits from TailProducer so cannot be one.
case p: AlsoTailProducer[_, _, _] =>
ProducerF(
List(p.result.asInstanceOf[Producer[P, Any]], p.ensure.asInstanceOf[Producer[P, Any]]),
p,
{ (newEntries): List[Producer[P, Any]] => new AlsoTailProducer[P, Any, Any](castTail(newEntries(1)), castTail(newEntries(0))) }
)
case p @ AlsoProducer(_, _) => ProducerF(
List(p.result, p.ensure),
p,
{ (newEntries): List[Producer[P, Any]] => p.copy(ensure = castTail(newEntries(1)), result = newEntries(0)) }
)
case p @ NamedProducer(producer, _) => ProducerF(
List(producer),
p,
{ producerL: List[Producer[P, Any]] => p.copy(producer = producerL(0)) }
)

case p @ Source(_) => ProducerF(
List(),
p,
{ producerL: List[Producer[P, Any]] => p }
)

case p @ IdentityKeyedProducer(producer) => ProducerF(
List(producer),
p,
{ producerL: List[Producer[P, Any]] => p.copy(producer = castToPair(producerL(0))) }
)

case p @ OptionMappedProducer(producer, _) => ProducerF(
List(producer),
p,
{ producerL: List[Producer[P, Any]] => p.copy(producer = producerL(0)) }
)

case p @ FlatMappedProducer(producer, _) => ProducerF(
List(producer),
p,
{ producerL: List[Producer[P, Any]] => p.copy(producer = producerL(0)) }
)

case p @ ValueFlatMappedProducer(producer, _) => ProducerF(
List(producer),
p,
{ producerL: List[Producer[P, Any]] => p.copy(producer = castToPair(producerL(0))) }
)

def newToOld(p: Producer[P, Any]): List[Producer[P, Any]] =
Producer.irreduciblesOf(p).flatMap(irrMap)
case p @ KeyFlatMappedProducer(producer, _) => ProducerF(
List(producer),
p,
{ producerL: List[Producer[P, Any]] => p.copy(producer = castToPair(producerL(0))) }
)

def newName(p: Producer[P, Any]): List[String] =
newToOld(p).flatMap(originalName)
case p @ MergedProducer(oL, oR) => ProducerF(
List(oL, oR),
p,
{ producerL: List[Producer[P, Any]] => p.copy(left = producerL(0), right = producerL(1)) }
)

val newDependants = Dependants(newTail)
case p @ LeftJoinedProducer(producer, _) => ProducerF(
List(producer),
p,
{ producerL: List[Producer[P, Any]] => p.copy(left = castToPair(producerL(0))) }
)

(newDependants.nodes.map { n => n -> newName(n) }.toMap, newTail)
case p @ Summer(producer, _, _) => ProducerF(
List(producer),
p,
{ producerL: List[Producer[P, Any]] => p.copy(producer = castToPair(producerL(0))) }
)

case p @ WrittenProducer(producer, _) => ProducerF(
List(producer),
p,
{ producerL: List[Producer[P, Any]] => p.copy(producer = producerL(0)) }
)
}
}

/**
 * Groups every node of the graph ending at `tail` by the depth that
 * `graph.dagDepth` assigns to it, converting each node with `functionize`.
 * The returned (depth, nodes) pairs are in no particular order.
 */
def toFunctional[P <: Platform[P]](tail: Producer[P, Any]) = {
  val depthOfNode = graph.dagDepth(Producer.entireGraphOf(tail))(Producer.parentsOf(_))
  depthOfNode
    .toSeq
    .groupBy { case (_, depth) => depth }
    .mapValues(_.map { case (node, _) => functionize(node) })
    .toSeq
}

/**
 * Rebuilds the graph ending at `tail` one depth level at a time, in increasing
 * depth order, letting `op` replace or drop nodes (see `processLevel`).
 *
 * @return the last producer that was rebuilt (`None` if nothing was processed)
 *         and the map from each original node to what it was rebuilt into
 */
def mutateGraph[P <: Platform[P]](tail: Producer[P, Any], op: PartialFunction[Producer[P, Any], Option[Producer[P, Any]]]) = {
  val levels = toFunctional(tail)
    .sortBy { case (depth, _) => depth }
    .map { case (_, nodes) => nodes }
  val seed = (Option.empty[Producer[P, Any]], Map.empty[Producer[P, Any], Producer[P, Any]])
  levels.foldLeft(seed) { (acc, level) =>
    val (last, rebuilt) = acc
    processLevel(last, level, rebuilt, op)
  }
}

/**
 * Rebuilds the graph ending at `node` with every NamedProducer removed.
 *
 * @return a map from each rebuilt producer to an original producer it was
 *         built from, and the rebuilt tail
 */
def stripNamedNodes[P <: Platform[P]](node: Producer[P, Any]): (Map[Producer[P, Any], Producer[P, Any]], Producer[P, Any]) = {
  // Returning None asks processLevel to substitute the node's first new parent.
  val dropNamed: PartialFunction[Producer[P, Any], Option[Producer[P, Any]]] = {
    case NamedProducer(_, _) => None
  }
  val (lastRebuilt, oldToNew) = mutateGraph(node, dropNamed)
  val strippedTail = lastRebuilt.get
  val newToOld: Map[Producer[P, Any], Producer[P, Any]] =
    oldToNew.map { case (original, rebuilt) => (rebuilt, original) }
  (newToOld, strippedTail)
}

// Priority list of names for a given producer: the producer's own name (if it
// is a NamedProducer) first, then the names found among its transitive dependants.
private def getName[P <: Platform[P]](dependants: Dependants[P], producer: Producer[P, Any]): List[String] = {
  val candidates = producer :: dependants.transitiveDependantsOf(producer)
  for (NamedProducer(_, name) <- candidates) yield name
}

/**
 * Strips all NamedProducer nodes from the graph ending at `tail`.
 *
 * @return a map from each producer of the stripped graph to its priority list
 *         of names (looked up in the original graph), and the stripped tail
 */
def apply[P <: Platform[P], T](tail: TailProducer[P, T]): (Map[Producer[P, Any], List[String]], TailProducer[P, T]) = {
  val dependants = Dependants(tail)
  // stripNamedNodes returns the map keyed by the NEW producer, valued by the old one.
  val (newToOld, newTail) = stripNamedNodes(tail)
  // Built strictly on purpose: mapValues only yields a lazy view, which would
  // re-run getName (a walk over transitive dependants) on every single lookup.
  val names: Map[Producer[P, Any], List[String]] =
    newToOld.map { case (newNode, oldNode) => newNode -> getName(dependants, oldNode) }
  (names, newTail.asInstanceOf[TailProducer[P, T]])
}
}

0 comments on commit aa9b8c1

Please sign in to comment.