From 400c00fd4fb16b737d3b0d50420c761b6e026d94 Mon Sep 17 00:00:00 2001 From: Jacek Kunicki Date: Fri, 27 Oct 2023 11:49:46 +0200 Subject: [PATCH 1/9] Don't rethrow exceptions in mapParScope --- core/src/main/scala/ox/channels/SourceOps.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/ox/channels/SourceOps.scala b/core/src/main/scala/ox/channels/SourceOps.scala index 47d7be8f..e588d889 100644 --- a/core/src/main/scala/ox/channels/SourceOps.scala +++ b/core/src/main/scala/ox/channels/SourceOps.scala @@ -121,7 +121,7 @@ trait SourceOps[+T] { this: Source[T] => private def mapParScope[U](parallelism: Int, c2: Channel[U], f: T => U): Unit = val s = new Semaphore(parallelism) - val inProgress = Channel[Fork[U]](parallelism) + val inProgress = Channel[Fork[Option[U]]](parallelism) val closeScope = new CountDownLatch(1) scoped { // enqueueing fork @@ -142,12 +142,12 @@ trait SourceOps[+T] { this: Source[T] => try val u = f(t) s.release() // not in finally, as in case of an exception, no point in starting subsequent forks - u + Some(u) catch case t: Throwable => c2.error(t) closeScope.countDown() - throw t + None }) true } @@ -157,8 +157,9 @@ trait SourceOps[+T] { this: Source[T] => fork { repeatWhile { inProgress.receive() match - case f: Fork[U] @unchecked => - c2.send(f.join()).isValue + case f: Fork[Option[U]] @unchecked => + f.join().foreach(c2.send) + true case ChannelClosed.Done => closeScope.countDown() c2.done() From fb8bad7505b2f6c4564d497a770875106dae4377 Mon Sep 17 00:00:00 2001 From: Jacek Kunicki Date: Fri, 27 Oct 2023 15:25:17 +0200 Subject: [PATCH 2/9] Update docs on error propagation in channels --- README.md | 72 ++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 69 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 28476c3c..619f5df7 100644 --- a/README.md +++ b/README.md @@ -628,10 +628,76 @@ exception being thrown there. Won't this design cause upstream channels / sources to operate despite the consumer being gone (because of the exception)? -No: the exception should cause the containing scope to finish (or a supervised fork to fail), cancelling any forks -that are operating in the background. Any unused channels can then be garbage-collected. +It depends on two factors: +- whether there are any forks running in parallel to the failed one, +- whether you only signal the exception downstream, or also choose to re-throw it. + +If there's only a single fork running at a time, it would terminate processing anyway, so it's enough to signal the exception to the downstream. + +If there are multiple forks running in parallel, there are two possible scenarios: +1. If you choose to re-throw the exception, it should cause the containing scope to finish (or a supervised fork to fail), +cancelling any forks that are operating in the background. Any unused channels can then be garbage-collected. +2. If you choose not to re-throw, the forks running in parallel would be allowed to complete normally (unless the containing scope is finished for another reason). + +Internally, for the built-in `Source` operators, we took the latter approach, i.e. we chose not to re-throw and let the parrallel forks complete normally. +However, we keep in mind that they might not be able to send to downstream channel anymore - since the downstream might already be closed by the failing fork. + +### Example + +Let's have a look at the error handling in `Source.mapParUnordered` to demonstrate our approach. This operator applies a mapping function to a given number of elements in parallel, and is implemented as follows: + +```scala +def mapParUnordered[U](parallelism: Int)(f: T => U)(using Ox, StageCapacity): Source[U] = + val c = StageCapacity.newChannel[U] + val s = new Semaphore(parallelism) + forkDaemon { + supervised { // (1) + repeatWhile { + s.acquire() + receive() match + case ChannelClosed.Done => false + case e @ ChannelClosed.Error(r) => // (2) + c.error(r) + false + case t: T @unchecked => + fork { // (3) + try + c.send(f(t)) // (4) + s.release() + catch case t: Throwable => c.error(t) // (5) + } + true + } + } + c.done() + } + c +``` + +It first creates a `supervised` scope (1), i.e. one that only completes (on the happy path) when all +non-daemon supervised forks complete. The mapping function `f` is then run in parallel using non-daemon `fork`s (3). + +Let's assume an input `Source` with 4 elements, and `parallelism` set to 2: + +```scala +val input: Source[Int] = Source.fromValues(1, 2, 3, 4) +def f(i: Int): Int = if () + +val result: Source[Int] = input.mapParUnordered(2)(f) +``` + +Let's also assume that the mapping function `f` is an identity with a fixed delay, but it's going to fail +immediately (by throwing an exception) when it processes the third element. + +In this scenario, the first 2-element batch would successfully process elements `1` and `2`, and emit them +downstream (i.e. to the `result` source). Then the forks processing of `3` and `4` would start in parallel. +While `4` would still be processed (due to the delay in `f`), the fork processing `3` would immediately +throw an exception, which would be caught (5). Consequently, the downstream channel `c` would be closed +with an error, but the fork processing `4` would remain running. Whenever the fork processing `4` is done +executing `f`, its attempt to `c.send` (4) will fail silently - due to `c` being already closed. +Eventually, no results from the second batch would be send downstream. -The role of the exception handler is then to re-create the entire processing pipeline, or escalate the error further. +The sequence of events would be similar if it was the upstream (rather than `f`) that failed, i.e. when `receive()` resulted in an error (2). ## Backpressure From d17a43ed9f5923f6024bfd933a06e1dea82f15ac Mon Sep 17 00:00:00 2001 From: Jacek Kunicki Date: Mon, 30 Oct 2023 09:49:21 +0100 Subject: [PATCH 3/9] Don't re-throw in pipeTo when receive from upstream fails --- core/src/main/scala/ox/channels/SourceOps.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/ox/channels/SourceOps.scala b/core/src/main/scala/ox/channels/SourceOps.scala index e588d889..183f664e 100644 --- a/core/src/main/scala/ox/channels/SourceOps.scala +++ b/core/src/main/scala/ox/channels/SourceOps.scala @@ -396,7 +396,7 @@ trait SourceOps[+T] { this: Source[T] => repeatWhile { receive() match case ChannelClosed.Done => sink.done(); false - case e: ChannelClosed.Error => sink.error(e.reason); throw e.toThrowable + case e: ChannelClosed.Error => sink.error(e.reason); false case t: T @unchecked => sink.send(t).orThrow; true } From f79c6bd63fcec959b984e2fddd92dcbd556824b9 Mon Sep 17 00:00:00 2001 From: Jacek Kunicki Date: Mon, 30 Oct 2023 12:45:34 +0100 Subject: [PATCH 4/9] Move discussion on propagating errors to ADR --- README.md | 76 +--------------- doc/adr/0001-error-propagation-in-channels.md | 87 +++++++++++++++++++ 2 files changed, 90 insertions(+), 73 deletions(-) create mode 100644 doc/adr/0001-error-propagation-in-channels.md diff --git a/README.md b/README.md index 619f5df7..6a3935ea 100644 --- a/README.md +++ b/README.md @@ -625,79 +625,9 @@ There can be at most one default clause in a `select` invocation. Errors are only propagated downstream, ultimately reaching the point where the source is discharged, leading to an exception being thrown there. -Won't this design cause upstream channels / sources to operate despite the consumer being gone (because of the -exception)? - -It depends on two factors: -- whether there are any forks running in parallel to the failed one, -- whether you only signal the exception downstream, or also choose to re-throw it. - -If there's only a single fork running at a time, it would terminate processing anyway, so it's enough to signal the exception to the downstream. - -If there are multiple forks running in parallel, there are two possible scenarios: -1. If you choose to re-throw the exception, it should cause the containing scope to finish (or a supervised fork to fail), -cancelling any forks that are operating in the background. Any unused channels can then be garbage-collected. -2. If you choose not to re-throw, the forks running in parallel would be allowed to complete normally (unless the containing scope is finished for another reason). - -Internally, for the built-in `Source` operators, we took the latter approach, i.e. we chose not to re-throw and let the parrallel forks complete normally. -However, we keep in mind that they might not be able to send to downstream channel anymore - since the downstream might already be closed by the failing fork. - -### Example - -Let's have a look at the error handling in `Source.mapParUnordered` to demonstrate our approach. This operator applies a mapping function to a given number of elements in parallel, and is implemented as follows: - -```scala -def mapParUnordered[U](parallelism: Int)(f: T => U)(using Ox, StageCapacity): Source[U] = - val c = StageCapacity.newChannel[U] - val s = new Semaphore(parallelism) - forkDaemon { - supervised { // (1) - repeatWhile { - s.acquire() - receive() match - case ChannelClosed.Done => false - case e @ ChannelClosed.Error(r) => // (2) - c.error(r) - false - case t: T @unchecked => - fork { // (3) - try - c.send(f(t)) // (4) - s.release() - catch case t: Throwable => c.error(t) // (5) - } - true - } - } - c.done() - } - c -``` - -It first creates a `supervised` scope (1), i.e. one that only completes (on the happy path) when all -non-daemon supervised forks complete. The mapping function `f` is then run in parallel using non-daemon `fork`s (3). - -Let's assume an input `Source` with 4 elements, and `parallelism` set to 2: - -```scala -val input: Source[Int] = Source.fromValues(1, 2, 3, 4) -def f(i: Int): Int = if () - -val result: Source[Int] = input.mapParUnordered(2)(f) -``` - -Let's also assume that the mapping function `f` is an identity with a fixed delay, but it's going to fail -immediately (by throwing an exception) when it processes the third element. - -In this scenario, the first 2-element batch would successfully process elements `1` and `2`, and emit them -downstream (i.e. to the `result` source). Then the forks processing of `3` and `4` would start in parallel. -While `4` would still be processed (due to the delay in `f`), the fork processing `3` would immediately -throw an exception, which would be caught (5). Consequently, the downstream channel `c` would be closed -with an error, but the fork processing `4` would remain running. Whenever the fork processing `4` is done -executing `f`, its attempt to `c.send` (4) will fail silently - due to `c` being already closed. -Eventually, no results from the second batch would be send downstream. - -The sequence of events would be similar if it was the upstream (rather than `f`) that failed, i.e. when `receive()` resulted in an error (2). +The approach we decided to take (only propagating errors dowstream) is one of the two possible designs - +with the other being re-throwing an exception when it's encountered. +Please see [the respective ADR](doc/adr/0001-error-propagation-in-channels.md) for a discussion. ## Backpressure diff --git a/doc/adr/0001-error-propagation-in-channels.md b/doc/adr/0001-error-propagation-in-channels.md new file mode 100644 index 00000000..0fa6b7c3 --- /dev/null +++ b/doc/adr/0001-error-propagation-in-channels.md @@ -0,0 +1,87 @@ +# 1. Error propagation in channels + +Date: 2023-10-30 + +## Context + +What should happen when an error is encountered when processing channel elements? Should it be propagated downstream or re-thrown? + +## Decision + +We chose to only propagate the errors downstream, so that they are eventually thrown when the source is discharged. + +Won't this design cause upstream channels / sources to operate despite the consumer being gone (because of the +exception)? + +It might if there are mutliple forks running in parallel, of which one end with an error. This + +It depends on two factors: +- whether there are any forks running in parallel to the failed one, +- whether you only signal the exception downstream, or also choose to re-throw it. + +If there's only a single fork running at a time, it would terminate processing anyway, so it's enough to signal the exception to the downstream. + +If there are multiple forks running in parallel, there are two possible scenarios: +1. If you choose to re-throw the exception, it should cause the containing scope to finish (or a supervised fork to fail), +cancelling any forks that are operating in the background. Any unused channels can then be garbage-collected. +2. If you choose not to re-throw, the forks running in parallel would be allowed to complete normally (unless the containing scope is finished for another reason). + +Internally, for the built-in `Source` operators, we took the latter approach, i.e. we chose not to re-throw and let the parrallel forks complete normally. +However, we keep in mind that they might not be able to send to downstream channel anymore - since the downstream might already be closed by the failing fork. + +### Example + +Let's have a look at the error handling in `Source.mapParUnordered` to demonstrate our approach. This operator applies a mapping function to a given number of elements in parallel, and is implemented as follows: + +```scala +def mapParUnordered[U](parallelism: Int)(f: T => U)(using Ox, StageCapacity): Source[U] = + val c = StageCapacity.newChannel[U] + val s = new Semaphore(parallelism) + forkDaemon { + supervised { // (1) + repeatWhile { + s.acquire() + receive() match + case ChannelClosed.Done => false + case e @ ChannelClosed.Error(r) => // (2) + c.error(r) + false + case t: T @unchecked => + fork { // (3) + try + c.send(f(t)) // (4) + s.release() + catch case t: Throwable => c.error(t) // (5) + } + true + } + } + c.done() + } + c +``` + +It first creates a `supervised` scope (1), i.e. one that only completes (on the happy path) when all +non-daemon supervised forks complete. The mapping function `f` is then run in parallel using non-daemon `fork`s (3). + +Let's assume an input `Source` with 4 elements, and `parallelism` set to 2: + +```scala +val input: Source[Int] = Source.fromValues(1, 2, 3, 4) +def f(i: Int): Int = if () + +val result: Source[Int] = input.mapParUnordered(2)(f) +``` + +Let's also assume that the mapping function `f` is an identity with a fixed delay, but it's going to fail +immediately (by throwing an exception) when it processes the third element. + +In this scenario, the first 2-element batch would successfully process elements `1` and `2`, and emit them +downstream (i.e. to the `result` source). Then the forks processing of `3` and `4` would start in parallel. +While `4` would still be processed (due to the delay in `f`), the fork processing `3` would immediately +throw an exception, which would be caught (5). Consequently, the downstream channel `c` would be closed +with an error, but the fork processing `4` would remain running. Whenever the fork processing `4` is done +executing `f`, its attempt to `c.send` (4) will fail silently - due to `c` being already closed. +Eventually, no results from the second batch would be send downstream. + +The sequence of events would be similar if it was the upstream (rather than `f`) that failed, i.e. when `receive()` resulted in an error (2). \ No newline at end of file From 3386dc6e99aa5a7e0d67785efb7a953ec145aee9 Mon Sep 17 00:00:00 2001 From: Jacek Kunicki Date: Mon, 30 Oct 2023 12:46:56 +0100 Subject: [PATCH 5/9] Fix typo --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 6a3935ea..ae9b9762 100644 --- a/README.md +++ b/README.md @@ -625,7 +625,7 @@ There can be at most one default clause in a `select` invocation. Errors are only propagated downstream, ultimately reaching the point where the source is discharged, leading to an exception being thrown there. -The approach we decided to take (only propagating errors dowstream) is one of the two possible designs - +The approach we decided to take (only propagating errors downstream) is one of the two possible designs - with the other being re-throwing an exception when it's encountered. Please see [the respective ADR](doc/adr/0001-error-propagation-in-channels.md) for a discussion. From 55319a18e1cab7bcaddb42e28c03a1b211fb5036 Mon Sep 17 00:00:00 2001 From: Jacek Kunicki Date: Mon, 30 Oct 2023 14:03:08 +0100 Subject: [PATCH 6/9] Remove unnecessary paragraph --- doc/adr/0001-error-propagation-in-channels.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/doc/adr/0001-error-propagation-in-channels.md b/doc/adr/0001-error-propagation-in-channels.md index 0fa6b7c3..264e648b 100644 --- a/doc/adr/0001-error-propagation-in-channels.md +++ b/doc/adr/0001-error-propagation-in-channels.md @@ -11,9 +11,7 @@ What should happen when an error is encountered when processing channel elements We chose to only propagate the errors downstream, so that they are eventually thrown when the source is discharged. Won't this design cause upstream channels / sources to operate despite the consumer being gone (because of the -exception)? - -It might if there are mutliple forks running in parallel, of which one end with an error. This +exception)? It depends on two factors: - whether there are any forks running in parallel to the failed one, From f3214bc85dde0792c36c7acde162252932019cc0 Mon Sep 17 00:00:00 2001 From: Jacek Kunicki Date: Mon, 30 Oct 2023 14:04:47 +0100 Subject: [PATCH 7/9] Remove unused named pattern in mapParUnordered --- core/src/main/scala/ox/channels/SourceOps.scala | 2 +- doc/adr/0001-error-propagation-in-channels.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/ox/channels/SourceOps.scala b/core/src/main/scala/ox/channels/SourceOps.scala index 183f664e..befda1bd 100644 --- a/core/src/main/scala/ox/channels/SourceOps.scala +++ b/core/src/main/scala/ox/channels/SourceOps.scala @@ -181,7 +181,7 @@ trait SourceOps[+T] { this: Source[T] => s.acquire() receive() match case ChannelClosed.Done => false - case e @ ChannelClosed.Error(r) => + case ChannelClosed.Error(r) => c.error(r) false case t: T @unchecked => diff --git a/doc/adr/0001-error-propagation-in-channels.md b/doc/adr/0001-error-propagation-in-channels.md index 264e648b..274d78d8 100644 --- a/doc/adr/0001-error-propagation-in-channels.md +++ b/doc/adr/0001-error-propagation-in-channels.md @@ -41,7 +41,7 @@ def mapParUnordered[U](parallelism: Int)(f: T => U)(using Ox, StageCapacity): So s.acquire() receive() match case ChannelClosed.Done => false - case e @ ChannelClosed.Error(r) => // (2) + case ChannelClosed.Error(r) => // (2) c.error(r) false case t: T @unchecked => From 8f08365d245a763cebac98092ed70a86449c5155 Mon Sep 17 00:00:00 2001 From: Jacek Kunicki Date: Mon, 30 Oct 2023 14:17:04 +0100 Subject: [PATCH 8/9] Terminate the sending fork in mapParScope after an error --- core/src/main/scala/ox/channels/SourceOps.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/ox/channels/SourceOps.scala b/core/src/main/scala/ox/channels/SourceOps.scala index befda1bd..ba0bfdca 100644 --- a/core/src/main/scala/ox/channels/SourceOps.scala +++ b/core/src/main/scala/ox/channels/SourceOps.scala @@ -158,8 +158,7 @@ trait SourceOps[+T] { this: Source[T] => repeatWhile { inProgress.receive() match case f: Fork[Option[U]] @unchecked => - f.join().foreach(c2.send) - true + f.join().map(c2.send).isDefined case ChannelClosed.Done => closeScope.countDown() c2.done() From 8108358d97308af2883fb196c276da36bdc4c098 Mon Sep 17 00:00:00 2001 From: Jacek Kunicki Date: Tue, 31 Oct 2023 08:50:31 +0100 Subject: [PATCH 9/9] Fix formatting in ADR --- doc/adr/0001-error-propagation-in-channels.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/adr/0001-error-propagation-in-channels.md b/doc/adr/0001-error-propagation-in-channels.md index 274d78d8..9ddc6e18 100644 --- a/doc/adr/0001-error-propagation-in-channels.md +++ b/doc/adr/0001-error-propagation-in-channels.md @@ -41,7 +41,7 @@ def mapParUnordered[U](parallelism: Int)(f: T => U)(using Ox, StageCapacity): So s.acquire() receive() match case ChannelClosed.Done => false - case ChannelClosed.Error(r) => // (2) + case ChannelClosed.Error(r) => // (2) c.error(r) false case t: T @unchecked =>