diff --git a/example/lib/main.dart b/example/lib/main.dart index dd5a1cf0a..5e3dd485c 100644 --- a/example/lib/main.dart +++ b/example/lib/main.dart @@ -329,9 +329,9 @@ class _MyAppState extends State { ); Widget createChannelSubscribeButton() => FlatButton( - onPressed: (_realtimeChannelState==ably.ChannelState.attached)?() { + onPressed: (_realtimeChannelState==ably.ChannelState.attached && channelMessageSubscription==null)?() { ably.RealtimeChannel channel = _realtime.channels.get("test-channel"); - Stream messageStream = channel.subscribe(); + Stream messageStream = channel.subscribe(name: 'message-data'); channelMessageSubscription = messageStream.listen((ably.Message message){ print("Channel message recieved: $message\n" "\tisNull: ${message.data == null}\n" @@ -352,6 +352,9 @@ class _MyAppState extends State { onPressed: (channelMessageSubscription!=null)?() async { await channelMessageSubscription.cancel(); print("Channel messages ubsubscribed"); + setState((){ + channelMessageSubscription = null; + }); }:null, child: Text('Unsubscribe'), ); diff --git a/lib/src/impl/realtime/channels.dart b/lib/src/impl/realtime/channels.dart index b2d82cdfd..3efcec3e5 100644 --- a/lib/src/impl/realtime/channels.dart +++ b/lib/src/impl/realtime/channels.dart @@ -134,22 +134,27 @@ class RealtimePlatformChannel extends PlatformObject implements spec.RealtimeCha @override Stream subscribe({ - String event, - List events + String name, + List names }) { Stream stream = listen(PlatformMethod.onRealtimeChannelMessage, _payload).transform( StreamTransformer.fromHandlers( handleData: (dynamic value, EventSink sink){ spec.Message message = value as spec.Message; - sink.add(message); + if (names!=null){ + if(names.contains(message.name)){ + sink.add(message); + } + } else if (name!=null) { + if(message.name==name){ + sink.add(message); + } + } else { + sink.add(message); + } } ) ); - if (events!=null && events.isNotEmpty){ - return stream.takeWhile((spec.Message _message) => events.contains(_message.name)); - } else if (event!=null) { - return stream.takeWhile((spec.Message _message) => _message.name==event); - } return stream; } diff --git a/lib/src/spec/realtime/channels.dart b/lib/src/spec/realtime/channels.dart index ff00a12ad..2bb930992 100644 --- a/lib/src/spec/realtime/channels.dart +++ b/lib/src/spec/realtime/channels.dart @@ -35,8 +35,8 @@ abstract class RealtimeChannel extends EventEmitter subscribe({ - String event, - List events, + String name, + List names, }); void setOptions(ChannelOptions options); }