Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Commit

Permalink
Merge pull request #487 from julienledem/monoid_to_semigroup
Browse files Browse the repository at this point in the history
Monoid to semigroup
  • Loading branch information
ianoc committed Mar 31, 2014
2 parents 60d3462 + 7d0cef6 commit ef2af68
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

package com.twitter.summingbird

/** Monoid stands alone. */
import com.twitter.algebird.{ Monoid, Semigroup }
import com.twitter.algebird.Semigroup

object Producer {

Expand All @@ -30,33 +29,46 @@ object Producer {
p :: above
}

// The parents of the current node, possibly not its dependencies

/**
 * Returns the parents of the given node, which are not necessarily its dependencies.
 * For an AlsoProducer both members are returned; for every other node the result is
 * whatever dependenciesOf returns.
 */
def parentsOf[P <: Platform[P]](in: Producer[P, Any]): List[Producer[P, Any]] =
  in match {
    case AlsoProducer(first, second) => List(first, second)
    case other => dependenciesOf(other)
  }

/**
 * Returns the first Summer found in the provided list of Producers,
 * or None when the list contains no Summer.
 */
def retrieveSummer[P <: Platform[P]](paths: List[Producer[P, _]]): Option[Summer[P, _, _]] =
  paths.collectFirst {
    case summer: Summer[P, _, _] => summer
  }

/**
* Begin from some base representation. An iterator for in-memory,
* for example.
*/
* A producer DAG starts from sources.
* The actual source type depends on the Platform.
* For example: An iterator for in-memory.
*/
def source[P <: Platform[P], T](s: P#Source[T]): Producer[P, T] = Source[P, T](s) // wraps the platform-specific source in a Source node

/**
 * Implicit conversion from Producer[P, T] to KeyedProducer[P, K, V], available whenever
 * the evidence ev proves that T is a tuple of (K, V).
 * It enables the operations on keys (sumByKey, ...).
 */
implicit def evToKeyed[P <: Platform[P], T, K, V](producer: Producer[P, T])
(implicit ev: T <:< (K, V)): KeyedProducer[P, K, V] = {
  // ev is only used as compile-time proof that T <: (K, V); the cast restates that fact
  // for the compiler without adding a node to the producer graph.
  val pairs = producer.asInstanceOf[Producer[P, (K, V)]]
  IdentityKeyedProducer[P, K, V](pairs)
}

/**
 * Implicit conversion from Producer[P, (K, V)] to KeyedProducer[P, K, V].
 * The producer is wrapped in an IdentityKeyedProducer, which enables
 * the operations on keys (sumByKey, ...).
 */
implicit def toKeyed[P <: Platform[P], K, V](producer: Producer[P, (K, V)]): KeyedProducer[P, K, V] =
IdentityKeyedProducer[P, K, V](producer)

/**
 * A semigroup on producers in which combining two producers (+) means merging them.
 */
implicit def semigroup[P <: Platform[P], T]: Semigroup[Producer[P, T]] =
  Semigroup.from { (left, right) => left.merge(right) }

/** The list of Producers that this producer directly depends on */
def dependenciesOf[P <: Platform[P]](p: Producer[P, Any]): List[Producer[P, Any]] =
p match {
case AlsoProducer(_, prod) => List(prod)
Expand Down Expand Up @@ -89,10 +101,12 @@ object Producer {
case Summer(_, _, _) => false
}

/**
 * Tells whether this Producer is an output of the DAG.
 * Only Summer and WrittenProducer nodes count as outputs.
 */
def isOutput[P <: Platform[P]](p: Producer[P, Any]): Boolean =
  p match {
    case Summer(_, _, _) => true
    case WrittenProducer(_, _) => true
    case _ => false
  }

/**
* Return all dependencies of a given node in depth first, left first order.
*/
Expand Down Expand Up @@ -167,6 +181,7 @@ sealed trait Producer[P <: Platform[P], +T] {
.merge(other.map(Right(_): Either[T, U]))
}

/**
 * Producer node that wraps a source of the given Platform.
 *
 * @param source the platform-specific source (of type P#Source[T]) being wrapped
 */
case class Source[P <: Platform[P], T](source: P#Source[T])
extends Producer[P, T]

Expand All @@ -177,7 +192,6 @@ sealed trait TailProducer[P <: Platform[P], +T] extends Producer[P, T] {
* This can be used to combine two independent Producers in a way that ensures
* that the Platform will plan both into a single Plan.
*/

def also[R](that: TailProducer[P, R])(implicit ev: DummyImplicit): TailProducer[P, R] =
// NOTE(review): ev is never used in the body; presumably the DummyImplicit parameter only
// keeps this signature distinct from another `also` overload — confirm.
new AlsoTailProducer(this, that)

Expand Down Expand Up @@ -214,31 +228,33 @@ case class WrittenProducer[P <: Platform[P], T, U >: T](producer: Producer[P, T]
case class Summer[P <: Platform[P], K, V](
producer: KeyedProducer[P, K, V],
store: P#Store[K, V],
monoid: Monoid[V]) extends KeyedProducer[P, K, (Option[V], V)] with TailProducer[P, (K, (Option[V], V))]
semigroup: Semigroup[V]) extends KeyedProducer[P, K, (Option[V], V)] with TailProducer[P, (K, (Option[V], V))]

/** This has the methods on Key-Value streams.
* The rule is: if you can easily express your logic on the keys and values indendently,
* The rule is: if you can easily express your logic on the keys and values independently,
* do it! This is how you communicate structure to Summingbird and it uses these hints
* to attempt the most efficient run of your code.
*/
sealed trait KeyedProducer[P <: Platform[P], K, V] extends Producer[P, (K, V)] {

/**
 * Builds a new KeyedProducer by applying a partial function to the keys of this one.
 * Only the elements whose key the function is defined at are kept; values are left untouched.
 */
def collectKeys[K2](pf: PartialFunction[K,K2]): KeyedProducer[P, K2, V] = {
  val rekeyed = collect { case (key, value) if pf.isDefinedAt(key) => (pf(key), value) }
  IdentityKeyedProducer(rekeyed)
}

/**
 * Builds a new KeyedProducer by applying a partial function to the values of this one.
 * Only the elements whose value the function is defined at are kept; keys are left untouched.
 */
def collectValues[V2](pf: PartialFunction[V,V2]): KeyedProducer[P, K, V2] = {
  val remapped = collect { case (key, value) if pf.isDefinedAt(value) => (key, pf(value)) }
  IdentityKeyedProducer(remapped)
}

/** Prefer this to filter or flatMap/flatMapKeys if you are filtering.
* This may be optimized in the future with an intrensic node in the Producer graph.
* This may be optimized in the future with an intrinsic node in the Producer graph.
* We know this never increases the number of items, and we know it does not rekey
* the partition.
*/
def filterKeys(pred: K => Boolean): KeyedProducer[P, K, V] = {
  // Keep only the pairs whose key satisfies pred; the value is never inspected.
  val kept = filter { case (key, _) => pred(key) }
  IdentityKeyedProducer(kept)
}

/** Prefer this to filter or flatMap/flatMapValues if you are filtering.
* This may be optimized in the future with an intrensic node in the Producer graph.
* This may be optimized in the future with an intrinsic node in the Producer graph.
* We know this never increases the number of items, and we know it does not rekey
* the partition.
*/
Expand Down Expand Up @@ -291,8 +307,8 @@ sealed trait KeyedProducer[P <: Platform[P], K, V] extends Producer[P, (K, V)] {
* (v0, vdelta1), (v0 + vdelta1, vdelta2), (v0 + vdelta1 + vdelta2, vdelta3), ...
*
*/
def sumByKey(store: P#Store[K, V])(implicit monoid: Monoid[V]): Summer[P, K, V] =
Summer(this, store, monoid)
def sumByKey(store: P#Store[K, V])(implicit semigroup: Semigroup[V]): Summer[P, K, V] =
Summer(this, store, semigroup)

/** Swaps every pair, so that the values become the keys and the keys become the values. */
def swap: KeyedProducer[P, V, K] = {
  val flipped = map(pair => pair.swap)
  IdentityKeyedProducer(flipped)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ object Scalding {
getCommutativity(names, options, s) match {
case Commutative =>
logger.info("enabling flatMapKeys mapside caching")
s.store.partialMerge(fmp, s.monoid, Commutative)
s.store.partialMerge(fmp, s.semigroup, Commutative)
case NonCommutative =>
logger.info("not enabling flatMapKeys mapside caching, due to non-commutativity")
fmp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,10 @@ import scala.collection.mutable.{
SynchronizedMap
}
import java.security.Permission

/**
* Tests for Summingbird's Storm planner.
*/


object StormLaws extends Specification {
sequential
import MapAlgebra.sparseEquiv
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ case class FlatMapBoltProvider(storm: Storm, stormDag: Dag[Storm], node: StormNo
// When emitting tuples between the Final Flat Map and the summer we encode the timestamp in the value
// The monoid we use in aggregation is timestamp max.
val batcher = summerProducer.store.batcher
implicit val valueMonoid: Semigroup[V] = summerProducer.monoid
implicit val valueMonoid: Semigroup[V] = summerProducer.semigroup

// Query to get the summer parallelism of the summer downstream of us we are emitting to
// to ensure no edge case between what we might see for its parallelism and what it would see/pass to storm.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird

private def scheduleSummerBolt[K, V](stormDag: Dag[Storm], node: StormNode)(implicit topologyBuilder: TopologyBuilder) = {
val summer: Summer[Storm, K, V] = node.members.collect { case c: Summer[Storm, K, V] => c }.head
implicit val monoid = summer.monoid
implicit val semigroup = summer.semigroup
implicit val batcher = summer.store.batcher
val nodeName = stormDag.getNodeName(node)

Expand Down

0 comments on commit ef2af68

Please sign in to comment.