diff --git a/lib/widgets/mjpeg.dart b/lib/widgets/mjpeg.dart index 17b45a76..d2d9d84d 100644 --- a/lib/widgets/mjpeg.dart +++ b/lib/widgets/mjpeg.dart @@ -38,46 +38,25 @@ class MjpegPreprocessor { /// An Mjpeg. class Mjpeg extends HookWidget { - final String stream; + final streamKey = UniqueKey(); + final MjpegStreamState mjpegStream; final BoxFit? fit; final double? width; final double? height; - final bool isLive; - final Duration timeout; final WidgetBuilder? loading; - final Client? httpClient; final Widget Function(BuildContext contet, dynamic error, dynamic stack)? error; - final Map headers; - final MjpegPreprocessor? preprocessor; - - MemoryImage? previousImage; - - late _StreamManager _manager; Mjpeg({ - this.httpClient, - this.isLive = false, + required this.mjpegStream, this.width, - this.timeout = const Duration(seconds: 5), this.height, this.fit, - required this.stream, this.error, this.loading, - this.headers = const {}, - this.preprocessor, super.key, }); - Future cancelSubscription() async { - await _manager.cancelSubscription(); - } - - Future dispose() async { - await _manager.dispose(); - } - @override Widget build(BuildContext context) { final image = useState(null); @@ -87,31 +66,27 @@ class Mjpeg extends HookWidget { isMounted() => context.mounted; final manager = useMemoized( - () => _manager = _StreamManager( - stream, - isLive && visible.visible, - headers, - timeout, - httpClient ?? Client(), - preprocessor ?? MjpegPreprocessor(), - isMounted, - () => visible.visible, + () => _StreamManager( + mjpegStream: mjpegStream, + mounted: isMounted, + visible: () => visible.visible, ), [ - stream, - isLive, visible.visible, - timeout, - httpClient, - preprocessor, - isMounted() + isMounted(), ]); + final key = useMemoized(() => UniqueKey(), [manager]); useEffect(() { errorState.value = null; - manager.updateStream(image, errorState); - return manager.dispose; + manager.updateStream(streamKey, image, errorState); + return () { + if (visible.visible && isMounted()) { + return; + } + mjpegStream.cancelSubscription(streamKey); + }; }, [manager]); if (errorState.value != null && kDebugMode) { @@ -133,7 +108,7 @@ class Mjpeg extends HookWidget { ); } - if ((image.value == null && previousImage == null) || + if ((image.value == null && mjpegStream.previousImage == null) || errorState.value != null) { return SizedBox( width: width, @@ -143,15 +118,10 @@ class Mjpeg extends HookWidget { : loading!(context)); } - if (image.value != null) { - previousImage?.evict(); - previousImage = image.value!; - } - return VisibilityDetector( key: key, child: Image( - image: image.value ?? previousImage!, + image: image.value ?? mjpegStream.previousImage!, width: width, height: height, gaplessPlayback: true, @@ -166,136 +136,226 @@ class Mjpeg extends HookWidget { } } -class _StreamManager { +class MjpegStreamState { static const _trigger = 0xFF; static const _soi = 0xD8; static const _eoi = 0xD9; final String stream; final bool isLive; - final Duration _timeout; + final Duration timeout; final Map headers; - final Client _httpClient; - final MjpegPreprocessor _preprocessor; - final bool Function() _mounted; - final bool Function() _visible; - StreamSubscription? _subscription; - - _StreamManager(this.stream, this.isLive, this.headers, this._timeout, - this._httpClient, this._preprocessor, this._mounted, this._visible); - - Future cancelSubscription() async { - if (_subscription != null) { - await _subscription!.cancel(); - _subscription = null; + Client httpClient = Client(); + Stream>? byteStream; + + final MjpegPreprocessor? preprocessor; + + MemoryImage? previousImage; + + final Map _subscriptions = {}; + + StreamSubscription? _bitSubscription; + int bitCount = 0; + double bandwidth = 0.0; + + late final Timer bandwidthTimer; + + MjpegStreamState({ + required this.stream, + this.isLive = true, + this.timeout = const Duration(seconds: 5), + this.headers = const {}, + this.preprocessor, + }) { + bandwidthTimer = Timer.periodic(const Duration(seconds: 1), (timer) { + bandwidth = bitCount / 1e6; + + bitCount = 0; + }); + } + + void dispose() { + for (StreamSubscription subscription in _subscriptions.values) { + subscription.cancel(); } + _subscriptions.clear(); + _bitSubscription?.cancel(); + _bitSubscription = null; + byteStream = null; + httpClient.close(); + httpClient = Client(); + bitCount = 0; } - Future dispose() async { - try { - _httpClient.close(); - } finally { - await _subscription?.cancel(); - _subscription = null; + void cancelSubscription(Key key) { + if (_subscriptions.containsKey(key)) { + _subscriptions.remove(key)!.cancel(); + + if (_subscriptions.isEmpty) { + dispose(); + } } } - void _sendImage(ValueNotifier image, - ValueNotifier errorState, List chunks) async { + void sendImage( + ValueNotifier image, + ValueNotifier errorState, + List chunks, { + required bool Function() mounted, + }) async { // pass image through preprocessor sending to [Image] for rendering - final List? imageData = _preprocessor.process(chunks); + final List? imageData; + + if (preprocessor != null) { + imageData = preprocessor?.process(chunks); + } else { + imageData = chunks; + } + if (imageData == null) return; final imageMemory = MemoryImage(Uint8List.fromList(imageData)); - if (_mounted()) { + previousImage?.evict(); + previousImage = imageMemory; + if (mounted()) { errorState.value = null; image.value = imageMemory; } } - void updateStream(ValueNotifier image, - ValueNotifier?> errorState) async { - if (!_visible() || !_mounted()) { - await dispose(); - return; + void _onDataReceived({ + required List carry, + required List chunk, + required ValueNotifier image, + required ValueNotifier?> errorState, + required bool Function() mounted, + }) async { + if (carry.isNotEmpty && carry.last == _trigger) { + if (chunk.first == _eoi) { + carry.add(chunk.first); + sendImage(image, errorState, carry, mounted: mounted); + carry = []; + if (!isLive) { + dispose(); + } + } } - try { - final request = Request('GET', Uri.parse(stream)); - request.headers.addAll(headers); - final response = await _httpClient.send(request).timeout( - _timeout); //timeout is to prevent process to hang forever in some case - - if (response.statusCode >= 200 && response.statusCode < 300) { - var carry = []; - _subscription = response.stream.listen((chunk) async { - if (!_visible() || !_mounted()) { - carry = []; - return; - } - if (carry.isNotEmpty && carry.last == _trigger) { - if (chunk.first == _eoi) { - carry.add(chunk.first); - _sendImage(image, errorState, carry); - carry = []; - if (!isLive) { - await dispose(); - } - } - } - for (var i = 0; i < chunk.length - 1; i++) { - final d = chunk[i]; - final d1 = chunk[i + 1]; - - if (d == _trigger && d1 == _soi) { - carry = []; - carry.add(d); - } else if (d == _trigger && d1 == _eoi && carry.isNotEmpty) { - carry.add(d); - carry.add(d1); - - _sendImage(image, errorState, carry); - carry = []; - if (!isLive) { - await dispose(); - } - } else if (carry.isNotEmpty) { - carry.add(d); - if (i == chunk.length - 2) { - carry.add(d1); - } - } - } - }, onError: (error, stack) { - try { - if (_mounted()) { - errorState.value = [error, stack]; - image.value = null; - } - } finally { - dispose(); - } - }, cancelOnError: true); - } else { - if (_mounted()) { - errorState.value = [ - HttpException('Stream returned ${response.statusCode} status'), - StackTrace.current - ]; - image.value = null; + for (var i = 0; i < chunk.length - 1; i++) { + final d = chunk[i]; + final d1 = chunk[i + 1]; + + if (d == _trigger && d1 == _soi) { + carry = []; + carry.add(d); + } else if (d == _trigger && d1 == _eoi && carry.isNotEmpty) { + carry.add(d); + carry.add(d1); + + sendImage(image, errorState, carry, mounted: mounted); + carry = []; + if (!isLive) { + dispose(); + } + } else if (carry.isNotEmpty) { + carry.add(d); + if (i == chunk.length - 2) { + carry.add(d1); } - dispose(); } - } catch (error, stack) { - // we ignore those errors in case play/pause is triggers - if (!error - .toString() - .contains('Connection closed before full header was received')) { - if (_mounted()) { - errorState.value = [error, stack]; - image.value = null; + } + } + + void updateStream( + Key key, + ValueNotifier image, + ValueNotifier?> errorState, { + required bool Function() visible, + required bool Function() mounted, + }) async { + if (byteStream == null && visible() && mounted()) { + try { + final request = Request('GET', Uri.parse(stream)); + request.headers.addAll(headers); + final response = await httpClient.send(request).timeout( + timeout); //timeout is to prevent process to hang forever in some case + + if (response.statusCode >= 200 && response.statusCode < 300) { + byteStream = response.stream.asBroadcastStream(); + + _bitSubscription = byteStream!.listen((data) { + bitCount += data.length * Uint8List.bytesPerElement * 8; + }); + } else { + if (mounted()) { + errorState.value = [ + HttpException('Stream returned ${response.statusCode} status'), + StackTrace.current + ]; + image.value = null; + } + dispose(); + } + } catch (error, stack) { + // we ignore those errors in case play/pause is triggers + if (!error + .toString() + .contains('Connection closed before full header was received')) { + if (mounted()) { + errorState.value = [error, stack]; + image.value = null; + } } } } + + if (byteStream == null) { + return; + } + + var carry = []; + _subscriptions.putIfAbsent( + key, + () => byteStream!.listen((chunk) { + if (!visible() || !mounted()) { + carry.clear(); + return; + } + _onDataReceived( + carry: carry, + chunk: chunk, + image: image, + errorState: errorState, + mounted: mounted, + ); + }, onError: (error, stack) { + try { + if (mounted()) { + errorState.value = [error, stack]; + image.value = null; + } + } finally { + dispose(); + } + }, cancelOnError: true)); + } +} + +class _StreamManager { + final MjpegStreamState mjpegStream; + + final bool Function() mounted; + final bool Function() visible; + + _StreamManager({ + required this.mjpegStream, + required this.mounted, + required this.visible, + }); + + void updateStream(Key key, ValueNotifier image, + ValueNotifier?> errorState) async { + mjpegStream.updateStream(key, image, errorState, + visible: visible, mounted: mounted); } } diff --git a/lib/widgets/nt_widgets/multi-topic/camera_stream.dart b/lib/widgets/nt_widgets/multi-topic/camera_stream.dart index 19ca55b0..5776f17c 100644 --- a/lib/widgets/nt_widgets/multi-topic/camera_stream.dart +++ b/lib/widgets/nt_widgets/multi-topic/camera_stream.dart @@ -14,35 +14,19 @@ class CameraStreamModel extends NTWidgetModel { String get streamsTopic => '$topic/streams'; - Mjpeg? _streamWidget; MemoryImage? _lastDisplayedImage; - get streamWidget => _streamWidget; - - set streamWidget(value) => _streamWidget = value; + MjpegStreamState? mjpegStream; get lastDisplayedImage => _lastDisplayedImage; set lastDisplayedImage(value) => _lastDisplayedImage = value; - bool _clientOpen = false; - - get clientOpen => _clientOpen; - - set clientOpen(value) => _clientOpen = value; - CameraStreamModel({required super.topic, super.dataType, super.period}) : super(); CameraStreamModel.fromJson({required super.jsonData}) : super.fromJson(); - @override - void init() { - super.init(); - - _clientOpen = true; - } - @override void resetSubscription() { closeClient(); @@ -52,27 +36,20 @@ class CameraStreamModel extends NTWidgetModel { @override void disposeWidget({bool deleting = false}) { - Future(() async { - if (deleting) { - await _streamWidget?.dispose(); - } - _clientOpen = false; - - if (deleting) { - _lastDisplayedImage?.evict(); - _streamWidget?.previousImage?.evict(); - } - }); + if (deleting) { + _lastDisplayedImage?.evict(); + mjpegStream?.previousImage?.evict(); + mjpegStream?.dispose(); + } super.disposeWidget(deleting: deleting); } void closeClient() { _lastDisplayedImage?.evict(); - _lastDisplayedImage = _streamWidget?.previousImage; - _streamWidget?.dispose(); - _streamWidget = null; - _clientOpen = false; + _lastDisplayedImage = mjpegStream?.previousImage; + mjpegStream?.dispose(); + mjpegStream = null; } @override @@ -83,7 +60,6 @@ class CameraStreamModel extends NTWidgetModel { return [ ...streams, - _clientOpen, ntConnection.isNT4Connected, ]; } @@ -101,13 +77,6 @@ class CameraStreamWidget extends NTWidget { return StreamBuilder( stream: model.multiTopicPeriodicStream, builder: (context, snapshot) { - if (!ntConnection.isNT4Connected && model._clientOpen) { - model.closeClient(); - } - - bool createNewWidget = model._streamWidget == null || - (!model.clientOpen && ntConnection.isNT4Connected); - List rawStreams = tryCast(ntConnection.getLastAnnouncedValue(model.streamsTopic)) ?? []; @@ -127,11 +96,12 @@ class CameraStreamWidget extends NTWidget { return Stack( fit: StackFit.expand, children: [ - if (model.lastDisplayedImage != null) + if (model.mjpegStream != null || model.lastDisplayedImage != null) Opacity( opacity: 0.35, child: Image( - image: model.lastDisplayedImage!, + image: model.mjpegStream?.previousImage ?? + model.lastDisplayedImage!, fit: BoxFit.contain, ), ), @@ -153,23 +123,26 @@ class CameraStreamWidget extends NTWidget { ); } + bool createNewWidget = model.mjpegStream == null; + + createNewWidget = + createNewWidget || (model.mjpegStream?.stream != streams.last); + if (createNewWidget) { - model.clientOpen = true; model.lastDisplayedImage?.evict(); + model.mjpegStream?.dispose(); String stream = streams.last; - - model.streamWidget = Mjpeg( - fit: BoxFit.contain, - isLive: true, - stream: stream, - ); + model.mjpegStream = MjpegStreamState(stream: stream); } return Stack( fit: StackFit.expand, children: [ - model.streamWidget!, + Mjpeg( + mjpegStream: model.mjpegStream!, + fit: BoxFit.contain, + ), ], ); },