Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Monoid to semigroup #487

Merged
merged 6 commits into from
Mar 31, 2014
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

package com.twitter.summingbird

/** Monoid stands alone. */
import com.twitter.algebird.{ Monoid, Semigroup }
import com.twitter.algebird.Semigroup
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/** Semigroup is half-standing */


object Producer {

Expand All @@ -30,33 +29,46 @@ object Producer {
p :: above
}

// The parents of the current node, possibly not its dependencies

/** The parents of the current node, which are not necessarily its dependencies.
 * It differs from dependenciesOf in that, for an AlsoProducer, both sides are returned
 * (so the predecessor is included as well).
 */
def parentsOf[P <: Platform[P]](in: Producer[P, Any]): List[Producer[P, Any]] =
  in match {
    case AlsoProducer(left, right) => List(left, right)
    case other => dependenciesOf(other)
  }

/** Finds the first Summer in the provided list of Producers, if there is one. */
def retrieveSummer[P <: Platform[P]](paths: List[Producer[P, _]]): Option[Summer[P, _, _]] =
  paths.collectFirst {
    case summer: Summer[P, _, _] => summer
  }

/**
* Begin from some base representation. An iterator for in-memory,
* for example.
*/
* A producer DAG starts from sources.
* The actual source type depends on the Platform.
* For example: An iterator for in-memory.
*/
def source[P <: Platform[P], T](s: P#Source[T]): Producer[P, T] =
  new Source[P, T](s)

/** Implicit conversion from Producer[P, T] to KeyedProducer[P, K, V] when the type T is a
 * tuple of (K, V), as proven by the evidence ev.
 * This enables the operations on keys (sumByKey, ...).
 */
implicit def evToKeyed[P <: Platform[P], T, K, V](producer: Producer[P, T])
(implicit ev: T <:< (K, V)): KeyedProducer[P, K, V] =
// ev proves that T is a subtype of (K, V), and Producer is covariant in its element type,
// so the cast below only re-labels the static type of the very same producer.
IdentityKeyedProducer[P, K, V](producer.asInstanceOf[Producer[P, (K, V)]])

/** Implicit conversion from Producer[P, (K, V)] to KeyedProducer[P, K, V],
 * which enables the operations on keys (sumByKey, ...).
 */
implicit def toKeyed[P <: Platform[P], K, V](producer: Producer[P, (K, V)]): KeyedProducer[P, K, V] =
  IdentityKeyedProducer(producer)

/** A semigroup on producers, in which combining two producers (+) merges them. */
implicit def semigroup[P <: Platform[P], T]: Semigroup[Producer[P, T]] =
  Semigroup.from { (left, right) => left.merge(right) }

/** the list of the Producers, this producer directly depends on */
def dependenciesOf[P <: Platform[P]](p: Producer[P, Any]): List[Producer[P, Any]] =
p match {
case AlsoProducer(_, prod) => List(prod)
Expand Down Expand Up @@ -89,10 +101,12 @@ object Producer {
case Summer(_, _, _) => false
}

/** Tells whether this Producer is an output of the DAG, i.e. a Summer or a WrittenProducer. */
def isOutput[P <: Platform[P]](p: Producer[P, Any]): Boolean =
  p match {
    case Summer(_, _, _) => true
    case WrittenProducer(_, _) => true
    case _ => false
  }

/**
* Return all dependencies of a given node in depth first, left first order.
*/
Expand Down Expand Up @@ -167,6 +181,7 @@ sealed trait Producer[P <: Platform[P], +T] {
.merge(other.map(Right(_): Either[T, U]))
}

/** Wraps the sources of the given Platform, making them a Producer node.
 *
 * @param source the platform-specific source (a P#Source[T]) wrapped by this node
 */
case class Source[P <: Platform[P], T](source: P#Source[T])
extends Producer[P, T]

Expand All @@ -177,7 +192,6 @@ sealed trait TailProducer[P <: Platform[P], +T] extends Producer[P, T] {
* This can be used to combine two independent Producers in a way that ensures
* that the Platform will plan both into a single Plan.
*/

def also[R](that: TailProducer[P, R])(implicit ev: DummyImplicit): TailProducer[P, R] =
// Pairs this producer with `that` in an AlsoTailProducer.
// NOTE(review): the DummyImplicit parameter presumably only serves to give this overload a
// signature distinct from another `also` that is not visible here -- confirm.
new AlsoTailProducer(this, that)

Expand Down Expand Up @@ -214,31 +228,33 @@ case class WrittenProducer[P <: Platform[P], T, U >: T](producer: Producer[P, T]
case class Summer[P <: Platform[P], K, V](
producer: KeyedProducer[P, K, V],
store: P#Store[K, V],
monoid: Monoid[V]) extends KeyedProducer[P, K, (Option[V], V)] with TailProducer[P, (K, (Option[V], V))]
semigroup: Semigroup[V]) extends KeyedProducer[P, K, (Option[V], V)] with TailProducer[P, (K, (Option[V], V))]

/** This has the methods on Key-Value streams.
* The rule is: if you can easily express your logic on the keys and values indendently,
* The rule is: if you can easily express your logic on the keys and values independently,
* do it! This is how you communicate structure to Summingbird and it uses these hints
* to attempt the most efficient run of your code.
*/
sealed trait KeyedProducer[P <: Platform[P], K, V] extends Producer[P, (K, V)] {

/** Builds a new KeyedProducer by applying a partial function to the keys of the elements of
 * this one. Elements whose key is outside the domain of the function are dropped.
 */
def collectKeys[K2](pf: PartialFunction[K, K2]): KeyedProducer[P, K2, V] = {
  val rekeyed = collect { case (key, value) if pf.isDefinedAt(key) => (pf(key), value) }
  IdentityKeyedProducer(rekeyed)
}

/** Builds a new KeyedProducer by applying a partial function to the values of the elements of
 * this one. Elements whose value is outside the domain of the function are dropped.
 */
def collectValues[V2](pf: PartialFunction[V, V2]): KeyedProducer[P, K, V2] = {
  val revalued = collect { case (key, value) if pf.isDefinedAt(value) => (key, pf(value)) }
  IdentityKeyedProducer(revalued)
}

/** Prefer this to filter or flatMap/flatMapKeys if you are filtering.
* This may be optimized in the future with an intrensic node in the Producer graph.
* This may be optimized in the future with an intrinsic node in the Producer graph.
* We know this never increases the number of items, and we know it does not rekey
* the partition.
*/
def filterKeys(pred: K => Boolean): KeyedProducer[P, K, V] = {
  // Keep only the pairs whose key satisfies the predicate; values are left untouched.
  val kept = filter { case (key, _) => pred(key) }
  IdentityKeyedProducer(kept)
}

/** Prefer this to filter or flatMap/flatMapValues if you are filtering.
* This may be optimized in the future with an intrensic node in the Producer graph.
* This may be optimized in the future with an intrinsic node in the Producer graph.
* We know this never increases the number of items, and we know it does not rekey
* the partition.
*/
Expand Down Expand Up @@ -291,8 +307,8 @@ sealed trait KeyedProducer[P <: Platform[P], K, V] extends Producer[P, (K, V)] {
* (v0, vdelta1), (v0 + vdelta1, vdelta2), (v0 + vdelta1 + vdelta2, vdelta3), ...
*
*/
def sumByKey(store: P#Store[K, V])(implicit monoid: Monoid[V]): Summer[P, K, V] =
Summer(this, store, monoid)
def sumByKey(store: P#Store[K, V])(implicit semigroup: Semigroup[V]): Summer[P, K, V] =
Summer(this, store, semigroup)

/** Swaps every pair, so that values become keys and keys become values. */
def swap: KeyedProducer[P, V, K] = {
  val flipped = map { pair => pair.swap }
  IdentityKeyedProducer(flipped)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ object Scalding {
getCommutativity(names, options, s) match {
case Commutative =>
logger.info("enabling flatMapKeys mapside caching")
s.store.partialMerge(fmp, s.monoid, Commutative)
s.store.partialMerge(fmp, s.semigroup, Commutative)
case NonCommutative =>
logger.info("not enabling flatMapKeys mapside caching, due to non-commutativity")
fmp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,10 @@ import scala.collection.mutable.{
SynchronizedMap
}
import java.security.Permission

/**
* Tests for Summingbird's Storm planner.
*/


object StormLaws extends Specification {
sequential
import MapAlgebra.sparseEquiv
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ case class FlatMapBoltProvider(storm: Storm, stormDag: Dag[Storm], node: StormNo
// When emitting tuples between the Final Flat Map and the summer we encode the timestamp in the value
// The monoid we use in aggregation is timestamp max.
val batcher = summerProducer.store.batcher
implicit val valueMonoid: Semigroup[V] = summerProducer.monoid
implicit val valueMonoid: Semigroup[V] = summerProducer.semigroup

// Query to get the summer parallelism of the summer down stream of us we are emitting to
// to ensure no edge case between what we might see for its parallelism and what it would see/pass to storm.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird

private def scheduleSummerBolt[K, V](stormDag: Dag[Storm], node: StormNode)(implicit topologyBuilder: TopologyBuilder) = {
val summer: Summer[Storm, K, V] = node.members.collect { case c: Summer[Storm, K, V] => c }.head
implicit val monoid = summer.monoid
implicit val semigroup = summer.semigroup
implicit val batcher = summer.store.batcher
val nodeName = stormDag.getNodeName(node)

Expand Down