Move discussion on propagating errors to ADR
rucek committed Oct 30, 2023
1 parent d17a43e commit f79c6bd
Showing 2 changed files with 90 additions and 73 deletions.
76 changes: 3 additions & 73 deletions README.md
@@ -625,79 +625,9 @@ There can be at most one default clause in a `select` invocation.
Errors are only propagated downstream, ultimately reaching the point where the source is discharged, leading to an
exception being thrown there.

Won't this design cause upstream channels / sources to operate despite the consumer being gone (because of the
exception)?

It depends on two factors:
- whether there are any forks running in parallel to the failed one,
- whether you only signal the exception downstream, or also choose to re-throw it.

If there's only a single fork running at a time, it would terminate processing anyway, so it's enough to signal the exception to the downstream.

If there are multiple forks running in parallel, there are two possible scenarios:
1. If you choose to re-throw the exception, it should cause the containing scope to finish (or a supervised fork to fail),
cancelling any forks that are operating in the background. Any unused channels can then be garbage-collected.
2. If you choose not to re-throw, the forks running in parallel would be allowed to complete normally (unless the containing scope is finished for another reason).

Internally, for the built-in `Source` operators, we took the latter approach, i.e. we chose not to re-throw and let the parallel forks complete normally.
However, we keep in mind that they might no longer be able to send to the downstream channel - since the downstream might already have been closed by the failing fork.

### Example

Let's have a look at the error handling in `Source.mapParUnordered` to demonstrate our approach. This operator applies a mapping function to a given number of elements in parallel, and is implemented as follows:

```scala
def mapParUnordered[U](parallelism: Int)(f: T => U)(using Ox, StageCapacity): Source[U] =
  val c = StageCapacity.newChannel[U]
  val s = new Semaphore(parallelism)
  forkDaemon {
    supervised { // (1)
      repeatWhile {
        s.acquire()
        receive() match
          case ChannelClosed.Done => false
          case e @ ChannelClosed.Error(r) => // (2)
            c.error(r)
            false
          case t: T @unchecked =>
            fork { // (3)
              try
                c.send(f(t)) // (4)
                s.release()
              catch case t: Throwable => c.error(t) // (5)
            }
            true
      }
    }
    c.done()
  }
  c
```

It first creates a `supervised` scope (1), i.e. one that only completes (on the happy path) when all
non-daemon supervised forks complete. The mapping function `f` is then run in parallel using non-daemon `fork`s (3).

Let's assume an input `Source` with 4 elements, and `parallelism` set to 2:

```scala
val input: Source[Int] = Source.fromValues(1, 2, 3, 4)

// an identity with a fixed delay, failing immediately on the third element
def f(i: Int): Int =
  if i == 3 then throw new RuntimeException("boom")
  else { Thread.sleep(100); i }

val result: Source[Int] = input.mapParUnordered(2)(f)
```

Let's also assume that the mapping function `f` is an identity with a fixed delay, but it's going to fail
immediately (by throwing an exception) when it processes the third element.

In this scenario, the first 2-element batch would successfully process elements `1` and `2`, and emit them
downstream (i.e. to the `result` source). Then the forks processing `3` and `4` would start in parallel.
While `4` would still be processed (due to the delay in `f`), the fork processing `3` would immediately
throw an exception, which would be caught (5). Consequently, the downstream channel `c` would be closed
with an error, but the fork processing `4` would remain running. Whenever the fork processing `4` is done
executing `f`, its attempt to `c.send` (4) will fail silently - due to `c` being already closed.
Eventually, no results from the second batch would be sent downstream.

The sequence of events would be similar if it were the upstream (rather than `f`) that failed, i.e. when `receive()` resulted in an error (2).
The approach we decided to take (only propagating errors downstream) is one of the two possible designs -
with the other being re-throwing the exception when it's encountered.
Please see [the respective ADR](doc/adr/0001-error-propagation-in-channels.md) for a discussion.

## Backpressure

87 changes: 87 additions & 0 deletions doc/adr/0001-error-propagation-in-channels.md
@@ -0,0 +1,87 @@
# 1. Error propagation in channels

Date: 2023-10-30

## Context

What should happen when an error is encountered when processing channel elements? Should it be propagated downstream or re-thrown?

## Decision

We chose to only propagate the errors downstream, so that they are eventually thrown when the source is discharged.
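To make "thrown when the source is discharged" concrete, here is a minimal, self-contained model of this behavior using only the JDK. It is not the real Ox API: `MiniChannel`, `drain` and the `Closed` hierarchy are made-up names for this sketch; the point is only that `error` records the failure in-band, and the exception surfaces at the consumer side, when the channel is drained.

```scala
import java.util.concurrent.LinkedBlockingQueue

// A toy model (not the real Ox API) of a channel that can be completed
// either normally or with an error.
sealed trait Closed
case object Done extends Closed
final case class Error(cause: Throwable) extends Closed

final class MiniChannel[T] {
  private val queue = new LinkedBlockingQueue[Either[Closed, T]]()
  @volatile private var closedWith: Option[Closed] = None

  // After the channel is closed, sends fail silently (they report false).
  def send(t: T): Boolean =
    if (closedWith.isDefined) false
    else { queue.put(Right(t)); true }

  def error(cause: Throwable): Unit = {
    closedWith = Some(Error(cause))
    queue.put(Left(Error(cause)))
  }

  def done(): Unit = {
    closedWith = Some(Done)
    queue.put(Left(Done))
  }

  // "Discharging" the channel: drain it; if it was closed with an error,
  // that error is thrown here, at the consumer side.
  def drain(): List[T] = {
    val acc = List.newBuilder[T]
    var loop = true
    while (loop) {
      queue.take() match {
        case Right(t)       => acc += t
        case Left(Done)     => loop = false
        case Left(Error(c)) => throw new RuntimeException("channel closed with an error", c)
      }
    }
    acc.result()
  }
}
```

A producer that hits an exception calls `error` instead of throwing; the consumer calling `drain()` is the one that ultimately sees the exception, while later `send`s are silently rejected.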

Won't this design cause upstream channels / sources to operate despite the consumer being gone (because of the
exception)?

It might, if there are multiple forks running in parallel, one of which ends with an error. More precisely, it
depends on two factors:
- whether there are any forks running in parallel to the failed one,
- whether you only signal the exception downstream, or also choose to re-throw it.

If there's only a single fork running at a time, it would terminate processing anyway, so it's enough to signal the exception to the downstream.

If there are multiple forks running in parallel, there are two possible scenarios:
1. If you choose to re-throw the exception, it should cause the containing scope to finish (or a supervised fork to fail),
cancelling any forks that are operating in the background. Any unused channels can then be garbage-collected.
2. If you choose not to re-throw, the forks running in parallel would be allowed to complete normally (unless the containing scope is finished for another reason).

Internally, for the built-in `Source` operators, we took the latter approach, i.e. we chose not to re-throw and let the parallel forks complete normally.
However, we keep in mind that they might no longer be able to send to the downstream channel - since the downstream might already have been closed by the failing fork.

### Example

Let's have a look at the error handling in `Source.mapParUnordered` to demonstrate our approach. This operator applies a mapping function to a given number of elements in parallel, and is implemented as follows:

```scala
def mapParUnordered[U](parallelism: Int)(f: T => U)(using Ox, StageCapacity): Source[U] =
  val c = StageCapacity.newChannel[U]
  val s = new Semaphore(parallelism)
  forkDaemon {
    supervised { // (1)
      repeatWhile {
        s.acquire()
        receive() match
          case ChannelClosed.Done => false
          case e @ ChannelClosed.Error(r) => // (2)
            c.error(r)
            false
          case t: T @unchecked =>
            fork { // (3)
              try
                c.send(f(t)) // (4)
                s.release()
              catch case t: Throwable => c.error(t) // (5)
            }
            true
      }
    }
    c.done()
  }
  c
```

It first creates a `supervised` scope (1), i.e. one that only completes (on the happy path) when all
non-daemon supervised forks complete. The mapping function `f` is then run in parallel using non-daemon `fork`s (3).

Let's assume an input `Source` with 4 elements, and `parallelism` set to 2:

```scala
val input: Source[Int] = Source.fromValues(1, 2, 3, 4)

// an identity with a fixed delay, failing immediately on the third element
def f(i: Int): Int =
  if i == 3 then throw new RuntimeException("boom")
  else { Thread.sleep(100); i }

val result: Source[Int] = input.mapParUnordered(2)(f)
```

Let's also assume that the mapping function `f` is an identity with a fixed delay, but it's going to fail
immediately (by throwing an exception) when it processes the third element.

In this scenario, the first 2-element batch would successfully process elements `1` and `2`, and emit them
downstream (i.e. to the `result` source). Then the forks processing `3` and `4` would start in parallel.
While `4` would still be processed (due to the delay in `f`), the fork processing `3` would immediately
throw an exception, which would be caught (5). Consequently, the downstream channel `c` would be closed
with an error, but the fork processing `4` would remain running. Whenever the fork processing `4` is done
executing `f`, its attempt to `c.send` (4) will fail silently - due to `c` being already closed.
Eventually, no results from the second batch would be sent downstream.

The sequence of events would be similar if it was the upstream (rather than `f`) that failed, i.e. when `receive()` resulted in an error (2).
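The race described above can be reproduced with a small JDK-only sketch. All names here are made up: `failingFork` stands in for the fork processing `3`, `slowFork` for the fork processing `4`, and the queue plus the `closed` flag stand in for the downstream channel `c`; a latch makes the timing deterministic where the real code relies on the delay in `f`.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}
import java.util.concurrent.atomic.AtomicBoolean

// Returns (was the channel closed with an error, how many elements reached downstream).
def raceSketch(): (Boolean, Int) = {
  val downstream = new LinkedBlockingQueue[Int]()
  val closed = new AtomicBoolean(false)
  val failureSignalled = new CountDownLatch(1)

  // Sends to a closed channel fail silently, as described for `c.send` (4).
  def send(i: Int): Boolean =
    if (closed.get()) false
    else { downstream.put(i); true }

  val failingFork = new Thread(() => {
    try throw new RuntimeException("boom") // `f` fails on the third element
    catch {
      case _: Throwable =>
        closed.set(true) // corresponds to `c.error(t)` (5)
        failureSignalled.countDown()
    }
  })

  val slowFork = new Thread(() => {
    failureSignalled.await() // the delay in `f` outlasts the failure
    send(4)                  // silently rejected: the channel is already closed
    ()
  })

  failingFork.start(); slowFork.start()
  failingFork.join(); slowFork.join()

  (closed.get(), downstream.size())
}
```

Running the sketch confirms the narrative: the channel ends up closed with an error, and nothing from the second batch reaches the downstream.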
