Skip to content

Commit

Permalink
Return futures on Stream.cancel when possible.
Browse files Browse the repository at this point in the history
Deprecate returning `null`.

Also, fixes cases where transformations on a stream didn't forward the cancel future

Fixes #26777.

BUG= http://dartbug.com/26777.
R=lrn@google.com

Review URL: https://codereview.chromium.org/2202533003 .

Committed: 395e7aa
Reverted: 99e5328
Committed: 1905dda
Reverted: 46a8579
  • Loading branch information
floitschG committed Sep 5, 2016
1 parent e8a46ac commit 6255638
Show file tree
Hide file tree
Showing 10 changed files with 553 additions and 259 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
* `dart:async`
* `Future.wait` now catches synchronous errors and returns them in the
returned Future.
* More aggressively returns a Future on Stream.cancel operations.
Discourages to return `null` from `cancel`.
* Fixes a few bugs where the cancel future wasn't passed through
transformations.
* `dart:io`
* Added `WebSocket.addUtf8Text` to allow sending a pre-encoded text message
without a round-trip UTF-8 conversion.
Expand Down
12 changes: 8 additions & 4 deletions sdk/lib/async/stream.dart
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ abstract class Stream<T> {
onCancel: () {
if (timer != null) timer.cancel();
timer = null;
return Future._nullFuture;
});
return controller.stream;
}
Expand Down Expand Up @@ -441,7 +442,7 @@ abstract class Stream<T> {
onListen: onListen,
onPause: () { subscription.pause(); },
onResume: () { subscription.resume(); },
onCancel: () { subscription.cancel(); },
onCancel: () => subscription.cancel(),
sync: true
);
}
Expand Down Expand Up @@ -499,7 +500,7 @@ abstract class Stream<T> {
onListen: onListen,
onPause: () { subscription.pause(); },
onResume: () { subscription.resume(); },
onCancel: () { subscription.cancel(); },
onCancel: () => subscription.cancel(),
sync: true
);
}
Expand Down Expand Up @@ -1407,7 +1408,10 @@ abstract class StreamSubscription<T> {
* the subscription is canceled.
*
* Returns a future that is completed once the stream has finished
* its cleanup. May also return `null` if no cleanup was necessary.
* its cleanup.
*
* For historical reasons, may also return `null` if no cleanup was necessary.
* Returning `null` is deprecated and should be avoided.
*
* Typically, futures are returned when the stream needs to release resources.
* For example, a stream might need to close an open file (as an asynchronous
Expand Down Expand Up @@ -1711,7 +1715,7 @@ abstract class StreamTransformer<S, T> {
* },
* onPause: () { subscription.pause(); },
* onResume: () { subscription.resume(); },
* onCancel: () { subscription.cancel(); },
* onCancel: () => subscription.cancel(),
* sync: true);
* return controller.stream.listen(null);
* });
Expand Down
29 changes: 19 additions & 10 deletions sdk/lib/async/stream_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,10 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
// error or done event pending (waiting for the cancel to be done) discard
// that event.
_state &= ~_STATE_WAIT_FOR_CANCEL;
if (_isCanceled) return _cancelFuture;
_cancel();
return _cancelFuture;
if (!_isCanceled) {
_cancel();
}
return _cancelFuture ?? Future._nullFuture;
}

Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
Expand All @@ -199,8 +200,14 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
// Overwrite the onDone and onError handlers.
_onDone = () { result._complete(futureValue); };
_onError = (error, stackTrace) {
cancel();
result._completeError(error, stackTrace);
Future cancelFuture = cancel();
if (!identical(cancelFuture, Future._nullFuture)) {
cancelFuture.whenComplete(() {
result._completeError(error, stackTrace);
});
} else {
result._completeError(error, stackTrace);
}
};

return result;
Expand Down Expand Up @@ -361,7 +368,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
if (_cancelOnError) {
_state |= _STATE_WAIT_FOR_CANCEL;
_cancel();
if (_cancelFuture is Future) {
if (_cancelFuture is Future &&
!identical(_cancelFuture, Future._nullFuture)) {
_cancelFuture.whenComplete(sendError);
} else {
sendError();
Expand Down Expand Up @@ -389,7 +397,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,

_cancel();
_state |= _STATE_WAIT_FOR_CANCEL;
if (_cancelFuture is Future) {
if (_cancelFuture is Future &&
!identical(_cancelFuture, Future._nullFuture)) {
_cancelFuture.whenComplete(sendDone);
} else {
sendDone();
Expand Down Expand Up @@ -778,7 +787,7 @@ class _DoneStreamSubscription<T> implements StreamSubscription<T> {
}
}

Future cancel() => null;
Future cancel() => Future._nullFuture;

Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
_Future/*<E>*/ result = new _Future/*<E>*/();
Expand Down Expand Up @@ -916,7 +925,7 @@ class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> {

Future cancel() {
_stream._cancelSubscription();
return null;
return Future._nullFuture;
}

bool get isPaused {
Expand Down Expand Up @@ -1032,7 +1041,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> {

Future cancel() {
StreamSubscription subscription = _subscription;
if (subscription == null) return null;
if (subscription == null) return Future._nullFuture;
if (_state == _STATE_MOVING) {
_Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
_clear();
Expand Down
4 changes: 2 additions & 2 deletions sdk/lib/async/stream_pipe.dart
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ void _cancelAndError(StreamSubscription subscription,
error,
StackTrace stackTrace) {
var cancelFuture = subscription.cancel();
if (cancelFuture is Future) {
if (cancelFuture is Future && !identical(cancelFuture, Future._nullFuture)) {
cancelFuture.whenComplete(() => future._completeError(error, stackTrace));
} else {
future._completeError(error, stackTrace);
Expand Down Expand Up @@ -61,7 +61,7 @@ _ErrorCallback _cancelAndErrorClosure(
before completing with a value. */
void _cancelAndValue(StreamSubscription subscription, _Future future, value) {
var cancelFuture = subscription.cancel();
if (cancelFuture is Future) {
if (cancelFuture is Future && !identical(cancelFuture, Future._nullFuture)) {
cancelFuture.whenComplete(() => future._complete(value));
} else {
future._complete(value);
Expand Down
2 changes: 1 addition & 1 deletion sdk/lib/async/stream_transformers.dart
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class _SinkTransformerStreamSubscription<S, T>
if (_isSubscribed) {
StreamSubscription subscription = _subscription;
_subscription = null;
subscription.cancel();
return subscription.cancel();
}
return null;
}
Expand Down
Loading

0 comments on commit 6255638

Please sign in to comment.