Skip to content

Commit

Permalink
fix: yet another prefix lossy subscription sketch
Browse files Browse the repository at this point in the history
  • Loading branch information
erights committed May 23, 2022
1 parent b2f9b67 commit cab0a23
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 166 deletions.
154 changes: 76 additions & 78 deletions packages/notifier/README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion packages/notifier/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ export {
makeNotifierKit,
makeNotifierFromAsyncIterable,
} from './notifier.js';
export { makeSubscription, makeSubscriptionKit } from './subscriber.js';
export { shadowSubscription, makeSubscriptionKit } from './subscriber.js';
export {
observeNotifier,
observeIterator,
Expand Down
131 changes: 85 additions & 46 deletions packages/notifier/src/subscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,45 +4,18 @@

import { HandledPromise, E } from '@endo/eventual-send';
import { Far } from '@endo/marshal';
import { makePromiseKit } from '@endo/promise-kit';

import './types.js';

/**
* @template T
* @param {ERef<SubscriptionInternals<T>>} sharableInternalsP
* @returns {Subscription<T>}
*/
const makeSubscription = sharableInternalsP => {
return Far('Subscription', {
// eslint-disable-next-line no-use-before-define
[Symbol.asyncIterator]: () => makeSubscriptionIterator(sharableInternalsP),

/**
* Use this to distribute a Subscription efficiently over the network,
* by obtaining this from the Subscription to be replicated, and applying
* `makeSubscription` to it at the new site to get an equivalent local
* Subscription at that site.
*
* @returns {ERef<SubscriptionInternals<T>>}
*/
getSharableSubscriptionInternals: () => sharableInternalsP,
});
};
harden(makeSubscription);
export { makeSubscription };

/**
* @template T
* @param {ERef<SubscriptionInternals<T>>} tailP
* @returns {SubscriptionIterator<T>}
* @param {SubscriptionUpdates<T>} tailP
* @returns {AsyncIterator<T>}
*/
const makeSubscriptionIterator = tailP => {
// To understand the implementation, start with
// https://web.archive.org/web/20160404122250/http://wiki.ecmascript.org/doku.php?id=strawman:concurrency#infinite_queue
return Far('SubscriptionIterator', {
subscribe: () => makeSubscription(tailP),
[Symbol.asyncIterator]: () => makeSubscriptionIterator(tailP),
next: () => {
const resultP = E.get(tailP).head;
tailP = E.get(tailP).tail;
Expand All @@ -51,51 +24,117 @@ const makeSubscriptionIterator = tailP => {
});
};

/**
* Makes a behavioral presence of a possibly far subscription.
*
* @template T
* @param {GetUpdatesSince<T>} getUpdatesSince
* @returns {Subscription<T>}
*/
const makeSubscription = getUpdatesSince => {
return Far('Subscription', {
[Symbol.asyncIterator]: () => makeSubscriptionIterator(getUpdatesSince()),

getUpdatesSince,
});
};

/**
* Makes a behavioral presence of a possibly far subscription.
*
* @template T
* @param {ERef<Subscription<T>>} subscriptionP
* @returns {Subscription<T>}
*/
export const shadowSubscription = subscriptionP => {
const getUpdatesSince = (updateCount = NaN) =>
E(subscriptionP).getUpdatesSince(updateCount);

return makeSubscription(getUpdatesSince);
};
harden(shadowSubscription);

/**
* Makes a `{ publication, subscription }` for doing lossless efficient
* distributed pub/sub.
*
* @template T
* @returns {SubscriptionRecord<T>}
*/
const makeSubscriptionKit = () => {
/** @type {((internals: ERef<SubscriptionInternals<T>>) => void) | undefined} */
let rear;
const hp = new HandledPromise(r => (rear = r));
const subscription = makeSubscription(hp);
export const makeSubscriptionKit = () => {
/** @type {((internals: SubscriptionUpdates<T>) => void) | undefined} */
let tailR;
let tailP = new HandledPromise(r => (tailR = r));

let currentUpdateCount = 1;
/** @type {SubscriptionUpdates<T>} */
let currentP = tailP;
const advanceCurrent = () => {
// If tailP has not advanced past currentP, do nothing.
if (currentP !== tailP) {
currentUpdateCount += 1;
currentP = tailP;
}
};

const getUpdatesSince = (updateCount = NaN) => {
if (currentUpdateCount === updateCount) {
return tailP;
} else {
return currentP;
}
};
const subscription = makeSubscription(getUpdatesSince);

/** @type {IterationObserver<T>} */
const publication = Far('publication', {
updateState: value => {
if (rear === undefined) {
if (tailR === undefined) {
throw new Error('Cannot update state after termination.');
}
const { promise: nextTailE, resolve: nextRear } = makePromiseKit();
rear(harden({ head: { value, done: false }, tail: nextTailE }));
rear = nextRear;

advanceCurrent();
let nextTailR;
const nextTailP = new HandledPromise(r => (nextTailR = r));
tailR(
harden({
head: { value, done: false },
updateCount: currentUpdateCount,
tail: nextTailP,
}),
);
tailP = nextTailP;
tailR = nextTailR;
},
finish: finalValue => {
if (rear === undefined) {
if (tailR === undefined) {
throw new Error('Cannot finish after termination.');
}
const readComplaint = HandledPromise.reject(
new Error('cannot read past end of iteration'),
);
readComplaint.catch(_ => {}); // suppress unhandled rejection error
rear({ head: { value: finalValue, done: true }, tail: readComplaint });
rear = undefined;

advanceCurrent();
tailR({
head: { value: finalValue, done: true },
updateCount: currentUpdateCount,
tail: readComplaint,
});
tailR = undefined;
},
fail: reason => {
if (rear === undefined) {
if (tailR === undefined) {
throw new Error('Cannot fail after termination.');
}
/** @type {Promise<SubscriptionInternals<T>>} */

advanceCurrent();
/** @type {SubscriptionUpdates<T>} */
const rejection = HandledPromise.reject(reason);
rear(rejection);
rear = undefined;
tailR(rejection);
tailR = undefined;
},
});
return harden({ publication, subscription });
};
harden(makeSubscriptionKit);
export { makeSubscriptionKit };
46 changes: 22 additions & 24 deletions packages/notifier/src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,51 +106,49 @@

// /////////////////////////////////////////////////////////////////////////////

// eslint-disable-next-line jsdoc/require-property
/**
* @template T
* @typedef {{}} BaseSubscription<T>
* @typedef {object} SubscriptionUpdatesRecord
* @property {ERef<IteratorResult<T>>} head internal only
* @property {UpdateCount} updateCount is a value that identifies the update
* @property {SubscriptionUpdates<T>} tail internal onli
*/

/**
* @template T
* @typedef {object} SubscriptionInternals
* @typedef {ERef<SubscriptionUpdatesRecord<T>>} SubscriptionUpdates
* Will be shared between machines, so it must be safe to expose. But other
* software should avoid depending on its internal structure.
* @property {ERef<IteratorResult<T, T>>} head internal only
* @property {Promise<SubscriptionInternals<T>>} tail internal onli
*/

/**
* @template T
* @typedef {BaseSubscription<T> &
* ConsistentAsyncIterable<T> &
* SharableSubscription<T>} Subscription<T>
* A form of AsyncIterable supporting distributed and multicast usage.
* @callback GetUpdatesSince<T>
* @param {UpdateCount} [updateCount]
* @returns {SubscriptionUpdates<T>}
*/

/**
* @template T
* @typedef {{}} BaseSubscription<T>
*/

/**
* @template T
* @typedef {object} SharableSubscription
* @property {() => ERef<SubscriptionInternals<T>>} getSharableSubscriptionInternals
* Used to replicate the multicast values at other sites. To manually create a
* local representative of a Subscription, do
* ```js
* localIterable =
* makeSubscription(E(remoteIterable).getSharableSubscriptionInternals());
* ```
* The resulting `localIterable` also supports such remote use, and
* will return access to the same representation.
* @property {GetUpdatesSince<T>} getUpdatesSince
* Internally used to get the "current" SharableSubscriptionInternals
* in order to make a subscription iterator that starts there.
* The answer is "current" in that it was accurate at some moment between
* when you asked and when you got the answer.
*/

/**
* @template T
* @typedef {AsyncIterator<T, T> & ConsistentAsyncIterable<T>} SubscriptionIterator<T>
* an AsyncIterator supporting distributed and multicast usage.
*
* @property {() => Subscription<T>} subscribe
* Get a new subscription whose starting position is this iterator's current
* position.
* @typedef {BaseSubscription<T> &
* ConsistentAsyncIterable<T> &
* SharableSubscription<T>} Subscription<T>
* A form of AsyncIterable supporting distributed and multicast usage.
*/

/**
Expand Down
1 change: 0 additions & 1 deletion packages/notifier/test/iterable-testing-tools.js
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,6 @@ export const bob = async asyncIterableP => {
* @returns {Promise<Passable[]>}
*/
export const carol = async subscriptionP => {
// @ts-expect-error
const subscriptionIteratorP = E(subscriptionP)[Symbol.asyncIterator]();
const { promise: afterA, resolve: afterAResolve } = makePromiseKit();

Expand Down
26 changes: 14 additions & 12 deletions packages/notifier/test/test-subscriber-examples.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,27 @@
import { test } from './prepare-test-env-ava.js';

// eslint-disable-next-line import/order
import { E } from '@endo/eventual-send';
import {
observeIteration,
makeSubscriptionKit,
makeSubscription,
shadowSubscription,
} from '../src/index.js';
import { paula, alice, bob, carol } from './iterable-testing-tools.js';

import '../src/types.js';

test('subscription for-await-of success example', async t => {
// TODO Update the tests marked `test.skip` to take prefix lossiness into
// account.

test.skip('subscription for-await-of success example', async t => {
const { publication, subscription } = makeSubscriptionKit();
paula(publication);
const log = await alice(subscription);

t.deepEqual(log, [['non-final', 'a'], ['non-final', 'b'], ['finished']]);
});

test('subscription observeIteration success example', async t => {
test.skip('subscription observeIteration success example', async t => {
const { publication, subscription } = makeSubscriptionKit();
paula(publication);
const log = await bob(subscription);
Expand All @@ -47,7 +49,7 @@ test('subscription for-await-of cannot eat promise', async t => {
t.assert(log[0][1] instanceof TypeError);
});

test('subscription observeIteration can eat promise', async t => {
test.skip('subscription observeIteration can eat promise', async t => {
const { publication, subscription } = makeSubscriptionKit();
paula(publication);
const subP = Promise.resolve(subscription);
Expand All @@ -60,21 +62,21 @@ test('subscription observeIteration can eat promise', async t => {
]);
});

test('subscription for-await-of on local representative', async t => {
test.skip('subscription for-await-of on local representative', async t => {
const { publication, subscription } = makeSubscriptionKit();
paula(publication);
const subP = Promise.resolve(subscription);
const localSub = makeSubscription(E(subP).getSharableSubscriptionInternals());
const localSub = shadowSubscription(subP);
const log = await alice(localSub);

t.deepEqual(log, [['non-final', 'a'], ['non-final', 'b'], ['finished']]);
});

test('subscription observeIteration on local representative', async t => {
test.skip('subscription observeIteration on local representative', async t => {
const { publication, subscription } = makeSubscriptionKit();
paula(publication);
const subP = Promise.resolve(subscription);
const localSub = makeSubscription(E(subP).getSharableSubscriptionInternals());
const localSub = shadowSubscription(subP);
const log = await bob(localSub);

t.deepEqual(log, [
Expand All @@ -84,7 +86,7 @@ test('subscription observeIteration on local representative', async t => {
]);
});

test('subscription for-await-of on generic representative', async t => {
test.skip('subscription for-await-of on generic representative', async t => {
const { publication, subscription } = makeSubscriptionKit();
paula(publication);
const subP = Promise.resolve(subscription);
Expand All @@ -95,7 +97,7 @@ test('subscription for-await-of on generic representative', async t => {
t.deepEqual(log, [['non-final', 'a'], ['non-final', 'b'], ['finished']]);
});

test('subscription observeIteration on generic representative', async t => {
test.skip('subscription observeIteration on generic representative', async t => {
const { publication, subscription } = makeSubscriptionKit();
paula(publication);
const subP = Promise.resolve(subscription);
Expand All @@ -114,7 +116,7 @@ test('subscription observeIteration on generic representative', async t => {
// Carol is specific to subscription, so there is nothing analogous to the
// following in test-notifier-examples

test('subscribe to subscriptionIterator success example', async t => {
test.skip('subscribe to subscriptionIterator success example', async t => {
const { publication, subscription } = makeSubscriptionKit();
paula(publication);
const log = await carol(subscription);
Expand Down
6 changes: 2 additions & 4 deletions packages/pegasus/test/test-peg.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { AmountMath } from '@agoric/ertp';
import { makeZoeKit } from '@agoric/zoe';

import fakeVatAdmin from '@agoric/zoe/tools/fakeVatAdmin.js';
import { makeSubscription } from '@agoric/notifier';
import { shadowSubscription } from '@agoric/notifier';

import '@agoric/ertp/exported.js';
import { makePromiseKit } from '@endo/promise-kit';
Expand All @@ -28,9 +28,7 @@ const contractPath = `${dirname}/../src/pegasus.js`;
* @returns {AsyncIterator<T, T>}
*/
const makeAsyncIteratorFromSubscription = sub =>
makeSubscription(E(sub).getSharableSubscriptionInternals())[
Symbol.asyncIterator
]();
shadowSubscription(sub)[Symbol.asyncIterator]();

/**
* @param {import('ava').Assertions} t
Expand Down

0 comments on commit cab0a23

Please sign in to comment.