From 7bfc8e6c479cfaa1d1247c5a960a65d721d86429 Mon Sep 17 00:00:00 2001 From: Jeff Sarnat Date: Wed, 6 Nov 2019 10:53:17 -0800 Subject: [PATCH 01/26] initial implementation of scan plus a lot of tests --- .../scala/com/twitter/algebird/Scan.scala | 299 ++++++++++++++++++ .../scala/com/twitter/algebird/ScanTest.scala | 85 +++++ 2 files changed, 384 insertions(+) create mode 100644 algebird-core/src/main/scala/com/twitter/algebird/Scan.scala create mode 100644 algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala new file mode 100644 index 000000000..ee08a6504 --- /dev/null +++ b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala @@ -0,0 +1,299 @@ +package com.twitter.algebird + +import scala.collection.AbstractIterator +import scala.collection.generic.CanBuildFrom + +object Scan { + implicit def applicative[I]: Applicative[({ type L[O] = Scan[I, O] })#L] = new ScanApplicative[I] + + def from[I, S, O](initState: S)(presentAndNextStateFn: (I, S) => (O, S)): Scan[I, O] = + new Scan[I, O] { + override type State = S + override val initialState = initState + override def presentAndNextState(i: I, s: State): (O, State) = presentAndNextStateFn(i, s) + } + + /** + * + * @param initStateCreator A call-by-name method that allocates new mutable state + * @param presentAndUpdateStateFn A function that both presents the output value, and has the side-effect of updating the mutable state + * @tparam I + * @tparam S + * @tparam O + * @return A Scan that safely encapsulates state while it's doing its thing. 
+ */ + def mutable[I, S, O](initStateCreator: => S)(presentAndUpdateStateFn: (I, S) => O): Scan[I, O] = + new Scan[I, O] { + override type State = S + override def initialState = initStateCreator + override def presentAndNextState(i: I, s: S): (O, S) = (presentAndUpdateStateFn(i, s), s) + } + + /** + * The trivial scan that always returns the same value, regardless of input + * @param t + * @tparam T + */ + def const[T](t: T): Scan[Any, T] = from(()) { (_, _) => + (t, ()) + } + + /** + * + * @param aggregator + * @param initState + * @tparam A + * @tparam B + * @tparam C + * @return A scan which, when given [a_1, ..., a_n] outputs [c_1, ..., c_n] where + * c_i = initState + aggregator.prepare(a_1) + ... + aggregator.prepare(a_i) + */ + def fromAggregator[A, B, C](aggregator: Aggregator[A, B, C], initState: B): Scan[A, C] = + new Scan[A, C] { + override type State = B + + override val initialState = initState + + override def presentAndNextState(a: A, stateBeforeProcessingI: B): (C, B) = { + // nb: the order of the arguments to semigroup.plus here is what determines the order of the final summation; + // this matters because not all semigroups are commutative + val stateAfterProcessingA = + aggregator.semigroup.plus(stateBeforeProcessingI, aggregator.prepare(a)) + (aggregator.present(stateAfterProcessingA), stateAfterProcessingA) + } + } + + /** + * + * @param monoidAggregator + * @tparam A + * @tparam B + * @tparam C + * @return A scan which, when given [a_1, ..., a_n] outputs [c_1, ..., c_n] where + * c_i = monoidAggregator.monoid.zero + aggregator.prepare(a_1) + ... 
+ aggregator.prepare(a_i) + */ + def fromMonoidAggregator[A, B, C](monoidAggregator: MonoidAggregator[A, B, C]): Scan[A, C] = + fromAggregator(monoidAggregator, monoidAggregator.monoid.zero) + + /** + * + * @param aggregator + * @param initState + * @tparam A + * @tparam B + * @tparam C + * @return A scan which, when given [a_1, ..., a_n] outputs [c_1, ..., c_n] where + * c_i = aggregator.prepare(a_i) + ... + aggregator.prepare(a_1) + initState + */ + def fromAggregatorReverse[A, B, C](aggregator: Aggregator[A, B, C], initState: B): Scan[A, C] = + new Scan[A, C] { + override type State = B + + override val initialState = initState + + override def presentAndNextState(a: A, stateBeforeProcessingI: B): (C, B) = { + // nb: the order of the arguments to semigroup.plus here is what determines the order of the final summation; + // this matters because not all semigroups are commutative + val stateAfterProcessingA = + aggregator.semigroup.plus(aggregator.prepare(a), stateBeforeProcessingI) + (aggregator.present(stateAfterProcessingA), stateAfterProcessingA) + } + } + + /** + * + * @param monoidAggregator + * @tparam A + * @tparam B + * @tparam C + * @return A scan which, when given [a_1, ..., a_n] outputs [c_1, ..., c_n] where + * c_i = aggregator.prepare(a_i) + ... + aggregator.prepare(a_1) + monoidAggregator.monoid.zero + */ + def fromMonoidAggregatorReverse[A, B, C]( + monoidAggregator: MonoidAggregator[A, B, C] + ): Scan[A, C] = + fromAggregatorReverse(monoidAggregator, monoidAggregator.monoid.zero) + +} + +/** + * The Scan trait is an alternative to the "scanLeft" method on iterators/other collections for a range of + * of use-cases where scanLeft is awkward to use. At a high level it provides some of the same functionality as scanleft, + * but with a separation of "what is the state of the scan" from "what are the elements that I'm scanning over?" 
+ * In particular, when scanning over an iterator with N elements, the output is an iterator with N elements (in contrast + * to scanLeft's N+1). + * + * If you find yourself writing a ScanLeft over pairs of elements, where you only use one element of the pair within + * the scanLeft itself then throw that element away in a "map" immediately after the scanLeft is done, then this + * abstraction is for you. + * + * + * @tparam I The type of elements that the computation is scanning over. + * @tparam O The output type of the scan (typically distinct from the hidden {{{State}}} of the scan. + */ +sealed trait Scan[-I, +O] extends Serializable { self => + + /** + * The computation of any given scan involves keeping track of a hidden state of type {{{State}}} + */ + type State + + /** + * + * @return The state of the scan before any elements have been processed + */ + def initialState: State + + /** + * + * @param i An element in the stream to process + * @param stateBeforeProcessingI The state of the scan before processing {{{i}}} + * @return The output of the scan corresponding to processing {{{i}}} with state {{{stateBeforeProcessing}}}, + * along with the result of updating {{{stateBeforeProcessing}}} with the information from {{{i}}}. 
+ */ + def presentAndNextState(i: I, stateBeforeProcessingI: State): (O, State) + + /** + * @param iter + * @return If iter = Iterator(a_1, ..., a_n), return: + * Iterator(o_1, ..., o_n) where + * (o_(i+1), state_(i+1)) = presentAndNextState(a_i, state_i) + * and state_0 = initialState + * + */ + def scanIterator(iter: Iterator[I]): Iterator[O] = new AbstractIterator[O] { + override def hasNext: Boolean = iter.hasNext + var state: State = initialState + override def next: O = { + val thisState = state + val thisA = iter.next + val (thisC, nextState) = presentAndNextState(thisA, thisState) + state = nextState + thisC + } + } + + /** + * @param inputs + * @param bf + * @tparam In The type of the input collection + * @tparam Out The type of the output collection + * @return + * Given inputs as a collection of the form [a_1, ..., a_n] the output will be a collection of the form: + * [o_1, ..., o_n] where + * (o_(i+1), state_(i+1)) = presentAndNextState(a_i, state_i) + * and state_0 = initialState + */ + def apply[In <: TraversableOnce[I], Out]( + inputs: In + )(implicit bf: CanBuildFrom[In, O, Out]): Out = { + val builder = bf() + builder ++= scanIterator(inputs.toIterator) + builder.result + } + + // combinators + + def replaceState(newInitialState: => State): Scan[I, O] = new Scan[I, O] { + override type State = self.State + + override def initialState: State = newInitialState + + override def presentAndNextState(i: I, stateBeforeProcessingI: State): (O, State) = + self.presentAndNextState(i, stateBeforeProcessingI) + } + + def composePrepare[I1](f: I1 => I): Scan[I1, O] = new Scan[I1, O] { + override type State = self.State + + override def initialState: State = self.initialState + + override def presentAndNextState(i: I1, stateBeforeProcessingI: State): (O, State) = + self.presentAndNextState(f(i), stateBeforeProcessingI) + } + + def andThenPresent[O1](g: O => O1): Scan[I, O1] = new Scan[I, O1] { + override type State = self.State + override def initialState: State 
= self.initialState + + override def presentAndNextState(i: I, stateBeforeProcessingI: State): (O1, State) = { + val (c, stateAfterProcessingA) = self.presentAndNextState(i, stateBeforeProcessingI) + (g(c), stateAfterProcessingA) + } + } + + def zipWithInput[I1 <: I]: Scan[I1, (I1, O)] = new Scan[I1, (I1, O)] { + override type State = self.State + + override def initialState: State = self.initialState + + override def presentAndNextState(i: I1, stateBeforeProcessingI: State): ((I1, O), State) = { + val (o, stateAfterProcessingA) = self.presentAndNextState(i, stateBeforeProcessingI) + ((i, o), stateAfterProcessingA) + } + } + + def zipWithPriorState: Scan[I, (State, O)] = new Scan[I, (State, O)] { + override type State = self.State + + override def initialState: State = self.initialState + + override def presentAndNextState(i: I, stateBeforeProcessingI: State): ((State, O), State) = { + val (o, stateAfterProcessingA) = self.presentAndNextState(i, stateBeforeProcessingI) + ((stateBeforeProcessingI, o), stateAfterProcessingA) + } + } + + def zipWithPosteriorState: Scan[I, (O, State)] = new Scan[I, (O, State)] { + override type State = self.State + + override def initialState: State = self.initialState + + override def presentAndNextState(i: I, stateBeforeProcessingI: State): ((O, State), State) = { + val (c, stateAfterProcessingA) = self.presentAndNextState(i, stateBeforeProcessingI) + ((c, stateAfterProcessingA), stateAfterProcessingA) + } + } + + def zip[I2, O2](scan2: Scan[I2, O2]): Scan[(I, I2), (O, O2)] = new Scan[(I, I2), (O, O2)] { + override type State = (self.State, scan2.State) + + override def initialState: State = (self.initialState, scan2.initialState) + + override def presentAndNextState( + i1i2: (I, I2), + stateBeforeProcessingI1I2: State + ): ((O, O2), State) = { + val (o1, state1AfterProcesingI1) = + self.presentAndNextState(i1i2._1, stateBeforeProcessingI1I2._1) + val (o2, state2AfterProcesingI2) = + scan2.presentAndNextState(i1i2._2, 
stateBeforeProcessingI1I2._2) + ((o1, o2), (state1AfterProcesingI1, state2AfterProcesingI2)) + + } + } + + def join[I2 <: I, O2](scan2: Scan[I2, O2]): Scan[I2, (O, O2)] = new Scan[I2, (O, O2)] { + override type State = (self.State, scan2.State) + + override def initialState: State = (self.initialState, scan2.initialState) + + override def presentAndNextState(i: I2, stateBeforeProcessingI: State): ((O, O2), State) = { + val (o1, state1AfterProcesingI1) = self.presentAndNextState(i, stateBeforeProcessingI._1) + val (o2, state2AfterProcesingI2) = scan2.presentAndNextState(i, stateBeforeProcessingI._2) + ((o1, o2), (state1AfterProcesingI1, state2AfterProcesingI2)) + } + } + +} + +class ScanApplicative[I] extends Applicative[({ type L[O] = Scan[I, O] })#L] { + override def map[T, U](mt: Scan[I, T])(fn: T => U): Scan[I, U] = + mt.andThenPresent(fn) + + override def apply[T](v: T): Scan[I, T] = + Scan.const(v) + + override def join[T, U](mt: Scan[I, T], mu: Scan[I, U]): Scan[I, (T, U)] = + mt.join(mu) +} diff --git a/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala new file mode 100644 index 000000000..553b81f14 --- /dev/null +++ b/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala @@ -0,0 +1,85 @@ +package com.twitter.algebird + +import org.scalacheck.Gen +import org.scalatest.prop.GeneratorDrivenPropertyChecks +import org.scalatest.{Matchers, WordSpec} + +import scala.collection.mutable.Queue + + +object ScanTest { + type StringScan = Scan[Char, String] + + // technically speaking, these aren't exactly the Free scanner, since that would output a giant tree structure from + // the whole scan, but that giant tree structure is pretty close to a String. 
+ val directFreeScan: StringScan = Scan.from(List.empty[Char]) { (char, previousState) => + val nextState = char :: previousState + (nextState.reverse.mkString, nextState) + } + + val mutableFreeScan: StringScan = Scan.mutable(new Queue[Char]()) { + (char, previousState) => + previousState.enqueue(char) + previousState.mkString + } + + val aggregatorFreeScan: StringScan = { + val aggregator = Aggregator.fromMonoid[List[Char]] + + Scan + .fromMonoidAggregator(aggregator) + .composePrepare[Char](c => List(c)) + .andThenPresent(_.mkString) + + } + + val reverseAggregatorFreeScan: StringScan = { + val aggregator = Aggregator.fromMonoid[List[Char]] + + Scan + .fromMonoidAggregatorReverse(aggregator) + .composePrepare[Char](c => List(c)) + .andThenPresent(_.reverse.mkString) + + } + + +} + +class ScanTest extends WordSpec with Matchers with GeneratorDrivenPropertyChecks { + import ScanTest._ + + def freeScanLaws(freeScan: StringScan): Unit = + forAll(Gen.listOf(Gen.alphaLowerChar)) { inputList => + val outputList = freeScan(inputList) + + outputList.length should equal(inputList.length) + outputList.zipWithIndex + .foreach { + case (ithOutput, i) => + val expectedOutput = inputList.slice(0, i + 1).mkString + ithOutput should equal(expectedOutput) + } + } + + "freeAggreator laws" should { + "be obeyed by a direct implementation of the almost-free Scan" in { + freeScanLaws(directFreeScan) + } + + "be obeyed by a mutable implementation of the almost-free Scan" in { + freeScanLaws(mutableFreeScan) + } + + "be obeyed by an implementation of the almost-free Scan using fromAggregator, composePrepare, and andThenPresent" in { + freeScanLaws(aggregatorFreeScan) + } + + "be obeyed by an implementation of the almost-free Scan using fromReverseAggregator, composePrepare, and andThenPresent" in { + freeScanLaws(reverseAggregatorFreeScan) + } + + + } + +} From 96a7cb3ccf86c17932b84683b6d93813aaf99eb8 Mon Sep 17 00:00:00 2001 From: Jeff Sarnat Date: Wed, 6 Nov 2019 15:13:05 -0800 
Subject: [PATCH 02/26] new combinators and tests + renaming some methods from zip to join for consistency --- .../scala/com/twitter/algebird/Scan.scala | 94 +++++++++++++++---- .../scala/com/twitter/algebird/ScanTest.scala | 39 +++++++- 2 files changed, 116 insertions(+), 17 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala index ee08a6504..3433f44ce 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala @@ -1,18 +1,58 @@ package com.twitter.algebird -import scala.collection.AbstractIterator +import scala.collection.compat._ import scala.collection.generic.CanBuildFrom object Scan { + /** + * Most consumers of Scan don't care about the type of the {{State}} type variable. But for those that do, + * we make an effort to expose it in all of our combinators. + * @tparam I + * @tparam S + * @tparam O + */ + type Aux[-I, S, +O] = Scan[I, O] { type State = S } + implicit def applicative[I]: Applicative[({ type L[O] = Scan[I, O] })#L] = new ScanApplicative[I] - def from[I, S, O](initState: S)(presentAndNextStateFn: (I, S) => (O, S)): Scan[I, O] = + def from[I, S, O](initState: S)(presentAndNextStateFn: (I, S) => (O, S)): Aux[I, S, O] = new Scan[I, O] { override type State = S override val initialState = initState override def presentAndNextState(i: I, s: State): (O, State) = presentAndNextStateFn(i, s) } + def fromFunction[I, O](f: I => O): Aux[I, Unit, O] = new Scan[I, O] { + override type State = Unit + override val initialState = () + override def presentAndNextState(i: I, stateBeforeProcessingI: Unit): (O, State) = (f(i), ()) + } + + /** + * Streams can be thought of as being a hidden state that is queryable for a head element, and another hidden state + * that represents the rest of the stream. 
Scans take streams of inputs to streams of outputs, but some scans + * have trivial inputs and just produce a stream of outputs. + * @param initState The initial state of the scan; think of this as an infinite stream. + * @param destructor This function decomposes a stream into the its head-element and tail-stream. + * @tparam S The hidden state of the stream that we are turning into a Scan. + * @tparam O The type of the elments of the stream that we are turning into a Scan + * @return A Scan whose inputs are irrelevant, and whose outputs are those that we would get from implementing + * a stream using the information provided to this method. + */ + def fromStreamLike[S, O](initState: S)(destructor: S => (O, S)): Aux[Any, S, O] = new Scan[Any, O] { + override type State = S + override val initialState = initState + override def presentAndNextState(i: Any, stateBeforeProcessingI: S): (O, S) = + destructor(stateBeforeProcessingI) + } + + /** + * A Scan that returns the number N for the Nth input (starting from 0) + */ + val index: Aux[Any, Long, Long] = fromStreamLike(0L)(n => (n, n+1)) + + def identity[A] = fromFunction[A, A](x => x) + /** * * @param initStateCreator A call-by-name method that allocates new mutable state @@ -22,7 +62,7 @@ object Scan { * @tparam O * @return A Scan that safely encapsulates state while it's doing its thing. 
*/ - def mutable[I, S, O](initStateCreator: => S)(presentAndUpdateStateFn: (I, S) => O): Scan[I, O] = + def mutable[I, S, O](initStateCreator: => S)(presentAndUpdateStateFn: (I, S) => O): Aux[I, S, O] = new Scan[I, O] { override type State = S override def initialState = initStateCreator @@ -34,7 +74,7 @@ object Scan { * @param t * @tparam T */ - def const[T](t: T): Scan[Any, T] = from(()) { (_, _) => + def const[T](t: T): Aux[Any, Unit, T] = from(()) { (_, _) => (t, ()) } @@ -48,7 +88,7 @@ object Scan { * @return A scan which, when given [a_1, ..., a_n] outputs [c_1, ..., c_n] where * c_i = initState + aggregator.prepare(a_1) + ... + aggregator.prepare(a_i) */ - def fromAggregator[A, B, C](aggregator: Aggregator[A, B, C], initState: B): Scan[A, C] = + def fromAggregator[A, B, C](aggregator: Aggregator[A, B, C], initState: B): Aux[A, B, C] = new Scan[A, C] { override type State = B @@ -72,7 +112,7 @@ object Scan { * @return A scan which, when given [a_1, ..., a_n] outputs [c_1, ..., c_n] where * c_i = monoidAggregator.monoid.zero + aggregator.prepare(a_1) + ... + aggregator.prepare(a_i) */ - def fromMonoidAggregator[A, B, C](monoidAggregator: MonoidAggregator[A, B, C]): Scan[A, C] = + def fromMonoidAggregator[A, B, C](monoidAggregator: MonoidAggregator[A, B, C]): Aux[A, B, C] = fromAggregator(monoidAggregator, monoidAggregator.monoid.zero) /** @@ -85,7 +125,7 @@ object Scan { * @return A scan which, when given [a_1, ..., a_n] outputs [c_1, ..., c_n] where * c_i = aggregator.prepare(a_i) + ... 
+ aggregator.prepare(a_1) + initState */ - def fromAggregatorReverse[A, B, C](aggregator: Aggregator[A, B, C], initState: B): Scan[A, C] = + def fromAggregatorReverse[A, B, C](aggregator: Aggregator[A, B, C], initState: B): Aux[A, B, C] = new Scan[A, C] { override type State = B @@ -111,7 +151,7 @@ object Scan { */ def fromMonoidAggregatorReverse[A, B, C]( monoidAggregator: MonoidAggregator[A, B, C] - ): Scan[A, C] = + ): Aux[A, B, C] = fromAggregatorReverse(monoidAggregator, monoidAggregator.monoid.zero) } @@ -133,6 +173,8 @@ object Scan { */ sealed trait Scan[-I, +O] extends Serializable { self => + import Scan.Aux + /** * The computation of any given scan involves keeping track of a hidden state of type {{{State}}} */ @@ -194,7 +236,7 @@ sealed trait Scan[-I, +O] extends Serializable { self => // combinators - def replaceState(newInitialState: => State): Scan[I, O] = new Scan[I, O] { + def replaceState(newInitialState: => State): Aux[I, State, O] = new Scan[I, O] { override type State = self.State override def initialState: State = newInitialState @@ -203,7 +245,7 @@ sealed trait Scan[-I, +O] extends Serializable { self => self.presentAndNextState(i, stateBeforeProcessingI) } - def composePrepare[I1](f: I1 => I): Scan[I1, O] = new Scan[I1, O] { + def composePrepare[I1](f: I1 => I): Aux[I1, State, O] = new Scan[I1, O] { override type State = self.State override def initialState: State = self.initialState @@ -212,7 +254,7 @@ sealed trait Scan[-I, +O] extends Serializable { self => self.presentAndNextState(f(i), stateBeforeProcessingI) } - def andThenPresent[O1](g: O => O1): Scan[I, O1] = new Scan[I, O1] { + def andThenPresent[O1](g: O => O1): Aux[I, State, O1] = new Scan[I, O1] { override type State = self.State override def initialState: State = self.initialState @@ -222,7 +264,13 @@ sealed trait Scan[-I, +O] extends Serializable { self => } } - def zipWithInput[I1 <: I]: Scan[I1, (I1, O)] = new Scan[I1, (I1, O)] { + /** + * + * @tparam I1 + * @return A scanner 
that is semantically identical to .join(Scan.identity[I1]), but without + * where we don't pollute the {{State}} by pairing it redundantly with {{Unit}}. + */ + def joinWithInput[I1 <: I]: Aux[I1, State, (I1, O)] = new Scan[I1, (I1, O)] { override type State = self.State override def initialState: State = self.initialState @@ -233,7 +281,7 @@ sealed trait Scan[-I, +O] extends Serializable { self => } } - def zipWithPriorState: Scan[I, (State, O)] = new Scan[I, (State, O)] { + def joinWithPriorState: Aux[I, State, (State, O)] = new Scan[I, (State, O)] { override type State = self.State override def initialState: State = self.initialState @@ -244,7 +292,7 @@ sealed trait Scan[-I, +O] extends Serializable { self => } } - def zipWithPosteriorState: Scan[I, (O, State)] = new Scan[I, (O, State)] { + def joinWithPosteriorState: Aux[I, State, (O, State)] = new Scan[I, (O, State)] { override type State = self.State override def initialState: State = self.initialState @@ -255,7 +303,9 @@ sealed trait Scan[-I, +O] extends Serializable { self => } } - def zip[I2, O2](scan2: Scan[I2, O2]): Scan[(I, I2), (O, O2)] = new Scan[(I, I2), (O, O2)] { + def joinWithIndex: Aux[I, (State, Long), (O, Long)] = join(Scan.index) + + def zip[I2, O2](scan2: Scan[I2, O2]): Aux[(I, I2), (State, scan2.State), (O, O2)] = new Scan[(I, I2), (O, O2)] { override type State = (self.State, scan2.State) override def initialState: State = (self.initialState, scan2.initialState) @@ -273,7 +323,7 @@ sealed trait Scan[-I, +O] extends Serializable { self => } } - def join[I2 <: I, O2](scan2: Scan[I2, O2]): Scan[I2, (O, O2)] = new Scan[I2, (O, O2)] { + def join[I2 <: I, O2](scan2: Scan[I2, O2]): Aux[I2, (State, scan2.State), (O, O2)] = new Scan[I2, (O, O2)] { override type State = (self.State, scan2.State) override def initialState: State = (self.initialState, scan2.initialState) @@ -285,6 +335,18 @@ sealed trait Scan[-I, +O] extends Serializable { self => } } + def compose[P](scan2: Scan[O, P]): Aux[I, (State, 
scan2.State), P] = new Scan[I, P] { + override type State = (self.State, scan2.State) + + override def initialState: State = (self.initialState, scan2.initialState) + + override def presentAndNextState(i: I, stateBeforeProcessingI: State): (P, State) = { + val (o, state1AfterProcesingI1) = self.presentAndNextState(i, stateBeforeProcessingI._1) + val (p, state2AfterProcesingI2) = scan2.presentAndNextState(o, stateBeforeProcessingI._2) + (p, (state1AfterProcesingI1, state2AfterProcesingI2)) + } + } + } class ScanApplicative[I] extends Applicative[({ type L[O] = Scan[I, O] })#L] { diff --git a/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala index 553b81f14..50318b503 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala @@ -8,11 +8,13 @@ import scala.collection.mutable.Queue object ScanTest { + // normal people will use Scan not Scan.Aux, so it's good for most of the tests to be using the more common interface. type StringScan = Scan[Char, String] // technically speaking, these aren't exactly the Free scanner, since that would output a giant tree structure from // the whole scan, but that giant tree structure is pretty close to a String. 
- val directFreeScan: StringScan = Scan.from(List.empty[Char]) { (char, previousState) => + + val directFreeScan: Scan.Aux[Char, List[Char], String] = Scan.from(List.empty[Char]) { (char, previousState) => val nextState = char :: previousState (nextState.reverse.mkString, nextState) } @@ -43,6 +45,25 @@ object ScanTest { } + val joinWithPosteriorStateFreeScan: StringScan = + directFreeScan + .andThenPresent(_ => ()) + .joinWithPosteriorState + .andThenPresent { case ((), state) => state.reverse.mkString } + + val joinWithPriorStateFreeScan1: StringScan = + directFreeScan + .andThenPresent(_ => ()) + .joinWithPriorState + .joinWithInput + .andThenPresent{ case (input, (state, ())) => (input::state).mkString.reverse} + + val joinWithPriorStateFreeScan2: StringScan = + directFreeScan + .andThenPresent(_ => ()) + .joinWithPriorState + .join(Scan.identity[Char]) + .andThenPresent{ case ((state, ()) , input) => (input::state).mkString.reverse} } @@ -79,6 +100,22 @@ class ScanTest extends WordSpec with Matchers with GeneratorDrivenPropertyChecks freeScanLaws(reverseAggregatorFreeScan) } + "be obeyed by an implementation of the almost-free Scan using a direct implementation, andThenPresent, and joinWithPosteriorState" in { + freeScanLaws(joinWithPosteriorStateFreeScan) + } + + "be obeyed by an implementation of the almost-free Scan using a direct implmeentation, andThenPresent, joinWithPriorState, and joinWithInput" in { + freeScanLaws(joinWithPriorStateFreeScan1) + } + + "be obeyed by an implementation of the almost-free Scan using a direct implmeentation, andThenPresent, joinWithPriorState, and join with scan.Identity" in { + freeScanLaws(joinWithPriorStateFreeScan2) + } + + "be obeyed by composing the identity scan on either side of a direct-implementation of the almost-free Scan" in { + freeScanLaws(Scan.identity.compose(directFreeScan)) + freeScanLaws(directFreeScan.compose(Scan.identity)) + } } From aab110e615d302cee3e187694e1952cf1488d592 Mon Sep 17 00:00:00 2001 
From: Jeff Sarnat Date: Wed, 6 Nov 2019 15:16:05 -0800 Subject: [PATCH 03/26] scalafmt --- .../src/test/scala/com/twitter/algebird/ScanTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala index 50318b503..389bc9a2f 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala @@ -73,7 +73,7 @@ class ScanTest extends WordSpec with Matchers with GeneratorDrivenPropertyChecks def freeScanLaws(freeScan: StringScan): Unit = forAll(Gen.listOf(Gen.alphaLowerChar)) { inputList => val outputList = freeScan(inputList) - + outputList.length should equal(inputList.length) outputList.zipWithIndex .foreach { From 7bf5fee19a34c8cbcff903c906116903791eb403 Mon Sep 17 00:00:00 2001 From: Jeff Sarnat Date: Wed, 6 Nov 2019 15:19:23 -0800 Subject: [PATCH 04/26] scalafmt --- .../src/test/scala/com/twitter/algebird/ScanTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala index 389bc9a2f..50318b503 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala @@ -73,7 +73,7 @@ class ScanTest extends WordSpec with Matchers with GeneratorDrivenPropertyChecks def freeScanLaws(freeScan: StringScan): Unit = forAll(Gen.listOf(Gen.alphaLowerChar)) { inputList => val outputList = freeScan(inputList) - + outputList.length should equal(inputList.length) outputList.zipWithIndex .foreach { From 1dd77aac5f5ca05a8740c3562e12a8cfd9ceaaff Mon Sep 17 00:00:00 2001 From: Jeff Sarnat Date: Wed, 6 Nov 2019 17:10:53 -0800 Subject: [PATCH 05/26] scalafmt for real this time --- .../scala/com/twitter/algebird/Scan.scala | 76 
++++++++++--------- .../scala/com/twitter/algebird/ScanTest.scala | 31 ++++---- 2 files changed, 54 insertions(+), 53 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala index 3433f44ce..4e2707e48 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala @@ -4,13 +4,14 @@ import scala.collection.compat._ import scala.collection.generic.CanBuildFrom object Scan { + /** - * Most consumers of Scan don't care about the type of the {{State}} type variable. But for those that do, - * we make an effort to expose it in all of our combinators. - * @tparam I - * @tparam S - * @tparam O - */ + * Most consumers of Scan don't care about the type of the {{State}} type variable. But for those that do, + * we make an effort to expose it in all of our combinators. + * @tparam I + * @tparam S + * @tparam O + */ type Aux[-I, S, +O] = Scan[I, O] { type State = S } implicit def applicative[I]: Applicative[({ type L[O] = Scan[I, O] })#L] = new ScanApplicative[I] @@ -29,16 +30,16 @@ object Scan { } /** - * Streams can be thought of as being a hidden state that is queryable for a head element, and another hidden state - * that represents the rest of the stream. Scans take streams of inputs to streams of outputs, but some scans - * have trivial inputs and just produce a stream of outputs. - * @param initState The initial state of the scan; think of this as an infinite stream. - * @param destructor This function decomposes a stream into the its head-element and tail-stream. - * @tparam S The hidden state of the stream that we are turning into a Scan. - * @tparam O The type of the elments of the stream that we are turning into a Scan - * @return A Scan whose inputs are irrelevant, and whose outputs are those that we would get from implementing - * a stream using the information provided to this method. 
- */ + * Streams can be thought of as being a hidden state that is queryable for a head element, and another hidden state + * that represents the rest of the stream. Scans take streams of inputs to streams of outputs, but some scans + * have trivial inputs and just produce a stream of outputs. + * @param initState The initial state of the scan; think of this as an infinite stream. + * @param destructor This function decomposes a stream into the its head-element and tail-stream. + * @tparam S The hidden state of the stream that we are turning into a Scan. + * @tparam O The type of the elments of the stream that we are turning into a Scan + * @return A Scan whose inputs are irrelevant, and whose outputs are those that we would get from implementing + * a stream using the information provided to this method. + */ def fromStreamLike[S, O](initState: S)(destructor: S => (O, S)): Aux[Any, S, O] = new Scan[Any, O] { override type State = S override val initialState = initState @@ -47,9 +48,9 @@ object Scan { } /** - * A Scan that returns the number N for the Nth input (starting from 0) - */ - val index: Aux[Any, Long, Long] = fromStreamLike(0L)(n => (n, n+1)) + * A Scan that returns the number N for the Nth input (starting from 0) + */ + val index: Aux[Any, Long, Long] = fromStreamLike(0L)(n => (n, n + 1)) def identity[A] = fromFunction[A, A](x => x) @@ -265,11 +266,11 @@ sealed trait Scan[-I, +O] extends Serializable { self => } /** - * - * @tparam I1 - * @return A scanner that is semantically identical to .join(Scan.identity[I1]), but without - * where we don't pollute the {{State}} by pairing it redundantly with {{Unit}}. - */ + * + * @tparam I1 + * @return A scanner that is semantically identical to .join(Scan.identity[I1]), but without + * where we don't pollute the {{State}} by pairing it redundantly with {{Unit}}. 
+ */ def joinWithInput[I1 <: I]: Aux[I1, State, (I1, O)] = new Scan[I1, (I1, O)] { override type State = self.State @@ -305,23 +306,24 @@ sealed trait Scan[-I, +O] extends Serializable { self => def joinWithIndex: Aux[I, (State, Long), (O, Long)] = join(Scan.index) - def zip[I2, O2](scan2: Scan[I2, O2]): Aux[(I, I2), (State, scan2.State), (O, O2)] = new Scan[(I, I2), (O, O2)] { - override type State = (self.State, scan2.State) + def zip[I2, O2](scan2: Scan[I2, O2]): Aux[(I, I2), (State, scan2.State), (O, O2)] = + new Scan[(I, I2), (O, O2)] { + override type State = (self.State, scan2.State) - override def initialState: State = (self.initialState, scan2.initialState) + override def initialState: State = (self.initialState, scan2.initialState) - override def presentAndNextState( - i1i2: (I, I2), - stateBeforeProcessingI1I2: State - ): ((O, O2), State) = { - val (o1, state1AfterProcesingI1) = - self.presentAndNextState(i1i2._1, stateBeforeProcessingI1I2._1) - val (o2, state2AfterProcesingI2) = - scan2.presentAndNextState(i1i2._2, stateBeforeProcessingI1I2._2) - ((o1, o2), (state1AfterProcesingI1, state2AfterProcesingI2)) + override def presentAndNextState( + i1i2: (I, I2), + stateBeforeProcessingI1I2: State + ): ((O, O2), State) = { + val (o1, state1AfterProcesingI1) = + self.presentAndNextState(i1i2._1, stateBeforeProcessingI1I2._1) + val (o2, state2AfterProcesingI2) = + scan2.presentAndNextState(i1i2._2, stateBeforeProcessingI1I2._2) + ((o1, o2), (state1AfterProcesingI1, state2AfterProcesingI2)) + } } - } def join[I2 <: I, O2](scan2: Scan[I2, O2]): Aux[I2, (State, scan2.State), (O, O2)] = new Scan[I2, (O, O2)] { override type State = (self.State, scan2.State) diff --git a/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala index 50318b503..e09eb2c98 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala +++ 
b/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala @@ -6,7 +6,6 @@ import org.scalatest.{Matchers, WordSpec} import scala.collection.mutable.Queue - object ScanTest { // normal people will use Scan not Scan.Aux, so it's good for most of the tests to be using the more common interface. type StringScan = Scan[Char, String] @@ -14,15 +13,15 @@ object ScanTest { // technically speaking, these aren't exactly the Free scanner, since that would output a giant tree structure from // the whole scan, but that giant tree structure is pretty close to a String. - val directFreeScan: Scan.Aux[Char, List[Char], String] = Scan.from(List.empty[Char]) { (char, previousState) => - val nextState = char :: previousState - (nextState.reverse.mkString, nextState) + val directFreeScan: Scan.Aux[Char, List[Char], String] = Scan.from(List.empty[Char]) { + (char, previousState) => + val nextState = char :: previousState + (nextState.reverse.mkString, nextState) } - val mutableFreeScan: StringScan = Scan.mutable(new Queue[Char]()) { - (char, previousState) => - previousState.enqueue(char) - previousState.mkString + val mutableFreeScan: StringScan = Scan.mutable(new Queue[Char]()) { (char, previousState) => + previousState.enqueue(char) + previousState.mkString } val aggregatorFreeScan: StringScan = { @@ -53,17 +52,17 @@ object ScanTest { val joinWithPriorStateFreeScan1: StringScan = directFreeScan - .andThenPresent(_ => ()) - .joinWithPriorState - .joinWithInput - .andThenPresent{ case (input, (state, ())) => (input::state).mkString.reverse} + .andThenPresent(_ => ()) + .joinWithPriorState + .joinWithInput + .andThenPresent { case (input, (state, ())) => (input :: state).mkString.reverse } val joinWithPriorStateFreeScan2: StringScan = directFreeScan - .andThenPresent(_ => ()) - .joinWithPriorState - .join(Scan.identity[Char]) - .andThenPresent{ case ((state, ()) , input) => (input::state).mkString.reverse} + .andThenPresent(_ => ()) + .joinWithPriorState + 
.join(Scan.identity[Char]) + .andThenPresent { case ((state, ()), input) => (input :: state).mkString.reverse } } From 0ff2168071cce707902d0473ffbbb50d6a187808 Mon Sep 17 00:00:00 2001 From: Jeff Sarnat Date: Wed, 6 Nov 2019 17:33:11 -0800 Subject: [PATCH 06/26] 2.13 compatibility, I think --- .../src/main/scala/com/twitter/algebird/Scan.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala index 4e2707e48..9c220bab9 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala @@ -1,7 +1,6 @@ package com.twitter.algebird import scala.collection.compat._ -import scala.collection.generic.CanBuildFrom object Scan { @@ -229,11 +228,8 @@ sealed trait Scan[-I, +O] extends Serializable { self => */ def apply[In <: TraversableOnce[I], Out]( inputs: In - )(implicit bf: CanBuildFrom[In, O, Out]): Out = { - val builder = bf() - builder ++= scanIterator(inputs.toIterator) - builder.result - } + )(implicit bf: BuildFrom[In, O, Out]): Out = + bf.fromSpecific(inputs)(scanIterator(inputs.toIterator)) // combinators From f20428019ea5976df884775b68efeab70dc010ca Mon Sep 17 00:00:00 2001 From: Jeff Sarnat Date: Wed, 6 Nov 2019 17:36:59 -0800 Subject: [PATCH 07/26] rename fromStreamLike to iterate --- algebird-core/src/main/scala/com/twitter/algebird/Scan.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala index 9c220bab9..b94118123 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala @@ -39,7 +39,7 @@ object Scan { * @return A Scan whose inputs are irrelevant, and whose outputs are those that we would get from implementing * a 
stream using the information provided to this method. */ - def fromStreamLike[S, O](initState: S)(destructor: S => (O, S)): Aux[Any, S, O] = new Scan[Any, O] { + def iterate[S, O](initState: S)(destructor: S => (O, S)): Aux[Any, S, O] = new Scan[Any, O] { override type State = S override val initialState = initState override def presentAndNextState(i: Any, stateBeforeProcessingI: S): (O, S) = @@ -49,7 +49,7 @@ object Scan { /** * A Scan that returns the number N for the Nth input (starting from 0) */ - val index: Aux[Any, Long, Long] = fromStreamLike(0L)(n => (n, n + 1)) + val index: Aux[Any, Long, Long] = iterate(0L)(n => (n, n + 1)) def identity[A] = fromFunction[A, A](x => x) From 6c7d28472a0c25eea43473e070de4f751f67c470 Mon Sep 17 00:00:00 2001 From: Jeff Sarnat Date: Thu, 7 Nov 2019 10:44:27 -0800 Subject: [PATCH 08/26] tests f or zip --- .../scala/com/twitter/algebird/ScanTest.scala | 73 ++++++++++++------- 1 file changed, 47 insertions(+), 26 deletions(-) diff --git a/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala index e09eb2c98..6d22da647 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala @@ -1,7 +1,7 @@ package com.twitter.algebird import org.scalacheck.Gen -import org.scalatest.prop.GeneratorDrivenPropertyChecks +import org.scalatest.prop.ScalaCheckDrivenPropertyChecks import org.scalatest.{Matchers, WordSpec} import scala.collection.mutable.Queue @@ -66,7 +66,7 @@ object ScanTest { } -class ScanTest extends WordSpec with Matchers with GeneratorDrivenPropertyChecks { +class ScanTest extends WordSpec with Matchers with ScalaCheckDrivenPropertyChecks { import ScanTest._ def freeScanLaws(freeScan: StringScan): Unit = @@ -82,40 +82,61 @@ class ScanTest extends WordSpec with Matchers with GeneratorDrivenPropertyChecks } } - "freeAggreator laws" should { - "be obeyed by a direct 
implementation of the almost-free Scan" in { - freeScanLaws(directFreeScan) - } + def zipLaws(scan1: StringScan, scan2: StringScan): Unit = { + forAll(Gen.listOf(Gen.alphaLowerChar), Gen.listOf(Gen.alphaLowerChar)) { (inputList1, inputList2) => + val outputList1 = scan1(inputList1) + val outputList2 = scan2(inputList2) + val zippedOutput = outputList1.zip(outputList2) - "be obeyed by a mutable implementation of the almost-free Scan" in { - freeScanLaws(mutableFreeScan) - } + val zippedScan = scan1.zip(scan2) + val zippedInput = inputList1.zip(inputList2) + val zippedScanOutput = zippedScan(zippedInput) - "be obeyed by an implementation of the almost-free Scan using fromAggregator, composePrepare, and andThenPresent" in { - freeScanLaws(aggregatorFreeScan) - } + (zippedOutput should contain).theSameElementsInOrderAs(zippedScanOutput) - "be obeyed by an implementation of the almost-free Scan using fromReverseAggregator, composePrepare, and andThenPresent" in { - freeScanLaws(reverseAggregatorFreeScan) } - "be obeyed by an implementation of the almost-free Scan using a direct implementation, andThenPresent, and joinWithPosteriorState" in { - freeScanLaws(joinWithPosteriorStateFreeScan) - } + "freeAggreator laws" should { + "be obeyed by a direct implementation of the almost-free Scan" in { + freeScanLaws(directFreeScan) + } - "be obeyed by an implementation of the almost-free Scan using a direct implmeentation, andThenPresent, joinWithPriorState, and joinWithInput" in { - freeScanLaws(joinWithPriorStateFreeScan1) - } + "be obeyed by a mutable implementation of the almost-free Scan" in { + freeScanLaws(mutableFreeScan) + } - "be obeyed by an implementation of the almost-free Scan using a direct implmeentation, andThenPresent, joinWithPriorState, and join with scan.Identity" in { - freeScanLaws(joinWithPriorStateFreeScan2) - } + "be obeyed by an implementation of the almost-free Scan using fromAggregator, composePrepare, and andThenPresent" in { + 
freeScanLaws(aggregatorFreeScan) + } + + "be obeyed by an implementation of the almost-free Scan using fromReverseAggregator, composePrepare, and andThenPresent" in { + freeScanLaws(reverseAggregatorFreeScan) + } + + "be obeyed by an implementation of the almost-free Scan using a direct implementation, andThenPresent, and joinWithPosteriorState" in { + freeScanLaws(joinWithPosteriorStateFreeScan) + } + + "be obeyed by an implementation of the almost-free Scan using a direct implmeentation, andThenPresent, joinWithPriorState, and joinWithInput" in { + freeScanLaws(joinWithPriorStateFreeScan1) + } + + "be obeyed by an implementation of the almost-free Scan using a direct implmeentation, andThenPresent, joinWithPriorState, and join with scan.Identity" in { + freeScanLaws(joinWithPriorStateFreeScan2) + } + + "be obeyed by composing the identity scan on either side of a direct-implementation of the almost-free Scan" in { + freeScanLaws(Scan.identity.compose(directFreeScan)) + freeScanLaws(directFreeScan.compose(Scan.identity)) + } - "be obeyed by composing the identity scan on either side of a direct-implementation of the almost-free Scan" in { - freeScanLaws(Scan.identity.compose(directFreeScan)) - freeScanLaws(directFreeScan.compose(Scan.identity)) } + "zipping aggregators" should { + "obey their laws" in { + zipLaws(directFreeScan, directFreeScan) + } + } } } From 7408af087132f4e5bb81503ff87a8dbe8433fcd6 Mon Sep 17 00:00:00 2001 From: Jeff Sarnat Date: Thu, 7 Nov 2019 10:54:43 -0800 Subject: [PATCH 09/26] test for zipWithIndex --- .../scala/com/twitter/algebird/ScanTest.scala | 24 +++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala index 6d22da647..bdf5eb6b4 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala @@ 
-1,7 +1,7 @@ package com.twitter.algebird import org.scalacheck.Gen -import org.scalatest.prop.ScalaCheckDrivenPropertyChecks +import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks import org.scalatest.{Matchers, WordSpec} import scala.collection.mutable.Queue @@ -93,7 +93,21 @@ class ScanTest extends WordSpec with Matchers with ScalaCheckDrivenPropertyCheck val zippedScanOutput = zippedScan(zippedInput) (zippedOutput should contain).theSameElementsInOrderAs(zippedScanOutput) + } + + def joinWithIndexLaws(freeScan: StringScan): Unit = + forAll(Gen.listOf(Gen.alphaLowerChar)) { inputList => + val unIndexedOutput = freeScan(inputList) + + val joinedWithIndexOutput = freeScan.joinWithIndex(inputList) + (unIndexedOutput.zipWithIndex should contain).theSameElementsInOrderAs(joinedWithIndexOutput) + } + "an illustrative example without scalacheck" should { + "work as you'd expect" in { + val output = directFreeScan(List('a', 'b', 'c')) + (output should contain).theSameElementsInOrderAs(List("a", "ab", "abc")) + } } "freeAggreator laws" should { @@ -133,10 +147,16 @@ class ScanTest extends WordSpec with Matchers with ScalaCheckDrivenPropertyCheck } "zipping aggregators" should { - "obey their laws" in { + "obey its laws" in { zipLaws(directFreeScan, directFreeScan) } } + + "joinWithIndex" should { + "obey its laws" in { + joinWithIndexLaws(directFreeScan) + } + } } } From 044b2a32d92f3ff0b9d1c4176dbc33c268144884 Mon Sep 17 00:00:00 2001 From: Jeff Sarnat Date: Thu, 7 Nov 2019 11:17:46 -0800 Subject: [PATCH 10/26] more comments --- .../scala/com/twitter/algebird/Scan.scala | 56 ++++++++++++++++++- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala index b94118123..77c1215ed 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala @@ -264,8 
+264,12 @@ sealed trait Scan[-I, +O] extends Serializable { self => /** * * @tparam I1 - * @return A scanner that is semantically identical to .join(Scan.identity[I1]), but without - * where we don't pollute the {{State}} by pairing it redundantly with {{Unit}}. + * @return If this Scan's {{apply}} method is given inputs [a_1, ..., a_n] resulting in outputs + * of the form [o_1, ..., o_n], then {{joinWithInput}} results in a Scan whose {{apply}} method + * returns [(a_1, o_1), ..., (a_n, o_n)] when given the same input. + * In other words, {{joinWithInput}} returns a scanner that is semantically identical to + * {{this.join(Scan.identity[I1])}}, but where we don't pollute the {{State}} by pairing it + * redundantly with {{Unit}}. */ def joinWithInput[I1 <: I]: Aux[I1, State, (I1, O)] = new Scan[I1, (I1, O)] { override type State = self.State @@ -278,6 +282,13 @@ sealed trait Scan[-I, +O] extends Serializable { self => } } + /** + * If this Scan's {{apply}} method is given inputs [a_1, ..., a_n] resulting in outputs + * of the form [o_1, ..., o_n], where (o_(i+1), state_(i+1)) = presentAndNextState(a_i, state_i) + * and state_0 = initialState: + * @return A scan that whose apply method, when given inputs [a_1, ..., a_n] will return + * [(o_1, state_0), ..., (o_n, state_(n-1))]. + */ def joinWithPriorState: Aux[I, State, (State, O)] = new Scan[I, (State, O)] { override type State = self.State @@ -289,6 +300,13 @@ sealed trait Scan[-I, +O] extends Serializable { self => } } + /** + * If this Scan's {{apply}} method is given inputs [a_1, ..., a_n] resulting in outputs + * of the form [o_1, ..., o_n], where (o_(i+1), state_(i+1)) = presentAndNextState(a_i, state_i) + * and state_0 = initialState: + * @return A scan that whose apply method, when given inputs [a_1, ..., a_n] will return + * [(o_1, state_1), ..., (o_n, state_n]. 
+ */ def joinWithPosteriorState: Aux[I, State, (O, State)] = new Scan[I, (O, State)] { override type State = self.State @@ -300,8 +318,25 @@ sealed trait Scan[-I, +O] extends Serializable { self => } } + /** + * If this Scan's {{apply}} method is given inputs [a_1, ..., a_n] resulting in outputs + * of the form [o_1, ..., o_n] + * @return A scan that whose apply method, when given inputs [a_1, ..., a_n] will return + * [(o_1, 1), ..., (o_n, n)]. + * In other words: {{scan.joinWithIndex(foo) == scan(foo).zipWithIndex)}} + */ def joinWithIndex: Aux[I, (State, Long), (O, Long)] = join(Scan.index) + /** + * @param scan2 + * @tparam I2 + * @tparam O2 + * @return f this Scan's apply method is given inputs [a_1, ..., a_n] resulting in outputs of + * the form [o_1, ..., o_n], and scan2.apply([b_1, ..., b_n] = [p_1, ..., p_n] then + * {{zip}} will return a scan whose apply method, when given input + * [(a_1, b_1), ..., (a_n, b_n)] results in the output [(o_1, p_1), ..., (o_2, p_2)]. + * In other words: {{scan.zip(scan2)(foo.zip(bar)) == scan(foo).zip(scan2(bar)) }} + */ def zip[I2, O2](scan2: Scan[I2, O2]): Aux[(I, I2), (State, scan2.State), (O, O2)] = new Scan[(I, I2), (O, O2)] { override type State = (self.State, scan2.State) @@ -321,6 +356,15 @@ sealed trait Scan[-I, +O] extends Serializable { self => } } + /** + * @param scan2 + * @tparam I2 + * @tparam O2 + * @return If this Scan's apply method is given inputs [a_1, ..., a_n] resulting in outputs of + * the form [o_1, ..., o_n], and scan2.apply([a_1, ..., a_n] = [p_1, ..., p_n] then + * {{join}} will return a scan whose apply method returns [(o_1, p_1), ..., (o_2, p_2)]. 
+ * In other words: {{scan.join(scan2)(foo) == scan(foo).zip(scan2(foo)) }} + */ def join[I2 <: I, O2](scan2: Scan[I2, O2]): Aux[I2, (State, scan2.State), (O, O2)] = new Scan[I2, (O, O2)] { override type State = (self.State, scan2.State) @@ -333,6 +377,14 @@ sealed trait Scan[-I, +O] extends Serializable { self => } } + /** + * Takes the output of this scan and feeds as input into scan2. + * @param scan2 + * @tparam P + * @return If this Scan's apply method is given inputs [a_1, ..., a_n] resulting in outputs of + * the form [o_1, ..., o_n], and scan2.apply([o_1, ..., o_n] = [p_1, ..., p_n] then + * {{compose}} will return a scan which returns [p_1, ..., p_n]. + */ def compose[P](scan2: Scan[O, P]): Aux[I, (State, scan2.State), P] = new Scan[I, P] { override type State = (self.State, scan2.State) From e58066a0de923606b62f52bea938a22112f02e7a Mon Sep 17 00:00:00 2001 From: Jeff Sarnat Date: Thu, 7 Nov 2019 11:40:14 -0800 Subject: [PATCH 11/26] get rid of reverse methods --- .../scala/com/twitter/algebird/Scan.scala | 39 ------------------- .../scala/com/twitter/algebird/ScanTest.scala | 14 ------- 2 files changed, 53 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala index 77c1215ed..579192b37 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala @@ -115,45 +115,6 @@ object Scan { def fromMonoidAggregator[A, B, C](monoidAggregator: MonoidAggregator[A, B, C]): Aux[A, B, C] = fromAggregator(monoidAggregator, monoidAggregator.monoid.zero) - /** - * - * @param aggregator - * @param initState - * @tparam A - * @tparam B - * @tparam C - * @return A scan which, when given [a_1, ..., a_n] outputs [c_1, ..., c_n] where - * c_i = aggregator.prepare(a_i) + ... 
+ aggregator.prepare(a_1) + initState - */ - def fromAggregatorReverse[A, B, C](aggregator: Aggregator[A, B, C], initState: B): Aux[A, B, C] = - new Scan[A, C] { - override type State = B - - override val initialState = initState - - override def presentAndNextState(a: A, stateBeforeProcessingI: B): (C, B) = { - // nb: the order of the arguments to semigroup.plus here is what determines the order of the final summation; - // this matters because not all semigroups are commutative - val stateAfterProcessingA = - aggregator.semigroup.plus(aggregator.prepare(a), stateBeforeProcessingI) - (aggregator.present(stateAfterProcessingA), stateAfterProcessingA) - } - } - - /** - * - * @param monoidAggregator - * @tparam A - * @tparam B - * @tparam C - * @return A scan which, when given [a_1, ..., a_n] outputs [c_1, ..., c_n] where - * c_i = aggregator.prepare(a_i) + ... + aggregator.prepare(a_1) + monoidAggregator.monoid.zero - */ - def fromMonoidAggregatorReverse[A, B, C]( - monoidAggregator: MonoidAggregator[A, B, C] - ): Aux[A, B, C] = - fromAggregatorReverse(monoidAggregator, monoidAggregator.monoid.zero) - } /** diff --git a/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala index bdf5eb6b4..15d4ecafa 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala @@ -34,16 +34,6 @@ object ScanTest { } - val reverseAggregatorFreeScan: StringScan = { - val aggregator = Aggregator.fromMonoid[List[Char]] - - Scan - .fromMonoidAggregatorReverse(aggregator) - .composePrepare[Char](c => List(c)) - .andThenPresent(_.reverse.mkString) - - } - val joinWithPosteriorStateFreeScan: StringScan = directFreeScan .andThenPresent(_ => ()) @@ -123,10 +113,6 @@ class ScanTest extends WordSpec with Matchers with ScalaCheckDrivenPropertyCheck freeScanLaws(aggregatorFreeScan) } - "be obeyed by an implementation of the almost-free 
Scan using fromReverseAggregator, composePrepare, and andThenPresent" in { - freeScanLaws(reverseAggregatorFreeScan) - } - "be obeyed by an implementation of the almost-free Scan using a direct implementation, andThenPresent, and joinWithPosteriorState" in { freeScanLaws(joinWithPosteriorStateFreeScan) } From c4c5b9a46b0d8e60e7cf9b2a60da505ffb82e708 Mon Sep 17 00:00:00 2001 From: Jeff Sarnat Date: Mon, 11 Nov 2019 10:20:35 -0800 Subject: [PATCH 12/26] fix curly brace error --- .../scala/com/twitter/algebird/ScanTest.scala | 88 +++++++++---------- 1 file changed, 43 insertions(+), 45 deletions(-) diff --git a/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala index 15d4ecafa..830f14600 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala @@ -72,7 +72,7 @@ class ScanTest extends WordSpec with Matchers with ScalaCheckDrivenPropertyCheck } } - def zipLaws(scan1: StringScan, scan2: StringScan): Unit = { + def zipLaws(scan1: StringScan, scan2: StringScan): Unit = forAll(Gen.listOf(Gen.alphaLowerChar), Gen.listOf(Gen.alphaLowerChar)) { (inputList1, inputList2) => val outputList1 = scan1(inputList1) val outputList2 = scan2(inputList2) @@ -85,64 +85,62 @@ class ScanTest extends WordSpec with Matchers with ScalaCheckDrivenPropertyCheck (zippedOutput should contain).theSameElementsInOrderAs(zippedScanOutput) } - def joinWithIndexLaws(freeScan: StringScan): Unit = - forAll(Gen.listOf(Gen.alphaLowerChar)) { inputList => - val unIndexedOutput = freeScan(inputList) - - val joinedWithIndexOutput = freeScan.joinWithIndex(inputList) - (unIndexedOutput.zipWithIndex should contain).theSameElementsInOrderAs(joinedWithIndexOutput) - } + def joinWithIndexLaws(freeScan: StringScan): Unit = + forAll(Gen.listOf(Gen.alphaLowerChar)) { inputList => + val unIndexedOutput = freeScan(inputList) - "an illustrative example 
without scalacheck" should { - "work as you'd expect" in { - val output = directFreeScan(List('a', 'b', 'c')) - (output should contain).theSameElementsInOrderAs(List("a", "ab", "abc")) - } + val joinedWithIndexOutput = freeScan.joinWithIndex(inputList) + (unIndexedOutput.zipWithIndex should contain).theSameElementsInOrderAs(joinedWithIndexOutput) } - "freeAggreator laws" should { - "be obeyed by a direct implementation of the almost-free Scan" in { - freeScanLaws(directFreeScan) - } - - "be obeyed by a mutable implementation of the almost-free Scan" in { - freeScanLaws(mutableFreeScan) - } + "an illustrative example without scalacheck" should { + "work as you'd expect" in { + val output = directFreeScan(List('a', 'b', 'c')) + (output should contain).theSameElementsInOrderAs(List("a", "ab", "abc")) + } + } - "be obeyed by an implementation of the almost-free Scan using fromAggregator, composePrepare, and andThenPresent" in { - freeScanLaws(aggregatorFreeScan) - } + "freeAggreator laws" should { + "be obeyed by a direct implementation of the almost-free Scan" in { + freeScanLaws(directFreeScan) + } - "be obeyed by an implementation of the almost-free Scan using a direct implementation, andThenPresent, and joinWithPosteriorState" in { - freeScanLaws(joinWithPosteriorStateFreeScan) - } + "be obeyed by a mutable implementation of the almost-free Scan" in { + freeScanLaws(mutableFreeScan) + } - "be obeyed by an implementation of the almost-free Scan using a direct implmeentation, andThenPresent, joinWithPriorState, and joinWithInput" in { - freeScanLaws(joinWithPriorStateFreeScan1) - } + "be obeyed by an implementation of the almost-free Scan using fromAggregator, composePrepare, and andThenPresent" in { + freeScanLaws(aggregatorFreeScan) + } - "be obeyed by an implementation of the almost-free Scan using a direct implmeentation, andThenPresent, joinWithPriorState, and join with scan.Identity" in { - freeScanLaws(joinWithPriorStateFreeScan2) - } + "be obeyed by an 
implementation of the almost-free Scan using a direct implementation, andThenPresent, and joinWithPosteriorState" in { + freeScanLaws(joinWithPosteriorStateFreeScan) + } - "be obeyed by composing the identity scan on either side of a direct-implementation of the almost-free Scan" in { - freeScanLaws(Scan.identity.compose(directFreeScan)) - freeScanLaws(directFreeScan.compose(Scan.identity)) - } + "be obeyed by an implementation of the almost-free Scan using a direct implmeentation, andThenPresent, joinWithPriorState, and joinWithInput" in { + freeScanLaws(joinWithPriorStateFreeScan1) + } + "be obeyed by an implementation of the almost-free Scan using a direct implmeentation, andThenPresent, joinWithPriorState, and join with scan.Identity" in { + freeScanLaws(joinWithPriorStateFreeScan2) } - "zipping aggregators" should { - "obey its laws" in { - zipLaws(directFreeScan, directFreeScan) - } + "be obeyed by composing the identity scan on either side of a direct-implementation of the almost-free Scan" in { + freeScanLaws(Scan.identity.compose(directFreeScan)) + freeScanLaws(directFreeScan.compose(Scan.identity)) } - "joinWithIndex" should { - "obey its laws" in { - joinWithIndexLaws(directFreeScan) - } + } + + "zipping aggregators" should { + "obey its laws" in { + zipLaws(directFreeScan, directFreeScan) } } + "joinWithIndex" should { + "obey its laws" in { + joinWithIndexLaws(directFreeScan) + } + } } From 1bf19bb6cd1274ad5d633b810455ba16d79dd73e Mon Sep 17 00:00:00 2001 From: Jeff Sarnat Date: Tue, 12 Nov 2019 10:56:04 -0800 Subject: [PATCH 13/26] comments formatting, starting to use from more widely --- .../scala/com/twitter/algebird/Scan.scala | 86 +++++++------------ 1 file changed, 33 insertions(+), 53 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala index 579192b37..6986a1baa 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala +++ 
b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala @@ -5,7 +5,7 @@ import scala.collection.compat._ object Scan { /** - * Most consumers of Scan don't care about the type of the {{State}} type variable. But for those that do, + * Most consumers of Scan don't care about the type of the type State type variable. But for those that do, * we make an effort to expose it in all of our combinators. * @tparam I * @tparam S @@ -29,9 +29,9 @@ object Scan { } /** - * Streams can be thought of as being a hidden state that is queryable for a head element, and another hidden state - * that represents the rest of the stream. Scans take streams of inputs to streams of outputs, but some scans - * have trivial inputs and just produce a stream of outputs. + * Scans take streams of inputs to streams of outputs, but some scans have trivial inputs and just produce a stream of + * outputs. Streams can be thought of as being a hidden state that is queryable for a head element, and another hidden + * state that represents the rest of the stream. * @param initState The initial state of the scan; think of this as an infinite stream. * @param destructor This function decomposes a stream into the its head-element and tail-stream. * @tparam S The hidden state of the stream that we are turning into a Scan. @@ -74,9 +74,7 @@ object Scan { * @param t * @tparam T */ - def const[T](t: T): Aux[Any, Unit, T] = from(()) { (_, _) => - (t, ()) - } + def const[T](t: T): Aux[Any, Unit, T] = fromFunction(_ => t) /** * @@ -89,18 +87,12 @@ object Scan { * c_i = initState + aggregator.prepare(a_1) + ... 
+ aggregator.prepare(a_i) */ def fromAggregator[A, B, C](aggregator: Aggregator[A, B, C], initState: B): Aux[A, B, C] = - new Scan[A, C] { - override type State = B - - override val initialState = initState - - override def presentAndNextState(a: A, stateBeforeProcessingI: B): (C, B) = { - // nb: the order of the arguments to semigroup.plus here is what determines the order of the final summation; - // this matters because not all semigroups are commutative - val stateAfterProcessingA = - aggregator.semigroup.plus(stateBeforeProcessingI, aggregator.prepare(a)) - (aggregator.present(stateAfterProcessingA), stateAfterProcessingA) - } + from(initState) { (a: A, stateBeforeProcessingI: B) => + // nb: the order of the arguments to semigroup.plus here is what determines the order of the final summation; + // this matters because not all semigroups are commutative + val stateAfterProcessingA = + aggregator.semigroup.plus(stateBeforeProcessingI, aggregator.prepare(a)) + (aggregator.present(stateAfterProcessingA), stateAfterProcessingA) } /** @@ -130,29 +122,29 @@ object Scan { * * * @tparam I The type of elements that the computation is scanning over. - * @tparam O The output type of the scan (typically distinct from the hidden {{{State}}} of the scan. + * @tparam O The output type of the scan (typically distinct from the hidden `State` of the scan. */ sealed trait Scan[-I, +O] extends Serializable { self => - import Scan.Aux + import Scan.{Aux, from} /** - * The computation of any given scan involves keeping track of a hidden state of type {{{State}}} + * The computation of any given scan involves keeping track of a hidden state. 
*/ type State /** - * - * @return The state of the scan before any elements have been processed + * The state of the scan before any elements have been processed + * @return */ def initialState: State /** * * @param i An element in the stream to process - * @param stateBeforeProcessingI The state of the scan before processing {{{i}}} - * @return The output of the scan corresponding to processing {{{i}}} with state {{{stateBeforeProcessing}}}, - * along with the result of updating {{{stateBeforeProcessing}}} with the information from {{{i}}}. + * @param stateBeforeProcessingI The state of the scan before processing i + * @return The output of the scan corresponding to processing i with state stateBeforeProcessing, + * along with the result of updating stateBeforeProcessing with the information from i. */ def presentAndNextState(i: I, stateBeforeProcessingI: State): (O, State) @@ -194,22 +186,10 @@ sealed trait Scan[-I, +O] extends Serializable { self => // combinators - def replaceState(newInitialState: => State): Aux[I, State, O] = new Scan[I, O] { - override type State = self.State - - override def initialState: State = newInitialState - - override def presentAndNextState(i: I, stateBeforeProcessingI: State): (O, State) = - self.presentAndNextState(i, stateBeforeProcessingI) - } - - def composePrepare[I1](f: I1 => I): Aux[I1, State, O] = new Scan[I1, O] { - override type State = self.State - - override def initialState: State = self.initialState + def replaceState(newInitialState: => State): Aux[I, State, O] = from(newInitialState)(presentAndNextState(_, _)) - override def presentAndNextState(i: I1, stateBeforeProcessingI: State): (O, State) = - self.presentAndNextState(f(i), stateBeforeProcessingI) + def composePrepare[I1](f: I1 => I): Aux[I1, State, O] = from(self.initialState){(i, stateBeforeProcessingI) => + presentAndNextState(f(i), stateBeforeProcessingI) } def andThenPresent[O1](g: O => O1): Aux[I, State, O1] = new Scan[I, O1] { @@ -225,8 +205,8 @@ sealed 
trait Scan[-I, +O] extends Serializable { self => /** * * @tparam I1 - * @return If this Scan's {{apply}} method is given inputs [a_1, ..., a_n] resulting in outputs - * of the form [o_1, ..., o_n], then {{joinWithInput}} results in a Scan whose {{apply}} method + * @return If this Scan's `apply` method is given inputs [a_1, ..., a_n] resulting in outputs + * of the form [o_1, ..., o_n], then {{joinWithInput}} results in a Scan whose `apply` method * returns [(a_1, o_1), ..., (a_n, o_n)] when given the same input. * In other words, {{joinWithInput}} returns a scanner that is semantically identical to * {{this.join(Scan.identity[I1])}}, but where we don't pollute the {{State}} by pairing it @@ -244,7 +224,7 @@ sealed trait Scan[-I, +O] extends Serializable { self => } /** - * If this Scan's {{apply}} method is given inputs [a_1, ..., a_n] resulting in outputs + * If this Scan's `apply` method is given inputs [a_1, ..., a_n] resulting in outputs * of the form [o_1, ..., o_n], where (o_(i+1), state_(i+1)) = presentAndNextState(a_i, state_i) * and state_0 = initialState: * @return A scan that whose apply method, when given inputs [a_1, ..., a_n] will return @@ -262,7 +242,7 @@ sealed trait Scan[-I, +O] extends Serializable { self => } /** - * If this Scan's {{apply}} method is given inputs [a_1, ..., a_n] resulting in outputs + * If this Scan's `apply` method is given inputs [a_1, ..., a_n] resulting in outputs * of the form [o_1, ..., o_n], where (o_(i+1), state_(i+1)) = presentAndNextState(a_i, state_i) * and state_0 = initialState: * @return A scan that whose apply method, when given inputs [a_1, ..., a_n] will return @@ -280,11 +260,11 @@ sealed trait Scan[-I, +O] extends Serializable { self => } /** - * If this Scan's {{apply}} method is given inputs [a_1, ..., a_n] resulting in outputs + * If this Scan's `apply` method is given inputs [a_1, ..., a_n] resulting in outputs * of the form [o_1, ..., o_n] * @return A scan that whose apply method, when given inputs 
[a_1, ..., a_n] will return * [(o_1, 1), ..., (o_n, n)]. - * In other words: {{scan.joinWithIndex(foo) == scan(foo).zipWithIndex)}} + * In other words: `scan.joinWithIndex(foo) == scan(foo).zipWithIndex)` */ def joinWithIndex: Aux[I, (State, Long), (O, Long)] = join(Scan.index) @@ -294,9 +274,9 @@ sealed trait Scan[-I, +O] extends Serializable { self => * @tparam O2 * @return f this Scan's apply method is given inputs [a_1, ..., a_n] resulting in outputs of * the form [o_1, ..., o_n], and scan2.apply([b_1, ..., b_n] = [p_1, ..., p_n] then - * {{zip}} will return a scan whose apply method, when given input + * `zip` will return a scan whose apply method, when given input * [(a_1, b_1), ..., (a_n, b_n)] results in the output [(o_1, p_1), ..., (o_2, p_2)]. - * In other words: {{scan.zip(scan2)(foo.zip(bar)) == scan(foo).zip(scan2(bar)) }} + * In other words: `scan.zip(scan2)(foo.zip(bar)) == scan(foo).zip(scan2(bar)) ` */ def zip[I2, O2](scan2: Scan[I2, O2]): Aux[(I, I2), (State, scan2.State), (O, O2)] = new Scan[(I, I2), (O, O2)] { @@ -323,8 +303,8 @@ sealed trait Scan[-I, +O] extends Serializable { self => * @tparam O2 * @return If this Scan's apply method is given inputs [a_1, ..., a_n] resulting in outputs of * the form [o_1, ..., o_n], and scan2.apply([a_1, ..., a_n] = [p_1, ..., p_n] then - * {{join}} will return a scan whose apply method returns [(o_1, p_1), ..., (o_2, p_2)]. - * In other words: {{scan.join(scan2)(foo) == scan(foo).zip(scan2(foo)) }} + * `join` will return a scan whose apply method returns [(o_1, p_1), ..., (o_2, p_2)]. 
+ * In other words: `scan.join(scan2)(foo) == scan(foo).zip(scan2(foo)) ` */ def join[I2 <: I, O2](scan2: Scan[I2, O2]): Aux[I2, (State, scan2.State), (O, O2)] = new Scan[I2, (O, O2)] { override type State = (self.State, scan2.State) @@ -344,7 +324,7 @@ sealed trait Scan[-I, +O] extends Serializable { self => * @tparam P * @return If this Scan's apply method is given inputs [a_1, ..., a_n] resulting in outputs of * the form [o_1, ..., o_n], and scan2.apply([o_1, ..., o_n] = [p_1, ..., p_n] then - * {{compose}} will return a scan which returns [p_1, ..., p_n]. + * `compose` will return a scan which returns [p_1, ..., p_n]. */ def compose[P](scan2: Scan[O, P]): Aux[I, (State, scan2.State), P] = new Scan[I, P] { override type State = (self.State, scan2.State) From 5acde442272d3dc80612daa16a280e6100273a8e Mon Sep 17 00:00:00 2001 From: Jeff Sarnat Date: Tue, 12 Nov 2019 11:16:45 -0800 Subject: [PATCH 14/26] reimplement in terms of from --- .../scala/com/twitter/algebird/Scan.scala | 110 ++++++------------ 1 file changed, 34 insertions(+), 76 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala index 6986a1baa..7ef92ed97 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala @@ -126,7 +126,7 @@ object Scan { */ sealed trait Scan[-I, +O] extends Serializable { self => - import Scan.{Aux, from} + import Scan.{from, Aux} /** * The computation of any given scan involves keeping track of a hidden state. 
@@ -186,41 +186,31 @@ sealed trait Scan[-I, +O] extends Serializable { self => // combinators - def replaceState(newInitialState: => State): Aux[I, State, O] = from(newInitialState)(presentAndNextState(_, _)) + def replaceState(newInitialState: => State): Aux[I, State, O] = + from(newInitialState)(presentAndNextState(_, _)) - def composePrepare[I1](f: I1 => I): Aux[I1, State, O] = from(self.initialState){(i, stateBeforeProcessingI) => - presentAndNextState(f(i), stateBeforeProcessingI) + def composePrepare[I1](f: I1 => I): Aux[I1, State, O] = from(initialState) { (i, stateBeforeProcessingI) => + presentAndNextState(f(i), stateBeforeProcessingI) } - def andThenPresent[O1](g: O => O1): Aux[I, State, O1] = new Scan[I, O1] { - override type State = self.State - override def initialState: State = self.initialState - - override def presentAndNextState(i: I, stateBeforeProcessingI: State): (O1, State) = { - val (c, stateAfterProcessingA) = self.presentAndNextState(i, stateBeforeProcessingI) - (g(c), stateAfterProcessingA) - } + def andThenPresent[O1](g: O => O1): Aux[I, State, O1] = from(initialState) { (i, stateBeforeProcessingI) => + val (c, stateAfterProcessingA) = presentAndNextState(i, stateBeforeProcessingI) + (g(c), stateAfterProcessingA) } /** * * @tparam I1 - * @return If this Scan's `apply` method is given inputs [a_1, ..., a_n] resulting in outputs + * @return If this Scan's `apply` method is given inputs `[a_1, ..., a_n]` resulting in outputs * of the form [o_1, ..., o_n], then {{joinWithInput}} results in a Scan whose `apply` method * returns [(a_1, o_1), ..., (a_n, o_n)] when given the same input. - * In other words, {{joinWithInput}} returns a scanner that is semantically identical to - * {{this.join(Scan.identity[I1])}}, but where we don't pollute the {{State}} by pairing it - * redundantly with {{Unit}}. 
+ * In other words, `joinWithInput` returns a scanner that is semantically identical to + * `this.join(Scan.identity[I1]`, but where we don't pollute the `State` by pairing it + * redundantly with `Unit`. */ - def joinWithInput[I1 <: I]: Aux[I1, State, (I1, O)] = new Scan[I1, (I1, O)] { - override type State = self.State - - override def initialState: State = self.initialState - - override def presentAndNextState(i: I1, stateBeforeProcessingI: State): ((I1, O), State) = { - val (o, stateAfterProcessingA) = self.presentAndNextState(i, stateBeforeProcessingI) - ((i, o), stateAfterProcessingA) - } + def joinWithInput[I1 <: I]: Aux[I1, State, (I1, O)] = from(initialState) { (i, stateBeforeProcessingI) => + val (o, stateAfterProcessingI) = self.presentAndNextState(i, stateBeforeProcessingI) + ((i, o), stateAfterProcessingI) } /** @@ -230,15 +220,9 @@ sealed trait Scan[-I, +O] extends Serializable { self => * @return A scan that whose apply method, when given inputs [a_1, ..., a_n] will return * [(o_1, state_0), ..., (o_n, state_(n-1))]. */ - def joinWithPriorState: Aux[I, State, (State, O)] = new Scan[I, (State, O)] { - override type State = self.State - - override def initialState: State = self.initialState - - override def presentAndNextState(i: I, stateBeforeProcessingI: State): ((State, O), State) = { - val (o, stateAfterProcessingA) = self.presentAndNextState(i, stateBeforeProcessingI) - ((stateBeforeProcessingI, o), stateAfterProcessingA) - } + def joinWithPriorState: Aux[I, State, (State, O)] = from(initialState) { (i, stateBeforeProcessingI) => + val (o, stateAfterProcessingA) = self.presentAndNextState(i, stateBeforeProcessingI) + ((stateBeforeProcessingI, o), stateAfterProcessingA) } /** @@ -248,15 +232,9 @@ sealed trait Scan[-I, +O] extends Serializable { self => * @return A scan that whose apply method, when given inputs [a_1, ..., a_n] will return * [(o_1, state_1), ..., (o_n, state_n]. 
*/ - def joinWithPosteriorState: Aux[I, State, (O, State)] = new Scan[I, (O, State)] { - override type State = self.State - - override def initialState: State = self.initialState - - override def presentAndNextState(i: I, stateBeforeProcessingI: State): ((O, State), State) = { - val (c, stateAfterProcessingA) = self.presentAndNextState(i, stateBeforeProcessingI) - ((c, stateAfterProcessingA), stateAfterProcessingA) - } + def joinWithPosteriorState: Aux[I, State, (O, State)] = from(initialState) { (i, stateBeforeProcessingI) => + val (c, stateAfterProcessingA) = self.presentAndNextState(i, stateBeforeProcessingI) + ((c, stateAfterProcessingA), stateAfterProcessingA) } /** @@ -279,22 +257,12 @@ sealed trait Scan[-I, +O] extends Serializable { self => * In other words: `scan.zip(scan2)(foo.zip(bar)) == scan(foo).zip(scan2(bar)) ` */ def zip[I2, O2](scan2: Scan[I2, O2]): Aux[(I, I2), (State, scan2.State), (O, O2)] = - new Scan[(I, I2), (O, O2)] { - override type State = (self.State, scan2.State) - - override def initialState: State = (self.initialState, scan2.initialState) - - override def presentAndNextState( - i1i2: (I, I2), - stateBeforeProcessingI1I2: State - ): ((O, O2), State) = { - val (o1, state1AfterProcesingI1) = - self.presentAndNextState(i1i2._1, stateBeforeProcessingI1I2._1) - val (o2, state2AfterProcesingI2) = - scan2.presentAndNextState(i1i2._2, stateBeforeProcessingI1I2._2) - ((o1, o2), (state1AfterProcesingI1, state2AfterProcesingI2)) - - } + from((self.initialState, scan2.initialState)) { (i1i2, stateBeforeProcessingI1I2) => + val (o1, state1AfterProcesingI1) = + self.presentAndNextState(i1i2._1, stateBeforeProcessingI1I2._1) + val (o2, state2AfterProcesingI2) = + scan2.presentAndNextState(i1i2._2, stateBeforeProcessingI1I2._2) + ((o1, o2), (state1AfterProcesingI1, state2AfterProcesingI2)) } /** @@ -306,17 +274,12 @@ sealed trait Scan[-I, +O] extends Serializable { self => * `join` will return a scan whose apply method returns [(o_1, p_1), ..., (o_2, 
p_2)]. * In other words: `scan.join(scan2)(foo) == scan(foo).zip(scan2(foo)) ` */ - def join[I2 <: I, O2](scan2: Scan[I2, O2]): Aux[I2, (State, scan2.State), (O, O2)] = new Scan[I2, (O, O2)] { - override type State = (self.State, scan2.State) - - override def initialState: State = (self.initialState, scan2.initialState) - - override def presentAndNextState(i: I2, stateBeforeProcessingI: State): ((O, O2), State) = { + def join[I2 <: I, O2](scan2: Scan[I2, O2]): Aux[I2, (State, scan2.State), (O, O2)] = + from((self.initialState, scan2.initialState)) { (i, stateBeforeProcessingI) => val (o1, state1AfterProcesingI1) = self.presentAndNextState(i, stateBeforeProcessingI._1) val (o2, state2AfterProcesingI2) = scan2.presentAndNextState(i, stateBeforeProcessingI._2) ((o1, o2), (state1AfterProcesingI1, state2AfterProcesingI2)) } - } /** * Takes the output of this scan and feeds as input into scan2. @@ -326,17 +289,12 @@ sealed trait Scan[-I, +O] extends Serializable { self => * the form [o_1, ..., o_n], and scan2.apply([o_1, ..., o_n] = [p_1, ..., p_n] then * `compose` will return a scan which returns [p_1, ..., p_n]. 
*/ - def compose[P](scan2: Scan[O, P]): Aux[I, (State, scan2.State), P] = new Scan[I, P] { - override type State = (self.State, scan2.State) - - override def initialState: State = (self.initialState, scan2.initialState) - - override def presentAndNextState(i: I, stateBeforeProcessingI: State): (P, State) = { - val (o, state1AfterProcesingI1) = self.presentAndNextState(i, stateBeforeProcessingI._1) - val (p, state2AfterProcesingI2) = scan2.presentAndNextState(o, stateBeforeProcessingI._2) - (p, (state1AfterProcesingI1, state2AfterProcesingI2)) + def compose[P](scan2: Scan[O, P]): Aux[I, (State, scan2.State), P] = + from((self.initialState, scan2.initialState)) { (i, stateBeforeProcessingI) => + val (o, state1AfterProcesingI) = presentAndNextState(i, stateBeforeProcessingI._1) + val (p, state2AfterProcesingO) = scan2.presentAndNextState(o, stateBeforeProcessingI._2) + (p, (state1AfterProcesingI, state2AfterProcesingO)) } - } } From 9448c9bc4c5d8290c86405ead2c5f63e6225e1c5 Mon Sep 17 00:00:00 2001 From: Jeff Sarnat Date: Tue, 12 Nov 2019 11:24:20 -0800 Subject: [PATCH 15/26] some comment updating --- .../scala/com/twitter/algebird/Scan.scala | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala index 7ef92ed97..621f70a88 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala @@ -111,15 +111,17 @@ object Scan { /** * The Scan trait is an alternative to the "scanLeft" method on iterators/other collections for a range of - * of use-cases where scanLeft is awkward to use. At a high level it provides some of the same functionality as scanleft, - * but with a separation of "what is the state of the scan" from "what are the elements that I'm scanning over?" 
- * In particular, when scanning over an iterator with N elements, the output is an iterator with N elements (in contrast - * to scanLeft's N+1). + * of use-cases where scanLeft is awkward to use. At a high level it provides some of the same functionality as + * [[scala.collection.Iterator.scanLeft]], but with a separation of "what is the state of the scan" from + * "what are the elements that I'm scanning over?". In particular, when scanning over an iterator with `N` elements, + * the output is an iterator with `N` elements (in contrast to scanLeft's `N+1`). * - * If you find yourself writing a ScanLeft over pairs of elements, where you only use one element of the pair within - * the scanLeft itself then throw that element away in a "map" immediately after the scanLeft is done, then this + * If you find yourself writing a `scanLeft` over pairs of elements, where you only use one element of the pair within + * the `scanLeft` itself then throw that element away in a `map` immediately after the scanLeft is done, then this * abstraction is for you. * + * The canonical method to use a scanner is its `apply` method. + * * * @tparam I The type of elements that the computation is scanning over. * @tparam O The output type of the scan (typically distinct from the hidden `State` of the scan. @@ -174,10 +176,10 @@ sealed trait Scan[-I, +O] extends Serializable { self => * @tparam In The type of the input collection * @tparam Out The type of the output collection * @return - * Given inputs as a collection of the form [a_1, ..., a_n] the output will be a collection of the form: - * [o_1, ..., o_n] where - * (o_(i+1), state_(i+1)) = presentAndNextState(a_i, state_i) - * and state_0 = initialState + * Given inputs as a collection of the form `[a_1, ..., a_n]` the output will be a collection of the form: + * `[o_1, ..., o_n]` where + * `(o_(i+1), state_(i+1)) = presentAndNextState(a_i, state_i)` + * and `state_0 = initialState`. 
*/ def apply[In <: TraversableOnce[I], Out]( inputs: In From 5a9f3660257099e02eba39ba8f21d919ef57a9ab Mon Sep 17 00:00:00 2001 From: Jeff Sarnat Date: Tue, 12 Nov 2019 11:32:46 -0800 Subject: [PATCH 16/26] comment thingy --- algebird-core/src/main/scala/com/twitter/algebird/Scan.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala index 621f70a88..af64f4a9e 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala @@ -110,9 +110,9 @@ object Scan { } /** - * The Scan trait is an alternative to the "scanLeft" method on iterators/other collections for a range of + * The Scan trait is an alternative to the `scanLeft` method on iterators/other collections for a range of * of use-cases where scanLeft is awkward to use. At a high level it provides some of the same functionality as - * [[scala.collection.Iterator.scanLeft]], but with a separation of "what is the state of the scan" from + * `scanleft`, but with a separation of "what is the state of the scan" from * "what are the elements that I'm scanning over?". In particular, when scanning over an iterator with `N` elements, * the output is an iterator with `N` elements (in contrast to scanLeft's `N+1`). 
* From b3346976246f2627b01d7a2ce0a2a70969e81523 Mon Sep 17 00:00:00 2001 From: Jeff Sarnat Date: Tue, 12 Nov 2019 13:23:08 -0800 Subject: [PATCH 17/26] got rid of self --- .../scala/com/twitter/algebird/Scan.scala | 69 +++++++++++-------- 1 file changed, 39 insertions(+), 30 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala index af64f4a9e..491ea5f76 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala @@ -111,22 +111,22 @@ object Scan { /** * The Scan trait is an alternative to the `scanLeft` method on iterators/other collections for a range of - * of use-cases where scanLeft is awkward to use. At a high level it provides some of the same functionality as - * `scanleft`, but with a separation of "what is the state of the scan" from + * of use-cases where `scanLeft` is awkward to use. At a high level it provides some of the same functionality as + * `scanLeft`, but with a separation of "what is the state of the scan" from * "what are the elements that I'm scanning over?". In particular, when scanning over an iterator with `N` elements, * the output is an iterator with `N` elements (in contrast to scanLeft's `N+1`). * - * If you find yourself writing a `scanLeft` over pairs of elements, where you only use one element of the pair within - * the `scanLeft` itself then throw that element away in a `map` immediately after the scanLeft is done, then this + * If you find yourwriting a `scanLeft` over pairs of elements, where you only use one element of the pair within + * the `scanLeft` itthen throw that element away in a `map` immediately after the scanLeft is done, then this * abstraction is for you. * - * The canonical method to use a scanner is its `apply` method. + * The canonical method to use a scanner is `apply`. 
* * * @tparam I The type of elements that the computation is scanning over. * @tparam O The output type of the scan (typically distinct from the hidden `State` of the scan. */ -sealed trait Scan[-I, +O] extends Serializable { self => +sealed trait Scan[-I, +O] extends Serializable { import Scan.{from, Aux} @@ -188,6 +188,11 @@ sealed trait Scan[-I, +O] extends Serializable { self => // combinators + /** + * Returns a new scan that is the same as this scan, but with a different `initialState`. + * @param newInitialState + * @return + */ def replaceState(newInitialState: => State): Aux[I, State, O] = from(newInitialState)(presentAndNextState(_, _)) @@ -211,63 +216,67 @@ sealed trait Scan[-I, +O] extends Serializable { self => * redundantly with `Unit`. */ def joinWithInput[I1 <: I]: Aux[I1, State, (I1, O)] = from(initialState) { (i, stateBeforeProcessingI) => - val (o, stateAfterProcessingI) = self.presentAndNextState(i, stateBeforeProcessingI) + val (o, stateAfterProcessingI) = presentAndNextState(i, stateBeforeProcessingI) ((i, o), stateAfterProcessingI) } /** * If this Scan's `apply` method is given inputs [a_1, ..., a_n] resulting in outputs - * of the form [o_1, ..., o_n], where (o_(i+1), state_(i+1)) = presentAndNextState(a_i, state_i) - * and state_0 = initialState: - * @return A scan that whose apply method, when given inputs [a_1, ..., a_n] will return - * [(o_1, state_0), ..., (o_n, state_(n-1))]. + * of the form `[o_1, ..., o_n], where (o_(i+1), state_(i+1)) = presentAndNextState(a_i, state_i)`` + * and `state_0 = initialState`: + * @return A scan that whose apply method, when given inputs `[a_1, ..., a_n]` will return + * `[(o_1, state_0), ..., (o_n, state_(n-1))]`. 
*/ def joinWithPriorState: Aux[I, State, (State, O)] = from(initialState) { (i, stateBeforeProcessingI) => - val (o, stateAfterProcessingA) = self.presentAndNextState(i, stateBeforeProcessingI) + val (o, stateAfterProcessingA) = presentAndNextState(i, stateBeforeProcessingI) ((stateBeforeProcessingI, o), stateAfterProcessingA) } /** - * If this Scan's `apply` method is given inputs [a_1, ..., a_n] resulting in outputs - * of the form [o_1, ..., o_n], where (o_(i+1), state_(i+1)) = presentAndNextState(a_i, state_i) + * If this Scan's `apply` method is given inputs `[a_1, ..., a_n]` resulting in outputs + * of the form `[o_1, ..., o_n]`, where `(o_(i+1), state_(i+1)) = presentAndNextState(a_i, state_i)`` * and state_0 = initialState: - * @return A scan that whose apply method, when given inputs [a_1, ..., a_n] will return - * [(o_1, state_1), ..., (o_n, state_n]. + * @return A scan that whose apply method, when given inputs `[a_1, ..., a_n]` will return + * `[(o_1, state_1), ..., (o_n, state_n]`. */ def joinWithPosteriorState: Aux[I, State, (O, State)] = from(initialState) { (i, stateBeforeProcessingI) => - val (c, stateAfterProcessingA) = self.presentAndNextState(i, stateBeforeProcessingI) + val (c, stateAfterProcessingA) = presentAndNextState(i, stateBeforeProcessingI) ((c, stateAfterProcessingA), stateAfterProcessingA) } /** - * If this Scan's `apply` method is given inputs [a_1, ..., a_n] resulting in outputs - * of the form [o_1, ..., o_n] - * @return A scan that whose apply method, when given inputs [a_1, ..., a_n] will return - * [(o_1, 1), ..., (o_n, n)]. - * In other words: `scan.joinWithIndex(foo) == scan(foo).zipWithIndex)` + * `scan.joinWithIndex(foo) == scan(foo).zipWithIndex)`. + * @return + * * If this Scan's `apply` method is given inputs `[a_1, ..., a_n]` resulting in outputs + * of the form `[o_1, ..., o_n]`, return a scan that whose apply method, when given the same input, will return + * `[(o_1, 1), ..., (o_n, n)]`. 
*/ def joinWithIndex: Aux[I, (State, Long), (O, Long)] = join(Scan.index) /** + * Compose two scans pairwise such that, when given pairwise zipped inputs, the resulting scan will output pairwise + * zipped outputs. * @param scan2 * @tparam I2 * @tparam O2 - * @return f this Scan's apply method is given inputs [a_1, ..., a_n] resulting in outputs of - * the form [o_1, ..., o_n], and scan2.apply([b_1, ..., b_n] = [p_1, ..., p_n] then + * @return If this Scan's apply method is given inputs `[a_1, ..., a_n]` resulting in outputs of + * the form `[o_1, ..., o_n]`, and `scan2.apply([b_1, ..., b_n] = [p_1, ..., p_n]` then * `zip` will return a scan whose apply method, when given input - * [(a_1, b_1), ..., (a_n, b_n)] results in the output [(o_1, p_1), ..., (o_2, p_2)]. + * `[(a_1, b_1), ..., (a_n, b_n)]` results in the output `[(o_1, p_1), ..., (o_2, p_2)]`. * In other words: `scan.zip(scan2)(foo.zip(bar)) == scan(foo).zip(scan2(bar)) ` */ def zip[I2, O2](scan2: Scan[I2, O2]): Aux[(I, I2), (State, scan2.State), (O, O2)] = - from((self.initialState, scan2.initialState)) { (i1i2, stateBeforeProcessingI1I2) => + from((initialState, scan2.initialState)) { (i1i2, stateBeforeProcessingI1I2) => val (o1, state1AfterProcesingI1) = - self.presentAndNextState(i1i2._1, stateBeforeProcessingI1I2._1) + presentAndNextState(i1i2._1, stateBeforeProcessingI1I2._1) val (o2, state2AfterProcesingI2) = scan2.presentAndNextState(i1i2._2, stateBeforeProcessingI1I2._2) ((o1, o2), (state1AfterProcesingI1, state2AfterProcesingI2)) } /** + * Given a scan that takes compatible input to this one, compose the state and outputs of each scan pairwise + * on a common input stream. 
* @param scan2 * @tparam I2 * @tparam O2 @@ -277,8 +286,8 @@ sealed trait Scan[-I, +O] extends Serializable { self => * In other words: `scan.join(scan2)(foo) == scan(foo).zip(scan2(foo)) ` */ def join[I2 <: I, O2](scan2: Scan[I2, O2]): Aux[I2, (State, scan2.State), (O, O2)] = - from((self.initialState, scan2.initialState)) { (i, stateBeforeProcessingI) => - val (o1, state1AfterProcesingI1) = self.presentAndNextState(i, stateBeforeProcessingI._1) + from((initialState, scan2.initialState)) { (i, stateBeforeProcessingI) => + val (o1, state1AfterProcesingI1) = presentAndNextState(i, stateBeforeProcessingI._1) val (o2, state2AfterProcesingI2) = scan2.presentAndNextState(i, stateBeforeProcessingI._2) ((o1, o2), (state1AfterProcesingI1, state2AfterProcesingI2)) } @@ -292,7 +301,7 @@ sealed trait Scan[-I, +O] extends Serializable { self => * `compose` will return a scan which returns [p_1, ..., p_n]. */ def compose[P](scan2: Scan[O, P]): Aux[I, (State, scan2.State), P] = - from((self.initialState, scan2.initialState)) { (i, stateBeforeProcessingI) => + from((initialState, scan2.initialState)) { (i, stateBeforeProcessingI) => val (o, state1AfterProcesingI) = presentAndNextState(i, stateBeforeProcessingI._1) val (p, state2AfterProcesingO) = scan2.presentAndNextState(o, stateBeforeProcessingI._2) (p, (state1AfterProcesingI, state2AfterProcesingO)) From 420f3120143d0f6d9cf153b971b6f37f27a8cb8a Mon Sep 17 00:00:00 2001 From: Jeff Sarnat Date: Tue, 12 Nov 2019 13:29:44 -0800 Subject: [PATCH 18/26] more comment fiddling --- .../src/main/scala/com/twitter/algebird/Scan.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala index 491ea5f76..9bddbb979 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala @@ -223,8 +223,8 @@ sealed trait Scan[-I, +O] 
extends Serializable { /** * If this Scan's `apply` method is given inputs [a_1, ..., a_n] resulting in outputs * of the form `[o_1, ..., o_n], where (o_(i+1), state_(i+1)) = presentAndNextState(a_i, state_i)`` - * and `state_0 = initialState`: - * @return A scan that whose apply method, when given inputs `[a_1, ..., a_n]` will return + * and `state_0 = initialState`, + * return scan that whose apply method, when given inputs `[a_1, ..., a_n]` will return * `[(o_1, state_0), ..., (o_n, state_(n-1))]`. */ def joinWithPriorState: Aux[I, State, (State, O)] = from(initialState) { (i, stateBeforeProcessingI) => @@ -236,7 +236,7 @@ sealed trait Scan[-I, +O] extends Serializable { * If this Scan's `apply` method is given inputs `[a_1, ..., a_n]` resulting in outputs * of the form `[o_1, ..., o_n]`, where `(o_(i+1), state_(i+1)) = presentAndNextState(a_i, state_i)`` * and state_0 = initialState: - * @return A scan that whose apply method, when given inputs `[a_1, ..., a_n]` will return + * return A scan that whose apply method, when given inputs `[a_1, ..., a_n]` will return * `[(o_1, state_1), ..., (o_n, state_n]`. 
*/ def joinWithPosteriorState: Aux[I, State, (O, State)] = from(initialState) { (i, stateBeforeProcessingI) => From 5d36385d765f0c60a8ced963e90b65bd015bec82 Mon Sep 17 00:00:00 2001 From: Jeff Sarnat Date: Tue, 12 Nov 2019 13:31:29 -0800 Subject: [PATCH 19/26] more backticks --- .../src/main/scala/com/twitter/algebird/Scan.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala index 9bddbb979..865ab531c 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala @@ -152,10 +152,10 @@ sealed trait Scan[-I, +O] extends Serializable { /** * @param iter - * @return If iter = Iterator(a_1, ..., a_n), return: - * Iterator(o_1, ..., o_n) where - * (o_(i+1), state_(i+1)) = presentAndNextState(a_i, state_i) - * and state_0 = initialState + * @return If `iter = Iterator(a_1, ..., a_n)`, return:` + * `Iterator(o_1, ..., o_n)` where + * `(o_(i+1), state_(i+1)) = presentAndNextState(a_i, state_i)` + * and `state_0 = initialState` * */ def scanIterator(iter: Iterator[I]): Iterator[O] = new AbstractIterator[O] { From 83160cbc4deba409f74afb50e1523be40c45d864 Mon Sep 17 00:00:00 2001 From: Jeff Sarnat Date: Tue, 12 Nov 2019 14:08:38 -0800 Subject: [PATCH 20/26] more comment fine-tuning --- .../scala/com/twitter/algebird/Scan.scala | 41 +++++++++---------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala index 865ab531c..a05eb68e7 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala @@ -83,8 +83,8 @@ object Scan { * @tparam A * @tparam B * @tparam C - * @return A scan which, when given [a_1, ..., a_n] outputs [c_1, ..., c_n] where - * 
c_i = initState + aggregator.prepare(a_1) + ... + aggregator.prepare(a_i) + * @return A scan which, when given `[a_1, ..., a_n]` outputs `[c_1, ..., c_n]` where + * `c_i = initState + aggregator.prepare(a_1) + ... + aggregator.prepare(a_i)` */ def fromAggregator[A, B, C](aggregator: Aggregator[A, B, C], initState: B): Aux[A, B, C] = from(initState) { (a: A, stateBeforeProcessingI: B) => @@ -101,8 +101,8 @@ object Scan { * @tparam A * @tparam B * @tparam C - * @return A scan which, when given [a_1, ..., a_n] outputs [c_1, ..., c_n] where - * c_i = monoidAggregator.monoid.zero + aggregator.prepare(a_1) + ... + aggregator.prepare(a_i) + * @return A scan which, when given `[a_1, ..., a_n]` outputs `[c_1, ..., c_n]` where + * `c_i = monoidAggregator.monoid.zero + aggregator.prepare(a_1) + ... + aggregator.prepare(a_i)` */ def fromMonoidAggregator[A, B, C](monoidAggregator: MonoidAggregator[A, B, C]): Aux[A, B, C] = fromAggregator(monoidAggregator, monoidAggregator.monoid.zero) @@ -206,14 +206,13 @@ sealed trait Scan[-I, +O] extends Serializable { } /** - * - * @tparam I1 - * @return If this Scan's `apply` method is given inputs `[a_1, ..., a_n]` resulting in outputs - * of the form [o_1, ..., o_n], then {{joinWithInput}} results in a Scan whose `apply` method - * returns [(a_1, o_1), ..., (a_n, o_n)] when given the same input. - * In other words, `joinWithInput` returns a scanner that is semantically identical to + * Returns a scanner that is semantically identical to * `this.join(Scan.identity[I1]`, but where we don't pollute the `State` by pairing it * redundantly with `Unit`. + * @tparam I1 + * @return If this Scan's `apply` method is given inputs `[a_1, ..., a_n]` resulting in outputs + * of the form `[o_1, ..., o_n`, then this results in a Scan whose `apply` method + * returns `[(a_1, o_1), ..., (a_n, o_n)]` when given the same input. 
*/ def joinWithInput[I1 <: I]: Aux[I1, State, (I1, O)] = from(initialState) { (i, stateBeforeProcessingI) => val (o, stateAfterProcessingI) = presentAndNextState(i, stateBeforeProcessingI) @@ -222,7 +221,7 @@ sealed trait Scan[-I, +O] extends Serializable { /** * If this Scan's `apply` method is given inputs [a_1, ..., a_n] resulting in outputs - * of the form `[o_1, ..., o_n], where (o_(i+1), state_(i+1)) = presentAndNextState(a_i, state_i)`` + * of the form `[o_1, ..., o_n]`, where `(o_(i+1), state_(i+1)) = presentAndNextState(a_i, state_i)` * and `state_0 = initialState`, * return scan that whose apply method, when given inputs `[a_1, ..., a_n]` will return * `[(o_1, state_0), ..., (o_n, state_(n-1))]`. @@ -245,9 +244,9 @@ sealed trait Scan[-I, +O] extends Serializable { } /** - * `scan.joinWithIndex(foo) == scan(foo).zipWithIndex)`. + * `scan.joinWithIndex(foo) == scan(foo).zipWithIndex`. * @return - * * If this Scan's `apply` method is given inputs `[a_1, ..., a_n]` resulting in outputs + * If this Scan's `apply` method is given inputs `[a_1, ..., a_n]` resulting in outputs * of the form `[o_1, ..., o_n]`, return a scan that whose apply method, when given the same input, will return * `[(o_1, 1), ..., (o_n, n)]`. */ @@ -263,7 +262,7 @@ sealed trait Scan[-I, +O] extends Serializable { * the form `[o_1, ..., o_n]`, and `scan2.apply([b_1, ..., b_n] = [p_1, ..., p_n]` then * `zip` will return a scan whose apply method, when given input * `[(a_1, b_1), ..., (a_n, b_n)]` results in the output `[(o_1, p_1), ..., (o_2, p_2)]`. 
- * In other words: `scan.zip(scan2)(foo.zip(bar)) == scan(foo).zip(scan2(bar)) ` + * In other words: `scan.zip(scan2)(foo.zip(bar)) == scan(foo).zip(scan2(bar))` */ def zip[I2, O2](scan2: Scan[I2, O2]): Aux[(I, I2), (State, scan2.State), (O, O2)] = from((initialState, scan2.initialState)) { (i1i2, stateBeforeProcessingI1I2) => @@ -275,15 +274,15 @@ sealed trait Scan[-I, +O] extends Serializable { } /** - * Given a scan that takes compatible input to this one, compose the state and outputs of each scan pairwise + * Given a scan that takes compatible input to this one, pairwise compose the state and outputs of each scan * on a common input stream. * @param scan2 * @tparam I2 * @tparam O2 * @return If this Scan's apply method is given inputs [a_1, ..., a_n] resulting in outputs of - * the form [o_1, ..., o_n], and scan2.apply([a_1, ..., a_n] = [p_1, ..., p_n] then - * `join` will return a scan whose apply method returns [(o_1, p_1), ..., (o_2, p_2)]. - * In other words: `scan.join(scan2)(foo) == scan(foo).zip(scan2(foo)) ` + * the form `[o_1, ..., o_n]`, and `scan2.apply([a_1, ..., a_n] = [p_1, ..., p_n]` then + * `join` will return a scan whose apply method returns `[(o_1, p_1), ..., (o_2, p_2)]`. + * In other words: `scan.join(scan2)(foo) == scan(foo).zip(scan2(foo))` */ def join[I2 <: I, O2](scan2: Scan[I2, O2]): Aux[I2, (State, scan2.State), (O, O2)] = from((initialState, scan2.initialState)) { (i, stateBeforeProcessingI) => @@ -296,9 +295,9 @@ sealed trait Scan[-I, +O] extends Serializable { * Takes the output of this scan and feeds as input into scan2. * @param scan2 * @tparam P - * @return If this Scan's apply method is given inputs [a_1, ..., a_n] resulting in outputs of - * the form [o_1, ..., o_n], and scan2.apply([o_1, ..., o_n] = [p_1, ..., p_n] then - * `compose` will return a scan which returns [p_1, ..., p_n]. 
+ * @return If this Scan's apply method is given inputs `[a_1, ..., a_n]` resulting in outputs of + * the form `[o_1, ..., o_n]`, and `scan2.apply([o_1, ..., o_n] = [p_1, ..., p_n]` then + * `compose` will return a scan which returns `[p_1, ..., p_n]`. */ def compose[P](scan2: Scan[O, P]): Aux[I, (State, scan2.State), P] = from((initialState, scan2.initialState)) { (i, stateBeforeProcessingI) => From c2dcb008d3c32dccac89f126caaa2c05af19e7b0 Mon Sep 17 00:00:00 2001 From: Jeff Sarnat Date: Tue, 12 Nov 2019 14:20:53 -0800 Subject: [PATCH 21/26] more comment tweaking --- .../scala/com/twitter/algebird/Scan.scala | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala index a05eb68e7..21e1c1885 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala @@ -47,7 +47,7 @@ object Scan { } /** - * A Scan that returns the number N for the Nth input (starting from 0) + * A Scan whose `Nth` output is the number `N` (starting from 0). */ val index: Aux[Any, Long, Long] = iterate(0L)(n => (n, n + 1)) @@ -116,15 +116,15 @@ object Scan { * "what are the elements that I'm scanning over?". In particular, when scanning over an iterator with `N` elements, * the output is an iterator with `N` elements (in contrast to scanLeft's `N+1`). * - * If you find yourwriting a `scanLeft` over pairs of elements, where you only use one element of the pair within - * the `scanLeft` itthen throw that element away in a `map` immediately after the scanLeft is done, then this + * If you find yourself writing a `scanLeft` over pairs of elements, where you only use one element of the pair within + * the `scanLeft`, then throw that element away in a `map` immediately after the scanLeft is done, then this * abstraction is for you. 
* - * The canonical method to use a scanner is `apply`. + * The canonical method to use a scan is `apply`. * * * @tparam I The type of elements that the computation is scanning over. - * @tparam O The output type of the scan (typically distinct from the hidden `State` of the scan. + * @tparam O The output type of the scan (typically distinct from the hidden `State` of the scan). */ sealed trait Scan[-I, +O] extends Serializable { @@ -206,8 +206,8 @@ sealed trait Scan[-I, +O] extends Serializable { } /** - * Returns a scanner that is semantically identical to - * `this.join(Scan.identity[I1]`, but where we don't pollute the `State` by pairing it + * Return a scan that is semantically identical to + * `this.join(Scan.identity[I1])`, but where we don't pollute the `State` by pairing it * redundantly with `Unit`. * @tparam I1 * @return If this Scan's `apply` method is given inputs `[a_1, ..., a_n]` resulting in outputs @@ -220,10 +220,11 @@ sealed trait Scan[-I, +O] extends Serializable { } /** - * If this Scan's `apply` method is given inputs [a_1, ..., a_n] resulting in outputs + * Return a scan whose output is paired with the state of the scan before each input updates the state. + * @return If this Scan's `apply` method is given inputs `[a_1, ..., a_n]` resulting in outputs * of the form `[o_1, ..., o_n]`, where `(o_(i+1), state_(i+1)) = presentAndNextState(a_i, state_i)` - * and `state_0 = initialState`, - * return scan that whose apply method, when given inputs `[a_1, ..., a_n]` will return + * and `state_0 = initialState`, + * return a scan whose apply method, when given inputs `[a_1, ..., a_n]` will return * `[(o_1, state_0), ..., (o_n, state_(n-1))]`. 
*/ def joinWithPriorState: Aux[I, State, (State, O)] = from(initialState) { (i, stateBeforeProcessingI) => @@ -232,10 +233,11 @@ sealed trait Scan[-I, +O] extends Serializable { } /** - * If this Scan's `apply` method is given inputs `[a_1, ..., a_n]` resulting in outputs + * Return a scan whose output is paired with the state of the scan after each input updates the state. + * @return If this Scan's `apply` method is given inputs `[a_1, ..., a_n]` resulting in outputs * of the form `[o_1, ..., o_n]`, where `(o_(i+1), state_(i+1)) = presentAndNextState(a_i, state_i)`` - * and state_0 = initialState: - * return A scan that whose apply method, when given inputs `[a_1, ..., a_n]` will return + * and `state_0 = initialState`, + * return a scan whose apply method, when given inputs `[a_1, ..., a_n]` will return * `[(o_1, state_1), ..., (o_n, state_n]`. */ def joinWithPosteriorState: Aux[I, State, (O, State)] = from(initialState) { (i, stateBeforeProcessingI) => @@ -244,7 +246,7 @@ sealed trait Scan[-I, +O] extends Serializable { } /** - * `scan.joinWithIndex(foo) == scan(foo).zipWithIndex`. + * For every `foo`, `scan.joinWithIndex(foo) == scan(foo).zipWithIndex`. 
* @return * If this Scan's `apply` method is given inputs `[a_1, ..., a_n]` resulting in outputs * of the form `[o_1, ..., o_n]`, return a scan that whose apply method, when given the same input, will return From e10796c5685f73bf7b26eb56a8dfd43585496cd7 Mon Sep 17 00:00:00 2001 From: Jeff Sarnat Date: Tue, 12 Nov 2019 14:24:39 -0800 Subject: [PATCH 22/26] oscar's suggestions --- algebird-core/src/main/scala/com/twitter/algebird/Scan.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala index 21e1c1885..f15017f23 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala @@ -51,7 +51,7 @@ object Scan { */ val index: Aux[Any, Long, Long] = iterate(0L)(n => (n, n + 1)) - def identity[A] = fromFunction[A, A](x => x) + def identity[A]: Aux[A, Unit, A] = fromFunction[A, A](x => x) /** * @@ -91,7 +91,7 @@ object Scan { // nb: the order of the arguments to semigroup.plus here is what determines the order of the final summation; // this matters because not all semigroups are commutative val stateAfterProcessingA = - aggregator.semigroup.plus(stateBeforeProcessingI, aggregator.prepare(a)) + aggregator.append(stateBeforeProcessingI, a) (aggregator.present(stateAfterProcessingA), stateAfterProcessingA) } From f6446f877e747ae4f64d49429635707c202117ec Mon Sep 17 00:00:00 2001 From: Jeff Sarnat Date: Tue, 12 Nov 2019 14:30:08 -0800 Subject: [PATCH 23/26] change order of joinWithInput --- .../src/main/scala/com/twitter/algebird/Scan.scala | 6 +++--- .../src/test/scala/com/twitter/algebird/ScanTest.scala | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala index f15017f23..47f6845f3 100644 --- 
a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala @@ -212,11 +212,11 @@ sealed trait Scan[-I, +O] extends Serializable { * @tparam I1 * @return If this Scan's `apply` method is given inputs `[a_1, ..., a_n]` resulting in outputs * of the form `[o_1, ..., o_n`, then this results in a Scan whose `apply` method - * returns `[(a_1, o_1), ..., (a_n, o_n)]` when given the same input. + * returns `[(o_1, a_1), ..., (o_n, a_n)]` when given the same input. */ - def joinWithInput[I1 <: I]: Aux[I1, State, (I1, O)] = from(initialState) { (i, stateBeforeProcessingI) => + def joinWithInput[I1 <: I]: Aux[I1, State, (O, I1)] = from(initialState) { (i, stateBeforeProcessingI) => val (o, stateAfterProcessingI) = presentAndNextState(i, stateBeforeProcessingI) - ((i, o), stateAfterProcessingI) + ((o, i), stateAfterProcessingI) } /** diff --git a/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala index 830f14600..9d24547b1 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala @@ -45,7 +45,7 @@ object ScanTest { .andThenPresent(_ => ()) .joinWithPriorState .joinWithInput - .andThenPresent { case (input, (state, ())) => (input :: state).mkString.reverse } + .andThenPresent { case ((state, ()), input) => (input :: state).mkString.reverse } val joinWithPriorStateFreeScan2: StringScan = directFreeScan From 07c6b58de756ede931da220019c7d11f32d84f66 Mon Sep 17 00:00:00 2001 From: Jeff Sarnat Date: Tue, 12 Nov 2019 15:01:08 -0800 Subject: [PATCH 24/26] tests for replaceState and Scan.const --- .../scala/com/twitter/algebird/ScanTest.scala | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala index 
9d24547b1..3c73001c5 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala @@ -142,5 +142,27 @@ class ScanTest extends WordSpec with Matchers with ScalaCheckDrivenPropertyCheck "obey its laws" in { joinWithIndexLaws(directFreeScan) } + + "replaceState" should { + "behave as you'd expect" in { + forAll(Gen.listOf(Gen.alphaLowerChar), Gen.listOf(Gen.alphaLowerChar)) { (inputList1, inputList2) => + // first we'll run the scan on inputList1 ++ inputList2, which. We should be able to replace the initial state + // of the scan such that just scanning only inputList2 will + val (_, output2) = directFreeScan(inputList1 ++ inputList2).splitAt(inputList1.length) + val stateOfScanAfterProcessingList1 = inputList1.reverse + val scanAfterReplacingState = directFreeScan.replaceState(stateOfScanAfterProcessingList1) + scanAfterReplacingState(inputList2) should equal(output2) + } + } + + "Scan.const" should { + "behave as you'd expect" in { + forAll(Gen.alphaLowerChar, Gen.listOf(Gen.alphaLowerChar)) { (const, inputList) => + (Scan.const(const)(inputList) should contain) + .theSameElementsInOrderAs(List.fill(inputList.length)(const)) + } + } + } + } } } From a244a2f28c4220b0a83a283ef47f2c2546aaf905 Mon Sep 17 00:00:00 2001 From: Jeff Sarnat Date: Tue, 12 Nov 2019 15:12:34 -0800 Subject: [PATCH 25/26] return not returns --- algebird-core/src/main/scala/com/twitter/algebird/Scan.scala | 2 +- .../src/test/scala/com/twitter/algebird/ScanTest.scala | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala index 47f6845f3..5ef18b10b 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala @@ -189,7 +189,7 @@ sealed trait Scan[-I, +O] extends Serializable { // combinators /** 
- * Returns a new scan that is the same as this scan, but with a different `initialState`. + * Return a new scan that is the same as this scan, but with a different `initialState`. * @param newInitialState * @return */ diff --git a/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala index 3c73001c5..4538f4f8f 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/ScanTest.scala @@ -146,8 +146,9 @@ class ScanTest extends WordSpec with Matchers with ScalaCheckDrivenPropertyCheck "replaceState" should { "behave as you'd expect" in { forAll(Gen.listOf(Gen.alphaLowerChar), Gen.listOf(Gen.alphaLowerChar)) { (inputList1, inputList2) => - // first we'll run the scan on inputList1 ++ inputList2, which. We should be able to replace the initial state - // of the scan such that just scanning only inputList2 will + // first we'll run the scan on inputList1 ++ inputList2, which will result in output1 ++ output2. + // We should be able to replace the initial state of the scan such that just scanning only inputList2 + // will return output2. 
val (_, output2) = directFreeScan(inputList1 ++ inputList2).splitAt(inputList1.length) val stateOfScanAfterProcessingList1 = inputList1.reverse val scanAfterReplacingState = directFreeScan.replaceState(stateOfScanAfterProcessingList1) From dbe97f9e41ffc8ce5bbaa6d1f467d5f3e4162136 Mon Sep 17 00:00:00 2001 From: Jeff Sarnat Date: Tue, 12 Nov 2019 16:53:56 -0800 Subject: [PATCH 26/26] abstract class, not trait --- algebird-core/src/main/scala/com/twitter/algebird/Scan.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala index 5ef18b10b..de2d59d33 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Scan.scala @@ -126,7 +126,7 @@ object Scan { * @tparam I The type of elements that the computation is scanning over. * @tparam O The output type of the scan (typically distinct from the hidden `State` of the scan). */ -sealed trait Scan[-I, +O] extends Serializable { +sealed abstract class Scan[-I, +O] extends Serializable { import Scan.{from, Aux}