diff --git a/packages/notifier/src/index.js b/packages/notifier/src/index.js index 6314b1b1a41..8a984b0ff9c 100644 --- a/packages/notifier/src/index.js +++ b/packages/notifier/src/index.js @@ -9,6 +9,7 @@ export { makeNotifier, makeNotifierKit, makeNotifierFromAsyncIterable, + makeNotifierFromSubscriber, } from './notifier.js'; export { makeSubscription, makeSubscriptionKit } from './subscriber.js'; export { diff --git a/packages/notifier/src/notifier.js b/packages/notifier/src/notifier.js index 3689f020989..31cbed2bc98 100644 --- a/packages/notifier/src/notifier.js +++ b/packages/notifier/src/notifier.js @@ -151,9 +151,103 @@ export const makeNotifierKit = (...initialStateArr) => { return harden({ notifier, updater }); }; +/** + * @template T + * @param {ERef>} subscriberP + * @returns {Notifier} + */ +export const makeNotifierFromSubscriber = subscriberP => { + /** @type {bigint} */ + let latestInboundCount; + /** @type {UpdateCount & bigint} */ + let latestUpdateCount = 0n; + /** @type {WeakMap, Promise>>} */ + const outboundResults = new WeakMap(); + + /** + * @param {PublicationRecord} record + * @returns {Promise>} + */ + const translateInboundPublicationRecord = record => { + // Leverage identity preservation of `record`. + const existingOutboundResult = outboundResults.get(record); + if (existingOutboundResult) { + return existingOutboundResult; + } + + latestInboundCount = record.publishCount; + latestUpdateCount += 1n; + const resultP = E.when(record.head, ({ value, done }) => { + if (done) { + // Final results have undefined updateCount. + return harden({ value, updateCount: undefined }); + } + return harden({ value, updateCount: latestUpdateCount }); + }); + outboundResults.set(record, resultP); + return resultP; + }; + + /** + * @template T + * @type {BaseNotifier} + */ + const baseNotifier = Far('baseNotifier', { + getUpdateSince(updateCount = -1n) { + assert( + updateCount <= latestUpdateCount, + 'argument must be a previously-issued updateCount.', + ); + + if (updateCount < latestUpdateCount) { + // Return the most recent result possible without imposing an unnecessary delay. + return E(subscriberP) + .subscribeAfter() + .then(translateInboundPublicationRecord); + } + + // Return a result that follows the last-returned result, + // skipping over intermediate results if latestInboundCount + // no longer corresponds with the latest result. + // Note that unlike notifiers and subscribers respectively returned by + // makeNotifierKit and makePublishKit, this result is not guaranteed + // to follow the result returned when a non-latest updateCount is provided + // (e.g., it is possible for `notifierFromSubscriber.getUpdateSince()` and + // `notifierFromSubscriber.getUpdateSince(latestUpdateCount)` to both + // settle to the same object `newLatest` where `newLatest.updateCount` + // is one greater than `latestUpdateCount`). + return E(subscriberP) + .subscribeAfter(latestInboundCount) + .then(translateInboundPublicationRecord); + }, + }); + + /** @type {Notifier} */ + const notifier = Far('notifier', { + ...makeAsyncIterableFromNotifier(baseNotifier), + ...baseNotifier, + + /** + * Use this to distribute a Notifier efficiently over the network, + * by obtaining this from the Notifier to be replicated, and applying + * `makeNotifier` to it at the new site to get an equivalent local + * Notifier at that site. + */ + getSharableNotifierInternals: () => baseNotifier, + getStoreKey: () => harden({ notifier }), + }); + return notifier; +}; +harden(makeNotifierFromSubscriber); + /** * Adaptor from async iterable to notifier. * + * @deprecated The resulting notifier is lossless, which is not desirable. + * Prefer makeNotifierFromSubscriber, and refer to + * https://github.com/Agoric/agoric-sdk/issues/5413 and + * https://github.com/Agoric/agoric-sdk/pull/5695 for context. + * * @template T * @param {ERef>} asyncIterableP * @returns {Notifier} diff --git a/packages/notifier/test/iterable-testing-tools.js b/packages/notifier/test/iterable-testing-tools.js index f408b1ead12..cc8ba5b7efa 100644 --- a/packages/notifier/test/iterable-testing-tools.js +++ b/packages/notifier/test/iterable-testing-tools.js @@ -13,6 +13,16 @@ export const invertPromiseSettlement = promise => rejection => rejection, ); +// Return a promise that will resolve in the specified number of turns, +// supporting asynchronous sleep. +export const delayByTurns = async turnCount => { + while (turnCount) { + turnCount -= 1; + // eslint-disable-next-line no-await-in-loop + await undefined; + } +}; + /** @typedef {import('@endo/marshal').Passable} Passable */ /** @typedef {import('ava').Assertions} Assertions */ diff --git a/packages/notifier/test/test-makeNotifierFromSubscriber.js b/packages/notifier/test/test-makeNotifierFromSubscriber.js new file mode 100644 index 00000000000..e2cf37d7e38 --- /dev/null +++ b/packages/notifier/test/test-makeNotifierFromSubscriber.js @@ -0,0 +1,358 @@ +// @ts-check + +import { test } from './prepare-test-env-ava.js'; + +import { makePublishKit, makeNotifierFromSubscriber } from '../src/index.js'; +import { + delayByTurns, + invertPromiseSettlement, +} from './iterable-testing-tools.js'; + +/** @param {{conclusionMethod: 'finish' | 'fail', conclusionValue: any}} config */ +const makeBatchPublishKit = ({ conclusionMethod, conclusionValue }) => { + const { publisher, subscriber } = makePublishKit(); + + // Publish in power-of-two batches: [1], [2], [3, 4], [5, 6, 7, 8], ... + let nextValue = 1; + let nextBatchSize = 1; + let done = false; + const initialize = () => { + publisher.publish(nextValue); + nextValue += 1; + }; + const publishNextBatch = () => { + if (done) { + return; + } + for (let i = 0; i < nextBatchSize; i += 1) { + publisher.publish(nextValue); + nextValue += 1; + } + nextBatchSize *= 2; + if (nextValue > 64) { + done = true; + if (conclusionMethod === 'fail') { + publisher.fail(conclusionValue); + } else { + publisher.finish(conclusionValue); + } + } + }; + + return { initialize, publishNextBatch, subscriber }; +}; + +test('makeNotifierFromSubscriber(finishes) - for-await-of iteration', async t => { + const { initialize, publishNextBatch, subscriber } = makeBatchPublishKit({ + conclusionMethod: 'finish', + conclusionValue: 'done', + }); + const notifier = await makeNotifierFromSubscriber(subscriber); + initialize(); + + const results = []; + for await (const result of notifier) { + results.push(result); + publishNextBatch(); + } + + t.deepEqual( + results, + [1, 2, 4, 8, 16, 32], + 'only the last of values published between iteration steps should be observed', + ); +}); + +test('makeNotifierFromSubscriber(finishes) - getUpdateSince', async t => { + const { initialize, publishNextBatch, subscriber } = makeBatchPublishKit({ + conclusionMethod: 'finish', + conclusionValue: 'done', + }); + const notifier = await makeNotifierFromSubscriber(subscriber); + initialize(); + + const results = []; + let updateCount; + // eslint-disable-next-line no-constant-condition + while (true) { + // eslint-disable-next-line no-await-in-loop + const result = await notifier.getUpdateSince(updateCount); + ({ updateCount } = result); + results.push(result.value); + // eslint-disable-next-line no-await-in-loop + t.deepEqual(await notifier.getUpdateSince(), result); + if (updateCount === undefined) { + break; + } + // eslint-disable-next-line no-await-in-loop + await publishNextBatch(); + } + + t.deepEqual( + results, + [1, 2, 4, 8, 16, 32, 'done'], + 'only the last of values published between iteration steps should be observed', + ); + t.deepEqual(await notifier.getUpdateSince(), { + value: 'done', + updateCount: undefined, + }); +}); + +test('makeNotifierFromSubscriber(fails) - for-await-of iteration', async t => { + const failure = new Error('failure'); + const { initialize, publishNextBatch, subscriber } = makeBatchPublishKit({ + conclusionMethod: 'fail', + conclusionValue: failure, + }); + const notifier = await makeNotifierFromSubscriber(subscriber); + initialize(); + + const results = []; + try { + for await (const result of notifier) { + results.push(result); + publishNextBatch(); + } + throw new Error('for-await-of completed successfully'); + } catch (err) { + t.is(err, failure, 'for-await-of should throw the failure value'); + } + + t.deepEqual( + results, + [1, 2, 4, 8, 16, 32], + 'only the last of values published between iteration steps should be observed', + ); +}); + +test('makeNotifierFromSubscriber(fails) - getUpdateSince', async t => { + const failure = new Error('failure'); + const { initialize, publishNextBatch, subscriber } = makeBatchPublishKit({ + conclusionMethod: 'fail', + conclusionValue: failure, + }); + const notifier = await makeNotifierFromSubscriber(subscriber); + initialize(); + + const results = []; + let updateCount; + try { + // eslint-disable-next-line no-constant-condition + while (true) { + // eslint-disable-next-line no-await-in-loop + const result = await notifier.getUpdateSince(updateCount); + ({ updateCount } = result); + results.push(result.value); + // eslint-disable-next-line no-await-in-loop + t.deepEqual(await notifier.getUpdateSince(), result); + if (updateCount === undefined) { + break; + } + // eslint-disable-next-line no-await-in-loop + await publishNextBatch(); + } + throw new Error('for-await-of completed successfully'); + } catch (err) { + t.is(err, failure, 'await should throw the failure value'); + } + + t.deepEqual( + results, + [1, 2, 4, 8, 16, 32], + 'only the last of values published between iteration steps should be observed', + ); + await t.throwsAsync(() => notifier.getUpdateSince(), { is: failure }); +}); + +test('makeNotifierFromSubscriber - getUpdateSince timing', async t => { + const { initialize, publishNextBatch, subscriber } = makeBatchPublishKit({ + conclusionMethod: 'finish', + conclusionValue: 'done', + }); + const notifier = await makeNotifierFromSubscriber(subscriber); + + const sequence = []; + const firstP = notifier.getUpdateSince(); + void firstP.then(_ => sequence.push('resolve firstP')); + const firstP2 = notifier.getUpdateSince(); + void firstP2.then(_ => sequence.push('resolve firstP2')); + + await delayByTurns(2); + t.deepEqual( + sequence, + [], + 'getUpdateSince() should not settle before a value is published', + ); + + initialize(); + await delayByTurns(4); + t.deepEqual( + sequence, + ['resolve firstP', 'resolve firstP2'], + 'getUpdateSince() should settle after a value is published', + ); + + publishNextBatch(); + t.like( + await Promise.all([firstP, firstP2]), + { ...[{ value: 1 }, { value: 1 }], length: 2 }, + 'early getUpdateSince() should settle to the first result', + ); + t.like( + await notifier.getUpdateSince(), + { value: 2 }, + 'getUpdateSince() should settle to the latest result', + ); + + publishNextBatch(); + const lateResult = await notifier.getUpdateSince(); + t.like( + lateResult, + { value: 4 }, + 'getUpdateSince() should settle to a just-published result', + ); + + const pendingResultP = notifier.getUpdateSince(lateResult.updateCount); + publishNextBatch(); + t.like( + await Promise.all([ + pendingResultP, + notifier.getUpdateSince(), + notifier.getUpdateSince(), + ]), + { ...[{ value: 8 }, { value: 8 }, { value: 8 }], length: 3 }, + 'getUpdateSince(latestUpdateCount) should settle to the next result', + ); + + publishNextBatch(); + publishNextBatch(); + t.like( + await notifier.getUpdateSince(lateResult.updateCount), + { value: 32 }, + 'getUpdateSince(oldUpdateCount) should settle to the latest result', + ); +}); + +test('makeNotifierFromSubscriber - updateCount validation', async t => { + const { subscriber } = makeBatchPublishKit({ + conclusionMethod: 'finish', + conclusionValue: 'done', + }); + const notifier = await makeNotifierFromSubscriber(subscriber); + t.throws(() => notifier.getUpdateSince(1n)); +}); + +test('makeNotifierFromSubscriber - getUpdateSince() result identity', async t => { + const { initialize, publishNextBatch, subscriber } = makeBatchPublishKit({ + conclusionMethod: 'finish', + conclusionValue: 'done', + }); + const notifier = await makeNotifierFromSubscriber(subscriber); + const firstP = notifier.getUpdateSince(); + const firstP2 = notifier.getUpdateSince(); + t.not(firstP, firstP2, 'early getUpdateSince() promises should be distinct'); + + initialize(); + const [firstResult, firstResult2] = await Promise.all([firstP, firstP2]); + let { updateCount } = firstResult; + t.deepEqual( + new Set([ + firstResult, + firstResult2, + ...(await Promise.all([ + notifier.getUpdateSince(), + notifier.getUpdateSince(), + ])), + await notifier.getUpdateSince(), + ]), + new Set([firstResult]), + 'first results should be identical', + ); + + const secondP = notifier.getUpdateSince(updateCount); + const secondP2 = notifier.getUpdateSince(updateCount); + t.not( + secondP, + secondP2, + 'getUpdateSince(updateCount) promises should be distinct', + ); + + publishNextBatch(); + const [secondResult, secondResult2] = await Promise.all([secondP, secondP2]); + t.deepEqual( + new Set([ + secondResult, + secondResult2, + ...(await Promise.all([ + notifier.getUpdateSince(), + notifier.getUpdateSince(updateCount), + ])), + await notifier.getUpdateSince(), + ]), + new Set([secondResult]), + 'late results should be identical', + ); + + t.not( + notifier.getUpdateSince(), + notifier.getUpdateSince(), + 'late getUpdateSince() promises should be distinct', + ); + t.not( + notifier.getUpdateSince(), + notifier.getUpdateSince(updateCount), + 'getUpdateSince() promises should be distinct from getUpdateSince(updateCount) promises', + ); + + let previousResult = secondResult; + let finalResultP; + let finalResult; + while (updateCount) { + finalResultP = notifier.getUpdateSince(updateCount); + publishNextBatch(); + // eslint-disable-next-line no-await-in-loop + finalResult = await finalResultP; + ({ updateCount } = finalResult); + if (updateCount) { + previousResult = finalResult; + } + } + t.deepEqual( + new Set([ + finalResult, + ...(await Promise.all([ + notifier.getUpdateSince(), + notifier.getUpdateSince(updateCount), + ])), + await notifier.getUpdateSince(previousResult.updateCount), + await notifier.getUpdateSince(), + ]), + new Set([finalResult]), + 'final results should be identical', + ); + + const { publisher, subscriber: failureSubscriber } = makePublishKit(); + const failureNotifier = await makeNotifierFromSubscriber(failureSubscriber); + publisher.publish('first value'); + ({ updateCount } = await failureNotifier.getUpdateSince()); + const failureP = failureNotifier.getUpdateSince(); + const failureP2 = failureNotifier.getUpdateSince(); + const failure = new Error('failure'); + publisher.fail(failure); + t.deepEqual( + new Set( + await Promise.all( + [ + failureP, + failureP2, + failureNotifier.getUpdateSince(), + failureNotifier.getUpdateSince(updateCount), + failureNotifier.getUpdateSince(), + ].map(invertPromiseSettlement), + ), + ), + new Set([failure]), + 'failure results should be identical', + ); +});