Skip to content

Commit

Permalink
fix(notifier): Make makeAsyncIterableFromNotifier lossy
Browse files Browse the repository at this point in the history
Cherry-picked from gh-5413-lossy-makeNotifierFromAsyncIterable.
See #5695 .
  • Loading branch information
gibson042 committed Jul 8, 2022
1 parent b2fff2d commit 820e340
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 80 deletions.
51 changes: 18 additions & 33 deletions packages/notifier/src/asyncIterableAdaptor.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,49 +38,34 @@ import './types.js';
export const makeAsyncIterableFromNotifier = notifierP => {
return Far('asyncIterableFromNotifier', {
[Symbol.asyncIterator]: () => {
/** @type {boolean} */
let done = false;
/** @type {UpdateCount} */
let localUpdateCount;
/** @type {Promise<{value: T, done: boolean}> | undefined} */
let myIterationResultP;
/** @type {{value: T, done: boolean}} */
let myIterationResult;
return Far('asyncIteratorFromNotifier', {
next: () => {
if (!myIterationResultP) {
// In this adaptor, once `next()` is called and returns an
// unresolved promise, `myIterationResultP`, and until
// `myIterationResultP` is fulfilled with an
// iteration result, further `next()` calls will return the same
// `myIterationResultP` promise again without asking the notifier
// for more updates. If there's already an unanswered ask in the
// air, all further asks should just reuse the result of that one.
//
// This reuse behavior is only needed for code that uses the async
// iterator protocol explicitly. When this async iterator is
// consumed by a for/await/of loop, `next()` will only be called
// after the promise for the previous iteration result has
// fulfilled. If it fulfills with `done: true`, the for/await/of
// loop will never call `next()` again.
//
// See
next: async () => {
// Request another result only if the notifier has not terminated
// (in which case the final result is used for all subsequent
// `next()` calls).
if (!done) {
// This adaptor waits for each result promise to settle
// before returning its own.
// The eager consumption inherent to a notifier is not
// compatible with direct use of the async iterator protocol
// to collect multiple pending results, even though such use
// cases do exist in general as documented at
// https://2ality.com/2016/10/asynchronous-iteration.html#queuing-next()-invocations
// for an explicit use that sends `next()` without waiting.
myIterationResultP = E(notifierP)
myIterationResult = await E(notifierP)
.getUpdateSince(localUpdateCount)
.then(({ value, updateCount }) => {
localUpdateCount = updateCount;
const done = localUpdateCount === undefined;
if (!done) {
// Once the outstanding question has been answered, stop
// using that answer, so any further `next()` questions
// cause a new `getUpdateSince` request.
//
// But only if more answers are expected. Once the notifier
// is `done`, that was the last answer so reuse it forever.
myIterationResultP = undefined;
}
done = localUpdateCount === undefined;
return harden({ value, done });
});
}
return myIterationResultP;
return myIterationResult;
},
});
},
Expand Down
170 changes: 125 additions & 45 deletions packages/notifier/src/notifier.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,63 +161,143 @@ export const makeNotifierKit = (...initialStateArr) => {
export const makeNotifierFromAsyncIterable = asyncIterableP => {
const iteratorP = E(asyncIterableP)[Symbol.asyncIterator]();

/** @type {Promise<UpdateRecord<T>>|undefined} */
let optNextPromise;
/** @type {WeakMap<Promise<{value: any, done: boolean}>, Promise<{value, updateCount}>>} */
const resultPromiseMap = new WeakMap();
/** @type {Promise<{value: any, done: boolean}>} */
let latestResultInP;
/** @type {undefined | Promise<{value, updateCount}>} */
let latestResultOutP;
/** @type {undefined | Promise<{value, updateCount}>} */
let nextResultOutP;
/** @type {undefined | ((resolution?: any) => void)} */
let nextResultInR;
/** @type {UpdateCount & bigint} */
let currentUpdateCount = 0n;
/** @type {ERef<UpdateRecord<T>>|undefined} */
let currentResponse;
let final = false;
let latestUpdateCount = 0n;
let finished = false;
let finalResultOut;

// Consume results as soon as their predecessors settle.
(async function consumeEagerly() {
try {
let done = false;
while (!done) {
// TODO: Fix this typing friction.
// Possibly related: https://github.com/microsoft/TypeScript/issues/38479
// @ts-expect-error Tolerate done: undefined.
latestResultInP = E(iteratorP).next();
if (nextResultInR) {
nextResultInR(latestResultInP);
nextResultInR = undefined;
}
// eslint-disable-next-line no-await-in-loop
const latestResultIn = await latestResultInP;
({ done } = latestResultIn);
}
} catch (err) {} // eslint-disable-line no-empty
if (nextResultInR) {
// @ts-expect-error It really is fine to use latestResultInP here.
nextResultInR(latestResultInP);
nextResultInR = undefined;
}
})();

// Create outbound results on-demand, but at most once.
/**
* @param {Promise<{value: any, done: boolean}>} resultInP
* @returns {Promise<{value, updateCount}>}
*/
function translateInboundResult(resultInP) {
return resultInP.then(
({ value, done }) => {
// If this is resolving a post-finish request, preserve the final result.
if (finished) {
return finalResultOut;
}

if (done) {
finished = true;

// If there is a pending next-value promise, resolve it.
if (nextResultInR) {
nextResultInR(/* irrelevant becaused finished is true */);
nextResultInR = undefined;
}

// Final results have undefined updateCount.
finalResultOut = harden({ value, updateCount: undefined });
return finalResultOut;
}

// Discard any pending promise.
// eslint-disable-next-line no-multi-assign
latestResultOutP = nextResultOutP = nextResultInR = undefined;

latestUpdateCount += 1n;
return harden({ value, updateCount: latestUpdateCount });
},
rejection => {
if (!finished) {
finished = true;

// If there is a pending next-value promise, resolve it.
if (nextResultInR) {
nextResultInR(resultInP);
nextResultInR = undefined;
}
}
throw rejection;
},
);
}
function getLatestResultOutP() {
if (!latestResultOutP) {
assert(latestResultInP !== undefined);
latestResultOutP = resultPromiseMap.get(latestResultInP);
if (!latestResultOutP) {
latestResultOutP = translateInboundResult(latestResultInP);
resultPromiseMap.set(latestResultInP, latestResultOutP);
}
}
return latestResultOutP;
}

/**
* @template T
* @type {BaseNotifier<T>}
*/
const baseNotifier = Far('baseNotifier', {
getUpdateSince(updateCount = -1n) {
if (updateCount < currentUpdateCount) {
if (currentResponse) {
return Promise.resolve(currentResponse);
}
} else if (updateCount !== currentUpdateCount) {
throw new Error(
'getUpdateSince argument must be a previously-issued updateCount.',
);
assert(
updateCount <= latestUpdateCount,
'argument must be a previously-issued updateCount.',
);

// If we don't yet have an inbound result or a promise for an outbound result,
// create the latter.
if (!latestResultInP && !latestResultOutP) {
const { promise, resolve } = makePromiseKit();
nextResultInR = resolve;
nextResultOutP = translateInboundResult(promise);
latestResultOutP = nextResultOutP;
}

// Return a final response if we have one, otherwise a promise for the next state.
if (final) {
assert(currentResponse !== undefined);
return Promise.resolve(currentResponse);
if (updateCount < latestUpdateCount) {
// Each returned promise is unique.
return getLatestResultOutP().then();
}
if (!optNextPromise) {
const nextIterResultP = E(iteratorP).next();
optNextPromise = E.when(
nextIterResultP,
({ done, value }) => {
assert(!final);
if (done) {
final = true;
}
currentUpdateCount += 1n;
currentResponse = harden({
value,
updateCount: done ? undefined : currentUpdateCount,
});
optNextPromise = undefined;
return currentResponse;
},
_reason => {
final = true;
currentResponse =
/** @type {Promise<UpdateRecord<T>>} */
(nextIterResultP);
optNextPromise = undefined;
return currentResponse;
},
);

if (!nextResultOutP) {
if (finished) {
nextResultOutP = getLatestResultOutP();
} else {
const { promise, resolve } = makePromiseKit();
nextResultInR = resolve;
nextResultOutP = translateInboundResult(promise);
}
}
return optNextPromise;

// Each returned promise is unique.
return nextResultOutP.then();
},
});

Expand Down
4 changes: 2 additions & 2 deletions packages/notifier/src/subscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ const makeSubscriptionIterator = tailP => {
return Far('SubscriptionIterator', {
subscribe: () => makeSubscription(tailP),
[Symbol.asyncIterator]: () => makeSubscriptionIterator(tailP),
next: () => {
const resultP = E.get(tailP).head;
next: async () => {
const resultP = await E.get(tailP).head;
tailP = E.get(tailP).tail;
Promise.resolve(tailP).catch(() => {}); // suppress unhandled rejection error
return resultP;
Expand Down

0 comments on commit 820e340

Please sign in to comment.