diff --git a/lib/src/media/stream/client.dart b/lib/src/media/stream/client.dart index 4760855e266..7112f4502e2 100644 --- a/lib/src/media/stream/client.dart +++ b/lib/src/media/stream/client.dart @@ -75,7 +75,7 @@ class StreamManager { if (_clients.containsKey(sanitizedName)) { return _clients[sanitizedName]!; } - final client = StreamClient(name, _remove); + final client = StreamClient(name, _add, _removeStream); _clients[sanitizedName] = client; if (_streams.containsKey(sanitizedName)) { @@ -111,10 +111,20 @@ class StreamManager { void _removeClient(String name) { final sanitizedName = _getValidSDPTrackName(name); if (_clients.containsKey(sanitizedName)) { - _clients.remove(sanitizedName)!; + final client = _clients.remove(sanitizedName)!; + client._shutdown(); _logger.d('Removed StreamClient named $name'); } } + + Future closeAll() async { + final futures = []; + final keys = _streams.keys.toSet()..addAll(_clients.keys); + for (final name in keys) { + futures.add(_remove(name)); + } + await Future.wait(futures); + } } /// A client to manage a camera's WebRTC stream. @@ -122,36 +132,51 @@ class StreamManager { /// Use the [getStream] method to obtain a stream of [MediaStream] that can be used to display WebRTC video. class StreamClient { final String name; + final Future Function(String name) _open; final Future Function(String name) _close; MediaStream? _stream; + late StreamSubscription _internalListener; // ignore: close_sinks final StreamController _internalStreamController = StreamController.broadcast(); // ignore: close_sinks - final StreamController _streamController = StreamController.broadcast(); + final StreamController _streamController = StreamController.broadcast(); - StreamClient(this.name, this._close) { - _internalStreamController.stream.listen((event) { + StreamClient(this.name, this._open, this._close) { + _internalListener = _internalStreamController.stream.listen((event) { _stream = event; _streamController.add(event); }); } /// Return a stream of [MediaStream], which can be used to display WebRTC video. - Stream getStream() { - if (_stream != null) { - Future.delayed(const Duration(milliseconds: 100), () { - _streamController.add(_stream!); - }); + Stream getStream() { + if (_internalListener.isPaused) { + _internalListener.resume(); } + Future.microtask(() { + if (_stream != null) { + _streamController.add(_stream!); + } else { + _open(name); + } + }); return _streamController.stream; } - /// Close the stream connection and release resources. - Future closeStream() async { + Future _shutdown() async { await _streamController.close(); await _internalStreamController.close(); - await _close(name); + } + + /// Close the stream connection and release resources. + Future closeStream() async { + if (!_streamController.hasListener) { + _internalListener.pause(); + await _close(name); + _stream = null; + _streamController.add(null); + } } } diff --git a/lib/src/robot/client.dart b/lib/src/robot/client.dart index a529aaedbec..a3f28dd4bd6 100644 --- a/lib/src/robot/client.dart +++ b/lib/src/robot/client.dart @@ -205,6 +205,7 @@ class RobotClient { try { _checkConnectionTask?.cancel(); _shouldAttemptReconnection = false; + await _streamManager.closeAll(); _sessionsClient.stop(); await _channel.shutdown(); } catch (e) { diff --git a/lib/widgets/camera_stream.dart b/lib/widgets/camera_stream.dart index 945e7db7a26..d61d04d78be 100644 --- a/lib/widgets/camera_stream.dart +++ b/lib/widgets/camera_stream.dart @@ -21,23 +21,23 @@ class ViamCameraStreamView extends StatefulWidget { class _ViamCameraStreamViewState extends State { late RTCVideoRenderer _renderer; - late StreamSubscription _streamSub; + late StreamSubscription _streamSub; Exception? _error; int _width = 160; int _height = 90; @override void initState() { - _startStream(); super.initState(); + _startStream(); } @override - void deactivate() { - super.deactivate(); + void dispose() { _renderer.dispose(); - widget.streamClient.closeStream(); _streamSub.cancel(); + widget.streamClient.closeStream(); + super.dispose(); } Future _startStream() async {