Skip to content

Commit

Permalink
feat(agoric): new stream command
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelfig committed Jun 3, 2022
1 parent 6b5ff21 commit cde3857
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 0 deletions.
2 changes: 2 additions & 0 deletions packages/agoric-cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
"dependencies": {
"@agoric/access-token": "^0.4.18",
"@agoric/assert": "^0.4.0",
"@agoric/chain-streams": "^0.1.0",
"@agoric/nat": "^4.1.0",
"@endo/bundle-source": "^2.2.0",
"@endo/captp": "^2.0.7",
"@endo/compartment-mapper": "^0.7.5",
"@endo/init": "^0.5.41",
"@endo/marshal": "^0.6.7",
"@endo/promise-kit": "^0.2.41",
"@iarna/toml": "^2.2.3",
"anylogger": "^0.21.0",
Expand Down
1 change: 1 addition & 0 deletions packages/agoric-cli/src/entrypoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import '@endo/init/pre.js';
import 'esm';
import '@agoric/chain-streams/node-fetch-shim.js';
import '@endo/init';

import path from 'path';
Expand Down
43 changes: 43 additions & 0 deletions packages/agoric-cli/src/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import initMain from './init.js';
import installMain from './install.js';
import setDefaultsMain from './set-defaults.js';
import startMain from './start.js';
import streamMain from './stream.js';
import walletMain from './open.js';

const DEFAULT_DAPP_TEMPLATE = 'dapp-fungible-faucet';
Expand Down Expand Up @@ -160,6 +161,48 @@ const main = async (progname, rawArgs, powers) => {
return subMain(installMain, ['install', forceSdkVersion], opts);
});

program
.command('stream <path-spec...>')
.description('read an Agoric Chain Stream')
.option(
'--integrity <optimistic | strict | trusting>',
'set integrity mode',
value => {
assert(
['strict', 'optimistic', 'trusting'].includes(value),
X`--integrity must be one of 'strict', 'optimistic', or 'trusting'`,
TypeError,
);
return value;
},
'optimistic',
)
.option(
'-o, --output <format>',
'value output format',
value => {
assert(
[
'hex',
'justin',
'justinlines',
'json',
'jsonlines',
'text',
].includes(value),
X`--output must be one of 'hex', 'justin', 'justinlines', 'json', 'jsonlines', or 'text'`,
TypeError,
);
return value;
},
'justinlines',
)
.option('-B, --bootstrap <config>', 'network bootstrap configuration')
.action(async (pathSpecs, cmd) => {
const opts = { ...program.opts(), ...cmd.opts() };
return subMain(streamMain, ['stream', ...pathSpecs], opts);
});

const addRunOptions = cmd =>
cmd
.option(
Expand Down
105 changes: 105 additions & 0 deletions packages/agoric-cli/src/stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// @ts-check
import process from 'process';
import { makeMarshal } from '@endo/marshal';
import { decodeToJustin } from '@endo/marshal/src/marshal-justin.js';

import {
delay,
iterateLatest,
makeChainStream,
makeLeader,
makeStoreKey,
} from '@agoric/chain-streams';

export default async function streamMain(progname, rawArgs, powers, opts) {
const { anylogger } = powers;
const console = anylogger('agoric:follow');

const {
integrity,
output,
bootstrap = 'http://localhost:26657',
verbose,
} = opts;

let unserializer;
/** @type {undefined | ((buf: Uint8Array) => any)} */
let decode;
let formatOutput;
switch (output) {
case 'justinlines':
case 'justin': {
const { serialize } = makeMarshal(undefined, undefined, {
// Don't save errors, we just display the neutered form.
marshalSaveError: () => {},
});
const pretty = !output.endsWith('lines');
formatOutput = obj => {
const { body } = serialize(harden(obj));
const marshalled = JSON.parse(body);
return decodeToJustin(marshalled, pretty);
};
break;
}
case 'jsonlines':
case 'json': {
const spaces = output.endsWith('lines') ? undefined : 2;
const bigintToStringReplacer = (_, arg) => {
if (typeof arg === 'bigint') {
return `${arg}`;
}
return arg;
};
formatOutput = obj => JSON.stringify(obj, bigintToStringReplacer, spaces);
break;
}
case 'hex': {
// Dump as hex strings.
decode = buf => buf;
unserializer = null;
formatOutput = buf =>
buf.reduce((acc, b) => acc + b.toString(16).padStart(2, '0'), '');
break;
}
case 'text': {
decode = buf => new TextDecoder().decode(buf);
unserializer = null;
formatOutput = buf => buf;
break;
}
default: {
console.error(`Unknown output format: ${output}`);
return 1;
}
}

/** @type {import('@agoric/chain-streams').ChainLeaderOptions} */
const leaderOptions = {
retryCallback: e => {
verbose && console.warn('Retrying due to:', e);
return delay(1000 + Math.random() * 1000);
},
};
/** @type {import('@agoric/chain-streams').ChainStreamOptions} */
const streamOptions = {
integrity,
unserializer,
decode,
};

const [_cmd, ...specs] = rawArgs;

verbose && console.warn('Following leader at', bootstrap);
const leader = makeLeader(bootstrap, leaderOptions);
await Promise.all(
specs.map(async spec => {
verbose && console.warn('Consuming', spec);
const storeKey = makeStoreKey(spec);
const stream = makeChainStream(leader, storeKey, streamOptions);
for await (const value of iterateLatest(stream)) {
process.stdout.write(`${formatOutput(value)}\n`);
}
}),
);
return 0;
}

0 comments on commit cde3857

Please sign in to comment.