Skip to content

Commit

Permalink
Merge pull request twitter#417 from twitter/eitherAgg
Browse files Browse the repository at this point in the history
Add some combinators on MonoidAggregator
  • Loading branch information
ianoc committed Mar 31, 2015
2 parents 3b2c292 + 9bd22c6 commit 616df8d
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 1 deletion.
43 changes: 43 additions & 0 deletions algebird-core/src/main/scala/com/twitter/algebird/Aggregator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -316,12 +316,55 @@ trait MonoidAggregator[-A, B, +C] extends Aggregator[A, B, C] { self =>
}
}

/**
* Build a MonoidAggregator that either takes left or right input
* and outputs the pair from both
*/
def either[A2, B2, C2](that: MonoidAggregator[A2, B2, C2]): MonoidAggregator[Either[A, A2], (B, B2), (C, C2)] =
new MonoidAggregator[Either[A, A2], (B, B2), (C, C2)] {
def prepare(e: Either[A, A2]) = e match {
case Left(a) => (self.prepare(a), that.monoid.zero)
case Right(a2) => (self.monoid.zero, that.prepare(a2))
}
val monoid = new Tuple2Monoid[B, B2]()(self.monoid, that.monoid)
def present(bs: (B, B2)) = (self.present(bs._1), that.present(bs._2))
}

/**
* Only aggregate items that match a predicate
*/
def filterBefore[A1 <: A](pred: A1 => Boolean): MonoidAggregator[A1, B, C] =
new MonoidAggregator[A1, B, C] {
def prepare(a: A1) = if (pred(a)) self.prepare(a) else self.monoid.zero
def monoid = self.monoid
def present(b: B) = self.present(b)
}
/**
* This maps the inputs to Bs, then sums them, effectively flattening
* the inputs to the MonoidAggregator
*/
def sumBefore: MonoidAggregator[TraversableOnce[A], B, C] =
new MonoidAggregator[TraversableOnce[A], B, C] {
def monoid: Monoid[B] = self.monoid
def prepare(input: TraversableOnce[A]): B = monoid.sum(input.map(self.prepare))
def present(reduction: B): C = self.present(reduction)
}

/**
* This allows you to join two aggregators into one that takes a tuple input,
* which in turn allows you to chain .composePrepare onto the result if you have
* an initial input that has to be prepared differently for each of the joined aggregators.
*
* The law here is: ag1.zip(ag2).apply(as.zip(bs)) == (ag1(as), ag2(bs))
*/
def zip[A2, B2, C2](ag2: MonoidAggregator[A2, B2, C2]): MonoidAggregator[(A, A2), (B, B2), (C, C2)] = {
val ag1 = self
new MonoidAggregator[(A, A2), (B, B2), (C, C2)] {
def prepare(a: (A, A2)) = (ag1.prepare(a._1), ag2.prepare(a._2))
val monoid = new Tuple2Monoid[B, B2]()(ag1.monoid, ag2.monoid)
def present(b: (B, B2)) = (ag1.present(b._1), ag2.present(b._2))
}
}
}

trait RingAggregator[-A, B, +C] extends MonoidAggregator[A, B, C] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ sealed trait Preparer[A, T] extends java.io.Serializable {
* Filter out values that do not meet the predicate.
* Like flatMap, this limits future aggregations to MonoidAggregator.
*/
def filter(fn: T => Boolean) = flatMap{ t => if (fn(t)) Some(t) else None }
def filter(fn: T => Boolean) = flatMap { t => if (fn(t)) Some(t) else None }

def collect[U](p: PartialFunction[T, U]): FlatMapPreparer[A, U] =
flatMap { t => if (p.isDefinedAt(t)) Some(p(t)) else None }

/**
* count and following methods all just call monoidAggregate with one of the standard Aggregators.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,15 @@ class AggregatorLaws extends CheckProperties {
}
}
}
property("MonoidAggregator.either is correct") {
forAll { (in: List[(Int, Int)], agl: MonoidAggregator[Int, Int, Int], agr: MonoidAggregator[Int, Int, Int]) =>
assert(agl.zip(agr).apply(in) ==
agl.either(agr).apply(in.flatMap { case (l, r) => List(Left(l), Right(r)) }))
}
}
property("MonoidAggregator.filter is correct") {
forAll { (in: List[Int], ag: MonoidAggregator[Int, Int, Int], fn: Int => Boolean) =>
assert(ag.filterBefore(fn).apply(in) == ag.apply(in.filter(fn)))
}
}
}

0 comments on commit 616df8d

Please sign in to comment.