-
Notifications
You must be signed in to change notification settings - Fork 28
/
Copy pathAsyncTaskExtensions.cs
153 lines (136 loc) · 6.93 KB
/
AsyncTaskExtensions.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// https://github.com/xamarin/xamarin-android/blob/83854738b8e01747f9536f426fe17ad784cc2081/src/Xamarin.Android.Build.Tasks/Utilities/AsyncTaskExtensions.cs
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Xamarin.Build;
namespace Microsoft.Android.Build.Tasks
{
/// <summary>
/// Extension methods for <c>AsyncTask</c> that run work on background tasks using the
/// task's <c>CancellationToken</c>, a scheduler with a bounded concurrency level, and a
/// common error-handling policy: an exception thrown by a callback is logged as coded
/// error XA0000 and <c>Cancel ()</c> is called on the task.
/// </summary>
public static class AsyncTaskExtensions
{
	/// <summary>
	/// Starts one Task per element of <paramref name="source"/> using the default concurrency level
	/// and returns a Task that completes when all of them have completed (via Task.WhenAll).
	/// </summary>
	/// <typeparam name="TSource">Type of the elements in <paramref name="source"/>.</typeparam>
	/// <param name="asyncTask">The task supplying the CancellationToken and error logging.</param>
	/// <param name="source">The elements to process.</param>
	/// <param name="body">Callback invoked once per element.</param>
	/// <returns>The Task returned by Task.WhenAll for all started tasks.</returns>
	public static Task WhenAll<TSource> (this AsyncTask asyncTask, IEnumerable<TSource> source, Action<TSource> body) =>
		asyncTask.WhenAll (source, body, maxConcurrencyLevel: DefaultMaxConcurrencyLevel);

	/// <summary>
	/// Starts one Task per element of <paramref name="source"/> with the CancellationToken of
	/// <paramref name="asyncTask"/>, on a scheduler limited to <paramref name="maxConcurrencyLevel"/>,
	/// and returns a Task that completes when all of them have completed (via Task.WhenAll).
	/// Exceptions thrown by <paramref name="body"/> are logged as XA0000 and the task is cancelled.
	/// </summary>
	/// <typeparam name="TSource">Type of the elements in <paramref name="source"/>.</typeparam>
	/// <param name="asyncTask">The task supplying the CancellationToken and error logging.</param>
	/// <param name="source">The elements to process; enumerated once, tasks are started as it is enumerated.</param>
	/// <param name="body">Callback invoked once per element.</param>
	/// <param name="maxConcurrencyLevel">Concurrency level passed to the scheduler that runs the tasks.</param>
	/// <param name="creationOptions">Options passed to Task.Factory.StartNew for every task.</param>
	/// <returns>The Task returned by Task.WhenAll for all started tasks.</returns>
	/// <exception cref="ArgumentNullException">
	/// <paramref name="asyncTask"/>, <paramref name="source"/> or <paramref name="body"/> is null.
	/// </exception>
	public static Task WhenAll<TSource> (this AsyncTask asyncTask, IEnumerable<TSource> source, Action<TSource> body, int maxConcurrencyLevel, TaskCreationOptions creationOptions = TaskCreationOptions.LongRunning)
	{
		EnsureNotNull (asyncTask, nameof (asyncTask));
		EnsureNotNull (source, nameof (source));
		EnsureNotNull (body, nameof (body));

		var scheduler = GetTaskScheduler (maxConcurrencyLevel);
		var guarded = WithErrorHandling (asyncTask, body);
		var tasks = new List<Task> ();
		foreach (var item in source) {
			tasks.Add (Task.Factory.StartNew (() => guarded (item), asyncTask.CancellationToken, creationOptions, scheduler));
		}
		return Task.WhenAll (tasks);
	}

	/// <summary>
	/// Same as WhenAll, using the default concurrency level, but also passes an object the callback can use for locking.
	/// The callback is of the form: (T item, object lockObject)
	/// </summary>
	/// <typeparam name="TSource">Type of the elements in <paramref name="source"/>.</typeparam>
	/// <param name="asyncTask">The task supplying the CancellationToken and error logging.</param>
	/// <param name="source">The elements to process.</param>
	/// <param name="body">Callback invoked once per element with the shared lock object.</param>
	/// <returns>The Task returned by Task.WhenAll for all started tasks.</returns>
	public static Task WhenAllWithLock<TSource> (this AsyncTask asyncTask, IEnumerable<TSource> source, Action<TSource, object> body) =>
		asyncTask.WhenAllWithLock (source, body, maxConcurrencyLevel: DefaultMaxConcurrencyLevel);

	/// <summary>
	/// Same as WhenAll, but also passes an object the callback can use for locking.
	/// The callback is of the form: (T item, object lockObject)
	/// </summary>
	/// <typeparam name="TSource">Type of the elements in <paramref name="source"/>.</typeparam>
	/// <param name="asyncTask">The task supplying the CancellationToken and error logging.</param>
	/// <param name="source">The elements to process.</param>
	/// <param name="body">Callback invoked once per element with the shared lock object.</param>
	/// <param name="maxConcurrencyLevel">Concurrency level passed to the scheduler that runs the tasks.</param>
	/// <param name="creationOptions">Options passed to Task.Factory.StartNew for every task.</param>
	/// <returns>The Task returned by Task.WhenAll for all started tasks.</returns>
	/// <exception cref="ArgumentNullException">
	/// <paramref name="asyncTask"/>, <paramref name="source"/> or <paramref name="body"/> is null.
	/// </exception>
	public static Task WhenAllWithLock<TSource> (this AsyncTask asyncTask, IEnumerable<TSource> source, Action<TSource, object> body, int maxConcurrencyLevel, TaskCreationOptions creationOptions = TaskCreationOptions.LongRunning)
	{
		// Checked here (not only in WhenAll) because the wrapping lambda below is never null.
		EnsureNotNull (body, nameof (body));

		// A single lock object is created per call and shared by every callback invocation.
		var lockObject = new object ();
		return WhenAll (asyncTask, source, item => body (item, lockObject), maxConcurrencyLevel, creationOptions);
	}

	/// <summary>
	/// Calls Parallel.ForEach() with appropriate ParallelOptions and exception handling, using the default concurrency level.
	/// </summary>
	/// <typeparam name="TSource">Type of the elements in <paramref name="source"/>.</typeparam>
	/// <param name="asyncTask">The task supplying the CancellationToken and error logging.</param>
	/// <param name="source">The elements to process.</param>
	/// <param name="body">Callback invoked once per element.</param>
	/// <returns>The result of Parallel.ForEach.</returns>
	public static ParallelLoopResult ParallelForEach<TSource> (this AsyncTask asyncTask, IEnumerable<TSource> source, Action<TSource> body) =>
		asyncTask.ParallelForEach (source, body, maxConcurrencyLevel: DefaultMaxConcurrencyLevel);

	/// <summary>
	/// Calls Parallel.ForEach() with appropriate ParallelOptions and exception handling.
	/// Exceptions thrown by <paramref name="body"/> are logged as XA0000 and the task is cancelled.
	/// </summary>
	/// <typeparam name="TSource">Type of the elements in <paramref name="source"/>.</typeparam>
	/// <param name="asyncTask">The task supplying the CancellationToken and error logging.</param>
	/// <param name="source">The elements to process.</param>
	/// <param name="body">Callback invoked once per element.</param>
	/// <param name="maxConcurrencyLevel">Concurrency level passed to the scheduler used by the loop.</param>
	/// <returns>The result of Parallel.ForEach.</returns>
	/// <exception cref="ArgumentNullException">
	/// <paramref name="asyncTask"/>, <paramref name="source"/> or <paramref name="body"/> is null.
	/// </exception>
	public static ParallelLoopResult ParallelForEach<TSource> (this AsyncTask asyncTask, IEnumerable<TSource> source, Action<TSource> body, int maxConcurrencyLevel)
	{
		EnsureNotNull (asyncTask, nameof (asyncTask));
		EnsureNotNull (source, nameof (source));
		EnsureNotNull (body, nameof (body));

		var options = CreateParallelOptions (asyncTask, maxConcurrencyLevel);
		return Parallel.ForEach (source, options, WithErrorHandling (asyncTask, body));
	}

	/// <summary>
	/// Calls Parallel.ForEach() with appropriate ParallelOptions and exception handling, using the default concurrency level.
	/// Passes an object the inner method can use for locking. The callback is of the form: (T item, object lockObject)
	/// </summary>
	/// <typeparam name="TSource">Type of the elements in <paramref name="source"/>.</typeparam>
	/// <param name="asyncTask">The task supplying the CancellationToken and error logging.</param>
	/// <param name="source">The elements to process.</param>
	/// <param name="body">Callback invoked once per element with the shared lock object.</param>
	/// <returns>The result of Parallel.ForEach.</returns>
	public static ParallelLoopResult ParallelForEachWithLock<TSource> (this AsyncTask asyncTask, IEnumerable<TSource> source, Action<TSource, object> body) =>
		asyncTask.ParallelForEachWithLock (source, body, maxConcurrencyLevel: DefaultMaxConcurrencyLevel);

	/// <summary>
	/// Calls Parallel.ForEach() with appropriate ParallelOptions and exception handling.
	/// Passes an object the inner method can use for locking. The callback is of the form: (T item, object lockObject)
	/// </summary>
	/// <typeparam name="TSource">Type of the elements in <paramref name="source"/>.</typeparam>
	/// <param name="asyncTask">The task supplying the CancellationToken and error logging.</param>
	/// <param name="source">The elements to process.</param>
	/// <param name="body">Callback invoked once per element with the shared lock object.</param>
	/// <param name="maxConcurrencyLevel">Concurrency level passed to the scheduler used by the loop.</param>
	/// <returns>The result of Parallel.ForEach.</returns>
	/// <exception cref="ArgumentNullException">
	/// <paramref name="asyncTask"/>, <paramref name="source"/> or <paramref name="body"/> is null.
	/// </exception>
	public static ParallelLoopResult ParallelForEachWithLock<TSource> (this AsyncTask asyncTask, IEnumerable<TSource> source, Action<TSource, object> body, int maxConcurrencyLevel)
	{
		// Checked here (not only in ParallelForEach) because the wrapping lambda below is never null.
		EnsureNotNull (body, nameof (body));

		// A single lock object is created per call and shared by every callback invocation.
		var lockObject = new object ();
		return ParallelForEach (asyncTask, source, item => body (item, lockObject), maxConcurrencyLevel);
	}

	/// <summary>
	/// Builds the ParallelOptions used by the ParallelForEach overloads: the CancellationToken of
	/// <paramref name="asyncTask"/> and a scheduler limited to <paramref name="maxConcurrencyLevel"/>.
	/// </summary>
	static ParallelOptions CreateParallelOptions (AsyncTask asyncTask, int maxConcurrencyLevel) => new ParallelOptions {
		CancellationToken = asyncTask.CancellationToken,
		TaskScheduler = GetTaskScheduler (maxConcurrencyLevel),
	};

	/// <summary>
	/// Creates a new ConcurrentExclusiveSchedulerPair on top of TaskScheduler.Default with the given
	/// concurrency level and returns its concurrent scheduler. A new pair is created on every call.
	/// </summary>
	static TaskScheduler GetTaskScheduler (int maxConcurrencyLevel)
	{
		var pair = new ConcurrentExclusiveSchedulerPair (TaskScheduler.Default, maxConcurrencyLevel);
		return pair.ConcurrentScheduler;
	}

	/// <summary>
	/// Concurrency level used by the overloads that do not take one explicitly:
	/// one less than <see cref="Environment.ProcessorCount"/>, but never less than 1.
	/// </summary>
	static int DefaultMaxConcurrencyLevel => Math.Max (1, Environment.ProcessorCount - 1);

	/// <summary>
	/// Logs <paramref name="exc"/> as coded error XA0000 on <paramref name="asyncTask"/> and then calls Cancel () on it.
	/// </summary>
	static void LogErrorAndCancel (AsyncTask asyncTask, Exception exc)
	{
		asyncTask.LogCodedError ("XA0000", Properties.Resources.XA0000_Exception, exc);
		asyncTask.Cancel ();
	}

	/// <summary>
	/// Wraps <paramref name="body"/> so that any exception it throws is passed to LogErrorAndCancel
	/// instead of propagating to the task or parallel loop that invoked it.
	/// </summary>
	static Action<TSource> WithErrorHandling<TSource> (AsyncTask asyncTask, Action<TSource> body) => item => {
		try {
			body (item);
		} catch (Exception exc) {
			LogErrorAndCancel (asyncTask, exc);
		}
	};

	/// <summary>
	/// Throws ArgumentNullException naming <paramref name="parameterName"/> when <paramref name="value"/> is null.
	/// </summary>
	static void EnsureNotNull (object value, string parameterName)
	{
		if (value == null)
			throw new ArgumentNullException (parameterName);
	}

	/// <summary>
	/// Calls Task.Factory.StartNew() with a proper CancellationToken, TaskScheduler, and TaskCreationOptions.LongRunning.
	/// </summary>
	/// <param name="asyncTask">The task supplying the CancellationToken.</param>
	/// <param name="body">The work to run.</param>
	/// <returns>The started Task.</returns>
	public static Task RunTask (this AsyncTask asyncTask, Action body) =>
		asyncTask.RunTask (body, maxConcurrencyLevel: DefaultMaxConcurrencyLevel);

	/// <summary>
	/// Calls Task.Factory.StartNew() with a proper CancellationToken, on a scheduler limited to
	/// <paramref name="maxConcurrencyLevel"/>. Exceptions thrown by <paramref name="body"/> are not
	/// caught here; they are carried by the returned Task.
	/// </summary>
	/// <param name="asyncTask">The task supplying the CancellationToken.</param>
	/// <param name="body">The work to run.</param>
	/// <param name="maxConcurrencyLevel">Concurrency level passed to the scheduler that runs the task.</param>
	/// <param name="creationOptions">Options passed to Task.Factory.StartNew.</param>
	/// <returns>The started Task.</returns>
	/// <exception cref="ArgumentNullException">
	/// <paramref name="asyncTask"/> or <paramref name="body"/> is null.
	/// </exception>
	public static Task RunTask (this AsyncTask asyncTask, Action body, int maxConcurrencyLevel, TaskCreationOptions creationOptions = TaskCreationOptions.LongRunning)
	{
		EnsureNotNull (asyncTask, nameof (asyncTask));
		EnsureNotNull (body, nameof (body));

		return Task.Factory.StartNew (body, asyncTask.CancellationToken, creationOptions, GetTaskScheduler (maxConcurrencyLevel));
	}

	/// <summary>
	/// Calls Task.Factory.StartNew&lt;T&gt;() with a proper CancellationToken, TaskScheduler, and TaskCreationOptions.LongRunning.
	/// </summary>
	/// <typeparam name="TSource">Type of the value produced by <paramref name="body"/>.</typeparam>
	/// <param name="asyncTask">The task supplying the CancellationToken.</param>
	/// <param name="body">The function to run.</param>
	/// <returns>The started Task, whose result is the value returned by <paramref name="body"/>.</returns>
	public static Task<TSource> RunTask<TSource> (this AsyncTask asyncTask, Func<TSource> body) =>
		asyncTask.RunTask (body, maxConcurrencyLevel: DefaultMaxConcurrencyLevel);

	/// <summary>
	/// Calls Task.Factory.StartNew&lt;T&gt;() with a proper CancellationToken, on a scheduler limited to
	/// <paramref name="maxConcurrencyLevel"/>. Exceptions thrown by <paramref name="body"/> are not
	/// caught here; they are carried by the returned Task.
	/// </summary>
	/// <typeparam name="TSource">Type of the value produced by <paramref name="body"/>.</typeparam>
	/// <param name="asyncTask">The task supplying the CancellationToken.</param>
	/// <param name="body">The function to run.</param>
	/// <param name="maxConcurrencyLevel">Concurrency level passed to the scheduler that runs the task.</param>
	/// <param name="creationOptions">Options passed to Task.Factory.StartNew.</param>
	/// <returns>The started Task, whose result is the value returned by <paramref name="body"/>.</returns>
	/// <exception cref="ArgumentNullException">
	/// <paramref name="asyncTask"/> or <paramref name="body"/> is null.
	/// </exception>
	public static Task<TSource> RunTask<TSource> (this AsyncTask asyncTask, Func<TSource> body, int maxConcurrencyLevel, TaskCreationOptions creationOptions = TaskCreationOptions.LongRunning)
	{
		EnsureNotNull (asyncTask, nameof (asyncTask));
		EnsureNotNull (body, nameof (body));

		return Task.Factory.StartNew (body, asyncTask.CancellationToken, creationOptions, GetTaskScheduler (maxConcurrencyLevel));
	}
}
}