From 68ff92257800022749494e169d62cffeaf1b53a7 Mon Sep 17 00:00:00 2001 From: Michael FIG Date: Thu, 2 Jun 2022 20:07:43 -0600 Subject: [PATCH 01/10] fix(deployment): drive-by upgrade to Node.js 16 --- packages/deployment/ansible/prepare-machine.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/deployment/ansible/prepare-machine.yml b/packages/deployment/ansible/prepare-machine.yml index 8a0190ec850..e6c1065dfa7 100644 --- a/packages/deployment/ansible/prepare-machine.yml +++ b/packages/deployment/ansible/prepare-machine.yml @@ -5,6 +5,6 @@ gather_facts: yes strategy: free vars: - - NODEJS_VERSION: 14 + - NODEJS_VERSION: 16 roles: - prereq From 203b10d09acace40939843e798d8ea4e1c5e94cd Mon Sep 17 00:00:00 2001 From: Michael FIG Date: Mon, 30 May 2022 19:21:47 -0600 Subject: [PATCH 02/10] feat(cosmos): report `state_change` events --- golang/cosmos/x/swingset/keeper/keeper.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/golang/cosmos/x/swingset/keeper/keeper.go b/golang/cosmos/x/swingset/keeper/keeper.go index b8cff0bae20..4cd00c2a935 100644 --- a/golang/cosmos/x/swingset/keeper/keeper.go +++ b/golang/cosmos/x/swingset/keeper/keeper.go @@ -259,3 +259,11 @@ func (k Keeper) SetMailbox(ctx sdk.Context, peer string, mailbox string) { path := StoragePathMailbox + "." + peer k.vstorageKeeper.LegacySetStorageAndNotify(ctx, path, mailbox) } + +func (k Keeper) PathToEncodedKey(path string) []byte { + return k.vstorageKeeper.PathToEncodedKey(path) +} + +func (k Keeper) GetStoreName() string { + return k.vstorageKeeper.GetStoreName() +} From 11bd633067cae1f8ada6e79cd9c05a6563e49afd Mon Sep 17 00:00:00 2001 From: Michael FIG Date: Tue, 31 May 2022 20:54:13 -0600 Subject: [PATCH 03/10] chore(chain-streams): package infrastructure --- packages/chain-streams/README.md | 54 +++++++++++++++++++++++++ packages/chain-streams/jsconfig.json | 19 +++++++++ packages/chain-streams/package.json | 59 ++++++++++++++++++++++++++++ yarn.lock | 6 +-- 4 files changed, 135 insertions(+), 3 deletions(-) create mode 100644 packages/chain-streams/README.md create mode 100644 packages/chain-streams/jsconfig.json create mode 100644 packages/chain-streams/package.json diff --git a/packages/chain-streams/README.md b/packages/chain-streams/README.md new file mode 100644 index 00000000000..8d5f6487669 --- /dev/null +++ b/packages/chain-streams/README.md @@ -0,0 +1,54 @@ +# Chain Streams + +This [Agoric](https://agoric.com) Chain Streams package consumes data server +publication sources in a flexible, future-proof way. + +An example of following an on-chain mailbox using this package is: + +```js +import { makeSourceFromNetconfig, iterateLatest } from '@agoric/chain-streams'; + +const src = makeSourceFromNetconfig('https://devnet.agoric.net/network-config'); +const stream = E(src).makeStreamFromStoragePath('mailbox.agoric1...', { integrity: 'unsafe' }); +for await (const mailbox of iterateLatest(stream)) { + console.log(`here's a mailbox object`, mailbox); +} +``` + +## Stream options + +The `E(src).makeStream...` call allows specifying a second argument of stream options +- the `integrity` option, which has three possibilities: + - `safe` (default) - release data only after proving it was validated (may incur waits for one block's data to be validated in the next block), + - `optimistic` - release data immediately, but may crash the stream in the future if a released value could not be proven, + - `unsafe` - release data immediately without validation +- the `unserializer` option can be + - (default) - release unserialized objects using `@agoric/marshal`'s `makeMarshal()` + - `null` - don't translate data before releasing it + - any unserializer object supporting `E(unserializer).unserialize(data)` + +## Behind the scenes + +- the network config contains enough information to obtain Tendermint RPC nodes + for a given Agoric network. You can use `makeSourceFromTendermintRPCNodes` + directly if you want to avoid using a network-config. +- each stream uses periodic CosmJS state polling (every X milliseconds) which + can be refreshed more expediently via a Tendermint subscription to the + corresponding `state_change` event +- published (string) values are automatically unmarshalled, but without object references. a custom `marshaller` for your application. +- the `iterateRecent` adapter transforms a stream into a local async iterator + that produces only the last queried value (with no history reconstruction) + +## Status + +This package currently depends on: +- Hardened Javascript +- `@agoric/notify` async iterable adapters to implement `iterateLatest` +- `@endo/marshal` for default object unserialization +- [CosmJS](https://github.com/cosmos/cosmjs) for proof verification, although [it does not yet support light client tracking of the validator set](https://github.com/cosmos/cosmjs/issues/492) +- a bespoke follower of [WebSocket Tendermint events](https://docs.tendermint.com/master/tendermint-core/subscription.html#legacy-streaming-api) + +Short-term goals: +- integrate the new [SharedSubscription API](https://github.com/Agoric/agoric-sdk/pull/5418#discussion_r886253328) from the [Agoric/agoric-sdk#5418 `makePublisherKit` PR](https://github.com/Agoric/agoric-sdk/pull/5418) +- support `iterateEach` with the [lossless forward iteration algorithm](https://github.com/Agoric/agoric-sdk/blob/mfig-vstream/golang/cosmos/x/vstream/spec/01_concepts.md#forward-iteration-lossless-history) via the [Agoric/agoric-sdk#5466 `x/vstream` PR](https://github.com/Agoric/agoric-sdk/pull/5466) +- upgrade to the [Tendermint event log API](https://docs.tendermint.com/master/tendermint-core/subscription.html#event-log-api) when Tendermint v0.36 is supported by the Agoric chain diff --git a/packages/chain-streams/jsconfig.json b/packages/chain-streams/jsconfig.json new file mode 100644 index 00000000000..619986a30fb --- /dev/null +++ b/packages/chain-streams/jsconfig.json @@ -0,0 +1,19 @@ +// This file can contain .js-specific Typescript compiler config. +{ + "compilerOptions": { + "target": "esnext", + "module": "esnext", + + "noEmit": true, +/* + // The following flags are for creating .d.ts files: + "noEmit": false, + "declaration": true, + "emitDeclarationOnly": true, +*/ + "downlevelIteration": true, + "strictNullChecks": true, + "moduleResolution": "node", + }, + "include": ["src/**/*.js", "exported.js"], +} diff --git a/packages/chain-streams/package.json b/packages/chain-streams/package.json new file mode 100644 index 00000000000..a7b56357a96 --- /dev/null +++ b/packages/chain-streams/package.json @@ -0,0 +1,59 @@ +{ + "name": "@agoric/streams", + "version": "0.1.0", + "description": "Agoric's Streams follower library", + "type": "module", + "main": "src/main.js", + "repository": "https://github.com/Agoric/agoric-sdk", + "scripts": { + "build": "exit 0", + "test": "ava", + "test:c8": "c8 $C8_OPTIONS ava --config=ava-nesm.config.js", + "test:xs": "exit 0", + "lint-fix": "yarn lint:eslint --fix", + "lint": "run-s --continue-on-error lint:*", + "lint:types": "tsc --maxNodeModuleJsDepth 4 -p jsconfig.json", + "lint:eslint": "eslint --ext .js,.ts ." + }, + "keywords": [], + "author": "Agoric", + "license": "Apache-2.0", + "dependencies": { + "@agoric/notifier": "^0.4.0", + "@agoric/spawner": "^0.5.1", + "@cosmjs/encoding": "^0.28.4", + "@cosmjs/proto-signing": "^0.28.4", + "@cosmjs/stargate": "^0.28.4", + "@cosmjs/tendermint-rpc": "^0.28.4", + "@endo/far": "^0.2.3", + "@endo/init": "^0.5.41", + "@endo/lockdown": "^0.1.13", + "@endo/marshal": "^0.6.7", + "@endo/promise-kit": "^0.2.41", + "node-fetch": "^2.6.0" + }, + "devDependencies": { + "ava": "^3.12.1", + "c8": "^7.7.2", + "express": "^4.17.1", + "ws": "^7.2.0", + "@endo/ses-ava": "^0.2.25" + }, + "eslintConfig": { + "extends": [ + "@agoric" + ] + }, + "publishConfig": { + "access": "public" + }, + "engines": { + "node": ">=14.15.0" + }, + "ava": { + "files": [ + "test/**/test-*.js" + ], + "timeout": "20m" + } +} diff --git a/yarn.lock b/yarn.lock index 1c0b2db9763..31d1abff2ff 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1137,7 +1137,7 @@ elliptic "^6.5.3" libsodium-wrappers "^0.7.6" -"@cosmjs/encoding@0.28.4": +"@cosmjs/encoding@0.28.4", "@cosmjs/encoding@^0.28.4": version "0.28.4" resolved "https://registry.yarnpkg.com/@cosmjs/encoding/-/encoding-0.28.4.tgz#ea39eb4c27ebf7b35e62e9898adae189b86d0da7" integrity sha512-N6Qnjs4dd8KwjW5m9t3L+rWYYGW2wyS+iLtJJ9DD8DiTTxpW9h7/AmUVO/dsRe5H2tV8/DzH/B9pFfpsgro22A== @@ -1161,7 +1161,7 @@ dependencies: bn.js "^5.2.0" -"@cosmjs/proto-signing@0.28.4": +"@cosmjs/proto-signing@0.28.4", "@cosmjs/proto-signing@^0.28.4": version "0.28.4" resolved "https://registry.yarnpkg.com/@cosmjs/proto-signing/-/proto-signing-0.28.4.tgz#7007651042bd05b3eee7e1c8562417bbed630198" integrity sha512-4vgCLK9gOsdWzD78V5XbAsupSSyntPEzokWYhgRQNwgVTcKX1kg0eKZqUvF5ua5iL9x6MevfH/sgwPyiYleMBw== @@ -1210,7 +1210,7 @@ dependencies: xstream "^11.14.0" -"@cosmjs/tendermint-rpc@0.28.4": +"@cosmjs/tendermint-rpc@0.28.4", "@cosmjs/tendermint-rpc@^0.28.4": version "0.28.4" resolved "https://registry.yarnpkg.com/@cosmjs/tendermint-rpc/-/tendermint-rpc-0.28.4.tgz#78835fdc8126baa3122c8b2b396c1d7d290c7167" integrity sha512-iz6p4UW2QUZNh55WeJy9wHbMdqM8COo0AJdrGU4Ikb/xU0/H6b0dFPoEK+i6ngR0cSizh+hpTMzh3AA7ySUKlA== From ffc90556bf57e1b370ae12cf3d24782fb4abf6d0 Mon Sep 17 00:00:00 2001 From: Michael FIG Date: Wed, 1 Jun 2022 22:04:50 -0600 Subject: [PATCH 04/10] feat(chain-streams): initial MVP implementation --- .github/workflows/test-all-packages.yml | 2 + packages/chain-streams/README.md | 50 ++- packages/chain-streams/jsconfig.json | 2 +- packages/chain-streams/node-fetch-shim.js | 4 + packages/chain-streams/package.json | 10 +- packages/chain-streams/src/defaults.js | 54 ++++ packages/chain-streams/src/iterable.js | 46 +++ .../chain-streams/src/leader-netconfig.js | 65 ++++ packages/chain-streams/src/leader.js | 65 ++++ packages/chain-streams/src/main.js | 8 + packages/chain-streams/src/shuffle.js | 16 + packages/chain-streams/src/store-key.js | 39 +++ packages/chain-streams/src/stream-cosmjs.js | 286 ++++++++++++++++++ packages/chain-streams/src/types.js | 61 ++++ packages/chain-streams/src/watcher.js | 41 +++ .../chain-streams/test/fake-rpc-server.js | 230 ++++++++++++++ packages/chain-streams/test/lockdown.js | 3 + .../test/prepare-test-env-ava.js | 6 + packages/chain-streams/test/test-mvp.js | 84 +++++ .../src/vpool-xyk-amm/multipoolMarketMaker.js | 3 +- packages/run-protocol/test/metrics.js | 2 + 21 files changed, 1053 insertions(+), 24 deletions(-) create mode 100644 packages/chain-streams/node-fetch-shim.js create mode 100644 packages/chain-streams/src/defaults.js create mode 100644 packages/chain-streams/src/iterable.js create mode 100644 packages/chain-streams/src/leader-netconfig.js create mode 100644 packages/chain-streams/src/leader.js create mode 100644 packages/chain-streams/src/main.js create mode 100644 packages/chain-streams/src/shuffle.js create mode 100644 packages/chain-streams/src/store-key.js create mode 100644 packages/chain-streams/src/stream-cosmjs.js create mode 100644 packages/chain-streams/src/types.js create mode 100644 packages/chain-streams/src/watcher.js create mode 100644 packages/chain-streams/test/fake-rpc-server.js create mode 100644 packages/chain-streams/test/lockdown.js create mode 100644 packages/chain-streams/test/prepare-test-env-ava.js create mode 100644 packages/chain-streams/test/test-mvp.js diff --git a/.github/workflows/test-all-packages.yml b/.github/workflows/test-all-packages.yml index a18a2989a90..5f94c51416d 100644 --- a/.github/workflows/test-all-packages.yml +++ b/.github/workflows/test-all-packages.yml @@ -142,6 +142,8 @@ jobs: # END-TEST-BOILERPLATE - name: yarn test (cosmos) run: cd golang/cosmos && yarn ${{ steps.vars.outputs.test }} + - name: yarn test (chain-streams) + run: cd packages/chain-streams && yarn ${{ steps.vars.outputs.test }} - name: yarn test (run-protocol) run: cd packages/run-protocol && yarn ${{ steps.vars.outputs.test }} - name: yarn test (pegasus) diff --git a/packages/chain-streams/README.md b/packages/chain-streams/README.md index 8d5f6487669..2de5ce9046d 100644 --- a/packages/chain-streams/README.md +++ b/packages/chain-streams/README.md @@ -1,37 +1,59 @@ # Chain Streams This [Agoric](https://agoric.com) Chain Streams package consumes data server -publication sources in a flexible, future-proof way. +publication leaders in a flexible, future-proof way. -An example of following an on-chain mailbox using this package is: +TL;DR: You can run `yarn demo`, or to consume a mailbox stream do: +```sh +npx agoric stream -Bhttp://devnet.agoric.net/network-config :mailbox.agoric1foobarbaz -otext +``` + +An example of following an on-chain mailbox in codd (using this package) is: ```js -import { makeSourceFromNetconfig, iterateLatest } from '@agoric/chain-streams'; +// First, obtain a Hardened JS environment via Endo. +import '@endo/init/pre-remoting.js'; // needed only for the next line +import '@agoric/chain-streams/node-fetch-shim.js'; // needed for Node.js +import '@endo/init'; + +import { + iterateLatest, + makeChainStream, + makeLeader, + makeStoreKey, +} from '@agoric/chain-streams'; -const src = makeSourceFromNetconfig('https://devnet.agoric.net/network-config'); -const stream = E(src).makeStreamFromStoragePath('mailbox.agoric1...', { integrity: 'unsafe' }); -for await (const mailbox of iterateLatest(stream)) { - console.log(`here's a mailbox object`, mailbox); +// Iterate over a mailbox stream on the devnet. +const leader = makeLeader('https://devnet.agoric.net/network-config'); +const storeKey = makeStoreKey(':mailbox.agoric1foobarbaz'); +const stream = makeChainStream(leader, storeKey); +for await (const { value } of iterateLatest(stream)) { + console.log(`here's a mailbox value`, value); } ``` ## Stream options -The `E(src).makeStream...` call allows specifying a second argument of stream options +The `streamOpts` argument in `makeChainStream(leader, key, streamOpts)` provides an optional bag of options: - the `integrity` option, which has three possibilities: - - `safe` (default) - release data only after proving it was validated (may incur waits for one block's data to be validated in the next block), - - `optimistic` - release data immediately, but may crash the stream in the future if a released value could not be proven, - - `unsafe` - release data immediately without validation + - `'strict'` - release data only after proving it was validated (may incur waits for one block's data to be validated in the next block), + - `'optimistic'` (default) - release data immediately, but may crash the stream in the future if an already-released value could not be proven, + - `'none'` - release data immediately without validation +- the `decode` option is a function to translate `buf: Uint8Array` into `data: string` + - (default) - interpret buf as a utf-8 string, then `JSON.parse` it - the `unserializer` option can be - (default) - release unserialized objects using `@agoric/marshal`'s `makeMarshal()` - - `null` - don't translate data before releasing it + - `null` - don't additionally unserialize data before releasing it - any unserializer object supporting `E(unserializer).unserialize(data)` +- the `crasher` option can be + - `null` (default) stream failures only propagate an exception/rejection + - any crasher object supporting `E(crasher).crash(reason)` ## Behind the scenes - the network config contains enough information to obtain Tendermint RPC nodes - for a given Agoric network. You can use `makeSourceFromTendermintRPCNodes` - directly if you want to avoid using a network-config. + for a given Agoric network. You can use `makeLeaderFromRpcAddresses` directly + if you want to avoid fetching a network-config. - each stream uses periodic CosmJS state polling (every X milliseconds) which can be refreshed more expediently via a Tendermint subscription to the corresponding `state_change` event diff --git a/packages/chain-streams/jsconfig.json b/packages/chain-streams/jsconfig.json index 619986a30fb..e6811ce15af 100644 --- a/packages/chain-streams/jsconfig.json +++ b/packages/chain-streams/jsconfig.json @@ -15,5 +15,5 @@ "strictNullChecks": true, "moduleResolution": "node", }, - "include": ["src/**/*.js", "exported.js"], + "include": ["src/**/*.js", "exported.js", "lockdown.js"], } diff --git a/packages/chain-streams/node-fetch-shim.js b/packages/chain-streams/node-fetch-shim.js new file mode 100644 index 00000000000..c444f9ab9d7 --- /dev/null +++ b/packages/chain-streams/node-fetch-shim.js @@ -0,0 +1,4 @@ +/* global globalThis */ +import fetch from 'node-fetch'; + +globalThis.fetch = fetch; diff --git a/packages/chain-streams/package.json b/packages/chain-streams/package.json index a7b56357a96..da7fc65ec6e 100644 --- a/packages/chain-streams/package.json +++ b/packages/chain-streams/package.json @@ -1,12 +1,13 @@ { - "name": "@agoric/streams", + "name": "@agoric/chain-streams", "version": "0.1.0", - "description": "Agoric's Streams follower library", + "description": "Agoric's Chain Streams", "type": "module", "main": "src/main.js", "repository": "https://github.com/Agoric/agoric-sdk", "scripts": { "build": "exit 0", + "demo": "node -e 'import(\"./test/fake-rpc-server.js\").then(ns => ns.develop())'", "test": "ava", "test:c8": "c8 $C8_OPTIONS ava --config=ava-nesm.config.js", "test:xs": "exit 0", @@ -39,11 +40,6 @@ "ws": "^7.2.0", "@endo/ses-ava": "^0.2.25" }, - "eslintConfig": { - "extends": [ - "@agoric" - ] - }, "publishConfig": { "access": "public" }, diff --git a/packages/chain-streams/src/defaults.js b/packages/chain-streams/src/defaults.js new file mode 100644 index 00000000000..f577ee86309 --- /dev/null +++ b/packages/chain-streams/src/defaults.js @@ -0,0 +1,54 @@ +// @ts-check +/* global setTimeout */ +import { Far } from '@endo/far'; +import { makeMarshal } from '@endo/marshal'; + +/** + * Resolve a Promise after a given number of milliseconds. + * + * @param {number} ms + * @returns {Promise} + */ +export const delay = ms => new Promise(resolve => setTimeout(resolve, ms)); + +/** + * Report an error, then retry the leader operation after a second or two. + * + * @param {any} err + * @returns {Promise} + */ +export const DEFAULT_RETRY_CALLBACK = err => { + console.warn('retrying after error', err); + return delay(1000 + Math.random() * 1000); +}; + +/** + * Return true after we want to be sure we received latest state something. + * + * @returns {Promise} + */ +export const DEFAULT_KEEP_POLLING = () => + // TOOD: Remove this when the event-driven stuff is in place. + delay(5000 + Math.random() * 1000).then(() => true); +// ... and uses this instead. +// delay(10 * 60 * 1000 + Math.random() * 60_000).then(() => true); + +/** + * Decode utf-8 bytes, then parse the resulting JSON. + * + * @param {Uint8Array} buf + */ +export const DEFAULT_DECODER = harden(buf => { + const td = new TextDecoder(); + const str = td.decode(buf); + return harden(JSON.parse(str)); +}); + +/** + * Unserialize the JSONable data. + * + * @type {import('./types').Unserializer} + */ +export const DEFAULT_UNSERIALIZER = Far('marshal unserializer', { + unserialize: makeMarshal().unserialize, +}); diff --git a/packages/chain-streams/src/iterable.js b/packages/chain-streams/src/iterable.js new file mode 100644 index 00000000000..9c0896aab9d --- /dev/null +++ b/packages/chain-streams/src/iterable.js @@ -0,0 +1,46 @@ +// @ts-check +import { E } from '@endo/far'; + +/** + * Consume a notifier, only returning next when there is a new publication. + * + * @template T + * @param {Notifier} notifier + */ +export const makeAsyncIterableFromNotifier = notifier => + harden({ + [Symbol.asyncIterator]: () => { + /** @type {UpdateCount} */ + let lastUpdateCount = 0; + return harden({ + next: async () => { + const { value, updateCount } = await notifier.getUpdateSince( + lastUpdateCount, + ); + lastUpdateCount = updateCount; + return { + value, + done: lastUpdateCount === undefined, + }; + }, + }); + }, + }); + +/** + * @template T + * @param {ERef>} stream + */ +export const iterateLatest = stream => { + // For now, just pass through the iterable. + return harden({ + /** @returns {AsyncIterator} */ + [Symbol.asyncIterator]: () => { + const latestIterable = E(stream).getLatestIterable(); + const iterator = E(latestIterable)[Symbol.asyncIterator](); + return harden({ + next: () => E(iterator).next(), + }); + }, + }); +}; diff --git a/packages/chain-streams/src/leader-netconfig.js b/packages/chain-streams/src/leader-netconfig.js new file mode 100644 index 00000000000..9aaaff740f7 --- /dev/null +++ b/packages/chain-streams/src/leader-netconfig.js @@ -0,0 +1,65 @@ +// @ts-check +/* global fetch */ +import { makeRoundRobinLeader } from './leader.js'; +import { DEFAULT_RETRY_CALLBACK } from './defaults.js'; + +const { details: X } = assert; + +/** + * @param {string[]} rpcAddrs + * @param {import('./types.js').ChainLeaderOptions} [leaderOptions] + */ +export const makeLeaderFromRpcAddresses = (rpcAddrs, leaderOptions) => { + assert(Array.isArray(rpcAddrs), X`rpcAddrs ${rpcAddrs} must be an array`); + + const rpcHrefs = rpcAddrs.map(rpcAddr => { + assert.typeof(rpcAddr, 'string', X`rpcAddr ${rpcAddr} must be a string`); + // Don't remove explicit port numbers from the URL, because the Cosmos + // `--node=xxx` flag requires them (it doesn't just assume that + // `--node=https://testnet.rpc.agoric.net` is the same as + // `--node=https://testnet.rpc.agoric.net:443`) + return rpcAddr.includes('://') ? rpcAddr : `http://${rpcAddr}`; + }); + + return makeRoundRobinLeader(rpcHrefs, leaderOptions); +}; + +/** + * @param {string} netconfigURL + * @param {import('./types.js').ChainLeaderOptions} [options] + */ +export const makeLeaderFromNetworkConfig = (netconfigURL, options = {}) => { + const { retryCallback = DEFAULT_RETRY_CALLBACK } = options; + const retry = async err => { + if (retryCallback) { + return retryCallback(err); + } + throw err; + }; + return new Promise((resolve, reject) => { + const makeLeader = async () => { + const response = await fetch(netconfigURL, { + headers: { accept: 'application/json' }, + }); + const { rpcAddrs } = await response.json(); + return makeLeaderFromRpcAddresses(rpcAddrs, options); + }; + const retryLeader = async err => { + retry(err) + .then(() => makeLeader().then(resolve, retryLeader)) + .catch(reject); + }; + makeLeader().then(resolve, retryLeader); + }); +}; + +/** + * @param {string} bootstrap + * @param {import('./types.js').ChainLeaderOptions} options + */ +export const makeLeader = (bootstrap, options) => { + if (bootstrap.includes('network-config')) { + return makeLeaderFromNetworkConfig(bootstrap, options); + } + return makeLeaderFromRpcAddresses([bootstrap], options); +}; diff --git a/packages/chain-streams/src/leader.js b/packages/chain-streams/src/leader.js new file mode 100644 index 00000000000..a077484ca9c --- /dev/null +++ b/packages/chain-streams/src/leader.js @@ -0,0 +1,65 @@ +// @ts-check +import { E, Far } from '@endo/far'; +import { DEFAULT_RETRY_CALLBACK } from './defaults.js'; +import { shuffle } from './shuffle.js'; +import { makePollingWatcher } from './watcher.js'; + +/** + * Create a chain leader that rotates through a list of endpoints. + * + * @param {string[]} endpoints + * @param {import('./types.js').ChainLeaderOptions} leaderOptions + */ +export const makeRoundRobinLeader = (endpoints, leaderOptions = {}) => { + const { retryCallback = DEFAULT_RETRY_CALLBACK } = leaderOptions; + + // Shuffle the RPC addresses, so that we don't always hit the same one as all + // our peers. + shuffle(endpoints); + + let lastRespondingEndpointIndex = 0; + + /** @type {import('./types.js').ChainLeader} */ + const leader = Far('round robin leader', { + getOptions: () => leaderOptions, + retry: async err => { + if (retryCallback) { + return retryCallback(err); + } + throw err; + }, + watchStoreKey: storeKey => makePollingWatcher(leader, storeKey), + /** + * @template T + * @param {(endpoint: string) => Promise} callback + */ + mapEndpoints: async callback => { + /** @type {Promise} */ + const p = new Promise((resolve, reject) => { + let endpointIndex = lastRespondingEndpointIndex; + + const retry = async err => { + endpointIndex = (endpointIndex + 1) % endpoints.length; + + // eslint-disable-next-line no-use-before-define + E(leader).retry(err).then(applyOne, reject); + }; + + const applyOne = () => { + Promise.resolve() + .then(() => callback(endpoints[endpointIndex])) + .then(res => { + resolve(harden([res])); + lastRespondingEndpointIndex = endpointIndex; + }, retry); + + // Don't return to prevent a promise chain. + }; + + applyOne(); + }); + return p; + }, + }); + return leader; +}; diff --git a/packages/chain-streams/src/main.js b/packages/chain-streams/src/main.js new file mode 100644 index 00000000000..3e0de76bcaf --- /dev/null +++ b/packages/chain-streams/src/main.js @@ -0,0 +1,8 @@ +// eslint-disable-next-line import/export +export * from './types.js'; // no named exports +export * from './defaults.js'; +export * from './leader-netconfig.js'; +export * from './stream-cosmjs.js'; +export * from './store-key.js'; +export * from './leader.js'; +export * from './iterable.js'; diff --git a/packages/chain-streams/src/shuffle.js b/packages/chain-streams/src/shuffle.js new file mode 100644 index 00000000000..9300ab8ad97 --- /dev/null +++ b/packages/chain-streams/src/shuffle.js @@ -0,0 +1,16 @@ +// @ts-check + +/** + * Modern version of Fisher-Yates shuffle algorithm (in-place). + * + * @template T + * @param {Array} a + */ +export const shuffle = a => { + for (let i = a.length - 1; i > 0; i -= 1) { + const j = Math.floor(Math.random() * (i + 1)); + const x = a[i]; + a[i] = a[j]; + a[j] = x; + } +}; diff --git a/packages/chain-streams/src/store-key.js b/packages/chain-streams/src/store-key.js new file mode 100644 index 00000000000..22e2fc52d6a --- /dev/null +++ b/packages/chain-streams/src/store-key.js @@ -0,0 +1,39 @@ +// @ts-check +import { toAscii } from '@cosmjs/encoding'; + +/** + * @param {string} storagePath + * @returns {import('./types').ChainStoreKey} + */ +const swingsetPathToStoreKey = storagePath => + harden({ + storeName: 'swingset', + storeSubkey: toAscii(`swingset/data:${storagePath}`), + }); + +export const DEFAULT_PATH_CONVERTER = swingsetPathToStoreKey; + +/** + * @type {Record import('./types').ChainStoreKey>} + */ +export const pathPrefixToConverters = harden({ + 'swingset:': swingsetPathToStoreKey, + ':': DEFAULT_PATH_CONVERTER, +}); + +/** + * @param {string} pathSpec + * @returns {import('./types').ChainStoreKey} + */ +export const makeStoreKey = pathSpec => { + const match = pathSpec.match(/^([^:.]*:)(.*)/); + assert( + match, + `path spec ${pathSpec} does not match 'PREFIX:PATH' or ':PATH'`, + ); + const kind = match[1]; + const storePath = match[2]; + const converter = pathPrefixToConverters[kind]; + assert(converter, `Unknown pathKind ${kind}`); + return converter(storePath); +}; diff --git a/packages/chain-streams/src/stream-cosmjs.js b/packages/chain-streams/src/stream-cosmjs.js new file mode 100644 index 00000000000..da84475c185 --- /dev/null +++ b/packages/chain-streams/src/stream-cosmjs.js @@ -0,0 +1,286 @@ +// @ts-check +import { E, Far } from '@endo/far'; +import { makeNotifierKit } from '@agoric/notifier'; +import { Tendermint34Client } from '@cosmjs/tendermint-rpc'; +import { QueryClient } from '@cosmjs/stargate'; + +import { makeAsyncIterableFromNotifier, iterateLatest } from './iterable.js'; +import { DEFAULT_DECODER, DEFAULT_UNSERIALIZER } from './defaults.js'; + +const { details: X } = assert; + +/** @template T @typedef {import('./types.js').ChainStreamElement} ChainStreamElement */ +/** @template T @typedef {import('./types.js').ChainStream} ChainStream */ + +/** + * @template T + * @param {Iterable} values + * @returns {T} + */ +const collectSingle = values => { + /** @type {T[]} */ + const head = []; + let count = 0; + for (const value of values) { + count += 1; + if (count === 1) { + head.push(value); + } else { + assert.fail(`expected single value, got at least ${count}`); + } + } + + assert.equal(head.length, 1, 'expected single value'); + return head[0]; +}; + +/** + * @callback QueryVerifier + * @param {() => Promise} getProvenValue + * @param {(reason?: unknown) => void} crash + * @param {() => Promise} getAllegedValue + */ + +/** + * @type {Record['integrity'], QueryVerifier>} + */ +export const integrityToQueryVerifier = harden({ + strict: async (getProvenValue, crash, _getAllegedValue) => { + // Just ignore the alleged value. + // Crash hard if we can't prove. + return getProvenValue().catch(crash); + }, + none: async (_getProvenValue, _crash, getAllegedValue) => { + // Fast and loose. + return getAllegedValue(); + }, + optimistic: async (getProvenValue, crash, getAllegedValue) => { + const allegedValue = await getAllegedValue(); + // Prove later, since it may take time we say we can't afford. + getProvenValue().then(provenValue => { + if (provenValue.length === allegedValue.length) { + if (provenValue.every((proven, i) => proven === allegedValue[i])) { + return; + } + } + crash( + assert.error( + X`Alleged value ${allegedValue} did not match proof ${provenValue}`, + ), + ); + }, crash); + + // Speculate that we got the right value. + return allegedValue; + }, +}); + +/** + * @template T + * @param {ERef} leader + * @param {import('./types').ChainStoreKey} storeKey + * @param {import('./types').ChainStreamOptions} options + * @returns {ChainStream>} + */ +export const makeChainStream = (leader, storeKey, options = {}) => { + const { + decode = DEFAULT_DECODER, + unserializer = DEFAULT_UNSERIALIZER, + integrity = 'optimistic', + crasher = null, + } = options; + const { storeName, storeSubkey } = storeKey; + + /** @type {QueryVerifier} */ + const queryVerifier = integrityToQueryVerifier[integrity]; + assert(queryVerifier, X`unrecognized stream integrity mode ${integrity}`); + + /** @type {Map} */ + const endpointToQueryClient = new Map(); + + /** + * @param {string} endpoint + */ + const getOrCreateQueryClient = async endpoint => { + if (endpointToQueryClient.has(endpoint)) { + // Cache hit. + const queryClient = endpointToQueryClient.get(endpoint); + assert(queryClient); + return queryClient; + } + // Create a new client. They retry automatically. + const rpcClient = await Tendermint34Client.connect(endpoint); + const queryClient = QueryClient.withExtensions(rpcClient); + endpointToQueryClient.set(endpoint, queryClient); + return queryClient; + }; + + /** + * @param {'queryVerified' | 'queryUnverified'} method + * @param {string} queryPath + */ + const makeQuerier = + (method, queryPath) => + /** + * @param {number} [height] + * @returns {Promise} + */ + async height => { + const values = await E(leader).mapEndpoints(async endpoint => { + const queryClient = await getOrCreateQueryClient(endpoint); + return E(queryClient) + [method](queryPath, storeSubkey, height) + .then( + result => { + return { result, error: null }; + }, + error => { + return { result: null, error }; + }, + ); + }); + const { result, error } = collectSingle(values); + if (error !== null) { + throw error; + } + assert(result); + return result; + }; + + const getProvenValueAtHeight = makeQuerier('queryVerified', storeName); + const getUnprovenValueAtHeight = makeQuerier( + 'queryUnverified', + `store/${storeName}/key`, + ); + + // Enable the periodic fetch. + /** @type {ChainStream>} */ + return Far('chain stream', { + getLatestIterable: () => { + /** @type {NotifierRecord>} */ + const { updater, notifier } = makeNotifierKit(); + let finished = false; + + const fail = err => { + finished = true; + updater.fail(err); + return false; + }; + + const crash = err => { + fail(err); + if (crasher) { + E(crasher).crash( + `PROOF VERIFICATION FAILURE; crashing follower`, + err, + ); + } else { + console.error(`PROOF VERIFICATION FAILURE; crashing follower`, err); + } + }; + + const retryOrFail = err => { + return E(leader) + .retry(err) + .catch(e => { + fail(e); + throw e; + }); + }; + + /** + * These semantics are to ensure that later queries are not committed + * ahead of earlier ones. + * + * @template T + * @param {(...args: T[]) => void} commitAction + */ + const makePrepareInOrder = commitAction => { + let lastPrepareTicket = 0n; + let lastCommitTicket = 0n; + + const prepareInOrder = () => { + lastPrepareTicket += 1n; + const ticket = lastPrepareTicket; + assert(ticket > lastCommitTicket); + const committer = Far('committer', { + isValid: () => ticket > lastCommitTicket, + /** + * @type {(...args: T[]) => void} + */ + commit: (...args) => { + assert(committer.isValid()); + lastCommitTicket = ticket; + commitAction(...args); + }, + }); + return committer; + }; + return prepareInOrder; + }; + + const prepareUpdateInOrder = makePrepareInOrder(updater.updateState); + + /** @type {Uint8Array} */ + let lastBuf; + + /** + * @param {import('./types').ChainStoreChange} allegedChange + */ + const queryAndUpdateOnce = async allegedChange => { + const committer = prepareUpdateInOrder(); + + // Make an unproven query if we have no alleged value. + const { values: allegedValues, blockHeight: allegedBlockHeight } = + allegedChange; + const getAllegedValue = + allegedValues.length > 0 + ? () => Promise.resolve(allegedValues[allegedValues.length - 1]) + : () => getUnprovenValueAtHeight(allegedBlockHeight); + const getProvenValue = () => getProvenValueAtHeight(allegedBlockHeight); + + const buf = await queryVerifier(getProvenValue, crash, getAllegedValue); + if (!committer.isValid()) { + return; + } + if (lastBuf) { + if (buf.length === lastBuf.length) { + if (buf.every((v, i) => v === lastBuf[i])) { + // Duplicate! + return; + } + } + } + lastBuf = buf; + const data = decode(buf); + if (!unserializer) { + /** @type {T} */ + const value = data; + committer.commit({ value }); + return; + } + const value = await E(unserializer).unserialize(data); + if (!committer.isValid()) { + return; + } + committer.commit({ value }); + }; + + const changeStream = E(leader).watchStoreKey(storeKey); + const queryWhenKeyChanges = async () => { + for await (const allegedChange of iterateLatest(changeStream)) { + if (finished) { + return; + } + harden(allegedChange); + await queryAndUpdateOnce(allegedChange).catch(retryOrFail); + } + }; + + queryAndUpdateOnce({ values: [], storeKey }).catch(retryOrFail); + queryWhenKeyChanges().catch(fail); + + return makeAsyncIterableFromNotifier(notifier); + }, + }); +}; diff --git a/packages/chain-streams/src/types.js b/packages/chain-streams/src/types.js new file mode 100644 index 00000000000..bdc07d1f026 --- /dev/null +++ b/packages/chain-streams/src/types.js @@ -0,0 +1,61 @@ +// @ts-check + +// Make this a module. +export {}; + +/** + * @typedef {object} ChainLeaderOptions + * @property {null | ((err: any) => Promise)} [retryCallback] + * @property {() => Promise} [keepPolling] + */ + +/** + * @typedef {object} ChainStoreChange + * @property {ChainStoreKey} storeKey + * @property {number} [blockHeight] + * @property {Uint8Array[]} values + */ + +/** + * @typedef {object} ChainLeader + * @property {(error: any) => Promise} retry + * @property {() => ChainLeaderOptions} getOptions + * @property {(callback: (endpoint: string) => Promise) => Promise} mapEndpoints + * @property {(key: ChainStoreKey) => Promise>} watchStoreKey + */ + +/** + * @template T + * @typedef {object} ChainStream + * @property {() => AsyncIterable} [getLatestIterable] + */ + +/** + * @template T + * @typedef {object} ChainStreamElement + * @property {T} value + */ + +/** + * @typedef {object} Unserializer + * @property {(data: import('@endo/marshal').CapData) => any} unserialize + */ + +/** + * @typedef {object} Crasher + * @property {(...args: unknown[]) => void} crash + */ + +/** + * @typedef {object} ChainStreamOptions + * @property {null | import('@endo/far').FarRef} [unserializer] + * @property {(buf: Uint8Array) => any} [decode] + * @property {'strict'|'optimistic'|'none'} [integrity] + * @property {import('@endo/far').FarRef} [crasher] + */ + +/** + * @typedef {object} ChainStoreKey + * @property {string} storeName + * @property {Uint8Array} storeSubkey + */ diff --git a/packages/chain-streams/src/watcher.js b/packages/chain-streams/src/watcher.js new file mode 100644 index 00000000000..b25505398ad --- /dev/null +++ b/packages/chain-streams/src/watcher.js @@ -0,0 +1,41 @@ +// @ts-check +import { E, Far } from '@endo/far'; +import { DEFAULT_KEEP_POLLING } from './defaults.js'; + +/** + * Just return an unspecified allegedValue every poll period. + * + * @param {import('./types').ChainLeader} leader + * @param {import('./types.js').ChainStoreKey} storeKey + * @returns {Promise>} + */ +export const makePollingWatcher = async (leader, storeKey) => { + const { keepPolling = DEFAULT_KEEP_POLLING } = await E(leader).getOptions(); + return Far('key watcher stream', { + getLatestIterable: () => + Far('key watcher iterable', { + [Symbol.asyncIterator]: () => { + /** @type {Promise | undefined} */ + let nextPollPromise; + return Far('key watcher iterator', { + next: async () => { + if (!nextPollPromise) { + nextPollPromise = keepPolling(); + } + const keepGoing = await nextPollPromise; + nextPollPromise = undefined; + const change = harden({ + storeKey, + // Make no warrant as to the values. + values: [], + }); + return harden({ + value: change, + done: !keepGoing, + }); + }, + }); + }, + }), + }); +}; diff --git a/packages/chain-streams/test/fake-rpc-server.js b/packages/chain-streams/test/fake-rpc-server.js new file mode 100644 index 00000000000..15473a689c3 --- /dev/null +++ b/packages/chain-streams/test/fake-rpc-server.js @@ -0,0 +1,230 @@ +// @ts-check +import './lockdown.js'; + +import { makeMarshal } from '@endo/marshal'; + +import express from 'express'; +// import morgan from 'morgan'; + +import { toAscii, toBase64 } from '@cosmjs/encoding'; + +let lastPort = 8989; + +const fakeStatusResult = { + node_info: { + protocol_version: { + p2p: '7', + block: '10', + app: '0', + }, + id: '5576458aef205977e18fd50b274e9b5d9014525a', + listen_addr: 'tcp://0.0.0.0:26656', + network: 'cosmoshub-2', + version: '0.32.1', + channels: '4020212223303800', + moniker: 'moniker-node', + other: { + tx_index: 'on', + rpc_address: 'tcp://0.0.0.0:26657', + }, + }, + sync_info: { + latest_block_hash: + '790BA84C3545FCCC49A5C629CEE6EA58A6E875C3862175BDC11EE7AF54703501', + latest_app_hash: + 'C9AEBB441B787D9F1D846DE51F3826F4FD386108B59B08239653ABF59455C3F8', + latest_block_height: '1262196', + latest_block_time: '2019-08-01T11:52:22.818762194Z', + earliest_block_hash: + '790BA84C3545FCCC49A5C629CEE6EA58A6E875C3862175BDC11EE7AF54703501', + earliest_app_hash: + 'C9AEBB441B787D9F1D846DE51F3826F4FD386108B59B08239653ABF59455C3F8', + earliest_block_height: '1262196', + earliest_block_time: '2019-08-01T11:52:22.818762194Z', + max_leader_block_height: '1262196', + catching_up: false, + total_synced_time: '1000000000', + remaining_time: '0', + total_snapshots: '10', + chunk_process_avg_time: '1000000000', + snapshot_height: '1262196', + snapshot_chunks_count: '10', + snapshot_chunks_total: '100', + backfilled_blocks: '10', + backfill_blocks_total: '100', + }, + validator_info: { + address: '5D6A51A8E9899C44079C6AF90618BA0369070E6E', + pub_key: { + type: 'tendermint/PubKeyEd25519', + value: 'A6DoBUypNtUAyEHWtQ9bFjfNg8Bo9CrnkUGl6k6OHN4=', + }, + voting_power: '0', + }, +}; + +export const startFakeServer = (t, fakeValues) => { + const { log = console.log } = t; + const { serialize } = makeMarshal(); + lastPort += 1; + const PORT = lastPort; + return new Promise(resolve => { + log('starting http server on port', PORT); + const app = express(); + // app.use(morgan()); + app.use((req, _res, next) => { + log('request', req.method, req.url); + next(); + }); + app.use(express.json()); + app.get('/bad-network-config', (req, res) => { + res.json({ + rpcAddrs: 'not an array', + }); + }); + app.get('/network-config', (req, res) => { + res.json({ + rpcAddrs: [`http://localhost:${PORT}/tendermint-rpc`], + }); + }); + app.post('/tendermint-rpc', (req, res) => { + log('received', req.path, req.body, req.params); + const reply = result => { + log('response', result); + res.json({ + jsonrpc: '2.0', + id: req.body.id, + result, + }); + }; + switch (req.body.method) { + case 'status': { + reply(fakeStatusResult); + break; + } + case 'abci_query': { + const value = + fakeValues.length === 0 + ? null + : toBase64( + toAscii(JSON.stringify(serialize(fakeValues.shift()))), + ); + const result = { + response: { + code: 0, + log: '', + info: '', + index: '0', + key: 'c3dpbmdzZXQvZGF0YTptYWlsYm94LmFnb3JpYzFmb29iYXJiYXo=', + value, + proofOps: null, + height: '74863', + codespace: '', + }, + }; + reply(result); + break; + } + default: { + res.sendStatus(400); + } + } + }); + const listener = app.listen(PORT, () => { + log('started http server on', PORT); + const cleanup = () => { + log('shutting down http server on', PORT); + listener.close(); + }; + t.context.cleanups.push(cleanup); + resolve(PORT); + }); + }); +}; + +export const jsonPairs = harden([ + // Justin is the same as the JSON encoding but without unnecessary quoting + ['[1,2]', '[1,2]'], + ['{"foo":1}', '{foo:1}'], + ['{"a":1,"b":2}', '{a:1,b:2}'], + ['{"a":1,"b":{"c":3}}', '{a:1,b:{c:3}}'], + ['true', 'true'], + ['1', '1'], + ['"abc"', '"abc"'], + ['null', 'null'], + + // Primitives not representable in JSON + ['{"@qclass":"undefined"}', 'undefined'], + ['{"@qclass":"NaN"}', 'NaN'], + ['{"@qclass":"Infinity"}', 'Infinity'], + ['{"@qclass":"-Infinity"}', '-Infinity'], + ['{"@qclass":"bigint","digits":"4"}', '4n'], + ['{"@qclass":"bigint","digits":"9007199254740993"}', '9007199254740993n'], + ['{"@qclass":"symbol","name":"@@asyncIterator"}', 'Symbol.asyncIterator'], + ['{"@qclass":"symbol","name":"@@match"}', 'Symbol.match'], + ['{"@qclass":"symbol","name":"foo"}', 'Symbol.for("foo")'], + ['{"@qclass":"symbol","name":"@@@@foo"}', 'Symbol.for("@@foo")'], + + // Arrays and objects + ['[{"@qclass":"undefined"}]', '[undefined]'], + ['{"foo":{"@qclass":"undefined"}}', '{foo:undefined}'], + ['{"@qclass":"error","message":"","name":"Error"}', 'Error("")'], + [ + '{"@qclass":"error","message":"msg","name":"ReferenceError"}', + 'ReferenceError("msg")', + ], + + // The one case where JSON is not a semantic subset of JS + ['{"__proto__":8}', '{["__proto__"]:8}'], + + // The Hilbert Hotel is always tricky + ['{"@qclass":"hilbert","original":8}', '{"@qclass":8}'], + ['{"@qclass":"hilbert","original":"@qclass"}', '{"@qclass":"@qclass"}'], + [ + '{"@qclass":"hilbert","original":{"@qclass":"hilbert","original":8}}', + '{"@qclass":{"@qclass":8}}', + ], + [ + '{"@qclass":"hilbert","original":{"@qclass":"hilbert","original":8,"rest":{"foo":"foo1"}},"rest":{"bar":{"@qclass":"hilbert","original":{"@qclass":"undefined"}}}}', + '{"@qclass":{"@qclass":8,foo:"foo1"},bar:{"@qclass":undefined}}', + ], + + // tagged + ['{"@qclass":"tagged","tag":"x","payload":8}', 'makeTagged("x",8)'], + [ + '{"@qclass":"tagged","tag":"x","payload":{"@qclass":"undefined"}}', + 'makeTagged("x",undefined)', + ], + + // Slots + [ + '[{"@qclass":"slot","iface":"Alleged: for testing Justin","index":0}]', + '[slot(0,"Alleged: for testing Justin")]', + ], + // Tests https://github.com/endojs/endo/issues/1185 fix + [ + '[{"@qclass":"slot","iface":"Alleged: for testing Justin","index":0},{"@qclass":"slot","index":0}]', + '[slot(0,"Alleged: for testing Justin"),slot(0)]', + ], +]); + +export const develop = async () => { + const { unserialize } = makeMarshal(); + const fakeValues = harden( + jsonPairs.map(([jsonMarshalled]) => + unserialize({ body: jsonMarshalled, slots: [] }), + ), + ); + const mockT = { + log: console.log, + context: { cleanups: [] }, + }; + const PORT = await startFakeServer(mockT, [...fakeValues]); + console.log( + `Try this in another terminal: + agoric stream :fake.path --bootstrap=http://localhost:${PORT}/network-config --sleep=0.5 --integrity=none`, + ); + console.warn(`Control-C to interrupt...`); + // Wait forever. + await new Promise(() => {}); +}; diff --git a/packages/chain-streams/test/lockdown.js b/packages/chain-streams/test/lockdown.js new file mode 100644 index 00000000000..f597abccf3d --- /dev/null +++ b/packages/chain-streams/test/lockdown.js @@ -0,0 +1,3 @@ +import '@endo/init/pre-remoting.js'; +import '../node-fetch-shim.js'; +import '@endo/init'; diff --git a/packages/chain-streams/test/prepare-test-env-ava.js b/packages/chain-streams/test/prepare-test-env-ava.js new file mode 100644 index 00000000000..164f82d07a2 --- /dev/null +++ b/packages/chain-streams/test/prepare-test-env-ava.js @@ -0,0 +1,6 @@ +import './lockdown.js'; +import { wrapTest } from '@endo/ses-ava'; +import rawTest from 'ava'; + +/** @type {typeof rawTest} */ +export const test = wrapTest(rawTest); diff --git a/packages/chain-streams/test/test-mvp.js b/packages/chain-streams/test/test-mvp.js new file mode 100644 index 00000000000..978ad909963 --- /dev/null +++ b/packages/chain-streams/test/test-mvp.js @@ -0,0 +1,84 @@ +// @ts-nocheck +// eslint-disable-next-line import/order +import { test } from './prepare-test-env-ava.js'; + +import { + iterateLatest, + makeChainStream, + makeLeader, + makeStoreKey, +} from '../src/main.js'; + +import { delay } from '../src/defaults.js'; +import { startFakeServer } from './fake-rpc-server.js'; + +test('happy path', async t => { + const expected = ['latest', 'later', 'done']; + t.plan(expected.length); + const PORT = await t.context.startServer(t, [...expected]); + /** @type {import('../src/types.js').ChainLeaderOptions} */ + const lo = { + retryCallback: null, // fail fast, no retries + keepPolling: () => delay(200).then(() => true), // poll really quickly + }; + /** @type {import('../src/types.js').ChainStreamOptions} */ + const so = { + integrity: 'none', + }; + + // The rest of this test is taken almost verbatim from the README.md, with + // some minor modifications (testLeaderOptions and deepEqual). + const leader = makeLeader(`http://localhost:${PORT}/network-config`, lo); + const storeKey = makeStoreKey(':mailbox.agoric1foobarbaz'); + const stream = await makeChainStream(leader, storeKey, so); + for await (const { value } of iterateLatest(stream)) { + t.log(`here's a mailbox value`, value); + + // The rest here is to drive the test. + t.deepEqual(value, expected.shift()); + if (expected.length === 0) { + break; + } + } +}); + +test('bad network config', async t => { + const PORT = await t.context.startServer(t, []); + await t.throwsAsync( + () => + makeLeader(`http://localhost:${PORT}/bad-network-config`, { + retryCallback: null, + }), + { + message: /rpcAddrs .* must be an array/, + }, + ); +}); + +test('missing rpc server', async t => { + const PORT = await t.context.startServer(t, []); + await t.throwsAsync( + () => + makeLeader(`http://localhost:${PORT}/missing-network-config`, { + retryCallback: null, + }), + { + message: /^invalid json response body/, + }, + ); +}); + +test('unrecognized integrity', async t => { + await t.throwsAsync(() => makeChainStream({}, {}, { integrity: 'bother' }), { + message: /unrecognized stream integrity mode.*/, + }); +}); + +test.before(t => { + t.context.cleanups = []; + t.context.startServer = startFakeServer; +}); + +test.after(t => { + t.context.cleanups.map(cleanup => cleanup()); +}); diff --git a/packages/run-protocol/src/vpool-xyk-amm/multipoolMarketMaker.js b/packages/run-protocol/src/vpool-xyk-amm/multipoolMarketMaker.js index 33a2427c5a9..7961eab315b 100644 --- a/packages/run-protocol/src/vpool-xyk-amm/multipoolMarketMaker.js +++ b/packages/run-protocol/src/vpool-xyk-amm/multipoolMarketMaker.js @@ -168,7 +168,7 @@ const start = async (zcf, privateArgs) => { const { publication: metricsPublication, subscription: rawMetricsSubscription, - } = makeSubscriptionKit(); + } = makeSubscriptionKit(harden({ XYK: [] })); const { storageNode, marshaller } = privateArgs; const metricsStorageNode = storageNode && E(storageNode).getChildNode('metrics'); // TODO: magic string @@ -182,7 +182,6 @@ const start = async (zcf, privateArgs) => { harden({ XYK: Array.from(secondaryBrandToPool.keys()) }), ); }; - updateMetrics(); // For now, this seat collects protocol fees. It needs to be connected to // something that will extract the fees. diff --git a/packages/run-protocol/test/metrics.js b/packages/run-protocol/test/metrics.js index bc0dbf4fd36..76f2585af71 100644 --- a/packages/run-protocol/test/metrics.js +++ b/packages/run-protocol/test/metrics.js @@ -14,12 +14,14 @@ export const subscriptionTracker = async (t, subscription) => { const assertInitial = async expectedValue => { notif = await metrics.getUpdateSince(); + t.log('assertInitial notif', notif); t.deepEqual(notif.value, expectedValue); }; /** @param {Record} expectedDelta */ const assertChange = async expectedDelta => { const prevNotif = notif; notif = await metrics.getUpdateSince(notif.updateCount); + t.log('assertChange notif', notif); const actualDelta = diff(prevNotif.value, notif.value); t.deepEqual(actualDelta, expectedDelta, 'Unexpected delta'); }; From a89d537b0ac3a7e3f2cbc60a6bb075031261d826 Mon Sep 17 00:00:00 2001 From: Michael FIG Date: Thu, 2 Jun 2022 20:09:06 -0600 Subject: [PATCH 05/10] feat(agoric): new `stream` command --- packages/agoric-cli/package.json | 2 + packages/agoric-cli/src/entrypoint.js | 1 + packages/agoric-cli/src/main.js | 53 ++++++++ packages/agoric-cli/src/sdk-package-names.js | 1 + packages/agoric-cli/src/stream.js | 120 +++++++++++++++++++ 5 files changed, 177 insertions(+) create mode 100644 packages/agoric-cli/src/stream.js diff --git a/packages/agoric-cli/package.json b/packages/agoric-cli/package.json index bd0ef73b078..a27c8057263 100644 --- a/packages/agoric-cli/package.json +++ b/packages/agoric-cli/package.json @@ -29,11 +29,13 @@ "dependencies": { "@agoric/access-token": "^0.4.18", "@agoric/assert": "^0.4.0", + "@agoric/chain-streams": "^0.1.0", "@agoric/nat": "^4.1.0", "@endo/bundle-source": "^2.2.0", "@endo/captp": "^2.0.7", "@endo/compartment-mapper": "^0.7.5", "@endo/init": "^0.5.41", + "@endo/marshal": "^0.6.7", "@endo/promise-kit": "^0.2.41", "@iarna/toml": "^2.2.3", "anylogger": "^0.21.0", diff --git a/packages/agoric-cli/src/entrypoint.js b/packages/agoric-cli/src/entrypoint.js index d9d40c99f6b..dd1020a7b43 100755 --- a/packages/agoric-cli/src/entrypoint.js +++ b/packages/agoric-cli/src/entrypoint.js @@ -4,6 +4,7 @@ import '@endo/init/pre.js'; import 'esm'; +import '@agoric/chain-streams/node-fetch-shim.js'; import '@endo/init'; import path from 'path'; diff --git a/packages/agoric-cli/src/main.js b/packages/agoric-cli/src/main.js index 4e4a15fae32..9c88f72c2c4 100644 --- a/packages/agoric-cli/src/main.js +++ b/packages/agoric-cli/src/main.js @@ -8,6 +8,7 @@ import initMain from './init.js'; import installMain from './install.js'; import setDefaultsMain from './set-defaults.js'; import startMain from './start.js'; +import streamMain from './stream.js'; import walletMain from './open.js'; const DEFAULT_DAPP_TEMPLATE = 'dapp-fungible-faucet'; @@ -160,6 +161,58 @@ const main = async (progname, rawArgs, powers) => { return subMain(installMain, ['install', forceSdkVersion], opts); }); + program + .command('stream ') + .description('read an Agoric Chain Stream') + .option( + '--integrity ', + 'set integrity mode', + value => { + assert( + ['strict', 'optimistic', 'none'].includes(value), + X`--integrity must be one of 'strict', 'optimistic', or 'none'`, + TypeError, + ); + return value; + }, + 'optimistic', + ) + .option( + '--sleep ', + 'sleep between polling (may be fractional)', + value => { + const num = Number(value); + assert.equal(`${num}`, value, X`--sleep must be a number`, TypeError); + return num; + }, + 0, + ) + .option( + '-o, --output ', + 'value output format', + value => { + assert( + [ + 'hex', + 'justin', + 'justinlines', + 'json', + 'jsonlines', + 'text', + ].includes(value), + X`--output must be one of 'hex', 'justin', 'justinlines', 'json', 'jsonlines', or 'text'`, + TypeError, + ); + return value; + }, + 'justin', + ) + .option('-B, --bootstrap ', 'network bootstrap configuration') + .action(async (pathSpecs, cmd) => { + const opts = { ...program.opts(), ...cmd.opts() }; + return subMain(streamMain, ['stream', ...pathSpecs], opts); + }); + const addRunOptions = cmd => cmd .option( diff --git a/packages/agoric-cli/src/sdk-package-names.js b/packages/agoric-cli/src/sdk-package-names.js index d3822478673..5438155c310 100644 --- a/packages/agoric-cli/src/sdk-package-names.js +++ b/packages/agoric-cli/src/sdk-package-names.js @@ -4,6 +4,7 @@ export default [ "@agoric/access-token", "@agoric/assert", + "@agoric/chain-streams", "@agoric/cosmic-swingset", "@agoric/cosmos", "@agoric/deploy-script-support", diff --git a/packages/agoric-cli/src/stream.js b/packages/agoric-cli/src/stream.js new file mode 100644 index 00000000000..b5b4942d402 --- /dev/null +++ b/packages/agoric-cli/src/stream.js @@ -0,0 +1,120 @@ +// @ts-check +import process from 'process'; +import { Far } from '@endo/marshal'; +import { decodeToJustin } from '@endo/marshal/src/marshal-justin.js'; + +import { + delay, + iterateLatest, + makeChainStream, + makeLeader, + makeStoreKey, +} from '@agoric/chain-streams'; + +export default async function streamMain(progname, rawArgs, powers, opts) { + const { anylogger } = powers; + const console = anylogger('agoric:stream'); + + const { + integrity, + output, + bootstrap = 'http://localhost:26657', + verbose, + sleep, + } = opts; + + /** @type {import('@agoric/chain-streams').ChainStreamOptions} */ + const streamOptions = { + integrity, + }; + + /** @type {(buf: any) => any} */ + let formatOutput; + switch (output) { + case 'justinlines': + case 'justin': { + streamOptions.unserializer = null; + const pretty = !output.endsWith('lines'); + formatOutput = ({ body }) => { + const encoded = JSON.parse(body); + return decodeToJustin(encoded, pretty); + }; + break; + } + case 'jsonlines': + case 'json': { + const spaces = output.endsWith('lines') ? undefined : 2; + const bigintToStringReplacer = (_, arg) => { + if (typeof arg === 'bigint') { + return `${arg}`; + } + return arg; + }; + formatOutput = obj => JSON.stringify(obj, bigintToStringReplacer, spaces); + break; + } + case 'hex': { + // Dump as hex strings. + streamOptions.decode = buf => buf; + streamOptions.unserializer = null; + formatOutput = buf => + buf.reduce((acc, b) => acc + b.toString(16).padStart(2, '0'), ''); + break; + } + case 'text': { + streamOptions.decode = buf => new TextDecoder().decode(buf); + streamOptions.unserializer = null; + formatOutput = buf => buf; + break; + } + default: { + console.error(`Unknown output format: ${output}`); + return 1; + } + } + + if (integrity !== 'none') { + streamOptions.crasher = Far('stream crasher', { + crash: (...args) => { + console.error(...args); + console.warn(`You are running with '--integrity=${integrity}'`); + console.warn( + `If you trust your RPC nodes, you can turn off proofs with '--integrity=none'`, + ); + process.exit(1); + }, + }); + } + + /** @type {import('@agoric/chain-streams').ChainLeaderOptions} */ + const leaderOptions = { + retryCallback: e => { + verbose && console.warn('Retrying due to:', e); + return delay(1000 + Math.random() * 1000); + }, + keepPolling: async () => { + let toSleep = sleep * 1000; + if (toSleep <= 0) { + toSleep = (5 + Math.random()) * 1000; + } + await delay(toSleep); + return true; + }, + }; + + const [_cmd, ...specs] = rawArgs; + + verbose && console.warn('Streaming from leader at', bootstrap); + const leader = makeLeader(bootstrap, leaderOptions); + await Promise.all( + specs.map(async spec => { + verbose && console.warn('Consuming', spec); + const storeKey = makeStoreKey(spec); + const stream = makeChainStream(leader, storeKey, streamOptions); + for await (const { value } of iterateLatest(stream)) { + process.stdout.write(`${formatOutput(value)}\n`); + } + }), + ); + return 0; +} From 44cc69a0bfccb5fc67d74ca52caea458bcc48670 Mon Sep 17 00:00:00 2001 From: Michael FIG Date: Sat, 4 Jun 2022 22:33:08 -0600 Subject: [PATCH 06/10] fix(chain-streams): port to new `x/vstorage` module --- packages/chain-streams/src/store-key.js | 21 ++++++++++++++++- packages/chain-streams/src/stream-cosmjs.js | 23 +++++++++++++++++-- packages/chain-streams/src/types.js | 1 + .../chain-streams/test/fake-rpc-server.js | 14 ++++++++--- packages/chain-streams/test/test-mvp.js | 2 +- 5 files changed, 54 insertions(+), 7 deletions(-) diff --git a/packages/chain-streams/src/store-key.js b/packages/chain-streams/src/store-key.js index 22e2fc52d6a..fd46991da83 100644 --- a/packages/chain-streams/src/store-key.js +++ b/packages/chain-streams/src/store-key.js @@ -11,13 +11,32 @@ const swingsetPathToStoreKey = storagePath => storeSubkey: toAscii(`swingset/data:${storagePath}`), }); -export const DEFAULT_PATH_CONVERTER = swingsetPathToStoreKey; +const PATH_SEPARATOR_BYTE = '.'.charCodeAt(0); +const DATA_PREFIX_BYTES = new Uint8Array([0]); + +/** + * @param {string} storagePath + * @param {string} [storeName] + * @returns {import('./types').ChainStoreKey} + */ +const vstoragePathToStoreKey = (storagePath, storeName = 'vstorage') => { + const elems = storagePath ? storagePath.split('.') : []; + const buf = toAscii(`${elems.length}.${storagePath}`); + return harden({ + storeName, + storeSubkey: buf.map(b => (b === PATH_SEPARATOR_BYTE ? 0 : b)), + dataPrefixBytes: DATA_PREFIX_BYTES, + }); +}; + +export const DEFAULT_PATH_CONVERTER = vstoragePathToStoreKey; /** * @type {Record import('./types').ChainStoreKey>} */ export const pathPrefixToConverters = harden({ 'swingset:': swingsetPathToStoreKey, + 'vstore:': vstoragePathToStoreKey, ':': DEFAULT_PATH_CONVERTER, }); diff --git a/packages/chain-streams/src/stream-cosmjs.js b/packages/chain-streams/src/stream-cosmjs.js index da84475c185..00e896e7449 100644 --- a/packages/chain-streams/src/stream-cosmjs.js +++ b/packages/chain-streams/src/stream-cosmjs.js @@ -89,7 +89,11 @@ export const makeChainStream = (leader, storeKey, options = {}) => { integrity = 'optimistic', crasher = null, } = options; - const { storeName, storeSubkey } = storeKey; + const { + storeName, + storeSubkey, + dataPrefixBytes = new Uint8Array(), + } = storeKey; /** @type {QueryVerifier} */ const queryVerifier = integrityToQueryVerifier[integrity]; @@ -144,7 +148,22 @@ export const makeChainStream = (leader, storeKey, options = {}) => { throw error; } assert(result); - return result; + + if (result.length === 0) { + // No data. + return result; + } + + // Handle the data prefix if any. + assert( + result.length >= dataPrefixBytes.length, + X`result too short for data prefix ${dataPrefixBytes}`, + ); + assert( + dataPrefixBytes.every((v, i) => v === result[i]), + X`${result} doesn't start with data prefix ${dataPrefixBytes}`, + ); + return result.slice(dataPrefixBytes.length); }; const getProvenValueAtHeight = makeQuerier('queryVerified', storeName); diff --git a/packages/chain-streams/src/types.js b/packages/chain-streams/src/types.js index bdc07d1f026..a252371ffc2 100644 --- a/packages/chain-streams/src/types.js +++ b/packages/chain-streams/src/types.js @@ -58,4 +58,5 @@ export {}; * @typedef {object} ChainStoreKey * @property {string} storeName * @property {Uint8Array} storeSubkey + * @property {Uint8Array} [dataPrefixBytes] */ diff --git a/packages/chain-streams/test/fake-rpc-server.js b/packages/chain-streams/test/fake-rpc-server.js index 15473a689c3..56fe48199fe 100644 --- a/packages/chain-streams/test/fake-rpc-server.js +++ b/packages/chain-streams/test/fake-rpc-server.js @@ -87,6 +87,16 @@ export const startFakeServer = (t, fakeValues) => { rpcAddrs: [`http://localhost:${PORT}/tendermint-rpc`], }); }); + + const dataPrefix = new Uint8Array([0]); + const encode = obj => { + const str = JSON.stringify(obj); + const ascii = toAscii(str); + const buf = new Uint8Array(dataPrefix.length + ascii.length); + buf.set(dataPrefix); + buf.set(ascii, dataPrefix.length); + return toBase64(buf); + }; app.post('/tendermint-rpc', (req, res) => { log('received', req.path, req.body, req.params); const reply = result => { @@ -106,9 +116,7 @@ export const startFakeServer = (t, fakeValues) => { const value = fakeValues.length === 0 ? null - : toBase64( - toAscii(JSON.stringify(serialize(fakeValues.shift()))), - ); + : encode(serialize(fakeValues.shift())); const result = { response: { code: 0, diff --git a/packages/chain-streams/test/test-mvp.js b/packages/chain-streams/test/test-mvp.js index 978ad909963..0cc1dc066eb 100644 --- a/packages/chain-streams/test/test-mvp.js +++ b/packages/chain-streams/test/test-mvp.js @@ -69,7 +69,7 @@ test('missing rpc server', async t => { }); test('unrecognized integrity', async t => { - await t.throwsAsync(() => makeChainStream({}, {}, { integrity: 'bother' }), { + await t.throws(() => makeChainStream({}, {}, { integrity: 'bother' }), { message: /unrecognized stream integrity mode.*/, }); }); From 094044c6e75203a47d069242cfa225b2c15d80d5 Mon Sep 17 00:00:00 2001 From: Michael FIG Date: Mon, 6 Jun 2022 10:54:36 -0600 Subject: [PATCH 07/10] fix(vats): update `chainStorage` to use new `vstorage` API --- golang/cosmos/x/vstorage/vstorage.go | 16 +++++++++ packages/vats/src/lib-chainStorage.js | 21 ++---------- packages/vats/test/test-lib-chainStorage.js | 36 ++++++++++----------- 3 files changed, 36 insertions(+), 37 deletions(-) diff --git a/golang/cosmos/x/vstorage/vstorage.go b/golang/cosmos/x/vstorage/vstorage.go index f71c367d880..70bfe45dc18 100644 --- a/golang/cosmos/x/vstorage/vstorage.go +++ b/golang/cosmos/x/vstorage/vstorage.go @@ -20,6 +20,11 @@ type vstorageMessage struct { Value string `json:"value"` } +type vstorageStoreKey struct { + StoreName string `json:"storeName"` + StoreSubkey string `json:"storeSubkey"` +} + func NewStorageHandler(keeper Keeper) vstorageHandler { return vstorageHandler{keeper: keeper} } @@ -67,6 +72,17 @@ func (sh vstorageHandler) Receive(cctx *vm.ControllerContext, str string) (ret s } return string(bz), nil + case "getStoreKey": + value := vstorageStoreKey{ + StoreName: keeper.GetStoreName(), + StoreSubkey: string(keeper.PathToEncodedKey(msg.Path)), + } + bz, err := json.Marshal(value) + if err != nil { + return "", err + } + return string(bz), nil + case "has": value := keeper.GetData(cctx.Context, msg.Path) if value == "" { diff --git a/packages/vats/src/lib-chainStorage.js b/packages/vats/src/lib-chainStorage.js index 8bb628329fb..56ee2f32979 100644 --- a/packages/vats/src/lib-chainStorage.js +++ b/packages/vats/src/lib-chainStorage.js @@ -28,9 +28,7 @@ export function makeChainStorageRoot(toStorage, storeName, rootPath) { function makeChainStorageNode(path) { const node = { getStoreKey() { - // This duplicates the Go code at - // https://github.com/Agoric/agoric-sdk/blob/cb272ae97a042ceefd3af93b1b4601ca49dfe3a7/golang/cosmos/x/swingset/keeper/keeper.go#L295 - return { storeName, storeSubkey: `swingset/data:${path}` }; + return toStorage({ key: path, method: 'getStoreKey' }); }, getChildNode(name) { assert.typeof(name, 'string'); @@ -44,22 +42,7 @@ export function makeChainStorageRoot(toStorage, storeName, rootPath) { assert.typeof(value, 'string'); toStorage({ key: path, method: 'set', value }); }, - async delete() { - assert(path !== rootPath); - // A 'set' with no value deletes a key if it has no children, but - // otherwise sets data to the empty string and leaves all nodes intact. - // We want to reject silently incomplete deletes (at least for now). - // This check is unfortunately racy (e.g., a vat could wake up - // and set data for a child before _this_ vat receives an - // already-enqueued response claiming no children), but we can tolerate - // that because transforming a deletion into a set-to-empty is - // effectively indistinguishable from a valid reordering where a fully - // successful 'delete' is followed by a child-key 'set' (for which - // absent parent keys are automatically created with empty-string data). - const childCount = await toStorage({ key: path, method: 'size' }); - if (childCount > 0) { - assert.fail(X`Refusing to delete node with children: ${path}`); - } + clearValue() { toStorage({ key: path, method: 'set' }); }, // Possible extensions: diff --git a/packages/vats/test/test-lib-chainStorage.js b/packages/vats/test/test-lib-chainStorage.js index 5d35a08bfb4..5a6eeeb1c6d 100644 --- a/packages/vats/test/test-lib-chainStorage.js +++ b/packages/vats/test/test-lib-chainStorage.js @@ -11,6 +11,12 @@ test('makeChainStorageRoot', async t => { const toStorage = message => { messages.push(message); switch (message.method) { + case 'getStoreKey': { + return { + storeName: 'swingset', + storeSubkey: `swingset/data:${message.key}`, + }; + } case 'set': if ('value' in message) { data.set(message.key, message.value); @@ -62,12 +68,7 @@ test('makeChainStorageRoot', async t => { ); } - // The root node cannot be deleted, but is otherwise normal. - await t.throwsAsync( - rootNode.delete(), - undefined, - 'root node deletion is disallowed', - ); + rootNode.clearValue(); rootNode.setValue('foo'); t.deepEqual( messages.slice(-1), @@ -117,12 +118,11 @@ test('makeChainStorageRoot', async t => { [{ key: childPath, method: 'set', value: 'foo' }], 'non-root setValue message', ); - // eslint-disable-next-line no-await-in-loop - await child.delete(); + child.clearValue(); t.deepEqual( messages.slice(-1), [{ key: childPath, method: 'set' }], - 'non-root delete message', + 'non-root clearValue message', ); } @@ -172,22 +172,22 @@ test('makeChainStorageRoot', async t => { 'level-skipping setValue message', ); - // Deletion requires absence of children. - await t.throwsAsync( - childNode.delete(), - undefined, - 'deleting a node with a child is disallowed', + childNode.clearValue(); + t.deepEqual( + messages.slice(-1), + [{ key: childPath, method: 'set' }], + 'child clearValue message', ); - await deepNode.delete(); + deepNode.clearValue(); t.deepEqual( messages.slice(-1), [{ key: deepPath, method: 'set' }], - 'granchild delete message', + 'granchild clearValue message', ); - await childNode.delete(); + childNode.clearValue(); t.deepEqual( messages.slice(-1), [{ key: childPath, method: 'set' }], - 'child delete message', + 'child clearValue message', ); }); From 345adc43a9d5d478b631c8d9715ed90f647f54dd Mon Sep 17 00:00:00 2001 From: Michael FIG Date: Tue, 7 Jun 2022 06:59:55 -0600 Subject: [PATCH 08/10] feat(chain-streams): plumb through retry attempt counter --- packages/agoric-cli/src/stream.js | 3 ++- packages/chain-streams/README.md | 2 +- packages/chain-streams/src/defaults.js | 4 +++- packages/chain-streams/src/leader-netconfig.js | 11 ++++++++--- packages/chain-streams/src/leader.js | 9 ++++++--- packages/chain-streams/src/stream-cosmjs.js | 7 +++++-- packages/chain-streams/src/types.js | 4 ++-- 7 files changed, 27 insertions(+), 13 deletions(-) diff --git a/packages/agoric-cli/src/stream.js b/packages/agoric-cli/src/stream.js index b5b4942d402..eb0cc2fab79 100644 --- a/packages/agoric-cli/src/stream.js +++ b/packages/agoric-cli/src/stream.js @@ -86,9 +86,10 @@ export default async function streamMain(progname, rawArgs, powers, opts) { }); } + // TODO: https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ /** @type {import('@agoric/chain-streams').ChainLeaderOptions} */ const leaderOptions = { - retryCallback: e => { + retryCallback: (e, _attempt) => { verbose && console.warn('Retrying due to:', e); return delay(1000 + Math.random() * 1000); }, diff --git a/packages/chain-streams/README.md b/packages/chain-streams/README.md index 2de5ce9046d..e953d092a1e 100644 --- a/packages/chain-streams/README.md +++ b/packages/chain-streams/README.md @@ -8,7 +8,7 @@ TL;DR: You can run `yarn demo`, or to consume a mailbox stream do: npx agoric stream -Bhttp://devnet.agoric.net/network-config :mailbox.agoric1foobarbaz -otext ``` -An example of following an on-chain mailbox in codd (using this package) is: +An example of following an on-chain mailbox in code (using this package) is: ```js // First, obtain a Hardened JS environment via Endo. diff --git a/packages/chain-streams/src/defaults.js b/packages/chain-streams/src/defaults.js index f577ee86309..0bc766ce799 100644 --- a/packages/chain-streams/src/defaults.js +++ b/packages/chain-streams/src/defaults.js @@ -15,10 +15,12 @@ export const delay = ms => new Promise(resolve => setTimeout(resolve, ms)); * Report an error, then retry the leader operation after a second or two. * * @param {any} err + * @param {number} _attempt * @returns {Promise} */ -export const DEFAULT_RETRY_CALLBACK = err => { +export const DEFAULT_RETRY_CALLBACK = (err, _attempt = 0) => { console.warn('retrying after error', err); + // TODO: `delay(Math.random() * Math.min(cap, base * 2 ** attempt)) return delay(1000 + Math.random() * 1000); }; diff --git a/packages/chain-streams/src/leader-netconfig.js b/packages/chain-streams/src/leader-netconfig.js index 9aaaff740f7..2ce11355c40 100644 --- a/packages/chain-streams/src/leader-netconfig.js +++ b/packages/chain-streams/src/leader-netconfig.js @@ -30,24 +30,29 @@ export const makeLeaderFromRpcAddresses = (rpcAddrs, leaderOptions) => { */ export const makeLeaderFromNetworkConfig = (netconfigURL, options = {}) => { const { retryCallback = DEFAULT_RETRY_CALLBACK } = options; - const retry = async err => { + /** @type {import('./types.js').ChainLeaderOptions['retryCallback']} */ + const retry = async (err, attempt) => { if (retryCallback) { - return retryCallback(err); + return retryCallback(err, attempt); } throw err; }; + let attempt = 0; return new Promise((resolve, reject) => { const makeLeader = async () => { const response = await fetch(netconfigURL, { headers: { accept: 'application/json' }, }); const { rpcAddrs } = await response.json(); + // Our part succeeded, so reset the attempt counter. + attempt = 0; return makeLeaderFromRpcAddresses(rpcAddrs, options); }; const retryLeader = async err => { - retry(err) + retry(err, attempt) .then(() => makeLeader().then(resolve, retryLeader)) .catch(reject); + attempt += 1; }; makeLeader().then(resolve, retryLeader); }); diff --git a/packages/chain-streams/src/leader.js b/packages/chain-streams/src/leader.js index a077484ca9c..0c66579edef 100644 --- a/packages/chain-streams/src/leader.js +++ b/packages/chain-streams/src/leader.js @@ -18,13 +18,14 @@ export const makeRoundRobinLeader = (endpoints, leaderOptions = {}) => { shuffle(endpoints); let lastRespondingEndpointIndex = 0; + let thisAttempt = 0; /** @type {import('./types.js').ChainLeader} */ const leader = Far('round robin leader', { getOptions: () => leaderOptions, - retry: async err => { + retry: async (err, attempt) => { if (retryCallback) { - return retryCallback(err); + return retryCallback(err, attempt); } throw err; }, @@ -42,7 +43,8 @@ export const makeRoundRobinLeader = (endpoints, leaderOptions = {}) => { endpointIndex = (endpointIndex + 1) % endpoints.length; // eslint-disable-next-line no-use-before-define - E(leader).retry(err).then(applyOne, reject); + E(leader).retry(err, thisAttempt).then(applyOne, reject); + thisAttempt += 1; }; const applyOne = () => { @@ -51,6 +53,7 @@ export const makeRoundRobinLeader = (endpoints, leaderOptions = {}) => { .then(res => { resolve(harden([res])); lastRespondingEndpointIndex = endpointIndex; + thisAttempt = 0; }, retry); // Don't return to prevent a promise chain. diff --git a/packages/chain-streams/src/stream-cosmjs.js b/packages/chain-streams/src/stream-cosmjs.js index 00e896e7449..b4e2c63e6f4 100644 --- a/packages/chain-streams/src/stream-cosmjs.js +++ b/packages/chain-streams/src/stream-cosmjs.js @@ -198,13 +198,15 @@ export const makeChainStream = (leader, storeKey, options = {}) => { } }; + let attempt = 0; const retryOrFail = err => { - return E(leader) - .retry(err) + E(leader) + .retry(err, attempt) .catch(e => { fail(e); throw e; }); + attempt += 1; }; /** @@ -259,6 +261,7 @@ export const makeChainStream = (leader, storeKey, options = {}) => { const getProvenValue = () => getProvenValueAtHeight(allegedBlockHeight); const buf = await queryVerifier(getProvenValue, crash, getAllegedValue); + attempt = 0; if (!committer.isValid()) { return; } diff --git a/packages/chain-streams/src/types.js b/packages/chain-streams/src/types.js index a252371ffc2..162f6672bc6 100644 --- a/packages/chain-streams/src/types.js +++ b/packages/chain-streams/src/types.js @@ -5,7 +5,7 @@ export {}; /** * @typedef {object} ChainLeaderOptions - * @property {null | ((err: any) => Promise)} [retryCallback] + * @property {null | ((err: any, attempt?: number) => Promise)} [retryCallback] * @property {() => Promise} [keepPolling] */ @@ -18,7 +18,7 @@ export {}; /** * @typedef {object} ChainLeader - * @property {(error: any) => Promise} retry + * @property {(error: any, attempt?: number) => Promise} retry * @property {() => ChainLeaderOptions} getOptions * @property {(callback: (endpoint: string) => Promise) => Promise} mapEndpoints * @property {(key: ChainStoreKey) => Promise>} watchStoreKey From 32f02e7f070964bb81e9364700deb916c9a5f814 Mon Sep 17 00:00:00 2001 From: Michael FIG Date: Tue, 7 Jun 2022 07:41:23 -0600 Subject: [PATCH 09/10] chore(casting): `@agoric/chain-streams` -> `@agoric/casting` --- .github/workflows/test-all-packages.yml | 4 +- packages/agoric-cli/package.json | 2 +- packages/agoric-cli/src/entrypoint.js | 2 +- .../agoric-cli/src/{stream.js => follow.js} | 38 ++++++++--------- packages/agoric-cli/src/main.js | 8 ++-- packages/agoric-cli/src/sdk-package-names.js | 2 +- packages/{chain-streams => casting}/README.md | 38 ++++++++--------- .../{chain-streams => casting}/jsconfig.json | 7 +++- .../node-fetch-shim.js | 0 .../{chain-streams => casting}/package.json | 4 +- .../src/casting-spec.js} | 27 ++++++------ .../src/change-follower.js} | 16 ++++---- .../src/defaults.js | 0 .../src/follower-cosmjs.js} | 41 +++++++++---------- .../src/iterable.js | 6 +-- .../src/leader-netconfig.js | 8 ++-- .../{chain-streams => casting}/src/leader.js | 8 ++-- .../{chain-streams => casting}/src/main.js | 4 +- .../{chain-streams => casting}/src/shuffle.js | 0 .../{chain-streams => casting}/src/types.js | 20 ++++----- .../test/fake-rpc-server.js | 2 +- .../test/lockdown.js | 0 .../test/prepare-test-env-ava.js | 0 .../test/test-mvp.js | 18 ++++---- 24 files changed, 130 insertions(+), 125 deletions(-) rename packages/agoric-cli/src/{stream.js => follow.js} (72%) rename packages/{chain-streams => casting}/README.md (74%) rename packages/{chain-streams => casting}/jsconfig.json (81%) rename packages/{chain-streams => casting}/node-fetch-shim.js (100%) rename packages/{chain-streams => casting}/package.json (94%) rename packages/{chain-streams/src/store-key.js => casting/src/casting-spec.js} (56%) rename packages/{chain-streams/src/watcher.js => casting/src/change-follower.js} (68%) rename packages/{chain-streams => casting}/src/defaults.js (100%) rename packages/{chain-streams/src/stream-cosmjs.js => casting/src/follower-cosmjs.js} (86%) rename packages/{chain-streams => casting}/src/iterable.js (86%) rename packages/{chain-streams => casting}/src/leader-netconfig.js (88%) rename packages/{chain-streams => casting}/src/leader.js (87%) rename packages/{chain-streams => casting}/src/main.js (75%) rename packages/{chain-streams => casting}/src/shuffle.js (100%) rename packages/{chain-streams => casting}/src/types.js (72%) rename packages/{chain-streams => casting}/test/fake-rpc-server.js (98%) rename packages/{chain-streams => casting}/test/lockdown.js (100%) rename packages/{chain-streams => casting}/test/prepare-test-env-ava.js (100%) rename packages/{chain-streams => casting}/test/test-mvp.js (79%) diff --git a/.github/workflows/test-all-packages.yml b/.github/workflows/test-all-packages.yml index 5f94c51416d..c111de7c7d2 100644 --- a/.github/workflows/test-all-packages.yml +++ b/.github/workflows/test-all-packages.yml @@ -142,8 +142,8 @@ jobs: # END-TEST-BOILERPLATE - name: yarn test (cosmos) run: cd golang/cosmos && yarn ${{ steps.vars.outputs.test }} - - name: yarn test (chain-streams) - run: cd packages/chain-streams && yarn ${{ steps.vars.outputs.test }} + - name: yarn test (casting) + run: cd packages/casting && yarn ${{ steps.vars.outputs.test }} - name: yarn test (run-protocol) run: cd packages/run-protocol && yarn ${{ steps.vars.outputs.test }} - name: yarn test (pegasus) diff --git a/packages/agoric-cli/package.json b/packages/agoric-cli/package.json index a27c8057263..39d20ff9ad6 100644 --- a/packages/agoric-cli/package.json +++ b/packages/agoric-cli/package.json @@ -29,7 +29,7 @@ "dependencies": { "@agoric/access-token": "^0.4.18", "@agoric/assert": "^0.4.0", - "@agoric/chain-streams": "^0.1.0", + "@agoric/casting": "^0.1.0", "@agoric/nat": "^4.1.0", "@endo/bundle-source": "^2.2.0", "@endo/captp": "^2.0.7", diff --git a/packages/agoric-cli/src/entrypoint.js b/packages/agoric-cli/src/entrypoint.js index dd1020a7b43..9005d38f83d 100755 --- a/packages/agoric-cli/src/entrypoint.js +++ b/packages/agoric-cli/src/entrypoint.js @@ -4,7 +4,7 @@ import '@endo/init/pre.js'; import 'esm'; -import '@agoric/chain-streams/node-fetch-shim.js'; +import '@agoric/casting/node-fetch-shim.js'; import '@endo/init'; import path from 'path'; diff --git a/packages/agoric-cli/src/stream.js b/packages/agoric-cli/src/follow.js similarity index 72% rename from packages/agoric-cli/src/stream.js rename to packages/agoric-cli/src/follow.js index eb0cc2fab79..00b3e7b2d68 100644 --- a/packages/agoric-cli/src/stream.js +++ b/packages/agoric-cli/src/follow.js @@ -6,14 +6,14 @@ import { decodeToJustin } from '@endo/marshal/src/marshal-justin.js'; import { delay, iterateLatest, - makeChainStream, + makeFollower, makeLeader, - makeStoreKey, -} from '@agoric/chain-streams'; + makeCastingSpec, +} from '@agoric/casting'; -export default async function streamMain(progname, rawArgs, powers, opts) { +export default async function followerMain(progname, rawArgs, powers, opts) { const { anylogger } = powers; - const console = anylogger('agoric:stream'); + const console = anylogger('agoric:follower'); const { integrity, @@ -23,8 +23,8 @@ export default async function streamMain(progname, rawArgs, powers, opts) { sleep, } = opts; - /** @type {import('@agoric/chain-streams').ChainStreamOptions} */ - const streamOptions = { + /** @type {import('@agoric/casting').FollowerOptions} */ + const followerOptions = { integrity, }; @@ -33,7 +33,7 @@ export default async function streamMain(progname, rawArgs, powers, opts) { switch (output) { case 'justinlines': case 'justin': { - streamOptions.unserializer = null; + followerOptions.unserializer = null; const pretty = !output.endsWith('lines'); formatOutput = ({ body }) => { const encoded = JSON.parse(body); @@ -55,15 +55,15 @@ export default async function streamMain(progname, rawArgs, powers, opts) { } case 'hex': { // Dump as hex strings. - streamOptions.decode = buf => buf; - streamOptions.unserializer = null; + followerOptions.decode = buf => buf; + followerOptions.unserializer = null; formatOutput = buf => buf.reduce((acc, b) => acc + b.toString(16).padStart(2, '0'), ''); break; } case 'text': { - streamOptions.decode = buf => new TextDecoder().decode(buf); - streamOptions.unserializer = null; + followerOptions.decode = buf => new TextDecoder().decode(buf); + followerOptions.unserializer = null; formatOutput = buf => buf; break; } @@ -74,7 +74,7 @@ export default async function streamMain(progname, rawArgs, powers, opts) { } if (integrity !== 'none') { - streamOptions.crasher = Far('stream crasher', { + followerOptions.crasher = Far('follower crasher', { crash: (...args) => { console.error(...args); console.warn(`You are running with '--integrity=${integrity}'`); @@ -87,7 +87,7 @@ export default async function streamMain(progname, rawArgs, powers, opts) { } // TODO: https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ - /** @type {import('@agoric/chain-streams').ChainLeaderOptions} */ + /** @type {import('@agoric/casting').LeaderOptions} */ const leaderOptions = { retryCallback: (e, _attempt) => { verbose && console.warn('Retrying due to:', e); @@ -105,14 +105,14 @@ export default async function streamMain(progname, rawArgs, powers, opts) { const [_cmd, ...specs] = rawArgs; - verbose && console.warn('Streaming from leader at', bootstrap); + verbose && console.warn('Creating leader for', bootstrap); const leader = makeLeader(bootstrap, leaderOptions); await Promise.all( specs.map(async spec => { - verbose && console.warn('Consuming', spec); - const storeKey = makeStoreKey(spec); - const stream = makeChainStream(leader, storeKey, streamOptions); - for await (const { value } of iterateLatest(stream)) { + verbose && console.warn('Following', spec); + const castingSpec = makeCastingSpec(spec); + const follower = makeFollower(leader, castingSpec, followerOptions); + for await (const { value } of iterateLatest(follower)) { process.stdout.write(`${formatOutput(value)}\n`); } }), diff --git a/packages/agoric-cli/src/main.js b/packages/agoric-cli/src/main.js index 9c88f72c2c4..6d32576e8ee 100644 --- a/packages/agoric-cli/src/main.js +++ b/packages/agoric-cli/src/main.js @@ -8,7 +8,7 @@ import initMain from './init.js'; import installMain from './install.js'; import setDefaultsMain from './set-defaults.js'; import startMain from './start.js'; -import streamMain from './stream.js'; +import followMain from './follow.js'; import walletMain from './open.js'; const DEFAULT_DAPP_TEMPLATE = 'dapp-fungible-faucet'; @@ -162,8 +162,8 @@ const main = async (progname, rawArgs, powers) => { }); program - .command('stream ') - .description('read an Agoric Chain Stream') + .command('follow ') + .description('follow an Agoric Casting leader') .option( '--integrity ', 'set integrity mode', @@ -210,7 +210,7 @@ const main = async (progname, rawArgs, powers) => { .option('-B, --bootstrap ', 'network bootstrap configuration') .action(async (pathSpecs, cmd) => { const opts = { ...program.opts(), ...cmd.opts() }; - return subMain(streamMain, ['stream', ...pathSpecs], opts); + return subMain(followMain, ['follow', ...pathSpecs], opts); }); const addRunOptions = cmd => diff --git a/packages/agoric-cli/src/sdk-package-names.js b/packages/agoric-cli/src/sdk-package-names.js index 5438155c310..6048ab9efe1 100644 --- a/packages/agoric-cli/src/sdk-package-names.js +++ b/packages/agoric-cli/src/sdk-package-names.js @@ -4,7 +4,7 @@ export default [ "@agoric/access-token", "@agoric/assert", - "@agoric/chain-streams", + "@agoric/casting", "@agoric/cosmic-swingset", "@agoric/cosmos", "@agoric/deploy-script-support", diff --git a/packages/chain-streams/README.md b/packages/casting/README.md similarity index 74% rename from packages/chain-streams/README.md rename to packages/casting/README.md index e953d092a1e..a26a3cb3eeb 100644 --- a/packages/chain-streams/README.md +++ b/packages/casting/README.md @@ -1,11 +1,11 @@ -# Chain Streams +# Agoric Casting -This [Agoric](https://agoric.com) Chain Streams package consumes data server -publication leaders in a flexible, future-proof way. +This [Agoric](https://agoric.com) Casting package follows ocap broadcasts in a +flexible, future-proof way. -TL;DR: You can run `yarn demo`, or to consume a mailbox stream do: +TL;DR: You can run `yarn demo`, or to follow a mailbox castingSpec do: ```sh -npx agoric stream -Bhttp://devnet.agoric.net/network-config :mailbox.agoric1foobarbaz -otext +npx agoric follow -Bhttp://devnet.agoric.net/network-config :mailbox.agoric1foobarbaz -otext ``` An example of following an on-chain mailbox in code (using this package) is: @@ -13,31 +13,31 @@ An example of following an on-chain mailbox in code (using this package) is: ```js // First, obtain a Hardened JS environment via Endo. import '@endo/init/pre-remoting.js'; // needed only for the next line -import '@agoric/chain-streams/node-fetch-shim.js'; // needed for Node.js +import '@agoric/castingSpec/node-fetch-shim.js'; // needed for Node.js import '@endo/init'; import { iterateLatest, - makeChainStream, + makeFollower, makeLeader, - makeStoreKey, -} from '@agoric/chain-streams'; + makeCastingSpec, +} from '@agoric/casting'; -// Iterate over a mailbox stream on the devnet. +// Iterate over a mailbox follower on the devnet. const leader = makeLeader('https://devnet.agoric.net/network-config'); -const storeKey = makeStoreKey(':mailbox.agoric1foobarbaz'); -const stream = makeChainStream(leader, storeKey); -for await (const { value } of iterateLatest(stream)) { +const castingSpec = makeCastingSpec(':mailbox.agoric1foobarbaz'); +const follower = makeFollower(leader, castingSpec); +for await (const { value } of iterateLatest(follower)) { console.log(`here's a mailbox value`, value); } ``` -## Stream options +## Follower options -The `streamOpts` argument in `makeChainStream(leader, key, streamOpts)` provides an optional bag of options: +The `followerOpts` argument in `makeFollower(leader, key, followerOpts)` provides an optional bag of options: - the `integrity` option, which has three possibilities: - `'strict'` - release data only after proving it was validated (may incur waits for one block's data to be validated in the next block), - - `'optimistic'` (default) - release data immediately, but may crash the stream in the future if an already-released value could not be proven, + - `'optimistic'` (default) - release data immediately, but may crash the follower in the future if an already-released value could not be proven, - `'none'` - release data immediately without validation - the `decode` option is a function to translate `buf: Uint8Array` into `data: string` - (default) - interpret buf as a utf-8 string, then `JSON.parse` it @@ -46,7 +46,7 @@ The `streamOpts` argument in `makeChainStream(leader, key, streamOpts)` provides - `null` - don't additionally unserialize data before releasing it - any unserializer object supporting `E(unserializer).unserialize(data)` - the `crasher` option can be - - `null` (default) stream failures only propagate an exception/rejection + - `null` (default) follower failures only propagate an exception/rejection - any crasher object supporting `E(crasher).crash(reason)` ## Behind the scenes @@ -54,11 +54,11 @@ The `streamOpts` argument in `makeChainStream(leader, key, streamOpts)` provides - the network config contains enough information to obtain Tendermint RPC nodes for a given Agoric network. You can use `makeLeaderFromRpcAddresses` directly if you want to avoid fetching a network-config. -- each stream uses periodic CosmJS state polling (every X milliseconds) which +- each follower uses periodic CosmJS state polling (every X milliseconds) which can be refreshed more expediently via a Tendermint subscription to the corresponding `state_change` event - published (string) values are automatically unmarshalled, but without object references. a custom `marshaller` for your application. -- the `iterateRecent` adapter transforms a stream into a local async iterator +- the `iterateRecent` adapter transforms a follower into a local async iterator that produces only the last queried value (with no history reconstruction) ## Status diff --git a/packages/chain-streams/jsconfig.json b/packages/casting/jsconfig.json similarity index 81% rename from packages/chain-streams/jsconfig.json rename to packages/casting/jsconfig.json index e6811ce15af..1dc50dd301c 100644 --- a/packages/chain-streams/jsconfig.json +++ b/packages/casting/jsconfig.json @@ -15,5 +15,10 @@ "strictNullChecks": true, "moduleResolution": "node", }, - "include": ["src/**/*.js", "exported.js", "lockdown.js"], + "include": [ + "*.js", + "public/**/*.js", + "src/**/*.js", + "test/**/*.js", + ], } diff --git a/packages/chain-streams/node-fetch-shim.js b/packages/casting/node-fetch-shim.js similarity index 100% rename from packages/chain-streams/node-fetch-shim.js rename to packages/casting/node-fetch-shim.js diff --git a/packages/chain-streams/package.json b/packages/casting/package.json similarity index 94% rename from packages/chain-streams/package.json rename to packages/casting/package.json index da7fc65ec6e..fe5a8ba96e2 100644 --- a/packages/chain-streams/package.json +++ b/packages/casting/package.json @@ -1,7 +1,7 @@ { - "name": "@agoric/chain-streams", + "name": "@agoric/casting", "version": "0.1.0", - "description": "Agoric's Chain Streams", + "description": "Agoric's OCap broadcastingSpec system", "type": "module", "main": "src/main.js", "repository": "https://github.com/Agoric/agoric-sdk", diff --git a/packages/chain-streams/src/store-key.js b/packages/casting/src/casting-spec.js similarity index 56% rename from packages/chain-streams/src/store-key.js rename to packages/casting/src/casting-spec.js index fd46991da83..aa5bffa81a8 100644 --- a/packages/chain-streams/src/store-key.js +++ b/packages/casting/src/casting-spec.js @@ -3,9 +3,9 @@ import { toAscii } from '@cosmjs/encoding'; /** * @param {string} storagePath - * @returns {import('./types').ChainStoreKey} + * @returns {import('./types').CastingSpec} */ -const swingsetPathToStoreKey = storagePath => +const swingsetPathToCastingSpec = storagePath => harden({ storeName: 'swingset', storeSubkey: toAscii(`swingset/data:${storagePath}`), @@ -17,9 +17,9 @@ const DATA_PREFIX_BYTES = new Uint8Array([0]); /** * @param {string} storagePath * @param {string} [storeName] - * @returns {import('./types').ChainStoreKey} + * @returns {import('./types').CastingSpec} */ -const vstoragePathToStoreKey = (storagePath, storeName = 'vstorage') => { +const vstoragePathToCastingSpec = (storagePath, storeName = 'vstorage') => { const elems = storagePath ? storagePath.split('.') : []; const buf = toAscii(`${elems.length}.${storagePath}`); return harden({ @@ -29,26 +29,27 @@ const vstoragePathToStoreKey = (storagePath, storeName = 'vstorage') => { }); }; -export const DEFAULT_PATH_CONVERTER = vstoragePathToStoreKey; +export const DEFAULT_PATH_CONVERTER = vstoragePathToCastingSpec; /** - * @type {Record import('./types').ChainStoreKey>} + * @type {Record import('./types').CastingSpec>} */ export const pathPrefixToConverters = harden({ - 'swingset:': swingsetPathToStoreKey, - 'vstore:': vstoragePathToStoreKey, + 'swingset:': swingsetPathToCastingSpec, + 'vstore:': vstoragePathToCastingSpec, ':': DEFAULT_PATH_CONVERTER, }); /** - * @param {string} pathSpec - * @returns {import('./types').ChainStoreKey} + * @param {string} specString + * @returns {import('./types').CastingSpec} */ -export const makeStoreKey = pathSpec => { - const match = pathSpec.match(/^([^:.]*:)(.*)/); +export const makeCastingSpec = specString => { + assert.typeof(specString, 'string'); + const match = specString.match(/^([^:.]*:)(.*)/); assert( match, - `path spec ${pathSpec} does not match 'PREFIX:PATH' or ':PATH'`, + `spec string ${specString} does not match 'PREFIX:PATH' or ':PATH'`, ); const kind = match[1]; const storePath = match[2]; diff --git a/packages/chain-streams/src/watcher.js b/packages/casting/src/change-follower.js similarity index 68% rename from packages/chain-streams/src/watcher.js rename to packages/casting/src/change-follower.js index b25505398ad..93e738456c5 100644 --- a/packages/chain-streams/src/watcher.js +++ b/packages/casting/src/change-follower.js @@ -5,19 +5,19 @@ import { DEFAULT_KEEP_POLLING } from './defaults.js'; /** * Just return an unspecified allegedValue every poll period. * - * @param {import('./types').ChainLeader} leader - * @param {import('./types.js').ChainStoreKey} storeKey - * @returns {Promise>} + * @param {import('./types').Leader} leader + * @param {import('./types.js').CastingSpec} castingSpec + * @returns {Promise>} */ -export const makePollingWatcher = async (leader, storeKey) => { +export const makePollingChangeFollower = async (leader, castingSpec) => { const { keepPolling = DEFAULT_KEEP_POLLING } = await E(leader).getOptions(); - return Far('key watcher stream', { + return Far('polling change follower', { getLatestIterable: () => - Far('key watcher iterable', { + Far('polling change follower iterable', { [Symbol.asyncIterator]: () => { /** @type {Promise | undefined} */ let nextPollPromise; - return Far('key watcher iterator', { + return Far('polling change follower iterator', { next: async () => { if (!nextPollPromise) { nextPollPromise = keepPolling(); @@ -25,7 +25,7 @@ export const makePollingWatcher = async (leader, storeKey) => { const keepGoing = await nextPollPromise; nextPollPromise = undefined; const change = harden({ - storeKey, + castingSpec, // Make no warrant as to the values. values: [], }); diff --git a/packages/chain-streams/src/defaults.js b/packages/casting/src/defaults.js similarity index 100% rename from packages/chain-streams/src/defaults.js rename to packages/casting/src/defaults.js diff --git a/packages/chain-streams/src/stream-cosmjs.js b/packages/casting/src/follower-cosmjs.js similarity index 86% rename from packages/chain-streams/src/stream-cosmjs.js rename to packages/casting/src/follower-cosmjs.js index b4e2c63e6f4..eebdd07f275 100644 --- a/packages/chain-streams/src/stream-cosmjs.js +++ b/packages/casting/src/follower-cosmjs.js @@ -9,8 +9,8 @@ import { DEFAULT_DECODER, DEFAULT_UNSERIALIZER } from './defaults.js'; const { details: X } = assert; -/** @template T @typedef {import('./types.js').ChainStreamElement} ChainStreamElement */ -/** @template T @typedef {import('./types.js').ChainStream} ChainStream */ +/** @template T @typedef {import('./types.js').FollowerElement} FollowerElement */ +/** @template T @typedef {import('./types.js').Follower} Follower */ /** * @template T @@ -42,7 +42,7 @@ const collectSingle = values => { */ /** - * @type {Record['integrity'], QueryVerifier>} + * @type {Record['integrity'], QueryVerifier>} */ export const integrityToQueryVerifier = harden({ strict: async (getProvenValue, crash, _getAllegedValue) => { @@ -77,12 +77,12 @@ export const integrityToQueryVerifier = harden({ /** * @template T - * @param {ERef} leader - * @param {import('./types').ChainStoreKey} storeKey - * @param {import('./types').ChainStreamOptions} options - * @returns {ChainStream>} + * @param {ERef} leader + * @param {import('./types').CastingSpec} castingSpec + * @param {import('./types').FollowerOptions} options + * @returns {Follower>} */ -export const makeChainStream = (leader, storeKey, options = {}) => { +export const makeFollower = (leader, castingSpec, options = {}) => { const { decode = DEFAULT_DECODER, unserializer = DEFAULT_UNSERIALIZER, @@ -93,11 +93,11 @@ export const makeChainStream = (leader, storeKey, options = {}) => { storeName, storeSubkey, dataPrefixBytes = new Uint8Array(), - } = storeKey; + } = castingSpec; /** @type {QueryVerifier} */ const queryVerifier = integrityToQueryVerifier[integrity]; - assert(queryVerifier, X`unrecognized stream integrity mode ${integrity}`); + assert(queryVerifier, X`unrecognized follower integrity mode ${integrity}`); /** @type {Map} */ const endpointToQueryClient = new Map(); @@ -173,10 +173,10 @@ export const makeChainStream = (leader, storeKey, options = {}) => { ); // Enable the periodic fetch. - /** @type {ChainStream>} */ - return Far('chain stream', { + /** @type {Follower>} */ + return Far('chain follower', { getLatestIterable: () => { - /** @type {NotifierRecord>} */ + /** @type {NotifierRecord>} */ const { updater, notifier } = makeNotifierKit(); let finished = false; @@ -189,10 +189,9 @@ export const makeChainStream = (leader, storeKey, options = {}) => { const crash = err => { fail(err); if (crasher) { - E(crasher).crash( - `PROOF VERIFICATION FAILURE; crashing follower`, - err, - ); + E(crasher) + .crash(`PROOF VERIFICATION FAILURE; crashing follower`, err) + .catch(e => assert(false, X`crashing follower failed: ${e}`)); } else { console.error(`PROOF VERIFICATION FAILURE; crashing follower`, err); } @@ -246,7 +245,7 @@ export const makeChainStream = (leader, storeKey, options = {}) => { let lastBuf; /** - * @param {import('./types').ChainStoreChange} allegedChange + * @param {import('./types').CastingChange} allegedChange */ const queryAndUpdateOnce = async allegedChange => { const committer = prepareUpdateInOrder(); @@ -288,9 +287,9 @@ export const makeChainStream = (leader, storeKey, options = {}) => { committer.commit({ value }); }; - const changeStream = E(leader).watchStoreKey(storeKey); + const changeFollower = E(leader).watchCasting(castingSpec); const queryWhenKeyChanges = async () => { - for await (const allegedChange of iterateLatest(changeStream)) { + for await (const allegedChange of iterateLatest(changeFollower)) { if (finished) { return; } @@ -299,7 +298,7 @@ export const makeChainStream = (leader, storeKey, options = {}) => { } }; - queryAndUpdateOnce({ values: [], storeKey }).catch(retryOrFail); + queryAndUpdateOnce({ values: [], castingSpec }).catch(retryOrFail); queryWhenKeyChanges().catch(fail); return makeAsyncIterableFromNotifier(notifier); diff --git a/packages/chain-streams/src/iterable.js b/packages/casting/src/iterable.js similarity index 86% rename from packages/chain-streams/src/iterable.js rename to packages/casting/src/iterable.js index 9c0896aab9d..2275959c398 100644 --- a/packages/chain-streams/src/iterable.js +++ b/packages/casting/src/iterable.js @@ -29,14 +29,14 @@ export const makeAsyncIterableFromNotifier = notifier => /** * @template T - * @param {ERef>} stream + * @param {ERef>} follower */ -export const iterateLatest = stream => { +export const iterateLatest = follower => { // For now, just pass through the iterable. return harden({ /** @returns {AsyncIterator} */ [Symbol.asyncIterator]: () => { - const latestIterable = E(stream).getLatestIterable(); + const latestIterable = E(follower).getLatestIterable(); const iterator = E(latestIterable)[Symbol.asyncIterator](); return harden({ next: () => E(iterator).next(), diff --git a/packages/chain-streams/src/leader-netconfig.js b/packages/casting/src/leader-netconfig.js similarity index 88% rename from packages/chain-streams/src/leader-netconfig.js rename to packages/casting/src/leader-netconfig.js index 2ce11355c40..6861e2c86a1 100644 --- a/packages/chain-streams/src/leader-netconfig.js +++ b/packages/casting/src/leader-netconfig.js @@ -7,7 +7,7 @@ const { details: X } = assert; /** * @param {string[]} rpcAddrs - * @param {import('./types.js').ChainLeaderOptions} [leaderOptions] + * @param {import('./types.js').LeaderOptions} [leaderOptions] */ export const makeLeaderFromRpcAddresses = (rpcAddrs, leaderOptions) => { assert(Array.isArray(rpcAddrs), X`rpcAddrs ${rpcAddrs} must be an array`); @@ -26,11 +26,11 @@ export const makeLeaderFromRpcAddresses = (rpcAddrs, leaderOptions) => { /** * @param {string} netconfigURL - * @param {import('./types.js').ChainLeaderOptions} [options] + * @param {import('./types.js').LeaderOptions} [options] */ export const makeLeaderFromNetworkConfig = (netconfigURL, options = {}) => { const { retryCallback = DEFAULT_RETRY_CALLBACK } = options; - /** @type {import('./types.js').ChainLeaderOptions['retryCallback']} */ + /** @type {import('./types.js').LeaderOptions['retryCallback']} */ const retry = async (err, attempt) => { if (retryCallback) { return retryCallback(err, attempt); @@ -60,7 +60,7 @@ export const makeLeaderFromNetworkConfig = (netconfigURL, options = {}) => { /** * @param {string} bootstrap - * @param {import('./types.js').ChainLeaderOptions} options + * @param {import('./types.js').LeaderOptions} options */ export const makeLeader = (bootstrap, options) => { if (bootstrap.includes('network-config')) { diff --git a/packages/chain-streams/src/leader.js b/packages/casting/src/leader.js similarity index 87% rename from packages/chain-streams/src/leader.js rename to packages/casting/src/leader.js index 0c66579edef..1f89946ab70 100644 --- a/packages/chain-streams/src/leader.js +++ b/packages/casting/src/leader.js @@ -2,13 +2,13 @@ import { E, Far } from '@endo/far'; import { DEFAULT_RETRY_CALLBACK } from './defaults.js'; import { shuffle } from './shuffle.js'; -import { makePollingWatcher } from './watcher.js'; +import { makePollingChangeFollower } from './change-follower.js'; /** * Create a chain leader that rotates through a list of endpoints. * * @param {string[]} endpoints - * @param {import('./types.js').ChainLeaderOptions} leaderOptions + * @param {import('./types.js').LeaderOptions} leaderOptions */ export const makeRoundRobinLeader = (endpoints, leaderOptions = {}) => { const { retryCallback = DEFAULT_RETRY_CALLBACK } = leaderOptions; @@ -20,7 +20,7 @@ export const makeRoundRobinLeader = (endpoints, leaderOptions = {}) => { let lastRespondingEndpointIndex = 0; let thisAttempt = 0; - /** @type {import('./types.js').ChainLeader} */ + /** @type {import('./types.js').Leader} */ const leader = Far('round robin leader', { getOptions: () => leaderOptions, retry: async (err, attempt) => { @@ -29,7 +29,7 @@ export const makeRoundRobinLeader = (endpoints, leaderOptions = {}) => { } throw err; }, - watchStoreKey: storeKey => makePollingWatcher(leader, storeKey), + watchCasting: castingSpec => makePollingChangeFollower(leader, castingSpec), /** * @template T * @param {(endpoint: string) => Promise} callback diff --git a/packages/chain-streams/src/main.js b/packages/casting/src/main.js similarity index 75% rename from packages/chain-streams/src/main.js rename to packages/casting/src/main.js index 3e0de76bcaf..4a597fa0785 100644 --- a/packages/chain-streams/src/main.js +++ b/packages/casting/src/main.js @@ -2,7 +2,7 @@ export * from './types.js'; // no named exports export * from './defaults.js'; export * from './leader-netconfig.js'; -export * from './stream-cosmjs.js'; -export * from './store-key.js'; +export * from './follower-cosmjs.js'; +export * from './casting-spec.js'; export * from './leader.js'; export * from './iterable.js'; diff --git a/packages/chain-streams/src/shuffle.js b/packages/casting/src/shuffle.js similarity index 100% rename from packages/chain-streams/src/shuffle.js rename to packages/casting/src/shuffle.js diff --git a/packages/chain-streams/src/types.js b/packages/casting/src/types.js similarity index 72% rename from packages/chain-streams/src/types.js rename to packages/casting/src/types.js index 162f6672bc6..a0e73e87eee 100644 --- a/packages/chain-streams/src/types.js +++ b/packages/casting/src/types.js @@ -4,35 +4,35 @@ export {}; /** - * @typedef {object} ChainLeaderOptions + * @typedef {object} LeaderOptions * @property {null | ((err: any, attempt?: number) => Promise)} [retryCallback] * @property {() => Promise} [keepPolling] */ /** - * @typedef {object} ChainStoreChange - * @property {ChainStoreKey} storeKey + * @typedef {object} CastingChange + * @property {CastingSpec} castingSpec * @property {number} [blockHeight] * @property {Uint8Array[]} values */ /** - * @typedef {object} ChainLeader + * @typedef {object} Leader * @property {(error: any, attempt?: number) => Promise} retry - * @property {() => ChainLeaderOptions} getOptions + * @property {() => LeaderOptions} getOptions * @property {(callback: (endpoint: string) => Promise) => Promise} mapEndpoints - * @property {(key: ChainStoreKey) => Promise>} watchStoreKey + * @property {(key: CastingSpec) => Promise>} watchCasting */ /** * @template T - * @typedef {object} ChainStream + * @typedef {object} Follower * @property {() => AsyncIterable} [getLatestIterable] */ /** * @template T - * @typedef {object} ChainStreamElement + * @typedef {object} FollowerElement * @property {T} value */ @@ -47,7 +47,7 @@ export {}; */ /** - * @typedef {object} ChainStreamOptions + * @typedef {object} FollowerOptions * @property {null | import('@endo/far').FarRef} [unserializer] * @property {(buf: Uint8Array) => any} [decode] * @property {'strict'|'optimistic'|'none'} [integrity] @@ -55,7 +55,7 @@ export {}; */ /** - * @typedef {object} ChainStoreKey + * @typedef {object} CastingSpec * @property {string} storeName * @property {Uint8Array} storeSubkey * @property {Uint8Array} [dataPrefixBytes] diff --git a/packages/chain-streams/test/fake-rpc-server.js b/packages/casting/test/fake-rpc-server.js similarity index 98% rename from packages/chain-streams/test/fake-rpc-server.js rename to packages/casting/test/fake-rpc-server.js index 56fe48199fe..dc95bd892e4 100644 --- a/packages/chain-streams/test/fake-rpc-server.js +++ b/packages/casting/test/fake-rpc-server.js @@ -230,7 +230,7 @@ export const develop = async () => { const PORT = await startFakeServer(mockT, [...fakeValues]); console.log( `Try this in another terminal: - agoric stream :fake.path --bootstrap=http://localhost:${PORT}/network-config --sleep=0.5 --integrity=none`, + agoric follower :fake.path --bootstrap=http://localhost:${PORT}/network-config --sleep=0.5 --integrity=none`, ); console.warn(`Control-C to interrupt...`); // Wait forever. diff --git a/packages/chain-streams/test/lockdown.js b/packages/casting/test/lockdown.js similarity index 100% rename from packages/chain-streams/test/lockdown.js rename to packages/casting/test/lockdown.js diff --git a/packages/chain-streams/test/prepare-test-env-ava.js b/packages/casting/test/prepare-test-env-ava.js similarity index 100% rename from packages/chain-streams/test/prepare-test-env-ava.js rename to packages/casting/test/prepare-test-env-ava.js diff --git a/packages/chain-streams/test/test-mvp.js b/packages/casting/test/test-mvp.js similarity index 79% rename from packages/chain-streams/test/test-mvp.js rename to packages/casting/test/test-mvp.js index 0cc1dc066eb..6423b8e6ef4 100644 --- a/packages/chain-streams/test/test-mvp.js +++ b/packages/casting/test/test-mvp.js @@ -4,9 +4,9 @@ import { test } from './prepare-test-env-ava.js'; import { iterateLatest, - makeChainStream, + makeFollower, makeLeader, - makeStoreKey, + makeCastingSpec, } from '../src/main.js'; import { delay } from '../src/defaults.js'; @@ -16,12 +16,12 @@ test('happy path', async t => { const expected = ['latest', 'later', 'done']; t.plan(expected.length); const PORT = await t.context.startServer(t, [...expected]); - /** @type {import('../src/types.js').ChainLeaderOptions} */ + /** @type {import('../src/types.js').LeaderOptions} */ const lo = { retryCallback: null, // fail fast, no retries keepPolling: () => delay(200).then(() => true), // poll really quickly }; - /** @type {import('../src/types.js').ChainStreamOptions} */ + /** @type {import('../src/types.js').FollowerOptions} */ const so = { integrity: 'none', }; @@ -29,9 +29,9 @@ test('happy path', async t => { // The rest of this test is taken almost verbatim from the README.md, with // some minor modifications (testLeaderOptions and deepEqual). const leader = makeLeader(`http://localhost:${PORT}/network-config`, lo); - const storeKey = makeStoreKey(':mailbox.agoric1foobarbaz'); - const stream = await makeChainStream(leader, storeKey, so); - for await (const { value } of iterateLatest(stream)) { + const castingSpec = makeCastingSpec(':mailbox.agoric1foobarbaz'); + const follower = await makeFollower(leader, castingSpec, so); + for await (const { value } of iterateLatest(follower)) { t.log(`here's a mailbox value`, value); // The rest here is to drive the test. @@ -69,8 +69,8 @@ test('missing rpc server', async t => { }); test('unrecognized integrity', async t => { - await t.throws(() => makeChainStream({}, {}, { integrity: 'bother' }), { - message: /unrecognized stream integrity mode.*/, + await t.throws(() => makeFollower({}, {}, { integrity: 'bother' }), { + message: /unrecognized follower integrity mode.*/, }); }); From 07c2be392843ee96f8557407c89b210183c1f517 Mon Sep 17 00:00:00 2001 From: Michael FIG Date: Tue, 7 Jun 2022 16:14:05 -0600 Subject: [PATCH 10/10] ci(deployment-test): upgrade to Node.js v16.x --- .github/workflows/deployment-test.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/deployment-test.yml b/.github/workflows/deployment-test.yml index 633edc1be66..e9ced124ba3 100644 --- a/.github/workflows/deployment-test.yml +++ b/.github/workflows/deployment-test.yml @@ -37,7 +37,7 @@ jobs: cache-key: trusty - uses: ./.github/actions/restore-node with: - node-version: 14.x + node-version: 16.x # Select a branch on loadgen to test against by adding text to the body of the # pull request. For example: #loadgen-branch: user-123-update-foo @@ -105,7 +105,7 @@ jobs: env: NETWORK_NAME: chaintest - uses: actions/upload-artifact@v2 - if: failure() + if: always() with: name: deployment-test-results-${{ env.NOW }} path: /usr/src/agoric-sdk/chaintest/results