diff --git a/packages/agoric-cli/package.json b/packages/agoric-cli/package.json index bd0ef73b078f..a27c8057263d 100644 --- a/packages/agoric-cli/package.json +++ b/packages/agoric-cli/package.json @@ -29,11 +29,13 @@ "dependencies": { "@agoric/access-token": "^0.4.18", "@agoric/assert": "^0.4.0", + "@agoric/chain-streams": "^0.1.0", "@agoric/nat": "^4.1.0", "@endo/bundle-source": "^2.2.0", "@endo/captp": "^2.0.7", "@endo/compartment-mapper": "^0.7.5", "@endo/init": "^0.5.41", + "@endo/marshal": "^0.6.7", "@endo/promise-kit": "^0.2.41", "@iarna/toml": "^2.2.3", "anylogger": "^0.21.0", diff --git a/packages/agoric-cli/src/entrypoint.js b/packages/agoric-cli/src/entrypoint.js index d9d40c99f6b3..dd1020a7b43e 100755 --- a/packages/agoric-cli/src/entrypoint.js +++ b/packages/agoric-cli/src/entrypoint.js @@ -4,6 +4,7 @@ import '@endo/init/pre.js'; import 'esm'; +import '@agoric/chain-streams/node-fetch-shim.js'; import '@endo/init'; import path from 'path'; diff --git a/packages/agoric-cli/src/main.js b/packages/agoric-cli/src/main.js index 4e4a15fae320..9c88f72c2c42 100644 --- a/packages/agoric-cli/src/main.js +++ b/packages/agoric-cli/src/main.js @@ -8,6 +8,7 @@ import initMain from './init.js'; import installMain from './install.js'; import setDefaultsMain from './set-defaults.js'; import startMain from './start.js'; +import streamMain from './stream.js'; import walletMain from './open.js'; const DEFAULT_DAPP_TEMPLATE = 'dapp-fungible-faucet'; @@ -160,6 +161,58 @@ const main = async (progname, rawArgs, powers) => { return subMain(installMain, ['install', forceSdkVersion], opts); }); + program + .command('stream ') + .description('read an Agoric Chain Stream') + .option( + '--integrity ', + 'set integrity mode', + value => { + assert( + ['strict', 'optimistic', 'none'].includes(value), + X`--integrity must be one of 'strict', 'optimistic', or 'none'`, + TypeError, + ); + return value; + }, + 'optimistic', + ) + .option( + '--sleep ', + 'sleep between polling (may be fractional)', + value => { + const num = Number(value); + assert.equal(`${num}`, value, X`--sleep must be a number`, TypeError); + return num; + }, + 0, + ) + .option( + '-o, --output ', + 'value output format', + value => { + assert( + [ + 'hex', + 'justin', + 'justinlines', + 'json', + 'jsonlines', + 'text', + ].includes(value), + X`--output must be one of 'hex', 'justin', 'justinlines', 'json', 'jsonlines', or 'text'`, + TypeError, + ); + return value; + }, + 'justin', + ) + .option('-B, --bootstrap ', 'network bootstrap configuration') + .action(async (pathSpecs, cmd) => { + const opts = { ...program.opts(), ...cmd.opts() }; + return subMain(streamMain, ['stream', ...pathSpecs], opts); + }); + const addRunOptions = cmd => cmd .option( diff --git a/packages/agoric-cli/src/sdk-package-names.js b/packages/agoric-cli/src/sdk-package-names.js index d38224786735..5438155c310f 100644 --- a/packages/agoric-cli/src/sdk-package-names.js +++ b/packages/agoric-cli/src/sdk-package-names.js @@ -4,6 +4,7 @@ export default [ "@agoric/access-token", "@agoric/assert", + "@agoric/chain-streams", "@agoric/cosmic-swingset", "@agoric/cosmos", "@agoric/deploy-script-support", diff --git a/packages/agoric-cli/src/stream.js b/packages/agoric-cli/src/stream.js new file mode 100644 index 000000000000..b5b4942d402d --- /dev/null +++ b/packages/agoric-cli/src/stream.js @@ -0,0 +1,120 @@ +// @ts-check +import process from 'process'; +import { Far } from '@endo/marshal'; +import { decodeToJustin } from '@endo/marshal/src/marshal-justin.js'; + +import { + delay, + iterateLatest, + makeChainStream, + makeLeader, + makeStoreKey, +} from '@agoric/chain-streams'; + +export default async function streamMain(progname, rawArgs, powers, opts) { + const { anylogger } = powers; + const console = anylogger('agoric:stream'); + + const { + integrity, + output, + bootstrap = 'http://localhost:26657', + verbose, + sleep, + } = opts; + + /** @type {import('@agoric/chain-streams').ChainStreamOptions} */ + const streamOptions = { + integrity, + }; + + /** @type {(buf: any) => any} */ + let formatOutput; + switch (output) { + case 'justinlines': + case 'justin': { + streamOptions.unserializer = null; + const pretty = !output.endsWith('lines'); + formatOutput = ({ body }) => { + const encoded = JSON.parse(body); + return decodeToJustin(encoded, pretty); + }; + break; + } + case 'jsonlines': + case 'json': { + const spaces = output.endsWith('lines') ? undefined : 2; + const bigintToStringReplacer = (_, arg) => { + if (typeof arg === 'bigint') { + return `${arg}`; + } + return arg; + }; + formatOutput = obj => JSON.stringify(obj, bigintToStringReplacer, spaces); + break; + } + case 'hex': { + // Dump as hex strings. + streamOptions.decode = buf => buf; + streamOptions.unserializer = null; + formatOutput = buf => + buf.reduce((acc, b) => acc + b.toString(16).padStart(2, '0'), ''); + break; + } + case 'text': { + streamOptions.decode = buf => new TextDecoder().decode(buf); + streamOptions.unserializer = null; + formatOutput = buf => buf; + break; + } + default: { + console.error(`Unknown output format: ${output}`); + return 1; + } + } + + if (integrity !== 'none') { + streamOptions.crasher = Far('stream crasher', { + crash: (...args) => { + console.error(...args); + console.warn(`You are running with '--integrity=${integrity}'`); + console.warn( + `If you trust your RPC nodes, you can turn off proofs with '--integrity=none'`, + ); + process.exit(1); + }, + }); + } + + /** @type {import('@agoric/chain-streams').ChainLeaderOptions} */ + const leaderOptions = { + retryCallback: e => { + verbose && console.warn('Retrying due to:', e); + return delay(1000 + Math.random() * 1000); + }, + keepPolling: async () => { + let toSleep = sleep * 1000; + if (toSleep <= 0) { + toSleep = (5 + Math.random()) * 1000; + } + await delay(toSleep); + return true; + }, + }; + + const [_cmd, ...specs] = rawArgs; + + verbose && console.warn('Streaming from leader at', bootstrap); + const leader = makeLeader(bootstrap, leaderOptions); + await Promise.all( + specs.map(async spec => { + verbose && console.warn('Consuming', spec); + const storeKey = makeStoreKey(spec); + const stream = makeChainStream(leader, storeKey, streamOptions); + for await (const { value } of iterateLatest(stream)) { + process.stdout.write(`${formatOutput(value)}\n`); + } + }), + ); + return 0; +}