Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[prelude] make Stream lazier #851

Merged
merged 5 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions kyo-data/shared/src/main/scala/kyo/Chunk.scala
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,11 @@ object Chunk:
* @return
* a new Chunk.Indexed containing the elements from the Array
*/
def from[A <: AnyRef](values: Array[A]): Chunk.Indexed[A] =
Compact(Arrays.copyOf(values, values.length))
def from[A](values: Array[A]): Chunk.Indexed[A] =
if values.isEmpty then cachedEmpty.asInstanceOf[Chunk.Indexed[A]]
else
Compact(Array.copyAs(values, values.length)(using ClassTag.AnyRef).asInstanceOf[Array[A]])
end from

/** Creates a Chunk from a Seq of elements.
*
Expand Down
107 changes: 87 additions & 20 deletions kyo-prelude/shared/src/main/scala/kyo/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import kyo.Emit.Ack
import kyo.Emit.Ack.*
import kyo.Tag
import kyo.kernel.ArrowEffect
import scala.annotation.nowarn
import scala.annotation.targetName

/** Represents a stream of values of type `V` with effects of type `S`.
Expand All @@ -19,13 +20,13 @@ import scala.annotation.targetName
* @param v
* The effect that produces acknowledgments and emits chunks of values
*/
final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
sealed abstract class Stream[V, -S]:

/** Returns the effect that produces acknowledgments and emits chunks of values. */
def emit: Ack < (Emit[Chunk[V]] & S) = v
def emit: Ack < (Emit[Chunk[V]] & S)

private def continue[S2](f: Int => Ack < (Emit[Chunk[V]] & S & S2))(using Frame): Stream[V, S & S2] =
Stream(v.map {
Stream(emit.map {
case Stop => Stop
case Continue(n) => f(n)
})
Expand Down Expand Up @@ -63,7 +64,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
tagV2: Tag[Emit[Chunk[V2]]],
frame: Frame
): Stream[V2, S & S2] =
Stream[V2, S & S2](ArrowEffect.handleState(tagV, (), v)(
Stream[V2, S & S2](ArrowEffect.handleState(tagV, (), emit)(
[C] =>
(input, _, cont) =>
if input.isEmpty then
Expand All @@ -85,7 +86,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
tagV2: Tag[Emit[Chunk[V2]]],
frame: Frame
): Stream[V2, S & S2 & S3] =
Stream[V2, S & S2 & S3](ArrowEffect.handleState(tagV, (), v)(
Stream[V2, S & S2 & S3](ArrowEffect.handleState(tagV, (), emit)(
[C] =>
(input, _, cont) =>
Kyo.foldLeft(input)(Continue(): Ack) { (ack, v) =>
Expand All @@ -108,7 +109,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
tagV2: Tag[Emit[Chunk[V2]]],
frame: Frame
): Stream[V2, S & S2 & S3] =
Stream[V2, S & S2 & S3](ArrowEffect.handleState(tagV, (), v)(
Stream[V2, S & S2 & S3](ArrowEffect.handleState(tagV, (), emit)(
[C] =>
(input, _, cont) =>
if input.isEmpty then
Expand All @@ -118,7 +119,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
))

private def discard(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Stream[V, S] =
Stream(ArrowEffect.handle(tag, v)(
Stream(ArrowEffect.handle(tag, emit)(
[C] => (input, cont) => cont(Stop)
))

Expand All @@ -132,7 +133,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
def take(n: Int)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Stream[V, S] =
if n <= 0 then discard
else
Stream[V, S](ArrowEffect.handleState(tag, n, v)(
Stream[V, S](ArrowEffect.handleState(tag, n, emit)(
[C] =>
(input, state, cont) =>
if state == 0 then
Expand All @@ -153,7 +154,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
def drop(n: Int)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Stream[V, S] =
if n <= 0 then this
else
Stream[V, S](ArrowEffect.handleState(tag, n, v)(
Stream[V, S](ArrowEffect.handleState(tag, n, emit)(
[C] =>
(input, state, cont) =>
if state == 0 then
Expand All @@ -172,7 +173,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
* A new stream containing elements that satisfy the predicate
*/
def takeWhile[S2](f: V => Boolean < S2)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Stream[V, S & S2] =
Stream[V, S & S2](ArrowEffect.handleState(tag, true, v)(
Stream[V, S & S2](ArrowEffect.handleState(tag, true, emit)(
[C] =>
(input, state, cont) =>
if !state then (false, cont(Stop))
Expand All @@ -191,7 +192,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
* A new stream with initial elements that satisfy the predicate removed
*/
def dropWhile[S2](f: V => Boolean < S2)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Stream[V, S & S2] =
Stream[V, S & S2](ArrowEffect.handleState(tag, true, v)(
Stream[V, S & S2](ArrowEffect.handleState(tag, true, emit)(
[C] =>
(input, state, cont) =>
if state then
Expand All @@ -211,7 +212,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
* A new stream containing only elements that satisfy the predicate
*/
def filter[S2](f: V => Boolean < S2)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Stream[V, S & S2] =
Stream[V, S & S2](ArrowEffect.handleState(tag, (), v)(
Stream[V, S & S2](ArrowEffect.handleState(tag, (), emit)(
[C] =>
(input, _, cont) =>
Kyo.filter(input)(f).map { c =>
Expand Down Expand Up @@ -247,7 +248,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
*/
@targetName("changesMaybe")
def changes(first: Maybe[V])(using tag: Tag[Emit[Chunk[V]]], frame: Frame, ce: CanEqual[V, V]): Stream[V, S] =
Stream[V, S](ArrowEffect.handleState(tag, first, v)(
Stream[V, S](ArrowEffect.handleState(tag, first, emit)(
[C] =>
(input, state, cont) =>
val c = input.changes(state)
Expand All @@ -264,7 +265,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
* A unit effect that runs the stream without collecting results
*/
def runDiscard(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Unit < S =
ArrowEffect.handle(tag, v.unit)(
ArrowEffect.handle(tag, emit.unit)(
[C] => (input, cont) => cont(Continue())
)

Expand All @@ -286,7 +287,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
* A unit effect that runs the stream and applies f to each chunk
*/
def runForeachChunk[S2](f: Chunk[V] => Unit < S2)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Unit < (S & S2) =
ArrowEffect.handle(tag, v.unit)(
ArrowEffect.handle(tag, emit.unit)(
[C] =>
(input, cont) =>
if !input.isEmpty then
Expand All @@ -305,7 +306,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
* The final accumulated value
*/
def runFold[A, S2](acc: A)(f: (A, V) => A < S2)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): A < (S & S2) =
ArrowEffect.handleState(tag, acc, v)(
ArrowEffect.handleState(tag, acc, emit)(
handle = [C] =>
(input, state, cont) =>
Kyo.foldLeft(input)(state)(f).map((_, cont(Continue()))),
Expand All @@ -318,7 +319,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
* A chunk containing all values emitted by the stream
*/
def run(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Chunk[V] < S =
ArrowEffect.handleState(tag, Chunk.empty[Chunk[V]], v)(
ArrowEffect.handleState(tag, Chunk.empty[Chunk[V]], emit)(
handle = [C] =>
(input, state, cont) =>
(state.append(input), cont(Continue())),
Expand All @@ -328,25 +329,91 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)):
end Stream

object Stream:
@nowarn("msg=anonymous")
inline def apply[V, S](inline v: => Ack < (Emit[Chunk[V]] & S)): Stream[V, S] =
new Stream[V, S]:
def emit: Ack < (Emit[Chunk[V]] & S) = v

private val _empty = Stream(Stop)
def empty[V]: Stream[V, Any] = _empty.asInstanceOf[Stream[V, Any]]

def init[V, S](seq: Seq[V] < S)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Stream[V, S] =
/** The default chunk size for streams. */
inline def DefaultChunkSize: Int = 4096

/** Creates a stream from a sequence of values.
*
* @param v
* The effect returning a sequence of values
* @param chunkSize
* The size of chunks to emit (default: 4096). Supplying a negative value will result in a chunk size of 1.
* @return
* A stream of values from the sequence
*/
def init[V, S](v: => Seq[V] < S, chunkSize: Int = DefaultChunkSize)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Stream[V, S] =
Stream[V, S](
seq.map { seq =>
v.map { seq =>
val chunk: Chunk[V] = Chunk.from(seq)
val _chunkSize = chunkSize max 1
Emit.andMap(Chunk.empty[V]) { ack =>
Loop(chunk, ack) { (c, ack) =>
ack match
case Stop =>
Loop.done(Stop)
case Continue(n) =>
if c.isEmpty then Loop.done(Ack.Continue())
else Emit.andMap(c.take(n))(ack => Loop.continue(c.dropLeft(n), ack))
else
val i = n min _chunkSize
Emit.andMap(c.take(i))(ack => Loop.continue(c.dropLeft(i), ack))
}
}
}
)

/** Creates a stream of integers from start (inclusive) to end (exclusive).
*
* @param start
* The starting value (inclusive)
* @param end
* The ending value (exclusive)
* @param step
* The step size (default: 1)
* @param chunkSize
* The size of chunks to emit (default: 4096)
* @return
* A stream of integers within the specified range
*/
def range[S](start: Int, end: Int, step: Int = 1, chunkSize: Int = DefaultChunkSize)(using
tag: Tag[Emit[Chunk[Int]]],
frame: Frame
): Stream[Int, S] =
if step == 0 || (start < end && step < 0) || (start > end && step > 0) then empty
else
Stream[Int, S] {
val _chunkSize = chunkSize max 1

Emit.andMap(Chunk.empty[Int]) { ack =>
Loop(start, ack) { (current, ack) =>
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The contents of this loop can probably be simplified. I can take a look later.

ack match
case Stop =>
Loop.done(Stop)
case Continue(n) =>
val continue =
if step > 0 then current < end
else current > end

if !continue then Loop.done(Stop)
else
val remaining =
if step > 0 then
((end - current - 1) / step).abs + 1
else
((current - end - 1) / step.abs).abs + 1
val size = (n min _chunkSize) min remaining
val chunk = Chunk.from(Range(current, current + size * step, step))
Emit.andMap(chunk)(ack => Loop.continue(current + step * size, ack))
end if
}
}
}

end Stream
70 changes: 70 additions & 0 deletions kyo-prelude/shared/src/test/scala/kyo/StreamTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,30 @@ class StreamTest extends Test:
Stream.init(Seq(1, 2, 3)).run.eval == Seq(1, 2, 3)
)
}

"lazy" in {
var i = 0
val _ = Stream.init {
i += 1
Seq.empty[Int]
}

assert(i == 0)
}

"chunk size" in {
def size(n: Int, c: Int): Chunk[Int] =
Var.runTuple(Chunk.empty[Int])(
Stream
.init(Seq.fill(n)(""), chunkSize = c)
.mapChunk(chunk => Var.update[Chunk[Int]](_.append(chunk.size)))
.runDiscard
).eval._1

assert(size(10240, 4096) == Chunk(4096, 4096, 2048))
assert(size(301, 100) == Chunk(100, 100, 100, 1))
assert(size(5, 0) == Chunk(1, 1, 1, 1, 1))
}
}

"initChunk" - {
Expand All @@ -42,6 +66,52 @@ class StreamTest extends Test:
}
}

"range" - {
"empty" in {
assert(Stream.range(0, 0).run.eval == Seq.empty)
}

"negative" in {
assert(Stream.range(0, -10).run.eval == (0 until -10))
assert(Stream.range(0, -10, step = -1).run.eval == (0 until -10 by -1))
}

"positive" in {
assert(Stream.range(0, 1024).run.eval == (0 until 1024))
}

"step" - {
"zero" in {
assert(Stream.range(0, 1024, 0).run.eval == Seq.empty)
}

"positive" in {
assert(Stream.range(0, 1024, 2).run.eval == (0 until 1024 by 2))
}

"negative" in {
assert(Stream.range(0, -10, -2).run.eval == (0 until -10 by -2))
}
}

"chunk size" in {
def size(n: Int, c: Int): Chunk[Int] =
Var.runTuple(Chunk.empty[Int])(
Stream
.range(0, n, chunkSize = c)
.mapChunk(chunk => Var.update[Chunk[Int]](_.append(chunk.size)))
.runDiscard
).eval._1

assert(size(10240, 4096) == Chunk(4096, 4096, 2048))
assert(size(301, 100) == Chunk(100, 100, 100, 1))
}

"stack safety" in {
assert(Stream.range(0, 1000000).take(5).run.eval == (0 until 5))
}
}

"take" - {
"zero" in {
assert(
Expand Down
Loading