From 5ca9217ea247689427c8abd73768f53dda9f682f Mon Sep 17 00:00:00 2001 From: Michael FIG Date: Tue, 7 Jun 2022 07:41:23 -0600 Subject: [PATCH] chore(casting): `@agoric/chain-streams` -> `@agoric/casting` --- .github/workflows/test-all-packages.yml | 4 +- packages/agoric-cli/package.json | 2 +- packages/agoric-cli/src/entrypoint.js | 2 +- .../agoric-cli/src/{stream.js => follow.js} | 38 +++++++++---------- packages/agoric-cli/src/main.js | 8 ++-- packages/agoric-cli/src/sdk-package-names.js | 2 +- packages/{chain-streams => casting}/README.md | 38 +++++++++---------- .../{chain-streams => casting}/jsconfig.json | 0 .../node-fetch-shim.js | 0 .../{chain-streams => casting}/package.json | 4 +- .../src/casting-spec.js} | 27 ++++++------- .../src/change-follower.js} | 16 ++++---- .../src/defaults.js | 0 .../src/follower-cosmjs.js} | 34 ++++++++--------- .../src/iterable.js | 6 +-- .../src/leader-netconfig.js | 8 ++-- .../{chain-streams => casting}/src/leader.js | 8 ++-- .../{chain-streams => casting}/src/main.js | 4 +- .../{chain-streams => casting}/src/shuffle.js | 0 .../{chain-streams => casting}/src/types.js | 20 +++++----- .../test/fake-rpc-server.js | 2 +- .../test/lockdown.js | 0 .../test/prepare-test-env-ava.js | 0 .../test/test-mvp.js | 18 ++++----- 24 files changed, 121 insertions(+), 120 deletions(-) rename packages/agoric-cli/src/{stream.js => follow.js} (72%) rename packages/{chain-streams => casting}/README.md (74%) rename packages/{chain-streams => casting}/jsconfig.json (100%) rename packages/{chain-streams => casting}/node-fetch-shim.js (100%) rename packages/{chain-streams => casting}/package.json (94%) rename packages/{chain-streams/src/store-key.js => casting/src/casting-spec.js} (56%) rename packages/{chain-streams/src/watcher.js => casting/src/change-follower.js} (68%) rename packages/{chain-streams => casting}/src/defaults.js (100%) rename packages/{chain-streams/src/stream-cosmjs.js => casting/src/follower-cosmjs.js} (88%) rename packages/{chain-streams => casting}/src/iterable.js (86%) rename packages/{chain-streams => casting}/src/leader-netconfig.js (88%) rename packages/{chain-streams => casting}/src/leader.js (87%) rename packages/{chain-streams => casting}/src/main.js (75%) rename packages/{chain-streams => casting}/src/shuffle.js (100%) rename packages/{chain-streams => casting}/src/types.js (72%) rename packages/{chain-streams => casting}/test/fake-rpc-server.js (98%) rename packages/{chain-streams => casting}/test/lockdown.js (100%) rename packages/{chain-streams => casting}/test/prepare-test-env-ava.js (100%) rename packages/{chain-streams => casting}/test/test-mvp.js (79%) diff --git a/.github/workflows/test-all-packages.yml b/.github/workflows/test-all-packages.yml index 5f94c51416d0..c111de7c7d22 100644 --- a/.github/workflows/test-all-packages.yml +++ b/.github/workflows/test-all-packages.yml @@ -142,8 +142,8 @@ jobs: # END-TEST-BOILERPLATE - name: yarn test (cosmos) run: cd golang/cosmos && yarn ${{ steps.vars.outputs.test }} - - name: yarn test (chain-streams) - run: cd packages/chain-streams && yarn ${{ steps.vars.outputs.test }} + - name: yarn test (casting) + run: cd packages/casting && yarn ${{ steps.vars.outputs.test }} - name: yarn test (run-protocol) run: cd packages/run-protocol && yarn ${{ steps.vars.outputs.test }} - name: yarn test (pegasus) diff --git a/packages/agoric-cli/package.json b/packages/agoric-cli/package.json index a27c8057263d..39d20ff9ad68 100644 --- a/packages/agoric-cli/package.json +++ b/packages/agoric-cli/package.json @@ -29,7 +29,7 @@ "dependencies": { "@agoric/access-token": "^0.4.18", "@agoric/assert": "^0.4.0", - "@agoric/chain-streams": "^0.1.0", + "@agoric/casting": "^0.1.0", "@agoric/nat": "^4.1.0", "@endo/bundle-source": "^2.2.0", "@endo/captp": "^2.0.7", diff --git a/packages/agoric-cli/src/entrypoint.js b/packages/agoric-cli/src/entrypoint.js index dd1020a7b43e..9005d38f83d4 100755 --- a/packages/agoric-cli/src/entrypoint.js +++ b/packages/agoric-cli/src/entrypoint.js @@ -4,7 +4,7 @@ import '@endo/init/pre.js'; import 'esm'; -import '@agoric/chain-streams/node-fetch-shim.js'; +import '@agoric/casting/node-fetch-shim.js'; import '@endo/init'; import path from 'path'; diff --git a/packages/agoric-cli/src/stream.js b/packages/agoric-cli/src/follow.js similarity index 72% rename from packages/agoric-cli/src/stream.js rename to packages/agoric-cli/src/follow.js index eb0cc2fab79d..00b3e7b2d68f 100644 --- a/packages/agoric-cli/src/stream.js +++ b/packages/agoric-cli/src/follow.js @@ -6,14 +6,14 @@ import { decodeToJustin } from '@endo/marshal/src/marshal-justin.js'; import { delay, iterateLatest, - makeChainStream, + makeFollower, makeLeader, - makeStoreKey, -} from '@agoric/chain-streams'; + makeCastingSpec, +} from '@agoric/casting'; -export default async function streamMain(progname, rawArgs, powers, opts) { +export default async function followerMain(progname, rawArgs, powers, opts) { const { anylogger } = powers; - const console = anylogger('agoric:stream'); + const console = anylogger('agoric:follower'); const { integrity, @@ -23,8 +23,8 @@ export default async function streamMain(progname, rawArgs, powers, opts) { sleep, } = opts; - /** @type {import('@agoric/chain-streams').ChainStreamOptions} */ - const streamOptions = { + /** @type {import('@agoric/casting').FollowerOptions} */ + const followerOptions = { integrity, }; @@ -33,7 +33,7 @@ export default async function streamMain(progname, rawArgs, powers, opts) { switch (output) { case 'justinlines': case 'justin': { - streamOptions.unserializer = null; + followerOptions.unserializer = null; const pretty = !output.endsWith('lines'); formatOutput = ({ body }) => { const encoded = JSON.parse(body); @@ -55,15 +55,15 @@ export default async function streamMain(progname, rawArgs, powers, opts) { } case 'hex': { // Dump as hex strings. - streamOptions.decode = buf => buf; - streamOptions.unserializer = null; + followerOptions.decode = buf => buf; + followerOptions.unserializer = null; formatOutput = buf => buf.reduce((acc, b) => acc + b.toString(16).padStart(2, '0'), ''); break; } case 'text': { - streamOptions.decode = buf => new TextDecoder().decode(buf); - streamOptions.unserializer = null; + followerOptions.decode = buf => new TextDecoder().decode(buf); + followerOptions.unserializer = null; formatOutput = buf => buf; break; } @@ -74,7 +74,7 @@ export default async function streamMain(progname, rawArgs, powers, opts) { } if (integrity !== 'none') { - streamOptions.crasher = Far('stream crasher', { + followerOptions.crasher = Far('follower crasher', { crash: (...args) => { console.error(...args); console.warn(`You are running with '--integrity=${integrity}'`); @@ -87,7 +87,7 @@ export default async function streamMain(progname, rawArgs, powers, opts) { } // TODO: https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ - /** @type {import('@agoric/chain-streams').ChainLeaderOptions} */ + /** @type {import('@agoric/casting').LeaderOptions} */ const leaderOptions = { retryCallback: (e, _attempt) => { verbose && console.warn('Retrying due to:', e); @@ -105,14 +105,14 @@ export default async function streamMain(progname, rawArgs, powers, opts) { const [_cmd, ...specs] = rawArgs; - verbose && console.warn('Streaming from leader at', bootstrap); + verbose && console.warn('Creating leader for', bootstrap); const leader = makeLeader(bootstrap, leaderOptions); await Promise.all( specs.map(async spec => { - verbose && console.warn('Consuming', spec); - const storeKey = makeStoreKey(spec); - const stream = makeChainStream(leader, storeKey, streamOptions); - for await (const { value } of iterateLatest(stream)) { + verbose && console.warn('Following', spec); + const castingSpec = makeCastingSpec(spec); + const follower = makeFollower(leader, castingSpec, followerOptions); + for await (const { value } of iterateLatest(follower)) { process.stdout.write(`${formatOutput(value)}\n`); } }), diff --git a/packages/agoric-cli/src/main.js b/packages/agoric-cli/src/main.js index 9c88f72c2c42..6d32576e8eea 100644 --- a/packages/agoric-cli/src/main.js +++ b/packages/agoric-cli/src/main.js @@ -8,7 +8,7 @@ import initMain from './init.js'; import installMain from './install.js'; import setDefaultsMain from './set-defaults.js'; import startMain from './start.js'; -import streamMain from './stream.js'; +import followMain from './follow.js'; import walletMain from './open.js'; const DEFAULT_DAPP_TEMPLATE = 'dapp-fungible-faucet'; @@ -162,8 +162,8 @@ const main = async (progname, rawArgs, powers) => { }); program - .command('stream ') - .description('read an Agoric Chain Stream') + .command('follow ') + .description('follow an Agoric Casting leader') .option( '--integrity ', 'set integrity mode', @@ -210,7 +210,7 @@ const main = async (progname, rawArgs, powers) => { .option('-B, --bootstrap ', 'network bootstrap configuration') .action(async (pathSpecs, cmd) => { const opts = { ...program.opts(), ...cmd.opts() }; - return subMain(streamMain, ['stream', ...pathSpecs], opts); + return subMain(followMain, ['follow', ...pathSpecs], opts); }); const addRunOptions = cmd => diff --git a/packages/agoric-cli/src/sdk-package-names.js b/packages/agoric-cli/src/sdk-package-names.js index 5438155c310f..6048ab9efe16 100644 --- a/packages/agoric-cli/src/sdk-package-names.js +++ b/packages/agoric-cli/src/sdk-package-names.js @@ -4,7 +4,7 @@ export default [ "@agoric/access-token", "@agoric/assert", - "@agoric/chain-streams", + "@agoric/casting", "@agoric/cosmic-swingset", "@agoric/cosmos", "@agoric/deploy-script-support", diff --git a/packages/chain-streams/README.md b/packages/casting/README.md similarity index 74% rename from packages/chain-streams/README.md rename to packages/casting/README.md index e953d092a1ea..a26a3cb3eeb1 100644 --- a/packages/chain-streams/README.md +++ b/packages/casting/README.md @@ -1,11 +1,11 @@ -# Chain Streams +# Agoric Casting -This [Agoric](https://agoric.com) Chain Streams package consumes data server -publication leaders in a flexible, future-proof way. +This [Agoric](https://agoric.com) Casting package follows ocap broadcasts in a +flexible, future-proof way. -TL;DR: You can run `yarn demo`, or to consume a mailbox stream do: +TL;DR: You can run `yarn demo`, or to follow a mailbox castingSpec do: ```sh -npx agoric stream -Bhttp://devnet.agoric.net/network-config :mailbox.agoric1foobarbaz -otext +npx agoric follow -Bhttp://devnet.agoric.net/network-config :mailbox.agoric1foobarbaz -otext ``` An example of following an on-chain mailbox in code (using this package) is: @@ -13,31 +13,31 @@ An example of following an on-chain mailbox in code (using this package) is: ```js // First, obtain a Hardened JS environment via Endo. import '@endo/init/pre-remoting.js'; // needed only for the next line -import '@agoric/chain-streams/node-fetch-shim.js'; // needed for Node.js +import '@agoric/castingSpec/node-fetch-shim.js'; // needed for Node.js import '@endo/init'; import { iterateLatest, - makeChainStream, + makeFollower, makeLeader, - makeStoreKey, -} from '@agoric/chain-streams'; + makeCastingSpec, +} from '@agoric/casting'; -// Iterate over a mailbox stream on the devnet. +// Iterate over a mailbox follower on the devnet. const leader = makeLeader('https://devnet.agoric.net/network-config'); -const storeKey = makeStoreKey(':mailbox.agoric1foobarbaz'); -const stream = makeChainStream(leader, storeKey); -for await (const { value } of iterateLatest(stream)) { +const castingSpec = makeCastingSpec(':mailbox.agoric1foobarbaz'); +const follower = makeFollower(leader, castingSpec); +for await (const { value } of iterateLatest(follower)) { console.log(`here's a mailbox value`, value); } ``` -## Stream options +## Follower options -The `streamOpts` argument in `makeChainStream(leader, key, streamOpts)` provides an optional bag of options: +The `followerOpts` argument in `makeFollower(leader, key, followerOpts)` provides an optional bag of options: - the `integrity` option, which has three possibilities: - `'strict'` - release data only after proving it was validated (may incur waits for one block's data to be validated in the next block), - - `'optimistic'` (default) - release data immediately, but may crash the stream in the future if an already-released value could not be proven, + - `'optimistic'` (default) - release data immediately, but may crash the follower in the future if an already-released value could not be proven, - `'none'` - release data immediately without validation - the `decode` option is a function to translate `buf: Uint8Array` into `data: string` - (default) - interpret buf as a utf-8 string, then `JSON.parse` it @@ -46,7 +46,7 @@ The `streamOpts` argument in `makeChainStream(leader, key, streamOpts)` provides - `null` - don't additionally unserialize data before releasing it - any unserializer object supporting `E(unserializer).unserialize(data)` - the `crasher` option can be - - `null` (default) stream failures only propagate an exception/rejection + - `null` (default) follower failures only propagate an exception/rejection - any crasher object supporting `E(crasher).crash(reason)` ## Behind the scenes @@ -54,11 +54,11 @@ The `streamOpts` argument in `makeChainStream(leader, key, streamOpts)` provides - the network config contains enough information to obtain Tendermint RPC nodes for a given Agoric network. You can use `makeLeaderFromRpcAddresses` directly if you want to avoid fetching a network-config. -- each stream uses periodic CosmJS state polling (every X milliseconds) which +- each follower uses periodic CosmJS state polling (every X milliseconds) which can be refreshed more expediently via a Tendermint subscription to the corresponding `state_change` event - published (string) values are automatically unmarshalled, but without object references. a custom `marshaller` for your application. -- the `iterateRecent` adapter transforms a stream into a local async iterator +- the `iterateRecent` adapter transforms a follower into a local async iterator that produces only the last queried value (with no history reconstruction) ## Status diff --git a/packages/chain-streams/jsconfig.json b/packages/casting/jsconfig.json similarity index 100% rename from packages/chain-streams/jsconfig.json rename to packages/casting/jsconfig.json diff --git a/packages/chain-streams/node-fetch-shim.js b/packages/casting/node-fetch-shim.js similarity index 100% rename from packages/chain-streams/node-fetch-shim.js rename to packages/casting/node-fetch-shim.js diff --git a/packages/chain-streams/package.json b/packages/casting/package.json similarity index 94% rename from packages/chain-streams/package.json rename to packages/casting/package.json index da7fc65ec6ef..fe5a8ba96e22 100644 --- a/packages/chain-streams/package.json +++ b/packages/casting/package.json @@ -1,7 +1,7 @@ { - "name": "@agoric/chain-streams", + "name": "@agoric/casting", "version": "0.1.0", - "description": "Agoric's Chain Streams", + "description": "Agoric's OCap broadcastingSpec system", "type": "module", "main": "src/main.js", "repository": "https://github.com/Agoric/agoric-sdk", diff --git a/packages/chain-streams/src/store-key.js b/packages/casting/src/casting-spec.js similarity index 56% rename from packages/chain-streams/src/store-key.js rename to packages/casting/src/casting-spec.js index fd46991da831..aa5bffa81a86 100644 --- a/packages/chain-streams/src/store-key.js +++ b/packages/casting/src/casting-spec.js @@ -3,9 +3,9 @@ import { toAscii } from '@cosmjs/encoding'; /** * @param {string} storagePath - * @returns {import('./types').ChainStoreKey} + * @returns {import('./types').CastingSpec} */ -const swingsetPathToStoreKey = storagePath => +const swingsetPathToCastingSpec = storagePath => harden({ storeName: 'swingset', storeSubkey: toAscii(`swingset/data:${storagePath}`), @@ -17,9 +17,9 @@ const DATA_PREFIX_BYTES = new Uint8Array([0]); /** * @param {string} storagePath * @param {string} [storeName] - * @returns {import('./types').ChainStoreKey} + * @returns {import('./types').CastingSpec} */ -const vstoragePathToStoreKey = (storagePath, storeName = 'vstorage') => { +const vstoragePathToCastingSpec = (storagePath, storeName = 'vstorage') => { const elems = storagePath ? storagePath.split('.') : []; const buf = toAscii(`${elems.length}.${storagePath}`); return harden({ @@ -29,26 +29,27 @@ const vstoragePathToStoreKey = (storagePath, storeName = 'vstorage') => { }); }; -export const DEFAULT_PATH_CONVERTER = vstoragePathToStoreKey; +export const DEFAULT_PATH_CONVERTER = vstoragePathToCastingSpec; /** - * @type {Record import('./types').ChainStoreKey>} + * @type {Record import('./types').CastingSpec>} */ export const pathPrefixToConverters = harden({ - 'swingset:': swingsetPathToStoreKey, - 'vstore:': vstoragePathToStoreKey, + 'swingset:': swingsetPathToCastingSpec, + 'vstore:': vstoragePathToCastingSpec, ':': DEFAULT_PATH_CONVERTER, }); /** - * @param {string} pathSpec - * @returns {import('./types').ChainStoreKey} + * @param {string} specString + * @returns {import('./types').CastingSpec} */ -export const makeStoreKey = pathSpec => { - const match = pathSpec.match(/^([^:.]*:)(.*)/); +export const makeCastingSpec = specString => { + assert.typeof(specString, 'string'); + const match = specString.match(/^([^:.]*:)(.*)/); assert( match, - `path spec ${pathSpec} does not match 'PREFIX:PATH' or ':PATH'`, + `spec string ${specString} does not match 'PREFIX:PATH' or ':PATH'`, ); const kind = match[1]; const storePath = match[2]; diff --git a/packages/chain-streams/src/watcher.js b/packages/casting/src/change-follower.js similarity index 68% rename from packages/chain-streams/src/watcher.js rename to packages/casting/src/change-follower.js index b25505398add..93e738456c5a 100644 --- a/packages/chain-streams/src/watcher.js +++ b/packages/casting/src/change-follower.js @@ -5,19 +5,19 @@ import { DEFAULT_KEEP_POLLING } from './defaults.js'; /** * Just return an unspecified allegedValue every poll period. * - * @param {import('./types').ChainLeader} leader - * @param {import('./types.js').ChainStoreKey} storeKey - * @returns {Promise>} + * @param {import('./types').Leader} leader + * @param {import('./types.js').CastingSpec} castingSpec + * @returns {Promise>} */ -export const makePollingWatcher = async (leader, storeKey) => { +export const makePollingChangeFollower = async (leader, castingSpec) => { const { keepPolling = DEFAULT_KEEP_POLLING } = await E(leader).getOptions(); - return Far('key watcher stream', { + return Far('polling change follower', { getLatestIterable: () => - Far('key watcher iterable', { + Far('polling change follower iterable', { [Symbol.asyncIterator]: () => { /** @type {Promise | undefined} */ let nextPollPromise; - return Far('key watcher iterator', { + return Far('polling change follower iterator', { next: async () => { if (!nextPollPromise) { nextPollPromise = keepPolling(); @@ -25,7 +25,7 @@ export const makePollingWatcher = async (leader, storeKey) => { const keepGoing = await nextPollPromise; nextPollPromise = undefined; const change = harden({ - storeKey, + castingSpec, // Make no warrant as to the values. values: [], }); diff --git a/packages/chain-streams/src/defaults.js b/packages/casting/src/defaults.js similarity index 100% rename from packages/chain-streams/src/defaults.js rename to packages/casting/src/defaults.js diff --git a/packages/chain-streams/src/stream-cosmjs.js b/packages/casting/src/follower-cosmjs.js similarity index 88% rename from packages/chain-streams/src/stream-cosmjs.js rename to packages/casting/src/follower-cosmjs.js index b4e2c63e6f48..44bc2e8a90da 100644 --- a/packages/chain-streams/src/stream-cosmjs.js +++ b/packages/casting/src/follower-cosmjs.js @@ -9,8 +9,8 @@ import { DEFAULT_DECODER, DEFAULT_UNSERIALIZER } from './defaults.js'; const { details: X } = assert; -/** @template T @typedef {import('./types.js').ChainStreamElement} ChainStreamElement */ -/** @template T @typedef {import('./types.js').ChainStream} ChainStream */ +/** @template T @typedef {import('./types.js').FollowerElement} FollowerElement */ +/** @template T @typedef {import('./types.js').Follower} Follower */ /** * @template T @@ -42,7 +42,7 @@ const collectSingle = values => { */ /** - * @type {Record['integrity'], QueryVerifier>} + * @type {Record['integrity'], QueryVerifier>} */ export const integrityToQueryVerifier = harden({ strict: async (getProvenValue, crash, _getAllegedValue) => { @@ -77,12 +77,12 @@ export const integrityToQueryVerifier = harden({ /** * @template T - * @param {ERef} leader - * @param {import('./types').ChainStoreKey} storeKey - * @param {import('./types').ChainStreamOptions} options - * @returns {ChainStream>} + * @param {ERef} leader + * @param {import('./types').CastingSpec} castingSpec + * @param {import('./types').FollowerOptions} options + * @returns {Follower>} */ -export const makeChainStream = (leader, storeKey, options = {}) => { +export const makeFollower = (leader, castingSpec, options = {}) => { const { decode = DEFAULT_DECODER, unserializer = DEFAULT_UNSERIALIZER, @@ -93,11 +93,11 @@ export const makeChainStream = (leader, storeKey, options = {}) => { storeName, storeSubkey, dataPrefixBytes = new Uint8Array(), - } = storeKey; + } = castingSpec; /** @type {QueryVerifier} */ const queryVerifier = integrityToQueryVerifier[integrity]; - assert(queryVerifier, X`unrecognized stream integrity mode ${integrity}`); + assert(queryVerifier, X`unrecognized follower integrity mode ${integrity}`); /** @type {Map} */ const endpointToQueryClient = new Map(); @@ -173,10 +173,10 @@ export const makeChainStream = (leader, storeKey, options = {}) => { ); // Enable the periodic fetch. - /** @type {ChainStream>} */ - return Far('chain stream', { + /** @type {Follower>} */ + return Far('chain follower', { getLatestIterable: () => { - /** @type {NotifierRecord>} */ + /** @type {NotifierRecord>} */ const { updater, notifier } = makeNotifierKit(); let finished = false; @@ -246,7 +246,7 @@ export const makeChainStream = (leader, storeKey, options = {}) => { let lastBuf; /** - * @param {import('./types').ChainStoreChange} allegedChange + * @param {import('./types').CastingChange} allegedChange */ const queryAndUpdateOnce = async allegedChange => { const committer = prepareUpdateInOrder(); @@ -288,9 +288,9 @@ export const makeChainStream = (leader, storeKey, options = {}) => { committer.commit({ value }); }; - const changeStream = E(leader).watchStoreKey(storeKey); + const changeFollower = E(leader).watchCasting(castingSpec); const queryWhenKeyChanges = async () => { - for await (const allegedChange of iterateLatest(changeStream)) { + for await (const allegedChange of iterateLatest(changeFollower)) { if (finished) { return; } @@ -299,7 +299,7 @@ export const makeChainStream = (leader, storeKey, options = {}) => { } }; - queryAndUpdateOnce({ values: [], storeKey }).catch(retryOrFail); + queryAndUpdateOnce({ values: [], castingSpec }).catch(retryOrFail); queryWhenKeyChanges().catch(fail); return makeAsyncIterableFromNotifier(notifier); diff --git a/packages/chain-streams/src/iterable.js b/packages/casting/src/iterable.js similarity index 86% rename from packages/chain-streams/src/iterable.js rename to packages/casting/src/iterable.js index 9c0896aab9df..2275959c3984 100644 --- a/packages/chain-streams/src/iterable.js +++ b/packages/casting/src/iterable.js @@ -29,14 +29,14 @@ export const makeAsyncIterableFromNotifier = notifier => /** * @template T - * @param {ERef>} stream + * @param {ERef>} follower */ -export const iterateLatest = stream => { +export const iterateLatest = follower => { // For now, just pass through the iterable. return harden({ /** @returns {AsyncIterator} */ [Symbol.asyncIterator]: () => { - const latestIterable = E(stream).getLatestIterable(); + const latestIterable = E(follower).getLatestIterable(); const iterator = E(latestIterable)[Symbol.asyncIterator](); return harden({ next: () => E(iterator).next(), diff --git a/packages/chain-streams/src/leader-netconfig.js b/packages/casting/src/leader-netconfig.js similarity index 88% rename from packages/chain-streams/src/leader-netconfig.js rename to packages/casting/src/leader-netconfig.js index 2ce11355c407..6861e2c86a17 100644 --- a/packages/chain-streams/src/leader-netconfig.js +++ b/packages/casting/src/leader-netconfig.js @@ -7,7 +7,7 @@ const { details: X } = assert; /** * @param {string[]} rpcAddrs - * @param {import('./types.js').ChainLeaderOptions} [leaderOptions] + * @param {import('./types.js').LeaderOptions} [leaderOptions] */ export const makeLeaderFromRpcAddresses = (rpcAddrs, leaderOptions) => { assert(Array.isArray(rpcAddrs), X`rpcAddrs ${rpcAddrs} must be an array`); @@ -26,11 +26,11 @@ export const makeLeaderFromRpcAddresses = (rpcAddrs, leaderOptions) => { /** * @param {string} netconfigURL - * @param {import('./types.js').ChainLeaderOptions} [options] + * @param {import('./types.js').LeaderOptions} [options] */ export const makeLeaderFromNetworkConfig = (netconfigURL, options = {}) => { const { retryCallback = DEFAULT_RETRY_CALLBACK } = options; - /** @type {import('./types.js').ChainLeaderOptions['retryCallback']} */ + /** @type {import('./types.js').LeaderOptions['retryCallback']} */ const retry = async (err, attempt) => { if (retryCallback) { return retryCallback(err, attempt); @@ -60,7 +60,7 @@ export const makeLeaderFromNetworkConfig = (netconfigURL, options = {}) => { /** * @param {string} bootstrap - * @param {import('./types.js').ChainLeaderOptions} options + * @param {import('./types.js').LeaderOptions} options */ export const makeLeader = (bootstrap, options) => { if (bootstrap.includes('network-config')) { diff --git a/packages/chain-streams/src/leader.js b/packages/casting/src/leader.js similarity index 87% rename from packages/chain-streams/src/leader.js rename to packages/casting/src/leader.js index 0c66579edef0..1f89946ab70f 100644 --- a/packages/chain-streams/src/leader.js +++ b/packages/casting/src/leader.js @@ -2,13 +2,13 @@ import { E, Far } from '@endo/far'; import { DEFAULT_RETRY_CALLBACK } from './defaults.js'; import { shuffle } from './shuffle.js'; -import { makePollingWatcher } from './watcher.js'; +import { makePollingChangeFollower } from './change-follower.js'; /** * Create a chain leader that rotates through a list of endpoints. * * @param {string[]} endpoints - * @param {import('./types.js').ChainLeaderOptions} leaderOptions + * @param {import('./types.js').LeaderOptions} leaderOptions */ export const makeRoundRobinLeader = (endpoints, leaderOptions = {}) => { const { retryCallback = DEFAULT_RETRY_CALLBACK } = leaderOptions; @@ -20,7 +20,7 @@ export const makeRoundRobinLeader = (endpoints, leaderOptions = {}) => { let lastRespondingEndpointIndex = 0; let thisAttempt = 0; - /** @type {import('./types.js').ChainLeader} */ + /** @type {import('./types.js').Leader} */ const leader = Far('round robin leader', { getOptions: () => leaderOptions, retry: async (err, attempt) => { @@ -29,7 +29,7 @@ export const makeRoundRobinLeader = (endpoints, leaderOptions = {}) => { } throw err; }, - watchStoreKey: storeKey => makePollingWatcher(leader, storeKey), + watchCasting: castingSpec => makePollingChangeFollower(leader, castingSpec), /** * @template T * @param {(endpoint: string) => Promise} callback diff --git a/packages/chain-streams/src/main.js b/packages/casting/src/main.js similarity index 75% rename from packages/chain-streams/src/main.js rename to packages/casting/src/main.js index 3e0de76bcafd..4a597fa07851 100644 --- a/packages/chain-streams/src/main.js +++ b/packages/casting/src/main.js @@ -2,7 +2,7 @@ export * from './types.js'; // no named exports export * from './defaults.js'; export * from './leader-netconfig.js'; -export * from './stream-cosmjs.js'; -export * from './store-key.js'; +export * from './follower-cosmjs.js'; +export * from './casting-spec.js'; export * from './leader.js'; export * from './iterable.js'; diff --git a/packages/chain-streams/src/shuffle.js b/packages/casting/src/shuffle.js similarity index 100% rename from packages/chain-streams/src/shuffle.js rename to packages/casting/src/shuffle.js diff --git a/packages/chain-streams/src/types.js b/packages/casting/src/types.js similarity index 72% rename from packages/chain-streams/src/types.js rename to packages/casting/src/types.js index 162f6672bc6f..a0e73e87eeed 100644 --- a/packages/chain-streams/src/types.js +++ b/packages/casting/src/types.js @@ -4,35 +4,35 @@ export {}; /** - * @typedef {object} ChainLeaderOptions + * @typedef {object} LeaderOptions * @property {null | ((err: any, attempt?: number) => Promise)} [retryCallback] * @property {() => Promise} [keepPolling] */ /** - * @typedef {object} ChainStoreChange - * @property {ChainStoreKey} storeKey + * @typedef {object} CastingChange + * @property {CastingSpec} castingSpec * @property {number} [blockHeight] * @property {Uint8Array[]} values */ /** - * @typedef {object} ChainLeader + * @typedef {object} Leader * @property {(error: any, attempt?: number) => Promise} retry - * @property {() => ChainLeaderOptions} getOptions + * @property {() => LeaderOptions} getOptions * @property {(callback: (endpoint: string) => Promise) => Promise} mapEndpoints - * @property {(key: ChainStoreKey) => Promise>} watchStoreKey + * @property {(key: CastingSpec) => Promise>} watchCasting */ /** * @template T - * @typedef {object} ChainStream + * @typedef {object} Follower * @property {() => AsyncIterable} [getLatestIterable] */ /** * @template T - * @typedef {object} ChainStreamElement + * @typedef {object} FollowerElement * @property {T} value */ @@ -47,7 +47,7 @@ export {}; */ /** - * @typedef {object} ChainStreamOptions + * @typedef {object} FollowerOptions * @property {null | import('@endo/far').FarRef} [unserializer] * @property {(buf: Uint8Array) => any} [decode] * @property {'strict'|'optimistic'|'none'} [integrity] @@ -55,7 +55,7 @@ export {}; */ /** - * @typedef {object} ChainStoreKey + * @typedef {object} CastingSpec * @property {string} storeName * @property {Uint8Array} storeSubkey * @property {Uint8Array} [dataPrefixBytes] diff --git a/packages/chain-streams/test/fake-rpc-server.js b/packages/casting/test/fake-rpc-server.js similarity index 98% rename from packages/chain-streams/test/fake-rpc-server.js rename to packages/casting/test/fake-rpc-server.js index 56fe48199fed..dc95bd892e40 100644 --- a/packages/chain-streams/test/fake-rpc-server.js +++ b/packages/casting/test/fake-rpc-server.js @@ -230,7 +230,7 @@ export const develop = async () => { const PORT = await startFakeServer(mockT, [...fakeValues]); console.log( `Try this in another terminal: - agoric stream :fake.path --bootstrap=http://localhost:${PORT}/network-config --sleep=0.5 --integrity=none`, + agoric follower :fake.path --bootstrap=http://localhost:${PORT}/network-config --sleep=0.5 --integrity=none`, ); console.warn(`Control-C to interrupt...`); // Wait forever. diff --git a/packages/chain-streams/test/lockdown.js b/packages/casting/test/lockdown.js similarity index 100% rename from packages/chain-streams/test/lockdown.js rename to packages/casting/test/lockdown.js diff --git a/packages/chain-streams/test/prepare-test-env-ava.js b/packages/casting/test/prepare-test-env-ava.js similarity index 100% rename from packages/chain-streams/test/prepare-test-env-ava.js rename to packages/casting/test/prepare-test-env-ava.js diff --git a/packages/chain-streams/test/test-mvp.js b/packages/casting/test/test-mvp.js similarity index 79% rename from packages/chain-streams/test/test-mvp.js rename to packages/casting/test/test-mvp.js index 0cc1dc066ebc..6423b8e6ef4f 100644 --- a/packages/chain-streams/test/test-mvp.js +++ b/packages/casting/test/test-mvp.js @@ -4,9 +4,9 @@ import { test } from './prepare-test-env-ava.js'; import { iterateLatest, - makeChainStream, + makeFollower, makeLeader, - makeStoreKey, + makeCastingSpec, } from '../src/main.js'; import { delay } from '../src/defaults.js'; @@ -16,12 +16,12 @@ test('happy path', async t => { const expected = ['latest', 'later', 'done']; t.plan(expected.length); const PORT = await t.context.startServer(t, [...expected]); - /** @type {import('../src/types.js').ChainLeaderOptions} */ + /** @type {import('../src/types.js').LeaderOptions} */ const lo = { retryCallback: null, // fail fast, no retries keepPolling: () => delay(200).then(() => true), // poll really quickly }; - /** @type {import('../src/types.js').ChainStreamOptions} */ + /** @type {import('../src/types.js').FollowerOptions} */ const so = { integrity: 'none', }; @@ -29,9 +29,9 @@ test('happy path', async t => { // The rest of this test is taken almost verbatim from the README.md, with // some minor modifications (testLeaderOptions and deepEqual). const leader = makeLeader(`http://localhost:${PORT}/network-config`, lo); - const storeKey = makeStoreKey(':mailbox.agoric1foobarbaz'); - const stream = await makeChainStream(leader, storeKey, so); - for await (const { value } of iterateLatest(stream)) { + const castingSpec = makeCastingSpec(':mailbox.agoric1foobarbaz'); + const follower = await makeFollower(leader, castingSpec, so); + for await (const { value } of iterateLatest(follower)) { t.log(`here's a mailbox value`, value); // The rest here is to drive the test. @@ -69,8 +69,8 @@ test('missing rpc server', async t => { }); test('unrecognized integrity', async t => { - await t.throws(() => makeChainStream({}, {}, { integrity: 'bother' }), { - message: /unrecognized stream integrity mode.*/, + await t.throws(() => makeFollower({}, {}, { integrity: 'bother' }), { + message: /unrecognized follower integrity mode.*/, }); });