diff --git a/kyo-data/shared/src/main/scala/kyo/Chunk.scala b/kyo-data/shared/src/main/scala/kyo/Chunk.scala index b57f99514..3afc48b4b 100644 --- a/kyo-data/shared/src/main/scala/kyo/Chunk.scala +++ b/kyo-data/shared/src/main/scala/kyo/Chunk.scala @@ -451,8 +451,11 @@ object Chunk: * @return * a new Chunk.Indexed containing the elements from the Array */ - def from[A <: AnyRef](values: Array[A]): Chunk.Indexed[A] = - Compact(Arrays.copyOf(values, values.length)) + def from[A](values: Array[A]): Chunk.Indexed[A] = + if values.isEmpty then cachedEmpty.asInstanceOf[Chunk.Indexed[A]] + else + Compact(Array.copyAs(values, values.length)(using ClassTag.AnyRef).asInstanceOf[Array[A]]) + end from /** Creates a Chunk from a Seq of elements. * diff --git a/kyo-prelude/shared/src/main/scala/kyo/Stream.scala b/kyo-prelude/shared/src/main/scala/kyo/Stream.scala index 22cce5503..65dfad807 100644 --- a/kyo-prelude/shared/src/main/scala/kyo/Stream.scala +++ b/kyo-prelude/shared/src/main/scala/kyo/Stream.scala @@ -4,6 +4,7 @@ import kyo.Emit.Ack import kyo.Emit.Ack.* import kyo.Tag import kyo.kernel.ArrowEffect +import scala.annotation.nowarn import scala.annotation.targetName /** Represents a stream of values of type `V` with effects of type `S`. @@ -19,13 +20,13 @@ import scala.annotation.targetName * @param v * The effect that produces acknowledgments and emits chunks of values */ -final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)): +sealed abstract class Stream[V, -S]: /** Returns the effect that produces acknowledgments and emits chunks of values. */ - def emit: Ack < (Emit[Chunk[V]] & S) = v + def emit: Ack < (Emit[Chunk[V]] & S) private def continue[S2](f: Int => Ack < (Emit[Chunk[V]] & S & S2))(using Frame): Stream[V, S & S2] = - Stream(v.map { + Stream(emit.map { case Stop => Stop case Continue(n) => f(n) }) @@ -63,7 +64,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)): tagV2: Tag[Emit[Chunk[V2]]], frame: Frame ): Stream[V2, S & S2] = - Stream[V2, S & S2](ArrowEffect.handleState(tagV, (), v)( + Stream[V2, S & S2](ArrowEffect.handleState(tagV, (), emit)( [C] => (input, _, cont) => if input.isEmpty then @@ -85,7 +86,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)): tagV2: Tag[Emit[Chunk[V2]]], frame: Frame ): Stream[V2, S & S2 & S3] = - Stream[V2, S & S2 & S3](ArrowEffect.handleState(tagV, (), v)( + Stream[V2, S & S2 & S3](ArrowEffect.handleState(tagV, (), emit)( [C] => (input, _, cont) => Kyo.foldLeft(input)(Continue(): Ack) { (ack, v) => @@ -108,7 +109,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)): tagV2: Tag[Emit[Chunk[V2]]], frame: Frame ): Stream[V2, S & S2 & S3] = - Stream[V2, S & S2 & S3](ArrowEffect.handleState(tagV, (), v)( + Stream[V2, S & S2 & S3](ArrowEffect.handleState(tagV, (), emit)( [C] => (input, _, cont) => if input.isEmpty then @@ -118,7 +119,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)): )) private def discard(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Stream[V, S] = - Stream(ArrowEffect.handle(tag, v)( + Stream(ArrowEffect.handle(tag, emit)( [C] => (input, cont) => cont(Stop) )) @@ -132,7 +133,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)): def take(n: Int)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Stream[V, S] = if n <= 0 then discard else - Stream[V, S](ArrowEffect.handleState(tag, n, v)( + Stream[V, S](ArrowEffect.handleState(tag, n, emit)( [C] => (input, state, cont) => if state == 0 then @@ -153,7 +154,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)): def drop(n: Int)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Stream[V, S] = if n <= 0 then this else - Stream[V, S](ArrowEffect.handleState(tag, n, v)( + Stream[V, S](ArrowEffect.handleState(tag, n, emit)( [C] => (input, state, cont) => if state == 0 then @@ -172,7 +173,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)): * A new stream containing elements that satisfy the predicate */ def takeWhile[S2](f: V => Boolean < S2)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Stream[V, S & S2] = - Stream[V, S & S2](ArrowEffect.handleState(tag, true, v)( + Stream[V, S & S2](ArrowEffect.handleState(tag, true, emit)( [C] => (input, state, cont) => if !state then (false, cont(Stop)) @@ -191,7 +192,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)): * A new stream with initial elements that satisfy the predicate removed */ def dropWhile[S2](f: V => Boolean < S2)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Stream[V, S & S2] = - Stream[V, S & S2](ArrowEffect.handleState(tag, true, v)( + Stream[V, S & S2](ArrowEffect.handleState(tag, true, emit)( [C] => (input, state, cont) => if state then @@ -211,7 +212,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)): * A new stream containing only elements that satisfy the predicate */ def filter[S2](f: V => Boolean < S2)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Stream[V, S & S2] = - Stream[V, S & S2](ArrowEffect.handleState(tag, (), v)( + Stream[V, S & S2](ArrowEffect.handleState(tag, (), emit)( [C] => (input, _, cont) => Kyo.filter(input)(f).map { c => @@ -247,7 +248,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)): */ @targetName("changesMaybe") def changes(first: Maybe[V])(using tag: Tag[Emit[Chunk[V]]], frame: Frame, ce: CanEqual[V, V]): Stream[V, S] = - Stream[V, S](ArrowEffect.handleState(tag, first, v)( + Stream[V, S](ArrowEffect.handleState(tag, first, emit)( [C] => (input, state, cont) => val c = input.changes(state) @@ -264,7 +265,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)): * A unit effect that runs the stream without collecting results */ def runDiscard(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Unit < S = - ArrowEffect.handle(tag, v.unit)( + ArrowEffect.handle(tag, emit.unit)( [C] => (input, cont) => cont(Continue()) ) @@ -286,7 +287,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)): * A unit effect that runs the stream and applies f to each chunk */ def runForeachChunk[S2](f: Chunk[V] => Unit < S2)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Unit < (S & S2) = - ArrowEffect.handle(tag, v.unit)( + ArrowEffect.handle(tag, emit.unit)( [C] => (input, cont) => if !input.isEmpty then @@ -305,7 +306,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)): * The final accumulated value */ def runFold[A, S2](acc: A)(f: (A, V) => A < S2)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): A < (S & S2) = - ArrowEffect.handleState(tag, acc, v)( + ArrowEffect.handleState(tag, acc, emit)( handle = [C] => (input, state, cont) => Kyo.foldLeft(input)(state)(f).map((_, cont(Continue()))), @@ -318,7 +319,7 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)): * A chunk containing all values emitted by the stream */ def run(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Chunk[V] < S = - ArrowEffect.handleState(tag, Chunk.empty[Chunk[V]], v)( + ArrowEffect.handleState(tag, Chunk.empty[Chunk[V]], emit)( handle = [C] => (input, state, cont) => (state.append(input), cont(Continue())), @@ -328,14 +329,31 @@ final case class Stream[V, -S](v: Ack < (Emit[Chunk[V]] & S)): end Stream object Stream: + @nowarn("msg=anonymous") + inline def apply[V, S](inline v: => Ack < (Emit[Chunk[V]] & S)): Stream[V, S] = + new Stream[V, S]: + def emit: Ack < (Emit[Chunk[V]] & S) = v private val _empty = Stream(Stop) def empty[V]: Stream[V, Any] = _empty.asInstanceOf[Stream[V, Any]] - def init[V, S](seq: Seq[V] < S)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Stream[V, S] = + /** The default chunk size for streams. */ + inline def DefaultChunkSize: Int = 4096 + + /** Creates a stream from a sequence of values. + * + * @param v + * The effect returning a sequence of values + * @param chunkSize + * The size of chunks to emit (default: 4096). Supplying a negative value will result in a chunk size of 1. + * @return + * A stream of values from the sequence + */ + def init[V, S](v: => Seq[V] < S, chunkSize: Int = DefaultChunkSize)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Stream[V, S] = Stream[V, S]( - seq.map { seq => + v.map { seq => val chunk: Chunk[V] = Chunk.from(seq) + val _chunkSize = chunkSize max 1 Emit.andMap(Chunk.empty[V]) { ack => Loop(chunk, ack) { (c, ack) => ack match @@ -343,10 +361,59 @@ object Stream: Loop.done(Stop) case Continue(n) => if c.isEmpty then Loop.done(Ack.Continue()) - else Emit.andMap(c.take(n))(ack => Loop.continue(c.dropLeft(n), ack)) + else + val i = n min _chunkSize + Emit.andMap(c.take(i))(ack => Loop.continue(c.dropLeft(i), ack)) } } } ) + /** Creates a stream of integers from start (inclusive) to end (exclusive). + * + * @param start + * The starting value (inclusive) + * @param end + * The ending value (exclusive) + * @param step + * The step size (default: 1) + * @param chunkSize + * The size of chunks to emit (default: 4096) + * @return + * A stream of integers within the specified range + */ + def range[S](start: Int, end: Int, step: Int = 1, chunkSize: Int = DefaultChunkSize)(using + tag: Tag[Emit[Chunk[Int]]], + frame: Frame + ): Stream[Int, S] = + if step == 0 || (start < end && step < 0) || (start > end && step > 0) then empty + else + Stream[Int, S] { + val _chunkSize = chunkSize max 1 + + Emit.andMap(Chunk.empty[Int]) { ack => + Loop(start, ack) { (current, ack) => + ack match + case Stop => + Loop.done(Stop) + case Continue(n) => + val continue = + if step > 0 then current < end + else current > end + + if !continue then Loop.done(Stop) + else + val remaining = + if step > 0 then + ((end - current - 1) / step).abs + 1 + else + ((current - end - 1) / step.abs).abs + 1 + val size = (n min _chunkSize) min remaining + val chunk = Chunk.from(Range(current, current + size * step, step)) + Emit.andMap(chunk)(ack => Loop.continue(current + step * size, ack)) + end if + } + } + } + end Stream diff --git a/kyo-prelude/shared/src/test/scala/kyo/StreamTest.scala b/kyo-prelude/shared/src/test/scala/kyo/StreamTest.scala index ba045b23c..ff3df0fd1 100644 --- a/kyo-prelude/shared/src/test/scala/kyo/StreamTest.scala +++ b/kyo-prelude/shared/src/test/scala/kyo/StreamTest.scala @@ -26,6 +26,30 @@ class StreamTest extends Test: Stream.init(Seq(1, 2, 3)).run.eval == Seq(1, 2, 3) ) } + + "lazy" in { + var i = 0 + val _ = Stream.init { + i += 1 + Seq.empty[Int] + } + + assert(i == 0) + } + + "chunk size" in { + def size(n: Int, c: Int): Chunk[Int] = + Var.runTuple(Chunk.empty[Int])( + Stream + .init(Seq.fill(n)(""), chunkSize = c) + .mapChunk(chunk => Var.update[Chunk[Int]](_.append(chunk.size))) + .runDiscard + ).eval._1 + + assert(size(10240, 4096) == Chunk(4096, 4096, 2048)) + assert(size(301, 100) == Chunk(100, 100, 100, 1)) + assert(size(5, 0) == Chunk(1, 1, 1, 1, 1)) + } } "initChunk" - { @@ -42,6 +66,52 @@ class StreamTest extends Test: } } + "range" - { + "empty" in { + assert(Stream.range(0, 0).run.eval == Seq.empty) + } + + "negative" in { + assert(Stream.range(0, -10).run.eval == (0 until -10)) + assert(Stream.range(0, -10, step = -1).run.eval == (0 until -10 by -1)) + } + + "positive" in { + assert(Stream.range(0, 1024).run.eval == (0 until 1024)) + } + + "step" - { + "zero" in { + assert(Stream.range(0, 1024, 0).run.eval == Seq.empty) + } + + "positive" in { + assert(Stream.range(0, 1024, 2).run.eval == (0 until 1024 by 2)) + } + + "negative" in { + assert(Stream.range(0, -10, -2).run.eval == (0 until -10 by -2)) + } + } + + "chunk size" in { + def size(n: Int, c: Int): Chunk[Int] = + Var.runTuple(Chunk.empty[Int])( + Stream + .range(0, n, chunkSize = c) + .mapChunk(chunk => Var.update[Chunk[Int]](_.append(chunk.size))) + .runDiscard + ).eval._1 + + assert(size(10240, 4096) == Chunk(4096, 4096, 2048)) + assert(size(301, 100) == Chunk(100, 100, 100, 1)) + } + + "stack safety" in { + assert(Stream.range(0, 1000000).take(5).run.eval == (0 until 5)) + } + } + "take" - { "zero" in { assert(