Unify error handling #29

Merged
merged 9 commits on Nov 2, 2023
Changes from 3 commits
72 changes: 69 additions & 3 deletions README.md
@@ -628,10 +628,76 @@ exception being thrown there.
Won't this design cause upstream channels / sources to operate despite the consumer being gone (because of the
exception)?

No: the exception should cause the containing scope to finish (or a supervised fork to fail), cancelling any forks
that are operating in the background. Any unused channels can then be garbage-collected.
It depends on two factors:
Member:

I think I would mention beforehand that there are two possible designs, of which we picked one, as otherwise this might give the impression that error handling in Ox depends on the factors below (which it doesn't).

Member:

In fact ... such a discussion would be better placed in an ADR. Maybe this should be moved there? ADRs can be as simple as a text file, e.g. here: https://github.com/softwaremill/tapir/tree/master/doc/adr

The docs can then simply state the current state of affairs, keeping the design discussions separate.

Contributor (Author):

Good point, moved the discussion to an ADR.

- whether there are any forks running in parallel to the failed one,
- whether you only signal the exception downstream, or also choose to re-throw it.

If there's only a single fork running at a time, its failure terminates processing anyway, so it's enough to signal the exception downstream.

If there are multiple forks running in parallel, there are two possible scenarios:
1. If you choose to re-throw the exception, it should cause the containing scope to finish (or a supervised fork to fail),
cancelling any forks that are operating in the background. Any unused channels can then be garbage-collected.
2. If you choose not to re-throw, the forks running in parallel would be allowed to complete normally (unless the containing scope is finished for another reason).

Internally, for the built-in `Source` operators, we took the latter approach, i.e. we chose not to re-throw and let the parallel forks complete normally.
However, we keep in mind that they might not be able to send to the downstream channel anymore, since the downstream might already have been closed by the failing fork.
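
As a rough sketch (not the actual library code; it only assumes the `Ox`, `fork` and channel APIs used in the example below, with `process` and `out` being made-up names), the two choices differ only in whether the `catch` block re-throws:

```scala
import ox.{Ox, fork}
import ox.channels.Channel

// Illustrative only: a single processing fork running inside an enclosing scope,
// writing its result to a downstream channel `out`.
def process[T, U](t: T, f: T => U, out: Channel[U])(using Ox): Unit =
  val _ = fork {
    try out.send(f(t)) // deliver the mapped element downstream
    catch
      case e: Throwable =>
        out.error(e) // always signal the failure downstream
        // Design 1: re-throw; in a supervised scope this fails the fork, and hence the
        // enclosing scope, cancelling any sibling forks:
        // throw e
        // Design 2 (the one used by the built-in `Source` operators): don't re-throw;
        // sibling forks run to completion, and their later sends to `out` fail silently.
  }
```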

### Example

Let's have a look at the error handling in `Source.mapParUnordered` to demonstrate our approach. This operator applies a mapping function to a given number of elements in parallel, and is implemented as follows:

```scala
def mapParUnordered[U](parallelism: Int)(f: T => U)(using Ox, StageCapacity): Source[U] =
  val c = StageCapacity.newChannel[U]
  val s = new Semaphore(parallelism)
  forkDaemon {
    supervised { // (1)
      repeatWhile {
        s.acquire()
        receive() match
          case ChannelClosed.Done => false
          case e @ ChannelClosed.Error(r) => // (2)
            c.error(r)
            false
          case t: T @unchecked =>
            fork { // (3)
              try
                c.send(f(t)) // (4)
                s.release()
              catch case t: Throwable => c.error(t) // (5)
            }
            true
      }
    }
    c.done()
  }
  c
```

It first creates a `supervised` scope (1), i.e. one that only completes (on the happy path) when all
non-daemon supervised forks complete. The mapping function `f` is then run in parallel using non-daemon `fork`s (3).

Let's assume an input `Source` with 4 elements, and `parallelism` set to 2:

```scala
val input: Source[Int] = Source.fromValues(1, 2, 3, 4)
def f(i: Int): Int = if (i == 3) throw new RuntimeException("boom") else { Thread.sleep(1000); i }

val result: Source[Int] = input.mapParUnordered(2)(f)
```

Let's also assume that the mapping function `f` is an identity with a fixed delay, but it's going to fail
immediately (by throwing an exception) when it processes the third element.

In this scenario, the first 2-element batch would successfully process elements `1` and `2`, and emit them
downstream (i.e. to the `result` source). Then the forks processing `3` and `4` would start in parallel.
While `4` would still be processed (due to the delay in `f`), the fork processing `3` would immediately
throw an exception, which would be caught (5). Consequently, the downstream channel `c` would be closed
with an error, but the fork processing `4` would remain running. When the fork processing `4` is done
executing `f`, its attempt to `c.send` (4) will fail silently, since `c` is already closed.
Eventually, no results from the second batch would be sent downstream.

The role of the exception handler is then to re-create the entire processing pipeline, or escalate the error further.
The sequence of events would be similar if it was the upstream (rather than `f`) that failed, i.e. when `receive()` resulted in an error (2).
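
For illustration, a downstream consumer of `result` could react to such an error as follows. This is a minimal sketch reusing the `receive()`/`ChannelClosed` pattern from the operators shown in this PR; `rebuildOrEscalate` is a made-up hook standing in for re-creating the pipeline or escalating the error:

```scala
import ox.supervised
import ox.channels.{ChannelClosed, Source}

// Hypothetical consumer: drains `result` until the stream completes or fails,
// handing a propagated error to a caller-supplied hook.
def consume(result: Source[Int], rebuildOrEscalate: ChannelClosed.Error => Unit): Unit =
  supervised {
    var continue = true
    while continue do
      result.receive() match
        case ChannelClosed.Done     => continue = false // upstream completed normally
        case e: ChannelClosed.Error => rebuildOrEscalate(e); continue = false
        case i: Int                 => println(s"got $i") // a successfully mapped element
  }
```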

## Backpressure

13 changes: 7 additions & 6 deletions core/src/main/scala/ox/channels/SourceOps.scala
@@ -121,7 +121,7 @@ trait SourceOps[+T] { this: Source[T] =>

private def mapParScope[U](parallelism: Int, c2: Channel[U], f: T => U): Unit =
val s = new Semaphore(parallelism)
val inProgress = Channel[Fork[U]](parallelism)
val inProgress = Channel[Fork[Option[U]]](parallelism)
val closeScope = new CountDownLatch(1)
scoped {
// enqueueing fork
@@ -142,12 +142,12 @@ trait SourceOps[+T] { this: Source[T] =>
try
val u = f(t)
s.release() // not in finally, as in case of an exception, no point in starting subsequent forks
u
Some(u)
catch
case t: Throwable =>
c2.error(t)
closeScope.countDown()
throw t
None
})
true
}
@@ -157,8 +157,9 @@ trait SourceOps[+T] { this: Source[T] =>
fork {
repeatWhile {
inProgress.receive() match
case f: Fork[U] @unchecked =>
c2.send(f.join()).isValue
case f: Fork[Option[U]] @unchecked =>
f.join().foreach(c2.send)
true
case ChannelClosed.Done =>
closeScope.countDown()
c2.done()
@@ -395,7 +396,7 @@ trait SourceOps[+T] { this: Source[T] =>
repeatWhile {
receive() match
case ChannelClosed.Done => sink.done(); false
case e: ChannelClosed.Error => sink.error(e.reason); throw e.toThrowable
case e: ChannelClosed.Error => sink.error(e.reason); false
case t: T @unchecked => sink.send(t).orThrow; true
}
