Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close camera streams on robot close and when there are no more listeners #122

Merged
merged 4 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 32 additions & 9 deletions lib/src/media/stream/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class StreamManager {
if (_clients.containsKey(sanitizedName)) {
return _clients[sanitizedName]!;
}
final client = StreamClient(name, _remove);
final client = StreamClient(name, _add, _removeStream);
_clients[sanitizedName] = client;

if (_streams.containsKey(sanitizedName)) {
Expand Down Expand Up @@ -111,47 +111,70 @@ class StreamManager {
void _removeClient(String name) {
final sanitizedName = _getValidSDPTrackName(name);
if (_clients.containsKey(sanitizedName)) {
_clients.remove(sanitizedName)!;
final client = _clients.remove(sanitizedName)!;
client._shutdown();
_logger.d('Removed StreamClient named $name');
}
}

Future<void> closeAll() async {
final futures = <Future>[];
final keys = _streams.keys.toSet()..addAll(_clients.keys);
for (var name in keys) {
clintpurser marked this conversation as resolved.
Show resolved Hide resolved
futures.add(_remove(name));
}
await Future.wait(futures);
}
}
clintpurser marked this conversation as resolved.
Show resolved Hide resolved

/// A client to manage a camera's WebRTC stream.
///
/// Use the [getStream] method to obtain a stream of [MediaStream] that can be used to display WebRTC video.
class StreamClient {
final String name;
final Future<void> Function(String name) _open;
final Future<void> Function(String name) _close;
MediaStream? _stream;
late StreamSubscription<MediaStream> _internalListener;

// ignore: close_sinks
final StreamController<MediaStream> _internalStreamController = StreamController<MediaStream>.broadcast();

// ignore: close_sinks
final StreamController<MediaStream> _streamController = StreamController<MediaStream>.broadcast();
final StreamController<MediaStream?> _streamController = StreamController<MediaStream?>.broadcast();

StreamClient(this.name, this._close) {
_internalStreamController.stream.listen((event) {
StreamClient(this.name, this._open, this._close) {
_internalListener = _internalStreamController.stream.listen((event) {
_stream = event;
_streamController.add(event);
});
}

/// Return a stream of [MediaStream], which can be used to display WebRTC video.
Stream<MediaStream> getStream() {
Stream<MediaStream?> getStream() {
_internalListener.resume();
if (_stream != null) {
Future.delayed(const Duration(milliseconds: 100), () {
clintpurser marked this conversation as resolved.
Show resolved Hide resolved
_streamController.add(_stream!);
});
} else {
_open(name);
}
return _streamController.stream;
}

/// Close the stream connection and release resources.
Future<void> closeStream() async {
Future<void> _shutdown() async {
await _streamController.close();
await _internalStreamController.close();
await _close(name);
}

/// Close the stream connection and release resources.
Future<void> closeStream() async {
if (!_streamController.hasListener) {
_internalListener.pause();
await _close(name);
_stream = null;
_streamController.add(null);
}
}
}
1 change: 1 addition & 0 deletions lib/src/robot/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ class RobotClient {
try {
_checkConnectionTask?.cancel();
_shouldAttemptReconnection = false;
await _streamManager.closeAll();
_sessionsClient.stop();
await _channel.shutdown();
} catch (e) {
Expand Down
6 changes: 3 additions & 3 deletions lib/widgets/camera_stream.dart
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class ViamCameraStreamView extends StatefulWidget {

class _ViamCameraStreamViewState extends State<ViamCameraStreamView> {
late RTCVideoRenderer _renderer;
late StreamSubscription<MediaStream> _streamSub;
late StreamSubscription<MediaStream?> _streamSub;
Exception? _error;
int _width = 160;
int _height = 90;
Expand All @@ -34,10 +34,10 @@ class _ViamCameraStreamViewState extends State<ViamCameraStreamView> {

@override
void deactivate() {
super.deactivate();
_renderer.dispose();
widget.streamClient.closeStream();
_streamSub.cancel();
widget.streamClient.closeStream();
super.deactivate();
}

Future<void> _startStream() async {
Expand Down