diff --git a/src/Extensions/ParallelForEachExtensions.cs b/src/Extensions/ParallelForEachExtensions.cs index acb032f..1be2193 100644 --- a/src/Extensions/ParallelForEachExtensions.cs +++ b/src/Extensions/ParallelForEachExtensions.cs @@ -12,29 +12,29 @@ public static class ParallelForEachExtensions { private class ParallelForEachContext { - private SemaphoreSlim _semaphore; - private TaskCompletionSource _completionTcs; + private readonly SemaphoreSlim _semaphore; + private readonly TaskCompletionSource _completionTcs; private List _exceptionList; private SpinLock _exceptionListLock; - private readonly int _maxDegreeOfParalellism; + private readonly int _maxDegreeOfParallelism; private readonly bool _breakLoopOnException; private readonly bool _gracefulBreak; private CancellationToken _cancellationToken; private CancellationTokenRegistration _cancellationTokenRegistration; - public ParallelForEachContext(int maxDegreeOfParalellism, bool breakLoopOnException, bool gracefulBreak, CancellationToken cancellationToken) + public ParallelForEachContext(int maxDegreeOfParallelism, bool breakLoopOnException, bool gracefulBreak, CancellationToken cancellationToken) { - if (maxDegreeOfParalellism < 0) - throw new ArgumentException($"The maximum degree of parallelism must be a non-negative number, but got {maxDegreeOfParalellism}", nameof(maxDegreeOfParalellism)); - if (maxDegreeOfParalellism == 0) - maxDegreeOfParalellism = Environment.ProcessorCount - 1; - if (maxDegreeOfParalellism <= 0) - maxDegreeOfParalellism = 1; - - _semaphore = new SemaphoreSlim(initialCount: maxDegreeOfParalellism, maxCount: maxDegreeOfParalellism + 1); + if (maxDegreeOfParallelism < 0) + throw new ArgumentException($"The maximum degree of parallelism must be a non-negative number, but got {maxDegreeOfParallelism}", nameof(maxDegreeOfParallelism)); + if (maxDegreeOfParallelism == 0) + maxDegreeOfParallelism = Environment.ProcessorCount - 1; + if (maxDegreeOfParallelism <= 0) + maxDegreeOfParallelism = 1; + + _semaphore = new SemaphoreSlim(initialCount: maxDegreeOfParallelism, maxCount: maxDegreeOfParallelism + 1); _completionTcs = new TaskCompletionSource(); _exceptionListLock = new SpinLock(enableThreadOwnerTracking: false); - _maxDegreeOfParalellism = maxDegreeOfParalellism; + _maxDegreeOfParallelism = maxDegreeOfParallelism; _breakLoopOnException = breakLoopOnException; _gracefulBreak = gracefulBreak; @@ -109,7 +109,7 @@ public void OnOperationComplete(Exception exceptionIfFailed = null) return; } - if ((_semaphore.CurrentCount == _maxDegreeOfParalellism + 1) || (IsLoopBreakRequested && !_gracefulBreak)) + if ((_semaphore.CurrentCount == _maxDegreeOfParallelism + 1) || (IsLoopBreakRequested && !_gracefulBreak)) CompleteLoopNow(); } @@ -119,11 +119,11 @@ public void CompleteLoopNow() try { - if (_semaphore != null) - _semaphore.Dispose(); + _semaphore?.Dispose(); } catch { + // ignored } var exceptions = ReadExceptions(); @@ -162,18 +162,20 @@ private void OnCancelRequested() /// The type of an item /// The collection of items to perform actions on /// An asynchronous action to perform on the item, where first argument is the item and second argument is item's index in the collection - /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. /// Set to True to stop processing items when first exception occurs. The result might contain several exceptions though when faulty tasks finish at the same time. /// If True (the default behavior), waits on completion for all started tasks when the loop breaks due to cancellation or an exception + /// Use for progress updates /// Cancellation token /// Wraps any exception(s) that occurred inside /// Thrown when the loop is canceled with public static Task ParallelForEachAsync( this IAsyncEnumerable collection, Func asyncItemAction, - int maxDegreeOfParalellism, + int maxDegreeOfParallelism, bool breakLoopOnException, bool gracefulBreak, + IProgress progress = null, CancellationToken cancellationToken = default) { if (collection == null) @@ -181,7 +183,7 @@ public static Task ParallelForEachAsync( if (asyncItemAction == null) throw new ArgumentNullException(nameof(asyncItemAction)); - var context = new ParallelForEachContext(maxDegreeOfParalellism, breakLoopOnException, gracefulBreak, cancellationToken); + var context = new ParallelForEachContext(maxDegreeOfParallelism, breakLoopOnException, gracefulBreak, cancellationToken); Task.Run( async () => @@ -209,6 +211,7 @@ public static Task ParallelForEachAsync( try { itemActionTask = asyncItemAction(enumerator.Current, itemIndex); + progress?.Report(enumerator.Current); } // there is no guarantee that task is executed asynchronously, so it can throw right away catch (Exception ex) @@ -263,18 +266,20 @@ public static Task ParallelForEachAsync( /// The type of an item /// The collection of items to perform actions on /// An asynchronous action to perform on the item, where first argument is the item and second argument is item's index in the collection - /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. /// Set to True to stop processing items when first exception occurs. The result might contain several exceptions though when faulty tasks finish at the same time. /// If True (the default behavior), waits on completion for all started tasks when the loop breaks due to cancellation or an exception + /// Use for progress updates /// Cancellation token /// Wraps any exception(s) that occurred inside /// Thrown when the loop is canceled with public static Task ParallelForEachAsync( this IAsyncEnumerator enumerator, Func asyncItemAction, - int maxDegreeOfParalellism, + int maxDegreeOfParallelism, bool breakLoopOnException, bool gracefulBreak, + IProgress progress = null, CancellationToken cancellationToken = default) { if (enumerator == null) @@ -282,7 +287,7 @@ public static Task ParallelForEachAsync( if (asyncItemAction == null) throw new ArgumentNullException(nameof(asyncItemAction)); - var context = new ParallelForEachContext(maxDegreeOfParalellism, breakLoopOnException, gracefulBreak, cancellationToken); + var context = new ParallelForEachContext(maxDegreeOfParallelism, breakLoopOnException, gracefulBreak, cancellationToken); Task.Run( async () => @@ -308,6 +313,7 @@ public static Task ParallelForEachAsync( try { itemActionTask = asyncItemAction(enumerator.Current, itemIndex); + progress?.Report(enumerator.Current); } // there is no guarantee that task is executed asynchronously, so it can throw right away catch (Exception ex) @@ -362,22 +368,77 @@ public static Task ParallelForEachAsync( /// The type of an item /// The collection of items to perform actions on /// An asynchronous action to perform on the item, where first argument is the item and second argument is item's index in the collection - /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. /// Set to True to stop processing items when first exception occurs. The result might contain several exceptions though when faulty tasks finish at the same time. + /// Use for progress updates /// Cancellation token /// Wraps any exception(s) that occurred inside /// Thrown when the loop is canceled with public static Task ParallelForEachAsync( this IAsyncEnumerable collection, Func asyncItemAction, - int maxDegreeOfParalellism, + int maxDegreeOfParallelism, bool breakLoopOnException, + IProgress progress = null, CancellationToken cancellationToken = default) => collection.ParallelForEachAsync( asyncItemAction, - maxDegreeOfParalellism, + maxDegreeOfParallelism, breakLoopOnException, /*gracefulBreak:*/true, + progress, + cancellationToken); + + /// + /// Invokes an asynchronous action on each item in the collection in parallel + /// + /// The type of an item + /// The collection of items to perform actions on + /// An asynchronous action to perform on the item, where first argument is the item and second argument is item's index in the collection + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Set to True to stop processing items when first exception occurs. The result might contain several exceptions though when faulty tasks finish at the same time. + /// Cancellation token + /// Wraps any exception(s) that occurred inside + /// Thrown when the loop is canceled with + public static Task ParallelForEachAsync( + this IAsyncEnumerable collection, + Func asyncItemAction, + int maxDegreeOfParallelism, + bool breakLoopOnException, + CancellationToken cancellationToken = default) + => collection.ParallelForEachAsync( + asyncItemAction, + maxDegreeOfParallelism, + breakLoopOnException, + /*gracefulBreak:*/true, + /*progress*/null, + cancellationToken); + + /// + /// Invokes an asynchronous action on each item in the collection in parallel + /// + /// The type of an item + /// The collection of items to perform actions on + /// An asynchronous action to perform on the item, where first argument is the item and second argument is item's index in the collection + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Set to True to stop processing items when first exception occurs. The result might contain several exceptions though when faulty tasks finish at the same time. + /// Use for progress updates + /// Cancellation token + /// Wraps any exception(s) that occurred inside + /// Thrown when the loop is canceled with + public static Task ParallelForEachAsync( + this IAsyncEnumerator enumerator, + Func asyncItemAction, + int maxDegreeOfParallelism, + bool breakLoopOnException, + IProgress progress = null, + CancellationToken cancellationToken = default) + => enumerator.ParallelForEachAsync( + asyncItemAction, + maxDegreeOfParallelism, + breakLoopOnException, + /*gracefulBreak:*/true, + progress, cancellationToken); /// @@ -386,7 +447,7 @@ public static Task ParallelForEachAsync( /// The type of an item /// The collection of items to perform actions on /// An asynchronous action to perform on the item, where first argument is the item and second argument is item's index in the collection - /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. /// Set to True to stop processing items when first exception occurs. The result might contain several exceptions though when faulty tasks finish at the same time. /// Cancellation token /// Wraps any exception(s) that occurred inside @@ -394,14 +455,40 @@ public static Task ParallelForEachAsync( public static Task ParallelForEachAsync( this IAsyncEnumerator enumerator, Func asyncItemAction, - int maxDegreeOfParalellism, + int maxDegreeOfParallelism, bool breakLoopOnException, CancellationToken cancellationToken = default) => enumerator.ParallelForEachAsync( asyncItemAction, - maxDegreeOfParalellism, + maxDegreeOfParallelism, breakLoopOnException, /*gracefulBreak:*/true, + /*progress*/null, + cancellationToken); + + /// + /// Invokes an asynchronous action on each item in the collection in parallel + /// + /// The type of an item + /// The collection of items to perform actions on + /// An asynchronous action to perform on the item, where first argument is the item and second argument is item's index in the collection + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Use for progress updates + /// Cancellation token + /// Wraps any exception(s) that occurred inside + /// Thrown when the loop is canceled with + public static Task ParallelForEachAsync( + this IAsyncEnumerable collection, + Func asyncItemAction, + int maxDegreeOfParallelism, + IProgress progress = null, + CancellationToken cancellationToken = default) + => collection.ParallelForEachAsync( + asyncItemAction, + maxDegreeOfParallelism, + /*breakLoopOnException:*/false, + /*gracefulBreak:*/true, + progress, cancellationToken); /// @@ -410,20 +497,21 @@ public static Task ParallelForEachAsync( /// The type of an item /// The collection of items to perform actions on /// An asynchronous action to perform on the item, where first argument is the item and second argument is item's index in the collection - /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. /// Cancellation token /// Wraps any exception(s) that occurred inside /// Thrown when the loop is canceled with public static Task ParallelForEachAsync( this IAsyncEnumerable collection, Func asyncItemAction, - int maxDegreeOfParalellism, + int maxDegreeOfParallelism, CancellationToken cancellationToken = default) => collection.ParallelForEachAsync( asyncItemAction, - maxDegreeOfParalellism, + maxDegreeOfParallelism, /*breakLoopOnException:*/false, /*gracefulBreak:*/true, + /*progress*/null, cancellationToken); /// @@ -432,20 +520,46 @@ public static Task ParallelForEachAsync( /// The type of an item /// The collection of items to perform actions on /// An asynchronous action to perform on the item, where first argument is the item and second argument is item's index in the collection - /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Use for progress updates /// Cancellation token /// Wraps any exception(s) that occurred inside /// Thrown when the loop is canceled with public static Task ParallelForEachAsync( this IAsyncEnumerator enumerator, Func asyncItemAction, - int maxDegreeOfParalellism, + int maxDegreeOfParallelism, + IProgress progress = null, CancellationToken cancellationToken = default) => enumerator.ParallelForEachAsync( asyncItemAction, - maxDegreeOfParalellism, + maxDegreeOfParallelism, /*breakLoopOnException:*/false, /*gracefulBreak:*/true, + progress, + cancellationToken); + + /// + /// Invokes an asynchronous action on each item in the collection in parallel + /// + /// The type of an item + /// The collection of items to perform actions on + /// An asynchronous action to perform on the item, where first argument is the item and second argument is item's index in the collection + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Cancellation token + /// Wraps any exception(s) that occurred inside + /// Thrown when the loop is canceled with + public static Task ParallelForEachAsync( + this IAsyncEnumerator enumerator, + Func asyncItemAction, + int maxDegreeOfParallelism, + CancellationToken cancellationToken = default) + => enumerator.ParallelForEachAsync( + asyncItemAction, + maxDegreeOfParallelism, + /*breakLoopOnException:*/false, + /*gracefulBreak:*/true, + /*progress*/null, cancellationToken); /// @@ -454,18 +568,43 @@ public static Task ParallelForEachAsync( /// The type of an item /// The collection of items to perform actions on /// An asynchronous action to perform on the item, where first argument is the item and second argument is item's index in the collection + /// Use for progress updates /// Cancellation token /// Wraps any exception(s) that occurred inside /// Thrown when the loop is canceled with public static Task ParallelForEachAsync( this IAsyncEnumerable collection, Func asyncItemAction, + IProgress progress = null, CancellationToken cancellationToken = default) => collection.ParallelForEachAsync( asyncItemAction, - /*maxDegreeOfParalellism:*/0, + /*maxDegreeOfParallelism:*/0, /*breakLoopOnException:*/false, /*gracefulBreak:*/true, + progress, + cancellationToken); + + /// + /// Invokes an asynchronous action on each item in the collection in parallel + /// + /// The type of an item + /// The collection of items to perform actions on + /// An asynchronous action to perform on the item, where first argument is the item and second argument is item's index in the collection + /// Cancellation token + /// Wraps any exception(s) that occurred inside + /// Thrown when the loop is canceled with + public static Task ParallelForEachAsync( + this IAsyncEnumerable collection, + Func asyncItemAction, + + CancellationToken cancellationToken = default) + => collection.ParallelForEachAsync( + asyncItemAction, + /*maxDegreeOfParallelism:*/0, + /*breakLoopOnException:*/false, + /*gracefulBreak:*/true, + /*progress*/null, cancellationToken); /// @@ -474,18 +613,69 @@ public static Task ParallelForEachAsync( /// The type of an item /// The collection of items to perform actions on /// An asynchronous action to perform on the item, where first argument is the item and second argument is item's index in the collection + /// Use for progress updates /// Cancellation token /// Wraps any exception(s) that occurred inside /// Thrown when the loop is canceled with public static Task ParallelForEachAsync( this IAsyncEnumerator enumerator, Func asyncItemAction, + IProgress progress = null, CancellationToken cancellationToken = default) => enumerator.ParallelForEachAsync( asyncItemAction, - /*maxDegreeOfParalellism:*/0, + /*maxDegreeOfParallelism:*/0, /*breakLoopOnException:*/false, /*gracefulBreak:*/true, + progress, + cancellationToken); + + /// + /// Invokes an asynchronous action on each item in the collection in parallel + /// + /// The type of an item + /// The collection of items to perform actions on + /// An asynchronous action to perform on the item, where first argument is the item and second argument is item's index in the collection + /// Cancellation token + /// Wraps any exception(s) that occurred inside + /// Thrown when the loop is canceled with + public static Task ParallelForEachAsync( + this IAsyncEnumerator enumerator, + Func asyncItemAction, + CancellationToken cancellationToken = default) + => enumerator.ParallelForEachAsync( + asyncItemAction, + /*maxDegreeOfParallelism:*/0, + /*breakLoopOnException:*/false, + /*gracefulBreak:*/true, + /*progress*/null, + cancellationToken); + + /// + /// Invokes an asynchronous action on each item in the collection in parallel + /// + /// The type of an item + /// The collection of items to perform actions on + /// An asynchronous action to perform on the item + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Set to True to stop processing items when first exception occurs. The result might contain several exceptions though when faulty tasks finish at the same time. + /// Use for progress updates + /// Cancellation token + /// Wraps any exception(s) that occurred inside + /// Thrown when the loop is canceled with + public static Task ParallelForEachAsync( + this IAsyncEnumerable collection, + Func asyncItemAction, + int maxDegreeOfParallelism, + bool breakLoopOnException, + IProgress progress = null, + CancellationToken cancellationToken = default) + => collection.ParallelForEachAsync( + (item, index) => asyncItemAction(item), + maxDegreeOfParallelism, + breakLoopOnException, + /*gracefulBreak:*/true, + progress, cancellationToken); /// @@ -494,7 +684,7 @@ public static Task ParallelForEachAsync( /// The type of an item /// The collection of items to perform actions on /// An asynchronous action to perform on the item - /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. /// Set to True to stop processing items when first exception occurs. The result might contain several exceptions though when faulty tasks finish at the same time. /// Cancellation token /// Wraps any exception(s) that occurred inside @@ -502,14 +692,42 @@ public static Task ParallelForEachAsync( public static Task ParallelForEachAsync( this IAsyncEnumerable collection, Func asyncItemAction, - int maxDegreeOfParalellism, + int maxDegreeOfParallelism, bool breakLoopOnException, CancellationToken cancellationToken = default) => collection.ParallelForEachAsync( (item, index) => asyncItemAction(item), - maxDegreeOfParalellism, + maxDegreeOfParallelism, + breakLoopOnException, + /*gracefulBreak:*/true, + /*progress*/null, + cancellationToken); + + /// + /// Invokes an asynchronous action on each item in the collection in parallel + /// + /// The type of an item + /// The collection of items to perform actions on + /// An asynchronous action to perform on the item + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Set to True to stop processing items when first exception occurs. The result might contain several exceptions though when faulty tasks finish at the same time. + /// Use for progress updates + /// Cancellation token + /// Wraps any exception(s) that occurred inside + /// Thrown when the loop is canceled with + public static Task ParallelForEachAsync( + this IAsyncEnumerator enumerator, + Func asyncItemAction, + int maxDegreeOfParallelism, + bool breakLoopOnException, + IProgress progress = null, + CancellationToken cancellationToken = default) + => enumerator.ParallelForEachAsync( + (item, index) => asyncItemAction(item), + maxDegreeOfParallelism, breakLoopOnException, /*gracefulBreak:*/true, + progress, cancellationToken); /// @@ -518,7 +736,7 @@ public static Task ParallelForEachAsync( /// The type of an item /// The collection of items to perform actions on /// An asynchronous action to perform on the item - /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. /// Set to True to stop processing items when first exception occurs. The result might contain several exceptions though when faulty tasks finish at the same time. /// Cancellation token /// Wraps any exception(s) that occurred inside @@ -526,14 +744,44 @@ public static Task ParallelForEachAsync( public static Task ParallelForEachAsync( this IAsyncEnumerator enumerator, Func asyncItemAction, - int maxDegreeOfParalellism, + int maxDegreeOfParallelism, bool breakLoopOnException, CancellationToken cancellationToken = default) => enumerator.ParallelForEachAsync( (item, index) => asyncItemAction(item), - maxDegreeOfParalellism, + maxDegreeOfParallelism, breakLoopOnException, /*gracefulBreak:*/true, + /*progress*/null, + cancellationToken); + + /// + /// Invokes an asynchronous action on each item in the collection in parallel + /// + /// The type of an item + /// The collection of items to perform actions on + /// An asynchronous action to perform on the item + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Set to True to stop processing items when first exception occurs. The result might contain several exceptions though when faulty tasks finish at the same time. + /// If True (the default behavior), waits on completion for all started tasks when the loop breaks due to cancellation or an exception + /// Use for progress updates + /// Cancellation token + /// Wraps any exception(s) that occurred inside + /// Thrown when the loop is canceled with + public static Task ParallelForEachAsync( + this IAsyncEnumerable collection, + Func asyncItemAction, + int maxDegreeOfParallelism, + bool breakLoopOnException, + bool gracefulBreak, + IProgress progress = null, + CancellationToken cancellationToken = default) + => collection.ParallelForEachAsync( + (item, index) => asyncItemAction(item), + maxDegreeOfParallelism, + breakLoopOnException, + gracefulBreak, + progress, cancellationToken); /// @@ -542,7 +790,7 @@ public static Task ParallelForEachAsync( /// The type of an item /// The collection of items to perform actions on /// An asynchronous action to perform on the item - /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. /// Set to True to stop processing items when first exception occurs. The result might contain several exceptions though when faulty tasks finish at the same time. /// If True (the default behavior), waits on completion for all started tasks when the loop breaks due to cancellation or an exception /// Cancellation token @@ -551,15 +799,16 @@ public static Task ParallelForEachAsync( public static Task ParallelForEachAsync( this IAsyncEnumerable collection, Func asyncItemAction, - int maxDegreeOfParalellism, + int maxDegreeOfParallelism, bool breakLoopOnException, bool gracefulBreak, CancellationToken cancellationToken = default) => collection.ParallelForEachAsync( (item, index) => asyncItemAction(item), - maxDegreeOfParalellism, + maxDegreeOfParallelism, breakLoopOnException, gracefulBreak, + /*progress*/null, cancellationToken); /// @@ -568,24 +817,54 @@ public static Task ParallelForEachAsync( /// The type of an item /// The collection of items to perform actions on /// An asynchronous action to perform on the item - /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. /// Set to True to stop processing items when first exception occurs. The result might contain several exceptions though when faulty tasks finish at the same time. /// If True (the default behavior), waits on completion for all started tasks when the loop breaks due to cancellation or an exception + /// Use for progress updates /// Cancellation token /// Wraps any exception(s) that occurred inside /// Thrown when the loop is canceled with public static Task ParallelForEachAsync( this IAsyncEnumerator enumerator, Func asyncItemAction, - int maxDegreeOfParalellism, + int maxDegreeOfParallelism, bool breakLoopOnException, bool gracefulBreak, + IProgress progress = null, CancellationToken cancellationToken = default) => enumerator.ParallelForEachAsync( (item, index) => asyncItemAction(item), - maxDegreeOfParalellism, + maxDegreeOfParallelism, breakLoopOnException, gracefulBreak, + progress, + cancellationToken); + + /// + /// Invokes an asynchronous action on each item in the collection in parallel + /// + /// The type of an item + /// The collection of items to perform actions on + /// An asynchronous action to perform on the item + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Set to True to stop processing items when first exception occurs. The result might contain several exceptions though when faulty tasks finish at the same time. + /// If True (the default behavior), waits on completion for all started tasks when the loop breaks due to cancellation or an exception + /// Cancellation token + /// Wraps any exception(s) that occurred inside + /// Thrown when the loop is canceled with + public static Task ParallelForEachAsync( + this IAsyncEnumerator enumerator, + Func asyncItemAction, + int maxDegreeOfParallelism, + bool breakLoopOnException, + bool gracefulBreak, + CancellationToken cancellationToken = default) + => enumerator.ParallelForEachAsync( + (item, index) => asyncItemAction(item), + maxDegreeOfParallelism, + breakLoopOnException, + gracefulBreak, + /*progress*/null, cancellationToken); /// @@ -594,20 +873,46 @@ public static Task ParallelForEachAsync( /// The type of an item /// The collection of items to perform actions on /// An asynchronous action to perform on the item - /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Use for progress updates /// Cancellation token /// Wraps any exception(s) that occurred inside /// Thrown when the loop is canceled with public static Task ParallelForEachAsync( this IAsyncEnumerable collection, Func asyncItemAction, - int maxDegreeOfParalellism, + int maxDegreeOfParallelism, + IProgress progress = null, CancellationToken cancellationToken = default) => collection.ParallelForEachAsync( (item, index) => asyncItemAction(item), - maxDegreeOfParalellism, + maxDegreeOfParallelism, /*breakLoopOnException:*/false, /*gracefulBreak:*/true, + progress, + cancellationToken); + + /// + /// Invokes an asynchronous action on each item in the collection in parallel + /// + /// The type of an item + /// The collection of items to perform actions on + /// An asynchronous action to perform on the item + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Cancellation token + /// Wraps any exception(s) that occurred inside + /// Thrown when the loop is canceled with + public static Task ParallelForEachAsync( + this IAsyncEnumerable collection, + Func asyncItemAction, + int maxDegreeOfParallelism, + CancellationToken cancellationToken = default) + => collection.ParallelForEachAsync( + (item, index) => asyncItemAction(item), + maxDegreeOfParallelism, + /*breakLoopOnException:*/false, + /*gracefulBreak:*/true, + /*progress*/null, cancellationToken); /// @@ -616,20 +921,46 @@ public static Task ParallelForEachAsync( /// The type of an item /// The collection of items to perform actions on /// An asynchronous action to perform on the item - /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Use for progress updates /// Cancellation token /// Wraps any exception(s) that occurred inside /// Thrown when the loop is canceled with public static Task ParallelForEachAsync( this IAsyncEnumerator enumerator, Func asyncItemAction, - int maxDegreeOfParalellism, + int maxDegreeOfParallelism, + IProgress progress = null, CancellationToken cancellationToken = default) => enumerator.ParallelForEachAsync( (item, index) => asyncItemAction(item), - maxDegreeOfParalellism, + maxDegreeOfParallelism, /*breakLoopOnException:*/false, /*gracefulBreak:*/true, + progress, + cancellationToken); + + /// + /// Invokes an asynchronous action on each item in the collection in parallel + /// + /// The type of an item + /// The collection of items to perform actions on + /// An asynchronous action to perform on the item + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Cancellation token + /// Wraps any exception(s) that occurred inside + /// Thrown when the loop is canceled with + public static Task ParallelForEachAsync( + this IAsyncEnumerator enumerator, + Func asyncItemAction, + int maxDegreeOfParallelism, + CancellationToken cancellationToken = default) + => enumerator.ParallelForEachAsync( + (item, index) => asyncItemAction(item), + maxDegreeOfParallelism, + /*breakLoopOnException:*/false, + /*gracefulBreak:*/true, + /*progress*/null, cancellationToken); /// @@ -638,18 +969,65 @@ public static Task ParallelForEachAsync( /// The type of an item /// The collection of items to perform actions on /// An asynchronous action to perform on the item + /// Use for progress updates /// Cancellation token /// Wraps any exception(s) that occurred inside /// Thrown when the loop is canceled with public static Task ParallelForEachAsync( this IAsyncEnumerable collection, Func asyncItemAction, + IProgress progress = null, CancellationToken cancellationToken = default) => collection.ParallelForEachAsync( (item, index) => asyncItemAction(item), - /*maxDegreeOfParalellism:*/0, + /*maxDegreeOfParallelism:*/0, /*breakLoopOnException:*/false, /*gracefulBreak:*/true, + progress, + cancellationToken); + + /// + /// Invokes an asynchronous action on each item in the collection in parallel + /// + /// The type of an item + /// The collection of items to perform actions on + /// An asynchronous action to perform on the item + /// Cancellation token + /// Wraps any exception(s) that occurred inside + /// Thrown when the loop is canceled with + public static Task ParallelForEachAsync( + this IAsyncEnumerable collection, + Func asyncItemAction, + CancellationToken cancellationToken = default) + => collection.ParallelForEachAsync( + (item, index) => asyncItemAction(item), + /*maxDegreeOfParallelism:*/0, + /*breakLoopOnException:*/false, + /*gracefulBreak:*/true, + /*progress*/null, + cancellationToken); + + /// + /// Invokes an asynchronous action on each item in the collection in parallel + /// + /// The type of an item + /// The collection of items to perform actions on + /// An asynchronous action to perform on the item + /// Use for progress updates + /// Cancellation token + /// Wraps any exception(s) that occurred inside + /// Thrown when the loop is canceled with + public static Task ParallelForEachAsync( + this IAsyncEnumerator enumerator, + Func asyncItemAction, + IProgress progress = null, + CancellationToken cancellationToken = default) + => enumerator.ParallelForEachAsync( + (item, index) => asyncItemAction(item), + /*maxDegreeOfParallelism:*/0, + /*breakLoopOnException:*/false, + /*gracefulBreak:*/true, + progress, cancellationToken); /// @@ -667,9 +1045,37 @@ public static Task ParallelForEachAsync( CancellationToken cancellationToken = default) => enumerator.ParallelForEachAsync( (item, index) => asyncItemAction(item), - /*maxDegreeOfParalellism:*/0, + /*maxDegreeOfParallelism:*/0, /*breakLoopOnException:*/false, /*gracefulBreak:*/true, + /*progress*/null, + cancellationToken); + + /// + /// Invokes an asynchronous action on each item in the collection in parallel + /// + /// The type of an item + /// The collection of items to perform actions on + /// An asynchronous action to perform on the item, where first argument is the item and second argument is item's index in the collection + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Set to True to stop processing items when first exception occurs. The result might contain several exceptions though when faulty tasks finish at the same time. + /// Use for progress updates + /// Cancellation token + /// Wraps any exception(s) that occurred inside + /// Thrown when the loop is canceled with + public static Task ParallelForEachAsync( + this IEnumerable collection, + Func asyncItemAction, + int maxDegreeOfParallelism, + bool breakLoopOnException, + IProgress progress = null, + CancellationToken cancellationToken = default) + => collection.ToAsyncEnumerable().ParallelForEachAsync( + asyncItemAction, + maxDegreeOfParallelism, + breakLoopOnException, + /*gracefulBreak:*/true, + progress, cancellationToken); /// @@ -678,7 +1084,7 @@ public static Task ParallelForEachAsync( /// The type of an item /// The collection of items to perform actions on /// An asynchronous action to perform on the item, where first argument is the item and second argument is item's index in the collection - /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. /// Set to True to stop processing items when first exception occurs. The result might contain several exceptions though when faulty tasks finish at the same time. /// Cancellation token /// Wraps any exception(s) that occurred inside @@ -686,14 +1092,15 @@ public static Task ParallelForEachAsync( public static Task ParallelForEachAsync( this IEnumerable collection, Func asyncItemAction, - int maxDegreeOfParalellism, + int maxDegreeOfParallelism, bool breakLoopOnException, CancellationToken cancellationToken = default) - => collection.ToAsyncEnumerable(runSynchronously: true).ParallelForEachAsync( + => collection.ToAsyncEnumerable().ParallelForEachAsync( asyncItemAction, - maxDegreeOfParalellism, + maxDegreeOfParallelism, breakLoopOnException, /*gracefulBreak:*/true, + /*progress*/null, cancellationToken); /// @@ -702,22 +1109,75 @@ public static Task ParallelForEachAsync( /// The type of an item /// The collection of items to perform actions on /// An asynchronous action to perform on the item, where first argument is the item and second argument is item's index in the collection - /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. /// Set to True to stop processing items when first exception occurs. The result might contain several exceptions though when faulty tasks finish at the same time. + /// Use for progress updates /// Cancellation token /// Wraps any exception(s) that occurred inside /// Thrown when the loop is canceled with public static Task ParallelForEachAsync( this IEnumerator enumerator, Func asyncItemAction, - int maxDegreeOfParalellism, + int maxDegreeOfParallelism, bool breakLoopOnException, + IProgress progress = null, CancellationToken cancellationToken = default) - => enumerator.ToAsyncEnumerator(runSynchronously: true).ParallelForEachAsync( + => enumerator.ToAsyncEnumerator().ParallelForEachAsync( asyncItemAction, - maxDegreeOfParalellism, + maxDegreeOfParallelism, breakLoopOnException, /*gracefulBreak:*/true, + progress, + cancellationToken); + + /// + /// Invokes an asynchronous action on each item in the collection in parallel + /// + /// The type of an item + /// The collection of items to perform actions on + /// An asynchronous action to perform on the item, where first argument is the item and second argument is item's index in the collection + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Set to True to stop processing items when first exception occurs. The result might contain several exceptions though when faulty tasks finish at the same time. + /// Cancellation token + /// Wraps any exception(s) that occurred inside + /// Thrown when the loop is canceled with + public static Task ParallelForEachAsync( + this IEnumerator enumerator, + Func asyncItemAction, + int maxDegreeOfParallelism, + bool breakLoopOnException, + CancellationToken cancellationToken = default) + => enumerator.ToAsyncEnumerator().ParallelForEachAsync( + asyncItemAction, + maxDegreeOfParallelism, + breakLoopOnException, + /*gracefulBreak:*/true, + /*progress*/null, + cancellationToken); + + /// + /// Invokes an asynchronous action on each item in the collection in parallel + /// + /// The type of an item + /// The collection of items to perform actions on + /// An asynchronous action to perform on the item, where first argument is the item and second argument is item's index in the collection + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Use for progress updates + /// Cancellation token + /// Wraps any exception(s) that occurred inside + /// Thrown when the loop is canceled with + public static Task ParallelForEachAsync( + this IEnumerable collection, + Func asyncItemAction, + int maxDegreeOfParallelism, + IProgress progress = null, + CancellationToken cancellationToken = default) + => collection.ToAsyncEnumerable().ParallelForEachAsync( + asyncItemAction, + maxDegreeOfParallelism, + /*breakLoopOnException:*/false, + /*gracefulBreak:*/true, + progress, cancellationToken); /// @@ -726,20 +1186,21 @@ public static Task ParallelForEachAsync( /// The type of an item /// The collection of items to perform actions on /// An asynchronous action to perform on the item, where first argument is the item and second argument is item's index in the collection - /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. /// Cancellation token /// Wraps any exception(s) that occurred inside /// Thrown when the loop is canceled with public static Task ParallelForEachAsync( this IEnumerable collection, Func asyncItemAction, - int maxDegreeOfParalellism, + int maxDegreeOfParallelism, CancellationToken cancellationToken = default) - => collection.ToAsyncEnumerable(runSynchronously: true).ParallelForEachAsync( + => collection.ToAsyncEnumerable().ParallelForEachAsync( asyncItemAction, - maxDegreeOfParalellism, + maxDegreeOfParallelism, /*breakLoopOnException:*/false, /*gracefulBreak:*/true, + /*progress*/null, cancellationToken); /// @@ -748,20 +1209,46 @@ public static Task ParallelForEachAsync( /// The type of an item /// The collection of items to perform actions on /// An asynchronous action to perform on the item, where first argument is the item and second argument is item's index in the collection - /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Use for progress updates /// Cancellation token /// Wraps any exception(s) that occurred inside /// Thrown when the loop is canceled with public static Task ParallelForEachAsync( this IEnumerator enumerator, Func asyncItemAction, - int maxDegreeOfParalellism, + int maxDegreeOfParallelism, + IProgress progress = null, CancellationToken cancellationToken = default) - => enumerator.ToAsyncEnumerator(runSynchronously: true).ParallelForEachAsync( + => enumerator.ToAsyncEnumerator().ParallelForEachAsync( asyncItemAction, - maxDegreeOfParalellism, + maxDegreeOfParallelism, /*breakLoopOnException:*/false, /*gracefulBreak:*/true, + progress, + cancellationToken); + + /// + /// Invokes an asynchronous action on each item in the collection in parallel + /// + /// The type of an item + /// The collection of items to perform actions on + /// An asynchronous action to perform on the item, where first argument is the item and second argument is item's index in the collection + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Cancellation token + /// Wraps any exception(s) that occurred inside + /// Thrown when the loop is canceled with + public static Task ParallelForEachAsync( + this IEnumerator enumerator, + Func asyncItemAction, + int maxDegreeOfParallelism, + CancellationToken cancellationToken = default) + => enumerator.ToAsyncEnumerator().ParallelForEachAsync( + asyncItemAction, + maxDegreeOfParallelism, + /*breakLoopOnException:*/false, + /*gracefulBreak:*/true, + /*progress*/null, cancellationToken); /// @@ -770,18 +1257,42 @@ public static Task ParallelForEachAsync( /// The type of an item /// The collection of items to perform actions on /// An asynchronous action to perform on the item, where first argument is the item and second argument is item's index in the collection + /// Use for progress updates /// Cancellation token /// Wraps any exception(s) that occurred inside /// Thrown when the loop is canceled with public static Task ParallelForEachAsync( this IEnumerable collection, Func asyncItemAction, + IProgress progress = null, CancellationToken cancellationToken = default) - => collection.ToAsyncEnumerable(runSynchronously: true).ParallelForEachAsync( + => collection.ToAsyncEnumerable().ParallelForEachAsync( asyncItemAction, - /*maxDegreeOfParalellism:*/0, + /*maxDegreeOfParallelism:*/0, /*breakLoopOnException:*/false, /*gracefulBreak:*/true, + progress, + cancellationToken); + + /// + /// Invokes an asynchronous action on each item in the collection in parallel + /// + /// The type of an item + /// The collection of items to perform actions on + /// An asynchronous action to perform on the item, where first argument is the item and second argument is item's index in the collection + /// Cancellation token + /// Wraps any exception(s) that occurred inside + /// Thrown when the loop is canceled with + public static Task ParallelForEachAsync( + this IEnumerable collection, + Func asyncItemAction, + CancellationToken cancellationToken = default) + => collection.ToAsyncEnumerable().ParallelForEachAsync( + asyncItemAction, + /*maxDegreeOfParallelism:*/0, + /*breakLoopOnException:*/false, + /*gracefulBreak:*/true, + /*progress*/null, cancellationToken); /// @@ -790,18 +1301,42 @@ public static Task ParallelForEachAsync( /// The type of an item /// The collection of items to perform actions on /// An asynchronous action to perform on the item, where first argument is the item and second argument is item's index in the collection + /// Use for progress updates /// Cancellation token /// Wraps any exception(s) that occurred inside /// Thrown when the loop is canceled with public static Task ParallelForEachAsync( this IEnumerator enumerator, Func asyncItemAction, + IProgress progress = null, CancellationToken cancellationToken = default) - => enumerator.ToAsyncEnumerator(runSynchronously: true).ParallelForEachAsync( + => enumerator.ToAsyncEnumerator().ParallelForEachAsync( asyncItemAction, - /*maxDegreeOfParalellism:*/0, + /*maxDegreeOfParallelism:*/0, /*breakLoopOnException:*/false, /*gracefulBreak:*/true, + progress, + cancellationToken); + + /// + /// Invokes an asynchronous action on each item in the collection in parallel + /// + /// The type of an item + /// The collection of items to perform actions on + /// An asynchronous action to perform on the item, where first argument is the item and second argument is item's index in the collection + /// Cancellation token + /// Wraps any exception(s) that occurred inside + /// Thrown when the loop is canceled with + public static Task ParallelForEachAsync( + this IEnumerator enumerator, + Func asyncItemAction, + CancellationToken cancellationToken = default) + => enumerator.ToAsyncEnumerator().ParallelForEachAsync( + asyncItemAction, + /*maxDegreeOfParallelism:*/0, + /*breakLoopOnException:*/false, + /*gracefulBreak:*/true, + /*progress*/null, cancellationToken); /// @@ -810,22 +1345,50 @@ public static Task ParallelForEachAsync( /// The type of an item /// The collection of items to perform actions on /// An asynchronous action to perform on the item - /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. /// Set to True to stop processing items when first exception occurs. The result might contain several exceptions though when faulty tasks finish at the same time. + /// Use for progress updates /// Cancellation token /// Wraps any exception(s) that occurred inside /// Thrown when the loop is canceled with public static Task ParallelForEachAsync( this IEnumerable collection, Func asyncItemAction, - int maxDegreeOfParalellism, + int maxDegreeOfParallelism, bool breakLoopOnException, + IProgress progress = null, CancellationToken cancellationToken = default) - => collection.ToAsyncEnumerable(runSynchronously: true).ParallelForEachAsync( + => collection.ToAsyncEnumerable().ParallelForEachAsync( (item, index) => asyncItemAction(item), - maxDegreeOfParalellism, + maxDegreeOfParallelism, breakLoopOnException, /*gracefulBreak:*/true, + progress, + cancellationToken); + + /// + /// Invokes an asynchronous action on each item in the collection in parallel + /// + /// The type of an item + /// The collection of items to perform actions on + /// An asynchronous action to perform on the item + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Set to True to stop processing items when first exception occurs. The result might contain several exceptions though when faulty tasks finish at the same time. + /// Cancellation token + /// Wraps any exception(s) that occurred inside + /// Thrown when the loop is canceled with + public static Task ParallelForEachAsync( + this IEnumerable collection, + Func asyncItemAction, + int maxDegreeOfParallelism, + bool breakLoopOnException, + CancellationToken cancellationToken = default) + => collection.ToAsyncEnumerable().ParallelForEachAsync( + (item, index) => asyncItemAction(item), + maxDegreeOfParallelism, + breakLoopOnException, + /*gracefulBreak:*/true, + /*progress*/null, cancellationToken); /// @@ -834,22 +1397,75 @@ public static Task ParallelForEachAsync( /// The type of an item /// The collection of items to perform actions on /// An asynchronous action to perform on the item - /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. /// Set to True to stop processing items when first exception occurs. The result might contain several exceptions though when faulty tasks finish at the same time. + /// Use for progress updates /// Cancellation token /// Wraps any exception(s) that occurred inside /// Thrown when the loop is canceled with public static Task ParallelForEachAsync( this IEnumerator enumerator, Func asyncItemAction, - int maxDegreeOfParalellism, + int maxDegreeOfParallelism, bool breakLoopOnException, + IProgress progress = null, CancellationToken cancellationToken = default) - => enumerator.ToAsyncEnumerator(runSynchronously: true).ParallelForEachAsync( + => enumerator.ToAsyncEnumerator().ParallelForEachAsync( (item, index) => asyncItemAction(item), - maxDegreeOfParalellism, + maxDegreeOfParallelism, breakLoopOnException, /*gracefulBreak:*/true, + progress, + cancellationToken); + + /// + /// Invokes an asynchronous action on each item in the collection in parallel + /// + /// The type of an item + /// The collection of items to perform actions on + /// An asynchronous action to perform on the item + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Set to True to stop processing items when first exception occurs. The result might contain several exceptions though when faulty tasks finish at the same time. + /// Cancellation token + /// Wraps any exception(s) that occurred inside + /// Thrown when the loop is canceled with + public static Task ParallelForEachAsync( + this IEnumerator enumerator, + Func asyncItemAction, + int maxDegreeOfParallelism, + bool breakLoopOnException, + CancellationToken cancellationToken = default) + => enumerator.ToAsyncEnumerator().ParallelForEachAsync( + (item, index) => asyncItemAction(item), + maxDegreeOfParallelism, + breakLoopOnException, + /*gracefulBreak:*/true, + /*progress*/null, + cancellationToken); + + /// + /// Invokes an asynchronous action on each item in the collection in parallel + /// + /// The type of an item + /// The collection of items to perform actions on + /// An asynchronous action to perform on the item + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Use for progress updates + /// Cancellation token + /// Wraps any exception(s) that occurred inside + /// Thrown when the loop is canceled with + public static Task ParallelForEachAsync( + this IEnumerable collection, + Func asyncItemAction, + int maxDegreeOfParallelism, + IProgress progress = null, + CancellationToken cancellationToken = default) + => collection.ToAsyncEnumerable().ParallelForEachAsync( + (item, index) => asyncItemAction(item), + maxDegreeOfParallelism, + /*breakLoopOnException:*/false, + /*gracefulBreak:*/true, + progress, cancellationToken); /// @@ -858,20 +1474,46 @@ public static Task ParallelForEachAsync( /// The type of an item /// The collection of items to perform actions on /// An asynchronous action to perform on the item - /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. /// Cancellation token /// Wraps any exception(s) that occurred inside /// Thrown when the loop is canceled with public static Task ParallelForEachAsync( this IEnumerable collection, Func asyncItemAction, - int maxDegreeOfParalellism, + int maxDegreeOfParallelism, + CancellationToken cancellationToken = default) + => collection.ToAsyncEnumerable().ParallelForEachAsync( + (item, index) => asyncItemAction(item), + maxDegreeOfParallelism, + /*breakLoopOnException:*/false, + /*gracefulBreak:*/true, + /*progress*/null, + cancellationToken); + + /// + /// Invokes an asynchronous action on each item in the collection in parallel + /// + /// The type of an item + /// The collection of items to perform actions on + /// An asynchronous action to perform on the item + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Use for progress updates + /// Cancellation token + /// Wraps any exception(s) that occurred inside + /// Thrown when the loop is canceled with + public static Task ParallelForEachAsync( + this IEnumerator enumerator, + Func asyncItemAction, + int maxDegreeOfParallelism, + IProgress progress = null, CancellationToken cancellationToken = default) - => collection.ToAsyncEnumerable(runSynchronously: true).ParallelForEachAsync( + => enumerator.ToAsyncEnumerator().ParallelForEachAsync( (item, index) => asyncItemAction(item), - maxDegreeOfParalellism, + maxDegreeOfParallelism, /*breakLoopOnException:*/false, /*gracefulBreak:*/true, + progress, cancellationToken); /// @@ -880,20 +1522,21 @@ public static Task ParallelForEachAsync( /// The type of an item /// The collection of items to perform actions on /// An asynchronous action to perform on the item - /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. + /// Maximum items to schedule processing in parallel. The actual concurrency level depends on TPL settings. Set to 0 to choose a default value based on processor count. /// Cancellation token /// Wraps any exception(s) that occurred inside /// Thrown when the loop is canceled with public static Task ParallelForEachAsync( this IEnumerator enumerator, Func asyncItemAction, - int maxDegreeOfParalellism, + int maxDegreeOfParallelism, CancellationToken cancellationToken = default) - => enumerator.ToAsyncEnumerator(runSynchronously: true).ParallelForEachAsync( + => enumerator.ToAsyncEnumerator().ParallelForEachAsync( (item, index) => asyncItemAction(item), - maxDegreeOfParalellism, + maxDegreeOfParallelism, /*breakLoopOnException:*/false, /*gracefulBreak:*/true, + /*progress*/null, cancellationToken); /// @@ -902,18 +1545,65 @@ public static Task ParallelForEachAsync( /// The type of an item /// The collection of items to perform actions on /// An asynchronous action to perform on the item + /// Use for progress updates /// Cancellation token /// Wraps any exception(s) that occurred inside /// Thrown when the loop is canceled with public static Task ParallelForEachAsync( this IEnumerable collection, Func asyncItemAction, + IProgress progress = null, + CancellationToken cancellationToken = default) + => collection.ToAsyncEnumerable().ParallelForEachAsync( + (item, index) => asyncItemAction(item), + /*maxDegreeOfParallelism:*/0, + /*breakLoopOnException:*/false, + /*gracefulBreak:*/true, + progress, + cancellationToken); + + /// + /// Invokes an asynchronous action on each item in the collection in parallel + /// + /// The type of an item + /// The collection of items to perform actions on + /// An asynchronous action to perform on the item + /// Cancellation token + /// Wraps any exception(s) that occurred inside + /// Thrown when the loop is canceled with + public static Task ParallelForEachAsync( + this IEnumerable collection, + Func asyncItemAction, + CancellationToken cancellationToken = default) + => collection.ToAsyncEnumerable().ParallelForEachAsync( + (item, index) => asyncItemAction(item), + /*maxDegreeOfParallelism:*/0, + /*breakLoopOnException:*/false, + /*gracefulBreak:*/true, + /*progress*/null, + cancellationToken); + + /// + /// Invokes an asynchronous action on each item in the collection in parallel + /// + /// The type of an item + /// The collection of items to perform actions on + /// An asynchronous action to perform on the item + /// Use for progress updates + /// Cancellation token + /// Wraps any exception(s) that occurred inside + /// Thrown when the loop is canceled with + public static Task ParallelForEachAsync( + this IEnumerator enumerator, + Func asyncItemAction, + IProgress progress = null, CancellationToken cancellationToken = default) - => collection.ToAsyncEnumerable(runSynchronously: true).ParallelForEachAsync( + => enumerator.ToAsyncEnumerator().ParallelForEachAsync( (item, index) => asyncItemAction(item), - /*maxDegreeOfParalellism:*/0, + /*maxDegreeOfParallelism:*/0, /*breakLoopOnException:*/false, /*gracefulBreak:*/true, + progress, cancellationToken); /// @@ -929,11 +1619,12 @@ public static Task ParallelForEachAsync( this IEnumerator enumerator, Func asyncItemAction, CancellationToken cancellationToken = default) - => enumerator.ToAsyncEnumerator(runSynchronously: true).ParallelForEachAsync( + => enumerator.ToAsyncEnumerator().ParallelForEachAsync( (item, index) => asyncItemAction(item), - /*maxDegreeOfParalellism:*/0, + /*maxDegreeOfParallelism:*/0, /*breakLoopOnException:*/false, /*gracefulBreak:*/true, + /*progress*/null, cancellationToken); } -} +} \ No newline at end of file