Skip to content

Commit

Permalink
fix: review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
erights committed May 23, 2022
1 parent cab0a23 commit 79a6311
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 25 deletions.
2 changes: 1 addition & 1 deletion packages/notifier/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ local or remote promise for an AsyncIterable. `observeIteration` only sends it e
using `E` (equivalent to the tildot syntax `~.`), and so doesn't care about these differences.

While correct, Bob’s code is sub-optimal. Its distributed systems properties are not terrible, but
Bob does better using `getUpdatesSince()` (provided by
Bob does better using `getIterationSince()` (provided by
SubscriptionKit). This lets Bob make a local AsyncIterable that coordinates better with producer
Paula's IterationObserver.

Expand Down
35 changes: 18 additions & 17 deletions packages/notifier/src/subscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import './types.js';

/**
* @template T
* @param {SubscriptionUpdates<T>} tailP
* @param {Iteration<T>} tailP
* @returns {AsyncIterator<T>}
*/
const makeSubscriptionIterator = tailP => {
Expand All @@ -28,14 +28,14 @@ const makeSubscriptionIterator = tailP => {
* Makes a behavioral presence of a possibly far subscription.
*
* @template T
* @param {GetUpdatesSince<T>} getUpdatesSince
* @param {GetIterationSince<T>} getIterationSince
* @returns {Subscription<T>}
*/
const makeSubscription = getUpdatesSince => {
const makeSubscription = getIterationSince => {
return Far('Subscription', {
[Symbol.asyncIterator]: () => makeSubscriptionIterator(getUpdatesSince()),
[Symbol.asyncIterator]: () => makeSubscriptionIterator(getIterationSince()),

getUpdatesSince,
getIterationSince,
});
};

Expand All @@ -47,10 +47,10 @@ const makeSubscription = getUpdatesSince => {
* @returns {Subscription<T>}
*/
export const shadowSubscription = subscriptionP => {
const getUpdatesSince = (updateCount = NaN) =>
E(subscriptionP).getUpdatesSince(updateCount);
const getIterationSince = (updateCount = NaN) =>
E(subscriptionP).getIterationSince(updateCount);

return makeSubscription(getUpdatesSince);
return makeSubscription(getIterationSince);
};
harden(shadowSubscription);

Expand All @@ -62,29 +62,30 @@ harden(shadowSubscription);
* @returns {SubscriptionRecord<T>}
*/
export const makeSubscriptionKit = () => {
/** @type {((internals: SubscriptionUpdates<T>) => void) | undefined} */
/** @type {((internals: Iteration<T>) => void) | undefined} */
let tailR;
let tailP = new HandledPromise(r => (tailR = r));

let currentUpdateCount = 1;
/** @type {SubscriptionUpdates<T>} */
/** @type {Iteration<T>} */
let currentP = tailP;
const advanceCurrent = () => {
// If tailP has not advanced past currentP, do nothing.
if (currentP !== tailP) {
currentUpdateCount += 1;
currentP = tailP;
if (currentP === tailP) {
// If tailP has not advanced past currentP, do nothing.
return;
}
currentUpdateCount += 1;
currentP = tailP;
};

const getUpdatesSince = (updateCount = NaN) => {
const getIterationSince = (updateCount = NaN) => {
if (currentUpdateCount === updateCount) {
return tailP;
} else {
return currentP;
}
};
const subscription = makeSubscription(getUpdatesSince);
const subscription = makeSubscription(getIterationSince);

/** @type {IterationObserver<T>} */
const publication = Far('publication', {
Expand Down Expand Up @@ -129,7 +130,7 @@ export const makeSubscriptionKit = () => {
}

advanceCurrent();
/** @type {SubscriptionUpdates<T>} */
/** @type {Iteration<T>} */
const rejection = HandledPromise.reject(reason);
tailR(rejection);
tailR = undefined;
Expand Down
14 changes: 7 additions & 7 deletions packages/notifier/src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,24 +108,24 @@

/**
* @template T
* @typedef {object} SubscriptionUpdatesRecord
* @typedef {object} IterationRecord
* @property {ERef<IteratorResult<T>>} head internal only
* @property {UpdateCount} updateCount is a value that identifies the update
* @property {SubscriptionUpdates<T>} tail internal onli
* @property {Iteration<T>} tail internal only
*/

/**
* @template T
* @typedef {ERef<SubscriptionUpdatesRecord<T>>} SubscriptionUpdates
* @typedef {ERef<IterationRecord<T>>} Iteration
* Will be shared between machines, so it must be safe to expose. But other
* software should avoid depending on its internal structure.
* software should consider it opaque, not depending on its internal structure.
*/

/**
* @template T
* @callback GetUpdatesSince<T>
* @callback GetIterationSince<T>
* @param {UpdateCount} [updateCount]
* @returns {SubscriptionUpdates<T>}
* @returns {Iteration<T>}
*/

/**
Expand All @@ -136,7 +136,7 @@
/**
* @template T
* @typedef {object} SharableSubscription
* @property {GetUpdatesSince<T>} getUpdatesSince
* @property {GetIterationSince<T>} getIterationSince
* Internally used to get the "current" SharableSubscriptionInternals
* in order to make a subscription iterator that starts there.
* The answer is "current" in that it was accurate at some moment between
Expand Down

0 comments on commit 79a6311

Please sign in to comment.