Skip to content

Commit

Permalink
feat(notifier): Add makeNotifierFromSubscriber (#5737)
Browse files Browse the repository at this point in the history
* fix(notifier): Make makeAsyncIterableFromNotifier lossy

Cherry-picked from gh-5413-lossy-makeNotifierFromAsyncIterable.
See #5695 .

* fix(notifier): Revert "Make makeAsyncIterableFromNotifier lossy"

Eager consumption led to infinite loops; see
#5695 for context.

* feat(notifier): Add makeNotifierFromSubscriber

Fixes #5413

* test(notifier): Update per code review

* chore(notifier): Resolve lint warnings

* fix(notifier): Align makeNotifierFromSubscriber with makeNotifierKit

getUpdateSince() always consults the source subscribeAfter() rather than
using a possibly-stale local cache.

* fix master merge

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Co-authored-by: Turadg Aleahmad <turadg@agoric.com>
  • Loading branch information
3 people committed Jul 14, 2022
1 parent dc4798e commit 077718a
Show file tree
Hide file tree
Showing 4 changed files with 463 additions and 0 deletions.
1 change: 1 addition & 0 deletions packages/notifier/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export {
makeNotifier,
makeNotifierKit,
makeNotifierFromAsyncIterable,
makeNotifierFromSubscriber,
} from './notifier.js';
export { makeSubscription, makeSubscriptionKit } from './subscriber.js';
export {
Expand Down
94 changes: 94 additions & 0 deletions packages/notifier/src/notifier.js
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,103 @@ export const makeNotifierKit = (...initialStateArr) => {
return harden({ notifier, updater });
};

/**
* @template T
* @param {ERef<Subscriber<T>>} subscriberP
* @returns {Notifier<T>}
*/
export const makeNotifierFromSubscriber = subscriberP => {
/** @type {bigint} */
let latestInboundCount;
/** @type {UpdateCount & bigint} */
let latestUpdateCount = 0n;
/** @type {WeakMap<PublicationRecord<T>, Promise<UpdateRecord<T>>>} */
const outboundResults = new WeakMap();

/**
* @param {PublicationRecord<T>} record
* @returns {Promise<UpdateRecord<T>>}
*/
const translateInboundPublicationRecord = record => {
// Leverage identity preservation of `record`.
const existingOutboundResult = outboundResults.get(record);
if (existingOutboundResult) {
return existingOutboundResult;
}

latestInboundCount = record.publishCount;
latestUpdateCount += 1n;
const resultP = E.when(record.head, ({ value, done }) => {
if (done) {
// Final results have undefined updateCount.
return harden({ value, updateCount: undefined });
}
return harden({ value, updateCount: latestUpdateCount });
});
outboundResults.set(record, resultP);
return resultP;
};

/**
* @template T
* @type {BaseNotifier<T>}
*/
const baseNotifier = Far('baseNotifier', {
getUpdateSince(updateCount = -1n) {
assert(
updateCount <= latestUpdateCount,
'argument must be a previously-issued updateCount.',
);

if (updateCount < latestUpdateCount) {
// Return the most recent result possible without imposing an unnecessary delay.
return E(subscriberP)
.subscribeAfter()
.then(translateInboundPublicationRecord);
}

// Return a result that follows the last-returned result,
// skipping over intermediate results if latestInboundCount
// no longer corresponds with the latest result.
// Note that unlike notifiers and subscribers respectively returned by
// makeNotifierKit and makePublishKit, this result is not guaranteed
// to follow the result returned when a non-latest updateCount is provided
// (e.g., it is possible for `notifierFromSubscriber.getUpdateSince()` and
// `notifierFromSubscriber.getUpdateSince(latestUpdateCount)` to both
// settle to the same object `newLatest` where `newLatest.updateCount`
// is one greater than `latestUpdateCount`).
return E(subscriberP)
.subscribeAfter(latestInboundCount)
.then(translateInboundPublicationRecord);
},
});

/** @type {Notifier<T>} */
const notifier = Far('notifier', {
...makeAsyncIterableFromNotifier(baseNotifier),
...baseNotifier,

/**
* Use this to distribute a Notifier efficiently over the network,
* by obtaining this from the Notifier to be replicated, and applying
* `makeNotifier` to it at the new site to get an equivalent local
* Notifier at that site.
*/
getSharableNotifierInternals: () => baseNotifier,
getStoreKey: () => harden({ notifier }),
});
return notifier;
};
harden(makeNotifierFromSubscriber);

/**
* Adaptor from async iterable to notifier.
*
* @deprecated The resulting notifier is lossless, which is not desirable.
* Prefer makeNotifierFromSubscriber, and refer to
* https://github.com/Agoric/agoric-sdk/issues/5413 and
* https://github.com/Agoric/agoric-sdk/pull/5695 for context.
*
* @template T
* @param {ERef<AsyncIterable<T>>} asyncIterableP
* @returns {Notifier<T>}
Expand Down
10 changes: 10 additions & 0 deletions packages/notifier/test/iterable-testing-tools.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ export const invertPromiseSettlement = promise =>
rejection => rejection,
);

// Return a promise that will resolve in the specified number of turns,
// supporting asynchronous sleep.
export const delayByTurns = async turnCount => {
while (turnCount) {
turnCount -= 1;
// eslint-disable-next-line no-await-in-loop
await undefined;
}
};

/** @typedef {import('@endo/marshal').Passable} Passable */

/** @typedef {import('ava').Assertions} Assertions */
Expand Down
Loading

0 comments on commit 077718a

Please sign in to comment.