From 1561f386a72f248c492b1bb50141f7367270a5b3 Mon Sep 17 00:00:00 2001 From: Jacek Centkowski Date: Fri, 27 Oct 2023 17:42:28 +0200 Subject: [PATCH] feat: implement `reduce` operator Uses the first and the following (if available) elements from this source and applies function `f` on them. The returned value is used as the next current value and `f` is applied again with the value received from a source. The operation is repeated until the source is drained. This is similar operation to `fold` but it uses the first source element as `zero` e.g.: Source.empty[Int].reduce(_ + _) // throws NoSuchElementException("cannot reduce an empty source") Source.fromValues(1).reduce(_ + _) // 1 val s = Source.fromValues(1, 2) s.reduce(_ + _) // 3 s.receive() // ChannelClosed.Done Note that in case when function `f` thrown an exception the it is propagated up to the caller. --- .../main/scala/ox/channels/SourceOps.scala | 32 +++++++++++ .../ox/channels/SourceOpsReduceTest.scala | 53 +++++++++++++++++++ 2 files changed, 85 insertions(+) create mode 100644 core/src/test/scala/ox/channels/SourceOpsReduceTest.scala diff --git a/core/src/main/scala/ox/channels/SourceOps.scala b/core/src/main/scala/ox/channels/SourceOps.scala index da15fff0..390c10b8 100644 --- a/core/src/main/scala/ox/channels/SourceOps.scala +++ b/core/src/main/scala/ox/channels/SourceOps.scala @@ -699,6 +699,38 @@ trait SourceOps[+T] { this: Source[T] => case t: T @unchecked => current = f(current, t); true } current + + /** Uses the first and the following (if available) elements from this source and applies function `f` on them. The returned value is used + * as the next current value and `f` is applied again with the value received from this source. The operation is repeated until this + * source is drained. This is similar operation to [[fold]] but it uses the first source element as `zero`. + * + * @param f + * A binary function (a function that takes two arguments) that is applied to the current and next values received from this source. + * @return + * Combined value retrieved from running function `f` on all source elements in a cumulative manner where result of the previous call + * is used as an input value to the next. + * @throws NoSuchElementException + * When this source is empty. + * @throws ChannelClosedException.Error + * When receiving an element from this source fails. + * @throws exception + * When function `f` throws an `exception` then it is propagated up to the caller. + * @example + * {{{ + * import ox.* + * import ox.channels.Source + * + * supervised { + * Source.empty[Int].reduce(_ + _) // throws NoSuchElementException("cannot reduce an empty source") + * Source.fromValues(1).reduce(_ + _) // 1 + * val s = Source.fromValues(1, 2) + * s.reduce(_ + _) // 3 + * s.receive() // ChannelClosed.Done + * } + * }}} + */ + def reduce[U >: T](f: (U, U) => U): U = + fold(headOption().getOrElse(throw new NoSuchElementException("cannot reduce an empty source")))(f) } trait SourceCompanionOps: diff --git a/core/src/test/scala/ox/channels/SourceOpsReduceTest.scala b/core/src/test/scala/ox/channels/SourceOpsReduceTest.scala new file mode 100644 index 00000000..f31876c5 --- /dev/null +++ b/core/src/test/scala/ox/channels/SourceOpsReduceTest.scala @@ -0,0 +1,53 @@ +package ox.channels + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.* + +class SourceOpsReduceTest extends AnyFlatSpec with Matchers { + behavior of "SourceOps.reduce" + + it should "throw NoSuchElementException for reduce over the empty source" in supervised { + the[NoSuchElementException] thrownBy { + Source.empty[Int].reduce(_ + _) + } should have message "cannot reduce an empty source" + } + + it should "throw ChannelClosedException.Error with exception and message that was thrown during retrieval" in supervised { + the[ChannelClosedException.Error] thrownBy { + Source + .failed[Int](new RuntimeException("source is broken")) + .reduce(_ + _) + } should have message "java.lang.RuntimeException: source is broken" + } + + it should "throw ChannelClosedException.Error for source failed without exception" in supervised { + the[ChannelClosedException.Error] thrownBy { + Source + .failedWithoutReason[Int]() + .reduce(_ + _) + } + } + + it should "throw exception thrown in `f` when `f` throws" in supervised { + the[RuntimeException] thrownBy { + Source + .fromValues(1, 2) + .reduce((_, _) => throw new RuntimeException("Function `f` is broken")) + } should have message "Function `f` is broken" + } + + it should "return first element from reduce over the single element source" in supervised { + Source.fromValues(1).reduce(_ + _) shouldBe 1 + } + + it should "run reduce over on non-empty source" in supervised { + Source.fromValues(1, 2).reduce(_ + _) shouldBe 3 + } + + it should "drain the source" in supervised { + val s = Source.fromValues(1) + s.reduce(_ + _) shouldBe 1 + s.receive() shouldBe ChannelClosed.Done + } +}