diff --git a/lib/src/impl/realtime/channels.dart b/lib/src/impl/realtime/channels.dart index 835ebc954..22d9e420d 100644 --- a/lib/src/impl/realtime/channels.dart +++ b/lib/src/impl/realtime/channels.dart @@ -113,21 +113,9 @@ class RealtimePlatformChannel extends PlatformObject implements spec.RealtimeCha @override Stream on([ChannelEvent channelEvent]) { - Stream stream = listen(PlatformMethod.onRealtimeChannelStateChanged, _payload).transform( - StreamTransformer.fromHandlers( - handleData: (dynamic value, EventSink sink){ - ChannelStateChange stateChange = value as ChannelStateChange; - if (channelEvent!=null) { - if (stateChange.event==channelEvent) { - sink.add(stateChange); - } - } else { - sink.add(stateChange); - } - } - ) - ); - return stream; + return listen(PlatformMethod.onRealtimeChannelStateChanged, _payload) + .map((stateChange) => stateChange as ChannelStateChange) + .where((stateChange) => channelEvent==null || stateChange.event==channelEvent); } @override @@ -135,25 +123,12 @@ class RealtimePlatformChannel extends PlatformObject implements spec.RealtimeCha String name, List names }) { - Stream stream = listen(PlatformMethod.onRealtimeChannelMessage, _payload).transform( - StreamTransformer.fromHandlers( - handleData: (dynamic value, EventSink sink){ - spec.Message message = value as spec.Message; - if (names!=null){ - if(names.contains(message.name)){ - sink.add(message); - } - } else if (name!=null) { - if(message.name==name){ - sink.add(message); - } - } else { - sink.add(message); - } - } - ) - ); - return stream; + final subscribedNames = {name, ...?names}.where((n) => n != null).toList(); + return listen(PlatformMethod.onRealtimeChannelMessage, _payload) + .map((message) => message as spec.Message) + .where((message) => + subscribedNames.isEmpty || + subscribedNames.any((n) => n == message.name)); } } diff --git a/lib/src/impl/realtime/connection.dart b/lib/src/impl/realtime/connection.dart index c82d4bab2..14ef5ed59 100644 --- a/lib/src/impl/realtime/connection.dart +++ b/lib/src/impl/realtime/connection.dart @@ -42,22 +42,10 @@ class ConnectionPlatformObject extends PlatformObject implements Connection { @override Stream on([ConnectionEvent connectionEvent]) { - Stream stream = listen(PlatformMethod.onRealtimeConnectionStateChanged).transform( - StreamTransformer.fromHandlers( - handleData: (dynamic value, EventSink sink){ - ConnectionStateChange stateChange = value as ConnectionStateChange; - if (connectionEvent!=null) { - if (stateChange.event==connectionEvent) { - sink.add(stateChange); - } - } else { - sink.add(stateChange); - } - - } - ) - ); - return stream; + return listen(PlatformMethod.onRealtimeConnectionStateChanged) + .map((connectionEvent) => connectionEvent as ConnectionStateChange) + .where((connectionStateChange) => + connectionEvent==null || connectionStateChange.event==connectionEvent); } @override