diff --git a/packages/notifier/src/asyncIterableAdaptor.js b/packages/notifier/src/asyncIterableAdaptor.js index c003e2efc867..a70a43fbd9b6 100644 --- a/packages/notifier/src/asyncIterableAdaptor.js +++ b/packages/notifier/src/asyncIterableAdaptor.js @@ -38,49 +38,34 @@ import './types.js'; export const makeAsyncIterableFromNotifier = notifierP => { return Far('asyncIterableFromNotifier', { [Symbol.asyncIterator]: () => { + /** @type {boolean} */ + let done = false; /** @type {UpdateCount} */ let localUpdateCount; - /** @type {Promise<{value: T, done: boolean}> | undefined} */ - let myIterationResultP; + /** @type {{value: T, done: boolean}} */ + let myIterationResult; return Far('asyncIteratorFromNotifier', { - next: () => { - if (!myIterationResultP) { - // In this adaptor, once `next()` is called and returns an - // unresolved promise, `myIterationResultP`, and until - // `myIterationResultP` is fulfilled with an - // iteration result, further `next()` calls will return the same - // `myIterationResultP` promise again without asking the notifier - // for more updates. If there's already an unanswered ask in the - // air, all further asks should just reuse the result of that one. - // - // This reuse behavior is only needed for code that uses the async - // iterator protocol explicitly. When this async iterator is - // consumed by a for/await/of loop, `next()` will only be called - // after the promise for the previous iteration result has - // fulfilled. If it fulfills with `done: true`, the for/await/of - // loop will never call `next()` again. - // - // See + next: async () => { + // Request another result only if the notifier has not terminated + // (in which case the final result is used for all subsequent + // `next()` calls). + if (!done) { + // This adaptor waits for each result promise to settle + // before returning its own. + // The eager consumption inherent to a notifier is not + // compatible with direct use of the async iterator protocol + // to collect multiple pending results, even though such use + // cases do exist in general as documented at // https://2ality.com/2016/10/asynchronous-iteration.html#queuing-next()-invocations - // for an explicit use that sends `next()` without waiting. - myIterationResultP = E(notifierP) + myIterationResult = await E(notifierP) .getUpdateSince(localUpdateCount) .then(({ value, updateCount }) => { localUpdateCount = updateCount; - const done = localUpdateCount === undefined; - if (!done) { - // Once the outstanding question has been answered, stop - // using that answer, so any further `next()` questions - // cause a new `getUpdateSince` request. - // - // But only if more answers are expected. Once the notifier - // is `done`, that was the last answer so reuse it forever. - myIterationResultP = undefined; - } + done = localUpdateCount === undefined; return harden({ value, done }); }); } - return myIterationResultP; + return myIterationResult; }, }); }, diff --git a/packages/notifier/src/notifier.js b/packages/notifier/src/notifier.js index 3689f020989f..3b8acdcd93b7 100644 --- a/packages/notifier/src/notifier.js +++ b/packages/notifier/src/notifier.js @@ -161,13 +161,105 @@ export const makeNotifierKit = (...initialStateArr) => { export const makeNotifierFromAsyncIterable = asyncIterableP => { const iteratorP = E(asyncIterableP)[Symbol.asyncIterator](); - /** @type {Promise>|undefined} */ - let optNextPromise; + /** @type {WeakMap, Promise<{value, updateCount}>>} */ + const resultPromiseMap = new WeakMap(); + /** @type {Promise<{value: any, done: boolean}>} */ + let latestResultInP; + /** @type {undefined | Promise<{value, updateCount}>} */ + let latestResultOutP; + /** @type {undefined | Promise<{value, updateCount}>} */ + let nextResultOutP; + /** @type {undefined | ((resolution?: any) => void)} */ + let nextResultInR; /** @type {UpdateCount & bigint} */ - let currentUpdateCount = 0n; - /** @type {ERef>|undefined} */ - let currentResponse; - let final = false; + let latestUpdateCount = 0n; + let finished = false; + let finalResultOut; + + // Consume results as soon as their predecessors settle. + (async function consumeEagerly() { + try { + let done = false; + while (!done) { + // TODO: Fix this typing friction. + // Possibly related: https://github.com/microsoft/TypeScript/issues/38479 + // @ts-expect-error Tolerate done: undefined. + latestResultInP = E(iteratorP).next(); + if (nextResultInR) { + nextResultInR(latestResultInP); + nextResultInR = undefined; + } + // eslint-disable-next-line no-await-in-loop + const latestResultIn = await latestResultInP; + ({ done } = latestResultIn); + } + } catch (err) {} // eslint-disable-line no-empty + if (nextResultInR) { + // @ts-expect-error It really is fine to use latestResultInP here. + nextResultInR(latestResultInP); + nextResultInR = undefined; + } + })(); + + // Create outbound results on-demand, but at most once. + /** + * @param {Promise<{value: any, done: boolean}>} resultInP + * @returns {Promise<{value, updateCount}>} + */ + function translateInboundResult(resultInP) { + return resultInP.then( + ({ value, done }) => { + // If this is resolving a post-finish request, preserve the final result. + if (finished) { + return finalResultOut; + } + + if (done) { + finished = true; + + // If there is a pending next-value promise, resolve it. + if (nextResultInR) { + nextResultInR(/* irrelevant becaused finished is true */); + nextResultInR = undefined; + } + + // Final results have undefined updateCount. + finalResultOut = harden({ value, updateCount: undefined }); + return finalResultOut; + } + + // Discard any pending promise. + // eslint-disable-next-line no-multi-assign + latestResultOutP = nextResultOutP = nextResultInR = undefined; + + latestUpdateCount += 1n; + return harden({ value, updateCount: latestUpdateCount }); + }, + rejection => { + if (!finished) { + finished = true; + + // If there is a pending next-value promise, resolve it. + if (nextResultInR) { + nextResultInR(resultInP); + nextResultInR = undefined; + } + } + throw rejection; + }, + ); + } + function getLatestResultOutP() { + if (!latestResultOutP) { + assert(latestResultInP !== undefined); + latestResultOutP = resultPromiseMap.get(latestResultInP); + if (!latestResultOutP) { + latestResultOutP = translateInboundResult(latestResultInP); + resultPromiseMap.set(latestResultInP, latestResultOutP); + } + } + return latestResultOutP; + } /** * @template T @@ -175,49 +267,37 @@ export const makeNotifierFromAsyncIterable = asyncIterableP => { */ const baseNotifier = Far('baseNotifier', { getUpdateSince(updateCount = -1n) { - if (updateCount < currentUpdateCount) { - if (currentResponse) { - return Promise.resolve(currentResponse); - } - } else if (updateCount !== currentUpdateCount) { - throw new Error( - 'getUpdateSince argument must be a previously-issued updateCount.', - ); + assert( + updateCount <= latestUpdateCount, + 'argument must be a previously-issued updateCount.', + ); + + // If we don't yet have an inbound result or a promise for an outbound result, + // create the latter. + if (!latestResultInP && !latestResultOutP) { + const { promise, resolve } = makePromiseKit(); + nextResultInR = resolve; + nextResultOutP = translateInboundResult(promise); + latestResultOutP = nextResultOutP; } - // Return a final response if we have one, otherwise a promise for the next state. - if (final) { - assert(currentResponse !== undefined); - return Promise.resolve(currentResponse); + if (updateCount < latestUpdateCount) { + // Each returned promise is unique. + return getLatestResultOutP().then(); } - if (!optNextPromise) { - const nextIterResultP = E(iteratorP).next(); - optNextPromise = E.when( - nextIterResultP, - ({ done, value }) => { - assert(!final); - if (done) { - final = true; - } - currentUpdateCount += 1n; - currentResponse = harden({ - value, - updateCount: done ? undefined : currentUpdateCount, - }); - optNextPromise = undefined; - return currentResponse; - }, - _reason => { - final = true; - currentResponse = - /** @type {Promise>} */ - (nextIterResultP); - optNextPromise = undefined; - return currentResponse; - }, - ); + + if (!nextResultOutP) { + if (finished) { + nextResultOutP = getLatestResultOutP(); + } else { + const { promise, resolve } = makePromiseKit(); + nextResultInR = resolve; + nextResultOutP = translateInboundResult(promise); + } } - return optNextPromise; + + // Each returned promise is unique. + return nextResultOutP.then(); }, }); diff --git a/packages/notifier/src/subscriber.js b/packages/notifier/src/subscriber.js index 3cc01d4a34fc..ae5edcf37760 100644 --- a/packages/notifier/src/subscriber.js +++ b/packages/notifier/src/subscriber.js @@ -46,8 +46,8 @@ const makeSubscriptionIterator = tailP => { return Far('SubscriptionIterator', { subscribe: () => makeSubscription(tailP), [Symbol.asyncIterator]: () => makeSubscriptionIterator(tailP), - next: () => { - const resultP = E.get(tailP).head; + next: async () => { + const resultP = await E.get(tailP).head; tailP = E.get(tailP).tail; Promise.resolve(tailP).catch(() => {}); // suppress unhandled rejection error return resultP;