diff --git a/core/src/main/scala/ox/channels/SourceOps.scala b/core/src/main/scala/ox/channels/SourceOps.scala index ccfa88a8..b9650ada 100644 --- a/core/src/main/scala/ox/channels/SourceOps.scala +++ b/core/src/main/scala/ox/channels/SourceOps.scala @@ -2,6 +2,7 @@ package ox.channels import ox.* +import java.util import java.util.concurrent.{CountDownLatch, Semaphore} import scala.collection.{IterableOnce, mutable} import scala.concurrent.duration.FiniteDuration @@ -731,6 +732,51 @@ trait SourceOps[+T] { this: Source[T] => */ def reduce[U >: T](f: (U, U) => U): U = fold(headOption().getOrElse(throw new NoSuchElementException("cannot reduce an empty source")))(f) + + /** Returns the list of up to `n` last elements from this source. Less than `n` elements is returned when this source contains less + * elements than requested. The [[List.empty]] is returned when `takeLast` is called on an empty source. + * + * @param n + * Number of elements to be taken from the end of this source. It is expected that `n >= 0`. + * @return + * A list of up to `n` last elements from this source. + * @throws ChannelClosedException.Error + * When receiving an element from this source fails. + * @example + * {{{ + * import ox.* + * import ox.channels.Source + * + * supervised { + * Source.empty[Int].takeLast(5) // List.empty + * Source.fromValues(1).takeLast(0) // List.empty + * Source.fromValues(1).takeLast(2) // List(1) + * val s = Source.fromValues(1, 2, 3, 4) + * s.takeLast(2) // List(4, 5) + * s.receive() // ChannelClosed.Done + * } + * }}} + */ + def takeLast(n: Int): List[T] = + require(n >= 0, "n must be >= 0") + if (n == 0) + drain() + List.empty + else if (n == 1) lastOption().map(List(_)).getOrElse(List.empty) + else + supervised { + val buffer: mutable.ListBuffer[T] = mutable.ListBuffer() + buffer.sizeHint(n) + repeatWhile { + receive() match + case ChannelClosed.Done => false + case e: ChannelClosed.Error => throw e.toThrowable + case t: T @unchecked => + if (buffer.size == n) buffer.dropInPlace(1) + buffer.append(t); true + } + buffer.result() + } } trait SourceCompanionOps: diff --git a/core/src/test/scala/ox/channels/SourceOpsTakeLastTest.scala b/core/src/test/scala/ox/channels/SourceOpsTakeLastTest.scala new file mode 100644 index 00000000..9c7ba7be --- /dev/null +++ b/core/src/test/scala/ox/channels/SourceOpsTakeLastTest.scala @@ -0,0 +1,53 @@ +package ox.channels + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.* + +class SourceOpsTakeLastTest extends AnyFlatSpec with Matchers { + behavior of "SourceOps.takeLast" + + it should "throw ChannelClosedException.Error with exception and message that was thrown during retrieval" in supervised { + the[ChannelClosedException.Error] thrownBy { + Source + .failed[Int](new RuntimeException("source is broken")) + .takeLast(1) + } should have message "java.lang.RuntimeException: source is broken" + } + + it should "throw ChannelClosedException.Error for source failed without exception" in supervised { + the[ChannelClosedException.Error] thrownBy { + Source + .failedWithoutReason[Int]() + .takeLast(1) + } + } + + it should "fail to takeLast when n < 0" in supervised { + the[IllegalArgumentException] thrownBy { + Source.empty[Int].takeLast(-1) + } should have message "requirement failed: n must be >= 0" + } + + it should "return empty list for the empty source" in supervised { + Source.empty[Int].takeLast(1) shouldBe List.empty + } + + it should "return empty list when n == 0 and list is not empty" in supervised { + Source.fromValues(1).takeLast(0) shouldBe List.empty + } + + it should "return list with all elements if the source is smaller than requested number" in supervised { + Source.fromValues(1, 2).takeLast(3) shouldBe List(1, 2) + } + + it should "return the last n elements from the source" in supervised { + Source.fromValues(1, 2, 3, 4, 5).takeLast(2) shouldBe List(4, 5) + } + + it should "drain the source" in supervised { + val s = Source.fromValues(1) + s.takeLast(1) shouldBe List(1) + s.receive() shouldBe ChannelClosed.Done + } +}