From 0b879c424291f70b8212a888ef3f712209521c04 Mon Sep 17 00:00:00 2001 From: Gary Roumanis Date: Tue, 22 Nov 2022 11:59:08 -0800 Subject: [PATCH] Report and shutdown after file watch errors (#3411) * Report and shutdown after file watch errors * change error codes --- build_daemon/CHANGELOG.md | 5 + build_daemon/lib/constants.dart | 6 + build_daemon/lib/daemon.dart | 15 +-- .../lib/src/fakes/fake_change_provider.dart | 5 +- build_daemon/lib/src/server.dart | 32 +++--- build_daemon/pubspec.yaml | 2 +- build_daemon/test/daemon_test.dart | 107 +++++++++++------- 7 files changed, 102 insertions(+), 70 deletions(-) diff --git a/build_daemon/CHANGELOG.md b/build_daemon/CHANGELOG.md index 3c00f12d4..7b71ca9c6 100644 --- a/build_daemon/CHANGELOG.md +++ b/build_daemon/CHANGELOG.md @@ -1,3 +1,8 @@ +## 3.1.1-dev + +- Report file watching errors and stop the daemon. + + ## 3.1.0 - Add `BuildResults.changedAssets` containing asset URIs changed during a diff --git a/build_daemon/lib/constants.dart b/build_daemon/lib/constants.dart index 94990782a..0e8e7e9fd 100644 --- a/build_daemon/lib/constants.dart +++ b/build_daemon/lib/constants.dart @@ -12,6 +12,12 @@ const optionsSkew = 'DIFFERENT OPTIONS'; const buildModeFlag = 'build-mode'; +/// Daemon shuts down after this timeout if there is no active connection. +const defaultIdleTimeout = Duration(seconds: 30); + +const fileChangeEventErrorCode = 101; +const fileChangeStreamClosedErrorCode = 102; + enum BuildMode { // ignore: constant_identifier_names Manual, diff --git a/build_daemon/lib/daemon.dart b/build_daemon/lib/daemon.dart index a04ee3d29..b188d0e09 100644 --- a/build_daemon/lib/daemon.dart +++ b/build_daemon/lib/daemon.dart @@ -25,7 +25,7 @@ import 'src/server.dart'; class Daemon { final String _workingDirectory; final RandomAccessFile? _lock; - final _doneCompleter = Completer(); + final _doneCompleter = Completer(); Server? _server; StreamSubscription? _sub; @@ -34,7 +34,8 @@ class Daemon { : _workingDirectory = workingDirectory, _lock = _tryGetLock(workingDirectory); - Future get onDone => _doneCompleter.future; + /// Returns exit code. + Future get onDone => _doneCompleter.future; Future stop({String message = '', int failureType = 0}) => _server!.stop(message: message, failureType: failureType); @@ -65,7 +66,7 @@ class Daemon { ChangeProvider changeProvider, { Serializers? serializersOverride, bool Function(BuildTarget, Iterable)? shouldBuild, - Duration timeout = const Duration(seconds: 30), + Duration timeout = defaultIdleTimeout, }) async { if (_server != null || _lock == null) return; _handleGracefulExit(); @@ -83,12 +84,12 @@ class Daemon { var port = await server.listen(); _createPortFile(port); - unawaited(server.onDone.then((_) async { - await _cleanUp(); + unawaited(server.onDone.then((exitCode) async { + await _cleanUp(exitCode); })); } - Future _cleanUp() async { + Future _cleanUp(int exitCode) async { await _server?.stop(); await _sub?.cancel(); // We need to close the lock prior to deleting the file. @@ -97,7 +98,7 @@ class Daemon { if (workspace.existsSync()) { workspace.deleteSync(recursive: true); } - if (!_doneCompleter.isCompleted) _doneCompleter.complete(); + if (!_doneCompleter.isCompleted) _doneCompleter.complete(exitCode); } void _createPortFile(int port) => diff --git a/build_daemon/lib/src/fakes/fake_change_provider.dart b/build_daemon/lib/src/fakes/fake_change_provider.dart index a9b0059fa..d95f29491 100644 --- a/build_daemon/lib/src/fakes/fake_change_provider.dart +++ b/build_daemon/lib/src/fakes/fake_change_provider.dart @@ -1,16 +1,15 @@ // Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. - import 'dart:async'; import 'package:build_daemon/change_provider.dart'; import 'package:watcher/watcher.dart' show WatchEvent; class FakeChangeProvider implements ChangeProvider { + final changeStreamController = StreamController>(); @override - Stream> get changes => Stream.empty(); - + Stream> get changes => changeStreamController.stream; @override Future> collectChanges() async => []; } diff --git a/build_daemon/lib/src/server.dart b/build_daemon/lib/src/server.dart index 12c27d6ec..cc01b1ae6 100644 --- a/build_daemon/lib/src/server.dart +++ b/build_daemon/lib/src/server.dart @@ -1,7 +1,6 @@ // Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. - import 'dart:async'; import 'dart:convert'; import 'dart:io'; @@ -17,6 +16,7 @@ import 'package:watcher/watcher.dart'; import 'package:web_socket_channel/web_socket_channel.dart'; import '../change_provider.dart'; +import '../constants.dart'; import '../daemon_builder.dart'; import '../data/build_request.dart'; import '../data/build_target.dart'; @@ -31,24 +31,19 @@ import 'managers/build_target_manager.dart'; /// Note the server will only notify clients of pertinent events. class Server { static final loggerName = 'BuildDaemonServer'; - - final _isDoneCompleter = Completer(); + final _isDoneCompleter = Completer(); final BuildTargetManager _buildTargetManager; final _pool = Pool(1); final Serializers _serializers; final ChangeProvider _changeProvider; late final Timer _timeout; - HttpServer? _server; final DaemonBuilder _builder; // Channels that are interested in the current build. var _interestedChannels = {}; - final _subs = []; - final _outputStreamController = StreamController(); late final Stream _logs; - Server( this._builder, Duration timeout, @@ -61,9 +56,7 @@ class Server { BuildTargetManager(shouldBuildOverride: shouldBuild) { _logs = _outputStreamController.stream; _forwardData(); - _handleChanges(changeProvider.changes); - // Stop the server if nobody connects. _timeout = Timer(timeout, () async { if (_buildTargetManager.isEmpty) { @@ -72,7 +65,8 @@ class Server { }); } - Future get onDone => _isDoneCompleter.future; + /// Returns exit code. + Future get onDone => _isDoneCompleter.future; /// Starts listening for build daemon clients. Future listen() async { @@ -98,7 +92,6 @@ class Server { _removeChannel(channel); }); }); - var server = _server = await HttpMultiServer.loopback(0); // Serve requests in an error zone to prevent failures // when running from another error zone. @@ -124,7 +117,7 @@ class Server { await sub.cancel(); } await _outputStreamController.close(); - if (!_isDoneCompleter.isCompleted) _isDoneCompleter.complete(); + if (!_isDoneCompleter.isCompleted) _isDoneCompleter.complete(failureType); } Future _build( @@ -134,7 +127,6 @@ class Server { buildTargets.expand(_buildTargetManager.channels).toSet(); return _builder.build(buildTargets, changes); }); - void _forwardData() { _subs ..add(_builder.logs.listen((log) { @@ -147,14 +139,11 @@ class Server { // Don't serialize or send changed assets if the client isn't interested // in them. String? message, messageWithoutChangedAssets; - for (var channel in _interestedChannels) { var targets = _buildTargetManager.targetsFor(channel); var wantsChangedAssets = targets .any((e) => e is DefaultBuildTarget && e.reportChangedAssets); - String messageForChannel; - if (wantsChangedAssets) { messageForChannel = message ??= jsonEncode(_serializers.serialize(status)); @@ -163,7 +152,6 @@ class Server { _serializers .serialize(status.rebuild((b) => b.changedAssets = null))); } - channel.sink.add(messageForChannel); } })) @@ -183,7 +171,15 @@ class Server { var buildTargets = _buildTargetManager.targetsForChanges(changes); if (buildTargets.isEmpty) return; await _build(buildTargets, changes); - }).listen((_) {})); + }).listen((_) {}, onError: (e) { + stop( + message: 'Error in file change event: $e', + failureType: fileChangeEventErrorCode); + }, onDone: () { + stop( + message: 'File change stream closed', + failureType: fileChangeStreamClosedErrorCode); + })); } void _removeChannel(WebSocketChannel channel) async { diff --git a/build_daemon/pubspec.yaml b/build_daemon/pubspec.yaml index 4b4c2164d..52e4d69a5 100644 --- a/build_daemon/pubspec.yaml +++ b/build_daemon/pubspec.yaml @@ -1,5 +1,5 @@ name: build_daemon -version: 3.1.0 +version: 3.1.1-dev description: A daemon for running Dart builds. repository: https://github.com/dart-lang/build/tree/master/build_daemon diff --git a/build_daemon/test/daemon_test.dart b/build_daemon/test/daemon_test.dart index 866e3e3f2..e42214290 100644 --- a/build_daemon/test/daemon_test.dart +++ b/build_daemon/test/daemon_test.dart @@ -1,10 +1,10 @@ // Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. - @OnPlatform({ 'windows': Skip('Directories cant be deleted while processes are still open') }) +import 'dart:async'; import 'dart:convert'; import 'dart:io'; import 'dart:isolate'; @@ -17,17 +17,16 @@ import 'package:test/test.dart'; import 'package:test_descriptor/test_descriptor.dart' as d; import 'package:uuid/uuid.dart'; +final defaultIdleTimeoutSec = defaultIdleTimeout.inSeconds; void main() { var testDaemons = []; var testWorkspaces = []; var uuid = Uuid(); - group('Daemon', () { setUp(() { testDaemons.clear(); testWorkspaces.clear(); }); - tearDown(() async { for (var testDaemon in testDaemons) { testDaemon.kill(ProcessSignal.sigkill); @@ -39,7 +38,6 @@ void main() { } } }); - test('can be stopped', () async { var workspace = uuid.v1(); testWorkspaces.add(workspace); @@ -52,32 +50,29 @@ void main() { expect(daemon.onDone, completes); await daemon.stop(); }); - test('can run if no other daemon is running', () async { var workspace = uuid.v1(); var daemon = await _runDaemon(workspace); testDaemons.add(daemon); expect(await _statusOf(daemon), 'RUNNING'); }); - test('shuts down if no client connects', () async { var workspace = uuid.v1(); var daemon = await _runDaemon(workspace, timeout: 1); testDaemons.add(daemon); expect(await daemon.exitCode, isNotNull); }); - test('can not run if another daemon is running in the same workspace', () async { var workspace = uuid.v1(); testWorkspaces.add(workspace); - var daemonOne = await _runDaemon(workspace); - expect(await _statusOf(daemonOne), 'RUNNING'); + var daemonOne = + await _runDaemon(workspace, timeout: defaultIdleTimeoutSec * 2); + expect(await _statusOf(daemonOne, logPrefix: 'one'), 'RUNNING'); var daemonTwo = await _runDaemon(workspace); testDaemons.addAll([daemonOne, daemonTwo]); - expect(await _statusOf(daemonTwo), 'ALREADY RUNNING'); - }); - + expect(await _statusOf(daemonTwo, logPrefix: 'two'), 'ALREADY RUNNING'); + }, timeout: Timeout.factor(2)); test('can run if another daemon is running in a different workspace', () async { var workspace1 = uuid.v1(); @@ -89,17 +84,15 @@ void main() { testDaemons.addAll([daemonOne, daemonTwo]); expect(await _statusOf(daemonTwo), 'RUNNING'); }, timeout: Timeout.factor(2)); - test('can start two daemons at the same time', () async { var workspace = uuid.v1(); testWorkspaces.add(workspace); var daemonOne = await _runDaemon(workspace); + expect(await _statusOf(daemonOne), 'RUNNING'); var daemonTwo = await _runDaemon(workspace); - expect([await _statusOf(daemonOne), await _statusOf(daemonTwo)], - containsAll(['RUNNING', 'ALREADY RUNNING'])); + expect(await _statusOf(daemonTwo), 'ALREADY RUNNING'); testDaemons.addAll([daemonOne, daemonTwo]); }, timeout: Timeout.factor(2)); - test('logs the version when running', () async { var workspace = uuid.v1(); testWorkspaces.add(workspace); @@ -108,13 +101,11 @@ void main() { expect(await _statusOf(daemon), 'RUNNING'); expect(await Daemon(workspace).runningVersion(), currentVersion); }); - test('does not set the current version if not running', () async { var workspace = uuid.v1(); testWorkspaces.add(workspace); expect(await Daemon(workspace).runningVersion(), null); }); - test('logs the options when running', () async { var workspace = uuid.v1(); testWorkspaces.add(workspace); @@ -124,13 +115,11 @@ void main() { expect( (await Daemon(workspace).currentOptions()).contains('foo'), isTrue); }); - test('does not log the options if not running', () async { var workspace = uuid.v1(); testWorkspaces.add(workspace); expect((await Daemon(workspace).currentOptions()).isEmpty, isTrue); }); - test('cleans up after itself', () async { var workspace = uuid.v1(); testWorkspaces.add(workspace); @@ -143,50 +132,88 @@ void main() { await daemon.exitCode; expect(Directory(daemonWorkspace(workspace)).existsSync(), isFalse); }); + test('daemon stops after file changes stream has error', () async { + var workspace = uuid.v1(); + testWorkspaces.add(workspace); + var daemon = + await _runDaemon(workspace, errorChangeProviderAfterNSeconds: 1); + expect(await _statusOf(daemon), 'RUNNING'); + await daemon.exitCode; + expect(Directory(daemonWorkspace(workspace)).existsSync(), isFalse); + }); + test('daemon stops after file changes stream is closed', () async { + var workspace = uuid.v1(); + testWorkspaces.add(workspace); + var daemon = + await _runDaemon(workspace, closeChangeProviderAfterNSeconds: 1); + expect(await _statusOf(daemon), 'RUNNING'); + await daemon.exitCode; + expect(Directory(daemonWorkspace(workspace)).existsSync(), isFalse); + }); }); } /// Returns the daemon status. -/// -/// If the status is null, the stderr ir returned. -Future _statusOf(Process daemon) async { - String? status; - try { - status = await daemon.stdout - .transform(utf8.decoder) - .transform(const LineSplitter()) - .firstWhere((line) => line == 'RUNNING' || line == 'ALREADY RUNNING'); - } catch (_) {} - status ??= (await daemon.stderr - .transform(utf8.decoder) - .transform(const LineSplitter()) - .toList()) - .join('\n'); - return status; +Future _statusOf(Process daemon, {String logPrefix = ''}) async { + final statusCompleter = Completer(); + daemon.stdout + .transform(utf8.decoder) + .transform(const LineSplitter()) + .listen((line) { + print('$logPrefix [STDOUT] $line'); + if (!statusCompleter.isCompleted && + (line == 'RUNNING' || line == 'ALREADY RUNNING')) { + statusCompleter.complete(line); + } + }); + daemon.stderr + .transform(utf8.decoder) + .transform(const LineSplitter()) + .listen((line) { + print('$logPrefix [STDERR] $line'); + }); + return statusCompleter.future; } -Future _runDaemon(var workspace, {int timeout = 30}) async { +Future _runDaemon(String workspace, + {int timeout = -1, + int errorChangeProviderAfterNSeconds = -1, + int closeChangeProviderAfterNSeconds = -1}) async { + timeout = timeout > 0 ? timeout : defaultIdleTimeoutSec; await d.file('test.dart', ''' + // @dart=2.9 import 'package:build_daemon/daemon.dart'; import 'package:build_daemon/daemon_builder.dart'; import 'package:build_daemon/client.dart'; import 'package:build_daemon/src/fakes/fake_builder.dart'; import 'package:build_daemon/src/fakes/fake_change_provider.dart'; - main() async { var options = ['foo'].toSet(); var timeout = Duration(seconds: $timeout); var daemon = Daemon('$workspace'); + var changeProvider = FakeChangeProvider(); if(daemon.hasLock) { await daemon.start( options, FakeDaemonBuilder(), - FakeChangeProvider(), + changeProvider, timeout: timeout); // Real implementations of the daemon usually have // non-trivial set up time. await Future.delayed(Duration(seconds: 1)); print('RUNNING'); + if ($errorChangeProviderAfterNSeconds > 0) { + Future.delayed(Duration(seconds: $errorChangeProviderAfterNSeconds), + () { + changeProvider.changeStreamController.addError('ERROR'); + }); + } + if ($closeChangeProviderAfterNSeconds > 0) { + Future.delayed(Duration(seconds: $closeChangeProviderAfterNSeconds), + () { + changeProvider.changeStreamController.close(); + }); + } }else{ // Mimic the behavior of actual daemon implementations. var version = await daemon.runningVersion(); @@ -195,7 +222,6 @@ Future _runDaemon(var workspace, {int timeout = 30}) async { } } ''').create(); - var args = [ ...Platform.executableArguments, if (!Platform.executableArguments @@ -205,6 +231,5 @@ Future _runDaemon(var workspace, {int timeout = 30}) async { ]; var process = await Process.start(Platform.resolvedExecutable, args, workingDirectory: d.sandbox); - return process; }