Skip to content

Commit

Permalink
feat: introduce takeLast(n) operator
Browse files Browse the repository at this point in the history
Returns the list of up to `n` last elements from this source. Less than
`n` elements is returned when this source contains les elements than requested.
The [[List.empty]] is returned when `takeLast` is called on an empty source.

Example:

  Source.empty[Int].takeLast(5) // List.empty
  Source.fromValues(1).takeLast(0) // List.empty
  Source.fromValues(1).takeLast(2) // List(1)
  val s = Source.fromValues(1, 2, 3, 4)
  s.takeLast(2) // List(4, 5)
  s.receive() // ChannelClosed.Done
  • Loading branch information
geminicaprograms committed Oct 31, 2023
1 parent c7710e2 commit fc367fe
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 0 deletions.
46 changes: 46 additions & 0 deletions core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ox.channels

import ox.*

import java.util
import java.util.concurrent.{CountDownLatch, Semaphore}
import scala.collection.{IterableOnce, mutable}
import scala.concurrent.duration.FiniteDuration
Expand Down Expand Up @@ -731,6 +732,51 @@ trait SourceOps[+T] { this: Source[T] =>
*/
def reduce[U >: T](f: (U, U) => U): U =
fold(headOption().getOrElse(throw new NoSuchElementException("cannot reduce an empty source")))(f)

/** Returns the list of up to `n` last elements from this source. Less than `n` elements is returned when this source contains less
* elements than requested. The [[List.empty]] is returned when `takeLast` is called on an empty source.
*
* @param n
* Number of elements to be taken from the end of this source. It is expected that `n >= 0`.
* @return
* A list of up to `n` last elements from this source.
* @throws ChannelClosedException.Error
* When receiving an element from this source fails.
* @example
* {{{
* import ox.*
* import ox.channels.Source
*
* supervised {
* Source.empty[Int].takeLast(5) // List.empty
* Source.fromValues(1).takeLast(0) // List.empty
* Source.fromValues(1).takeLast(2) // List(1)
* val s = Source.fromValues(1, 2, 3, 4)
* s.takeLast(2) // List(4, 5)
* s.receive() // ChannelClosed.Done
* }
* }}}
*/
def takeLast(n: Int): List[T] =
require(n >= 0, "n must be >= 0")
if (n == 0)
drain()
List.empty
else if (n == 1) lastOption().map(List(_)).getOrElse(List.empty)
else
supervised {
val buffer: mutable.ListBuffer[T] = mutable.ListBuffer()
buffer.sizeHint(n)
repeatWhile {
receive() match
case ChannelClosed.Done => false
case e: ChannelClosed.Error => throw e.toThrowable
case t: T @unchecked =>
if (buffer.size == n) buffer.dropInPlace(1)
buffer.append(t); true
}
buffer.result()
}
}

trait SourceCompanionOps:
Expand Down
53 changes: 53 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsTakeLastTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package ox.channels

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

class SourceOpsTakeLastTest extends AnyFlatSpec with Matchers {
behavior of "SourceOps.takeLast"

it should "throw ChannelClosedException.Error with exception and message that was thrown during retrieval" in supervised {
the[ChannelClosedException.Error] thrownBy {
Source
.failed[Int](new RuntimeException("source is broken"))
.takeLast(1)
} should have message "java.lang.RuntimeException: source is broken"
}

it should "throw ChannelClosedException.Error for source failed without exception" in supervised {
the[ChannelClosedException.Error] thrownBy {
Source
.failedWithoutReason[Int]()
.takeLast(1)
}
}

it should "fail to takeLast when n < 0" in supervised {
the[IllegalArgumentException] thrownBy {
Source.empty[Int].takeLast(-1)
} should have message "requirement failed: n must be >= 0"
}

it should "return empty list for the empty source" in supervised {
Source.empty[Int].takeLast(1) shouldBe List.empty
}

it should "return empty list when n == 0 and list is not empty" in supervised {
Source.fromValues(1).takeLast(0) shouldBe List.empty
}

it should "return list with all elements if the source is smaller than requested number" in supervised {
Source.fromValues(1, 2).takeLast(3) shouldBe List(1, 2)
}

it should "return the last n elements from the source" in supervised {
Source.fromValues(1, 2, 3, 4, 5).takeLast(2) shouldBe List(4, 5)
}

it should "drain the source" in supervised {
val s = Source.fromValues(1)
s.takeLast(1) shouldBe List(1)
s.receive() shouldBe ChannelClosed.Done
}
}

0 comments on commit fc367fe

Please sign in to comment.