Skip to content

Commit

Permalink
Bug 1881135 [wpt PR 44682] - DOM: Reorder Observable completion event…
Browse files Browse the repository at this point in the history
…s, a=testonly

Automatic update from web-platform-tests
DOM: Reorder Observable completion events

This CL "fixes" Observable unsubscription/teardown timing. As a matter
of accidental historical precedent, Observables in JavaScript (but not
in other languages) had implemented the "rule" that upon
Subscriber#error() or Subscriber#complete(), the subscriber would:
 1. First, invoke the appropriate Observer callback, if provided (i.e.,
    complete() or error() callback).
 2. Signal abort Subscriber#signal, which invokes any teardowns and also
    fires the `abort` event at the signal.

However, after dom@chromium.org discussed this more with
ben@benlesh.com, we came to the conclusion that the principle of "as
soon as you know you will teardown, you MUST close the subscription and
any upstream subscriptions" should be adhered. This means the above
steps must be inverted. This is a small-in-size but medium-in-impact
design change for the Observable concept, and led to a blog post [1] and
an announcement [2] that the RxJS library intends to change its
historical ordering of these events.

This CL:
 1. Inverts the order of the aforementioned steps in the Blink
    implementation.
 2. Improves some tests that assert this new ordering.
 3. Simplifies the takeUntil() operator in general.

The Observable spec will be updated alongside this commit:
WICG/observable#120.

[1]: https://benlesh.com/posts/observables-are-broken-and-so-is-javascript/
[2]: ReactiveX/rxjs#7443

R=masonf@chromium.org

Bug: 1485981
Change-Id: I376e66eef490808d264dc999862a801d591aa278
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/5311097
Commit-Queue: Dominic Farolino <dom@chromium.org>
Reviewed-by: Mason Freed <masonf@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1263562}

--

wpt-commits: a1a5ea2b3dc460ce75cbfcf237538d959469b1ed
wpt-pr: 44682
  • Loading branch information
domfarolino authored and moz-wptsync-bot committed Feb 27, 2024
1 parent f257cac commit 0e86a2e
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,14 +235,18 @@ test(t => {

source.subscribe({
complete: () => {
activeDuringComplete = innerSubscriber.active
abortedDuringComplete = innerSubscriber.active
activeDuringComplete = innerSubscriber.active;
abortedDuringComplete = innerSubscriber.signal.aborted;
}
});
assert_true(activeBeforeComplete, "Subscription is active before complete");
assert_false(abortedBeforeComplete, "Subscription is not aborted before complete");
assert_false(activeDuringComplete, "Subscription is not active during complete");
assert_false(abortedDuringComplete, "Subscription is not aborted during complete");
assert_false(activeDuringComplete,
"Subscription becomes inactive during Subscriber#complete(), just " +
"before Observer#complete() callback is invoked");
assert_true(abortedDuringComplete,
"Subscription's signal is aborted during Subscriber#complete(), just " +
"before Observer#complete() callback is invoked");
assert_false(activeAfterComplete, "Subscription is not active after complete");
assert_true(abortedAfterComplete, "Subscription is aborted after complete");
}, "Subscription is inactive after complete()");
Expand All @@ -269,13 +273,18 @@ test(t => {

source.subscribe({
error: () => {
activeDuringError = innerSubscriber.active
activeDuringError = innerSubscriber.active;
abortedDuringError = innerSubscriber.signal.aborted;
}
});
assert_true(activeBeforeError, "Subscription is active before error");
assert_false(abortedBeforeError, "Subscription is not aborted before error");
assert_false(activeDuringError, "Subscription is not active during error");
assert_false(abortedDuringError, "Subscription is not aborted during error");
assert_false(activeDuringError,
"Subscription becomes inactive during Subscriber#error(), just " +
"before Observer#error() callback is invoked");
assert_true(abortedDuringError,
"Subscription's signal is aborted during Subscriber#error(), just " +
"before Observer#error() callback is invoked");
assert_false(activeAfterError, "Subscription is not active after error");
assert_true(abortedAfterError, "Subscription is not aborted after error");
}, "Subscription is inactive after error()");
Expand Down Expand Up @@ -690,6 +699,18 @@ test(() => {
assert_true(abortedDuringTeardown2, 'should be aborted during teardown callback 2');
}, "Unsubscription lifecycle");

test(t => {
let innerSubscriber = null;
const source = new Observable(subscriber => {
innerSubscriber = subscriber;
subscriber.error('calling error()');
});

source.subscribe();
assert_equals(innerSubscriber.signal.reason, "calling error()",
"Reason is set correctly");
}, "Subscriber#error() value is stored as Subscriber's AbortSignal's reason");

test(t => {
const source = new Observable((subscriber) => {
let n = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,74 +32,102 @@ promise_test(async () => {
// `takeUntil()` operator, the spec responds to `notifier`'s `next()` by
// unsubscribing from `notifier`, which is what this test asserts.
promise_test(async () => {
const source = new Observable(subscriber => {});
const results = [];
const source = new Observable(subscriber => {
results.push('source subscribe callback');
subscriber.addTeardown(() => results.push('source teardown'));
});

let notifierSubscriberActiveBeforeNext;
let notifierSubscriberActiveAfterNext;
let teardownCalledAfterNext;
let notifierSignalAbortedAfterNext;
const notifier = new Observable(subscriber => {
let teardownCalled;
subscriber.addTeardown(() => teardownCalled = true);
subscriber.addTeardown(() => results.push('notifier teardown'));

results.push('notifier subscribe callback');
// Calling `next()` causes `takeUntil()` to unsubscribe from `notifier`.
notifierSubscriberActiveBeforeNext = subscriber.active;
results.push(`notifer active before next(): ${subscriber.active}`);
subscriber.next('value');
notifierSubscriberActiveAfterNext = subscriber.active;
teardownCalledAfterNext = (teardownCalled === true);
notifierSignalAbortedAfterNext = subscriber.signal.aborted;
results.push(`notifer active after next(): ${subscriber.active}`);
});

let nextOrErrorCalled = false;
let completeCalled = false;
source.takeUntil(notifier).subscribe({
next: () => nextOrErrorCalled = true,
error: () => nextOrErrorCalled = true,
complete: () => completeCalled = true,
next: () => results.push('takeUntil() next callback'),
error: e => results.push(`takeUntil() error callback: ${error}`),
complete: () => results.push('takeUntil() complete callback'),
});
assert_true(notifierSubscriberActiveBeforeNext);
assert_false(notifierSubscriberActiveAfterNext);
assert_true(teardownCalledAfterNext);
assert_true(notifierSignalAbortedAfterNext);
assert_false(nextOrErrorCalled);
assert_true(completeCalled);
}, "takeUntil: notifier next() unsubscribes to notifier");

assert_array_equals(results, [
'notifier subscribe callback',
'notifer active before next(): true',
'notifier teardown',
'takeUntil() complete callback',
'notifer active after next(): false',
]);
}, "takeUntil: notifier next() unsubscribes from notifier");
// This test is identical to the one above, with the exception being that the
// `notifier` calls `subscriber.error()` instead `subscriber.next()`.
promise_test(async () => {
const source = new Observable(subscriber => {});
const results = [];
const source = new Observable(subscriber => {
results.push('source subscribe callback');
subscriber.addTeardown(() => results.push('source teardown'));
});

let notifierSubscriberActiveBeforeNext;
let notifierSubscriberActiveAfterNext;
let teardownCalledAfterNext;
let notifierSignalAbortedAfterNext;
const notifier = new Observable(subscriber => {
let teardownCalled;
subscriber.addTeardown(() => teardownCalled = true);
subscriber.addTeardown(() => results.push('notifier teardown'));

results.push('notifier subscribe callback');
// Calling `next()` causes `takeUntil()` to unsubscribe from `notifier`.
notifierSubscriberActiveBeforeNext = subscriber.active;
results.push(`notifer active before error(): ${subscriber.active}`);
subscriber.error('error');
notifierSubscriberActiveAfterNext = subscriber.active;
teardownCalledAfterNext = (teardownCalled === true);
notifierSignalAbortedAfterNext = subscriber.signal.aborted;
results.push(`notifer active after error(): ${subscriber.active}`);
});

let nextOrErrorCalled = false;
let completeCalled = false;
source.takeUntil(notifier).subscribe({
next: () => nextOrErrorCalled = true,
error: () => nextOrErrorCalled = true,
complete: () => completeCalled = true,
next: () => results.push('takeUntil() next callback'),
error: e => results.push(`takeUntil() error callback: ${error}`),
complete: () => results.push('takeUntil() complete callback'),
});
assert_true(notifierSubscriberActiveBeforeNext);
assert_false(notifierSubscriberActiveAfterNext);
assert_true(teardownCalledAfterNext);
assert_true(notifierSignalAbortedAfterNext);
assert_false(nextOrErrorCalled);
assert_true(completeCalled);
}, "takeUntil: notifier error() unsubscribes to notifier");

assert_array_equals(results, [
'notifier subscribe callback',
'notifer active before error(): true',
'notifier teardown',
'takeUntil() complete callback',
'notifer active after error(): false',
]);
}, "takeUntil: notifier error() unsubscribes from notifier");
// This test is identical to the above except it `throw`s instead of calling
// `Subscriber#error()`.
promise_test(async () => {
const results = [];
const source = new Observable(subscriber => {
results.push('source subscribe callback');
subscriber.addTeardown(() => results.push('source teardown'));
});

const notifier = new Observable(subscriber => {
subscriber.addTeardown(() => results.push('notifier teardown'));

results.push('notifier subscribe callback');
// Calling `next()` causes `takeUntil()` to unsubscribe from `notifier`.
results.push(`notifer active before throw: ${subscriber.active}`);
throw new Error('custom error');
// Won't run:
results.push(`notifer active after throw: ${subscriber.active}`);
});

source.takeUntil(notifier).subscribe({
next: () => results.push('takeUntil() next callback'),
error: e => results.push(`takeUntil() error callback: ${error}`),
complete: () => results.push('takeUntil() complete callback'),
});

assert_array_equals(results, [
'notifier subscribe callback',
'notifer active before throw: true',
'notifier teardown',
'takeUntil() complete callback',
]);
}, "takeUntil: notifier throw Error unsubscribes from notifier");

// Test that `notifier` unsubscribes from source Observable.
promise_test(async t => {
Expand Down Expand Up @@ -130,9 +158,10 @@ promise_test(async t => {
let notifierTeardownCalledBeforeCompleteCallback;
await new Promise(resolve => {
source.takeUntil(notifier).subscribe({
next: () => nextOrErrorCalled = true,
error: () => nextOrErrorCalled = true,
next: () => {nextOrErrorCalled = true; results.push('next callback');},
error: () => {nextOrErrorCalled = true; results.push('error callback');},
complete: () => {
results.push('complete callback');
notifierTeardownCalledBeforeCompleteCallback = notifierTeardownCalled;
resolve();
},
Expand All @@ -145,15 +174,16 @@ promise_test(async t => {
// The notifier/source teardowns are not called by the time the outer
// `Observer#complete()` callback is invoked, but they are all run *after*
// (i.e., before `notifier`'s `subscriber.next()` returns internally).
assert_false(notifierTeardownCalledBeforeCompleteCallback);
assert_true(notifierTeardownCalledBeforeCompleteCallback);
assert_true(notifierTeardownCalled);
assert_array_equals(results, [
"notifier subscribed",
"source subscribed",
"notifier teardown",
"notifier signal abort",
"source teardown",
"source signal abort"
"source signal abort",
"complete callback",
]);
}, "takeUntil: notifier next() unsubscribes from notifier & source observable");

Expand Down

0 comments on commit 0e86a2e

Please sign in to comment.