-
Notifications
You must be signed in to change notification settings - Fork 603
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Import scodec-stream and scodec-protocols in to fs2 #2588
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Loving the TimeSeries stuff in its current form, sooo much easier to understand, even though it's just naming changes
final class Scan[S, -I, +O]( | ||
val initial: S, | ||
private val transform_ : AndThen[(S, I), (S, Chunk[O])], | ||
private val onComplete_ : AndThen[S, Chunk[O]] | ||
) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be more flexible to replace this class definition with a Scan
trait, that declared the few main methods initial
, transform
and onComplete
, and included any definitions that can be derived from those?
trait Scan[S, -I, +O](
def initial: S
def transform(s: S, i: I): (S, Chunk[O])
def onComplete(s: S): Chunk[O]
/** Chunk form of [[transform]]. */
def transformAccumulate(s: S, c: Chunk[I]): (S, Chunk[O]) =
c.foldLeft(s -> Chunk.empty[O]) { case ((s, acc), i) =>
val (s2, os) = transform(s, i)
(s2, acc ++ os)
}
/** Converts this scan to a pipe. */
def toPipe[F[_]]: Stream[F, I] => Stream[F, O] =
_.pull
.scanChunks(initial)(transformAccumulate)
.flatMap(state => Pull.output(onComplete(state)))
.stream
}
Even some of the combinators, such as map
and contramap
, could be defined on this trait:
/** Returns a new scan which transforms output values using the supplied function. */
def map[O2](f: O => O2): Scan[S, I, O2] = new Scan[S, I, O2] {
def initial = self.initial
def transform(s: S, i: I): (S, Chunk[O2]) = {
val (s1, os) = self.transform(s, i)
s1 -> os.map(f)
}
def onComplete(s: S): Chunk[O] =
self.onComplete(s).map(f)
}
This design would be similar to that of other libraries, such as the entity encoders and decoders of Http4S.
The form that passes the transform_
and onComplete_
would then just be a specific implementation. It may be, of course, that other basic primitives for the Scan can be found.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That’s basically what we used to have: https://github.com/scodec/scodec-protocols/blob/main/src/main/scala/scodec/protocols/Transform.scala#L38
I switched to the new version as part of adding stack safety for various operations.
This PR imports the entirety of scodec-stream and scodec-protocols in to FS2. The motivation for this PR are (1) reducing maintenance burden and (2) improving discoverability of various generic combinators.
More specifically, this PR:
fs2.Scan
type, which provides a new primitive for building pure, stateful transformations of stream elements. AScan
can be converted to a pipe and applied to a stream. However, it supports various forms of composition that streams/pipe cannot express, like step-wise input/output transformations (i.e., feeding a single input to a scan and then transforming the values received in response).fs2.timeseries
package tofs2-core
module. This provides theTimeStamped
andTimeSeries
objects.fs2-scodec
module containingfs2.interop.scodec
package with typesStreamDecoder
andStreamEncoder
. This is a port of scodec-stream.fs2-protocols
module, primarily as an example of how to usefs2-scodec
.