-
-
Notifications
You must be signed in to change notification settings - Fork 239
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add
maxQueueSize
to limit the number of unawaited events sent to Se…
…ntry (#1868) * introduce task queue * handle trow in task * handle throwing tasks * Add documentation * add changelog entry --------- Co-authored-by: Giancarlo Buenaflor <giancarlo_buenaflor@yahoo.com>
- Loading branch information
Showing
5 changed files
with
171 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
import 'dart:async'; | ||
|
||
import '../../sentry.dart'; | ||
|
||
typedef Task<T> = Future<T> Function(); | ||
|
||
class TaskQueue<T> { | ||
TaskQueue(this._maxQueueSize, this._logger); | ||
|
||
final int _maxQueueSize; | ||
final SentryLogger _logger; | ||
|
||
int _queueCount = 0; | ||
|
||
Future<T> enqueue(Task<T> task, T fallbackResult) async { | ||
if (_queueCount >= _maxQueueSize) { | ||
_logger(SentryLevel.warning, | ||
'Task dropped due to backpressure. Avoid capturing in a tight loop.'); | ||
return fallbackResult; | ||
} else { | ||
_queueCount++; | ||
try { | ||
return await task(); | ||
} finally { | ||
_queueCount--; | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
import 'dart:async'; | ||
|
||
import 'package:sentry/sentry.dart'; | ||
import 'package:sentry/src/transport/task_queue.dart'; | ||
import 'package:test/test.dart'; | ||
|
||
import '../mocks.dart'; | ||
|
||
void main() { | ||
group("called sync", () { | ||
late Fixture fixture; | ||
|
||
setUp(() { | ||
fixture = Fixture(); | ||
}); | ||
|
||
test("enqueue only executed `maxQueueSize` times when not awaiting", | ||
() async { | ||
final sut = fixture.getSut(maxQueueSize: 5); | ||
|
||
var completedTasks = 0; | ||
|
||
for (int i = 0; i < 10; i++) { | ||
unawaited(sut.enqueue(() async { | ||
print('Task $i'); | ||
await Future.delayed(Duration(milliseconds: 1)); | ||
completedTasks += 1; | ||
return 1 + 1; | ||
}, -1)); | ||
} | ||
|
||
// This will always await the other futures, even if they are running longer, as it was scheduled after them. | ||
print('Started waiting for first 5 tasks'); | ||
await Future.delayed(Duration(milliseconds: 1)); | ||
print('Stopped waiting for first 5 tasks'); | ||
|
||
expect(completedTasks, 5); | ||
}); | ||
|
||
test("enqueue picks up tasks again after await in-between", () async { | ||
final sut = fixture.getSut(maxQueueSize: 5); | ||
|
||
var completedTasks = 0; | ||
|
||
for (int i = 1; i <= 10; i++) { | ||
unawaited(sut.enqueue(() async { | ||
print('Started task $i'); | ||
await Future.delayed(Duration(milliseconds: 1)); | ||
print('Completed task $i'); | ||
completedTasks += 1; | ||
return 1 + 1; | ||
}, -1)); | ||
} | ||
|
||
print('Started waiting for first 5 tasks'); | ||
await Future.delayed(Duration(milliseconds: 1)); | ||
print('Stopped waiting for first 5 tasks'); | ||
|
||
for (int i = 6; i <= 15; i++) { | ||
unawaited(sut.enqueue(() async { | ||
print('Started task $i'); | ||
await Future.delayed(Duration(milliseconds: 1)); | ||
print('Completed task $i'); | ||
completedTasks += 1; | ||
return 1 + 1; | ||
}, -1)); | ||
} | ||
|
||
print('Started waiting for second 5 tasks'); | ||
await Future.delayed(Duration(milliseconds: 5)); | ||
print('Stopped waiting for second 5 tasks'); | ||
|
||
expect(completedTasks, 10); // 10 were dropped | ||
}); | ||
|
||
test("enqueue executes all tasks when awaiting", () async { | ||
final sut = fixture.getSut(maxQueueSize: 5); | ||
|
||
var completedTasks = 0; | ||
|
||
for (int i = 0; i < 10; i++) { | ||
await sut.enqueue(() async { | ||
print('Task $i'); | ||
await Future.delayed(Duration(milliseconds: 1)); | ||
completedTasks += 1; | ||
return 1 + 1; | ||
}, -1); | ||
} | ||
expect(completedTasks, 10); | ||
}); | ||
|
||
test("throwing tasks still execute as expected", () async { | ||
final sut = fixture.getSut(maxQueueSize: 5); | ||
|
||
var completedTasks = 0; | ||
|
||
for (int i = 0; i < 10; i++) { | ||
try { | ||
await sut.enqueue(() async { | ||
completedTasks += 1; | ||
throw Error(); | ||
}, -1); | ||
} catch (_) { | ||
// Ignore | ||
} | ||
} | ||
expect(completedTasks, 10); | ||
}); | ||
}); | ||
} | ||
|
||
class Fixture { | ||
final options = SentryOptions(dsn: fakeDsn); | ||
|
||
TaskQueue<int> getSut({required int maxQueueSize}) { | ||
return TaskQueue(maxQueueSize, options.logger); | ||
} | ||
} |