Skip to content

Commit

Permalink
filtering improved withing the transformer
Browse files Browse the repository at this point in the history
1. stream.takeWhile will cancel on first failure
2. rename `event` and `events` to `name` and `names` to match IDL
  • Loading branch information
tiholic committed Jul 24, 2020
1 parent 08b53d7 commit a6de32a
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 12 deletions.
7 changes: 5 additions & 2 deletions example/lib/main.dart
Original file line number Diff line number Diff line change
Expand Up @@ -329,9 +329,9 @@ class _MyAppState extends State<MyApp> {
);

Widget createChannelSubscribeButton() => FlatButton(
onPressed: (_realtimeChannelState==ably.ChannelState.attached)?() {
onPressed: (_realtimeChannelState==ably.ChannelState.attached && channelMessageSubscription==null)?() {
ably.RealtimeChannel channel = _realtime.channels.get("test-channel");
Stream<ably.Message> messageStream = channel.subscribe();
Stream<ably.Message> messageStream = channel.subscribe(name: 'message-data');
channelMessageSubscription = messageStream.listen((ably.Message message){
print("Channel message recieved: $message\n"
"\tisNull: ${message.data == null}\n"
Expand All @@ -352,6 +352,9 @@ class _MyAppState extends State<MyApp> {
onPressed: (channelMessageSubscription!=null)?() async {
await channelMessageSubscription.cancel();
print("Channel messages ubsubscribed");
setState((){
channelMessageSubscription = null;
});
}:null,
child: Text('Unsubscribe'),
);
Expand Down
21 changes: 13 additions & 8 deletions lib/src/impl/realtime/channels.dart
Original file line number Diff line number Diff line change
Expand Up @@ -134,22 +134,27 @@ class RealtimePlatformChannel extends PlatformObject implements spec.RealtimeCha

@override
Stream<spec.Message> subscribe({
String event,
List<String> events
String name,
List<String> names
}) {
Stream<spec.Message> stream = listen(PlatformMethod.onRealtimeChannelMessage, _payload).transform<spec.Message>(
StreamTransformer.fromHandlers(
handleData: (dynamic value, EventSink<spec.Message> sink){
spec.Message message = value as spec.Message;
sink.add(message);
if (names!=null){
if(names.contains(message.name)){
sink.add(message);
}
} else if (name!=null) {
if(message.name==name){
sink.add(message);
}
} else {
sink.add(message);
}
}
)
);
if (events!=null && events.isNotEmpty){
return stream.takeWhile((spec.Message _message) => events.contains(_message.name));
} else if (event!=null) {
return stream.takeWhile((spec.Message _message) => _message.name==event);
}
return stream;
}

Expand Down
4 changes: 2 additions & 2 deletions lib/src/spec/realtime/channels.dart
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ abstract class RealtimeChannel extends EventEmitter<ChannelEvent, ChannelStateCh
dynamic data
});
Stream<Message> subscribe({
String event,
List<String> events,
String name,
List<String> names,
});
void setOptions(ChannelOptions options);
}
Expand Down

0 comments on commit a6de32a

Please sign in to comment.