From cbfbfd030f1812d1d18575aee9071d17be6fb534 Mon Sep 17 00:00:00 2001 From: Jacek Centkowski Date: Thu, 2 Nov 2023 19:17:15 +0100 Subject: [PATCH] feat: implement `orElse` operator If a source has no elements then elements from an `alternative` source are emitted to the returned channel. If this source is failed then failure is passed to the returned channel. Examples: Source.fromValues(1).orElse(Source.fromValues(2, 3)).toList // List(1) Source.empty.orElse(Source.fromValues(2, 3)).toList // List(2, 3) Closes #37. --- .../main/scala/ox/channels/SourceOps.scala | 28 +++++++++++++++++++ .../ox/channels/SourceOpsOrElseTest.scala | 22 +++++++++++++++ 2 files changed, 50 insertions(+) create mode 100644 core/src/test/scala/ox/channels/SourceOpsOrElseTest.scala diff --git a/core/src/main/scala/ox/channels/SourceOps.scala b/core/src/main/scala/ox/channels/SourceOps.scala index e4c42369..620a1c96 100644 --- a/core/src/main/scala/ox/channels/SourceOps.scala +++ b/core/src/main/scala/ox/channels/SourceOps.scala @@ -779,6 +779,34 @@ trait SourceOps[+T] { this: Source[T] => } buffer.result() } + + /** If this source has no elements then elements from an `alternative` source are emitted to the returned channel. If this source is + * failed then failure is passed to the returned channel. + * + * @param alternative + * An alternative source of elements used when this source is empty. + * @return + * A source that emits either elements from this source or from `alternative` (when this source is empty). + * @example + * {{{ + * import ox.* + * import ox.channels.Source + * + * supervised { + * Source.fromValues(1).orElse(Source.fromValues(2, 3)).toList // List(1) + * Source.empty.orElse(Source.fromValues(2, 3)).toList // List(2, 3) + * } + * }}} + */ + def orElse[U >: T](alternative: Source[U])(using Ox, StageCapacity): Source[U] = + val c = StageCapacity.newChannel[U] + forkDaemon { + receive() match + case ChannelClosed.Done => alternative.pipeTo(c) + case ChannelClosed.Error(r) => c.error(r) + case t: T @unchecked => c.send(t); pipeTo(c) + } + c } trait SourceCompanionOps: diff --git a/core/src/test/scala/ox/channels/SourceOpsOrElseTest.scala b/core/src/test/scala/ox/channels/SourceOpsOrElseTest.scala new file mode 100644 index 00000000..e4195152 --- /dev/null +++ b/core/src/test/scala/ox/channels/SourceOpsOrElseTest.scala @@ -0,0 +1,22 @@ +package ox.channels + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.* + +class SourceOpsOrElseTest extends AnyFlatSpec with Matchers { + behavior of "SourceOps.orElse" + + it should "emit elements only from the original source when it is not empty" in supervised { + Source.fromValues(1).orElse(Source.fromValues(2, 3)).toList shouldBe List(1) + } + + it should "emit elements only from the alternative source when the original source is empty" in supervised { + Source.empty.orElse(Source.fromValues(2, 3)).toList shouldBe List(2, 3) + } + + it should "return failed source when the original source is failed" in supervised { + val failure = new RuntimeException() + Source.failed(failure).orElse(Source.fromValues(2, 3)).receive() shouldBe ChannelClosed.Error(Some(failure)) + } +}