Skip to content

Commit

Permalink
feat: implement orElse operator
Browse files Browse the repository at this point in the history
If a source has no elements then elements from an `alternative` source
are emitted to the returned channel. If this source is failed then failure
is passed to the returned channel.

Examples:

  Source.fromValues(1).orElse(Source.fromValues(2, 3)).toList // List(1)
  Source.empty.orElse(Source.fromValues(2, 3)).toList         // List(2, 3)

Closes #37.
  • Loading branch information
geminicaprograms committed Nov 2, 2023
1 parent efd3b9a commit cbfbfd0
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 0 deletions.
28 changes: 28 additions & 0 deletions core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,34 @@ trait SourceOps[+T] { this: Source[T] =>
}
buffer.result()
}

/** If this source has no elements then elements from an `alternative` source are emitted to the returned channel. If this source is
* failed then failure is passed to the returned channel.
*
* @param alternative
* An alternative source of elements used when this source is empty.
* @return
* A source that emits either elements from this source or from `alternative` (when this source is empty).
* @example
* {{{
* import ox.*
* import ox.channels.Source
*
* supervised {
* Source.fromValues(1).orElse(Source.fromValues(2, 3)).toList // List(1)
* Source.empty.orElse(Source.fromValues(2, 3)).toList // List(2, 3)
* }
* }}}
*/
def orElse[U >: T](alternative: Source[U])(using Ox, StageCapacity): Source[U] =
val c = StageCapacity.newChannel[U]
forkDaemon {
receive() match
case ChannelClosed.Done => alternative.pipeTo(c)
case ChannelClosed.Error(r) => c.error(r)
case t: T @unchecked => c.send(t); pipeTo(c)
}
c
}

trait SourceCompanionOps:
Expand Down
22 changes: 22 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsOrElseTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package ox.channels

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

class SourceOpsOrElseTest extends AnyFlatSpec with Matchers {
behavior of "SourceOps.orElse"

it should "emit elements only from the original source when it is not empty" in supervised {
Source.fromValues(1).orElse(Source.fromValues(2, 3)).toList shouldBe List(1)
}

it should "emit elements only from the alternative source when the original source is empty" in supervised {
Source.empty.orElse(Source.fromValues(2, 3)).toList shouldBe List(2, 3)
}

it should "return failed source when the original source is failed" in supervised {
val failure = new RuntimeException()
Source.failed(failure).orElse(Source.fromValues(2, 3)).receive() shouldBe ChannelClosed.Error(Some(failure))
}
}

0 comments on commit cbfbfd0

Please sign in to comment.