Skip to content

Commit

Permalink
Revert "Return futures on Stream.cancel when possible."
Browse files Browse the repository at this point in the history
This reverts commit 1905dda.

Review URL: https://codereview.chromium.org/2223133002 .
  • Loading branch information
floitschG committed Aug 8, 2016
1 parent 53d968b commit 46a8579
Show file tree
Hide file tree
Showing 9 changed files with 257 additions and 553 deletions.
6 changes: 0 additions & 6 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,6 @@
* Report a better error when a bind fails because of a bad source address.
* Handle HTTP header `charset` parameter with empty value.

* `dart:async`
* More aggressively returns a Future on Stream.cancel operations.
Discourages to return `null` from `cancel`.
* Fixes a few bugs where the cancel future wasn't passed through
transformations.

### Strong Mode

* New feature - an option to disable implicit casts
Expand Down
12 changes: 4 additions & 8 deletions sdk/lib/async/stream.dart
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ abstract class Stream<T> {
onCancel: () {
if (timer != null) timer.cancel();
timer = null;
return Future._nullFuture;
});
return controller.stream;
}
Expand Down Expand Up @@ -442,7 +441,7 @@ abstract class Stream<T> {
onListen: onListen,
onPause: () { subscription.pause(); },
onResume: () { subscription.resume(); },
onCancel: () => subscription.cancel(),
onCancel: () { subscription.cancel(); },
sync: true
);
}
Expand Down Expand Up @@ -500,7 +499,7 @@ abstract class Stream<T> {
onListen: onListen,
onPause: () { subscription.pause(); },
onResume: () { subscription.resume(); },
onCancel: () => subscription.cancel(),
onCancel: () { subscription.cancel(); },
sync: true
);
}
Expand Down Expand Up @@ -1408,10 +1407,7 @@ abstract class StreamSubscription<T> {
* the subscription is canceled.
*
* Returns a future that is completed once the stream has finished
* its cleanup.
*
* For historical reasons, may also return `null` if no cleanup was necessary.
* Returning `null` is deprecated and should be avoided.
* its cleanup. May also return `null` if no cleanup was necessary.
*
* Typically, futures are returned when the stream needs to release resources.
* For example, a stream might need to close an open file (as an asynchronous
Expand Down Expand Up @@ -1715,7 +1711,7 @@ abstract class StreamTransformer<S, T> {
* },
* onPause: () { subscription.pause(); },
* onResume: () { subscription.resume(); },
* onCancel: () => subscription.cancel(),
* onCancel: () { subscription.cancel(); },
* sync: true);
* return controller.stream.listen(null);
* });
Expand Down
29 changes: 10 additions & 19 deletions sdk/lib/async/stream_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,9 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
// error or done event pending (waiting for the cancel to be done) discard
// that event.
_state &= ~_STATE_WAIT_FOR_CANCEL;
if (!_isCanceled) {
_cancel();
}
return _cancelFuture ?? Future._nullFuture;
if (_isCanceled) return _cancelFuture;
_cancel();
return _cancelFuture;
}

Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
Expand All @@ -200,14 +199,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
// Overwrite the onDone and onError handlers.
_onDone = () { result._complete(futureValue); };
_onError = (error, stackTrace) {
Future cancelFuture = cancel();
if (!identical(cancelFuture, Future._nullFuture)) {
cancelFuture.whenComplete(() {
result._completeError(error, stackTrace);
});
} else {
result._completeError(error, stackTrace);
}
cancel();
result._completeError(error, stackTrace);
};

return result;
Expand Down Expand Up @@ -368,8 +361,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
if (_cancelOnError) {
_state |= _STATE_WAIT_FOR_CANCEL;
_cancel();
if (_cancelFuture is Future &&
!identical(_cancelFuture, Future._nullFuture)) {
if (_cancelFuture is Future) {
_cancelFuture.whenComplete(sendError);
} else {
sendError();
Expand Down Expand Up @@ -397,8 +389,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,

_cancel();
_state |= _STATE_WAIT_FOR_CANCEL;
if (_cancelFuture is Future &&
!identical(_cancelFuture, Future._nullFuture)) {
if (_cancelFuture is Future) {
_cancelFuture.whenComplete(sendDone);
} else {
sendDone();
Expand Down Expand Up @@ -787,7 +778,7 @@ class _DoneStreamSubscription<T> implements StreamSubscription<T> {
}
}

Future cancel() => Future._nullFuture;
Future cancel() => null;

Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
_Future/*<E>*/ result = new _Future/*<E>*/();
Expand Down Expand Up @@ -925,7 +916,7 @@ class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> {

Future cancel() {
_stream._cancelSubscription();
return Future._nullFuture;
return null;
}

bool get isPaused {
Expand Down Expand Up @@ -1041,7 +1032,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> {

Future cancel() {
StreamSubscription subscription = _subscription;
if (subscription == null) return Future._nullFuture;
if (subscription == null) return null;
if (_state == _STATE_MOVING) {
_Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
_clear();
Expand Down
2 changes: 1 addition & 1 deletion sdk/lib/async/stream_transformers.dart
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class _SinkTransformerStreamSubscription<S, T>
if (_isSubscribed) {
StreamSubscription subscription = _subscription;
_subscription = null;
return subscription.cancel();
subscription.cancel();
}
return null;
}
Expand Down
Loading

0 comments on commit 46a8579

Please sign in to comment.