
new algebird abstraction: Scan #739

Merged: 26 commits into twitter:develop on Nov 13, 2019
Conversation

@jeff-stripe (Contributor) commented Nov 6, 2019

A new abstraction, similar to Aggregator or Fold, that aims to reify the business logic associated with scanLeft into its own abstraction with its own combinators.

The testing here is pretty comprehensive, although I don't think .zip is covered. Moreover, I originally had a bunch of combinators named .zipWithXXX that I later renamed to .joinWithXXX to make them more consistent with the types of zip and join. I do think it's worth bikeshedding the naming on all of this, though.

For posterity, my non-work github is @eigenvariable.
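
For context, this is the plain scanLeft behavior the abstraction packages up: one output per input, each carrying the state accumulated so far (the snippet below is illustrative only and does not use the PR's API):

val runningSums = List(1, 2, 3, 4).scanLeft(0)(_ + _).tail
// List(1, 3, 6, 10): the i-th output is the sum of the first i inputs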

@CLAassistant commented Nov 6, 2019

CLA assistant check: All committers have signed the CLA.

type Aux[-I, S, +O] = Scan[I, O] { type State = S }

implicit def applicative[I]: Applicative[({ type L[O] = Scan[I, O] })#L] = new ScanApplicative[I]
Contributor:

you can use Kind projector here?

Collaborator:

I don't think we use that plugin. We could, but I'd rather not add a build dependency for the few cases where we have done this. This is mostly a demonstration anyway, since I expect that if you really want an Applicative[Scan[I, ?]] you will want a cats implementation, but anyway, this shows you how to make one.
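
For reference, if the kind-projector compiler plugin were added (assumption: it is not an algebird build dependency, per the comment above), the same instance could be written without the explicit type lambda:

implicit def applicative[I]: Applicative[Scan[I, *]] = new ScanApplicative[I]
// older kind-projector versions spell the placeholder as ?, i.e. Applicative[Scan[I, ?]]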

import scala.collection.compat._
import scala.collection.generic.CanBuildFrom

object Scan {
Contributor:

How is this different from Aggregator[I, M[_]: SemigroupK, O]? I think the motivation here is that you want the S type to be a container so that reduce on S collects the intermediate state. I believe that can be achieved with existing Aggregators by 1) making S a type constructor and 2) making sure that S has an instance of SemigroupK.

Contributor Author:

@MansurAshraf First: great to be on a PR with you again! It's been a long time :-).

Second: the idea here is that this transforms streams to streams, whereas aggregators transform sequences to single values. I would look at the apply method (starting on line 229 of Stream.scala as of this writing). The idea is that, for each input in the stream, you get one output in the stream. For example, using directFreeScan from ScanTest, something like directFreeScan(List('a', 'b', 'c')) would result in List("a", "ab", "abc").

Does that make sense?
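
The same output can be sketched in plain Scala with scanLeft directly, outside the Scan API:

List('a', 'b', 'c')
  .scanLeft("")((acc, c) => acc + c) // List("", "a", "ab", "abc")
  .tail                              // List("a", "ab", "abc"): one output per input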

Contributor:

hey @jeff-stripe, it's been a while :-)

I will take a look at Stream.scala, but a cursory look at def fromAggregator[A, B, C](aggregator: Aggregator[A, B, C], initState: B): Aux[A, B, C] gives me the impression that

Scan[A,C] ~= Aggregator[A,S[_],(S[_],C)]

Scan has a new function called presentAndNextState which returns both the state type S and the final output type C. If your aggregator type were Aggregator[A,S[_],(S[_],C)], the present function in the existing aggregator would return the same value, I think.

Contributor Author:

@MansurAshraf In my example, the analogous Aggregator would just return "abc", not List("a", "ab", "abc"). In other words, Aggregator is to the reduce method as Scan is to the scanLeft method.
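
In plain Scala the contrast is reduce versus scanLeft (illustrative only; compare with the scanLeft sketch above, which yields one output per input):

List('a', 'b', 'c').map(_.toString).reduce(_ + _) // "abc": one value for the whole input, like an Aggregator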

Collaborator:

Mansur, to a first approximation, Aggregator[A, _, B] is Iterator[A] => Option[B], MonoidAggregator[A, _, B] is Iterator[A] => B, and Scan[A, B] is Iterator[A] => Iterator[B] (actually with an added law that the number of items in the result is exactly the same as the number of inputs).

You could imagine another idea: ExpandingScan[A, B], which is Iterator[A] => Iterator[B] but without the law that the iterators are the same size (then we could do things like filter, or concatMap, etc.).

Does that help explain it at all?

Think of the ExpandingScan as the most general function you can do in a reducer in Hadoop.
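
A type-level sketch of the hypothetical ExpandingScan described above (the trait and its name are illustrative, not part of this PR):

trait ExpandingScanSketch[-I, +O] {
  // like Scan, but with no law tying output size to input size,
  // so filter and concatMap become expressible
  def apply(inputs: Iterator[I]): Iterator[O]
}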

@johnynek (Collaborator) commented Nov 7, 2019

Thanks, Jeff.

CI failed:

  1. code formatting; I think running scalafmtAll will fix it.
  2. the 2.13 build fails. We need to make a minor change to the CanBuildFrom stuff. I don't know the answer, but the 2.13 BuildFrom which the compat stuff is aliasing is here: https://www.scala-lang.org/api/2.13.1/scala/collection/BuildFrom.html

/**
* A Scan that returns the number N for the Nth input (starting from 0)
*/
val index: Aux[Any, Long, Long] = fromStreamLike(0L)(n => (n, n+1))
Collaborator:

this is really cool.
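
A plain-Scala sketch of the state transition above (not the Scan API): the current state n is emitted as the output, and the next state is n + 1.

val indexOutputs = List("a", "b", "c")
  .scanLeft((0L, Option.empty[Long])) { case ((n, _), _) => (n + 1, Some(n)) }
  .collect { case (_, Some(out)) => out } // List(0L, 1L, 2L)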

* @return A scan which, when given [a_1, ..., a_n] outputs [c_1, ..., c_n] where
* c_i = aggregator.prepare(a_i) + ... + aggregator.prepare(a_1) + monoidAggregator.monoid.zero
*/
def fromMonoidAggregatorReverse[A, B, C](
Collaborator:

same comment about adding a def reverse method to MonoidAggregator
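
As an illustration of the doc comment above, with string concatenation as the monoid and prepare as the identity, the reverse scan combines right-to-left (plain-Scala sketch, not the Scan API):

val reverseOutputs = List("a", "b", "c")
  .scanLeft("")((acc, x) => x + acc) // each new element is prepended
  .tail                              // List("a", "ba", "cba"): c_i = a_i + ... + a_1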

}
}

def joinWithIndex: Aux[I, (State, Long), (O, Long)] = join(Scan.index)
Collaborator:

this shows the compositional power so well!
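
A plain-Scala analogue of what joinWithIndex adds, using the running-concatenation outputs from the earlier example (illustrative only):

List("a", "ab", "abc").zipWithIndex.map { case (o, i) => (o, i.toLong) }
// List(("a", 0L), ("ab", 1L), ("abc", 2L))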

}
}

def compose[P](scan2: Scan[O, P]): Aux[I, (State, scan2.State), P] = new Scan[I, P] {
Collaborator:

maybe add comments to the methods: compose feeds each output of this into scan2, while join forks each input and feeds it into both this and scan2, etc.
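
A hedged analogy with plain functions rather than the Scan API: compose pipes outputs into the next stage, while join fans one input out to two stages.

val f: Int => String  = _.toString
val g: String => Int  = _.length
val h: Int => Boolean = _ % 2 == 0
val composed: Int => Int             = f.andThen(g)       // like scan1.compose(scan2)
val joined: Int => (String, Boolean) = x => (f(x), h(x))  // like scan1.join(scan2)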

Contributor Author:

I've added a bunch of comments. Let me know if you like the wording/framing of them.

@jeff-stripe (Contributor Author):

Thanks @johnynek for the comments; I'm running out of steam for the day but will pick this up again tomorrow.

@johnynek (Collaborator) commented Nov 7, 2019

A really cool example of Scan[Double, Double] would be a running z-score. You keep a state of Moments, use joinWithPosteriorState, and then normalize the Double against the mean and the std-dev.

... just thinking aloud, that's not a request to add that.
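
A hedged plain-Scala sketch of that running z-score idea. It uses neither the Scan API nor algebird's Moments; RunningMoments and the Welford-style update below are illustrative stand-ins.

final case class RunningMoments(count: Long, mean: Double, m2: Double) {
  def add(x: Double): RunningMoments = {
    val n = count + 1
    val delta = x - mean
    val newMean = mean + delta / n
    RunningMoments(n, newMean, m2 + delta * (x - newMean))
  }
  def stddev: Double = if (count > 1) math.sqrt(m2 / count) else 0.0
}

def runningZScores(xs: List[Double]): List[Double] =
  xs.scanLeft((RunningMoments(0L, 0.0, 0.0), 0.0)) { case ((m, _), x) =>
      val next = m.add(x)
      val z = if (next.stddev == 0.0) 0.0 else (x - next.mean) / next.stddev
      (next, z)
    }
    .tail
    .map(_._2)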

@codecov-io commented Nov 7, 2019

Codecov Report

Merging #739 into develop will decrease coverage by 0.02%.
The diff coverage is 92.85%.

Impacted file tree graph

@@             Coverage Diff             @@
##           develop     #739      +/-   ##
===========================================
- Coverage    89.29%   89.26%   -0.03%     
===========================================
  Files          114      115       +1     
  Lines         8955     9011      +56     
  Branches       330      335       +5     
===========================================
+ Hits          7996     8044      +48     
- Misses         959      967       +8
Impacted Files Coverage Δ
...ore/src/main/scala/com/twitter/algebird/Scan.scala 92.85% <92.85%> (ø)
...n/scala/com/twitter/algebird/SuccessibleLaws.scala 78.57% <0%> (-14.29%) ⬇️
...ala/com/twitter/algebird/ApproximateProperty.scala 72% <0%> (-10%) ⬇️
...main/scala/com/twitter/algebird/monad/Reader.scala 50% <0%> (-5.56%) ⬇️
.../main/scala/com/twitter/algebird/Approximate.scala 88.7% <0%> (-1.62%) ⬇️
...re/src/main/scala/com/twitter/algebird/QTree.scala 78.23% <0%> (-1.18%) ⬇️
.../main/scala/com/twitter/algebird/BloomFilter.scala 94.24% <0%> (-0.89%) ⬇️
.../main/scala/com/twitter/algebird/Successible.scala 91.66% <0%> (ø) ⬆️
.../main/scala/com/twitter/algebird/Applicative.scala 61.76% <0%> (+2.94%) ⬆️
... and 2 more

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 2e2bcb0...dbe97f9.

@johnynek (Collaborator) commented Nov 7, 2019

also, there is a (not very good) streaming median estimator where you keep track of the current median estimate: if a value is greater than it, you move the estimate up by a constant, and if it is less, you move it down. Since the true median should have about half of the values above it and half below, this is a random walk that biases towards the median (so after a long enough warmup, it shouldn't be too far from the median).
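
A hedged plain-Scala sketch of that estimator; the step size and starting estimate are arbitrary illustrative choices, and it does not use the Scan API:

def runningMedianEstimates(xs: List[Double], step: Double = 0.1): List[Double] =
  xs.scanLeft(0.0) { (estimate, x) =>
      if (x > estimate) estimate + step
      else if (x < estimate) estimate - step
      else estimate
    }
    .tail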

@johnynek (Collaborator) commented Nov 7, 2019

https://codecov.io/gh/twitter/algebird/pull/739/diff?src=pr&el=tree#diff-YWxnZWJpcmQtY29yZS9zcmMvbWFpbi9zY2FsYS9jb20vdHdpdHRlci9hbGdlYmlyZC9TY2FuLnNjYWxh

looks like zip is untested, as is joinWithIndex. Can I talk you into adding tests for those?

@jeff-stripe (Contributor Author):

@johnynek I think I've addressed all of your comments so far (but welcome more). Also, the z-score example was totally one of the original motivations for this abstraction, from way back when I was working on feature generation.

@jeff-stripe (Contributor Author) commented Nov 11, 2019

When I was running tests locally to reproduce the above issue, I ran into the following unrelated test failure:

[info] DecayedVectorProperties:
[info] - DecayedVector[Map[Int, _]] is a monoid *** FAILED ***
[info]   GeneratorDrivenPropertyCheckFailedException was thrown during property evaluation.
[info]    (CheckProperties.scala:12)
[info]     Falsified after 9 successful property evaluations.
[info]     Location: (CheckProperties.scala:12)
[info]     Occurred when passed generated values (
[info]       arg0 = DecayedVector(Map(0 -> 1.2018386197212909E-281, -2147483648 -> -3.854156922919691E232, -1447110768 -> 9.408465891492915E-167, 1805048232 -> -6.75705880126906E215, 9346750 -> 1.0723063735502226E-173, -420323548 -> -8.302534164574593E-175, -571797472 -> -2.179474851477724E-175, -470808363 -> 3.3072194967626795E-250, 1 -> 2.1209952984836344E229, 2076284047 -> -6.199396905341707E-123, 365574431 -> 1.5142075318601491E184, -1752737747 -> -7.883369021177454E29, 2050082858 -> -2.299358981334486E-15, -1994605430 -> -7.431824108415331E217, -698948772 -> -2.1479418657592524E-191, -1417251516 -> -1.5572134571317626E-254, 241933112 -> 545733.1130589077, 897298981 -> -7.439164205119313E10, -654320593 -> 3.2544666904527896E-292, 887471898 -> 4.069738732277512E125, 373768514 -> 3.432851429494389E63, -932811734 -> -2.0244449228411652E-74, 783480879 -> -8.136720367399272E-279, -1586891494 -> 2.7376160145598414E141, 2147483647 -> -8.574297221458591E-159, -1 -> 1.2104690607925745E246, -1468444249 -> -1.86707430482823E-307, -1009301793 -> 5.918573113974547E148, -1013689345 -> 1.2825959305194982E-110, -1791275591 -> 5.587605356252549E79, 401213385 -> -4.862900034659543E190, -738881469 -> -5.061788993603661E-14, -1913217183 -> 1.7586395366496495E133, -2087846791 -> 7.189097244947145E115, 630320687 -> -4.305922779804356E-304, -1994015926 -> -3.4956222225235067E103, 262178902 -> -1.1507181102048183E43, -612977120 -> 4.0867877449600855E-129, 1761853732 -> -7.897508692485825E301, -1861380341 -> -1.7935785304794388E-177, -1862146545 -> -4.7832258685347225E-94, -1742740609 -> -1.4002257491338442E151, 584865158 -> -9676137.866416803, 349932431 -> -1.0699110976306946E-14),185.95991254209034),
[info]       arg1 = DecayedVector(Map(0 -> -1.0830365240639644E-300, -2147483648 -> -2.520348300233662E-155, -1913741494 -> -2.8893542841978695E-283, -69242322 -> 1.9351525683506846E90, 1508963855 -> 1.44442040727596E-95, 1113162072 -> 9.523002151023972E42, 1 -> -1.0700938964522293E-234, 749585887 -> 4.4074563848912957E-303, 816327294 -> 7.404952094444072E-272, 1539634748 -> 2.429738132317924E110, 1201648438 -> 1.2318638393727931E41, 803892976 -> -2.1408844799849434E42, 132589461 -> -6.724616862088172E237, -426808141 -> -1.7818692446798573E-199, 2070302569 -> -1.9363934480648376E152, -142612373 -> -5.289781829272728E-173, -743939942 -> 4.5267978059340745E-158, 284912769 -> -1.3334534846758688E-109, 721636973 -> 6.284482522880498E-7, 44527593 -> -7.149354645962933E255, -143598766 -> 8.298912027782988E111, 2147483647 -> 79.72079182014002, -1 -> 3.731777436319693E-18, 1764631933 -> -1.3174668462363633E106, 1763610516 -> 3.381906954428537E-63, -1324222957 -> -9.59155501289049E-30, -1413936665 -> 5.083708421676393E273),71.81075800741212),
[info]       arg2 = DecayedVector(Map(-1086016398 -> 1.0231140798022226E-136, 0 -> 3.01344056575156E-219, -2147483648 -> 1.867681696329119E300, 1659086411 -> 6.737487374650964E-225, 564546781 -> -3.0766401510152493E22, 1927521424 -> 1.0814416316955774E-185, 566194627 -> 5.292057531131149E-53, 1 -> -3.065831822976799E228, 524856393 -> -2.8200395537352725E299, 61220771 -> -2.761004362337464E171, 873269565 -> -4.347660905341913E-96, -461054465 -> -1.1321040259930026E35, -1661146768 -> -2.8889830019201256E28, 1994521382 -> -4.768238397220232E-53, 1341602364 -> 3.734494775277793E96, -1174356579 -> 1.2689849505531487E8, 2147483647 -> 1.6877385989217227E124, -1 -> 1.4503547040657538E117, 349094581 -> 4.239784438300051E231, 1417192610 -> 1.0801072326468062E-274, 386338808 -> -9.199277461295296E212, -1609871611 -> 1.068174325090749E-250, 1732598637 -> -4.140681213039893E101),186.93533307523302)
[info]     )
[info]     Label of failing property:
[info]       isAssociativeEq

Should I just file an issue for that one? Or is this some sort of known numerical-precision kind of thing?

@johnynek (Collaborator):

@jeff-stripe sorry about the flake. I think it is numerical precision, but it does come up periodically.

@johnynek (Collaborator) left a comment:

yeah, I agree with your suggestion to move the order in joinWithInput, and I suggested using Aggregator.append

@jeff-stripe (Contributor Author):

@johnynek Pushed your suggested changes + new tests (I made them fail first) for replaceState and Scan.const. The only thing left untested is the Applicative (my local codecov says that it's covered, but I don't actually believe that).

@johnynek (Collaborator) left a comment:

one last minor request, and I'll merge

@jeff-stripe (Contributor Author):

Done. Thanks for the helpful feedback and thorough review! The result is much better than where it started.

@johnynek johnynek merged commit f756b46 into twitter:develop Nov 13, 2019