Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Revert "Remove StripNameNodes" #610

Merged
merged 1 commit into from
Mar 17, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.twitter.summingbird

import com.twitter.algebird.Semigroup
import com.twitter.summingbird.planner.FunctionContainer

object Producer {

Expand Down Expand Up @@ -88,28 +87,6 @@ object Producer {
case MergedProducer(l, r) => List(l, r)
}

def irreduciblesOf[P <: Platform[P]](p: Producer[P, Any]): List[Irreducible[P]] =
p match {
case Source(s) => List(Irreducible.Source(s))
case AlsoProducer(_, _) => List()
case NamedProducer(_, _) => List()
case IdentityKeyedProducer(_) => List()
// All the function cases need to be flattened in case they have
// been composed by the DagOptimizer
case OptionMappedProducer(_, fn) =>
FunctionContainer.flatten(fn).map(Irreducible.Function[P](_)).toList
case FlatMappedProducer(_, fn) =>
FunctionContainer.flatten(fn).map(Irreducible.Function[P](_)).toList
case KeyFlatMappedProducer(_, fn) =>
FunctionContainer.flatten(fn).map(Irreducible.Function[P](_)).toList
case ValueFlatMappedProducer(_, fn) =>
FunctionContainer.flatten(fn).map(Irreducible.Function[P](_)).toList
case WrittenProducer(_, sink) => List(Irreducible.Sink(sink))
case LeftJoinedProducer(_, srv) => List(Irreducible.Service(srv))
case Summer(_, store, sg) => List(Irreducible.Store(store), Irreducible.Semigroup(sg))
case MergedProducer(_, _) => List()
}

/**
* Returns true if this node does not directly change the data (does not apply any transformation)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,32 @@

package com.twitter.summingbird.planner

/**
* This is a marker trait that indicates that
* the subclass holds one or more irreducible
* items passed by the user. These may need to
* be accessed at some planning stage after
* optimization in order to attach the correct
* options
*/
trait IrreducibleContainer {
def irreducibles: Iterable[Any]
}

object IrreducibleContainer {
def flatten(item: Any): Iterable[Any] = item match {
case (ic: IrreducibleContainer) => ic.irreducibles
case _ => Seq(item)
}

def flatten(left: Any, right: Any): Iterable[Any] = (left, right) match {
case (li: IrreducibleContainer, ri: IrreducibleContainer) => li.irreducibles ++ ri.irreducibles
case (li: IrreducibleContainer, _) => Seq(right) ++ li.irreducibles
case (_, ri: IrreducibleContainer) => Seq(left) ++ ri.irreducibles
case _ => Seq(left, right)
}
}

/**
* When optimizing and composing flatMaps, this class can be used
* so that we can recover the parts if needed. For instance,
Expand All @@ -24,41 +50,41 @@ package com.twitter.summingbird.planner
* irreducibiles even after optimization
*/
case class ComposedFlatMap[A, B, C](first: A => TraversableOnce[B],
second: B => TraversableOnce[C]) extends (A => TraversableOnce[C]) with FunctionContainer {
second: B => TraversableOnce[C]) extends (A => TraversableOnce[C]) with IrreducibleContainer {

// Note we don't allocate a new Function in the apply call,
// we reuse the instances above.
def apply(a: A) = first(a).flatMap(second)
def innerFunctions = FunctionContainer.flatten(first, second)
def irreducibles = IrreducibleContainer.flatten(first, second)
}

/**
* Composing optionMaps
*/
case class ComposedOptionMap[A, B, C](first: A => Option[B],
second: B => Option[C]) extends (A => Option[C]) with FunctionContainer {
second: B => Option[C]) extends (A => Option[C]) with IrreducibleContainer {

// Note we don't allocate a new Function in the apply call,
// we reuse the instances above.
def apply(a: A) = first(a).flatMap(second)
def innerFunctions = FunctionContainer.flatten(first, second)
def irreducibles = IrreducibleContainer.flatten(first, second)
}

case class ComposedOptionFlat[A, B, C](first: A => Option[B],
second: B => TraversableOnce[C]) extends (A => TraversableOnce[C]) with FunctionContainer {
second: B => TraversableOnce[C]) extends (A => TraversableOnce[C]) with IrreducibleContainer {

// Note we don't allocate a new Function in the apply call,
// we reuse the instances above.
def apply(a: A) = first(a).map(second).getOrElse(Iterator.empty)
def innerFunctions = FunctionContainer.flatten(first, second)
def irreducibles = IrreducibleContainer.flatten(first, second)
}

case class OptionToFlat[A, B](optionMap: A => Option[B])
extends (A => TraversableOnce[B]) with FunctionContainer {
extends (A => TraversableOnce[B]) with IrreducibleContainer {
// Note we don't allocate a new Function in the apply call,
// we reuse the instances above.
def apply(a: A) = optionMap(a).toIterator
def innerFunctions = FunctionContainer.flatten(optionMap)
def irreducibles = IrreducibleContainer.flatten(optionMap)
}

/*
Expand All @@ -69,18 +95,18 @@ case class OptionToFlat[A, B](optionMap: A => Option[B])
* This may be useful in Storm where we want to filter before serializing out of
* the spouts (or other similar fixed-source parition systems)
*/
case class FlatAsFilter[A](useAsFilter: A => TraversableOnce[Nothing]) extends (A => Option[A]) with FunctionContainer {
case class FlatAsFilter[A](useAsFilter: A => TraversableOnce[Nothing]) extends (A => Option[A]) with IrreducibleContainer {
def apply(a: A) = if (useAsFilter(a).isEmpty) None else Some(a)
def innerFunctions = FunctionContainer.flatten(useAsFilter)
def irreducibles = IrreducibleContainer.flatten(useAsFilter)
}

/**
* (a.flatMap(f1) ++ a.flatMap(f2)) == a.flatMap { i => f1(i) ++ f2(i) }
*/
case class MergeResults[A, B](left: A => TraversableOnce[B], right: A => TraversableOnce[B])
extends (A => TraversableOnce[B]) with FunctionContainer {
extends (A => TraversableOnce[B]) with IrreducibleContainer {
// TODO it is not totally clear the fastest way to merge two TraversableOnce instances
// If they are iterators or iterables, this should be fast
def apply(a: A) = (left(a).toIterator) ++ (right(a).toIterator)
def innerFunctions = FunctionContainer.flatten(left, right)
def irreducibles = IrreducibleContainer.flatten(left, right)
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,153 @@ package com.twitter.summingbird.planner

import com.twitter.summingbird._

case class ProducerF[P <: Platform[P]](oldSources: List[Producer[P, Any]],
oldRef: Producer[P, Any],
f: List[Producer[P, Any]] => Producer[P, Any])

object StripNamedNode {
def apply[P <: Platform[P], T](tail: TailProducer[P, T]): (Map[Producer[P, Any], List[String]], TailProducer[P, T]) = {
val opt = new DagOptimizer[P] {}
val newTail = opt.optimize(tail, opt.RemoveNames).asInstanceOf[TailProducer[P, T]]
val dependants = Dependants(tail)
// Where is each irreducible in the original graph:
val irrMap: Map[Irreducible[P], List[Producer[P, Any]]] = (for {
prod <- dependants.nodes
irr <- Producer.irreduciblesOf(prod)
} yield irr -> prod)
.groupBy(_._1)
.map { case (irr, it) => irr -> it.map(_._2).toList }

def originalName(p: Producer[P, Any]): List[String] =
dependants.namesOf(p).map(_.id)
def castTail[P <: Platform[P]](node: Producer[P, Any]): TailProducer[P, Any] = node.asInstanceOf[TailProducer[P, Any]]
def castToPair[P <: Platform[P]](node: Producer[P, Any]): Producer[P, (Any, Any)] = node.asInstanceOf[Producer[P, (Any, Any)]]

def processLevel[P <: Platform[P]](optLast: Option[Producer[P, Any]],
l: TraversableOnce[ProducerF[P]],
m: Map[Producer[P, Any], Producer[P, Any]],
op: PartialFunction[Producer[P, Any], Option[Producer[P, Any]]]): (Option[Producer[P, Any]], Map[Producer[P, Any], Producer[P, Any]]) = {
l.foldLeft((optLast, m)) {
case ((nOptLast, nm), pp) =>
val ns = pp.oldSources.map(m(_))
val res = pp.f(ns)
val mutatedRes = if (op.isDefinedAt(res)) {
op(res) match {
case Some(p) => p
case None => ns(0)
}
} else {
res
}

(Some(mutatedRes), (nm + (pp.oldRef -> mutatedRes)))
}
}

def functionize[P <: Platform[P]](node: Producer[P, Any]): ProducerF[P] = {
node match {
// This case is special/different since AlsoTailProducer needs the full class maintained(unlike TailNamedProducer),
// but it is not a case class. It inherits from TailProducer so cannot be one.
case p: AlsoTailProducer[_, _, _] =>
ProducerF(
List(p.result.asInstanceOf[Producer[P, Any]], p.ensure.asInstanceOf[Producer[P, Any]]),
p,
{ (newEntries): List[Producer[P, Any]] => new AlsoTailProducer[P, Any, Any](castTail(newEntries(1)), castTail(newEntries(0))) }
)
case p @ AlsoProducer(_, _) => ProducerF(
List(p.result, p.ensure),
p,
{ (newEntries): List[Producer[P, Any]] => p.copy(ensure = castTail(newEntries(1)), result = newEntries(0)) }
)
case p @ NamedProducer(producer, _) => ProducerF(
List(producer),
p,
{ producerL: List[Producer[P, Any]] => p.copy(producer = producerL(0)) }
)

case p @ Source(_) => ProducerF(
List(),
p,
{ producerL: List[Producer[P, Any]] => p }
)

case p @ IdentityKeyedProducer(producer) => ProducerF(
List(producer),
p,
{ producerL: List[Producer[P, Any]] => p.copy(producer = castToPair(producerL(0))) }
)

case p @ OptionMappedProducer(producer, _) => ProducerF(
List(producer),
p,
{ producerL: List[Producer[P, Any]] => p.copy(producer = producerL(0)) }
)

case p @ FlatMappedProducer(producer, _) => ProducerF(
List(producer),
p,
{ producerL: List[Producer[P, Any]] => p.copy(producer = producerL(0)) }
)

case p @ ValueFlatMappedProducer(producer, _) => ProducerF(
List(producer),
p,
{ producerL: List[Producer[P, Any]] => p.copy(producer = castToPair(producerL(0))) }
)

def newToOld(p: Producer[P, Any]): List[Producer[P, Any]] =
Producer.irreduciblesOf(p).flatMap(irrMap)
case p @ KeyFlatMappedProducer(producer, _) => ProducerF(
List(producer),
p,
{ producerL: List[Producer[P, Any]] => p.copy(producer = castToPair(producerL(0))) }
)

def newName(p: Producer[P, Any]): List[String] =
newToOld(p).flatMap(originalName)
case p @ MergedProducer(oL, oR) => ProducerF(
List(oL, oR),
p,
{ producerL: List[Producer[P, Any]] => p.copy(left = producerL(0), right = producerL(1)) }
)

val newDependants = Dependants(newTail)
case p @ LeftJoinedProducer(producer, _) => ProducerF(
List(producer),
p,
{ producerL: List[Producer[P, Any]] => p.copy(left = castToPair(producerL(0))) }
)

(newDependants.nodes.map { n => n -> newName(n) }.toMap, newTail)
case p @ Summer(producer, _, _) => ProducerF(
List(producer),
p,
{ producerL: List[Producer[P, Any]] => p.copy(producer = castToPair(producerL(0))) }
)

case p @ WrittenProducer(producer, _) => ProducerF(
List(producer),
p,
{ producerL: List[Producer[P, Any]] => p.copy(producer = producerL(0)) }
)
}
}

def toFunctional[P <: Platform[P]](tail: Producer[P, Any]) =
graph
.dagDepth(Producer.entireGraphOf(tail))(Producer.parentsOf(_))
.toSeq
.groupBy(_._2)
.mapValues(_.map(_._1))
.mapValues(_.map(functionize(_)))
.toSeq

def mutateGraph[P <: Platform[P]](tail: Producer[P, Any], op: PartialFunction[Producer[P, Any], Option[Producer[P, Any]]]) = {
val newT: Option[Producer[P, Any]] = None
val x = toFunctional(tail).sortBy(_._1)
x.map(_._2).foldLeft((newT, Map[Producer[P, Any], Producer[P, Any]]())) {
case ((optLast, curMap), v) =>
processLevel(optLast, v, curMap, op)
}
}

def stripNamedNodes[P <: Platform[P]](node: Producer[P, Any]): (Map[Producer[P, Any], Producer[P, Any]], Producer[P, Any]) = {
def removeNamed: PartialFunction[Producer[P, Any], Option[Producer[P, Any]]] =
{ case p @ NamedProducer(p2, _) => None }
val (optTail, oldNewMap) = mutateGraph(node, removeNamed)
val newTail = optTail.get
(oldNewMap.map(x => (x._2, x._1)).toMap, optTail.get)
}

// Priority list of of names for a given producer
private def getName[P <: Platform[P]](dependants: Dependants[P], producer: Producer[P, Any]): List[String] = {
(producer :: dependants.transitiveDependantsOf(producer)).collect { case NamedProducer(_, n) => n }
}

def apply[P <: Platform[P], T](tail: TailProducer[P, T]): (Map[Producer[P, Any], List[String]], TailProducer[P, T]) = {
val dependants = Dependants(tail)
val (oldProducerToNewMap, newTail) = stripNamedNodes(tail)
(oldProducerToNewMap.mapValues(n => getName(dependants, n)), newTail.asInstanceOf[TailProducer[P, T]])
}
}