diff --git a/.github/workflows/deployment-test.yml b/.github/workflows/deployment-test.yml index 633edc1be66..e9ced124ba3 100644 --- a/.github/workflows/deployment-test.yml +++ b/.github/workflows/deployment-test.yml @@ -37,7 +37,7 @@ jobs: cache-key: trusty - uses: ./.github/actions/restore-node with: - node-version: 14.x + node-version: 16.x # Select a branch on loadgen to test against by adding text to the body of the # pull request. For example: #loadgen-branch: user-123-update-foo @@ -105,7 +105,7 @@ jobs: env: NETWORK_NAME: chaintest - uses: actions/upload-artifact@v2 - if: failure() + if: always() with: name: deployment-test-results-${{ env.NOW }} path: /usr/src/agoric-sdk/chaintest/results diff --git a/.github/workflows/test-all-packages.yml b/.github/workflows/test-all-packages.yml index a18a2989a90..c111de7c7d2 100644 --- a/.github/workflows/test-all-packages.yml +++ b/.github/workflows/test-all-packages.yml @@ -142,6 +142,8 @@ jobs: # END-TEST-BOILERPLATE - name: yarn test (cosmos) run: cd golang/cosmos && yarn ${{ steps.vars.outputs.test }} + - name: yarn test (casting) + run: cd packages/casting && yarn ${{ steps.vars.outputs.test }} - name: yarn test (run-protocol) run: cd packages/run-protocol && yarn ${{ steps.vars.outputs.test }} - name: yarn test (pegasus) diff --git a/golang/cosmos/x/swingset/keeper/keeper.go b/golang/cosmos/x/swingset/keeper/keeper.go index b8cff0bae20..4cd00c2a935 100644 --- a/golang/cosmos/x/swingset/keeper/keeper.go +++ b/golang/cosmos/x/swingset/keeper/keeper.go @@ -259,3 +259,11 @@ func (k Keeper) SetMailbox(ctx sdk.Context, peer string, mailbox string) { path := StoragePathMailbox + "." + peer k.vstorageKeeper.LegacySetStorageAndNotify(ctx, path, mailbox) } + +func (k Keeper) PathToEncodedKey(path string) []byte { + return k.vstorageKeeper.PathToEncodedKey(path) +} + +func (k Keeper) GetStoreName() string { + return k.vstorageKeeper.GetStoreName() +} diff --git a/golang/cosmos/x/vstorage/vstorage.go b/golang/cosmos/x/vstorage/vstorage.go index f71c367d880..70bfe45dc18 100644 --- a/golang/cosmos/x/vstorage/vstorage.go +++ b/golang/cosmos/x/vstorage/vstorage.go @@ -20,6 +20,11 @@ type vstorageMessage struct { Value string `json:"value"` } +type vstorageStoreKey struct { + StoreName string `json:"storeName"` + StoreSubkey string `json:"storeSubkey"` +} + func NewStorageHandler(keeper Keeper) vstorageHandler { return vstorageHandler{keeper: keeper} } @@ -67,6 +72,17 @@ func (sh vstorageHandler) Receive(cctx *vm.ControllerContext, str string) (ret s } return string(bz), nil + case "getStoreKey": + value := vstorageStoreKey{ + StoreName: keeper.GetStoreName(), + StoreSubkey: string(keeper.PathToEncodedKey(msg.Path)), + } + bz, err := json.Marshal(value) + if err != nil { + return "", err + } + return string(bz), nil + case "has": value := keeper.GetData(cctx.Context, msg.Path) if value == "" { diff --git a/packages/agoric-cli/package.json b/packages/agoric-cli/package.json index bd0ef73b078..39d20ff9ad6 100644 --- a/packages/agoric-cli/package.json +++ b/packages/agoric-cli/package.json @@ -29,11 +29,13 @@ "dependencies": { "@agoric/access-token": "^0.4.18", "@agoric/assert": "^0.4.0", + "@agoric/casting": "^0.1.0", "@agoric/nat": "^4.1.0", "@endo/bundle-source": "^2.2.0", "@endo/captp": "^2.0.7", "@endo/compartment-mapper": "^0.7.5", "@endo/init": "^0.5.41", + "@endo/marshal": "^0.6.7", "@endo/promise-kit": "^0.2.41", "@iarna/toml": "^2.2.3", "anylogger": "^0.21.0", diff --git a/packages/agoric-cli/src/entrypoint.js b/packages/agoric-cli/src/entrypoint.js index d9d40c99f6b..9005d38f83d 100755 --- a/packages/agoric-cli/src/entrypoint.js +++ b/packages/agoric-cli/src/entrypoint.js @@ -4,6 +4,7 @@ import '@endo/init/pre.js'; import 'esm'; +import '@agoric/casting/node-fetch-shim.js'; import '@endo/init'; import path from 'path'; diff --git a/packages/agoric-cli/src/follow.js b/packages/agoric-cli/src/follow.js new file mode 100644 index 00000000000..00b3e7b2d68 --- /dev/null +++ b/packages/agoric-cli/src/follow.js @@ -0,0 +1,121 @@ +// @ts-check +import process from 'process'; +import { Far } from '@endo/marshal'; +import { decodeToJustin } from '@endo/marshal/src/marshal-justin.js'; + +import { + delay, + iterateLatest, + makeFollower, + makeLeader, + makeCastingSpec, +} from '@agoric/casting'; + +export default async function followerMain(progname, rawArgs, powers, opts) { + const { anylogger } = powers; + const console = anylogger('agoric:follower'); + + const { + integrity, + output, + bootstrap = 'http://localhost:26657', + verbose, + sleep, + } = opts; + + /** @type {import('@agoric/casting').FollowerOptions} */ + const followerOptions = { + integrity, + }; + + /** @type {(buf: any) => any} */ + let formatOutput; + switch (output) { + case 'justinlines': + case 'justin': { + followerOptions.unserializer = null; + const pretty = !output.endsWith('lines'); + formatOutput = ({ body }) => { + const encoded = JSON.parse(body); + return decodeToJustin(encoded, pretty); + }; + break; + } + case 'jsonlines': + case 'json': { + const spaces = output.endsWith('lines') ? undefined : 2; + const bigintToStringReplacer = (_, arg) => { + if (typeof arg === 'bigint') { + return `${arg}`; + } + return arg; + }; + formatOutput = obj => JSON.stringify(obj, bigintToStringReplacer, spaces); + break; + } + case 'hex': { + // Dump as hex strings. + followerOptions.decode = buf => buf; + followerOptions.unserializer = null; + formatOutput = buf => + buf.reduce((acc, b) => acc + b.toString(16).padStart(2, '0'), ''); + break; + } + case 'text': { + followerOptions.decode = buf => new TextDecoder().decode(buf); + followerOptions.unserializer = null; + formatOutput = buf => buf; + break; + } + default: { + console.error(`Unknown output format: ${output}`); + return 1; + } + } + + if (integrity !== 'none') { + followerOptions.crasher = Far('follower crasher', { + crash: (...args) => { + console.error(...args); + console.warn(`You are running with '--integrity=${integrity}'`); + console.warn( + `If you trust your RPC nodes, you can turn off proofs with '--integrity=none'`, + ); + process.exit(1); + }, + }); + } + + // TODO: https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ + /** @type {import('@agoric/casting').LeaderOptions} */ + const leaderOptions = { + retryCallback: (e, _attempt) => { + verbose && console.warn('Retrying due to:', e); + return delay(1000 + Math.random() * 1000); + }, + keepPolling: async () => { + let toSleep = sleep * 1000; + if (toSleep <= 0) { + toSleep = (5 + Math.random()) * 1000; + } + await delay(toSleep); + return true; + }, + }; + + const [_cmd, ...specs] = rawArgs; + + verbose && console.warn('Creating leader for', bootstrap); + const leader = makeLeader(bootstrap, leaderOptions); + await Promise.all( + specs.map(async spec => { + verbose && console.warn('Following', spec); + const castingSpec = makeCastingSpec(spec); + const follower = makeFollower(leader, castingSpec, followerOptions); + for await (const { value } of iterateLatest(follower)) { + process.stdout.write(`${formatOutput(value)}\n`); + } + }), + ); + return 0; +} diff --git a/packages/agoric-cli/src/main.js b/packages/agoric-cli/src/main.js index 4e4a15fae32..6d32576e8ee 100644 --- a/packages/agoric-cli/src/main.js +++ b/packages/agoric-cli/src/main.js @@ -8,6 +8,7 @@ import initMain from './init.js'; import installMain from './install.js'; import setDefaultsMain from './set-defaults.js'; import startMain from './start.js'; +import followMain from './follow.js'; import walletMain from './open.js'; const DEFAULT_DAPP_TEMPLATE = 'dapp-fungible-faucet'; @@ -160,6 +161,58 @@ const main = async (progname, rawArgs, powers) => { return subMain(installMain, ['install', forceSdkVersion], opts); }); + program + .command('follow ') + .description('follow an Agoric Casting leader') + .option( + '--integrity ', + 'set integrity mode', + value => { + assert( + ['strict', 'optimistic', 'none'].includes(value), + X`--integrity must be one of 'strict', 'optimistic', or 'none'`, + TypeError, + ); + return value; + }, + 'optimistic', + ) + .option( + '--sleep ', + 'sleep between polling (may be fractional)', + value => { + const num = Number(value); + assert.equal(`${num}`, value, X`--sleep must be a number`, TypeError); + return num; + }, + 0, + ) + .option( + '-o, --output ', + 'value output format', + value => { + assert( + [ + 'hex', + 'justin', + 'justinlines', + 'json', + 'jsonlines', + 'text', + ].includes(value), + X`--output must be one of 'hex', 'justin', 'justinlines', 'json', 'jsonlines', or 'text'`, + TypeError, + ); + return value; + }, + 'justin', + ) + .option('-B, --bootstrap ', 'network bootstrap configuration') + .action(async (pathSpecs, cmd) => { + const opts = { ...program.opts(), ...cmd.opts() }; + return subMain(followMain, ['follow', ...pathSpecs], opts); + }); + const addRunOptions = cmd => cmd .option( diff --git a/packages/agoric-cli/src/sdk-package-names.js b/packages/agoric-cli/src/sdk-package-names.js index d3822478673..6048ab9efe1 100644 --- a/packages/agoric-cli/src/sdk-package-names.js +++ b/packages/agoric-cli/src/sdk-package-names.js @@ -4,6 +4,7 @@ export default [ "@agoric/access-token", "@agoric/assert", + "@agoric/casting", "@agoric/cosmic-swingset", "@agoric/cosmos", "@agoric/deploy-script-support", diff --git a/packages/casting/README.md b/packages/casting/README.md new file mode 100644 index 00000000000..a26a3cb3eeb --- /dev/null +++ b/packages/casting/README.md @@ -0,0 +1,76 @@ +# Agoric Casting + +This [Agoric](https://agoric.com) Casting package follows ocap broadcasts in a +flexible, future-proof way. + +TL;DR: You can run `yarn demo`, or to follow a mailbox castingSpec do: +```sh +npx agoric follow -Bhttp://devnet.agoric.net/network-config :mailbox.agoric1foobarbaz -otext +``` + +An example of following an on-chain mailbox in code (using this package) is: + +```js +// First, obtain a Hardened JS environment via Endo. +import '@endo/init/pre-remoting.js'; // needed only for the next line +import '@agoric/castingSpec/node-fetch-shim.js'; // needed for Node.js +import '@endo/init'; + +import { + iterateLatest, + makeFollower, + makeLeader, + makeCastingSpec, +} from '@agoric/casting'; + +// Iterate over a mailbox follower on the devnet. +const leader = makeLeader('https://devnet.agoric.net/network-config'); +const castingSpec = makeCastingSpec(':mailbox.agoric1foobarbaz'); +const follower = makeFollower(leader, castingSpec); +for await (const { value } of iterateLatest(follower)) { + console.log(`here's a mailbox value`, value); +} +``` + +## Follower options + +The `followerOpts` argument in `makeFollower(leader, key, followerOpts)` provides an optional bag of options: +- the `integrity` option, which has three possibilities: + - `'strict'` - release data only after proving it was validated (may incur waits for one block's data to be validated in the next block), + - `'optimistic'` (default) - release data immediately, but may crash the follower in the future if an already-released value could not be proven, + - `'none'` - release data immediately without validation +- the `decode` option is a function to translate `buf: Uint8Array` into `data: string` + - (default) - interpret buf as a utf-8 string, then `JSON.parse` it +- the `unserializer` option can be + - (default) - release unserialized objects using `@agoric/marshal`'s `makeMarshal()` + - `null` - don't additionally unserialize data before releasing it + - any unserializer object supporting `E(unserializer).unserialize(data)` +- the `crasher` option can be + - `null` (default) follower failures only propagate an exception/rejection + - any crasher object supporting `E(crasher).crash(reason)` + +## Behind the scenes + +- the network config contains enough information to obtain Tendermint RPC nodes + for a given Agoric network. You can use `makeLeaderFromRpcAddresses` directly + if you want to avoid fetching a network-config. +- each follower uses periodic CosmJS state polling (every X milliseconds) which + can be refreshed more expediently via a Tendermint subscription to the + corresponding `state_change` event +- published (string) values are automatically unmarshalled, but without object references. a custom `marshaller` for your application. +- the `iterateRecent` adapter transforms a follower into a local async iterator + that produces only the last queried value (with no history reconstruction) + +## Status + +This package currently depends on: +- Hardened Javascript +- `@agoric/notify` async iterable adapters to implement `iterateLatest` +- `@endo/marshal` for default object unserialization +- [CosmJS](https://github.com/cosmos/cosmjs) for proof verification, although [it does not yet support light client tracking of the validator set](https://github.com/cosmos/cosmjs/issues/492) +- a bespoke follower of [WebSocket Tendermint events](https://docs.tendermint.com/master/tendermint-core/subscription.html#legacy-streaming-api) + +Short-term goals: +- integrate the new [SharedSubscription API](https://github.com/Agoric/agoric-sdk/pull/5418#discussion_r886253328) from the [Agoric/agoric-sdk#5418 `makePublisherKit` PR](https://github.com/Agoric/agoric-sdk/pull/5418) +- support `iterateEach` with the [lossless forward iteration algorithm](https://github.com/Agoric/agoric-sdk/blob/mfig-vstream/golang/cosmos/x/vstream/spec/01_concepts.md#forward-iteration-lossless-history) via the [Agoric/agoric-sdk#5466 `x/vstream` PR](https://github.com/Agoric/agoric-sdk/pull/5466) +- upgrade to the [Tendermint event log API](https://docs.tendermint.com/master/tendermint-core/subscription.html#event-log-api) when Tendermint v0.36 is supported by the Agoric chain diff --git a/packages/casting/jsconfig.json b/packages/casting/jsconfig.json new file mode 100644 index 00000000000..1dc50dd301c --- /dev/null +++ b/packages/casting/jsconfig.json @@ -0,0 +1,24 @@ +// This file can contain .js-specific Typescript compiler config. +{ + "compilerOptions": { + "target": "esnext", + "module": "esnext", + + "noEmit": true, +/* + // The following flags are for creating .d.ts files: + "noEmit": false, + "declaration": true, + "emitDeclarationOnly": true, +*/ + "downlevelIteration": true, + "strictNullChecks": true, + "moduleResolution": "node", + }, + "include": [ + "*.js", + "public/**/*.js", + "src/**/*.js", + "test/**/*.js", + ], +} diff --git a/packages/casting/node-fetch-shim.js b/packages/casting/node-fetch-shim.js new file mode 100644 index 00000000000..c444f9ab9d7 --- /dev/null +++ b/packages/casting/node-fetch-shim.js @@ -0,0 +1,4 @@ +/* global globalThis */ +import fetch from 'node-fetch'; + +globalThis.fetch = fetch; diff --git a/packages/casting/package.json b/packages/casting/package.json new file mode 100644 index 00000000000..fe5a8ba96e2 --- /dev/null +++ b/packages/casting/package.json @@ -0,0 +1,55 @@ +{ + "name": "@agoric/casting", + "version": "0.1.0", + "description": "Agoric's OCap broadcastingSpec system", + "type": "module", + "main": "src/main.js", + "repository": "https://github.com/Agoric/agoric-sdk", + "scripts": { + "build": "exit 0", + "demo": "node -e 'import(\"./test/fake-rpc-server.js\").then(ns => ns.develop())'", + "test": "ava", + "test:c8": "c8 $C8_OPTIONS ava --config=ava-nesm.config.js", + "test:xs": "exit 0", + "lint-fix": "yarn lint:eslint --fix", + "lint": "run-s --continue-on-error lint:*", + "lint:types": "tsc --maxNodeModuleJsDepth 4 -p jsconfig.json", + "lint:eslint": "eslint --ext .js,.ts ." + }, + "keywords": [], + "author": "Agoric", + "license": "Apache-2.0", + "dependencies": { + "@agoric/notifier": "^0.4.0", + "@agoric/spawner": "^0.5.1", + "@cosmjs/encoding": "^0.28.4", + "@cosmjs/proto-signing": "^0.28.4", + "@cosmjs/stargate": "^0.28.4", + "@cosmjs/tendermint-rpc": "^0.28.4", + "@endo/far": "^0.2.3", + "@endo/init": "^0.5.41", + "@endo/lockdown": "^0.1.13", + "@endo/marshal": "^0.6.7", + "@endo/promise-kit": "^0.2.41", + "node-fetch": "^2.6.0" + }, + "devDependencies": { + "ava": "^3.12.1", + "c8": "^7.7.2", + "express": "^4.17.1", + "ws": "^7.2.0", + "@endo/ses-ava": "^0.2.25" + }, + "publishConfig": { + "access": "public" + }, + "engines": { + "node": ">=14.15.0" + }, + "ava": { + "files": [ + "test/**/test-*.js" + ], + "timeout": "20m" + } +} diff --git a/packages/casting/src/casting-spec.js b/packages/casting/src/casting-spec.js new file mode 100644 index 00000000000..aa5bffa81a8 --- /dev/null +++ b/packages/casting/src/casting-spec.js @@ -0,0 +1,59 @@ +// @ts-check +import { toAscii } from '@cosmjs/encoding'; + +/** + * @param {string} storagePath + * @returns {import('./types').CastingSpec} + */ +const swingsetPathToCastingSpec = storagePath => + harden({ + storeName: 'swingset', + storeSubkey: toAscii(`swingset/data:${storagePath}`), + }); + +const PATH_SEPARATOR_BYTE = '.'.charCodeAt(0); +const DATA_PREFIX_BYTES = new Uint8Array([0]); + +/** + * @param {string} storagePath + * @param {string} [storeName] + * @returns {import('./types').CastingSpec} + */ +const vstoragePathToCastingSpec = (storagePath, storeName = 'vstorage') => { + const elems = storagePath ? storagePath.split('.') : []; + const buf = toAscii(`${elems.length}.${storagePath}`); + return harden({ + storeName, + storeSubkey: buf.map(b => (b === PATH_SEPARATOR_BYTE ? 0 : b)), + dataPrefixBytes: DATA_PREFIX_BYTES, + }); +}; + +export const DEFAULT_PATH_CONVERTER = vstoragePathToCastingSpec; + +/** + * @type {Record import('./types').CastingSpec>} + */ +export const pathPrefixToConverters = harden({ + 'swingset:': swingsetPathToCastingSpec, + 'vstore:': vstoragePathToCastingSpec, + ':': DEFAULT_PATH_CONVERTER, +}); + +/** + * @param {string} specString + * @returns {import('./types').CastingSpec} + */ +export const makeCastingSpec = specString => { + assert.typeof(specString, 'string'); + const match = specString.match(/^([^:.]*:)(.*)/); + assert( + match, + `spec string ${specString} does not match 'PREFIX:PATH' or ':PATH'`, + ); + const kind = match[1]; + const storePath = match[2]; + const converter = pathPrefixToConverters[kind]; + assert(converter, `Unknown pathKind ${kind}`); + return converter(storePath); +}; diff --git a/packages/casting/src/change-follower.js b/packages/casting/src/change-follower.js new file mode 100644 index 00000000000..93e738456c5 --- /dev/null +++ b/packages/casting/src/change-follower.js @@ -0,0 +1,41 @@ +// @ts-check +import { E, Far } from '@endo/far'; +import { DEFAULT_KEEP_POLLING } from './defaults.js'; + +/** + * Just return an unspecified allegedValue every poll period. + * + * @param {import('./types').Leader} leader + * @param {import('./types.js').CastingSpec} castingSpec + * @returns {Promise>} + */ +export const makePollingChangeFollower = async (leader, castingSpec) => { + const { keepPolling = DEFAULT_KEEP_POLLING } = await E(leader).getOptions(); + return Far('polling change follower', { + getLatestIterable: () => + Far('polling change follower iterable', { + [Symbol.asyncIterator]: () => { + /** @type {Promise | undefined} */ + let nextPollPromise; + return Far('polling change follower iterator', { + next: async () => { + if (!nextPollPromise) { + nextPollPromise = keepPolling(); + } + const keepGoing = await nextPollPromise; + nextPollPromise = undefined; + const change = harden({ + castingSpec, + // Make no warrant as to the values. + values: [], + }); + return harden({ + value: change, + done: !keepGoing, + }); + }, + }); + }, + }), + }); +}; diff --git a/packages/casting/src/defaults.js b/packages/casting/src/defaults.js new file mode 100644 index 00000000000..0bc766ce799 --- /dev/null +++ b/packages/casting/src/defaults.js @@ -0,0 +1,56 @@ +// @ts-check +/* global setTimeout */ +import { Far } from '@endo/far'; +import { makeMarshal } from '@endo/marshal'; + +/** + * Resolve a Promise after a given number of milliseconds. + * + * @param {number} ms + * @returns {Promise} + */ +export const delay = ms => new Promise(resolve => setTimeout(resolve, ms)); + +/** + * Report an error, then retry the leader operation after a second or two. + * + * @param {any} err + * @param {number} _attempt + * @returns {Promise} + */ +export const DEFAULT_RETRY_CALLBACK = (err, _attempt = 0) => { + console.warn('retrying after error', err); + // TODO: `delay(Math.random() * Math.min(cap, base * 2 ** attempt)) + return delay(1000 + Math.random() * 1000); +}; + +/** + * Return true after we want to be sure we received latest state something. + * + * @returns {Promise} + */ +export const DEFAULT_KEEP_POLLING = () => + // TOOD: Remove this when the event-driven stuff is in place. + delay(5000 + Math.random() * 1000).then(() => true); +// ... and uses this instead. +// delay(10 * 60 * 1000 + Math.random() * 60_000).then(() => true); + +/** + * Decode utf-8 bytes, then parse the resulting JSON. + * + * @param {Uint8Array} buf + */ +export const DEFAULT_DECODER = harden(buf => { + const td = new TextDecoder(); + const str = td.decode(buf); + return harden(JSON.parse(str)); +}); + +/** + * Unserialize the JSONable data. + * + * @type {import('./types').Unserializer} + */ +export const DEFAULT_UNSERIALIZER = Far('marshal unserializer', { + unserialize: makeMarshal().unserialize, +}); diff --git a/packages/casting/src/follower-cosmjs.js b/packages/casting/src/follower-cosmjs.js new file mode 100644 index 00000000000..eebdd07f275 --- /dev/null +++ b/packages/casting/src/follower-cosmjs.js @@ -0,0 +1,307 @@ +// @ts-check +import { E, Far } from '@endo/far'; +import { makeNotifierKit } from '@agoric/notifier'; +import { Tendermint34Client } from '@cosmjs/tendermint-rpc'; +import { QueryClient } from '@cosmjs/stargate'; + +import { makeAsyncIterableFromNotifier, iterateLatest } from './iterable.js'; +import { DEFAULT_DECODER, DEFAULT_UNSERIALIZER } from './defaults.js'; + +const { details: X } = assert; + +/** @template T @typedef {import('./types.js').FollowerElement} FollowerElement */ +/** @template T @typedef {import('./types.js').Follower} Follower */ + +/** + * @template T + * @param {Iterable} values + * @returns {T} + */ +const collectSingle = values => { + /** @type {T[]} */ + const head = []; + let count = 0; + for (const value of values) { + count += 1; + if (count === 1) { + head.push(value); + } else { + assert.fail(`expected single value, got at least ${count}`); + } + } + + assert.equal(head.length, 1, 'expected single value'); + return head[0]; +}; + +/** + * @callback QueryVerifier + * @param {() => Promise} getProvenValue + * @param {(reason?: unknown) => void} crash + * @param {() => Promise} getAllegedValue + */ + +/** + * @type {Record['integrity'], QueryVerifier>} + */ +export const integrityToQueryVerifier = harden({ + strict: async (getProvenValue, crash, _getAllegedValue) => { + // Just ignore the alleged value. + // Crash hard if we can't prove. + return getProvenValue().catch(crash); + }, + none: async (_getProvenValue, _crash, getAllegedValue) => { + // Fast and loose. + return getAllegedValue(); + }, + optimistic: async (getProvenValue, crash, getAllegedValue) => { + const allegedValue = await getAllegedValue(); + // Prove later, since it may take time we say we can't afford. + getProvenValue().then(provenValue => { + if (provenValue.length === allegedValue.length) { + if (provenValue.every((proven, i) => proven === allegedValue[i])) { + return; + } + } + crash( + assert.error( + X`Alleged value ${allegedValue} did not match proof ${provenValue}`, + ), + ); + }, crash); + + // Speculate that we got the right value. + return allegedValue; + }, +}); + +/** + * @template T + * @param {ERef} leader + * @param {import('./types').CastingSpec} castingSpec + * @param {import('./types').FollowerOptions} options + * @returns {Follower>} + */ +export const makeFollower = (leader, castingSpec, options = {}) => { + const { + decode = DEFAULT_DECODER, + unserializer = DEFAULT_UNSERIALIZER, + integrity = 'optimistic', + crasher = null, + } = options; + const { + storeName, + storeSubkey, + dataPrefixBytes = new Uint8Array(), + } = castingSpec; + + /** @type {QueryVerifier} */ + const queryVerifier = integrityToQueryVerifier[integrity]; + assert(queryVerifier, X`unrecognized follower integrity mode ${integrity}`); + + /** @type {Map} */ + const endpointToQueryClient = new Map(); + + /** + * @param {string} endpoint + */ + const getOrCreateQueryClient = async endpoint => { + if (endpointToQueryClient.has(endpoint)) { + // Cache hit. + const queryClient = endpointToQueryClient.get(endpoint); + assert(queryClient); + return queryClient; + } + // Create a new client. They retry automatically. + const rpcClient = await Tendermint34Client.connect(endpoint); + const queryClient = QueryClient.withExtensions(rpcClient); + endpointToQueryClient.set(endpoint, queryClient); + return queryClient; + }; + + /** + * @param {'queryVerified' | 'queryUnverified'} method + * @param {string} queryPath + */ + const makeQuerier = + (method, queryPath) => + /** + * @param {number} [height] + * @returns {Promise} + */ + async height => { + const values = await E(leader).mapEndpoints(async endpoint => { + const queryClient = await getOrCreateQueryClient(endpoint); + return E(queryClient) + [method](queryPath, storeSubkey, height) + .then( + result => { + return { result, error: null }; + }, + error => { + return { result: null, error }; + }, + ); + }); + const { result, error } = collectSingle(values); + if (error !== null) { + throw error; + } + assert(result); + + if (result.length === 0) { + // No data. + return result; + } + + // Handle the data prefix if any. + assert( + result.length >= dataPrefixBytes.length, + X`result too short for data prefix ${dataPrefixBytes}`, + ); + assert( + dataPrefixBytes.every((v, i) => v === result[i]), + X`${result} doesn't start with data prefix ${dataPrefixBytes}`, + ); + return result.slice(dataPrefixBytes.length); + }; + + const getProvenValueAtHeight = makeQuerier('queryVerified', storeName); + const getUnprovenValueAtHeight = makeQuerier( + 'queryUnverified', + `store/${storeName}/key`, + ); + + // Enable the periodic fetch. + /** @type {Follower>} */ + return Far('chain follower', { + getLatestIterable: () => { + /** @type {NotifierRecord>} */ + const { updater, notifier } = makeNotifierKit(); + let finished = false; + + const fail = err => { + finished = true; + updater.fail(err); + return false; + }; + + const crash = err => { + fail(err); + if (crasher) { + E(crasher) + .crash(`PROOF VERIFICATION FAILURE; crashing follower`, err) + .catch(e => assert(false, X`crashing follower failed: ${e}`)); + } else { + console.error(`PROOF VERIFICATION FAILURE; crashing follower`, err); + } + }; + + let attempt = 0; + const retryOrFail = err => { + E(leader) + .retry(err, attempt) + .catch(e => { + fail(e); + throw e; + }); + attempt += 1; + }; + + /** + * These semantics are to ensure that later queries are not committed + * ahead of earlier ones. + * + * @template T + * @param {(...args: T[]) => void} commitAction + */ + const makePrepareInOrder = commitAction => { + let lastPrepareTicket = 0n; + let lastCommitTicket = 0n; + + const prepareInOrder = () => { + lastPrepareTicket += 1n; + const ticket = lastPrepareTicket; + assert(ticket > lastCommitTicket); + const committer = Far('committer', { + isValid: () => ticket > lastCommitTicket, + /** + * @type {(...args: T[]) => void} + */ + commit: (...args) => { + assert(committer.isValid()); + lastCommitTicket = ticket; + commitAction(...args); + }, + }); + return committer; + }; + return prepareInOrder; + }; + + const prepareUpdateInOrder = makePrepareInOrder(updater.updateState); + + /** @type {Uint8Array} */ + let lastBuf; + + /** + * @param {import('./types').CastingChange} allegedChange + */ + const queryAndUpdateOnce = async allegedChange => { + const committer = prepareUpdateInOrder(); + + // Make an unproven query if we have no alleged value. + const { values: allegedValues, blockHeight: allegedBlockHeight } = + allegedChange; + const getAllegedValue = + allegedValues.length > 0 + ? () => Promise.resolve(allegedValues[allegedValues.length - 1]) + : () => getUnprovenValueAtHeight(allegedBlockHeight); + const getProvenValue = () => getProvenValueAtHeight(allegedBlockHeight); + + const buf = await queryVerifier(getProvenValue, crash, getAllegedValue); + attempt = 0; + if (!committer.isValid()) { + return; + } + if (lastBuf) { + if (buf.length === lastBuf.length) { + if (buf.every((v, i) => v === lastBuf[i])) { + // Duplicate! + return; + } + } + } + lastBuf = buf; + const data = decode(buf); + if (!unserializer) { + /** @type {T} */ + const value = data; + committer.commit({ value }); + return; + } + const value = await E(unserializer).unserialize(data); + if (!committer.isValid()) { + return; + } + committer.commit({ value }); + }; + + const changeFollower = E(leader).watchCasting(castingSpec); + const queryWhenKeyChanges = async () => { + for await (const allegedChange of iterateLatest(changeFollower)) { + if (finished) { + return; + } + harden(allegedChange); + await queryAndUpdateOnce(allegedChange).catch(retryOrFail); + } + }; + + queryAndUpdateOnce({ values: [], castingSpec }).catch(retryOrFail); + queryWhenKeyChanges().catch(fail); + + return makeAsyncIterableFromNotifier(notifier); + }, + }); +}; diff --git a/packages/casting/src/iterable.js b/packages/casting/src/iterable.js new file mode 100644 index 00000000000..2275959c398 --- /dev/null +++ b/packages/casting/src/iterable.js @@ -0,0 +1,46 @@ +// @ts-check +import { E } from '@endo/far'; + +/** + * Consume a notifier, only returning next when there is a new publication. + * + * @template T + * @param {Notifier} notifier + */ +export const makeAsyncIterableFromNotifier = notifier => + harden({ + [Symbol.asyncIterator]: () => { + /** @type {UpdateCount} */ + let lastUpdateCount = 0; + return harden({ + next: async () => { + const { value, updateCount } = await notifier.getUpdateSince( + lastUpdateCount, + ); + lastUpdateCount = updateCount; + return { + value, + done: lastUpdateCount === undefined, + }; + }, + }); + }, + }); + +/** + * @template T + * @param {ERef>} follower + */ +export const iterateLatest = follower => { + // For now, just pass through the iterable. + return harden({ + /** @returns {AsyncIterator} */ + [Symbol.asyncIterator]: () => { + const latestIterable = E(follower).getLatestIterable(); + const iterator = E(latestIterable)[Symbol.asyncIterator](); + return harden({ + next: () => E(iterator).next(), + }); + }, + }); +}; diff --git a/packages/casting/src/leader-netconfig.js b/packages/casting/src/leader-netconfig.js new file mode 100644 index 00000000000..6861e2c86a1 --- /dev/null +++ b/packages/casting/src/leader-netconfig.js @@ -0,0 +1,70 @@ +// @ts-check +/* global fetch */ +import { makeRoundRobinLeader } from './leader.js'; +import { DEFAULT_RETRY_CALLBACK } from './defaults.js'; + +const { details: X } = assert; + +/** + * @param {string[]} rpcAddrs + * @param {import('./types.js').LeaderOptions} [leaderOptions] + */ +export const makeLeaderFromRpcAddresses = (rpcAddrs, leaderOptions) => { + assert(Array.isArray(rpcAddrs), X`rpcAddrs ${rpcAddrs} must be an array`); + + const rpcHrefs = rpcAddrs.map(rpcAddr => { + assert.typeof(rpcAddr, 'string', X`rpcAddr ${rpcAddr} must be a string`); + // Don't remove explicit port numbers from the URL, because the Cosmos + // `--node=xxx` flag requires them (it doesn't just assume that + // `--node=https://testnet.rpc.agoric.net` is the same as + // `--node=https://testnet.rpc.agoric.net:443`) + return rpcAddr.includes('://') ? rpcAddr : `http://${rpcAddr}`; + }); + + return makeRoundRobinLeader(rpcHrefs, leaderOptions); +}; + +/** + * @param {string} netconfigURL + * @param {import('./types.js').LeaderOptions} [options] + */ +export const makeLeaderFromNetworkConfig = (netconfigURL, options = {}) => { + const { retryCallback = DEFAULT_RETRY_CALLBACK } = options; + /** @type {import('./types.js').LeaderOptions['retryCallback']} */ + const retry = async (err, attempt) => { + if (retryCallback) { + return retryCallback(err, attempt); + } + throw err; + }; + let attempt = 0; + return new Promise((resolve, reject) => { + const makeLeader = async () => { + const response = await fetch(netconfigURL, { + headers: { accept: 'application/json' }, + }); + const { rpcAddrs } = await response.json(); + // Our part succeeded, so reset the attempt counter. + attempt = 0; + return makeLeaderFromRpcAddresses(rpcAddrs, options); + }; + const retryLeader = async err => { + retry(err, attempt) + .then(() => makeLeader().then(resolve, retryLeader)) + .catch(reject); + attempt += 1; + }; + makeLeader().then(resolve, retryLeader); + }); +}; + +/** + * @param {string} bootstrap + * @param {import('./types.js').LeaderOptions} options + */ +export const makeLeader = (bootstrap, options) => { + if (bootstrap.includes('network-config')) { + return makeLeaderFromNetworkConfig(bootstrap, options); + } + return makeLeaderFromRpcAddresses([bootstrap], options); +}; diff --git a/packages/casting/src/leader.js b/packages/casting/src/leader.js new file mode 100644 index 00000000000..1f89946ab70 --- /dev/null +++ b/packages/casting/src/leader.js @@ -0,0 +1,68 @@ +// @ts-check +import { E, Far } from '@endo/far'; +import { DEFAULT_RETRY_CALLBACK } from './defaults.js'; +import { shuffle } from './shuffle.js'; +import { makePollingChangeFollower } from './change-follower.js'; + +/** + * Create a chain leader that rotates through a list of endpoints. + * + * @param {string[]} endpoints + * @param {import('./types.js').LeaderOptions} leaderOptions + */ +export const makeRoundRobinLeader = (endpoints, leaderOptions = {}) => { + const { retryCallback = DEFAULT_RETRY_CALLBACK } = leaderOptions; + + // Shuffle the RPC addresses, so that we don't always hit the same one as all + // our peers. + shuffle(endpoints); + + let lastRespondingEndpointIndex = 0; + let thisAttempt = 0; + + /** @type {import('./types.js').Leader} */ + const leader = Far('round robin leader', { + getOptions: () => leaderOptions, + retry: async (err, attempt) => { + if (retryCallback) { + return retryCallback(err, attempt); + } + throw err; + }, + watchCasting: castingSpec => makePollingChangeFollower(leader, castingSpec), + /** + * @template T + * @param {(endpoint: string) => Promise} callback + */ + mapEndpoints: async callback => { + /** @type {Promise} */ + const p = new Promise((resolve, reject) => { + let endpointIndex = lastRespondingEndpointIndex; + + const retry = async err => { + endpointIndex = (endpointIndex + 1) % endpoints.length; + + // eslint-disable-next-line no-use-before-define + E(leader).retry(err, thisAttempt).then(applyOne, reject); + thisAttempt += 1; + }; + + const applyOne = () => { + Promise.resolve() + .then(() => callback(endpoints[endpointIndex])) + .then(res => { + resolve(harden([res])); + lastRespondingEndpointIndex = endpointIndex; + thisAttempt = 0; + }, retry); + + // Don't return to prevent a promise chain. + }; + + applyOne(); + }); + return p; + }, + }); + return leader; +}; diff --git a/packages/casting/src/main.js b/packages/casting/src/main.js new file mode 100644 index 00000000000..4a597fa0785 --- /dev/null +++ b/packages/casting/src/main.js @@ -0,0 +1,8 @@ +// eslint-disable-next-line import/export +export * from './types.js'; // no named exports +export * from './defaults.js'; +export * from './leader-netconfig.js'; +export * from './follower-cosmjs.js'; +export * from './casting-spec.js'; +export * from './leader.js'; +export * from './iterable.js'; diff --git a/packages/casting/src/shuffle.js b/packages/casting/src/shuffle.js new file mode 100644 index 00000000000..9300ab8ad97 --- /dev/null +++ b/packages/casting/src/shuffle.js @@ -0,0 +1,16 @@ +// @ts-check + +/** + * Modern version of Fisher-Yates shuffle algorithm (in-place). + * + * @template T + * @param {Array} a + */ +export const shuffle = a => { + for (let i = a.length - 1; i > 0; i -= 1) { + const j = Math.floor(Math.random() * (i + 1)); + const x = a[i]; + a[i] = a[j]; + a[j] = x; + } +}; diff --git a/packages/casting/src/types.js b/packages/casting/src/types.js new file mode 100644 index 00000000000..a0e73e87eee --- /dev/null +++ b/packages/casting/src/types.js @@ -0,0 +1,62 @@ +// @ts-check + +// Make this a module. +export {}; + +/** + * @typedef {object} LeaderOptions + * @property {null | ((err: any, attempt?: number) => Promise)} [retryCallback] + * @property {() => Promise} [keepPolling] + */ + +/** + * @typedef {object} CastingChange + * @property {CastingSpec} castingSpec + * @property {number} [blockHeight] + * @property {Uint8Array[]} values + */ + +/** + * @typedef {object} Leader + * @property {(error: any, attempt?: number) => Promise} retry + * @property {() => LeaderOptions} getOptions + * @property {(callback: (endpoint: string) => Promise) => Promise} mapEndpoints + * @property {(key: CastingSpec) => Promise>} watchCasting + */ + +/** + * @template T + * @typedef {object} Follower + * @property {() => AsyncIterable} [getLatestIterable] + */ + +/** + * @template T + * @typedef {object} FollowerElement + * @property {T} value + */ + +/** + * @typedef {object} Unserializer + * @property {(data: import('@endo/marshal').CapData) => any} unserialize + */ + +/** + * @typedef {object} Crasher + * @property {(...args: unknown[]) => void} crash + */ + +/** + * @typedef {object} FollowerOptions + * @property {null | import('@endo/far').FarRef} [unserializer] + * @property {(buf: Uint8Array) => any} [decode] + * @property {'strict'|'optimistic'|'none'} [integrity] + * @property {import('@endo/far').FarRef} [crasher] + */ + +/** + * @typedef {object} CastingSpec + * @property {string} storeName + * @property {Uint8Array} storeSubkey + * @property {Uint8Array} [dataPrefixBytes] + */ diff --git a/packages/casting/test/fake-rpc-server.js b/packages/casting/test/fake-rpc-server.js new file mode 100644 index 00000000000..dc95bd892e4 --- /dev/null +++ b/packages/casting/test/fake-rpc-server.js @@ -0,0 +1,238 @@ +// @ts-check +import './lockdown.js'; + +import { makeMarshal } from '@endo/marshal'; + +import express from 'express'; +// import morgan from 'morgan'; + +import { toAscii, toBase64 } from '@cosmjs/encoding'; + +let lastPort = 8989; + +const fakeStatusResult = { + node_info: { + protocol_version: { + p2p: '7', + block: '10', + app: '0', + }, + id: '5576458aef205977e18fd50b274e9b5d9014525a', + listen_addr: 'tcp://0.0.0.0:26656', + network: 'cosmoshub-2', + version: '0.32.1', + channels: '4020212223303800', + moniker: 'moniker-node', + other: { + tx_index: 'on', + rpc_address: 'tcp://0.0.0.0:26657', + }, + }, + sync_info: { + latest_block_hash: + '790BA84C3545FCCC49A5C629CEE6EA58A6E875C3862175BDC11EE7AF54703501', + latest_app_hash: + 'C9AEBB441B787D9F1D846DE51F3826F4FD386108B59B08239653ABF59455C3F8', + latest_block_height: '1262196', + latest_block_time: '2019-08-01T11:52:22.818762194Z', + earliest_block_hash: + '790BA84C3545FCCC49A5C629CEE6EA58A6E875C3862175BDC11EE7AF54703501', + earliest_app_hash: + 'C9AEBB441B787D9F1D846DE51F3826F4FD386108B59B08239653ABF59455C3F8', + earliest_block_height: '1262196', + earliest_block_time: '2019-08-01T11:52:22.818762194Z', + max_leader_block_height: '1262196', + catching_up: false, + total_synced_time: '1000000000', + remaining_time: '0', + total_snapshots: '10', + chunk_process_avg_time: '1000000000', + snapshot_height: '1262196', + snapshot_chunks_count: '10', + snapshot_chunks_total: '100', + backfilled_blocks: '10', + backfill_blocks_total: '100', + }, + validator_info: { + address: '5D6A51A8E9899C44079C6AF90618BA0369070E6E', + pub_key: { + type: 'tendermint/PubKeyEd25519', + value: 'A6DoBUypNtUAyEHWtQ9bFjfNg8Bo9CrnkUGl6k6OHN4=', + }, + voting_power: '0', + }, +}; + +export const startFakeServer = (t, fakeValues) => { + const { log = console.log } = t; + const { serialize } = makeMarshal(); + lastPort += 1; + const PORT = lastPort; + return new Promise(resolve => { + log('starting http server on port', PORT); + const app = express(); + // app.use(morgan()); + app.use((req, _res, next) => { + log('request', req.method, req.url); + next(); + }); + app.use(express.json()); + app.get('/bad-network-config', (req, res) => { + res.json({ + rpcAddrs: 'not an array', + }); + }); + app.get('/network-config', (req, res) => { + res.json({ + rpcAddrs: [`http://localhost:${PORT}/tendermint-rpc`], + }); + }); + + const dataPrefix = new Uint8Array([0]); + const encode = obj => { + const str = JSON.stringify(obj); + const ascii = toAscii(str); + const buf = new Uint8Array(dataPrefix.length + ascii.length); + buf.set(dataPrefix); + buf.set(ascii, dataPrefix.length); + return toBase64(buf); + }; + app.post('/tendermint-rpc', (req, res) => { + log('received', req.path, req.body, req.params); + const reply = result => { + log('response', result); + res.json({ + jsonrpc: '2.0', + id: req.body.id, + result, + }); + }; + switch (req.body.method) { + case 'status': { + reply(fakeStatusResult); + break; + } + case 'abci_query': { + const value = + fakeValues.length === 0 + ? null + : encode(serialize(fakeValues.shift())); + const result = { + response: { + code: 0, + log: '', + info: '', + index: '0', + key: 'c3dpbmdzZXQvZGF0YTptYWlsYm94LmFnb3JpYzFmb29iYXJiYXo=', + value, + proofOps: null, + height: '74863', + codespace: '', + }, + }; + reply(result); + break; + } + default: { + res.sendStatus(400); + } + } + }); + const listener = app.listen(PORT, () => { + log('started http server on', PORT); + const cleanup = () => { + log('shutting down http server on', PORT); + listener.close(); + }; + t.context.cleanups.push(cleanup); + resolve(PORT); + }); + }); +}; + +export const jsonPairs = harden([ + // Justin is the same as the JSON encoding but without unnecessary quoting + ['[1,2]', '[1,2]'], + ['{"foo":1}', '{foo:1}'], + ['{"a":1,"b":2}', '{a:1,b:2}'], + ['{"a":1,"b":{"c":3}}', '{a:1,b:{c:3}}'], + ['true', 'true'], + ['1', '1'], + ['"abc"', '"abc"'], + ['null', 'null'], + + // Primitives not representable in JSON + ['{"@qclass":"undefined"}', 'undefined'], + ['{"@qclass":"NaN"}', 'NaN'], + ['{"@qclass":"Infinity"}', 'Infinity'], + ['{"@qclass":"-Infinity"}', '-Infinity'], + ['{"@qclass":"bigint","digits":"4"}', '4n'], + ['{"@qclass":"bigint","digits":"9007199254740993"}', '9007199254740993n'], + ['{"@qclass":"symbol","name":"@@asyncIterator"}', 'Symbol.asyncIterator'], + ['{"@qclass":"symbol","name":"@@match"}', 'Symbol.match'], + ['{"@qclass":"symbol","name":"foo"}', 'Symbol.for("foo")'], + ['{"@qclass":"symbol","name":"@@@@foo"}', 'Symbol.for("@@foo")'], + + // Arrays and objects + ['[{"@qclass":"undefined"}]', '[undefined]'], + ['{"foo":{"@qclass":"undefined"}}', '{foo:undefined}'], + ['{"@qclass":"error","message":"","name":"Error"}', 'Error("")'], + [ + '{"@qclass":"error","message":"msg","name":"ReferenceError"}', + 'ReferenceError("msg")', + ], + + // The one case where JSON is not a semantic subset of JS + ['{"__proto__":8}', '{["__proto__"]:8}'], + + // The Hilbert Hotel is always tricky + ['{"@qclass":"hilbert","original":8}', '{"@qclass":8}'], + ['{"@qclass":"hilbert","original":"@qclass"}', '{"@qclass":"@qclass"}'], + [ + '{"@qclass":"hilbert","original":{"@qclass":"hilbert","original":8}}', + '{"@qclass":{"@qclass":8}}', + ], + [ + '{"@qclass":"hilbert","original":{"@qclass":"hilbert","original":8,"rest":{"foo":"foo1"}},"rest":{"bar":{"@qclass":"hilbert","original":{"@qclass":"undefined"}}}}', + '{"@qclass":{"@qclass":8,foo:"foo1"},bar:{"@qclass":undefined}}', + ], + + // tagged + ['{"@qclass":"tagged","tag":"x","payload":8}', 'makeTagged("x",8)'], + [ + '{"@qclass":"tagged","tag":"x","payload":{"@qclass":"undefined"}}', + 'makeTagged("x",undefined)', + ], + + // Slots + [ + '[{"@qclass":"slot","iface":"Alleged: for testing Justin","index":0}]', + '[slot(0,"Alleged: for testing Justin")]', + ], + // Tests https://github.com/endojs/endo/issues/1185 fix + [ + '[{"@qclass":"slot","iface":"Alleged: for testing Justin","index":0},{"@qclass":"slot","index":0}]', + '[slot(0,"Alleged: for testing Justin"),slot(0)]', + ], +]); + +export const develop = async () => { + const { unserialize } = makeMarshal(); + const fakeValues = harden( + jsonPairs.map(([jsonMarshalled]) => + unserialize({ body: jsonMarshalled, slots: [] }), + ), + ); + const mockT = { + log: console.log, + context: { cleanups: [] }, + }; + const PORT = await startFakeServer(mockT, [...fakeValues]); + console.log( + `Try this in another terminal: + agoric follower :fake.path --bootstrap=http://localhost:${PORT}/network-config --sleep=0.5 --integrity=none`, + ); + console.warn(`Control-C to interrupt...`); + // Wait forever. + await new Promise(() => {}); +}; diff --git a/packages/casting/test/lockdown.js b/packages/casting/test/lockdown.js new file mode 100644 index 00000000000..f597abccf3d --- /dev/null +++ b/packages/casting/test/lockdown.js @@ -0,0 +1,3 @@ +import '@endo/init/pre-remoting.js'; +import '../node-fetch-shim.js'; +import '@endo/init'; diff --git a/packages/casting/test/prepare-test-env-ava.js b/packages/casting/test/prepare-test-env-ava.js new file mode 100644 index 00000000000..164f82d07a2 --- /dev/null +++ b/packages/casting/test/prepare-test-env-ava.js @@ -0,0 +1,6 @@ +import './lockdown.js'; +import { wrapTest } from '@endo/ses-ava'; +import rawTest from 'ava'; + +/** @type {typeof rawTest} */ +export const test = wrapTest(rawTest); diff --git a/packages/casting/test/test-mvp.js b/packages/casting/test/test-mvp.js new file mode 100644 index 00000000000..6423b8e6ef4 --- /dev/null +++ b/packages/casting/test/test-mvp.js @@ -0,0 +1,84 @@ +// @ts-nocheck +// eslint-disable-next-line import/order +import { test } from './prepare-test-env-ava.js'; + +import { + iterateLatest, + makeFollower, + makeLeader, + makeCastingSpec, +} from '../src/main.js'; + +import { delay } from '../src/defaults.js'; +import { startFakeServer } from './fake-rpc-server.js'; + +test('happy path', async t => { + const expected = ['latest', 'later', 'done']; + t.plan(expected.length); + const PORT = await t.context.startServer(t, [...expected]); + /** @type {import('../src/types.js').LeaderOptions} */ + const lo = { + retryCallback: null, // fail fast, no retries + keepPolling: () => delay(200).then(() => true), // poll really quickly + }; + /** @type {import('../src/types.js').FollowerOptions} */ + const so = { + integrity: 'none', + }; + + // The rest of this test is taken almost verbatim from the README.md, with + // some minor modifications (testLeaderOptions and deepEqual). + const leader = makeLeader(`http://localhost:${PORT}/network-config`, lo); + const castingSpec = makeCastingSpec(':mailbox.agoric1foobarbaz'); + const follower = await makeFollower(leader, castingSpec, so); + for await (const { value } of iterateLatest(follower)) { + t.log(`here's a mailbox value`, value); + + // The rest here is to drive the test. + t.deepEqual(value, expected.shift()); + if (expected.length === 0) { + break; + } + } +}); + +test('bad network config', async t => { + const PORT = await t.context.startServer(t, []); + await t.throwsAsync( + () => + makeLeader(`http://localhost:${PORT}/bad-network-config`, { + retryCallback: null, + }), + { + message: /rpcAddrs .* must be an array/, + }, + ); +}); + +test('missing rpc server', async t => { + const PORT = await t.context.startServer(t, []); + await t.throwsAsync( + () => + makeLeader(`http://localhost:${PORT}/missing-network-config`, { + retryCallback: null, + }), + { + message: /^invalid json response body/, + }, + ); +}); + +test('unrecognized integrity', async t => { + await t.throws(() => makeFollower({}, {}, { integrity: 'bother' }), { + message: /unrecognized follower integrity mode.*/, + }); +}); + +test.before(t => { + t.context.cleanups = []; + t.context.startServer = startFakeServer; +}); + +test.after(t => { + t.context.cleanups.map(cleanup => cleanup()); +}); diff --git a/packages/deployment/ansible/prepare-machine.yml b/packages/deployment/ansible/prepare-machine.yml index 8a0190ec850..e6c1065dfa7 100644 --- a/packages/deployment/ansible/prepare-machine.yml +++ b/packages/deployment/ansible/prepare-machine.yml @@ -5,6 +5,6 @@ gather_facts: yes strategy: free vars: - - NODEJS_VERSION: 14 + - NODEJS_VERSION: 16 roles: - prereq diff --git a/packages/run-protocol/src/vpool-xyk-amm/multipoolMarketMaker.js b/packages/run-protocol/src/vpool-xyk-amm/multipoolMarketMaker.js index 33a2427c5a9..7961eab315b 100644 --- a/packages/run-protocol/src/vpool-xyk-amm/multipoolMarketMaker.js +++ b/packages/run-protocol/src/vpool-xyk-amm/multipoolMarketMaker.js @@ -168,7 +168,7 @@ const start = async (zcf, privateArgs) => { const { publication: metricsPublication, subscription: rawMetricsSubscription, - } = makeSubscriptionKit(); + } = makeSubscriptionKit(harden({ XYK: [] })); const { storageNode, marshaller } = privateArgs; const metricsStorageNode = storageNode && E(storageNode).getChildNode('metrics'); // TODO: magic string @@ -182,7 +182,6 @@ const start = async (zcf, privateArgs) => { harden({ XYK: Array.from(secondaryBrandToPool.keys()) }), ); }; - updateMetrics(); // For now, this seat collects protocol fees. It needs to be connected to // something that will extract the fees. diff --git a/packages/run-protocol/test/metrics.js b/packages/run-protocol/test/metrics.js index bc0dbf4fd36..76f2585af71 100644 --- a/packages/run-protocol/test/metrics.js +++ b/packages/run-protocol/test/metrics.js @@ -14,12 +14,14 @@ export const subscriptionTracker = async (t, subscription) => { const assertInitial = async expectedValue => { notif = await metrics.getUpdateSince(); + t.log('assertInitial notif', notif); t.deepEqual(notif.value, expectedValue); }; /** @param {Record} expectedDelta */ const assertChange = async expectedDelta => { const prevNotif = notif; notif = await metrics.getUpdateSince(notif.updateCount); + t.log('assertChange notif', notif); const actualDelta = diff(prevNotif.value, notif.value); t.deepEqual(actualDelta, expectedDelta, 'Unexpected delta'); }; diff --git a/packages/vats/src/lib-chainStorage.js b/packages/vats/src/lib-chainStorage.js index 8bb628329fb..56ee2f32979 100644 --- a/packages/vats/src/lib-chainStorage.js +++ b/packages/vats/src/lib-chainStorage.js @@ -28,9 +28,7 @@ export function makeChainStorageRoot(toStorage, storeName, rootPath) { function makeChainStorageNode(path) { const node = { getStoreKey() { - // This duplicates the Go code at - // https://github.com/Agoric/agoric-sdk/blob/cb272ae97a042ceefd3af93b1b4601ca49dfe3a7/golang/cosmos/x/swingset/keeper/keeper.go#L295 - return { storeName, storeSubkey: `swingset/data:${path}` }; + return toStorage({ key: path, method: 'getStoreKey' }); }, getChildNode(name) { assert.typeof(name, 'string'); @@ -44,22 +42,7 @@ export function makeChainStorageRoot(toStorage, storeName, rootPath) { assert.typeof(value, 'string'); toStorage({ key: path, method: 'set', value }); }, - async delete() { - assert(path !== rootPath); - // A 'set' with no value deletes a key if it has no children, but - // otherwise sets data to the empty string and leaves all nodes intact. - // We want to reject silently incomplete deletes (at least for now). - // This check is unfortunately racy (e.g., a vat could wake up - // and set data for a child before _this_ vat receives an - // already-enqueued response claiming no children), but we can tolerate - // that because transforming a deletion into a set-to-empty is - // effectively indistinguishable from a valid reordering where a fully - // successful 'delete' is followed by a child-key 'set' (for which - // absent parent keys are automatically created with empty-string data). - const childCount = await toStorage({ key: path, method: 'size' }); - if (childCount > 0) { - assert.fail(X`Refusing to delete node with children: ${path}`); - } + clearValue() { toStorage({ key: path, method: 'set' }); }, // Possible extensions: diff --git a/packages/vats/test/test-lib-chainStorage.js b/packages/vats/test/test-lib-chainStorage.js index 5d35a08bfb4..5a6eeeb1c6d 100644 --- a/packages/vats/test/test-lib-chainStorage.js +++ b/packages/vats/test/test-lib-chainStorage.js @@ -11,6 +11,12 @@ test('makeChainStorageRoot', async t => { const toStorage = message => { messages.push(message); switch (message.method) { + case 'getStoreKey': { + return { + storeName: 'swingset', + storeSubkey: `swingset/data:${message.key}`, + }; + } case 'set': if ('value' in message) { data.set(message.key, message.value); @@ -62,12 +68,7 @@ test('makeChainStorageRoot', async t => { ); } - // The root node cannot be deleted, but is otherwise normal. - await t.throwsAsync( - rootNode.delete(), - undefined, - 'root node deletion is disallowed', - ); + rootNode.clearValue(); rootNode.setValue('foo'); t.deepEqual( messages.slice(-1), @@ -117,12 +118,11 @@ test('makeChainStorageRoot', async t => { [{ key: childPath, method: 'set', value: 'foo' }], 'non-root setValue message', ); - // eslint-disable-next-line no-await-in-loop - await child.delete(); + child.clearValue(); t.deepEqual( messages.slice(-1), [{ key: childPath, method: 'set' }], - 'non-root delete message', + 'non-root clearValue message', ); } @@ -172,22 +172,22 @@ test('makeChainStorageRoot', async t => { 'level-skipping setValue message', ); - // Deletion requires absence of children. - await t.throwsAsync( - childNode.delete(), - undefined, - 'deleting a node with a child is disallowed', + childNode.clearValue(); + t.deepEqual( + messages.slice(-1), + [{ key: childPath, method: 'set' }], + 'child clearValue message', ); - await deepNode.delete(); + deepNode.clearValue(); t.deepEqual( messages.slice(-1), [{ key: deepPath, method: 'set' }], - 'granchild delete message', + 'granchild clearValue message', ); - await childNode.delete(); + childNode.clearValue(); t.deepEqual( messages.slice(-1), [{ key: childPath, method: 'set' }], - 'child delete message', + 'child clearValue message', ); }); diff --git a/yarn.lock b/yarn.lock index 1c0b2db9763..31d1abff2ff 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1137,7 +1137,7 @@ elliptic "^6.5.3" libsodium-wrappers "^0.7.6" -"@cosmjs/encoding@0.28.4": +"@cosmjs/encoding@0.28.4", "@cosmjs/encoding@^0.28.4": version "0.28.4" resolved "https://registry.yarnpkg.com/@cosmjs/encoding/-/encoding-0.28.4.tgz#ea39eb4c27ebf7b35e62e9898adae189b86d0da7" integrity sha512-N6Qnjs4dd8KwjW5m9t3L+rWYYGW2wyS+iLtJJ9DD8DiTTxpW9h7/AmUVO/dsRe5H2tV8/DzH/B9pFfpsgro22A== @@ -1161,7 +1161,7 @@ dependencies: bn.js "^5.2.0" -"@cosmjs/proto-signing@0.28.4": +"@cosmjs/proto-signing@0.28.4", "@cosmjs/proto-signing@^0.28.4": version "0.28.4" resolved "https://registry.yarnpkg.com/@cosmjs/proto-signing/-/proto-signing-0.28.4.tgz#7007651042bd05b3eee7e1c8562417bbed630198" integrity sha512-4vgCLK9gOsdWzD78V5XbAsupSSyntPEzokWYhgRQNwgVTcKX1kg0eKZqUvF5ua5iL9x6MevfH/sgwPyiYleMBw== @@ -1210,7 +1210,7 @@ dependencies: xstream "^11.14.0" -"@cosmjs/tendermint-rpc@0.28.4": +"@cosmjs/tendermint-rpc@0.28.4", "@cosmjs/tendermint-rpc@^0.28.4": version "0.28.4" resolved "https://registry.yarnpkg.com/@cosmjs/tendermint-rpc/-/tendermint-rpc-0.28.4.tgz#78835fdc8126baa3122c8b2b396c1d7d290c7167" integrity sha512-iz6p4UW2QUZNh55WeJy9wHbMdqM8COo0AJdrGU4Ikb/xU0/H6b0dFPoEK+i6ngR0cSizh+hpTMzh3AA7ySUKlA==