-
Notifications
You must be signed in to change notification settings - Fork 287
/
AdaptiveSamplingTelemetryProcessor.cs
295 lines (260 loc) · 10.2 KB
/
AdaptiveSamplingTelemetryProcessor.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
namespace Microsoft.ApplicationInsights.WindowsServer.TelemetryChannel
{
using System;
using Microsoft.ApplicationInsights.Channel;
using Microsoft.ApplicationInsights.Extensibility;
using Microsoft.ApplicationInsights.WindowsServer.TelemetryChannel.Implementation;
/// <summary>
/// Telemetry processor for sampling telemetry at a dynamic rate before sending to Application Insights.
/// </summary>
/// <remarks>
/// The class owns a fixed-rate sampling processor and a sampling percentage estimator processor.
/// Every <see cref="Process"/> call is forwarded to the fixed-rate sampler; the estimator reports
/// evaluated percentages back through a private callback that updates the sampler.
/// </remarks>
public class AdaptiveSamplingTelemetryProcessor : ITelemetryProcessor, IDisposable
{
    /// <summary>
    /// Fixed-rate sampling telemetry processor.
    /// </summary>
    private readonly SamplingTelemetryProcessor samplingProcessor;

    /// <summary>
    /// Sampling percentage estimator settings.
    /// </summary>
    private readonly Channel.Implementation.SamplingPercentageEstimatorSettings estimatorSettings;

    /// <summary>
    /// Callback invoked every time sampling percentage is evaluated. May be null.
    /// </summary>
    private readonly Channel.Implementation.AdaptiveSamplingPercentageEvaluatedCallback evaluationCallback;

    /// <summary>
    /// Sampling percentage estimator telemetry processor. Set to null once this instance is disposed.
    /// </summary>
    private SamplingPercentageEstimatorTelemetryProcessor estimatorProcessor;

    /// <summary>
    /// Initializes a new instance of the <see cref="AdaptiveSamplingTelemetryProcessor"/> class
    /// using default estimator settings and no evaluation callback.
    /// </summary>
    /// <param name="next">Next TelemetryProcessor in call chain.</param>
    public AdaptiveSamplingTelemetryProcessor(ITelemetryProcessor next)
        : this(new Channel.Implementation.SamplingPercentageEstimatorSettings(), null, next)
    {
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="AdaptiveSamplingTelemetryProcessor"/> class.
    /// </summary>
    /// <param name="settings">Sampling percentage estimator settings.</param>
    /// <param name="callback">Callback invoked every time sampling percentage is evaluated. May be null.</param>
    /// <param name="next">Next TelemetryProcessor in call chain.</param>
    /// <exception cref="ArgumentNullException"><paramref name="settings"/> is null.</exception>
    public AdaptiveSamplingTelemetryProcessor(
        Channel.Implementation.SamplingPercentageEstimatorSettings settings,
        Channel.Implementation.AdaptiveSamplingPercentageEvaluatedCallback callback,
        ITelemetryProcessor next)
    {
        // Fail fast: the settings are dereferenced below, and validating first avoids
        // creating (and never disposing) an estimator processor for an invalid call.
        if (settings == null)
        {
            throw new ArgumentNullException(nameof(settings));
        }

        this.estimatorSettings = settings;
        this.evaluationCallback = callback;

        // make estimator telemetry processor work after sampling was done
        this.estimatorProcessor = new SamplingPercentageEstimatorTelemetryProcessor(settings, this.SamplingPercentageChanged, next);
        this.samplingProcessor = new SamplingTelemetryProcessor(next, this.estimatorProcessor)
        {
            SamplingPercentage = settings.InitialSamplingPercentage,
            ProactiveSamplingPercentage = null,
        };
    }

    /// <summary>
    /// Gets or sets a semicolon separated list of telemetry types that should not be sampled.
    /// Types listed are excluded even if they are set in IncludedTypes.
    /// </summary>
    public string ExcludedTypes
    {
        get { return this.samplingProcessor.ExcludedTypes; }
        set { this.samplingProcessor.ExcludedTypes = value; }
    }

    /// <summary>
    /// Gets or sets a semicolon separated list of telemetry types that should be sampled.
    /// If left empty all types are included implicitly.
    /// Types are not included if they are set in ExcludedTypes.
    /// </summary>
    public string IncludedTypes
    {
        get { return this.samplingProcessor.IncludedTypes; }
        set { this.samplingProcessor.IncludedTypes = value; }
    }

    /// <summary>
    /// Gets or sets initial sampling percentage applied at the start
    /// of the process to dynamically vary the percentage.
    /// </summary>
    public double InitialSamplingPercentage
    {
        get
        {
            return this.estimatorSettings.InitialSamplingPercentage;
        }

        set
        {
            // note: 'initial' percentage will affect sampling even
            // if it was running for a while
            this.estimatorSettings.InitialSamplingPercentage = value;

            // Dispose clears the estimator reference; skip the estimator update in that
            // case instead of failing with a NullReferenceException.
            var estimator = this.estimatorProcessor;
            if (estimator != null)
            {
                estimator.CurrentSamplingRate = this.estimatorSettings.EffectiveInitialSamplingRate;
            }

            this.samplingProcessor.SamplingPercentage = value;
        }
    }

    /// <summary>
    /// Gets or sets maximum rate of telemetry items per second
    /// dynamic sampling will try to adhere to.
    /// </summary>
    public double MaxTelemetryItemsPerSecond
    {
        get { return this.estimatorSettings.MaxTelemetryItemsPerSecond; }
        set { this.estimatorSettings.MaxTelemetryItemsPerSecond = value; }
    }

    /// <summary>
    /// Gets or sets minimum sampling percentage that can be set
    /// by the dynamic sampling percentage algorithm.
    /// </summary>
    public double MinSamplingPercentage
    {
        get { return this.estimatorSettings.MinSamplingPercentage; }
        set { this.estimatorSettings.MinSamplingPercentage = value; }
    }

    /// <summary>
    /// Gets or sets maximum sampling percentage that can be set
    /// by the dynamic sampling percentage algorithm.
    /// </summary>
    public double MaxSamplingPercentage
    {
        get { return this.estimatorSettings.MaxSamplingPercentage; }
        set { this.estimatorSettings.MaxSamplingPercentage = value; }
    }

    /// <summary>
    /// Gets or sets duration of the sampling percentage evaluation interval.
    /// </summary>
    public TimeSpan EvaluationInterval
    {
        get { return this.estimatorSettings.EvaluationInterval; }
        set { this.estimatorSettings.EvaluationInterval = value; }
    }

    /// <summary>
    /// Gets or sets a value indicating how long not to decrease
    /// sampling percentage after last change to prevent excessive fluctuation.
    /// </summary>
    public TimeSpan SamplingPercentageDecreaseTimeout
    {
        get { return this.estimatorSettings.SamplingPercentageDecreaseTimeout; }
        set { this.estimatorSettings.SamplingPercentageDecreaseTimeout = value; }
    }

    /// <summary>
    /// Gets or sets a value indicating how long not to increase
    /// sampling percentage after last change to prevent excessive fluctuation.
    /// </summary>
    public TimeSpan SamplingPercentageIncreaseTimeout
    {
        get { return this.estimatorSettings.SamplingPercentageIncreaseTimeout; }
        set { this.estimatorSettings.SamplingPercentageIncreaseTimeout = value; }
    }

    /// <summary>
    /// Gets or sets exponential moving average ratio (factor) applied
    /// during calculation of rate of telemetry items produced by the application.
    /// </summary>
    public double MovingAverageRatio
    {
        get { return this.estimatorSettings.MovingAverageRatio; }
        set { this.estimatorSettings.MovingAverageRatio = value; }
    }

    /// <summary>
    /// Gets sampling telemetry processor.
    /// </summary>
    internal SamplingTelemetryProcessor SamplingTelemetryProcessor => this.samplingProcessor;

    /// <summary>
    /// Gets sampling percentage estimator telemetry processor (null after disposal).
    /// </summary>
    internal SamplingPercentageEstimatorTelemetryProcessor SamplingPercentageEstimatorTelemetryProcessor => this.estimatorProcessor;

    /// <summary>
    /// Processes telemetry item by handing it to the fixed-rate sampling processor.
    /// </summary>
    /// <param name="item">Telemetry item to process.</param>
    public void Process(ITelemetry item)
    {
        this.samplingProcessor.Process(item);
    }

    /// <summary>
    /// Disposes the object.
    /// </summary>
    public void Dispose()
    {
        this.Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Disposes the estimator processor owned by this instance. Repeated calls are no-ops.
    /// </summary>
    /// <param name="disposing">True if disposing.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (!disposing)
        {
            return;
        }

        IDisposable estimator = this.estimatorProcessor;
        if (estimator != null)
        {
            estimator.Dispose();
            this.estimatorProcessor = null;
        }
    }

    /// <summary>
    /// Handles a sampling percentage evaluation reported by the estimator processor:
    /// applies a changed percentage to the fixed-rate sampler, logs the change and
    /// forwards all arguments to the user supplied callback, if any.
    /// </summary>
    /// <param name="afterSamplingTelemetryItemRatePerSecond">Rate of telemetry items per second after sampling.</param>
    /// <param name="currentSamplingPercentage">Sampling percentage in effect at evaluation time.</param>
    /// <param name="newSamplingPercentage">Sampling percentage suggested by the evaluation.</param>
    /// <param name="isSamplingPercentageChanged">True if the suggested percentage should be applied.</param>
    /// <param name="settings">Estimator settings used for the evaluation.</param>
    private void SamplingPercentageChanged(
        double afterSamplingTelemetryItemRatePerSecond,
        double currentSamplingPercentage,
        double newSamplingPercentage,
        bool isSamplingPercentageChanged,
        Channel.Implementation.SamplingPercentageEstimatorSettings settings)
    {
        if (isSamplingPercentageChanged)
        {
            this.samplingProcessor.SamplingPercentage = newSamplingPercentage;

            // The estimator field is unassigned while the estimator is still being constructed
            // and is cleared by Dispose; read it once and skip the proactive update when absent.
            var estimator = this.estimatorProcessor;
            if (estimator != null)
            {
                this.samplingProcessor.ProactiveSamplingPercentage = 100 / estimator.CurrentProactiveSamplingRate;
            }

            TelemetryChannelEventSource.Log.SamplingChanged(newSamplingPercentage);
        }

        this.evaluationCallback?.Invoke(
            afterSamplingTelemetryItemRatePerSecond,
            currentSamplingPercentage,
            newSamplingPercentage,
            isSamplingPercentageChanged,
            settings);
    }
}
}