Skip to content

Commit

Permalink
chore(casting): @agoric/chain-streams -> @agoric/casting
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelfig committed Jun 7, 2022
1 parent daa6e14 commit 5ca9217
Show file tree
Hide file tree
Showing 24 changed files with 121 additions and 120 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test-all-packages.yml
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ jobs:
# END-TEST-BOILERPLATE
- name: yarn test (cosmos)
run: cd golang/cosmos && yarn ${{ steps.vars.outputs.test }}
- name: yarn test (chain-streams)
run: cd packages/chain-streams && yarn ${{ steps.vars.outputs.test }}
- name: yarn test (casting)
run: cd packages/casting && yarn ${{ steps.vars.outputs.test }}
- name: yarn test (run-protocol)
run: cd packages/run-protocol && yarn ${{ steps.vars.outputs.test }}
- name: yarn test (pegasus)
Expand Down
2 changes: 1 addition & 1 deletion packages/agoric-cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
"dependencies": {
"@agoric/access-token": "^0.4.18",
"@agoric/assert": "^0.4.0",
"@agoric/chain-streams": "^0.1.0",
"@agoric/casting": "^0.1.0",
"@agoric/nat": "^4.1.0",
"@endo/bundle-source": "^2.2.0",
"@endo/captp": "^2.0.7",
Expand Down
2 changes: 1 addition & 1 deletion packages/agoric-cli/src/entrypoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import '@endo/init/pre.js';
import 'esm';
import '@agoric/chain-streams/node-fetch-shim.js';
import '@agoric/casting/node-fetch-shim.js';
import '@endo/init';

import path from 'path';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import { decodeToJustin } from '@endo/marshal/src/marshal-justin.js';
import {
delay,
iterateLatest,
makeChainStream,
makeFollower,
makeLeader,
makeStoreKey,
} from '@agoric/chain-streams';
makeCastingSpec,
} from '@agoric/casting';

export default async function streamMain(progname, rawArgs, powers, opts) {
export default async function followerMain(progname, rawArgs, powers, opts) {
const { anylogger } = powers;
const console = anylogger('agoric:stream');
const console = anylogger('agoric:follower');

const {
integrity,
Expand All @@ -23,8 +23,8 @@ export default async function streamMain(progname, rawArgs, powers, opts) {
sleep,
} = opts;

/** @type {import('@agoric/chain-streams').ChainStreamOptions} */
const streamOptions = {
/** @type {import('@agoric/casting').FollowerOptions} */
const followerOptions = {
integrity,
};

Expand All @@ -33,7 +33,7 @@ export default async function streamMain(progname, rawArgs, powers, opts) {
switch (output) {
case 'justinlines':
case 'justin': {
streamOptions.unserializer = null;
followerOptions.unserializer = null;
const pretty = !output.endsWith('lines');
formatOutput = ({ body }) => {
const encoded = JSON.parse(body);
Expand All @@ -55,15 +55,15 @@ export default async function streamMain(progname, rawArgs, powers, opts) {
}
case 'hex': {
// Dump as hex strings.
streamOptions.decode = buf => buf;
streamOptions.unserializer = null;
followerOptions.decode = buf => buf;
followerOptions.unserializer = null;
formatOutput = buf =>
buf.reduce((acc, b) => acc + b.toString(16).padStart(2, '0'), '');
break;
}
case 'text': {
streamOptions.decode = buf => new TextDecoder().decode(buf);
streamOptions.unserializer = null;
followerOptions.decode = buf => new TextDecoder().decode(buf);
followerOptions.unserializer = null;
formatOutput = buf => buf;
break;
}
Expand All @@ -74,7 +74,7 @@ export default async function streamMain(progname, rawArgs, powers, opts) {
}

if (integrity !== 'none') {
streamOptions.crasher = Far('stream crasher', {
followerOptions.crasher = Far('follower crasher', {
crash: (...args) => {
console.error(...args);
console.warn(`You are running with '--integrity=${integrity}'`);
Expand All @@ -87,7 +87,7 @@ export default async function streamMain(progname, rawArgs, powers, opts) {
}

// TODO: https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
/** @type {import('@agoric/chain-streams').ChainLeaderOptions} */
/** @type {import('@agoric/casting').LeaderOptions} */
const leaderOptions = {
retryCallback: (e, _attempt) => {
verbose && console.warn('Retrying due to:', e);
Expand All @@ -105,14 +105,14 @@ export default async function streamMain(progname, rawArgs, powers, opts) {

const [_cmd, ...specs] = rawArgs;

verbose && console.warn('Streaming from leader at', bootstrap);
verbose && console.warn('Creating leader for', bootstrap);
const leader = makeLeader(bootstrap, leaderOptions);
await Promise.all(
specs.map(async spec => {
verbose && console.warn('Consuming', spec);
const storeKey = makeStoreKey(spec);
const stream = makeChainStream(leader, storeKey, streamOptions);
for await (const { value } of iterateLatest(stream)) {
verbose && console.warn('Following', spec);
const castingSpec = makeCastingSpec(spec);
const follower = makeFollower(leader, castingSpec, followerOptions);
for await (const { value } of iterateLatest(follower)) {
process.stdout.write(`${formatOutput(value)}\n`);
}
}),
Expand Down
8 changes: 4 additions & 4 deletions packages/agoric-cli/src/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import initMain from './init.js';
import installMain from './install.js';
import setDefaultsMain from './set-defaults.js';
import startMain from './start.js';
import streamMain from './stream.js';
import followMain from './follow.js';
import walletMain from './open.js';

const DEFAULT_DAPP_TEMPLATE = 'dapp-fungible-faucet';
Expand Down Expand Up @@ -162,8 +162,8 @@ const main = async (progname, rawArgs, powers) => {
});

program
.command('stream <path-spec...>')
.description('read an Agoric Chain Stream')
.command('follow <path-spec...>')
.description('follow an Agoric Casting leader')
.option(
'--integrity <strict | optimistic | none>',
'set integrity mode',
Expand Down Expand Up @@ -210,7 +210,7 @@ const main = async (progname, rawArgs, powers) => {
.option('-B, --bootstrap <config>', 'network bootstrap configuration')
.action(async (pathSpecs, cmd) => {
const opts = { ...program.opts(), ...cmd.opts() };
return subMain(streamMain, ['stream', ...pathSpecs], opts);
return subMain(followMain, ['follow', ...pathSpecs], opts);
});

const addRunOptions = cmd =>
Expand Down
2 changes: 1 addition & 1 deletion packages/agoric-cli/src/sdk-package-names.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
export default [
"@agoric/access-token",
"@agoric/assert",
"@agoric/chain-streams",
"@agoric/casting",
"@agoric/cosmic-swingset",
"@agoric/cosmos",
"@agoric/deploy-script-support",
Expand Down
38 changes: 19 additions & 19 deletions packages/chain-streams/README.md → packages/casting/README.md
Original file line number Diff line number Diff line change
@@ -1,43 +1,43 @@
# Chain Streams
# Agoric Casting

This [Agoric](https://agoric.com) Chain Streams package consumes data server
publication leaders in a flexible, future-proof way.
This [Agoric](https://agoric.com) Casting package follows ocap broadcasts in a
flexible, future-proof way.

TL;DR: You can run `yarn demo`, or to consume a mailbox stream do:
TL;DR: You can run `yarn demo`, or to follow a mailbox castingSpec do:
```sh
npx agoric stream -Bhttp://devnet.agoric.net/network-config :mailbox.agoric1foobarbaz -otext
npx agoric follow -Bhttp://devnet.agoric.net/network-config :mailbox.agoric1foobarbaz -otext
```

An example of following an on-chain mailbox in code (using this package) is:

```js
// First, obtain a Hardened JS environment via Endo.
import '@endo/init/pre-remoting.js'; // needed only for the next line
import '@agoric/chain-streams/node-fetch-shim.js'; // needed for Node.js
import '@agoric/castingSpec/node-fetch-shim.js'; // needed for Node.js
import '@endo/init';

import {
iterateLatest,
makeChainStream,
makeFollower,
makeLeader,
makeStoreKey,
} from '@agoric/chain-streams';
makeCastingSpec,
} from '@agoric/casting';

// Iterate over a mailbox stream on the devnet.
// Iterate over a mailbox follower on the devnet.
const leader = makeLeader('https://devnet.agoric.net/network-config');
const storeKey = makeStoreKey(':mailbox.agoric1foobarbaz');
const stream = makeChainStream(leader, storeKey);
for await (const { value } of iterateLatest(stream)) {
const castingSpec = makeCastingSpec(':mailbox.agoric1foobarbaz');
const follower = makeFollower(leader, castingSpec);
for await (const { value } of iterateLatest(follower)) {
console.log(`here's a mailbox value`, value);
}
```

## Stream options
## Follower options

The `streamOpts` argument in `makeChainStream(leader, key, streamOpts)` provides an optional bag of options:
The `followerOpts` argument in `makeFollower(leader, key, followerOpts)` provides an optional bag of options:
- the `integrity` option, which has three possibilities:
- `'strict'` - release data only after proving it was validated (may incur waits for one block's data to be validated in the next block),
- `'optimistic'` (default) - release data immediately, but may crash the stream in the future if an already-released value could not be proven,
- `'optimistic'` (default) - release data immediately, but may crash the follower in the future if an already-released value could not be proven,
- `'none'` - release data immediately without validation
- the `decode` option is a function to translate `buf: Uint8Array` into `data: string`
- (default) - interpret buf as a utf-8 string, then `JSON.parse` it
Expand All @@ -46,19 +46,19 @@ The `streamOpts` argument in `makeChainStream(leader, key, streamOpts)` provides
- `null` - don't additionally unserialize data before releasing it
- any unserializer object supporting `E(unserializer).unserialize(data)`
- the `crasher` option can be
- `null` (default) stream failures only propagate an exception/rejection
- `null` (default) follower failures only propagate an exception/rejection
- any crasher object supporting `E(crasher).crash(reason)`

## Behind the scenes

- the network config contains enough information to obtain Tendermint RPC nodes
for a given Agoric network. You can use `makeLeaderFromRpcAddresses` directly
if you want to avoid fetching a network-config.
- each stream uses periodic CosmJS state polling (every X milliseconds) which
- each follower uses periodic CosmJS state polling (every X milliseconds) which
can be refreshed more expediently via a Tendermint subscription to the
corresponding `state_change` event
- published (string) values are automatically unmarshalled, but without object references. a custom `marshaller` for your application.
- the `iterateRecent` adapter transforms a stream into a local async iterator
- the `iterateRecent` adapter transforms a follower into a local async iterator
that produces only the last queried value (with no history reconstruction)

## Status
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@agoric/chain-streams",
"name": "@agoric/casting",
"version": "0.1.0",
"description": "Agoric's Chain Streams",
"description": "Agoric's OCap broadcastingSpec system",
"type": "module",
"main": "src/main.js",
"repository": "https://github.com/Agoric/agoric-sdk",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ import { toAscii } from '@cosmjs/encoding';

/**
* @param {string} storagePath
* @returns {import('./types').ChainStoreKey}
* @returns {import('./types').CastingSpec}
*/
const swingsetPathToStoreKey = storagePath =>
const swingsetPathToCastingSpec = storagePath =>
harden({
storeName: 'swingset',
storeSubkey: toAscii(`swingset/data:${storagePath}`),
Expand All @@ -17,9 +17,9 @@ const DATA_PREFIX_BYTES = new Uint8Array([0]);
/**
* @param {string} storagePath
* @param {string} [storeName]
* @returns {import('./types').ChainStoreKey}
* @returns {import('./types').CastingSpec}
*/
const vstoragePathToStoreKey = (storagePath, storeName = 'vstorage') => {
const vstoragePathToCastingSpec = (storagePath, storeName = 'vstorage') => {
const elems = storagePath ? storagePath.split('.') : [];
const buf = toAscii(`${elems.length}.${storagePath}`);
return harden({
Expand All @@ -29,26 +29,27 @@ const vstoragePathToStoreKey = (storagePath, storeName = 'vstorage') => {
});
};

export const DEFAULT_PATH_CONVERTER = vstoragePathToStoreKey;
export const DEFAULT_PATH_CONVERTER = vstoragePathToCastingSpec;

/**
* @type {Record<string, (path: string) => import('./types').ChainStoreKey>}
* @type {Record<string, (path: string) => import('./types').CastingSpec>}
*/
export const pathPrefixToConverters = harden({
'swingset:': swingsetPathToStoreKey,
'vstore:': vstoragePathToStoreKey,
'swingset:': swingsetPathToCastingSpec,
'vstore:': vstoragePathToCastingSpec,
':': DEFAULT_PATH_CONVERTER,
});

/**
* @param {string} pathSpec
* @returns {import('./types').ChainStoreKey}
* @param {string} specString
* @returns {import('./types').CastingSpec}
*/
export const makeStoreKey = pathSpec => {
const match = pathSpec.match(/^([^:.]*:)(.*)/);
export const makeCastingSpec = specString => {
assert.typeof(specString, 'string');
const match = specString.match(/^([^:.]*:)(.*)/);
assert(
match,
`path spec ${pathSpec} does not match 'PREFIX:PATH' or ':PATH'`,
`spec string ${specString} does not match 'PREFIX:PATH' or ':PATH'`,
);
const kind = match[1];
const storePath = match[2];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,27 @@ import { DEFAULT_KEEP_POLLING } from './defaults.js';
/**
* Just return an unspecified allegedValue every poll period.
*
* @param {import('./types').ChainLeader} leader
* @param {import('./types.js').ChainStoreKey} storeKey
* @returns {Promise<import('./types.js').ChainStream<import('./types').ChainStoreChange>>}
* @param {import('./types').Leader} leader
* @param {import('./types.js').CastingSpec} castingSpec
* @returns {Promise<import('./types.js').Follower<import('./types').CastingChange>>}
*/
export const makePollingWatcher = async (leader, storeKey) => {
export const makePollingChangeFollower = async (leader, castingSpec) => {
const { keepPolling = DEFAULT_KEEP_POLLING } = await E(leader).getOptions();
return Far('key watcher stream', {
return Far('polling change follower', {
getLatestIterable: () =>
Far('key watcher iterable', {
Far('polling change follower iterable', {
[Symbol.asyncIterator]: () => {
/** @type {Promise<boolean> | undefined} */
let nextPollPromise;
return Far('key watcher iterator', {
return Far('polling change follower iterator', {
next: async () => {
if (!nextPollPromise) {
nextPollPromise = keepPolling();
}
const keepGoing = await nextPollPromise;
nextPollPromise = undefined;
const change = harden({
storeKey,
castingSpec,
// Make no warrant as to the values.
values: [],
});
Expand Down
File renamed without changes.
Loading

0 comments on commit 5ca9217

Please sign in to comment.