Skip to content

Commit

Permalink
feat(agoric): new stream command
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelfig committed Jun 5, 2022
1 parent 30a3db5 commit ec85c45
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 0 deletions.
2 changes: 2 additions & 0 deletions packages/agoric-cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
"dependencies": {
"@agoric/access-token": "^0.4.18",
"@agoric/assert": "^0.4.0",
"@agoric/chain-streams": "^0.1.0",
"@agoric/nat": "^4.1.0",
"@endo/bundle-source": "^2.2.0",
"@endo/captp": "^2.0.7",
"@endo/compartment-mapper": "^0.7.5",
"@endo/init": "^0.5.41",
"@endo/marshal": "^0.6.7",
"@endo/promise-kit": "^0.2.41",
"@iarna/toml": "^2.2.3",
"anylogger": "^0.21.0",
Expand Down
1 change: 1 addition & 0 deletions packages/agoric-cli/src/entrypoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import '@endo/init/pre.js';
import 'esm';
import '@agoric/chain-streams/node-fetch-shim.js';
import '@endo/init';

import path from 'path';
Expand Down
53 changes: 53 additions & 0 deletions packages/agoric-cli/src/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import initMain from './init.js';
import installMain from './install.js';
import setDefaultsMain from './set-defaults.js';
import startMain from './start.js';
import streamMain from './stream.js';
import walletMain from './open.js';

const DEFAULT_DAPP_TEMPLATE = 'dapp-fungible-faucet';
Expand Down Expand Up @@ -160,6 +161,58 @@ const main = async (progname, rawArgs, powers) => {
return subMain(installMain, ['install', forceSdkVersion], opts);
});

program
.command('stream <path-spec...>')
.description('read an Agoric Chain Stream')
.option(
'--integrity <strict | optimistic | none>',
'set integrity mode',
value => {
assert(
['strict', 'optimistic', 'none'].includes(value),
X`--integrity must be one of 'strict', 'optimistic', or 'none'`,
TypeError,
);
return value;
},
'optimistic',
)
.option(
'--sleep <seconds>',
'sleep <seconds> between polling (may be fractional)',
value => {
const num = Number(value);
assert.equal(`${num}`, value, X`--sleep must be a number`, TypeError);
return num;
},
0,
)
.option(
'-o, --output <format>',
'value output format',
value => {
assert(
[
'hex',
'justin',
'justinlines',
'json',
'jsonlines',
'text',
].includes(value),
X`--output must be one of 'hex', 'justin', 'justinlines', 'json', 'jsonlines', or 'text'`,
TypeError,
);
return value;
},
'justin',
)
.option('-B, --bootstrap <config>', 'network bootstrap configuration')
.action(async (pathSpecs, cmd) => {
const opts = { ...program.opts(), ...cmd.opts() };
return subMain(streamMain, ['stream', ...pathSpecs], opts);
});

const addRunOptions = cmd =>
cmd
.option(
Expand Down
1 change: 1 addition & 0 deletions packages/agoric-cli/src/sdk-package-names.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
export default [
"@agoric/access-token",
"@agoric/assert",
"@agoric/chain-streams",
"@agoric/cosmic-swingset",
"@agoric/cosmos",
"@agoric/deploy-script-support",
Expand Down
120 changes: 120 additions & 0 deletions packages/agoric-cli/src/stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// @ts-check
import process from 'process';
import { Far } from '@endo/marshal';
import { decodeToJustin } from '@endo/marshal/src/marshal-justin.js';

import {
delay,
iterateLatest,
makeChainStream,
makeLeader,
makeStoreKey,
} from '@agoric/chain-streams';

export default async function streamMain(progname, rawArgs, powers, opts) {
const { anylogger } = powers;
const console = anylogger('agoric:stream');

const {
integrity,
output,
bootstrap = 'http://localhost:26657',
verbose,
sleep,
} = opts;

/** @type {import('@agoric/chain-streams').ChainStreamOptions} */
const streamOptions = {
integrity,
};

/** @type {(buf: any) => any} */
let formatOutput;
switch (output) {
case 'justinlines':
case 'justin': {
streamOptions.unserializer = null;
const pretty = !output.endsWith('lines');
formatOutput = ({ body }) => {
const encoded = JSON.parse(body);
return decodeToJustin(encoded, pretty);
};
break;
}
case 'jsonlines':
case 'json': {
const spaces = output.endsWith('lines') ? undefined : 2;
const bigintToStringReplacer = (_, arg) => {
if (typeof arg === 'bigint') {
return `${arg}`;
}
return arg;
};
formatOutput = obj => JSON.stringify(obj, bigintToStringReplacer, spaces);
break;
}
case 'hex': {
// Dump as hex strings.
streamOptions.decode = buf => buf;
streamOptions.unserializer = null;
formatOutput = buf =>
buf.reduce((acc, b) => acc + b.toString(16).padStart(2, '0'), '');
break;
}
case 'text': {
streamOptions.decode = buf => new TextDecoder().decode(buf);
streamOptions.unserializer = null;
formatOutput = buf => buf;
break;
}
default: {
console.error(`Unknown output format: ${output}`);
return 1;
}
}

if (integrity !== 'none') {
streamOptions.crasher = Far('stream crasher', {
crash: (...args) => {
console.error(...args);
console.warn(`You are running with '--integrity=${integrity}'`);
console.warn(
`If you trust your RPC nodes, you can turn off proofs with '--integrity=none'`,
);
process.exit(1);
},
});
}

/** @type {import('@agoric/chain-streams').ChainLeaderOptions} */
const leaderOptions = {
retryCallback: e => {
verbose && console.warn('Retrying due to:', e);
return delay(1000 + Math.random() * 1000);
},
keepPolling: async () => {
let toSleep = sleep * 1000;
if (toSleep <= 0) {
toSleep = (5 + Math.random()) * 1000;
}
await delay(toSleep);
return true;
},
};

const [_cmd, ...specs] = rawArgs;

verbose && console.warn('Streaming from leader at', bootstrap);
const leader = makeLeader(bootstrap, leaderOptions);
await Promise.all(
specs.map(async spec => {
verbose && console.warn('Consuming', spec);
const storeKey = makeStoreKey(spec);
const stream = makeChainStream(leader, storeKey, streamOptions);
for await (const { value } of iterateLatest(stream)) {
process.stdout.write(`${formatOutput(value)}\n`);
}
}),
);
return 0;
}

0 comments on commit ec85c45

Please sign in to comment.