Skip to content

Commit

Permalink
optimize filtering on streams
Browse files Browse the repository at this point in the history
  • Loading branch information
tiholic committed Aug 4, 2020
1 parent eb93cb5 commit 5359c91
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 50 deletions.
43 changes: 9 additions & 34 deletions lib/src/impl/realtime/channels.dart
Original file line number Diff line number Diff line change
Expand Up @@ -113,47 +113,22 @@ class RealtimePlatformChannel extends PlatformObject implements spec.RealtimeCha

@override
Stream<ChannelStateChange> on([ChannelEvent channelEvent]) {
Stream<ChannelStateChange> stream = listen(PlatformMethod.onRealtimeChannelStateChanged, _payload).transform<ChannelStateChange>(
StreamTransformer.fromHandlers(
handleData: (dynamic value, EventSink<ChannelStateChange> sink){
ChannelStateChange stateChange = value as ChannelStateChange;
if (channelEvent!=null) {
if (stateChange.event==channelEvent) {
sink.add(stateChange);
}
} else {
sink.add(stateChange);
}
}
)
);
return stream;
return listen(PlatformMethod.onRealtimeChannelStateChanged, _payload)
.map((stateChange) => stateChange as ChannelStateChange)
.where((stateChange) => channelEvent==null || stateChange.event==channelEvent);
}

@override
Stream<spec.Message> subscribe({
String name,
List<String> names
}) {
Stream<spec.Message> stream = listen(PlatformMethod.onRealtimeChannelMessage, _payload).transform<spec.Message>(
StreamTransformer.fromHandlers(
handleData: (dynamic value, EventSink<spec.Message> sink){
spec.Message message = value as spec.Message;
if (names!=null){
if(names.contains(message.name)){
sink.add(message);
}
} else if (name!=null) {
if(message.name==name){
sink.add(message);
}
} else {
sink.add(message);
}
}
)
);
return stream;
final subscribedNames = {name, ...?names}.where((n) => n != null).toList();
return listen(PlatformMethod.onRealtimeChannelMessage, _payload)
.map((message) => message as spec.Message)
.where((message) =>
subscribedNames.isEmpty ||
subscribedNames.any((n) => n == message.name));
}

}
Expand Down
20 changes: 4 additions & 16 deletions lib/src/impl/realtime/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,10 @@ class ConnectionPlatformObject extends PlatformObject implements Connection {

@override
Stream<ConnectionStateChange> on([ConnectionEvent connectionEvent]) {
Stream<ConnectionStateChange> stream = listen(PlatformMethod.onRealtimeConnectionStateChanged).transform<ConnectionStateChange>(
StreamTransformer.fromHandlers(
handleData: (dynamic value, EventSink<ConnectionStateChange> sink){
ConnectionStateChange stateChange = value as ConnectionStateChange;
if (connectionEvent!=null) {
if (stateChange.event==connectionEvent) {
sink.add(stateChange);
}
} else {
sink.add(stateChange);
}

}
)
);
return stream;
return listen(PlatformMethod.onRealtimeConnectionStateChanged)
.map((connectionEvent) => connectionEvent as ConnectionStateChange)
.where((connectionStateChange) =>
connectionEvent==null || connectionStateChange.event==connectionEvent);
}

@override
Expand Down

0 comments on commit 5359c91

Please sign in to comment.